diff --git a/llarp/link/utp.cpp b/llarp/link/utp.cpp index f6cbc5f87..3ff337c63 100644 --- a/llarp/link/utp.cpp +++ b/llarp/link/utp.cpp @@ -25,6 +25,8 @@ #include +#include + namespace llarp { namespace utp @@ -85,6 +87,8 @@ namespace llarp ssize_t s = utp_writev(sock, vecs.data(), vecs.size()); if(s < 0) return; + METRICS_DYNAMIC_INT_UPDATE( + "utp.session.tx", RouterID(remoteRC.pubkey).ToString().c_str(), s); m_TXRate += s; size_t sz = s; while(vecq.size() && sz >= vecq.front().iov_len) @@ -178,6 +182,9 @@ namespace llarp PruneInboundMessages(now); m_TXRate = 0; m_RXRate = 0; + METRICS_DYNAMIC_UPDATE("utp.session.sendq", + RouterID(remoteRC.pubkey).ToString().c_str(), + sendq.size()); } /// low level read @@ -188,6 +195,8 @@ namespace llarp Alive(); m_RXRate += sz; size_t s = sz; + METRICS_DYNAMIC_INT_UPDATE( + "utp.session.rx", RouterID(remoteRC.pubkey).ToString().c_str(), s); // process leftovers if(recvBufOffset) { diff --git a/llarp/link/utp_internal.hpp b/llarp/link/utp_internal.hpp index 9662d74ee..8671dac59 100644 --- a/llarp/link/utp_internal.hpp +++ b/llarp/link/utp_internal.hpp @@ -42,7 +42,7 @@ namespace llarp using FragmentBuffer = llarp::AlignedBuffer< FragmentBufferSize >; /// maximum size for send queue for a session before we drop - constexpr size_t MaxSendQueueSize = 1024; + constexpr size_t MaxSendQueueSize = 64; /// buffer for a link layer message using MessageBuffer = llarp::AlignedBuffer< MAX_LINK_MSG_SIZE >; diff --git a/llarp/messages/dht_immediate.hpp b/llarp/messages/dht_immediate.hpp index 9802eecb2..c6142a0b0 100644 --- a/llarp/messages/dht_immediate.hpp +++ b/llarp/messages/dht_immediate.hpp @@ -29,6 +29,12 @@ namespace llarp void Clear() override; + + const char* + Name() const override + { + return "DHTImmediate"; + } }; } // namespace llarp diff --git a/llarp/messages/discard.hpp b/llarp/messages/discard.hpp index eeac2e862..fd755ca85 100644 --- a/llarp/messages/discard.hpp +++ b/llarp/messages/discard.hpp @@ -31,6 +31,12 @@ namespace llarp { } + const char* + Name() const override + { + return "Discard"; + } + bool DecodeKey(__attribute__((unused)) const llarp_buffer_t& key, __attribute__((unused)) llarp_buffer_t* buf) override diff --git a/llarp/messages/link_intro.hpp b/llarp/messages/link_intro.hpp index 75c63c43e..83538761f 100644 --- a/llarp/messages/link_intro.hpp +++ b/llarp/messages/link_intro.hpp @@ -44,6 +44,12 @@ namespace llarp void Clear() override; + + const char* + Name() const override + { + return "LinkIntro"; + } }; } // namespace llarp diff --git a/llarp/messages/link_message.cpp b/llarp/messages/link_message.cpp index d9a49d7b8..c132dcb33 100644 --- a/llarp/messages/link_message.cpp +++ b/llarp/messages/link_message.cpp @@ -9,6 +9,7 @@ #include #include #include +#include namespace llarp { @@ -67,10 +68,12 @@ namespace llarp } // create the message to parse based off message type llarp::LogDebug("inbound message ", *strbuf.cur); + bool isLIM = false; switch(*strbuf.cur) { case 'i': handler->msg = &handler->holder->i; + isLIM = true; break; case 'd': handler->msg = &handler->holder->d; @@ -90,6 +93,14 @@ namespace llarp default: return false; } + + if(!isLIM) + { + const std::string host = + "RX_" + RouterID(handler->from->GetPubKey()).ToString(); + METRICS_DYNAMIC_INCREMENT(handler->msg->Name(), host.c_str()); + } + handler->msg->session = handler->from; handler->firstkey = false; return true; diff --git a/llarp/messages/link_message.hpp b/llarp/messages/link_message.hpp index 6edd91b6c..256a59d82 100644 --- a/llarp/messages/link_message.hpp +++ b/llarp/messages/link_message.hpp @@ -30,6 +30,10 @@ namespace llarp virtual void Clear() = 0; + + // the name of this kind of message + virtual const char* + Name() const = 0; }; } // namespace llarp diff --git a/llarp/messages/relay.hpp b/llarp/messages/relay.hpp index 7b4b10be3..e64975c34 100644 --- a/llarp/messages/relay.hpp +++ b/llarp/messages/relay.hpp @@ -31,6 +31,12 @@ namespace llarp void Clear() override; + const char* + + Name() const override + { + return "RelayUpstream"; + } }; struct RelayDownstreamMessage : public ILinkMessage @@ -53,6 +59,12 @@ namespace llarp void Clear() override; + + const char* + Name() const override + { + return "RelayDownstream"; + } }; } // namespace llarp diff --git a/llarp/messages/relay_commit.hpp b/llarp/messages/relay_commit.hpp index d68fce51e..72180b1b1 100644 --- a/llarp/messages/relay_commit.hpp +++ b/llarp/messages/relay_commit.hpp @@ -68,6 +68,12 @@ namespace llarp bool AsyncDecrypt(llarp::path::PathContext *context) const; + + const char * + Name() const override + { + return "RelayCommit"; + } }; } // namespace llarp diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 0a0f4fc5a..4f11f40ba 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -166,6 +166,8 @@ namespace llarp const RouterID us = pubkey(); if(remote.pubkey == us) return false; + if(!ConnectionToRouterAllowed(remote.pubkey)) + return false; // do we already have a pending job for this remote? if(HasPendingConnectJob(remote.pubkey)) { @@ -628,6 +630,7 @@ namespace llarp const RouterID us = pubkey(); if(us == remote) return; + if(!ConnectionToRouterAllowed(remote)) { LogWarn("not connecting to ", remote, " as it's not permitted by config"); @@ -1145,7 +1148,8 @@ namespace llarp void Router::SendTo(RouterID remote, const ILinkMessage *msg, ILinkLayer *selected) { - METRICS_TIME_BLOCK("RouterSendTo", remote.ToString().c_str()); + const std::string remoteName = "TX_" + remote.ToString(); + METRICS_DYNAMIC_INCREMENT(msg->Name(), remoteName.c_str()); llarp_buffer_t buf(linkmsg_buffer); if(!msg->BEncode(&buf)) @@ -1264,6 +1268,11 @@ namespace llarp { if(rc.IsPublicRouter() && whitelistRouters && IsServiceNode()) { + if(lokinetRouters.size() == 0) + { + LogError("we have no service nodes in whitelist"); + return false; + } if(lokinetRouters.find(rc.pubkey) == lokinetRouters.end()) { RouterID sn(rc.pubkey); diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index d603a2c97..806b4e7f6 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -1175,12 +1175,10 @@ namespace llarp , currentIntroSet(introset) { - auto& profiling = parent->m_Router->routerProfiling(); updatingIntroSet = false; for(const auto intro : introset.I) { - if(intro.expiresAt > m_NextIntro.expiresAt - && !profiling.IsBad(intro.router)) + if(intro.expiresAt > m_NextIntro.expiresAt) { m_NextIntro = intro; remoteIntro = intro; @@ -1489,7 +1487,7 @@ namespace llarp } bool - Endpoint::OutboundContext::ShiftIntroduction() + Endpoint::OutboundContext::ShiftIntroduction(bool rebuild) { bool success = false; auto now = Now(); @@ -1524,7 +1522,7 @@ namespace llarp break; } } - if(shifted) + if(shifted && rebuild) { lastShift = now; BuildOneAlignedTo(m_NextIntro.router); @@ -1848,13 +1846,25 @@ namespace llarp llarp::path::PathRole roles) { if(m_NextIntro.router.IsZero()) + { + llarp::LogError("intro is not set, cannot select hops"); return false; + } if(hop == numHops - 1) { if(db->Get(m_NextIntro.router, cur)) { return true; } + else if(router->routerProfiling().IsBad(m_NextIntro.router)) + { + if(!ShiftIntroduction(false)) + { + llarp::LogError("bad intro chosen, not selecting hop"); + return false; + } + return db->Get(m_NextIntro.router, cur); + } else { // we don't have it? diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index b7afeb853..3eef971a4 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -239,8 +239,9 @@ namespace llarp bool markedBad = false; virtual bool - ShiftIntroduction() + ShiftIntroduction(bool rebuild = true) { + (void)rebuild; return true; }; @@ -288,7 +289,7 @@ namespace llarp /// update the current selected intro to be a new best introduction /// return true if we have changed intros bool - ShiftIntroduction() override; + ShiftIntroduction(bool rebuild = true) override; /// mark the current remote intro as bad bool