diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index 54fef01c3..fd6a36fbc 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -176,9 +176,11 @@ set(LIB_SRC handlers/null.cpp handlers/tun.cpp hook/shell.cpp - iwp/linklayer.cpp - iwp/outermessage.cpp iwp/iwp.cpp + iwp/linklayer.cpp + iwp/message_buffer.cpp + iwp/session.cpp + link/factory.cpp link/i_link_manager.cpp link/link_manager.cpp link/server.cpp diff --git a/llarp/config/config.cpp b/llarp/config/config.cpp index d8edb168c..78f5e3a94 100644 --- a/llarp/config/config.cpp +++ b/llarp/config/config.cpp @@ -12,6 +12,8 @@ #include #include +#include + #include #include #include @@ -49,6 +51,11 @@ namespace llarp void RouterConfig::fromSection(string_view key, string_view val) { + if(key == "default-protocol") + { + m_DefaultLinkProto = tostr(val); + LogInfo("overriding default link protocol to '", val, "'"); + } if(key == "netid") { if(val.size() <= NetID::size()) @@ -207,28 +214,31 @@ namespace llarp } void - IwpConfig::fromSection(string_view key, string_view val) + LinksConfig::fromSection(string_view key, string_view val) { - // try IPv4 first uint16_t proto = 0; - std::set< std::string > parsed_opts; + std::unordered_set< std::string > parsed_opts; std::string v = tostr(val); std::string::size_type idx; + static constexpr char delimiter = ','; do { - idx = v.find_first_of(','); + idx = v.find_first_of(delimiter); if(idx != std::string::npos) { - parsed_opts.insert(v.substr(0, idx)); + std::string val = v.substr(0, idx); + absl::StripAsciiWhitespace(&val); + parsed_opts.emplace(std::move(val)); v = v.substr(idx + 1); } else { - parsed_opts.insert(v); + absl::StripAsciiWhitespace(&v); + parsed_opts.insert(std::move(v)); } } while(idx != std::string::npos); - + std::unordered_set< std::string > opts; /// for each option for(const auto &item : parsed_opts) { @@ -242,15 +252,20 @@ namespace llarp proto = port; } } + else + { + opts.insert(item); + } } if(key == "*") { - m_OutboundPort = proto; + m_OutboundLink = std::make_tuple( + "*", AF_INET, fromEnv(proto, "OUTBOUND_PORT"), std::move(opts)); } else { - m_servers.emplace_back(tostr(key), AF_INET, proto); + m_InboundLinks.emplace_back(tostr(key), AF_INET, proto, std::move(opts)); } } @@ -449,7 +464,7 @@ namespace llarp connect = find_section< ConnectConfig >(parser, "connect"); netdb = find_section< NetdbConfig >(parser, "netdb"); dns = find_section< DnsConfig >(parser, "dns"); - iwp_links = find_section< IwpConfig >(parser, "bind"); + links = find_section< LinksConfig >(parser, "bind"); services = find_section< ServicesConfig >(parser, "services"); system = find_section< SystemConfig >(parser, "system"); metrics = find_section< MetricsConfig >(parser, "metrics"); diff --git a/llarp/config/config.hpp b/llarp/config/config.hpp index 328475fb8..25df1815a 100644 --- a/llarp/config/config.hpp +++ b/llarp/config/config.hpp @@ -12,6 +12,7 @@ #include #include #include +#include namespace llarp { @@ -120,6 +121,8 @@ namespace llarp int m_workerThreads = 1; int m_numNetThreads = 1; + std::string m_DefaultLinkProto = "iwp"; + public: // clang-format off size_t minConnectedRouters() const { return fromEnv(m_minConnectedRouters, "MIN_CONNECTED_ROUTERS"); } @@ -129,12 +132,13 @@ namespace llarp std::string transportKeyfile() const { return fromEnv(m_transportKeyfile, "TRANSPORT_KEYFILE"); } std::string identKeyfile() const { return fromEnv(m_identKeyfile, "IDENT_KEYFILE"); } std::string netId() const { return fromEnv(m_netId, "NETID"); } - std::string nickname() const { return fromEnv(m_nickname, "NICKNAME"); } + std::string nickname() const { return fromEnv(m_nickname, "NICKNAME"); } bool publicOverride() const { return fromEnv(m_publicOverride, "PUBLIC_OVERRIDE"); } const struct sockaddr_in& ip4addr() const { return m_ip4addr; } const AddressInfo& addrInfo() const { return m_addrInfo; } int workerThreads() const { return fromEnv(m_workerThreads, "WORKER_THREADS"); } int numNetThreads() const { return fromEnv(m_numNetThreads, "NUM_NET_THREADS"); } + std::string defaultLinkProto() const { return fromEnv(m_DefaultLinkProto, "LINK_PROTO"); } absl::optional< bool > blockBogons() const { return fromEnv(m_blockBogons, "BLOCK_BOGONS"); } // clang-format on @@ -187,20 +191,27 @@ namespace llarp fromSection(string_view key, string_view val); }; - class IwpConfig + class LinksConfig { public: - using Servers = std::vector< std::tuple< std::string, int, uint16_t > >; + static constexpr int Interface = 0; + static constexpr int AddressFamily = 1; + static constexpr int Port = 2; + static constexpr int Options = 3; - private: - uint16_t m_OutboundPort = 0; + using ServerOptions = std::unordered_set< std::string >; + using LinkInfo = std::tuple< std::string, int, uint16_t, ServerOptions >; + using Links = std::vector< LinkInfo >; - Servers m_servers; + private: + LinkInfo m_OutboundLink; + Links m_InboundLinks; public: // clang-format off - uint16_t outboundPort() const { return fromEnv(m_OutboundPort, "OUTBOUND_PORT"); } - const Servers& servers() const { return m_servers; } + const LinkInfo& outboundLink() const { return m_OutboundLink; } + + const Links& inboundLinks() const { return m_InboundLinks; } // clang-format on void @@ -299,7 +310,7 @@ namespace llarp ConnectConfig connect; NetdbConfig netdb; DnsConfig dns; - IwpConfig iwp_links; + LinksConfig links; ServicesConfig services; SystemConfig system; MetricsConfig metrics; diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index 0a4de55fa..4d351292e 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -40,6 +40,8 @@ typedef struct sockaddr_un #include #endif +struct llarp_ev_pkt_pipe; + #ifndef MAX_WRITE_QUEUE_SIZE #define MAX_WRITE_QUEUE_SIZE (1024UL) #endif @@ -772,6 +774,12 @@ struct llarp_ev_loop virtual llarp::ev_io* bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* addr) = 0; + virtual bool + add_pipe(llarp_ev_pkt_pipe*) + { + return false; + } + /// register event listener virtual bool add_ev(llarp::ev_io* ev, bool write) = 0; diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 3b9ac4307..732770dda 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -427,6 +427,77 @@ namespace libuv } }; + struct pipe_glue : public glue + { + byte_t m_Buffer[1024 * 8]; + llarp_ev_pkt_pipe* const m_Pipe; + pipe_glue(uv_loop_t* loop, llarp_ev_pkt_pipe* pipe) : m_Pipe(pipe) + { + m_Handle.data = this; + m_Ticker.data = this; + uv_poll_init(loop, &m_Handle, m_Pipe->fd); + uv_check_init(loop, &m_Ticker); + } + + void + Tick() + { + m_Pipe->tick(); + } + + static void + OnRead(uv_poll_t* handle, int status, int) + { + if(status) + { + return; + } + pipe_glue* glue = static_cast< pipe_glue* >(handle->data); + int r = glue->m_Pipe->read(glue->m_Buffer, sizeof(glue->m_Buffer)); + if(r <= 0) + return; + const llarp_buffer_t buf{glue->m_Buffer, static_cast< size_t >(r)}; + glue->m_Pipe->OnRead(buf); + } + + static void + OnClosed(uv_handle_t* h) + { + auto* self = static_cast< pipe_glue* >(h->data); + if(self) + { + h->data = nullptr; + delete self; + } + } + + void + Close() override + { + uv_check_stop(&m_Ticker); + uv_close((uv_handle_t*)&m_Handle, &OnClosed); + } + + static void + OnTick(uv_check_t* h) + { + static_cast< pipe_glue* >(h->data)->Tick(); + } + + bool + Start() + { + if(uv_poll_start(&m_Handle, UV_READABLE, &OnRead)) + return false; + if(uv_check_start(&m_Ticker, &OnTick)) + return false; + return true; + } + + uv_poll_t m_Handle; + uv_check_t m_Ticker; + }; + struct tun_glue : public glue { uv_poll_t m_Handle; @@ -703,4 +774,14 @@ namespace libuv return false; } + bool + Loop::add_pipe(llarp_ev_pkt_pipe* p) + { + auto* glue = new pipe_glue(m_Impl.get(), p); + if(glue->Start()) + return true; + delete glue; + return false; + } + } // namespace libuv diff --git a/llarp/ev/ev_libuv.hpp b/llarp/ev/ev_libuv.hpp index b9b85d3df..3ad9a6389 100644 --- a/llarp/ev/ev_libuv.hpp +++ b/llarp/ev/ev_libuv.hpp @@ -1,6 +1,7 @@ #ifndef LLARP_EV_LIBUV_HPP #define LLARP_EV_LIBUV_HPP #include +#include #include #include #include @@ -68,6 +69,9 @@ namespace libuv bool tcp_listen(llarp_tcp_acceptor* tcp, const sockaddr* addr) override; + bool + add_pipe(llarp_ev_pkt_pipe* p) override; + llarp::ev_io* bind_tcp(llarp_tcp_acceptor*, const sockaddr*) override { diff --git a/llarp/ev/pipe.cpp b/llarp/ev/pipe.cpp index 76d141a9e..049e702ee 100644 --- a/llarp/ev/pipe.cpp +++ b/llarp/ev/pipe.cpp @@ -12,7 +12,7 @@ llarp_ev_pkt_pipe::llarp_ev_pkt_pipe(llarp_ev_loop_ptr loop) } bool -llarp_ev_pkt_pipe::Start() +llarp_ev_pkt_pipe::StartPipe() { #if defined(_WIN32) llarp::LogError("llarp_ev_pkt_pipe not supported on win32"); @@ -26,7 +26,7 @@ llarp_ev_pkt_pipe::Start() } fd = _fds[0]; writefd = _fds[1]; - return true; + return m_Loop->add_pipe(this); #endif } diff --git a/llarp/ev/pipe.hpp b/llarp/ev/pipe.hpp index 0bf1953fc..2abff3c48 100644 --- a/llarp/ev/pipe.hpp +++ b/llarp/ev/pipe.hpp @@ -10,7 +10,7 @@ struct llarp_ev_pkt_pipe : public llarp::ev_io /// start the pipe, initialize fds bool - Start(); + StartPipe(); /// write to the pipe from outside the event loop /// returns true on success diff --git a/llarp/iwp/iwp.cpp b/llarp/iwp/iwp.cpp index 457a51340..eae5384be 100644 --- a/llarp/iwp/iwp.cpp +++ b/llarp/iwp/iwp.cpp @@ -7,22 +7,26 @@ namespace llarp { namespace iwp { - std::unique_ptr< ILinkLayer > - NewServer(const SecretKey& enckey, GetRCFunc getrc, LinkMessageHandler h, - SessionEstablishedHandler est, SessionRenegotiateHandler reneg, - SignBufferFunc sign, TimeoutHandler t, - SessionClosedHandler closed) + LinkLayer_ptr + NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, TimeoutHandler timeout, + SessionClosedHandler closed) { - (void)enckey; - (void)getrc; - (void)h; - (void)est; - (void)reneg; - (void)sign; - (void)t; - (void)closed; - // TODO: implement me - return nullptr; + return std::make_shared< LinkLayer >(routerEncSecret, getrc, h, sign, est, + reneg, timeout, closed, true); + } + + LinkLayer_ptr + NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, TimeoutHandler timeout, + SessionClosedHandler closed) + { + return std::make_shared< LinkLayer >(routerEncSecret, getrc, h, sign, est, + reneg, timeout, closed, false); } } // namespace iwp } // namespace llarp diff --git a/llarp/iwp/iwp.hpp b/llarp/iwp/iwp.hpp index e7a10413e..0e9eacaac 100644 --- a/llarp/iwp/iwp.hpp +++ b/llarp/iwp/iwp.hpp @@ -2,21 +2,25 @@ #define LLARP_IWP_HPP #include - +#include #include namespace llarp { - struct AbstractRouter; - namespace iwp { - std::unique_ptr< ILinkLayer > - NewServer(const SecretKey& routerEncSecret, llarp::GetRCFunc getrc, - llarp::LinkMessageHandler h, llarp::SessionEstablishedHandler est, - llarp::SessionRenegotiateHandler reneg, - llarp::SignBufferFunc sign, llarp::TimeoutHandler timeout, - llarp::SessionClosedHandler closed); + LinkLayer_ptr + NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, TimeoutHandler timeout, + SessionClosedHandler closed); + LinkLayer_ptr + NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, TimeoutHandler timeout, + SessionClosedHandler closed); } // namespace iwp } // namespace llarp diff --git a/llarp/iwp/linklayer.cpp b/llarp/iwp/linklayer.cpp index 8a984d7c4..57d0c499a 100644 --- a/llarp/iwp/linklayer.cpp +++ b/llarp/iwp/linklayer.cpp @@ -1,16 +1,20 @@ #include +#include namespace llarp { namespace iwp { - LinkLayer::LinkLayer(const SecretKey& enckey, GetRCFunc getrc, - LinkMessageHandler h, SessionEstablishedHandler est, - SessionRenegotiateHandler reneg, SignBufferFunc sign, - TimeoutHandler t, SessionClosedHandler closed) - : ILinkLayer(enckey, getrc, h, sign, est, reneg, t, closed) + LinkLayer::LinkLayer(const SecretKey& routerEncSecret, GetRCFunc getrc, + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, + TimeoutHandler timeout, SessionClosedHandler closed, + bool allowInbound) + : ILinkLayer(routerEncSecret, getrc, h, sign, est, reneg, timeout, + closed) + , permitInbound{allowInbound} { - m_FlowCookie.Randomize(); } LinkLayer::~LinkLayer() = default; @@ -18,7 +22,28 @@ namespace llarp void LinkLayer::Pump() { + std::set< RouterID > sessions; + { + Lock l(&m_AuthedLinksMutex); + auto itr = m_AuthedLinks.begin(); + while(itr != m_AuthedLinks.end()) + { + sessions.insert(itr->first); + ++itr; + } + } ILinkLayer::Pump(); + { + Lock l(&m_AuthedLinksMutex); + for(const auto& pk : sessions) + { + if(m_AuthedLinks.count(pk) == 0) + { + // all sessions were removed + SessionClosed(pk); + } + } + } } const char* @@ -44,135 +69,57 @@ namespace llarp bool LinkLayer::Start(std::shared_ptr< Logic > l) { - if(!ILinkLayer::Start(l)) - return false; - return false; + return ILinkLayer::Start(l); } void LinkLayer::RecvFrom(const Addr& from, const void* pkt, size_t sz) { - m_OuterMsg.Clear(); - llarp_buffer_t sigbuf(pkt, sz); - llarp_buffer_t decodebuf(pkt, sz); - if(!m_OuterMsg.Decode(&decodebuf)) + std::shared_ptr< ILinkSession > session; + auto itr = m_AuthedAddrs.find(from); + if(itr == m_AuthedAddrs.end()) { - LogError("failed to decode outer message"); - return; + util::Lock lock(&m_PendingMutex); + if(m_Pending.count(from) == 0) + { + if(not permitInbound) + return; + m_Pending.insert({from, std::make_shared< Session >(this, from)}); + } + session = m_Pending.find(from)->second; } - NetID ourNetID; - switch(m_OuterMsg.command) + else { - case eOCMD_ObtainFlowID: - sigbuf.sz -= m_OuterMsg.Zsig.size(); - if(!CryptoManager::instance()->verify(m_OuterMsg.pubkey, sigbuf, - m_OuterMsg.Zsig)) - { - LogError("failed to verify signature on '", - (char)m_OuterMsg.command, "' message from ", from); - return; - } - if(!ShouldSendFlowID(from)) - { - SendReject(from, "no flo 4u :^)"); - return; - } - if(m_OuterMsg.netid == ourNetID) - { - if(GenFlowIDFor(m_OuterMsg.pubkey, from, m_OuterMsg.flow)) - SendFlowID(from, m_OuterMsg.flow); - else - SendReject(from, "genflow fail"); - } - else - SendReject(from, "bad netid"); + auto range = m_AuthedLinks.equal_range(itr->second); + session = range.first->second; + } + if(session) + { + const llarp_buffer_t buf{pkt, sz}; + session->Recv_LL(buf); } - } - - std::shared_ptr< ILinkSession > - LinkLayer::NewOutboundSession(const RouterContact& rc, - const AddressInfo& ai) - { - (void)rc; - (void)ai; - // TODO: implement me - return {}; - } - - void - LinkLayer::SendFlowID(const Addr& to, const FlowID_t& flow) - { - // TODO: implement me - (void)to; - (void)flow; - } - - bool - LinkLayer::VerifyFlowID(const PubKey& pk, const Addr& from, - const FlowID_t& flow) const - { - FlowID_t expected; - if(!GenFlowIDFor(pk, from, expected)) - return false; - return expected == flow; } bool - LinkLayer::GenFlowIDFor(const PubKey& pk, const Addr& from, - FlowID_t& flow) const + LinkLayer::MapAddr(const RouterID& r, ILinkSession* s) { - std::array< byte_t, 128 > tmp = {{0}}; - if(inet_ntop(AF_INET6, from.addr6(), (char*)tmp.data(), tmp.size()) - == nullptr) + if(!ILinkLayer::MapAddr(r, s)) return false; - std::copy_n(pk.begin(), pk.size(), tmp.begin() + 64); - std::copy_n(m_FlowCookie.begin(), m_FlowCookie.size(), - tmp.begin() + 64 + pk.size()); - llarp_buffer_t buf(tmp); - ShortHash h; - if(!CryptoManager::instance()->shorthash(h, buf)) - return false; - std::copy_n(h.begin(), flow.size(), flow.begin()); + m_AuthedAddrs.emplace(s->GetRemoteEndpoint(), r); return true; } - bool - LinkLayer::ShouldSendFlowID(const Addr& to) const + void + LinkLayer::UnmapAddr(const Addr& a) { - (void)to; - // TODO: implement me - return false; + m_AuthedAddrs.erase(a); } - void - LinkLayer::SendReject(const Addr& to, const char* msg) + std::shared_ptr< ILinkSession > + LinkLayer::NewOutboundSession(const RouterContact& rc, + const AddressInfo& ai) { - if(strlen(msg) > 14) - { - throw std::logic_error("reject message too big"); - } - std::array< byte_t, 120 > pkt; - auto now = Now(); - PubKey pk = GetOurRC().pubkey; - OuterMessage m; - m.CreateReject(msg, now, pk); - llarp_buffer_t encodebuf(pkt); - if(!m.Encode(&encodebuf)) - { - LogError("failed to encode reject message to ", to); - return; - } - llarp_buffer_t signbuf(pkt.data(), pkt.size() - m.Zsig.size()); - if(!Sign(m.Zsig, signbuf)) - { - LogError("failed to sign reject messsage to ", to); - return; - } - std::copy_n(m.Zsig.begin(), m.Zsig.size(), - pkt.begin() + (pkt.size() - m.Zsig.size())); - llarp_buffer_t pktbuf(pkt); - SendTo_LL(to, pktbuf); + return std::make_shared< Session >(this, rc, ai); } } // namespace iwp - } // namespace llarp diff --git a/llarp/iwp/linklayer.hpp b/llarp/iwp/linklayer.hpp index e7265188e..13ecf3ecf 100644 --- a/llarp/iwp/linklayer.hpp +++ b/llarp/iwp/linklayer.hpp @@ -6,7 +6,6 @@ #include #include #include -#include namespace llarp { @@ -14,10 +13,11 @@ namespace llarp { struct LinkLayer final : public ILinkLayer { - LinkLayer(const SecretKey &encryptionSecretKey, GetRCFunc getrc, - LinkMessageHandler h, SessionEstablishedHandler established, - SessionRenegotiateHandler reneg, SignBufferFunc sign, - TimeoutHandler timeout, SessionClosedHandler closed); + LinkLayer(const SecretKey &routerEncSecret, GetRCFunc getrc, + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, SessionRenegotiateHandler reneg, + TimeoutHandler timeout, SessionClosedHandler closed, + bool permitInbound); ~LinkLayer() override; @@ -40,41 +40,21 @@ namespace llarp uint16_t Rank() const override; - /// verify that a new flow id matches addresses and pubkey - bool - VerifyFlowID(const PubKey &pk, const Addr &from, - const FlowID_t &flow) const; - void RecvFrom(const Addr &from, const void *buf, size_t sz) override; - private: bool - GenFlowIDFor(const PubKey &pk, const Addr &from, FlowID_t &flow) const; - - bool - ShouldSendFlowID(const Addr &from) const; - - void - SendReject(const Addr &to, const char *msg); + MapAddr(const RouterID &pk, ILinkSession *s) override; void - SendFlowID(const Addr &to, const FlowID_t &flow); + UnmapAddr(const Addr &addr); - using ActiveFlows_t = - std::unordered_map< FlowID_t, RouterID, FlowID_t::Hash >; - - ActiveFlows_t m_ActiveFlows; - - using PendingFlows_t = std::unordered_map< Addr, FlowID_t, Addr::Hash >; - /// flows that are pending authentication - PendingFlows_t m_PendingFlows; - - /// cookie used in flow id computation - AlignedBuffer< 32 > m_FlowCookie; - - OuterMessage m_OuterMsg; + private: + std::unordered_map< Addr, RouterID, Addr::Hash > m_AuthedAddrs; + const bool permitInbound; }; + + using LinkLayer_ptr = std::shared_ptr< LinkLayer >; } // namespace iwp } // namespace llarp diff --git a/llarp/iwp/message_buffer.cpp b/llarp/iwp/message_buffer.cpp new file mode 100644 index 000000000..dc3319700 --- /dev/null +++ b/llarp/iwp/message_buffer.cpp @@ -0,0 +1,207 @@ +#include +#include + +namespace llarp +{ + namespace iwp + { + OutboundMessage::OutboundMessage(uint64_t msgid, const llarp_buffer_t &pkt, + llarp_time_t now, + ILinkSession::CompletionHandler handler) + : m_Size{(uint16_t)std::min(pkt.sz, MAX_LINK_MSG_SIZE)} + , m_MsgID{msgid} + , m_Completed{handler} + , m_StartedAt{now} + { + m_Data.Zero(); + std::copy_n(pkt.base, m_Size, m_Data.begin()); + const llarp_buffer_t buf{m_Data.data(), m_Size}; + CryptoManager::instance()->shorthash(digest, buf); + } + + std::vector< byte_t > + OutboundMessage::XMIT() const + { + std::vector< byte_t > xmit{ + LLARP_PROTO_VERSION, Command::eXMIT, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + htobe16buf(xmit.data() + 2, m_Size); + htobe64buf(xmit.data() + 4, m_MsgID); + std::copy(digest.begin(), digest.end(), std::back_inserter(xmit)); + return xmit; + } + + void + OutboundMessage::Completed() + { + if(m_Completed) + { + m_Completed(ILinkSession::DeliveryStatus::eDeliverySuccess); + } + m_Completed = nullptr; + } + + bool + OutboundMessage::ShouldFlush(llarp_time_t now) const + { + static constexpr llarp_time_t FlushInterval = 500; + return now - m_LastFlush >= FlushInterval; + } + + void + OutboundMessage::Ack(byte_t bitmask) + { + m_Acks = std::bitset< 8 >(bitmask); + } + + void + OutboundMessage::FlushUnAcked( + std::function< void(const llarp_buffer_t &) > sendpkt, llarp_time_t now) + { + uint16_t idx = 0; + while(idx < m_Size) + { + if(not m_Acks[idx / FragmentSize]) + { + std::vector< byte_t > frag{LLARP_PROTO_VERSION, + Command::eDATA, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0}; + htobe16buf(frag.data() + 2, idx); + htobe64buf(frag.data() + 4, m_MsgID); + std::copy(m_Data.begin() + idx, m_Data.begin() + idx + FragmentSize, + std::back_inserter(frag)); + const llarp_buffer_t pkt{frag}; + sendpkt(pkt); + } + idx += FragmentSize; + } + m_LastFlush = now; + } + + bool + OutboundMessage::IsTransmitted() const + { + for(uint16_t idx = 0; idx < m_Size; idx += FragmentSize) + { + if(!m_Acks.test(idx / FragmentSize)) + return false; + } + return true; + } + + bool + OutboundMessage::IsTimedOut(const llarp_time_t now) const + { + // TODO: make configurable by outbound message deliverer + return now > m_StartedAt && now - m_StartedAt > 5000; + } + + void + OutboundMessage::InformTimeout() + { + if(m_Completed) + { + m_Completed(ILinkSession::DeliveryStatus::eDeliveryDropped); + } + m_Completed = nullptr; + } + + InboundMessage::InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h, + llarp_time_t now) + : m_Digset{std::move(h)} + , m_Size{sz} + , m_MsgID{msgid} + , m_LastActiveAt{now} + { + } + + void + InboundMessage::HandleData(uint16_t idx, const byte_t *ptr, + llarp_time_t now) + { + if(idx + FragmentSize > MAX_LINK_MSG_SIZE) + return; + auto *dst = m_Data.data() + idx; + std::copy_n(ptr, FragmentSize, dst); + m_Acks.set(idx / FragmentSize); + LogDebug("got fragment ", idx / FragmentSize, " of ", m_Size); + m_LastActiveAt = now; + } + + std::vector< byte_t > + InboundMessage::ACKS() const + { + std::vector< byte_t > acks{LLARP_PROTO_VERSION, + Command::eACKS, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + uint8_t{(uint8_t)m_Acks.to_ulong()}}; + + htobe64buf(acks.data() + 2, m_MsgID); + return acks; + } + + bool + InboundMessage::IsCompleted() const + { + for(uint16_t idx = 0; idx < m_Size; idx += FragmentSize) + { + if(!m_Acks.test(idx / FragmentSize)) + return false; + } + return true; + } + + bool + InboundMessage::ShouldSendACKS(llarp_time_t now) const + { + return now - m_LastACKSent > 1000 || IsCompleted(); + } + + bool + InboundMessage::IsTimedOut(const llarp_time_t now) const + { + return now > m_LastActiveAt && now - m_LastActiveAt > 5000; + } + + void + InboundMessage::SendACKS( + std::function< void(const llarp_buffer_t &) > sendpkt, llarp_time_t now) + { + auto acks = ACKS(); + const llarp_buffer_t pkt{acks}; + sendpkt(pkt); + m_LastACKSent = now; + } + + bool + InboundMessage::Verify() const + { + ShortHash gotten; + const llarp_buffer_t buf{m_Data.data(), m_Size}; + CryptoManager::instance()->shorthash(gotten, buf); + LogDebug("gotten=", gotten.ToHex()); + if(gotten != m_Digset) + { + DumpBuffer(buf); + return false; + } + return true; + } + + } // namespace iwp +} // namespace llarp \ No newline at end of file diff --git a/llarp/iwp/message_buffer.hpp b/llarp/iwp/message_buffer.hpp new file mode 100644 index 000000000..8a714f045 --- /dev/null +++ b/llarp/iwp/message_buffer.hpp @@ -0,0 +1,114 @@ +#ifndef LLARP_IWP_MESSAGE_BUFFER_HPP +#define LLARP_IWP_MESSAGE_BUFFER_HPP +#include +#include +#include +#include +#include +#include + +namespace llarp +{ + namespace iwp + { + enum Command + { + /// keep alive message + ePING = 0, + /// begin transission + eXMIT = 1, + /// fragment data + eDATA = 2, + /// acknolege fragments + eACKS = 3, + /// negative ack + eNACK = 4, + /// close session + eCLOS = 5 + }; + + static constexpr size_t FragmentSize = 1024; + + struct OutboundMessage + { + OutboundMessage() = default; + OutboundMessage(uint64_t msgid, const llarp_buffer_t &pkt, + llarp_time_t now, + ILinkSession::CompletionHandler handler); + + AlignedBuffer< MAX_LINK_MSG_SIZE > m_Data; + uint16_t m_Size = 0; + uint64_t m_MsgID = 0; + std::bitset< MAX_LINK_MSG_SIZE / FragmentSize > m_Acks; + ILinkSession::CompletionHandler m_Completed; + llarp_time_t m_LastFlush = 0; + ShortHash digest; + llarp_time_t m_StartedAt = 0; + + std::vector< byte_t > + XMIT() const; + + void + Ack(byte_t bitmask); + + void + FlushUnAcked(std::function< void(const llarp_buffer_t &) > sendpkt, + llarp_time_t now); + + bool + ShouldFlush(llarp_time_t now) const; + + void + Completed(); + + bool + IsTransmitted() const; + + bool + IsTimedOut(llarp_time_t now) const; + + void + InformTimeout(); + }; + + struct InboundMessage + { + InboundMessage() = default; + InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h, + llarp_time_t now); + + AlignedBuffer< MAX_LINK_MSG_SIZE > m_Data; + ShortHash m_Digset; + uint16_t m_Size = 0; + uint64_t m_MsgID = 0; + llarp_time_t m_LastACKSent = 0; + llarp_time_t m_LastActiveAt = 0; + std::bitset< MAX_LINK_MSG_SIZE / FragmentSize > m_Acks; + + void + HandleData(uint16_t idx, const byte_t *ptr, llarp_time_t now); + + bool + IsCompleted() const; + + bool + IsTimedOut(llarp_time_t now) const; + + bool + Verify() const; + + bool + ShouldSendACKS(llarp_time_t now) const; + + void + SendACKS(std::function< void(const llarp_buffer_t &) > sendpkt, + llarp_time_t now); + + std::vector< byte_t > + ACKS() const; + }; + + } // namespace iwp +} // namespace llarp + +#endif \ No newline at end of file diff --git a/llarp/iwp/outermessage.cpp b/llarp/iwp/outermessage.cpp deleted file mode 100644 index 3d7ad2af7..000000000 --- a/llarp/iwp/outermessage.cpp +++ /dev/null @@ -1,155 +0,0 @@ -#include -#include - -namespace llarp -{ - namespace iwp - { - std::array< byte_t, 6 > OuterMessage::obtain_flow_id_magic = - std::array< byte_t, 6 >{{'n', 'e', 't', 'i', 'd', '?'}}; - - std::array< byte_t, 6 > OuterMessage::give_flow_id_magic = - std::array< byte_t, 6 >{{'n', 'e', 't', 'i', 'd', '!'}}; - - OuterMessage::OuterMessage() - { - Clear(); - } - - OuterMessage::~OuterMessage() = default; - - void - OuterMessage::Clear() - { - command = 0; - flow.Zero(); - netid.Zero(); - reject.fill(0); - N.Zero(); - X.Zero(); - Xsize = 0; - Zsig.Zero(); - Zhash.Zero(); - pubkey.Zero(); - magic.fill(0); - uinteger = 0; - A.reset(); - } - - void - OuterMessage::CreateReject(const char* msg, llarp_time_t now, - const PubKey& pk) - { - Clear(); - std::copy_n(msg, std::min(strlen(msg), reject.size()), reject.begin()); - uinteger = now; - pubkey = pk; - } - - bool - OuterMessage::Encode(llarp_buffer_t* buf) const - { - if(buf->size_left() < 2) - return false; - *buf->cur = command; - buf->cur++; - *buf->cur = '='; - buf->cur++; - switch(command) - { - case eOCMD_ObtainFlowID: - - case eOCMD_GiveFlowID: - if(!buf->write(reject.begin(), reject.end())) - return false; - if(!buf->write(give_flow_id_magic.begin(), give_flow_id_magic.end())) - return false; - if(!buf->write(flow.begin(), flow.end())) - return false; - if(!buf->write(pubkey.begin(), pubkey.end())) - return false; - return buf->write(Zsig.begin(), Zsig.end()); - default: - return false; - } - } - - bool - OuterMessage::Decode(llarp_buffer_t* buf) - { - static constexpr size_t header_size = 2; - - if(buf->size_left() < header_size) - return false; - command = *buf->cur; - ++buf->cur; - if(*buf->cur != '=') - return false; - ++buf->cur; - switch(command) - { - case eOCMD_ObtainFlowID: - if(!buf->read_into(magic.begin(), magic.end())) - return false; - if(!buf->read_into(netid.begin(), netid.end())) - return false; - if(!buf->read_uint64(uinteger)) - return false; - if(!buf->read_into(pubkey.begin(), pubkey.end())) - return false; - if(buf->size_left() <= Zsig.size()) - return false; - Xsize = buf->size_left() - Zsig.size(); - if(!buf->read_into(X.begin(), X.begin() + Xsize)) - return false; - return buf->read_into(Zsig.begin(), Zsig.end()); - case eOCMD_GiveFlowID: - if(!buf->read_into(magic.begin(), magic.end())) - return false; - if(!buf->read_into(flow.begin(), flow.end())) - return false; - if(!buf->read_into(pubkey.begin(), pubkey.end())) - return false; - buf->cur += buf->size_left() - Zsig.size(); - return buf->read_into(Zsig.begin(), Zsig.end()); - case eOCMD_Reject: - if(!buf->read_into(reject.begin(), reject.end())) - return false; - if(!buf->read_uint64(uinteger)) - return false; - if(!buf->read_into(pubkey.begin(), pubkey.end())) - return false; - buf->cur += buf->size_left() - Zsig.size(); - return buf->read_into(Zsig.begin(), Zsig.end()); - case eOCMD_SessionNegotiate: - if(!buf->read_into(flow.begin(), flow.end())) - return false; - if(!buf->read_into(pubkey.begin(), pubkey.end())) - return false; - if(!buf->read_uint64(uinteger)) - return false; - if(buf->size_left() == Zsig.size() + 32) - { - A = std::make_unique< AlignedBuffer< 32 > >(); - if(!buf->read_into(A->begin(), A->end())) - return false; - } - return buf->read_into(Zsig.begin(), Zsig.end()); - case eOCMD_TransmitData: - if(!buf->read_into(flow.begin(), flow.end())) - return false; - if(!buf->read_into(N.begin(), N.end())) - return false; - if(buf->size_left() <= Zhash.size()) - return false; - Xsize = buf->size_left() - Zhash.size(); - if(!buf->read_into(X.begin(), X.begin() + Xsize)) - return false; - return buf->read_into(Zhash.begin(), Zhash.end()); - default: - return false; - } - } - } // namespace iwp - -} // namespace llarp diff --git a/llarp/iwp/outermessage.hpp b/llarp/iwp/outermessage.hpp deleted file mode 100644 index eefc05593..000000000 --- a/llarp/iwp/outermessage.hpp +++ /dev/null @@ -1,86 +0,0 @@ -#ifndef LLARP_IWP_OUTERMESSAGE_HPP -#define LLARP_IWP_OUTERMESSAGE_HPP - -#include -#include -#include - -#include - -namespace llarp -{ - namespace iwp - { - using FlowID_t = AlignedBuffer< 32 >; - - using OuterCommand_t = byte_t; - - constexpr OuterCommand_t eOCMD_ObtainFlowID = 'O'; - constexpr OuterCommand_t eOCMD_GiveFlowID = 'G'; - constexpr OuterCommand_t eOCMD_Reject = 'R'; - constexpr OuterCommand_t eOCMD_SessionNegotiate = 'S'; - constexpr OuterCommand_t eOCMD_TransmitData = 'D'; - - using InnerCommand_t = byte_t; - - constexpr InnerCommand_t eICMD_KeepAlive = 'k'; - constexpr InnerCommand_t eICMD_KeepAliveAck = 'l'; - constexpr InnerCommand_t eICMD_Congestion = 'c'; - constexpr InnerCommand_t eICMD_AntiCongestion = 'd'; - constexpr InnerCommand_t eICMD_Transmit = 't'; - constexpr InnerCommand_t eICMD_Ack = 'a'; - constexpr InnerCommand_t eICMD_RotateKeys = 'r'; - constexpr InnerCommand_t eICMD_UpgradeProtocol = 'u'; - constexpr InnerCommand_t eICMD_VersionUpgrade = 'v'; - - struct OuterMessage - { - // required members - byte_t command; - FlowID_t flow; - - OuterMessage(); - ~OuterMessage(); - - // static members - static std::array< byte_t, 6 > obtain_flow_id_magic; - static std::array< byte_t, 6 > give_flow_id_magic; - - void - CreateReject(const char *msg, llarp_time_t now, const PubKey &pk); - - // optional members follow - std::array< byte_t, 6 > magic; - NetID netid; - // either timestamp or counter - uint64_t uinteger; - std::array< byte_t, 14 > reject; - AlignedBuffer< 24 > N; - PubKey pubkey; - - std::unique_ptr< AlignedBuffer< 32 > > A; - - static constexpr size_t ipv6_mtu = 1280; - static constexpr size_t overhead_size = 16 + 24 + 32; - static constexpr size_t payload_size = ipv6_mtu - overhead_size; - - AlignedBuffer< payload_size > X; - size_t Xsize; - ShortHash Zhash; - Signature Zsig; - - /// encode to buffer - bool - Encode(llarp_buffer_t *buf) const; - - /// decode from buffer - bool - Decode(llarp_buffer_t *buf); - - /// clear members - void - Clear(); - }; - } // namespace iwp -} // namespace llarp -#endif diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp new file mode 100644 index 000000000..00b2c868a --- /dev/null +++ b/llarp/iwp/session.cpp @@ -0,0 +1,647 @@ +#include +#include +#include +#include + +namespace llarp +{ + namespace iwp + { + static constexpr size_t PacketOverhead = HMACSIZE + TUNNONCESIZE; + + Session::Session(LinkLayer* p, RouterContact rc, AddressInfo ai) + : m_State{State::Initial} + , m_Inbound{false} + , m_Parent{p} + , m_CreatedAt{p->Now()} + , m_RemoteAddr{ai} + , m_ChosenAI{std::move(ai)} + , m_RemoteRC{std::move(rc)} + { + token.Zero(); + GotLIM = util::memFn(&Session::GotOutboundLIM, this); + } + + Session::Session(LinkLayer* p, Addr from) + : m_State{State::Initial} + , m_Inbound{true} + , m_Parent{p} + , m_CreatedAt{p->Now()} + , m_RemoteAddr{from} + { + token.Randomize(); + GotLIM = util::memFn(&Session::GotInboundLIM, this); + } + + void + Session::Send_LL(const llarp_buffer_t& pkt) + { + LogDebug("send ", pkt.sz, " to ", m_RemoteAddr); + m_Parent->SendTo_LL(m_RemoteAddr, pkt); + m_LastTX = time_now_ms(); + } + + bool + Session::GotInboundLIM(const LinkIntroMessage* msg) + { + if(msg->rc.enckey != m_RemoteOnionKey) + { + LogError("key missmatch"); + return false; + } + m_State = State::Ready; + GotLIM = util::memFn(&Session::GotRenegLIM, this); + m_RemoteRC = msg->rc; + m_Parent->MapAddr(m_RemoteRC.pubkey, this); + return m_Parent->SessionEstablished(this); + } + + bool + Session::GotOutboundLIM(const LinkIntroMessage* msg) + { + if(msg->rc.pubkey != m_RemoteRC.pubkey) + { + LogError("ident key missmatch"); + return false; + } + m_RemoteRC = msg->rc; + GotLIM = util::memFn(&Session::GotRenegLIM, this); + auto self = shared_from_this(); + SendOurLIM([self](ILinkSession::DeliveryStatus st) { + if(st == ILinkSession::DeliveryStatus::eDeliverySuccess) + { + self->m_State = State::Ready; + self->m_Parent->MapAddr(self->m_RemoteRC.pubkey, self.get()); + self->m_Parent->SessionEstablished(self.get()); + } + }); + return true; + } + + void + Session::SendOurLIM(ILinkSession::CompletionHandler h) + { + LinkIntroMessage msg; + msg.rc = m_Parent->GetOurRC(); + msg.N.Randomize(); + msg.P = 60000; + if(not msg.Sign(m_Parent->Sign)) + { + LogError("failed to sign our RC for ", m_RemoteAddr); + return; + } + AlignedBuffer< LinkIntroMessage::MaxSize > data; + llarp_buffer_t buf(data); + if(not msg.BEncode(&buf)) + { + LogError("failed to encode LIM for ", m_RemoteAddr); + } + buf.sz = buf.cur - buf.base; + buf.cur = buf.base; + if(!SendMessageBuffer(buf, h)) + { + LogError("failed to send LIM to ", m_RemoteAddr); + } + LogDebug("sent LIM to ", m_RemoteAddr); + } + + void + Session::EncryptAndSend(const llarp_buffer_t& data) + { + std::vector< byte_t > pkt; + pkt.resize(data.sz + PacketOverhead); + CryptoManager::instance()->randbytes(pkt.data(), pkt.size()); + llarp_buffer_t pktbuf(pkt); + pktbuf.base += PacketOverhead; + pktbuf.sz -= PacketOverhead; + byte_t* nonce_ptr = pkt.data() + HMACSIZE; + + CryptoManager::instance()->xchacha20_alt(pktbuf, data, m_SessionKey, + nonce_ptr); + + pktbuf.base = nonce_ptr; + pktbuf.sz = data.sz + 32; + CryptoManager::instance()->hmac(pkt.data(), pktbuf, m_SessionKey); + + pktbuf.base = pkt.data(); + pktbuf.sz = pkt.size(); + Send_LL(pktbuf); + } + + void + Session::Close() + { + if(m_State == State::Closed) + return; + const std::vector< byte_t > close_msg = {LLARP_PROTO_VERSION, + Command::eCLOS}; + const llarp_buffer_t buf(close_msg); + EncryptAndSend(buf); + if(m_State == State::Ready) + m_Parent->UnmapAddr(m_RemoteAddr); + m_State = State::Closed; + LogInfo("closing connection to ", m_RemoteAddr); + } + + bool + Session::SendMessageBuffer(const llarp_buffer_t& buf, + ILinkSession::CompletionHandler completed) + { + const auto now = m_Parent->Now(); + const auto msgid = m_TXID++; + auto& msg = + m_TXMsgs.emplace(msgid, OutboundMessage{msgid, buf, now, completed}) + .first->second; + const auto xmit = msg.XMIT(); + const llarp_buffer_t pkt{xmit}; + EncryptAndSend(pkt); + msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now); + LogDebug("send message ", msgid); + return true; + } + + void + Session::Pump() + { + const auto now = m_Parent->Now(); + if(m_State == State::Ready || m_State == State::LinkIntro) + { + if(ShouldPing()) + SendKeepAlive(); + for(auto& item : m_RXMsgs) + { + if(item.second.ShouldSendACKS(now)) + { + item.second.SendACKS(util::memFn(&Session::EncryptAndSend, this), + now); + } + } + for(auto& item : m_TXMsgs) + { + if(item.second.ShouldFlush(now)) + { + item.second.FlushUnAcked( + util::memFn(&Session::EncryptAndSend, this), now); + } + } + } + } + + bool + Session::GotRenegLIM(const LinkIntroMessage* lim) + { + LogDebug("renegotiate session on ", m_RemoteAddr); + return m_Parent->SessionRenegotiate(lim->rc, m_RemoteRC); + } + + bool + Session::RenegotiateSession() + { + SendOurLIM(); + return true; + } + + bool + Session::ShouldPing() const + { + if(m_State == State::Ready) + { + static constexpr llarp_time_t PingInterval = 500; + const auto now = m_Parent->Now(); + return now - m_LastTX > PingInterval; + } + return false; + } + + util::StatusObject + Session::ExtractStatus() const + { + return {{"remoteAddr", m_RemoteAddr.ToString()}, + {"remoteRC", m_RemoteRC.ExtractStatus()}}; + } + + bool + Session::TimedOut(llarp_time_t now) const + { + static constexpr llarp_time_t SessionAliveTimeout = 10000; + if(m_State == State::Ready || m_State == State::LinkIntro) + { + return now > m_LastRX && now - m_LastRX > SessionAliveTimeout; + } + return now - m_CreatedAt > SessionAliveTimeout; + } + + void + Session::Tick(llarp_time_t now) + { + // remove pending outbound messsages that timed out + // inform waiters + { + auto itr = m_TXMsgs.begin(); + while(itr != m_TXMsgs.end()) + { + if(itr->second.IsTimedOut(now)) + { + itr->second.InformTimeout(); + itr = m_TXMsgs.erase(itr); + } + else + ++itr; + } + } + { + // remove pending inbound messages that timed out + auto itr = m_RXMsgs.begin(); + while(itr != m_RXMsgs.end()) + { + if(itr->second.IsTimedOut(now)) + { + itr = m_RXMsgs.erase(itr); + } + else + ++itr; + } + } + } + + using Introduction = AlignedBuffer< 64 >; + + void + Session::GenerateAndSendIntro() + { + Introduction intro; + + TunnelNonce N; + N.Randomize(); + if(not CryptoManager::instance()->transport_dh_client( + m_SessionKey, m_ChosenAI.pubkey, + m_Parent->RouterEncryptionSecret(), N)) + { + LogError("failed to transport_dh_client on outbound session to ", + m_RemoteAddr); + return; + } + const auto pk = m_Parent->RouterEncryptionSecret().toPublic(); + std::copy_n(pk.begin(), pk.size(), intro.begin()); + std::copy(N.begin(), N.end(), intro.begin() + 32); + LogDebug("pk=", pk.ToHex(), " N=", N.ToHex(), + " remote-pk=", m_ChosenAI.pubkey.ToHex()); + std::vector< byte_t > req; + req.resize(intro.size() + (randint() % 64)); + CryptoManager::instance()->randbytes(req.data(), req.size()); + std::copy_n(intro.begin(), intro.size(), req.begin()); + const llarp_buffer_t buf(req); + Send_LL(buf); + m_State = State::Introduction; + LogDebug("sent intro to ", m_RemoteAddr); + } + + void + Session::HandleCreateSessionRequest(const llarp_buffer_t& buf) + { + std::vector< byte_t > result; + if(not DecryptMessage(buf, result)) + { + LogError("failed to decrypt session request from ", m_RemoteAddr); + return; + } + if(result.size() < token.size()) + { + LogError("bad session request size, ", result.size(), " < ", + token.size(), " from ", m_RemoteAddr); + return; + } + if(not std::equal(result.begin(), result.begin() + token.size(), + token.begin())) + { + LogError("token missmatch from ", m_RemoteAddr); + return; + } + m_LastRX = m_Parent->Now(); + m_State = State::LinkIntro; + SendOurLIM(); + } + + void + Session::HandleGotIntro(const llarp_buffer_t& buf) + { + if(buf.sz < Introduction::SIZE) + { + LogWarn("intro too small from ", m_RemoteAddr); + return; + } + TunnelNonce N; + std::copy_n(buf.base, PubKey::SIZE, m_RemoteOnionKey.begin()); + std::copy_n(buf.base + PubKey::SIZE, TunnelNonce::SIZE, N.begin()); + const PubKey pk = m_Parent->TransportSecretKey().toPublic(); + LogDebug("got intro: remote-pk=", m_RemoteOnionKey.ToHex(), + " N=", N.ToHex(), " local-pk=", pk.ToHex(), " sz=", buf.sz); + if(not CryptoManager::instance()->transport_dh_server( + m_SessionKey, m_RemoteOnionKey, m_Parent->TransportSecretKey(), N)) + { + LogError("failed to transport_dh_server on inbound intro from ", + m_RemoteAddr); + return; + } + std::vector< byte_t > reply; + reply.resize(token.size() + (randint() % 32)); + CryptoManager::instance()->randbytes(reply.data(), reply.size()); + std::copy_n(token.begin(), token.size(), reply.begin()); + const llarp_buffer_t pkt{reply}; + m_LastRX = m_Parent->Now(); + EncryptAndSend(pkt); + LogDebug("sent intro ack to ", m_RemoteAddr); + m_State = State::Introduction; + } + + void + Session::HandleGotIntroAck(const llarp_buffer_t& buf) + { + std::vector< byte_t > reply; + if(not DecryptMessage(buf, reply)) + { + LogError("intro ack decrypt failed from ", m_RemoteAddr); + return; + } + if(reply.size() < token.size()) + { + LogError("bad intro ack size ", reply.size(), " < ", token.size(), + " from ", m_RemoteAddr); + return; + } + m_LastRX = m_Parent->Now(); + std::copy_n(reply.begin(), token.size(), token.begin()); + const llarp_buffer_t pkt{token}; + EncryptAndSend(pkt); + LogDebug("sent session request to ", m_RemoteAddr); + m_State = State::LinkIntro; + } + + bool + Session::DecryptMessage(const llarp_buffer_t& buf, + std::vector< byte_t >& result) + { + if(buf.sz <= PacketOverhead) + { + LogError("packet too small ", buf.sz); + return false; + } + ShortHash H; + llarp_buffer_t curbuf(buf.base, buf.sz); + curbuf.base += ShortHash::SIZE; + curbuf.sz -= ShortHash::SIZE; + if(not CryptoManager::instance()->hmac(H.data(), curbuf, m_SessionKey)) + { + LogError("failed to caclulate keyed hash for ", m_RemoteAddr); + return false; + } + const ShortHash expected{buf.base}; + if(H != expected) + { + LogError("keyed hash missmatch ", H, " != ", expected, " from ", + m_RemoteAddr, " state=", int(m_State), " size=", buf.sz); + return false; + } + const byte_t* nonce_ptr = curbuf.base; + curbuf.base += 32; + curbuf.sz -= 32; + result.resize(buf.sz - PacketOverhead); + const llarp_buffer_t outbuf(result); + LogDebug("decrypt: ", result.size(), " bytes from ", m_RemoteAddr); + return CryptoManager::instance()->xchacha20_alt(outbuf, curbuf, + m_SessionKey, nonce_ptr); + } + + void + Session::Start() + { + if(m_Inbound) + return; + GenerateAndSendIntro(); + } + + void + Session::HandleSessionData(const llarp_buffer_t& buf) + { + std::vector< byte_t > result; + if(not DecryptMessage(buf, result)) + { + LogError("failed to decrypt session data from ", m_RemoteAddr); + return; + } + if(result.size() == token.size()) + { + /// we got a token so we return it + const llarp_buffer_t pktbuf(token); + EncryptAndSend(pktbuf); + return; + } + if(result[0] != LLARP_PROTO_VERSION) + { + LogError("protocol version missmatch ", int(result[0]), + " != ", LLARP_PROTO_VERSION); + return; + } + LogDebug("command ", int(result[1]), " from ", m_RemoteAddr); + switch(result[1]) + { + case Command::eXMIT: + HandleXMIT(std::move(result)); + return; + case Command::eDATA: + HandleDATA(std::move(result)); + return; + case Command::eACKS: + HandleACKS(std::move(result)); + return; + case Command::ePING: + HandlePING(std::move(result)); + return; + case Command::eNACK: + HandleNACK(std::move(result)); + return; + case Command::eCLOS: + HandleCLOS(std::move(result)); + return; + } + LogError("invalid command ", int(result[1])); + } + + void + Session::HandleNACK(std::vector< byte_t > data) + { + if(data.size() < 10) + { + LogError("short nack from ", m_RemoteAddr); + return; + } + uint64_t txid = bufbe64toh(data.data() + 2); + LogDebug("got nack on ", txid, " from ", m_RemoteAddr); + auto itr = m_TXMsgs.find(txid); + if(itr != m_TXMsgs.end()) + { + auto xmit = itr->second.XMIT(); + const llarp_buffer_t pkt(xmit); + EncryptAndSend(pkt); + } + m_LastRX = m_Parent->Now(); + } + + void + Session::HandleXMIT(std::vector< byte_t > data) + { + if(data.size() < 44) + { + LogError("short XMIT from ", m_RemoteAddr, " ", data.size(), " < 44"); + return; + } + uint16_t sz = bufbe16toh(data.data() + 2); + uint64_t rxid = bufbe64toh(data.data() + 4); + ShortHash h{data.data() + 12}; + LogDebug("rxid=", rxid, " sz=", sz, " h=", h.ToHex()); + auto itr = m_RXMsgs.find(rxid); + if(itr == m_RXMsgs.end()) + m_RXMsgs.emplace( + rxid, InboundMessage{rxid, sz, std::move(h), m_Parent->Now()}); + else + LogWarn("got duplicate xmit on ", rxid, " from ", m_RemoteAddr); + m_LastRX = m_Parent->Now(); + } + + void + Session::HandleDATA(std::vector< byte_t > data) + { + if(data.size() < FragmentSize + 12) + { + LogError("short DATA from ", m_RemoteAddr, " ", data.size()); + return; + } + m_LastRX = m_Parent->Now(); + uint16_t sz = bufbe16toh(data.data() + 2); + uint64_t rxid = bufbe64toh(data.data() + 4); + auto itr = m_RXMsgs.find(rxid); + if(itr == m_RXMsgs.end()) + { + LogWarn("no rxid=", rxid, " for ", m_RemoteAddr); + std::vector< byte_t > nack = { + LLARP_PROTO_VERSION, Command::eNACK, 0, 0, 0, 0, 0, 0, 0, 0}; + htobe64buf(nack.data() + 2, rxid); + const llarp_buffer_t nackbuf(nack); + EncryptAndSend(nackbuf); + return; + } + itr->second.HandleData(sz, data.data() + 12, m_Parent->Now()); + + if(itr->second.IsCompleted()) + { + itr->second.SendACKS(util::memFn(&Session::EncryptAndSend, this), + m_Parent->Now()); + if(itr->second.Verify()) + { + auto msg = std::move(itr->second); + const llarp_buffer_t buf(msg.m_Data.data(), msg.m_Size); + m_Parent->HandleMessage(this, buf); + } + else + { + LogError("hash missmatch for message ", itr->first); + } + m_RXMsgs.erase(rxid); + } + } + + void + Session::HandleACKS(std::vector< byte_t > data) + { + if(data.size() < 11) + { + LogError("short ACKS from ", m_RemoteAddr, " ", data.size(), " < 11"); + return; + } + m_LastRX = m_Parent->Now(); + uint64_t txid = bufbe64toh(data.data() + 2); + auto itr = m_TXMsgs.find(txid); + if(itr == m_TXMsgs.end()) + { + LogWarn("no txid=", txid, " for ", m_RemoteAddr); + return; + } + itr->second.Ack(data[10]); + + if(itr->second.IsTransmitted()) + { + LogDebug("sent message ", itr->first); + itr->second.Completed(); + itr = m_TXMsgs.erase(itr); + } + } + + void Session::HandleCLOS(std::vector< byte_t >) + { + LogInfo("remote closed by ", m_RemoteAddr); + Close(); + } + + void Session::HandlePING(std::vector< byte_t >) + { + m_LastRX = m_Parent->Now(); + } + + bool + Session::SendKeepAlive() + { + if(m_State == State::Ready) + { + std::vector< byte_t > ping{LLARP_PROTO_VERSION, Command::ePING}; + const llarp_buffer_t buf(ping); + EncryptAndSend(buf); + return true; + } + return false; + } + + bool + Session::IsEstablished() const + { + return m_State == State::Ready; + } + + void + Session::Recv_LL(const llarp_buffer_t& buf) + { + switch(m_State) + { + case State::Initial: + if(m_Inbound) + { + // initial data + // enter introduction phase + HandleGotIntro(buf); + } + else + { + // this case should never happen + ::abort(); + } + break; + case State::Introduction: + if(m_Inbound) + { + // we are replying to an intro ack + HandleCreateSessionRequest(buf); + } + else + { + // we got an intro ack + // send a session request + HandleGotIntroAck(buf); + } + break; + case State::LinkIntro: + default: + HandleSessionData(buf); + break; + } + } + } // namespace iwp +} // namespace llarp \ No newline at end of file diff --git a/llarp/iwp/session.hpp b/llarp/iwp/session.hpp new file mode 100644 index 000000000..b9f598bf1 --- /dev/null +++ b/llarp/iwp/session.hpp @@ -0,0 +1,189 @@ +#ifndef LLARP_IWP_SESSION_HPP +#define LLARP_IWP_SESSION_HPP + +#include +#include +#include + +namespace llarp +{ + namespace iwp + { + struct Session : public ILinkSession, + public std::enable_shared_from_this< Session > + { + /// outbound session + Session(LinkLayer* parent, RouterContact rc, AddressInfo ai); + /// inbound session + Session(LinkLayer* parent, Addr from); + + ~Session() = default; + + void + Pump() override; + + void + Tick(llarp_time_t now) override; + + bool + SendMessageBuffer(const llarp_buffer_t& buf, + CompletionHandler resultHandler) override; + + void + Send_LL(const llarp_buffer_t& pkt); + + void + EncryptAndSend(const llarp_buffer_t& data); + + void + Start() override; + + void + Close() override; + + void + Recv_LL(const llarp_buffer_t& pkt) override; + + bool + SendKeepAlive() override; + + bool + IsEstablished() const override; + + bool + TimedOut(llarp_time_t now) const override; + + PubKey + GetPubKey() const override + { + return m_RemoteRC.pubkey; + } + + Addr + GetRemoteEndpoint() const override + { + return m_RemoteAddr; + } + + RouterContact + GetRemoteRC() const override + { + return m_RemoteRC; + } + + size_t + SendQueueBacklog() const override + { + return m_TXMsgs.size(); + } + + ILinkLayer* + GetLinkLayer() const override + { + return m_Parent; + } + + bool + RenegotiateSession() override; + + bool + ShouldPing() const override; + + util::StatusObject + ExtractStatus() const override; + + private: + enum class State + { + /// we have no data recv'd + Initial, + /// we are in introduction phase + Introduction, + /// we sent our LIM + LinkIntro, + /// handshake done and LIM has been obtained + Ready, + /// we are closed now + Closed + }; + State m_State; + /// are we inbound session ? + const bool m_Inbound; + /// parent link layer + LinkLayer* const m_Parent; + const llarp_time_t m_CreatedAt; + const Addr m_RemoteAddr; + + AddressInfo m_ChosenAI; + /// remote rc + RouterContact m_RemoteRC; + /// session key + SharedSecret m_SessionKey; + /// session token + AlignedBuffer< 24 > token; + + PubKey m_RemoteOnionKey; + + llarp_time_t m_LastTX = 0; + llarp_time_t m_LastRX = 0; + + uint64_t m_TXID = 0; + + std::unordered_map< uint64_t, InboundMessage > m_RXMsgs; + std::unordered_map< uint64_t, OutboundMessage > m_TXMsgs; + + void + HandleGotIntro(const llarp_buffer_t& buf); + + void + HandleGotIntroAck(const llarp_buffer_t& buf); + + void + HandleCreateSessionRequest(const llarp_buffer_t& buf); + + void + HandleAckSession(const llarp_buffer_t& buf); + + void + HandleSessionData(const llarp_buffer_t& buf); + + bool + DecryptMessage(const llarp_buffer_t& buf, std::vector< byte_t >& result); + + void + GenerateAndSendIntro(); + + bool + GotInboundLIM(const LinkIntroMessage* msg); + + bool + GotOutboundLIM(const LinkIntroMessage* msg); + + bool + GotRenegLIM(const LinkIntroMessage* msg); + + void + SendOurLIM(ILinkSession::CompletionHandler h = nullptr); + + void + HandleXMIT(std::vector< byte_t > msg); + + void + HandleDATA(std::vector< byte_t > msg); + + void + HandleACKS(std::vector< byte_t > msg); + + void + HandleNACK(std::vector< byte_t > msg); + + void + HandlePING(std::vector< byte_t > msg); + + void + HandleCLOS(std::vector< byte_t > msg); + }; + } // namespace iwp +} // namespace llarp + +#endif \ No newline at end of file diff --git a/llarp/link/factory.cpp b/llarp/link/factory.cpp new file mode 100644 index 000000000..734e7b1ef --- /dev/null +++ b/llarp/link/factory.cpp @@ -0,0 +1,52 @@ +#include +#include +#include + +namespace llarp +{ + LinkFactory::LinkType + LinkFactory::TypeFromName(string_view str) + { + if(str == "utp") + return LinkType::eLinkUTP; + if(str == "iwp") + return LinkType::eLinkIWP; + if(str == "mempipe") + return LinkType::eLinkMempipe; + return LinkType::eLinkUnknown; + } + + std::string + LinkFactory::NameFromType(LinkFactory::LinkType tp) + { + switch(tp) + { + case LinkType::eLinkUTP: + return "utp"; + case LinkType::eLinkIWP: + return "iwp"; + case LinkType::eLinkMempipe: + return "mempipe"; + default: + return "unspec"; + } + } + + LinkFactory::Factory + LinkFactory::Obtain(LinkFactory::LinkType tp, bool permitInbound) + { + switch(tp) + { + case LinkType::eLinkUTP: + if(permitInbound) + return llarp::utp::NewInboundLink; + return llarp::utp::NewOutboundLink; + case LinkType::eLinkIWP: + if(permitInbound) + return llarp::iwp::NewInboundLink; + return llarp::iwp::NewOutboundLink; + default: + return nullptr; + } + } +} // namespace llarp \ No newline at end of file diff --git a/llarp/link/factory.hpp b/llarp/link/factory.hpp new file mode 100644 index 000000000..5acf3063e --- /dev/null +++ b/llarp/link/factory.hpp @@ -0,0 +1,43 @@ +#ifndef LLARP_LINK_FACTORY_HPP +#define LLARP_LINK_FACTORY_HPP +#include +#include + +#include + +namespace llarp +{ + /// LinkFactory is responsible for returning std::functions that create the + /// link layer types + struct LinkFactory + { + enum class LinkType + { + eLinkUTP, + eLinkIWP, + eLinkMempipe, + eLinkUnknown + }; + + using Factory = std::function< LinkLayer_ptr( + const SecretKey&, GetRCFunc, LinkMessageHandler, SignBufferFunc, + SessionEstablishedHandler, SessionRenegotiateHandler, TimeoutHandler, + SessionClosedHandler) >; + + /// get link type by name string + /// if invalid returns eLinkUnspec + static LinkType + TypeFromName(string_view name); + + /// turns a link type into a string representation + static std::string + NameFromType(LinkType t); + + /// obtain a link factory of a certain type + static Factory + Obtain(LinkType t, bool permitInbound); + }; + +} // namespace llarp + +#endif \ No newline at end of file diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp index 8c6c5a42f..69b58dc0a 100644 --- a/llarp/link/link_manager.cpp +++ b/llarp/link/link_manager.cpp @@ -346,21 +346,20 @@ namespace llarp if(stopping) return nullptr; - for(const auto &link : inboundLinks) + for(const auto &link : outboundLinks) { if(link->HasSessionTo(remote)) { return link; } } - for(const auto &link : outboundLinks) + for(const auto &link : inboundLinks) { if(link->HasSessionTo(remote)) { return link; } } - return nullptr; } diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 73da6dcb3..e1478fc9b 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -129,7 +129,7 @@ namespace llarp auto itr = m_AuthedLinks.begin(); while(itr != m_AuthedLinks.end()) { - if(itr->second.get() && !itr->second->TimedOut(_now)) + if(not itr->second->TimedOut(_now)) { itr->second->Pump(); ++itr; @@ -149,7 +149,7 @@ namespace llarp auto itr = m_Pending.begin(); while(itr != m_Pending.end()) { - if(itr->second.get() && !itr->second->TimedOut(_now)) + if(not itr->second->TimedOut(_now)) { itr->second->Pump(); ++itr; @@ -175,6 +175,7 @@ namespace llarp { if(m_AuthedLinks.count(pk) > MaxSessionsPerKey) { + LogWarn("too many session for ", pk); s->Close(); return false; } diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index 74f3cf3cf..be25c4fdc 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -103,7 +103,7 @@ namespace llarp llarp_ev_udp_sendto(&m_udp, to, pkt); } - bool + virtual bool Configure(llarp_ev_loop_ptr loop, const std::string& ifname, int af, uint16_t port); @@ -125,7 +125,7 @@ namespace llarp virtual bool Start(std::shared_ptr< llarp::Logic > l); - void + virtual void Stop(); virtual const char* @@ -140,11 +140,11 @@ namespace llarp void KeepAliveSessionTo(const RouterID& remote); - bool + virtual bool SendTo(const RouterID& remote, const llarp_buffer_t& buf, ILinkSession::CompletionHandler completed); - bool + virtual bool GetOurAddressInfo(AddressInfo& addr) const; bool @@ -186,7 +186,7 @@ namespace llarp bool GenEphemeralKeys(); - bool + virtual bool MapAddr(const RouterID& pk, ILinkSession* s); void @@ -200,6 +200,12 @@ namespace llarp SessionClosedHandler SessionClosed; SessionRenegotiateHandler SessionRenegotiate; + std::shared_ptr< Logic > + logic() + { + return m_Logic; + } + bool operator<(const ILinkLayer& other) const { diff --git a/llarp/link/session.hpp b/llarp/link/session.hpp index 39dc1ae96..73a6efe3e 100644 --- a/llarp/link/session.hpp +++ b/llarp/link/session.hpp @@ -27,7 +27,7 @@ namespace llarp /// hook for utp for when we have established a connection virtual void - OnLinkEstablished(ILinkLayer *p) = 0; + OnLinkEstablished(ILinkLayer *){}; /// called every event loop tick virtual void @@ -50,6 +50,13 @@ namespace llarp virtual void Close() = 0; + /// recv packet on low layer + /// not used by utp + virtual void + Recv_LL(const llarp_buffer_t &) + { + } + /// send a keepalive to the remote endpoint virtual bool SendKeepAlive() = 0; diff --git a/llarp/router/outbound_message_handler.cpp b/llarp/router/outbound_message_handler.cpp index d5102a2ed..99782de55 100644 --- a/llarp/router/outbound_message_handler.cpp +++ b/llarp/router/outbound_message_handler.cpp @@ -170,18 +170,13 @@ namespace llarp { const llarp_buffer_t buf(msg.first); auto callback = msg.second; - if(!_linkManager->SendTo( - remote, buf, [=](ILinkSession::DeliveryStatus status) { - if(status == ILinkSession::DeliveryStatus::eDeliverySuccess) - DoCallback(callback, SendStatus::Success); - else - DoCallback(callback, SendStatus::Congestion); - })) - { - DoCallback(callback, SendStatus::Congestion); - return false; - } - return true; + return _linkManager->SendTo( + remote, buf, [=](ILinkSession::DeliveryStatus status) { + if(status == ILinkSession::DeliveryStatus::eDeliverySuccess) + DoCallback(callback, SendStatus::Success); + else + DoCallback(callback, SendStatus::Congestion); + }); } bool diff --git a/llarp/router/outbound_session_maker.cpp b/llarp/router/outbound_session_maker.cpp index efdc73b1d..fd64990ce 100644 --- a/llarp/router/outbound_session_maker.cpp +++ b/llarp/router/outbound_session_maker.cpp @@ -226,6 +226,8 @@ namespace llarp bool OutboundSessionMaker::ShouldConnectTo(const RouterID &router) const { + if(router == us) + return false; size_t numPending = 0; { util::Lock lock(&_mutex); diff --git a/llarp/router/outbound_session_maker.hpp b/llarp/router/outbound_session_maker.hpp index f14352651..9be7f47c5 100644 --- a/llarp/router/outbound_session_maker.hpp +++ b/llarp/router/outbound_session_maker.hpp @@ -61,6 +61,12 @@ namespace llarp std::shared_ptr< Logic > logic, llarp_nodedb *nodedb, std::shared_ptr< llarp::thread::ThreadPool > threadpool); + void + SetOurRouter(RouterID r) + { + us = std::move(r); + } + /// always maintain this many connections to other routers size_t minConnectedRouters = 4; /// hard upperbound limit on the number of router to router connections @@ -108,6 +114,7 @@ namespace llarp std::shared_ptr< Logic > _logic; llarp_nodedb *_nodedb; std::shared_ptr< llarp::thread::ThreadPool > _threadpool; + RouterID us; }; } // namespace llarp diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index e4fef1523..61e515db5 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -368,9 +368,17 @@ namespace llarp // reset netid in our rc _rc.netID = llarp::NetID(); } + const auto linktypename = conf->router.defaultLinkProto(); + _defaultLinkType = LinkFactory::TypeFromName(linktypename); + if(_defaultLinkType == LinkFactory::LinkType::eLinkUnknown) + { + LogError("failed to set link type to '", linktypename, + "' as that is invalid"); + return false; + } // IWP config - m_OutboundPort = conf->iwp_links.outboundPort(); + m_OutboundPort = std::get< LinksConfig::Port >(conf->links.outboundLink()); // Router config _rc.SetNick(conf->router.nickname()); _outboundSessionMaker.maxConnectedRouters = @@ -398,7 +406,7 @@ namespace llarp lokidRPCPassword = conf->lokid.lokidRPCPassword; // TODO: add config flag for "is service node" - if(conf->iwp_links.servers().size()) + if(conf->links.inboundLinks().size()) { m_isServiceNode = true; } @@ -486,15 +494,34 @@ namespace llarp } // create inbound links, if we are a service node - for(const auto &serverConfig : conf->iwp_links.servers()) + for(const auto &serverConfig : conf->links.inboundLinks()) { - auto server = llarp::utp::NewInboundLink( + // get default factory + auto inboundLinkFactory = LinkFactory::Obtain(_defaultLinkType, true); + // for each option if provided ... + for(const auto &opt : std::get< LinksConfig::Options >(serverConfig)) + { + // try interpreting it as a link type + const auto linktype = LinkFactory::TypeFromName(opt); + if(linktype != LinkFactory::LinkType::eLinkUnknown) + { + // override link factory if it's a valid link type + auto factory = LinkFactory::Obtain(linktype, true); + if(factory) + { + inboundLinkFactory = std::move(factory); + break; + } + } + } + + auto server = inboundLinkFactory( encryption(), util::memFn(&AbstractRouter::rc, this), util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this), + util::memFn(&AbstractRouter::Sign, this), util::memFn(&IOutboundSessionMaker::OnSessionEstablished, &_outboundSessionMaker), util::memFn(&AbstractRouter::CheckRenegotiateValid, this), - util::memFn(&AbstractRouter::Sign, this), util::memFn(&IOutboundSessionMaker::OnConnectTimeout, &_outboundSessionMaker), util::memFn(&AbstractRouter::SessionClosed, this)); @@ -505,9 +532,9 @@ namespace llarp return false; } - const auto &key = std::get< 0 >(serverConfig); - int af = std::get< 1 >(serverConfig); - uint16_t port = std::get< 2 >(serverConfig); + const auto &key = std::get< LinksConfig::Interface >(serverConfig); + int af = std::get< LinksConfig::AddressFamily >(serverConfig); + uint16_t port = std::get< LinksConfig::Port >(serverConfig); if(!server->Configure(netloop(), key, af, port)) { LogError("failed to bind inbound link on ", key, " port ", port); @@ -884,7 +911,7 @@ namespace llarp LogError("failed to save RC"); return false; } - + _outboundSessionMaker.SetOurRouter(pubkey()); if(!_linkManager.StartLinks(_logic)) { LogWarn("One or more links failed to start."); @@ -1072,48 +1099,44 @@ namespace llarp bool Router::InitOutboundLinks() { - using LinkFactory = std::function< LinkLayer_ptr( - const SecretKey &, GetRCFunc, LinkMessageHandler, - SessionEstablishedHandler, SessionRenegotiateHandler, SignBufferFunc, - TimeoutHandler, SessionClosedHandler) >; + const auto linkTypeName = LinkFactory::NameFromType(_defaultLinkType); + LogInfo("initialize outbound link: ", linkTypeName); + auto factory = LinkFactory::Obtain(_defaultLinkType, false); + if(factory == nullptr) + { + LogError("cannot initialize outbound link of type '", linkTypeName, + "' as it has no implementation"); + return false; + } + auto link = + factory(encryption(), util::memFn(&AbstractRouter::rc, this), + util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this), + util::memFn(&AbstractRouter::Sign, this), + util::memFn(&IOutboundSessionMaker::OnSessionEstablished, + &_outboundSessionMaker), + util::memFn(&AbstractRouter::CheckRenegotiateValid, this), + util::memFn(&IOutboundSessionMaker::OnConnectTimeout, + &_outboundSessionMaker), + util::memFn(&AbstractRouter::SessionClosed, this)); + + if(!link) + return false; + if(!link->EnsureKeys(transport_keyfile.string().c_str())) + { + LogError("failed to load ", transport_keyfile); + return false; + } - static std::list< LinkFactory > linkFactories = {utp::NewOutboundLink, - iwp::NewServer}; + const auto afs = {AF_INET, AF_INET6}; - bool addedAtLeastOne = false; - for(const auto &factory : linkFactories) + for(const auto af : afs) { - auto link = factory( - encryption(), util::memFn(&AbstractRouter::rc, this), - util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this), - util::memFn(&IOutboundSessionMaker::OnSessionEstablished, - &_outboundSessionMaker), - util::memFn(&AbstractRouter::CheckRenegotiateValid, this), - util::memFn(&AbstractRouter::Sign, this), - util::memFn(&IOutboundSessionMaker::OnConnectTimeout, - &_outboundSessionMaker), - util::memFn(&AbstractRouter::SessionClosed, this)); - - if(!link) + if(!link->Configure(netloop(), "*", af, m_OutboundPort)) continue; - if(!link->EnsureKeys(transport_keyfile.string().c_str())) - { - LogError("failed to load ", transport_keyfile); - continue; - } - - const auto afs = {AF_INET, AF_INET6}; - - for(const auto af : afs) - { - if(!link->Configure(netloop(), "*", af, m_OutboundPort)) - continue; - _linkManager.AddLink(std::move(link), false); - addedAtLeastOne = true; - break; - } + _linkManager.AddLink(std::move(link), false); + return true; } - return addedAtLeastOne; + return false; } bool diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 27994a573..27a83b1c5 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -175,6 +176,8 @@ namespace llarp struct sockaddr_in ip4addr; AddressInfo addrInfo; + LinkFactory::LinkType _defaultLinkType; + llarp_ev_loop_ptr _netloop; std::shared_ptr< llarp::thread::ThreadPool > cryptoworker; std::shared_ptr< Logic > _logic; diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 67e300995..2b084b637 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -509,7 +509,7 @@ namespace llarp { auto msg = std::make_shared< routing::DHTMessage >(); msg->M.emplace_back( - std::make_unique< dht::PublishIntroMessage >(m_IntroSet, txid, 1)); + std::make_unique< dht::PublishIntroMessage >(m_IntroSet, txid, 5)); return msg; } diff --git a/llarp/utp/utp.cpp b/llarp/utp/utp.cpp index 01d47502d..911fdc309 100644 --- a/llarp/utp/utp.cpp +++ b/llarp/utp/utp.cpp @@ -10,9 +10,10 @@ namespace llarp { LinkLayer_ptr NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, - LinkMessageHandler h, SessionEstablishedHandler est, - SessionRenegotiateHandler reneg, SignBufferFunc sign, - TimeoutHandler timeout, SessionClosedHandler closed) + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, TimeoutHandler timeout, + SessionClosedHandler closed) { return std::make_shared< LinkLayer >(routerEncSecret, getrc, h, sign, est, reneg, timeout, closed, false); @@ -20,9 +21,10 @@ namespace llarp LinkLayer_ptr NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, - LinkMessageHandler h, SessionEstablishedHandler est, - SessionRenegotiateHandler reneg, SignBufferFunc sign, - TimeoutHandler timeout, SessionClosedHandler closed) + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, TimeoutHandler timeout, + SessionClosedHandler closed) { return std::make_shared< LinkLayer >(routerEncSecret, getrc, h, sign, est, reneg, timeout, closed, true); diff --git a/llarp/utp/utp.hpp b/llarp/utp/utp.hpp index 10857b796..d368065e2 100644 --- a/llarp/utp/utp.hpp +++ b/llarp/utp/utp.hpp @@ -6,20 +6,20 @@ namespace llarp { - struct AbstractRouter; - namespace utp { LinkLayer_ptr NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, - LinkMessageHandler h, SessionEstablishedHandler est, - SessionRenegotiateHandler reneg, SignBufferFunc sign, - TimeoutHandler timeout, SessionClosedHandler closed); + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, TimeoutHandler timeout, + SessionClosedHandler closed); LinkLayer_ptr NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, - LinkMessageHandler h, SessionEstablishedHandler est, - SessionRenegotiateHandler reneg, SignBufferFunc sign, - TimeoutHandler timeout, SessionClosedHandler closed); + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, TimeoutHandler timeout, + SessionClosedHandler closed); /// shim const auto NewServer = NewInboundLink; } // namespace utp diff --git a/test/config/test_llarp_config_config.cpp b/test/config/test_llarp_config_config.cpp index 55eadeb3c..a31a7ed79 100644 --- a/test/config/test_llarp_config_config.cpp +++ b/test/config/test_llarp_config_config.cpp @@ -102,10 +102,10 @@ metric-tank-host=52.80.56.123:2003 ASSERT_FALSE(config.metrics.disableMetrics); { - using kv = IwpConfig::Servers::value_type; + using kv = LinksConfig::Links::value_type; - ASSERT_THAT(config.iwp_links.servers(), - UnorderedElementsAre(kv("eth0", AF_INET, 5501))); + ASSERT_THAT(config.links.inboundLinks(), + UnorderedElementsAre(kv("eth0", AF_INET, 5501, {}))); } ASSERT_THAT(config.bootstrap.routers, diff --git a/test/link/test_llarp_link.cpp b/test/link/test_llarp_link.cpp index d1a3eb01a..958e85da8 100644 --- a/test/link/test_llarp_link.cpp +++ b/test/link/test_llarp_link.cpp @@ -1,11 +1,13 @@ -#include +#include #include #include #include +#include #include #include #include + #include #include @@ -13,7 +15,7 @@ using namespace ::llarp; using namespace ::testing; -struct LinkLayerTest : public test::LlarpTest< NoOpCrypto > +struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium > { static constexpr uint16_t AlicePort = 5000; static constexpr uint16_t BobPort = 6000; @@ -167,12 +169,12 @@ struct LinkLayerTest : public test::LlarpTest< NoOpCrypto > } }; -TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob) +TEST_F(LinkLayerTest, TestIWP) { #ifdef WIN32 - GTEST_SKIP(); + GTEST_SKIP(); #else - Alice.link = utp::NewServer( + Alice.link = iwp::NewInboundLink( Alice.encryptionKey, [&]() -> const RouterContact& { return Alice.GetRC(); }, [&](ILinkSession* s, const llarp_buffer_t& buf) -> bool { @@ -193,14 +195,108 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob) return true; } }, + [&](Signature& sig, const llarp_buffer_t& buf) -> bool { + return m_crypto.sign(sig, Alice.signingKey, buf); + }, [&](ILinkSession* s) -> bool { const auto rc = s->GetRemoteRC(); return rc.pubkey == Bob.GetRC().pubkey; }, [&](RouterContact, RouterContact) -> bool { return true; }, + + [&](ILinkSession* session) { + ASSERT_FALSE(session->IsEstablished()); + Stop(); + }, + [&](RouterID router) { ASSERT_EQ(router, Bob.GetRouterID()); }); + + auto sendDiscardMessage = [](ILinkSession* s) -> bool { + // send discard message in reply to complete unit test + std::array< byte_t, 32 > tmp; + llarp_buffer_t otherBuf(tmp); + DiscardMessage discard; + if(!discard.BEncode(&otherBuf)) + return false; + otherBuf.sz = otherBuf.cur - otherBuf.base; + otherBuf.cur = otherBuf.base; + return s->SendMessageBuffer(otherBuf, nullptr); + }; + + Bob.link = iwp::NewInboundLink( + Bob.encryptionKey, [&]() -> const RouterContact& { return Bob.GetRC(); }, + [&](ILinkSession* s, const llarp_buffer_t& buf) -> bool { + LinkIntroMessage msg; + ManagedBuffer copy{buf}; + if(!msg.BDecode(©.underlying)) + return false; + if(!s->GotLIM(&msg)) + return false; + Bob.gotLIM = true; + return sendDiscardMessage(s); + }, + + [&](Signature& sig, const llarp_buffer_t& buf) -> bool { + return m_crypto.sign(sig, Bob.signingKey, buf); + }, + [&](ILinkSession* s) -> bool { + if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey) + return false; + LogInfo("bob established with alice"); + return Bob.link->VisitSessionByPubkey(Alice.GetRC().pubkey.as_array(), + sendDiscardMessage); + }, + [&](RouterContact newrc, RouterContact oldrc) -> bool { + success = newrc.pubkey == oldrc.pubkey; + return true; + }, + [&](ILinkSession* session) { ASSERT_FALSE(session->IsEstablished()); }, + [&](RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); }); + + ASSERT_TRUE(Alice.Start(m_logic, netLoop, AlicePort)); + ASSERT_TRUE(Bob.Start(m_logic, netLoop, BobPort)); + + ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC())); + + RunMainloop(); + ASSERT_TRUE(Bob.gotLIM); +#endif +}; + +TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob) +{ +#ifdef WIN32 + GTEST_SKIP(); +#else + Alice.link = utp::NewServer( + Alice.encryptionKey, + [&]() -> const RouterContact& { return Alice.GetRC(); }, + [&](ILinkSession* s, const llarp_buffer_t& buf) -> bool { + if(Alice.gotLIM) + { + Alice.Regen(); + return s->RenegotiateSession(); + } + else + { + LinkIntroMessage msg; + ManagedBuffer copy{buf}; + if(!msg.BDecode(©.underlying)) + return false; + if(!s->GotLIM(&msg)) + return false; + Alice.gotLIM = true; + return true; + } + }, [&](Signature& sig, const llarp_buffer_t& buf) -> bool { return m_crypto.sign(sig, Alice.signingKey, buf); }, + [&](ILinkSession* s) -> bool { + const auto rc = s->GetRemoteRC(); + return rc.pubkey == Bob.GetRC().pubkey; + }, + [&](RouterContact, RouterContact) -> bool { return true; }, + [&](ILinkSession* session) { ASSERT_FALSE(session->IsEstablished()); Stop(); @@ -231,6 +327,10 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob) Bob.gotLIM = true; return sendDiscardMessage(s); }, + + [&](Signature& sig, const llarp_buffer_t& buf) -> bool { + return m_crypto.sign(sig, Bob.signingKey, buf); + }, [&](ILinkSession* s) -> bool { if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey) return false; @@ -242,9 +342,6 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob) success = newrc.pubkey == oldrc.pubkey; return true; }, - [&](Signature& sig, const llarp_buffer_t& buf) -> bool { - return m_crypto.sign(sig, Bob.signingKey, buf); - }, [&](ILinkSession* session) { ASSERT_FALSE(session->IsEstablished()); }, [&](RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); }); @@ -280,6 +377,9 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob) return false; } return AliceGotMessage(buf); + }, + [&](Signature& sig, const llarp_buffer_t& buf) -> bool { + return m_crypto.sign(sig, Alice.signingKey, buf); }, [&](ILinkSession* s) -> bool { if(s->GetRemoteRC().pubkey != Bob.GetRC().pubkey) @@ -288,9 +388,7 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob) return true; }, [&](RouterContact, RouterContact) -> bool { return true; }, - [&](Signature& sig, const llarp_buffer_t& buf) -> bool { - return m_crypto.sign(sig, Alice.signingKey, buf); - }, + [&](ILinkSession* session) { ASSERT_FALSE(session->IsEstablished()); Stop(); @@ -312,6 +410,9 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob) return false; } return true; + }, + [&](Signature& sig, const llarp_buffer_t& buf) -> bool { + return m_crypto.sign(sig, Bob.signingKey, buf); }, [&](ILinkSession* s) -> bool { if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey) @@ -332,9 +433,6 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob) return true; }, [&](RouterContact, RouterContact) -> bool { return true; }, - [&](Signature& sig, const llarp_buffer_t& buf) -> bool { - return m_crypto.sign(sig, Bob.signingKey, buf); - }, [&](ILinkSession* session) { ASSERT_FALSE(session->IsEstablished()); }, [&](RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); });