@@ -6,7 +6,7 @@ use std::{
6
6
task:: { Context , Poll } ,
7
7
} ;
8
8
9
- use chrono:: { DateTime , Utc } ;
9
+ use chrono:: { DateTime , TimeDelta , Utc } ;
10
10
use client_state:: ClientMap ;
11
11
use sqlx:: { query, Error as SqlxError , PgExecutor , PgPool } ;
12
12
use thiserror:: Error ;
@@ -16,6 +16,7 @@ use tokio::{
16
16
mpsc:: { self , error:: SendError , Receiver , UnboundedSender } ,
17
17
} ,
18
18
task:: JoinHandle ,
19
+ time:: { interval, Duration } ,
19
20
} ;
20
21
use tokio_stream:: Stream ;
21
22
use tonic:: { metadata:: MetadataMap , Code , Request , Response , Status } ;
@@ -34,6 +35,8 @@ use crate::{
34
35
mail:: Mail ,
35
36
} ;
36
37
38
+ const PEER_DISCONNECT_INTERVAL : u64 = 60 ;
39
+
37
40
/// Sends given `GatewayEvent` to be handled by gateway GRPC server
38
41
///
39
42
/// If you want to use it inside the API context, use [`crate::AppState::send_wireguard_event`] instead
@@ -192,6 +195,7 @@ impl GatewayServer {
192
195
. client_state
193
196
. lock ( )
194
197
. map_err ( |_| GatewayServerError :: ClientStateMutexError ) ?;
198
+ debug ! ( "Current VPN client state map: {client_state:?}" ) ;
195
199
Ok ( client_state)
196
200
}
197
201
@@ -726,8 +730,46 @@ impl gateway_service_server::GatewayService for GatewayServer {
726
730
let network_id = Self :: get_network_id ( request. metadata ( ) ) ?;
727
731
let gateway_hostname = Self :: get_gateway_hostname ( request. metadata ( ) ) ?;
728
732
let mut stream = request. into_inner ( ) ;
733
+ let mut disconnect_timer = interval ( Duration :: from_secs ( PEER_DISCONNECT_INTERVAL ) ) ;
734
+
735
+ loop {
736
+ // wait for a message or update client map at least once a mninute if no messages are received
737
+ let stats_update = tokio:: select! {
738
+ message = stream. message( ) => {
739
+ match message? {
740
+ Some ( update) => update,
741
+ None => break , // Stream ended
742
+ }
743
+ }
744
+ _ = disconnect_timer. tick( ) => {
745
+ debug!( "No stats updates received in last {PEER_DISCONNECT_INTERVAL} seconds. Updating disconnected VPN clients" ) ;
746
+ // fetch location to get current peer disconnect threshold
747
+ let location = self . fetch_location_from_db( network_id) . await ?;
748
+
749
+ // perform client state operations in a dedicated block to drop mutex guard
750
+ let disconnected_clients = {
751
+ // acquire lock on client state map
752
+ let mut client_map = self . get_client_state_guard( ) ?;
753
+
754
+ // disconnect inactive clients
755
+ client_map. disconnect_inactive_vpn_clients_for_location(
756
+ network_id,
757
+ location. peer_disconnect_threshold,
758
+ ) ?
759
+ } ;
760
+
761
+ // emit client disconnect events
762
+ for ( device, context) in disconnected_clients {
763
+ self . emit_event( GrpcEvent :: ClientDisconnected {
764
+ context,
765
+ location: location. clone( ) ,
766
+ device,
767
+ } ) ?;
768
+ } ;
769
+ continue ;
770
+ }
771
+ } ;
729
772
730
- while let Some ( stats_update) = stream. message ( ) . await ? {
731
773
debug ! ( "Received stats message: {stats_update:?}" ) ;
732
774
let Some ( stats_update:: Payload :: PeerStats ( peer_stats) ) = stats_update. payload else {
733
775
debug ! ( "Received stats message is empty, skipping." ) ;
@@ -781,30 +823,35 @@ impl gateway_service_server::GatewayService for GatewayServer {
781
823
) ;
782
824
}
783
825
None => {
784
- // mark new VPN client as connected
785
- client_map. connect_vpn_client (
786
- network_id,
787
- & gateway_hostname,
788
- & public_key,
789
- & device,
790
- & user,
791
- socket_addr,
792
- & stats,
793
- ) ?;
794
-
795
- // emit connection event
796
- let context = GrpcRequestContext :: new (
797
- user. id ,
798
- user. username . clone ( ) ,
799
- socket_addr. ip ( ) ,
800
- device. id ,
801
- device. name . clone ( ) ,
802
- ) ;
803
- self . emit_event ( GrpcEvent :: ClientConnected {
804
- context,
805
- location : location. clone ( ) ,
806
- device : device. clone ( ) ,
807
- } ) ?;
826
+ // don't mark inactive peers as connected
827
+ if ( Utc :: now ( ) . naive_utc ( ) - stats. latest_handshake )
828
+ < TimeDelta :: seconds ( location. peer_disconnect_threshold . into ( ) )
829
+ {
830
+ // mark new VPN client as connected
831
+ client_map. connect_vpn_client (
832
+ network_id,
833
+ & gateway_hostname,
834
+ & public_key,
835
+ & device,
836
+ & user,
837
+ socket_addr,
838
+ & stats,
839
+ ) ?;
840
+
841
+ // emit connection event
842
+ let context = GrpcRequestContext :: new (
843
+ user. id ,
844
+ user. username . clone ( ) ,
845
+ socket_addr. ip ( ) ,
846
+ device. id ,
847
+ device. name . clone ( ) ,
848
+ ) ;
849
+ self . emit_event ( GrpcEvent :: ClientConnected {
850
+ context,
851
+ location : location. clone ( ) ,
852
+ device : device. clone ( ) ,
853
+ } ) ?;
854
+ }
808
855
}
809
856
} ;
810
857
0 commit comments