195 lines
9.8 KiB
Python
195 lines
9.8 KiB
Python
#!/bin/python3
|
|
|
|
import requests
|
|
from requests.exceptions import RequestException
|
|
import hashlib
|
|
from datetime import datetime
|
|
import urllib.parse
|
|
import json
|
|
import sys
|
|
import time
|
|
import urllib3
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
import argparse
|
|
|
|
# Command-line interface. The text is passed as ``description``: the first
# positional parameter of ArgumentParser is ``prog``, which would make the
# usage line read "usage: ZTE simple exporter ...".
parser = argparse.ArgumentParser(description="ZTE simple exporter")
parser.add_argument("port", help="Serve metrics on what port", type=int)
parser.add_argument("ip", help="Router ip address")
# All positionals are required; argparse ignores ``default`` on a required
# positional, so none is given here.
parser.add_argument("username", help="Router username")
parser.add_argument("password", help="Router password")
# Any value other than "multi" selects the single-user login request.
parser.add_argument("--login", help="Login method (multi, single)", default="single")
args = parser.parse_args()

# Requests to the router are made with verify=False; silence the warning
# urllib3 would otherwise emit for each of them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Shared session used for all router API calls.
s = requests.Session()
|
|
|
|
class zteRouter:
    """Small client for the "goform" web endpoints of a ZTE router.

    Authenticated calls go through the module-level ``requests`` session
    ``s``. Creating an instance probes the router once to choose between
    HTTP and HTTPS. TLS certificates are not verified (``verify=False``).
    """

    # Seconds to wait on any router request. Without a timeout ``requests``
    # waits indefinitely, which would stall the metrics server.
    REQUEST_TIMEOUT = 5

    def __init__(self, ip, username, password, login):
        """Store the connection settings and detect the protocol.

        Args:
            ip: Router address, used verbatim as the host part of every URL.
            username: Login name; only sent when ``login`` is ``"multi"``.
            password: Plain-text router password (hashed before sending).
            login: ``"multi"`` for the multi-user login request; any other
                value uses the single-user login request.
        """
        self.login = login
        self.ip = ip
        self.protocol = "http"  # default to http
        self.username = username
        self.password = password
        self.try_set_protocol()
        # Built after protocol detection so it carries the detected scheme.
        self.referer = f"{self.protocol}://{self.ip}/"

    def try_set_protocol(self):
        """Pick the first of http/https that answers with a success status.

        Best effort: if neither protocol responds successfully,
        ``self.protocol`` keeps its current value.
        """
        for protocol in ("http", "https"):
            url = f"{protocol}://{self.ip}"
            try:
                response = requests.get(url, timeout=self.REQUEST_TIMEOUT, verify=False)
            except RequestException:
                continue  # unreachable over this protocol, try the next one
            if response.ok:
                self.protocol = protocol
                return

    def hash(self, str):
        """Return the lowercase hex SHA-256 digest of the given text.

        The parameter name shadows the builtin but is kept so existing
        keyword callers continue to work.
        """
        return hashlib.sha256(str.encode()).hexdigest()

    def get_LD(self):
        """Fetch the router's ``LD`` value and return it upper-cased.

        ``getCookie`` mixes this value into the password hash.

        Raises:
            requests.exceptions.RequestException: On connection failure or
                timeout.
            KeyError: If the response JSON has no ``LD`` field.
        """
        header = {"Referer": self.referer}
        payload = "isTest=false&cmd=LD"
        r = s.get(
            self.referer + f"goform/goform_get_cmd_process?{payload}&_=",
            headers=header,
            data=payload,
            verify=False,
            timeout=self.REQUEST_TIMEOUT,
        )
        return r.json()["LD"].upper()

    def getCookie(self, username, password, LD, login):
        """Log in and return the session token as a ``"stok=<value>"`` string.

        The password sent is ``SHA256(SHA256(password).upper() + LD).upper()``.

        Args:
            username: Login name, only sent for the ``"multi"`` login.
            password: Plain-text router password.
            LD: Value returned by ``get_LD``.
            login: ``"multi"`` selects ``LOGIN_MULTI_USER``; anything else
                selects ``LOGIN``.

        Raises:
            requests.exceptions.RequestException: On connection failure or
                timeout.
            KeyError: If the login response carries no ``stok`` cookie.
        """
        header = {"Referer": self.referer}
        hashPassword = self.hash(password).upper()
        ztePass = self.hash(hashPassword + LD).upper()

        if login == "multi":
            payload = {
                'isTest': 'false',
                'goformId': 'LOGIN_MULTI_USER',
                'password': ztePass,
                'user': username,
            }
        else:
            payload = {
                'isTest': 'false',
                'goformId': 'LOGIN',
                'password': ztePass,
            }

        r = s.post(
            self.referer + "goform/goform_set_cmd_process",
            headers=header,
            data=payload,
            verify=False,
            timeout=self.REQUEST_TIMEOUT,
        )
        try:
            token = r.cookies["stok"]
        except KeyError:
            # Same exception type as before, but with a message that says
            # what actually went wrong instead of just "'stok'".
            raise KeyError(
                "router login response contained no 'stok' cookie "
                "(wrong credentials or login method?)"
            ) from None
        return "stok=" + token.strip('\"')

    def zteinfo(self):
        """Log in and return the router's status reply as raw response text.

        A fresh login (``get_LD`` + ``getCookie``) is performed on every call.

        Raises:
            requests.exceptions.RequestException: On connection failure or
                timeout.
            KeyError: If the login step fails (see ``get_LD``/``getCookie``).
        """
        cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD(), login=self.login)
        cmd_url = f"{self.protocol}://{self.ip}/goform/goform_get_cmd_process?multi_data=1&isTest=false&cmd=wa_inner_version%2Clte_rssi%2CZ5g_rssi%2Clte_rsrp%2CZ5g_rsrp%2Clte_rsrq%2CZ5g_rsrq%2Clte_snr%2CZ5g_SINR%2Csignalbar%2Ccell_id%2Cnr5g_action_band%2Ccell_id%2Cnetwork_provider%2Cmonthly_tx_bytes%2Cmonthly_rx_bytes%2Crealtime_tx_bytes%2Crealtime_rx_bytes%2Crealtime_tx_thrpt%2Crealtime_rx_thrpt%2Cwan_ipaddr%2Cwan_apn%2Clte_multi_ca_scell_sig_info%2Cnr_multi_ca_scell_info%2Cnr5g_action_band"

        headers = {
            "Host": self.ip,
            "Referer": f"{self.referer}index.html",
            "Cookie": f"{cookie}",
        }

        response = s.get(cmd_url, headers=headers, verify=False, timeout=self.REQUEST_TIMEOUT)
        return response.text

    def iterateCellInfo(self, str, type):
        """Print every element of ``str``, one per line.

        ``type`` is accepted but not used.
        """
        for f in str:
            print(f)
