Skip to content

Commit 703e949

Browse files
committed
Made Kafka topic configurable via KAFKA_TOPIC environment variable for producer and consumer.
- Updated `producer.py` and `consumer.py` to support configurable topic names. - Refactored `Dockerfiles` to include `KAFKA_TOPIC` environment variable setup. - Modified `docker-compose.yml` to pass `KAFKA_TOPIC` to all relevant services.
1 parent c4fdd55 commit 703e949

File tree

5 files changed

+18
-24
lines changed

5 files changed

+18
-24
lines changed

consumer.Dockerfile

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,9 @@ RUN pip install --no-cache-dir -r requirements.txt
1010
COPY logger.py .
1111
COPY consumer.py .
1212

13-
# Set environment variable for Kafka connection
13+
# Set environment variables for Kafka connection
1414
ENV KAFKA_BOOTSTRAP_SERVERS=kafka:9092
15-
16-
# Modify the consumer.py to use the environment variable
17-
RUN sed -i "s/'bootstrap_servers': .*/'bootstrap_servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),/" consumer.py
18-
19-
# Add import os at the top of the file
20-
RUN sed -i '1s/^/import os\n/' consumer.py
15+
ENV KAFKA_TOPIC=test-topic
2116

2217
# Command to run the consumer
2318
# The entrypoint allows passing additional arguments to the consumer

consumer.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
It can handle both JSON and plain text messages, providing a polyglot consumer
1919
that's useful in environments where different systems produce data in different formats.
2020
21-
The consumer connects to a local Kafka broker, subscribes to 'test-topic',
21+
The consumer connects to a Kafka broker (configurable via KAFKA_BOOTSTRAP_SERVERS environment variable),
22+
subscribes to a topic (configurable via KAFKA_TOPIC environment variable, defaults to 'test-topic'),
2223
and processes incoming messages until interrupted with Ctrl+C.
2324
2425
Usage:
@@ -51,9 +52,10 @@ def try_parse_json(value: bytes):
5152

5253
def main():
5354
bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
55+
topic = os.environ.get("KAFKA_TOPIC", "test-topic")
5456

5557
# Initialize the Kafka consumer with configuration
56-
# - 'test-topic': The Kafka topic to subscribe to
58+
# - topic: The Kafka topic to subscribe to (from environment variable)
5759
# - bootstrap_servers: Connection string for the Kafka broker
5860
# - auto_offset_reset='earliest': Start reading from the beginning of the topic if no committed offset exists
5961
# - enable_auto_commit=True: Automatically commit offsets
@@ -67,11 +69,9 @@ def main():
6769
consumer_args['group_id'] = args.group_id
6870

6971
if args.test_mode:
70-
consumer_timeout_ms = 3000 # pragma: no cover
71-
else:
72-
consumer_timeout_ms = None
72+
consumer_args['consumer_timeout_ms'] = 3000 # pragma: no cover
7373

74-
consumer = KafkaConsumer('test-topic', consumer_timeout_ms=consumer_timeout_ms, **consumer_args)
74+
consumer = KafkaConsumer(topic, **consumer_args)
7575

7676
logger = get_logger("consumer")
7777

docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ services:
4646
condition: service_completed_successfully
4747
environment:
4848
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
49+
KAFKA_TOPIC: test-topic
4950
restart: on-failure
5051

5152
consumer:
@@ -60,6 +61,7 @@ services:
6061
condition: service_completed_successfully
6162
environment:
6263
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
64+
KAFKA_TOPIC: test-topic
6365
restart: on-failure
6466
# You can pass arguments to the consumer here
6567
# command: ["--group-id", "demo-group"]

producer.Dockerfile

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,8 @@ RUN pip install --no-cache-dir -r requirements.txt
1010
COPY logger.py .
1111
COPY producer.py .
1212

13-
# Set environment variable for Kafka connection
13+
# Set environment variables for Kafka connection
1414
ENV KAFKA_BOOTSTRAP_SERVERS=kafka:9092
15-
16-
# Modify the producer.py to use the environment variable
17-
RUN sed -i 's/bootstrap_servers=.*/bootstrap_servers=os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),/' producer.py
18-
19-
# Add import os at the top of the file
20-
RUN sed -i '1s/^/import os\n/' producer.py
15+
ENV KAFKA_TOPIC=test-topic
2116

2217
CMD ["python", "-u", "producer.py"]

producer.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
Apache Kafka Producer Demo
1616
1717
This script demonstrates how to produce JSON messages to an Apache Kafka topic.
18-
It creates a series of simple JSON messages and sends them to 'test-topic' on
19-
a local Kafka broker.
18+
It creates a series of simple JSON messages and sends them to a topic
19+
(configurable via KAFKA_TOPIC environment variable, defaults to 'test-topic')
20+
on a Kafka broker (configurable via KAFKA_BOOTSTRAP_SERVERS environment variable).
2021
2122
The producer is configured to automatically serialize Python dictionaries to JSON
2223
before sending them to Kafka.
@@ -30,6 +31,7 @@
3031

3132
def main():
3233
bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
34+
topic = os.environ.get("KAFKA_TOPIC", "test-topic")
3335

3436
# Initialize the Kafka producer with configuration
3537
# - bootstrap_servers: Connection string for the Kafka broker
@@ -48,8 +50,8 @@ def main():
4850
# Create a simple message with an ID and text
4951
message = {"id": i, "event_type": event_type, "text": f"Note event {i} of type {event_type}"}
5052

51-
# Send the message to 'test-topic'
52-
producer.send('test-topic', key=event_type, value=message)
53+
# Send the message to the topic from the environment variable
54+
producer.send(topic, key=event_type, value=message)
5355

5456
# Print confirmation and wait 1 second between messages
5557
logger.info(f"Sent: key={event_type} | value={message}")

0 commit comments

Comments
 (0)