Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.concurrent.locks.ReentrantLock;

import org.jspecify.annotations.Nullable;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.integration.context.IntegrationContextUtils;
Expand Down Expand Up @@ -50,9 +52,9 @@ public abstract class AbstractEndpoint extends IntegrationObjectSupport

protected final ReentrantLock lifecycleLock = new ReentrantLock(); // NOSONAR

private String role;
private @Nullable String role;

private SmartLifecycleRoleController roleController;
private @Nullable SmartLifecycleRoleController roleController;

private boolean autoStartup = true;

Expand Down Expand Up @@ -81,11 +83,11 @@ public void setPhase(int phase) {
* @see org.springframework.context.SmartLifecycle
* @see org.springframework.integration.support.SmartLifecycleRoleController
*/
public void setRole(String role) {
public void setRole(@Nullable String role) {
this.role = role;
}

public String getRole() {
public @Nullable String getRole() {
return this.role;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.integration.endpoint;

import org.jspecify.annotations.Nullable;

import org.springframework.integration.support.management.MessageSourceManagement;

/**
Expand Down Expand Up @@ -43,7 +45,7 @@ public int getMaxFetchSize() {
}

@Override
protected Object doReceive() {
protected @Nullable Object doReceive() {
return doReceive(this.maxFetchSize);
}

Expand All @@ -55,6 +57,6 @@ protected Object doReceive() {
* necessary.
* @return The value returned.
*/
protected abstract Object doReceive(int maxFetchSizeToReceive);
protected abstract @Nullable Object doReceive(int maxFetchSizeToReceive);

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,20 @@ public abstract class AbstractMessageSource<T> extends AbstractExpressionEvaluat

private final Set<MeterFacade> meters = ConcurrentHashMap.newKeySet();

private Map<String, Expression> headerExpressions;
private @Nullable Map<String, Expression> headerExpressions;

@SuppressWarnings("NullAway.Init")
private String beanName;

private String managedType;
private @Nullable String managedType;

private String managedName;
private @Nullable String managedName;

private boolean loggingEnabled = true;

private MetricsCaptor metricsCaptor;
private @Nullable MetricsCaptor metricsCaptor;

private CounterFacade receiveCounter;
private @Nullable CounterFacade receiveCounter;

public void setHeaderExpressions(@Nullable Map<String, Expression> headerExpressions) {
if (!CollectionUtils.isEmpty(headerExpressions)) {
Expand All @@ -97,7 +98,7 @@ public void setManagedType(String managedType) {
}

@Override
public String getManagedType() {
public @Nullable String getManagedType() {
return this.managedType;
}

Expand All @@ -107,7 +108,7 @@ public void setManagedName(String managedName) {
}

@Override
public String getManagedName() {
public @Nullable String getManagedName() {
return this.managedName;
}

Expand All @@ -133,7 +134,7 @@ public ManagementOverrides getOverrides() {
}

@Override
public final Message<T> receive() {
public final @Nullable Message<T> receive() {
try {
return buildMessage(doReceive());
}
Expand All @@ -146,12 +147,12 @@ public final Message<T> receive() {
}

@SuppressWarnings("unchecked")
protected Message<T> buildMessage(Object result) {
protected @Nullable Message<T> buildMessage(@Nullable Object result) {
if (result == null) {
return null;
}
Message<?> message;
Map<String, Object> headers = evaluateHeaders();
Map<String, @Nullable Object> headers = evaluateHeaders();
if (result instanceof AbstractIntegrationMessageBuilder<?> messageBuilder) {
if (!CollectionUtils.isEmpty(headers)) {
messageBuilder.copyHeaders(headers);
Expand Down Expand Up @@ -190,9 +191,10 @@ private void incrementReceiveCounter() {
this.receiveCounter.increment();
}

@SuppressWarnings("NullAway") // dataflow analysis limitation
private CounterFacade createCounter(boolean success, String exception) {
CounterFacade counter = this.metricsCaptor.counterBuilder(RECEIVE_COUNTER_NAME)
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
.tag("name", getComponentName())
.tag("type", "source")
.tag("result", success ? "success" : "failure")
.tag("exception", exception)
Expand All @@ -203,7 +205,7 @@ private CounterFacade createCounter(boolean success, String exception) {
}

@Nullable
private Map<String, Object> evaluateHeaders() {
private Map<String, @Nullable Object> evaluateHeaders() {
return CollectionUtils.isEmpty(this.headerExpressions)
? null
: ExpressionEvalMap.from(this.headerExpressions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.ReflectionUtils;
Expand Down Expand Up @@ -98,27 +97,29 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint implement

private boolean syncExecutor = true;

private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
@SuppressWarnings("NullAway.Init")
private ClassLoader beanClassLoader;

private Trigger trigger = new PeriodicTrigger(Duration.ofMillis(DEFAULT_POLLING_PERIOD));

private ErrorHandler errorHandler;
private @Nullable ErrorHandler errorHandler;

private boolean errorHandlerIsDefault;

private List<Advice> adviceChain;
private @Nullable List<Advice> adviceChain;

private TransactionSynchronizationFactory transactionSynchronizationFactory;
private @Nullable TransactionSynchronizationFactory transactionSynchronizationFactory;

private volatile long maxMessagesPerPoll = -1;

private volatile Callable<Message<?>> pollingTask;
@SuppressWarnings("NullAway.Init")
private volatile Callable<@Nullable Message<?>> pollingTask;

private volatile Flux<Message<?>> pollingFlux;
private volatile @Nullable Flux<Message<?>> pollingFlux;

private volatile Subscription subscription;
private volatile @Nullable Subscription subscription;

private volatile ScheduledFuture<?> runningTask;
private volatile @Nullable ScheduledFuture<?> runningTask;

private volatile boolean initialized;

Expand All @@ -127,11 +128,11 @@ public AbstractPollingEndpoint() {
this.setPhase(Integer.MAX_VALUE / 2);
}

public void setTaskExecutor(Executor taskExecutor) {
public void setTaskExecutor(@Nullable Executor taskExecutor) {
this.taskExecutor = (taskExecutor != null ? taskExecutor : new SyncTaskExecutor());
this.syncExecutor = this.taskExecutor instanceof SyncTaskExecutor
|| (this.taskExecutor instanceof ErrorHandlingTaskExecutor
&& ((ErrorHandlingTaskExecutor) this.taskExecutor).isSyncExecutor());
|| (this.taskExecutor instanceof ErrorHandlingTaskExecutor errorHandlingTaskExecutor
&& errorHandlingTaskExecutor.isSyncExecutor());
}

protected Executor getTaskExecutor() {
Expand All @@ -142,11 +143,11 @@ protected boolean isSyncExecutor() {
return this.syncExecutor;
}

public void setTrigger(Trigger trigger) {
public void setTrigger(@Nullable Trigger trigger) {
this.trigger = (trigger != null ? trigger : new PeriodicTrigger(Duration.ofMillis(DEFAULT_POLLING_PERIOD)));
}

public void setAdviceChain(List<Advice> adviceChain) {
public void setAdviceChain(@Nullable List<Advice> adviceChain) {
this.adviceChain = adviceChain;
}

Expand All @@ -167,7 +168,7 @@ public long getMaxMessagesPerPoll() {
return this.maxMessagesPerPoll;
}

public void setErrorHandler(ErrorHandler errorHandler) {
public void setErrorHandler(@Nullable ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}

Expand All @@ -177,7 +178,7 @@ public void setBeanClassLoader(ClassLoader classLoader) {
}

public void setTransactionSynchronizationFactory(
TransactionSynchronizationFactory transactionSynchronizationFactory) {
@Nullable TransactionSynchronizationFactory transactionSynchronizationFactory) {

this.transactionSynchronizationFactory = transactionSynchronizationFactory;
}
Expand All @@ -188,7 +189,7 @@ public void setTransactionSynchronizationFactory(
* @return the channel or null.
* @since 4.3
*/
public MessageChannel getDefaultErrorChannel() {
public @Nullable MessageChannel getDefaultErrorChannel() {
if (!this.errorHandlerIsDefault && this.errorHandler
instanceof MessagePublishingErrorHandler messagePublishingErrorHandler) {

Expand Down Expand Up @@ -255,11 +256,11 @@ protected boolean isReactive() {
return false;
}

protected Flux<Message<?>> getPollingFlux() {
protected @Nullable Flux<Message<?>> getPollingFlux() {
return this.pollingFlux;
}

protected Object getReceiveMessageSource() {
protected @Nullable Object getReceiveMessageSource() {
return null;
}

Expand All @@ -275,7 +276,7 @@ protected void onInit() {
return;
}
Assert.notNull(this.trigger, "Trigger is required");
if (this.taskExecutor != null && !(this.taskExecutor instanceof ErrorHandlingTaskExecutor)) {
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor)) {
if (this.errorHandler == null) {
this.errorHandler = ChannelUtils.getErrorHandler(getBeanFactory());
this.errorHandlerIsDefault = true;
Expand Down Expand Up @@ -314,21 +315,20 @@ protected void doStart() {
}
else {
TaskScheduler taskScheduler = getTaskScheduler();
Assert.state(taskScheduler != null, "unable to start polling, no taskScheduler available");
this.runningTask = taskScheduler.schedule(createPoller(), this.trigger);
}
}

@SuppressWarnings("unchecked")
private Callable<Message<?>> createPollingTask() {
private Callable<@Nullable Message<?>> createPollingTask() {
List<Advice> receiveOnlyAdviceChain = null;
if (!CollectionUtils.isEmpty(this.adviceChain)) {
receiveOnlyAdviceChain = this.adviceChain.stream()
.filter(this::isReceiveOnlyAdvice)
.toList();
}

Callable<Message<?>> task = this::doPoll;
Callable<@Nullable Message<?>> task = this::doPoll;

List<Advice> advices = this.adviceChain;
if (!CollectionUtils.isEmpty(advices)) {
Expand All @@ -338,7 +338,7 @@ private Callable<Message<?>> createPollingTask() {
.filter(advice -> !isReceiveOnlyAdvice(advice))
.forEach(proxyFactory::addAdvice);
}
task = (Callable<Message<?>>) proxyFactory.getProxy(this.beanClassLoader);
task = (Callable<@Nullable Message<?>>) proxyFactory.getProxy(this.beanClassLoader);
}
if (!CollectionUtils.isEmpty(receiveOnlyAdviceChain)) {
applyReceiveOnlyAdviceChain(receiveOnlyAdviceChain);
Expand Down Expand Up @@ -418,39 +418,44 @@ private Flux<Message<?>> createFluxGenerator() {
.doOnSubscribe(subs -> this.subscription = subs);
}

private Message<?> pollForMessage() {
private @Nullable Message<?> pollForMessage() {
Exception pollingTaskError = null;
try {
return this.pollingTask.call();
}
catch (Exception ex) {
pollingTaskError = ex;
if (ex instanceof MessagingException messagingException) { // NOSONAR
if (ex instanceof MessagingException messagingException) {
throw messagingException;
}
else {
Message<?> failedMessage = null;
if (this.transactionSynchronizationFactory != null) {
Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
Object resource = null;
Object resourceToBind = getResourceToBind();
if (resourceToBind != null) {
resource = TransactionSynchronizationManager.getResource(resourceToBind);
}
if (resource instanceof IntegrationResourceHolder integrationResourceHolder) {
failedMessage = integrationResourceHolder.getMessage();
}
}
throw new MessagingException(failedMessage, ex); // NOSONAR (null failedMessage)
throw failedMessage == null ? new MessagingException((String) null, ex)
: new MessagingException(failedMessage, ex);
}
}
finally {
if (this.transactionSynchronizationFactory != null) {
Object resource = getResourceToBind();
if (TransactionSynchronizationManager.hasResource(resource)) {
if (resource != null && TransactionSynchronizationManager.hasResource(resource)) {
TransactionSynchronizationManager.unbindResource(resource);
}
}
donePollingTask(pollingTaskError);
}
}

private Message<?> doPoll() {
private @Nullable Message<?> doPoll() {
IntegrationResourceHolder holder = bindResourceHolderIfNecessary(getResourceKey(), getResourceToBind());
Message<?> message = null;
try {
Expand Down Expand Up @@ -517,7 +522,7 @@ protected void doStop() {
* if no message is immediately available.
* @return The message or null.
*/
protected abstract Message<?> receiveMessage();
protected abstract @Nullable Message<?> receiveMessage();

/**
* Handle a message.
Expand All @@ -530,7 +535,7 @@ protected void doStop() {
* synchronization.
* @return The resource, or null if transaction synchronization is not required.
*/
protected Object getResourceToBind() {
protected @Nullable Object getResourceToBind() {
return null;
}

Expand All @@ -540,14 +545,14 @@ protected Object getResourceToBind() {
* {@link org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor}
* makes this attribute available as a variable in SpEL expressions.
* @return The key, or null (default) if the resource shouldn't be
* made available as a attribute.
* made available as an attribute.
*/
protected String getResourceKey() {
protected @Nullable String getResourceKey() {
return null;
}

@Nullable
private IntegrationResourceHolder bindResourceHolderIfNecessary(String key, Object resource) {
private IntegrationResourceHolder bindResourceHolderIfNecessary(@Nullable String key, @Nullable Object resource) {
if (this.transactionSynchronizationFactory != null && resource != null &&
TransactionSynchronizationManager.isActualTransactionActive()) {

Expand Down
Loading