Skip to content

Commit 7a3b94a

Browse files
committed
[FLINK-33761][Connector/JDBC] support snowflake dialect
1 parent bb43b3c commit 7a3b94a

File tree

4 files changed

+130
-0
lines changed

4 files changed

+130
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.jdbc.databases.snowflake.dialect;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
23+
import org.apache.flink.connector.jdbc.dialect.AbstractPostgresCompatibleDialect;
24+
import org.apache.flink.table.types.logical.RowType;
25+
26+
import java.util.Optional;
27+
28+
/** JDBC dialect for Snowflake. */
29+
@Internal
30+
public class SnowflakeDialect extends AbstractPostgresCompatibleDialect {
31+
private static final long serialVersionUID = 1L;
32+
33+
@Override
34+
public JdbcRowConverter getRowConverter(RowType rowType) {
35+
return new SnowflakeRowConverter(rowType);
36+
}
37+
38+
@Override
39+
public Optional<String> defaultDriverName() {
40+
return Optional.of("net.snowflake.client.jdbc.SnowflakeDriver");
41+
}
42+
43+
@Override
44+
public String dialectName() {
45+
return "Snowflake";
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.jdbc.databases.snowflake.dialect;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
23+
import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
24+
25+
/** Factory for {@link SnowflakeDialect}. */
26+
@Internal
27+
public class SnowflakeDialectFactory implements JdbcDialectFactory {
28+
@Override
29+
public boolean acceptsURL(String url) {
30+
return url.startsWith("jdbc:snowflake:");
31+
}
32+
33+
@Override
34+
public JdbcDialect create() {
35+
return new SnowflakeDialect();
36+
}
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.jdbc.databases.snowflake.dialect;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter;
23+
import org.apache.flink.table.types.logical.RowType;
24+
25+
import org.postgresql.jdbc.PgArray;
26+
27+
/**
28+
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
29+
* Snowflake.
30+
*/
31+
@Internal
32+
public class SnowflakeRowConverter extends AbstractPostgresCompatibleRowConverter<PgArray> {
33+
34+
private static final long serialVersionUID = 1L;
35+
36+
@Override
37+
public String converterName() {
38+
return "Snowflake";
39+
}
40+
41+
// https://docs.snowflake.com/en/sql-reference/intro-summary-data-types
42+
public SnowflakeRowConverter(RowType rowType) {
43+
super(rowType);
44+
}
45+
}

flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerDialectFact
2222
org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialectFactory
2323
org.apache.flink.connector.jdbc.databases.db2.dialect.Db2DialectFactory
2424
org.apache.flink.connector.jdbc.databases.trino.dialect.TrinoDialectFactory
25+
org.apache.flink.connector.jdbc.databases.snowflake.dialect.SnowflakeDialectFactory

0 commit comments

Comments
 (0)