diff --git a/docs/changelog/132243.yaml b/docs/changelog/132243.yaml new file mode 100644 index 0000000000000..9b83f92532c57 --- /dev/null +++ b/docs/changelog/132243.yaml @@ -0,0 +1,5 @@ +pr: 132243 +summary: Fix `NullPointerException` in transport trace logger +area: Network +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/transport/TransportLogger.java b/server/src/main/java/org/elasticsearch/transport/TransportLogger.java index 6cad97acab7ba..17338289a15fa 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportLogger.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportLogger.java @@ -13,7 +13,6 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.IOUtils; @@ -30,7 +29,7 @@ static void logInboundMessage(TcpChannel channel, BytesReference message) { try { String logMessage = format(channel, message, "READ"); logger.trace(logMessage); - } catch (IOException e) { + } catch (Exception e) { logger.warn("an exception occurred formatting a READ trace message", e); } } @@ -41,7 +40,7 @@ static void logInboundMessage(TcpChannel channel, InboundMessage message) { try { String logMessage = format(channel, message, "READ"); logger.trace(logMessage); - } catch (IOException e) { + } catch (Exception e) { logger.warn("an exception occurred formatting a READ trace message", e); } } @@ -57,7 +56,7 @@ static void logOutboundMessage(TcpChannel channel, BytesReference message) { BytesReference withoutHeader = message.slice(HEADER_SIZE, message.length() - HEADER_SIZE); String logMessage = format(channel, withoutHeader, "WRITE"); logger.trace(logMessage); - } catch (IOException e) { + } catch (Exception e) { logger.warn("an exception occurred formatting a WRITE trace message", e); } } @@ -111,55 +110,32 @@ private static String format(TcpChannel channel, BytesReference message, String return sb.toString(); } - private static String format(TcpChannel channel, InboundMessage message, String event) throws IOException { + private static String format(TcpChannel channel, InboundMessage message, String event) { final StringBuilder sb = new StringBuilder(); sb.append(channel); if (message.isPing()) { sb.append(" [ping]").append(' ').append(event).append(": ").append(6).append('B'); } else { - boolean success = false; Header header = message.getHeader(); int networkMessageSize = header.getNetworkMessageSize(); int messageLengthWithHeader = HEADER_SIZE + networkMessageSize; - StreamInput streamInput = message.openOrGetStreamInput(); - try { - final long requestId = header.getRequestId(); - final boolean isRequest = header.isRequest(); - final String type = isRequest ? "request" : "response"; - final String version = header.getVersion().toString(); - sb.append(" [length: ").append(messageLengthWithHeader); - sb.append(", request id: ").append(requestId); - sb.append(", type: ").append(type); - sb.append(", version: ").append(version); + final long requestId = header.getRequestId(); + final boolean isRequest = header.isRequest(); + final String type = isRequest ? "request" : "response"; + final String version = header.getVersion().toString(); + sb.append(" [length: ").append(messageLengthWithHeader); + sb.append(", request id: ").append(requestId); + sb.append(", type: ").append(type); + sb.append(", version: ").append(version); - // TODO: Maybe Fix for BWC - if (header.needsToReadVariableHeader() == false && isRequest) { - sb.append(", action: ").append(header.getActionName()); - } - sb.append(']'); - sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B'); - success = true; - } finally { - if (success) { - IOUtils.close(streamInput); - } else { - IOUtils.closeWhileHandlingException(streamInput); - } + // TODO: Maybe Fix for BWC + if (header.needsToReadVariableHeader() == false && isRequest) { + sb.append(", action: ").append(header.getActionName()); } + sb.append(']'); + sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B'); } return sb.toString(); } - - private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException { - if (TransportStatus.isCompress(status) && streamInput.available() > 0) { - try { - return CompressorFactory.COMPRESSOR.threadLocalStreamInput(streamInput); - } catch (IllegalArgumentException e) { - throw new IllegalStateException("stream marked as compressed, but is missing deflate header"); - } - } else { - return streamInput; - } - } } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java index a361f08009955..55bd3c805c7c8 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java @@ -66,6 +66,28 @@ public void testLoggingHandler() throws IOException { } } + public void testLoggingHandlerWithExceptionMessage() { + final String readPattern = ".*\\[length: \\d+" + ", request id: \\d+" + ", type: request" + ", version: .*" + " READ: \\d+B"; + + final MockLog.LoggingExpectation readExpectation = new MockLog.PatternSeenEventExpectation( + "spatial stats request", + TransportLogger.class.getCanonicalName(), + Level.TRACE, + readPattern + ); + + InboundMessage inboundMessage = new InboundMessage( + new Header(0, 0, TransportStatus.setRequest((byte) 0), TransportVersion.current()), + new ActionNotFoundTransportException("cluster:monitor/xpack/spatial/stats") + ); + + try (var mockLog = MockLog.capture(TransportLogger.class)) { + mockLog.addExpectation(readExpectation); + TransportLogger.logInboundMessage(mock(TcpChannel.class), inboundMessage); + mockLog.assertAllExpectationsMatched(); + } + } + private BytesReference buildRequest() throws IOException { BytesRefRecycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE); Compression.Scheme compress = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null);