add tr069 reset functionality, first save

main
"Riku" 2025-01-21 12:51:10 +02:00
parent 251673403a
commit 4596e0dddb
1 changed files with 129 additions and 26 deletions

View File

@ -13,11 +13,12 @@ from http.server import BaseHTTPRequestHandler, HTTPServer
import argparse
parser = argparse.ArgumentParser("ZTE simple exporter")
parser.add_argument("port", help="Serve metrics on what port", type=int)
parser.add_argument("ip", help="Router ip address")
parser.add_argument("username", help="Router username", default="admin")
parser.add_argument("password", help="Router password")
parser.add_argument("--login", help="Login method (multi, single)", default="single")
parser.add_argument("--port", help="Serve metrics on what port", type=int, default=8999)
parser.add_argument("--reset", help="Reset tr069 info", default=False, type=bool)
parser.add_argument("--device", help="Device type, valid devices: mc7010, defaults to generic (multi login, sha256 hash)", default="generic")
args = parser.parse_args()
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@ -26,12 +27,13 @@ s = requests.Session()
class zteRouter:
def __init__(self, ip, username, password, login):
def __init__(self, ip, username, password, login, device):
self.login = login
self.ip = ip
self.protocol = "http" # default to http
self.username = username
self.password = password
self.device = device
self.try_set_protocol()
self.referer = f"{self.protocol}://{self.ip}/"
@ -49,54 +51,146 @@ class zteRouter:
pass # If RequestException occurs, try the next protocol
# print(f"Could not determine the protocol for {self.ip}")
def getVersion(self):
header = {"Referer": self.referer}
payload = "isTest=false&cmd=wa_inner_version"
r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}", headers=header, data=payload, verify=False, timeout=1)
return r.json()["wa_inner_version"]
def hash(self, str):
return hashlib.sha256(str.encode()).hexdigest()
def get_LD(self):
header = {"Referer": self.referer}
payload = "isTest=false&cmd=LD"
r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}&_=", headers=header, data=payload, verify=False)
r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}&_=", headers=header, data=payload, verify=False, timeout=1)
return r.json()["LD"].upper()
def getCookie(self, username, password, LD, login):
def get_AD(self):
def md5(s):
m = hashlib.md5()
m.update(s.encode("utf-8"))
return m.hexdigest()
def sha256(s):
m = hashlib.sha256()
m.update(s.encode("utf-8"))
return m.hexdigest().upper()
wa_inner_version = self.getVersion()
if self.device == "mc7010":
hash_function = md5
else:
hash_function = sha256
cr_version = "" # is empty, is printed on getInfos
a = hash_function(wa_inner_version + cr_version)
header = {"Referer": self.referer}
rd_response = s.get(self.referer + "goform/goform_get_cmd_process?isTest=false&cmd=RD", headers=header, verify=False, timeout=1)
rd_json = rd_response.json()
u = rd_json.get("RD", "")
result = hash_function(a + u) # Use hash_function here as well
# print("result (hash of a + u):", result) # debug check against device
return result
def getVersion(self):
header = {"Referer": self.referer}
payload = "isTest=false&cmd=wa_inner_version"
r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}", headers=header, data=payload, verify=False, timeout=1)
return r.json()["wa_inner_version"]
def getCookie(self, username, password, LD, device):
header = {"Referer": self.referer}
hashPassword = self.hash(password).upper()
ztePass = self.hash(hashPassword + LD).upper()
if login == "multi":
if device == "mc7010":
payload = {
'isTest': 'false',
'goformId': 'LOGIN',
'password': ztePass,
}
else:
payload = {
'isTest': 'false',
'goformId': 'LOGIN_MULTI_USER',
'password': ztePass,
'user': username
}
else:
payload = {
'isTest': 'false',
'goformId': 'LOGIN',
'password': ztePass,
}
r = s.post(self.referer + "goform/goform_set_cmd_process", headers=header, data=payload, verify=False)
r = s.post(self.referer + "goform/goform_set_cmd_process", headers=header, data=payload, verify=False, timeout=1)
return "stok=" + r.cookies["stok"].strip('\"')
def zteinfo(self):
def resettr69(self):
ip = self.ip
cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD(), login=self.login)
cmd_url = f"{self.protocol}://{self.ip}/goform/goform_get_cmd_process?multi_data=1&isTest=false&cmd=wa_inner_version%2Cwan_ipaddr%2Cwan_apn%2Cnetwork_type%2Cnr5g_action_band%2CZ5g_rsrq%2CZ5g_rsrp%2CZ5g_rssi%2CZ5g_SINR%2Cnr_multi_ca_scell_info%2Clte_multi_ca_scell_info%2Clte_multi_ca_scell_sig_info%2Clte_ca_pcell_band%2Clte_rsrp%2Clte_rsrq%2Clte_rssi%2Clte_snr%2Cmonthly_tx_bytes%2Cmonthly_rx_bytes%2Crealtime_tx_bytes%2Crealtime_rx_bytes%2Crealtime_tx_thrpt%2Crealtime_rx_thrpt%2Cnr_multi_ca_scell_info%2Cnr5g_action_band"
cookie_pass = cookie
cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD(), login=self.device)
cmd_url = f"{self.protocol}://{self.ip}/goform/goform_get_cmd_process?multi_data=1&isTest=false&cmd=cr_version%2Ctr069_ServerURL%2Ctr069_CPEPortNo%2Ctr069_ServerUsername%2Ctr069_ServerPassword%2Ctr069_ConnectionRequestUname%2Ctr069_ConnectionRequestPassword%2Cwan_ipaddr%2Ctr069_PeriodicInformEnable%2Ctr069_PeriodicInformInterval%2Ctr069_CertEnable%2Ctr069_DataModule%2Ctr069_Webui_DataModuleSupport"
headers = {
"Host": ip,
"Referer": f"{self.referer}index.html",
"Cookie": f"{cookie_pass}"
"Cookie": f"{cookie}"
}
response = s.get(cmd_url, headers=headers, verify=False)
print("Current TR069 config:\n")
print(response.text)
print("\n---")
ip = self.ip
cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD(), login=self.device)
headers = {
"Host": ip,
"Referer": f"{self.referer}index.html",
"Cookie": f"{cookie}"
}
payload = {
'isTest': 'false',
'goformId': 'setTR069Config',
'AD': self.get_AD(),
"tr069_ServerURL": "",
"tr069_CPEPortNo": "",
"tr069_ServerUsername": "",
"tr069_ServerPassword": "",
"tr069_ConnectionRequestUname": "",
"tr069_ConnectionRequestPassword": "",
"tr069_PeriodicInformEnable": "1",
"tr069_PeriodicInformInterval": "7200",
"tr069_CertEnable": "1",
"tr069_DataModule": "",
"tr069_Webui_DataModuleSupport": ""
}
response = s.post(self.referer + "goform/goform_set_cmd_process", headers=headers, data=payload, verify=False)
success = json.loads(response.text)
if success["result"] == "success":
return True
else:
return False
def zteinfo(self):
ip = self.ip
cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD(), login=self.device)
cmd_url = f"{self.protocol}://{self.ip}/goform/goform_get_cmd_process?multi_data=1&isTest=false&cmd=wa_inner_version%2Cwan_ipaddr%2Cwan_apn%2Cnetwork_type%2Cnr5g_action_band%2CZ5g_rsrq%2CZ5g_rsrp%2CZ5g_rssi%2CZ5g_SINR%2Cnr_multi_ca_scell_info%2Clte_multi_ca_scell_info%2Clte_multi_ca_scell_sig_info%2Clte_ca_pcell_band%2Clte_rsrp%2Clte_rsrq%2Clte_rssi%2Clte_snr%2Cmonthly_tx_bytes%2Cmonthly_rx_bytes%2Crealtime_tx_bytes%2Crealtime_rx_bytes%2Crealtime_tx_thrpt%2Crealtime_rx_thrpt%2Cnr_multi_ca_scell_info%2Cnr5g_action_band"
headers = {
"Host": ip,
"Referer": f"{self.referer}index.html",
"Cookie": f"{cookie}"
}
response = s.get(cmd_url, headers=headers, verify=False, timeout=1)
return response.text
zteInstance = zteRouter(args.ip, args.username, args.password, args.login)
zteInstance = zteRouter(args.ip, args.username, args.password, args.login, args.device)
serverPort = args.port
class serveInfos(BaseHTTPRequestHandler):
@ -193,6 +287,15 @@ class serveInfos(BaseHTTPRequestHandler):
self.wfile.write(bytes("zte_bandwidth_rx %s\n\n" % gatheredJson["realtime_rx_thrpt"], "utf-8"))
if __name__ == "__main__":
if args.reset:
print("Trying TR069 reset...")
try:
zteInstance.resettr69()
except:
print("Failure!")
else:
print("Success!")
else:
webServer = HTTPServer(("0.0.0.0", serverPort), serveInfos)
print("Server started http://%s:%s" % ("0.0.0.0", serverPort))
webServer.serve_forever()