Merge branch 'master' of ssh://github.com/majestrate/loki-network

pull/453/head
Jeff 5 years ago
commit 6e9fa786af

@ -25,6 +25,8 @@
#include <link/utp_internal.hpp>
#include <util/metrics.hpp>
namespace llarp
{
namespace utp
@ -85,6 +87,8 @@ namespace llarp
ssize_t s = utp_writev(sock, vecs.data(), vecs.size());
if(s < 0)
return;
METRICS_DYNAMIC_INT_UPDATE(
"utp.session.tx", RouterID(remoteRC.pubkey).ToString().c_str(), s);
m_TXRate += s;
size_t sz = s;
while(vecq.size() && sz >= vecq.front().iov_len)
@ -178,6 +182,9 @@ namespace llarp
PruneInboundMessages(now);
m_TXRate = 0;
m_RXRate = 0;
METRICS_DYNAMIC_UPDATE("utp.session.sendq",
RouterID(remoteRC.pubkey).ToString().c_str(),
sendq.size());
}
/// low level read
@ -188,6 +195,8 @@ namespace llarp
Alive();
m_RXRate += sz;
size_t s = sz;
METRICS_DYNAMIC_INT_UPDATE(
"utp.session.rx", RouterID(remoteRC.pubkey).ToString().c_str(), s);
// process leftovers
if(recvBufOffset)
{

@ -42,7 +42,7 @@ namespace llarp
using FragmentBuffer = llarp::AlignedBuffer< FragmentBufferSize >;
/// maximum size for send queue for a session before we drop
constexpr size_t MaxSendQueueSize = 1024;
constexpr size_t MaxSendQueueSize = 64;
/// buffer for a link layer message
using MessageBuffer = llarp::AlignedBuffer< MAX_LINK_MSG_SIZE >;

@ -29,6 +29,12 @@ namespace llarp
void
Clear() override;
const char*
Name() const override
{
return "DHTImmediate";
}
};
} // namespace llarp

@ -31,6 +31,12 @@ namespace llarp
{
}
const char*
Name() const override
{
return "Discard";
}
bool
DecodeKey(__attribute__((unused)) const llarp_buffer_t& key,
__attribute__((unused)) llarp_buffer_t* buf) override

@ -44,6 +44,12 @@ namespace llarp
void
Clear() override;
const char*
Name() const override
{
return "LinkIntro";
}
};
} // namespace llarp

@ -9,6 +9,7 @@
#include <router_contact.hpp>
#include <util/buffer.hpp>
#include <util/logger.hpp>
#include <util/metrics.hpp>
namespace llarp
{
@ -67,10 +68,12 @@ namespace llarp
}
// create the message to parse based off message type
llarp::LogDebug("inbound message ", *strbuf.cur);
bool isLIM = false;
switch(*strbuf.cur)
{
case 'i':
handler->msg = &handler->holder->i;
isLIM = true;
break;
case 'd':
handler->msg = &handler->holder->d;
@ -90,6 +93,14 @@ namespace llarp
default:
return false;
}
if(!isLIM)
{
const std::string host =
"RX_" + RouterID(handler->from->GetPubKey()).ToString();
METRICS_DYNAMIC_INCREMENT(handler->msg->Name(), host.c_str());
}
handler->msg->session = handler->from;
handler->firstkey = false;
return true;

@ -30,6 +30,10 @@ namespace llarp
virtual void
Clear() = 0;
// the name of this kind of message
virtual const char*
Name() const = 0;
};
} // namespace llarp

@ -31,6 +31,12 @@ namespace llarp
void
Clear() override;
const char*
Name() const override
{
return "RelayUpstream";
}
};
struct RelayDownstreamMessage : public ILinkMessage
@ -53,6 +59,12 @@ namespace llarp
void
Clear() override;
const char*
Name() const override
{
return "RelayDownstream";
}
};
} // namespace llarp

@ -68,6 +68,12 @@ namespace llarp
bool
AsyncDecrypt(llarp::path::PathContext *context) const;
const char *
Name() const override
{
return "RelayCommit";
}
};
} // namespace llarp

@ -166,6 +166,8 @@ namespace llarp
const RouterID us = pubkey();
if(remote.pubkey == us)
return false;
if(!ConnectionToRouterAllowed(remote.pubkey))
return false;
// do we already have a pending job for this remote?
if(HasPendingConnectJob(remote.pubkey))
{
@ -628,6 +630,7 @@ namespace llarp
const RouterID us = pubkey();
if(us == remote)
return;
if(!ConnectionToRouterAllowed(remote))
{
LogWarn("not connecting to ", remote, " as it's not permitted by config");
@ -1145,7 +1148,8 @@ namespace llarp
void
Router::SendTo(RouterID remote, const ILinkMessage *msg, ILinkLayer *selected)
{
METRICS_TIME_BLOCK("RouterSendTo", remote.ToString().c_str());
const std::string remoteName = "TX_" + remote.ToString();
METRICS_DYNAMIC_INCREMENT(msg->Name(), remoteName.c_str());
llarp_buffer_t buf(linkmsg_buffer);
if(!msg->BEncode(&buf))
@ -1264,6 +1268,11 @@ namespace llarp
{
if(rc.IsPublicRouter() && whitelistRouters && IsServiceNode())
{
if(lokinetRouters.size() == 0)
{
LogError("we have no service nodes in whitelist");
return false;
}
if(lokinetRouters.find(rc.pubkey) == lokinetRouters.end())
{
RouterID sn(rc.pubkey);

@ -1175,12 +1175,10 @@ namespace llarp
, currentIntroSet(introset)
{
auto& profiling = parent->m_Router->routerProfiling();
updatingIntroSet = false;
for(const auto intro : introset.I)
{
if(intro.expiresAt > m_NextIntro.expiresAt
&& !profiling.IsBad(intro.router))
if(intro.expiresAt > m_NextIntro.expiresAt)
{
m_NextIntro = intro;
remoteIntro = intro;
@ -1489,7 +1487,7 @@ namespace llarp
}
bool
Endpoint::OutboundContext::ShiftIntroduction()
Endpoint::OutboundContext::ShiftIntroduction(bool rebuild)
{
bool success = false;
auto now = Now();
@ -1524,7 +1522,7 @@ namespace llarp
break;
}
}
if(shifted)
if(shifted && rebuild)
{
lastShift = now;
BuildOneAlignedTo(m_NextIntro.router);
@ -1848,13 +1846,25 @@ namespace llarp
llarp::path::PathRole roles)
{
if(m_NextIntro.router.IsZero())
{
llarp::LogError("intro is not set, cannot select hops");
return false;
}
if(hop == numHops - 1)
{
if(db->Get(m_NextIntro.router, cur))
{
return true;
}
else if(router->routerProfiling().IsBad(m_NextIntro.router))
{
if(!ShiftIntroduction(false))
{
llarp::LogError("bad intro chosen, not selecting hop");
return false;
}
return db->Get(m_NextIntro.router, cur);
}
else
{
// we don't have it?

@ -239,8 +239,9 @@ namespace llarp
bool markedBad = false;
virtual bool
ShiftIntroduction()
ShiftIntroduction(bool rebuild = true)
{
(void)rebuild;
return true;
};
@ -288,7 +289,7 @@ namespace llarp
/// update the current selected intro to be a new best introduction
/// return true if we have changed intros
bool
ShiftIntroduction() override;
ShiftIntroduction(bool rebuild = true) override;
/// mark the current remote intro as bad
bool

Loading…
Cancel
Save