From a6f4ebd43b540c1dfb8eacdf27a522877ae76c73 Mon Sep 17 00:00:00 2001 From: Danny Cranmer Date: Fri, 7 Jun 2024 11:26:41 +0100 Subject: [PATCH 1/5] [FLINK-35137][Connectors/JDBC] Update website data for 3.2.0 --- docs/data/jdbc.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/data/jdbc.yml b/docs/data/jdbc.yml index 8fb41cc3d..50beae5bc 100644 --- a/docs/data/jdbc.yml +++ b/docs/data/jdbc.yml @@ -16,7 +16,8 @@ # limitations under the License. ################################################################################ -version: 3.1.0-SNAPSHOT +version: 3.2.0 +flink_compatibility: [1.18, 1.19] variants: - maven: flink-connector-jdbc sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/$full_version/flink-connector-jdbc-$full_version.jar From 8b4e57e599e389059510d09f789ab805f6930825 Mon Sep 17 00:00:00 2001 From: JohnMa123 Date: Wed, 8 Jan 2025 08:41:52 +0800 Subject: [PATCH 2/5] add oracte catlog --- .../jdbc/catalog/JdbcCatalogUtils.java | 5 + .../oracle/catalog/OracleCatalog.java | 212 +++++++++++ .../oracle/catalog/OracleTypeMapper.java | 82 +++++ .../jdbc/databases/oracle/OracleTestBase.java | 6 +- .../oracle/catalog/OracleCatalogTest.java | 168 +++++++++ .../oracle/catalog/OracleCatalogTestBase.java | 348 ++++++++++++++++++ 6 files changed, 820 insertions(+), 1 deletion(-) create mode 100644 flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java create mode 100644 flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleTypeMapper.java create mode 100644 flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java create mode 100644 flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java index 09d4d924b..a1ef0d452 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java @@ -22,6 +22,8 @@ import org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialect; import org.apache.flink.connector.jdbc.databases.mysql.catalog.MySqlCatalog; import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect; +import org.apache.flink.connector.jdbc.databases.oracle.catalog.OracleCatalog; +import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect; import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresCatalog; import org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect; import org.apache.flink.connector.jdbc.dialect.JdbcDialect; @@ -61,6 +63,9 @@ public static AbstractJdbcCatalog createCatalog( } else if (dialect instanceof MySqlDialect) { return new MySqlCatalog( userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); + } else if (dialect instanceof OracleDialect){ + return new OracleCatalog( + userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); } else { throw new UnsupportedOperationException( String.format("Catalog for '%s' is not supported yet.", dialect)); diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java new file mode 100644 index 000000000..79c84d695 --- /dev/null +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java @@ -0,0 +1,212 @@ +package org.apache.flink.connector.jdbc.databases.oracle.catalog; + +import org.apache.commons.compress.utils.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog; +import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresTablePath; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.*; +import org.apache.flink.table.catalog.exceptions.*; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.util.*; +import java.util.function.Predicate; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + + +/** + * OracleCatalog 用于查表和查数据库,便于重建 + */ +public class OracleCatalog extends AbstractJdbcCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(OracleCatalog.class); + + public static final String DEFAULT_DATABASE = "helowin"; + + + public static final String IDENTIFIER = "jdbc"; + private static final String ORACLE_DRIVER = "oracle.driver.OracleDriver"; + private OracleTypeMapper dialectTypeMapper; + private static final Set builtinDatabases = new HashSet() { + { + add("SCOTT"); + add("ANONYMOUS"); + add("XS$NULL"); + add("DIP"); + add("SPATIAL_WFS_ADMIN_USR"); + add("SPATIAL_CSW_ADMIN_USR"); + add("APEX_PUBLIC_USER"); + add("ORACLE_OCM"); + add("MDDATA"); + } + }; + + public OracleCatalog(ClassLoader userClassLoader, String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) { + super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl); + String driverVersion = Preconditions.checkNotNull(getDriverVersion(), "Driver version must not be null."); + String databaseVersion = Preconditions.checkNotNull(getDatabaseVersion(), "Database version must not be null."); + LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion); + this.dialectTypeMapper = new OracleTypeMapper(databaseVersion, driverVersion); + } + + private String getDatabaseVersion() { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { + return conn.getMetaData().getDatabaseProductVersion(); + } catch (Exception e) { + throw new CatalogException( String.format("Failed in getting Oracle version by %s.", defaultUrl), e); + } + } + + private String getDriverVersion() { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { + String driverVersion = conn.getMetaData().getDriverVersion(); + Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+"); + Matcher matcher = regexp.matcher(driverVersion); + return matcher.find() ? matcher.group(0) : null; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed in getting Oracle driver version by %s.", defaultUrl), e); + + } + } + + @Override + public List listDatabases() throws CatalogException { + return extractColumnValuesBySQL(this.defaultUrl, + "select username from sys.dba_users " + + "where DEFAULT_TABLESPACE <> 'SYSTEM' and DEFAULT_TABLESPACE <> 'SYSAUX' " + + " order by username", + 1, + dbName -> !builtinDatabases.contains(dbName)); + } + + @Override + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { + Preconditions.checkState(!StringUtils.isBlank(databaseName), "Database name must not be blank"); + if (listDatabases().contains(databaseName)) { + return new CatalogDatabaseImpl(Collections.emptyMap(), null); + } else { + throw new DatabaseNotExistException(getName(), databaseName); + } + } + + + @Override + public List listTables(String databaseName) throws DatabaseNotExistException, CatalogException { + Preconditions.checkState(StringUtils.isNotBlank(databaseName), "Database name must not be blank."); + if (!databaseExists(databaseName)){// 注意这个值是 oracle 实例名称 + throw new DatabaseNotExistException(getName(), databaseName); + } + + List listDatabases = listDatabases().stream().map(username -> "'" + username + "'") + .collect(Collectors.toList()); + return extractColumnValuesBySQL(this.defaultUrl, + "SELECT OWNER||'.'||TABLE_NAME AS schemaTableName FROM sys.all_tables WHERE OWNER IN (" + String.join(",", listDatabases) + ")"+ + "ORDER BY OWNER,TABLE_NAME",1, null, null); + } + + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + + String databaseName = tablePath.getDatabaseName(); + String dbUrl = baseUrl + databaseName; + try(Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { + DatabaseMetaData metaData = conn.getMetaData(); + Optional primaryKey = getPrimaryKey(metaData, databaseName, getSchemaName(tablePath), getTableName(tablePath)); + String statement = String.format("SELECT * FROM %s ", getSchemaTableName(tablePath)) ; + PreparedStatement ps = conn.prepareStatement(statement); + ResultSetMetaData resultSetMetaData = ps.getMetaData(); + + String[] columnNames = new String[resultSetMetaData.getColumnCount()]; + DataType[] types = new DataType[resultSetMetaData.getColumnCount()]; + + for (int i = 1; i<=resultSetMetaData.getColumnCount(); i++) { + columnNames[i - 1] = resultSetMetaData.getColumnName(i); + types[i - 1] = fromJDBCType(tablePath, resultSetMetaData,i); + if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) { + types[i-1] = types[i-1].notNull(); + } + } + + Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types); + primaryKey.ifPresent( pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns())); + Schema tableSchema = schemaBuilder.build(); + Map props = new HashMap<>(); + props.put(FactoryUtil.CONNECTOR.key(), IDENTIFIER); + props.put("username" , username); + props.put("password", pwd); + props.put("table_name", getSchemaTableName(tablePath)); + props.put("driverName", ORACLE_DRIVER); + return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props); + + } catch (Exception ex) { + throw new CatalogException(String.format("Failed getting Table %s", tablePath.getFullName()), ex); + } + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + String[] schemaTableNames = getSchemaTableName(tablePath).split("\\."); + return !extractColumnValuesBySQL( + defaultUrl, "SELECT table_name FROM sys.all_tables where OWNER = ? and table_name = ?", + 1, null, schemaTableNames[0], schemaTableNames[1]) + .isEmpty(); + + } + + protected List extractColumnValuesBySQL(String connUrl, String sql, int columnIndex, Predicate filterFunc, Object... params){ + List columnValues = Lists.newArrayList(); + + try (Connection conn = DriverManager.getConnection(connUrl, username, pwd); + PreparedStatement ps = conn.prepareStatement(sql)){ + if (Objects.nonNull(params) && params.length >0){ + for (int i=0; i 0 && precision < DecimalType.MAX_PRECISION) { + return DataTypes.DECIMAL(precision, metaData.getScale(colIndex)); + } + return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18); + case Types.DATE: + return DataTypes.DATE(); + case Types.TIMESTAMP: + case Types.TIMESTAMP_WITH_TIMEZONE: + case OracleTypes.TIMESTAMPTZ: + case OracleTypes.TIMESTAMPLTZ: + return scale > 0 ? DataTypes.TIMESTAMP(scale) : DataTypes.TIMESTAMP(); + case OracleTypes.INTERVALYM: + return DataTypes.INTERVAL(DataTypes.YEAR(), DataTypes.MONTH()); + case OracleTypes.INTERVALDS: + return DataTypes.INTERVAL(DataTypes.DAY(), DataTypes.SECOND()); + case Types.BOOLEAN: + return DataTypes.BOOLEAN(); + default: + final String jdbcColumnName = metaData.getColumnName(colIndex); + throw new UnsupportedOperationException( + String.format( + "Doesn't support Oracle type '%s' on column '%s' in Oracle version %s, driver version %s yet.", + oracleType, jdbcColumnName, databaseVersion, driverVersion)); + } + } +} diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleTestBase.java index db773ede8..9cf8884cd 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleTestBase.java @@ -22,11 +22,15 @@ import org.apache.flink.connector.jdbc.testutils.DatabaseTest; import org.apache.flink.connector.jdbc.testutils.databases.oracle.OracleDatabase; +import org.apache.flink.connector.jdbc.testutils.databases.oracle.OracleImages; import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers /** Base class for Oracle testing. */ @ExtendWith(OracleDatabase.class) -public interface OracleTestBase extends DatabaseTest { +public interface OracleTestBase extends DatabaseTest, OracleImages { @Override default DatabaseMetadata getMetadata() { diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java new file mode 100644 index 000000000..d3b1774b5 --- /dev/null +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java @@ -0,0 +1,168 @@ +package org.apache.flink.connector.jdbc.databases.oracle.catalog; + +import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresTablePath; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class OracleCatalogTest extends OracleCatalogTestBase { + + @Test + void testGetDb_DatabaseNotExistException() { + assertThatThrownBy(() -> catalog.getDatabase("nonexistent")) + .isInstanceOf(DatabaseNotExistException.class) + .hasMessageContaining("Database nonexistent does not exist in Catalog"); + } + + @Test + void testListDatabases() { + List actual = catalog.listDatabases(); + + assertThat(actual).isEqualTo(Arrays.asList("postgres", "test")); + } + + @Test + void testDbExists() { + assertThat(catalog.databaseExists("nonexistent")).isFalse(); + + assertThat(catalog.databaseExists(OracleCatalog.DEFAULT_DATABASE)).isTrue(); + } + + // ------ tables ------ + + @Test + void testListTables() throws DatabaseNotExistException { + List actual = catalog.listTables(OracleCatalog.DEFAULT_DATABASE); + + assertThat(actual) + .isEqualTo( + Arrays.asList( + "public.array_table", + "public.primitive_table", + "public.primitive_table2", + "public.serial_table", + "public.t1", + "public.t4", + "public.t5")); + + actual = catalog.listTables(TEST_DB); + + assertThat(actual).isEqualTo(Arrays.asList("public.t2", "test_schema.t3")); + } + + @Test + void testListTables_DatabaseNotExistException() { + assertThatThrownBy(() -> catalog.listTables(OracleCatalog.DEFAULT_DATABASE)) + .isInstanceOf(DatabaseNotExistException.class); + } + + @Test + void testTableExists() { + assertThat(catalog.tableExists(new ObjectPath(TEST_DB, "nonexist"))).isFalse(); + + assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE1))) + .isTrue(); + assertThat(catalog.tableExists(new ObjectPath(TEST_DB, TABLE2))).isTrue(); + assertThat(catalog.tableExists(new ObjectPath(TEST_DB, "test_schema.t3"))).isTrue(); + } + + @Test + void testGetTables_TableNotExistException() { + assertThatThrownBy( + () -> + catalog.getTable( + new ObjectPath( + TEST_DB, + PostgresTablePath.toFlinkTableName( + TEST_SCHEMA, "anytable")))) + .isInstanceOf(TableNotExistException.class); + } + + @Test + void testGetTables_TableNotExistException_NoSchema() { + assertThatThrownBy( + () -> + catalog.getTable( + new ObjectPath( + TEST_DB, + PostgresTablePath.toFlinkTableName( + "nonexistschema", "anytable")))) + .isInstanceOf(TableNotExistException.class); + } + + @Test + void testGetTables_TableNotExistException_NoDb() { + assertThatThrownBy( + () -> + catalog.getTable( + new ObjectPath( + "nonexistdb", + PostgresTablePath.toFlinkTableName( + TEST_SCHEMA, "anytable")))) + .isInstanceOf(TableNotExistException.class); + } + + @Test + void testGetTable() throws org.apache.flink.table.catalog.exceptions.TableNotExistException { + // test postgres.public.user1 + Schema schema = getSimpleTable().schema; + + CatalogBaseTable table = catalog.getTable(new ObjectPath("postgres", TABLE1)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + table = catalog.getTable(new ObjectPath("postgres", "public.t1")); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + // test testdb.public.user2 + table = catalog.getTable(new ObjectPath(TEST_DB, TABLE2)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + table = catalog.getTable(new ObjectPath(TEST_DB, "public.t2")); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + // test testdb.testschema.user2 + table = catalog.getTable(new ObjectPath(TEST_DB, TEST_SCHEMA + ".t3")); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + } + + @Test + void testPrimitiveDataTypes() throws TableNotExistException { + CatalogBaseTable table = + catalog.getTable( + new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE_PRIMITIVE_TYPE)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(getPrimitiveTable().schema); + } + + @Test + void testArrayDataTypes() throws TableNotExistException { + CatalogBaseTable table = + catalog.getTable( + new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE_ARRAY_TYPE)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(getArrayTable().schema); + } + + @Test + public void testSerialDataTypes() throws TableNotExistException { + CatalogBaseTable table = + catalog.getTable( + new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE_SERIAL_TYPE)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(getSerialTable().schema); + } +} \ No newline at end of file diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java new file mode 100644 index 000000000..b42f9e1b7 --- /dev/null +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java @@ -0,0 +1,348 @@ +package org.apache.flink.connector.jdbc.databases.oracle.catalog; + +import org.apache.flink.connector.jdbc.databases.oracle.OracleTestBase; +import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresTablePath; +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase; +import org.apache.flink.connector.jdbc.testutils.databases.oracle.OracleDatabase; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.types.logical.DecimalType; +import org.junit.jupiter.api.BeforeAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +public class OracleCatalogTestBase implements JdbcITCaseBase, OracleTestBase { + + private static DatabaseMetadata getStaticMetadata() { + return OracleDatabase.getMetadata(); + } + + public static final Logger LOG = LoggerFactory.getLogger(OracleCatalogTestBase.class); + + protected static final String TEST_CATALOG_NAME = "mypg"; + protected static final String TEST_USERNAME = getStaticMetadata().getUsername(); + protected static final String TEST_PWD = getStaticMetadata().getPassword(); + protected static final String TEST_DB = "test"; + protected static final String TEST_SCHEMA = "test_schema"; + protected static final String TABLE1 = "t1"; + protected static final String TABLE2 = "t2"; + protected static final String TABLE3 = "t3"; + protected static final String TABLE4 = "t4"; + protected static final String TABLE5 = "t5"; + protected static final String TABLE_PRIMITIVE_TYPE = "primitive_table"; + protected static final String TABLE_PRIMITIVE_TYPE2 = "primitive_table2"; + protected static final String TABLE_ARRAY_TYPE = "array_table"; + protected static final String TABLE_SERIAL_TYPE = "serial_table"; + + protected static String baseUrl; + protected static OracleCatalog catalog; + + + @BeforeAll + static void init() throws SQLException { + // jdbc:oracle:thin:@//localhost:50807/helowin + String jdbcUrl = getStaticMetadata().getJdbcUrl(); + // jdbc:oracle:thin:@//localhost:50807/ + baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/")); + + catalog = + new OracleCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + OracleCatalog.DEFAULT_DATABASE, + TEST_USERNAME, + TEST_PWD, + baseUrl); + + // create test database and schema + createSchema(TEST_DB, TEST_SCHEMA); + + // create test tables + // table: helowin.public.t1 + // table: helowin.public.t4 + // table: helowin.public.t5 + createTable(PostgresTablePath.fromFlinkTableName(TABLE1), getSimpleTable().oracleSchemaSql); + createTable(PostgresTablePath.fromFlinkTableName(TABLE4), getSimpleTable().oracleSchemaSql); + createTable(PostgresTablePath.fromFlinkTableName(TABLE5), getSimpleTable().oracleSchemaSql); + + // table: test.public.t2 + // table: test.test_schema.t3 + // table: helowin.public.dt + // table: helowin.public.dt2 + createTable( + TEST_DB, + PostgresTablePath.fromFlinkTableName(TABLE2), + getSimpleTable().oracleSchemaSql); + createTable( + TEST_DB, new PostgresTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().oracleSchemaSql); + createTable( + PostgresTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE), + getPrimitiveTable().oracleSchemaSql); + createTable( + PostgresTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2), + getPrimitiveTable("test_pk2").oracleSchemaSql); + createTable( + PostgresTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE), + getArrayTable().oracleSchemaSql); + createTable( + PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE), + getSerialTable().oracleSchemaSql); + + executeSQL( + OracleCatalog.DEFAULT_DATABASE, + String.format( + "insert into public.%s values (%s);", TABLE1, getSimpleTable().values)); + executeSQL( + OracleCatalog.DEFAULT_DATABASE, + String.format( + "insert into %s values (%s);", + TABLE_PRIMITIVE_TYPE, getPrimitiveTable().values)); + executeSQL( + OracleCatalog.DEFAULT_DATABASE, + String.format( + "insert into %s values (%s);", TABLE_ARRAY_TYPE, getArrayTable().values)); + executeSQL( + OracleCatalog.DEFAULT_DATABASE, + String.format( + "insert into %s values (%s);", TABLE_SERIAL_TYPE, getSerialTable().values)); + } + + public static void createTable(PostgresTablePath tablePath, String tableSchemaSql) + throws SQLException { + executeSQL( + OracleCatalog.DEFAULT_DATABASE, + String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql)); + } + + public static void createTable(String db, PostgresTablePath tablePath, String tableSchemaSql) + throws SQLException { + executeSQL( + db, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql)); + } + + public static void createSchema(String db, String schema) throws SQLException { + executeSQL(db, String.format("CREATE SCHEMA %s", schema)); + } + + public static void createDatabase(String database) throws SQLException { + executeSQL(String.format("CREATE DATABASE %s;", database)); + } + + public static void executeSQL(String sql) throws SQLException { + executeSQL("", sql); + } + + public static void executeSQL(String db, String sql) throws SQLException { + try (Connection conn = + DriverManager.getConnection( + String.format("%s/%s", baseUrl, db), TEST_USERNAME, TEST_PWD); + Statement statement = conn.createStatement()) { + statement.executeUpdate(sql); + } catch (SQLException e) { + throw e; + } + } + + + /** Object holding schema and corresponding sql. */ + public static class TestTable { + Schema schema; + String oracleSchemaSql; + String values; + + public TestTable(Schema schema, String oracleSchemaSql, String values) { + this.schema = schema; + this.oracleSchemaSql = oracleSchemaSql; + this.values = values; + } + } + + public static OracleCatalogTestBase.TestTable getSimpleTable() { + return new OracleCatalogTestBase.TestTable( + Schema.newBuilder().column("id", DataTypes.INT()).build(), "id integer", "1"); + } + + // oracle doesn't support to use the same primary key name across different tables, + // make the table parameterized to resolve this problem. + public static OracleCatalogTestBase.TestTable getPrimitiveTable() { + return getPrimitiveTable("test_pk"); + } + + // TODO: add back timestamptz and time types. + // Flink currently doesn't support converting time's precision, with the following error + // TableException: Unsupported conversion from data type 'TIME(6)' (conversion class: + // java.sql.Time) + // to type information. Only data types that originated from type information fully support a + // reverse conversion. + public static OracleCatalogTestBase.TestTable getPrimitiveTable(String primaryKeyName) { + return new OracleCatalogTestBase.TestTable( + Schema.newBuilder() + .column("int", DataTypes.INT().notNull()) + .column("bytea", DataTypes.BYTES()) + .column("short", DataTypes.SMALLINT().notNull()) + .column("long", DataTypes.BIGINT()) + .column("real", DataTypes.FLOAT()) + .column("double_precision", DataTypes.DOUBLE()) + .column("numeric", DataTypes.DECIMAL(10, 5)) + .column("decimal", DataTypes.DECIMAL(10, 1)) + .column("boolean", DataTypes.BOOLEAN()) + .column("text", DataTypes.STRING()) + .column("char", DataTypes.CHAR(1)) + .column("character", DataTypes.CHAR(3)) + .column("character_varying", DataTypes.VARCHAR(20)) + .column("timestamp", DataTypes.TIMESTAMP(5)) + // .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)) + .column("date", DataTypes.DATE()) + .column("time", DataTypes.TIME(0)) + .column("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)) + .primaryKeyNamed(primaryKeyName, "short", "int") + .build(), + "int integer, " + + "bytea bytea, " + + "short smallint, " + + "long bigint, " + + "real real, " + + "double_precision double precision, " + + "numeric numeric(10, 5), " + + "decimal decimal(10, 1), " + + "boolean boolean, " + + "text text, " + + "char char, " + + "character character(3), " + + "character_varying character varying(20), " + + "timestamp timestamp(5), " + + + // "timestamptz timestamptz(4), " + + "date date," + + "time time(0), " + + "default_numeric numeric, " + + "CONSTRAINT " + + primaryKeyName + + " PRIMARY KEY (short, int)", + "1," + + "'2'," + + "3," + + "4," + + "5.5," + + "6.6," + + "7.7," + + "8.8," + + "true," + + "'a'," + + "'b'," + + "'c'," + + "'d'," + + "'2016-06-22 19:10:25'," + + + // "'2006-06-22 19:10:25'," + + "'2015-01-01'," + + "'00:51:02.746572', " + + "500"); + } + + // TODO: add back timestamptz once planner supports timestamp with timezone + public static OracleCatalogTestBase.TestTable getArrayTable() { + return new OracleCatalogTestBase.TestTable( + Schema.newBuilder() + .column("int_arr", DataTypes.ARRAY(DataTypes.INT())) + .column("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES())) + .column("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT())) + .column("long_arr", DataTypes.ARRAY(DataTypes.BIGINT())) + .column("real_arr", DataTypes.ARRAY(DataTypes.FLOAT())) + .column("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE())) + .column("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5))) + .column( + "numeric_arr_default", + DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18))) + .column("decimal_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 2))) + .column("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN())) + .column("text_arr", DataTypes.ARRAY(DataTypes.STRING())) + .column("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1))) + .column("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3))) + .column("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20))) + .column("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(5))) + // .field("timestamptz_arr", + // DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4))) + .column("date_arr", DataTypes.ARRAY(DataTypes.DATE())) + .column("time_arr", DataTypes.ARRAY(DataTypes.TIME(0))) + .column("null_bytea_arr", DataTypes.ARRAY(DataTypes.BYTES())) + .column("null_text_arr", DataTypes.ARRAY(DataTypes.STRING())) + .build(), + "int_arr integer[], " + + "bytea_arr bytea[], " + + "short_arr smallint[], " + + "long_arr bigint[], " + + "real_arr real[], " + + "double_precision_arr double precision[], " + + "numeric_arr numeric(10, 5)[], " + + "numeric_arr_default numeric[], " + + "decimal_arr decimal(10,2)[], " + + "boolean_arr boolean[], " + + "text_arr text[], " + + "char_arr char[], " + + "character_arr character(3)[], " + + "character_varying_arr character varying(20)[], " + + "timestamp_arr timestamp(5)[], " + + + // "timestamptz_arr timestamptz(4)[], " + + "date_arr date[], " + + "time_arr time(0)[], " + + "null_bytea_arr bytea[], " + + "null_text_arr text[]", + String.format( + "'{1,2,3}'," + + "'{2,3,4}'," + + "'{3,4,5}'," + + "'{4,5,6}'," + + "'{5.5,6.6,7.7}'," + + "'{6.6,7.7,8.8}'," + + "'{7.7,8.8,9.9}'," + + "'{8.8,9.9,10.10}'," + + "'{9.9,10.10,11.11}'," + + "'{true,false,true}'," + + "'{a,b,c}'," + + "'{b,c,d}'," + + "'{b,c,d}'," + + "'{b,c,d}'," + + "'{\"2016-06-22 19:10:25\", \"2019-06-22 19:10:25\"}'," + + + // "'{\"2006-06-22 19:10:25\", \"2009-06-22 19:10:25\"}'," + + "'{\"2015-01-01\", \"2020-01-01\"}'," + + "'{\"00:51:02.746572\", \"00:59:02.746572\"}'," + + "NULL," + + "NULL")); + } + + public static OracleCatalogTestBase.TestTable getSerialTable() { + return new OracleCatalogTestBase.TestTable( + Schema.newBuilder() + // serial fields are returned as not null by ResultSetMetaData.columnNoNulls + .column("f0", DataTypes.SMALLINT().notNull()) + .column("f1", DataTypes.INT().notNull()) + .column("f2", DataTypes.SMALLINT().notNull()) + .column("f3", DataTypes.INT().notNull()) + .column("f4", DataTypes.BIGINT().notNull()) + .column("f5", DataTypes.BIGINT().notNull()) + .build(), + "f0 smallserial, " + + "f1 serial, " + + "f2 serial2, " + + "f3 serial4, " + + "f4 serial8, " + + "f5 bigserial", + "32767," + + "2147483647," + + "32767," + + "2147483647," + + "9223372036854775807," + + "9223372036854775807"); + } + + +} \ No newline at end of file From 8c1728d0bfa36242b088530932585dccee2e954e Mon Sep 17 00:00:00 2001 From: JohnMa123 Date: Thu, 9 Jan 2025 14:29:15 +0800 Subject: [PATCH 3/5] modify TiDBConnectorITCase --- .idea/vcs.xml | 2 +- .../jdbc/catalog/AbstractJdbcCatalog.java | 14 ++- .../oracle/catalog/OracleCatalog.java | 6 +- .../oracle/catalog/OracleTablePath.java | 112 +++++++++++++++++ .../jdbc/databases/oracle/OracleTestBase.java | 9 ++ .../oracle/catalog/OracleCatalogTestBase.java | 115 +++++++++++------- .../databases/oracle/OracleImages.java | 2 + .../src/test/resources/log4j2-test.properties | 2 +- 8 files changed, 209 insertions(+), 53 deletions(-) create mode 100644 flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleTablePath.java diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 0dd9dbf02..bd3d24bbb 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -19,6 +19,6 @@ - + \ No newline at end of file diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java index 7ba0c06dd..ee3f98dbd 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java @@ -107,13 +107,19 @@ public AbstractJdbcCatalog( checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd)); checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl)); - JdbcCatalogUtils.validateJdbcUrl(baseUrl); - + if(!baseUrl.toLowerCase().contains("oracle")){ + JdbcCatalogUtils.validateJdbcUrl(baseUrl); + this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; + this.defaultUrl = this.baseUrl + defaultDatabase; + } else { + this.baseUrl = baseUrl; + this.defaultUrl = this.baseUrl; + } this.userClassLoader = userClassLoader; this.username = username; this.pwd = pwd; - this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; - this.defaultUrl = this.baseUrl + defaultDatabase; + + } @Override diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java index 79c84d695..6f7795cfe 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java @@ -190,15 +190,15 @@ protected List extractColumnValuesBySQL(String connUrl, String sql, int } protected String getSchemaTableName(ObjectPath tablePath) { - return PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath(); + return OracleTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath(); } protected String getSchemaName(ObjectPath tablePath) { - return PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgSchemaName(); + return OracleTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgSchemaName(); } protected String getTableName(ObjectPath tablePath) { - return PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgTableName(); + return OracleTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgTableName(); } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleTablePath.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleTablePath.java new file mode 100644 index 000000000..4e94e50ce --- /dev/null +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleTablePath.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.databases.oracle.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.StringUtils; + +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Table path of PostgresSQL in Flink. Can be of formats "table_name" or "schema_name.table_name". + * When it's "table_name", the schema name defaults to "public". + */ +@Internal +public class OracleTablePath { + + private static final String DEFAULT_POSTGRES_SCHEMA_NAME = "public"; + + private final String pgSchemaName; + private final String pgTableName; + + public OracleTablePath(String pgSchemaName, String pgTableName) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(pgSchemaName), + "Schema name is not valid. Null or empty is not allowed"); + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(pgTableName), + "Table name is not valid. Null or empty is not allowed"); + + this.pgSchemaName = pgSchemaName; + this.pgTableName = pgTableName; + } + + public static OracleTablePath fromFlinkTableName(String flinkTableName) { + if (flinkTableName.contains(".")) { + String[] path = flinkTableName.split("\\."); + + checkArgument( + path != null && path.length == 2, + String.format( + "Table name '%s' is not valid. The parsed length is %d", + flinkTableName, path.length)); + + return new OracleTablePath(path[0], path[1]); + } else { + return new OracleTablePath(getDefaultSchemaName(), flinkTableName); + } + } + + public static String toFlinkTableName(String schema, String table) { + return new OracleTablePath(schema, table).getFullPath(); + } + + public String getFullPath() { + return String.format("%s.%s", pgSchemaName, pgTableName); + } + + public String getPgTableName() { + return pgTableName; + } + + public String getPgSchemaName() { + return pgSchemaName; + } + + protected static String getDefaultSchemaName() { + return DEFAULT_POSTGRES_SCHEMA_NAME; + } + + @Override + public String toString() { + return getFullPath(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + OracleTablePath that = (OracleTablePath) o; + return Objects.equals(pgSchemaName, that.pgSchemaName) + && Objects.equals(pgTableName, that.pgTableName); + } + + @Override + public int hashCode() { + return Objects.hash(pgSchemaName, pgTableName); + } +} diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleTestBase.java index 9cf8884cd..eccdd4b26 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleTestBase.java @@ -24,6 +24,8 @@ import org.apache.flink.connector.jdbc.testutils.databases.oracle.OracleImages; import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.containers.OracleContainer; +import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -32,6 +34,13 @@ @ExtendWith(OracleDatabase.class) public interface OracleTestBase extends DatabaseTest, OracleImages { + @Container + OracleContainer CONTAINER = + new OracleContainer(ORACLE_21) + .withStartupTimeoutSeconds(240) + .withConnectTimeoutSeconds(120) + .usingSid(); + @Override default DatabaseMetadata getMetadata() { return OracleDatabase.getMetadata(); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java index b42f9e1b7..a8c210c66 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java @@ -1,7 +1,6 @@ package org.apache.flink.connector.jdbc.databases.oracle.catalog; import org.apache.flink.connector.jdbc.databases.oracle.OracleTestBase; -import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresTablePath; import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase; import org.apache.flink.connector.jdbc.testutils.databases.oracle.OracleDatabase; @@ -12,33 +11,27 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; public class OracleCatalogTestBase implements JdbcITCaseBase, OracleTestBase { - private static DatabaseMetadata getStaticMetadata() { - return OracleDatabase.getMetadata(); - } - public static final Logger LOG = LoggerFactory.getLogger(OracleCatalogTestBase.class); protected static final String TEST_CATALOG_NAME = "mypg"; - protected static final String TEST_USERNAME = getStaticMetadata().getUsername(); - protected static final String TEST_PWD = getStaticMetadata().getPassword(); + protected static final String TEST_USERNAME = CONTAINER.getUsername(); + protected static final String TEST_PWD = CONTAINER.getPassword(); protected static final String TEST_DB = "test"; protected static final String TEST_SCHEMA = "test_schema"; - protected static final String TABLE1 = "t1"; - protected static final String TABLE2 = "t2"; - protected static final String TABLE3 = "t3"; - protected static final String TABLE4 = "t4"; - protected static final String TABLE5 = "t5"; - protected static final String TABLE_PRIMITIVE_TYPE = "primitive_table"; - protected static final String TABLE_PRIMITIVE_TYPE2 = "primitive_table2"; - protected static final String TABLE_ARRAY_TYPE = "array_table"; - protected static final String TABLE_SERIAL_TYPE = "serial_table"; + protected static final String TEST_PASSWORD = "test_password"; + protected static final String TABLE1 = TEST_SCHEMA + ".t1"; + protected static final String TABLE2 = TEST_SCHEMA + ".t2"; + protected static final String TABLE3 = TEST_SCHEMA + ".t3"; + protected static final String TABLE4 = TEST_SCHEMA + ".t4"; + protected static final String TABLE5 = TEST_SCHEMA + ".t5"; + protected static final String TABLE_PRIMITIVE_TYPE = TEST_SCHEMA + ".primitive_table"; + protected static final String TABLE_PRIMITIVE_TYPE2 = TEST_SCHEMA + ".primitive_table2"; + protected static final String TABLE_ARRAY_TYPE = TEST_SCHEMA + ".array_table"; + protected static final String TABLE_SERIAL_TYPE = TEST_SCHEMA + ".serial_table"; protected static String baseUrl; protected static OracleCatalog catalog; @@ -47,29 +40,28 @@ private static DatabaseMetadata getStaticMetadata() { @BeforeAll static void init() throws SQLException { // jdbc:oracle:thin:@//localhost:50807/helowin - String jdbcUrl = getStaticMetadata().getJdbcUrl(); + String jdbcUrl = CONTAINER.getJdbcUrl(); // jdbc:oracle:thin:@//localhost:50807/ - baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/")); - + baseUrl = jdbcUrl; // jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/")); catalog = new OracleCatalog( Thread.currentThread().getContextClassLoader(), TEST_CATALOG_NAME, - OracleCatalog.DEFAULT_DATABASE, + CONTAINER.getSid(), TEST_USERNAME, TEST_PWD, baseUrl); // create test database and schema - createSchema(TEST_DB, TEST_SCHEMA); + createSchema(TEST_SCHEMA, TEST_PASSWORD); // create test tables // table: helowin.public.t1 // table: helowin.public.t4 // table: helowin.public.t5 - createTable(PostgresTablePath.fromFlinkTableName(TABLE1), getSimpleTable().oracleSchemaSql); - createTable(PostgresTablePath.fromFlinkTableName(TABLE4), getSimpleTable().oracleSchemaSql); - createTable(PostgresTablePath.fromFlinkTableName(TABLE5), getSimpleTable().oracleSchemaSql); + createTable(OracleTablePath.fromFlinkTableName(TABLE1), getSimpleTable().oracleSchemaSql); + createTable(OracleTablePath.fromFlinkTableName(TABLE4), getSimpleTable().oracleSchemaSql); + createTable(OracleTablePath.fromFlinkTableName(TABLE5), getSimpleTable().oracleSchemaSql); // table: test.public.t2 // table: test.test_schema.t3 @@ -77,21 +69,21 @@ static void init() throws SQLException { // table: helowin.public.dt2 createTable( TEST_DB, - PostgresTablePath.fromFlinkTableName(TABLE2), + OracleTablePath.fromFlinkTableName(TABLE2), getSimpleTable().oracleSchemaSql); createTable( - TEST_DB, new PostgresTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().oracleSchemaSql); + TEST_DB, new OracleTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().oracleSchemaSql); createTable( - PostgresTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE), + OracleTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE), getPrimitiveTable().oracleSchemaSql); createTable( - PostgresTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2), + OracleTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2), getPrimitiveTable("test_pk2").oracleSchemaSql); createTable( - PostgresTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE), + OracleTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE), getArrayTable().oracleSchemaSql); createTable( - PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE), + OracleTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE), getSerialTable().oracleSchemaSql); executeSQL( @@ -113,29 +105,64 @@ static void init() throws SQLException { "insert into %s values (%s);", TABLE_SERIAL_TYPE, getSerialTable().values)); } - public static void createTable(PostgresTablePath tablePath, String tableSchemaSql) + public static void createTable(OracleTablePath tablePath, String tableSchemaSql) throws SQLException { - executeSQL( - OracleCatalog.DEFAULT_DATABASE, - String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql)); + executeTableSQL(String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql)); } - public static void createTable(String db, PostgresTablePath tablePath, String tableSchemaSql) + public static void createTable(String db, OracleTablePath tablePath, String tableSchemaSql) throws SQLException { executeSQL( db, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql)); } - public static void createSchema(String db, String schema) throws SQLException { - executeSQL(db, String.format("CREATE SCHEMA %s", schema)); + public static void createSchema(String schema, String userPassword) throws SQLException { + executeSQL(String.format("CREATE USER %s IDENTIFIED BY %s DEFAULT TABLESPACE users TEMPORARY TABLESPACE temp", schema, userPassword)); + executeSQL(String.format("GRANT CONNECT, RESOURCE, CREATE TABLE TO %s",schema)); } public static void createDatabase(String database) throws SQLException { executeSQL(String.format("CREATE DATABASE %s;", database)); } - public static void executeSQL(String sql) throws SQLException { - executeSQL("", sql); + public static void executeSQL(String sql) throws SQLException { + try (Connection conn = + DriverManager.getConnection( + String.format("%s", baseUrl), TEST_USERNAME, TEST_PWD); + Statement statement = conn.createStatement()) { + statement.executeUpdate(sql); + } catch (SQLException e) { + throw e; + } + } + + public static void executeTableSQL(String sql) throws SQLException { + // 连接数据库并执行查询 + try (Connection conn = DriverManager.getConnection( + String.format("%s", baseUrl), TEST_USERNAME, TEST_PWD); + PreparedStatement stmt = conn.prepareStatement("SELECT username FROM all_users WHERE username = 'TEST_SCHEMA'"); + ResultSet rs = stmt.executeQuery()) { + + // 如果查询到结果,打印结果 + if (rs.next()) { + String username = rs.getString("username"); + System.out.println("Username: " + username); + } else { + System.out.println("No such user found."); + } + + } catch (SQLException e) { + // 处理数据库连接和查询中的异常 + e.printStackTrace(); + } + try (Connection conn = + DriverManager.getConnection( + String.format("%s", baseUrl), TEST_SCHEMA, TEST_PASSWORD); + Statement statement = conn.createStatement()) { + statement.executeUpdate(sql); + } catch (SQLException e) { + throw e; + } } public static void executeSQL(String db, String sql) throws SQLException { @@ -165,7 +192,7 @@ public TestTable(Schema schema, String oracleSchemaSql, String values) { public static OracleCatalogTestBase.TestTable getSimpleTable() { return new OracleCatalogTestBase.TestTable( - Schema.newBuilder().column("id", DataTypes.INT()).build(), "id integer", "1"); + Schema.newBuilder().column("id", DataTypes.INT()).build(), "id number", "1"); } // oracle doesn't support to use the same primary key name across different tables, diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oracle/OracleImages.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oracle/OracleImages.java index 8c31b5173..4a49cd1ee 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oracle/OracleImages.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oracle/OracleImages.java @@ -20,5 +20,7 @@ /** Oracle docker images. */ public interface OracleImages { String ORACLE_18 = "gvenzl/oracle-xe:18.4.0-slim"; + String ORACLE_12 = "gvenzl/oracle-xe:12.4.0-slim"; + String ORACLE_19 = "goodboy008/oracle-19.3.0-ee"; String ORACLE_21 = "gvenzl/oracle-xe:21.3.0-slim-faststart"; } diff --git a/flink-connector-jdbc/src/test/resources/log4j2-test.properties b/flink-connector-jdbc/src/test/resources/log4j2-test.properties index 835c2ec9a..c733b5cbd 100644 --- a/flink-connector-jdbc/src/test/resources/log4j2-test.properties +++ b/flink-connector-jdbc/src/test/resources/log4j2-test.properties @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level = OFF +rootLogger.level = debug rootLogger.appenderRef.test.ref = TestLogger appender.testlogger.name = TestLogger From 195cff2aa304be7c7e5f6436bdff338c9fbe8602 Mon Sep 17 00:00:00 2001 From: ywwwww <17342016631@163.com> Date: Tue, 14 Jan 2025 16:12:41 +0800 Subject: [PATCH 4/5] ORACLECATALOG TEST EXAMPLE --- .../oracle/catalog/OracleCatalog.java | 13 +- .../oracle/catalog/OracleCatalogTest.java | 53 +++-- .../oracle/catalog/OracleCatalogTestBase.java | 193 ++++++++++-------- 3 files changed, 148 insertions(+), 111 deletions(-) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java index 6f7795cfe..49054ea2f 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java @@ -28,7 +28,7 @@ public class OracleCatalog extends AbstractJdbcCatalog { private static final Logger LOG = LoggerFactory.getLogger(OracleCatalog.class); - public static final String DEFAULT_DATABASE = "helowin"; + public static final String DEFAULT_DATABASE = "TEST_SCHEMA"; public static final String IDENTIFIER = "jdbc"; @@ -120,7 +120,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } String databaseName = tablePath.getDatabaseName(); - String dbUrl = baseUrl + databaseName; + String dbUrl = baseUrl; try(Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { DatabaseMetaData metaData = conn.getMetaData(); Optional primaryKey = getPrimaryKey(metaData, databaseName, getSchemaName(tablePath), getTableName(tablePath)); @@ -167,7 +167,7 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException { protected List extractColumnValuesBySQL(String connUrl, String sql, int columnIndex, Predicate filterFunc, Object... params){ List columnValues = Lists.newArrayList(); - + String sql1 = "SELECT table_name,OWNER FROM sys.all_tables"; try (Connection conn = DriverManager.getConnection(connUrl, username, pwd); PreparedStatement ps = conn.prepareStatement(sql)){ if (Objects.nonNull(params) && params.length >0){ @@ -177,11 +177,16 @@ protected List extractColumnValuesBySQL(String connUrl, String sql, int } ResultSet rs = ps.executeQuery(); while (rs.next()) { + +// String username = rs.getString("table_name"); +// String default_tablespace = rs.getString("OWNER"); +//// String temporary_tablespace = rs.getString("temporary_tablespace"); +// System.out.println("username:"+username+"OWNER:"+default_tablespace); String columnValue = rs.getString(columnIndex); if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) { columnValues.add(columnValue); } - return columnValues; +// return columnValues; } } catch (Exception ex){ throw new CatalogException(String.format("The following SQL query could not be executed (%s): %s", connUrl, sql ), ex); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java index d3b1774b5..30910192f 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java @@ -26,7 +26,6 @@ void testGetDb_DatabaseNotExistException() { @Test void testListDatabases() { List actual = catalog.listDatabases(); - assertThat(actual).isEqualTo(Arrays.asList("postgres", "test")); } @@ -46,33 +45,33 @@ void testListTables() throws DatabaseNotExistException { assertThat(actual) .isEqualTo( Arrays.asList( - "public.array_table", - "public.primitive_table", - "public.primitive_table2", - "public.serial_table", - "public.t1", - "public.t4", - "public.t5")); - - actual = catalog.listTables(TEST_DB); - - assertThat(actual).isEqualTo(Arrays.asList("public.t2", "test_schema.t3")); + "TEST_SCHEMA.PRIMITIVE_TABLE", + "TEST_SCHEMA.PRIMITIVE_TABLE2", + "TEST_SCHEMA.T1", + "TEST_SCHEMA.T2", + "TEST_SCHEMA.T3", + "TEST_SCHEMA.T4", + "TEST_SCHEMA.T5")); + +// actual = catalog.listTables(TEST_DB); +// +// assertThat(actual).isEqualTo(Arrays.asList("public.t2", "test_schema.t3")); } @Test void testListTables_DatabaseNotExistException() { - assertThatThrownBy(() -> catalog.listTables(OracleCatalog.DEFAULT_DATABASE)) + assertThatThrownBy(() -> catalog.listTables(TEST_DB)) .isInstanceOf(DatabaseNotExistException.class); } @Test void testTableExists() { - assertThat(catalog.tableExists(new ObjectPath(TEST_DB, "nonexist"))).isFalse(); + assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_DATABASE, "nonexist"))).isFalse(); assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE1))) .isTrue(); - assertThat(catalog.tableExists(new ObjectPath(TEST_DB, TABLE2))).isTrue(); - assertThat(catalog.tableExists(new ObjectPath(TEST_DB, "test_schema.t3"))).isTrue(); + assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE2))).isTrue(); + assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_DATABASE, "TEST_SCHEMA.T3"))).isTrue(); } @Test @@ -116,25 +115,25 @@ void testGetTable() throws org.apache.flink.table.catalog.exceptions.TableNotExi // test postgres.public.user1 Schema schema = getSimpleTable().schema; - CatalogBaseTable table = catalog.getTable(new ObjectPath("postgres", TABLE1)); - + CatalogBaseTable table = catalog.getTable(new ObjectPath("TEST_SCHEMA", TABLE1)); + System.out.println(table.getUnresolvedSchema().toString()); assertThat(table.getUnresolvedSchema()).isEqualTo(schema); - table = catalog.getTable(new ObjectPath("postgres", "public.t1")); - - assertThat(table.getUnresolvedSchema()).isEqualTo(schema); +// table = catalog.getTable(new ObjectPath("TEST_SCHEMA", TABLE2)); +// +// assertThat(table.getUnresolvedSchema()).isEqualTo(schema); // test testdb.public.user2 - table = catalog.getTable(new ObjectPath(TEST_DB, TABLE2)); + table = catalog.getTable(new ObjectPath("TEST_SCHEMA", TABLE2)); assertThat(table.getUnresolvedSchema()).isEqualTo(schema); - table = catalog.getTable(new ObjectPath(TEST_DB, "public.t2")); - - assertThat(table.getUnresolvedSchema()).isEqualTo(schema); +// table = catalog.getTable(new ObjectPath(TEST_DB, "public.t2")); +// +// assertThat(table.getUnresolvedSchema()).isEqualTo(schema); // test testdb.testschema.user2 - table = catalog.getTable(new ObjectPath(TEST_DB, TEST_SCHEMA + ".t3")); + table = catalog.getTable(new ObjectPath("TEST_SCHEMA", TEST_SCHEMA + ".T3")); assertThat(table.getUnresolvedSchema()).isEqualTo(schema); } @@ -144,7 +143,7 @@ void testPrimitiveDataTypes() throws TableNotExistException { CatalogBaseTable table = catalog.getTable( new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE_PRIMITIVE_TYPE)); - + System.out.println(table.getUnresolvedSchema().toString()); assertThat(table.getUnresolvedSchema()).isEqualTo(getPrimitiveTable().schema); } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java index a8c210c66..fd682766b 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java @@ -21,15 +21,15 @@ public class OracleCatalogTestBase implements JdbcITCaseBase, OracleTestBase { protected static final String TEST_USERNAME = CONTAINER.getUsername(); protected static final String TEST_PWD = CONTAINER.getPassword(); protected static final String TEST_DB = "test"; - protected static final String TEST_SCHEMA = "test_schema"; + protected static final String TEST_SCHEMA = "TEST_SCHEMA"; protected static final String TEST_PASSWORD = "test_password"; - protected static final String TABLE1 = TEST_SCHEMA + ".t1"; - protected static final String TABLE2 = TEST_SCHEMA + ".t2"; - protected static final String TABLE3 = TEST_SCHEMA + ".t3"; - protected static final String TABLE4 = TEST_SCHEMA + ".t4"; - protected static final String TABLE5 = TEST_SCHEMA + ".t5"; - protected static final String TABLE_PRIMITIVE_TYPE = TEST_SCHEMA + ".primitive_table"; - protected static final String TABLE_PRIMITIVE_TYPE2 = TEST_SCHEMA + ".primitive_table2"; + protected static final String TABLE1 = TEST_SCHEMA + ".T1"; + protected static final String TABLE2 = TEST_SCHEMA + ".T2"; + protected static final String TABLE3 = TEST_SCHEMA + ".T3"; + protected static final String TABLE4 = TEST_SCHEMA + ".T4"; + protected static final String TABLE5 = TEST_SCHEMA + ".T5"; + protected static final String TABLE_PRIMITIVE_TYPE = TEST_SCHEMA + ".PRIMITIVE_TABLE"; + protected static final String TABLE_PRIMITIVE_TYPE2 = TEST_SCHEMA + ".PRIMITIVE_TABLE2"; protected static final String TABLE_ARRAY_TYPE = TEST_SCHEMA + ".array_table"; protected static final String TABLE_SERIAL_TYPE = TEST_SCHEMA + ".serial_table"; @@ -72,53 +72,56 @@ static void init() throws SQLException { OracleTablePath.fromFlinkTableName(TABLE2), getSimpleTable().oracleSchemaSql); createTable( - TEST_DB, new OracleTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().oracleSchemaSql); + TEST_DB, OracleTablePath.fromFlinkTableName(TABLE3), getSimpleTable().oracleSchemaSql); createTable( OracleTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE), getPrimitiveTable().oracleSchemaSql); createTable( OracleTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2), getPrimitiveTable("test_pk2").oracleSchemaSql); - createTable( - OracleTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE), - getArrayTable().oracleSchemaSql); - createTable( - OracleTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE), - getSerialTable().oracleSchemaSql); +// createTable( +// OracleTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE), +// getArrayTable().oracleSchemaSql); +// createTable( +// OracleTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE), +// getSerialTable().oracleSchemaSql); executeSQL( OracleCatalog.DEFAULT_DATABASE, String.format( - "insert into public.%s values (%s);", TABLE1, getSimpleTable().values)); + "insert into %s values (%s)", TABLE1, getSimpleTable().values)); executeSQL( OracleCatalog.DEFAULT_DATABASE, String.format( - "insert into %s values (%s);", + "insert into %s values (%s)", TABLE_PRIMITIVE_TYPE, getPrimitiveTable().values)); - executeSQL( - OracleCatalog.DEFAULT_DATABASE, - String.format( - "insert into %s values (%s);", TABLE_ARRAY_TYPE, getArrayTable().values)); - executeSQL( - OracleCatalog.DEFAULT_DATABASE, - String.format( - "insert into %s values (%s);", TABLE_SERIAL_TYPE, getSerialTable().values)); +// executeSQL( +// OracleCatalog.DEFAULT_DATABASE, +// String.format( +// "insert into %s values (%s);", TABLE_ARRAY_TYPE, getArrayTable().values)); +// executeSQL( +// OracleCatalog.DEFAULT_DATABASE, +// String.format( +// "insert into %s values (%s)", TABLE_SERIAL_TYPE, getSerialTable().values)); + System.out.println("success"); } public static void createTable(OracleTablePath tablePath, String tableSchemaSql) throws SQLException { - executeTableSQL(String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql)); + executeTableSQL(String.format("CREATE TABLE %s(%s)", tablePath.getFullPath(), tableSchemaSql)); } public static void createTable(String db, OracleTablePath tablePath, String tableSchemaSql) throws SQLException { executeSQL( - db, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql)); + db, String.format("CREATE TABLE %s(%s)", tablePath.getFullPath(), tableSchemaSql)); } public static void createSchema(String schema, String userPassword) throws SQLException { executeSQL(String.format("CREATE USER %s IDENTIFIED BY %s DEFAULT TABLESPACE users TEMPORARY TABLESPACE temp", schema, userPassword)); - executeSQL(String.format("GRANT CONNECT, RESOURCE, CREATE TABLE TO %s",schema)); + executeSQL(String.format("ALTER USER %s QUOTA UNLIMITED ON USERS",schema)); + executeSQL(String.format("GRANT CONNECT, RESOURCE, CREATE TABLE,INSERT ANY TABLE TO %s",schema)); + } public static void createDatabase(String database) throws SQLException { @@ -136,20 +139,38 @@ public static void executeSQL(String sql) throws SQLException { } } + public static void executeSQLQuery(String sql) throws SQLException { + try (Connection conn = + DriverManager.getConnection( + String.format("%s", baseUrl), TEST_USERNAME, TEST_PWD); + Statement statement = conn.createStatement()) { + ResultSet rs = statement.executeQuery(sql); + while (rs.next()){ + System.out.println(rs.toString()); + } + + } catch (SQLException e) { + throw e; + } + } + public static void executeTableSQL(String sql) throws SQLException { // 连接数据库并执行查询 try (Connection conn = DriverManager.getConnection( String.format("%s", baseUrl), TEST_USERNAME, TEST_PWD); - PreparedStatement stmt = conn.prepareStatement("SELECT username FROM all_users WHERE username = 'TEST_SCHEMA'"); - ResultSet rs = stmt.executeQuery()) { - - // 如果查询到结果,打印结果 - if (rs.next()) { - String username = rs.getString("username"); - System.out.println("Username: " + username); - } else { - System.out.println("No such user found."); - } + Statement statement = conn.createStatement()) { +// String s = "CREATE TABLE test_schema.primitive_table(int integer," + +// " bytea bytea, short smallint, long bigint, real real, double_precision double precision, " + +// "numeric numeric(10, 5), decimal decimal(10, 1), boolean boolean, text text, char char, " + +// "character character(3), character_varying character varying(20), timestamp timestamp(5), " + +// "date date,time time(0), default_numeric numeric, CONSTRAINT test_pk PRIMARY KEY (short, int))"; +// +// String s1 = "CREATE TABLE test_schema.primitive_table(int integer, bytea blob, " + +// "st number, lg number, rl real, docision double precision," + +// "numeric numeric(10, 5),decal decimal(10, 1), tet clob, c1har char(3)," + +// "character character(3), character_varying character varying(20), timestamp timestamp(5), " + +// "date_col date,default_numeric numeric, CONSTRAINT test_pk PRIMARY KEY (int))" ; +// statement.execute(s1); } catch (SQLException e) { // 处理数据库连接和查询中的异常 @@ -159,16 +180,30 @@ public static void executeTableSQL(String sql) throws SQLException { DriverManager.getConnection( String.format("%s", baseUrl), TEST_SCHEMA, TEST_PASSWORD); Statement statement = conn.createStatement()) { + statement.executeUpdate(sql); + String s ="insert into test_schema.t1 values (1)"; + statement.execute(s); } catch (SQLException e) { throw e; } } public static void executeSQL(String db, String sql) throws SQLException { + try { + Connection conn = DriverManager.getConnection( + String.format("%s", baseUrl), TEST_USERNAME, TEST_PWD); + System.out.println("Connected to the database!"); + // Do something with the connection + conn.close(); + }catch (SQLException e) { + System.out.println("Error connecting to the database: " + e.getMessage()); + } + + try (Connection conn = DriverManager.getConnection( - String.format("%s/%s", baseUrl, db), TEST_USERNAME, TEST_PWD); + String.format("%s", baseUrl), TEST_USERNAME, TEST_PWD); Statement statement = conn.createStatement()) { statement.executeUpdate(sql); } catch (SQLException e) { @@ -192,13 +227,13 @@ public TestTable(Schema schema, String oracleSchemaSql, String values) { public static OracleCatalogTestBase.TestTable getSimpleTable() { return new OracleCatalogTestBase.TestTable( - Schema.newBuilder().column("id", DataTypes.INT()).build(), "id number", "1"); + Schema.newBuilder().column("ID", DataTypes.DECIMAL(38,18)).build(), "id number", "1"); } // oracle doesn't support to use the same primary key name across different tables, // make the table parameterized to resolve this problem. public static OracleCatalogTestBase.TestTable getPrimitiveTable() { - return getPrimitiveTable("test_pk"); + return getPrimitiveTable("TEST_PK"); } // TODO: add back timestamptz and time types. @@ -210,48 +245,44 @@ public static OracleCatalogTestBase.TestTable getPrimitiveTable() { public static OracleCatalogTestBase.TestTable getPrimitiveTable(String primaryKeyName) { return new OracleCatalogTestBase.TestTable( Schema.newBuilder() - .column("int", DataTypes.INT().notNull()) - .column("bytea", DataTypes.BYTES()) - .column("short", DataTypes.SMALLINT().notNull()) - .column("long", DataTypes.BIGINT()) - .column("real", DataTypes.FLOAT()) - .column("double_precision", DataTypes.DOUBLE()) - .column("numeric", DataTypes.DECIMAL(10, 5)) - .column("decimal", DataTypes.DECIMAL(10, 1)) - .column("boolean", DataTypes.BOOLEAN()) - .column("text", DataTypes.STRING()) - .column("char", DataTypes.CHAR(1)) - .column("character", DataTypes.CHAR(3)) - .column("character_varying", DataTypes.VARCHAR(20)) - .column("timestamp", DataTypes.TIMESTAMP(5)) + .column("INT_COL", DataTypes.DECIMAL(38,18).notNull()) + .column("BYTEA_COL", DataTypes.BYTES()) + .column("SHORT_COL", DataTypes.DECIMAL(38,18).notNull()) + .column("LONG_COL", DataTypes.DECIMAL(38,18)) + .column("REAL_COL", DataTypes.DECIMAL(38,18)) + .column("DOUBLE_PRECISION_COL", DataTypes.DECIMAL(38,18)) + .column("NUMERIC_COL", DataTypes.DECIMAL(10, 5)) + .column("DECIMAL_COL", DataTypes.DECIMAL(10, 1)) + .column("TEXT_COL", DataTypes.STRING()) + .column("CHAR_COL", DataTypes.STRING()) + .column("CHARACTER_COL", DataTypes.STRING()) + .column("CHARACTER_VARYING_COL", DataTypes.STRING()) + .column("TIMESTAMP_COL", DataTypes.TIMESTAMP(5)) // .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)) - .column("date", DataTypes.DATE()) - .column("time", DataTypes.TIME(0)) - .column("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)) - .primaryKeyNamed(primaryKeyName, "short", "int") + .column("DATE_COL", DataTypes.TIMESTAMP(6)) + .column("DEFAULT_NUMERIC", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)) + .primaryKeyNamed(primaryKeyName, "SHORT_COL", "INT_COL") .build(), - "int integer, " - + "bytea bytea, " - + "short smallint, " - + "long bigint, " - + "real real, " - + "double_precision double precision, " - + "numeric numeric(10, 5), " - + "decimal decimal(10, 1), " - + "boolean boolean, " - + "text text, " - + "char char, " - + "character character(3), " - + "character_varying character varying(20), " - + "timestamp timestamp(5), " + "int_col NUMBER, " + + "bytea_col blob, " + + "short_col NUMBER, " + + "long_col NUMBER, " + + "real_col real, " + + "double_precision_col double precision, " + + "numeric_col numeric(10, 5), " + + "decimal_col decimal(10, 1), " + + "text_col clob, " + + "char_col char(3), " + + "character_col character(3), " + + "character_varying_col character varying(20), " + + "timestamp_col timestamp(5), " + // "timestamptz timestamptz(4), " + - "date date," - + "time time(0), " + "date_col date," + "default_numeric numeric, " + "CONSTRAINT " + primaryKeyName - + " PRIMARY KEY (short, int)", + + " PRIMARY KEY (short_col, int_col)", "1," + "'2'," + "3," @@ -260,16 +291,18 @@ public static OracleCatalogTestBase.TestTable getPrimitiveTable(String primaryKe + "6.6," + "7.7," + "8.8," - + "true," +// + "true," + "'a'," + "'b'," + "'c'," + "'d'," - + "'2016-06-22 19:10:25'," +// + "'2016-16-22 19:10:25'," + + "SYSTIMESTAMP," + // "'2006-06-22 19:10:25'," + - "'2015-01-01'," - + "'00:51:02.746572', " +// "'2015-12-12'," + "SYSDATE," +// + "'00:51:02.746572', " + "500"); } From f928685cdbe84b5041daaaa5bb05cffcfc36c091 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B6=E6=96=87=E6=AD=A6?= Date: Tue, 14 Jan 2025 16:31:21 +0800 Subject: [PATCH 5/5] change DEFAULT_DATABASE to DEFAULT_SCHEMA --- .../oracle/catalog/OracleCatalog.java | 2 +- .../oracle/catalog/OracleCatalogTest.java | 18 +++++++++--------- .../oracle/catalog/OracleCatalogTestBase.java | 8 ++++---- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java index 49054ea2f..da3e0ec00 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalog.java @@ -28,7 +28,7 @@ public class OracleCatalog extends AbstractJdbcCatalog { private static final Logger LOG = LoggerFactory.getLogger(OracleCatalog.class); - public static final String DEFAULT_DATABASE = "TEST_SCHEMA"; + public static final String DEFAULT_SCHEMA = "TEST_SCHEMA"; public static final String IDENTIFIER = "jdbc"; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java index 30910192f..cf99e6035 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTest.java @@ -33,14 +33,14 @@ void testListDatabases() { void testDbExists() { assertThat(catalog.databaseExists("nonexistent")).isFalse(); - assertThat(catalog.databaseExists(OracleCatalog.DEFAULT_DATABASE)).isTrue(); + assertThat(catalog.databaseExists(OracleCatalog.DEFAULT_SCHEMA)).isTrue(); } // ------ tables ------ @Test void testListTables() throws DatabaseNotExistException { - List actual = catalog.listTables(OracleCatalog.DEFAULT_DATABASE); + List actual = catalog.listTables(OracleCatalog.DEFAULT_SCHEMA); assertThat(actual) .isEqualTo( @@ -66,12 +66,12 @@ void testListTables_DatabaseNotExistException() { @Test void testTableExists() { - assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_DATABASE, "nonexist"))).isFalse(); + assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_SCHEMA, "nonexist"))).isFalse(); - assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE1))) + assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_SCHEMA, TABLE1))) .isTrue(); - assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE2))).isTrue(); - assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_DATABASE, "TEST_SCHEMA.T3"))).isTrue(); + assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_SCHEMA, TABLE2))).isTrue(); + assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_SCHEMA, "TEST_SCHEMA.T3"))).isTrue(); } @Test @@ -142,7 +142,7 @@ void testGetTable() throws org.apache.flink.table.catalog.exceptions.TableNotExi void testPrimitiveDataTypes() throws TableNotExistException { CatalogBaseTable table = catalog.getTable( - new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE_PRIMITIVE_TYPE)); + new ObjectPath(OracleCatalog.DEFAULT_SCHEMA, TABLE_PRIMITIVE_TYPE)); System.out.println(table.getUnresolvedSchema().toString()); assertThat(table.getUnresolvedSchema()).isEqualTo(getPrimitiveTable().schema); } @@ -151,7 +151,7 @@ void testPrimitiveDataTypes() throws TableNotExistException { void testArrayDataTypes() throws TableNotExistException { CatalogBaseTable table = catalog.getTable( - new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE_ARRAY_TYPE)); + new ObjectPath(OracleCatalog.DEFAULT_SCHEMA, TABLE_ARRAY_TYPE)); assertThat(table.getUnresolvedSchema()).isEqualTo(getArrayTable().schema); } @@ -160,7 +160,7 @@ void testArrayDataTypes() throws TableNotExistException { public void testSerialDataTypes() throws TableNotExistException { CatalogBaseTable table = catalog.getTable( - new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE_SERIAL_TYPE)); + new ObjectPath(OracleCatalog.DEFAULT_SCHEMA, TABLE_SERIAL_TYPE)); assertThat(table.getUnresolvedSchema()).isEqualTo(getSerialTable().schema); } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java index fd682766b..2e600c9ae 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/catalog/OracleCatalogTestBase.java @@ -87,20 +87,20 @@ static void init() throws SQLException { // getSerialTable().oracleSchemaSql); executeSQL( - OracleCatalog.DEFAULT_DATABASE, + OracleCatalog.DEFAULT_SCHEMA, String.format( "insert into %s values (%s)", TABLE1, getSimpleTable().values)); executeSQL( - OracleCatalog.DEFAULT_DATABASE, + OracleCatalog.DEFAULT_SCHEMA, String.format( "insert into %s values (%s)", TABLE_PRIMITIVE_TYPE, getPrimitiveTable().values)); // executeSQL( -// OracleCatalog.DEFAULT_DATABASE, +// OracleCatalog.DEFAULT_SCHEMA, // String.format( // "insert into %s values (%s);", TABLE_ARRAY_TYPE, getArrayTable().values)); // executeSQL( -// OracleCatalog.DEFAULT_DATABASE, +// OracleCatalog.DEFAULT_SCHEMA, // String.format( // "insert into %s values (%s)", TABLE_SERIAL_TYPE, getSerialTable().values)); System.out.println("success");