From 34056dcb838e0d3334789d47a7d18a6479091074 Mon Sep 17 00:00:00 2001 From: shenb15 Date: Fri, 3 Nov 2023 21:32:39 +0800 Subject: [PATCH] [FLINK-30960] Fix a corner case oom error --- .../jdbc/internal/JdbcOutputFormat.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java index 1ca8b635a..8883ccde5 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java @@ -153,6 +153,16 @@ public void open(int taskNumber, int numTasks) throws IOException { if (!closed) { try { flush(); + } catch (FlushingRuntimeException e) { + /* + * We ignore this FlushingRuntimeException, as it is + * only thrown when flushingException was assigned to, + * in a former run of this scheduler thread, in the next + * catch clause. In that case, we already have + * flushException cached, waiting for the next task + * manager thread's flush call which would throw a new + * FlushingRuntimeException causing job failure. + */ } catch (Exception e) { flushException = e; } @@ -178,7 +188,7 @@ private JdbcExec createAndOpenStatementExecutor( private void checkFlushException() { if (flushException != null) { - throw new RuntimeException("Writing records to JDBC failed.", flushException); + throw new FlushingRuntimeException("Writing records to JDBC failed.", flushException); } } @@ -415,4 +425,12 @@ public JdbcExecutionOptions getExecutionOptions() { public Connection getConnection() { return connectionProvider.getConnection(); } + + static class FlushingRuntimeException extends RuntimeException { + private static final long serialVersionUID = -8923632392030344592L; + + FlushingRuntimeException(String msg, Exception cause) { + super(msg, cause); + } + } }