1 change: 1 addition & 0 deletions .gitignore
@@ -34,3 +34,4 @@ venv_examples
 .coverage
 **/coverage.xml
 **/test-report.xml
+*.ducktape
11 changes: 7 additions & 4 deletions tests/ducktape/README.md
@@ -13,12 +13,15 @@ Ducktape-based producer tests for the Confluent Kafka Python client.
 # Run all tests
 ./tests/ducktape/run_ducktape_test.py
 
-# Run specific test
-./tests/ducktape/run_ducktape_test.py SimpleProducerTest.test_basic_produce
+# Run all tests in a file
+./tests/ducktape/run_ducktape_test.py test_producer.py
+
+# Run a specific test
+./tests/ducktape/run_ducktape_test.py test_producer.py SimpleProducerTest.test_basic_produce
 ```
 
 ## Test Cases
 
 - **test_basic_produce**: Basic message production with callbacks
-- **test_produce_multiple_batches**: Parameterized tests (5, 10, 50 messages)
-- **test_produce_with_compression**: Matrix tests (none, gzip, snappy)
+- **test_produce_multiple_batches**: Parameterized tests (5, 10, 50 messages)
+- **test_produce_with_compression**: Matrix tests (none, gzip, snappy)
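The parameterized and matrix entries above map onto ducktape's `@parametrize` and `@matrix` decorators. A minimal sketch of how such cases are typically declared (the class name and test bodies here are illustrative, not this PR's actual tests):

```python
from ducktape.mark import matrix, parametrize
from ducktape.tests.test import Test


class ExampleParameterizedTest(Test):
    @parametrize(num_messages=5)
    @parametrize(num_messages=10)
    @parametrize(num_messages=50)
    def test_produce_multiple_batches(self, num_messages):
        # Each stacked @parametrize expands into one test run
        # with the given keyword argument bound.
        assert num_messages > 0

    @matrix(compression_type=["none", "gzip", "snappy"])
    def test_produce_with_compression(self, compression_type):
        # @matrix expands into one run per combination of the listed values.
        assert compression_type in ("none", "gzip", "snappy")
```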
6 changes: 3 additions & 3 deletions tests/ducktape/run_ducktape_test.py
@@ -46,7 +46,7 @@ def main():
 
     # Get test file path (ducktape expects file paths, not module paths)
     test_dir = os.path.dirname(os.path.abspath(__file__))
-    test_file = os.path.join(test_dir, "test_producer.py")
+    test_file = os.path.join(test_dir, sys.argv[1])
 
     if not os.path.exists(test_file):
         print(f"ERROR: Test file not found: {test_file}")
@@ -71,8 +71,8 @@ def main():
     ]
 
     # Add specific test if provided as argument
-    if len(sys.argv) > 1:
-        test_method = sys.argv[1]
+    if len(sys.argv) > 2:
+        test_method = sys.argv[2]
         cmd[-1] = f"{test_file}::{test_method}"
         print(f"Running specific test: {test_method}")
     else:
236 changes: 236 additions & 0 deletions tests/ducktape/test_producer_with_schema_registry.py
@@ -0,0 +1,236 @@
import json
import time
from uuid import uuid4
from ducktape.tests.test import Test

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry._sync.json_schema import JSONSerializer
from confluent_kafka.schema_registry._sync.protobuf import ProtobufSerializer
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext, StringSerializer
from tests.ducktape.services.kafka import KafkaClient
from tests.integration.schema_registry.data.proto import PublicTestProto_pb2

try:
    from confluent_kafka import Producer
except ImportError:
    # Handle case where confluent_kafka is not installed
    Producer = None
    KafkaError = None


class SimpleProducerTestWithSchemaRegistry(Test):
    """Test producer functionality with Schema Registry integration"""

    def __init__(self, test_context):
        super(SimpleProducerTestWithSchemaRegistry, self).__init__(test_context=test_context)

        # Set up Kafka client and Schema Registry client
        self.kafka = KafkaClient(test_context, bootstrap_servers="localhost:9092")
        self.schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

    def setUp(self):
        """Set up test environment"""
        self.logger.info("Verifying connection to external Kafka at localhost:9092")

        if not self.kafka.verify_connection():
            raise ConnectionError("Cannot connect to Kafka at localhost:9092. "
                                  "Please ensure Kafka is running.")

        self.logger.info("Successfully connected to Kafka")

    def calculate_and_verify_results(self, start_time, messages_sent, delivered_messages, failed_messages):
        """Calculate throughput and verify results"""
        actual_duration = time.time() - start_time
        send_throughput = messages_sent / actual_duration
        delivery_throughput = len(delivered_messages) / actual_duration

        # Log results
        self.logger.info("Time-based production results:")
        self.logger.info("  Duration: %.2f seconds", actual_duration)
        self.logger.info("  Messages sent: %d", messages_sent)
        self.logger.info("  Messages delivered: %d", len(delivered_messages))
        self.logger.info("  Messages failed: %d", len(failed_messages))
        self.logger.info("  Send throughput: %.2f msg/s", send_throughput)
        self.logger.info("  Delivery throughput: %.2f msg/s", delivery_throughput)

        # Verify results
        assert messages_sent > 0, "No messages were sent during test duration"
        assert len(delivered_messages) > 0, "No messages were delivered"
        assert send_throughput > 10, f"Send throughput too low: {send_throughput:.2f} msg/s"

    def produce_messages(self, producer, topic_name, serializer, string_serializer, test_duration, message_value_func):
        """Produce messages with the given serializer for a fixed wall-clock duration"""
        delivered_messages = []
        failed_messages = []

        def delivery_callback(err, msg):
            """Delivery report callback"""
            if err is not None:
                self.logger.error("Message delivery failed: %s", err)
                failed_messages.append(err)
            else:
                delivered_messages.append(msg)

        self.logger.info("Producing messages for %.1f seconds to topic %s", test_duration, topic_name)
        start_time = time.time()
        messages_sent = 0

        while time.time() - start_time < test_duration:
            message_value = message_value_func(messages_sent)
            try:
                producer.produce(
                    topic=topic_name,
                    key=string_serializer(str(uuid4())),
                    value=serializer(message_value, SerializationContext(topic_name, MessageField.VALUE)),
                    callback=delivery_callback
                )
                messages_sent += 1
                # Poll periodically so delivery callbacks are served while producing
                if messages_sent % 100 == 0:
                    producer.poll(0)
            except BufferError:
                # Local queue is full: give librdkafka a moment to drain, then retry
                producer.poll(0.001)
                continue

        producer.flush(timeout=30)
        self.calculate_and_verify_results(start_time, messages_sent, delivered_messages, failed_messages)

    def test_basic_produce_with_avro_serialization(self):
        """Test producing messages with Avro serialization using Schema Registry"""
        if Producer is None:
            self.logger.error("confluent_kafka not available, skipping test")
            return

topic_name = "test-topic-schema-registry"
test_duration = 5.0 # 5 seconds

# Create topic
self.kafka.create_topic(topic_name, partitions=1, replication_factor=1)
topic_ready = self.kafka.wait_for_topic(topic_name, max_wait_time=30)
assert topic_ready, f"Topic {topic_name} was not created within timeout"

# Define Avro schema
avro_schema = {
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
avro_schema_str = json.dumps(avro_schema)

# Create serializers
string_serializer = StringSerializer('utf8')
avro_serializer = AvroSerializer(schema_registry_client=self.schema_registry_client, schema_str=avro_schema_str)

# Configure producer
producer_config = {
'bootstrap.servers': self.kafka.bootstrap_servers(),

            'client.id': 'ducktape-test-producer-schema-registry',
        }

        self.logger.info("Creating producer with config: %s", producer_config)
        producer = Producer(producer_config)

        # Produce messages
        self.produce_messages(
            producer,
            topic_name,
            avro_serializer,
            string_serializer,
            test_duration,
            lambda messages_sent: {'name': f"User{messages_sent}", 'age': messages_sent}
        )

    def test_basic_produce_with_json_serialization(self):
        """Test producing messages with JSON serialization using Schema Registry"""
        if Producer is None:
            self.logger.error("confluent_kafka not available, skipping test")
            return

        topic_name = "test-topic-json-serialization"
        test_duration = 5.0  # 5 seconds

        # Create topic
        self.kafka.create_topic(topic_name, partitions=1, replication_factor=1)
        topic_ready = self.kafka.wait_for_topic(topic_name, max_wait_time=30)
        assert topic_ready, f"Topic {topic_name} was not created within timeout"

        # Define JSON schema
        json_schema = {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "age": {"type": "integer"}
            },
            "required": ["name", "age"]
        }
        json_schema_str = json.dumps(json_schema)

        # Create serializers
        string_serializer = StringSerializer('utf8')
        json_serializer = JSONSerializer(json_schema_str, self.schema_registry_client)

        # Configure producer
        producer_config = {
            'bootstrap.servers': self.kafka.bootstrap_servers(),
            'client.id': 'ducktape-test-producer-json-serialization',
        }

        self.logger.info("Creating producer with config: %s", producer_config)
        producer = Producer(producer_config)

        # Produce messages
        self.produce_messages(
            producer,
            topic_name,
            json_serializer,
            string_serializer,
            test_duration,
            lambda messages_sent: {'name': f"User{messages_sent}", 'age': messages_sent}
        )

    def test_basic_produce_with_protobuf_serialization(self):
        """Test producing messages with Protobuf serialization using Schema Registry"""
        if Producer is None:
            self.logger.error("confluent_kafka not available, skipping test")
            return

        topic_name = "test-topic-protobuf-serialization"
        test_duration = 5.0  # 5 seconds

        # Create topic
        self.kafka.create_topic(topic_name, partitions=1, replication_factor=1)
        topic_ready = self.kafka.wait_for_topic(topic_name, max_wait_time=30)
        assert topic_ready, f"Topic {topic_name} was not created within timeout"

        # Create serializers
        string_serializer = StringSerializer('utf8')
        protobuf_serializer = ProtobufSerializer(PublicTestProto_pb2.TestMessage, self.schema_registry_client)

        # Configure producer
        producer_config = {
            'bootstrap.servers': self.kafka.bootstrap_servers(),
            'client.id': 'ducktape-test-producer-protobuf-serialization',
        }

        self.logger.info("Creating producer with config: %s", producer_config)
        producer = Producer(producer_config)

        # Produce messages (the same Protobuf value is sent on every iteration)
        self.produce_messages(
            producer,
            topic_name,
            protobuf_serializer,
            string_serializer,
            test_duration,
            lambda _: PublicTestProto_pb2.TestMessage(
                test_string="example string",
                test_bool=True,
                test_bytes=b'example bytes',
                test_double=1.0,
                test_float=12.0,
                test_fixed32=1,
                test_fixed64=1,
            )
        )
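These tests verify delivery through producer callbacks only; nothing reads the topics back. For spot-checking the serialized payloads end to end, a minimal consumer sketch along these lines could be used. The broker address, registry URL, and topic name are the ones the Avro test above uses; the group id is hypothetical:

```python
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

# No reader schema is passed, so the deserializer fetches the writer schema
# that AvroSerializer registered in Schema Registry.
avro_deserializer = AvroDeserializer(SchemaRegistryClient({'url': 'http://localhost:8081'}))

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'ducktape-verify',  # hypothetical group id
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['test-topic-schema-registry'])

msg = consumer.poll(timeout=10.0)
if msg is not None and msg.error() is None:
    user = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
    print(user)  # e.g. {'name': 'User0', 'age': 0}
consumer.close()
```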