From 4596e0dddbe5a0b9a1462c1bd64777fbae34199f Mon Sep 17 00:00:00 2001 From: Riku Date: Tue, 21 Jan 2025 12:51:10 +0200 Subject: [PATCH] add tr069 reset functionality, first save --- zte_exporter.py | 155 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 129 insertions(+), 26 deletions(-) diff --git a/zte_exporter.py b/zte_exporter.py index b8733d3..eead9f6 100644 --- a/zte_exporter.py +++ b/zte_exporter.py @@ -13,11 +13,12 @@ from http.server import BaseHTTPRequestHandler, HTTPServer import argparse parser = argparse.ArgumentParser("ZTE simple exporter") -parser.add_argument("port", help="Serve metrics on what port", type=int) parser.add_argument("ip", help="Router ip address") parser.add_argument("username", help="Router username", default="admin") parser.add_argument("password", help="Router password") -parser.add_argument("--login", help="Login method (multi, single)", default="single") +parser.add_argument("--port", help="Serve metrics on what port", type=int, default=8999) +parser.add_argument("--reset", help="Reset tr069 info", default=False, type=bool) +parser.add_argument("--device", help="Device type, valid devices: mc7010, defaults to generic (multi login, sha256 hash)", default="generic") args = parser.parse_args() urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -26,12 +27,13 @@ s = requests.Session() class zteRouter: - def __init__(self, ip, username, password, login): + def __init__(self, ip, username, password, login, device): self.login = login self.ip = ip self.protocol = "http" # default to http self.username = username self.password = password + self.device = device self.try_set_protocol() self.referer = f"{self.protocol}://{self.ip}/" @@ -49,54 +51,146 @@ class zteRouter: pass # If RequestException occurs, try the next protocol # print(f"Could not determine the protocol for {self.ip}") + def getVersion(self): + header = {"Referer": self.referer} + payload = "isTest=false&cmd=wa_inner_version" + r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}", headers=header, data=payload, verify=False, timeout=1) + return r.json()["wa_inner_version"] + def hash(self, str): return hashlib.sha256(str.encode()).hexdigest() def get_LD(self): header = {"Referer": self.referer} payload = "isTest=false&cmd=LD" - r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}&_=", headers=header, data=payload, verify=False) + r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}&_=", headers=header, data=payload, verify=False, timeout=1) return r.json()["LD"].upper() - def getCookie(self, username, password, LD, login): + def get_AD(self): + def md5(s): + m = hashlib.md5() + m.update(s.encode("utf-8")) + return m.hexdigest() + + def sha256(s): + m = hashlib.sha256() + m.update(s.encode("utf-8")) + return m.hexdigest().upper() + + wa_inner_version = self.getVersion() + + if self.device == "mc7010": + hash_function = md5 + else: + hash_function = sha256 + + cr_version = "" # is empty, is printed on getInfos + + a = hash_function(wa_inner_version + cr_version) + + header = {"Referer": self.referer} + rd_response = s.get(self.referer + "goform/goform_get_cmd_process?isTest=false&cmd=RD", headers=header, verify=False, timeout=1) + + rd_json = rd_response.json() + u = rd_json.get("RD", "") + + result = hash_function(a + u) # Use hash_function here as well + # print("result (hash of a + u):", result) # debug check against device + + return result + + def getVersion(self): + header = {"Referer": self.referer} + payload = "isTest=false&cmd=wa_inner_version" + r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}", headers=header, data=payload, verify=False, timeout=1) + return r.json()["wa_inner_version"] + + def getCookie(self, username, password, LD, device): header = {"Referer": self.referer} hashPassword = self.hash(password).upper() ztePass = self.hash(hashPassword + LD).upper() - if login == "multi": + if device == "mc7010": + payload = { + 'isTest': 'false', + 'goformId': 'LOGIN', + 'password': ztePass, + } + else: payload = { 'isTest': 'false', 'goformId': 'LOGIN_MULTI_USER', 'password': ztePass, 'user': username } - else: - payload = { - 'isTest': 'false', - 'goformId': 'LOGIN', - 'password': ztePass, - } - r = s.post(self.referer + "goform/goform_set_cmd_process", headers=header, data=payload, verify=False) + r = s.post(self.referer + "goform/goform_set_cmd_process", headers=header, data=payload, verify=False, timeout=1) return "stok=" + r.cookies["stok"].strip('\"') - - def zteinfo(self): + + def resettr69(self): ip = self.ip - cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD(), login=self.login) - cmd_url = f"{self.protocol}://{self.ip}/goform/goform_get_cmd_process?multi_data=1&isTest=false&cmd=wa_inner_version%2Cwan_ipaddr%2Cwan_apn%2Cnetwork_type%2Cnr5g_action_band%2CZ5g_rsrq%2CZ5g_rsrp%2CZ5g_rssi%2CZ5g_SINR%2Cnr_multi_ca_scell_info%2Clte_multi_ca_scell_info%2Clte_multi_ca_scell_sig_info%2Clte_ca_pcell_band%2Clte_rsrp%2Clte_rsrq%2Clte_rssi%2Clte_snr%2Cmonthly_tx_bytes%2Cmonthly_rx_bytes%2Crealtime_tx_bytes%2Crealtime_rx_bytes%2Crealtime_tx_thrpt%2Crealtime_rx_thrpt%2Cnr_multi_ca_scell_info%2Cnr5g_action_band" - - cookie_pass = cookie + cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD(), login=self.device) + cmd_url = f"{self.protocol}://{self.ip}/goform/goform_get_cmd_process?multi_data=1&isTest=false&cmd=cr_version%2Ctr069_ServerURL%2Ctr069_CPEPortNo%2Ctr069_ServerUsername%2Ctr069_ServerPassword%2Ctr069_ConnectionRequestUname%2Ctr069_ConnectionRequestPassword%2Cwan_ipaddr%2Ctr069_PeriodicInformEnable%2Ctr069_PeriodicInformInterval%2Ctr069_CertEnable%2Ctr069_DataModule%2Ctr069_Webui_DataModuleSupport" headers = { "Host": ip, "Referer": f"{self.referer}index.html", - "Cookie": f"{cookie_pass}" + "Cookie": f"{cookie}" } response = s.get(cmd_url, headers=headers, verify=False) + print("Current TR069 config:\n") + print(response.text) + print("\n---") + + ip = self.ip + cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD(), login=self.device) + + headers = { + "Host": ip, + "Referer": f"{self.referer}index.html", + "Cookie": f"{cookie}" + } + + payload = { + 'isTest': 'false', + 'goformId': 'setTR069Config', + 'AD': self.get_AD(), + "tr069_ServerURL": "", + "tr069_CPEPortNo": "", + "tr069_ServerUsername": "", + "tr069_ServerPassword": "", + "tr069_ConnectionRequestUname": "", + "tr069_ConnectionRequestPassword": "", + "tr069_PeriodicInformEnable": "1", + "tr069_PeriodicInformInterval": "7200", + "tr069_CertEnable": "1", + "tr069_DataModule": "", + "tr069_Webui_DataModuleSupport": "" + } + response = s.post(self.referer + "goform/goform_set_cmd_process", headers=headers, data=payload, verify=False) + + success = json.loads(response.text) + if success["result"] == "success": + return True + else: + return False + + def zteinfo(self): + ip = self.ip + cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD(), login=self.device) + cmd_url = f"{self.protocol}://{self.ip}/goform/goform_get_cmd_process?multi_data=1&isTest=false&cmd=wa_inner_version%2Cwan_ipaddr%2Cwan_apn%2Cnetwork_type%2Cnr5g_action_band%2CZ5g_rsrq%2CZ5g_rsrp%2CZ5g_rssi%2CZ5g_SINR%2Cnr_multi_ca_scell_info%2Clte_multi_ca_scell_info%2Clte_multi_ca_scell_sig_info%2Clte_ca_pcell_band%2Clte_rsrp%2Clte_rsrq%2Clte_rssi%2Clte_snr%2Cmonthly_tx_bytes%2Cmonthly_rx_bytes%2Crealtime_tx_bytes%2Crealtime_rx_bytes%2Crealtime_tx_thrpt%2Crealtime_rx_thrpt%2Cnr_multi_ca_scell_info%2Cnr5g_action_band" + + headers = { + "Host": ip, + "Referer": f"{self.referer}index.html", + "Cookie": f"{cookie}" + } + + response = s.get(cmd_url, headers=headers, verify=False, timeout=1) return response.text -zteInstance = zteRouter(args.ip, args.username, args.password, args.login) +zteInstance = zteRouter(args.ip, args.username, args.password, args.login, args.device) serverPort = args.port class serveInfos(BaseHTTPRequestHandler): @@ -193,9 +287,18 @@ class serveInfos(BaseHTTPRequestHandler): self.wfile.write(bytes("zte_bandwidth_rx %s\n\n" % gatheredJson["realtime_rx_thrpt"], "utf-8")) if __name__ == "__main__": - webServer = HTTPServer(("0.0.0.0", serverPort), serveInfos) - print("Server started http://%s:%s" % ("0.0.0.0", serverPort)) - webServer.serve_forever() + if args.reset: + print("Trying TR069 reset...") + try: + zteInstance.resettr69() + except: + print("Failure!") + else: + print("Success!") + else: + webServer = HTTPServer(("0.0.0.0", serverPort), serveInfos) + print("Server started http://%s:%s" % ("0.0.0.0", serverPort)) + webServer.serve_forever() - webServer.server_close() - print("Server stopped.") \ No newline at end of file + webServer.server_close() + print("Server stopped.") \ No newline at end of file