Skip to content

Commit 65324e1

Browse files
authored
Pass retain flag along in async publish (#134)
* Pass retain flag along in async publish Add test to check Clean up tests to use task groups * Fix testAsyncSequencePublishListener
1 parent a0e1126 commit 65324e1

File tree

2 files changed

+105
-66
lines changed

2 files changed

+105
-66
lines changed

Sources/MQTTNIO/AsyncAwaitSupport/MQTTClient+async.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ extension MQTTClient {
7272
/// - qos: Quality of Service for message.
7373
/// - retain: Whether this is a retained message.
7474
public func publish(to topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool = false) async throws {
75-
return try await self.publish(to: topicName, payload: payload, qos: qos).get()
75+
return try await self.publish(to: topicName, payload: payload, qos: qos, retain: retain).get()
7676
}
7777

7878
/// Subscribe to topic

Tests/MQTTNIOTests/MQTTNIOTests+async.swift

Lines changed: 104 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -52,29 +52,35 @@ final class AsyncMQTTNIOTests: XCTestCase {
5252
}
5353

5454
func testPublishSubscribe() async throws {
55-
let expectation = XCTestExpectation(description: "testPublishSubscribe")
56-
expectation.expectedFulfillmentCount = 1
57-
5855
let client = self.createClient(identifier: "testPublish+async")
5956
let client2 = self.createClient(identifier: "testPublish+async2")
6057
let payloadString = "Hello"
6158
try await client.connect()
6259
try await client2.connect()
6360
_ = try await client2.subscribe(to: [.init(topicFilter: "TestSubject", qos: .atLeastOnce)])
64-
client2.addPublishListener(named: "test") { result in
65-
switch result {
66-
case .success(let publish):
67-
var buffer = publish.payload
68-
let string = buffer.readString(length: buffer.readableBytes)
69-
XCTAssertEqual(string, payloadString)
70-
expectation.fulfill()
71-
case .failure(let error):
72-
XCTFail("\(error)")
61+
try await withThrowingTaskGroup(of: Void.self) { group in
62+
group.addTask {
63+
let listener = client2.createPublishListener()
64+
for try await event in listener {
65+
switch event {
66+
case .success(let publish):
67+
var buffer = publish.payload
68+
let string = buffer.readString(length: buffer.readableBytes)
69+
XCTAssertEqual(string, payloadString)
70+
return
71+
case .failure(let error):
72+
XCTFail("\(error)")
73+
}
74+
}
7375
}
76+
group.addTask {
77+
try await Task.sleep(nanoseconds: 5_000_000_000)
78+
XCTFail("Timeout")
79+
}
80+
try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce)
81+
try await group.next()
82+
group.cancelAll()
7483
}
75-
try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce)
76-
77-
self.wait(for: [expectation], timeout: 2)
7884

7985
try await client.disconnect()
8086
try await client2.disconnect()
@@ -99,51 +105,45 @@ final class AsyncMQTTNIOTests: XCTestCase {
99105
}
100106

101107
func testAsyncSequencePublishListener() async throws {
102-
let expectation = ManagedAtomic(0)
103-
let finishExpectation = ManagedAtomic(0)
104-
105108
let client = self.createClient(identifier: "testAsyncSequencePublishListener+async", version: .v5_0)
106109
let client2 = self.createClient(identifier: "testAsyncSequencePublishListener+async2", version: .v5_0)
107110

108111
try await client.connect()
109112
try await client2.connect()
110113
_ = try await client2.v5.subscribe(to: [.init(topicFilter: "TestSubject", qos: .atLeastOnce)])
111-
let task = Task {
112-
let publishListener = client2.createPublishListener()
113-
for await result in publishListener {
114-
switch result {
115-
case .success(let publish):
116-
var buffer = publish.payload
117-
let string = buffer.readString(length: buffer.readableBytes)
118-
print("Received: \(string ?? "nothing")")
119-
expectation.wrappingIncrement(ordering: .relaxed)
120-
121-
case .failure(let error):
122-
XCTFail("\(error)")
114+
try await withThrowingTaskGroup(of: Void.self) { group in
115+
group.addTask {
116+
let publishListener = client2.createPublishListener()
117+
for await result in publishListener {
118+
switch result {
119+
case .success(let publish):
120+
var buffer = publish.payload
121+
let string = buffer.readString(length: buffer.readableBytes)
122+
print("Received: \(string ?? "nothing")")
123+
return
124+
125+
case .failure(let error):
126+
XCTFail("\(error)")
127+
}
123128
}
124129
}
125-
finishExpectation.wrappingIncrement(ordering: .relaxed)
130+
group.addTask {
131+
try await Task.sleep(nanoseconds: 5_000_000_000)
132+
XCTFail("Timeout")
133+
}
134+
try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: "Hello"), qos: .atLeastOnce)
135+
try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: "Goodbye"), qos: .atLeastOnce)
136+
137+
try await group.next()
138+
group.cancelAll()
126139
}
127-
try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: "Hello"), qos: .atLeastOnce)
128-
try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: "Goodbye"), qos: .atLeastOnce)
129140
try await client.disconnect()
130-
131-
_ = try await Task.sleep(nanoseconds: 500_000_000)
132-
133141
try await client2.disconnect()
134142
try await client.shutdown()
135143
try await client2.shutdown()
136-
137-
_ = await task.result
138-
139-
XCTAssertEqual(expectation.load(ordering: .relaxed), 2)
140-
XCTAssertEqual(finishExpectation.load(ordering: .relaxed), 1)
141144
}
142145

