Skip to content

[FLINK-34088] [hotfix] fix the problems with special table name characters of postgres and oracle  and sqlserver. #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
import org.apache.flink.connector.jdbc.utils.JdbcUtils;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

Expand Down Expand Up @@ -68,7 +69,7 @@ public String dialectName() {

@Override
public String quoteIdentifier(String identifier) {
return identifier;
return JdbcUtils.handleDoubleQuotes(identifier);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.dialect.AbstractPostgresCompatibleDialect;
import org.apache.flink.connector.jdbc.utils.JdbcUtils;
import org.apache.flink.table.types.logical.RowType;

import java.util.Optional;
Expand All @@ -44,4 +45,9 @@ public Optional<String> defaultDriverName() {
public String dialectName() {
return "PostgreSQL";
}

@Override
public String quoteIdentifier(String identifier) {
return JdbcUtils.handleDoubleQuotes(identifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public Optional<String> defaultDriverName() {

@Override
public String quoteIdentifier(String identifier) {
return identifier;
return "[" + identifier + "]";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,14 @@ public static Row getPrimaryKey(Row row, int[] pkFields) {
}
return pkRow;
}

public static String handleDoubleQuotes(String identifier) {
String[] split = identifier.split("\\.");
StringBuilder builder = new StringBuilder();
for (String s : split) {
builder.append("\"").append(s).append("\"");
builder.append(".");
}
return builder.deleteCharAt(builder.length() - 1).toString();
}
Comment on lines +172 to +180
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get why we are going to put logic with double quotes into some generic place if for different engines there could be different quotes?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because oracle and postgres have the same escape characters, It's all double quotes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this package is not only for oracle and postgres. E.g. MS SQL Server, MySQL are different

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think that the location of this method is not appropriate, I think it is OK, the specific escape characters of the data source in their own package implementation, there are public features to extract and did not find a better location

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ void testInsertStatement() {
String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames);
assertThat(insertStmt)
.isEqualTo(
"INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) "
"INSERT INTO \"tbl\"(\"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\") "
+ "VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__)");
NamedStatementMatcher.parsedSql(
"INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) "
"INSERT INTO \"tbl\"(\"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\") "
Comment on lines +51 to +54
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In issue description it was only something about table name
why does it impact columns?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the method ‘quoteIdentifier’ is common,column metadata is also by call the method.Columns should have this problem as well

Copy link
Contributor

@snuyanzin snuyanzin Jan 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a failing test reproducing the problem?

+ "VALUES (?, ?, ?, ?, ?, ?, ?)")
.parameter("id", singletonList(1))
.parameter("name", singletonList(2))
Expand All @@ -67,8 +67,10 @@ void testInsertStatement() {
void testDeleteStatement() {
String deleteStmt = dialect.getDeleteStatement(tableName, keyFields);
assertThat(deleteStmt)
.isEqualTo("DELETE FROM tbl WHERE id = :id AND __field_3__ = :__field_3__");
NamedStatementMatcher.parsedSql("DELETE FROM tbl WHERE id = ? AND __field_3__ = ?")
.isEqualTo(
"DELETE FROM \"tbl\" WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__");
NamedStatementMatcher.parsedSql(
"DELETE FROM \"tbl\" WHERE \"id\" = ? AND \"__field_3__\" = ?")
.parameter("id", singletonList(1))
.parameter("__field_3__", singletonList(2))
.matches(deleteStmt);
Expand All @@ -78,8 +80,10 @@ void testDeleteStatement() {
void testRowExistsStatement() {
String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields);
assertThat(rowExistStmt)
.isEqualTo("SELECT 1 FROM tbl WHERE id = :id AND __field_3__ = :__field_3__");
NamedStatementMatcher.parsedSql("SELECT 1 FROM tbl WHERE id = ? AND __field_3__ = ?")
.isEqualTo(
"SELECT 1 FROM \"tbl\" WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__");
NamedStatementMatcher.parsedSql(
"SELECT 1 FROM \"tbl\" WHERE \"id\" = ? AND \"__field_3__\" = ?")
.parameter("id", singletonList(1))
.parameter("__field_3__", singletonList(2))
.matches(rowExistStmt);
Expand All @@ -90,12 +94,12 @@ void testUpdateStatement() {
String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, keyFields);
assertThat(updateStmt)
.isEqualTo(
"UPDATE tbl SET id = :id, name = :name, email = :email, ts = :ts, "
+ "field1 = :field1, field_2 = :field_2, __field_3__ = :__field_3__ "
+ "WHERE id = :id AND __field_3__ = :__field_3__");
"UPDATE \"tbl\" SET \"id\" = :id, \"name\" = :name, \"email\" = :email, \"ts\" = :ts, "
+ "\"field1\" = :field1, \"field_2\" = :field_2, \"__field_3__\" = :__field_3__ "
+ "WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__");
NamedStatementMatcher.parsedSql(
"UPDATE tbl SET id = ?, name = ?, email = ?, ts = ?, field1 = ?, "
+ "field_2 = ?, __field_3__ = ? WHERE id = ? AND __field_3__ = ?")
"UPDATE \"tbl\" SET \"id\" = ?, \"name\" = ?, \"email\" = ?, \"ts\" = ?, \"field1\" = ?, "
+ "\"field_2\" = ?, \"__field_3__\" = ? WHERE \"id\" = ? AND \"__field_3__\" = ?")
.parameter("id", asList(1, 8))
.parameter("name", singletonList(2))
.parameter("email", singletonList(3))
Expand All @@ -112,18 +116,18 @@ void testUpsertStatement() {
assertThat(upsertStmt)
.isEqualTo(
" MERGE INTO tbl t "
+ " USING (SELECT :id id, :name name, :email email, :ts ts, :field1 field1, :field_2 field_2, :__field_3__ __field_3__ FROM DUAL) s "
+ " ON (t.id=s.id and t.__field_3__=s.__field_3__) "
+ " WHEN MATCHED THEN UPDATE SET t.name=s.name, t.email=s.email, t.ts=s.ts, t.field1=s.field1, t.field_2=s.field_2"
+ " WHEN NOT MATCHED THEN INSERT (id, name, email, ts, field1, field_2, __field_3__)"
+ " VALUES (s.id, s.name, s.email, s.ts, s.field1, s.field_2, s.__field_3__)");
+ " USING (SELECT :id \"id\", :name \"name\", :email \"email\", :ts \"ts\", :field1 \"field1\", :field_2 \"field_2\", :__field_3__ \"__field_3__\" FROM DUAL) s "
+ " ON (t.\"id\"=s.\"id\" and t.\"__field_3__\"=s.\"__field_3__\") "
+ " WHEN MATCHED THEN UPDATE SET t.\"name\"=s.\"name\", t.\"email\"=s.\"email\", t.\"ts\"=s.\"ts\", t.\"field1\"=s.\"field1\", t.\"field_2\"=s.\"field_2\""
+ " WHEN NOT MATCHED THEN INSERT (\"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\")"
+ " VALUES (s.\"id\", s.\"name\", s.\"email\", s.\"ts\", s.\"field1\", s.\"field_2\", s.\"__field_3__\")");
NamedStatementMatcher.parsedSql(
" MERGE INTO tbl t "
+ " USING (SELECT ? id, ? name, ? email, ? ts, ? field1, ? field_2, ? __field_3__ FROM DUAL) s "
+ " ON (t.id=s.id and t.__field_3__=s.__field_3__) "
+ " WHEN MATCHED THEN UPDATE SET t.name=s.name, t.email=s.email, t.ts=s.ts, t.field1=s.field1, t.field_2=s.field_2"
+ " WHEN NOT MATCHED THEN INSERT (id, name, email, ts, field1, field_2, __field_3__)"
+ " VALUES (s.id, s.name, s.email, s.ts, s.field1, s.field_2, s.__field_3__)")
+ " USING (SELECT ? \"id\", ? \"name\", ? \"email\", ? \"ts\", ? \"field1\", ? \"field_2\", ? \"__field_3__\" FROM DUAL) s "
+ " ON (t.\"id\"=s.\"id\" and t.\"__field_3__\"=s.\"__field_3__\") "
+ " WHEN MATCHED THEN UPDATE SET t.\"name\"=s.\"name\", t.\"email\"=s.\"email\", t.\"ts\"=s.\"ts\", t.\"field1\"=s.\"field1\", t.\"field_2\"=s.\"field_2\""
+ " WHEN NOT MATCHED THEN INSERT (\"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\")"
+ " VALUES (s.\"id\", s.\"name\", s.\"email\", s.\"ts\", s.\"field1\", s.\"field_2\", s.\"__field_3__\")")
.parameter("id", singletonList(1))
.parameter("name", singletonList(2))
.parameter("email", singletonList(3))
Expand All @@ -139,11 +143,11 @@ void testSelectStatement() {
String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields);
assertThat(selectStmt)
.isEqualTo(
"SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl "
+ "WHERE id = :id AND __field_3__ = :__field_3__");
"SELECT \"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\" FROM \"tbl\" "
+ "WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__");
NamedStatementMatcher.parsedSql(
"SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl "
+ "WHERE id = ? AND __field_3__ = ?")
"SELECT \"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\" FROM \"tbl\" "
+ "WHERE \"id\" = ? AND \"__field_3__\" = ?")
.parameter("id", singletonList(1))
.parameter("__field_3__", singletonList(2))
.matches(selectStmt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,54 +41,54 @@ void testInsertStatement() {
String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames);
assertThat(insertStmt)
.isEqualTo(
"INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) "
"INSERT INTO [tbl]([id], [name], [email], [ts], [field1], [field_2], [__field_3__]) "
+ "VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__)");
}

@Test
void testDeleteStatement() {
String deleteStmt = dialect.getDeleteStatement(tableName, keyFields);
assertThat(deleteStmt)
.isEqualTo("DELETE FROM tbl WHERE id = :id AND __field_3__ = :__field_3__");
.isEqualTo("DELETE FROM [tbl] WHERE [id] = :id AND [__field_3__] = :__field_3__");
}

@Test
void testRowExistsStatement() {
String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields);
assertThat(rowExistStmt)
.isEqualTo("SELECT 1 FROM tbl WHERE id = :id AND __field_3__ = :__field_3__");
.isEqualTo("SELECT 1 FROM [tbl] WHERE [id] = :id AND [__field_3__] = :__field_3__");
}

@Test
void testUpdateStatement() {
String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, keyFields);
assertThat(updateStmt)
.isEqualTo(
"UPDATE tbl SET id = :id, name = :name, email = :email, ts = :ts, "
+ "field1 = :field1, field_2 = :field_2, __field_3__ = :__field_3__ "
+ "WHERE id = :id AND __field_3__ = :__field_3__");
"UPDATE [tbl] SET [id] = :id, [name] = :name, [email] = :email, [ts] = :ts, "
+ "[field1] = :field1, [field_2] = :field_2, [__field_3__] = :__field_3__ "
+ "WHERE [id] = :id AND [__field_3__] = :__field_3__");
}

@Test
void testUpsertStatement() {
String upsertStmt = dialect.getUpsertStatement(tableName, fieldNames, keyFields).get();
assertThat(upsertStmt)
.isEqualTo(
"MERGE INTO tbl AS [TARGET]"
+ " USING (SELECT :id id, :name name, :email email, :ts ts, :field1 field1, :field_2 field_2, :__field_3__ __field_3__) AS [SOURCE]"
+ " ON ([TARGET].id=[SOURCE].id AND [TARGET].__field_3__=[SOURCE].__field_3__)"
+ " WHEN MATCHED THEN UPDATE SET [TARGET].name=[SOURCE].name, [TARGET].email=[SOURCE].email,"
+ " [TARGET].ts=[SOURCE].ts, [TARGET].field1=[SOURCE].field1, [TARGET].field_2=[SOURCE].field_2"
+ " WHEN NOT MATCHED THEN INSERT (id, name, email, ts, field1, field_2, __field_3__)"
+ " VALUES ([SOURCE].id, [SOURCE].name, [SOURCE].email, [SOURCE].ts, [SOURCE].field1, [SOURCE].field_2, [SOURCE].__field_3__);");
"MERGE INTO [tbl] AS [TARGET]"
+ " USING (SELECT :id [id], :name [name], :email [email], :ts [ts], :field1 [field1], :field_2 [field_2], :__field_3__ [__field_3__]) AS [SOURCE]"
+ " ON ([TARGET].[id]=[SOURCE].[id] AND [TARGET].[__field_3__]=[SOURCE].[__field_3__])"
+ " WHEN MATCHED THEN UPDATE SET [TARGET].[name]=[SOURCE].[name], [TARGET].[email]=[SOURCE].[email],"
+ " [TARGET].[ts]=[SOURCE].[ts], [TARGET].[field1]=[SOURCE].[field1], [TARGET].[field_2]=[SOURCE].[field_2]"
+ " WHEN NOT MATCHED THEN INSERT ([id], [name], [email], [ts], [field1], [field_2], [__field_3__])"
+ " VALUES ([SOURCE].[id], [SOURCE].[name], [SOURCE].[email], [SOURCE].[ts], [SOURCE].[field1], [SOURCE].[field_2], [SOURCE].[__field_3__]);");
}

@Test
void testSelectStatement() {
String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields);
assertThat(selectStmt)
.isEqualTo(
"SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl "
+ "WHERE id = :id AND __field_3__ = :__field_3__");
"SELECT [id], [name], [email], [ts], [field1], [field_2], [__field_3__] FROM [tbl] "
+ "WHERE [id] = :id AND [__field_3__] = :__field_3__");
}
}