From b13a3b991d93c542f532493e96d30cc307feadd0 Mon Sep 17 00:00:00 2001 From: "hezijiang@deepexi.com" Date: Tue, 16 Jan 2024 14:12:32 +0800 Subject: [PATCH 1/4] [hotfix] fix the problems with special table name characters of postgres and oracle and sqlserver. --- .../jdbc/databases/oracle/dialect/OracleDialect.java | 3 ++- .../databases/postgres/dialect/PostgresDialect.java | 6 ++++++ .../databases/sqlserver/dialect/SqlServerDialect.java | 2 +- .../apache/flink/connector/jdbc/utils/JdbcUtils.java | 10 ++++++++++ 4 files changed, 19 insertions(+), 2 deletions(-) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java index 1ed061fee..2df9e3013 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; import org.apache.flink.connector.jdbc.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.utils.JdbcUtils; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; @@ -68,7 +69,7 @@ public String dialectName() { @Override public String quoteIdentifier(String identifier) { - return identifier; + return JdbcUtils.handleDoubleQuotes(identifier); } @Override diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java index d0924aee8..6f58e9a6f 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.jdbc.dialect.AbstractPostgresCompatibleDialect; +import org.apache.flink.connector.jdbc.utils.JdbcUtils; import org.apache.flink.table.types.logical.RowType; import java.util.Optional; @@ -44,4 +45,9 @@ public Optional defaultDriverName() { public String dialectName() { return "PostgreSQL"; } + + @Override + public String quoteIdentifier(String identifier) { + return JdbcUtils.handleDoubleQuotes(identifier); + } } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java index 91469a6b3..adac1c03f 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java @@ -64,7 +64,7 @@ public Optional defaultDriverName() { @Override public String quoteIdentifier(String identifier) { - return identifier; + return "[" + identifier + "]"; } @Override diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcUtils.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcUtils.java index 501a7a631..b0405235c 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcUtils.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcUtils.java @@ -168,4 +168,14 @@ public static Row getPrimaryKey(Row row, int[] pkFields) { } return pkRow; } + + public static String handleDoubleQuotes(String identifier) { + String[] split = identifier.split("\\."); + StringBuilder builder = new StringBuilder(); + for (String s : split) { + builder.append("\"").append(s).append("\""); + builder.append("."); + } + return builder.deleteCharAt(builder.length() - 1).toString(); + } } From f35971ec19d861b95311b540f0663d706da17587 Mon Sep 17 00:00:00 2001 From: "hezijiang@deepexi.com" Date: Tue, 16 Jan 2024 20:35:56 +0800 Subject: [PATCH 2/4] [hotfix] fix the problems with special table name characters of postgres and oracle and sqlserver. update test case. --- .../dialect/OraclePreparedStatementTest.java | 54 ++++++++++--------- .../SqlServerPreparedStatementTest.java | 30 +++++------ 2 files changed, 44 insertions(+), 40 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java index f69f6a334..3ff760eec 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java @@ -48,10 +48,10 @@ void testInsertStatement() { String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames); assertThat(insertStmt) .isEqualTo( - "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) " + "INSERT INTO \"tbl\"(\"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\") " + "VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__)"); NamedStatementMatcher.parsedSql( - "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) " + "INSERT INTO \"tbl\"(\"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\") " + "VALUES (?, ?, ?, ?, ?, ?, ?)") .parameter("id", singletonList(1)) .parameter("name", singletonList(2)) @@ -67,8 +67,10 @@ void testInsertStatement() { void testDeleteStatement() { String deleteStmt = dialect.getDeleteStatement(tableName, keyFields); assertThat(deleteStmt) - .isEqualTo("DELETE FROM tbl WHERE id = :id AND __field_3__ = :__field_3__"); - NamedStatementMatcher.parsedSql("DELETE FROM tbl WHERE id = ? AND __field_3__ = ?") + .isEqualTo( + "DELETE FROM \"tbl\" WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__"); + NamedStatementMatcher.parsedSql( + "DELETE FROM \"tbl\" WHERE \"id\" = ? AND \"__field_3__\" = ?") .parameter("id", singletonList(1)) .parameter("__field_3__", singletonList(2)) .matches(deleteStmt); @@ -78,8 +80,10 @@ void testDeleteStatement() { void testRowExistsStatement() { String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields); assertThat(rowExistStmt) - .isEqualTo("SELECT 1 FROM tbl WHERE id = :id AND __field_3__ = :__field_3__"); - NamedStatementMatcher.parsedSql("SELECT 1 FROM tbl WHERE id = ? AND __field_3__ = ?") + .isEqualTo( + "SELECT 1 FROM \"tbl\" WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__"); + NamedStatementMatcher.parsedSql( + "SELECT 1 FROM \"tbl\" WHERE \"id\" = ? AND \"__field_3__\" = ?") .parameter("id", singletonList(1)) .parameter("__field_3__", singletonList(2)) .matches(rowExistStmt); @@ -90,12 +94,12 @@ void testUpdateStatement() { String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, keyFields); assertThat(updateStmt) .isEqualTo( - "UPDATE tbl SET id = :id, name = :name, email = :email, ts = :ts, " - + "field1 = :field1, field_2 = :field_2, __field_3__ = :__field_3__ " - + "WHERE id = :id AND __field_3__ = :__field_3__"); + "UPDATE \"tbl\" SET \"id\" = :id, \"name\" = :name, \"email\" = :email, \"ts\" = :ts, " + + "\"field1\" = :field1, \"field_2\" = :field_2, \"__field_3__\" = :__field_3__ " + + "WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__"); NamedStatementMatcher.parsedSql( - "UPDATE tbl SET id = ?, name = ?, email = ?, ts = ?, field1 = ?, " - + "field_2 = ?, __field_3__ = ? WHERE id = ? AND __field_3__ = ?") + "UPDATE \"tbl\" SET \"id\" = ?, \"name\" = ?, \"email\" = ?, \"ts\" = ?, \"field1\" = ?, " + + "\"field_2\" = ?, \"__field_3__\" = ? WHERE \"id\" = ? AND \"__field_3__\" = ?") .parameter("id", asList(1, 8)) .parameter("name", singletonList(2)) .parameter("email", singletonList(3)) @@ -112,18 +116,18 @@ void testUpsertStatement() { assertThat(upsertStmt) .isEqualTo( " MERGE INTO tbl t " - + " USING (SELECT :id id, :name name, :email email, :ts ts, :field1 field1, :field_2 field_2, :__field_3__ __field_3__ FROM DUAL) s " - + " ON (t.id=s.id and t.__field_3__=s.__field_3__) " - + " WHEN MATCHED THEN UPDATE SET t.name=s.name, t.email=s.email, t.ts=s.ts, t.field1=s.field1, t.field_2=s.field_2" - + " WHEN NOT MATCHED THEN INSERT (id, name, email, ts, field1, field_2, __field_3__)" - + " VALUES (s.id, s.name, s.email, s.ts, s.field1, s.field_2, s.__field_3__)"); + + " USING (SELECT :id \"id\", :name \"name\", :email \"email\", :ts \"ts\", :field1 \"field1\", :field_2 \"field_2\", :__field_3__ \"__field_3__\" FROM DUAL) s " + + " ON (t.\"id\"=s.\"id\" and t.\"__field_3__\"=s.\"__field_3__\") " + + " WHEN MATCHED THEN UPDATE SET t.\"name\"=s.\"name\", t.\"email\"=s.\"email\", t.\"ts\"=s.\"ts\", t.\"field1\"=s.\"field1\", t.\"field_2\"=s.\"field_2\"" + + " WHEN NOT MATCHED THEN INSERT (\"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\")" + + " VALUES (s.\"id\", s.\"name\", s.\"email\", s.\"ts\", s.\"field1\", s.\"field_2\", s.\"__field_3__\")"); NamedStatementMatcher.parsedSql( " MERGE INTO tbl t " - + " USING (SELECT ? id, ? name, ? email, ? ts, ? field1, ? field_2, ? __field_3__ FROM DUAL) s " - + " ON (t.id=s.id and t.__field_3__=s.__field_3__) " - + " WHEN MATCHED THEN UPDATE SET t.name=s.name, t.email=s.email, t.ts=s.ts, t.field1=s.field1, t.field_2=s.field_2" - + " WHEN NOT MATCHED THEN INSERT (id, name, email, ts, field1, field_2, __field_3__)" - + " VALUES (s.id, s.name, s.email, s.ts, s.field1, s.field_2, s.__field_3__)") + + " USING (SELECT ? \"id\", ? \"name\", ? \"email\", ? \"ts\", ? \"field1\", ? \"field_2\", ? \"__field_3__\" FROM DUAL) s " + + " ON (t.\"id\"=s.\"id\" and t.\"__field_3__\"=s.\"__field_3__\") " + + " WHEN MATCHED THEN UPDATE SET t.\"name\"=s.\"name\", t.\"email\"=s.\"email\", t.\"ts\"=s.\"ts\", t.\"field1\"=s.\"field1\", t.\"field_2\"=s.\"field_2\"" + + " WHEN NOT MATCHED THEN INSERT (\"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\")" + + " VALUES (s.\"id\", s.\"name\", s.\"email\", s.\"ts\", s.\"field1\", s.\"field_2\", s.\"__field_3__\")") .parameter("id", singletonList(1)) .parameter("name", singletonList(2)) .parameter("email", singletonList(3)) @@ -139,11 +143,11 @@ void testSelectStatement() { String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields); assertThat(selectStmt) .isEqualTo( - "SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl " - + "WHERE id = :id AND __field_3__ = :__field_3__"); + "SELECT \"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\" FROM \"tbl\" " + + "WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__"); NamedStatementMatcher.parsedSql( - "SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl " - + "WHERE id = ? AND __field_3__ = ?") + "SELECT \"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\" FROM \"tbl\" " + + "WHERE \"id\" = ? AND \"__field_3__\" = ?") .parameter("id", singletonList(1)) .parameter("__field_3__", singletonList(2)) .matches(selectStmt); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java index 33fc3ed3b..c7a8eebfb 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java @@ -41,7 +41,7 @@ void testInsertStatement() { String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames); assertThat(insertStmt) .isEqualTo( - "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) " + "INSERT INTO [tbl]([id], [name], [email], [ts], [field1], [field_2], [__field_3__]) " + "VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__)"); } @@ -49,14 +49,14 @@ void testInsertStatement() { void testDeleteStatement() { String deleteStmt = dialect.getDeleteStatement(tableName, keyFields); assertThat(deleteStmt) - .isEqualTo("DELETE FROM tbl WHERE id = :id AND __field_3__ = :__field_3__"); + .isEqualTo("DELETE FROM [tbl] WHERE [id] = :id AND [__field_3__] = :__field_3__"); } @Test void testRowExistsStatement() { String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields); assertThat(rowExistStmt) - .isEqualTo("SELECT 1 FROM tbl WHERE id = :id AND __field_3__ = :__field_3__"); + .isEqualTo("SELECT 1 FROM [tbl] WHERE [id] = :id AND [__field_3__] = :__field_3__"); } @Test @@ -64,9 +64,9 @@ void testUpdateStatement() { String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, keyFields); assertThat(updateStmt) .isEqualTo( - "UPDATE tbl SET id = :id, name = :name, email = :email, ts = :ts, " - + "field1 = :field1, field_2 = :field_2, __field_3__ = :__field_3__ " - + "WHERE id = :id AND __field_3__ = :__field_3__"); + "UPDATE [tbl] SET [id] = :id, [name] = :name, [email] = :email, [ts] = :ts, " + + "[field1] = :field1, [field_2] = :field_2, [__field_3__] = :__field_3__ " + + "WHERE [id] = :id AND [__field_3__] = :__field_3__"); } @Test @@ -74,13 +74,13 @@ void testUpsertStatement() { String upsertStmt = dialect.getUpsertStatement(tableName, fieldNames, keyFields).get(); assertThat(upsertStmt) .isEqualTo( - "MERGE INTO tbl AS [TARGET]" - + " USING (SELECT :id id, :name name, :email email, :ts ts, :field1 field1, :field_2 field_2, :__field_3__ __field_3__) AS [SOURCE]" - + " ON ([TARGET].id=[SOURCE].id AND [TARGET].__field_3__=[SOURCE].__field_3__)" - + " WHEN MATCHED THEN UPDATE SET [TARGET].name=[SOURCE].name, [TARGET].email=[SOURCE].email," - + " [TARGET].ts=[SOURCE].ts, [TARGET].field1=[SOURCE].field1, [TARGET].field_2=[SOURCE].field_2" - + " WHEN NOT MATCHED THEN INSERT (id, name, email, ts, field1, field_2, __field_3__)" - + " VALUES ([SOURCE].id, [SOURCE].name, [SOURCE].email, [SOURCE].ts, [SOURCE].field1, [SOURCE].field_2, [SOURCE].__field_3__);"); + "MERGE INTO [tbl] AS [TARGET]" + + " USING (SELECT :id [id], :name [name], :email [email], :ts [ts], :field1 [field1], :field_2 [field_2], :__field_3__ [__field_3__]) AS [SOURCE]" + + " ON ([TARGET].[id]=[SOURCE].[id] AND [TARGET].[__field_3__]=[SOURCE].[__field_3__])" + + " WHEN MATCHED THEN UPDATE SET [TARGET].[name]=[SOURCE].[name], [TARGET].[email]=[SOURCE].[email]," + + " [TARGET].[ts]=[SOURCE].[ts], [TARGET].[field1]=[SOURCE].[field1], [TARGET].[field_2]=[SOURCE].[field_2]" + + " WHEN NOT MATCHED THEN INSERT ([id], [name], [email], [ts], [field1], [field_2], [__field_3__])" + + " VALUES ([SOURCE].[id], [SOURCE].[name], [SOURCE].[email], [SOURCE].[ts], [SOURCE].[field1], [SOURCE].[field_2], [SOURCE].[__field_3__]);"); } @Test @@ -88,7 +88,7 @@ void testSelectStatement() { String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields); assertThat(selectStmt) .isEqualTo( - "SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl " - + "WHERE id = :id AND __field_3__ = :__field_3__"); + "SELECT [id], [name], [email], [ts], [field1], [field_2], [__field_3__] FROM [tbl] " + + "WHERE [id] = :id AND [__field_3__] = :__field_3__"); } } From 7272a79c75755fd51a790d06f11d13b92a9bbc57 Mon Sep 17 00:00:00 2001 From: "hezijiang@deepexi.com" Date: Wed, 17 Jan 2024 15:39:28 +0800 Subject: [PATCH 3/4] [FLINK-34128] [bugfix] Some jdbc objects cannot be obtained properly in oracle jdbc,Specify the required type explicitly --- .../connector/jdbc/converter/AbstractJdbcRowConverter.java | 3 ++- .../jdbc/converter/AbstractJdbcRowConverterTest.java | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java index 1a19d127b..830d1f944 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java @@ -74,7 +74,8 @@ public AbstractJdbcRowConverter(RowType rowType) { public RowData toInternal(ResultSet resultSet) throws SQLException { GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount()); for (int pos = 0; pos < rowType.getFieldCount(); pos++) { - Object field = resultSet.getObject(pos + 1); + Object field = + resultSet.getObject(pos + 1, rowType.getTypeAt(pos).getDefaultConversion()); genericRowData.setField(pos, toInternalConverters[pos].deserialize(field)); } return genericRowData; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java index 9f9fee1e9..f6eeff317 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java @@ -49,8 +49,8 @@ public String converterName() { }; ResultSet resultSet = Mockito.mock(ResultSet.class); - Mockito.when(resultSet.getObject(1)).thenReturn(123); - Mockito.when(resultSet.getObject(2)) + Mockito.when(resultSet.getObject(1, Integer.class)).thenReturn(123); + Mockito.when(resultSet.getObject(2, LocalDateTime.class)) .thenReturn(LocalDateTime.parse("2021-04-07T00:00:05.999")); RowData res = rowConverter.toInternal(resultSet); From b28197e963e4dbd4f42b7f6fb5954a90ab6b9303 Mon Sep 17 00:00:00 2001 From: "hezijiang@deepexi.com" Date: Wed, 17 Jan 2024 15:43:03 +0800 Subject: [PATCH 4/4] Revert "[FLINK-34128] [bugfix] Some jdbc objects cannot be obtained properly in oracle jdbc,Specify the required type explicitly" This reverts commit 7272a79c75755fd51a790d06f11d13b92a9bbc57. --- .../connector/jdbc/converter/AbstractJdbcRowConverter.java | 3 +-- .../jdbc/converter/AbstractJdbcRowConverterTest.java | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java index 830d1f944..1a19d127b 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java @@ -74,8 +74,7 @@ public AbstractJdbcRowConverter(RowType rowType) { public RowData toInternal(ResultSet resultSet) throws SQLException { GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount()); for (int pos = 0; pos < rowType.getFieldCount(); pos++) { - Object field = - resultSet.getObject(pos + 1, rowType.getTypeAt(pos).getDefaultConversion()); + Object field = resultSet.getObject(pos + 1); genericRowData.setField(pos, toInternalConverters[pos].deserialize(field)); } return genericRowData; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java index f6eeff317..9f9fee1e9 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java @@ -49,8 +49,8 @@ public String converterName() { }; ResultSet resultSet = Mockito.mock(ResultSet.class); - Mockito.when(resultSet.getObject(1, Integer.class)).thenReturn(123); - Mockito.when(resultSet.getObject(2, LocalDateTime.class)) + Mockito.when(resultSet.getObject(1)).thenReturn(123); + Mockito.when(resultSet.getObject(2)) .thenReturn(LocalDateTime.parse("2021-04-07T00:00:05.999")); RowData res = rowConverter.toInternal(resultSet);