Skip to content

Messages are received with method.redelivered=True on first delivery #1132

Open
@abdullahenesoncu-s4e

Description

@abdullahenesoncu-s4e

Description

I’m using LavinMQ with pika in Python and am encountering an issue where every message received by a consumer has redelivered=True on its first delivery, even though it has never been NACKed, rejected, or requeued.

Code to reproduce

import pika
import json
import threading
import time

# Broker location and login used by get_connection() (port 5672 is set there).
RABBIT_MQ_HOST = "localhost"
RABBITMQ_DEFAULT_USER = "guest"
RABBITMQ_DEFAULT_PASS = "guest"

# Topology shared by producer and consumer: a direct exchange bound to a
# durable priority queue (see setup_channel()) via a single routing key.
EXCHANGE = 'scan_exchange'
QUEUE = 'scan_output_queue'
ROUTING_KEY = 'scan_output_routing'


def get_connection():
    """Open and return a pika BlockingConnection to the broker.

    Connects to RABBIT_MQ_HOST on port 5672, authenticating with
    RABBITMQ_DEFAULT_USER / RABBITMQ_DEFAULT_PASS, with heartbeat=3600.
    """
    credentials = pika.PlainCredentials(
        RABBITMQ_DEFAULT_USER, RABBITMQ_DEFAULT_PASS
    )
    parameters = pika.ConnectionParameters(
        host=RABBIT_MQ_HOST,
        port=5672,
        credentials=credentials,
        heartbeat=3600,
    )
    return pika.BlockingConnection(parameters)


def setup_channel(channel):
    """Declare the exchange/queue topology on *channel* and set prefetch to 1.

    Declares the direct exchange EXCHANGE, the durable queue QUEUE with
    ``x-max-priority`` 5, binds the two with ROUTING_KEY, and finally applies
    ``basic_qos(prefetch_count=1)`` to the channel.
    """
    # Priority-queue argument passed to the queue declaration.
    queue_arguments = {'x-max-priority': 5}

    channel.exchange_declare(exchange_type='direct', exchange=EXCHANGE)
    channel.queue_declare(
        queue=QUEUE,
        arguments=queue_arguments,
        durable=True,
    )
    channel.queue_bind(
        queue=QUEUE,
        exchange=EXCHANGE,
        routing_key=ROUTING_KEY,
    )
    channel.basic_qos(prefetch_count=1)


def produce_messages():
    """Publish three JSON test messages with priorities 1, 2 and 3.

    Opens a fresh connection, declares the topology via setup_channel(), and
    publishes one ``{'hello': 'world', 'priority': p}`` message per priority
    to EXCHANGE with ROUTING_KEY, printing each message after it is sent.
    The connection is closed before returning, including when declaring the
    topology or publishing raises.
    """
    connection = get_connection()
    try:
        channel = connection.channel()
        setup_channel(channel)

        for priority in (1, 2, 3):
            message = {'hello': 'world', 'priority': priority}
            channel.basic_publish(
                exchange=EXCHANGE,
                routing_key=ROUTING_KEY,
                body=json.dumps(message),
                properties=pika.BasicProperties(priority=priority),
            )
            print(f"Produced: {message}")
    finally:
        # Always release the connection so a failed declare/publish does not
        # leak it. Skip the close when the connection is already gone, so an
        # error from close() cannot mask the original exception.
        if connection.is_open:
            connection.close()


def consume_messages():
    """Consume from QUEUE, printing each message and its redelivered flag.

    Opens a fresh connection, declares the topology via setup_channel(),
    registers a callback on QUEUE, prints "Consumer started." and then calls
    ``start_consuming()`` on the channel.
    """

    def on_message(ch, method, properties, body):
        # Report the payload together with the redelivered flag of the delivery.
        print(f"Received: {body.decode()} | Redelivered: {method.redelivered}")
        # Pause for one second before acknowledging the delivery.
        time.sleep(1)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    conn = get_connection()
    consumer_channel = conn.channel()
    setup_channel(consumer_channel)

    consumer_channel.basic_consume(
        queue=QUEUE,
        on_message_callback=on_message,
    )
    print("Consumer started.")
    consumer_channel.start_consuming()


# Publish the three test messages first, before any consumer is attached.
produce_messages()

# Run the consumer on a daemon thread so it does not keep the process alive
# once the main thread finishes.
_consumer_thread = threading.Thread(target=consume_messages, daemon=True)
_consumer_thread.start()

# Keep the main thread alive for 10 seconds while the consumer prints.
time.sleep(10)

Observed output

Produced: {'hello': 'world', 'priority': 1}
Produced: {'hello': 'world', 'priority': 2}
Produced: {'hello': 'world', 'priority': 3}
Consumer started.
Received: {"hello": "world", "priority": 3} | Redelivered: True
Received: {"hello": "world", "priority": 2} | Redelivered: True
Received: {"hello": "world", "priority": 1} | Redelivered: True

Expected behavior

On first delivery of each message, method.redelivered should be False.

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions