Merge pull request #448 from majestrate/master

staging
This commit is contained in:
Jeff 2019-03-26 17:46:49 -04:00 committed by GitHub
commit 65434a4e62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 102 additions and 16 deletions

View File

@ -382,10 +382,10 @@ namespace llarp
bool
ILinkLayer::PutSession(ILinkSession* s)
{
static constexpr size_t MaxSessionsPerEndpoint = 5;
Lock lock(&m_PendingMutex);
llarp::Addr addr = s->GetRemoteEndpoint();
auto itr = m_Pending.find(addr);
if(itr != m_Pending.end())
if(m_Pending.count(addr) >= MaxSessionsPerEndpoint)
return false;
m_Pending.emplace(addr, std::unique_ptr< ILinkSession >(s));
return true;

View File

@ -240,7 +240,7 @@ namespace llarp
m_AuthedLinks GUARDED_BY(m_AuthedLinksMutex);
Mutex m_PendingMutex
ACQUIRED_AFTER(m_AuthedLinksMutex); // protects m_Pending
std::unordered_map< llarp::Addr, std::unique_ptr< ILinkSession >,
std::unordered_multimap< llarp::Addr, std::unique_ptr< ILinkSession >,
llarp::Addr::Hash >
m_Pending GUARDED_BY(m_PendingMutex);
};

View File

@ -25,6 +25,8 @@
#include <link/utp_internal.hpp>
#include <util/metrics.hpp>
namespace llarp
{
namespace utp
@ -85,6 +87,8 @@ namespace llarp
ssize_t s = utp_writev(sock, vecs.data(), vecs.size());
if(s < 0)
return;
METRICS_DYNAMIC_INT_UPDATE(
"utp.session.tx", RouterID(remoteRC.pubkey).ToString().c_str(), s);
m_TXRate += s;
size_t sz = s;
while(vecq.size() && sz >= vecq.front().iov_len)
@ -178,6 +182,9 @@ namespace llarp
PruneInboundMessages(now);
m_TXRate = 0;
m_RXRate = 0;
METRICS_DYNAMIC_UPDATE("utp.session.sendq",
RouterID(remoteRC.pubkey).ToString().c_str(),
sendq.size());
}
/// low level read
@ -188,6 +195,8 @@ namespace llarp
Alive();
m_RXRate += sz;
size_t s = sz;
METRICS_DYNAMIC_INT_UPDATE(
"utp.session.rx", RouterID(remoteRC.pubkey).ToString().c_str(), s);
// process leftovers
if(recvBufOffset)
{
@ -228,6 +237,8 @@ namespace llarp
bool
Session::IsTimedOut(llarp_time_t now) const
{
if(state == eInitial)
return false;
if(sendq.size() >= MaxSendQueueSize)
{
return now - lastActive > 5000;
@ -346,11 +357,11 @@ namespace llarp
if(session && link)
{
link->HandleTimeout(session);
llarp::LogError(utp_error_code_names[arg->error_code], " via ",
session->remoteAddr);
if(arg->error_code == UTP_ETIMEDOUT)
{
link->HandleTimeout(session);
utp_close(arg->socket);
}
else
session->Close();
}
@ -577,6 +588,7 @@ namespace llarp
/// base constructor
Session::Session(LinkLayer* p)
{
state = eInitial;
m_NextTXMsgID = 0;
m_NextRXMsgID = 0;
parent = p;

View File

@ -42,7 +42,7 @@ namespace llarp
using FragmentBuffer = llarp::AlignedBuffer< FragmentBufferSize >;
/// maximum size for send queue for a session before we drop
constexpr size_t MaxSendQueueSize = 1024;
constexpr size_t MaxSendQueueSize = 64;
/// buffer for a link layer message
using MessageBuffer = llarp::AlignedBuffer< MAX_LINK_MSG_SIZE >;

View File

@ -29,6 +29,12 @@ namespace llarp
void
Clear() override;
const char*
Name() const override
{
return "DHTImmediate";
}
};
} // namespace llarp

View File

@ -31,6 +31,12 @@ namespace llarp
{
}
const char*
Name() const override
{
return "Discard";
}
bool
DecodeKey(__attribute__((unused)) const llarp_buffer_t& key,
__attribute__((unused)) llarp_buffer_t* buf) override

View File

@ -44,6 +44,12 @@ namespace llarp
void
Clear() override;
const char*
Name() const override
{
return "LinkIntro";
}
};
} // namespace llarp

View File

@ -9,6 +9,7 @@
#include <router_contact.hpp>
#include <util/buffer.hpp>
#include <util/logger.hpp>
#include <util/metrics.hpp>
namespace llarp
{
@ -67,10 +68,12 @@ namespace llarp
}
// create the message to parse based off message type
llarp::LogDebug("inbound message ", *strbuf.cur);
bool isLIM = false;
switch(*strbuf.cur)
{
case 'i':
handler->msg = &handler->holder->i;
isLIM = true;
break;
case 'd':
handler->msg = &handler->holder->d;
@ -90,6 +93,14 @@ namespace llarp
default:
return false;
}
if(!isLIM)
{
const std::string host =
"RX_" + RouterID(handler->from->GetPubKey()).ToString();
METRICS_DYNAMIC_INCREMENT(handler->msg->Name(), host.c_str());
}
handler->msg->session = handler->from;
handler->firstkey = false;
return true;

