From 25f10d5b11c2416662e3409442d3c1cc208eab22 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 2 Apr 2019 10:03:53 +0100 Subject: [PATCH] Fix more address sanitiser issues --- CMakeLists.txt | 2 +- llarp/iwp/linklayer.cpp | 4 +- llarp/iwp/linklayer.hpp | 9 +- llarp/link/server.cpp | 17 +- llarp/link/server.hpp | 12 +- llarp/link/session.hpp | 51 ++-- llarp/utp/linklayer.cpp | 21 +- llarp/utp/linklayer.hpp | 20 +- llarp/utp/session.cpp | 340 +++++++++++----------- llarp/utp/session.hpp | 112 ++++--- test/util/test_llarp_util_thread_pool.cpp | 4 +- 11 files changed, 318 insertions(+), 274 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 542ef9ea2..b0f1d1539 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -190,7 +190,7 @@ if(NOT DEBIAN) endif(NOT DEBIAN) if(ASAN) - set(DEBUG_FLAGS ${DEBUG_FLAGS} -fsanitize=undefined -fno-omit-frame-pointer) + set(DEBUG_FLAGS ${DEBUG_FLAGS} -fsanitize=address -fno-omit-frame-pointer) set(OPTIMIZE_FLAGS "-O0") endif(ASAN) diff --git a/llarp/iwp/linklayer.cpp b/llarp/iwp/linklayer.cpp index 7a7b9d8b0..39b23c32f 100644 --- a/llarp/iwp/linklayer.cpp +++ b/llarp/iwp/linklayer.cpp @@ -91,14 +91,14 @@ namespace llarp } } - ILinkSession* + std::shared_ptr< ILinkSession > LinkLayer::NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) { (void)rc; (void)ai; // TODO: implement me - return nullptr; + return {}; } void diff --git a/llarp/iwp/linklayer.hpp b/llarp/iwp/linklayer.hpp index c90242dc3..05c8bf09e 100644 --- a/llarp/iwp/linklayer.hpp +++ b/llarp/iwp/linklayer.hpp @@ -21,12 +21,19 @@ namespace llarp TimeoutHandler timeout, SessionClosedHandler closed); ~LinkLayer(); + Crypto *const crypto; + Crypto * + OurCrypto() override + { + return crypto; + } + bool Start(Logic *l) override; - ILinkSession * + std::shared_ptr< ILinkSession > NewOutboundSession(const RouterContact &rc, const AddressInfo &ai) override; diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 93359a20f..35925e867 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -143,10 +143,10 @@ namespace llarp { if(m_AuthedLinks.count(pk) > MaxSessionsPerKey) { - s->SendClose(); + s->Close(); return false; } - m_AuthedLinks.emplace(pk, std::move(itr->second)); + m_AuthedLinks.emplace(pk, itr->second); itr = m_Pending.erase(itr); return true; } @@ -206,13 +206,12 @@ namespace llarp if(!PickAddress(rc, to)) return false; llarp::Addr addr(to); - auto s = NewOutboundSession(rc, to); + std::shared_ptr< ILinkSession > s = NewOutboundSession(rc, to); if(PutSession(s)) { s->Start(); return true; } - delete s; return false; } @@ -258,7 +257,7 @@ namespace llarp auto itr = m_AuthedLinks.begin(); while(itr != m_AuthedLinks.end()) { - itr->second->SendClose(); + itr->second->Close(); ++itr; } } @@ -267,7 +266,7 @@ namespace llarp auto itr = m_Pending.begin(); while(itr != m_Pending.end()) { - itr->second->SendClose(); + itr->second->Close(); ++itr; } } @@ -283,7 +282,7 @@ namespace llarp auto itr = range.first; while(itr != range.second) { - itr->second->SendClose(); + itr->second->Close(); itr = m_AuthedLinks.erase(itr); } } @@ -380,14 +379,14 @@ namespace llarp } bool - ILinkLayer::PutSession(ILinkSession* s) + ILinkLayer::PutSession(const std::shared_ptr< ILinkSession >& s) { static constexpr size_t MaxSessionsPerEndpoint = 5; Lock lock(&m_PendingMutex); llarp::Addr addr = s->GetRemoteEndpoint(); if(m_Pending.count(addr) >= MaxSessionsPerEndpoint) return false; - m_Pending.emplace(addr, std::unique_ptr< ILinkSession >(s)); + m_Pending.emplace(addr, s); return true; } diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index 0d9654d2c..256a3c764 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -11,6 +11,7 @@ #include #include +#include #include namespace llarp @@ -60,6 +61,9 @@ namespace llarp return llarp_ev_loop_time_now_ms(m_Loop); } + virtual Crypto* + OurCrypto() = 0; + bool HasSessionTo(const RouterID& pk); @@ -105,7 +109,7 @@ namespace llarp Configure(llarp_ev_loop* loop, const std::string& ifname, int af, uint16_t port); - virtual ILinkSession* + virtual std::shared_ptr< ILinkSession > NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) = 0; virtual void @@ -225,7 +229,7 @@ namespace llarp using Mutex = util::NullMutex; bool - PutSession(ILinkSession* s); + PutSession(const std::shared_ptr< ILinkSession >& s); llarp::Logic* m_Logic = nullptr; llarp_ev_loop* m_Loop = nullptr; @@ -234,10 +238,10 @@ namespace llarp SecretKey m_SecretKey; using AuthedLinks = - std::unordered_multimap< RouterID, std::unique_ptr< ILinkSession >, + std::unordered_multimap< RouterID, std::shared_ptr< ILinkSession >, RouterID::Hash >; using Pending = - std::unordered_multimap< llarp::Addr, std::unique_ptr< ILinkSession >, + std::unordered_multimap< llarp::Addr, std::shared_ptr< ILinkSession >, llarp::Addr::Hash >; Mutex m_AuthedLinksMutex ACQUIRED_BEFORE(m_PendingMutex); diff --git a/llarp/link/session.hpp b/llarp/link/session.hpp index fb112d347..8e5fa8d7e 100644 --- a/llarp/link/session.hpp +++ b/llarp/link/session.hpp @@ -17,53 +17,70 @@ namespace llarp { virtual ~ILinkSession(){}; + /// hook for utp for when we have established a connection + virtual void + OnLinkEstablished(ILinkLayer *p) = 0; + /// called every event loop tick - std::function< void(void) > Pump; + virtual void + Pump() = 0; /// called every timer tick - std::function< void(llarp_time_t) > Tick; + virtual void Tick(llarp_time_t) = 0; /// send a message buffer to the remote endpoint - std::function< bool(const llarp_buffer_t &) > SendMessageBuffer; + virtual bool + SendMessageBuffer(const llarp_buffer_t &) = 0; /// start the connection - std::function< void(void) > Start; + virtual void + Start() = 0; - /// send a keepalive to the remote endpoint - std::function< bool(void) > SendKeepAlive; + virtual void + Close() = 0; - /// send close message - std::function< void(void) > SendClose; + /// send a keepalive to the remote endpoint + virtual bool + SendKeepAlive() = 0; /// return true if we are established - std::function< bool(void) > IsEstablished; + virtual bool + IsEstablished() = 0; /// return true if this session has timed out - std::function< bool(llarp_time_t) > TimedOut; + virtual bool + TimedOut(llarp_time_t now) const = 0; /// get remote public identity key - std::function< PubKey(void) > GetPubKey; + virtual PubKey + GetPubKey() const = 0; /// get remote address - std::function< Addr(void) > GetRemoteEndpoint; + virtual Addr + GetRemoteEndpoint() const = 0; // get remote rc - std::function< RouterContact(void) > GetRemoteRC; + virtual RouterContact + GetRemoteRC() const = 0; /// handle a valid LIM std::function< bool(const LinkIntroMessage *msg) > GotLIM; /// send queue current blacklog - std::function< size_t(void) > SendQueueBacklog; + virtual size_t + SendQueueBacklog() const = 0; /// get parent link layer - std::function< ILinkLayer *(void) > GetLinkLayer; + virtual ILinkLayer * + GetLinkLayer() const = 0; /// renegotiate session when we have a new RC locally - std::function< bool(void) > RenegotiateSession; + virtual bool + RenegotiateSession() = 0; /// return true if we should send an explicit keepalive message - std::function< bool(void) > ShouldPing; + virtual bool + ShouldPing() const = 0; }; } // namespace llarp diff --git a/llarp/utp/linklayer.cpp b/llarp/utp/linklayer.cpp index 29a90c8d4..96eb18d69 100644 --- a/llarp/utp/linklayer.cpp +++ b/llarp/utp/linklayer.cpp @@ -24,12 +24,6 @@ namespace llarp { namespace utp { - Crypto* - LinkLayer::OurCrypto() - { - return _crypto; - } - uint64 LinkLayer::OnConnect(utp_callback_arguments* arg) { @@ -175,6 +169,8 @@ namespace llarp LinkLayer::~LinkLayer() { + m_Pending.clear(); + m_AuthedLinks.clear(); utp_destroy(_utp_ctx); } @@ -302,7 +298,7 @@ namespace llarp void LinkLayer::Stop() { - ForEachSession([](ILinkSession* s) { s->SendClose(); }); + ForEachSession([](ILinkSession* s) { s->Close(); }); } bool @@ -331,11 +327,12 @@ namespace llarp return "utp"; } - ILinkSession* + std::shared_ptr< ILinkSession > LinkLayer::NewOutboundSession(const RouterContact& rc, const AddressInfo& addr) { - return new Session(this, utp_create_socket(_utp_ctx), rc, addr); + return std::make_shared< OutboundSession >( + this, utp_create_socket(_utp_ctx), rc, addr); } uint64 @@ -391,14 +388,16 @@ namespace llarp static_cast< LinkLayer* >(utp_context_get_userdata(arg->context)); Addr remote(*arg->address); LogDebug("utp accepted from ", remote); - Session* session = new Session(self, arg->socket, remote); + std::shared_ptr< ILinkSession > session = + std::make_shared< InboundSession >(self, arg->socket, remote); if(!self->PutSession(session)) { session->Close(); - delete session; } else + { session->OnLinkEstablished(self); + } return 0; } diff --git a/llarp/utp/linklayer.hpp b/llarp/utp/linklayer.hpp index 69d47dd24..809fae00c 100644 --- a/llarp/utp/linklayer.hpp +++ b/llarp/utp/linklayer.hpp @@ -58,11 +58,11 @@ namespace llarp /// get AI rank uint16_t - Rank() const; + Rank() const override; /// handle low level recv void - RecvFrom(const Addr& from, const void* buf, size_t sz); + RecvFrom(const Addr& from, const void* buf, size_t sz) override; #ifdef __linux__ /// process ICMP stuff on linux @@ -71,11 +71,14 @@ namespace llarp #endif Crypto* - OurCrypto(); + OurCrypto() override + { + return _crypto; + } /// pump sessions void - Pump(); + Pump() override; /// stop link layer void @@ -83,15 +86,16 @@ namespace llarp /// regenerate transport keypair bool - KeyGen(SecretKey& k); + KeyGen(SecretKey& k) override; /// do tick void Tick(llarp_time_t now); /// create new outbound session - ILinkSession* - NewOutboundSession(const RouterContact& rc, const AddressInfo& addr); + std::shared_ptr< ILinkSession > + NewOutboundSession(const RouterContact& rc, + const AddressInfo& addr) override; /// create new socket utp_socket* @@ -99,7 +103,7 @@ namespace llarp /// get ai name const char* - Name() const; + Name() const override; }; } // namespace utp diff --git a/llarp/utp/session.cpp b/llarp/utp/session.cpp index 975d3a3c8..84a66f352 100644 --- a/llarp/utp/session.cpp +++ b/llarp/utp/session.cpp @@ -12,7 +12,7 @@ namespace llarp using namespace std::placeholders; void - Session::OnLinkEstablished(LinkLayer* p) + Session::OnLinkEstablished(ILinkLayer* p) { parent = p; EnterState(eLinkEstablished); @@ -79,13 +79,6 @@ namespace llarp } } - void - Session::Connect() - { - utp_connect(sock, remoteAddr, remoteAddr.SockLen()); - EnterState(eConnecting); - } - void Session::OutboundLinkEstablished(LinkLayer* p) { @@ -136,7 +129,7 @@ namespace llarp } void - Session::TickImpl(llarp_time_t now) + Session::Tick(llarp_time_t now) { PruneInboundMessages(now); m_TXRate = 0; @@ -194,7 +187,7 @@ namespace llarp } bool - Session::IsTimedOut(llarp_time_t now) const + Session::TimedOut(llarp_time_t now) const { if(state == eInitial) return false; @@ -206,14 +199,14 @@ namespace llarp return state == eClose; } - const PubKey& - Session::RemotePubKey() const + PubKey + Session::GetPubKey() const { return remoteRC.pubkey; } Addr - Session::RemoteEndpoint() + Session::GetRemoteEndpoint() const { return remoteAddr; } @@ -227,163 +220,27 @@ namespace llarp parent = p; remoteTransportPubKey.Zero(); - SendQueueBacklog = [&]() -> size_t { return sendq.size(); }; - - ShouldPing = [&]() -> bool { - auto dlt = parent->Now() - lastActive; - return dlt >= 10000; - }; - - SendKeepAlive = [&]() -> bool { - if(state == eSessionReady) - { - DiscardMessage msg; - std::array< byte_t, 128 > tmp; - llarp_buffer_t buf(tmp); - if(!msg.BEncode(&buf)) - return false; - buf.sz = buf.cur - buf.base; - buf.cur = buf.base; - return this->QueueWriteBuffers(buf); - } - return true; - }; gotLIM = false; recvBufOffset = 0; - TimedOut = std::bind(&Session::IsTimedOut, this, std::placeholders::_1); - GetPubKey = std::bind(&Session::RemotePubKey, this); - GetRemoteRC = [&]() -> RouterContact { return this->remoteRC; }; - GetLinkLayer = std::bind(&Session::GetParent, this); lastActive = parent->Now(); - - Pump = std::bind(&Session::DoPump, this); - Tick = std::bind(&Session::TickImpl, this, std::placeholders::_1); - SendMessageBuffer = - std::bind(&Session::QueueWriteBuffers, this, std::placeholders::_1); - - IsEstablished = [=]() { - return this->state == eSessionReady || this->state == eLinkEstablished; - }; - - SendClose = std::bind(&Session::Close, this); - GetRemoteEndpoint = std::bind(&Session::RemoteEndpoint, this); - RenegotiateSession = std::bind(&Session::Rehandshake, this); - } - - /// outbound session - Session::Session(LinkLayer* p, utp_socket* s, const RouterContact& rc, - const AddressInfo& addr) - : Session(p) - { - remoteTransportPubKey = addr.pubkey; - remoteRC = rc; - RouterID rid = remoteRC.pubkey; - OurCrypto()->shorthash(txKey, llarp_buffer_t(rid)); - rid = p->GetOurRC().pubkey; - OurCrypto()->shorthash(rxKey, llarp_buffer_t(rid)); - - sock = s; - assert(utp_set_userdata(sock, this) == this); - assert(s == sock); - remoteAddr = addr; - Start = std::bind(&Session::Connect, this); - GotLIM = std::bind(&Session::OutboundLIM, this, std::placeholders::_1); } - /// inbound session - Session::Session(LinkLayer* p, utp_socket* s, const Addr& addr) : Session(p) + bool + Session::ShouldPing() const { - RouterID rid = p->GetOurRC().pubkey; - OurCrypto()->shorthash(rxKey, llarp_buffer_t(rid)); - remoteRC.Clear(); - sock = s; - assert(s == sock); - assert(utp_set_userdata(sock, this) == this); - remoteAddr = addr; - Start = []() {}; - GotLIM = std::bind(&Session::InboundLIM, this, std::placeholders::_1); - } + auto dlt = parent->Now() - lastActive; + return dlt >= 10000; + }; ILinkLayer* - Session::GetParent() + Session::GetLinkLayer() const { return parent; } - bool - Session::InboundLIM(const LinkIntroMessage* msg) - { - if(gotLIM && remoteRC.pubkey != msg->rc.pubkey) - { - Close(); - return false; - } - if(!gotLIM) - { - remoteRC = msg->rc; - OurCrypto()->shorthash(txKey, llarp_buffer_t(remoteRC.pubkey)); - - if(!DoKeyExchange(std::bind(&Crypto::transport_dh_server, OurCrypto(), - _1, _2, _3, _4), - rxKey, msg->N, remoteRC.enckey, - parent->TransportSecretKey())) - return false; - - std::array< byte_t, LinkIntroMessage::MaxSize > tmp; - llarp_buffer_t buf(tmp); - LinkIntroMessage replymsg; - replymsg.rc = parent->GetOurRC(); - if(!replymsg.rc.Verify(OurCrypto(), parent->Now())) - { - LogError("our RC is invalid? closing session to", remoteAddr); - Close(); - return false; - } - replymsg.N.Randomize(); - replymsg.P = DefaultLinkSessionLifetime; - if(!replymsg.Sign(parent->Sign)) - { - LogError("failed to sign LIM for inbound handshake from ", - remoteAddr); - Close(); - return false; - } - // encode - if(!replymsg.BEncode(&buf)) - { - LogError("failed to encode LIM for handshake from ", remoteAddr); - Close(); - return false; - } - // rewind - buf.sz = buf.cur - buf.base; - buf.cur = buf.base; - // send - if(!SendMessageBuffer(buf)) - { - LogError("failed to repl to handshake from ", remoteAddr); - Close(); - return false; - } - if(!DoKeyExchange(std::bind(&Crypto::transport_dh_client, OurCrypto(), - _1, _2, _3, _4), - txKey, replymsg.N, remoteRC.enckey, - parent->RouterEncryptionSecret())) - - return false; - LogDebug("Sent reply LIM"); - gotLIM = true; - EnterState(eSessionReady); - /// future LIM are used for session renegotiation - GotLIM = std::bind(&Session::GotSessionRenegotiate, this, - std::placeholders::_1); - } - return true; - } - void - Session::DoPump() + Session::Pump() { // pump write queue PumpWrite(); @@ -392,11 +249,11 @@ namespace llarp } bool - Session::QueueWriteBuffers(const llarp_buffer_t& buf) + Session::SendMessageBuffer(const llarp_buffer_t& buf) { if(sendq.size() >= MaxSendQueueSize) return false; - // premptive pump + // preemptive pump if(sendq.size() >= MaxSendQueueSize / 2) PumpWrite(); size_t sz = buf.sz; @@ -418,27 +275,19 @@ namespace llarp } bool - Session::OutboundLIM(const LinkIntroMessage* msg) + Session::SendKeepAlive() { - if(gotLIM && remoteRC.pubkey != msg->rc.pubkey) - { - return false; - } - remoteRC = msg->rc; - gotLIM = true; - - if(!DoKeyExchange(std::bind(&Crypto::transport_dh_server, OurCrypto(), _1, - _2, _3, _4), - rxKey, msg->N, remoteRC.enckey, - parent->RouterEncryptionSecret())) + if(state == eSessionReady) { - Close(); - return false; + DiscardMessage msg; + std::array< byte_t, 128 > tmp; + llarp_buffer_t buf(tmp); + if(!msg.BEncode(&buf)) + return false; + buf.sz = buf.cur - buf.base; + buf.cur = buf.base; + return this->SendMessageBuffer(buf); } - /// future LIM are used for session renegotiation - GotLIM = std::bind(&Session::GotSessionRenegotiate, this, - std::placeholders::_1); - EnterState(eSessionReady); return true; } @@ -593,7 +442,7 @@ namespace llarp } bool - Session::Rehandshake() + Session::RenegotiateSession() { LinkIntroMessage lim; lim.rc = parent->GetOurRC(); @@ -741,5 +590,142 @@ namespace llarp { lastActive = parent->Now(); } + + InboundSession::InboundSession(LinkLayer* p, utp_socket* s, + const Addr& addr) + : Session(p) + { + sock = s; + remoteAddr = addr; + RouterID rid = p->GetOurRC().pubkey; + OurCrypto()->shorthash(rxKey, llarp_buffer_t(rid)); + remoteRC.Clear(); + + assert(s == sock); + assert(utp_set_userdata(sock, this) == this); + GotLIM = std::bind(&InboundSession::InboundLIM, this, _1); + } + + bool + InboundSession::InboundLIM(const LinkIntroMessage* msg) + { + if(gotLIM && remoteRC.pubkey != msg->rc.pubkey) + { + Close(); + return false; + } + if(!gotLIM) + { + remoteRC = msg->rc; + OurCrypto()->shorthash(txKey, llarp_buffer_t(remoteRC.pubkey)); + + if(!DoKeyExchange(std::bind(&Crypto::transport_dh_server, OurCrypto(), + _1, _2, _3, _4), + rxKey, msg->N, remoteRC.enckey, + parent->TransportSecretKey())) + return false; + + std::array< byte_t, LinkIntroMessage::MaxSize > tmp; + llarp_buffer_t buf(tmp); + LinkIntroMessage replymsg; + replymsg.rc = parent->GetOurRC(); + if(!replymsg.rc.Verify(OurCrypto(), parent->Now())) + { + LogError("our RC is invalid? closing session to", remoteAddr); + Close(); + return false; + } + replymsg.N.Randomize(); + replymsg.P = DefaultLinkSessionLifetime; + if(!replymsg.Sign(parent->Sign)) + { + LogError("failed to sign LIM for inbound handshake from ", + remoteAddr); + Close(); + return false; + } + // encode + if(!replymsg.BEncode(&buf)) + { + LogError("failed to encode LIM for handshake from ", remoteAddr); + Close(); + return false; + } + // rewind + buf.sz = buf.cur - buf.base; + buf.cur = buf.base; + // send + if(!SendMessageBuffer(buf)) + { + LogError("failed to repl to handshake from ", remoteAddr); + Close(); + return false; + } + if(!DoKeyExchange(std::bind(&Crypto::transport_dh_client, OurCrypto(), + _1, _2, _3, _4), + txKey, replymsg.N, remoteRC.enckey, + parent->RouterEncryptionSecret())) + + return false; + LogDebug("Sent reply LIM"); + gotLIM = true; + EnterState(eSessionReady); + /// future LIM are used for session renegotiation + GotLIM = std::bind(&Session::GotSessionRenegotiate, this, _1); + } + return true; + } + + OutboundSession::OutboundSession(LinkLayer* p, utp_socket* s, + const RouterContact& rc, + const AddressInfo& addr) + : Session(p) + { + remoteTransportPubKey = addr.pubkey; + remoteRC = rc; + sock = s; + remoteAddr = addr; + + RouterID rid = remoteRC.pubkey; + OurCrypto()->shorthash(txKey, llarp_buffer_t(rid)); + rid = p->GetOurRC().pubkey; + OurCrypto()->shorthash(rxKey, llarp_buffer_t(rid)); + + assert(utp_set_userdata(sock, this) == this); + assert(s == sock); + + GotLIM = std::bind(&OutboundSession::OutboundLIM, this, _1); + } + + void + OutboundSession::Start() + { + utp_connect(sock, remoteAddr, remoteAddr.SockLen()); + EnterState(eConnecting); + } + + bool + OutboundSession::OutboundLIM(const LinkIntroMessage* msg) + { + if(gotLIM && remoteRC.pubkey != msg->rc.pubkey) + { + return false; + } + remoteRC = msg->rc; + gotLIM = true; + + if(!DoKeyExchange(std::bind(&Crypto::transport_dh_server, OurCrypto(), _1, + _2, _3, _4), + rxKey, msg->N, remoteRC.enckey, + parent->RouterEncryptionSecret())) + { + Close(); + return false; + } + /// future LIM are used for session renegotiation + GotLIM = std::bind(&Session::GotSessionRenegotiate, this, _1); + EnterState(eSessionReady); + return true; + } } // namespace utp } // namespace llarp diff --git a/llarp/utp/session.hpp b/llarp/utp/session.hpp index a0bf86b66..41f906b65 100644 --- a/llarp/utp/session.hpp +++ b/llarp/utp/session.hpp @@ -13,14 +13,14 @@ namespace llarp { struct LinkLayer; - struct Session final : public ILinkSession + struct Session : public ILinkSession { /// remote router's rc RouterContact remoteRC; /// underlying socket utp_socket* sock; /// link layer parent - LinkLayer* parent; + ILinkLayer* parent; /// did we get a LIM from the remote yet? bool gotLIM; /// remote router's transport pubkey @@ -68,15 +68,10 @@ namespace llarp util::StatusObject ExtractStatus() const override; - /// base - Session(LinkLayer* p); - - /// outbound - Session(LinkLayer* p, utp_socket* s, const RouterContact& rc, - const AddressInfo& addr); + virtual ~Session() = 0; - /// inbound - Session(LinkLayer* p, utp_socket* s, const Addr& remote); + /// base + explicit Session(LinkLayer* p); enum State { @@ -88,41 +83,49 @@ namespace llarp eClose // utp connection is closed }; - /// get router - // Router* - // Router(); - - Crypto* - OurCrypto(); - /// session state, call EnterState(State) to set State state; /// hook for utp for when we have established a connection void - OnLinkEstablished(LinkLayer* p); + OnLinkEstablished(ILinkLayer* p) override; + + Crypto* + OurCrypto(); /// switch states void EnterState(State st); - Session(); - ~Session(); - /// handle LIM after handshake bool GotSessionRenegotiate(const LinkIntroMessage* msg); /// re negotiate session with our new local RC bool - Rehandshake(); + RenegotiateSession() override; + + bool + ShouldPing() const override; /// pump tx queue void PumpWrite(); void - DoPump(); + Pump() override; + + bool + SendKeepAlive() override; + + bool + IsEstablished() override + { + return state == eSessionReady || state == eLinkEstablished; + } + + bool + TimedOut(llarp_time_t now) const override; /// verify a fragment buffer and the decrypt it /// buf is assumed to be FragmentBufferSize bytes long @@ -136,7 +139,7 @@ namespace llarp /// queue a fully formed message bool - QueueWriteBuffers(const llarp_buffer_t& buf); + SendMessageBuffer(const llarp_buffer_t& buf) override; /// prune expired inbound messages void @@ -165,42 +168,67 @@ namespace llarp MutateKey(SharedSecret& K, const AlignedBuffer< 24 >& A); void - TickImpl(llarp_time_t now); + Tick(llarp_time_t now) override; /// close session void - Close(); + Close() override; /// low level read bool Recv(const byte_t* buf, size_t sz); - /// handle inbound LIM - bool - InboundLIM(const LinkIntroMessage* msg); - - /// handle outbound LIM - bool - OutboundLIM(const LinkIntroMessage* msg); - - /// return true if timed out - bool - IsTimedOut(llarp_time_t now) const; - /// get remote identity pubkey - const PubKey& - RemotePubKey() const; + PubKey + GetPubKey() const override; /// get remote address Addr - RemoteEndpoint(); + GetRemoteEndpoint() const override; + + RouterContact + GetRemoteRC() const override + { + return remoteRC; + } /// get parent link ILinkLayer* - GetParent(); + GetLinkLayer() const override; void MarkEstablished(); + + size_t + SendQueueBacklog() const override + { + return sendq.size(); + } + }; + + struct InboundSession final : public Session + { + InboundSession(LinkLayer* p, utp_socket* s, const Addr& addr); + + bool + InboundLIM(const LinkIntroMessage* msg); + + void + Start() override + { + } + }; + + struct OutboundSession final : public Session + { + OutboundSession(LinkLayer* p, utp_socket* s, const RouterContact& rc, + const AddressInfo& addr); + + bool + OutboundLIM(const LinkIntroMessage* msg); + + void + Start() override; }; } // namespace utp } // namespace llarp diff --git a/test/util/test_llarp_util_thread_pool.cpp b/test/util/test_llarp_util_thread_pool.cpp index 8240589ee..8b58ce49f 100644 --- a/test/util/test_llarp_util_thread_pool.cpp +++ b/test/util/test_llarp_util_thread_pool.cpp @@ -419,11 +419,11 @@ TEST(TestThreadPool, recurseJob) static constexpr size_t depth = 10; static constexpr size_t capacity = 100; - ThreadPool pool(threads, capacity); - util::Barrier barrier(threads + 1); std::atomic_size_t counter{0}; + ThreadPool pool(threads, capacity); + pool.start(); ASSERT_TRUE(pool.addJob(std::bind(recurse, std::ref(barrier),