From 5dfcd60df1da983e7bc817d68e906ed357de5427 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 6 Sep 2018 16:31:58 -0400 Subject: [PATCH] more --- Makefile | 2 +- include/llarp/dht/context.hpp | 2 +- include/llarp/ev.h | 2 + include/llarp/link/server.hpp | 12 +- include/llarp/link_layer.hpp | 1 + include/llarp/messages.hpp | 1 + include/llarp/messages/discard.hpp | 49 ++++ llarp/dht/context.cpp | 14 +- llarp/ev.hpp | 10 + llarp/ev_epoll.hpp | 2 - llarp/ev_kqueue.hpp | 2 - llarp/ev_win32.hpp | 2 - llarp/link/server.cpp | 40 +-- llarp/link/utp.cpp | 399 ++++++++++++++++++++--------- llarp/link_layer.hpp | 5 - llarp/link_message.cpp | 3 + llarp/nodedb.cpp | 12 +- llarp/router.cpp | 17 +- llarp/router.hpp | 3 - 19 files changed, 401 insertions(+), 177 deletions(-) create mode 100644 include/llarp/messages/discard.hpp diff --git a/Makefile b/Makefile index 2b8c6a5c2..ab5f66eea 100644 --- a/Makefile +++ b/Makefile @@ -97,7 +97,7 @@ shared: shared-configure testnet: cp $(EXE) $(TESTNET_EXE) mkdir -p $(TESTNET_ROOT) - python3 contrib/testnet/genconf.py --bin=$(TESTNET_EXE) --svc=$(TESTNET_SERVERS) --clients=$(TESTNET_CLIENTS) --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF) + python3 contrib/testnet/genconf.py --bin=$(TESTNET_EXE) --svc=$(TESTNET_SERVERS) --clients=$(TESTNET_CLIENTS) --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF) --ifname=tun11 LLARP_DEBUG=$(TESTNET_DEBUG) supervisord -n -d $(TESTNET_ROOT) -l $(TESTNET_LOG) -c $(TESTNET_CONF) test: debug diff --git a/include/llarp/dht/context.hpp b/include/llarp/dht/context.hpp index 82d12173b..01f3e813e 100644 --- a/include/llarp/dht/context.hpp +++ b/include/llarp/dht/context.hpp @@ -218,7 +218,7 @@ namespace llarp /// explore dht for new routers void - Explore(); + Explore(size_t N = 3); llarp_router* router = nullptr; // for router contacts diff --git a/include/llarp/ev.h b/include/llarp/ev.h index 42e416c2d..80588e757 100644 --- a/include/llarp/ev.h +++ b/include/llarp/ev.h @@ -52,6 +52,8 @@ llarp_ev_loop_stop(struct llarp_ev_loop *ev); /// UDP handling configuration struct llarp_udp_io { + /// set after added + int fd; void *user; void *impl; struct llarp_ev_loop *parent; diff --git a/include/llarp/link/server.hpp b/include/llarp/link/server.hpp index a3e8aff2b..34f7c0d75 100644 --- a/include/llarp/link/server.hpp +++ b/include/llarp/link/server.hpp @@ -76,6 +76,9 @@ namespace llarp bool GetOurAddressInfo(AddressInfo& addr) const; + void + RemoveSessionVia(const Addr& addr); + virtual uint16_t Rank() const = 0; @@ -98,6 +101,11 @@ namespace llarp m_Links.insert(std::make_pair(pk, addr)); } + virtual void + Tick(llarp_time_t now) + { + } + private: static void on_timer_tick(void* user, uint64_t orig, uint64_t left) @@ -105,11 +113,11 @@ namespace llarp // timer cancelled if(left) return; - static_cast< ILinkLayer* >(user)->Tick(orig, llarp_time_now_ms()); + static_cast< ILinkLayer* >(user)->OnTick(orig, llarp_time_now_ms()); } void - Tick(uint64_t interval, llarp_time_t now); + OnTick(uint64_t interval, llarp_time_t now); void ScheduleTick(uint64_t interval); diff --git a/include/llarp/link_layer.hpp b/include/llarp/link_layer.hpp index a7948271c..00d0965a5 100644 --- a/include/llarp/link_layer.hpp +++ b/include/llarp/link_layer.hpp @@ -2,4 +2,5 @@ #define LLARP_LINK_LAYER_HPP #include #include +constexpr size_t MAX_LINK_MSG_SIZE = 8192; #endif diff --git a/include/llarp/messages.hpp b/include/llarp/messages.hpp index 10128b4cb..b953d19b2 100644 --- a/include/llarp/messages.hpp +++ b/include/llarp/messages.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #endif diff --git a/include/llarp/messages/discard.hpp b/include/llarp/messages/discard.hpp new file mode 100644 index 000000000..9a49daaff --- /dev/null +++ b/include/llarp/messages/discard.hpp @@ -0,0 +1,49 @@ +#ifndef LLARP_MESSAGES_DISCARD_HPP +#define LLARP_MESSAGES_DISCARD_HPP +#include +#include +namespace llarp +{ + struct DiscardMessage : public ILinkMessage + { + /// who did this message come from or is going to + + DiscardMessage() : ILinkMessage(nullptr) + { + } + + DiscardMessage(ILinkSession* from) : ILinkMessage(from) + { + } + + ~DiscardMessage() + { + } + + bool + BEncode(llarp_buffer_t* buf) const + { + if(!bencode_start_dict(buf)) + return false; + if(!bencode_write_bytestring(buf, "a", 1)) + return false; + if(!bencode_write_bytestring(buf, "x", 1)) + return false; + return bencode_end(buf); + } + + bool + DecodeKey(llarp_buffer_t key, llarp_buffer_t* buf) + { + return false; + } + + bool + HandleMessage(llarp_router* router) const + { + return true; + } + }; +} // namespace llarp + +#endif diff --git a/llarp/dht/context.cpp b/llarp/dht/context.cpp index ff68a3dee..e94bc2948 100644 --- a/llarp/dht/context.cpp +++ b/llarp/dht/context.cpp @@ -24,13 +24,13 @@ namespace llarp } void - Context::Explore() + Context::Explore(size_t N) { // ask N random peers for new routers llarp::LogInfo("Exploring network"); std::set< Key_t > peers; - if(nodes->GetManyRandom(peers, 3)) + if(nodes->GetManyRandom(peers, N)) { for(const auto &peer : peers) ExploreNetworkVia(peer); @@ -626,8 +626,9 @@ namespace llarp void Start(const TXOwner &peer) { - FindRouterMessage msg(parent->OurKey(), target, peer.txid); - parent->DHTSendTo(peer.node, &msg); + parent->DHTSendTo( + peer.node, + new FindRouterMessage(parent->OurKey(), target, peer.txid)); } void @@ -645,8 +646,9 @@ namespace llarp } else { - GotRouterMessage msg({}, whoasked.txid, valuesFound, false); - parent->DHTSendTo(whoasked.node, &msg); + parent->DHTSendTo( + whoasked.node, + new GotRouterMessage({}, whoasked.txid, valuesFound, false)); } } }; diff --git a/llarp/ev.hpp b/llarp/ev.hpp index 5e45facf3..277ca8be5 100644 --- a/llarp/ev.hpp +++ b/llarp/ev.hpp @@ -13,6 +13,10 @@ #define MAX_WRITE_QUEUE_SIZE 1024 #endif +#ifndef EV_READ_BUF_SZ +#define EV_READ_BUF_SZ (4 * 1024) +#endif + namespace llarp { struct ev_io @@ -135,6 +139,8 @@ namespace llarp struct llarp_ev_loop { + byte_t readbuf[EV_READ_BUF_SZ]; + virtual bool init() = 0; virtual int @@ -150,6 +156,10 @@ struct llarp_ev_loop udp_listen(llarp_udp_io* l, const sockaddr* src) { auto ev = create_udp(l, src); + if(ev) + { + l->fd = ev->fd; + } return ev && add_ev(ev, false); } diff --git a/llarp/ev_epoll.hpp b/llarp/ev_epoll.hpp index 3096676ec..aafef5664 100644 --- a/llarp/ev_epoll.hpp +++ b/llarp/ev_epoll.hpp @@ -360,8 +360,6 @@ struct llarp_epoll_loop : public llarp_ev_loop auto val = write(pipefds[1], &i, sizeof(i)); (void)val; } - - byte_t readbuf[2048]; }; #endif diff --git a/llarp/ev_kqueue.hpp b/llarp/ev_kqueue.hpp index 3b3ad7986..5c190e2c6 100644 --- a/llarp/ev_kqueue.hpp +++ b/llarp/ev_kqueue.hpp @@ -180,7 +180,6 @@ struct llarp_kqueue_loop : public llarp_ev_loop { struct kevent events[1024]; int result; - byte_t readbuf[2048]; timespec t; t.tv_sec = 0; t.tv_nsec = ms * 1000UL; @@ -208,7 +207,6 @@ struct llarp_kqueue_loop : public llarp_ev_loop t.tv_nsec = 1000UL * EV_TICK_INTERVAL; struct kevent events[1024]; int result; - byte_t readbuf[2048]; do { result = kevent(kqueuefd, nullptr, 0, events, 1024, &t); diff --git a/llarp/ev_win32.hpp b/llarp/ev_win32.hpp index 1a6cf03c7..d61e4f2a4 100644 --- a/llarp/ev_win32.hpp +++ b/llarp/ev_win32.hpp @@ -137,7 +137,6 @@ struct llarp_win32_loop : public llarp_ev_loop WSAOVERLAPPED* qdata = nullptr; int result = 0; int idx = 0; - byte_t readbuf[2048]; do { @@ -183,7 +182,6 @@ struct llarp_win32_loop : public llarp_ev_loop WSAOVERLAPPED* qdata = nullptr; int result = 0; int idx = 0; - byte_t readbuf[2048]; // unlike epoll and kqueue, we only need to run so long as the // system call returns TRUE diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index e68dcac86..8024ad6e7 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -50,21 +50,8 @@ namespace llarp if(!itr->second->TimedOut(now)) { itr->second->Pump(); - ++itr; - } - else - { - { - util::Lock lock(m_LinksMutex); - auto i = m_Links.find(itr->second->GetPubKey()); - if(i != m_Links.end()) - { - m_Links.erase(i); - } - } - delete itr->second; - itr = m_Sessions.erase(itr); } + ++itr; } } @@ -84,6 +71,17 @@ namespace llarp return false; } + void + ILinkLayer::RemoveSessionVia(const Addr& addr) + { + auto itr = m_Sessions.find(addr); + if(itr == m_Sessions.end()) + return; + + delete itr->second; + m_Sessions.erase(itr); + } + void ILinkLayer::TryEstablishTo(const RouterContact& rc) { @@ -93,12 +91,15 @@ namespace llarp util::Lock l(m_SessionsMutex); llarp::Addr addr(to); auto itr = m_Sessions.find(addr); - if(itr == m_Sessions.end()) + if(itr != m_Sessions.end()) { - auto s = NewOutboundSession(rc, to); - s->Start(); - m_Sessions.emplace(addr, s); + itr->second->SendClose(); + delete itr->second; + m_Sessions.erase(itr); } + auto s = NewOutboundSession(rc, to); + s->Start(); + m_Sessions.emplace(addr, s); } bool @@ -239,8 +240,9 @@ namespace llarp } void - ILinkLayer::Tick(uint64_t interval, llarp_time_t now) + ILinkLayer::OnTick(uint64_t interval, llarp_time_t now) { + Tick(now); util::Lock l(m_SessionsMutex); auto itr = m_Sessions.begin(); while(itr != m_Sessions.end()) diff --git a/llarp/link/utp.cpp b/llarp/link/utp.cpp index 56b6361e0..f3c7325ad 100644 --- a/llarp/link/utp.cpp +++ b/llarp/link/utp.cpp @@ -1,25 +1,34 @@ #include #include "router.hpp" #include +#include #include #include #include #include +#ifdef __linux__ +#include +#include +#endif + namespace llarp { namespace utp { - constexpr size_t FragmentBufferSize = 1088; - constexpr size_t FragmentHashSize = 32; - constexpr size_t FragmentNonceSize = 24; + constexpr size_t FragmentPadMax = 32; + constexpr size_t FragmentHashSize = 32; + constexpr size_t FragmentNonceSize = 32; constexpr size_t FragmentOverheadSize = FragmentHashSize + FragmentNonceSize; - constexpr size_t FragmentBodySize = - FragmentBufferSize - FragmentOverheadSize; + constexpr size_t FragmentBodySize = 512; + constexpr size_t FragmentBufferSize = + FragmentHashSize + FragmentNonceSize + FragmentBodySize; typedef llarp::AlignedBuffer< FragmentBufferSize > FragmentBuffer; + typedef llarp::AlignedBuffer< MAX_LINK_MSG_SIZE > MessageBuffer; + struct LinkLayer; struct BaseSession : public ILinkSession @@ -36,13 +45,12 @@ namespace llarp std::queue< FragmentBuffer > sendq; FragmentBuffer recvBuf; size_t recvBufOffset; - std::vector< byte_t > recvMsg; + MessageBuffer recvMsg; + size_t recvMsgOffset; + bool stalled = false; void - Alive() - { - lastActive = llarp_time_now_ms(); - } + Alive(); /// base BaseSession(llarp_router* r); @@ -87,30 +95,27 @@ namespace llarp void PumpWrite() { - // TODO: use utp_writev - while(sendq.size()) + while(sendq.size() && !stalled) { auto& front = sendq.front(); - write_ll(front.data(), front.size()); + if(write_ll(front.data(), front.size()) == 0) + { + stalled = true; + return; + } sendq.pop(); } } - void + ssize_t write_ll(void* buf, size_t sz) { if(sock == nullptr) { llarp::LogWarn("write_ll failed: no socket"); - return; - } - llarp::LogDebug("utp_write ", sz, " bytes to ", remoteAddr); - ssize_t wrote = utp_write(sock, buf, sz); - if(wrote < 0) - { - llarp::LogWarn("utp_write returned ", wrote); + return 0; } - llarp::LogDebug("utp_write wrote ", wrote, " bytes to ", remoteAddr); + return utp_write(sock, buf, sz); } bool @@ -133,8 +138,8 @@ namespace llarp size_t sz = buf.sz; while(sz) { - uint32_t s = - std::min((FragmentBodySize - (llarp_randint() % 128)), sz); + uint32_t s = std::min( + (FragmentBodySize - (llarp_randint() % FragmentPadMax)), sz); sendq.emplace(); EncryptThenHash(sendq.back(), buf.cur, s, ((sz - s) == 0)); buf.cur += s; @@ -156,13 +161,12 @@ namespace llarp OnLinkEstablished(p); KeyExchangeNonce nonce; nonce.Randomize(); - SendHandshake(nonce, sock); gotLIM = true; EnterState(eCryptoHandshake); - auto router = Router(); - if(DoKeyExchange(sock, router->crypto.dh_client, nonce, - remoteTransportPubKey, router->encryption)) + if(DoKeyExchange(Router()->crypto.transport_dh_client, nonce, + remoteTransportPubKey, Router()->encryption)) { + SendHandshake(nonce, sock); EnterState(eSessionReady); } } @@ -177,12 +181,11 @@ namespace llarp byte_t* begin = buf.cur; LinkIntroMessage msg; msg.rc = Router()->rc; - - msg.N = n; + msg.N = n; if(!msg.BEncode(&buf)) { llarp::LogError("failed to encode our RC for handshake"); - Close(s); + Close(); return; } @@ -198,15 +201,15 @@ namespace llarp } bool - DoKeyExchange(utp_socket* s, llarp_transport_dh_func dh, - const KeyExchangeNonce& n, const PubKey& other, - const SecretKey& secret) + DoKeyExchange(llarp_transport_dh_func dh, const KeyExchangeNonce& n, + const PubKey& other, const SecretKey& secret) { - sock = s; + PubKey us = llarp::seckey_topublic(secret); + llarp::LogDebug("DH us=", us, " then=", other, " n=", n); if(!dh(sessionKey, other, secret, n)) { llarp::LogError("key exchange with ", other, " failed"); - Close(sock); + Close(); return false; } return true; @@ -217,31 +220,8 @@ namespace llarp { } - bool - SendKeepAlive() - { - return true; - } - void - SendClose() - { - if(sock) - Close(sock); - } - - void - Close(utp_socket* s) - { - if(state != eClose) - { - utp_shutdown(s, SHUT_RDWR); - utp_close(s); - utp_set_userdata(s, nullptr); - } - EnterState(eClose); - sock = nullptr; - } + Close(bool remove = true); void RecvHandshake(const void* buf, size_t bufsz, LinkLayer* p, utp_socket* s); @@ -249,25 +229,30 @@ namespace llarp bool Recv(const void* buf, size_t sz) { - Alive(); const byte_t* ptr = (const byte_t*)buf; llarp::LogDebug("utp read ", sz, " from ", remoteAddr); - while(sz + recvBufOffset > FragmentBufferSize) - { - memcpy(recvBuf.data() + recvBufOffset, ptr, FragmentBufferSize); - sz -= FragmentBufferSize; - ptr += FragmentBufferSize; - VerifyThenDecrypt(recvBuf); - recvBufOffset = 0; - } - memcpy(recvBuf.data() + recvBufOffset, ptr, sz); - if(sz + recvBufOffset <= FragmentBufferSize) + size_t s = sz; + while(s > 0) { - recvBufOffset = 0; - VerifyThenDecrypt(recvBuf); + auto dlt = recvBuf.size() - recvBufOffset; + if(dlt > 0) + { + llarp::LogDebug("dlt=", dlt, " offset=", recvBufOffset, " s=", s); + memcpy(recvBuf.data() + recvBufOffset, ptr, dlt); + if(!VerifyThenDecrypt(recvBuf)) + return false; + recvBufOffset = 0; + ptr += dlt; + s -= dlt; + } + else + { + if(!VerifyThenDecrypt(recvBuf)) + return false; + recvBufOffset = 0; + } } - else - recvBufOffset += sz; + Alive(); return true; } @@ -276,7 +261,7 @@ namespace llarp { if(now < lastActive) return false; - return lastActive - now > sessionTimeout; + return now - lastActive > sessionTimeout; } const PubKey& @@ -307,7 +292,21 @@ namespace llarp { LinkLayer* l = static_cast< LinkLayer* >(utp_context_get_userdata(arg->context)); - llarp_ev_udp_sendto(&l->m_udp, arg->address, arg->buf, arg->len); + llarp::LogDebug("utp_sendto ", Addr(*arg->address), " ", arg->len, + " bytes"); + if(sendto(l->m_udp.fd, arg->buf, arg->len, arg->flags, arg->address, + arg->address_len) + == -1) + { + llarp::LogError("sendto failed: ", strerror(errno)); + } + return 0; + } + + static uint64 + OnError(utp_callback_arguments* arg) + { + llarp::LogError(utp_error_code_names[arg->error_code]); return 0; } @@ -317,6 +316,13 @@ namespace llarp static uint64 OnAccept(utp_callback_arguments*); + static uint64 + OnLog(utp_callback_arguments* arg) + { + llarp::LogDebug(arg->buf); + return 0; + } + LinkLayer(llarp_router* r) : ILinkLayer() { router = r; @@ -327,9 +333,16 @@ namespace llarp utp_set_callback(_utp_ctx, UTP_ON_STATE_CHANGE, &LinkLayer::OnStateChange); utp_set_callback(_utp_ctx, UTP_ON_READ, &LinkLayer::OnRead); + utp_set_callback(_utp_ctx, UTP_ON_ERROR, &LinkLayer::OnError); + utp_set_callback(_utp_ctx, UTP_LOG, &LinkLayer::OnLog); utp_context_set_option(_utp_ctx, UTP_LOG_NORMAL, 1); utp_context_set_option(_utp_ctx, UTP_LOG_MTU, 1); utp_context_set_option(_utp_ctx, UTP_LOG_DEBUG, 1); + utp_context_set_option(_utp_ctx, UTP_SNDBUF, MAX_LINK_MSG_SIZE * 64); + utp_context_set_option(_utp_ctx, UTP_RCVBUF, MAX_LINK_MSG_SIZE * 64); + utp_set_callback( + _utp_ctx, UTP_GET_UDP_MTU, + [](utp_callback_arguments*) -> uint64 { return 1392; }); } ~LinkLayer() @@ -349,14 +362,96 @@ namespace llarp utp_process_udp(_utp_ctx, (const byte_t*)buf, sz, from, from.SockLen()); } +#ifdef __linux__ + void + ProcessICMP() + { + do + { + byte_t vec_buf[4096], ancillary_buf[4096]; + struct iovec iov = {vec_buf, sizeof(vec_buf)}; + struct sockaddr_in remote; + struct msghdr msg; + ssize_t len; + struct cmsghdr* cmsg; + struct sock_extended_err* e; + struct sockaddr* icmp_addr; + struct sockaddr_in* icmp_sin; + + memset(&msg, 0, sizeof(msg)); + + msg.msg_name = &remote; + msg.msg_namelen = sizeof(remote); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_flags = 0; + msg.msg_control = ancillary_buf; + msg.msg_controllen = sizeof(ancillary_buf); + + len = recvmsg(m_udp.fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT); + if(len < 0) + { + if(errno == EAGAIN || errno == EWOULDBLOCK) + errno = 0; + else + llarp::LogError("failed to read icmp for utp ", strerror(errno)); + return; + } + + for(cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) + { + if(cmsg->cmsg_type != IP_RECVERR) + { + continue; + } + if(cmsg->cmsg_level != SOL_IP) + { + continue; + } + e = (struct sock_extended_err*)CMSG_DATA(cmsg); + if(!e) + continue; + if(e->ee_origin != SO_EE_ORIGIN_ICMP) + { + continue; + } + icmp_addr = (struct sockaddr*)SO_EE_OFFENDER(e); + icmp_sin = (struct sockaddr_in*)icmp_addr; + if(icmp_sin->sin_port != 0) + { + continue; + } + if(e->ee_type == 3 && e->ee_code == 4) + { + utp_process_icmp_fragmentation(_utp_ctx, vec_buf, len, + (struct sockaddr*)&remote, + sizeof(remote), e->ee_info); + } + else + { + utp_process_icmp_error(_utp_ctx, vec_buf, len, + (struct sockaddr*)&remote, sizeof(remote)); + } + } + } while(true); + } +#endif + void Pump() { - utp_check_timeouts(_utp_ctx); - utp_issue_deferred_acks(_utp_ctx); + ReadPump(); ILinkLayer::Pump(); } + void + ReadPump() + { +#ifdef __linux__ + ProcessICMP(); +#endif + } + void Stop() { @@ -371,6 +466,17 @@ namespace llarp router->crypto.encryption_keygen(k); return true; } + + void + Tick(llarp_time_t now) + { +#ifdef __linux__ + ProcessICMP(); +#endif + utp_check_timeouts(_utp_ctx); + ILinkLayer::Tick(now); + } + ILinkSession* NewOutboundSession(const RouterContact& rc, const AddressInfo& addr); @@ -395,13 +501,30 @@ namespace llarp BaseSession::BaseSession(llarp_router* r) { + parent = nullptr; + recvMsgOffset = 0; + SendKeepAlive = [&]() -> bool { + if(sendq.size() == 0) + { + DiscardMessage msg; + byte_t tmp[128] = {0}; + auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); + if(!msg.BEncode(&buf)) + return false; + buf.sz = buf.cur - buf.base; + buf.cur = buf.base; + if(!this->QueueWriteBuffers(buf)) + return false; + } + return true; + }; recvBufOffset = 0; TimedOut = [&](llarp_time_t now) -> bool { return this->IsTimedOut(now); }; GetPubKey = std::bind(&BaseSession::RemotePubKey, this); lastActive = llarp_time_now_ms(); - Pump = [&]() { PumpWrite(); }; + Pump = std::bind(&BaseSession::PumpWrite, this); Tick = std::bind(&BaseSession::TickImpl, this, std::placeholders::_1); SendMessageBuffer = [&](llarp_buffer_t buf) -> bool { return this->QueueWriteBuffers(buf); @@ -410,6 +533,7 @@ namespace llarp HandleLinkIntroMessage = [](const LinkIntroMessage*) -> bool { return false; }; + SendClose = std::bind(&BaseSession::Close, this, false); } BaseSession::BaseSession(llarp_router* r, utp_socket* s, @@ -462,8 +586,13 @@ namespace llarp uint64 LinkLayer::OnRead(utp_callback_arguments* arg) { + LinkLayer* parent = + static_cast< LinkLayer* >(utp_context_get_userdata(arg->context)); BaseSession* self = static_cast< BaseSession* >(utp_get_userdata(arg->socket)); +#ifdef __linux__ + parent->ProcessICMP(); +#endif if(self) { if(self->state == BaseSession::eClose) @@ -471,14 +600,13 @@ namespace llarp return 0; } else if(self->state == BaseSession::eSessionReady) + { self->Recv(arg->buf, arg->len); + } else if(self->state == BaseSession::eLinkEstablished) { - LinkLayer* parent = - static_cast< LinkLayer* >(utp_context_get_userdata(arg->context)); self->RecvHandshake(arg->buf, arg->len, parent, arg->socket); } - utp_read_drained(arg->socket); } else { @@ -504,9 +632,14 @@ namespace llarp } session->OutboundLinkEstablished(l); } + else if(arg->state == UTP_STATE_WRITABLE) + { + session->stalled = false; + session->PumpWrite(); + } else if(arg->state == UTP_STATE_EOF) { - session->SendClose(); + session->Close(false); } } return 0; @@ -519,6 +652,7 @@ namespace llarp static_cast< LinkLayer* >(utp_context_get_userdata(arg->context)); Addr remote(*arg->address); llarp::LogDebug("utp accepted from ", remote); + if(self->HasSessionVia(remote)) { // TODO should we do this? @@ -540,7 +674,7 @@ namespace llarp uint32_t sz, bool isLastFragment) { - llarp::LogDebug("encrypt then hash ", sz, " bytes"); + llarp::LogDebug("encrypt then hash ", sz, " bytes last=", isLastFragment); buf.Randomize(); const byte_t* nonce = buf.data() + FragmentHashSize; byte_t* body = buf.data() + FragmentOverheadSize; @@ -553,37 +687,43 @@ namespace llarp memcpy(body, ptr, sz); auto payload = InitBuffer(base, FragmentBodySize); parent->router->crypto.xchacha20(payload, sessionKey, nonce); - parent->router->crypto.hmac(buf, payload, sessionKey); + payload.base = buf.data() + FragmentHashSize; + payload.cur = payload.base; + payload.sz = FragmentBufferSize - FragmentHashSize; + parent->router->crypto.hmac(buf.data(), payload, sessionKey); } void BaseSession::EnterState(State st) { - Alive(); state = st; if(st == eSessionReady) { + llarp::LogInfo("map ", remoteRC.pubkey, " to ", remoteAddr); parent->MapAddr(this->remoteAddr, remoteRC.pubkey); Router()->HandleLinkSessionEstablished(remoteRC); } + Alive(); } bool BaseSession::VerifyThenDecrypt(FragmentBuffer& buf) { + llarp::LogDebug("verify then decrypt ", remoteAddr); ShortHash digest; - if(!Router()->crypto.hmac( - digest, - InitBuffer(buf.data() + FragmentHashSize, - FragmentBufferSize - FragmentHashSize), - sessionKey)) + + auto hbuf = InitBuffer(buf.data() + FragmentHashSize, + buf.size() - FragmentHashSize); + if(!Router()->crypto.hmac(digest, hbuf, sessionKey)) { llarp::LogError("keyed hash failed"); return false; } - if(digest != ShortHash(buf.data())) + if(memcmp(buf.data(), digest.data(), 32)) { - llarp::LogError("Message Integrity Failed"); + llarp::LogError("Message Integrity Failed: got ", digest, " from ", + remoteAddr); + llarp::DumpBuffer(hbuf); return false; } AlignedBuffer< FragmentNonceSize > nonce(buf.data() + FragmentHashSize); @@ -598,22 +738,27 @@ namespace llarp && llarp_buffer_read_uint32(&body, &lower))) return false; bool fragmentEnd = upper == 0; - if(recvMsg.size() + lower > MAX_LINK_MSG_SIZE) + llarp::LogDebug("fragment size ", lower, " from ", remoteAddr); + if(lower + recvMsgOffset > recvMsg.size()) { llarp::LogError("Fragment too big: ", lower, " bytes"); return false; } - size_t newsz = recvMsg.size() + lower; - recvMsg.reserve(newsz); - byte_t* ptr = recvMsg.data() + (newsz - lower); + byte_t* ptr = recvMsg.data() + recvMsgOffset; memcpy(ptr, body.cur, lower); + recvMsgOffset += lower; if(fragmentEnd) { // got a message - auto mbuf = Buffer(recvMsg); + llarp::LogDebug("end of message from ", remoteAddr); + auto mbuf = InitBuffer(recvMsg.data(), recvMsgOffset); auto result = Router()->HandleRecvLinkMessageBuffer(this, mbuf); - recvMsg.clear(); - recvMsg.shrink_to_fit(); + if(!result) + { + llarp::LogWarn("failed to handle message from ", remoteAddr); + llarp::DumpBuffer(mbuf); + } + recvMsgOffset = 0; return result; } return true; @@ -623,29 +768,21 @@ namespace llarp BaseSession::RecvHandshake(const void* buf, size_t bufsz, LinkLayer* p, utp_socket* s) { - Alive(); size_t sz = bufsz; parent = p; sock = s; - if(parent->HasSessionVia(remoteAddr)) - { - llarp::LogDebug("already have session via ", remoteAddr, - " so closing before processing handshake"); - Close(sock); - return; - } llarp::LogDebug("recv handshake ", sz, " from ", remoteAddr); if(recvBuf.size() < sz) { llarp::LogDebug("handshake too big from ", remoteAddr); - Close(sock); + Close(); return; } if(sz <= 8) { llarp::LogDebug("handshake too small from ", remoteAddr); - Close(sock); + Close(); return; } memcpy(recvBuf.data(), buf, sz); @@ -657,7 +794,7 @@ namespace llarp { llarp::LogWarn("protocol version missmatch ", version, " != ", LLARP_PROTO_VERSION); - Close(sock); + Close(); return; } ptr += sizeof(uint32_t); @@ -671,7 +808,7 @@ namespace llarp // TODO: don't bail here, continue reading llarp::LogDebug("not enough data for handshake, want ", limsz, " bytes but got ", sz); - Close(sock); + Close(); return; } llarp::LogInfo("read LIM from ", remoteAddr); @@ -683,7 +820,7 @@ namespace llarp { llarp::LogError("Failed to parse LIM from ", remoteAddr); llarp::DumpBuffer(mbuf); - Close(sock); + Close(); return; } if(!msg.HandleMessage(Router())) @@ -691,13 +828,39 @@ namespace llarp llarp::LogError("failed to verify signature of rc"); return; } - if(!DoKeyExchange(sock, Router()->crypto.dh_server, msg.N, msg.rc.enckey, - parent->TransportSecretKey())) - return; remoteRC = msg.rc; - gotLIM = true; + if(!DoKeyExchange(Router()->crypto.transport_dh_server, msg.N, + remoteRC.enckey, parent->TransportSecretKey())) + return; + gotLIM = true; llarp::LogInfo("we got a new session from ", GetPubKey()); EnterState(eSessionReady); + Alive(); + } + + void + BaseSession::Close(bool remove) + { + if(state != eClose) + { + utp_shutdown(sock, SHUT_RDWR); + utp_close(sock); + utp_set_userdata(sock, nullptr); + } + EnterState(eClose); + sock = nullptr; + if(remove) + parent->RemoveSessionVia(remoteAddr); + } + + void + BaseSession::Alive() + { + lastActive = llarp_time_now_ms(); + if(sock) + utp_read_drained(sock); + if(parent) + utp_issue_deferred_acks(parent->_utp_ctx); } } // namespace utp diff --git a/llarp/link_layer.hpp b/llarp/link_layer.hpp index a7948271c..e69de29bb 100644 --- a/llarp/link_layer.hpp +++ b/llarp/link_layer.hpp @@ -1,5 +0,0 @@ -#ifndef LLARP_LINK_LAYER_HPP -#define LLARP_LINK_LAYER_HPP -#include -#include -#endif diff --git a/llarp/link_message.cpp b/llarp/link_message.cpp index 669577cb3..3621841f4 100644 --- a/llarp/link_message.cpp +++ b/llarp/link_message.cpp @@ -63,6 +63,9 @@ namespace llarp case 'c': handler->msg = std::make_unique< LR_CommitMessage >(handler->from); break; + case 'x': + handler->msg = std::make_unique< DiscardMessage >(handler->from); + break; default: return false; } diff --git a/llarp/nodedb.cpp b/llarp/nodedb.cpp index 8633176ec..37f16d81b 100644 --- a/llarp/nodedb.cpp +++ b/llarp/nodedb.cpp @@ -381,8 +381,9 @@ llarp_nodedb_select_random_hop(struct llarp_nodedb *n, /// checking for "guard" status for N = 0 is done by caller inside of /// pathbuilder's scope auto sz = n->entries.size(); - if(sz == 0) + if(sz < 3) return false; + size_t tries = 5; if(N) { do @@ -395,13 +396,18 @@ llarp_nodedb_select_random_hop(struct llarp_nodedb *n, std::advance(itr, idx - 1); } if(prev.pubkey == itr->second.pubkey) - continue; + { + if(tries--) + continue; + return false; + } if(itr->second.addrs.size()) { result = itr->second; return true; } - } while(true); + } while(tries--); + return false; } else { diff --git a/llarp/router.cpp b/llarp/router.cpp index b888a9774..22f21e480 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -47,22 +47,12 @@ struct TryConnectJob void Success() { - router->FlushOutboundFor(rc.pubkey, link); - router->pendingEstablishJobs.erase(rc.pubkey); - // we are gone } void AttemptTimedout() { - --triesLeft; - if(!ShouldRetry()) - { - router->pendingEstablishJobs.erase(rc.pubkey); - // we are gone after this - return; - } - Attempt(); + router->pendingEstablishJobs.erase(rc.pubkey); } void @@ -349,8 +339,7 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job) { ctx->establish_job->Success(); } - else // this was an inbound session - router->FlushOutboundFor(pk, router->GetLinkWithSessionByPubkey(pk)); + router->FlushOutboundFor(pk, router->GetLinkWithSessionByPubkey(pk)); } void @@ -443,6 +432,7 @@ llarp_router::Tick() { llarp::LogInfo( "We need more than 3 service nodes to build paths but we have ", N); + dht->impl.Explore(N); } hiddenServiceContext.Tick(); } @@ -523,6 +513,7 @@ llarp_router::FlushOutboundFor(const llarp::RouterID &remote, llarp::ILinkLayer *chosen) { llarp::LogDebug("Flush outbound for ", remote); + pendingEstablishJobs.erase(remote); auto itr = outboundMessageQueue.find(remote); if(itr == outboundMessageQueue.end()) { diff --git a/llarp/router.hpp b/llarp/router.hpp index 786bc8756..5d74aa061 100644 --- a/llarp/router.hpp +++ b/llarp/router.hpp @@ -23,9 +23,6 @@ #include "fs.hpp" #include "mem.hpp" -/** 2^15 bytes */ -#define MAX_LINK_MSG_SIZE (32768) - bool llarp_findOrCreateEncryption(llarp_crypto *crypto, const char *fpath, llarp::SecretKey &encryption);