diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java index 1ca8b635a..49805fff6 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java @@ -249,33 +249,36 @@ protected void attemptFlush() throws SQLException { /** Executes prepared statement and closes all resources of this instance. */ @Override public synchronized void close() { - if (!closed) { - closed = true; + try { + if (!closed) { + closed = true; - if (this.scheduledFuture != null) { - scheduledFuture.cancel(false); - this.scheduler.shutdown(); - } + if (this.scheduledFuture != null) { + scheduledFuture.cancel(false); + this.scheduler.shutdown(); + } - if (batchCount > 0) { - try { - flush(); - } catch (Exception e) { - LOG.warn("Writing records to JDBC failed.", e); - throw new RuntimeException("Writing records to JDBC failed.", e); + if (batchCount > 0) { + try { + flush(); + } catch (Exception e) { + LOG.warn("Writing records to JDBC failed.", e); + throw new RuntimeException("Writing records to JDBC failed.", e); + } } - } - try { - if (jdbcStatementExecutor != null) { - jdbcStatementExecutor.closeStatements(); + try { + if (jdbcStatementExecutor != null) { + jdbcStatementExecutor.closeStatements(); + } + } catch (SQLException e) { + LOG.warn("Close JDBC writer failed.", e); } - } catch (SQLException e) { - LOG.warn("Close JDBC writer failed.", e); } + } finally { + connectionProvider.closeConnection(); + checkFlushException(); } - connectionProvider.closeConnection(); - checkFlushException(); } public static Builder builder() {