Skip to content

Commit 6321133

Browse files
committed
Refactored tests and scripts for better modularity and reusability
- Introduced a reusable Kafka container fixture for integration tests using Testcontainers.
- Modularized functions in `consumer.py` and `producer.py` for better testability.
- Refactored test cases to use the shared Kafka fixture, reducing redundancy.
- Enhanced argument parsing in `consumer.py` for improved clarity and functionality.
- Simplified test logic for producer and consumer interaction.
1 parent 703e949 commit 6321133

File tree

3 files changed

+120
-94
lines changed

3 files changed

+120
-94
lines changed

consumer.py

Lines changed: 43 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,12 @@
1111
import argparse
1212
import json
1313

14+
parser = argparse.ArgumentParser(description="Kafka Event Consumer")
15+
parser.add_argument("-e", "--event-type", help="Filter by event_type (optional)", required=False)
16+
parser.add_argument("-g", "--group-id", help="Kafka consumer group ID (optional)", required=False)
17+
parser.add_argument("-t", "--test-mode", action="store_true")
18+
args = parser.parse_args()
19+
1420
"""
1521
Apache Kafka Consumer Demo
1622
@@ -43,39 +49,24 @@ def try_parse_json(value: bytes):
4349
return value.decode('utf-8', errors='replace')
4450

4551

46-
parser = argparse.ArgumentParser(description="Kafka Event Consumer")
47-
parser.add_argument("-e", "--event-type", help="Filter by event_type (optional)", required=False)
48-
parser.add_argument("-g", "--group-id", help="Kafka consumer group ID (optional)", required=False)
49-
parser.add_argument("-t", "--test-mode", action="store_true")
50-
args = parser.parse_args()
51-
52-
53-
def main():
54-
bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
55-
topic = os.environ.get("KAFKA_TOPIC", "test-topic")
56-
57-
# Initialize the Kafka consumer with configuration
58-
# - topic: The Kafka topic to subscribe to (from environment variable)
59-
# - bootstrap_servers: Connection string for the Kafka broker
60-
# - auto_offset_reset='earliest': Start reading from the beginning of the topic if no committed offset exists
61-
# - enable_auto_commit=True: Automatically commit offsets
62-
consumer_args = {
63-
'bootstrap_servers': bootstrap_servers,
64-
'auto_offset_reset': 'earliest',
65-
'enable_auto_commit': True
66-
}
67-
68-
if args.group_id:
69-
consumer_args['group_id'] = args.group_id
52+
def consume_events(topic, consumer_args, event_type=None, group_id=None):
53+
"""
54+
Consume messages from a Kafka topic.
7055
71-
if args.test_mode:
72-
consumer_args['consumer_timeout_ms'] = 3000 # pragma: no cover
56+
This function creates a Kafka consumer with the provided arguments and listens
57+
to a specified topic for incoming messages. Messages are parsed as JSON where
58+
possible and logged appropriately. Filtering is available based on an optional
59+
event type attribute in the message.
7360
61+
:param topic: Kafka topic to consume messages from.
62+
:param consumer_args: Dictionary of arguments to configure the KafkaConsumer.
63+
:param event_type: Optional. Filters messages by the `event_type` attribute if it's included in the message payload.
64+
:param group_id: Optional. Consumer group ID, used here only for logging.
"""
7465
consumer = KafkaConsumer(topic, **consumer_args)
7566

7667
logger = get_logger("consumer")
7768

78-
logger.info(f"Polyglot consumer listening, consumer group: {args.group_id}\n")
69+
logger.info(f"Polyglot consumer listening, consumer group: {group_id}\n")
7970

8071
try:
8172
# Continuously poll for new messages
@@ -90,7 +81,7 @@ def main():
9081

9182
# Display the message with an appropriate prefix based on its type
9283
if isinstance(parsed, dict):
93-
if args.event_type and parsed.get("event_type") != args.event_type:
84+
if event_type and parsed.get("event_type") != event_type:
9485
continue # Skip non-matching event
9586
logger.info(
9687
f"✅ JSON ({parsed['event_type']}) | key={key} | partition={partition} | offset={offset}{parsed}")
@@ -104,5 +95,29 @@ def main():
10495
consumer.close()
10596

10697

98+
def main():
99+
"""Main function that runs when the script is executed directly"""
100+
kafka_topic = os.environ.get("KAFKA_TOPIC", "test-topic")
101+
kafka_bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
102+
103+
# Initialize the Kafka consumer with configuration
104+
# - bootstrap_servers: Connection string for the Kafka broker
105+
# - auto_offset_reset='earliest': Start reading from the beginning of the topic if no committed offset exists
106+
# - enable_auto_commit=True: Automatically commit offsets
107+
kafka_consumer_args = {
108+
'bootstrap_servers': kafka_bootstrap_servers,
109+
'auto_offset_reset': 'earliest',
110+
'enable_auto_commit': True
111+
}
112+
113+
if args.group_id:
114+
kafka_consumer_args['group_id'] = args.group_id
115+
116+
if args.test_mode:
117+
kafka_consumer_args['consumer_timeout_ms'] = 3000 # pragma: no cover
118+
119+
consume_events(kafka_topic, kafka_consumer_args, args.event_type, args.group_id)
120+
121+
107122
if __name__ == "__main__":
108123
main()

producer.py

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -29,10 +29,7 @@
2929
EVENT_TYPES = ["note_created", "note_updated", "note_deleted"]
3030

3131

32-
def main():
33-
bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
34-
topic = os.environ.get("KAFKA_TOPIC", "test-topic")
35-
32+
def produce_events(bootstrap_servers, topic):
3633
# Initialize the Kafka producer with configuration
3734
# - bootstrap_servers: Connection string for the Kafka broker
3835
# - value_serializer: Function to convert Python objects to bytes
@@ -61,5 +58,13 @@ def main():
6158
producer.flush()
6259

6360

61+
def main():
62+
"""Main function that runs when the script is executed directly"""
63+
kafka_bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
64+
kafka_topic = os.environ.get("KAFKA_TOPIC", "test-topic")
65+
66+
produce_events(kafka_bootstrap_servers, kafka_topic)
67+
68+
6469
if __name__ == "__main__":
6570
main()

tests/test_cli_integration.py

Lines changed: 68 additions & 62 deletions
Original file line number | Diff line number | Diff line change
@@ -3,77 +3,83 @@
33
import time
44
import os
55
from kafka import KafkaProducer
6+
import pytest
67

78

8-
def test_producer_and_consumer_via_scripts(tmp_path):
9-
topic = "test-topic"
10-
9+
@pytest.fixture(scope="module")
10+
def kafka_container():
11+
"""Fixture that provides a reusable Kafka container for all tests."""
1112
with KafkaContainer(image="confluentinc/cp-kafka:7.9.2") as kafka:
12-
bootstrap_servers = kafka.get_bootstrap_server()
13+
yield kafka
1314

14-
env = os.environ.copy()
15-
env["KAFKA_BOOTSTRAP_SERVERS"] = bootstrap_servers
16-
env["KAFKA_TOPIC"] = topic
1715

18-
# 1. Run the producer
19-
subprocess.run(
20-
["python", "producer.py"],
21-
env=env,
22-
check=True,
23-
)
16+
def test_producer_and_consumer_via_scripts(tmp_path, kafka_container):
17+
topic = "test-topic-producer-consumer-scripts"
2418

25-
time.sleep(1)
19+
bootstrap_servers = kafka_container.get_bootstrap_server()
2620

27-
# 2. Capture consumer output
28-
env["COVERAGE_PROCESS_START"] = ".coveragerc" # Enable coverage for subprocess
21+
env = os.environ.copy()
22+
env["KAFKA_BOOTSTRAP_SERVERS"] = bootstrap_servers
23+
env["KAFKA_TOPIC"] = topic
2924

30-
result = subprocess.run(
31-
["python", "consumer.py", "--test-mode"],
32-
env=env,
33-
check=True,
34-
capture_output=True,
35-
text=True,
36-
timeout=10,
37-
)
25+
# 1. Run the producer
26+
subprocess.run(
27+
["python", "producer.py"],
28+
env=env,
29+
check=True,
30+
)
3831

39-
assert "note event" in result.stdout.lower()
32+
time.sleep(1)
4033

34+
# 2. Capture consumer output
35+
env["COVERAGE_PROCESS_START"] = ".coveragerc" # Enable coverage for subprocess
4136

42-
def test_plain_text_consumer(tmp_path):
43-
topic = "test-topic"
37+
result = subprocess.run(
38+
["python", "consumer.py", "--test-mode"],
39+
env=env,
40+
check=True,
41+
capture_output=True,
42+
text=True,
43+
timeout=10,
44+
)
4445

45-
with KafkaContainer(image="confluentinc/cp-kafka:7.9.2") as kafka:
46-
bootstrap_servers = kafka.get_bootstrap_server()
47-
48-
# Create a producer that sends plain text messages
49-
producer = KafkaProducer(
50-
bootstrap_servers=bootstrap_servers,
51-
key_serializer=lambda k: k.encode('utf-8') if k else None,
52-
value_serializer=lambda v: v.encode('utf-8') # Plain text serializer
53-
)
54-
55-
# Send a plain text message
56-
plain_text_message = "This is a plain text message"
57-
producer.send(topic, key="plain-text-key", value=plain_text_message)
58-
producer.flush()
59-
60-
time.sleep(1)
61-
62-
# Run the consumer and capture its output
63-
env = os.environ.copy()
64-
env["KAFKA_BOOTSTRAP_SERVERS"] = bootstrap_servers
65-
env["KAFKA_TOPIC"] = topic
66-
env["COVERAGE_PROCESS_START"] = ".coveragerc" # Enable coverage for subprocess
67-
68-
result = subprocess.run(
69-
["python", "consumer.py", "--test-mode"],
70-
env=env,
71-
check=True,
72-
capture_output=True,
73-
text=True,
74-
timeout=10,
75-
)
76-
77-
# Verify that the plain text message was processed correctly
78-
assert "📦 Plain" in result.stdout
79-
assert plain_text_message in result.stdout
46+
assert "note event" in result.stdout.lower()
47+
48+
49+
def test_plain_text_consumer(tmp_path, kafka_container):
50+
topic = "test-topic-plain-text-consumer"
51+
52+
bootstrap_servers = kafka_container.get_bootstrap_server()
53+
54+
# Create a producer that sends plain text messages
55+
producer = KafkaProducer(
56+
bootstrap_servers=bootstrap_servers,
57+
key_serializer=lambda k: k.encode('utf-8') if k else None,
58+
value_serializer=lambda v: v.encode('utf-8') # Plain text serializer
59+
)
60+
61+
# Send a plain text message
62+
plain_text_message = "This is a plain text message"
63+
producer.send(topic, key="plain-text-key", value=plain_text_message)
64+
producer.flush()
65+
66+
time.sleep(1)
67+
68+
# Run the consumer and capture its output
69+
env = os.environ.copy()
70+
env["KAFKA_BOOTSTRAP_SERVERS"] = bootstrap_servers
71+
env["KAFKA_TOPIC"] = topic
72+
env["COVERAGE_PROCESS_START"] = ".coveragerc" # Enable coverage for subprocess
73+
74+
result = subprocess.run(
75+
["python", "consumer.py", "--test-mode"],
76+
env=env,
77+
check=True,
78+
capture_output=True,
79+
text=True,
80+
timeout=10,
81+
)
82+
83+
# Verify that the plain text message was processed correctly
84+
assert "📦 Plain" in result.stdout
85+
assert plain_text_message in result.stdout

0 commit comments

Comments (0)