Skip to content

Commit fd00668

Browse files
committed
Rework an observation for Rabbit Binder
The observation propagation doesn't work in multi-binder configuration * Remove `ObservationAutoConfiguration` since it is not visible in case of multi-binder configuration * Instead move `setObservationEnabled` flag setting to the `RabbitMessageChannelBinder` * Add `io.micrometer.observation.ObservationRegistry` into `shared.beans` to make it visible for binder-specific application context * Add `RabbitMultiBinderObservationTests` integration test where Rabbit Binder is in multi-binder environment As a side effect this fixes an observation propagation for Kafka binder as well in the multi-binder environment. Its configuration is OK, but an `ObservationRegistry` must make it visible for the binder-specific application context. See the mentioned `shared.beans` Related to spring-cloud#2901 Also see spring-cloud#2902 for possible evolution
1 parent d620ff9 commit fd00668

File tree

8 files changed

+196
-70
lines changed

8 files changed

+196
-70
lines changed

binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,38 @@
9999
<version>1.17.1</version>
100100
<scope>test</scope>
101101
</dependency>
102+
<dependency>
103+
<groupId>io.micrometer</groupId>
104+
<artifactId>micrometer-tracing-bridge-brave</artifactId>
105+
<scope>test</scope>
106+
</dependency>
107+
<dependency>
108+
<groupId>io.micrometer</groupId>
109+
<artifactId>micrometer-tracing-integration-test</artifactId>
110+
<scope>test</scope>
111+
<exclusions>
112+
<exclusion>
113+
<groupId>io.opentelemetry</groupId>
114+
<artifactId>*</artifactId>
115+
</exclusion>
116+
<exclusion>
117+
<groupId>com.wavefront</groupId>
118+
<artifactId>*</artifactId>
119+
</exclusion>
120+
<exclusion>
121+
<groupId>io.zipkin.reporter2</groupId>
122+
<artifactId>*</artifactId>
123+
</exclusion>
124+
<exclusion>
125+
<groupId>io.micrometer</groupId>
126+
<artifactId>micrometer-tracing-bridge-otel</artifactId>
127+
</exclusion>
128+
<exclusion>
129+
<groupId>io.micrometer</groupId>
130+
<artifactId>micrometer-tracing-reporter-wavefront</artifactId>
131+
</exclusion>
132+
</exclusions>
133+
</dependency>
102134
<dependency>
103135
<!-- TODO: upgrade to httpclient 5 -->
104136
<groupId>org.apache.httpcomponents</groupId>

binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.rabbitmq.client.AMQP;
3333
import com.rabbitmq.client.Channel;
3434
import com.rabbitmq.client.Envelope;
35+
import io.micrometer.observation.ObservationRegistry;
3536
import jakarta.validation.constraints.NotNull;
3637

