pull/23/head
Jeff Becker 6 years ago
parent f03698fb33
commit 291cc57395
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -58,25 +58,14 @@ namespace llarp
llarp_time_t lastActive; llarp_time_t lastActive;
const static llarp_time_t sessionTimeout = 30 * 1000; const static llarp_time_t sessionTimeout = 30 * 1000;
llarp::util::Mutex encryptq_mtx;
std::deque< FragmentBuffer > encryptq;
llarp::util::Mutex decryptq_mtx;
std::deque< FragmentBuffer > decryptq;
llarp::util::Mutex send_mtx;
std::deque< utp_iovec > vecq; std::deque< utp_iovec > vecq;
std::deque< FragmentBuffer > sendq; std::deque< FragmentBuffer > sendq;
llarp::util::Mutex recv_mtx;
std::deque< FragmentBuffer > recvq;
FragmentBuffer recvBuf; FragmentBuffer recvBuf;
size_t recvBufOffset; size_t recvBufOffset;
MessageBuffer recvMsg; MessageBuffer recvMsg;
size_t recvMsgOffset; size_t recvMsgOffset;
bool stalled = false; bool stalled = false;
std::atomic< bool > m_working;
void void
Alive(); Alive();
@ -120,84 +109,11 @@ namespace llarp
BaseSession(); BaseSession();
~BaseSession(); ~BaseSession();
static void
HandleCrypto(void* user);
void
DoPump()
{
if(!ReadAll())
{
Close();
return;
}
WriteAll();
bool shouldCrypto = encryptq.size() || decryptq.size();
shouldCrypto &= !m_working;
if(shouldCrypto)
{
Busy();
llarp_threadpool_queue_job(Router()->tp, {this, &HandleCrypto});
}
}
void void
Busy() PumpWrite()
{
m_working.store(true);
}
void
NotBusy()
{
m_working.store(false);
}
bool
ReadAll()
{
llarp::util::Lock lock(recv_mtx);
auto itr = recvq.begin();
while(itr != recvq.end())
{
auto body = InitBuffer(itr->data() + FragmentOverheadSize,
FragmentBufferSize - FragmentOverheadSize);
uint32_t upper, lower;
if(!(llarp_buffer_read_uint32(&body, &upper)
&& llarp_buffer_read_uint32(&body, &lower)))
return false;
bool fragmentEnd = upper == 0;
llarp::LogDebug("fragment size ", lower, " from ", remoteAddr);
if(lower + recvMsgOffset > recvMsg.size())
{
llarp::LogError("Fragment too big: ", lower, " bytes");
return false;
}
memcpy(recvMsg.data() + recvMsgOffset, body.cur, lower);
recvMsgOffset += lower;
if(fragmentEnd)
{
// got a message
llarp::LogDebug("end of message from ", remoteAddr);
auto mbuf = InitBuffer(recvMsg.data(), recvMsgOffset);
if(!Router()->HandleRecvLinkMessageBuffer(this, mbuf))
{
llarp::LogWarn("failed to handle message from ", remoteAddr);
llarp::DumpBuffer(mbuf);
}
recvMsgOffset = 0;
}
itr = recvq.erase(itr);
}
return true;
}
void
WriteAll()
{ {
if(!sock) if(!sock)
return; return;
llarp::util::Lock lock(send_mtx);
ssize_t expect = 0; ssize_t expect = 0;
std::vector< utp_iovec > vecs; std::vector< utp_iovec > vecs;
for(const auto& vec : vecq) for(const auto& vec : vecq)
@ -211,7 +127,7 @@ namespace llarp
llarp::LogDebug("utp_writev wrote=", s, " expect=", expect, llarp::LogDebug("utp_writev wrote=", s, " expect=", expect,
" to=", remoteAddr); " to=", remoteAddr);
while(s > ssize_t(vecq.front().iov_len)) while(s > vecq.front().iov_len)
{ {
s -= vecq.front().iov_len; s -= vecq.front().iov_len;
vecq.pop_front(); vecq.pop_front();
@ -224,16 +140,46 @@ namespace llarp
front.iov_base = ((byte_t*)front.iov_base) + s; front.iov_base = ((byte_t*)front.iov_base) + s;
} }
} }
/*
while(sendq.size() > 0 && !stalled)
{
ssize_t expect = FragmentBufferSize - sendBufOffset;
ssize_t s = write_ll(sendq.front().data() + sendBufOffset,
expect); if(s != expect)
{
llarp::LogDebug("stalled at offset=", sendBufOffset, " sz=", s,
" to ", remoteAddr);
sendBufOffset += s;
stalled = true;
}
else
{
sendBufOffset = 0;
sendq.pop_front();
}
}
*/
}
ssize_t
write_ll(byte_t* buf, size_t sz)
{
if(sock == nullptr)
{
llarp::LogWarn("write_ll failed: no socket");
return 0;
}
ssize_t s = utp_write(sock, buf, sz);
llarp::LogDebug("write_ll ", s, " of ", sz, " bytes to ", remoteAddr);
return s;
} }
bool bool
VerifyThenDecrypt(byte_t* buf); VerifyThenDecrypt(byte_t* buf);
void void
QueueRecvFragment(const byte_t* buf); EncryptThenHash(const byte_t* ptr, uint32_t sz, bool isLastFragment);
void
QueueSendFragment(const byte_t* ptr, uint32_t sz, bool isLastFragment);
bool bool
QueueWriteBuffers(llarp_buffer_t buf) QueueWriteBuffers(llarp_buffer_t buf)
@ -247,7 +193,7 @@ namespace llarp
while(sz) while(sz)
{ {
uint32_t s = std::min(FragmentBodyPayloadSize, sz); uint32_t s = std::min(FragmentBodyPayloadSize, sz);
QueueSendFragment(ptr, s, ((sz - s) == 0)); EncryptThenHash(ptr, s, ((sz - s) == 0));
ptr += s; ptr += s;
sz -= s; sz -= s;
} }
@ -326,7 +272,8 @@ namespace llarp
s -= left; s -= left;
recvBufOffset = 0; recvBufOffset = 0;
ptr += left; ptr += left;
QueueRecvFragment(recvBuf.data()); if(!VerifyThenDecrypt(recvBuf.data()))
return false;
} }
} }
// process full fragments // process full fragments
@ -334,7 +281,8 @@ namespace llarp
{ {
recvBufOffset = 0; recvBufOffset = 0;
llarp::LogDebug("process full sz=", s); llarp::LogDebug("process full sz=", s);
QueueRecvFragment(ptr); if(!VerifyThenDecrypt(ptr))
return false;
ptr += FragmentBufferSize; ptr += FragmentBufferSize;
s -= FragmentBufferSize; s -= FragmentBufferSize;
} }
@ -357,8 +305,6 @@ namespace llarp
bool bool
IsTimedOut(llarp_time_t now) const IsTimedOut(llarp_time_t now) const
{ {
if(m_working)
return false;
if(state == eClose) if(state == eClose)
return true; return true;
if(now < lastActive) if(now < lastActive)
@ -392,7 +338,6 @@ namespace llarp
{ {
utp_context* _utp_ctx = nullptr; utp_context* _utp_ctx = nullptr;
llarp_router* router = nullptr; llarp_router* router = nullptr;
static uint64 static uint64
OnRead(utp_callback_arguments* arg); OnRead(utp_callback_arguments* arg);
@ -581,7 +526,6 @@ namespace llarp
} }
} }
} }
void void
Stop() Stop()
{ {
@ -628,7 +572,6 @@ namespace llarp
BaseSession::BaseSession(LinkLayer* p) BaseSession::BaseSession(LinkLayer* p)
{ {
m_working.store(false);
parent = p; parent = p;
remoteTransportPubKey.Zero(); remoteTransportPubKey.Zero();
recvMsgOffset = 0; recvMsgOffset = 0;
@ -660,7 +603,7 @@ namespace llarp
GetPubKey = std::bind(&BaseSession::RemotePubKey, this); GetPubKey = std::bind(&BaseSession::RemotePubKey, this);
lastActive = llarp_time_now_ms(); lastActive = llarp_time_now_ms();
// Pump = []() {}; // Pump = []() {};
Pump = std::bind(&BaseSession::DoPump, this); Pump = std::bind(&BaseSession::PumpWrite, this);
Tick = std::bind(&BaseSession::TickImpl, this, std::placeholders::_1); Tick = std::bind(&BaseSession::TickImpl, this, std::placeholders::_1);
SendMessageBuffer = std::bind(&BaseSession::QueueWriteBuffers, this, SendMessageBuffer = std::bind(&BaseSession::QueueWriteBuffers, this,
std::placeholders::_1); std::placeholders::_1);
@ -858,7 +801,7 @@ namespace llarp
} }
else if(arg->state == UTP_STATE_WRITABLE) else if(arg->state == UTP_STATE_WRITABLE)
{ {
session->WriteAll(); session->PumpWrite();
} }
else if(arg->state == UTP_STATE_EOF) else if(arg->state == UTP_STATE_EOF)
{ {
@ -882,61 +825,22 @@ namespace llarp
return 0; return 0;
} }
template < typename Queue_t >
void void
EncryptThenHashQueue(llarp_crypto* crypto, const byte_t* sessionKey, BaseSession::EncryptThenHash(const byte_t* ptr, uint32_t sz,
Queue_t& queue) bool isLastFragment)
{
auto itr = queue.begin();
while(itr != queue.end())
{
byte_t* base = itr->data();
byte_t* nonce = base + FragmentHashSize;
byte_t* body = nonce + FragmentNonceSize;
auto payload =
InitBuffer(body, FragmentBufferSize - FragmentOverheadSize);
// encrypt
crypto->xchacha20(payload, sessionKey, nonce);
payload.base = nonce;
payload.cur = payload.base;
payload.sz = FragmentBufferSize - FragmentHashSize;
// key'd hash
crypto->hmac(base, payload, sessionKey);
++itr;
}
}
void
BaseSession::QueueSendFragment(const byte_t* ptr, uint32_t sz,
bool isLastFragment)
{ {
byte_t* nonce; sendq.emplace_back();
byte_t* body; auto& buf = sendq.back();
bool encryptImmediate = false; vecq.emplace_back();
if(state == eSessionReady) auto& vec = vecq.back();
{ vec.iov_base = buf.data();
llarp::util::Lock lock(encryptq_mtx); vec.iov_len = FragmentBufferSize;
encryptq.emplace_back(); llarp::LogDebug("encrypt then hash ", sz, " bytes last=", isLastFragment);
auto& buf = encryptq.back(); buf.Randomize();
llarp::LogDebug("encrypt then hash ", sz, byte_t* nonce = buf.data() + FragmentHashSize;
" bytes last=", isLastFragment); byte_t* body = nonce + FragmentNonceSize;
buf.Randomize(); byte_t* base = body;
nonce = buf.data() + FragmentHashSize;
}
else
{
llarp::util::Lock sendlock(send_mtx);
sendq.emplace_back();
auto& buf = sendq.back();
buf.Randomize();
nonce = buf.data() + FragmentHashSize;
encryptImmediate = true;
}
body = nonce + FragmentNonceSize;
if(isLastFragment) if(isLastFragment)
htobe32buf(body, 0); htobe32buf(body, 0);
else else
@ -946,88 +850,17 @@ namespace llarp
body += sizeof(uint32_t); body += sizeof(uint32_t);
memcpy(body, ptr, sz); memcpy(body, ptr, sz);
if(encryptImmediate) auto payload =
{ InitBuffer(base, FragmentBufferSize - FragmentOverheadSize);
llarp::util::Lock sendlock(send_mtx);
EncryptThenHashQueue(&Router()->crypto, sessionKey, sendq);
vecq.emplace_back();
vecq.back().iov_base = sendq.back().data();
vecq.back().iov_len = FragmentBufferSize;
}
}
template < typename Queue_t >
bool
VerifyThenDecryptQueue(llarp_crypto* crypto, const byte_t* sessionKey,
Queue_t& queue)
{
ShortHash digest;
auto itr = queue.begin();
while(itr != queue.end())
{
byte_t* buf = itr->data();
auto hbuf = InitBuffer(buf + FragmentHashSize,
FragmentBufferSize - FragmentHashSize);
if(crypto->hmac(digest.data(), hbuf, sessionKey))
{
return false;
}
if(memcmp(digest, buf, FragmentHashSize))
{
return false;
}
auto body = InitBuffer(buf + FragmentOverheadSize,
FragmentBufferSize - FragmentOverheadSize);
crypto->xchacha20(body, sessionKey, buf + FragmentHashSize);
++itr;
}
return true;
}
void
BaseSession::HandleCrypto(void* user)
{
BaseSession* self = static_cast< BaseSession* >(user);
llarp_crypto* crypto = &self->Router()->crypto;
// encrypt // encrypt
{ Router()->crypto.xchacha20(payload, sessionKey, nonce);
llarp::util::Lock enclock(self->encryptq_mtx);
EncryptThenHashQueue(crypto, self->sessionKey, self->encryptq); payload.base = nonce;
{ payload.cur = payload.base;
llarp::util::Lock sendlock(self->send_mtx); payload.sz = FragmentBufferSize - FragmentHashSize;
while(self->encryptq.size()) // key'd hash
{ Router()->crypto.hmac(buf.data(), payload, sessionKey);
self->sendq.emplace_back();
// uses operator = from aligned buffer
self->sendq.back() = self->encryptq.front();
self->encryptq.pop_front();
self->vecq.emplace_back();
self->vecq.back().iov_base = self->sendq.back().data();
self->vecq.back().iov_len = FragmentBufferSize;
}
}
}
// decrypt
{
llarp::util::Lock declock(self->decryptq_mtx);
if(VerifyThenDecryptQueue(crypto, self->sessionKey, self->decryptq))
{
llarp::util::Lock recvlock(self->recv_mtx);
while(self->decryptq.size())
{
self->recvq.emplace_back();
// uses operator = from aligned buffer
self->recvq.back() = self->decryptq.front();
self->decryptq.pop_front();
}
}
else
{
// TODO: should we post a job instead?
self->Close();
}
}
self->NotBusy();
} }
void void
@ -1042,25 +875,61 @@ namespace llarp
Alive(); Alive();
} }
void bool
BaseSession::QueueRecvFragment(const byte_t* buf) BaseSession::VerifyThenDecrypt(byte_t* buf)
{ {
if(state == eSessionReady) llarp::LogDebug("verify then decrypt ", remoteAddr);
ShortHash digest;
auto hbuf = InitBuffer(buf + FragmentHashSize,
FragmentBufferSize - FragmentHashSize);
if(!Router()->crypto.hmac(digest.data(), hbuf, sessionKey))
{ {
decryptq.emplace_back(); llarp::LogError("keyed hash failed");
memcpy(decryptq.back().data(), buf, FragmentBufferSize); return false;
} }
else if(state == eLinkEstablished) ShortHash expected(buf);
if(expected != digest)
{ {
// handshake it llarp::LogError("Message Integrity Failed: got ", digest, " from ",
std::deque< FragmentBuffer > handshakeq; remoteAddr, " instead of ", expected);
handshakeq.emplace_back(buf); llarp::DumpBuffer(InitBuffer(buf, FragmentBufferSize));
if(VerifyThenDecryptQueue(&Router()->crypto, sessionKey, handshakeq)) return false;
ReadAll();
else
Close();
} }
auto body = InitBuffer(buf + FragmentOverheadSize,
FragmentBufferSize - FragmentOverheadSize);
Router()->crypto.xchacha20(body, sessionKey, buf + FragmentHashSize);
uint32_t upper, lower;
if(!(llarp_buffer_read_uint32(&body, &upper)
&& llarp_buffer_read_uint32(&body, &lower)))
return false;
bool fragmentEnd = upper == 0;
llarp::LogDebug("fragment size ", lower, " from ", remoteAddr);
if(lower + recvMsgOffset > recvMsg.size())
{
llarp::LogError("Fragment too big: ", lower, " bytes");
return false;
}
memcpy(recvMsg.data() + recvMsgOffset, body.cur, lower);
recvMsgOffset += lower;
if(fragmentEnd)
{
// got a message
llarp::LogDebug("end of message from ", remoteAddr);
auto mbuf = InitBuffer(recvMsg.data(), recvMsgOffset);
if(!Router()->HandleRecvLinkMessageBuffer(this, mbuf))
{
llarp::LogWarn("failed to handle message from ", remoteAddr);
llarp::DumpBuffer(mbuf);
}
recvMsgOffset = 0;
}
return true;
} }
void void
BaseSession::Close() BaseSession::Close()
{ {

@ -208,10 +208,21 @@ llarp_router::SendToOrQueue(const llarp::RouterID &remote,
{ {
llarp::LogWarn("tried to queue a message to ", remote, llarp::LogWarn("tried to queue a message to ", remote,
" but the queue is full so we drop it like it's hawt"); " but the queue is full so we drop it like it's hawt");
return false; }
llarp::RouterContact remoteRC;
// we don't have an open session to that router right now
if(llarp_nodedb_get_rc(nodedb, remote, remoteRC))
{
// try connecting directly as the rc is loaded from disk
llarp_router_try_connect(this, remoteRC, 10);
return true;
} }
return TryEstablishTo(remote); // we don't have the RC locally so do a dht lookup
dht->impl.LookupRouter(remote,
std::bind(&llarp_router::HandleDHTLookupForSendTo,
this, remote, std::placeholders::_1));
return true;
} }
void void
@ -384,7 +395,7 @@ llarp_router::handle_router_ticker(void *user, uint64_t orig, uint64_t left)
self->ScheduleTicker(orig); self->ScheduleTicker(orig);
} }
bool void
llarp_router::TryEstablishTo(const llarp::RouterID &remote) llarp_router::TryEstablishTo(const llarp::RouterID &remote)
{ {
llarp::RouterContact rc; llarp::RouterContact rc;
@ -392,25 +403,17 @@ llarp_router::TryEstablishTo(const llarp::RouterID &remote)
{ {
// try connecting async // try connecting async
llarp_router_try_connect(this, rc, 5); llarp_router_try_connect(this, rc, 5);
return true;
} }
else else if(!routerProfiling.IsBad(remote))
{ {
if(routerProfiling.IsBad(remote))
{
llarp::LogWarn("won't connect to flakey router ", remote);
return false;
}
if(dht->impl.HasRouterLookup(remote)) if(dht->impl.HasRouterLookup(remote))
return false; return;
llarp::LogInfo("looking up router ", remote); llarp::LogInfo("looking up router ", remote);
// dht lookup as we don't know it // dht lookup as we don't know it
dht->impl.LookupRouter( dht->impl.LookupRouter(
remote, remote,
std::bind(&llarp_router::HandleDHTLookupForTryEstablishTo, this, remote, std::bind(&llarp_router::HandleDHTLookupForTryEstablishTo, this, remote,
std::placeholders::_1)); std::placeholders::_1));
return true;
} }
} }
@ -476,7 +479,7 @@ llarp_router::Tick()
if(inboundLinks.size() == 0) if(inboundLinks.size() == 0)
{ {
ssize_t N = llarp_nodedb_num_loaded(nodedb); auto N = llarp_nodedb_num_loaded(nodedb);
if(N < minRequiredRouters) if(N < minRequiredRouters)
{ {
llarp::LogInfo("We need at least ", minRequiredRouters, llarp::LogInfo("We need at least ", minRequiredRouters,

@ -217,8 +217,7 @@ struct llarp_router
DiscardOutboundFor(const llarp::RouterID &remote); DiscardOutboundFor(const llarp::RouterID &remote);
/// try establishing a session to a remote router /// try establishing a session to a remote router
/// return false if we didn't make a job to do it void
bool
TryEstablishTo(const llarp::RouterID &remote); TryEstablishTo(const llarp::RouterID &remote);
/// flush outbound message queue /// flush outbound message queue

Loading…
Cancel
Save