-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Open
Labels
Description
Description
Hello
Here is my implementation of a pubsub
class RedisGraphqlPubSub implements PubSub {
constructor(
private readonly redisClient: RedisClientType,
private readonly logger: ILoggerService
) {}
async publish<K extends keyof Events>(
routingKey: K,
payload: Events[K]
): Promise<void> {
await this.redisClient.publish(routingKey, 'toto');
}
subscribe<K extends keyof Events>(routingKey: K): AsyncIterable<unknown> {
return this.subscribeWithPolling(routingKey);
}
async *subscribeWithPolling<K extends keyof Events>(
routingKey: K
): AsyncIterable<string> {
const subscriber = this.redisClient.duplicate({disableOfflineQueue: true});
try {
await subscriber.connect();
const messages: string[] = [];
await subscriber.subscribe(routingKey, (message: string) => {
try {
messages.push(message);
} catch (err) {
console.error("❌ POLLING MESSAGE ERROR", err);
}
});
while (true) {
if (messages.length > 0) {
yield messages.shift()!;
} else {
await new Promise(resolve => setTimeout(resolve, 50));
}
}
} finally {
try {
await subscriber.unsubscribe(routingKey);
await subscriber.destroy();
} catch (err) {
console.error("❌ POLLING CLEANUP ERROR", err);
}
}
}
}
Publish works fine and polling too but when receiving a message i have an error:
{"message":"Cannot read properties of undefined (reading 'value')","stack":"TypeError: Cannot read properties of undefined (reading 'value')\n at RedisCommandQueue.#getTypeMapping (/Users/apps/api/node_modules/@redis/client/lib/client/commands-queue.ts:113:40)
It look like this line is the error in commands-queue.ts
#getTypeMapping() {
return this.#waitingForReply.head!.value.typeMapping ?? {};
}
First of all head non null assertion is not safe enought, i could have received {}
Second, i don't know what's wrong in my implementation
Thanks in advance
Node.js Version
v20.11.0
Redis Server Version
redis-stack:7.4.0-v6 or alpine redis image
Node Redis Version
Platform
macOS