3738
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
@@ -54,6 +55,7 @@
5455
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
5556
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
5657
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
58+
import org.springframework.amqp.rabbit.listener.ObservableListenerContainer;
5759
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
5860
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
5961
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
@@ -87,6 +89,7 @@
8789
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
8890
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
8991
import org.springframework.cloud.stream.provisioning.ProducerDestination;
92+
import org.springframework.context.support.AbstractApplicationContext;
9093
import org.springframework.context.support.GenericApplicationContext;
9194
import org.springframework.core.task.SimpleAsyncTaskExecutor;
9295
import org.springframework.expression.Expression;
@@ -500,7 +503,7 @@ protected MessageProducer createConsumerEndpoint(
500503
"the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");
501504
String destination = consumerDestination.getName();
502505
RabbitConsumerProperties extension = properties.getExtension();
503-
MessageListenerContainer listenerContainer = createAndConfigureContainer(consumerDestination, group,
506+
ObservableListenerContainer listenerContainer = createAndConfigureContainer(consumerDestination, group,
504507
properties, destination, extension);
505508
String[] queues = StringUtils.tokenizeToStringArray(destination, ",", true, true);
506509
if (properties.getExtension().getContainerType() != ContainerType.STREAM
@@ -509,6 +512,10 @@ protected MessageProducer createConsumerEndpoint(
509512
}
510513
getContainerCustomizer().configure(listenerContainer,
511514
consumerDestination.getName(), group);
515+
// TODO until https://github.com/spring-cloud/spring-cloud-stream/issues/2902
516+
getApplicationContext().getBeanProvider(ObservationRegistry.class)
517+
.ifAvailable((observationRegistry) -> listenerContainer.setObservationEnabled(true));
518+
512519
listenerContainer.afterPropertiesSet();
513520

514521
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
@@ -540,7 +547,7 @@ protected MessageProducer createConsumerEndpoint(
540547
return adapter;
541548
}
542549

543-
private MessageListenerContainer createAndConfigureContainer(ConsumerDestination consumerDestination,
550+
private ObservableListenerContainer createAndConfigureContainer(ConsumerDestination consumerDestination,
544551
String group, ExtendedConsumerProperties<RabbitConsumerProperties> properties, String destination,
545552
RabbitConsumerProperties extension) {
546553

@@ -597,6 +604,7 @@ else if (getApplicationContext() != null) {
597604
q -> extension.getConsumerTagPrefix() + "#"
598605
+ index.getAndIncrement());
599606
}
607+
listenerContainer.setApplicationContext(getApplicationContext());
600608
return listenerContainer;
601609
}
602610

@@ -1048,6 +1056,11 @@ private RabbitTemplate buildRabbitTemplate(RabbitProducerProperties properties,
10481056
retryTemplate.setBackOffPolicy(backOff);
10491057
rabbitTemplate.setRetryTemplate(retryTemplate);
10501058
}
1059+
// TODO until https://github.com/spring-cloud/spring-cloud-stream/issues/2902
1060+
AbstractApplicationContext applicationContext = getApplicationContext();
1061+
applicationContext.getBeanProvider(ObservationRegistry.class)
1062+
.ifAvailable((observationRegistry) -> rabbitTemplate.setObservationEnabled(true));
1063+
rabbitTemplate.setApplicationContext(applicationContext);
10511064
rabbitTemplate.afterPropertiesSet();
10521065
return rabbitTemplate;
10531066
}

binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/StreamUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import com.rabbitmq.stream.Environment;
2323

2424
import org.springframework.amqp.core.MessageProperties;
25-
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
25+
import org.springframework.amqp.rabbit.listener.ObservableListenerContainer;
2626
import org.springframework.amqp.support.converter.MessageConverter;
2727
import org.springframework.cloud.stream.binder.BinderHeaders;
2828
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
@@ -52,6 +52,7 @@
5252
* spring-rabbit-stream.
5353
*
5454
* @author Gary Russell
55+
* @author Artem Bilan
5556
* @since 3.2
5657
*
5758
*/
@@ -67,11 +68,10 @@ private StreamUtils() {
6768
* @param group the group.
6869
* @param properties the properties.
6970
* @param destination the destination.
70-
* @param extension the properties extension.
7171
* @param applicationContext the application context.
7272
* @return the container.
7373
*/
74-
public static MessageListenerContainer createContainer(ConsumerDestination consumerDestination, String group,
74+
public static ObservableListenerContainer createContainer(ConsumerDestination consumerDestination, String group,
7575
ExtendedConsumerProperties<RabbitConsumerProperties> properties, String destination,
7676
ApplicationContext applicationContext) {
7777

@@ -143,7 +143,7 @@ public void fromHeadersToReply(MessageHeaders headers, MessageProperties target)
143143
* @param errorChannel the error channel
144144
* @param destination the destination.
145145
* @param extendedProperties the extended properties.
146-
* @param abstractApplicationContext the application context.
146+
* @param applicationContext the application context.
147147
* @param headerMapperFunction the header mapper function.
148148
* @return the handler.
149149
*/

binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/config/ObservationAutoConfiguration.java

Lines changed: 0 additions & 63 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
11
org.springframework.cloud.stream.binder.rabbit.config.ExtendedBindingHandlerMappingsProviderConfiguration
2-
org.springframework.cloud.stream.binder.rabbit.config.ObservationAutoConfiguration
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.rabbit.integration;
18+
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.function.Consumer;
22+
import java.util.stream.Collectors;
23+
24+
import brave.handler.SpanHandler;
25+
import brave.test.TestSpanHandler;
26+
import io.micrometer.observation.Observation;
27+
import io.micrometer.observation.ObservationRegistry;
28+
import io.micrometer.tracing.brave.bridge.BraveFinishedSpan;
29+
import io.micrometer.tracing.test.simple.SpansAssert;
30+
import org.junit.jupiter.api.Test;
31+
import org.testcontainers.containers.RabbitMQContainer;
32+
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.boot.SpringBootConfiguration;
35+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
36+
import org.springframework.boot.test.autoconfigure.actuate.observability.AutoConfigureObservability;
37+
import org.springframework.boot.test.context.SpringBootTest;
38+
import org.springframework.cloud.stream.binder.rabbit.RabbitTestContainer;
39+
import org.springframework.cloud.stream.function.StreamBridge;
40+
import org.springframework.context.annotation.Bean;
41+
import org.springframework.messaging.Message;
42+
import org.springframework.test.annotation.DirtiesContext;
43+
import org.springframework.test.context.DynamicPropertyRegistry;
44+
import org.springframework.test.context.DynamicPropertySource;
45+
46+
import static org.assertj.core.api.Assertions.assertThat;
47+
import static org.awaitility.Awaitility.await;
48+
49+
/**
50+
* @author Artem Bilan
51+
* @since 4.1.1
52+
*/
53+
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE,
54+
args = "--spring.config.location=classpath:/rabbit-multi-binder-observation.yml")
55+
@DirtiesContext
56+
@AutoConfigureObservability
57+
public class RabbitMultiBinderObservationTests {
58+
59+
private static final TestSpanHandler SPANS = new TestSpanHandler();
60+
61+
private static final RabbitMQContainer RABBITMQ = RabbitTestContainer.sharedInstance();
62+
63+
@Autowired
64+
StreamBridge streamBridge;
65+
66+
@Autowired
67+
ObservationRegistry observationRegistry;
68+
69+
@Autowired
70+
TestConfiguration testConfiguration;
71+
72+
@DynamicPropertySource
73+
static void rabbitProperties(DynamicPropertyRegistry registry) {
74+
registry.add("spring.rabbitmq.port", RABBITMQ::getAmqpPort);
75+
}
76+
77+
@Test
78+
void observationIsPropagatedInMultiBinderConfiguration() throws InterruptedException {
79+
Observation.createNotStarted("test parent observation", this.observationRegistry)
80+
.observe(() -> this.streamBridge.send("test-out-0", "test data"));
81+
82+
assertThat(this.testConfiguration.messageReceived.await(10, TimeUnit.SECONDS)).isTrue();
83+
84+
// There is a race condition when we already have a reply, but the span in the
85+
// Rabbit listener is not closed yet.
86+
// parent -> StreamBridge -> RabbitTemplate -> Rabbit Listener -> Consumer
87+
await().untilAsserted(() -> assertThat(SPANS.spans()).hasSize(5));
88+
SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList()))
89+
.haveSameTraceId();
90+
}
91+
92+
@SpringBootConfiguration
93+
@EnableAutoConfiguration
94+
public static class TestConfiguration {
95+
96+
final CountDownLatch messageReceived = new CountDownLatch(1);
97+
98+
@Bean
99+
SpanHandler testSpanHandler() {
100+
return SPANS;
101+
}
102+
103+
@Bean
104+
public Consumer<Message<?>> testListener() {
105+
return message -> this.messageReceived.countDown();
106+
}
107+
108+
}
109+
110+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
spring:
2+
cloud:
3+
function:
4+
definition: testListener
5+
stream:
6+
output-bindings: test
7+
bindings:
8+
test-out-0:
9+
binder: rabbit
10+
destination: test
11+
group: test
12+
testListener-in-0:
13+
binder: rabbit
14+
destination: test
15+
group: test
16+
binders:
17+
rabbit:
18+
type: rabbit
19+
environment:
20+
spring:
21+
cloud:
22+
stream:
23+
rabbit:
24+
binder:
25+
enableObservation: true
26+
logging:
27+
pattern:
28+
level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"
29+
30+
management:
31+
tracing:
32+
sampling:
33+
probability: 1
34+

core/spring-cloud-stream/src/main/resources/META-INF/shared.beans

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer
1313
org.springframework.cloud.stream.binder.test.InputDestination
1414
org.springframework.cloud.stream.binder.test.OutputDestination
1515
org.springframework.cloud.stream.config.BindingHandlerAdvise
16+
io.micrometer.observation.ObservationRegistry

0 commit comments

Comments
 (0)