Bulkhead(Executor) does not always release permits #393

Open
Techdaan opened this issue Mar 23, 2025 · 3 comments

Techdaan commented Mar 23, 2025

I believe the root cause is how the FutureLinkedList is used in BulkheadImpl.

When trying to acquire a permit from the BulkheadImpl with a maxWaitTime, if there are no permits immediately available, a new Future will be added to the FutureLinkedList. When that future is completed, the Future is removed from the FutureLinkedList. If a permit is immediately available, a completed future is returned.

The returned future is completed in BulkheadImpl#releasePermit, so unless BulkheadImpl#releasePermit is called, the future is never removed from the FutureLinkedList. This leaks memory (the node is never removed from the list) and, over time, prevents new permits from being issued.
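
The mechanism described above can be reproduced in a few lines without Failsafe. The following is a minimal sketch, not the library's code: `ToyBulkhead`, `acquireAsync`, `tryAcquire`, and `release` are hypothetical names, and a plain `ArrayDeque` stands in for FutureLinkedList. It assumes that a release hands the permit to the oldest queued future, even if the caller that created it has already timed out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ToyBulkheadLeak {
    /** Hypothetical toy model of a permit pool with queued waiters (not Failsafe's BulkheadImpl). */
    static final class ToyBulkhead {
        private final int maxPermits;
        private int permits;
        private final Deque<CompletableFuture<Void>> waiters = new ArrayDeque<>();

        ToyBulkhead(int maxPermits) {
            this.maxPermits = maxPermits;
            this.permits = maxPermits;
        }

        // Returns a completed future if a permit is free, otherwise queues a pending one.
        synchronized CompletableFuture<Void> acquireAsync() {
            if (permits > 0) {
                permits--;
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> waiter = new CompletableFuture<>();
            waiters.addLast(waiter);
            return waiter;
        }

        boolean tryAcquire(long timeoutNanos) throws InterruptedException {
            CompletableFuture<Void> future = acquireAsync();
            try {
                future.get(timeoutNanos, TimeUnit.NANOSECONDS);
                return true;
            } catch (ExecutionException | TimeoutException e) {
                return false; // Behaviour under study: the waiter stays in the queue.
            }
        }

        // Hands the permit to the oldest waiter if there is one, else returns it to the pool.
        synchronized void release() {
            CompletableFuture<Void> next = waiters.pollFirst();
            if (next != null) {
                next.complete(null); // May be an abandoned waiter that nobody will release.
            } else if (permits < maxPermits) {
                permits++;
            }
        }

        synchronized int available() { return permits; }
        synchronized int queued() { return waiters.size(); }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyBulkhead bulkhead = new ToyBulkhead(1);
        boolean first = bulkhead.tryAcquire(0);  // Takes the only permit.
        boolean second = bulkhead.tryAcquire(0); // Times out, leaves a waiter queued.
        bulkhead.release();                      // Permit goes to the abandoned waiter.
        boolean third = bulkhead.tryAcquire(0);  // Fails although nothing is running.
        System.out.println(first + " " + second + " " + third
                + " available=" + bulkhead.available() + " queued=" + bulkhead.queued());
        // prints "true false false available=0 queued=1"
    }
}
```

Run single-threaded, the second call times out and leaves its future queued. The later release completes that abandoned future instead of returning the permit to the pool, so the third call fails even though no work is in progress.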

This is a serious problem when using a bulkhead in a FailsafeExecutor. Permits are acquired with a maxWaitTime in BulkheadExecutor#preExecute. If you submit more work to the executor than there are permits, the permits requested by executions that were rejected with BulkheadFullException are never released, because in this case neither onSuccess nor onFailure is called in BulkheadExecutor. When that happens, the bulkhead enters an irrecoverable, broken state.

I wrote a small example showcasing how this bug can be triggered.

public static void main(String[] args) {
    Bulkhead<Object> bulkhead = Bulkhead.builder(1) // Max concurrency of 1.
            .withMaxWaitTime(Duration.ZERO)
            .build();
    FailsafeExecutor<Object> failsafe = Failsafe.with(bulkhead);

    AtomicInteger numBulkheadFullErrors = new AtomicInteger();
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < 3; i++) { // Create 3 threads submitting work to the bulkhead
        threads.add(new Thread(() -> {
            for (int j = 0; j < 10; j++) {
                try {
                    failsafe.get(() -> null); // Submit work to the bulkhead
                } catch (BulkheadFullException e) {
                    numBulkheadFullErrors.incrementAndGet();
                }
            }
        }));
    }

    // Start and join the threads
    threads.forEach(Thread::start);
    threads.forEach(t -> {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    // Wait for the executor to finish all work. 250ms is plenty of time to finish the work submitted above.
    try {
        Thread.sleep(250);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    // Expected behaviour: Program outputs "Success" with a few errors.
    // Actual behaviour: Program outputs "Error: BulkheadFullException" with many errors (29 on my local machine).
    try {
        System.out.println("Success: " + failsafe.get(() -> true) + ", total during looping: " + numBulkheadFullErrors.get());
    } catch (BulkheadFullException e) {
        System.out.println("Error: BulkheadFullException, total during looping: " + numBulkheadFullErrors.get());
    }
}

Note that this bug can also be reproduced with the code from the documentation if a maxWaitTime is passed to tryAcquirePermit. Internally, tryAcquirePermit may create a future, but this future is never released: because tryAcquirePermit returned false, the caller never reaches releasePermit.

if (bulkhead.tryAcquirePermit()) { 
  try {
    doSomething();
  } finally {
    bulkhead.releasePermit();
  }
}

Tembrel (Contributor) commented Mar 23, 2025

Would adding future.complete(null) in the catch block here fix this?

    try {
      future.get(maxWaitTime.toNanos(), TimeUnit.NANOSECONDS);
      return true;
    } catch (CancellationException | ExecutionException | TimeoutException e) {
      return false;
    }

Techdaan (Author) commented

Simply completing the future in that catch block still caused the test code above to fail. I think this is because the permits counter is not incremented. I tried the following, which did fix that specific test case for me:

  @Override
  public boolean tryAcquirePermit(Duration maxWaitTime) throws InterruptedException {
    CompletableFuture<Void> future = acquirePermitAsync();
    if (future == NULL_FUTURE)
      return true;

    try {
      future.get(maxWaitTime.toNanos(), TimeUnit.NANOSECONDS);
      return true;
    } catch (CancellationException | ExecutionException | TimeoutException e) {
      releaseSpecificPermit(future); // Release the specific permit
      return false;
    }
  }

  private synchronized void releaseSpecificPermit(CompletableFuture<Void> future) {
    future.complete(null);

    if (permits < maxPermits) {
      permits += 1;
    }
  }

I am not familiar with this code though, so I am not sure if this could break something or introduce other bugs.
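
One way to reason about a fix is to separate two cases on timeout: the waiter is still queued (it never held a permit, so it only needs to be removed), or a release completed it just before the timeout handler ran (it now holds a permit). The sketch below shows that idea on a toy model, not on Failsafe's BulkheadImpl. `ToyBulkhead` and its methods are hypothetical names, a plain `ArrayDeque` stands in for FutureLinkedList, and whether the same approach fits the real class is an assumption that would need checking against its source.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ToyBulkheadFixed {
    /** Hypothetical toy model; timed-out waiters are removed from the queue. */
    static final class ToyBulkhead {
        private final int maxPermits;
        private int permits;
        private final Deque<CompletableFuture<Void>> waiters = new ArrayDeque<>();

        ToyBulkhead(int maxPermits) {
            this.maxPermits = maxPermits;
            this.permits = maxPermits;
        }

        // Returns a completed future if a permit is free, otherwise queues a pending one.
        synchronized CompletableFuture<Void> acquireAsync() {
            if (permits > 0) {
                permits--;
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> waiter = new CompletableFuture<>();
            waiters.addLast(waiter);
            return waiter;
        }

        boolean tryAcquire(long timeoutNanos) throws InterruptedException {
            CompletableFuture<Void> future = acquireAsync();
            try {
                future.get(timeoutNanos, TimeUnit.NANOSECONDS);
                return true;
            } catch (ExecutionException | TimeoutException e) {
                // Still queued: the waiter never held a permit, so dropping it is enough.
                // Already dequeued: release() handed it a permit in the meantime, so
                // report success and let the caller release it as usual.
                return !removeWaiter(future);
            }
        }

        // Returns true if the waiter was still queued and has now been removed.
        private synchronized boolean removeWaiter(CompletableFuture<Void> future) {
            return waiters.remove(future);
        }

        // Hands the permit to the oldest waiter if there is one, else returns it to the pool.
        synchronized void release() {
            CompletableFuture<Void> next = waiters.pollFirst();
            if (next != null) {
                next.complete(null);
            } else if (permits < maxPermits) {
                permits++;
            }
        }

        synchronized int available() { return permits; }
        synchronized int queued() { return waiters.size(); }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyBulkhead bulkhead = new ToyBulkhead(1);
        boolean first = bulkhead.tryAcquire(0);  // Takes the only permit.
        boolean second = bulkhead.tryAcquire(0); // Times out and removes its own waiter.
        bulkhead.release();                      // Queue is empty, permit returns to the pool.
        boolean third = bulkhead.tryAcquire(0);  // Succeeds again.
        System.out.println(first + " " + second + " " + third
                + " available=" + bulkhead.available() + " queued=" + bulkhead.queued());
        // prints "true false true available=0 queued=0"
    }
}
```

In this model the permit count is not incremented on timeout, because a waiter that was still queued never held a permit. Incrementing it while another caller is still running would let the pool exceed its configured concurrency.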

C0mbatwombat commented

This seems to me to be a major/critical issue? If there are more BulkheadFullExceptions than permits, the whole system blocks.

Or am I missing something? This could very well be the case because the library seems to be widely used.