143146
func testAsyncSequencePublishSubscriptionIdListener() async throws {
144-
let expectation = ManagedAtomic(0)
145-
let expectation2 = ManagedAtomic(0)
146-
147147
let client = self.createClient(identifier: "testAsyncSequencePublishSubscriptionIdListener+async", version: .v5_0)
148148
let client2 = self.createClient(identifier: "testAsyncSequencePublishSubscriptionIdListener+async2", version: .v5_0)
149149
let payloadString = "Hello"
@@ -152,37 +152,76 @@ final class AsyncMQTTNIOTests: XCTestCase {
152152
try await client2.connect()
153153
_ = try await client2.v5.subscribe(to: [.init(topicFilter: "TestSubject", qos: .atLeastOnce)], properties: [.subscriptionIdentifier(1)])
154154
_ = try await client2.v5.subscribe(to: [.init(topicFilter: "TestSubject2", qos: .atLeastOnce)], properties: [.subscriptionIdentifier(2)])
155-
let task = Task {
156-
let publishListener = client2.v5.createPublishListener(subscriptionId: 1)
157-
for await _ in publishListener {
158-
expectation.wrappingIncrement(ordering: .relaxed)
155+
try await withThrowingTaskGroup(of: Void.self) { group in
156+
group.addTask {
157+
let publishListener = client2.v5.createPublishListener(subscriptionId: 1)
158+
for await event in publishListener {
159+
XCTAssertEqual(String(buffer: event.payload), payloadString)
160+
return
161+
}
159162
}
160-
expectation.wrappingIncrement(ordering: .relaxed)
161-
}
162-
let task2 = Task {
163-
let publishListener = client2.v5.createPublishListener(subscriptionId: 2)
164-
for await _ in publishListener {
165-
expectation2.wrappingIncrement(ordering: .relaxed)
163+
group.addTask {
164+
let publishListener = client2.v5.createPublishListener(subscriptionId: 2)
165+
for await event in publishListener {
166+
XCTAssertEqual(String(buffer: event.payload), payloadString)
167+
return
168+
}
166169
}
167-
expectation2.wrappingIncrement(ordering: .relaxed)
170+
group.addTask {
171+
try await Task.sleep(nanoseconds: 5_000_000_000)
172+
XCTFail("Timeout")
173+
}
174+
try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce)
175+
try await client.publish(to: "TestSubject2", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce)
176+
177+
try await group.next()
178+
try await group.next()
179+
group.cancelAll()
168180
}
169-
try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce)
170-
try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce)
171-
try await client.publish(to: "TestSubject2", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce)
172181

173182
try await client.disconnect()
174-
175-
_ = try await Task.sleep(nanoseconds: 500_000_000)
176-
177183
try await client2.disconnect()
178184
try client.syncShutdownGracefully()
179185
try client2.syncShutdownGracefully()
186+
}
187+
188+
func testMQTTPublishRetain() async throws {
189+
let payloadString = #"{"from":1000000,"to":1234567,"type":1,"content":"I am a beginner in swift and I am studying hard!!测试\n\n test, message","timestamp":1607243024,"nonce":"pAx2EsUuXrVuiIU3GGOGHNbUjzRRdT5b","sign":"ff902e31a6a5f5343d70a3a93ac9f946adf1caccab539c6f3a6"}"#
190+
let payload = ByteBufferAllocator().buffer(string: payloadString)
180191

181-
_ = await task.result
182-
_ = await task2.result
192+
let client = self.createClient(identifier: "testMQTTPublishRetain_publisher")
193+
defer { XCTAssertNoThrow(try client.syncShutdownGracefully()) }
194+
let client2 = self.createClient(identifier: "testMQTTPublishRetain_subscriber")
195+
defer { XCTAssertNoThrow(try client2.syncShutdownGracefully()) }
196+
try await client.connect()
197+
try await client.publish(to: "testAsyncMQTTPublishRetain", payload: payload, qos: .atLeastOnce, retain: true)
198+
try await client2.connect()
199+
try await withThrowingTaskGroup(of: Void.self) { group in
200+
group.addTask {
201+
let listener = client2.createPublishListener()
202+
for try await event in listener {
203+
switch event {
204+
case .success(let publish):
205+
var buffer = publish.payload
206+
let string = buffer.readString(length: buffer.readableBytes)
207+
XCTAssertEqual(string, payloadString)
208+
return
209+
case .failure(let error):
210+
XCTFail("\(error)")
211+
}
212+
}
213+
}
214+
group.addTask {
215+
try await Task.sleep(nanoseconds: 5_000_000_000)
216+
XCTFail("Timeout")
217+
}
218+
_ = try await client2.subscribe(to: [.init(topicFilter: "testAsyncMQTTPublishRetain", qos: .atLeastOnce)])
219+
try await group.next()
220+
group.cancelAll()
221+
}
183222

184-
XCTAssertEqual(expectation.load(ordering: .relaxed), 3)
185-
XCTAssertEqual(expectation2.load(ordering: .relaxed), 2)
223+
try await client.disconnect()
224+
try await client2.disconnect()
186225
}
187226
}
188227

0 commit comments

Comments
 (0)