zte-exporter/zte_exporter.py

304 lines
14 KiB
Python

#!/bin/python3
import requests
from requests.exceptions import RequestException
import hashlib
from datetime import datetime
import urllib.parse
import json
import sys
import time
import urllib3
from http.server import BaseHTTPRequestHandler, HTTPServer
import argparse
parser = argparse.ArgumentParser("ZTE simple exporter")
parser.add_argument("ip", help="Router ip address")
parser.add_argument("--username", help="Router username", default="admin")
parser.add_argument("password", help="Router password")
parser.add_argument("--port", help="Serve metrics on what port", type=int, default=8999)
parser.add_argument("--reset", help="Reset tr069 info", default=False, type=bool)
parser.add_argument("--device", help="Device type, valid devices: mc7010, defaults to generic (multi login, sha256 hash)", default="generic")
args = parser.parse_args()
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
s = requests.Session()
class zteRouter:
def __init__(self, ip, username, password, device):
self.ip = ip
self.protocol = "http" # default to http
self.username = username
self.password = password
self.device = device
self.try_set_protocol()
self.referer = f"{self.protocol}://{self.ip}/"
def try_set_protocol(self):
protocols = ["http", "https"]
for protocol in protocols:
url = f"{protocol}://{self.ip}"
try:
response = requests.get(url, timeout=5, verify=False)
if response.ok:
self.protocol = protocol
# print(f"{self.ip} is accessible via {protocol}")
return
except RequestException:
pass # If RequestException occurs, try the next protocol
# print(f"Could not determine the protocol for {self.ip}")
def getVersion(self):
header = {"Referer": self.referer}
payload = "isTest=false&cmd=wa_inner_version"
r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}", headers=header, data=payload, verify=False, timeout=1)
return r.json()["wa_inner_version"]
def hash(self, str):
return hashlib.sha256(str.encode()).hexdigest()
def get_LD(self):
header = {"Referer": self.referer}
payload = "isTest=false&cmd=LD"
r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}&_=", headers=header, data=payload, verify=False, timeout=1)
return r.json()["LD"].upper()
def get_AD(self):
def md5(s):
m = hashlib.md5()
m.update(s.encode("utf-8"))
return m.hexdigest()
def sha256(s):
m = hashlib.sha256()
m.update(s.encode("utf-8"))
return m.hexdigest().upper()
wa_inner_version = self.getVersion()
if self.device == "mc7010":
hash_function = md5
else:
hash_function = sha256
cr_version = "" # is empty, is printed on getInfos
a = hash_function(wa_inner_version + cr_version)
header = {"Referer": self.referer}
rd_response = s.get(self.referer + "goform/goform_get_cmd_process?isTest=false&cmd=RD", headers=header, verify=False, timeout=1)
rd_json = rd_response.json()
u = rd_json.get("RD", "")
result = hash_function(a + u) # Use hash_function here as well
# print("result (hash of a + u):", result) # debug check against device
return result
def getVersion(self):
header = {"Referer": self.referer}
payload = "isTest=false&cmd=wa_inner_version"
r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}", headers=header, data=payload, verify=False, timeout=1)
return r.json()["wa_inner_version"]
def getCookie(self, username, password, LD, device):
header = {"Referer": self.referer}
hashPassword = self.hash(password).upper()
ztePass = self.hash(hashPassword + LD).upper()
if device == "mc7010":
payload = {
'isTest': 'false',
'goformId': 'LOGIN',
'password': ztePass,
}
else:
payload = {
'isTest': 'false',
'goformId': 'LOGIN_MULTI_USER',
'password': ztePass,
'user': username
}
r = s.post(self.referer + "goform/goform_set_cmd_process", headers=header, data=payload, verify=False, timeout=1)
return "stok=" + r.cookies["stok"].strip('\"')
def resettr69(self):
ip = self.ip
cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD(), device=self.device)
cmd_url = f"{self.protocol}://{self.ip}/goform/goform_get_cmd_process?multi_data=1&isTest=false&cmd=cr_version%2Ctr069_ServerURL%2Ctr069_CPEPortNo%2Ctr069_ServerUsername%2Ctr069_ServerPassword%2Ctr069_ConnectionRequestUname%2Ctr069_ConnectionRequestPassword%2Cwan_ipaddr%2Ctr069_PeriodicInformEnable%2Ctr069_PeriodicInformInterval%2Ctr069_CertEnable%2Ctr069_DataModule%2Ctr069_Webui_DataModuleSupport"
headers = {
"Host": ip,
"Referer": f"{self.referer}index.html",
"Cookie": f"{cookie}"
}
response = s.get(cmd_url, headers=headers, verify=False)
print("Current TR069 config:\n")
print(response.text)
print("\n---")
ip = self.ip
cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD(), device=self.device)
headers = {
"Host": ip,
"Referer": f"{self.referer}index.html",
"Cookie": f"{cookie}"
}
payload = {
'isTest': 'false',
'goformId': 'setTR069Config',
'AD': self.get_AD(),
"tr069_ServerURL": "",
"tr069_CPEPortNo": "",
"tr069_ServerUsername": "",
"tr069_ServerPassword": "",
"tr069_ConnectionRequestUname": "",
"tr069_ConnectionRequestPassword": "",
"tr069_PeriodicInformEnable": "1",
"tr069_PeriodicInformInterval": "7200",
"tr069_CertEnable": "1",
"tr069_DataModule": "",
"tr069_Webui_DataModuleSupport": ""
}
response = s.post(self.referer + "goform/goform_set_cmd_process", headers=headers, data=payload, verify=False)
success = json.loads(response.text)
if success["result"] == "success":
return True
else:
return False
def zteinfo(self):
ip = self.ip
cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD(), device=self.device)
cmd_url = f"{self.protocol}://{self.ip}/goform/goform_get_cmd_process?multi_data=1&isTest=false&cmd=wa_inner_version%2Cwan_ipaddr%2Cwan_apn%2Cnetwork_type%2Cnr5g_action_band%2CZ5g_rsrq%2CZ5g_rsrp%2CZ5g_rssi%2CZ5g_SINR%2Cnr_multi_ca_scell_info%2Clte_multi_ca_scell_info%2Clte_multi_ca_scell_sig_info%2Clte_ca_pcell_band%2Clte_rsrp%2Clte_rsrq%2Clte_rssi%2Clte_snr%2Cmonthly_tx_bytes%2Cmonthly_rx_bytes%2Crealtime_tx_bytes%2Crealtime_rx_bytes%2Crealtime_tx_thrpt%2Crealtime_rx_thrpt%2Cnr_multi_ca_scell_info%2Cnr5g_action_band"
headers = {
"Host": ip,
"Referer": f"{self.referer}index.html",
"Cookie": f"{cookie}"
}
response = s.get(cmd_url, headers=headers, verify=False, timeout=1)
return response.text
zteInstance = zteRouter(args.ip, args.username, args.password, args.device)
serverPort = args.port
class serveInfos(BaseHTTPRequestHandler):
def do_GET(self):
gatheredJson = json.loads(zteInstance.zteinfo())
self.send_response(200)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(bytes("# HELP zte_modem_info Modem general information\n# TYPE zte_modem_info untyped\n", "utf-8"))
self.wfile.write(bytes("zte_modem_info{firmware=\"%s\"," % gatheredJson["wa_inner_version"], "utf-8"))
self.wfile.write(bytes("ip_addr=\"%s\"," % gatheredJson["wan_ipaddr"], "utf-8"))
self.wfile.write(bytes("apn=\"%s\"," % gatheredJson["wan_apn"], "utf-8"))
self.wfile.write(bytes("network_type=\"%s\"} 1\n\n" % gatheredJson["network_type"], "utf-8"))
self.wfile.write(bytes("# HELP zte_rsrp Reference Signal Received Power\n# TYPE zte_rsrp gauge\n", "utf-8"))
self.wfile.write(bytes("# HELP zte_snr Signal-to-Interference-plus-Noise Ratio\n# TYPE zte_snr gauge\n", "utf-8"))
self.wfile.write(bytes("# HELP zte_rsrq Reference Signal Received Quality\n# TYPE zte_rsrq gauge\n", "utf-8"))
self.wfile.write(bytes("# HELP zte_rssi Received Signal Strength Indicator\n# TYPE zte_rssi gauge\n", "utf-8"))
if gatheredJson['nr5g_action_band'] != 0:
self.wfile.write(bytes("zte_rsrq{band=\"%s\"} " % gatheredJson["nr5g_action_band"], "utf-8"))
self.wfile.write(bytes("%s\n" % gatheredJson["Z5g_rsrq"], "utf-8"))
self.wfile.write(bytes("zte_rsrp{band=\"%s\"} " % gatheredJson["nr5g_action_band"], "utf-8"))
self.wfile.write(bytes("%s\n" % gatheredJson["Z5g_rsrp"], "utf-8"))
self.wfile.write(bytes("zte_rssi{band=\"%s\"} " % gatheredJson["nr5g_action_band"], "utf-8"))
self.wfile.write(bytes("%s\n" % gatheredJson["Z5g_rssi"], "utf-8"))
self.wfile.write(bytes("zte_snr{band=\"%s\"} " % gatheredJson["nr5g_action_band"], "utf-8"))
self.wfile.write(bytes("%s\n\n" % gatheredJson["Z5g_SINR"], "utf-8"))
if gatheredJson['nr_multi_ca_scell_info']:
for index, item in enumerate(gatheredJson['nr_multi_ca_scell_info'].split(";")):
nrbit = item.split(",")
if not len(nrbit[0]) == 0:
self.wfile.write(bytes("zte_rsrq{band=\"%s\"} " % nrbit[3], "utf-8"))
self.wfile.write(bytes("%s\n" % nrbit[8], "utf-8"))
self.wfile.write(bytes("zte_rsrp{band=\"%s\"} " % nrbit[3], "utf-8"))
self.wfile.write(bytes("%s\n" % nrbit[7], "utf-8"))
self.wfile.write(bytes("zte_rssi{band=\"%s\"} " % nrbit[3], "utf-8"))
self.wfile.write(bytes("%s\n" % nrbit[10], "utf-8"))
self.wfile.write(bytes("zte_snr{band=\"%s\"} " % nrbit[3], "utf-8"))
self.wfile.write(bytes("%s\n\n" % nrbit[9], "utf-8"))
if gatheredJson['lte_ca_pcell_band'] and gatheredJson['lte_ca_pcell_band'] != "0":
self.wfile.write(bytes("zte_rsrq{band=\"b%s\"} " % gatheredJson["lte_ca_pcell_band"], "utf-8"))
self.wfile.write(bytes("%s\n" % gatheredJson["lte_rsrq"], "utf-8"))
self.wfile.write(bytes("zte_rsrp{band=\"b%s\"} " % gatheredJson["lte_ca_pcell_band"], "utf-8"))
self.wfile.write(bytes("%s\n" % gatheredJson["lte_rsrp"], "utf-8"))
self.wfile.write(bytes("zte_rssi{band=\"b%s\"} " % gatheredJson["lte_ca_pcell_band"], "utf-8"))
self.wfile.write(bytes("%s\n" % gatheredJson["lte_rssi"], "utf-8"))
self.wfile.write(bytes("zte_snr{band=\"b%s\"} " % gatheredJson["lte_ca_pcell_band"], "utf-8"))
self.wfile.write(bytes("%s\n\n" % gatheredJson["lte_snr"], "utf-8"))
if gatheredJson['lte_multi_ca_scell_info'] and gatheredJson['lte_multi_ca_scell_sig_info']:
for index, item in enumerate(gatheredJson['lte_multi_ca_scell_info'].split(";")):
cabit = item.split(",")
sigbits = gatheredJson['lte_multi_ca_scell_sig_info'].split(";")
sigbit = sigbits[index].split(",")
if not len(cabit[0]) == 0:
self.wfile.write(bytes("zte_rsrq{band=\"b%s\"} " % cabit[3], "utf-8"))
self.wfile.write(bytes("%s\n" % sigbit[1], "utf-8"))
self.wfile.write(bytes("zte_rsrp{band=\"b%s\"} " % cabit[3], "utf-8"))
self.wfile.write(bytes("%s\n" % sigbit[0], "utf-8"))
self.wfile.write(bytes("zte_rssi{band=\"b%s\"} " % cabit[3], "utf-8"))
self.wfile.write(bytes("%s\n" % sigbit[3], "utf-8"))
self.wfile.write(bytes("zte_snr{band=\"b%s\"} " % cabit[3], "utf-8"))
self.wfile.write(bytes("%s\n\n" % sigbit[2], "utf-8"))
self.wfile.write(bytes("# HELP zte_bandwidth_used_monthly_tx Used bandwidth, transmit (bytes)\n# TYPE zte_bandwidth_used_monthly_tx counter\n", "utf-8"))
self.wfile.write(bytes("zte_bandwidth_used_monthly_tx %s\n\n" % gatheredJson["monthly_tx_bytes"], "utf-8"))
self.wfile.write(bytes("# HELP zte_bandwidth_used_monthly_rx Used bandwidth, receive (bytes)\n# TYPE zte_bandwidth_used_monthly_rx counter\n", "utf-8"))
self.wfile.write(bytes("zte_bandwidth_used_monthly_rx %s\n\n" % gatheredJson["monthly_rx_bytes"], "utf-8"))
self.wfile.write(bytes("# HELP zte_bandwidth_used_session_tx Used bandwidth, current session, transmit (bytes)\n# TYPE zte_bandwidth_used_session_tx counter\n", "utf-8"))
self.wfile.write(bytes("zte_bandwidth_used_session_tx %s\n\n" % gatheredJson["realtime_tx_bytes"], "utf-8"))
self.wfile.write(bytes("# HELP zte_bandwidth_used_session_rx Used bandwidth, current session, receive (bytes)\n# TYPE zte_bandwidth_used_session_rx counter\n", "utf-8"))
self.wfile.write(bytes("zte_bandwidth_used_session_rx %s\n\n" % gatheredJson["realtime_tx_bytes"], "utf-8"))
self.wfile.write(bytes("# HELP zte_bandwidth_tx Current bandwidth, transmit (bytes)\n# TYPE zte_bandwidth_tx gauge\n", "utf-8"))
self.wfile.write(bytes("zte_bandwidth_tx %s\n\n" % gatheredJson["realtime_tx_thrpt"], "utf-8"))
self.wfile.write(bytes("# HELP bandwidth_rx Current bandwidth, receive (bytes)\n# TYPE zte_bandwidth_rx gauge\n", "utf-8"))
self.wfile.write(bytes("zte_bandwidth_rx %s\n\n" % gatheredJson["realtime_rx_thrpt"], "utf-8"))
if __name__ == "__main__":
if args.reset:
print("Trying TR069 reset...")
try:
zteInstance.resettr69()
except:
print("Failure!")
else:
print("Success!")
else:
webServer = HTTPServer(("0.0.0.0", serverPort), serveInfos)
print("Server started http://%s:%s" % ("0.0.0.0", serverPort))
webServer.serve_forever()
webServer.server_close()
print("Server stopped.")