Skip to content

Commit e39c1c6

Browse files
author
Tejas Wanjari
committed
Send tx messages to subscribers in same order as received on broker
Signed-off-by: Tejas Wanjari <[email protected]>
1 parent f5c233b commit e39c1c6

File tree

2 files changed

+29
-14
lines changed

2 files changed

+29
-14
lines changed

README.md

+9-2
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,24 @@ Includes:
77
2. `stompd`: STOMP Broker
88
3. `stomper`: Interactive CLI for STOMP Client
99

10-
## `stomp`
10+
## stomp
1111
Package import:
12-
```
12+
```shell
1313
go get -u github.com/tjs-w/go-proto-stomp
1414
```
1515

1616
## stomper
1717

18+
```shell
19+
stomper -p tcp
20+
```
21+
1822
![stomper demo](stomper.gif "stomper")
1923

2024
## stompd
25+
```shell
26+
stompd -p tcp <host> <port>
27+
```
2128

2229
## STOMP Library Documentation
2330

pkg/stomp/subscribe.go

+20-12
Original file line numberDiff line numberDiff line change
@@ -84,19 +84,27 @@ func publish(frame *Frame, txID string) error {
8484
return errorMsg(errBrokerStateMachine, "Missing entry in destToSubsMap, for key: "+dest)
8585
}
8686

87+
sendIt := func(subsID string, info *subsInfo) {
88+
info.Lock()
89+
defer info.Unlock()
90+
91+
if err := info.sessionHandler.sendMessage(dest, subsID, info.nextAckNum, txID,
92+
frame.headers, frame.body); err != nil {
93+
fmt.Println(err)
94+
return
95+
}
96+
info.pendingAckBitmap.Add(info.nextAckNum)
97+
info.nextAckNum++
98+
}
99+
87100
for subsID, info := range destToSubsMap[dest] {
88-
go func(subsID string, info *subsInfo) {
89-
info.Lock()
90-
defer info.Unlock()
91-
92-
if err := info.sessionHandler.sendMessage(dest, subsID, info.nextAckNum, txID,
93-
frame.headers, frame.body); err != nil {
94-
fmt.Println(err)
95-
return
96-
}
97-
info.pendingAckBitmap.Add(info.nextAckNum)
98-
info.nextAckNum++
99-
}(subsID, info)
101+
if txID == "" {
102+
// Parallelize sending non-tx messages
103+
go sendIt(subsID, info)
104+
} else {
105+
// Send tx messages in same sequence (do not parallelize to maintain order)
106+
sendIt(subsID, info)
107+
}
100108
}
101109

102110
return nil

0 commit comments

Comments
 (0)