View File

@ -30,6 +30,10 @@ namespace llarp
virtual void
Clear() = 0;
// the name of this kind of message
virtual const char*
Name() const = 0;
};
} // namespace llarp

View File

@ -31,6 +31,12 @@ namespace llarp
void
Clear() override;
const char*
Name() const override
{
return "RelayUpstream";
}
};
struct RelayDownstreamMessage : public ILinkMessage
@ -53,6 +59,12 @@ namespace llarp
void
Clear() override;
const char*
Name() const override
{
return "RelayDownstream";
}
};
} // namespace llarp

View File

@ -68,6 +68,12 @@ namespace llarp
bool
AsyncDecrypt(llarp::path::PathContext *context) const;
const char *
Name() const override
{
return "RelayCommit";
}
};
} // namespace llarp

View File

@ -63,6 +63,7 @@ struct TryConnectJob
void
Success()
{
router->routerProfiling().MarkSuccess(rc.pubkey);
router->FlushOutboundFor(rc.pubkey, link);
}
@ -165,6 +166,8 @@ namespace llarp
const RouterID us = pubkey();
if(remote.pubkey == us)
return false;
if(!ConnectionToRouterAllowed(remote.pubkey))
return false;
// do we already have a pending job for this remote?
if(HasPendingConnectJob(remote.pubkey))
{
@ -627,6 +630,7 @@ namespace llarp
const RouterID us = pubkey();
if(us == remote)
return;
if(!ConnectionToRouterAllowed(remote))
{
LogWarn("not connecting to ", remote, " as it's not permitted by config");
@ -1144,7 +1148,8 @@ namespace llarp
void
Router::SendTo(RouterID remote, const ILinkMessage *msg, ILinkLayer *selected)
{
METRICS_TIME_BLOCK("RouterSendTo", remote.ToString().c_str());
const std::string remoteName = "TX_" + remote.ToString();
METRICS_DYNAMIC_INCREMENT(msg->Name(), remoteName.c_str());
llarp_buffer_t buf(linkmsg_buffer);
if(!msg->BEncode(&buf))
@ -1263,6 +1268,11 @@ namespace llarp
{
if(rc.IsPublicRouter() && whitelistRouters && IsServiceNode())
{
if(lokinetRouters.size() == 0)
{
LogError("we have no service nodes in whitelist");
return false;
}
if(lokinetRouters.find(rc.pubkey) == lokinetRouters.end())
{
RouterID sn(rc.pubkey);

View File

@ -1175,12 +1175,10 @@ namespace llarp
, currentIntroSet(introset)
{
auto& profiling = parent->m_Router->routerProfiling();
updatingIntroSet = false;
for(const auto intro : introset.I)
{
if(intro.expiresAt > m_NextIntro.expiresAt
&& !profiling.IsBad(intro.router))
if(intro.expiresAt > m_NextIntro.expiresAt)
{
m_NextIntro = intro;
remoteIntro = intro;
@ -1489,7 +1487,7 @@ namespace llarp
}
bool
Endpoint::OutboundContext::ShiftIntroduction()
Endpoint::OutboundContext::ShiftIntroduction(bool rebuild)
{
bool success = false;
auto now = Now();
@ -1524,7 +1522,7 @@ namespace llarp
break;
}
}
if(shifted)
if(shifted && rebuild)
{
lastShift = now;
BuildOneAlignedTo(m_NextIntro.router);
@ -1848,13 +1846,25 @@ namespace llarp
llarp::path::PathRole roles)
{
if(m_NextIntro.router.IsZero())
{
llarp::LogError("intro is not set, cannot select hops");
return false;
}
if(hop == numHops - 1)
{
if(db->Get(m_NextIntro.router, cur))
{
return true;
}
else if(router->routerProfiling().IsBad(m_NextIntro.router))
{
if(!ShiftIntroduction(false))
{
llarp::LogError("bad intro chosen, not selecting hop");
return false;
}
return db->Get(m_NextIntro.router, cur);
}
else
{
// we don't have it?

View File

@ -239,8 +239,9 @@ namespace llarp
bool markedBad = false;
virtual bool
ShiftIntroduction()
ShiftIntroduction(bool rebuild = true)
{
(void)rebuild;
return true;
};
@ -288,7 +289,7 @@ namespace llarp
/// update the current selected intro to be a new best introduction
/// return true if we have changed intros
bool
ShiftIntroduction() override;
ShiftIntroduction(bool rebuild = true) override;
/// mark the current remote intro as bad
bool

View File

@ -64,6 +64,8 @@ namespace llarp
return "Rate";
case Type::RateCount:
return "RateCount";
default:
return "???";
}
}