lokinet/contrib/py/admin/lokinetmon
Jeff Becker 6b115913bc
lokinetmon updates
* add country flags to lokinetmon
* expose hop ip addresses via rpc introspection for geoip in lokinetmon
2021-05-01 08:44:37 -04:00

397 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
import curses
import json
import sys
import time
import zmq
geo = None
try:
import GeoIP
geo = GeoIP.open("/usr/share/GeoIP/GeoIP.dat", GeoIP.GEOIP_STANDARD)
except Exception as ex:
print('no geoip: {}'.format(ex))
time.sleep(1)
def ip_to_flag(ip):
"""
convert an ip to a flag emoji
"""
# bail if no geoip available
if not geo:
return ''
# trim off excess ipv6 jizz
ip = ip.replace("::ffff:", "")
# get the country code
cc = geo.country_code_by_addr(ip)
# Unicode flag sequences are just country codes transposed into the REGIONAL
# INDICATOR SYMBOL LETTER A ... Z range (U+1F1E6 ... U+1F1FF):
flag = ''.join(chr(0x1f1e6 + ord(i) - ord('A')) for i in cc)
return '({}) {}'.format(cc, flag)
class Monitor:
_sample_size = 12
def __init__(self, url):
self.txrate = 0
self.rxrate = 0
self.data = dict()
self.win = curses.initscr()
curses.start_color()
curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
self._rpc_context = zmq.Context()
self._rpc_socket = self._rpc_context.socket(zmq.DEALER)
self._rpc_socket.setsockopt(zmq.CONNECT_TIMEOUT, 5000)
self._rpc_socket.setsockopt(zmq.HANDSHAKE_IVL, 5000)
self._rpc_socket.connect(url)
self._speed_samples = [(0,0,0,0)] * self._sample_size
self._run = True
def rpc(self, method):
self._rpc_socket.send_multipart([method.encode(), b'lokinetmon'+method.encode()])
if not self._rpc_socket.poll(timeout=50):
return
reply = self._rpc_socket.recv_multipart()
if len(reply) >= 3 and reply[0:2] == [b'REPLY', b'lokinetmon'+method.encode()]:
return reply[2].decode()
def _close(self):
self._rpc_socket.close(linger=0)
self._run = False
curses.endwin()
def update_data(self):
"""update data from lokinet"""
try:
data = json.loads(self.rpc("llarp.status"))
self.data = data['result']
except:
self.data = None
return self.data is not None and self._run
def _render_path(self, y_pos, path, name):
"""render a path at current position"""
self.win.move(y_pos, 1)
self.win.addstr("({}) ".format(name))
y_pos += 1
self.win.move(y_pos, 1)
y_pos += 1
self.win.addstr("[tx:\t{}]\t[rx:\t{}]".format(
self.speed_of(path['txRateCurrent']), self.speed_of(path['rxRateCurrent'])))
self.win.move(y_pos, 1)
y_pos += 1
self.win.addstr("me -> ")
for hop in path["hops"]:
hopstr = hop['router'][:4]
if 'ip' in hop:
hopstr += ' {}'.format(ip_to_flag(hop['ip']))
self.win.addstr(" {} ->".format(hopstr))
self.win.addstr(" [{} ms latency]".format(path["intro"]["latency"]))
self.win.addstr(" [{} until expire]".format(self.time_to(path["expiresAt"])))
if path["expiresSoon"]:
self.win.addstr("(expiring)")
elif path["expired"]:
self.win.addstr("(expired)")
return y_pos
@staticmethod
def time_to(timestamp):
""" return time until timestamp in seconds formatted"""
now = time.time() * 1000
return "{} seconds".format(int((timestamp - now) / 1000))
@staticmethod
def speed_of(rate):
"""turn int speed into string formatted"""
units = ["b", "Kb", "Mb", "Gb"]
idx = 0
rate *= 8
while rate > 1000 and idx < len(units):
rate /= 1000.0
idx += 1
return "{} {}ps".format("%.2f" % rate, units[idx])
def get_all_paths(self):
""" yield all paths in current data """
for key in self.data['services']:
status = self.data['services'][key]
for path in (status['paths'] or []):
yield path
for sess in (status['remoteSessions'] or []):
for path in sess['paths']:
yield path
for sess in (status['snodeSessions'] or []):
for path in sess['paths']:
yield path
def display_service(self, y_pos, name, status):
"""display a service at current position"""
self.win.move(y_pos, 1)
self.win.addstr("service [{}]".format(name))
build = status["buildStats"]
ratio = build["success"] / (build["attempts"] or 1)
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr("build success: {} %".format(int(100 * ratio)))
y_pos += 1
self.win.move(y_pos, 1)
paths = status["paths"]
self.win.addstr("paths: {}".format(len(paths)))
for path in paths:
y_pos = self._render_path(y_pos, path, "inbound")
for session in (status["remoteSessions"] or []):
for path in session["paths"]:
y_pos = self._render_path(
y_pos, path, "[active] {}".format(session["currentConvoTag"])
)
for session in (status["snodeSessions"] or []):
for path in session["paths"]:
y_pos = self._render_path(y_pos, path, "[snode]")
return y_pos
def display_links(self, y_pos, data):
""" display links section """
self.txrate = 0
self.rxrate = 0
for link in data["outbound"]:
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr("outbound sessions:")
y_pos = self.display_link(y_pos, link)
for link in data["inbound"]:
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr("inbound sessions:")
y_pos = self.display_link(y_pos, link)
y_pos += 2
self.win.move(y_pos, 1)
self.win.addstr(
"throughput:\t\t[{}\ttx]\t[{}\trx]".format(
self.speed_of(self.txrate), self.speed_of(self.rxrate)
)
)
bloat_tx, bloat_rx = self.calculate_bloat(self.data['links']['outbound'])
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr("goodput:\t\t[{}\ttx]\t[{}\trx]".format(
self.speed_of(self.txrate-bloat_tx), self.speed_of(self.rxrate-bloat_rx)))
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr("overhead:\t\t[{}\ttx]\t[{}\trx]".format(
self.speed_of(bloat_tx), self.speed_of(bloat_rx)))
self._speed_samples.append((self.txrate, self.rxrate, bloat_tx, bloat_rx))
while len(self._speed_samples) > self._sample_size:
self._speed_samples.pop(0)
return self.display_speedgraph(y_pos + 2)
@staticmethod
def _scale(_x, _n):
while _n > 0:
_x /= 2
_n -= 1
return int(_x)
@staticmethod
def _makebar(samp, badsamp, maxsamp):
barstr = "#" * (samp - badsamp)
pad = " " * (maxsamp - samp)
return pad, barstr, '#' * badsamp
def display_speedgraph(self, y_pos, maxsz=40):
""" display global speed graph """
txmax, rxmax = 1024, 1024
for _tx, _rx, b_tx, b_rx in self._speed_samples:
if _tx > txmax:
txmax = _tx
if _rx > rxmax:
rxmax = _rx
rxscale = 0
while rxmax > maxsz:
rxscale += 1
rxmax /= 2
txscale = 0
while txmax > maxsz:
txscale += 1
txmax /= 2
txlabelpad = int(txmax / 2)
rxlabelpad = int(rxmax / 2)
if txlabelpad <= 0:
txlabelpad = 1
if rxlabelpad <= 0:
rxlabelpad = 1
txlabelpad_str = " " * txlabelpad
rxlabelpad_str = " " * rxlabelpad
y_pos += 1
self.win.move(y_pos, 1)
for val in [txlabelpad_str, 'tx', txlabelpad_str, rxlabelpad_str, 'rx', rxlabelpad_str]:
self.win.addstr(val)
for _tx, _rx, b_tx, b_rx in self._speed_samples:
y_pos += 1
self.win.move(y_pos, 1)
txpad, txbar, btxbar = self._makebar(self._scale(_tx, txscale), self._scale(b_tx, txscale), int(txmax))
rxpad, rxbar, brxbar = self._makebar(self._scale(_rx, rxscale), self._scale(b_rx, rxscale), int(rxmax))
self.win.addstr(txpad)
self.win.addstr(btxbar, curses.color_pair(1))
self.win.addstr(txbar)
self.win.addstr('|')
self.win.addstr(rxbar)
self.win.addstr(brxbar, curses.color_pair(1))
self.win.addstr(rxpad)
return y_pos + 2
def calculate_bloat(self, links):
"""
calculate bandwith overhead
"""
paths = self.get_all_paths()
lltx = 0
llrx = 0
_tx = 0
_rx = 0
for link in links:
sessions = link["sessions"]["established"]
for sess in sessions:
lltx += sess['tx']
llrx += sess['rx']
for path in paths:
_tx += path['txRateCurrent']
_rx += path['rxRateCurrent']
lltx -= _tx
llrx -= _rx
if lltx < 0:
lltx = 0
if llrx < 0:
llrx = 0
return lltx, llrx
def display_link(self, y_pos, link):
""" display links """
y_pos += 1
self.win.move(y_pos, 1)
sessions = link["sessions"]["established"] or []
for sess in sessions:
y_pos = self.display_link_session(y_pos, sess)
return y_pos
def display_link_session(self, y_pos, sess):
""" display link sessions """
y_pos += 1
self.win.move(y_pos, 1)
self.txrate += sess["txRateCurrent"]
self.rxrate += sess["rxRateCurrent"]
addr = sess['remoteAddr']
if geo:
ip = addr.split(':')[0]
addr += '\t{}'.format(ip_to_flag(ip))
self.win.addstr(
"{}\t[{}\ttx]\t[{}\trx]".format(
addr, self.speed_of(sess["txRateCurrent"]), self.speed_of(sess["rxRateCurrent"])
)
)
if (sess['txMsgQueueSize'] or 0) > 1:
self.win.addstr(" [out window: {}]".format(sess['txMsgQueueSize']))
if (sess['rxMsgQueueSize'] or 0) > 1:
self.win.addstr(" [in window: {}]".format(sess['rxMsgQueueSize']))
def display(acks, label, num='acks', dem='packets'):
if acks[dem] > 0:
self.win.addstr(" [{}: {}]".format(label, round(float(acks[num]) / float(acks[dem]), 2)))
if ('recvMACKs' in sess) and ('sendMACKs' in sess):
display(sess['sendMACKs'], 'out MACK density')
display(sess['recvMACKs'], 'in MACK density')
dats = {'recvAcks': 'in acks',
'sendAcks': 'out acks',
'recvRTX': 'in RTX',
'sendRTX': 'out RTX'}
for key in dats:
val = dats[key]
if (key in sess) and (sess[key] > 0):
self.win.addstr(" [{}: {}]".format(val, sess[key]))
return y_pos
def display_dht(self, y_pos, data):
""" display dht window """
y_pos += 2
self.win.move(y_pos, 1)
self.win.addstr("DHT:")
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr("introset lookups")
y_pos = self.display_bucket(y_pos, data["pendingIntrosetLookups"])
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr("router lookups")
return self.display_bucket(y_pos, data["pendingRouterLookups"])
def display_bucket(self, y_pos, data):
""" display dht bucket """
txs = data["tx"]
self.win.addstr(" ({} lookups)".format(len(txs)))
for transaction in txs:
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr("search for {}".format(transaction["tx"]["target"]))
return y_pos
def display_data(self):
""" draw main window """
if self.data is not None:
if self.version:
self.win.addstr(1, 1, self.version)
services = self.data["services"] or {}
y_pos = 3
try:
y_pos = self.display_links(y_pos, self.data["links"])
for key in services:
y_pos = self.display_service(y_pos, key, services[key])
y_pos = self.display_dht(y_pos, self.data["dht"])
except:
pass
else:
self.win.move(1, 1)
self.win.addstr("lokinet offline")
def run(self):
""" run mainloop """
try:
self.version = json.loads(self.rpc("llarp.version"))['result']['version']
except:
self.version = None
while self._run:
if self.update_data():
self.win.box()
self.display_data()
elif self._run:
self.win.addstr(1, 1, "offline")
else:
self._close()
return
self.win.refresh()
try:
time.sleep(1)
except:
self._close()
return
self.win.clear()
def main():
""" main function """
mon = Monitor(
len(sys.argv) > 1 and sys.argv[1] or "tcp://127.0.0.1:1190"
)
mon.run()
if __name__ == "__main__":
main()