zte-exporter/zte_exporter.py

160 lines
7.7 KiB
Python

#!/bin/python3
import requests
from requests.exceptions import RequestException
import hashlib
from datetime import datetime
import urllib.parse
import json
import sys
import time
import urllib3
from http.server import BaseHTTPRequestHandler, HTTPServer
import argparse
parser = argparse.ArgumentParser("ZTE simple exporter")
parser.add_argument("port", help="Serve metrics on what port", type=int)
parser.add_argument("ip", help="Router ip address")
parser.add_argument("username", help="Router username")
parser.add_argument("password", help="Router password")
args = parser.parse_args()
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
s = requests.Session()
class zteRouter:
def __init__(self, ip, username, password):
self.ip = ip
self.protocol = "http" # default to http
self.username = username
self.password = password
self.try_set_protocol()
self.referer = f"{self.protocol}://{self.ip}/"
def try_set_protocol(self):
protocols = ["http", "https"]
for protocol in protocols:
url = f"{protocol}://{self.ip}"
try:
response = requests.get(url, timeout=5, verify=False)
if response.ok:
self.protocol = protocol
# print(f"{self.ip} is accessible via {protocol}")
return
except RequestException:
pass # If RequestException occurs, try the next protocol
# print(f"Could not determine the protocol for {self.ip}")
def hash(self, str):
return hashlib.sha256(str.encode()).hexdigest()
def getVersion(self):
header = {"Referer": self.referer}
payload = "isTest=false&cmd=wa_inner_version"
r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}", headers=header, data=payload, verify=False)
return r.json()["wa_inner_version"]
def get_LD(self):
header = {"Referer": self.referer}
payload = "isTest=false&cmd=LD"
r = s.get(self.referer + f"goform/goform_get_cmd_process?{payload}", headers=header, data=payload, verify=False)
return r.json()["LD"].upper()
def getCookie(self, username, password, LD):
header = {"Referer": self.referer}
hashPassword = self.hash(password).upper()
ztePass = self.hash(hashPassword + LD).upper()
old_login = self.getVersion()
payload = {
'isTest': 'false',
'goformId': 'LOGIN' if username is not None and old_login in 'MC801' or 'MC7010' else 'LOGIN_MULTI_USER',
'password': ztePass
}
if username is not None and username != "":
payload['username'] = username
r = s.post(self.referer + "goform/goform_set_cmd_process", headers=header, data=payload, verify=False)
return "stok=" + r.cookies["stok"].strip('\"')
def get_RD(self):
header = {"Referer": self.referer}
payload = "isTest=false&cmd=RD"
r = s.post(self.referer + "goform/goform_get_cmd_process", headers=header, data=payload, verify=False)
return r.json()["RD"]
def zteinfo(self):
ip = self.ip
cookie = self.getCookie(username=self.username, password=self.password, LD=self.get_LD())
cmd_url = f"{self.protocol}://{self.ip}/goform/goform_get_cmd_process?multi_data=1&isTest=false&cmd=wa_inner_version%2Clte_rssi%2CZ5g_rssi%2Clte_rsrp%2CZ5g_rsrp%2Clte_rsrq%2CZ5g_rsrq%2Clte_snr%2CZ5g_SINR%2Csignalbar%2Ccell_id%2Cnr5g_action_band%2Ccell_id%2Cnetwork_provider%2Cmonthly_tx_bytes%2Cmonthly_rx_bytes%2Crealtime_tx_bytes%2Crealtime_rx_bytes%2Crealtime_tx_thrpt%2Crealtime_rx_thrpt%2Cwan_ipaddr%2Cwan_apn"
cookie_pass = cookie
headers = {
"Host": ip,
"Referer": f"{self.referer}index.html",
"Cookie": f"{cookie_pass}"
}
response = s.get(cmd_url, headers=headers, verify=False)
return response.text
zteInstance = zteRouter(args.ip, args.username, args.password)
serverPort = args.port
class serveInfos(BaseHTTPRequestHandler):
def do_GET(self):
gatheredJson = json.loads(zteInstance.zteinfo())
self.send_response(200)
self.send_header("Content-type", "text/plain")
self.end_headers()
#self.wfile.write(bytes("\n\n%s\n\n" % json.dumps(gatheredJson, indent=4), "utf-8"))
self.wfile.write(bytes("# HELP wa_inner_version Modem firmware version\n# TYPE wa_inner_version gauge\n", "utf-8"))
self.wfile.write(bytes("wa_inner_version %s\n" % gatheredJson["wa_inner_version"], "utf-8"))
self.wfile.write(bytes("# HELP cell_id enB Cell ID\n# TYPE cell_id gauge\n", "utf-8"))
self.wfile.write(bytes("cell_id %s\n" % gatheredJson["cell_id"], "utf-8"))
self.wfile.write(bytes("# HELP rsrq Reference Signal Received Quality\n# TYPE rsrq gauge\n", "utf-8"))
self.wfile.write(bytes("rsrq{type=lte} %s\n" % gatheredJson["lte_rsrq"], "utf-8"))
self.wfile.write(bytes("rsrq{type=5g} %s\n" % gatheredJson["Z5g_rsrq"], "utf-8"))
self.wfile.write(bytes("# HELP rsrp Reference Signal Received Power\n# TYPE rsrp gauge\n", "utf-8"))
self.wfile.write(bytes("rsrp{type=lte} %s\n" % gatheredJson["lte_rsrp"], "utf-8"))
self.wfile.write(bytes("rsrp{type=5g} %s\n" % gatheredJson["Z5g_rsrp"], "utf-8"))
self.wfile.write(bytes("# HELP snr Signal-to-Interference-plus-Noise Ratio\n# TYPE snr gauge\n", "utf-8"))
self.wfile.write(bytes("snr{type=lte} %s\n" % gatheredJson["lte_snr"], "utf-8"))
self.wfile.write(bytes("snr{type=5g} %s\n" % gatheredJson["Z5g_SINR"], "utf-8"))
self.wfile.write(bytes("# HELP wan_ip WAN IP\n# TYPE wan_ip gauge\n", "utf-8"))
self.wfile.write(bytes("wan_ip %s\n" % gatheredJson["wan_ipaddr"], "utf-8"))
self.wfile.write(bytes("# HELP wan_apn WAN APN\n# TYPE wan_apn gauge\n", "utf-8"))
self.wfile.write(bytes("wan_apn %s\n" % gatheredJson["wan_apn"], "utf-8"))
self.wfile.write(bytes("# HELP bandwidth_used_monthly Used bandwidth, transmit (bytes)\n# TYPE bandwidth_used_monthly_tx counter\n", "utf-8"))
self.wfile.write(bytes("bandwidth_used_monthly_tx %s\n" % gatheredJson["monthly_tx_bytes"], "utf-8"))
self.wfile.write(bytes("# HELP bandwidth_used_monthly Used bandwidth, receive (bytes)\n# TYPE bandwidth_used_monthly_rx counter\n", "utf-8"))
self.wfile.write(bytes("bandwidth_used_monthly_rx %s\n" % gatheredJson["monthly_rx_bytes"], "utf-8"))
self.wfile.write(bytes("# HELP bandwidth_used_session_tx Used bandwidth, current session, transmit (bytes)\n# TYPE bandwidth_used_session_tx counter\n", "utf-8"))
self.wfile.write(bytes("bandwidth_used_session_tx %s\n" % gatheredJson["realtime_tx_bytes"], "utf-8"))
self.wfile.write(bytes("# HELP bandwidth_used_session_rx Used bandwidth, current session, receive (bytes)\n# TYPE bandwidth_used_session_rx counter\n", "utf-8"))
self.wfile.write(bytes("bandwidth_used_session_rx %s\n" % gatheredJson["realtime_tx_bytes"], "utf-8"))
self.wfile.write(bytes("# HELP bandwidth_tx Current bandwidth, transmit (bytes)\n# TYPE bandwidth_tx gauge\n", "utf-8"))
self.wfile.write(bytes("bandwidth_tx %s\n" % gatheredJson["realtime_tx_thrpt"], "utf-8"))
self.wfile.write(bytes("# HELP bandwidth_rx Current bandwidth, receive (bytes)\n# TYPE bandwidth_rx gauge\n", "utf-8"))
self.wfile.write(bytes("bandwidth_rx %s\n" % gatheredJson["realtime_rx_thrpt"], "utf-8"))
if __name__ == "__main__":
webServer = HTTPServer(("0.0.0.0", serverPort), serveInfos)
print("Server started http://%s:%s" % ("0.0.0.0", serverPort))
webServer.serve_forever()
webServer.server_close()
print("Server stopped.")