1
1
import hashlib
2
2
from datetime import datetime , timedelta
3
- import binascii
4
3
import json
5
4
import sys
6
5
import os
15
14
from cryptography import x509
16
15
from cryptography .hazmat .backends import default_backend
17
16
import re
18
- import atexit # <-- Added for auto cleanup
19
17
from pygsm7 import encodeMessage , decodeMessage
20
18
import traceback # <-- add this at the top if not already
21
19
from logging .handlers import TimedRotatingFileHandler
@@ -129,98 +127,18 @@ def hex2utf(string):
129
127
class zteRouter :
130
128
def __init__ (self , ip , username , password ):
131
129
self .ip = ip
132
- self .protocol = "http"
130
+ self .protocol = "http" # default to http
133
131
self .username = username
134
132
self .password = password
135
- self .cookies = {}
133
+ self .cookies = {} # Existing cookie management
136
134
self .stok = None
137
- self .session_expiry = datetime .min
138
- self .uses_stok = False
135
+ self .session_expiry = datetime .min # Initialize expiry in the past
139
136
logger .info (f"Initializing ZTE Router with IP { ip } , Username: { username } , Password: { password } " )
140
-
141
137
self .try_set_protocol ()
142
138
self .referer = f"{ self .protocol } ://{ self .ip } /"
143
139
144
- ld = self .get_LD ()
145
- self .session_id = ld if ld else self .ip .replace ("." , "_" )
146
-
147
- self .SESSION_FILE = os .path .join ("/tmp" , f"zte_session_{ self .session_id } .json" )
148
- logger .info (f"Using session ID: { self .session_id } " )
149
- logger .info (f"Session file path: { self .SESSION_FILE } " )
150
-
151
- # Register cleanup function
152
- atexit .register (self .cleanup_files )
153
-
154
- self .SESSION_FILE = os .path .join ("/tmp" , f"zte_session_{ self .session_id } .json" )
155
140
CERT_FILE = "/tmp/zte_router_cert.pem"
156
141
157
- def cleanup_files (self ):
158
- if os .path .exists (self .SESSION_FILE ):
159
- try :
160
- with open (self .SESSION_FILE , 'r' ) as f :
161
- session_data = json .load (f )
162
- expiry = datetime .fromisoformat (session_data .get ("session_expiry" ))
163
- if datetime .now () > expiry :
164
- os .remove (self .SESSION_FILE )
165
- logger .info (f"Cleaned up expired session file: { self .SESSION_FILE } " )
166
- else :
167
- logger .info ("Session still valid, skipping cleanup." )
168
- except Exception as e :
169
- logger .warning (f"Failed to evaluate or clean session file: { e } " )
170
-
171
-
172
- def save_session (self ):
173
- session_data = {
174
- 'stok' : self .stok ,
175
- 'session_expiry' : self .session_expiry .isoformat (),
176
- 'cookies' : self .cookies ,
177
- # Persist full session state
178
- 'last_command' : session .get ("last_command" ),
179
- 'last_successful_cmd' : session .get ("last_successful_cmd" ),
180
- 'last_error' : session .get ("last_error" ),
181
- 'total_requests' : session .get ("total_requests" ),
182
- 'last_latency_ms' : session .get ("last_latency_ms" ),
183
- 'session_created' : session .get ("created" ),
184
- 'expires_in' : session .get ("expires_in" ),
185
- 'session_uses_stok' : self .uses_stok ,
186
- }
187
- with open (self .SESSION_FILE , 'w' ) as f :
188
- json .dump (session_data , f , indent = 2 )
189
- logger .info ("📝 Session saved to disk:" )
190
- logger .debug (json .dumps (session_data , indent = 2 ))
191
-
192
-
193
- def load_session (self ):
194
- if os .path .exists (self .SESSION_FILE ):
195
- with open (self .SESSION_FILE , 'r' ) as f :
196
- session_data = json .load (f )
197
- self .stok = session_data .get ('stok' )
198
- self .cookies = session_data .get ('cookies' , {})
199
- self .session_expiry = datetime .fromisoformat (session_data ['session_expiry' ])
200
- self .uses_stok = session_data .get ("session_uses_stok" , False )
201
-
202
-
203
- # Restore extended session state
204
- session ["last_command" ] = session_data .get ("last_command" )
205
- session ["last_successful_cmd" ] = session_data .get ("last_successful_cmd" )
206
- session ["last_error" ] = session_data .get ("last_error" )
207
- session ["total_requests" ] = session_data .get ("total_requests" , 0 )
208
- session ["last_latency_ms" ] = session_data .get ("last_latency_ms" , 0 )
209
- session ["created" ] = session_data .get ("session_created" , datetime .now ().isoformat ())
210
- session ["expires_in" ] = session_data .get ("expires_in" , "N/A" )
211
-
212
- logger .info ("📦 Session loaded from disk:" )
213
- logger .debug (json .dumps (session_data , indent = 2 ))
214
-
215
- remaining = self .session_expiry - datetime .now ()
216
- logger .info (f"⏳ Session expires in: { remaining .total_seconds () / 60 :.2f} minutes" )
217
- else :
218
- logger .info ("⚠️ No existing session file found." )
219
-
220
-
221
- def is_session_valid (self ):
222
- return datetime .now () < self .session_expiry and (not self .uses_stok or self .stok is not None )
223
-
224
142
def invalidate_session (self ):
225
143
logger .info ("Invalidating session cookie" )
226
144
self .stok = None
@@ -438,16 +356,12 @@ def get_LD(self):
438
356
439
357
def getCookie (self , username , password , LD ):
440
358
logger .debug (f"Getting cookie for username: { username } , password: { password } , LD: { LD } " )
441
-
442
- self .load_session ()
443
- if self .is_session_valid () and 'stok' in self .cookies :
444
- remaining = self .session_expiry - datetime .now ()
445
- logger .info (f"🟢 Reusing session: stok={ self .stok } " )
446
- logger .info (f"Session valid for another { remaining .total_seconds () / 60 :.1f} minutes" )
447
- session ["expires_in" ] = f"{ int (remaining .total_seconds () / 60 )} min"
359
+ if self .stok is not None and datetime .now () < self .session_expiry :
360
+ logger .info (f"🟢 Reusing in-memory session: stok={ self .stok } " )
448
361
return self .stok
449
362
else :
450
- logger .warning ("🔄 Session not valid or missing 'stok' — performing fresh login" )
363
+ logger .info ("🔄 No valid session in memory, performing fresh login" )
364
+
451
365
452
366
header = {"Referer" : self .referer }
453
367
cookie_header = self .build_cookie_header ()
@@ -486,20 +400,14 @@ def getCookie(self, username, password, LD):
486
400
self .update_cookies (set_cookie_header )
487
401
488
402
stok = self .cookies .get ('stok' )
489
- if stok :
490
- self .uses_stok = True
491
- self .stok = stok
492
- logger .info (f"🔐 Router uses stok: { stok } " )
493
- else :
494
- self .uses_stok = False
495
- self .stok = None
496
- logger .info ("🔓 Router does NOT use stok (cookie-based only login)" )
403
+ if not stok :
404
+ logger .error ("Failed to obtain a valid cookie from the router" )
405
+ raise ValueError ("Failed to obtain a valid cookie from the router" )
497
406
498
407
# Set session expiry (e.g., valid for 60 minutes)
499
408
self .session_expiry = datetime .now () + timedelta (minutes = 1 )
500
409
session ["expires_in" ] = f"{ int ((self .session_expiry - datetime .now ()).total_seconds () / 60 )} min"
501
410
self .stok = stok
502
- self .save_session ()
503
411
logger .info (f"Obtained new session cookie: stok={ stok } " )
504
412
return stok
505
413
@@ -1280,25 +1188,20 @@ def setdata_mode(self, BearerPreference):
1280
1188
result = zte .zteinfo4 ()
1281
1189
print (result )
1282
1190
elif command == 99 :
1283
- # ✅ Explicitly load session before printing diagnostics
1284
- zte .load_session ()
1285
1191
diagnostics = {
1286
- "session_created" : session .get ("created" ),
1287
- "session_expires_in" : session .get ("expires_in" ),
1288
- "last_command" : session .get ("last_command" , "Unknown" ),
1289
- "last_successful_cmd" : session .get ("last_successful_cmd" , "Unknown" ),
1290
- "last_error" : session .get ("last_error" , "Unknown" ),
1291
- "total_requests" : session .get ("total_requests" , 0 ),
1292
- "fetch_latency_ms" : session .get ("last_latency_ms" , 0 ),
1192
+ "runtime_started" : session .get ("created" ),
1193
+ "last_latency_ms" : session .get ("last_latency_ms" , 0 ),
1194
+ "total_http_requests" : session .get ("total_requests" , 0 ),
1195
+ "last_error" : session .get ("last_error" , "None" ),
1196
+ "command_success" : session .get ("last_successful_cmd" , "None" ),
1197
+ "router_ip" : ip ,
1293
1198
}
1294
1199
print (json .dumps (diagnostics , indent = 2 ))
1295
1200
sys .exit (0 )
1296
1201
1297
1202
# ✅ Track last successful command if applicable
1298
1203
if command != 99 and result is not None :
1299
1204
session ["last_successful_cmd" ] = command
1300
- zte .save_session () # 💾 Save updated session to disk!
1301
-
1302
1205
# ❌ Fallback if command is not recognized
1303
1206
if result is None :
1304
1207
print (f"Invalid command: { command } " )
@@ -1307,5 +1210,4 @@ def setdata_mode(self, BearerPreference):
1307
1210
except Exception as e :
1308
1211
logger .error (f"An error occurred: { e } " )
1309
1212
session ["last_error" ] = str (e )
1310
- zte .save_session ()
1311
- sys .exit (1 )
1213
+ sys .exit (1 )
0 commit comments