diff --git a/CMakeLists.txt b/CMakeLists.txt index 3600a96ff..68228cb9a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -611,6 +611,7 @@ set(TEST_SRC test/obtain_exit_unittest.cpp test/pq_unittest.cpp test/net_unittest.cpp + test/utp_unittest.cpp test/test_dns_unit.cpp test/test_dnsc_unit.cpp test/test_dnsd_unit.cpp diff --git a/contrib/node/.gitignore b/contrib/node/.gitignore new file mode 100644 index 000000000..2a232c084 --- /dev/null +++ b/contrib/node/.gitignore @@ -0,0 +1,3 @@ +node_modules +build +*.log \ No newline at end of file diff --git a/llarp/crypto.hpp b/llarp/crypto.hpp index a3c2b034a..1a579c65e 100644 --- a/llarp/crypto.hpp +++ b/llarp/crypto.hpp @@ -54,6 +54,10 @@ namespace llarp using sym_cipher_func = std::function< bool(llarp_buffer_t, const byte_t *, const byte_t *) >; + /// SD/SE(dst, src, key, nonce) + using sym_ciper_alt_func = std::function< bool( + llarp_buffer_t, llarp_buffer_t, const byte_t *, const byte_t *) >; + /// H(result, body) using hash_func = std::function< bool(byte_t *, llarp_buffer_t) >; @@ -77,6 +81,8 @@ namespace llarp { /// xchacha symettric cipher sym_cipher_func xchacha20; + /// xchacha symettric cipher (multibuffer) + sym_ciper_alt_func xchacha20_alt; /// path dh creator's side path_dh_func dh_client; /// path dh relay side diff --git a/llarp/crypto_libsodium.cpp b/llarp/crypto_libsodium.cpp index 43d1c3df5..2560fbb17 100644 --- a/llarp/crypto_libsodium.cpp +++ b/llarp/crypto_libsodium.cpp @@ -24,6 +24,15 @@ namespace llarp == 0; } + static bool + xchacha20_alt(llarp_buffer_t out, llarp_buffer_t in, const byte_t *k, + const byte_t *n) + { + if(in.sz > out.sz) + return false; + return crypto_stream_xchacha20_xor(out.base, in.base, in.sz, n, k) == 0; + } + static bool dh(uint8_t *out, const uint8_t *client_pk, const uint8_t *server_pk, const uint8_t *themPub, const uint8_t *usSec) @@ -183,6 +192,7 @@ namespace llarp else ntru_init(0); this->xchacha20 = llarp::sodium::xchacha20; + this->xchacha20_alt = llarp::sodium::xchacha20_alt; this->dh_client = llarp::sodium::dh_client; this->dh_server = llarp::sodium::dh_server; this->transport_dh_client = llarp::sodium::dh_client; diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index b1ecb5b1a..7b0d5f0af 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -3,6 +3,20 @@ namespace llarp { + ILinkLayer::ILinkLayer(const byte_t* routerEncSecret, GetRCFunc getrc, + LinkMessageHandler handler, SignBufferFunc signbuf, + SessionEstablishedHandler establishedSession, + TimeoutHandler timeout, SessionClosedHandler closed) + : HandleMessage(handler) + , HandleTimeout(timeout) + , Sign(signbuf) + , GetOurRC(getrc) + , SessionEstablished(establishedSession) + , SessionClosed(closed) + , m_RouterEncSecret(routerEncSecret) + { + } + ILinkLayer::~ILinkLayer() { } @@ -48,7 +62,7 @@ namespace llarp void ILinkLayer::Pump() { - auto _now = now(); + auto _now = Now(); { Lock lock(m_AuthedLinksMutex); auto itr = m_AuthedLinks.begin(); @@ -246,6 +260,12 @@ namespace llarp return m_SecretKey; } + bool + ILinkLayer::GenEphemeralKeys() + { + return KeyGen(m_SecretKey); + } + bool ILinkLayer::EnsureKeys(const char* f) { @@ -279,7 +299,7 @@ namespace llarp void ILinkLayer::OnTick(uint64_t interval) { - Tick(now()); + Tick(Now()); ScheduleTick(interval); } diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index c03886763..71de9bf16 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -14,17 +14,40 @@ namespace llarp { - struct Router; + /// handle a link layer message + using LinkMessageHandler = + std::function< bool(ILinkSession*, llarp_buffer_t) >; + + /// sign a buffer with identity key + using SignBufferFunc = std::function< bool(Signature&, llarp_buffer_t) >; + + /// handle connection timeout + using TimeoutHandler = std::function< void(ILinkSession*) >; + + /// get our RC + using GetRCFunc = std::function< const llarp::RouterContact&(void) >; + + /// handler of session established + using SessionEstablishedHandler = std::function< void(ILinkSession*) >; + + /// handles close of all sessions with pubkey + using SessionClosedHandler = std::function< void(llarp::RouterID) >; struct ILinkLayer { + ILinkLayer(const byte_t* routerEncSecret, GetRCFunc getrc, + LinkMessageHandler handler, SignBufferFunc signFunc, + SessionEstablishedHandler sessionEstablish, + TimeoutHandler timeout, SessionClosedHandler closed); virtual ~ILinkLayer(); + /// get current time via event loop llarp_time_t - now() const + Now() const { return llarp_ev_loop_time_now_ms(m_Loop); } + bool HasSessionTo(const byte_t* pk); @@ -102,20 +125,35 @@ namespace llarp const byte_t* TransportPubKey() const; + const byte_t* + RouterEncryptionSecret() const + { + return m_RouterEncSecret; + } + const byte_t* TransportSecretKey() const; bool EnsureKeys(const char* fpath); + bool + GenEphemeralKeys(); + void MapAddr(const byte_t* pk, ILinkSession* s); - virtual void - Tick(__attribute__((unused)) llarp_time_t now) + virtual void Tick(llarp_time_t) { } + LinkMessageHandler HandleMessage; + TimeoutHandler HandleTimeout; + SignBufferFunc Sign; + GetRCFunc GetOurRC; + SessionEstablishedHandler SessionEstablished; + SessionClosedHandler SessionClosed; + private: static void on_timer_tick(void* user, uint64_t orig, uint64_t left) @@ -133,6 +171,7 @@ namespace llarp ScheduleTick(uint64_t interval); uint32_t tick_id; + const byte_t* m_RouterEncSecret; protected: using Lock = util::NullLock; diff --git a/llarp/link/session.hpp b/llarp/link/session.hpp index f0d91cfb4..28ec3c3c6 100644 --- a/llarp/link/session.hpp +++ b/llarp/link/session.hpp @@ -43,14 +43,21 @@ namespace llarp /// get remote public identity key std::function< const byte_t *(void) > GetPubKey; + /// get remote address std::function< const Addr &(void) > GetRemoteEndpoint; + // get remote rc + std::function< const llarp::RouterContact &(void) > GetRemoteRC; + /// handle a valid LIM std::function< bool(const LinkIntroMessage *msg) > GotLIM; /// send queue current blacklog std::function< size_t(void) > SendQueueBacklog; + + /// get parent link layer + std::function< ILinkLayer *(void) > GetLinkLayer; }; } // namespace llarp diff --git a/llarp/link/utp.cpp b/llarp/link/utp.cpp index 9f095d0ca..5409d1bbf 100644 --- a/llarp/link/utp.cpp +++ b/llarp/link/utp.cpp @@ -5,11 +5,6 @@ #include #include #include -#include - -#include -#include -#include #ifdef __linux__ #include @@ -26,630 +21,544 @@ #define IP_DONTFRAGMENT IP_DONTFRAG #endif +#include + namespace llarp { namespace utp { - constexpr size_t FragmentHashSize = 32; - constexpr size_t FragmentNonceSize = 32; - constexpr size_t FragmentOverheadSize = - FragmentHashSize + FragmentNonceSize; - constexpr size_t FragmentBodyPayloadSize = 1024; - constexpr size_t FragmentBodyOverhead = 32; - constexpr size_t FragmentBodySize = - FragmentBodyOverhead + FragmentBodyPayloadSize; - - constexpr size_t FragmentBufferSize = - FragmentOverheadSize + FragmentBodySize; - typedef llarp::AlignedBuffer< FragmentBufferSize > FragmentBuffer; - - /// maximum size for send queue for a session before we drop - constexpr size_t MaxSendQueueSize = 1024; - - using MessageBuffer = llarp::AlignedBuffer< MAX_LINK_MSG_SIZE >; - - struct LinkLayer; - - /// pending inbound message being received - struct InboundMessage + bool + InboundMessage::IsExpired(llarp_time_t now) const { - llarp_time_t lastActive; - MessageBuffer msg; - - llarp_buffer_t buffer = llarp::Buffer(msg); - - /// return true if this inbound message can be removed due to expiration - bool - IsExpired(llarp_time_t now) const - { - return now > lastActive && now - lastActive >= 2000; - } - }; + return now > lastActive && now - lastActive >= 2000; + } - struct BaseSession : public ILinkSession + bool + InboundMessage::AppendData(const byte_t* ptr, uint16_t sz) { - RouterContact remoteRC; - utp_socket* sock; - LinkLayer* parent; - bool gotLIM; - PubKey remoteTransportPubKey; - Addr remoteAddr; - SharedSecret rxKey; - SharedSecret txKey; - llarp_time_t lastActive; - const static llarp_time_t sessionTimeout = 30 * 1000; - - /// send queue for utp - std::deque< utp_iovec > vecq; - /// current fragments waiting to be sent - std::deque< FragmentBuffer > sendq; - - /// current fragment buffer - FragmentBuffer recvBuf; - /// current offset in current fragment buffer - size_t recvBufOffset; - - /// messages we are recving right now - std::unordered_map< uint32_t, InboundMessage > m_RecvMsgs; - - /// are we stalled or nah? - bool stalled = false; - - /// mark session as alive - void - Alive(); - - /// base - BaseSession(LinkLayer* p); - - /// outbound - BaseSession(LinkLayer* p, utp_socket* s, const RouterContact& rc, - const AddressInfo& addr); - - /// inbound - BaseSession(LinkLayer* p, utp_socket* s, const Addr& remote); + if(llarp_buffer_size_left(buffer) < sz) + return false; + memcpy(buffer.cur, ptr, sz); + buffer.cur += sz; + return true; + } - enum State - { - eInitial, - eConnecting, - eLinkEstablished, // when utp connection is established - eCryptoHandshake, // crypto handshake initiated - eSessionReady, // session is ready - eClose // utp connection is closed - }; + void + Session::OnLinkEstablished(LinkLayer* p) + { + parent = p; + EnterState(eLinkEstablished); + llarp::LogDebug("link established with ", remoteAddr); + } - llarp::Router* - Router(); + llarp::Crypto* + Session::Crypto() + { + return parent->Crypto(); + } - State state; + llarp::Crypto* + LinkLayer::Crypto() + { + return _crypto; + } - /// hook for utp - void - OnLinkEstablished(LinkLayer* p) + /// pump tx queue + void + Session::PumpWrite() + { + if(!sock) + return; + ssize_t expect = 0; + std::vector< utp_iovec > vecs; + for(const auto& vec : vecq) { - parent = p; - EnterState(eLinkEstablished); - llarp::LogDebug("link established with ", remoteAddr); + expect += vec.iov_len; + vecs.push_back(vec); } - - void - EnterState(State st); - - BaseSession(); - ~BaseSession(); - - /// pump outbound send queue - void - PumpWrite() + if(expect) { - if(!sock) - return; - ssize_t expect = 0; - std::vector< utp_iovec > vecs; - for(const auto& vec : vecq) + ssize_t s = utp_writev(sock, vecs.data(), vecs.size()); + + while(s > static_cast< ssize_t >(vecq.front().iov_len)) { - expect += vec.iov_len; - vecs.push_back(vec); + s -= vecq.front().iov_len; + vecq.pop_front(); + sendq.pop_front(); } - if(expect) + if(vecq.size()) { - ssize_t s = utp_writev(sock, vecs.data(), vecs.size()); - llarp::LogDebug("utp_writev wrote=", s, " expect=", expect, - " to=", remoteAddr); - - while(s > static_cast< ssize_t >(vecq.front().iov_len)) - { - s -= vecq.front().iov_len; - vecq.pop_front(); - sendq.pop_front(); - } - if(vecq.size()) - { - auto& front = vecq.front(); - front.iov_len -= s; - front.iov_base = ((byte_t*)front.iov_base) + s; - } + auto& front = vecq.front(); + front.iov_len -= s; + front.iov_base = ((byte_t*)front.iov_base) + s; } } + } - /// verify a fragment buffer and the decrypt it - bool - VerifyThenDecrypt(byte_t* buf); - - /// encrypt a fragment then hash the ciphertext - void - EncryptThenHash(const byte_t* ptr, uint32_t sz, bool isLastFragment); - - /// queue a fully formed message - bool - QueueWriteBuffers(llarp_buffer_t buf); - - /// do low level connect - void - Connect() + /// prune expired inbound messages + void + Session::PruneInboundMessages(llarp_time_t now) + { + auto itr = m_RecvMsgs.begin(); + while(itr != m_RecvMsgs.end()) { - utp_connect(sock, remoteAddr, remoteAddr.SockLen()); - EnterState(eConnecting); + if(itr->second.IsExpired(now)) + itr = m_RecvMsgs.erase(itr); + else + ++itr; } + } - /// handle outbound connection made - void - OutboundLinkEstablished(LinkLayer* p) - { - OnLinkEstablished(p); - OutboundHandshake(); - } + void + Session::Connect() + { + utp_connect(sock, remoteAddr, remoteAddr.SockLen()); + EnterState(eConnecting); + } - // send first message - void + void + Session::OutboundLinkEstablished(LinkLayer* p) + { + OnLinkEstablished(p); OutboundHandshake(); + } - // do key exchange for handshake - bool - DoKeyExchange(transport_dh_func dh, SharedSecret& K, - const KeyExchangeNonce& n, const PubKey& other, - const byte_t* secret) + bool + Session::DoKeyExchange(transport_dh_func dh, SharedSecret& K, + const KeyExchangeNonce& n, const PubKey& other, + const byte_t* secret) + { + ShortHash t_h; + AlignedBuffer< 64 > tmp; + memcpy(tmp.data(), K, K.size()); + memcpy(tmp.data() + K.size(), n, n.size()); + // t_h = HS(K + L.n) + if(!Crypto()->shorthash(t_h, ConstBuffer(tmp))) { - ShortHash t_h; - AlignedBuffer< 64 > tmp; - memcpy(tmp.data(), K, K.size()); - memcpy(tmp.data() + K.size(), n, n.size()); - // t_h = HS(K + L.n) - if(!Router()->crypto.shorthash(t_h, ConstBuffer(tmp))) - { - llarp::LogError("failed to mix key to ", remoteAddr); - return false; - } - - // K = TKE(a.p, B_a.e, sk, t_h) - if(!dh(K, other, secret, t_h)) - { - llarp::LogError("key exchange with ", other, " failed"); - return false; - } - llarp::LogDebug("keys mixed with session to ", remoteAddr); - return true; + llarp::LogError("failed to mix key to ", remoteAddr); + return false; } - /// does K = HS(K + A) - bool - MutateKey(SharedSecret& K, const AlignedBuffer< 24 >& A) + // K = TKE(a.p, B_a.e, sk, t_h) + if(!dh(K, other, secret, t_h)) { - AlignedBuffer< 56 > tmp; - auto buf = llarp::Buffer(tmp); - memcpy(buf.cur, K.data(), K.size()); - buf.cur += K.size(); - memcpy(buf.cur, A, A.size()); - buf.cur = buf.base; - return Router()->crypto.shorthash(K, buf); + llarp::LogError("key exchange with ", other, " failed"); + return false; } + llarp::LogDebug("keys mixed with session to ", remoteAddr); + return true; + } - void - TickImpl(__attribute__((unused)) llarp_time_t now) - { - } + bool + Session::MutateKey(SharedSecret& K, const AlignedBuffer< 24 >& A) + { + AlignedBuffer< 56 > tmp; + auto buf = llarp::Buffer(tmp); + memcpy(buf.cur, K.data(), K.size()); + buf.cur += K.size(); + memcpy(buf.cur, A, A.size()); + buf.cur = buf.base; + return Crypto()->shorthash(K, buf); + } - /// close session - void - Close(); + void + Session::TickImpl(llarp_time_t now) + { + PruneInboundMessages(now); + } - /// low level read - bool - Recv(const void* buf, size_t sz) + /// low level read + bool + Session::Recv(const byte_t* buf, size_t sz) + { + // mark we are alive + Alive(); + size_t s = sz; + // process leftovers + if(recvBufOffset) { - Alive(); - byte_t* ptr = (byte_t*)buf; - llarp::LogDebug("utp read ", sz, " from ", remoteAddr); - size_t s = sz; - // process leftovers - if(recvBufOffset) - { - auto left = FragmentBufferSize - recvBufOffset; - if(s >= left) - { - // yes it fills it - llarp::LogDebug("process leftovers, offset=", recvBufOffset, - " sz=", s, " left=", left); - memcpy(recvBuf.data() + recvBufOffset, ptr, left); - s -= left; - recvBufOffset = 0; - ptr += left; - if(!VerifyThenDecrypt(recvBuf.data())) - return false; - } - } - // process full fragments - while(s >= FragmentBufferSize) + auto left = FragmentBufferSize - recvBufOffset; + if(s >= left) { + // yes it fills it + llarp::LogDebug("process leftovers, offset=", recvBufOffset, + " sz=", s, " left=", left); + memcpy(recvBuf.data() + recvBufOffset, buf, left); + s -= left; recvBufOffset = 0; - llarp::LogDebug("process full sz=", s); - if(!VerifyThenDecrypt(ptr)) + buf += left; + if(!VerifyThenDecrypt(recvBuf.data())) return false; - ptr += FragmentBufferSize; - s -= FragmentBufferSize; } - if(s) - { - // hold onto leftovers - llarp::LogDebug("leftovers sz=", s); - memcpy(recvBuf.data() + recvBufOffset, ptr, s); - recvBufOffset += s; - } - return true; } - - bool - InboundLIM(const LinkIntroMessage* msg); - - bool - OutboundLIM(const LinkIntroMessage* msg); - - bool - IsTimedOut(llarp_time_t now) const + // process full fragments + while(s >= FragmentBufferSize) { - if(state == eClose) - return true; - if(now < lastActive) + recvBufOffset = 0; + llarp::LogDebug("process full sz=", s); + if(!VerifyThenDecrypt(buf)) return false; - auto dlt = now - lastActive; - if(dlt >= sessionTimeout) - { - llarp::LogDebug("session timeout reached for ", remoteAddr); - return true; - } - return false; + buf += FragmentBufferSize; + s -= FragmentBufferSize; } - - const PubKey& - RemotePubKey() const + if(s) { - return remoteRC.pubkey; + // hold onto leftovers + llarp::LogDebug("leftovers sz=", s); + memcpy(recvBuf.data() + recvBufOffset, buf, s); + recvBufOffset += s; } + return true; + } - const Addr& - RemoteEndpoint() const + bool + Session::IsTimedOut(llarp_time_t now) const + { + if(state == eClose) + return true; + if(now < lastActive) + return false; + auto dlt = now - lastActive; + if(dlt >= sessionTimeout) { - return remoteAddr; + llarp::LogDebug("session timeout reached for ", remoteAddr); + return true; } + return false; + } - void - MarkEstablished(); - }; // namespace utp + const PubKey& + Session::RemotePubKey() const + { + return remoteRC.pubkey; + } - struct LinkLayer : public ILinkLayer + const Addr& + Session::RemoteEndpoint() const { - utp_context* _utp_ctx = nullptr; - llarp::Router* router = nullptr; - static uint64 - OnRead(utp_callback_arguments* arg); + return remoteAddr; + } - static uint64 - SendTo(utp_callback_arguments* arg) - { - LinkLayer* l = - static_cast< LinkLayer* >(utp_context_get_userdata(arg->context)); - llarp::LogDebug("utp_sendto ", Addr(*arg->address), " ", arg->len, - " bytes"); - // For whatever reason, the UTP_UDP_DONTFRAG flag is set - // on the socket itself....which isn't correct and causes - // winsock (at minimum) to reeee - // here, we check its value, then set fragmentation the _right_ - // way. Naturally, Linux has its own special procedure. - // Of course, the flag itself is cleared. -rick + uint64 + LinkLayer::SendTo(utp_callback_arguments* arg) + { + LinkLayer* l = + static_cast< LinkLayer* >(utp_context_get_userdata(arg->context)); + if(l == nullptr) + return 0; + llarp::LogDebug("utp_sendto ", Addr(*arg->address), " ", arg->len, + " bytes"); + // For whatever reason, the UTP_UDP_DONTFRAG flag is set + // on the socket itself....which isn't correct and causes + // winsock (at minimum) to reeee + // here, we check its value, then set fragmentation the _right_ + // way. Naturally, Linux has its own special procedure. + // Of course, the flag itself is cleared. -rick #ifndef _WIN32 - // No practical method of doing this on NetBSD or Darwin - // without resorting to raw sockets + // No practical method of doing this on NetBSD or Darwin + // without resorting to raw sockets #if !(__NetBSD__ || __OpenBSD__ || (__APPLE__ && __MACH__)) #ifndef __linux__ - if(arg->flags == 2) - { - int val = 1; - setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, - sizeof(val)); - } - else - { - int val = 0; - setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, - sizeof(val)); - } + if(arg->flags == 2) + { + int val = 1; + setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, sizeof(val)); + } + else + { + int val = 0; + setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, sizeof(val)); + } #else - if(arg->flags == 2) - { - int val = IP_PMTUDISC_DO; - setsockopt(l->m_udp.fd, IPPROTO_IP, IP_MTU_DISCOVER, &val, - sizeof(val)); - } - else - { - int val = IP_PMTUDISC_DONT; - setsockopt(l->m_udp.fd, IPPROTO_IP, IP_MTU_DISCOVER, &val, - sizeof(val)); - } + if(arg->flags == 2) + { + int val = IP_PMTUDISC_DO; + setsockopt(l->m_udp.fd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val)); + } + else + { + int val = IP_PMTUDISC_DONT; + setsockopt(l->m_udp.fd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val)); + } #endif #endif - arg->flags = 0; - if(::sendto(l->m_udp.fd, (char*)arg->buf, arg->len, arg->flags, - arg->address, arg->address_len) - == -1 - && errno) + arg->flags = 0; + if(::sendto(l->m_udp.fd, (char*)arg->buf, arg->len, arg->flags, + arg->address, arg->address_len) + == -1 + && errno) #else - if(arg->flags == 2) - { - char val = 1; - setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, - sizeof(val)); - } - else - { - char val = 0; - setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, - sizeof(val)); - } - arg->flags = 0; - if(::sendto(l->m_udp.fd, (char*)arg->buf, arg->len, arg->flags, - arg->address, arg->address_len) - == -1) + if(arg->flags == 2) + { + char val = 1; + setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, sizeof(val)); + } + else + { + char val = 0; + setsockopt(l->m_udp.fd, IPPROTO_IP, IP_DONTFRAGMENT, &val, sizeof(val)); + } + arg->flags = 0; + if(::sendto(l->m_udp.fd, (char*)arg->buf, arg->len, arg->flags, + arg->address, arg->address_len) + == -1) #endif - { + { #ifdef _WIN32 - char buf[1024]; - int err = WSAGetLastError(); - FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, nullptr, err, LANG_NEUTRAL, - buf, 1024, nullptr); - llarp::LogError("sendto failed: ", buf); + char buf[1024]; + int err = WSAGetLastError(); + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, nullptr, err, LANG_NEUTRAL, + buf, 1024, nullptr); + llarp::LogError("sendto failed: ", buf); #else - llarp::LogError("sendto failed: ", strerror(errno)); + llarp::LogError("sendto failed: ", strerror(errno)); #endif - } - return 0; - } - - static uint64 - OnError(utp_callback_arguments* arg) - { - BaseSession* session = - static_cast< BaseSession* >(utp_get_userdata(arg->socket)); - if(session) - { - session->Router()->OnConnectTimeout(session->GetPubKey()); - llarp::LogError(utp_error_code_names[arg->error_code], " via ", - session->remoteAddr); - session->Close(); - } - return 0; } + return 0; + } - static uint64 - OnStateChange(utp_callback_arguments*); + uint64 + LinkLayer::OnError(utp_callback_arguments* arg) + { + Session* session = static_cast< Session* >(utp_get_userdata(arg->socket)); - static uint64 - OnAccept(utp_callback_arguments*); + LinkLayer* link = + static_cast< LinkLayer* >(utp_context_get_userdata(arg->context)); - static uint64 - OnLog(utp_callback_arguments* arg) + if(session && link) { - llarp::LogDebug(arg->buf); - return 0; + link->HandleTimeout(session); + llarp::LogError(utp_error_code_names[arg->error_code], " via ", + session->remoteAddr); + session->Close(); } + return 0; + } - LinkLayer(llarp::Router* r) : ILinkLayer() - { - router = r; - _utp_ctx = utp_init(2); - utp_context_set_userdata(_utp_ctx, this); - utp_set_callback(_utp_ctx, UTP_SENDTO, &LinkLayer::SendTo); - utp_set_callback(_utp_ctx, UTP_ON_ACCEPT, &LinkLayer::OnAccept); - utp_set_callback(_utp_ctx, UTP_ON_STATE_CHANGE, - &LinkLayer::OnStateChange); - utp_set_callback(_utp_ctx, UTP_ON_READ, &LinkLayer::OnRead); - utp_set_callback(_utp_ctx, UTP_ON_ERROR, &LinkLayer::OnError); - utp_set_callback(_utp_ctx, UTP_LOG, &LinkLayer::OnLog); - utp_context_set_option(_utp_ctx, UTP_LOG_NORMAL, 1); - utp_context_set_option(_utp_ctx, UTP_LOG_MTU, 1); - utp_context_set_option(_utp_ctx, UTP_LOG_DEBUG, 1); - utp_context_set_option(_utp_ctx, UTP_SNDBUF, MAX_LINK_MSG_SIZE * 16); - utp_context_set_option(_utp_ctx, UTP_RCVBUF, MAX_LINK_MSG_SIZE * 64); - } + uint64 + LinkLayer::OnLog(utp_callback_arguments* arg) + { + llarp::LogDebug(arg->buf); + return 0; + } - ~LinkLayer() - { - utp_destroy(_utp_ctx); - } + LinkLayer::LinkLayer(llarp::Crypto* crypto, const byte_t* routerEncSecret, + llarp::GetRCFunc getrc, llarp::LinkMessageHandler h, + llarp::SignBufferFunc sign, + llarp::SessionEstablishedHandler established, + llarp::TimeoutHandler timeout, + llarp::SessionClosedHandler closed) + : ILinkLayer(routerEncSecret, getrc, h, sign, established, timeout, + closed) + { + _crypto = crypto; + _utp_ctx = utp_init(2); + utp_context_set_userdata(_utp_ctx, this); + utp_set_callback(_utp_ctx, UTP_SENDTO, &LinkLayer::SendTo); + utp_set_callback(_utp_ctx, UTP_ON_ACCEPT, &LinkLayer::OnAccept); + utp_set_callback(_utp_ctx, UTP_ON_STATE_CHANGE, + &LinkLayer::OnStateChange); + utp_set_callback(_utp_ctx, UTP_ON_READ, &LinkLayer::OnRead); + utp_set_callback(_utp_ctx, UTP_ON_ERROR, &LinkLayer::OnError); + utp_set_callback(_utp_ctx, UTP_LOG, &LinkLayer::OnLog); + utp_context_set_option(_utp_ctx, UTP_LOG_NORMAL, 1); + utp_context_set_option(_utp_ctx, UTP_LOG_MTU, 1); + utp_context_set_option(_utp_ctx, UTP_LOG_DEBUG, 1); + utp_context_set_option(_utp_ctx, UTP_SNDBUF, MAX_LINK_MSG_SIZE * 16); + utp_context_set_option(_utp_ctx, UTP_RCVBUF, MAX_LINK_MSG_SIZE * 64); + } - uint16_t - Rank() const - { - return 1; - } + LinkLayer::~LinkLayer() + { + utp_destroy(_utp_ctx); + } - void - RecvFrom(const Addr& from, const void* buf, size_t sz) - { - utp_process_udp(_utp_ctx, (const byte_t*)buf, sz, from, from.SockLen()); - } + uint16_t + LinkLayer::Rank() const + { + return 1; + } + + void + LinkLayer::RecvFrom(const Addr& from, const void* buf, size_t sz) + { + utp_process_udp(_utp_ctx, (const byte_t*)buf, sz, from, from.SockLen()); + } #ifdef __linux__ - void - ProcessICMP() + void + LinkLayer::ProcessICMP() + { + do { - do + byte_t vec_buf[4096], ancillary_buf[4096]; + struct iovec iov = {vec_buf, sizeof(vec_buf)}; + struct sockaddr_in remote; + struct msghdr msg; + ssize_t len; + struct cmsghdr* cmsg; + struct sock_extended_err* e; + struct sockaddr* icmp_addr; + struct sockaddr_in* icmp_sin; + + memset(&msg, 0, sizeof(msg)); + + msg.msg_name = &remote; + msg.msg_namelen = sizeof(remote); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_flags = 0; + msg.msg_control = ancillary_buf; + msg.msg_controllen = sizeof(ancillary_buf); + + len = recvmsg(m_udp.fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT); + if(len < 0) + { + if(errno == EAGAIN || errno == EWOULDBLOCK) + errno = 0; + else + llarp::LogError("failed to read icmp for utp ", strerror(errno)); + return; + } + + for(cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { - byte_t vec_buf[4096], ancillary_buf[4096]; - struct iovec iov = {vec_buf, sizeof(vec_buf)}; - struct sockaddr_in remote; - struct msghdr msg; - ssize_t len; - struct cmsghdr* cmsg; - struct sock_extended_err* e; - struct sockaddr* icmp_addr; - struct sockaddr_in* icmp_sin; - - memset(&msg, 0, sizeof(msg)); - - msg.msg_name = &remote; - msg.msg_namelen = sizeof(remote); - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - msg.msg_flags = 0; - msg.msg_control = ancillary_buf; - msg.msg_controllen = sizeof(ancillary_buf); - - len = recvmsg(m_udp.fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT); - if(len < 0) + if(cmsg->cmsg_type != IP_RECVERR) { - if(errno == EAGAIN || errno == EWOULDBLOCK) - errno = 0; - else - llarp::LogError("failed to read icmp for utp ", strerror(errno)); - return; + continue; } - - for(cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) + if(cmsg->cmsg_level != SOL_IP) { - if(cmsg->cmsg_type != IP_RECVERR) - { - continue; - } - if(cmsg->cmsg_level != SOL_IP) - { - continue; - } - e = (struct sock_extended_err*)CMSG_DATA(cmsg); - if(!e) - continue; - if(e->ee_origin != SO_EE_ORIGIN_ICMP) - { - continue; - } - icmp_addr = (struct sockaddr*)SO_EE_OFFENDER(e); - icmp_sin = (struct sockaddr_in*)icmp_addr; - if(icmp_sin->sin_port != 0) - { - continue; - } - if(e->ee_type == 3 && e->ee_code == 4) - { - utp_process_icmp_fragmentation(_utp_ctx, vec_buf, len, - (struct sockaddr*)&remote, - sizeof(remote), e->ee_info); - } - else - { - utp_process_icmp_error(_utp_ctx, vec_buf, len, - (struct sockaddr*)&remote, sizeof(remote)); - } + continue; } - } while(true); - } + e = (struct sock_extended_err*)CMSG_DATA(cmsg); + if(!e) + continue; + if(e->ee_origin != SO_EE_ORIGIN_ICMP) + { + continue; + } + icmp_addr = (struct sockaddr*)SO_EE_OFFENDER(e); + icmp_sin = (struct sockaddr_in*)icmp_addr; + if(icmp_sin->sin_port != 0) + { + continue; + } + if(e->ee_type == 3 && e->ee_code == 4) + { + utp_process_icmp_fragmentation(_utp_ctx, vec_buf, len, + (struct sockaddr*)&remote, + sizeof(remote), e->ee_info); + } + else + { + utp_process_icmp_error(_utp_ctx, vec_buf, len, + (struct sockaddr*)&remote, sizeof(remote)); + } + } + } while(true); + } #endif - void - Pump() - { - utp_issue_deferred_acks(_utp_ctx); + void + LinkLayer::Pump() + { + utp_issue_deferred_acks(_utp_ctx); #ifdef __linux__ - ProcessICMP(); + ProcessICMP(); #endif - std::set< RouterID > sessions; + std::set< RouterID > sessions; + { + Lock l(m_AuthedLinksMutex); + auto itr = m_AuthedLinks.begin(); + while(itr != m_AuthedLinks.end()) { - Lock l(m_AuthedLinksMutex); - auto itr = m_AuthedLinks.begin(); - while(itr != m_AuthedLinks.end()) - { - sessions.insert(itr->first); - ++itr; - } + sessions.insert(itr->first); + ++itr; } - ILinkLayer::Pump(); + } + ILinkLayer::Pump(); + { + Lock l(m_AuthedLinksMutex); + for(const auto& pk : sessions) { - Lock l(m_AuthedLinksMutex); - for(const auto& pk : sessions) + if(m_AuthedLinks.count(pk) == 0) { - if(m_AuthedLinks.count(pk) == 0) - { - // all sessions were removed - router->SessionClosed(pk); - } + // all sessions were removed + SessionClosed(pk); } } } + } - void - Stop() - { - } - - llarp::Router* - GetRouter(); + void + LinkLayer::Stop() + { + } - bool - KeyGen(SecretKey& k) - { - router->crypto.encryption_keygen(k); - return true; - } + bool + LinkLayer::KeyGen(SecretKey& k) + { + Crypto()->encryption_keygen(k); + return true; + } - void - Tick(llarp_time_t now) - { - utp_check_timeouts(_utp_ctx); - ILinkLayer::Tick(now); - } + void + LinkLayer::Tick(llarp_time_t now) + { + utp_check_timeouts(_utp_ctx); + ILinkLayer::Tick(now); + } - ILinkSession* - NewOutboundSession(const RouterContact& rc, const AddressInfo& addr); + utp_socket* + LinkLayer::NewSocket() + { + return utp_create_socket(_utp_ctx); + } - utp_socket* - NewSocket() - { - return utp_create_socket(_utp_ctx); - } + const char* + LinkLayer::Name() const + { + return "utp"; + } - const char* - Name() const - { - return "utp"; - } - }; + std::unique_ptr< ILinkLayer > + NewServer(llarp::Crypto* crypto, const byte_t* routerEncSecret, + llarp::GetRCFunc getrc, llarp::LinkMessageHandler h, + llarp::SignBufferFunc sign, llarp::SessionEstablishedHandler est, + llarp::TimeoutHandler timeout, llarp::SessionClosedHandler closed) + { + return std::unique_ptr< ILinkLayer >(new LinkLayer( + crypto, routerEncSecret, getrc, h, sign, est, timeout, closed)); + } std::unique_ptr< ILinkLayer > - NewServer(llarp::Router* r) + NewServerFromRouter(llarp::Router* r) { - return std::unique_ptr< LinkLayer >(new LinkLayer(r)); + return NewServer( + &r->crypto, r->encryption, std::bind(&llarp::Router::rc, r), + std::bind(&llarp::Router::HandleRecvLinkMessageBuffer, r, + std::placeholders::_1, std::placeholders::_2), + std::bind(&llarp::Router::Sign, r, std::placeholders::_1, + std::placeholders::_2), + std::bind(&llarp::Router::OnSessionEstablished, r, + std::placeholders::_1), + std::bind(&llarp::Router::OnConnectTimeout, r, std::placeholders::_1), + std::bind(&llarp::Router::SessionClosed, r, std::placeholders::_1)); } /// base constructor - BaseSession::BaseSession(LinkLayer* p) + Session::Session(LinkLayer* p) { - parent = p; + m_NextTXMsgID = 0; + m_NextRXMsgID = 0; + parent = p; remoteTransportPubKey.Zero(); SendQueueBacklog = [&]() -> size_t { return sendq.size(); }; SendKeepAlive = [&]() -> bool { - auto now = parent->now(); + auto now = parent->Now(); if(sendq.size() == 0 && state == eSessionReady && now > lastActive && now - lastActive > 5000) { @@ -670,87 +579,153 @@ namespace llarp TimedOut = [&](llarp_time_t now) -> bool { return this->IsTimedOut(now) || this->state == eClose; }; - GetPubKey = std::bind(&BaseSession::RemotePubKey, this); - lastActive = parent->now(); + GetPubKey = std::bind(&Session::RemotePubKey, this); + GetRemoteRC = std::bind(&Session::RemoteRC, this); + GetLinkLayer = std::bind(&Session::GetParent, this); + + lastActive = parent->Now(); - Pump = std::bind(&BaseSession::PumpWrite, this); - Tick = std::bind(&BaseSession::TickImpl, this, std::placeholders::_1); - SendMessageBuffer = std::bind(&BaseSession::QueueWriteBuffers, this, - std::placeholders::_1); + Pump = std::bind(&Session::PumpWrite, this); + Tick = std::bind(&Session::TickImpl, this, std::placeholders::_1); + SendMessageBuffer = + std::bind(&Session::QueueWriteBuffers, this, std::placeholders::_1); IsEstablished = [=]() { return this->state == eSessionReady || this->state == eLinkEstablished; }; - SendClose = std::bind(&BaseSession::Close, this); - GetRemoteEndpoint = std::bind(&BaseSession::RemoteEndpoint, this); + SendClose = std::bind(&Session::Close, this); + GetRemoteEndpoint = std::bind(&Session::RemoteEndpoint, this); } /// outbound session - BaseSession::BaseSession(LinkLayer* p, utp_socket* s, - const RouterContact& rc, const AddressInfo& addr) - : BaseSession(p) + Session::Session(LinkLayer* p, utp_socket* s, const RouterContact& rc, + const AddressInfo& addr) + : Session(p) { - p->router->crypto.shorthash(txKey, InitBuffer(rc.pubkey, PUBKEYSIZE)); - remoteRC.Clear(); remoteTransportPubKey = addr.pubkey; remoteRC = rc; - sock = s; + RouterID rid = remoteRC.pubkey; + Crypto()->shorthash(txKey, InitBuffer(rid.data(), PUBKEYSIZE)); + rid = p->GetOurRC().pubkey.data(); + Crypto()->shorthash(rxKey, llarp::InitBuffer(rid.data(), PUBKEYSIZE)); + + sock = s; assert(utp_set_userdata(sock, this) == this); assert(s == sock); remoteAddr = addr; - Start = std::bind(&BaseSession::Connect, this); - GotLIM = - std::bind(&BaseSession::OutboundLIM, this, std::placeholders::_1); + Start = std::bind(&Session::Connect, this); + GotLIM = std::bind(&Session::OutboundLIM, this, std::placeholders::_1); } /// inbound session - BaseSession::BaseSession(LinkLayer* p, utp_socket* s, const Addr& addr) - : BaseSession(p) + Session::Session(LinkLayer* p, utp_socket* s, const Addr& addr) : Session(p) { - p->router->crypto.shorthash(rxKey, - InitBuffer(p->router->pubkey(), PUBKEYSIZE)); + RouterID rid = p->GetOurRC().pubkey; + Crypto()->shorthash(rxKey, InitBuffer(rid.data(), PUBKEYSIZE)); remoteRC.Clear(); sock = s; assert(s == sock); assert(utp_set_userdata(sock, this) == this); remoteAddr = addr; Start = []() {}; - GotLIM = std::bind(&BaseSession::InboundLIM, this, std::placeholders::_1); + GotLIM = std::bind(&Session::InboundLIM, this, std::placeholders::_1); + } + + const RouterContact& + Session::RemoteRC() const + { + return remoteRC; + } + + ILinkLayer* + Session::GetParent() + { + return parent; } bool - BaseSession::InboundLIM(const LinkIntroMessage* msg) + Session::InboundLIM(const LinkIntroMessage* msg) { if(gotLIM && remoteRC.pubkey != msg->rc.pubkey) { + Close(); return false; } if(!gotLIM) { remoteRC = msg->rc; - gotLIM = true; - if(!DoKeyExchange(Router()->crypto.transport_dh_server, txKey, msg->N, + Crypto()->shorthash( + txKey, llarp::InitBuffer(remoteRC.pubkey.data(), PUBKEYSIZE)); + + if(!DoKeyExchange(Crypto()->transport_dh_server, rxKey, msg->N, remoteRC.enckey, parent->TransportSecretKey())) return false; + + byte_t tmp[LinkIntroMessage::MaxSize]; + auto buf = StackBuffer< decltype(tmp) >(tmp); + LinkIntroMessage replymsg; + replymsg.rc = parent->GetOurRC(); + if(!replymsg.rc.Verify(Crypto())) + { + llarp::LogError("our RC is invalid? closing session to", remoteAddr); + Close(); + return false; + } + replymsg.N.Randomize(); + replymsg.P = DefaultLinkSessionLifetime; + if(!replymsg.Sign(parent->Sign)) + { + llarp::LogError("failed to sign LIM for inbound handshake from ", + remoteAddr); + Close(); + return false; + } + // encode + if(!replymsg.BEncode(&buf)) + { + llarp::LogError("failed to encode LIM for handshake from ", + remoteAddr); + Close(); + return false; + } + // rewind + buf.sz = buf.cur - buf.base; + buf.cur = buf.base; + // send + if(!SendMessageBuffer(buf)) + { + llarp::LogError("failed to repl to handshake from ", remoteAddr); + Close(); + return false; + } + llarp::LogDebug("Sent reply LIM"); + if(!DoKeyExchange(Crypto()->transport_dh_client, txKey, replymsg.N, + remoteRC.enckey, parent->RouterEncryptionSecret())) + return false; + gotLIM = true; + EnterState(eSessionReady); } - EnterState(eSessionReady); return true; } bool - BaseSession::QueueWriteBuffers(llarp_buffer_t buf) + Session::QueueWriteBuffers(llarp_buffer_t buf) { if(sendq.size() >= MaxSendQueueSize) return false; - llarp::LogDebug("write ", buf.sz, " bytes to ", remoteAddr); - lastActive = parent->now(); - size_t sz = buf.sz; - byte_t* ptr = buf.base; + size_t sz = buf.sz; + byte_t* ptr = buf.base; + uint32_t msgid = m_NextTXMsgID++; while(sz) { uint32_t s = std::min(FragmentBodyPayloadSize, sz); - EncryptThenHash(ptr, s, ((sz - s) == 0)); + if(!EncryptThenHash(ptr, msgid, s, sz - s)) + { + llarp::LogError("EncryptThenHash failed?!"); + return false; + } + llarp::LogDebug("encrypted ", s, " bytes"); ptr += s; sz -= s; } @@ -758,7 +733,7 @@ namespace llarp } bool - BaseSession::OutboundLIM(const LinkIntroMessage* msg) + Session::OutboundLIM(const LinkIntroMessage* msg) { if(gotLIM && remoteRC.pubkey != msg->rc.pubkey) { @@ -766,19 +741,25 @@ namespace llarp } remoteRC = msg->rc; gotLIM = true; - return DoKeyExchange(Router()->crypto.transport_dh_client, msg->N, - remoteTransportPubKey, Router()->encryption); + if(!DoKeyExchange(Crypto()->transport_dh_server, rxKey, msg->N, + remoteRC.enckey, parent->RouterEncryptionSecret())) + { + Close(); + return false; + } + EnterState(eSessionReady); + return true; } void - BaseSession::OutboundHandshake() + Session::OutboundHandshake() { byte_t tmp[LinkIntroMessage::MaxSize]; auto buf = StackBuffer< decltype(tmp) >(tmp); // build our RC LinkIntroMessage msg; - msg.rc = Router()->rc(); - if(!msg.rc.Verify(&Router()->crypto)) + msg.rc = parent->GetOurRC(); + if(!msg.rc.Verify(Crypto())) { llarp::LogError("our RC is invalid? closing session to", remoteAddr); Close(); @@ -786,7 +767,7 @@ namespace llarp } msg.N.Randomize(); msg.P = DefaultLinkSessionLifetime; - if(!msg.Sign(&Router()->crypto, Router()->identity)) + if(!msg.Sign(parent->Sign)) { llarp::LogError("failed to sign LIM for outbound handshake to ", remoteAddr); @@ -810,24 +791,18 @@ namespace llarp Close(); return; } - if(!DoKeyExchange(Router()->crypto.transport_dh_client, txKey, msg.N, - remoteTransportPubKey, Router()->encryption)) + if(!DoKeyExchange(Crypto()->transport_dh_client, txKey, msg.N, + remoteTransportPubKey, + parent->RouterEncryptionSecret())) { llarp::LogError("failed to mix keys for outbound session to ", remoteAddr); Close(); return; } - EnterState(eSessionReady); - } - - llarp::Router* - BaseSession::Router() - { - return parent->router; } - BaseSession::~BaseSession() + Session::~Session() { if(sock) { @@ -841,18 +816,17 @@ namespace llarp LinkLayer::NewOutboundSession(const RouterContact& rc, const AddressInfo& addr) { - return new BaseSession(this, utp_create_socket(_utp_ctx), rc, addr); + return new Session(this, utp_create_socket(_utp_ctx), rc, addr); } uint64 LinkLayer::OnRead(utp_callback_arguments* arg) { - BaseSession* self = - static_cast< BaseSession* >(utp_get_userdata(arg->socket)); + Session* self = static_cast< Session* >(utp_get_userdata(arg->socket)); if(self) { - if(self->state == BaseSession::eClose) + if(self->state == Session::eClose) { return 0; } @@ -877,13 +851,12 @@ namespace llarp { LinkLayer* l = static_cast< LinkLayer* >(utp_context_get_userdata(arg->context)); - BaseSession* session = - static_cast< BaseSession* >(utp_get_userdata(arg->socket)); + Session* session = static_cast< Session* >(utp_get_userdata(arg->socket)); if(session) { if(arg->state == UTP_STATE_CONNECT) { - if(session->state == BaseSession::eClose) + if(session->state == Session::eClose) { return 0; } @@ -909,15 +882,15 @@ namespace llarp static_cast< LinkLayer* >(utp_context_get_userdata(arg->context)); Addr remote(*arg->address); llarp::LogDebug("utp accepted from ", remote); - BaseSession* session = new BaseSession(self, arg->socket, remote); + Session* session = new Session(self, arg->socket, remote); self->PutSession(session); session->OnLinkEstablished(self); return 0; } - void - BaseSession::EncryptThenHash(const byte_t* ptr, uint32_t msgid, - uint16_t length, uint16_t remaining) + bool + Session::EncryptThenHash(const byte_t* ptr, uint32_t msgid, uint16_t length, + uint16_t remaining) { sendq.emplace_back(); @@ -926,55 +899,64 @@ namespace llarp auto& vec = vecq.back(); vec.iov_base = buf.data(); vec.iov_len = FragmentBufferSize; - llarp::LogDebug("encrypt then hash ", sz, " bytes last=", isLastFragment); buf.Randomize(); - AlignedBuffer< 24 > A = buf.data(); byte_t* nonce = buf.data() + FragmentHashSize; byte_t* body = nonce + FragmentNonceSize; byte_t* base = body; - if(isLastFragment) - htobe32buf(body, 0); - else - htobe32buf(body, 1); - body += sizeof(uint32_t); - htobe32buf(body, sz); + AlignedBuffer< 24 > A = base; + // skip inner nonce + body += A.size(); + // put msgid + htobe32buf(body, msgid); body += sizeof(uint32_t); - memcpy(body, ptr, sz); + // put length + htobe16buf(body, length); + body += sizeof(uint16_t); + // put remaining + htobe16buf(body, remaining); + body += sizeof(uint16_t); + // put body + memcpy(body, ptr, length); auto payload = InitBuffer(base, FragmentBufferSize - FragmentOverheadSize); // encrypt - Router()->crypto.xchacha20(payload, txKey, nonce); + if(!Crypto()->xchacha20(payload, txKey, nonce)) + return false; payload.base = nonce; payload.cur = payload.base; payload.sz = FragmentBufferSize - FragmentHashSize; // key'd hash - Router()->crypto.hmac(buf.data(), payload, txKey); + if(!Crypto()->hmac(buf.data(), payload, txKey)) + return false; + if(msgid) + return MutateKey(txKey, A); + return true; } void - BaseSession::EnterState(State st) + Session::EnterState(State st) { state = st; Alive(); if(st == eSessionReady) { parent->MapAddr(remoteRC.pubkey, this); - Router()->HandleLinkSessionEstablished(remoteRC, parent); + parent->SessionEstablished(this); } } bool - BaseSession::VerifyThenDecrypt(byte_t* buf) + Session::VerifyThenDecrypt(const byte_t* buf) { llarp::LogDebug("verify then decrypt ", remoteAddr); ShortHash digest; auto hbuf = InitBuffer(buf + FragmentHashSize, FragmentBufferSize - FragmentHashSize); - if(!Router()->crypto.hmac(digest.data(), hbuf, rxKey)) + if(!Crypto()->hmac(digest.data(), hbuf, rxKey)) { llarp::LogError("keyed hash failed"); return false; @@ -984,56 +966,86 @@ namespace llarp { llarp::LogError("Message Integrity Failed: got ", digest, " from ", remoteAddr, " instead of ", expected); - llarp::DumpBuffer(InitBuffer(buf, FragmentBufferSize)); return false; } - auto body = InitBuffer(buf + FragmentOverheadSize, - FragmentBufferSize - FragmentOverheadSize); - - Router()->crypto.xchacha20(body, rxKey, buf + FragmentHashSize); - - AlignedBuffer< 24 > A = body.cur; + auto in = InitBuffer(buf + FragmentOverheadSize, + FragmentBufferSize - FragmentOverheadSize); - MutateKey(rxKey, A); + auto out = Buffer(rxFragBody); - body.cur += 24; + // decrypt + if(!Crypto()->xchacha20_alt(out, in, rxKey, buf + FragmentHashSize)) + { + llarp::LogError("failed to decrypt message from ", remoteAddr); + return false; + } + // get inner nonce + AlignedBuffer< 24 > A = out.base; + // advance buffer + out.cur += A.size(); + // read msgid uint32_t msgid; - if(!llarp_buffer_read_uint32(&body, &msgid)) + if(!llarp_buffer_read_uint32(&out, &msgid)) + { + llarp::LogError("failed to read msgid"); return false; - - uint32_t length, remaining; - - if(!(llarp_buffer_read_uint16(&body, &length) - && llarp_buffer_read_uint16(&body, &remaining))) + } + // read length and remaining + uint16_t length, remaining; + if(!(llarp_buffer_read_uint16(&out, &length) + && llarp_buffer_read_uint16(&out, &remaining))) + { + llarp::LogError("failed to read the rest of the header"); return false; - - if(length > (body.sz - (body.cur - body.base))) + } + if(length > (out.sz - (out.cur - out.base))) { // too big length + llarp::LogError("fragment body too big"); return false; } - auto& inbound = m_RecvMsgs[msgid]; - inbound.lastActive = Router()->Now(); - - inbound.FeedData(body.cur, length); - + // get message + auto& inbound = m_RecvMsgs[msgid]; + // set next message + m_NextRXMsgID = std::max(msgid, m_NextRXMsgID); + // add message activity + inbound.lastActive = parent->Now(); + // append data + if(!inbound.AppendData(out.cur, length)) + { + llarp::LogError("inbound buffer is full"); + return false; // not enough room + } + // determine if this message is done + bool result = true; if(remaining == 0) { - // reszie + // resize inbound.buffer.sz = inbound.buffer.cur - inbound.buffer.base; // rewind inbound.buffer.cur = inbound.buffer.base; - // process - if(!Router()->HandleRecvLinkMessageBuffer(this, inbound.buffer)) + // process buffer + llarp::LogDebug("got message ", msgid, " from ", remoteAddr); + result = parent->HandleMessage(this, inbound.buffer); + // get rid of message buffer + m_RecvMsgs.erase(msgid); + } + // mutate key + if(msgid) + { + if(!MutateKey(rxKey, A)) + { + llarp::LogError("failed to mutate rx key"); return false; + } } - return true; + return result; } void - BaseSession::Close() + Session::Close() { if(state != eClose) { @@ -1050,9 +1062,9 @@ namespace llarp } void - BaseSession::Alive() + Session::Alive() { - lastActive = parent->now(); + lastActive = parent->Now(); } } // namespace utp diff --git a/llarp/link/utp.hpp b/llarp/link/utp.hpp index a64d1c72e..2cd6dfb77 100644 --- a/llarp/link/utp.hpp +++ b/llarp/link/utp.hpp @@ -2,17 +2,24 @@ #define LLARP_LINK_UTP_HPP #include +#include namespace llarp { - struct ILinkLayer; struct Router; namespace utp { std::unique_ptr< ILinkLayer > - NewServer(llarp::Router* r); - } + NewServer(llarp::Crypto* crypto, const byte_t* routerEncSecret, + llarp::GetRCFunc getrc, llarp::LinkMessageHandler h, + llarp::SessionEstablishedHandler est, llarp::SignBufferFunc sign, + llarp::TimeoutHandler timeout, + llarp::SessionClosedHandler closed); + + std::unique_ptr< ILinkLayer > + NewServerFromRouter(llarp::Router* r); + } // namespace utp } // namespace llarp #endif diff --git a/llarp/link/utp_internal.hpp b/llarp/link/utp_internal.hpp new file mode 100644 index 000000000..1e2e32870 --- /dev/null +++ b/llarp/link/utp_internal.hpp @@ -0,0 +1,335 @@ +#ifndef LLARP_LINK_UTP_INTERNAL_HPP +#define LLARP_LINK_UTP_INTERNAL_HPP +#include +#include +#include +#include + +#include +#include + +namespace llarp +{ + namespace utp + { + /// size of keyed hash + constexpr size_t FragmentHashSize = 32; + /// size of outer nonce + constexpr size_t FragmentNonceSize = 32; + /// size of outer overhead + constexpr size_t FragmentOverheadSize = + FragmentHashSize + FragmentNonceSize; + /// max fragment payload size + constexpr size_t FragmentBodyPayloadSize = 1024; + /// size of inner nonce + constexpr size_t FragmentBodyNonceSize = 24; + /// size of fragment body overhead + constexpr size_t FragmentBodyOverhead = FragmentBodyNonceSize + + sizeof(uint32) + sizeof(uint16_t) + sizeof(uint16_t); + /// size of fragment body + constexpr size_t FragmentBodySize = + FragmentBodyOverhead + FragmentBodyPayloadSize; + /// size of fragment + constexpr size_t FragmentBufferSize = + FragmentOverheadSize + FragmentBodySize; + + static_assert(FragmentBufferSize == 1120); + + /// buffer for a single utp fragment + using FragmentBuffer = llarp::AlignedBuffer< FragmentBufferSize >; + + /// maximum size for send queue for a session before we drop + constexpr size_t MaxSendQueueSize = 1024; + + /// buffer for a link layer message + using MessageBuffer = llarp::AlignedBuffer< MAX_LINK_MSG_SIZE >; + + struct LinkLayer; + + /// pending inbound message being received + struct InboundMessage + { + /// timestamp of last activity + llarp_time_t lastActive; + /// the underlying message buffer + MessageBuffer _msg; + + /// for accessing message buffer + llarp_buffer_t buffer = llarp::Buffer(_msg); + + /// return true if this inbound message can be removed due to expiration + bool + IsExpired(llarp_time_t now) const; + + /// append data at ptr of size sz bytes to message buffer + /// increment current position + /// return false if we don't have enough room + /// return true on success + bool + AppendData(const byte_t* ptr, uint16_t sz); + }; + + struct Session final : public ILinkSession + { + /// remote router's rc + RouterContact remoteRC; + /// underlying socket + utp_socket* sock; + /// link layer parent + LinkLayer* parent; + /// did we get a LIM from the remote yet? + bool gotLIM; + /// remote router's transport pubkey + PubKey remoteTransportPubKey; + /// remote router's transport ip + Addr remoteAddr; + /// rx session key + SharedSecret rxKey; + /// tx session key + SharedSecret txKey; + /// timestamp last active + llarp_time_t lastActive; + /// session timeout (30s) + const static llarp_time_t sessionTimeout = 30 * 1000; + + /// send queue for utp + std::deque< utp_iovec > vecq; + /// tx fragment queue + std::deque< FragmentBuffer > sendq; + /// current rx fragment buffer + FragmentBuffer recvBuf; + /// current offset in current rx fragment buffer + size_t recvBufOffset; + /// rx fragment message body + AlignedBuffer< FragmentBodySize > rxFragBody; + + /// the next message id for tx + uint32_t m_NextTXMsgID; + /// the next message id for rx + uint32_t m_NextRXMsgID; + /// messages we are recving right now + std::unordered_map< uint32_t, InboundMessage > m_RecvMsgs; + /// are we stalled or nah? + bool stalled = false; + /// mark session as alive + void + Alive(); + + /// base + Session(LinkLayer* p); + + /// outbound + Session(LinkLayer* p, utp_socket* s, const RouterContact& rc, + const AddressInfo& addr); + + /// inbound + Session(LinkLayer* p, utp_socket* s, const Addr& remote); + + enum State + { + eInitial, // initial state + eConnecting, // we are connecting + eLinkEstablished, // when utp connection is established + eCryptoHandshake, // crypto handshake initiated + eSessionReady, // session is ready + eClose // utp connection is closed + }; + + /// get router + // llarp::Router* + // Router(); + + llarp::Crypto* + Crypto(); + + /// session state, call EnterState(State) to set + State state; + + /// hook for utp for when we have established a connection + void + OnLinkEstablished(LinkLayer* p); + + /// switch states + void + EnterState(State st); + + Session(); + ~Session(); + + /// pump tx queue + void + PumpWrite(); + + /// verify a fragment buffer and the decrypt it + /// buf is assumed to be FragmentBufferSize bytes long + bool + VerifyThenDecrypt(const byte_t* buf); + + /// encrypt a fragment then hash the ciphertext + bool + EncryptThenHash(const byte_t* ptr, uint32_t msgid, uint16_t sz, + uint16_t remain); + + /// queue a fully formed message + bool + QueueWriteBuffers(llarp_buffer_t buf); + + /// prune expired inbound messages + void + PruneInboundMessages(llarp_time_t now); + + /// do low level connect + void + Connect(); + + /// handle outbound connection made + void + OutboundLinkEstablished(LinkLayer* p); + + // send first message + void + OutboundHandshake(); + + // do key exchange for handshake + bool + DoKeyExchange(transport_dh_func dh, SharedSecret& K, + const KeyExchangeNonce& n, const PubKey& other, + const byte_t* secret); + + /// does K = HS(K + A) + bool + MutateKey(SharedSecret& K, const AlignedBuffer< 24 >& A); + + void + TickImpl(llarp_time_t now); + + /// close session + void + Close(); + + /// low level read + bool + Recv(const byte_t* buf, size_t sz); + + /// handle inbound LIM + bool + InboundLIM(const LinkIntroMessage* msg); + + /// handle outbound LIM + bool + OutboundLIM(const LinkIntroMessage* msg); + + /// return true if timed out + bool + IsTimedOut(llarp_time_t now) const; + + /// get remote identity pubkey + const PubKey& + RemotePubKey() const; + + /// get remote address + const Addr& + RemoteEndpoint() const; + + /// get remote rc + const RouterContact& + RemoteRC() const; + + /// get parent link + ILinkLayer* + GetParent(); + + void + MarkEstablished(); + }; + + struct LinkLayer final : public ILinkLayer + { + utp_context* _utp_ctx = nullptr; + llarp::Crypto* _crypto = nullptr; + + // low level read callback + static uint64 + OnRead(utp_callback_arguments* arg); + + // low level sendto callback + static uint64 + SendTo(utp_callback_arguments* arg); + + /// error callback + static uint64 + OnError(utp_callback_arguments* arg); + + /// state change callback + static uint64 + OnStateChange(utp_callback_arguments*); + + /// accept callback + static uint64 + OnAccept(utp_callback_arguments*); + + /// logger callback + static uint64 + OnLog(utp_callback_arguments* arg); + + /// construct + LinkLayer(llarp::Crypto* crypto, const byte_t* routerEncSecret, + llarp::GetRCFunc getrc, llarp::LinkMessageHandler h, + llarp::SignBufferFunc sign, + llarp::SessionEstablishedHandler established, + llarp::TimeoutHandler timeout, + llarp::SessionClosedHandler closed); + + /// destruct + ~LinkLayer(); + + /// get AI rank + uint16_t + Rank() const; + + /// handle low level recv + void + RecvFrom(const Addr& from, const void* buf, size_t sz); + +#ifdef __linux__ + /// process ICMP stuff on linux + void + ProcessICMP(); +#endif + + llarp::Crypto* + Crypto(); + + /// pump sessions + void + Pump(); + + /// stop link layer + void + Stop(); + + /// rengenerate transport keypair + bool + KeyGen(SecretKey& k); + + /// do tick + void + Tick(llarp_time_t now); + + /// create new outbound session + ILinkSession* + NewOutboundSession(const RouterContact& rc, const AddressInfo& addr); + + /// create new socket + utp_socket* + NewSocket(); + + /// get ai name + const char* + Name() const; + }; + + } // namespace utp +} // namespace llarp + +#endif diff --git a/llarp/link_intro.cpp b/llarp/link_intro.cpp index 9eafcc797..cd4a90c23 100644 --- a/llarp/link_intro.cpp +++ b/llarp/link_intro.cpp @@ -122,7 +122,8 @@ namespace llarp } bool - LinkIntroMessage::Sign(llarp::Crypto* c, const SecretKey& k) + LinkIntroMessage::Sign( + std::function< bool(Signature&, llarp_buffer_t) > signer) { Z.Zero(); byte_t tmp[MaxSize] = {0}; @@ -131,7 +132,7 @@ namespace llarp return false; buf.sz = buf.cur - buf.base; buf.cur = buf.base; - return c->sign(Z, k, buf); + return signer(Z, buf); } bool diff --git a/llarp/logic.hpp b/llarp/logic.hpp index 420dc85ea..e8982fc51 100644 --- a/llarp/logic.hpp +++ b/llarp/logic.hpp @@ -9,11 +9,10 @@ namespace llarp { class Logic { - private: + public: struct llarp_threadpool* thread; struct llarp_timer_context* timer; - public: Logic() : thread(llarp_init_same_process_threadpool()) , timer(llarp_init_timer()) diff --git a/llarp/messages/link_intro.hpp b/llarp/messages/link_intro.hpp index 7baa072d0..c26e098b9 100644 --- a/llarp/messages/link_intro.hpp +++ b/llarp/messages/link_intro.hpp @@ -40,7 +40,7 @@ namespace llarp HandleMessage(llarp::Router* router) const; bool - Sign(llarp::Crypto* c, const SecretKey& signKeySecret); + Sign(std::function< bool(Signature&, llarp_buffer_t) > signer); bool Verify(llarp::Crypto* c) const; diff --git a/llarp/router.cpp b/llarp/router.cpp index 8f2c36f66..3b092e052 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -179,10 +179,9 @@ llarp_findOrCreateEncryption(llarp::Crypto *crypto, const char *fpath, namespace llarp { void - Router::HandleLinkSessionEstablished(llarp::RouterContact rc, - llarp::ILinkLayer *link) + Router::OnSessionEstablished(llarp::ILinkSession *session) { - async_verify_RC(rc, link); + async_verify_RC(session->GetRemoteRC(), session->GetLinkLayer()); } Router::Router(struct llarp_threadpool *_tp, struct llarp_ev_loop *_netloop, @@ -591,9 +590,9 @@ namespace llarp } void - Router::OnConnectTimeout(const llarp::RouterID &remote) + Router::OnConnectTimeout(ILinkSession *session) { - auto itr = pendingEstablishJobs.find(remote); + auto itr = pendingEstablishJobs.find(session->GetPubKey()); if(itr != pendingEstablishJobs.end()) { itr->second->AttemptTimedout(); @@ -691,6 +690,12 @@ namespace llarp rpcCaller->Tick(now); } + bool + Router::Sign(llarp::Signature &sig, llarp_buffer_t buf) const + { + return crypto.sign(sig, identity, buf); + } + void Router::SendTo(llarp::RouterID remote, const llarp::ILinkMessage *msg, llarp::ILinkLayer *selected) @@ -735,7 +740,7 @@ namespace llarp } void - Router::SessionClosed(const llarp::RouterID &remote) + Router::SessionClosed(llarp::RouterID remote) { __llarp_dht_remove_peer(dht, remote); // remove from valid routers if it's a valid router @@ -1088,7 +1093,7 @@ namespace llarp if(outboundLink) return true; - auto link = llarp::utp::NewServer(this); + auto link = llarp::utp::NewServerFromRouter(this); if(!link->EnsureKeys(transport_keyfile.string().c_str())) { @@ -1189,7 +1194,7 @@ namespace llarp { if(!StrEq(key, "*")) { - auto server = llarp::utp::NewServer(self); + auto server = llarp::utp::NewServerFromRouter(self); if(!server->EnsureKeys(self->transport_keyfile.string().c_str())) { llarp::LogError("failed to ensure keyfile ", self->transport_keyfile); diff --git a/llarp/router.hpp b/llarp/router.hpp index b5ab4b0d2..aae6e00dd 100644 --- a/llarp/router.hpp +++ b/llarp/router.hpp @@ -89,6 +89,9 @@ namespace llarp llarp_threadpool *disk; llarp_dht_context *dht = nullptr; + bool + Sign(Signature &sig, llarp_buffer_t buf) const; + llarp_nodedb *nodedb; // buffer for serializing link messages @@ -188,7 +191,7 @@ namespace llarp ~Router(); void - HandleLinkSessionEstablished(llarp::RouterContact, llarp::ILinkLayer *); + OnSessionEstablished(llarp::ILinkSession *from); bool HandleRecvLinkMessageBuffer(llarp::ILinkSession *from, llarp_buffer_t msg); @@ -254,7 +257,7 @@ namespace llarp } void - OnConnectTimeout(const llarp::RouterID &remote); + OnConnectTimeout(ILinkSession *session); bool HasPendingConnectJob(const llarp::RouterID &remote); @@ -305,9 +308,9 @@ namespace llarp void FlushOutbound(); - /// called by link when a remote session is expunged + /// called by link when a remote session has no more sessions open void - SessionClosed(const llarp::RouterID &remote); + SessionClosed(RouterID remote); /// call internal router ticker void diff --git a/llarp/router_contact.cpp b/llarp/router_contact.cpp index 4a2bb873f..34156b59c 100644 --- a/llarp/router_contact.cpp +++ b/llarp/router_contact.cpp @@ -12,6 +12,8 @@ namespace llarp { + bool RouterContact::IgnoreBogons = false; + bool RouterContact::BEncode(llarp_buffer_t *buf) const { @@ -169,7 +171,7 @@ namespace llarp { for(const auto &a : addrs) { - if(IsBogon(a.ip)) + if(IsBogon(a.ip) && !IgnoreBogons) { llarp::LogError("invalid address info: ", a); return false; diff --git a/llarp/router_contact.hpp b/llarp/router_contact.hpp index 0a2384011..de05101c2 100644 --- a/llarp/router_contact.hpp +++ b/llarp/router_contact.hpp @@ -15,6 +15,9 @@ namespace llarp { struct RouterContact final : public IBEncodeMessage { + /// for unit tests + static bool IgnoreBogons; + RouterContact() : IBEncodeMessage() { Clear(); @@ -51,6 +54,14 @@ namespace llarp bool BEncode(llarp_buffer_t *buf) const override; + bool + operator==(const RouterContact &other) const + { + return addrs == other.addrs && enckey == other.enckey + && pubkey == other.pubkey && signature == other.signature + && nickname == other.nickname && last_updated == other.last_updated; + } + void Clear(); diff --git a/test/utp_unittest.cpp b/test/utp_unittest.cpp new file mode 100644 index 000000000..fa6a5c3dc --- /dev/null +++ b/test/utp_unittest.cpp @@ -0,0 +1,235 @@ +#include +#include +#include +#include +#include + +struct UTPTest : public ::testing::Test +{ + using Link_t = llarp::utp::LinkLayer; + + static constexpr uint16_t AlicePort = 5000; + static constexpr uint16_t BobPort = 6000; + + struct Context + { + Context(llarp::Crypto& c) + { + crypto = &c; + crypto->identity_keygen(signingKey); + crypto->encryption_keygen(encryptionKey); + rc.pubkey = llarp::seckey_topublic(signingKey); + rc.enckey = llarp::seckey_topublic(encryptionKey); + } + + llarp::SecretKey signingKey; + llarp::SecretKey encryptionKey; + + llarp::RouterContact rc; + + llarp::Crypto* crypto; + + bool gotLIM = false; + + const llarp::RouterContact& + GetRC() const + { + return rc; + } + + llarp::RouterID + GetRouterID() const + { + return rc.pubkey; + } + + std::unique_ptr< Link_t > link; + + bool + Start(llarp::Logic* logic, llarp_ev_loop* loop, uint16_t port) + { + if(!link->Configure(loop, "lo", AF_INET, port)) + return false; + if(!link->GenEphemeralKeys()) + return false; + rc.addrs.emplace_back(); + if(!link->GetOurAddressInfo(rc.addrs[0])) + return false; + if(!rc.Sign(crypto, signingKey)) + return false; + return link->Start(logic); + } + + void + Stop() + { + link->Stop(); + } + + void + TearDown() + { + Stop(); + link.reset(); + } + }; + + llarp::Crypto crypto; + + Context Alice; + Context Bob; + + bool success = false; + + llarp_ev_loop* netLoop; + std::unique_ptr< llarp::Logic > logic; + + UTPTest() + : crypto(llarp::Crypto::sodium{}) + , Alice(crypto) + , Bob(crypto) + , netLoop(nullptr) + { + } + + void + SetUp() + { + llarp::RouterContact::IgnoreBogons = true; + llarp_ev_loop_alloc(&netLoop); + logic.reset(new llarp::Logic()); + } + + void + TearDown() + { + Alice.TearDown(); + Bob.TearDown(); + logic.reset(); + llarp_ev_loop_free(&netLoop); + llarp::RouterContact::IgnoreBogons = false; + } + + static void + OnTimeout(void* u, uint64_t, uint64_t left) + { + if(left) + return; + static_cast< UTPTest* >(u)->Stop(); + } + + void + RunMainloop() + { + logic->call_later({5000, this, &OnTimeout}); + llarp_ev_loop_run_single_process(netLoop, logic->thread, logic.get()); + } + + void + Stop() + { + llarp_ev_loop_stop(netLoop); + } + + bool AliceGotMessage(llarp_buffer_t) + { + success = true; + return true; + } + + bool BobGotMessage(llarp_buffer_t) + { + success = true; + return true; + } +}; + +TEST_F(UTPTest, TestAliceAndBob) +{ + Alice.link.reset(new Link_t( + &crypto, Alice.encryptionKey, + [&]() -> const llarp::RouterContact& { return Alice.GetRC(); }, + [&](llarp::ILinkSession* s, llarp_buffer_t buf) -> bool { + if(Alice.gotLIM) + { + return AliceGotMessage(buf); + } + else + { + llarp::LinkIntroMessage msg; + if(!msg.BDecode(&buf)) + return false; + if(!s->GotLIM(&msg)) + return false; + Alice.gotLIM = true; + return true; + } + }, + [&](llarp::Signature& sig, llarp_buffer_t buf) -> bool { + return crypto.sign(sig, Alice.signingKey, buf); + }, + [&](llarp::ILinkSession* session) { + ASSERT_EQ(session->GetRemoteRC(), Bob.GetRC()); + llarp::DiscardMessage msg; + byte_t tmp[32] = {0}; + auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); + ASSERT_TRUE(msg.BEncode(&buf)); + buf.sz = buf.cur - buf.base; + buf.cur = buf.base; + ASSERT_TRUE(session->SendMessageBuffer(buf)); + ASSERT_TRUE(session->SendMessageBuffer(buf)); + }, + [&](llarp::ILinkSession* session) { + ASSERT_FALSE(session->IsEstablished()); + Stop(); + }, + [&](llarp::RouterID router) { ASSERT_EQ(router, Bob.GetRouterID()); })); + + Bob.link.reset(new Link_t( + &crypto, Bob.encryptionKey, + [&]() -> const llarp::RouterContact& { return Bob.GetRC(); }, + [&](llarp::ILinkSession* s, llarp_buffer_t buf) -> bool { + if(Bob.gotLIM) + { + return BobGotMessage(buf); + } + else + { + llarp::LinkIntroMessage msg; + if(!msg.BDecode(&buf)) + return false; + if(!s->GotLIM(&msg)) + return false; + Bob.gotLIM = true; + return true; + } + }, + [&](llarp::Signature& sig, llarp_buffer_t buf) -> bool { + return crypto.sign(sig, Bob.signingKey, buf); + }, + [&](llarp::ILinkSession* session) { + ASSERT_EQ(session->GetRemoteRC(), Alice.GetRC()); + llarp::DiscardMessage msg; + byte_t tmp[32] = {0}; + auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); + ASSERT_TRUE(msg.BEncode(&buf)); + buf.sz = buf.cur - buf.base; + buf.cur = buf.base; + ASSERT_TRUE(session->SendMessageBuffer(buf)); + ASSERT_TRUE(session->SendMessageBuffer(buf)); + }, + [&](llarp::ILinkSession* session) { + ASSERT_FALSE(session->IsEstablished()); + }, + [&](llarp::RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); })); + + ASSERT_TRUE(Alice.Start(logic.get(), netLoop, AlicePort)); + ASSERT_TRUE(Bob.Start(logic.get(), netLoop, BobPort)); + + ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC())); + + RunMainloop(); + ASSERT_TRUE(Alice.gotLIM); + ASSERT_TRUE(Bob.gotLIM); + ASSERT_TRUE(success); +}