From 14f1250e3120520040e3056ac01ded47b4fcd6b0 Mon Sep 17 00:00:00 2001 From: Joao Boto Date: Fri, 25 Nov 2022 18:06:05 +0100 Subject: [PATCH 1/3] [FLINK-15571] Add basic connector configuration --- .../docs/connectors/datastream/redis.md | 58 ++ pom.xml | 453 ++++++++++++++ tools/ci/log4j.properties | 43 ++ tools/maven/checkstyle.xml | 561 ++++++++++++++++++ tools/maven/suppressions.xml | 26 + 5 files changed, 1141 insertions(+) create mode 100644 docs/content/docs/connectors/datastream/redis.md create mode 100644 pom.xml create mode 100644 tools/ci/log4j.properties create mode 100644 tools/maven/checkstyle.xml create mode 100644 tools/maven/suppressions.xml diff --git a/docs/content/docs/connectors/datastream/redis.md b/docs/content/docs/connectors/datastream/redis.md new file mode 100644 index 0000000..46c866f --- /dev/null +++ b/docs/content/docs/connectors/datastream/redis.md @@ -0,0 +1,58 @@ +--- +title: Redis +weight: 5 +type: docs +aliases: +--- + + +# Redis Connector + +This connector provides sinks that can request document actions to an +[Redis](https://redis.io/). To use this connector, add the following +dependencies to your project: + + + + + + + + + + + + + + +
Redis versionMaven Dependency
7.x{{< connector_artifact flink-connector-redis-streams 3.0.0 >}}
+ +{{< py_download_link "redis" >}} + +Note that the streaming connectors are currently not part of the binary +distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for information +about how to package the program with the libraries for cluster execution. + +## Installing Redis + +Instructions for setting up a Redis cluster can be found +[here](https://redis.io/docs/getting-started/installation/). + + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..ceca4de --- /dev/null +++ b/pom.xml @@ -0,0 +1,453 @@ + + + + + + io.github.zentol.flink + flink-connector-parent + 1.0 + + + 4.0.0 + + org.apache.flink + flink-connector-redis-parent + 3.0-SNAPSHOT + + Flink : Connectors : Redis : Parent + pom + 2022 + + + https://github.com/apache/flink-connector-redis-streams + git@github.com:apache/flink-connector-redis-streams.git + + scm:git:https://gitbox.apache.org/repos/asf/flink-connector-redis-stream.git + + + + + 1.16.0 + 15.0 + 4.2.3 + + 2.13.4.20221013 + 5.8.1 + 3.21.0 + 0.22.0 + 1.17.2 + 2.21.0 + + false + 1.15.0 + + 1.7.36 + 2.17.2 + + + + flink-connector-redis-parent + + + + + org.apache.flink + flink-shaded-force-shading + ${flink.shaded.version} + + + + + + + org.slf4j + slf4j-api + provided + + + + + com.google.code.findbugs + jsr305 + provided + + + + + org.junit.jupiter + junit-jupiter + test + + + + org.junit.vintage + junit-vintage-engine + test + + + + org.assertj + assertj-core + test + + + + org.mockito + mockito-core + ${mockito.version} + jar + test + + + + org.testcontainers + junit-jupiter + test + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + test + + + + org.apache.logging.log4j + log4j-api + test + + + + org.apache.logging.log4j + log4j-core + test + + + + + org.apache.logging.log4j + log4j-1.2-api + test + + + + org.apache.flink + flink-test-utils-junit + test + + + + + org.apache.flink + flink-architecture-tests-test + test + + + org.apache.flink + flink-architecture-tests-production + test + + + + + + + + + + + + redis.clients + jedis + ${jedis.version} + compile + + + org.slf4j + slf4j.api + + + + + + + org.apache.flink + flink-connector-base + ${flink.version} + + + + org.apache.flink + flink-streaming-java + ${flink.version} + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + org.apache.flink + flink-test-utils-junit + ${flink.version} + test + + + + + + org.apache.flink + flink-architecture-tests-base + ${flink.version} + test + + + + org.apache.flink + flink-architecture-tests-test + ${flink.version} + test + + + + org.apache.flink + flink-architecture-tests-production + ${flink.version} + test + + + + + com.google.code.findbugs + jsr305 + 1.3.9 + + + + commons-codec + commons-codec + 1.15 + + + + org.apache.httpcomponents + httpcore + 4.4.14 + + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + + org.apache.logging.log4j + log4j-1.2-api + ${log4j.version} + + + + + com.fasterxml.jackson + jackson-bom + pom + import + ${jackson-bom.version} + + + + + org.junit + junit-bom + ${junit5.version} + pom + import + + + + org.assertj + assertj-core + ${assertj.version} + test + + + + + com.esotericsoftware.kryo + kryo + 2.24.0 + + + + + org.objenesis + objenesis + 2.1 + + + + org.testcontainers + testcontainers-bom + ${testcontainers.version} + pom + import + + + + com.tngtech.archunit + archunit + ${archunit.version} + test + + + + com.tngtech.archunit + archunit-junit5 + ${archunit.version} + test + + + + + + + + + + + sql-jars + + + !skipSqlJars + + + + + + + + + + + + org.codehaus.mojo + exec-maven-plugin + false + + + org.apache.flink + flink-ci-tools + ${flink.version} + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + + io.github.zentol.japicmp + japicmp-maven-plugin + + + + org.apache.rat + apache-rat-plugin + false + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + com.diffplug.spotless + spotless-maven-plugin + + + org.apache.maven.plugins + maven-compiler-plugin + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + org.apache.maven.plugins + maven-enforcer-plugin + + + + org.apache.maven.plugins + maven-shade-plugin + + + + org.commonjava.maven.plugins + directory-maven-plugin + + + + \ No newline at end of file diff --git a/tools/ci/log4j.properties b/tools/ci/log4j.properties new file mode 100644 index 0000000..b28a9e3 --- /dev/null +++ b/tools/ci/log4j.properties @@ -0,0 +1,43 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = INFO +rootLogger.appenderRef.out.ref = ConsoleAppender + +# ----------------------------------------------------------------------------- +# Console (use 'console') +# ----------------------------------------------------------------------------- + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} [%20t] %-5p %-60c %x - %m%n + +# ----------------------------------------------------------------------------- +# File (use 'file') +# ----------------------------------------------------------------------------- +appender.file.name = FileAppender +appender.file.type = FILE +appender.file.fileName = ${sys:log.dir}/mvn-${sys:mvn.forkNumber:-output}.log +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d{HH:mm:ss,SSS} [%20t] %-5p %-60c %x - %m%n +appender.file.createOnDemand = true + +# suppress the irrelevant (wrong) warnings from the netty channel handler +logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline +logger.netty.level = ERROR \ No newline at end of file diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml new file mode 100644 index 0000000..2841ea4 --- /dev/null +++ b/tools/maven/checkstyle.xml @@ -0,0 +1,561 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml new file mode 100644 index 0000000..f0de8cd --- /dev/null +++ b/tools/maven/suppressions.xml @@ -0,0 +1,26 @@ + + + + + + + \ No newline at end of file From d272e018a64cc2b2399c1d103c772298e1919a9f Mon Sep 17 00:00:00 2001 From: Joao Boto Date: Tue, 29 Nov 2022 15:42:29 +0100 Subject: [PATCH 2/3] [FLINK-15571] Add basic Redis Streams sink. --- flink-connector-redis-streams/pom.xml | 36 ++ .../redis/streams/sink/RedisSerializer.java | 32 ++ .../redis/streams/sink/RedisStreamsSink.java | 43 +++ .../streams/sink/RedisStreamsWriter.java | 61 ++++ .../streams/sink/command/RedisCommand.java | 26 ++ .../sink/command/StreamRedisCommand.java | 61 ++++ .../sink/config/JedisClusterConfig.java | 261 +++++++++++++++ .../streams/sink/config/JedisConfig.java | 147 +++++++++ .../streams/sink/config/JedisPoolConfig.java | 276 ++++++++++++++++ .../sink/config/JedisSentinelConfig.java | 308 ++++++++++++++++++ .../sink/connection/JedisConnector.java | 72 ++++ .../connection/JedisConnectorBuilder.java | 142 ++++++++ .../redis/streams/sink/BaseITCase.java | 75 +++++ .../streams/sink/RedisStreamsSinkTest.java | 67 ++++ pom.xml | 4 + 15 files changed, 1611 insertions(+) create mode 100644 flink-connector-redis-streams/pom.xml create mode 100644 flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisSerializer.java create mode 100644 flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSink.java create mode 100644 flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsWriter.java create mode 100644 flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/RedisCommand.java create mode 100644 flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/StreamRedisCommand.java create mode 100644 flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisClusterConfig.java create mode 100644 flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisConfig.java create mode 100644 flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisPoolConfig.java create mode 100644 flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisSentinelConfig.java create mode 100644 flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnector.java create mode 100644 flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnectorBuilder.java create mode 100644 flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/BaseITCase.java create mode 100644 flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSinkTest.java diff --git a/flink-connector-redis-streams/pom.xml b/flink-connector-redis-streams/pom.xml new file mode 100644 index 0000000..4a8b3b1 --- /dev/null +++ b/flink-connector-redis-streams/pom.xml @@ -0,0 +1,36 @@ + + + + flink-connector-redis-parent + org.apache.flink + 3.0-SNAPSHOT + + 4.0.0 + + flink-connector-redis-streams + Flink : Connectors : Redis : Streams + + jar + + + + redis.clients + jedis + + + + org.apache.flink + flink-connector-base + ${flink.version} + + + + org.apache.flink + flink-streaming-java + + + + + \ No newline at end of file diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisSerializer.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisSerializer.java new file mode 100644 index 0000000..0f8262a --- /dev/null +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisSerializer.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connector.redis.streams.sink; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.connector.redis.streams.sink.command.RedisCommand; + +import java.io.Serializable; + +/** + * Function that creates the description how the input data should be mapped to redis type. + * + * @param The type of the element handled by this {@code RedisSerializer} + */ +public interface RedisSerializer extends Function, Serializable { + + RedisCommand getMessage(T input); +} diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSink.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSink.java new file mode 100644 index 0000000..93ff057 --- /dev/null +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSink.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connector.redis.streams.sink; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.redis.streams.sink.config.JedisConfig; +import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector; +import org.apache.flink.connector.redis.streams.sink.connection.JedisConnectorBuilder; + +import java.io.IOException; + +public class RedisStreamsSink implements Sink { + + private final JedisConfig jedisConfig; + private final RedisSerializer serializer; + + public RedisStreamsSink(JedisConfig jedisConfig, RedisSerializer serializer) { + this.jedisConfig = jedisConfig; + this.serializer = serializer; + } + + @Override + public SinkWriter createWriter(InitContext initContext) throws IOException { + JedisConnector connection = JedisConnectorBuilder.build(jedisConfig); + return new RedisStreamsWriter<>(connection, this.serializer); + } + +} diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsWriter.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsWriter.java new file mode 100644 index 0000000..0601cb1 --- /dev/null +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsWriter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connector.redis.streams.sink; + +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.redis.streams.sink.command.RedisCommand; +import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Queue; + +public class RedisStreamsWriter implements SinkWriter { + + private final JedisConnector jedisConnector; + private final RedisSerializer serializer; + private final Queue queue = new ArrayDeque<>(); + + public RedisStreamsWriter(JedisConnector jedisConnector, RedisSerializer serializer) { + this.jedisConnector = jedisConnector; + this.serializer = serializer; + } + + + @Override + public void write(T input, Context context) throws IOException, InterruptedException { + RedisCommand message = serializer.getMessage(input); + queue.add(message); + } + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + flush(); + } + + private void flush() { + while(!this.queue.isEmpty()) { + RedisCommand element = this.queue.remove(); + element.send(this.jedisConnector); + } + } + + @Override + public void close() throws Exception { + jedisConnector.close(); + } +} diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/RedisCommand.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/RedisCommand.java new file mode 100644 index 0000000..bb6555f --- /dev/null +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/RedisCommand.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connector.redis.streams.sink.command; + +import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector; + +import java.io.Serializable; + +public interface RedisCommand extends Serializable { + void send(JedisConnector connector); + +} \ No newline at end of file diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/StreamRedisCommand.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/StreamRedisCommand.java new file mode 100644 index 0000000..d6abec5 --- /dev/null +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/StreamRedisCommand.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connector.redis.streams.sink.command; + + +import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector; +import redis.clients.jedis.StreamEntryID; + +import java.util.Map; + +public class StreamRedisCommand implements RedisCommand { + public final String key; + public final Map value; + + private StreamRedisCommand(String key, Map value) { + this.key = key; + this.value = value; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public void send(JedisConnector connector) { + connector.getJedisCommands().xadd(key, StreamEntryID.NEW_ENTRY, value); + } + + public static class Builder { + private String key; + private Map value; + + public Builder withKey(String key) { + this.key = key; + return this; + } + + public Builder withValue(Map value) { + this.value = value; + return this; + } + + public StreamRedisCommand build() { + return new StreamRedisCommand(key, value); + } + } +} \ No newline at end of file diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisClusterConfig.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisClusterConfig.java new file mode 100644 index 0000000..076d74f --- /dev/null +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisClusterConfig.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.redis.streams.sink.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Configuration for Jedis cluster. */ +public class JedisClusterConfig extends JedisConfig { + + private final Set nodes; + private final int maxRedirections; + + /** + * Jedis cluster configuration. The list of node is mandatory, and when nodes is not set, it + * throws NullPointerException. + * + * @param nodes list of node information for JedisCluster + * @param connectionTimeout socket / connection timeout. The default is 2000 + * @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK + * @param maxTotal the maximum number of objects that can be allocated by the pool + * @param maxIdle the cap on the number of "idle" instances in the pool + * @param minIdle the minimum number of idle objects to maintain in the pool + * @param password the password of redis cluster + * @param testOnBorrow Whether objects borrowed from the pool will be validated before being + * returned, default value is false + * @param testOnReturn Whether objects borrowed from the pool will be validated when they are + * returned to the pool, default value is false + * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle + * object evictor, default value is false + * @throws NullPointerException if parameter {@code nodes} is {@code null} + */ + private JedisClusterConfig( + Set nodes, + int connectionTimeout, + int maxRedirections, + int maxTotal, + int maxIdle, + int minIdle, + String password, + boolean testOnBorrow, + boolean testOnReturn, + boolean testWhileIdle) { + super( + connectionTimeout, + maxTotal, + maxIdle, + minIdle, + password, + testOnBorrow, + testOnReturn, + testWhileIdle); + + checkNotNull(nodes, "Node information should be presented"); + checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty"); + this.nodes = new HashSet<>(nodes); + this.maxRedirections = maxRedirections; + } + + /** + * Returns nodes. + * + * @return list of node information + */ + public Set getNodes() { + Set ret = new HashSet<>(); + for (InetSocketAddress node : nodes) { + ret.add(new HostAndPort(node.getHostName(), node.getPort())); + } + return ret; + } + + /** + * Returns limit of redirection. + * + * @return limit of redirection + */ + public int getMaxRedirections() { + return maxRedirections; + } + + /** Builder for initializing {@link JedisClusterConfig}. */ + public static class Builder { + private Set nodes; + private int timeout = Protocol.DEFAULT_TIMEOUT; + private int maxRedirections = 5; + private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL; + private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE; + private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE; + private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW; + private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN; + private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE; + private String password; + + /** + * Sets list of node. + * + * @param nodes list of node + * @return Builder itself + */ + public Builder setNodes(Set nodes) { + this.nodes = nodes; + return this; + } + + /** + * Sets socket / connection timeout. + * + * @param timeout socket / connection timeout, default value is 2000 + * @return Builder itself + */ + public Builder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + /** + * Sets limit of redirection. + * + * @param maxRedirections limit of redirection, default value is 5 + * @return Builder itself + */ + public Builder setMaxRedirections(int maxRedirections) { + this.maxRedirections = maxRedirections; + return this; + } + + /** + * Sets value for the {@code maxTotal} configuration attribute for pools to be created with + * this configuration instance. + * + * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, + * default value is 8 + * @return Builder itself + */ + public Builder setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + return this; + } + + /** + * Sets value for the {@code maxIdle} configuration attribute for pools to be created with + * this configuration instance. + * + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + return this; + } + + /** + * Sets value for the {@code minIdle} configuration attribute for pools to be created with + * this configuration instance. + * + * @param minIdle the minimum number of idle objects to maintain in the pool, default value + * is 0 + * @return Builder itself + */ + public Builder setMinIdle(int minIdle) { + this.minIdle = minIdle; + return this; + } + + /** + * Sets value for the {@code password} configuration attribute for pools to be created with + * this configuration instance. + * + * @param password the password for accessing redis cluster + * @return Builder itself + */ + public Builder setPassword(String password) { + this.password = password; + return this; + } + + /** + * Sets value for the {@code testOnBorrow} configuration attribute for pools to be created + * with this configuration instance. + * + * @param testOnBorrow Whether objects borrowed from the pool will be validated before being + * returned + * @return Builder itself + */ + public Builder setTestOnBorrow(boolean testOnBorrow) { + this.testOnBorrow = testOnBorrow; + return this; + } + + /** + * Sets value for the {@code testOnReturn} configuration attribute for pools to be created + * with this configuration instance. + * + * @param testOnReturn Whether objects borrowed from the pool will be validated when they + * are returned to the pool + * @return Builder itself + */ + public Builder setTestOnReturn(boolean testOnReturn) { + this.testOnReturn = testOnReturn; + return this; + } + + /** + * Sets value for the {@code testWhileIdle} configuration attribute for pools to be created + * with this configuration instance. Setting this to true will also set default idle-testing + * parameters provided in Jedis + * + * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the + * idle object evictor + * @return Builder itself + * @see redis.clients.jedis.JedisPoolConfig + */ + public Builder setTestWhileIdle(boolean testWhileIdle) { + this.testWhileIdle = testWhileIdle; + return this; + } + + /** + * Builds JedisClusterConfig. + * + * @return JedisClusterConfig + */ + public JedisClusterConfig build() { + return new JedisClusterConfig( + nodes, + timeout, + maxRedirections, + maxTotal, + maxIdle, + minIdle, + password, + testOnBorrow, + testOnReturn, + testWhileIdle); + } + } +} diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisConfig.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisConfig.java new file mode 100644 index 0000000..7b8c65e --- /dev/null +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisConfig.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.redis.streams.sink.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Base class for Flink Redis configuration. */ +public abstract class JedisConfig implements Serializable { + + protected final int maxTotal; + protected final int maxIdle; + protected final int minIdle; + protected final int connectionTimeout; + protected final String password; + + protected final boolean testOnBorrow; + protected final boolean testOnReturn; + protected final boolean testWhileIdle; + + protected JedisConfig( + int connectionTimeout, + int maxTotal, + int maxIdle, + int minIdle, + String password, + boolean testOnBorrow, + boolean testOnReturn, + boolean testWhileIdle) { + + checkArgument(connectionTimeout >= 0, "connection timeout can not be negative"); + checkArgument(maxTotal >= 0, "maxTotal value can not be negative"); + checkArgument(maxIdle >= 0, "maxIdle value can not be negative"); + checkArgument(minIdle >= 0, "minIdle value can not be negative"); + + this.connectionTimeout = connectionTimeout; + this.maxTotal = maxTotal; + this.maxIdle = maxIdle; + this.minIdle = minIdle; + this.testOnBorrow = testOnBorrow; + this.testOnReturn = testOnReturn; + this.testWhileIdle = testWhileIdle; + this.password = password; + } + + /** + * Returns timeout. + * + * @return connection timeout + */ + public int getConnectionTimeout() { + return connectionTimeout; + } + + /** + * Get the value for the {@code maxTotal} configuration attribute for pools to be created with + * this configuration instance. + * + * @return The current setting of {@code maxTotal} for this configuration instance + * @see GenericObjectPoolConfig#getMaxTotal() + */ + public int getMaxTotal() { + return maxTotal; + } + + /** + * Get the value for the {@code maxIdle} configuration attribute for pools to be created with + * this configuration instance. + * + * @return The current setting of {@code maxIdle} for this configuration instance + * @see GenericObjectPoolConfig#getMaxIdle() + */ + public int getMaxIdle() { + return maxIdle; + } + + /** + * Get the value for the {@code minIdle} configuration attribute for pools to be created with + * this configuration instance. + * + * @return The current setting of {@code minIdle} for this configuration instance + * @see GenericObjectPoolConfig#getMinIdle() + */ + public int getMinIdle() { + return minIdle; + } + + /** + * Returns password. + * + * @return password + */ + public String getPassword() { + return password; + } + + /** + * Get the value for the {@code testOnBorrow} configuration attribute for pools to be created + * with this configuration instance. + * + * @return The current setting of {@code testOnBorrow} for this configuration instance + * @see GenericObjectPoolConfig#getTestOnBorrow() + */ + public boolean getTestOnBorrow() { + return testOnBorrow; + } + + /** + * Get the value for the {@code testOnReturn} configuration attribute for pools to be created + * with this configuration instance. + * + * @return The current setting of {@code testOnReturn} for this configuration instance + * @see GenericObjectPoolConfig#getTestOnReturn() + */ + public boolean getTestOnReturn() { + return testOnReturn; + } + + /** + * Get the value for the {@code testWhileIdle} configuration attribute for pools to be created + * with this configuration instance. + * + * @return The current setting of {@code testWhileIdle} for this configuration instance + * @see GenericObjectPoolConfig#getTestWhileIdle() + */ + public boolean getTestWhileIdle() { + return testWhileIdle; + } +} diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisPoolConfig.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisPoolConfig.java new file mode 100644 index 0000000..dfe85c8 --- /dev/null +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisPoolConfig.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.redis.streams.sink.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import redis.clients.jedis.Protocol; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Configuration for Jedis pool. */ +public class JedisPoolConfig extends JedisConfig { + + private final String host; + private final int port; + private final int database; + + /** + * Jedis pool configuration. The host is mandatory, and when host is not set, it throws + * NullPointerException. + * + * @param host hostname or IP + * @param port port, default value is 6379 + * @param connectionTimeout socket / connection timeout, default value is 2000 milli second + * @param password password, if any + * @param database database index + * @param maxTotal the maximum number of objects that can be allocated by the pool, default + * value is 8 + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0 + * @param testOnBorrow Whether objects borrowed from the pool will be validated before being + * returned, default value is false + * @param testOnReturn Whether objects borrowed from the pool will be validated when they are + * returned to the pool, default value is false + * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle + * object evictor, default value is false + * @throws NullPointerException if parameter {@code host} is {@code null} + */ + private JedisPoolConfig( + String host, + int port, + int connectionTimeout, + String password, + int database, + int maxTotal, + int maxIdle, + int minIdle, + boolean testOnBorrow, + boolean testOnReturn, + boolean testWhileIdle) { + super( + connectionTimeout, + maxTotal, + maxIdle, + minIdle, + password, + testOnBorrow, + testOnReturn, + testWhileIdle); + + checkNotNull(host, "Host information should be presented"); + this.host = host; + this.port = port; + this.database = database; + } + + /** + * Returns host. + * + * @return hostname or IP + */ + public String getHost() { + return host; + } + + /** + * Returns port. + * + * @return port + */ + public int getPort() { + return port; + } + + /** + * Returns database index. + * + * @return database index + */ + public int getDatabase() { + return database; + } + + /** Builder for initializing {@link JedisPoolConfig}. */ + public static class Builder { + private String host; + private int port = Protocol.DEFAULT_PORT; + private int timeout = Protocol.DEFAULT_TIMEOUT; + private int database = Protocol.DEFAULT_DATABASE; + private String password; + private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL; + private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE; + private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE; + private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW; + private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN; + private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE; + + /** + * Sets value for the {@code maxTotal} configuration attribute for pools to be created with + * this configuration instance. + * + * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, + * default value is 8 + * @return Builder itself + */ + public Builder setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + return this; + } + + /** + * Sets value for the {@code maxIdle} configuration attribute for pools to be created with + * this configuration instance. + * + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + return this; + } + + /** + * Sets value for the {@code minIdle} configuration attribute for pools to be created with + * this configuration instance. + * + * @param minIdle the minimum number of idle objects to maintain in the pool, default value + * is 0 + * @return Builder itself + */ + public Builder setMinIdle(int minIdle) { + this.minIdle = minIdle; + return this; + } + + /** + * Sets host. + * + * @param host host + * @return Builder itself + */ + public Builder setHost(String host) { + this.host = host; + return this; + } + + /** + * Sets port. + * + * @param port port, default value is 6379 + * @return Builder itself + */ + public Builder setPort(int port) { + this.port = port; + return this; + } + + /** + * Sets timeout. + * + * @param timeout timeout, default value is 2000 + * @return Builder itself + */ + public Builder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + /** + * Sets database index. + * + * @param database database index, default value is 0 + * @return Builder itself + */ + public Builder setDatabase(int database) { + this.database = database; + return this; + } + + /** + * Sets password. + * + * @param password password, if any + * @return Builder itself + */ + public Builder setPassword(String password) { + this.password = password; + return this; + } + + /** + * Sets value for the {@code testOnBorrow} configuration attribute for pools to be created + * with this configuration instance. + * + * @param testOnBorrow Whether objects borrowed from the pool will be validated before being + * returned + * @return Builder itself + */ + public Builder setTestOnBorrow(boolean testOnBorrow) { + this.testOnBorrow = testOnBorrow; + return this; + } + + /** + * Sets value for the {@code testOnReturn} configuration attribute for pools to be created + * with this configuration instance. + * + * @param testOnReturn Whether objects borrowed from the pool will be validated when they + * are returned to the pool + * @return Builder itself + */ + public Builder setTestOnReturn(boolean testOnReturn) { + this.testOnReturn = testOnReturn; + return this; + } + + /** + * Sets value for the {@code testWhileIdle} configuration attribute for pools to be created + * with this configuration instance. Setting this to true will also set default idle-testing + * parameters provided in Jedis + * + * @see redis.clients.jedis.JedisPoolConfig + * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the + * idle object evictor + * @return Builder itself + */ + public Builder setTestWhileIdle(boolean testWhileIdle) { + this.testWhileIdle = testWhileIdle; + return this; + } + + /** + * Builds JedisPoolConfig. + * + * @return JedisPoolConfig + */ + public JedisPoolConfig build() { + return new JedisPoolConfig( + host, + port, + timeout, + password, + database, + maxTotal, + maxIdle, + minIdle, + testOnBorrow, + testOnReturn, + testWhileIdle); + } + } +} diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisSentinelConfig.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisSentinelConfig.java new file mode 100644 index 0000000..49253a5 --- /dev/null +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisSentinelConfig.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.redis.streams.sink.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import redis.clients.jedis.Protocol; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Configuration for Jedis Sentinel pool. */ +public class JedisSentinelConfig extends JedisConfig { + private final String masterName; + private final Set sentinels; + private final int soTimeout; + private final int database; + + /** + * Jedis Sentinels config. The master name and sentinels are mandatory, and when you didn't set + * these, it throws NullPointerException. + * + * @param masterName master name of the replica set + * @param sentinels set of sentinel hosts + * @param connectionTimeout timeout connection timeout + * @param soTimeout timeout socket timeout + * @param password password, if any + * @param database database database index + * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool + * @param maxIdle the cap on the number of "idle" instances in the pool + * @param minIdle the minimum number of idle objects to maintain in the pool + * @param testOnBorrow Whether objects borrowed from the pool will be validated before being + * returned, default value is false + * @param testOnReturn Whether objects borrowed from the pool will be validated when they are + * returned to the pool, default value is false + * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle + * object evictor, default value is false + * @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null} + * @throws IllegalArgumentException if {@code sentinels} are empty + */ + private JedisSentinelConfig( + String masterName, + Set sentinels, + int connectionTimeout, + int soTimeout, + String password, + int database, + int maxTotal, + int maxIdle, + int minIdle, + boolean testOnBorrow, + boolean testOnReturn, + boolean testWhileIdle) { + super( + connectionTimeout, + maxTotal, + maxIdle, + minIdle, + password, + testOnBorrow, + testOnReturn, + testWhileIdle); + + checkNotNull(masterName, "Master name should be presented"); + checkNotNull(sentinels, "Sentinels information should be presented"); + checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty"); + + this.masterName = masterName; + this.sentinels = new HashSet<>(sentinels); + this.soTimeout = soTimeout; + this.database = database; + } + + /** + * Returns master name of the replica set. + * + * @return master name of the replica set. + */ + public String getMasterName() { + return masterName; + } + + /** + * Returns Sentinels host addresses. + * + * @return Set of Sentinels host addresses + */ + public Set getSentinels() { + return sentinels; + } + + /** + * Returns socket timeout. + * + * @return socket timeout + */ + public int getSoTimeout() { + return soTimeout; + } + + /** + * Returns database index. + * + * @return database index + */ + public int getDatabase() { + return database; + } + + /** Builder for initializing {@link JedisSentinelConfig}. */ + public static class Builder { + private String masterName; + private Set sentinels; + private int connectionTimeout = Protocol.DEFAULT_TIMEOUT; + private int soTimeout = Protocol.DEFAULT_TIMEOUT; + private String password; + private int database = Protocol.DEFAULT_DATABASE; + private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL; + private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE; + private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE; + private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW; + private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN; + private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE; + + /** + * Sets master name of the replica set. + * + * @param masterName master name of the replica set + * @return Builder itself + */ + public Builder setMasterName(String masterName) { + this.masterName = masterName; + return this; + } + + /** + * Sets sentinels address. + * + * @param sentinels host set of the sentinels + * @return Builder itself + */ + public Builder setSentinels(Set sentinels) { + this.sentinels = sentinels; + return this; + } + + /** + * Sets connection timeout. + * + * @param connectionTimeout connection timeout, default value is 2000 + * @return Builder itself + */ + public Builder setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + return this; + } + + /** + * Sets socket timeout. + * + * @param soTimeout socket timeout, default value is 2000 + * @return Builder itself + */ + public Builder setSoTimeout(int soTimeout) { + this.soTimeout = soTimeout; + return this; + } + + /** + * Sets password. + * + * @param password password, if any + * @return Builder itself + */ + public Builder setPassword(String password) { + this.password = password; + return this; + } + + /** + * Sets database index. + * + * @param database database index, default value is 0 + * @return Builder itself + */ + public Builder setDatabase(int database) { + this.database = database; + return this; + } + + /** + * Sets value for the {@code maxTotal} configuration attribute for pools to be created with + * this configuration instance. + * + * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, + * default value is 8 + * @return Builder itself + */ + public Builder setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + return this; + } + + /** + * Sets value for the {@code maxIdle} configuration attribute for pools to be created with + * this configuration instance. + * + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + return this; + } + + /** + * Sets value for the {@code minIdle} configuration attribute for pools to be created with + * this configuration instance. + * + * @param minIdle the minimum number of idle objects to maintain in the pool, default value + * is 0 + * @return Builder itself + */ + public Builder setMinIdle(int minIdle) { + this.minIdle = minIdle; + return this; + } + + /** + * Sets value for the {@code testOnBorrow} configuration attribute for pools to be created + * with this configuration instance. + * + * @param testOnBorrow Whether objects borrowed from the pool will be validated before being + * returned + * @return Builder itself + */ + public Builder setTestOnBorrow(boolean testOnBorrow) { + this.testOnBorrow = testOnBorrow; + return this; + } + + /** + * Sets value for the {@code testOnReturn} configuration attribute for pools to be created + * with this configuration instance. + * + * @param testOnReturn Whether objects borrowed from the pool will be validated when they + * are returned to the pool + * @return Builder itself + */ + public Builder setTestOnReturn(boolean testOnReturn) { + this.testOnReturn = testOnReturn; + return this; + } + + /** + * Sets value for the {@code testWhileIdle} configuration attribute for pools to be created + * with this configuration instance. Setting this to true will also set default idle-testing + * parameters provided in Jedis + * + * @see redis.clients.jedis.JedisPoolConfig + * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the + * idle object evictor + * @return Builder itself + */ + public Builder setTestWhileIdle(boolean testWhileIdle) { + this.testWhileIdle = testWhileIdle; + return this; + } + + /** + * Builds JedisSentinelConfig. + * + * @return JedisSentinelConfig + */ + public JedisSentinelConfig build() { + return new JedisSentinelConfig( + masterName, + sentinels, + connectionTimeout, + soTimeout, + password, + database, + maxTotal, + maxIdle, + minIdle, + testOnBorrow, + testOnReturn, + testWhileIdle); + } + } +} diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnector.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnector.java new file mode 100644 index 0000000..8c2fb8f --- /dev/null +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnector.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.redis.streams.sink.connection; + +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; +import redis.clients.jedis.commands.JedisCommands; + +import java.io.Serializable; + +/** A connector to Redis. */ +public class JedisConnector implements AutoCloseable, Serializable { + + private transient JedisCluster jedisCluster; + private transient JedisPool jedisPool; + private transient JedisSentinelPool jedisSentinelPool; + + public JedisConnector(JedisCluster jedisCluster) { + this.jedisCluster = jedisCluster; + } + + public JedisConnector(JedisPool jedisPool) { + this.jedisPool = jedisPool; + } + + public JedisConnector(JedisSentinelPool jedisSentinelPool) { + this.jedisSentinelPool = jedisSentinelPool; + } + + public JedisCommands getJedisCommands() { + if (jedisCluster != null) { + return jedisCluster; + } + if (jedisPool != null) { + return jedisPool.getResource(); + } + if (jedisSentinelPool != null) { + return jedisSentinelPool.getResource(); + } + + throw new IllegalArgumentException("No redis connection found"); + } + + @Override + public void close() { + if (jedisCluster != null) { + jedisCluster.close(); + } + if (jedisPool != null) { + jedisPool.close(); + } + if (jedisSentinelPool != null) { + jedisSentinelPool.close(); + } + } +} diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnectorBuilder.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnectorBuilder.java new file mode 100644 index 0000000..ef04e55 --- /dev/null +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnectorBuilder.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.redis.streams.sink.connection; + +import org.apache.flink.connector.redis.streams.sink.config.JedisClusterConfig; +import org.apache.flink.connector.redis.streams.sink.config.JedisConfig; +import org.apache.flink.connector.redis.streams.sink.config.JedisPoolConfig; +import org.apache.flink.connector.redis.streams.sink.config.JedisSentinelConfig; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.util.Objects; + +/** The builder for {@link JedisConnector}. */ +public class JedisConnectorBuilder { + + /** + * Initialize the {@link JedisConnector} based on the instance type. + * + * @param jedisConfig configuration base + * @return @throws IllegalArgumentException if not valid configuration is provided + */ + public static JedisConnector build(JedisConfig jedisConfig) { + if (jedisConfig instanceof JedisPoolConfig) { + JedisPoolConfig jedisPoolConfig = (JedisPoolConfig) jedisConfig; + return JedisConnectorBuilder.build(jedisPoolConfig); + } else if (jedisConfig instanceof JedisClusterConfig) { + JedisClusterConfig jedisClusterConfig = (JedisClusterConfig) jedisConfig; + return JedisConnectorBuilder.build(jedisClusterConfig); + } else if (jedisConfig instanceof JedisSentinelConfig) { + JedisSentinelConfig jedisSentinelConfig = (JedisSentinelConfig) jedisConfig; + return JedisConnectorBuilder.build(jedisSentinelConfig); + } else { + throw new IllegalArgumentException("Jedis configuration not found"); + } + } + + /** + * Builds container for single Redis environment. + * + * @param jedisPoolConfig configuration for JedisPool + * @return container for single Redis environment + * @throws NullPointerException if jedisPoolConfig is null + */ + public static JedisConnector build(JedisPoolConfig jedisPoolConfig) { + Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null"); + + GenericObjectPoolConfig genericObjectPoolConfig = + getGenericObjectPoolConfig(jedisPoolConfig); + + JedisPool jedisPool = + new JedisPool( + genericObjectPoolConfig, + jedisPoolConfig.getHost(), + jedisPoolConfig.getPort(), + jedisPoolConfig.getConnectionTimeout(), + jedisPoolConfig.getPassword(), + jedisPoolConfig.getDatabase()); + return new JedisConnector(jedisPool); + } + + /** + * Builds container for Redis Cluster environment. + * + * @param jedisClusterConfig configuration for JedisCluster + * @return container for Redis Cluster environment + * @throws NullPointerException if jedisClusterConfig is null + */ + public static JedisConnector build(JedisClusterConfig jedisClusterConfig) { + Objects.requireNonNull(jedisClusterConfig, "Redis cluster config should not be Null"); + + GenericObjectPoolConfig genericObjectPoolConfig = + getGenericObjectPoolConfig(jedisClusterConfig); + + JedisCluster jedisCluster = + new JedisCluster( + jedisClusterConfig.getNodes(), + jedisClusterConfig.getConnectionTimeout(), + jedisClusterConfig.getConnectionTimeout(), + jedisClusterConfig.getMaxRedirections(), + jedisClusterConfig.getPassword(), + genericObjectPoolConfig); + return new JedisConnector(jedisCluster); + } + + /** + * Builds container for Redis Sentinel environment. + * + * @param jedisSentinelConfig configuration for JedisSentinel + * @return container for Redis sentinel environment + * @throws NullPointerException if jedisSentinelConfig is null + */ + public static JedisConnector build(JedisSentinelConfig jedisSentinelConfig) { + Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config should not be Null"); + + GenericObjectPoolConfig genericObjectPoolConfig = + getGenericObjectPoolConfig(jedisSentinelConfig); + + JedisSentinelPool jedisSentinelPool = + new JedisSentinelPool( + jedisSentinelConfig.getMasterName(), + jedisSentinelConfig.getSentinels(), + genericObjectPoolConfig, + jedisSentinelConfig.getConnectionTimeout(), + jedisSentinelConfig.getSoTimeout(), + jedisSentinelConfig.getPassword(), + jedisSentinelConfig.getDatabase()); + return new JedisConnector(jedisSentinelPool); + } + + public static GenericObjectPoolConfig getGenericObjectPoolConfig(JedisConfig jedisConfig) { + GenericObjectPoolConfig genericObjectPoolConfig = + jedisConfig.getTestWhileIdle() + ? new redis.clients.jedis.JedisPoolConfig() + : new GenericObjectPoolConfig<>(); + genericObjectPoolConfig.setMaxIdle(jedisConfig.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(jedisConfig.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(jedisConfig.getMinIdle()); + genericObjectPoolConfig.setTestOnBorrow(jedisConfig.getTestOnBorrow()); + genericObjectPoolConfig.setTestOnReturn(jedisConfig.getTestOnReturn()); + + return genericObjectPoolConfig; + } +} diff --git a/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/BaseITCase.java b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/BaseITCase.java new file mode 100644 index 0000000..39efb34 --- /dev/null +++ b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/BaseITCase.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connector.redis.streams.sink; + +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import redis.clients.jedis.Jedis; + +@Testcontainers +public class BaseITCase { + + @Container + private GenericContainer redis = new GenericContainer<>(DockerImageName + .parse("redis:7.0.5-alpine")) + .withExposedPorts(6379); + + protected Jedis jedis; + + public static MiniClusterWithClientResource cluster = new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(2) + .setNumberTaskManagers(1) + .build()); + + @BeforeAll + public static void beforeAll() throws Exception { + cluster.before(); + } + + @AfterAll + public static void afterAll() { + cluster.after(); + } + + @BeforeEach + public void setUp() { + jedis = new Jedis(redisHost(), redisPort()); + } + + @AfterEach + public void cleanUp() { + jedis.close(); + } + + public String redisHost() { + return redis.getHost(); + } + + public Integer redisPort() { + return redis.getFirstMappedPort(); + } + +} diff --git a/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSinkTest.java b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSinkTest.java new file mode 100644 index 0000000..ca6842f --- /dev/null +++ b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSinkTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connector.redis.streams.sink; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.connector.redis.streams.sink.command.StreamRedisCommand; +import org.apache.flink.connector.redis.streams.sink.config.JedisPoolConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class RedisStreamsSinkTest extends BaseITCase { + @Test + public void testStreamCommand() throws Exception { + + JedisPoolConfig jedisConfig = new JedisPoolConfig.Builder() + .setHost(redisHost()) + .setPort(redisPort()) + .build(); + + RedisSerializer> serializer = input -> { + Map value = new HashMap<>(); + value.put(input.f1, input.f2); + return StreamRedisCommand.builder() + .withKey(input.f0) + .withValue(value) + .build(); + }; + + RedisStreamsSink> underTest = new RedisStreamsSink<>(jedisConfig, serializer); + + List> source = Arrays.asList( + Tuple3.of("one", "onekey", "onevalue"), + Tuple3.of("two", "firstkey", "firstvalue"), + Tuple3.of("two", "secontkey", "secondvalue")); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.fromCollection(source).sinkTo(underTest); + env.execute(); + + // verify results + assertEquals(1, jedis.xlen("one")); + assertEquals(2, jedis.xlen("two")); + + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index ceca4de..bb6b39d 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,10 @@ under the License. + + flink-connector-redis-streams + + 1.16.0 15.0 From fdc3d79052f8cbf4d2b00882a71abc348a8de021 Mon Sep 17 00:00:00 2001 From: Joao Boto Date: Tue, 29 Nov 2022 15:44:19 +0100 Subject: [PATCH 3/3] [FLINK-15571] Change Redis Sink to AsyncSink implementation --- ...sCommand.java => RedisStreamsCommand.java} | 43 +++++-- ...ava => RedisStreamsCommandSerializer.java} | 12 +- .../redis/streams/sink/RedisStreamsSink.java | 58 +++++++-- .../sink/RedisStreamsStateSerializer.java | 117 ++++++++++++++++++ .../streams/sink/RedisStreamsWriter.java | 58 ++++----- .../streams/sink/command/RedisCommand.java | 26 ---- .../redis/streams/sink/BaseITCase.java | 21 ++-- .../streams/sink/RedisStreamsSinkTest.java | 63 +++++++--- .../sink/RedisStreamsStateSerializerTest.java | 81 ++++++++++++ 9 files changed, 372 insertions(+), 107 deletions(-) rename flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/{command/StreamRedisCommand.java => RedisStreamsCommand.java} (56%) rename flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/{RedisSerializer.java => RedisStreamsCommandSerializer.java} (79%) create mode 100644 flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsStateSerializer.java delete mode 100644 flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/RedisCommand.java create mode 100644 flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsStateSerializerTest.java diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/StreamRedisCommand.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsCommand.java similarity index 56% rename from flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/StreamRedisCommand.java rename to flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsCommand.java index d6abec5..0b98578 100644 --- a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/StreamRedisCommand.java +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsCommand.java @@ -14,19 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.connector.redis.streams.sink.command; +package org.apache.flink.connector.redis.streams.sink; import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector; + import redis.clients.jedis.StreamEntryID; +import java.io.Serializable; import java.util.Map; -public class StreamRedisCommand implements RedisCommand { +/** A Redis Streams Command. */ +public class RedisStreamsCommand implements Serializable { + + private transient StreamEntryID streamId = null; public final String key; public final Map value; - private StreamRedisCommand(String key, Map value) { + private RedisStreamsCommand(String key, Map value) { this.key = key; this.value = value; } @@ -35,11 +40,33 @@ public static Builder builder() { return new Builder(); } - @Override public void send(JedisConnector connector) { - connector.getJedisCommands().xadd(key, StreamEntryID.NEW_ENTRY, value); + this.streamId = + connector + .getJedisCommands() + .xadd( + key, + (this.streamId != null) ? this.streamId : StreamEntryID.NEW_ENTRY, + value); + } + + public boolean sendCorrectly() { + return true; + } + + public boolean sendIncorrectly() { + return !sendCorrectly(); + } + + public long getMessageSize() { + return this.key.length() + + this.value.entrySet().stream() + .map(k -> k.getKey().length() + k.getValue().length()) + .reduce(Integer::sum) + .orElse(0); } + /** The builder for {@link RedisStreamsCommand}. */ public static class Builder { private String key; private Map value; @@ -54,8 +81,8 @@ public Builder withValue(Map value) { return this; } - public StreamRedisCommand build() { - return new StreamRedisCommand(key, value); + public RedisStreamsCommand build() { + return new RedisStreamsCommand(key, value); } } -} \ No newline at end of file +} diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisSerializer.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsCommandSerializer.java similarity index 79% rename from flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisSerializer.java rename to flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsCommandSerializer.java index 0f8262a..e1e98b0 100644 --- a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisSerializer.java +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsCommandSerializer.java @@ -14,19 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.connector.redis.streams.sink; -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.connector.redis.streams.sink.command.RedisCommand; +package org.apache.flink.connector.redis.streams.sink; -import java.io.Serializable; +import org.apache.flink.connector.base.sink.writer.ElementConverter; /** * Function that creates the description how the input data should be mapped to redis type. * * @param The type of the element handled by this {@code RedisSerializer} */ -public interface RedisSerializer extends Function, Serializable { - - RedisCommand getMessage(T input); -} +public interface RedisStreamsCommandSerializer + extends ElementConverter {} diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSink.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSink.java index 93ff057..5b068c2 100644 --- a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSink.java +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSink.java @@ -14,30 +14,72 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.connector.redis.streams.sink; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; import org.apache.flink.connector.redis.streams.sink.config.JedisConfig; import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector; import org.apache.flink.connector.redis.streams.sink.connection.JedisConnectorBuilder; +import org.apache.flink.core.io.SimpleVersionedSerializer; import java.io.IOException; +import java.util.Collection; +import java.util.Collections; -public class RedisStreamsSink implements Sink { +/** + * A sink for publishing data into Redis. + * + * @param + */ +public class RedisStreamsSink extends AsyncSinkBase { private final JedisConfig jedisConfig; - private final RedisSerializer serializer; - public RedisStreamsSink(JedisConfig jedisConfig, RedisSerializer serializer) { + public RedisStreamsSink( + JedisConfig jedisConfig, + RedisStreamsCommandSerializer converter, + AsyncSinkWriterConfiguration asyncConfig) { + super( + converter, + asyncConfig.getMaxBatchSize(), + asyncConfig.getMaxInFlightRequests(), + asyncConfig.getMaxBufferedRequests(), + asyncConfig.getMaxBatchSizeInBytes(), + asyncConfig.getMaxTimeInBufferMS(), + asyncConfig.getMaxRecordSizeInBytes()); this.jedisConfig = jedisConfig; - this.serializer = serializer; } @Override - public SinkWriter createWriter(InitContext initContext) throws IOException { + public RedisStreamsWriter createWriter(InitContext initContext) throws IOException { + return restoreWriter(initContext, Collections.emptyList()); + } + + @Override + public RedisStreamsWriter restoreWriter( + InitContext initContext, + Collection> recoveredState) + throws IOException { + AsyncSinkWriterConfiguration asyncConfig = + AsyncSinkWriterConfiguration.builder() + .setMaxBatchSize(getMaxBatchSize()) + .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes()) + .setMaxInFlightRequests(getMaxInFlightRequests()) + .setMaxBufferedRequests(getMaxBufferedRequests()) + .setMaxTimeInBufferMS(getMaxTimeInBufferMS()) + .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes()) + .build(); JedisConnector connection = JedisConnectorBuilder.build(jedisConfig); - return new RedisStreamsWriter<>(connection, this.serializer); + return new RedisStreamsWriter<>( + connection, getElementConverter(), asyncConfig, initContext, recoveredState); } + @Override + public SimpleVersionedSerializer> + getWriterStateSerializer() { + return new RedisStreamsStateSerializer(); + } } diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsStateSerializer.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsStateSerializer.java new file mode 100644 index 0000000..1673c1e --- /dev/null +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsStateSerializer.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.redis.streams.sink; + +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** The Redis implementation for {@link SimpleVersionedSerializer}. */ +public class RedisStreamsStateSerializer + implements SimpleVersionedSerializer> { + + @Override + public int getVersion() { + return 1; + } + + @Override + public byte[] serialize(BufferedRequestState obj) throws IOException { + Collection> bufferState = + obj.getBufferedRequestEntries(); + + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(baos)) { + + out.writeInt(getVersion()); + out.writeInt(bufferState.size()); + + for (RequestEntryWrapper wrapper : bufferState) { + RedisStreamsCommand command = wrapper.getRequestEntry(); + writeString(out, command.key); + out.writeInt(command.value.size()); + for (Map.Entry entry : command.value.entrySet()) { + writeString(out, entry.getKey()); + writeString(out, entry.getValue()); + } + } + + out.flush(); + return baos.toByteArray(); + } + } + + @Override + public BufferedRequestState deserialize(int version, byte[] serialized) + throws IOException { + try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + final DataInputStream in = new DataInputStream(bais)) { + + int byteVersion = in.readInt(); + + int bufferSize = in.readInt(); + List> state = new ArrayList<>(); + for (int bs = 0; bs < bufferSize; bs++) { + String key = readString(in); + + int valueSize = in.readInt(); + Map values = new HashMap<>(); + for (int i = 0; i < valueSize; i++) { + String eKey = readString(in); + String eValue = readString(in); + values.put(eKey, eValue); + } + + RedisStreamsCommand command = + RedisStreamsCommand.builder().withKey(key).withValue(values).build(); + + state.add(new RequestEntryWrapper<>(command, command.getMessageSize())); + } + return new BufferedRequestState<>(state); + } + } + + private void writeString(final DataOutputStream out, String value) throws IOException { + out.writeInt(value.length()); + out.writeBytes(value); + } + + private String readString(final DataInputStream in) throws IOException { + int sizeToRead = in.readInt(); + byte[] bytesRead = new byte[sizeToRead]; + int sizeRead = in.read(bytesRead); + + if (sizeToRead != sizeRead) { + throw new IOException( + String.format("Expected to read %s but read %s", sizeToRead, sizeRead)); + } + + return new String(bytesRead); + } +} diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsWriter.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsWriter.java index 0601cb1..516d664 100644 --- a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsWriter.java +++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsWriter.java @@ -14,48 +14,50 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.connector.redis.streams.sink; -import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.connector.redis.streams.sink.command.RedisCommand; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Queue; +import java.util.Collection; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; -public class RedisStreamsWriter implements SinkWriter { +class RedisStreamsWriter extends AsyncSinkWriter { private final JedisConnector jedisConnector; - private final RedisSerializer serializer; - private final Queue queue = new ArrayDeque<>(); - public RedisStreamsWriter(JedisConnector jedisConnector, RedisSerializer serializer) { + public RedisStreamsWriter( + JedisConnector jedisConnector, + ElementConverter elementConverter, + AsyncSinkWriterConfiguration asyncConfig, + Sink.InitContext initContext, + Collection> recoveredState) { + super(elementConverter, initContext, asyncConfig, recoveredState); this.jedisConnector = jedisConnector; - this.serializer = serializer; } - @Override - public void write(T input, Context context) throws IOException, InterruptedException { - RedisCommand message = serializer.getMessage(input); - queue.add(message); - } - - @Override - public void flush(boolean endOfInput) throws IOException, InterruptedException { - flush(); - } - - private void flush() { - while(!this.queue.isEmpty()) { - RedisCommand element = this.queue.remove(); - element.send(this.jedisConnector); - } + protected void submitRequestEntries( + List requestEntries, + Consumer> requestResult) { + List errors = + requestEntries.stream() + .peek(command -> command.send(this.jedisConnector)) + .filter(RedisStreamsCommand::sendIncorrectly) + .collect(Collectors.toList()); + + requestResult.accept(errors); } @Override - public void close() throws Exception { - jedisConnector.close(); + protected long getSizeInBytes(RedisStreamsCommand command) { + return command.getMessageSize(); } } diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/RedisCommand.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/RedisCommand.java deleted file mode 100644 index bb6555f..0000000 --- a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/RedisCommand.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.connector.redis.streams.sink.command; - -import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector; - -import java.io.Serializable; - -public interface RedisCommand extends Serializable { - void send(JedisConnector connector); - -} \ No newline at end of file diff --git a/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/BaseITCase.java b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/BaseITCase.java index 39efb34..b7f32b2 100644 --- a/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/BaseITCase.java +++ b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/BaseITCase.java @@ -14,10 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.connector.redis.streams.sink; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.util.MiniClusterWithClientResource; + import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -28,21 +30,23 @@ import org.testcontainers.utility.DockerImageName; import redis.clients.jedis.Jedis; +/** A redis container for testing. */ @Testcontainers public class BaseITCase { @Container - private GenericContainer redis = new GenericContainer<>(DockerImageName - .parse("redis:7.0.5-alpine")) - .withExposedPorts(6379); + private GenericContainer redis = + new GenericContainer<>(DockerImageName.parse("redis:7.0.5-alpine")) + .withExposedPorts(6379); protected Jedis jedis; - public static MiniClusterWithClientResource cluster = new MiniClusterWithClientResource( - new MiniClusterResourceConfiguration.Builder() - .setNumberSlotsPerTaskManager(2) - .setNumberTaskManagers(1) - .build()); + public static MiniClusterWithClientResource cluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(2) + .setNumberTaskManagers(1) + .build()); @BeforeAll public static void beforeAll() throws Exception { @@ -71,5 +75,4 @@ public String redisHost() { public Integer redisPort() { return redis.getFirstMappedPort(); } - } diff --git a/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSinkTest.java b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSinkTest.java index ca6842f..90b2b03 100644 --- a/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSinkTest.java +++ b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSinkTest.java @@ -14,45 +14,52 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.connector.redis.streams.sink; +import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.connector.redis.streams.sink.command.StreamRedisCommand; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; import org.apache.flink.connector.redis.streams.sink.config.JedisPoolConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.HashMap; import java.util.List; -import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; class RedisStreamsSinkTest extends BaseITCase { + @Test public void testStreamCommand() throws Exception { - JedisPoolConfig jedisConfig = new JedisPoolConfig.Builder() - .setHost(redisHost()) - .setPort(redisPort()) - .build(); + JedisPoolConfig jedisConfig = + new JedisPoolConfig.Builder().setHost(redisHost()).setPort(redisPort()).build(); - RedisSerializer> serializer = input -> { - Map value = new HashMap<>(); - value.put(input.f1, input.f2); - return StreamRedisCommand.builder() - .withKey(input.f0) - .withValue(value) - .build(); - }; + RedisStreamsCommandSerializer> serializer = + new TestCommandSerializer(); - RedisStreamsSink> underTest = new RedisStreamsSink<>(jedisConfig, serializer); + AsyncSinkWriterConfiguration asyncConfig = + AsyncSinkWriterConfiguration.builder() + .setMaxBatchSize(5) + .setMaxBatchSizeInBytes(1000) + .setMaxInFlightRequests(5) + .setMaxBufferedRequests(6) + .setMaxTimeInBufferMS(10000) + .setMaxRecordSizeInBytes(1000) + .build(); - List> source = Arrays.asList( - Tuple3.of("one", "onekey", "onevalue"), - Tuple3.of("two", "firstkey", "firstvalue"), - Tuple3.of("two", "secontkey", "secondvalue")); + RedisStreamsSink> underTest = + new RedisStreamsSink<>(jedisConfig, serializer, asyncConfig); + + List> source = + Arrays.asList( + Tuple3.of("one", "onekey", "onevalue"), + Tuple3.of("two", "firstkey", "firstvalue"), + Tuple3.of("two", "secontkey", "secondvalue")); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); @@ -62,6 +69,22 @@ public void testStreamCommand() throws Exception { // verify results assertEquals(1, jedis.xlen("one")); assertEquals(2, jedis.xlen("two")); + } + public static class TestCommandSerializer + implements RedisStreamsCommandSerializer> { + @Override + public RedisStreamsCommand apply( + Tuple3 input, SinkWriter.Context context) { + return RedisStreamsCommand.builder() + .withKey(input.f0) + .withValue( + new HashMap() { + { + put(input.f1, input.f2); + } + }) + .build(); + } } -} \ No newline at end of file +} diff --git a/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsStateSerializerTest.java b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsStateSerializerTest.java new file mode 100644 index 0000000..7e1863e --- /dev/null +++ b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsStateSerializerTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.redis.streams.sink; + +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class RedisStreamsStateSerializerTest { + + @Test + void testSerDe() throws IOException { + + RedisStreamsCommand command1 = + RedisStreamsCommand.builder() + .withKey("test1") + .withValue( + new HashMap() { + { + put("first", "1"); + put("second", "2"); + } + }) + .build(); + + RedisStreamsCommand command2 = + RedisStreamsCommand.builder() + .withKey("test2") + .withValue( + new HashMap() { + { + put("third", "3"); + put("fourth", "4"); + } + }) + .build(); + + List> state = new ArrayList<>(); + state.add(new RequestEntryWrapper<>(command1, command1.getMessageSize())); + state.add(new RequestEntryWrapper<>(command2, command2.getMessageSize())); + + RedisStreamsStateSerializer serializer = new RedisStreamsStateSerializer(); + + byte[] serialized = serializer.serialize(new BufferedRequestState<>(state)); + BufferedRequestState deserialized = + serializer.deserialize(1, serialized); + assertEquals(2, deserialized.getBufferedRequestEntries().size()); + + RedisStreamsCommand newCommand = + deserialized.getBufferedRequestEntries().get(0).getRequestEntry(); + assertEquals(command1.key, newCommand.key); + assertEquals(command1.value, newCommand.value); + + newCommand = deserialized.getBufferedRequestEntries().get(1).getRequestEntry(); + assertEquals(command2.key, newCommand.key); + assertEquals(command2.value, newCommand.value); + } +}