163 lines
19 KiB
Python
163 lines
19 KiB
Python
|
#!/bin/python3
|
||
|
|
||
|
import requests
|
||
|
from requests.exceptions import RequestException
|
||
|
import hashlib
|
||
|
from datetime import datetime
|
||
|
import binascii
|
||
|
import urllib.parse
|
||
|
import json
|
||
|
import sys
|
||
|
import time
|
||
|
import urllib3
|
||
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||
|
import argparse
|
||
|
|
||
|
parser = argparse.ArgumentParser("ZTE simple exporter")
|
||
|
parser.add_argument("ip", help="Router ip address")
|
||
|
parser.add_argument("username", help="Router username")
|
||
|
parser.add_argument("password", help="Router password")
|
||
|
args = parser.parse_args()
|
||
|
|
||
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||
|
|
||
|
s = requests.Session()
|
||
|
|
||
|
class zteRouter:
|
||
|
|
||
|
def __init__(self, ip, username, password):
|
||
|
self.ip = ip
|
||
|
self.protocol = "http" # default to http
|
||
|
self.username = username
|
||
|
self.password = password
|
||
|
self.try_set_protocol()
|
||
|
self.referer = f"{self.protocol}://{self.ip}/"
|
||
|
self.js = """/*\n * A JavaScript implementation of the RSA Data Security, Inc. MD5 Message\n * Digest Algorithm, as defined in RFC 1321.\n * Version 2.1 Copyright (C) Paul Johnston 1999 - 2002.\n * Other contributors: Greg Holt, Andrew Kepert, Ydnar, Lostinet\n * Distributed under the BSD License\n * See http://pajhome.org.uk/crypt/md5 for more info.\n */\n\n/*\n * Configurable variables. You may need to tweak these to be compatible with\n * the server-side, but the defaults work in most cases.\n */\nvar hexcase = 0; /* hex output format. 0 - lowercase; 1 - uppercase */\nvar b64pad = ""; /* base-64 pad character. "=" for strict RFC compliance */\nvar chrsz = 8; /* bits per input character. 8 - ASCII; 16 - Unicode */\n\n/*\n * These are the functions you\'ll usually want to call\n * They take string arguments and return either hex or base-64 encoded strings\n */\nfunction hex_md5(s){ return binl2hex(core_md5(str2binl(s), s.length * chrsz));}\nfunction b64_md5(s){ return binl2b64(core_md5(str2binl(s), s.length * chrsz));}\nfunction str_md5(s){ return binl2str(core_md5(str2binl(s), s.length * chrsz));}\nfunction hex_hmac_md5(key, data) { return binl2hex(core_hmac_md5(key, data)); }\nfunction b64_hmac_md5(key, data) { return binl2b64(core_hmac_md5(key, data)); }\nfunction str_hmac_md5(key, data) { return binl2str(core_hmac_md5(key, data)); }\n\n/*\n * Perform a simple self-test to see if the VM is working\n */\nfunction md5_vm_test()\n{\n return hex_md5("abc") == "900150983cd24fb0d6963f7d28e17f72";\n}\n\n/*\n * Calculate the MD5 of an array of little-endian words, and a bit length\n */\nfunction core_md5(x, len)\n{\n /* append padding */\n x[len >> 5] |= 0x80 << ((len) % 32);\n x[(((len + 64) >>> 9) << 4) + 14] = len;\n\n var a = 1732584193;\n var b = -271733879;\n var c = -1732584194;\n var d = 271733878;\n\n for(var i = 0; i < x.length; i += 16)\n {\n var olda = a;\n var oldb = b;\n var oldc = c;\n var oldd = d;\n\n a = md5_ff(a, b, c, d, x[i+ 0], 7 , -680876936);\n d = md5_ff(d, a, b, c, x[i+ 1], 12, -389564586);\n c = md5_ff(c, d, a, b, x[i+ 2], 17, 606105819);\n b = md5_ff(b, c, d, a, x[i+ 3], 22, -1044525330);\n a = md5_ff(a, b, c, d, x[i+ 4], 7 , -176418897);\n d = md5_ff(d, a, b, c, x[i+ 5], 12, 1200080426);\n c = md5_ff(c, d, a, b, x[i+ 6], 17, -1473231341);\n b = md5_ff(b, c, d, a, x[i+ 7], 22, -45705983);\n a = md5_ff(a, b, c, d, x[i+ 8], 7 , 1770035416);\n d = md5_ff(d, a, b, c, x[i+ 9], 12, -1958414417);\n c = md5_ff(c, d, a, b, x[i+10], 17, -42063);\n b = md5_ff(b, c, d, a, x[i+11], 22, -1990404162);\n a = md5_ff(a, b, c, d, x[i+12], 7 , 1804603682);\n d = md5_ff(d, a, b, c, x[i+13], 12, -40341101);\n c = md5_ff(c, d, a, b, x[i+14], 17, -1502002290);\n b = md5_ff(b, c, d, a, x[i+15], 22, 1236535329);\n\n a = md5_gg(a, b, c, d, x[i+ 1], 5 , -165796510);\n d = md5_gg(d, a, b, c, x[i+ 6], 9 , -1069501632);\n c = md5_gg(c, d, a, b, x[i+11], 14, 643717713);\n b = md5_gg(b, c, d, a, x[i+ 0], 20, -373897302);\n a = md5_gg(a, b, c, d, x[i+ 5], 5 , -701558691);\n d = md5_gg(d, a, b, c, x[i+10], 9 , 38016083);\n c = md5_gg(c, d, a, b, x[i+15], 14, -660478335);\n b = md5_gg(b, c, d, a, x[i+ 4], 20, -405537848);\n a = md5_gg(a, b, c, d, x[i+ 9], 5 , 568446438);\n d = md5_gg(d, a, b, c, x[i+14], 9 , -1019803690);\n c = md5_gg(c, d, a, b, x[i+ 3], 14, -187363961);\n b = md5_gg(b, c, d, a, x[i+ 8], 20, 1163531501);\n a = md5_gg(a, b, c, d, x[i+13], 5 , -1444681467);\n d = md5_gg(d, a, b, c, x[i+ 2], 9 , -51403784);\n c = md5_gg(c, d, a, b, x[i+ 7], 14, 1735328473);\n b = md5_gg(b, c, d, a, x[i+12], 20, -1926607734);\n\n a = md5_hh(a, b, c, d, x[i+ 5], 4 , -378558);\n d = md5_hh(d, a, b, c, x[i+ 8], 11, -2022574463);\n c = md5_hh(c, d, a, b, x[i+11], 16, 1839030562);\n b = md5_hh(b, c, d, a, x[i+14], 23, -35309556);\n a = md5_hh(a, b, c, d, x[i+ 1], 4 , -1530992060);\n d = md5_hh(d, a, b, c, x[i+ 4], 11, 1272893353);\n c = md5_hh(c, d, a, b, x[i+ 7]
|
||
|
|
||
|
def try_set_protocol(self):
|
||
|
protocols = ["http", "https"]
|
||
|
for protocol in protocols:
|
||
|
url = f"{protocol}://{self.ip}"
|
||
|
try:
|
||
|
response = requests.get(url, timeout=5, verify=False)
|
||
|
if response.ok:
|
||
|
self.protocol = protocol
|
||
|
# print(f"{self.ip} is accessible via {protocol}")
|
||
|
return
|
||
|
except RequestException:
|
||
|
pass # If RequestException occurs, try the next protocol
|
||
|
# print(f"Could not determine the protocol for {self.ip}")
|
||
|
|
||
|
def hash(self, str):
|
||
|
return hashlib.sha256(str.encode()).hexdigest()
|
||
|
|
||
|
def getVersion(self):
|
||
|
header = {"Referer": self.referer}
|
||
|
payload = "isTest=false&cmd=wa_inner_version"
|
||
|
r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}", headers=header, data=payload, verify=False)
|
||
|
return r.json()["wa_inner_version"]
|
||
|
|
||
|
def get_LD(self):
|
||
|
header = {"Referer": self.referer}
|
||
|
payload = "isTest=false&cmd=LD"
|
||
|
r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}", headers=header, data=payload, verify=False)
|
||
|
return r.json()["LD"].upper()
|
||
|
|
||
|
def getCookie(self, username, password, LD):
|
||
|
header = {"Referer": self.referer}
|
||
|
hashPassword = self.hash(password).upper()
|
||
|
ztePass = self.hash(hashPassword + LD).upper()
|
||
|
old_login = self.getVersion()
|
||
|
|
||
|
payload = {
|
||
|
'isTest': 'false',
|
||
|
'goformId': 'LOGIN' if username is not None and old_login in 'MC801' or 'MC7010' else 'LOGIN_MULTI_USER',
|
||
|
'password': ztePass
|
||
|
}
|
||
|
if username is not None and username != "":
|
||
|
payload['username'] = username
|
||
|
|
||
|
r = s.post(self.referer + "goform/goform_set_cmd_process", headers=header, data=payload, verify=False)
|
||
|
return "stok=" + r.cookies["stok"].strip('\"')
|
||
|
|
||
|
def get_RD(self):
|
||
|
header = {"Referer": self.referer}
|
||
|
payload = "isTest=false&cmd=RD"
|
||
|
r = s.post(self.referer + "goform/goform_get_cmd_process", headers=header, data=payload, verify=False)
|
||
|
return r.json()["RD"]
|
||
|
|
||
|
def zteinfo(self):
|
||
|
ip = self.ip
|
||
|
cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD())
|
||
|
#cmd_url = f"{self.protocol}://{self.ip}/goform/goform_get_cmd_process?isTest=false&cmd=wa_inner_version%2Crmcc%2Crmnc%2Cenodeb_id%2Clte_rsrq%2Clte_rsrp%2CZ5g_snr%2CZ5g_rsrp%2CZCELLINFO_band%2CZ5g_dlEarfcn%2Clte_ca_pcell_band%2Clte_ca_scell_band%2Clte_ca_pcell_bandwidth%2Clte_ca_scell_bandwidth%2Cwan_lte_ca%2Clte_pci%2CZ5g_CELL_ID%2CZ5g_SINR%2Ccell_id%2Cwan_lte_ca%2Clte_ca_scell_band%2Clte_ca_scell_bandwidth%2Clte_ca_pcell_arfcn%2Clte_ca_scell_arfcn%2Clte_multi_ca_scell_info%2Cnr5g_pci%2Cnr5g_action_band%2Cnr5g_cell_id%2Clte_snr%2Cecio%2Cwan_active_channel%2Cnr5g_action_channel%2Cngbr_cell_info%2Cmonthly_tx_bytes%2Cmonthly_rx_bytes%2Clte_pci%2Clte_pci_lock%2Clte_earfcn_lock%2Cwan_ipaddr%2Cwan_apn%2Cpm_sensor_mdm%2Cpm_modem_5g%2Cnr5g_pci%2Cnr5g_action_channel%2Cnr5g_action_band%2CZ5g_SINR%2Cwan_active_channel%2Cwan_lte_ca%2Clte_multi_ca_scell_info%2Ccell_id%2Cloginfo%2Crealtime_time%2Csignalbar&multi_data=1"
|
||
|
#cmd_url = f"{self.protocol}://{self.ip}/goform/goform_get_cmd_process?multi_data=1&isTest=false&cmd=network_type%2Cwa_inner_version%2Clte_rssi%2Crscp%2Clte_rsrp%2CZ5g_snr%2CZ5g_rssi%2CZ5g_rsrp%2CZCELLINFO_band%2CZ5g_dlEarfcn%2Clte_ca_pcell_arfcn%2Clte_ca_pcell_band%2Clte_ca_scell_band%2Clte_ca_pcell_bandwidth%2Clte_ca_scell_info%2Clte_ca_scell_bandwidth%2Cwan_lte_ca%2Clte_pci%2CZ5g_CELL_ID%2CZ5g_SINR%2Ccell_id%2Cwan_lte_ca%2Clte_ca_pcell_band%2Clte_ca_pcell_bandwidth%2Clte_ca_scell_band%2Clte_ca_scell_bandwidth%2Clte_ca_pcell_arfcn%2Clte_ca_scell_arfcn%2Clte_multi_ca_scell_info%2Cwan_active_band%2Cnr5g_pci%2Cnr5g_action_band%2Cnr5g_cell_id%2Clte_snr%2Cecio%2Cwan_active_channel%2Cnr5g_action_channel%2Cmodem_main_state%2Cpin_status%2Copms_wan_mode%2Copms_wan_auto_mode%2Cloginfo%2Cnew_version_state%2Ccurrent_upgrade_state%2Cis_mandatory%2Cwifi_dfs_status%2Cbattery_value%2Cppp_dial_conn_fail_counter%2Cwifi_chip1_ssid1_auth_mode%2Cwifi_chip2_ssid1_auth_mode%2Csignalbar%2Cnetwork_type%2Cnetwork_provider%2Cppp_status%2Csimcard_roam%2Cspn_name_data%2Cspn_b1_flag%2Cspn_b2_flag%2Cwifi_onoff_state%2Cwifi_chip1_ssid1_ssid%2Cwifi_chip2_ssid1_ssid%2Cwan_lte_ca%2Cmonthly_tx_bytes%2Cmonthly_rx_bytes%2Cpppoe_status%2Cdhcp_wan_status%2Cstatic_wan_status%2Crmcc%2Crmnc%2Cmdm_mcc%2Cmdm_mnc%2CEX_SSID1%2Csta_ip_status%2CEX_wifi_profile%2Cm_ssid_enable%2CRadioOff%2Cwifi_chip1_ssid1_access_sta_num%2Cwifi_chip2_ssid1_access_sta_num%2Clan_ipaddr%2Cstation_mac%2Cwifi_access_sta_num%2Cbattery_charging%2Cbattery_vol_percent%2Cbattery_pers%2Crealtime_tx_bytes%2Crealtime_rx_bytes%2Crealtime_time%2Crealtime_tx_thrpt%2Crealtime_rx_thrpt%2Cmonthly_time%2Cdate_month%2Cdata_volume_limit_switch%2Cdata_volume_limit_size%2Cdata_volume_alert_percent%2Cdata_volume_limit_unit%2Croam_setting_option%2Cupg_roam_switch%2Cssid%2Cwifi_enable%2Cwifi_5g_enable%2Ccheck_web_conflict%2Cdial_mode%2Cprivacy_read_flag%2Cis_night_mode%2Cvpn_conn_status%2Cwan_connect_status%2Csms_received_flag%2Csts_received_flag%2Csms_unread_num%2Cwifi_chip1_ssid2_access_sta_num%2Cwifi_chip2_ssid2_access_sta_num&multi_data=1"
|
||
|
cmd_url = f"{self.protocol}://{self.ip}/goform/goform_get_cmd_process?multi_data=1&isTest=false&cmd=wa_inner_version%2Clte_rssi%2CZ5g_rssi%2Clte_rsrp%2CZ5g_rsrp%2Clte_rsrq%2CZ5g_rsrq%2Clte_snr%2CZ5g_SINR%2Csignalbar%2Ccell_id%2Cnr5g_action_band%2Ccell_id%2Cnetwork_provider%2Cmonthly_tx_bytes%2Cmonthly_rx_bytes%2Crealtime_tx_bytes%2Crealtime_rx_bytes%2Crealtime_tx_thrpt%2Crealtime_rx_thrpt%2Cwan_ipaddr%2Cwan_apn"
|
||
|
cookie_pass = cookie
|
||
|
|
||
|
headers = {
|
||
|
"Host": ip,
|
||
|
"Referer": f"{self.referer}index.html",
|
||
|
"Cookie": f"{cookie_pass}"
|
||
|
}
|
||
|
|
||
|
response = s.get(cmd_url, headers=headers, verify=False)
|
||
|
return response.text
|
||
|
|
||
|
zteInstance = zteRouter(args.ip, args.username, args.password)
|
||
|
serverPort = 8082
|
||
|
|
||
|
class serveInfos(BaseHTTPRequestHandler):
|
||
|
def do_GET(self):
|
||
|
gatheredJson = json.loads(zteInstance.zteinfo())
|
||
|
self.send_response(200)
|
||
|
self.send_header("Content-type", "text/plain")
|
||
|
self.end_headers()
|
||
|
|
||
|
#self.wfile.write(bytes("\n\n%s\n\n" % json.dumps(gatheredJson, indent=4), "utf-8"))
|
||
|
|
||
|
self.wfile.write(bytes("# HELP wa_inner_version Modem firmware version\n# TYPE wa_inner_version gauge\n", "utf-8"))
|
||
|
self.wfile.write(bytes("wa_inner_version %s\n" % gatheredJson["wa_inner_version"], "utf-8"))
|
||
|
|
||
|
self.wfile.write(bytes("# HELP cell_id enB Cell ID\n# TYPE cell_id gauge\n", "utf-8"))
|
||
|
self.wfile.write(bytes("cell_id %s\n" % gatheredJson["cell_id"], "utf-8"))
|
||
|
|
||
|
self.wfile.write(bytes("# HELP rsrq Reference Signal Received Quality\n# TYPE rsrq gauge\n", "utf-8"))
|
||
|
self.wfile.write(bytes("rsrq{type=lte} %s\n" % gatheredJson["lte_rsrq"], "utf-8"))
|
||
|
self.wfile.write(bytes("rsrq{type=5g} %s\n" % gatheredJson["Z5g_rsrq"], "utf-8"))
|
||
|
|
||
|
self.wfile.write(bytes("# HELP rsrp Reference Signal Received Power\n# TYPE rsrp gauge\n", "utf-8"))
|
||
|
self.wfile.write(bytes("rsrp{type=lte} %s\n" % gatheredJson["lte_rsrp"], "utf-8"))
|
||
|
self.wfile.write(bytes("rsrp{type=5g} %s\n" % gatheredJson["Z5g_rsrp"], "utf-8"))
|
||
|
|
||
|
self.wfile.write(bytes("# HELP snr Signal-to-Interference-plus-Noise Ratio\n# TYPE snr gauge\n", "utf-8"))
|
||
|
self.wfile.write(bytes("snr{type=lte} %s\n" % gatheredJson["lte_snr"], "utf-8"))
|
||
|
self.wfile.write(bytes("snr{type=5g} %s\n" % gatheredJson["Z5g_SINR"], "utf-8"))
|
||
|
|
||
|
self.wfile.write(bytes("# HELP wan_ip WAN IP\n# TYPE wan_ip gauge\n", "utf-8"))
|
||
|
self.wfile.write(bytes("wan_ip %s\n" % gatheredJson["wan_ipaddr"], "utf-8"))
|
||
|
|
||
|
self.wfile.write(bytes("# HELP wan_apn WAN APN\n# TYPE wan_apn gauge\n", "utf-8"))
|
||
|
self.wfile.write(bytes("wan_apn %s\n" % gatheredJson["wan_apn"], "utf-8"))
|
||
|
|
||
|
self.wfile.write(bytes("# HELP bandwidth_used_monthly Used bandwidth, transmit (bytes)\n# TYPE bandwidth_used_monthly_tx counter\n", "utf-8"))
|
||
|
self.wfile.write(bytes("bandwidth_used_monthly_tx %s\n" % gatheredJson["monthly_tx_bytes"], "utf-8"))
|
||
|
self.wfile.write(bytes("# HELP bandwidth_used_monthly Used bandwidth, receive (bytes)\n# TYPE bandwidth_used_monthly_rx counter\n", "utf-8"))
|
||
|
self.wfile.write(bytes("bandwidth_used_monthly_rx %s\n" % gatheredJson["monthly_rx_bytes"], "utf-8"))
|
||
|
|
||
|
self.wfile.write(bytes("# HELP bandwidth_used_session_tx Used bandwidth, current session, transmit (bytes)\n# TYPE bandwidth_used_session_tx counter\n", "utf-8"))
|
||
|
self.wfile.write(bytes("bandwidth_used_session_tx %s\n" % gatheredJson["realtime_tx_bytes"], "utf-8"))
|
||
|
self.wfile.write(bytes("# HELP bandwidth_used_session_rx Used bandwidth, current session, receive (bytes)\n# TYPE bandwidth_used_session_rx counter\n", "utf-8"))
|
||
|
self.wfile.write(bytes("bandwidth_used_session_rx %s\n" % gatheredJson["realtime_tx_bytes"], "utf-8"))
|
||
|
|
||
|
self.wfile.write(bytes("# HELP bandwidth_tx Current bandwidth, transmit (bytes)\n# TYPE bandwidth_tx gauge\n", "utf-8"))
|
||
|
self.wfile.write(bytes("bandwidth_tx %s\n" % gatheredJson["realtime_tx_thrpt"], "utf-8"))
|
||
|
self.wfile.write(bytes("# HELP bandwidth_rx Current bandwidth, receive (bytes)\n# TYPE bandwidth_rx gauge\n", "utf-8"))
|
||
|
self.wfile.write(bytes("bandwidth_rx %s\n" % gatheredJson["realtime_rx_thrpt"], "utf-8"))
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
webServer = HTTPServer(("0.0.0.0", serverPort), serveInfos)
|
||
|
print("Server started http://%s:%s" % ("0.0.0.0", serverPort))
|
||
|
webServer.serve_forever()
|
||
|
|
||
|
webServer.server_close()
|
||
|
print("Server stopped.")
|