Description
I'm trying to use this library in a test. What I want to do is send some messages and then exit. My broker disconnects the MQTT connection if I try to send messages too early, so I want to retry until I know the message has been delivered and exit once that is done.
The issue is that token.Wait() doesn't always wait until the message is actually delivered. Reading through issues, especially this comment from @MattBrittan, it seems like this is "intended" behavior.
I'm using paho.mqtt.golang v1.3.5
Here is some code to reproduce the issue:
package main

import (
	"fmt"
	"log"
	"os"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	mqtt.ERROR = log.New(os.Stdout, "[ERROR] ", 0)
	mqtt.CRITICAL = log.New(os.Stdout, "[CRIT] ", 0)
	mqtt.WARN = log.New(os.Stdout, "[WARN] ", 0)
	mqtt.DEBUG = log.New(os.Stdout, "[DEBUG] ", 0)
	opts := mqtt.NewClientOptions().
		AddBroker("tcp://localhost:1883").
		SetClientID("tester").
		SetUsername("hunter")
	c := mqtt.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	token := c.Publish("test", 1, false, []byte{})
	token.Wait()
	if token.Error() != nil {
		fmt.Printf("token.Error() = %+v\n", token.Error())
	}
	c.Disconnect(250)
}
and the logs:
[DEBUG] [client] Connect()
[DEBUG] [store] memorystore initialized
[DEBUG] [client] about to write new connect msg
[DEBUG] [client] socket connected to broker
[DEBUG] [client] Using MQTT 3.1.1 protocol
[DEBUG] [net] connect started
[DEBUG] [net] received connack
[DEBUG] [client] startCommsWorkers called
[DEBUG] [client] client is connected/reconnected
[DEBUG] [net] incoming started
[DEBUG] [net] startIncomingComms started
[DEBUG] [net] outgoing started
[DEBUG] [net] startComms started
[DEBUG] [pinger] keepalive starting
[DEBUG] [client] startCommsWorkers done
[DEBUG] [net] outgoing waiting for an outbound message
[WARN] [store] memorystore wiped
[DEBUG] [client] exit startClient
[DEBUG] [client] enter Publish
[DEBUG] [client] sending publish message, topic: test
[DEBUG] [net] obound msg to write 1
[DEBUG] [net] obound wrote msg, id: 1
[DEBUG] [net] outgoing waiting for an outbound message
[DEBUG] [net] logic waiting for msg on ibound
[DEBUG] [net] startIncomingComms: inboundFromStore complete
[DEBUG] [net] logic waiting for msg on ibound
[DEBUG] [net] incoming complete
[DEBUG] [net] startIncomingComms: got msg on ibound
[DEBUG] [net] logic waiting for msg on ibound
[DEBUG] [net] startIncomingComms: ibound complete
[DEBUG] [net] startIncomingComms goroutine complete
[DEBUG] [net] outgoing waiting for an outbound message
[ERROR] [client] Connect comms goroutine - error triggered EOF
[DEBUG] [client] internalConnLost called
[DEBUG] [client] stopCommsWorkers called
[DEBUG] [pinger] keepalive stopped
[DEBUG] [router] matchAndDispatch exiting
[DEBUG] [client] stopCommsWorkers waiting for workers
[DEBUG] [net] outgoing waiting for an outbound message
[DEBUG] [client] internalConnLost waiting on workers
[DEBUG] [net] outgoing waiting for an outbound message
[DEBUG] [net] outgoing comms stopping
[DEBUG] [net] startComms closing outError
[DEBUG] [client] startCommsWorkers output redirector finished
[DEBUG] [client] incoming comms goroutine done
[DEBUG] [client] stopCommsWorkers waiting for comms
[DEBUG] [client] stopCommsWorkers done
[DEBUG] [client] internalConnLost workers stopped
[DEBUG] [client] internalConnLost complete
[DEBUG] Connection lost: EOF
[DEBUG] [client] enter reconnect
[DEBUG] [client] about to write new connect msg
[DEBUG] [client] socket connected to broker
[DEBUG] [client] Using MQTT 3.1.1 protocol
[DEBUG] [net] connect started
[DEBUG] [net] received connack
[DEBUG] [client] startCommsWorkers called
[DEBUG] [client] client is connected/reconnected
[DEBUG] [net] incoming started
[DEBUG] [net] startIncomingComms started
[DEBUG] [net] outgoing started
[DEBUG] [net] startComms started
[DEBUG] [pinger] keepalive starting
[DEBUG] [net] outgoing waiting for an outbound message
[DEBUG] [net] logic waiting for msg on ibound
[DEBUG] [client] startCommsWorkers done
[DEBUG] [store] enter Resume
[DEBUG] [store] memorystore get: message 1 found
[DEBUG] [store] loaded pending publish (1)
[DEBUG] [client] disconnecting
[DEBUG] [store] {1 1}
[DEBUG] [store] exit resume
[DEBUG] [client] calling WaitTimeout
[DEBUG] [net] obound priority msg to write, type *packets.DisconnectPacket
[DEBUG] [net] startIncomingComms: inboundFromStore complete
[DEBUG] [net] logic waiting for msg on ibound
[DEBUG] [net] outbound wrote disconnect, closing connection
[DEBUG] [client] WaitTimeout done
[DEBUG] [client] stopCommsWorkers called
[DEBUG] [client] stopCommsWorkers waiting for workers
[DEBUG] [pinger] keepalive stopped
[DEBUG] [net] incoming complete
[DEBUG] [net] outgoing waiting for an outbound message
[DEBUG] [net] startIncomingComms: ibound complete
[DEBUG] [net] startIncomingComms goroutine complete
[DEBUG] [net] obound msg to write 1
[DEBUG] [router] matchAndDispatch exiting
[ERROR] [net] outgoing obound reporting error write tcp 127.0.0.1:35620->127.0.0.1:1883: use of closed network connection
[DEBUG] [client] startCommsWorkers output redirector finished
[DEBUG] [client] stopCommsWorkers waiting for comms
[DEBUG] [net] outgoing waiting for an outbound message
[DEBUG] [net] outgoing waiting for an outbound message
[DEBUG] [net] outgoing waiting for an outbound message
[DEBUG] [net] outgoing waiting for an outbound message
[DEBUG] [net] outgoing comms stopping
[DEBUG] [net] startComms closing outError
[DEBUG] [client] incoming comms goroutine done
[DEBUG] [client] stopCommsWorkers done
[DEBUG] [client] forcefully disconnecting
[DEBUG] [msgids] cleaned up
[DEBUG] [client] disconnected
[DEBUG] [store] memorystore closed
My understanding of what is happening is this:
- Try to publish message to broker
- Broker sends EOF
- Client reconnects
- c.claimID(token, details.messageId) is called here, resulting in the original token being considered complete
- c.Disconnect(250) is called as token.Wait() completed successfully
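To make the sequence above concrete, here is a toy model of it in plain Go. It is not the library's code: the `token`, `idIndex`, `claim`, and `isDone` names are invented for illustration, and the only behavior it assumes is the one described in the list, namely that claiming a message ID that is already in use completes the older token without an error.

```go
package main

import "fmt"

// token is a toy stand-in for a publish token: Wait blocks until done is closed.
type token struct {
	done chan struct{}
	err  error
}

func newToken() *token {
	return &token{done: make(chan struct{})}
}

// Wait blocks until the token has been completed.
func (t *token) Wait() {
	<-t.done
}

// complete marks the token as finished; calling it twice is harmless.
func (t *token) complete() {
	select {
	case <-t.done: // already completed
	default:
		close(t.done)
	}
}

// isDone reports whether the token has been completed, without blocking.
func isDone(t *token) bool {
	select {
	case <-t.done:
		return true
	default:
		return false
	}
}

// idIndex maps in-flight message IDs to the token waiting on each of them.
type idIndex map[uint16]*token

// claim registers tok under id. If another token already holds that id, the
// older token is completed first with no error set (the assumed behavior).
func (ix idIndex) claim(id uint16, tok *token) {
	if old, ok := ix[id]; ok {
		old.complete()
	}
	ix[id] = tok
}

func main() {
	ix := idIndex{}

	// Publish: message 1 is in flight and no PUBACK has arrived yet.
	original := newToken()
	ix.claim(1, original)

	// The connection drops and the client reconnects. Resuming from the store
	// creates a fresh token for the stored message and claims the same id.
	resumed := newToken()
	ix.claim(1, resumed)

	// Wait returns immediately even though nothing was acknowledged.
	original.Wait()
	fmt.Println("original done:", isDone(original), "error:", original.err)
	fmt.Println("resumed done:", isDone(resumed))
}
```

In this model the caller holding `original` sees a completed token with a nil error, while the token that actually tracks the resent message is `resumed`, which the caller never gets a reference to.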
To solve my immediate issue I will try to disable auto-reconnect and handle reconnects myself.
Would it not make sense to have token.Wait() wait until the message is actually delivered, regardless of reconnects?
At least documenting the caveat would be nice.