improve send rate

pull/15/head
Jeff Becker 6 years ago
parent 6f4e998910
commit b56d25730e
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -161,6 +161,8 @@ namespace llarp
IntroSet currentIntroSet;
/// the current selected intro
Introduction selectedIntro;
/// set to true if we are updating the remote introset right now
bool updatingIntroSet;
/// update the current selected intro to be a new best introduction
void

@ -31,6 +31,8 @@ namespace llarp
FragmentOverheadSize + FragmentBodySize;
typedef llarp::AlignedBuffer< FragmentBufferSize > FragmentBuffer;
constexpr size_t MaxSend = 64;
typedef llarp::AlignedBuffer< MAX_LINK_MSG_SIZE > MessageBuffer;
struct LinkLayer;
@ -47,8 +49,8 @@ namespace llarp
llarp_time_t lastActive;
const static llarp_time_t sessionTimeout = 30 * 1000;
std::deque< utp_iovec > vecq;
std::deque< FragmentBuffer > sendq;
size_t sendBufOffset;
FragmentBuffer recvBuf;
size_t recvBufOffset;
@ -101,11 +103,41 @@ namespace llarp
void
PumpWrite()
{
if(!sock)
return;
ssize_t expect = 0;
std::vector< utp_iovec > vecs;
for(const auto& vec : vecq)
{
expect += vec.iov_len;
vecs.push_back(vec);
}
if(expect)
{
ssize_t s = utp_writev(sock, vecs.data(), vecs.size());
llarp::LogDebug("utp_writev wrote=", s, " expect=", expect,
" to=", remoteAddr);
while(s > vecq.front().iov_len)
{
s -= vecq.front().iov_len;
vecq.pop_front();
sendq.pop_front();
}
if(vecq.size())
{
auto& front = vecq.front();
front.iov_len -= s;
front.iov_base = ((byte_t*)front.iov_base) + s;
}
}
/*
while(sendq.size() > 0 && !stalled)
{
ssize_t expect = FragmentBufferSize - sendBufOffset;
ssize_t s = write_ll(sendq.front().data() + sendBufOffset, expect);
if(s != expect)
ssize_t s = write_ll(sendq.front().data() + sendBufOffset,
expect); if(s != expect)
{
llarp::LogDebug("stalled at offset=", sendBufOffset, " sz=", s,
" to ", remoteAddr);
@ -118,6 +150,7 @@ namespace llarp
sendq.pop_front();
}
}
*/
}
ssize_t
@ -540,14 +573,14 @@ namespace llarp
return true;
};
gotLIM = false;
sendBufOffset = 0;
recvBufOffset = 0;
TimedOut = [&](llarp_time_t now) -> bool {
return this->IsTimedOut(now) || this->state == eClose;
};
GetPubKey = std::bind(&BaseSession::RemotePubKey, this);
lastActive = llarp_time_now_ms();
Pump = std::bind(&BaseSession::PumpWrite, this);
// Pump = []() {};
Pump = std::bind(&BaseSession::PumpWrite, this);
Tick = std::bind(&BaseSession::TickImpl, this, std::placeholders::_1);
SendMessageBuffer = std::bind(&BaseSession::QueueWriteBuffers, this,
std::placeholders::_1);
@ -743,12 +776,7 @@ namespace llarp
}
else if(arg->state == UTP_STATE_WRITABLE)
{
if(session->IsEstablished())
{
llarp::LogDebug("write resumed for ", session->remoteAddr);
session->stalled = false;
session->PumpWrite();
}
session->PumpWrite();
}
else if(arg->state == UTP_STATE_EOF)
{
@ -779,6 +807,10 @@ namespace llarp
{
sendq.emplace_back();
auto& buf = sendq.back();
vecq.emplace_back();
auto& vec = vecq.back();
vec.iov_base = buf.data();
vec.iov_len = FragmentBufferSize;
llarp::LogDebug("encrypt then hash ", sz, " bytes last=", isLastFragment);
buf.Randomize();
byte_t* nonce = buf.data() + FragmentHashSize;

@ -720,8 +720,8 @@ namespace llarp
auto itr = m_PendingServiceLookups.find(addr);
if(itr != m_PendingServiceLookups.end())
{
itr->second(addr, nullptr);
m_PendingServiceLookups.erase(itr);
itr->second(addr, nullptr);
}
return false;
}
@ -778,6 +778,7 @@ namespace llarp
, m_Parent(parent)
{
updatingIntroSet = false;
selectedIntro.Clear();
ShiftIntroduction();
}
@ -795,6 +796,7 @@ namespace llarp
currentIntroSet = *i;
ShiftIntroduction();
}
updatingIntroSet = false;
return true;
}
@ -843,7 +845,7 @@ namespace llarp
for(const auto& intro : currentIntroSet.I)
{
m_Parent->EnsureRouterIsKnown(selectedIntro.router);
if(intro.expiresAt > selectedIntro.expiresAt)
if(selectedIntro < intro)
{
selectedIntro = intro;
shifted = true;
@ -1008,6 +1010,8 @@ namespace llarp
void
Endpoint::OutboundContext::UpdateIntroSet()
{
if(updatingIntroSet)
return;
auto addr = currentIntroSet.A.Addr();
auto path = m_Parent->PickRandomEstablishedPath();
if(path)
@ -1018,8 +1022,7 @@ namespace llarp
std::placeholders::_1, std::placeholders::_2),
addr, m_Parent->GenTXID());
if(!job->SendRequestViaPath(path, m_Parent->Router()))
llarp::LogError("send via path failed");
updatingIntroSet = job->SendRequestViaPath(path, m_Parent->Router());
}
else
{
@ -1032,10 +1035,9 @@ namespace llarp
bool
Endpoint::OutboundContext::Tick(llarp_time_t now)
{
if(selectedIntro.expiresAt <= now
|| selectedIntro.expiresAt - now < 30000)
if(selectedIntro.ExpiresSoon(now))
{
UpdateIntroSet();
ShiftIntroduction();
}
m_Parent->EnsureRouterIsKnown(selectedIntro.router);
// TODO: check for expiration of outbound context

Loading…
Cancel
Save