diff --git a/ReactiveXComponent/Connection/IXCConnection.cs b/ReactiveXComponent/Connection/IXCConnection.cs index 80632ba..4cdfb9d 100644 --- a/ReactiveXComponent/Connection/IXCConnection.cs +++ b/ReactiveXComponent/Connection/IXCConnection.cs @@ -4,6 +4,6 @@ namespace ReactiveXComponent.Connection { public interface IXCConnection { - IXCSession CreateSession(); + IXCSession CreateSession(TimeSpan? timeout = null, TimeSpan? retryInterval = null, int maxRetries = 5); } } \ No newline at end of file diff --git a/ReactiveXComponent/Connection/IXCSession.cs b/ReactiveXComponent/Connection/IXCSession.cs index a55b04c..fb39dfe 100644 --- a/ReactiveXComponent/Connection/IXCSession.cs +++ b/ReactiveXComponent/Connection/IXCSession.cs @@ -6,7 +6,9 @@ namespace ReactiveXComponent.Connection public interface IXCSession : IDisposable { bool IsOpen { get; } + event EventHandler SessionOpened; event EventHandler SessionClosed; + event EventHandler ConnectionError; IXCPublisher CreatePublisher(string component); IXCSubscriber CreateSubscriber(string component); List GetXCApiList(string requestId = null, TimeSpan ? timeout = null); diff --git a/ReactiveXComponent/Connection/XCConnectionFactory.cs b/ReactiveXComponent/Connection/XCConnectionFactory.cs index 8876f60..f9444bb 100644 --- a/ReactiveXComponent/Connection/XCConnectionFactory.cs +++ b/ReactiveXComponent/Connection/XCConnectionFactory.cs @@ -24,7 +24,7 @@ public override IXCConnection CreateConnection(ConnectionType connectionType, in return new RabbitMqConnection(_xcConfiguration, _privateCommunicationIdentifier); case ConnectionType.WebSocket: var webSocketEndpoint = _xcConfiguration.GetWebSocketEndpoint(); - return new WebSocketConnection(_xcConfiguration, webSocketEndpoint, connectionTimeout, _privateCommunicationIdentifier); + return new WebSocketConnection(_xcConfiguration, webSocketEndpoint, _privateCommunicationIdentifier); default: throw new ReactiveXComponentException($"Unsupported connection type: {connectionType}"); } diff --git a/ReactiveXComponent/IXComponentApi.cs b/ReactiveXComponent/IXComponentApi.cs index 7e0a6e6..697beb4 100644 --- a/ReactiveXComponent/IXComponentApi.cs +++ b/ReactiveXComponent/IXComponentApi.cs @@ -5,6 +5,6 @@ namespace ReactiveXComponent { public interface IXComponentApi { - IXCSession CreateSession(); + IXCSession CreateSession(TimeSpan? timeout = null, TimeSpan? retryInterval = null, int maxRetries = 5); } } \ No newline at end of file diff --git a/ReactiveXComponent/RabbitMq/RabbitMqConnection.cs b/ReactiveXComponent/RabbitMq/RabbitMqConnection.cs index d17acc9..a295224 100644 --- a/ReactiveXComponent/RabbitMq/RabbitMqConnection.cs +++ b/ReactiveXComponent/RabbitMq/RabbitMqConnection.cs @@ -1,4 +1,5 @@ -using ReactiveXComponent.Configuration; +using System; +using ReactiveXComponent.Configuration; using ReactiveXComponent.Connection; namespace ReactiveXComponent.RabbitMq @@ -16,9 +17,9 @@ public RabbitMqConnection(IXCConfiguration configuration, string privateCommunic _busDetails = configuration?.GetBusDetails(); } - public IXCSession CreateSession() + public IXCSession CreateSession(TimeSpan? timeout = null, TimeSpan? retryInterval = null, int maxRetries = 5) { - return new RabbitMqSession(_xcConfiguration, _busDetails, _privateCommunicationIdentifier); + return new RabbitMqSession(_xcConfiguration, _busDetails, _privateCommunicationIdentifier, timeout, retryInterval); } } } \ No newline at end of file diff --git a/ReactiveXComponent/RabbitMq/RabbitMqSession.cs b/ReactiveXComponent/RabbitMq/RabbitMqSession.cs index 66bf7ac..7a917b7 100644 --- a/ReactiveXComponent/RabbitMq/RabbitMqSession.cs +++ b/ReactiveXComponent/RabbitMq/RabbitMqSession.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; +using System.IO; using RabbitMQ.Client; +using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using ReactiveXComponent.Common; using ReactiveXComponent.Configuration; @@ -16,12 +18,16 @@ public class RabbitMqSession : IXCSession private readonly string _privateCommunicationIdentifier; private readonly ISerializer _serializer; private ConnectionFactory _factory; + private readonly TimeSpan? _timeout; + private readonly TimeSpan? _retryInterval; - public RabbitMqSession(IXCConfiguration xcConfiguration, BusDetails busDetails , string privateCommunicationIdentifier = null) + public RabbitMqSession(IXCConfiguration xcConfiguration, BusDetails busDetails , string privateCommunicationIdentifier = null, TimeSpan? timeout = null, TimeSpan? retryInterval = null) { _xcConfiguration = xcConfiguration; _privateCommunicationIdentifier = privateCommunicationIdentifier; _serializer = SelectSerializer(); + _timeout = timeout; + _retryInterval = retryInterval; InitConnection(busDetails); } @@ -36,12 +42,18 @@ private void InitConnection(BusDetails busDetails) VirtualHost = ConnectionFactory.DefaultVHost, HostName = busDetails.Host, Port = busDetails.Port, - Protocol = Protocols.DefaultProtocol + Protocol = Protocols.DefaultProtocol, + AutomaticRecoveryEnabled = true, + TopologyRecoveryEnabled = false, + NetworkRecoveryInterval = (_retryInterval != null) ? _retryInterval.Value : TimeSpan.FromSeconds(5), + RequestedConnectionTimeout = (_timeout != null) ? (int)_timeout.Value.TotalMilliseconds : 10000 }; _connection = _factory?.CreateConnection(); - + SessionOpened?.Invoke(this, EventArgs.Empty); + _connection.ConnectionUnblocked += ConnectionOnConnectionUnblocked; _connection.ConnectionShutdown += ConnectionOnConnectionShutdown; + _connection.ConnectionBlocked += ConnectionOnConnectionBlocked; } catch (BrokerUnreachableException e) { @@ -49,11 +61,21 @@ private void InitConnection(BusDetails busDetails) } } + private void ConnectionOnConnectionUnblocked(object sender, EventArgs eventArgs) + { + SessionOpened?.Invoke(this, EventArgs.Empty); + } + private void ConnectionOnConnectionShutdown(object sender, ShutdownEventArgs shutdownEventArgs) { SessionClosed?.Invoke(this, EventArgs.Empty); } + private void ConnectionOnConnectionBlocked(object sender, ConnectionBlockedEventArgs connectionBlockedEventArgs) + { + ConnectionError?.Invoke(this, new ErrorEventArgs(new Exception(connectionBlockedEventArgs.Reason))); + } + private ISerializer SelectSerializer() { switch (_xcConfiguration.GetSerializationType()) @@ -71,8 +93,12 @@ private ISerializer SelectSerializer() public bool IsOpen => _connection.IsOpen; + public event EventHandler SessionOpened; + public event EventHandler SessionClosed; + public event EventHandler ConnectionError; + public IXCPublisher CreatePublisher(string component) { return new RabbitMqPublisher(component, _xcConfiguration, _connection, _serializer, _privateCommunicationIdentifier); @@ -97,7 +123,9 @@ private void CloseConnection() { if (_connection == null) return; + _connection.ConnectionUnblocked -= ConnectionOnConnectionUnblocked; _connection.ConnectionShutdown -= ConnectionOnConnectionShutdown; + _connection.ConnectionBlocked -= ConnectionOnConnectionBlocked; _connection.Dispose(); } diff --git a/ReactiveXComponent/WebSocket/IWebSocketClient.cs b/ReactiveXComponent/WebSocket/IWebSocketClient.cs index 7d39019..72edf2e 100644 --- a/ReactiveXComponent/WebSocket/IWebSocketClient.cs +++ b/ReactiveXComponent/WebSocket/IWebSocketClient.cs @@ -6,7 +6,7 @@ namespace ReactiveXComponent.WebSocket public interface IWebSocketClient : IDisposable { bool IsOpen { get; } - void Init(WebSocketEndpoint endpoint, int timeout); + void Init(WebSocketEndpoint endpoint, TimeSpan? timeout, TimeSpan? retryInterval = null, int maxRetries = 5); void Open(); void Close(); void Send(string data); diff --git a/ReactiveXComponent/WebSocket/WebSocketClient.cs b/ReactiveXComponent/WebSocket/WebSocketClient.cs index e0db955..b44052e 100644 --- a/ReactiveXComponent/WebSocket/WebSocketClient.cs +++ b/ReactiveXComponent/WebSocket/WebSocketClient.cs @@ -7,7 +7,7 @@ namespace ReactiveXComponent.WebSocket { - public class WebSocketClient : IWebSocketClient + internal class WebSocketClient : IWebSocketClient { private WebSocket4Net.WebSocket _webSocket; private readonly object _webSocketLock = new object(); @@ -15,7 +15,14 @@ public class WebSocketClient : IWebSocketClient private AutoResetEvent _socketCloseEvent; private WebSocketEndpoint _endpoint; - private int _timeout; + private TimeSpan _timeout; + + private System.Timers.Timer _reconnectionTimer; + private int _maxRetries; + private int _currentRetryCount; + private TimeSpan _retryInterval; + private bool _isClosing; + private bool _isReconnecting; public event EventHandler ConnectionOpened; public event EventHandler ConnectionClosed; @@ -27,22 +34,8 @@ private void OpenConnection() _socketOpenEvent = new AutoResetEvent(false); _socketCloseEvent = new AutoResetEvent(false); var serverUri = GetServerUri(); - _webSocket = new WebSocket4Net.WebSocket(serverUri); - - _webSocket.Security.AllowUnstrustedCertificate = true; - _webSocket.Security.AllowNameMismatchCertificate = true; - - _webSocket.Opened += WebSocketOnOpened; - _webSocket.Closed += WebSocketOnClosed; - _webSocket.Error += WebSocketOnError; + InitWebSocket(serverUri); _webSocket.Open(); - - if (!_socketOpenEvent.WaitOne(_timeout)) - { - throw new ReactiveXComponentException($"Could not connect to the web socket server {serverUri} after {_timeout} ms"); - } - - _webSocket.MessageReceived += WebSocketOnMessageReceived; } private void CloseConnection() @@ -53,6 +46,7 @@ private void CloseConnection() { if (CanClose()) { + _isClosing = true; _webSocket.Close(); _socketCloseEvent.WaitOne(_timeout); } @@ -96,12 +90,27 @@ private void WebSocketOnClosed(object sender, EventArgs eventArgs) { _socketCloseEvent.Set(); - ConnectionClosed?.Invoke(this, EventArgs.Empty); + if (!_isClosing) + { + TryReconnect(); + } + + if (!_isReconnecting) + { + ConnectionClosed?.Invoke(this, EventArgs.Empty); + } } private void WebSocketOnError(object sender, ErrorEventArgs errorEventArgs) { - ConnectionError?.Invoke(this, new System.IO.ErrorEventArgs(errorEventArgs.Exception)); + if (!_isClosing && _maxRetries > 0) + { + TryReconnect(); + } + else + { + ConnectionError?.Invoke(this, new System.IO.ErrorEventArgs(errorEventArgs.Exception)); + } } private void WebSocketOnMessageReceived(object sender, MessageReceivedEventArgs messageReceivedEventArgs) @@ -111,10 +120,62 @@ private void WebSocketOnMessageReceived(object sender, MessageReceivedEventArgs public bool IsOpen { get { return _webSocket != null && _webSocket.State == WebSocketState.Open; } } - public void Init(WebSocketEndpoint endpoint, int timeout) + public void Init(WebSocketEndpoint endpoint, TimeSpan? timeout, TimeSpan? retryInterval, int maxRetries) { _endpoint = endpoint; - _timeout = timeout; + _timeout = timeout != null ? timeout.Value : TimeSpan.FromSeconds(10); + + _maxRetries = maxRetries; + _retryInterval = (retryInterval != null) ? retryInterval.Value : TimeSpan.FromSeconds(5); + _currentRetryCount = 0; + + _reconnectionTimer = new System.Timers.Timer(_retryInterval.TotalMilliseconds); + + _reconnectionTimer.Elapsed += (sender, args) => + { + lock (_webSocketLock) + { + var serverUri = GetServerUri(); + + if (!IsOpen && !_isClosing && _currentRetryCount < _maxRetries) + { + _isReconnecting = true; + DisposeWebSocket(); + + InitWebSocket(serverUri); + + _webSocket.Open(); + + if (_socketOpenEvent.WaitOne(_timeout)) + { + _isReconnecting = false; + _currentRetryCount = 0; + _reconnectionTimer.Stop(); + } + else + { + _currentRetryCount++; + + if (_currentRetryCount >= _maxRetries) + { + _reconnectionTimer.Stop(); + RaiseConnectionError($"Could not connect to the web socket server {serverUri} after {_timeout} ms"); + } + } + } + else if (_maxRetries <= 0) + { + _reconnectionTimer.Stop(); + RaiseConnectionError($"Could not connect to the web socket server {serverUri} after {_timeout} ms"); + } + } + }; + } + + private void RaiseConnectionError(string message) + { + var errorEvent = new System.IO.ErrorEventArgs(new ReactiveXComponentException(message)); + ConnectionError?.Invoke(this, errorEvent); } public void Open() @@ -132,6 +193,33 @@ public void Send(string data) _webSocket?.Send(data); } + private void InitWebSocket(string serverUri) + { + _webSocket = new WebSocket4Net.WebSocket(serverUri); + + _webSocket.Security.AllowUnstrustedCertificate = true; + _webSocket.Security.AllowNameMismatchCertificate = true; + + _webSocket.Opened += WebSocketOnOpened; + _webSocket.Closed += WebSocketOnClosed; + _webSocket.Error += WebSocketOnError; + _webSocket.MessageReceived += WebSocketOnMessageReceived; + } + + private void DisposeWebSocket() + { + _webSocket.Opened -= WebSocketOnOpened; + _webSocket.Closed -= WebSocketOnClosed; + _webSocket.Error -= WebSocketOnError; + _webSocket.MessageReceived -= WebSocketOnMessageReceived; + _webSocket.Dispose(); + } + + private void TryReconnect() + { + _reconnectionTimer.Start(); + } + #region IDisposable implementation private bool _disposed; @@ -144,13 +232,13 @@ private void Dispose(bool disposing) { CloseConnection(); - _webSocket.Opened -= WebSocketOnOpened; - _webSocket.Closed -= WebSocketOnClosed; - _webSocket.Error -= WebSocketOnError; - _webSocket.MessageReceived -= WebSocketOnMessageReceived; + DisposeWebSocket(); _socketOpenEvent.Dispose(); _socketCloseEvent.Dispose(); + + _isClosing = false; + _reconnectionTimer.Dispose(); } // clear unmanaged resources diff --git a/ReactiveXComponent/WebSocket/WebSocketConnection.cs b/ReactiveXComponent/WebSocket/WebSocketConnection.cs index 6c99e10..7932344 100644 --- a/ReactiveXComponent/WebSocket/WebSocketConnection.cs +++ b/ReactiveXComponent/WebSocket/WebSocketConnection.cs @@ -8,21 +8,19 @@ namespace ReactiveXComponent.WebSocket public class WebSocketConnection : IXCConnection { private readonly WebSocketEndpoint _endpoint; - private readonly int _timeout; private readonly IXCConfiguration _xcConfiguration; private readonly string _privateCommunicationIdentifier; - public WebSocketConnection(IXCConfiguration xcConfiguration, WebSocketEndpoint endpoint, int timeout, string privateCommunicationIdentifier = null) + public WebSocketConnection(IXCConfiguration xcConfiguration, WebSocketEndpoint endpoint, string privateCommunicationIdentifier = null) { _endpoint = endpoint; - _timeout = timeout; _xcConfiguration = xcConfiguration; _privateCommunicationIdentifier = privateCommunicationIdentifier; } - public IXCSession CreateSession() + public IXCSession CreateSession(TimeSpan? timeout = null, TimeSpan? retryInterval = null, int maxRetries = 5) { - return new WebSocketSession(_endpoint, _timeout, _xcConfiguration, _privateCommunicationIdentifier); + return new WebSocketSession(_endpoint, _xcConfiguration, _privateCommunicationIdentifier, timeout, retryInterval, maxRetries); } } } diff --git a/ReactiveXComponent/WebSocket/WebSocketSession.cs b/ReactiveXComponent/WebSocket/WebSocketSession.cs index 32f7f3c..8e6deda 100644 --- a/ReactiveXComponent/WebSocket/WebSocketSession.cs +++ b/ReactiveXComponent/WebSocket/WebSocketSession.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.IO; using ReactiveXComponent.Common; using ReactiveXComponent.Configuration; using ReactiveXComponent.Connection; @@ -12,27 +13,50 @@ public class WebSocketSession : IXCSession private readonly IXCConfiguration _xcConfiguration; private readonly string _privateCommunicationIdentifier; private readonly WebSocketXCApiManager _webSocketXCApiManager; + private readonly TimeSpan _timeout; + private readonly TimeSpan _retryInterval; + private readonly int _maxRetries; - public WebSocketSession(WebSocketEndpoint endpoint, int timeout, IXCConfiguration xcConfiguration, string privateCommunicationIdentifier) + public WebSocketSession(WebSocketEndpoint endpoint, IXCConfiguration xcConfiguration, string privateCommunicationIdentifier, TimeSpan? timeout, TimeSpan? retryInterval = null, int maxRetries = 5) { + _timeout = timeout ?? TimeSpan.FromSeconds(10); + _retryInterval = retryInterval ?? TimeSpan.FromSeconds(5); + _maxRetries = maxRetries; _webSocketClient = new WebSocketClient(); - _webSocketClient.Init(endpoint, timeout); - _webSocketClient.Open(); + _webSocketClient.Init(endpoint, _timeout, _retryInterval, _maxRetries); + _webSocketClient.ConnectionOpened += WebSocketClientOnConnectionOpened; _webSocketClient.ConnectionClosed += WebSocketClientOnConnectionClosed; + _webSocketClient.ConnectionError += WebSocketClientOnConnectionError; + _webSocketClient.Open(); + _xcConfiguration = xcConfiguration; _privateCommunicationIdentifier = privateCommunicationIdentifier; _webSocketXCApiManager = new WebSocketXCApiManager(_webSocketClient); } + private void WebSocketClientOnConnectionOpened(object sender, EventArgs eventArgs) + { + SessionOpened?.Invoke(this, EventArgs.Empty); + } + private void WebSocketClientOnConnectionClosed(object sender, EventArgs eventArgs) { SessionClosed?.Invoke(this, EventArgs.Empty); } + private void WebSocketClientOnConnectionError(object sender, ErrorEventArgs errorEventArgs) + { + ConnectionError?.Invoke(this, errorEventArgs); + } + public bool IsOpen => _webSocketClient.IsOpen; + public event EventHandler SessionOpened; + public event EventHandler SessionClosed; + public event EventHandler ConnectionError; + public IXCPublisher CreatePublisher(string component) { return new WebSocketPublisher(component, _webSocketClient, _xcConfiguration, _privateCommunicationIdentifier); @@ -65,6 +89,7 @@ private void Dispose(bool disposing) { if (disposing) { + _webSocketClient.ConnectionOpened -= WebSocketClientOnConnectionOpened; _webSocketClient.ConnectionClosed -= WebSocketClientOnConnectionClosed; _webSocketClient.Dispose(); } diff --git a/ReactiveXComponent/XComponentApi.cs b/ReactiveXComponent/XComponentApi.cs index 4632c10..44cb57b 100644 --- a/ReactiveXComponent/XComponentApi.cs +++ b/ReactiveXComponent/XComponentApi.cs @@ -25,9 +25,9 @@ public static IXComponentApi CreateFromXCApi(Stream xcApiStream, string privateC return new XComponentApi(xcApiStream, privateCommunicationIdentifier); } - public IXCSession CreateSession() + public IXCSession CreateSession(TimeSpan? timeout = null, TimeSpan? retryInterval = null, int maxRetries = 5) { - return _xcConnection.CreateSession(); + return _xcConnection.CreateSession(timeout, retryInterval, maxRetries); } } } diff --git a/ReactiveXComponentTest/WebSocket/WebSocketTests.cs b/ReactiveXComponentTest/WebSocket/WebSocketTests.cs index 261ee77..2e83f0e 100644 --- a/ReactiveXComponentTest/WebSocket/WebSocketTests.cs +++ b/ReactiveXComponentTest/WebSocket/WebSocketTests.cs @@ -329,5 +329,38 @@ public void GetXCApiListTest() Check.That(answer.Count == expectedAnswer.Count); Check.That(answer.FirstOrDefault().Equals(expectedAnswer.FirstOrDefault())); } + + [TestCase(0)] + [TestCase(1)] + [TestCase(3)] + public void ConnectionRetryTest(int maxRetries) + { + + var websocketEndpoint = new WebSocketEndpoint("websocket", "localhost", "443", WebSocketType.Secure); + var connectionFailedEvent = new AutoResetEvent(false); + var xcConfiguration = Substitute.For(); + var connectionAttemptTimeout = TimeSpan.FromSeconds(3); + var reconnectionAttemptInterval = TimeSpan.FromSeconds(5); + var totalConnectionAttemptTimeInMs = (int)(connectionAttemptTimeout.TotalMilliseconds + maxRetries*(connectionAttemptTimeout.TotalMilliseconds + reconnectionAttemptInterval.TotalMilliseconds)); + var epsilonMs = 10; + var totalWaitTimeMs = totalConnectionAttemptTimeInMs + epsilonMs; + + var webSocketSession = new WebSocketSession( + websocketEndpoint, + xcConfiguration, + "", + connectionAttemptTimeout, + reconnectionAttemptInterval, + maxRetries); + + webSocketSession.ConnectionError += (sender, args) => + { + connectionFailedEvent.Set(); + }; + + var connectionAttemptsFinished = connectionFailedEvent.WaitOne(totalWaitTimeMs); + + Check.That(connectionAttemptsFinished).IsTrue(); + } } }