diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp
index 2e5f901f6..93359a20f 100644
--- a/llarp/link/server.cpp
+++ b/llarp/link/server.cpp
@@ -382,10 +382,10 @@ namespace llarp
bool
ILinkLayer::PutSession(ILinkSession* s)
{
+ static constexpr size_t MaxSessionsPerEndpoint = 5;
Lock lock(&m_PendingMutex);
llarp::Addr addr = s->GetRemoteEndpoint();
- auto itr = m_Pending.find(addr);
- if(itr != m_Pending.end())
+ if(m_Pending.count(addr) >= MaxSessionsPerEndpoint)
return false;
m_Pending.emplace(addr, std::unique_ptr< ILinkSession >(s));
return true;
diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp
index b968e1e3a..134a2d6bc 100644
--- a/llarp/link/server.hpp
+++ b/llarp/link/server.hpp
@@ -240,8 +240,8 @@ namespace llarp
m_AuthedLinks GUARDED_BY(m_AuthedLinksMutex);
Mutex m_PendingMutex
ACQUIRED_AFTER(m_AuthedLinksMutex); // protects m_Pending
- std::unordered_map< llarp::Addr, std::unique_ptr< ILinkSession >,
- llarp::Addr::Hash >
+ std::unordered_multimap< llarp::Addr, std::unique_ptr< ILinkSession >,
+ llarp::Addr::Hash >
m_Pending GUARDED_BY(m_PendingMutex);
};
} // namespace llarp
diff --git a/llarp/link/utp.cpp b/llarp/link/utp.cpp
index 3f42ba3a4..3ff337c63 100644
--- a/llarp/link/utp.cpp
+++ b/llarp/link/utp.cpp
@@ -25,6 +25,8 @@
#include
+#include
+
namespace llarp
{
namespace utp
@@ -85,6 +87,8 @@ namespace llarp
ssize_t s = utp_writev(sock, vecs.data(), vecs.size());
if(s < 0)
return;
+ METRICS_DYNAMIC_INT_UPDATE(
+ "utp.session.tx", RouterID(remoteRC.pubkey).ToString().c_str(), s);
m_TXRate += s;
size_t sz = s;
while(vecq.size() && sz >= vecq.front().iov_len)
@@ -178,6 +182,9 @@ namespace llarp
PruneInboundMessages(now);
m_TXRate = 0;
m_RXRate = 0;
+ METRICS_DYNAMIC_UPDATE("utp.session.sendq",
+ RouterID(remoteRC.pubkey).ToString().c_str(),
+ sendq.size());
}
/// low level read
@@ -188,6 +195,8 @@ namespace llarp
Alive();
m_RXRate += sz;
size_t s = sz;
+ METRICS_DYNAMIC_INT_UPDATE(
+ "utp.session.rx", RouterID(remoteRC.pubkey).ToString().c_str(), s);
// process leftovers
if(recvBufOffset)
{
@@ -228,6 +237,8 @@ namespace llarp
bool
Session::IsTimedOut(llarp_time_t now) const
{
+ if(state == eInitial)
+ return false;
if(sendq.size() >= MaxSendQueueSize)
{
return now - lastActive > 5000;
@@ -346,11 +357,11 @@ namespace llarp
if(session && link)
{
- link->HandleTimeout(session);
- llarp::LogError(utp_error_code_names[arg->error_code], " via ",
- session->remoteAddr);
if(arg->error_code == UTP_ETIMEDOUT)
+ {
+ link->HandleTimeout(session);
utp_close(arg->socket);
+ }
else
session->Close();
}
@@ -577,6 +588,7 @@ namespace llarp
/// base constructor
Session::Session(LinkLayer* p)
{
+ state = eInitial;
m_NextTXMsgID = 0;
m_NextRXMsgID = 0;
parent = p;
diff --git a/llarp/link/utp_internal.hpp b/llarp/link/utp_internal.hpp
index 9662d74ee..8671dac59 100644
--- a/llarp/link/utp_internal.hpp
+++ b/llarp/link/utp_internal.hpp
@@ -42,7 +42,7 @@ namespace llarp
using FragmentBuffer = llarp::AlignedBuffer< FragmentBufferSize >;
/// maximum size for send queue for a session before we drop
- constexpr size_t MaxSendQueueSize = 1024;
+ constexpr size_t MaxSendQueueSize = 64;
/// buffer for a link layer message
using MessageBuffer = llarp::AlignedBuffer< MAX_LINK_MSG_SIZE >;
diff --git a/llarp/messages/dht_immediate.hpp b/llarp/messages/dht_immediate.hpp
index 9802eecb2..c6142a0b0 100644
--- a/llarp/messages/dht_immediate.hpp
+++ b/llarp/messages/dht_immediate.hpp
@@ -29,6 +29,12 @@ namespace llarp
void
Clear() override;
+
+ const char*
+ Name() const override
+ {
+ return "DHTImmediate";
+ }
};
} // namespace llarp
diff --git a/llarp/messages/discard.hpp b/llarp/messages/discard.hpp
index eeac2e862..fd755ca85 100644
--- a/llarp/messages/discard.hpp
+++ b/llarp/messages/discard.hpp
@@ -31,6 +31,12 @@ namespace llarp
{
}
+ const char*
+ Name() const override
+ {
+ return "Discard";
+ }
+
bool
DecodeKey(__attribute__((unused)) const llarp_buffer_t& key,
__attribute__((unused)) llarp_buffer_t* buf) override
diff --git a/llarp/messages/link_intro.hpp b/llarp/messages/link_intro.hpp
index 75c63c43e..83538761f 100644
--- a/llarp/messages/link_intro.hpp
+++ b/llarp/messages/link_intro.hpp
@@ -44,6 +44,12 @@ namespace llarp
void
Clear() override;
+
+ const char*
+ Name() const override
+ {
+ return "LinkIntro";
+ }
};
} // namespace llarp
diff --git a/llarp/messages/link_message.cpp b/llarp/messages/link_message.cpp
index d9a49d7b8..c132dcb33 100644
--- a/llarp/messages/link_message.cpp
+++ b/llarp/messages/link_message.cpp
@@ -9,6 +9,7 @@
#include
#include
#include
+#include
namespace llarp
{
@@ -67,10 +68,12 @@ namespace llarp
}
// create the message to parse based off message type
llarp::LogDebug("inbound message ", *strbuf.cur);
+ bool isLIM = false;
switch(*strbuf.cur)
{
case 'i':
handler->msg = &handler->holder->i;
+ isLIM = true;
break;
case 'd':
handler->msg = &handler->holder->d;
@@ -90,6 +93,14 @@ namespace llarp
default:
return false;
}
+
+ if(!isLIM)
+ {
+ const std::string host =
+ "RX_" + RouterID(handler->from->GetPubKey()).ToString();
+ METRICS_DYNAMIC_INCREMENT(handler->msg->Name(), host.c_str());
+ }
+
handler->msg->session = handler->from;
handler->firstkey = false;
return true;
diff --git a/llarp/messages/link_message.hpp b/llarp/messages/link_message.hpp
index 6edd91b6c..256a59d82 100644
--- a/llarp/messages/link_message.hpp
+++ b/llarp/messages/link_message.hpp
@@ -30,6 +30,10 @@ namespace llarp
virtual void
Clear() = 0;
+
+ // the name of this kind of message
+ virtual const char*
+ Name() const = 0;
};
} // namespace llarp
diff --git a/llarp/messages/relay.hpp b/llarp/messages/relay.hpp
index 7b4b10be3..e64975c34 100644
--- a/llarp/messages/relay.hpp
+++ b/llarp/messages/relay.hpp
@@ -31,6 +31,12 @@ namespace llarp
void
Clear() override;
+ const char*
+
+ Name() const override
+ {
+ return "RelayUpstream";
+ }
};
struct RelayDownstreamMessage : public ILinkMessage
@@ -53,6 +59,12 @@ namespace llarp
void
Clear() override;
+
+ const char*
+ Name() const override
+ {
+ return "RelayDownstream";
+ }
};
} // namespace llarp
diff --git a/llarp/messages/relay_commit.hpp b/llarp/messages/relay_commit.hpp
index d68fce51e..72180b1b1 100644
--- a/llarp/messages/relay_commit.hpp
+++ b/llarp/messages/relay_commit.hpp
@@ -68,6 +68,12 @@ namespace llarp
bool
AsyncDecrypt(llarp::path::PathContext *context) const;
+
+ const char *
+ Name() const override
+ {
+ return "RelayCommit";
+ }
};
} // namespace llarp
diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp
index 5ae844f4f..4f11f40ba 100644
--- a/llarp/router/router.cpp
+++ b/llarp/router/router.cpp
@@ -63,6 +63,7 @@ struct TryConnectJob
void
Success()
{
+ router->routerProfiling().MarkSuccess(rc.pubkey);
router->FlushOutboundFor(rc.pubkey, link);
}
@@ -165,6 +166,8 @@ namespace llarp
const RouterID us = pubkey();
if(remote.pubkey == us)
return false;
+ if(!ConnectionToRouterAllowed(remote.pubkey))
+ return false;
// do we already have a pending job for this remote?
if(HasPendingConnectJob(remote.pubkey))
{
@@ -627,6 +630,7 @@ namespace llarp
const RouterID us = pubkey();
if(us == remote)
return;
+
if(!ConnectionToRouterAllowed(remote))
{
LogWarn("not connecting to ", remote, " as it's not permitted by config");
@@ -1144,7 +1148,8 @@ namespace llarp
void
Router::SendTo(RouterID remote, const ILinkMessage *msg, ILinkLayer *selected)
{
- METRICS_TIME_BLOCK("RouterSendTo", remote.ToString().c_str());
+ const std::string remoteName = "TX_" + remote.ToString();
+ METRICS_DYNAMIC_INCREMENT(msg->Name(), remoteName.c_str());
llarp_buffer_t buf(linkmsg_buffer);
if(!msg->BEncode(&buf))
@@ -1263,6 +1268,11 @@ namespace llarp
{
if(rc.IsPublicRouter() && whitelistRouters && IsServiceNode())
{
+ if(lokinetRouters.size() == 0)
+ {
+ LogError("we have no service nodes in whitelist");
+ return false;
+ }
if(lokinetRouters.find(rc.pubkey) == lokinetRouters.end())
{
RouterID sn(rc.pubkey);
diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp
index d603a2c97..806b4e7f6 100644
--- a/llarp/service/endpoint.cpp
+++ b/llarp/service/endpoint.cpp
@@ -1175,12 +1175,10 @@ namespace llarp
, currentIntroSet(introset)
{
- auto& profiling = parent->m_Router->routerProfiling();
updatingIntroSet = false;
for(const auto intro : introset.I)
{
- if(intro.expiresAt > m_NextIntro.expiresAt
- && !profiling.IsBad(intro.router))
+ if(intro.expiresAt > m_NextIntro.expiresAt)
{
m_NextIntro = intro;
remoteIntro = intro;
@@ -1489,7 +1487,7 @@ namespace llarp
}
bool
- Endpoint::OutboundContext::ShiftIntroduction()
+ Endpoint::OutboundContext::ShiftIntroduction(bool rebuild)
{
bool success = false;
auto now = Now();
@@ -1524,7 +1522,7 @@ namespace llarp
break;
}
}
- if(shifted)
+ if(shifted && rebuild)
{
lastShift = now;
BuildOneAlignedTo(m_NextIntro.router);
@@ -1848,13 +1846,25 @@ namespace llarp
llarp::path::PathRole roles)
{
if(m_NextIntro.router.IsZero())
+ {
+ llarp::LogError("intro is not set, cannot select hops");
return false;
+ }
if(hop == numHops - 1)
{
if(db->Get(m_NextIntro.router, cur))
{
return true;
}
+ else if(router->routerProfiling().IsBad(m_NextIntro.router))
+ {
+ if(!ShiftIntroduction(false))
+ {
+ llarp::LogError("bad intro chosen, not selecting hop");
+ return false;
+ }
+ return db->Get(m_NextIntro.router, cur);
+ }
else
{
// we don't have it?
diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp
index b7afeb853..3eef971a4 100644
--- a/llarp/service/endpoint.hpp
+++ b/llarp/service/endpoint.hpp
@@ -239,8 +239,9 @@ namespace llarp
bool markedBad = false;
virtual bool
- ShiftIntroduction()
+ ShiftIntroduction(bool rebuild = true)
{
+ (void)rebuild;
return true;
};
@@ -288,7 +289,7 @@ namespace llarp
/// update the current selected intro to be a new best introduction
/// return true if we have changed intros
bool
- ShiftIntroduction() override;
+ ShiftIntroduction(bool rebuild = true) override;
/// mark the current remote intro as bad
bool
diff --git a/llarp/util/metrics_types.cpp b/llarp/util/metrics_types.cpp
index 6d8f0a743..6c97be19b 100644
--- a/llarp/util/metrics_types.cpp
+++ b/llarp/util/metrics_types.cpp
@@ -64,6 +64,8 @@ namespace llarp
return "Rate";
case Type::RateCount:
return "RateCount";
+ default:
+ return "???";
}
}