71
71
/**
72
72
* A class which allows user to send data to an output binding.
73
73
* While in a common scenario of a typical spring-cloud-stream application user rarely
74
- * has to manually send data, there are times when the sources of data are outside of
75
- * spring-cloud-stream context and therefore we need to bridge such foreign sources
74
+ * has to manually send data, there are times when the sources of data are outside
75
+ * spring-cloud-stream context, and therefore we need to bridge such foreign sources
76
76
* with spring-cloud-stream.
77
77
* <br><br>
78
78
* This utility class allows user to do just that - <i>bridge non-spring-cloud-stream applications
79
79
* with spring-cloud-stream</i> by providing a mechanism (bridge) to send data to an output binding while
80
- * maintaining the same invocation contract (i.e., type conversion, partitioning etc) as if it was
80
+ * maintaining the same invocation contract (i.e., type conversion, partitioning etc. ) as if it was
81
81
* done through a declared function.
82
82
*
83
83
* @author Oleg Zhurakousky
@@ -114,7 +114,7 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi
114
114
private final FunctionInvocationHelper <?> functionInvocationHelper ;
115
115
116
116
private ExecutorService executorService ;
117
-
117
+
118
118
private static final boolean isContextPropagationPresent = ClassUtils .isPresent (
119
119
"io.micrometer.context.ContextSnapshotFactory" , StreamBridge .class .getClassLoader ());
120
120
@@ -334,7 +334,7 @@ public void destroy() throws Exception {
334
334
else {
335
335
this .executorService .shutdown ();
336
336
}
337
-
337
+
338
338
this .executorService = null ;
339
339
this .async = false ;
340
340
channelCache .keySet ().forEach (bindingService ::unbindProducers );
@@ -349,11 +349,9 @@ public void setAsync(boolean async) {
349
349
if (isContextPropagationPresent ) {
350
350
this .executorService = ContextPropagationHelper .wrap (this .executorService );
351
351
}
352
- this .executorService = ContextExecutorService
353
- .wrap (Executors .newCachedThreadPool (), () -> ContextSnapshotFactory .builder ().build ().captureAll ());
354
352
this .async = async ;
355
353
}
356
-
354
+
357
355
private static final class ContextPropagationHelper {
358
356
static ExecutorService wrap (ExecutorService executorService ) {
359
357
return ContextExecutorService .wrap (executorService , () -> ContextSnapshotFactory .builder ().build ().captureAll ());
0 commit comments