@@ -59,29 +59,27 @@ def main(rab_host, rab_port):
59
59
send_channel = connection .channel ()
60
60
61
61
# declare exchange
62
- # TODO maybe you need to check if it already exists!
63
62
send_channel .exchange_declare (exchange = 'capture' ,
64
63
exchange_type = 'topic' ,
65
64
durable = True ,)
66
65
67
- # create queue for receiving raw capture data
68
- # TODO you probably do not want a random queue, but one which is used by multiple workers!!!
69
- result = recv_channel . queue_declare ( queue = 'eewids-parser' , exclusive = False )
70
- queue_name = result . method . queue
71
- #recv_queue = recv_channel.queue.declare(exclusive=True)
66
+ # declare queue for receiving raw capture data
67
+ recv_channel . queue_declare ( queue = 'eewids-parser' ,
68
+ durable = True ,
69
+ exclusive = False )
70
+
72
71
# TODO change routing key on capture tool and here, make it meaningful and global/as argument
73
- # FIXME do not abort if it does not exist
74
- recv_channel .queue_bind (exchange = 'capture-raw' , queue = queue_name , routing_key = '*' )
72
+ recv_channel .queue_bind (exchange = 'capture-raw' , queue = 'eewids-parser' , routing_key = '*' )
75
73
76
74
def callback (ch , method , properties , body ):
77
75
78
76
parsed_data = p .packet_parse (body )
79
77
distribute (parsed_data , send_channel )
78
+ ch .basic_ack (delivery_tag = method .delivery_tag )
80
79
81
- # TODO look up right arguments to make sure that this is dividable through multiple workers!
80
+ recv_channel . basic_qos ( prefetch_count = 1 )
82
81
recv_channel .basic_consume (on_message_callback = callback ,
83
- queue = queue_name ,
84
- auto_ack = True )
82
+ queue = 'eewids-parser' )
85
83
86
84
print (' [*] Starting parsing. To exit press CTRL+C' )
87
85
recv_channel .start_consuming ()
0 commit comments