Skip to content

Adding re connection management #53

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ReactiveXComponent/Connection/IXCSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ namespace ReactiveXComponent.Connection
public interface IXCSession : IDisposable
{
bool IsOpen { get; }
event EventHandler SessionOpened;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not Action instead of EventHandler ? It seems that you don't use the sender and EventArgs are empty most of the time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll refactor this in the next version as I try to avoid any impact on the modules using this Api (like Client Api generated by the XCStudio).

event EventHandler SessionClosed;
event EventHandler<System.IO.ErrorEventArgs> ConnectionError;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the JS API we called it DisconnectionError because what we are handling here is the case of a disconnection after connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This event is raised when we encounter a problem while connecting. That's why it's called ConnectionError. You can refer to WebSocketClient.Init() method.

IXCPublisher CreatePublisher(string component);
IXCSubscriber CreateSubscriber(string component);
List<string> GetXCApiList(string requestId = null, TimeSpan ? timeout = null);
Expand Down
9 changes: 6 additions & 3 deletions ReactiveXComponent/RabbitMq/RabbitMqConnection.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using ReactiveXComponent.Configuration;
using System;
using ReactiveXComponent.Configuration;
using ReactiveXComponent.Connection;

namespace ReactiveXComponent.RabbitMq
Expand All @@ -8,17 +9,19 @@ public class RabbitMqConnection : IXCConnection
private readonly IXCConfiguration _xcConfiguration;
private readonly string _privateCommunicationIdentifier;
private readonly BusDetails _busDetails;
private readonly TimeSpan? _retryInterval;

public RabbitMqConnection(IXCConfiguration configuration, string privateCommunicationIdentifier = null)
public RabbitMqConnection(IXCConfiguration configuration, string privateCommunicationIdentifier = null, TimeSpan? retryInterval = null)
{
_xcConfiguration = configuration;
_privateCommunicationIdentifier = privateCommunicationIdentifier;
_busDetails = configuration?.GetBusDetails();
_retryInterval = retryInterval;
}

public IXCSession CreateSession()
{
return new RabbitMqSession(_xcConfiguration, _busDetails, _privateCommunicationIdentifier);
return new RabbitMqSession(_xcConfiguration, _busDetails, _privateCommunicationIdentifier, _retryInterval);
}
}
}
31 changes: 28 additions & 3 deletions ReactiveXComponent/RabbitMq/RabbitMqSession.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.IO;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using ReactiveXComponent.Common;
using ReactiveXComponent.Configuration;
Expand All @@ -16,12 +18,14 @@ public class RabbitMqSession : IXCSession
private readonly string _privateCommunicationIdentifier;
private readonly ISerializer _serializer;
private ConnectionFactory _factory;
private readonly TimeSpan? _retryInterval;

public RabbitMqSession(IXCConfiguration xcConfiguration, BusDetails busDetails , string privateCommunicationIdentifier = null)
public RabbitMqSession(IXCConfiguration xcConfiguration, BusDetails busDetails , string privateCommunicationIdentifier = null, TimeSpan? retryInterval = null)
{
_xcConfiguration = xcConfiguration;
_privateCommunicationIdentifier = privateCommunicationIdentifier;
_serializer = SelectSerializer();
_retryInterval = retryInterval;
InitConnection(busDetails);
}

Expand All @@ -36,24 +40,39 @@ private void InitConnection(BusDetails busDetails)
VirtualHost = ConnectionFactory.DefaultVHost,
HostName = busDetails.Host,
Port = busDetails.Port,
Protocol = Protocols.DefaultProtocol
Protocol = Protocols.DefaultProtocol,
AutomaticRecoveryEnabled = true,
TopologyRecoveryEnabled = false,
NetworkRecoveryInterval = (_retryInterval != null) ? _retryInterval.Value : TimeSpan.FromSeconds(5)
};

_connection = _factory?.CreateConnection();

SessionOpened?.Invoke(this, EventArgs.Empty);
_connection.ConnectionUnblocked += ConnectionOnConnectionUnblocked;
_connection.ConnectionShutdown += ConnectionOnConnectionShutdown;
_connection.ConnectionBlocked += ConnectionOnConnectionBlocked;
}
catch (BrokerUnreachableException e)
{
throw new ReactiveXComponentException("Error while creating Rabbit Mq connection: " + e.Message, e);
}
}

private void ConnectionOnConnectionUnblocked(object sender, EventArgs eventArgs)
{
SessionOpened?.Invoke(this, EventArgs.Empty);
}

private void ConnectionOnConnectionShutdown(object sender, ShutdownEventArgs shutdownEventArgs)
{
SessionClosed?.Invoke(this, EventArgs.Empty);
}