|
|
|
|
|
|
# Port the metrics HTTP server listens on (from the command line).
serverPort = args.port

# Router client shared by every request handler; constructing it probes the
# router to choose between http and https.
zteInstance = zteRouter(
    ip=args.ip,
    username=args.username,
    password=args.password,
    login=args.login,
)
|
|
|
|
class serveInfos(BaseHTTPRequestHandler):
    """Answer every GET with the router's status as Prometheus text metrics.

    The data comes from the module-level ``zteInstance``; the router is
    queried once per incoming request.
    """

    # (metric name, HELP text) of the per-band signal gauges, in output order.
    _SIGNAL_METRICS = (
        ("zte_rsrp", "Reference Signal Received Power"),
        ("zte_snr", "Signal-to-Interference-plus-Noise Ratio"),
        ("zte_rsrq", "Reference Signal Received Quality"),
        ("zte_rssi", "Received Signal Strength Indicator"),
    )

    # Router fields used when ``nr_multi_ca_scell_info`` or
    # ``nr5g_action_band`` is empty: metric -> ((band label, field), ...).
    _FALLBACK_SIGNAL_FIELDS = {
        "zte_rsrp": (("lte", "lte_rsrp"), ("5g", "Z5g_rsrp")),
        "zte_snr": (("lte", "lte_snr"), ("5g", "Z5g_SINR")),
        "zte_rsrq": (("lte", "lte_rsrq"), ("5g", "Z5g_rsrq")),
        "zte_rssi": (("lte", "lte_rssi"), ("5g", "Z5g_rssi")),
    }

    # Router fields reported under the ``nr5g_action_band`` label.
    _PRIMARY_5G_FIELDS = {
        "zte_rsrp": "Z5g_rsrp",
        "zte_snr": "Z5g_SINR",
        "zte_rsrq": "Z5g_rsrq",
        "zte_rssi": "Z5g_rssi",
    }

    # Positions inside one comma-separated ``nr_multi_ca_scell_info`` entry.
    _SCELL_BAND_INDEX = 3
    _SCELL_VALUE_INDEX = {"zte_rsrp": 7, "zte_rsrq": 8, "zte_snr": 9, "zte_rssi": 10}

    # (metric name, TYPE, HELP text, router field) of the traffic metrics.
    _TRAFFIC_METRICS = (
        ("zte_bandwidth_used_monthly_tx", "counter",
         "Used bandwidth, transmit (bytes)", "monthly_tx_bytes"),
        ("zte_bandwidth_used_monthly_rx", "counter",
         "Used bandwidth, receive (bytes)", "monthly_rx_bytes"),
        ("zte_bandwidth_used_session_tx", "counter",
         "Used bandwidth, current session, transmit (bytes)", "realtime_tx_bytes"),
        ("zte_bandwidth_used_session_rx", "counter",
         "Used bandwidth, current session, receive (bytes)", "realtime_rx_bytes"),
        ("zte_bandwidth_tx", "gauge",
         "Current bandwidth, transmit (bytes)", "realtime_tx_thrpt"),
        ("zte_bandwidth_rx", "gauge",
         "Current bandwidth, receive (bytes)", "realtime_rx_thrpt"),
    )

    @staticmethod
    def _escape_label(value):
        """Escape backslash, double quote and newline for use in a label value."""
        return str(value).replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")

    @classmethod
    def _signal_samples(cls, data):
        """Collect the signal readings as ``{metric: [(band, value), ...]}``.

        When both ``nr_multi_ca_scell_info`` and ``nr5g_action_band`` are
        non-empty, the ``Z5g_*`` values are labelled with the active band and
        one extra sample per ``;``-separated scell entry is added. Otherwise
        the plain ``lte_*`` / ``Z5g_*`` fields are reported under the labels
        ``lte`` and ``5g``.
        """
        samples = {metric: [] for metric, _ in cls._SIGNAL_METRICS}

        def add(metric, band, value):
            # A sample without a value is not parsable, so it is left out.
            if value not in (None, ""):
                samples[metric].append((band, value))

        scell_info = data.get("nr_multi_ca_scell_info")
        primary_band = data.get("nr5g_action_band")

        if scell_info and primary_band:
            for metric, field in cls._PRIMARY_5G_FIELDS.items():
                add(metric, primary_band, data.get(field))
            needed = max(cls._SCELL_VALUE_INDEX.values()) + 1
            for entry in scell_info.split(";"):
                fields = entry.split(",")
                # Skip the empty tail after a trailing ";" and entries too
                # short to contain every expected position.
                if not fields[0] or len(fields) < needed:
                    continue
                band = fields[cls._SCELL_BAND_INDEX]
                for metric, index in cls._SCELL_VALUE_INDEX.items():
                    add(metric, band, fields[index])
        else:
            for metric, sources in cls._FALLBACK_SIGNAL_FIELDS.items():
                for band, field in sources:
                    add(metric, band, data.get(field))

        return samples

    @classmethod
    def _render_metrics(cls, data):
        """Render the router's status dict as Prometheus exposition text.

        Every metric's samples directly follow its own HELP/TYPE lines.
        Fields that are missing or empty yield no sample line.

        Args:
            data: Decoded JSON object returned by the router.

        Returns:
            The complete response body as ``str``, ending with a newline.
        """
        esc = cls._escape_label
        firmware = esc(data.get("wa_inner_version", ""))
        ip_addr = esc(data.get("wan_ipaddr", ""))
        apn = esc(data.get("wan_apn", ""))
        cell_id = esc(data.get("cell_id", ""))
        out = [
            "# HELP zte_modem_info Modem general information",
            "# TYPE zte_modem_info untyped",
            f'zte_modem_info{{firmware="{firmware}",ip_addr="{ip_addr}",'
            f'apn="{apn}",cell_id="{cell_id}"}} 1',
            "",
        ]

        signal = cls._signal_samples(data)
        for metric, help_text in cls._SIGNAL_METRICS:
            out.append(f"# HELP {metric} {help_text}")
            out.append(f"# TYPE {metric} gauge")
            for band, value in signal[metric]:
                out.append(f'{metric}{{band="{esc(band)}"}} {value}')
            out.append("")

        for metric, kind, help_text, field in cls._TRAFFIC_METRICS:
            out.append(f"# HELP {metric} {help_text}")
            out.append(f"# TYPE {metric} {kind}")
            value = data.get(field)
            if value not in (None, ""):
                out.append(f"{metric} {value}")
            out.append("")

        # The last element is "", so the joined text ends with a newline.
        return "\n".join(out)

    def do_GET(self):
        """Query the router and reply with the metrics, or 502 on failure."""
        try:
            gatheredJson = json.loads(zteInstance.zteinfo())
            body = self._render_metrics(gatheredJson).encode("utf-8")
        except Exception as err:
            # Request boundary: a router or parsing problem must become an
            # HTTP error for the scraper, not an aborted connection.
            self.log_error("Collecting router metrics failed: %r", err)
            self.send_error(502, "Failed to collect metrics from router")
            return

        self.send_response(200)
        self.send_header("Content-type", "text/plain; version=0.0.4; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
|
|
|
|
if __name__ == "__main__":
    # Listen on all interfaces on the port given on the command line.
    webServer = HTTPServer(("0.0.0.0", serverPort), serveInfos)
    print("Server started http://%s:%s" % ("0.0.0.0", serverPort))

    try:
        webServer.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the exporter; fall through to the
        # shutdown message instead of ending with a traceback.
        pass
    finally:
        # Release the listening socket however the loop ended.
        webServer.server_close()

    print("Server stopped.")
|