From 291cc573951efdece5cdae3d18388fdd54878d7a Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sun, 7 Oct 2018 11:29:36 -0400 Subject: [PATCH] revert --- llarp/link/utp.cpp | 359 ++++++++++++++------------------------------- llarp/router.cpp | 31 ++-- llarp/router.hpp | 3 +- 3 files changed, 132 insertions(+), 261 deletions(-) diff --git a/llarp/link/utp.cpp b/llarp/link/utp.cpp index a686fcc18..a1cc7124c 100644 --- a/llarp/link/utp.cpp +++ b/llarp/link/utp.cpp @@ -58,25 +58,14 @@ namespace llarp llarp_time_t lastActive; const static llarp_time_t sessionTimeout = 30 * 1000; - llarp::util::Mutex encryptq_mtx; - std::deque< FragmentBuffer > encryptq; - - llarp::util::Mutex decryptq_mtx; - std::deque< FragmentBuffer > decryptq; - - llarp::util::Mutex send_mtx; std::deque< utp_iovec > vecq; std::deque< FragmentBuffer > sendq; - llarp::util::Mutex recv_mtx; - std::deque< FragmentBuffer > recvq; - FragmentBuffer recvBuf; size_t recvBufOffset; MessageBuffer recvMsg; size_t recvMsgOffset; bool stalled = false; - std::atomic< bool > m_working; void Alive(); @@ -120,84 +109,11 @@ namespace llarp BaseSession(); ~BaseSession(); - static void - HandleCrypto(void* user); - - void - DoPump() - { - if(!ReadAll()) - { - Close(); - return; - } - WriteAll(); - bool shouldCrypto = encryptq.size() || decryptq.size(); - shouldCrypto &= !m_working; - if(shouldCrypto) - { - Busy(); - llarp_threadpool_queue_job(Router()->tp, {this, &HandleCrypto}); - } - } - void - Busy() - { - m_working.store(true); - } - - void - NotBusy() - { - m_working.store(false); - } - - bool - ReadAll() - { - llarp::util::Lock lock(recv_mtx); - auto itr = recvq.begin(); - while(itr != recvq.end()) - { - auto body = InitBuffer(itr->data() + FragmentOverheadSize, - FragmentBufferSize - FragmentOverheadSize); - uint32_t upper, lower; - if(!(llarp_buffer_read_uint32(&body, &upper) - && llarp_buffer_read_uint32(&body, &lower))) - return false; - bool fragmentEnd = upper == 0; - llarp::LogDebug("fragment size ", lower, " from ", remoteAddr); - if(lower + recvMsgOffset > recvMsg.size()) - { - llarp::LogError("Fragment too big: ", lower, " bytes"); - return false; - } - memcpy(recvMsg.data() + recvMsgOffset, body.cur, lower); - recvMsgOffset += lower; - if(fragmentEnd) - { - // got a message - llarp::LogDebug("end of message from ", remoteAddr); - auto mbuf = InitBuffer(recvMsg.data(), recvMsgOffset); - if(!Router()->HandleRecvLinkMessageBuffer(this, mbuf)) - { - llarp::LogWarn("failed to handle message from ", remoteAddr); - llarp::DumpBuffer(mbuf); - } - recvMsgOffset = 0; - } - itr = recvq.erase(itr); - } - return true; - } - - void - WriteAll() + PumpWrite() { if(!sock) return; - llarp::util::Lock lock(send_mtx); ssize_t expect = 0; std::vector< utp_iovec > vecs; for(const auto& vec : vecq) @@ -211,7 +127,7 @@ namespace llarp llarp::LogDebug("utp_writev wrote=", s, " expect=", expect, " to=", remoteAddr); - while(s > ssize_t(vecq.front().iov_len)) + while(s > vecq.front().iov_len) { s -= vecq.front().iov_len; vecq.pop_front(); @@ -224,16 +140,46 @@ namespace llarp front.iov_base = ((byte_t*)front.iov_base) + s; } } + + /* + while(sendq.size() > 0 && !stalled) + { + ssize_t expect = FragmentBufferSize - sendBufOffset; + ssize_t s = write_ll(sendq.front().data() + sendBufOffset, + expect); if(s != expect) + { + llarp::LogDebug("stalled at offset=", sendBufOffset, " sz=", s, + " to ", remoteAddr); + sendBufOffset += s; + stalled = true; + } + else + { + sendBufOffset = 0; + sendq.pop_front(); + } + } + */ + } + + ssize_t + write_ll(byte_t* buf, size_t sz) + { + if(sock == nullptr) + { + llarp::LogWarn("write_ll failed: no socket"); + return 0; + } + ssize_t s = utp_write(sock, buf, sz); + llarp::LogDebug("write_ll ", s, " of ", sz, " bytes to ", remoteAddr); + return s; } bool VerifyThenDecrypt(byte_t* buf); void - QueueRecvFragment(const byte_t* buf); - - void - QueueSendFragment(const byte_t* ptr, uint32_t sz, bool isLastFragment); + EncryptThenHash(const byte_t* ptr, uint32_t sz, bool isLastFragment); bool QueueWriteBuffers(llarp_buffer_t buf) @@ -247,7 +193,7 @@ namespace llarp while(sz) { uint32_t s = std::min(FragmentBodyPayloadSize, sz); - QueueSendFragment(ptr, s, ((sz - s) == 0)); + EncryptThenHash(ptr, s, ((sz - s) == 0)); ptr += s; sz -= s; } @@ -326,7 +272,8 @@ namespace llarp s -= left; recvBufOffset = 0; ptr += left; - QueueRecvFragment(recvBuf.data()); + if(!VerifyThenDecrypt(recvBuf.data())) + return false; } } // process full fragments @@ -334,7 +281,8 @@ namespace llarp { recvBufOffset = 0; llarp::LogDebug("process full sz=", s); - QueueRecvFragment(ptr); + if(!VerifyThenDecrypt(ptr)) + return false; ptr += FragmentBufferSize; s -= FragmentBufferSize; } @@ -357,8 +305,6 @@ namespace llarp bool IsTimedOut(llarp_time_t now) const { - if(m_working) - return false; if(state == eClose) return true; if(now < lastActive) @@ -392,7 +338,6 @@ namespace llarp { utp_context* _utp_ctx = nullptr; llarp_router* router = nullptr; - static uint64 OnRead(utp_callback_arguments* arg); @@ -581,7 +526,6 @@ namespace llarp } } } - void Stop() { @@ -628,7 +572,6 @@ namespace llarp BaseSession::BaseSession(LinkLayer* p) { - m_working.store(false); parent = p; remoteTransportPubKey.Zero(); recvMsgOffset = 0; @@ -660,7 +603,7 @@ namespace llarp GetPubKey = std::bind(&BaseSession::RemotePubKey, this); lastActive = llarp_time_now_ms(); // Pump = []() {}; - Pump = std::bind(&BaseSession::DoPump, this); + Pump = std::bind(&BaseSession::PumpWrite, this); Tick = std::bind(&BaseSession::TickImpl, this, std::placeholders::_1); SendMessageBuffer = std::bind(&BaseSession::QueueWriteBuffers, this, std::placeholders::_1); @@ -858,7 +801,7 @@ namespace llarp } else if(arg->state == UTP_STATE_WRITABLE) { - session->WriteAll(); + session->PumpWrite(); } else if(arg->state == UTP_STATE_EOF) { @@ -882,61 +825,22 @@ namespace llarp return 0; } - template < typename Queue_t > void - EncryptThenHashQueue(llarp_crypto* crypto, const byte_t* sessionKey, - Queue_t& queue) - { - auto itr = queue.begin(); - while(itr != queue.end()) - { - byte_t* base = itr->data(); - byte_t* nonce = base + FragmentHashSize; - byte_t* body = nonce + FragmentNonceSize; - auto payload = - InitBuffer(body, FragmentBufferSize - FragmentOverheadSize); - - // encrypt - crypto->xchacha20(payload, sessionKey, nonce); - - payload.base = nonce; - payload.cur = payload.base; - payload.sz = FragmentBufferSize - FragmentHashSize; - // key'd hash - crypto->hmac(base, payload, sessionKey); - - ++itr; - } - } + BaseSession::EncryptThenHash(const byte_t* ptr, uint32_t sz, + bool isLastFragment) - void - BaseSession::QueueSendFragment(const byte_t* ptr, uint32_t sz, - bool isLastFragment) { - byte_t* nonce; - byte_t* body; - bool encryptImmediate = false; - if(state == eSessionReady) - { - llarp::util::Lock lock(encryptq_mtx); - encryptq.emplace_back(); - auto& buf = encryptq.back(); - llarp::LogDebug("encrypt then hash ", sz, - " bytes last=", isLastFragment); - buf.Randomize(); - nonce = buf.data() + FragmentHashSize; - } - else - { - llarp::util::Lock sendlock(send_mtx); - sendq.emplace_back(); - auto& buf = sendq.back(); - buf.Randomize(); - nonce = buf.data() + FragmentHashSize; - encryptImmediate = true; - } - - body = nonce + FragmentNonceSize; + sendq.emplace_back(); + auto& buf = sendq.back(); + vecq.emplace_back(); + auto& vec = vecq.back(); + vec.iov_base = buf.data(); + vec.iov_len = FragmentBufferSize; + llarp::LogDebug("encrypt then hash ", sz, " bytes last=", isLastFragment); + buf.Randomize(); + byte_t* nonce = buf.data() + FragmentHashSize; + byte_t* body = nonce + FragmentNonceSize; + byte_t* base = body; if(isLastFragment) htobe32buf(body, 0); else @@ -946,88 +850,17 @@ namespace llarp body += sizeof(uint32_t); memcpy(body, ptr, sz); - if(encryptImmediate) - { - llarp::util::Lock sendlock(send_mtx); - EncryptThenHashQueue(&Router()->crypto, sessionKey, sendq); - vecq.emplace_back(); - vecq.back().iov_base = sendq.back().data(); - vecq.back().iov_len = FragmentBufferSize; - } - } - - template < typename Queue_t > - bool - VerifyThenDecryptQueue(llarp_crypto* crypto, const byte_t* sessionKey, - Queue_t& queue) - { - ShortHash digest; - auto itr = queue.begin(); - while(itr != queue.end()) - { - byte_t* buf = itr->data(); - auto hbuf = InitBuffer(buf + FragmentHashSize, - FragmentBufferSize - FragmentHashSize); - if(crypto->hmac(digest.data(), hbuf, sessionKey)) - { - return false; - } - if(memcmp(digest, buf, FragmentHashSize)) - { - return false; - } - auto body = InitBuffer(buf + FragmentOverheadSize, - FragmentBufferSize - FragmentOverheadSize); - crypto->xchacha20(body, sessionKey, buf + FragmentHashSize); - ++itr; - } - return true; - } + auto payload = + InitBuffer(base, FragmentBufferSize - FragmentOverheadSize); - void - BaseSession::HandleCrypto(void* user) - { - BaseSession* self = static_cast< BaseSession* >(user); - llarp_crypto* crypto = &self->Router()->crypto; // encrypt - { - llarp::util::Lock enclock(self->encryptq_mtx); - EncryptThenHashQueue(crypto, self->sessionKey, self->encryptq); - { - llarp::util::Lock sendlock(self->send_mtx); - while(self->encryptq.size()) - { - self->sendq.emplace_back(); - // uses operator = from aligned buffer - self->sendq.back() = self->encryptq.front(); - self->encryptq.pop_front(); - self->vecq.emplace_back(); - self->vecq.back().iov_base = self->sendq.back().data(); - self->vecq.back().iov_len = FragmentBufferSize; - } - } - } - // decrypt - { - llarp::util::Lock declock(self->decryptq_mtx); - if(VerifyThenDecryptQueue(crypto, self->sessionKey, self->decryptq)) - { - llarp::util::Lock recvlock(self->recv_mtx); - while(self->decryptq.size()) - { - self->recvq.emplace_back(); - // uses operator = from aligned buffer - self->recvq.back() = self->decryptq.front(); - self->decryptq.pop_front(); - } - } - else - { - // TODO: should we post a job instead? - self->Close(); - } - } - self->NotBusy(); + Router()->crypto.xchacha20(payload, sessionKey, nonce); + + payload.base = nonce; + payload.cur = payload.base; + payload.sz = FragmentBufferSize - FragmentHashSize; + // key'd hash + Router()->crypto.hmac(buf.data(), payload, sessionKey); } void @@ -1042,25 +875,61 @@ namespace llarp Alive(); } - void - BaseSession::QueueRecvFragment(const byte_t* buf) + bool + BaseSession::VerifyThenDecrypt(byte_t* buf) { - if(state == eSessionReady) + llarp::LogDebug("verify then decrypt ", remoteAddr); + ShortHash digest; + + auto hbuf = InitBuffer(buf + FragmentHashSize, + FragmentBufferSize - FragmentHashSize); + if(!Router()->crypto.hmac(digest.data(), hbuf, sessionKey)) { - decryptq.emplace_back(); - memcpy(decryptq.back().data(), buf, FragmentBufferSize); + llarp::LogError("keyed hash failed"); + return false; } - else if(state == eLinkEstablished) + ShortHash expected(buf); + if(expected != digest) { - // handshake it - std::deque< FragmentBuffer > handshakeq; - handshakeq.emplace_back(buf); - if(VerifyThenDecryptQueue(&Router()->crypto, sessionKey, handshakeq)) - ReadAll(); - else - Close(); + llarp::LogError("Message Integrity Failed: got ", digest, " from ", + remoteAddr, " instead of ", expected); + llarp::DumpBuffer(InitBuffer(buf, FragmentBufferSize)); + return false; } + + auto body = InitBuffer(buf + FragmentOverheadSize, + FragmentBufferSize - FragmentOverheadSize); + + Router()->crypto.xchacha20(body, sessionKey, buf + FragmentHashSize); + + uint32_t upper, lower; + if(!(llarp_buffer_read_uint32(&body, &upper) + && llarp_buffer_read_uint32(&body, &lower))) + return false; + bool fragmentEnd = upper == 0; + llarp::LogDebug("fragment size ", lower, " from ", remoteAddr); + if(lower + recvMsgOffset > recvMsg.size()) + { + llarp::LogError("Fragment too big: ", lower, " bytes"); + return false; + } + memcpy(recvMsg.data() + recvMsgOffset, body.cur, lower); + recvMsgOffset += lower; + if(fragmentEnd) + { + // got a message + llarp::LogDebug("end of message from ", remoteAddr); + auto mbuf = InitBuffer(recvMsg.data(), recvMsgOffset); + if(!Router()->HandleRecvLinkMessageBuffer(this, mbuf)) + { + llarp::LogWarn("failed to handle message from ", remoteAddr); + llarp::DumpBuffer(mbuf); + } + recvMsgOffset = 0; + } + return true; } + void BaseSession::Close() { diff --git a/llarp/router.cpp b/llarp/router.cpp index 429fce8c5..982f7d06b 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -208,10 +208,21 @@ llarp_router::SendToOrQueue(const llarp::RouterID &remote, { llarp::LogWarn("tried to queue a message to ", remote, " but the queue is full so we drop it like it's hawt"); - return false; + } + llarp::RouterContact remoteRC; + // we don't have an open session to that router right now + if(llarp_nodedb_get_rc(nodedb, remote, remoteRC)) + { + // try connecting directly as the rc is loaded from disk + llarp_router_try_connect(this, remoteRC, 10); + return true; } - return TryEstablishTo(remote); + // we don't have the RC locally so do a dht lookup + dht->impl.LookupRouter(remote, + std::bind(&llarp_router::HandleDHTLookupForSendTo, + this, remote, std::placeholders::_1)); + return true; } void @@ -384,7 +395,7 @@ llarp_router::handle_router_ticker(void *user, uint64_t orig, uint64_t left) self->ScheduleTicker(orig); } -bool +void llarp_router::TryEstablishTo(const llarp::RouterID &remote) { llarp::RouterContact rc; @@ -392,25 +403,17 @@ llarp_router::TryEstablishTo(const llarp::RouterID &remote) { // try connecting async llarp_router_try_connect(this, rc, 5); - return true; } - else + else if(!routerProfiling.IsBad(remote)) { - if(routerProfiling.IsBad(remote)) - { - llarp::LogWarn("won't connect to flakey router ", remote); - return false; - } - if(dht->impl.HasRouterLookup(remote)) - return false; + return; llarp::LogInfo("looking up router ", remote); // dht lookup as we don't know it dht->impl.LookupRouter( remote, std::bind(&llarp_router::HandleDHTLookupForTryEstablishTo, this, remote, std::placeholders::_1)); - return true; } } @@ -476,7 +479,7 @@ llarp_router::Tick() if(inboundLinks.size() == 0) { - ssize_t N = llarp_nodedb_num_loaded(nodedb); + auto N = llarp_nodedb_num_loaded(nodedb); if(N < minRequiredRouters) { llarp::LogInfo("We need at least ", minRequiredRouters, diff --git a/llarp/router.hpp b/llarp/router.hpp index d2d55d546..a0ad8df2a 100644 --- a/llarp/router.hpp +++ b/llarp/router.hpp @@ -217,8 +217,7 @@ struct llarp_router DiscardOutboundFor(const llarp::RouterID &remote); /// try establishing a session to a remote router - /// return false if we didn't make a job to do it - bool + void TryEstablishTo(const llarp::RouterID &remote); /// flush outbound message queue