private void ConnectionOnConnectionBlocked(object sender, ConnectionBlockedEventArgs connectionBlockedEventArgs)
{
ConnectionError?.Invoke(this, new ErrorEventArgs(new Exception(connectionBlockedEventArgs.Reason)));
}

private ISerializer SelectSerializer()
{
switch (_xcConfiguration.GetSerializationType())
Expand All @@ -71,8 +90,12 @@ private ISerializer SelectSerializer()

public bool IsOpen => _connection.IsOpen;

public event EventHandler SessionOpened;

public event EventHandler SessionClosed;

public event EventHandler<System.IO.ErrorEventArgs> ConnectionError;

public IXCPublisher CreatePublisher(string component)
{
return new RabbitMqPublisher(component, _xcConfiguration, _connection, _serializer, _privateCommunicationIdentifier);
Expand All @@ -97,7 +120,9 @@ private void CloseConnection()
{
if (_connection == null) return;

_connection.ConnectionUnblocked -= ConnectionOnConnectionUnblocked;
_connection.ConnectionShutdown -= ConnectionOnConnectionShutdown;
_connection.ConnectionBlocked -= ConnectionOnConnectionBlocked;
_connection.Dispose();
}

Expand Down
2 changes: 1 addition & 1 deletion ReactiveXComponent/WebSocket/IWebSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace ReactiveXComponent.WebSocket
public interface IWebSocketClient : IDisposable
{
bool IsOpen { get; }
void Init(WebSocketEndpoint endpoint, int timeout);
void Init(WebSocketEndpoint endpoint, TimeSpan timeout, TimeSpan? retryInterval = null, int maxRetries = 5);
void Open();
void Close();
void Send(string data);
Expand Down
116 changes: 93 additions & 23 deletions ReactiveXComponent/WebSocket/WebSocketClient.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
using System;
using System.Text.RegularExpressions;
using System.Threading;
using System.Timers;
using ReactiveXComponent.Common;
using SuperSocket.ClientEngine;
using WebSocket4Net;

namespace ReactiveXComponent.WebSocket
{
public class WebSocketClient : IWebSocketClient
internal class WebSocketClient : IWebSocketClient
{
private WebSocket4Net.WebSocket _webSocket;
private readonly object _webSocketLock = new object();
private AutoResetEvent _socketOpenEvent;
private AutoResetEvent _socketCloseEvent;

private WebSocketEndpoint _endpoint;
private int _timeout;
private TimeSpan _timeout;

private System.Timers.Timer _reconnectionTimer;
private int _maxRetries;
private int _currentRetryCount;
private TimeSpan _retryInterval;
private bool _isClosing;
private bool _isReconnecting;

public event EventHandler<EventArgs> ConnectionOpened;
public event EventHandler<EventArgs> ConnectionClosed;
Expand All @@ -27,22 +35,8 @@ private void OpenConnection()
_socketOpenEvent = new AutoResetEvent(false);
_socketCloseEvent = new AutoResetEvent(false);
var serverUri = GetServerUri();
_webSocket = new WebSocket4Net.WebSocket(serverUri);

_webSocket.Security.AllowUnstrustedCertificate = true;
_webSocket.Security.AllowNameMismatchCertificate = true;

_webSocket.Opened += WebSocketOnOpened;
_webSocket.Closed += WebSocketOnClosed;
_webSocket.Error += WebSocketOnError;
InitWebSocket(serverUri);
_webSocket.Open();

if (!_socketOpenEvent.WaitOne(_timeout))
{
throw new ReactiveXComponentException($"Could not connect to the web socket server {serverUri} after {_timeout} ms");
}

_webSocket.MessageReceived += WebSocketOnMessageReceived;
}

private void CloseConnection()
Expand All @@ -53,6 +47,7 @@ private void CloseConnection()
{
if (CanClose())
{
_isClosing = true;
_webSocket.Close();
_socketCloseEvent.WaitOne(_timeout);
}
Expand Down Expand Up @@ -96,7 +91,15 @@ private void WebSocketOnClosed(object sender, EventArgs eventArgs)
{
_socketCloseEvent.Set();

ConnectionClosed?.Invoke(this, EventArgs.Empty);
if (!_isClosing)
{
TryReconnect();
}

if (!_isReconnecting)
{
ConnectionClosed?.Invoke(this, EventArgs.Empty);
}
}

private void WebSocketOnError(object sender, ErrorEventArgs errorEventArgs)
Expand All @@ -111,10 +114,50 @@ private void WebSocketOnMessageReceived(object sender, MessageReceivedEventArgs

public bool IsOpen { get { return _webSocket != null && _webSocket.State == WebSocketState.Open; } }

public void Init(WebSocketEndpoint endpoint, int timeout)
public void Init(WebSocketEndpoint endpoint, TimeSpan timeout, TimeSpan? retryInterval, int maxRetries)
{
_endpoint = endpoint;
_timeout = timeout;

_maxRetries = maxRetries;
_retryInterval = (retryInterval != null) ? retryInterval.Value : TimeSpan.FromSeconds(5);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be interesting to increment the _retryInterval depending on _currentRetryCount

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the JS API we decided to leave the reconnection strategy up to the user... what is the point of implementing that on the API side if it is probably application specific?

_currentRetryCount = 0;

_reconnectionTimer = new System.Timers.Timer(_retryInterval.TotalMilliseconds);

_reconnectionTimer.Elapsed += (sender, args) =>
{
lock (_webSocketLock)
{
if (!IsOpen && !_isClosing && _currentRetryCount < _maxRetries)
{
_isReconnecting = true;
DisposeWebSocket();

var serverUri = GetServerUri();
InitWebSocket(serverUri);

_webSocket.Open();

if (_socketOpenEvent.WaitOne(_timeout))
{
_isReconnecting = false;
_currentRetryCount = 0;
_reconnectionTimer.Stop();
}
else
{
_currentRetryCount++;

if (_currentRetryCount >= _maxRetries)
{
var errorEvent = new System.IO.ErrorEventArgs(new ReactiveXComponentException($"Could not connect to the web socket server {serverUri} after {_timeout} ms"));
ConnectionError?.Invoke(this, errorEvent);
}
}
}
}
};
}

public void Open()
Expand All @@ -132,6 +175,33 @@ public void Send(string data)
_webSocket?.Send(data);
}

private void InitWebSocket(string serverUri)
{
_webSocket = new WebSocket4Net.WebSocket(serverUri);

_webSocket.Security.AllowUnstrustedCertificate = true;
_webSocket.Security.AllowNameMismatchCertificate = true;

_webSocket.Opened += WebSocketOnOpened;
_webSocket.Closed += WebSocketOnClosed;
_webSocket.Error += WebSocketOnError;
_webSocket.MessageReceived += WebSocketOnMessageReceived;
}

private void DisposeWebSocket()
{
_webSocket.Opened -= WebSocketOnOpened;
_webSocket.Closed -= WebSocketOnClosed;
_webSocket.Error -= WebSocketOnError;
_webSocket.MessageReceived -= WebSocketOnMessageReceived;
_webSocket.Dispose();
}

private void TryReconnect()
{
_reconnectionTimer.Start();
}

#region IDisposable implementation

private bool _disposed;
Expand All @@ -144,13 +214,13 @@ private void Dispose(bool disposing)
{
CloseConnection();

_webSocket.Opened -= WebSocketOnOpened;
_webSocket.Closed -= WebSocketOnClosed;
_webSocket.Error -= WebSocketOnError;
_webSocket.MessageReceived -= WebSocketOnMessageReceived;
DisposeWebSocket();

_socketOpenEvent.Dispose();
_socketCloseEvent.Dispose();

_isClosing = false;
_reconnectionTimer.Dispose();
}

// clear unmanaged resources
Expand Down
8 changes: 6 additions & 2 deletions ReactiveXComponent/WebSocket/WebSocketConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,22 @@ public class WebSocketConnection : IXCConnection
private readonly int _timeout;
private readonly IXCConfiguration _xcConfiguration;
private readonly string _privateCommunicationIdentifier;
private readonly TimeSpan? _retryInterval;
private readonly int _maxRetries;

public WebSocketConnection(IXCConfiguration xcConfiguration, WebSocketEndpoint endpoint, int timeout, string privateCommunicationIdentifier = null)
public WebSocketConnection(IXCConfiguration xcConfiguration, WebSocketEndpoint endpoint, int timeout, string privateCommunicationIdentifier = null, TimeSpan? retryInterval = null, int maxRetries = 5)
{
_endpoint = endpoint;
_timeout = timeout;
_xcConfiguration = xcConfiguration;
_privateCommunicationIdentifier = privateCommunicationIdentifier;
_retryInterval = retryInterval;
_maxRetries = maxRetries;
}

public IXCSession CreateSession()
{
return new WebSocketSession(_endpoint, _timeout, _xcConfiguration, _privateCommunicationIdentifier);
return new WebSocketSession(_endpoint, _timeout, _xcConfiguration, _privateCommunicationIdentifier, _retryInterval, _maxRetries);
}
}
}
Loading