|
23 | 23 | import org.apache.flink.api.java.tuple.Tuple2;
|
24 | 24 | import org.apache.flink.connector.jdbc.JdbcDataTestBase;
|
25 | 25 | import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
|
| 26 | +import org.apache.flink.connector.jdbc.JdbcRowOutputFormat; |
26 | 27 | import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
|
27 | 28 | import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
|
28 | 29 | import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
|
|
34 | 35 | import org.junit.jupiter.api.Test;
|
35 | 36 | import org.mockito.Mockito;
|
36 | 37 |
|
| 38 | +import java.io.IOException; |
37 | 39 | import java.sql.Connection;
|
38 | 40 | import java.sql.DriverManager;
|
39 | 41 | import java.sql.PreparedStatement;
|
|
44 | 46 | import java.util.Arrays;
|
45 | 47 | import java.util.List;
|
46 | 48 |
|
| 49 | +import static org.apache.flink.connector.jdbc.JdbcTestFixture.INSERT_TEMPLATE; |
47 | 50 | import static org.apache.flink.connector.jdbc.JdbcTestFixture.OUTPUT_TABLE;
|
| 51 | +import static org.apache.flink.connector.jdbc.JdbcTestFixture.OUTPUT_TABLE_4; |
48 | 52 | import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
|
49 | 53 | import static org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
|
50 | 54 | import static org.assertj.core.api.Assertions.assertThat;
|
@@ -221,6 +225,30 @@ void testJdbcOutputFormat() throws Exception {
|
221 | 225 | check(expected);
|
222 | 226 | }
|
223 | 227 |
|
| 228 | + @Test |
| 229 | + public void testExceptionOnFlush() { |
| 230 | + JdbcRowOutputFormat jdbcOutputFormat = |
| 231 | + JdbcRowOutputFormat.buildJdbcOutputFormat() |
| 232 | + .setDrivername(getMetadata().getDriverClass()) |
| 233 | + .setDBUrl(getMetadata().getJdbcUrl()) |
| 234 | + .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE_4)) |
| 235 | + .setBatchSize(2) |
| 236 | + .finish(); |
| 237 | + setRuntimeContext(jdbcOutputFormat, true); |
| 238 | + try { |
| 239 | + jdbcOutputFormat.open(0, 1); |
| 240 | + |
| 241 | + jdbcOutputFormat.writeRecord(toRow(TEST_DATA[1])); |
| 242 | + jdbcOutputFormat.writeRecord(toRow(TEST_DATA[1])); |
| 243 | + } catch (IOException e) { |
| 244 | + try { |
| 245 | + jdbcOutputFormat.close(); |
| 246 | + } catch (Exception e1) { |
| 247 | + assertThat(jdbcOutputFormat.getConnection()).isEqualTo(null); |
| 248 | + } |
| 249 | + } |
| 250 | + } |
| 251 | + |
224 | 252 | private void check(Row[] rows) throws SQLException {
|
225 | 253 | check(rows, getMetadata().getJdbcUrl(), OUTPUT_TABLE, fieldNames);
|
226 | 254 | }
|
|
0 commit comments