diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java index be221a1100d..7378c911192 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java @@ -18,6 +18,8 @@ import java.util.concurrent.locks.ReentrantLock; +import org.jspecify.annotations.Nullable; + import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.integration.context.IntegrationContextUtils; @@ -50,9 +52,9 @@ public abstract class AbstractEndpoint extends IntegrationObjectSupport protected final ReentrantLock lifecycleLock = new ReentrantLock(); // NOSONAR - private String role; + private @Nullable String role; - private SmartLifecycleRoleController roleController; + private @Nullable SmartLifecycleRoleController roleController; private boolean autoStartup = true; @@ -81,11 +83,11 @@ public void setPhase(int phase) { * @see org.springframework.context.SmartLifecycle * @see org.springframework.integration.support.SmartLifecycleRoleController */ - public void setRole(String role) { + public void setRole(@Nullable String role) { this.role = role; } - public String getRole() { + public @Nullable String getRole() { return this.role; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractFetchLimitingMessageSource.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractFetchLimitingMessageSource.java index f17c51dda02..27b05343cb8 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractFetchLimitingMessageSource.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractFetchLimitingMessageSource.java @@ -16,6 +16,8 @@ package org.springframework.integration.endpoint; +import org.jspecify.annotations.Nullable; + import org.springframework.integration.support.management.MessageSourceManagement; /** @@ -43,7 +45,7 @@ public int getMaxFetchSize() { } @Override - protected Object doReceive() { + protected @Nullable Object doReceive() { return doReceive(this.maxFetchSize); } @@ -55,6 +57,6 @@ protected Object doReceive() { * necessary. * @return The value returned. */ - protected abstract Object doReceive(int maxFetchSizeToReceive); + protected abstract @Nullable Object doReceive(int maxFetchSizeToReceive); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractMessageSource.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractMessageSource.java index a0451cbca78..bd85a107ee9 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractMessageSource.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractMessageSource.java @@ -61,19 +61,20 @@ public abstract class AbstractMessageSource extends AbstractExpressionEvaluat private final Set meters = ConcurrentHashMap.newKeySet(); - private Map headerExpressions; + private @Nullable Map headerExpressions; + @SuppressWarnings("NullAway.Init") private String beanName; - private String managedType; + private @Nullable String managedType; - private String managedName; + private @Nullable String managedName; private boolean loggingEnabled = true; - private MetricsCaptor metricsCaptor; + private @Nullable MetricsCaptor metricsCaptor; - private CounterFacade receiveCounter; + private @Nullable CounterFacade receiveCounter; public void setHeaderExpressions(@Nullable Map headerExpressions) { if (!CollectionUtils.isEmpty(headerExpressions)) { @@ -97,7 +98,7 @@ public void setManagedType(String managedType) { } @Override - public String getManagedType() { + public @Nullable String getManagedType() { return this.managedType; } @@ -107,7 +108,7 @@ public void setManagedName(String managedName) { } @Override - public String getManagedName() { + public @Nullable String getManagedName() { return this.managedName; } @@ -133,7 +134,7 @@ public ManagementOverrides getOverrides() { } @Override - public final Message receive() { + public final @Nullable Message receive() { try { return buildMessage(doReceive()); } @@ -146,12 +147,12 @@ public final Message receive() { } @SuppressWarnings("unchecked") - protected Message buildMessage(Object result) { + protected @Nullable Message buildMessage(@Nullable Object result) { if (result == null) { return null; } Message message; - Map headers = evaluateHeaders(); + Map headers = evaluateHeaders(); if (result instanceof AbstractIntegrationMessageBuilder messageBuilder) { if (!CollectionUtils.isEmpty(headers)) { messageBuilder.copyHeaders(headers); @@ -190,9 +191,10 @@ private void incrementReceiveCounter() { this.receiveCounter.increment(); } + @SuppressWarnings("NullAway") // dataflow analysis limitation private CounterFacade createCounter(boolean success, String exception) { CounterFacade counter = this.metricsCaptor.counterBuilder(RECEIVE_COUNTER_NAME) - .tag("name", getComponentName() == null ? "unknown" : getComponentName()) + .tag("name", getComponentName()) .tag("type", "source") .tag("result", success ? "success" : "failure") .tag("exception", exception) @@ -203,7 +205,7 @@ private CounterFacade createCounter(boolean success, String exception) { } @Nullable - private Map evaluateHeaders() { + private Map evaluateHeaders() { return CollectionUtils.isEmpty(this.headerExpressions) ? null : ExpressionEvalMap.from(this.headerExpressions) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java index 6a30a60b2ee..166f336f7a6 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java @@ -62,7 +62,6 @@ import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; -import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; import org.springframework.util.ErrorHandler; import org.springframework.util.ReflectionUtils; @@ -98,27 +97,29 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint implement private boolean syncExecutor = true; - private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader(); + @SuppressWarnings("NullAway.Init") + private ClassLoader beanClassLoader; private Trigger trigger = new PeriodicTrigger(Duration.ofMillis(DEFAULT_POLLING_PERIOD)); - private ErrorHandler errorHandler; + private @Nullable ErrorHandler errorHandler; private boolean errorHandlerIsDefault; - private List adviceChain; + private @Nullable List adviceChain; - private TransactionSynchronizationFactory transactionSynchronizationFactory; + private @Nullable TransactionSynchronizationFactory transactionSynchronizationFactory; private volatile long maxMessagesPerPoll = -1; - private volatile Callable> pollingTask; + @SuppressWarnings("NullAway.Init") + private volatile Callable<@Nullable Message> pollingTask; - private volatile Flux> pollingFlux; + private volatile @Nullable Flux> pollingFlux; - private volatile Subscription subscription; + private volatile @Nullable Subscription subscription; - private volatile ScheduledFuture runningTask; + private volatile @Nullable ScheduledFuture runningTask; private volatile boolean initialized; @@ -127,11 +128,11 @@ public AbstractPollingEndpoint() { this.setPhase(Integer.MAX_VALUE / 2); } - public void setTaskExecutor(Executor taskExecutor) { + public void setTaskExecutor(@Nullable Executor taskExecutor) { this.taskExecutor = (taskExecutor != null ? taskExecutor : new SyncTaskExecutor()); this.syncExecutor = this.taskExecutor instanceof SyncTaskExecutor - || (this.taskExecutor instanceof ErrorHandlingTaskExecutor - && ((ErrorHandlingTaskExecutor) this.taskExecutor).isSyncExecutor()); + || (this.taskExecutor instanceof ErrorHandlingTaskExecutor errorHandlingTaskExecutor + && errorHandlingTaskExecutor.isSyncExecutor()); } protected Executor getTaskExecutor() { @@ -142,11 +143,11 @@ protected boolean isSyncExecutor() { return this.syncExecutor; } - public void setTrigger(Trigger trigger) { + public void setTrigger(@Nullable Trigger trigger) { this.trigger = (trigger != null ? trigger : new PeriodicTrigger(Duration.ofMillis(DEFAULT_POLLING_PERIOD))); } - public void setAdviceChain(List adviceChain) { + public void setAdviceChain(@Nullable List adviceChain) { this.adviceChain = adviceChain; } @@ -167,7 +168,7 @@ public long getMaxMessagesPerPoll() { return this.maxMessagesPerPoll; } - public void setErrorHandler(ErrorHandler errorHandler) { + public void setErrorHandler(@Nullable ErrorHandler errorHandler) { this.errorHandler = errorHandler; } @@ -177,7 +178,7 @@ public void setBeanClassLoader(ClassLoader classLoader) { } public void setTransactionSynchronizationFactory( - TransactionSynchronizationFactory transactionSynchronizationFactory) { + @Nullable TransactionSynchronizationFactory transactionSynchronizationFactory) { this.transactionSynchronizationFactory = transactionSynchronizationFactory; } @@ -188,7 +189,7 @@ public void setTransactionSynchronizationFactory( * @return the channel or null. * @since 4.3 */ - public MessageChannel getDefaultErrorChannel() { + public @Nullable MessageChannel getDefaultErrorChannel() { if (!this.errorHandlerIsDefault && this.errorHandler instanceof MessagePublishingErrorHandler messagePublishingErrorHandler) { @@ -255,11 +256,11 @@ protected boolean isReactive() { return false; } - protected Flux> getPollingFlux() { + protected @Nullable Flux> getPollingFlux() { return this.pollingFlux; } - protected Object getReceiveMessageSource() { + protected @Nullable Object getReceiveMessageSource() { return null; } @@ -275,7 +276,7 @@ protected void onInit() { return; } Assert.notNull(this.trigger, "Trigger is required"); - if (this.taskExecutor != null && !(this.taskExecutor instanceof ErrorHandlingTaskExecutor)) { + if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor)) { if (this.errorHandler == null) { this.errorHandler = ChannelUtils.getErrorHandler(getBeanFactory()); this.errorHandlerIsDefault = true; @@ -314,13 +315,12 @@ protected void doStart() { } else { TaskScheduler taskScheduler = getTaskScheduler(); - Assert.state(taskScheduler != null, "unable to start polling, no taskScheduler available"); this.runningTask = taskScheduler.schedule(createPoller(), this.trigger); } } @SuppressWarnings("unchecked") - private Callable> createPollingTask() { + private Callable<@Nullable Message> createPollingTask() { List receiveOnlyAdviceChain = null; if (!CollectionUtils.isEmpty(this.adviceChain)) { receiveOnlyAdviceChain = this.adviceChain.stream() @@ -328,7 +328,7 @@ private Callable> createPollingTask() { .toList(); } - Callable> task = this::doPoll; + Callable<@Nullable Message> task = this::doPoll; List advices = this.adviceChain; if (!CollectionUtils.isEmpty(advices)) { @@ -338,7 +338,7 @@ private Callable> createPollingTask() { .filter(advice -> !isReceiveOnlyAdvice(advice)) .forEach(proxyFactory::addAdvice); } - task = (Callable>) proxyFactory.getProxy(this.beanClassLoader); + task = (Callable<@Nullable Message>) proxyFactory.getProxy(this.beanClassLoader); } if (!CollectionUtils.isEmpty(receiveOnlyAdviceChain)) { applyReceiveOnlyAdviceChain(receiveOnlyAdviceChain); @@ -418,31 +418,36 @@ private Flux> createFluxGenerator() { .doOnSubscribe(subs -> this.subscription = subs); } - private Message pollForMessage() { + private @Nullable Message pollForMessage() { Exception pollingTaskError = null; try { return this.pollingTask.call(); } catch (Exception ex) { pollingTaskError = ex; - if (ex instanceof MessagingException messagingException) { // NOSONAR + if (ex instanceof MessagingException messagingException) { throw messagingException; } else { Message failedMessage = null; if (this.transactionSynchronizationFactory != null) { - Object resource = TransactionSynchronizationManager.getResource(getResourceToBind()); + Object resource = null; + Object resourceToBind = getResourceToBind(); + if (resourceToBind != null) { + resource = TransactionSynchronizationManager.getResource(resourceToBind); + } if (resource instanceof IntegrationResourceHolder integrationResourceHolder) { failedMessage = integrationResourceHolder.getMessage(); } } - throw new MessagingException(failedMessage, ex); // NOSONAR (null failedMessage) + throw failedMessage == null ? new MessagingException((String) null, ex) + : new MessagingException(failedMessage, ex); } } finally { if (this.transactionSynchronizationFactory != null) { Object resource = getResourceToBind(); - if (TransactionSynchronizationManager.hasResource(resource)) { + if (resource != null && TransactionSynchronizationManager.hasResource(resource)) { TransactionSynchronizationManager.unbindResource(resource); } } @@ -450,7 +455,7 @@ private Message pollForMessage() { } } - private Message doPoll() { + private @Nullable Message doPoll() { IntegrationResourceHolder holder = bindResourceHolderIfNecessary(getResourceKey(), getResourceToBind()); Message message = null; try { @@ -517,7 +522,7 @@ protected void doStop() { * if no message is immediately available. * @return The message or null. */ - protected abstract Message receiveMessage(); + protected abstract @Nullable Message receiveMessage(); /** * Handle a message. @@ -530,7 +535,7 @@ protected void doStop() { * synchronization. * @return The resource, or null if transaction synchronization is not required. */ - protected Object getResourceToBind() { + protected @Nullable Object getResourceToBind() { return null; } @@ -540,14 +545,14 @@ protected Object getResourceToBind() { * {@link org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor} * makes this attribute available as a variable in SpEL expressions. * @return The key, or null (default) if the resource shouldn't be - * made available as a attribute. + * made available as an attribute. */ - protected String getResourceKey() { + protected @Nullable String getResourceKey() { return null; } @Nullable - private IntegrationResourceHolder bindResourceHolderIfNecessary(String key, Object resource) { + private IntegrationResourceHolder bindResourceHolderIfNecessary(@Nullable String key, @Nullable Object resource) { if (this.transactionSynchronizationFactory != null && resource != null && TransactionSynchronizationManager.isActualTransactionActive()) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/EventDrivenConsumer.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/EventDrivenConsumer.java index e30d7d3d64c..a1604aa472d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/EventDrivenConsumer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/EventDrivenConsumer.java @@ -16,6 +16,8 @@ package org.springframework.integration.endpoint; +import org.jspecify.annotations.Nullable; + import org.springframework.context.Lifecycle; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.router.MessageRouter; @@ -60,7 +62,7 @@ public MessageChannel getInputChannel() { } @Override - public MessageChannel getOutputChannel() { + public @Nullable MessageChannel getOutputChannel() { if (this.handler instanceof MessageProducer messageProducer) { return messageProducer.getOutputChannel(); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ExpressionEvaluatingMessageSource.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ExpressionEvaluatingMessageSource.java index 5ee81ce16c1..d64640844d3 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ExpressionEvaluatingMessageSource.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ExpressionEvaluatingMessageSource.java @@ -16,6 +16,8 @@ package org.springframework.integration.endpoint; +import org.jspecify.annotations.Nullable; + import org.springframework.expression.Expression; import org.springframework.integration.context.ExpressionCapable; import org.springframework.util.Assert; @@ -46,7 +48,7 @@ public String getComponentType() { } @Override - public T doReceive() { + public @Nullable T doReceive() { return this.evaluateExpression(this.expression, this.expectedType); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ExpressionMessageProducerSupport.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ExpressionMessageProducerSupport.java index 25eef23a7a8..d3c3aa9c1b8 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ExpressionMessageProducerSupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ExpressionMessageProducerSupport.java @@ -16,6 +16,8 @@ package org.springframework.integration.endpoint; +import org.jspecify.annotations.Nullable; + import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; import org.springframework.integration.expression.ExpressionUtils; @@ -33,8 +35,9 @@ */ public abstract class ExpressionMessageProducerSupport extends MessageProducerSupport { - private volatile Expression payloadExpression; + private volatile @Nullable Expression payloadExpression; + @SuppressWarnings("NullAway.Init") private volatile EvaluationContext evaluationContext; /** @@ -70,6 +73,7 @@ protected Object evaluatePayloadExpression(Object payload) { Object evaluationResult = payload; if (this.payloadExpression != null) { evaluationResult = this.payloadExpression.getValue(this.evaluationContext, payload); + Assert.state(evaluationResult != null, "the evaluation result from the payloadExpression must not be null"); } return evaluationResult; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/IntegrationConsumer.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/IntegrationConsumer.java index de7fef994c1..aa550b906d8 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/IntegrationConsumer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/IntegrationConsumer.java @@ -16,6 +16,8 @@ package org.springframework.integration.endpoint; +import org.jspecify.annotations.Nullable; + import org.springframework.integration.support.context.NamedComponent; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @@ -40,7 +42,7 @@ public interface IntegrationConsumer extends NamedComponent { * Return the output channel (maybe null). * @return the output channel. */ - MessageChannel getOutputChannel(); + @Nullable MessageChannel getOutputChannel(); /** * Return the consumer's handler. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProcessorMessageSource.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProcessorMessageSource.java index 379df9b3901..2fe0c3724b9 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProcessorMessageSource.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProcessorMessageSource.java @@ -16,6 +16,8 @@ package org.springframework.integration.endpoint; +import org.jspecify.annotations.Nullable; + import org.springframework.integration.handler.MessageProcessor; /** @@ -42,7 +44,7 @@ public String getComponentType() { } @Override - protected Object doReceive() { + protected @Nullable Object doReceive() { return this.messageProcessor.processMessage(null); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java index 145f83f8a11..0e3615aed01 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java @@ -22,7 +22,6 @@ import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; -import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.core.AttributeAccessor; import org.springframework.integration.IntegrationPattern; @@ -68,17 +67,17 @@ public abstract class MessageProducerSupport extends AbstractEndpoint @Nullable private MessageReceiverObservationConvention observationConvention; - private MessageChannel outputChannel; + private @Nullable MessageChannel outputChannel; - private String outputChannelName; + private @Nullable String outputChannelName; - private MessageChannel errorChannel; + private @Nullable MessageChannel errorChannel; - private String errorChannelName; + private @Nullable String errorChannelName; private boolean shouldTrack = false; - private volatile Subscription subscription; + private volatile @Nullable Subscription subscription; @SuppressWarnings("this-escape") protected MessageProducerSupport() { @@ -103,7 +102,7 @@ public void setOutputChannelName(String outputChannelName) { } @Override - public MessageChannel getOutputChannel() { + public @Nullable MessageChannel getOutputChannel() { String channelName = this.outputChannelName; if (channelName != null) { this.outputChannel = getChannelResolver().resolveDestination(channelName); @@ -219,10 +218,7 @@ public void afterSingletonsInstantiated() { @Override protected void onInit() { super.onInit(); - BeanFactory beanFactory = getBeanFactory(); - if (beanFactory != null) { - this.messagingTemplate.setBeanFactory(beanFactory); - } + this.messagingTemplate.setBeanFactory(getBeanFactory()); } /** @@ -248,7 +244,7 @@ protected void doStop() { } } - protected void sendMessage(Message message) { + protected void sendMessage(@Nullable Message message) { if (message == null) { throw new MessagingException("cannot send a null message"); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MethodInvokingMessageSource.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MethodInvokingMessageSource.java index 3631fb6cb1e..f2c85af7857 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MethodInvokingMessageSource.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MethodInvokingMessageSource.java @@ -20,6 +20,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.jspecify.annotations.Nullable; + import org.springframework.context.Lifecycle; import org.springframework.integration.support.management.ManageableLifecycle; import org.springframework.messaging.MessagingException; @@ -37,11 +39,13 @@ */ public class MethodInvokingMessageSource extends AbstractMessageSource implements ManageableLifecycle { + @SuppressWarnings("NullAway.Init") private volatile Object object; + @SuppressWarnings("NullAway.Init") private volatile Method method; - private volatile String methodName; + private volatile @Nullable String methodName; private volatile boolean initialized; @@ -67,6 +71,7 @@ public String getComponentType() { return "inbound-channel-adapter"; } + @SuppressWarnings("NullAway") // dataflow analysis limitation @Override protected void onInit() { this.initializationMonitor.lock(); @@ -111,7 +116,7 @@ public boolean isRunning() { } @Override - protected Object doReceive() { + protected @Nullable Object doReceive() { try { if (!this.initialized) { this.afterPropertiesSet(); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java index 2a71d189bd8..b8a4a9b4569 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.List; +import org.jspecify.annotations.Nullable; import org.reactivestreams.Subscriber; import org.springframework.context.Lifecycle; @@ -59,7 +60,7 @@ public class PollingConsumer extends AbstractPollingEndpoint implements Integrat private final MessageHandler handler; - private final List channelInterceptors; + private final @Nullable List channelInterceptors; private PollableChannel inputChannel; @@ -92,7 +93,7 @@ public MessageChannel getInputChannel() { } @Override - public MessageChannel getOutputChannel() { + public @Nullable MessageChannel getOutputChannel() { if (this.handler instanceof MessageProducer messageProducer) { return messageProducer.getOutputChannel(); } @@ -177,7 +178,8 @@ protected void handleMessage(Message message) { } } - private Message applyBeforeHandle(Message message, Deque interceptorStack) { + @SuppressWarnings("NullAway") // dataflow analysis limitation + private @Nullable Message applyBeforeHandle(Message message, Deque interceptorStack) { Message theMessage = message; for (ChannelInterceptor interceptor : this.channelInterceptors) { if (interceptor instanceof ExecutorChannelInterceptor executorInterceptor) { @@ -194,7 +196,7 @@ private Message applyBeforeHandle(Message message, Deque message, Exception ex, + private void triggerAfterMessageHandled(Message message, @Nullable Exception ex, Deque interceptorStack) { Iterator iterator = interceptorStack.descendingIterator(); @@ -210,7 +212,7 @@ private void triggerAfterMessageHandled(Message message, Exception ex, } @Override - protected Message receiveMessage() { + protected @Nullable Message receiveMessage() { return (this.receiveTimeout >= 0) ? this.inputChannel.receive(this.receiveTimeout) : this.inputChannel.receive(); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java index 9fae5997a33..ea039ead9dc 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java @@ -75,9 +75,10 @@ public class ReactiveStreamsConsumer extends AbstractEndpoint implements Integra @Nullable private Function>, ? extends Publisher>> reactiveCustomizer; + @SuppressWarnings("NullAway.Init") private ErrorHandler errorHandler; - private volatile Disposable subscription; + private volatile @Nullable Disposable subscription; @SuppressWarnings("unchecked") public ReactiveStreamsConsumer(MessageChannel inputChannel, MessageHandler messageHandler) { @@ -145,7 +146,7 @@ public MessageChannel getInputChannel() { } @Override - public MessageChannel getOutputChannel() { + public @Nullable MessageChannel getOutputChannel() { if (this.handler instanceof MessageProducer messageProducer) { return messageProducer.getOutputChannel(); } @@ -212,7 +213,7 @@ private static final class MessageHandlerSubscriber private final Consumer> consumer; - private Subscription subscription; + private @Nullable Subscription subscription; private final MessageHandler messageHandler; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java index 0ede84b27eb..3b92186cd0f 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java @@ -22,11 +22,12 @@ import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; import org.jspecify.annotations.Nullable; +import reactor.core.publisher.Flux; import org.springframework.aop.framework.Advised; import org.springframework.beans.factory.BeanCreationException; -import org.springframework.beans.factory.BeanFactory; import org.springframework.context.Lifecycle; +import org.springframework.expression.Expression; import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.acks.AckUtils; import org.springframework.integration.acks.AcknowledgmentCallback; @@ -64,17 +65,20 @@ public class SourcePollingChannelAdapter extends AbstractPollingEndpoint private final Lock lock = new ReentrantLock(); + @SuppressWarnings("NullAway.Init") private MessageSource originalSource; private ObservationRegistry observationRegistry = ObservationRegistry.NOOP; - private MessageReceiverObservationConvention observationConvention; + private @Nullable MessageReceiverObservationConvention observationConvention; + @SuppressWarnings("NullAway.Init") private volatile MessageSource source; + @SuppressWarnings("NullAway.Init") private volatile MessageChannel outputChannel; - private volatile String outputChannelName; + private volatile @Nullable String outputChannelName; private volatile boolean shouldTrack; @@ -89,7 +93,9 @@ public void setSource(MessageSource source) { this.originalSource = target != null ? (MessageSource) target : source; if (source instanceof ExpressionCapable expressionCapable) { - setPrimaryExpression(expressionCapable.getExpression()); + Expression expression = expressionCapable.getExpression(); + Assert.state(expression != null, "'expression' must not be null"); + setPrimaryExpression(expression); } } @@ -182,7 +188,9 @@ protected void doStart() { super.doStart(); if (isReactive()) { - ((ReactiveStreamsSubscribableChannel) this.outputChannel).subscribeTo(getPollingFlux()); + Flux> pollingFlux = getPollingFlux(); + Assert.state(pollingFlux != null, "'pollingFlux' must not be null"); + ((ReactiveStreamsSubscribableChannel) getOutputChannel()).subscribeTo(pollingFlux); } } @@ -201,10 +209,7 @@ protected void onInit() { || (this.outputChannelName != null && this.outputChannel == null), "One and only one of 'outputChannelName' or 'outputChannel' is required."); super.onInit(); - BeanFactory beanFactory = getBeanFactory(); - if (beanFactory != null) { - this.messagingTemplate.setBeanFactory(beanFactory); - } + this.messagingTemplate.setBeanFactory(getBeanFactory()); } public MessageChannel getOutputChannel() { @@ -246,7 +251,7 @@ protected void handleMessage(Message messageArg) { } @Override - protected Message receiveMessage() { + protected @Nullable Message receiveMessage() { return this.source.receive(); } @@ -296,7 +301,7 @@ protected String getResourceKey() { } @Nullable - private static Object extractProxyTarget(Object target) { + private static Object extractProxyTarget(@Nullable Object target) { if (!(target instanceof Advised advised)) { return target; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/management/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/management/package-info.java deleted file mode 100644 index 694a5401751..00000000000 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/management/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * Provides classes related to endpoint management. - */ -package org.springframework.integration.endpoint.management; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/package-info.java index 7d9c387e5e8..9f4677b7af2 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/package-info.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/package-info.java @@ -1,4 +1,5 @@ /** * Provides core classes related to Endpoints. */ +@org.jspecify.annotations.NullMarked package org.springframework.integration.endpoint; diff --git a/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ExpressionEvaluatingMessageSourceIntegrationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ExpressionEvaluatingMessageSourceIntegrationTests.java index 4b801ea45f5..f9c40c2800e 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ExpressionEvaluatingMessageSourceIntegrationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ExpressionEvaluatingMessageSourceIntegrationTests.java @@ -74,6 +74,7 @@ public void test() throws Exception { adapter.setErrorHandler(t -> { throw new IllegalStateException("unexpected exception in test", t); }); + adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT); adapter.start(); List> messages = new ArrayList<>(); for (int i = 0; i < 3; i++) {