Skip to content

Commit e713f7c

Browse files
authored
transport: pass network channel exceptions to close listeners (#127895)
Previously, exceptions encountered on a netty channel were caught and logged at some log level, but were not passed to the TcpChannel or Transport.Connection close listeners, which limited observability. This change implements that reporting and propagation: TcpChannel.setCloseException records the exception that caused a channel to close, NodeChannels.closeAndFail propagates it to the connection, and the respective close listeners receive it through onFailure. Some test infrastructure (FakeTcpChannel) and the assertions in close listener onFailure methods have been updated accordingly.
1 parent ab5ff67 commit e713f7c

File tree

16 files changed

+156
-29
lines changed

16 files changed

+156
-29
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void testConnectionLogging() throws IOException {
9696
"close connection log",
9797
TcpTransport.class.getCanonicalName(),
9898
Level.DEBUG,
99-
".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\].*"
99+
".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\]$"
100100
)
101101
);
102102

@@ -105,4 +105,24 @@ public void testConnectionLogging() throws IOException {
105105

106106
mockLog.assertAllExpectationsMatched();
107107
}
108+
109+
@TestLogging(
110+
value = "org.elasticsearch.transport.TcpTransport:DEBUG",
111+
reason = "to ensure we log exception disconnect events on DEBUG level"
112+
)
113+
public void testExceptionalDisconnectLogging() throws Exception {
114+
mockLog.addExpectation(
115+
new MockLog.PatternSeenEventExpectation(
116+
"exceptional close connection log",
117+
TcpTransport.class.getCanonicalName(),
118+
Level.DEBUG,
119+
".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\], exception:.*"
120+
)
121+
);
122+
123+
final String nodeName = internalCluster().startNode();
124+
internalCluster().restartNode(nodeName);
125+
126+
mockLog.assertAllExpectationsMatched();
127+
}
108128
}

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,33 @@ public class Netty4TcpChannel implements TcpChannel {
3434
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
3535
private final ChannelStats stats = new ChannelStats();
3636
private final boolean rstOnClose;
37+
/**
38+
* Exception causing a close, reported to the {@link #closeContext} listener
39+
*/
40+
private volatile Exception closeException = null;
3741

3842
Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) {
3943
this.channel = channel;
4044
this.isServer = isServer;
4145
this.profile = profile;
4246
this.connectContext = new ListenableFuture<>();
4347
this.rstOnClose = rstOnClose;
44-
addListener(this.channel.closeFuture(), closeContext);
4548
addListener(connectFuture, connectContext);
49+
addListener(this.channel.closeFuture(), new ActionListener<>() {
50+
@Override
51+
public void onResponse(Void ignored) {
52+
if (closeException != null) {
53+
closeContext.onFailure(closeException);
54+
} else {
55+
closeContext.onResponse(null);
56+
}
57+
}
58+
59+
@Override
60+
public void onFailure(Exception e) {
61+
assert false : new AssertionError("netty channel closeFuture should never report a failure");
62+
}
63+
});
4664
}
4765

4866
@Override
@@ -95,6 +113,11 @@ public void addConnectListener(ActionListener<Void> listener) {
95113
connectContext.addListener(listener);
96114
}
97115

116+
@Override
117+
public void setCloseException(Exception e) {
118+
closeException = e;
119+
}
120+
98121
@Override
99122
public ChannelStats getChannelStats() {
100123
return stats;

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import java.util.Map;
5555

5656
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
57-
import static org.elasticsearch.core.Strings.format;
5857
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
5958
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED;
6059

@@ -308,18 +307,27 @@ protected void stopInternal() {
308307
}, serverBootstraps::clear, () -> clientBootstrap = null);
309308
}
310309

