Skip to content

Commit 556926f

Browse files
authored
Merge pull request #11 from banananyo/emit-event-when-connection-closed
feat: emit event on connection closed
2 parents 7defc75 + b6a7fc5 commit 556926f

File tree

3 files changed

+32
-3
lines changed

3 files changed

+32
-3
lines changed

lib/constants/amqp.constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
export const AMQP_MODULE_OPTIONS = 'AMQP_MODULE_OPTIONS';
22
export const AMQP_CONSUMER_METADATA = 'AMQP_CONSUMER_METADATA';
33
export const AMQP_CONNECTION_RECONNECT = 'AMQP_CONNECTION_RECONNECT';
4+
export const AMQP_CONNECTION_DISCONNECTED = 'AMQP_CONNECTION_DISCONNECTED';

lib/services/amqp.service.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { EventEmitter } from 'events';
44
import { hostname } from 'os';
55
import { AwaitableSender, Connection, ConnectionEvents, Container, EventContext, Receiver, ReceiverEvents, SenderEvents } from 'rhea-promise';
66

7-
import { AMQP_CONNECTION_RECONNECT } from '../constants';
7+
import { AMQP_CONNECTION_DISCONNECTED, AMQP_CONNECTION_RECONNECT } from '../constants';
88
import { AMQPModuleOptions, CreateReceiverOptions, CreateSenderOptions } from '../interfaces';
99
import { ErrorMessage, getConnectionToken, getLogger, parseURL } from '../utils';
1010

@@ -46,6 +46,10 @@ export class AMQPService {
4646
connection.on(ConnectionEvents.disconnected, (context: EventContext) => {
4747
const error = [`Connection closed by peer: ${connectionToken}`, ErrorMessage.fromContext(context)];
4848
this.logger.warn(...error.filter(e => e));
49+
const emitted = AMQPService.eventEmitter.emit(AMQP_CONNECTION_DISCONNECTED);
50+
if (!emitted) {
51+
this.logger.warn('disconnect event not emitted');
52+
}
4953
});
5054
connection.on(ConnectionEvents.connectionClose, (context: EventContext) => {
5155
const error = `Connection closed: ${connectionToken}`;
@@ -54,6 +58,22 @@ export class AMQPService {
5458
} else {
5559
this.logger.warn(error);
5660
}
61+
const timeoutHandler = setTimeout(async () => {
62+
(context.connection as any)._connection.dispatch(ConnectionEvents.disconnected, void 0);
63+
await context.connection
64+
.open()
65+
.then(() => {
66+
this.logger.silly('connection successfully reopened');
67+
const emitted = AMQPService.eventEmitter.emit(AMQP_CONNECTION_RECONNECT);
68+
if (!emitted) {
69+
this.logger.warn('reconnect event not emitted');
70+
}
71+
})
72+
.catch(error => {
73+
this.logger.error(`reopening connection failed with error: ${error.message}`, error);
74+
});
75+
clearTimeout(timeoutHandler);
76+
}, 1000);
5777
});
5878
try {
5979
await connection.open();

src/app.service.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Logger } from '@dollarsign/logger';
2-
import { Consumer, MessageControl, ProducerService, SendOptions } from '@dollarsign/nestjs-amqp';
2+
import { AMQP_CONNECTION_DISCONNECTED, AMQPService, Consumer, MessageControl, ProducerService, SendOptions } from '@dollarsign/nestjs-amqp';
33
import { delay } from '@dollarsign/utils';
44
import { Injectable } from '@nestjs/common';
55

@@ -13,7 +13,15 @@ export class AppService {
1313
displayFilePath: false,
1414
});
1515

16-
constructor(private readonly producer: ProducerService) {}
16+
constructor(private readonly producer: ProducerService) {
17+
this.bindListener();
18+
}
19+
20+
async bindListener() {
21+
AMQPService.eventEmitter.on(AMQP_CONNECTION_DISCONNECTED, () => {
22+
this.logger.debug('received disconnected event');
23+
});
24+
}
1725

1826
getHello(): string {
1927
return 'Hello World!';

0 commit comments

Comments
 (0)