From 1e78debe4f4c9fca041650bfa5eb3fa287ad718a Mon Sep 17 00:00:00 2001 From: sintetico82 Date: Wed, 26 Jun 2019 12:32:06 +0200 Subject: [PATCH] Added the optional message.key value for using the Kafka key with the generated messages. For now, it can be only a fixed value. --- .../performance/DatagenConnectorConfig.java | 14 ++++++++++++++ .../connect/datagen/performance/DatagenTask.java | 11 ++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/github/xushiyan/kafka/connect/datagen/performance/DatagenConnectorConfig.java b/src/main/java/com/github/xushiyan/kafka/connect/datagen/performance/DatagenConnectorConfig.java index b35d06c..81ca35c 100644 --- a/src/main/java/com/github/xushiyan/kafka/connect/datagen/performance/DatagenConnectorConfig.java +++ b/src/main/java/com/github/xushiyan/kafka/connect/datagen/performance/DatagenConnectorConfig.java @@ -41,6 +41,10 @@ public class DatagenConnectorConfig extends AbstractConfig { private static final String POLL_INTERVAL_DISPLAY = "Poll Interval"; private static final int POLL_INTERVAL_DEFAULT = 10000; + private static final String MESSAGE_KEY_NAME_CONFIG = "message.key"; + private static final String MESSAGE_KEY_NAME_DOC = "Key of the message to be used for each message (optional)."; + private static final String MESSAGE_KEY_NAME_DISPLAY = "Key message"; + private static final String MESSAGE_TEMPLATE_CONFIG = "message.template"; private static final String MESSAGE_TEMPLATE_DOC = "Message template to be used for each message."; private static final String MESSAGE_TEMPLATE_DISPLAY = "Message Template"; @@ -78,6 +82,14 @@ public class DatagenConnectorConfig extends AbstractConfig { CONNECTOR_GROUP, 4, ConfigDef.Width.LONG, POLL_INTERVAL_DISPLAY) + .define(MESSAGE_KEY_NAME_CONFIG, + Type.STRING, + null, // Default value + Importance.MEDIUM, + MESSAGE_KEY_NAME_DOC, + CONNECTOR_GROUP, 5, + ConfigDef.Width.LONG, + MESSAGE_KEY_NAME_DISPLAY) .define(MESSAGE_TEMPLATE_CONFIG, Type.STRING, Importance.MEDIUM, @@ -104,6 +116,7 @@ public class DatagenConnectorConfig extends AbstractConfig { public final String topicName; public final int pollSize; public final int pollInterval; + public final String messageKey; public final String messageTemplate; public final List randomFields; public final String eventTimestampField; @@ -113,6 +126,7 @@ public DatagenConnectorConfig(ConfigDef definition, Map originals) { this.topicName = getString(TOPIC_NAME_CONFIG); this.pollSize = getInt(POLL_SIZE_CONFIG); this.pollInterval = getInt(POLL_INTERVAL_CONFIG); + this.messageKey = getString(MESSAGE_KEY_NAME_CONFIG); this.messageTemplate = getString(MESSAGE_TEMPLATE_CONFIG); this.randomFields = getList(RANDOM_FIELDS_CONFIG); this.eventTimestampField = getString(EVENT_TIMESTAMP_FIELD_CONFIG); diff --git a/src/main/java/com/github/xushiyan/kafka/connect/datagen/performance/DatagenTask.java b/src/main/java/com/github/xushiyan/kafka/connect/datagen/performance/DatagenTask.java index edef7ea..840f9c1 100644 --- a/src/main/java/com/github/xushiyan/kafka/connect/datagen/performance/DatagenTask.java +++ b/src/main/java/com/github/xushiyan/kafka/connect/datagen/performance/DatagenTask.java @@ -79,7 +79,16 @@ public List poll() throws InterruptedException { msg.addProperty(config.eventTimestampField, nanos); - records.add(new SourceRecord(sourcePartition, sourceOffset, config.topicName, Schema.STRING_SCHEMA, gson.toJson(msg))); + if(config.messageKey != null && config.messageKey.length() != 0 ) { + + records.add(new SourceRecord(sourcePartition, sourceOffset, config.topicName, Schema.STRING_SCHEMA, config.messageKey, Schema.STRING_SCHEMA, gson.toJson(msg))); + + } else { + + records.add(new SourceRecord(sourcePartition, sourceOffset, config.topicName, Schema.STRING_SCHEMA, gson.toJson(msg))); + } + + } return records;