#!/bin/python3 import requests from requests.exceptions import RequestException import hashlib from datetime import datetime import urllib.parse import json import sys import time import urllib3 from http.server import BaseHTTPRequestHandler, HTTPServer import argparse parser = argparse.ArgumentParser("ZTE simple exporter") parser.add_argument("port", help="Serve metrics on what port", type=int) parser.add_argument("ip", help="Router ip address") parser.add_argument("username", help="Router username", default="admin") parser.add_argument("password", help="Router password") parser.add_argument("--login", help="Login method (multi, single)", default="single") args = parser.parse_args() urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) s = requests.Session() class zteRouter: def __init__(self, ip, username, password, login): self.login = login self.ip = ip self.protocol = "http" # default to http self.username = username self.password = password self.try_set_protocol() self.referer = f"{self.protocol}://{self.ip}/" def try_set_protocol(self): protocols = ["http", "https"] for protocol in protocols: url = f"{protocol}://{self.ip}" try: response = requests.get(url, timeout=5, verify=False) if response.ok: self.protocol = protocol # print(f"{self.ip} is accessible via {protocol}") return except RequestException: pass # If RequestException occurs, try the next protocol # print(f"Could not determine the protocol for {self.ip}") def hash(self, str): return hashlib.sha256(str.encode()).hexdigest() def get_LD(self): header = {"Referer": self.referer} payload = "isTest=false&cmd=LD" r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}&_=", headers=header, data=payload, verify=False) return r.json()["LD"].upper() def getCookie(self, username, password, LD, login): header = {"Referer": self.referer} hashPassword = self.hash(password).upper() ztePass = self.hash(hashPassword + LD).upper() if login == "multi": payload = { 'isTest': 'false', 'goformId': 'LOGIN_MULTI_USER', 'password': ztePass, 'user': username } else: payload = { 'isTest': 'false', 'goformId': 'LOGIN', 'password': ztePass, } r = s.post(self.referer + "goform/goform_set_cmd_process", headers=header, data=payload, verify=False) return "stok=" + r.cookies["stok"].strip('\"') def zteinfo(self): ip = self.ip cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD(), login=self.login) cmd_url = f"{self.protocol}://{self.ip}/goform/goform_get_cmd_process?multi_data=1&isTest=false&cmd=wa_inner_version%2Cwan_ipaddr%2Cwan_apn%2Cnetwork_type%2Cnr5g_action_band%2CZ5g_rsrq%2CZ5g_rsrp%2CZ5g_rssi%2CZ5g_SINR%2Cnr_multi_ca_scell_info%2Clte_multi_ca_scell_info%2Clte_ca_pcell_band%2Clte_rsrp%2Clte_rsrq%2Clte_rssi%2Clte_snr%2Cmonthly_tx_bytes%2Cmonthly_rx_bytes%2Crealtime_tx_bytes%2Crealtime_rx_bytes%2Crealtime_tx_thrpt%2Crealtime_rx_thrpt%2Cnr_multi_ca_scell_info%2Cnr5g_action_band" cookie_pass = cookie headers = { "Host": ip, "Referer": f"{self.referer}index.html", "Cookie": f"{cookie_pass}" } response = s.get(cmd_url, headers=headers, verify=False) return response.text zteInstance = zteRouter(args.ip, args.username, args.password, args.login) serverPort = args.port class serveInfos(BaseHTTPRequestHandler): def do_GET(self): gatheredJson = json.loads(zteInstance.zteinfo()) self.send_response(200) self.send_header("Content-type", "text/plain") self.end_headers() self.wfile.write(bytes("# HELP zte_modem_info Modem general information\n# TYPE zte_modem_info untyped\n", "utf-8")) self.wfile.write(bytes("zte_modem_info{firmware=\"%s\"," % gatheredJson["wa_inner_version"], "utf-8")) self.wfile.write(bytes("ip_addr=\"%s\"," % gatheredJson["wan_ipaddr"], "utf-8")) self.wfile.write(bytes("apn=\"%s\"," % gatheredJson["wan_apn"], "utf-8")) self.wfile.write(bytes("network_type=\"%s\"} 1\n\n" % gatheredJson["network_type"], "utf-8")) self.wfile.write(bytes("# HELP zte_rsrp Reference Signal Received Power\n# TYPE zte_rsrp gauge\n", "utf-8")) self.wfile.write(bytes("# HELP zte_snr Signal-to-Interference-plus-Noise Ratio\n# TYPE zte_snr gauge\n", "utf-8")) self.wfile.write(bytes("# HELP zte_rsrq Reference Signal Received Quality\n# TYPE zte_rsrq gauge\n", "utf-8")) self.wfile.write(bytes("# HELP zte_rssi Received Signal Strength Indicator\n# TYPE zte_rssi gauge\n", "utf-8")) if gatheredJson['nr5g_action_band']: self.wfile.write(bytes("zte_rsrq{band=\"%s\"} " % gatheredJson["nr5g_action_band"], "utf-8")) self.wfile.write(bytes("%s\n" % gatheredJson["Z5g_rsrq"], "utf-8")) self.wfile.write(bytes("zte_rsrp{band=\"%s\"} " % gatheredJson["nr5g_action_band"], "utf-8")) self.wfile.write(bytes("%s\n" % gatheredJson["Z5g_rsrp"], "utf-8")) self.wfile.write(bytes("zte_rssi{band=\"%s\"} " % gatheredJson["nr5g_action_band"], "utf-8")) self.wfile.write(bytes("%s\n" % gatheredJson["Z5g_rssi"], "utf-8")) self.wfile.write(bytes("zte_snr{band=\"%s\"} " % gatheredJson["nr5g_action_band"], "utf-8")) self.wfile.write(bytes("%s\n\n" % gatheredJson["Z5g_SINR"], "utf-8")) if gatheredJson['nr_multi_ca_scell_info']: for index, item in enumerate(gatheredJson['nr_multi_ca_scell_info'].split(";")): nrbit = item.split(",") if not len(nrbit[0]) == 0: self.wfile.write(bytes("zte_rsrq{band=\"%s\"} " % nrbit[3], "utf-8")) self.wfile.write(bytes("%s\n" % nrbit[8], "utf-8")) self.wfile.write(bytes("zte_rsrp{band=\"%s\"} " % nrbit[3], "utf-8")) self.wfile.write(bytes("%s\n" % nrbit[7], "utf-8")) self.wfile.write(bytes("zte_rssi{band=\"%s\"} " % nrbit[3], "utf-8")) self.wfile.write(bytes("%s\n" % nrbit[10], "utf-8")) self.wfile.write(bytes("zte_snr{band=\"%s\"} " % nrbit[3], "utf-8")) self.wfile.write(bytes("%s\n\n" % nrbit[9], "utf-8")) if gatheredJson['lte_ca_pcell_band']: self.wfile.write(bytes("zte_rsrq{band=\"b%s\"} " % gatheredJson["lte_ca_pcell_band"], "utf-8")) self.wfile.write(bytes("%s\n" % gatheredJson["lte_rsrq"], "utf-8")) self.wfile.write(bytes("zte_rsrp{band=\"b%s\"} " % gatheredJson["lte_ca_pcell_band"], "utf-8")) self.wfile.write(bytes("%s\n" % gatheredJson["lte_rsrp"], "utf-8")) self.wfile.write(bytes("zte_rssi{band=\"b%s\"} " % gatheredJson["lte_ca_pcell_band"], "utf-8")) self.wfile.write(bytes("%s\n" % gatheredJson["lte_rssi"], "utf-8")) self.wfile.write(bytes("zte_snr{band=\"b%s\"} " % gatheredJson["lte_ca_pcell_band"], "utf-8")) self.wfile.write(bytes("%s\n\n" % gatheredJson["lte_snr"], "utf-8")) if gatheredJson['lte_multi_ca_scell_info']: for index, item in enumerate(gatheredJson['lte_multi_ca_scell_info'].split(";")): cabit = item.split(",") sigbits = gatheredJson['lte_multi_ca_scell_sig_info'].split(";") sigbit = sigbits[index].split(",") if not len(cabit[0]) == 0: self.wfile.write(bytes("zte_rsrq{band=\"b%s\"} " % cabit[3], "utf-8")) self.wfile.write(bytes("%s\n" % sigbit[0], "utf-8")) self.wfile.write(bytes("zte_rsrp{band=\"b%s\"} " % cabit[3], "utf-8")) self.wfile.write(bytes("%s\n" % sigbit[1], "utf-8")) self.wfile.write(bytes("zte_rssi{band=\"b%s\"} " % cabit[3], "utf-8")) self.wfile.write(bytes("%s\n" % sigbit[3], "utf-8")) self.wfile.write(bytes("zte_snr{band=\"b%s\"} " % cabit[3], "utf-8")) self.wfile.write(bytes("%s\n\n" % sigbit[2], "utf-8")) self.wfile.write(bytes("# HELP zte_bandwidth_used_monthly_tx Used bandwidth, transmit (bytes)\n# TYPE zte_bandwidth_used_monthly_tx counter\n", "utf-8")) self.wfile.write(bytes("zte_bandwidth_used_monthly_tx %s\n\n" % gatheredJson["monthly_tx_bytes"], "utf-8")) self.wfile.write(bytes("# HELP zte_bandwidth_used_monthly_rx Used bandwidth, receive (bytes)\n# TYPE zte_bandwidth_used_monthly_rx counter\n", "utf-8")) self.wfile.write(bytes("zte_bandwidth_used_monthly_rx %s\n\n" % gatheredJson["monthly_rx_bytes"], "utf-8")) self.wfile.write(bytes("# HELP zte_bandwidth_used_session_tx Used bandwidth, current session, transmit (bytes)\n# TYPE zte_bandwidth_used_session_tx counter\n", "utf-8")) self.wfile.write(bytes("zte_bandwidth_used_session_tx %s\n\n" % gatheredJson["realtime_tx_bytes"], "utf-8")) self.wfile.write(bytes("# HELP zte_bandwidth_used_session_rx Used bandwidth, current session, receive (bytes)\n# TYPE zte_bandwidth_used_session_rx counter\n", "utf-8")) self.wfile.write(bytes("zte_bandwidth_used_session_rx %s\n\n" % gatheredJson["realtime_tx_bytes"], "utf-8")) self.wfile.write(bytes("# HELP zte_bandwidth_tx Current bandwidth, transmit (bytes)\n# TYPE zte_bandwidth_tx gauge\n", "utf-8")) self.wfile.write(bytes("zte_bandwidth_tx %s\n\n" % gatheredJson["realtime_tx_thrpt"], "utf-8")) self.wfile.write(bytes("# HELP bandwidth_rx Current bandwidth, receive (bytes)\n# TYPE zte_bandwidth_rx gauge\n", "utf-8")) self.wfile.write(bytes("zte_bandwidth_rx %s\n\n" % gatheredJson["realtime_rx_thrpt"], "utf-8")) if __name__ == "__main__": webServer = HTTPServer(("0.0.0.0", serverPort), serveInfos) print("Server started http://%s:%s" % ("0.0.0.0", serverPort)) webServer.serve_forever() webServer.server_close() print("Server stopped.")