2
2
import subprocess
3
3
import time
4
4
import os
5
+ from kafka import KafkaProducer
5
6
6
7
7
8
def test_producer_and_consumer_via_scripts (tmp_path ):
@@ -24,6 +25,8 @@ def test_producer_and_consumer_via_scripts(tmp_path):
24
25
time .sleep (1 )
25
26
26
27
# 2. Capture consumer output
28
+ env ["COVERAGE_PROCESS_START" ] = ".coveragerc" # Enable coverage for subprocess
29
+
27
30
result = subprocess .run (
28
31
["python" , "consumer.py" , "--test-mode" ],
29
32
env = env ,
@@ -34,3 +37,43 @@ def test_producer_and_consumer_via_scripts(tmp_path):
34
37
)
35
38
36
39
assert "note event" in result .stdout .lower ()
40
+
41
+
42
+ def test_plain_text_consumer (tmp_path ):
43
+ topic = "test-topic"
44
+
45
+ with KafkaContainer (image = "confluentinc/cp-kafka:7.9.2" ) as kafka :
46
+ bootstrap_servers = kafka .get_bootstrap_server ()
47
+
48
+ # Create a producer that sends plain text messages
49
+ producer = KafkaProducer (
50
+ bootstrap_servers = bootstrap_servers ,
51
+ key_serializer = lambda k : k .encode ('utf-8' ) if k else None ,
52
+ value_serializer = lambda v : v .encode ('utf-8' ) # Plain text serializer
53
+ )
54
+
55
+ # Send a plain text message
56
+ plain_text_message = "This is a plain text message"
57
+ producer .send (topic , key = "plain-text-key" , value = plain_text_message )
58
+ producer .flush ()
59
+
60
+ time .sleep (1 )
61
+
62
+ # Run the consumer and capture its output
63
+ env = os .environ .copy ()
64
+ env ["KAFKA_BOOTSTRAP_SERVERS" ] = bootstrap_servers
65
+ env ["KAFKA_TOPIC" ] = topic
66
+ env ["COVERAGE_PROCESS_START" ] = ".coveragerc" # Enable coverage for subprocess
67
+
68
+ result = subprocess .run (
69
+ ["python" , "consumer.py" , "--test-mode" ],
70
+ env = env ,
71
+ check = True ,
72
+ capture_output = True ,
73
+ text = True ,
74
+ timeout = 10 ,
75
+ )
76
+
77
+ # Verify that the plain text message was processed correctly
78
+ assert "📦 Plain" in result .stdout
79
+ assert plain_text_message in result .stdout
0 commit comments