Skip to content

Commit 85fe51d

Browse files
authored
Add support for shared subscription topic names (#75)
1 parent ca47fff commit 85fe51d

File tree

2 files changed

+59
-1
lines changed

2 files changed

+59
-1
lines changed

src/Subscription.php

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,17 @@ public function __construct(string $topicFilter, int $qualityOfService = 0, ?\Cl
3939
*/
4040
private function regexifyTopicFilter(): void
4141
{
42-
$this->regexifiedTopicFilter = '/^' . str_replace(['$', '/', '+', '#'], ['\$', '\/', '[^\/]*', '.*'], $this->topicFilter) . '$/';
42+
$topicFilter = $this->topicFilter;
43+
44+
// If the topic filter is for a shared subscription, we remove the shared subscription prefix as well as the group name
45+
// from the topic filter. To do so, we look for the $share keyword and then try to find the second topic separator to
46+
// calculate the substring containing the actual topic filter.
47+
// Note: shared subscriptions always have the form: $share/<group>/<topic>
48+
if (strpos($topicFilter, '$share/') === 0 && ($separatorIndex = strpos($topicFilter, '/', 7)) !== false) {
49+
$topicFilter = substr($topicFilter, $separatorIndex + 1);
50+
}
51+
52+
$this->regexifiedTopicFilter = '/^' . str_replace(['$', '/', '+', '#'], ['\$', '\/', '[^\/]*', '.*'], $topicFilter) . '$/';
4353
}
4454

4555
/**

tests/Feature/PublishSubscribeTest.php

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,4 +180,52 @@ function (string $topic, string $message, bool $retained) use ($subscriber, $sub
180180
$publisher->disconnect();
181181
$subscriber->disconnect();
182182
}
183+
184+
public function test_shared_subscriptions_using_quality_of_service_0_work_as_intended(): void
185+
{
186+
$subscriptionTopicFilter = '$share/test-shared-subscriptions/foo/+';
187+
$publishTopic = 'foo/bar';
188+
189+
// We connect and subscribe to a topic using the first client with a shared subscription.
190+
$subscriber1 = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber1');
191+
$subscriber1->connect(null, true);
192+
193+
$subscriber1->subscribe($subscriptionTopicFilter, function (string $topic, string $message, bool $retained) use ($subscriber1, $publishTopic) {
194+
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
195+
$this->assertEquals($publishTopic, $topic);
196+
$this->assertEquals('hello world #1', $message);
197+
$this->assertFalse($retained);
198+
199+
$subscriber1->interrupt(); // This allows us to exit the test as soon as possible.
200+
}, 0);
201+
202+
// We connect and subscribe to a topic using the second client with a shared subscription.
203+
$subscriber2 = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber2');
204+
$subscriber2->connect(null, true);
205+
206+
$subscriber2->subscribe($subscriptionTopicFilter, function (string $topic, string $message, bool $retained) use ($subscriber2, $publishTopic) {
207+
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
208+
$this->assertEquals($publishTopic, $topic);
209+
$this->assertEquals('hello world #2', $message);
210+
$this->assertFalse($retained);
211+
212+
$subscriber2->interrupt(); // This allows us to exit the test as soon as possible.
213+
}, 0);
214+
215+
// We publish a message from a second client on the same topic.
216+
$publisher = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'publisher');
217+
$publisher->connect(null, true);
218+
219+
$publisher->publish($publishTopic, 'hello world #1', 0, false);
220+
$publisher->publish($publishTopic, 'hello world #2', 0, false);
221+
222+
// Then we loop on the subscribers to (hopefully) receive the published messages.
223+
$subscriber1->loop(true);
224+
$subscriber2->loop(true);
225+
226+
// Finally, we disconnect for a graceful shutdown on the broker side.
227+
$publisher->disconnect();
228+
$subscriber1->disconnect();
229+
$subscriber2->disconnect();
230+
}
183231
}

0 commit comments

Comments
 (0)