310+
static Exception exceptionFromThrowable(Throwable cause) {
311+
if (cause instanceof Error) {
312+
return new Exception(cause);
313+
} else {
314+
return (Exception) cause;
315+
}
316+
}
317+
311318
protected class ClientChannelInitializer extends ChannelInitializer<Channel> {
312319

313320
@Override
314321
protected void initChannel(Channel ch) throws Exception {
315-
addClosedExceptionLogger(ch);
316322
assert ch instanceof Netty4NioSocketChannel;
317323
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
318324
setupPipeline(ch, false);
319325
}
320326

321327
@Override
322328
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
329+
Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get();
330+
channel.setCloseException(exceptionFromThrowable(cause));
323331
ExceptionsHelper.maybeDieOnAnotherThread(cause);
324332
super.exceptionCaught(ctx, cause);
325333
}
@@ -337,7 +345,6 @@ protected ServerChannelInitializer(String name) {
337345

338346
@Override
339347
protected void initChannel(Channel ch) throws Exception {
340-
addClosedExceptionLogger(ch);
341348
assert ch instanceof Netty4NioSocketChannel;
342349
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
343350
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, rstOnClose, ch.newSucceededFuture());
@@ -348,6 +355,8 @@ protected void initChannel(Channel ch) throws Exception {
348355

349356
@Override
350357
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
358+
Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get();
359+
channel.setCloseException(exceptionFromThrowable(cause));
351360
ExceptionsHelper.maybeDieOnAnotherThread(cause);
352361
super.exceptionCaught(ctx, cause);
353362
}
@@ -383,26 +392,14 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster
383392
);
384393
}
385394

386-
private static void addClosedExceptionLogger(Channel channel) {
387-
Netty4Utils.addListener(channel.closeFuture(), channelFuture -> {
388-
if (channelFuture.isSuccess() == false && logger.isDebugEnabled()) {
389-
logger.debug(format("exception while closing channel: %s", channelFuture.channel()), channelFuture.cause());
390-
}
391-
});
392-
}
393-
394395
@ChannelHandler.Sharable
395396
private static class ServerChannelExceptionHandler extends ChannelInboundHandlerAdapter {
396397

397398
@Override
398399
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
399400
ExceptionsHelper.maybeDieOnAnotherThread(cause);
400401
Netty4TcpServerChannel serverChannel = ctx.channel().attr(SERVER_CHANNEL_KEY).get();
401-
if (cause instanceof Error) {
402-
onServerException(serverChannel, new Exception(cause));
403-
} else {
404-
onServerException(serverChannel, (Exception) cause);
405-
}
402+
onServerException(serverChannel, exceptionFromThrowable(cause));
406403
}
407404
}
408405
}

server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public interface CloseableChannel extends Closeable {
3838
* channel. If the channel is already closed when the listener is added the listener will immediately be
3939
* executed by the thread that is attempting to add the listener.
4040
*
41+
* When the close completes but an exception prompted the closure, the exception will be passed to the
42+
* listener's onFailure method.
43+
*
4144
* @param listener to be executed
4245
*/
4346
void addCloseListener(ActionListener<Void> listener);

server/src/main/java/org/elasticsearch/tasks/TaskManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -736,11 +736,11 @@ private ChannelPendingTaskTracker startTrackingChannel(TcpChannel channel, Consu
736736
return curr;
737737
});
738738
if (tracker.registered.compareAndSet(false, true)) {
739-
channel.addCloseListener(ActionListener.wrap(r -> {
739+
channel.addCloseListener(ActionListener.running(() -> {
740740
final ChannelPendingTaskTracker removedTracker = channelPendingTaskTrackers.remove(channel);
741741
assert removedTracker == tracker;
742742
onChannelClosed(tracker);
743-
}, e -> { assert false : new AssertionError("must not be here", e); }));
743+
}));
744744
}
745745
return tracker;
746746
}

server/src/main/java/org/elasticsearch/transport/CloseableConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ public void close() {
4848
}
4949
}
5050

