|
| 1 | +#!/usr/bin/env python3 |
| 2 | +from http.server import HTTPServer, SimpleHTTPRequestHandler |
| 3 | +from itertools import islice, chain |
| 4 | +from threading import Thread |
| 5 | +from telnetlib import Telnet |
| 6 | +from pathlib import Path |
| 7 | +import hmac |
| 8 | + |
| 9 | +# if you add any new fields that need to be customed, you will |
| 10 | +# also need to adjust PayloadHandler.do_GET() |
| 11 | +CONFIG_TEMPLATE="""ETH10GESLOT=1 |
| 12 | +EepEqVendorID=GPON |
| 13 | +EepEqSerialNumber=GPON12345678 |
| 14 | +EepVDSL2SerialNumber= VDSLSerialNumberGPON12345678 |
| 15 | +EepEqVersionID=GPON |
| 16 | +EepEqID=XG-99S |
| 17 | +""" |
| 18 | + |
| 19 | +VENDOR_SPECIFIC = { |
| 20 | + "HUMA": "iONT320500G", |
| 21 | + "NOKA": "iONT320505G", |
| 22 | +} |
| 23 | + |
| 24 | +# 0x36 byte array, each byte of the output digest is used as an index % 0x36 |
| 25 | +# for the next output char; chars that can be easily confused are missing |
| 26 | +output_base = "2345679abcdefghijkmnpqrstuvwxyzACDEFGHJKLMNPQRSTUVWXYZ" |
| 27 | +def extract_chars(key_base, serial, extract_len, total_len): |
| 28 | + # the number of bytes being extracted is appended to the 15 byte fixed |
| 29 | + # key to round it out to an even 0x10 byte hmac key |
| 30 | + key = key_base + total_len.to_bytes(1, 'little') |
| 31 | + digest = hmac.HMAC(key, serial.encode("utf-8"), "md5").digest() |
| 32 | + return map(lambda x: output_base[x % len(output_base)], islice(digest, extract_len)) |
| 33 | + |
| 34 | +# when over 0x10 bytes are requested, the first 0x10 characters of output |
| 35 | +# use a different key than all remaining bytes. |
| 36 | +# suspect that the Nokia XS-010X-Q is exactly the same with the exception |
| 37 | +# of the HMAC keys baked into libvos.so being different |
| 38 | +keys = [b"\x01\x03\n\x10\x13\x05\x17d\xc8\x06\x14\x19\xb4\x9d\x05", |
| 39 | + b"\x05\x11:`{\xfb\x0fC\\!\xbe\x86A2\x1c"] |
| 40 | +def VOS_HmacMD5(serial, required_len): |
| 41 | + it1 = extract_chars(keys[0], serial, min(0x10, required_len ), required_len) |
| 42 | + it2 = extract_chars(keys[1], serial, max( 0, required_len - 0x10), required_len) |
| 43 | + return ''.join(chain(it1, it2)) |
| 44 | + |
| 45 | +class CigTimeout(Exception): |
| 46 | + pass |
| 47 | + |
| 48 | +class CigTelnet(Telnet): |
| 49 | + def __init__(self, serial): |
| 50 | + serial = serial[:4].upper() + serial[4:].lower() |
| 51 | + |
| 52 | + super().__init__("192.168.100.1", 23) |
| 53 | + self._in_shell = False |
| 54 | + |
| 55 | + self.read_until(b"Login as:") |
| 56 | + self.write(f"{serial}\n".encode("utf-8")) |
| 57 | + self.read_until(b":") |
| 58 | + self.write(f"{VOS_HmacMD5(serial.upper(), 8)}\n".encode("utf-8")) |
| 59 | + self.read_until(b"ONT>") |
| 60 | + self.write(b"enable\n") |
| 61 | + |
| 62 | + def sh_cmd(self, cmd, timeout=2): |
| 63 | + if not self._in_shell: |
| 64 | + self._in_shell = True |
| 65 | + self.write(b"/s/s\n") |
| 66 | + self.read_until(b"shell>", timeout) |
| 67 | + |
| 68 | + if not cmd.endswith("\n"): |
| 69 | + cmd += "\n" |
| 70 | + |
| 71 | + self.write(cmd.encode("utf-8")) |
| 72 | + res = self.read_until(b"shell>", timeout) |
| 73 | + if b"shell>" not in res: |
| 74 | + raise CigTimeout("CigTelnet command timed out") |
| 75 | + return res.decode("utf-8") |
| 76 | + |
| 77 | +class PayloadHandler(SimpleHTTPRequestHandler): |
| 78 | + def __init__(self, *args, serial=None, **kwargs): |
| 79 | + self.vendor = serial[:4] |
| 80 | + self.serial = serial[4:].lower() |
| 81 | + self.eqid = VENDOR_SPECIFIC[self.vendor] |
| 82 | + super().__init__(*args, directory=Path(__file__).parent / "payload", **kwargs) |
| 83 | + |
| 84 | + def do_GET(self): |
| 85 | + if self.path=="/config": |
| 86 | + self.send_response(200) |
| 87 | + self.send_header("Content-type", "html") |
| 88 | + self.end_headers() |
| 89 | + self.wfile.write(CONFIG_TEMPLATE \ |
| 90 | + .replace("GPON", self.vendor) \ |
| 91 | + .replace("12345678", self.serial) \ |
| 92 | + .replace("XG-99S", self.eqid) \ |
| 93 | + .encode("utf-8")) |
| 94 | + else: |
| 95 | + return super().do_GET() |
| 96 | + |
| 97 | +def genpw(args): |
| 98 | + serial = args.serial[:4].upper() + args.serial[4:].lower() |
| 99 | + |
| 100 | + # the CigLogin binary relies on VOS_HmacMD5 (from libvos.so) to generate |
| 101 | + # the telnet password from the raw serial with output length 8 |
| 102 | + # dropbearmulti, Console, and MecMgr all use VOS_HmacMD5 to generate |
| 103 | + # the password in the form of {SERIAL}-ONTUSER with output length 16 |
| 104 | + |
| 105 | + print(f"Creds for FS.com XGS-PON stick with serial {serial}:") |
| 106 | + print(f" Telnet: {serial} / {VOS_HmacMD5(serial.upper(), 8)}") |
| 107 | + print(f" SSH: ONTUSER / {VOS_HmacMD5(serial + '-ONTUSER', 16)}") |
| 108 | + |
| 109 | +def telnet(args): |
| 110 | + with CigTelnet(args.serial) as tn: |
| 111 | + tn.interact() |
| 112 | + |
| 113 | +def install(args): |
| 114 | + assert args.att_serial[:4] in VENDOR_SPECIFIC |
| 115 | + |
| 116 | + class PayloadServer(HTTPServer): |
| 117 | + def finish_request(self, request, client_address): |
| 118 | + self.RequestHandlerClass(request, client_address, self, serial=args.att_serial) |
| 119 | + |
| 120 | + print("Connecting via telnet...") |
| 121 | + with CigTelnet(args.gpon_serial) as tn: |
| 122 | + (addr, _) = tn.get_socket().getsockname() |
| 123 | + |
| 124 | + with PayloadServer(("", 8172), PayloadHandler) as ps: |
| 125 | + (_, port) = ps.socket.getsockname() |
| 126 | + Thread(target=ps.serve_forever, daemon=True).start() |
| 127 | + print(f"Webserver listening on {addr}:{port}") |
| 128 | + print("If this doesn't complete almost immediately, ensure there is no router between you and the device!") |
| 129 | + |
| 130 | + # ensure that if this goes Poorly we can power cycle our way out of it |
| 131 | + tn.sh_cmd("touch /mnt/rwdir/disarmed") |
| 132 | + tn.sh_cmd("[ -f /mnt/rwdir/setup.sh ] && rm /mnt/rwdir/setup.sh") |
| 133 | + |
| 134 | + # prevent a bad update from incorrectly persisting based on a safe prior version that |
| 135 | + # was persisting successfully by forcing people to re-enable it the long way |
| 136 | + tn.sh_cmd("[ -f /mnt/rwdir/payload_auto_rearm ] && rm /mnt/rwdir/payload_auto_rearm") |
| 137 | + |
| 138 | + try: |
| 139 | + assert "100%" in tn.sh_cmd(f"wget -O - {addr}:{port}/config > /mnt/rwdir/payload.cfg", 10) |
| 140 | + print("Payload configuration sent") |
| 141 | + |
| 142 | + assert "100%" in tn.sh_cmd(f"wget -O - {addr}:{port}/payload.tgz | tar xvzf - -C /mnt/rwdir/", 10) |
| 143 | + except (CigTimeout, AssertionError): |
| 144 | + print(f"Error: Stick was not able to connect back and download payload! Check firewall!") |
| 145 | + return |
| 146 | + |
| 147 | + assert "stage0.sh" in tn.sh_cmd("ls /mnt/rwdir/") |
| 148 | + |
| 149 | + tn.sh_cmd("ln -sf /mnt/rwdir/stage0.sh /mnt/rwdir/setup.sh") |
| 150 | + tn.sh_cmd("[ -f /mnt/rwdir/disarmed ] && rm /mnt/rwdir/disarmed") |
| 151 | + tn.sh_cmd("sync") |
| 152 | + |
| 153 | + print("Payload extracted -- press enter to reboot!") |
| 154 | + |
| 155 | + tn.write(b"reboot") # missing newline on purpose |
| 156 | + tn.interact() |
| 157 | + |
| 158 | +def persist(args): |
| 159 | + print("Connecting via telnet...") |
| 160 | + with CigTelnet(args.att_serial) as tn: |
| 161 | + if "payload_postboot_dropbear" not in tn.sh_cmd("ls -l /tmp/"): |
| 162 | + print("Persistence not allowed yet -- has it been 3+ minutes since boot?") |
| 163 | + print("If it's been more than 3 minutes and the device is not listening for SSH connections, the mod is not active!") |
| 164 | + return |
| 165 | + |
| 166 | + tn.sh_cmd("[ -f /tmp/payload_postboot_dropbear ] && touch /mnt/rwdir/payload_auto_rearm") |
| 167 | + tn.sh_cmd("[ -f /mnt/rwdir/disarmed ] && rm /mnt/rwdir/disarmed") |
| 168 | + tn.sh_cmd("[ ! -f /mnt/rwdir/setup.sh ] && ln -s /mnt/rwdir/stage0.sh /mnt/rwdir/setup.sh") |
| 169 | + tn.sh_cmd("sync") |
| 170 | + |
| 171 | + print("Persistence now enabled -- as a fail safe, a power cycle between ~30 seconds and ~120 seconds after boot should restore to stock") |
| 172 | + |
| 173 | +if __name__=="__main__": |
| 174 | + import argparse |
| 175 | + |
| 176 | + def parse_serial(serial): |
| 177 | + serial = serial.upper() |
| 178 | + |
| 179 | + if len(serial) != 12: |
| 180 | + raise argparse.ArgumentError("serial must be 12 characters") |
| 181 | + |
| 182 | + if serial[:4] not in ("GPON", "NOKA", "HUMA"): |
| 183 | + raise argparse.ArgumentError("vendor must be one of GPON, NOKA, HUMA") |
| 184 | + |
| 185 | + try: |
| 186 | + numeric = serial[4:].strip() |
| 187 | + assert len(numeric) == 8 |
| 188 | + int(numeric, 16) |
| 189 | + except (ValueError, AssertionError): |
| 190 | + raise argparse.ArgumentError("numeric portion of serial must be valid hexadecimal") |
| 191 | + |
| 192 | + return serial |
| 193 | + |
| 194 | + p = argparse.ArgumentParser() |
| 195 | + s = p.add_subparsers() |
| 196 | + |
| 197 | + parse_genpw = s.add_parser("genpw") |
| 198 | + parse_genpw.add_argument("serial", type=parse_serial) |
| 199 | + parse_genpw.set_defaults(func=genpw) |
| 200 | + |
| 201 | + parse_telnet = s.add_parser("telnet") |
| 202 | + parse_telnet.add_argument("serial", type=parse_serial) |
| 203 | + parse_telnet.set_defaults(func=telnet) |
| 204 | + |
| 205 | + parse_install = s.add_parser("install") |
| 206 | + parse_install.add_argument("gpon_serial", type=parse_serial) |
| 207 | + parse_install.add_argument("att_serial", type=parse_serial) |
| 208 | + parse_install.set_defaults(func=install) |
| 209 | + |
| 210 | + parse_persist = s.add_parser("persist") |
| 211 | + parse_persist.add_argument("att_serial", type=parse_serial) |
| 212 | + parse_persist.set_defaults(func=persist) |
| 213 | + |
| 214 | + args = p.parse_args() |
| 215 | + args.func(args) |
| 216 | + |
0 commit comments