diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java index 1ed061fee..2df9e3013 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; import org.apache.flink.connector.jdbc.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.utils.JdbcUtils; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; @@ -68,7 +69,7 @@ public String dialectName() { @Override public String quoteIdentifier(String identifier) { - return identifier; + return JdbcUtils.handleDoubleQuotes(identifier); } @Override diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java index d0924aee8..6f58e9a6f 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.jdbc.dialect.AbstractPostgresCompatibleDialect; +import org.apache.flink.connector.jdbc.utils.JdbcUtils; import org.apache.flink.table.types.logical.RowType; import java.util.Optional; @@ -44,4 +45,9 @@ public Optional defaultDriverName() { public String dialectName() { return "PostgreSQL"; } + + @Override + public String quoteIdentifier(String identifier) { + return JdbcUtils.handleDoubleQuotes(identifier); + } } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java index 91469a6b3..adac1c03f 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java @@ -64,7 +64,7 @@ public Optional defaultDriverName() { @Override public String quoteIdentifier(String identifier) { - return identifier; + return "[" + identifier + "]"; } @Override diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcUtils.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcUtils.java index 501a7a631..b0405235c 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcUtils.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcUtils.java @@ -168,4 +168,14 @@ public static Row getPrimaryKey(Row row, int[] pkFields) { } return pkRow; } + + public static String handleDoubleQuotes(String identifier) { + String[] split = identifier.split("\\."); + StringBuilder builder = new StringBuilder(); + for (String s : split) { + builder.append("\"").append(s).append("\""); + builder.append("."); + } + return builder.deleteCharAt(builder.length() - 1).toString(); + } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java index f69f6a334..3ff760eec 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java @@ -48,10 +48,10 @@ void testInsertStatement() { String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames); assertThat(insertStmt) .isEqualTo( - "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) " + "INSERT INTO \"tbl\"(\"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\") " + "VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__)"); NamedStatementMatcher.parsedSql( - "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) " + "INSERT INTO \"tbl\"(\"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\") " + "VALUES (?, ?, ?, ?, ?, ?, ?)") .parameter("id", singletonList(1)) .parameter("name", singletonList(2)) @@ -67,8 +67,10 @@ void testInsertStatement() { void testDeleteStatement() { String deleteStmt = dialect.getDeleteStatement(tableName, keyFields); assertThat(deleteStmt) - .isEqualTo("DELETE FROM tbl WHERE id = :id AND __field_3__ = :__field_3__"); - NamedStatementMatcher.parsedSql("DELETE FROM tbl WHERE id = ? AND __field_3__ = ?") + .isEqualTo( + "DELETE FROM \"tbl\" WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__"); + NamedStatementMatcher.parsedSql( + "DELETE FROM \"tbl\" WHERE \"id\" = ? AND \"__field_3__\" = ?") .parameter("id", singletonList(1)) .parameter("__field_3__", singletonList(2)) .matches(deleteStmt); @@ -78,8 +80,10 @@ void testDeleteStatement() { void testRowExistsStatement() { String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields); assertThat(rowExistStmt) - .isEqualTo("SELECT 1 FROM tbl WHERE id = :id AND __field_3__ = :__field_3__"); - NamedStatementMatcher.parsedSql("SELECT 1 FROM tbl WHERE id = ? AND __field_3__ = ?") + .isEqualTo( + "SELECT 1 FROM \"tbl\" WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__"); + NamedStatementMatcher.parsedSql( + "SELECT 1 FROM \"tbl\" WHERE \"id\" = ? AND \"__field_3__\" = ?") .parameter("id", singletonList(1)) .parameter("__field_3__", singletonList(2)) .matches(rowExistStmt); @@ -90,12 +94,12 @@ void testUpdateStatement() { String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, keyFields); assertThat(updateStmt) .isEqualTo( - "UPDATE tbl SET id = :id, name = :name, email = :email, ts = :ts, " - + "field1 = :field1, field_2 = :field_2, __field_3__ = :__field_3__ " - + "WHERE id = :id AND __field_3__ = :__field_3__"); + "UPDATE \"tbl\" SET \"id\" = :id, \"name\" = :name, \"email\" = :email, \"ts\" = :ts, " + + "\"field1\" = :field1, \"field_2\" = :field_2, \"__field_3__\" = :__field_3__ " + + "WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__"); NamedStatementMatcher.parsedSql( - "UPDATE tbl SET id = ?, name = ?, email = ?, ts = ?, field1 = ?, " - + "field_2 = ?, __field_3__ = ? WHERE id = ? AND __field_3__ = ?") + "UPDATE \"tbl\" SET \"id\" = ?, \"name\" = ?, \"email\" = ?, \"ts\" = ?, \"field1\" = ?, " + + "\"field_2\" = ?, \"__field_3__\" = ? WHERE \"id\" = ? AND \"__field_3__\" = ?") .parameter("id", asList(1, 8)) .parameter("name", singletonList(2)) .parameter("email", singletonList(3)) @@ -112,18 +116,18 @@ void testUpsertStatement() { assertThat(upsertStmt) .isEqualTo( " MERGE INTO tbl t " - + " USING (SELECT :id id, :name name, :email email, :ts ts, :field1 field1, :field_2 field_2, :__field_3__ __field_3__ FROM DUAL) s " - + " ON (t.id=s.id and t.__field_3__=s.__field_3__) " - + " WHEN MATCHED THEN UPDATE SET t.name=s.name, t.email=s.email, t.ts=s.ts, t.field1=s.field1, t.field_2=s.field_2" - + " WHEN NOT MATCHED THEN INSERT (id, name, email, ts, field1, field_2, __field_3__)" - + " VALUES (s.id, s.name, s.email, s.ts, s.field1, s.field_2, s.__field_3__)"); + + " USING (SELECT :id \"id\", :name \"name\", :email \"email\", :ts \"ts\", :field1 \"field1\", :field_2 \"field_2\", :__field_3__ \"__field_3__\" FROM DUAL) s " + + " ON (t.\"id\"=s.\"id\" and t.\"__field_3__\"=s.\"__field_3__\") " + + " WHEN MATCHED THEN UPDATE SET t.\"name\"=s.\"name\", t.\"email\"=s.\"email\", t.\"ts\"=s.\"ts\", t.\"field1\"=s.\"field1\", t.\"field_2\"=s.\"field_2\"" + + " WHEN NOT MATCHED THEN INSERT (\"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\")" + + " VALUES (s.\"id\", s.\"name\", s.\"email\", s.\"ts\", s.\"field1\", s.\"field_2\", s.\"__field_3__\")"); NamedStatementMatcher.parsedSql( " MERGE INTO tbl t " - + " USING (SELECT ? id, ? name, ? email, ? ts, ? field1, ? field_2, ? __field_3__ FROM DUAL) s " - + " ON (t.id=s.id and t.__field_3__=s.__field_3__) " - + " WHEN MATCHED THEN UPDATE SET t.name=s.name, t.email=s.email, t.ts=s.ts, t.field1=s.field1, t.field_2=s.field_2" - + " WHEN NOT MATCHED THEN INSERT (id, name, email, ts, field1, field_2, __field_3__)" - + " VALUES (s.id, s.name, s.email, s.ts, s.field1, s.field_2, s.__field_3__)") + + " USING (SELECT ? \"id\", ? \"name\", ? \"email\", ? \"ts\", ? \"field1\", ? \"field_2\", ? \"__field_3__\" FROM DUAL) s " + + " ON (t.\"id\"=s.\"id\" and t.\"__field_3__\"=s.\"__field_3__\") " + + " WHEN MATCHED THEN UPDATE SET t.\"name\"=s.\"name\", t.\"email\"=s.\"email\", t.\"ts\"=s.\"ts\", t.\"field1\"=s.\"field1\", t.\"field_2\"=s.\"field_2\"" + + " WHEN NOT MATCHED THEN INSERT (\"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\")" + + " VALUES (s.\"id\", s.\"name\", s.\"email\", s.\"ts\", s.\"field1\", s.\"field_2\", s.\"__field_3__\")") .parameter("id", singletonList(1)) .parameter("name", singletonList(2)) .parameter("email", singletonList(3)) @@ -139,11 +143,11 @@ void testSelectStatement() { String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields); assertThat(selectStmt) .isEqualTo( - "SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl " - + "WHERE id = :id AND __field_3__ = :__field_3__"); + "SELECT \"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\" FROM \"tbl\" " + + "WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__"); NamedStatementMatcher.parsedSql( - "SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl " - + "WHERE id = ? AND __field_3__ = ?") + "SELECT \"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\" FROM \"tbl\" " + + "WHERE \"id\" = ? AND \"__field_3__\" = ?") .parameter("id", singletonList(1)) .parameter("__field_3__", singletonList(2)) .matches(selectStmt); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java index 33fc3ed3b..c7a8eebfb 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java @@ -41,7 +41,7 @@ void testInsertStatement() { String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames); assertThat(insertStmt) .isEqualTo( - "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) " + "INSERT INTO [tbl]([id], [name], [email], [ts], [field1], [field_2], [__field_3__]) " + "VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__)"); } @@ -49,14 +49,14 @@ void testInsertStatement() { void testDeleteStatement() { String deleteStmt = dialect.getDeleteStatement(tableName, keyFields); assertThat(deleteStmt) - .isEqualTo("DELETE FROM tbl WHERE id = :id AND __field_3__ = :__field_3__"); + .isEqualTo("DELETE FROM [tbl] WHERE [id] = :id AND [__field_3__] = :__field_3__"); } @Test void testRowExistsStatement() { String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields); assertThat(rowExistStmt) - .isEqualTo("SELECT 1 FROM tbl WHERE id = :id AND __field_3__ = :__field_3__"); + .isEqualTo("SELECT 1 FROM [tbl] WHERE [id] = :id AND [__field_3__] = :__field_3__"); } @Test @@ -64,9 +64,9 @@ void testUpdateStatement() { String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, keyFields); assertThat(updateStmt) .isEqualTo( - "UPDATE tbl SET id = :id, name = :name, email = :email, ts = :ts, " - + "field1 = :field1, field_2 = :field_2, __field_3__ = :__field_3__ " - + "WHERE id = :id AND __field_3__ = :__field_3__"); + "UPDATE [tbl] SET [id] = :id, [name] = :name, [email] = :email, [ts] = :ts, " + + "[field1] = :field1, [field_2] = :field_2, [__field_3__] = :__field_3__ " + + "WHERE [id] = :id AND [__field_3__] = :__field_3__"); } @Test @@ -74,13 +74,13 @@ void testUpsertStatement() { String upsertStmt = dialect.getUpsertStatement(tableName, fieldNames, keyFields).get(); assertThat(upsertStmt) .isEqualTo( - "MERGE INTO tbl AS [TARGET]" - + " USING (SELECT :id id, :name name, :email email, :ts ts, :field1 field1, :field_2 field_2, :__field_3__ __field_3__) AS [SOURCE]" - + " ON ([TARGET].id=[SOURCE].id AND [TARGET].__field_3__=[SOURCE].__field_3__)" - + " WHEN MATCHED THEN UPDATE SET [TARGET].name=[SOURCE].name, [TARGET].email=[SOURCE].email," - + " [TARGET].ts=[SOURCE].ts, [TARGET].field1=[SOURCE].field1, [TARGET].field_2=[SOURCE].field_2" - + " WHEN NOT MATCHED THEN INSERT (id, name, email, ts, field1, field_2, __field_3__)" - + " VALUES ([SOURCE].id, [SOURCE].name, [SOURCE].email, [SOURCE].ts, [SOURCE].field1, [SOURCE].field_2, [SOURCE].__field_3__);"); + "MERGE INTO [tbl] AS [TARGET]" + + " USING (SELECT :id [id], :name [name], :email [email], :ts [ts], :field1 [field1], :field_2 [field_2], :__field_3__ [__field_3__]) AS [SOURCE]" + + " ON ([TARGET].[id]=[SOURCE].[id] AND [TARGET].[__field_3__]=[SOURCE].[__field_3__])" + + " WHEN MATCHED THEN UPDATE SET [TARGET].[name]=[SOURCE].[name], [TARGET].[email]=[SOURCE].[email]," + + " [TARGET].[ts]=[SOURCE].[ts], [TARGET].[field1]=[SOURCE].[field1], [TARGET].[field_2]=[SOURCE].[field_2]" + + " WHEN NOT MATCHED THEN INSERT ([id], [name], [email], [ts], [field1], [field_2], [__field_3__])" + + " VALUES ([SOURCE].[id], [SOURCE].[name], [SOURCE].[email], [SOURCE].[ts], [SOURCE].[field1], [SOURCE].[field_2], [SOURCE].[__field_3__]);"); } @Test @@ -88,7 +88,7 @@ void testSelectStatement() { String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields); assertThat(selectStmt) .isEqualTo( - "SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl " - + "WHERE id = :id AND __field_3__ = :__field_3__"); + "SELECT [id], [name], [email], [ts], [field1], [field_2], [__field_3__] FROM [tbl] " + + "WHERE [id] = :id AND [__field_3__] = :__field_3__"); } }