51+
public void closeAndFail(Exception e) {
52+
if (closed.compareAndSet(false, true)) {
53+
closeContext.onFailure(e);
54+
}
55+
}
56+
5157
@Override
5258
public void onRemoved() {
5359
if (removed.compareAndSet(false, true)) {

server/src/main/java/org/elasticsearch/transport/InboundHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,7 @@ private void handleHandshakeRequest(TcpChannel channel, InboundMessage message)
390390
() -> "error processing handshake version [" + header.getVersion() + "] received on [" + channel + "], closing channel",
391391
e
392392
);
393+
channel.setCloseException(e);
393394
channel.close();
394395
}
395396
}

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ void sendResponse(
168168
),
169169
ex
170170
);
171+
channel.setCloseException(ex);
171172
channel.close();
172173
} else {
173174
sendErrorResponse(transportVersion, channel, requestId, action, responseStatsConsumer, ex);
@@ -204,6 +205,7 @@ void sendErrorResponse(
204205
} catch (Exception sendException) {
205206
sendException.addSuppressed(error);
206207
logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException);
208+
channel.setCloseException(sendException);
207209
channel.close();
208210
}
209211
}
@@ -431,6 +433,7 @@ private void maybeLogSlowMessage(boolean success) {
431433
}
432434
});
433435
} catch (RuntimeException ex) {
436+
channel.setCloseException(ex);
434437
Releasables.closeExpectNoException(() -> listener.onFailure(ex), () -> CloseableChannel.closeChannel(channel));
435438
throw ex;
436439
}

server/src/main/java/org/elasticsearch/transport/TcpChannel.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,13 @@ public interface TcpChannel extends CloseableChannel {
6666
*/
6767
void addConnectListener(ActionListener<Void> listener);
6868

69+
/**
70+
* Report a close-causing exception on this channel
71+
*
72+
* @param e the exception
73+
*/
74+
void setCloseException(Exception e);
75+
6976
/**
7077
* Returns stats about this channel
7178
*/

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,13 +278,26 @@ public TcpChannel channel(TransportRequestOptions.Type type) {
278278

279279
@Override
280280
public void close() {
281+
handleClose(null);
282+
}
283+
284+
@Override
285+
public void closeAndFail(Exception e) {
286+
handleClose(e);
287+
}
288+
289+
private void handleClose(Exception e) {
281290
if (isClosing.compareAndSet(false, true)) {
282291
try {
283292
boolean block = lifecycle.stopped() && Transports.isTransportThread(Thread.currentThread()) == false;
284293
CloseableChannel.closeChannels(channels, block);
285294
} finally {
286295
// Call the super method to trigger listeners
287-
super.close();
296+
if (e == null) {
297+
super.close();
298+
} else {
299+
super.closeAndFail(e);
300+
}
288301
}
289302
}
290303
}
@@ -760,6 +773,7 @@ static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle
760773
}
761774
} finally {
762775
if (closeChannel) {
776+
channel.setCloseException(e);
763777
CloseableChannel.closeChannel(channel);
764778
}
765779
}
@@ -1120,7 +1134,17 @@ public void onResponse(Void v) {
11201134
nodeChannels.channels.forEach(ch -> {
11211135
// Mark the channel init time
11221136
ch.getChannelStats().markAccessed(relativeMillisTime);
1123-
ch.addCloseListener(ActionListener.running(nodeChannels::close));
1137+
ch.addCloseListener(new ActionListener<Void>() {
1138+
@Override
1139+
public void onResponse(Void ignored) {
1140+
nodeChannels.close();
1141+
}
1142+
1143+
@Override
1144+
public void onFailure(Exception e) {
1145+
nodeChannels.closeAndFail(e);
1146+
}
1147+
});
11241148
});
11251149
keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
11261150
nodeChannels.addCloseListener(new ChannelCloseLogger(node, connectionId, relativeMillisTime));
@@ -1181,7 +1205,16 @@ public void onResponse(Void ignored) {
11811205

11821206
@Override
11831207
public void onFailure(Exception e) {
1184-
assert false : e; // never called
1208+
long closeTimeMillis = threadPool.relativeTimeInMillis();
1209+
logger.debug(
1210+
() -> format(
1211+
"closed transport connection [%d] to [%s] with age [%dms], exception:",
1212+
connectionId,
1213+
node,
1214+
closeTimeMillis - openTimeMillis
1215+
),
1216+
e
1217+
);
11851218
}
11861219
}
11871220

0 commit comments

Comments
 (0)