diff --git a/Makefile b/Makefile index 3784f6ebf..605ce7bc3 100644 --- a/Makefile +++ b/Makefile @@ -51,7 +51,7 @@ GRADLE ?= gradle JAVA_HOME ?= /usr/lib/jvm/default-java # jsonrpc server -JSONRPC ?= OFF +JSONRPC ?= ON # native avx2 code AVX2 ?= OFF # non x86 target @@ -63,7 +63,7 @@ NETNS ?= OFF # cross compile? CROSS ?= OFF # build liblokinet-shared.so -SHARED_LIB ?= ON +SHARED_LIB ?= OFF # enable generating coverage COVERAGE ?= OFF COVERAGE_OUTDIR ?= "$(TMPDIR)/lokinet-coverage" diff --git a/docs/wire-protocol.txt b/docs/wire-protocol.txt index 6070cad2e..489eb9759 100644 --- a/docs/wire-protocol.txt +++ b/docs/wire-protocol.txt @@ -9,132 +9,128 @@ LLARP supports by default an authenticated message transport over a datagram based network layer. -outer message format: - -outer-header: - -<1 byte command> -<1 byte reserved, R, set to '=' (0x3d)> -<32 bytes flow id, B> +protocol phases: +first phase: proof of flow +second phase: session handshake +thrid phase: data transmission -command 'O' - obtain flow id +proof of flow: -obtain a handshake cookie +At any time before the data transfer phase a reject message +is sent the session is reset. - -<32 bytes random> -<8 bytes net id> - +Alice (A) is the sender and Bob (B) is the recipiant. -the if the network id differs from the current network's id a reject message -MUST be sent +A asks for a flow id from B. -MUST be replied to with a message rejected or a give handshake cookie +B MAY send a flow id to A or MAY reject the message from A. -command 'G' - give flow id - -<32 byte new flow-id, X> - +session handshake: -give a flow id to a remote endpoint that asks for one +an encrypted session is established using establish wire session messages +using a newly created flow id. -B is the B value from the request flow id message -X is a 32 byte the flow id, calcuated via: -r = RAND(32) -a = "::ffff." + ":" + "" -X = HS(a + B + r + net id) +outer message format: -resulting message: - "G=<32 byte flowid><32 bytes new flowid>" +every outer message MAY be obfsucated via symettric encryption for dpi +resistance reasons, this is not authenticated encryption. -after recieving a give flow id message a session negotiation can happen +the message is first assumed to be sent in clear first. +if parsing of clear variant fails then the recipiant MUST fall back to assuming +the protocol is in obfuscated mode. -command 'R' - message rejected - - +<16 bytes nounce, n> + -reject a message on a flow id +obfuscated via: -B is the flow id from the recipiant +K = HS(B_k) +N = HS(n + K) +X = SD(K, m, N[0:24]) -resulting message of reject with reason "no you" +outer-header: - "R=<32 byte flow-id>no you" +<1 byte command> +<1 byte reserved set to 0x3d> -command 'S' - session negotiation +command 'O' - obtain flow id -negotiate encrypted session +obtain a flow id -<24 bytes nounce, N> - - - -B is the flow id from the recipiant generated by the give flow id message (from outer header) -N is a random nounce -X is encrypted session negotiation data -Z is a keyed hash +<6 magic bytes "netid?"> +<8 bytes netid, I> +<8 bytes timestamp milliseconds since epoch, T> +<32 bytes ed25519 public key of sender, A_k> +<0-N bytes discarded> + -Z is generated via: - -msg.Z = '0x00' * 32 -msg.Z = MDS(msg, tx_K) - -session negotiation: - -The session starts out with each side having 2 session keys rx_K and tx_K for -decrypting inbound messages and encrypting outbound messages respectively. +the if the network id differs from the current network's id a reject message +MUST be sent -The initiator (alice) and the recipiant (bob) start out with static session keys +MUST be replied to with a message rejected or a give handshake cookie -k_a = HS(a.k) -k_b = HS(b.k) +command 'G' - give flow id -a.rx_K = k_a -b.rx_K = k_b + +<6 magic bytes "netid!"> +<16 bytes new flow id> +<32 bytes ed25519 public key of sender, A_k> +<0-N bytes discarded> + -a.tx_K = k_b -b.tx_K = k_a +after recieving a give flow id message a session negotiation can happen with that flow id. +command 'R' - flow rejected -decryption is done via: +reject new flow -SD(msg.X, rx_K, msg.N) + +<14 ascii bytes reason for rejection null padded> +<8 bytes timestamp> +<32 bytes ed25519 public key of sender, A_k> +<0-N bytes discarded> + -encrypted payload is bencoded LIM (see proto_v0.txt) +command 'E' - establish wire session -the initiator starts out by sending a LIM a_LIM to the recipiant. +establish an encrypted session using a flow id -the recipiant replies with a LIM b_LIM to the initiator. + +<16 bytes flow id, B> +<32 bytes ephemeral public encryption key, E> +<8 bytes packet counter starting at 0> + + -when the initiator gets a valid LIM from the recipiant the session keys for data -transmission are set to: +every time we try establishing a wire session we increment the counter +by 1 for the next message we send. -k_a = TKE(a.k, b.k, a.sk, a_LIM.n) -k_b = TKE(b.k, a.k, b.sk, b_LIM.n) +when we get an establish wire session message +we reply with an establish wire session message with counter being counter + 1 -a.rx_K = k_a -b.rx_K = k_b +if A is provided that is interpreted as being generated via: -a.tx_K = k_b -b.tx_K = k_a +h0 = HS('') +h1 = EDDH(us, them) +A = HS(B + h0 + h1) -afterwards data transmission may happen. -the intiator's remote address is permitted to change during data transmission. -remote address of the last successfully +each side establishes their own rx key using this message. +when each side has both established thier rx key data can be transmitted. -D - encrypted data transmission +command 'D' - encrypted data transmission -transmit encrypted data on session +transmit encrypted data on a wire session +<16 bytes flow-id, F> <24 bytes nonce, N> - + @@ -158,7 +154,7 @@ header: <1 byte command> -command: 'K' (keep alive) +command: 'k' (keep alive) tell other side to acknoledge they are alive @@ -171,7 +167,7 @@ tell other side to acknoledge they are alive <8 bytes milliseconds since epoch our current time> -command: 'L' (keep alive ack) +command: 'l' (keep alive ack) acknolege keep alive message @@ -183,7 +179,29 @@ acknolege keep alive message -command: 'C' (congestion) +command: 'n' (advertise neighboors) + +tell peer about neighboors, only sent by non service nodes to other non service +nodes. + +
+ +<0 or more intermediate routes> + + +route: + +<1 byte route version (currently 0)> +<1 byte flags, lsb set indicates src is a service node> +<2 bytes latency in ms> +<2 bytes backpressure> +<2 bytes number of connected peers> +<8 bytes publish timestamp ms since epoch> +<32 bytes pubkey neighboor> +<32 bytes pubkey src> +<64 bytes signature of entire route signed by src> + +command: 'c' (congestion) tell other side to slow down @@ -192,7 +210,7 @@ tell other side to slow down <4 bytes milliseconds slowdown lifetime> -command: 'D' (anti-congestion) +command: 'd' (anti-congestion) tell other side to speed up @@ -201,26 +219,35 @@ tell other side to speed up <4 bytes milliseconds speedup lifetime> -command: 'T' (transmit) +command: 't' (transmit data) + +transmit a message to a peer -transit fragment +if this fragment is not addressed to us we route it to the neighboor +with the shortest route to the recipiant as advertised by all neighboors.
-<1 byte number of 16 byte blocks offset from beginning of message, O> -<1 byte number of 16 byte blocks size of fragment data, N> -<4 bytes sequence number> -<32 bytes expected digest of message, present if O is 0, otherwise omitted> -<16 * N bytes of data> - +<32 bytes public identity key of recipiant> +<32 bytes public identity key of sender> +<24 bytes nounce, N> + + -command: 'A' (ack) +encrypted via: -acknoledge fragments +K = EDDH(recipiant, sender) +X = SE(msg, K, N) +Z = MDS(X, K) -
-<1 byte number of acks following, N> -<8 * N bytes acks> - +encrypted message format: + +<1 byte version, currently 0> +<1 byte number of acks following, aN> +<8 * aN bytes acks> +<4 byte sequence number of fragment or 0 if no fragment is included> +<2 byte 16 byte block offset in message of this fragment if it is included> + + ack format: @@ -231,7 +258,7 @@ ack format: <1 byte bitmask fragments posative ack (msb is fragment 0, lsb is fragment 7)> -command: 'R' (rotate keys) +command: 'r' (rotate keys) inform remote that their RX key should be rotated @@ -248,7 +275,7 @@ B.rx_K = n_K <32 bytes next public encryption key, K> -command: 'U' (upgrade) +command: 'u' (upgrade) request protocol upgrade @@ -257,7 +284,7 @@ request protocol upgrade <1 byte protocol max version to upgrade to> -command: 'V' (version upgrade) +command: 'v' (version upgrade) sent in response to upgrade message diff --git a/libabyss/main.cpp b/libabyss/main.cpp index 5d4d174d1..bd8a3672f 100644 --- a/libabyss/main.cpp +++ b/libabyss/main.cpp @@ -24,15 +24,28 @@ struct DemoHandler : public abyss::httpd::IRPCHandler struct DemoCall : public abyss::http::IRPCClientHandler { - DemoCall(abyss::http::ConnImpl* impl) : abyss::http::IRPCClientHandler(impl) + std::function< void(void) > m_Callback; + llarp::Logic* m_Logic; + + DemoCall(abyss::http::ConnImpl* impl, llarp::Logic* logic, + std::function< void(void) > callback) + : abyss::http::IRPCClientHandler(impl) + , m_Callback(callback) + , m_Logic(logic) { llarp::LogInfo("new call"); } - bool - HandleResponse(abyss::http::RPC_Response resp) override + static void + CallCallback(void* u) + { + static_cast< DemoCall* >(u)->m_Callback(); + } + + bool HandleResponse(abyss::http::RPC_Response) override { - llarp::json::ToString(resp, std::cout); + llarp::LogInfo("response get"); + m_Logic->queue_job({this, &CallCallback}); return true; } @@ -51,10 +64,18 @@ struct DemoCall : public abyss::http::IRPCClientHandler struct DemoClient : public abyss::http::JSONRPC { + llarp_ev_loop* m_Loop; + llarp::Logic* m_Logic; + + DemoClient(llarp_ev_loop* l, llarp::Logic* logic) + : abyss::http::JSONRPC(), m_Loop(l), m_Logic(logic) + { + } + abyss::http::IRPCClientHandler* NewConn(abyss::http::ConnImpl* impl) { - return new DemoCall(impl); + return new DemoCall(impl, m_Logic, std::bind(&llarp_ev_loop_stop, m_Loop)); } void @@ -109,7 +130,7 @@ main(__attribute__((unused)) int argc, __attribute__((unused)) char* argv[]) addr.sin_port = htons(1222); addr.sin_family = AF_INET; DemoServer serv; - DemoClient client; + DemoClient client(loop, logic); llarp::Addr a(addr); while(true) { diff --git a/libabyss/src/client.cpp b/libabyss/src/client.cpp index 5f0d00069..9797924c1 100644 --- a/libabyss/src/client.cpp +++ b/libabyss/src/client.cpp @@ -70,7 +70,7 @@ namespace abyss ConnImpl* self = static_cast< ConnImpl* >(conn->user); if(!self->ProcessRead((const char*)buf.base, buf.sz)) { - self->CloseError(); + self->CloseError("on read failed"); } } @@ -138,8 +138,8 @@ namespace abyss Close(); return true; case json::IParser::eParseError: - handler->HandleError(); - return false; + CloseError("json parse error"); + return true; default: return false; } @@ -195,8 +195,9 @@ namespace abyss } void - CloseError() + CloseError(const char* msg) { + LogError("CloseError: ", msg); if(handler) handler->HandleError(); handler = nullptr; @@ -221,72 +222,20 @@ namespace abyss std::stringstream ss; json::ToString(m_RequestBody, ss); body = ss.str(); - // request base - char buf[512] = {0}; - int sz = snprintf(buf, sizeof(buf), - "POST /rpc HTTP/1.0\r\nContent-Type: " - "application/json\r\nContent-Length: %zu\r\nAccept: " - "application/json\r\n", - body.size()); - if(sz <= 0) - return; - if(!llarp_tcp_conn_async_write(m_Conn, llarp_buffer_t(buf, sz))) - { - llarp::LogError("failed to write first part of request"); - CloseError(); - return; - } - // header delimiter - buf[0] = ':'; - buf[1] = ' '; - // CRLF - buf[2] = '\r'; - buf[3] = '\n'; - // write extra request header + m_SendHeaders.emplace("Content-Type", "application/json"); + m_SendHeaders.emplace("Content-Length", std::to_string(body.size())); + m_SendHeaders.emplace("Accept", "application/json"); + std::stringstream request; + request << "POST /json_rpc HTTP/1.0\r\n"; for(const auto& item : m_SendHeaders) + request << item.first << ": " << item.second << "\r\n"; + request << "\r\n" << body; + std::string buf = request.str(); + + if(!llarp_tcp_conn_async_write(m_Conn, + llarp_buffer_t(buf.c_str(), buf.size()))) { - // header name - if(!llarp_tcp_conn_async_write( - m_Conn, llarp_buffer_t(item.first.c_str(), item.first.size()))) - { - CloseError(); - return; - } - // header delimiter - if(!llarp_tcp_conn_async_write(m_Conn, - llarp_buffer_t(buf, 2 * sizeof(char)))) - { - CloseError(); - return; - } - // header value - if(!llarp_tcp_conn_async_write( - m_Conn, - llarp_buffer_t(item.second.c_str(), item.second.size()))) - { - CloseError(); - return; - } - // CRLF - if(!llarp_tcp_conn_async_write( - m_Conn, llarp_buffer_t(buf + 2, 2 * sizeof(char)))) - { - CloseError(); - return; - } - } - // CRLF - if(!llarp_tcp_conn_async_write( - m_Conn, llarp_buffer_t(buf + 2, 2 * sizeof(char)))) - { - CloseError(); - return; - } - // request body - if(!llarp_tcp_conn_async_write( - m_Conn, llarp_buffer_t(body.c_str(), body.size()))) - { - CloseError(); + CloseError("failed to write request"); return; } llarp::LogDebug("request sent"); diff --git a/llarp/crypto/encrypted_frame.hpp b/llarp/crypto/encrypted_frame.hpp index 1ab5efe21..49406191a 100644 --- a/llarp/crypto/encrypted_frame.hpp +++ b/llarp/crypto/encrypted_frame.hpp @@ -39,6 +39,16 @@ namespace llarp return *this; } + void + Resize(size_t sz) + { + if(sz <= EncryptedFrameSize) + { + _sz = sz; + UpdateBuffer(); + } + } + bool DecryptInPlace(const SecretKey& seckey, llarp::Crypto* crypto); diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index cdbeaf66d..20c97f651 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -555,13 +555,22 @@ namespace llarp void connected() { - // we are connected yeh boi - if(_conn) + sockaddr_storage st; + socklen_t sl; + if (getpeername(fd, (sockaddr*)&st, &sl) == 0) { - if(_conn->connected && !_calledConnected) - _conn->connected(_conn, &tcp); + // we are connected yeh boi + if(_conn) + { + if(_conn->connected && !_calledConnected) + _conn->connected(_conn, &tcp); + } + _calledConnected = true; + } + else + { + error(); } - _calledConnected = true; } void @@ -575,8 +584,9 @@ namespace llarp } void - error() + error() override { + _shouldClose = true; if(_conn) { #ifndef _WIN32 @@ -591,6 +601,7 @@ namespace llarp if(_conn->error) _conn->error(_conn); } + } virtual ssize_t diff --git a/llarp/ev/ev_epoll.cpp b/llarp/ev/ev_epoll.cpp index 12adf3061..3dbb4bf9d 100644 --- a/llarp/ev/ev_epoll.cpp +++ b/llarp/ev/ev_epoll.cpp @@ -20,7 +20,7 @@ namespace llarp if(tcp.read) tcp.read(&tcp, llarp_buffer_t(buf, amount)); } - else + else if(amount < 0) { // error _shouldClose = true; @@ -123,7 +123,7 @@ namespace llarp return -1; b.sz = ret; udp->recvfrom(udp, addr, ManagedBuffer{b}); - return 0; + return ret; } int @@ -323,6 +323,7 @@ llarp_epoll_loop::tick(int ms) epoll_event events[1024]; int result; result = epoll_wait(epollfd, events, 1024, ms); + bool didIO = false; if(result > 0) { int idx = 0; @@ -331,22 +332,27 @@ llarp_epoll_loop::tick(int ms) llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr); if(ev) { - llarp::LogDebug(idx, " of ", result, + llarp::LogDebug(idx, " of ", result, " on ", ev->fd, " events=", std::to_string(events[idx].events)); if(events[idx].events & EPOLLERR) { + llarp::LogDebug("epoll error"); ev->error(); } - else + else { - if(events[idx].events & EPOLLIN) - { - ev->read(readbuf, sizeof(readbuf)); - } + // write THEN READ don't revert me if(events[idx].events & EPOLLOUT) { + llarp::LogDebug("epoll out"); ev->flush_write(); } + if(events[idx].events & EPOLLIN) + { + llarp::LogDebug("epoll in"); + if(ev->read(readbuf, sizeof(readbuf)) > 0) + didIO = true; + } } } ++idx; @@ -354,6 +360,9 @@ llarp_epoll_loop::tick(int ms) } if(result != -1) tick_listeners(); + /// if we didn't get an io events we sleep to avoid 100% cpu use + if(!didIO) + std::this_thread::sleep_for(std::chrono::milliseconds(ms)); return result; } diff --git a/llarp/link/iwp.cpp b/llarp/link/iwp.cpp index 91e7dccab..72cf038e7 100644 --- a/llarp/link/iwp.cpp +++ b/llarp/link/iwp.cpp @@ -1,23 +1,207 @@ #include #include +#include namespace llarp { namespace iwp { - std::unique_ptr< ILinkLayer > - NewServerFromRouter(AbstractRouter*) + void + OuterMessage::Clear() + { + command = 0; + flow.Zero(); + netid.Zero(); + nextFlowID.Zero(); + rejectReason.clear(); + N.Zero(); + X.Zero(); + Xsize = 0; + Z.Zero(); + } + + bool + OuterMessage::Decode(llarp_buffer_t* buf) { + if(buf->size_left() < 34) + return false; + command = *buf->cur; + ++buf->cur; + if(*buf->cur != '=') + return false; + std::copy_n(flow.begin(), 32, buf->cur); + buf->cur += 32; + switch(command) + { + case eOCMD_ObtainFlowID: + if(buf->size_left() < 40) + return false; + buf->cur += 32; + std::copy_n(netid.begin(), 8, buf->cur); + return true; + case eOCMD_GiveFlowID: + if(buf->size_left() < 32) + return false; + std::copy_n(nextFlowID.begin(), 32, buf->cur); + return true; + case eOCMD_Reject: + rejectReason = std::string(buf->cur, buf->base + buf->sz); + return true; + case eOCMD_SessionNegotiate: + // explicit fallthrough + case eOCMD_TransmitData: + if(buf->size_left() <= 56) + return false; + std::copy_n(N.begin(), N.size(), buf->cur); + buf->cur += N.size(); + Xsize = buf->size_left() - Z.size(); + if(Xsize > X.size()) + return false; + std::copy_n(X.begin(), Xsize, buf->cur); + buf->cur += Xsize; + std::copy_n(Z.begin(), Z.size(), buf->cur); + return true; + + default: + return false; + } + } + + LinkLayer::LinkLayer(Crypto* c, const SecretKey& enckey, GetRCFunc getrc, + LinkMessageHandler h, SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, SignBufferFunc sign, + TimeoutHandler t, SessionClosedHandler closed) + : ILinkLayer(enckey, getrc, h, sign, est, reneg, t, closed), crypto(c) + { + } + + LinkLayer::~LinkLayer() + { + } + + void + LinkLayer::Pump() + { + ILinkLayer::Pump(); + } + + const char* + LinkLayer::Name() const + { + return "iwp"; + } + + bool + LinkLayer::KeyGen(SecretKey& k) + { + k.Zero(); + crypto->encryption_keygen(k); + return !k.IsZero(); + } + + uint16_t + LinkLayer::Rank() const + { + return 2; + } + + bool + LinkLayer::Start(Logic* l) + { + if(!ILinkLayer::Start(l)) + return false; + /// TODO: change me to true when done + return false; + } + + void + LinkLayer::RecvFrom(const Addr& from, const void* pkt, size_t sz) + { + m_OuterMsg.Clear(); + llarp_buffer_t buf(pkt, sz); + if(!m_OuterMsg.Decode(&buf)) + { + LogError("failed to decode outer message"); + return; + } + NetID ourNetID; + switch(m_OuterMsg.command) + { + case eOCMD_ObtainFlowID: + if(!ShouldSendFlowID(from)) + return; // drop + + if(m_OuterMsg.netid == ourNetID) + SendFlowID(from, m_OuterMsg.flow); + else + SendReject(from, m_OuterMsg.flow, "bad net id"); + } + } + + ILinkSession* + LinkLayer::NewOutboundSession(const RouterContact& rc, + const AddressInfo& ai) + { + (void)rc; + (void)ai; // TODO: implement me return nullptr; } + void + LinkLayer::SendFlowID(const Addr& to, const FlowID_t& flow) + { + // TODO: implement me + (void)to; + (void)flow; + } + + bool + LinkLayer::ShouldSendFlowID(const Addr& to) const + { + (void)to; + // TODO: implement me + return false; + } + + void + LinkLayer::SendReject(const Addr& to, const FlowID_t& flow, const char* msg) + { + // TODO: implement me + (void)to; + (void)flow; + (void)msg; + } + + std::unique_ptr< ILinkLayer > + NewServerFromRouter(AbstractRouter* r) + { + using namespace std::placeholders; + return NewServer( + r->crypto(), r->encryption(), std::bind(&AbstractRouter::rc, r), + std::bind(&AbstractRouter::HandleRecvLinkMessageBuffer, r, _1, _2), + std::bind(&AbstractRouter::OnSessionEstablished, r, _1), + std::bind(&AbstractRouter::CheckRenegotiateValid, r, _1, _2), + std::bind(&AbstractRouter::Sign, r, _1, _2), + std::bind(&AbstractRouter::OnConnectTimeout, r, _1), + std::bind(&AbstractRouter::SessionClosed, r, _1)); + } + std::unique_ptr< ILinkLayer > - NewServer(llarp::Crypto*, const SecretKey&, llarp::GetRCFunc, - llarp::LinkMessageHandler, llarp::SessionEstablishedHandler, - llarp::SessionRenegotiateHandler, llarp::SignBufferFunc, - llarp::TimeoutHandler, llarp::SessionClosedHandler) + NewServer(Crypto* c, const SecretKey& enckey, GetRCFunc getrc, + LinkMessageHandler h, SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, SignBufferFunc sign, + TimeoutHandler t, SessionClosedHandler closed) { + (void)c; + (void)enckey; + (void)getrc; + (void)h; + (void)est; + (void)reneg; + (void)sign; + (void)t; + (void)closed; // TODO: implement me return nullptr; } diff --git a/llarp/link/iwp_internal.hpp b/llarp/link/iwp_internal.hpp index 3c1977d49..46ffcf988 100644 --- a/llarp/link/iwp_internal.hpp +++ b/llarp/link/iwp_internal.hpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -17,6 +18,117 @@ namespace llarp { struct LinkLayer; + using FlowID_t = llarp::AlignedBuffer< 32 >; + + using OuterCommand_t = byte_t; + + constexpr OuterCommand_t eOCMD_ObtainFlowID = 'O'; + constexpr OuterCommand_t eOCMD_GiveFlowID = 'G'; + constexpr OuterCommand_t eOCMD_Reject = 'R'; + constexpr OuterCommand_t eOCMD_SessionNegotiate = 'S'; + constexpr OuterCommand_t eOCMD_TransmitData = 'D'; + + using InnerCommand_t = byte_t; + + constexpr InnerCommand_t eICMD_KeepAlive = 'k'; + constexpr InnerCommand_t eICMD_KeepAliveAck = 'l'; + constexpr InnerCommand_t eICMD_Congestion = 'c'; + constexpr InnerCommand_t eICMD_AntiCongestion = 'd'; + constexpr InnerCommand_t eICMD_Transmit = 't'; + constexpr InnerCommand_t eICMD_Ack = 'a'; + constexpr InnerCommand_t eICMD_RotateKeys = 'r'; + constexpr InnerCommand_t eICMD_UpgradeProtocol = 'u'; + constexpr InnerCommand_t eICMD_VersionUpgrade = 'v'; + + struct OuterMessage + { + // required memebers + byte_t command; + FlowID_t flow; + + // optional memebers follow + NetID netid; + FlowID_t nextFlowID; + std::string rejectReason; + AlignedBuffer< 24 > N; + // TODO: compute optimal size + AlignedBuffer< 1440 > X; + size_t Xsize; + ShortHash Z; + + /// encode to buffer + bool + Encode(llarp_buffer_t *buf) const; + + /// decode from buffer + bool + Decode(llarp_buffer_t *buf); + + /// verify signature if needed + bool + Verify(const SharedSecret &K) const; + + /// clear members + void + Clear(); + }; + + /// TODO: fixme + constexpr size_t MaxFrags = 8; + + using MessageBuffer_t = AlignedBuffer< MAX_LINK_MSG_SIZE >; + using FragmentLen_t = uint16_t; + using SequenceNum_t = uint32_t; + + using WritePacketFunc = std::function< void(const llarp_buffer_t &) >; + + struct MessageState + { + /// default + MessageState(); + /// inbound + MessageState(const ShortHash &digest, SequenceNum_t num); + /// outbound + MessageState(const ShortHash &digest, const llarp_buffer_t &buf, + SequenceNum_t num); + + /// the expected hash of the message + const ShortHash expectedHash; + + /// which fragments have we got + std::bitset< MaxFrags > acks; + /// the message buffer + MessageBuffer_t msg; + /// the message's size + FragmentLen_t sz; + /// the last activity we have had + llarp_time_t lastActiveAt; + // sequence number + const SequenceNum_t seqno; + + /// return true if this message is to be removed + /// because of inactivity + bool + IsExpired(llarp_time_t now) const; + + /// return true if we have recvieved or sent the underlying message in + /// full. + bool + IsDone() const; + + /// return true if we should retransmit some packets + bool + ShouldRetransmit(llarp_time_t now) const; + + /// transmit unacked fragments + bool + TransmitUnacked(WritePacketFunc write_pkt) const; + + /// transmit acks packet + bool + TransmitAcks(WritePacketFunc write_pkt); + }; + struct Session final : public llarp::ILinkSession { /// base @@ -35,12 +147,15 @@ namespace llarp return {}; } + /// pump ll io void PumpIO(); + /// tick every 1 s void TickIO(llarp_time_t now); + /// queue full message bool QueueMessageBuffer(const llarp_buffer_t &buf); @@ -57,6 +172,7 @@ namespace llarp void Close(); + /// start outbound handshake void Connect(); @@ -105,196 +221,6 @@ namespace llarp /// maximum fragment size static constexpr FragLen_t fragsize = MAX_LINK_MSG_SIZE / maxfrags; - struct FragmentHeader - { - /// protocol version, always LLARP_PROTO_VERSION - Proto_t version = LLARP_PROTO_VERSION; - /// fragment command type - Cmd_t cmd = 0; - /// if cmd is XMIT this is the number of additional fragments this - /// message has - /// if cmd is FACK this is the fragment bitfield of the - /// messages acked otherwise 0 - Flags_t flags = 0; - /// if cmd is XMIT this is the fragment index - /// if cmd is FACK this is set to 0xff to indicate message drop - /// otherwise set to 0 - /// any other cmd it is set to 0 - Fragno_t fragno = 0; - /// if cmd is XMIT then this is the size of the current fragment - /// if cmd is FACK then this MUST be set to 0 - FragLen_t fraglen = 0; - /// if cmd is XMIT or FACK this is the sequence number of the message - /// otherwise it's 0 - Seqno_t seqno = 0; - - bool - Decode(llarp_buffer_t *buf) - { - if(buf->size_left() < fragoverhead) - return false; - version = *buf->cur; - if(version != LLARP_PROTO_VERSION) - return false; - buf->cur++; - cmd = *buf->cur; - buf->cur++; - flags = *buf->cur; - buf->cur++; - fragno = *buf->cur; - buf->cur++; - buf->read_uint16(fraglen); - buf->read_uint32(seqno); - return fraglen <= fragsize; - } - - bool - Encode(llarp_buffer_t *buf, const llarp_buffer_t &body) - { - if(body.sz > fragsize) - return false; - fraglen = body.sz; - if(buf->size_left() < (fragoverhead + fraglen)) - return false; - *buf->cur = LLARP_PROTO_VERSION; - buf->cur++; - *buf->cur = cmd; - buf->cur++; - *buf->cur = flags; - buf->cur++; - *buf->cur = fragno; - buf->cur++; - buf->put_uint16(fraglen); - buf->put_uint32(seqno); - if(fraglen) - memcpy(buf->cur, body.base, fraglen); - buf->cur += fraglen; - return true; - } - }; - - struct MessageState - { - /// default - MessageState(){}; - /// inbound - MessageState(Flags_t numfrags) - { - acks.set(); - if(numfrags <= maxfrags) - { - while(numfrags) - acks.reset(maxfrags - (numfrags--)); - } - else // invalid value - return; - } - - /// outbound - MessageState(const llarp_buffer_t &buf) - { - sz = std::min(buf.sz, MAX_LINK_MSG_SIZE); - memcpy(msg.data(), buf.base, sz); - size_t idx = 0; - acks.set(); - while(idx * fragsize < sz) - acks.reset(idx++); - }; - - /// which fragments have we got - std::bitset< maxfrags > acks; - /// the message buffer - MessageBuffer_t msg; - /// the message's size - FragLen_t sz; - /// the last activity we have had - llarp_time_t lastActiveAt; - - /// return true if this message is to be removed - /// because of inactivity - bool - IsExpired(llarp_time_t now) const - { - return now > lastActiveAt && now - lastActiveAt > 2000; - } - - bool - IsDone() const - { - return acks.all(); - } - - bool - ShouldRetransmit(llarp_time_t now) const - { - if(IsDone()) - return false; - return now > lastActiveAt && now - lastActiveAt > 500; - } - - template < typename write_pkt_func > - bool - TransmitUnacked(write_pkt_func write_pkt, Seqno_t seqno) const - { - static FragLen_t maxfragsize = fragsize; - FragmentHeader hdr; - hdr.seqno = seqno; - hdr.cmd = XMIT; - AlignedBuffer< fragoverhead + fragsize > frag; - llarp_buffer_t buf(frag); - const byte_t *ptr = msg.data(); - Fragno_t idx = 0; - FragLen_t len = sz; - while(idx < maxfrags) - { - const FragLen_t l = std::min(len, maxfragsize); - if(!acks.test(idx)) - { - hdr.fragno = idx; - hdr.fraglen = l; - if(!hdr.Encode(&buf, llarp_buffer_t(ptr, l))) - return false; - buf.sz = buf.cur - buf.base; - buf.cur = buf.base; - len -= l; - if(write_pkt(buf.base, buf.sz) != int(buf.sz)) - return false; - } - ptr += l; - len -= l; - if(l >= fragsize) - ++idx; - else - break; - } - return true; - } - - template < typename write_pkt_func > - bool - TransmitAcks(write_pkt_func write_pkt, Seqno_t seqno) - { - FragmentHeader hdr; - hdr.seqno = seqno; - hdr.cmd = FACK; - hdr.flags = 0; - byte_t idx = 0; - while(idx < maxfrags) - { - if(acks.test(idx)) - hdr.flags |= 1 << idx; - ++idx; - } - hdr.fraglen = 0; - hdr.fragno = 0; - AlignedBuffer< fragoverhead > frag; - llarp_buffer_t buf(frag); - if(!hdr.Encode(&buf, llarp_buffer_t(nullptr, nullptr, 0))) - return false; - return write_pkt(buf.base, buf.sz) == int(buf.sz); - } - }; - using MessageHolder_t = std::unordered_map< Seqno_t, MessageState >; MessageHolder_t m_Inbound; @@ -310,11 +236,10 @@ namespace llarp struct LinkLayer final : public llarp::ILinkLayer { LinkLayer(llarp::Crypto *crypto, const SecretKey &encryptionSecretKey, - const SecretKey &identitySecretKey, llarp::GetRCFunc getrc, - llarp::LinkMessageHandler h, llarp::SignBufferFunc sign, + llarp::GetRCFunc getrc, llarp::LinkMessageHandler h, llarp::SessionEstablishedHandler established, llarp::SessionRenegotiateHandler reneg, - llarp::TimeoutHandler timeout, + llarp::SignBufferFunc sign, llarp::TimeoutHandler timeout, llarp::SessionClosedHandler closed); ~LinkLayer(); @@ -339,36 +264,37 @@ namespace llarp uint16_t Rank() const override; - const byte_t * - IndentityKey() const - { - return m_IdentityKey.data(); - } - - const AlignedBuffer< 32 > & - CookieSec() const - { - return m_CookieSec; - } + /// verify that a new flow id matches addresses and old flow id + bool + VerifyFlowID(const FlowID_t &newID, const Addr &from, + const FlowID_t &oldID) const; - RouterID - GetRouterID() const - { - return m_IdentityKey.toPublic(); - } + void + RecvFrom(const llarp::Addr &from, const void *buf, size_t sz) override; private: bool - SignBuffer(llarp::Signature &sig, llarp_buffer_t buf) const - { - return crypto->sign(sig, m_IdentityKey, buf); - } - const llarp::SecretKey m_IdentityKey; - AlignedBuffer< 32 > m_CookieSec; + ShouldSendFlowID(const Addr &from) const; - /// handle ll recv void - RecvFrom(const llarp::Addr &from, const void *buf, size_t sz) override; + SendReject(const Addr &to, const FlowID_t &flow, const char *msg); + + void + SendFlowID(const Addr &to, const FlowID_t &flow); + + using ActiveFlows_t = + std::unordered_map< FlowID_t, RouterID, FlowID_t::Hash >; + + ActiveFlows_t m_ActiveFlows; + + using PendingFlows_t = std::unordered_map< Addr, FlowID_t, Addr::Hash >; + /// flows that are pending authentication + PendingFlows_t m_PendingFlows; + + /// cookie used in flow id computation + AlignedBuffer< 32 > m_FlowCookie; + + OuterMessage m_OuterMsg; }; } // namespace iwp } // namespace llarp diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index 9c38f26b4..97a460f5e 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -30,7 +30,9 @@ namespace llarp using GetRCFunc = std::function< const llarp::RouterContact&(void) >; /// handler of session established - using SessionEstablishedHandler = std::function< void(llarp::RouterContact) >; + /// return false to reject + /// return true to accept + using SessionEstablishedHandler = std::function< bool(ILinkSession*) >; /// f(new, old) /// handler of session renegotiation diff --git a/llarp/link/utp.cpp b/llarp/link/utp.cpp index cbec08545..40474ee64 100644 --- a/llarp/link/utp.cpp +++ b/llarp/link/utp.cpp @@ -777,10 +777,10 @@ namespace llarp Close(); return false; } - EnterState(eSessionReady); /// future LIM are used for session renegotiation GotLIM = std::bind(&Session::GotSessionRenegotiate, this, std::placeholders::_1); + EnterState(eSessionReady); return true; } @@ -982,7 +982,8 @@ namespace llarp if(st == eSessionReady) { parent->MapAddr(remoteRC.pubkey.as_array(), this); - parent->SessionEstablished(remoteRC); + if(!parent->SessionEstablished(this)) + Close(); } } diff --git a/llarp/path/pathbuilder.cpp b/llarp/path/pathbuilder.cpp index 037ae0c93..bb42c68fb 100644 --- a/llarp/path/pathbuilder.cpp +++ b/llarp/path/pathbuilder.cpp @@ -89,17 +89,18 @@ namespace llarp record.nextHop = hop.upstream; record.commkey = seckey_topublic(hop.commkey); - auto buf = frame.Buffer(); - buf->cur = buf->base + EncryptedFrameOverheadSize; + llarp_buffer_t buf(frame.data(), frame.size()); + buf.cur = buf.base + EncryptedFrameOverheadSize; // encode record - if(!record.BEncode(buf)) + if(!record.BEncode(&buf)) { // failed to encode? LogError("Failed to generate Commit Record"); - DumpBuffer(*buf); + DumpBuffer(buf); delete ctx; return; } + frame.Resize(buf.cur - buf.base); // use ephemeral keypair for frame SecretKey framekey; ctx->crypto->encryption_keygen(framekey); diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index 736373949..a159da4ea 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -47,8 +47,8 @@ namespace llarp struct AbstractRouter : public util::IStateful { - virtual void - OnSessionEstablished(RouterContact rc) = 0; + virtual bool + OnSessionEstablished(ILinkSession *) = 0; virtual bool HandleRecvLinkMessageBuffer(ILinkSession *from, @@ -121,6 +121,9 @@ namespace llarp virtual void OnConnectTimeout(ILinkSession *session) = 0; + /// connect to N random routers + virtual void + ConnectToRandomRouters(int N) = 0; /// inject configuration and reconfigure router virtual bool Reconfigure(Config *conf) = 0; diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 8790351b4..63bf84cd8 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -192,11 +192,10 @@ namespace llarp return false; } - void - Router::OnSessionEstablished(RouterContact rc) + bool + Router::OnSessionEstablished(ILinkSession * s) { - async_verify_RC(rc, nullptr); - LogInfo("session with ", RouterID(rc.pubkey), " established"); + return async_verify_RC(s->GetRemoteRC()); } Router::Router(struct llarp_threadpool *_tp, struct llarp_ev_loop *__netloop, @@ -851,7 +850,7 @@ namespace llarp { whitelistRouters = IsTrueValue(val); } - if(StrEq(key, "jsonrpc")) + if(StrEq(key, "jsonrpc") || StrEq(key, "addr")) { lokidRPCAddr = val; } @@ -989,7 +988,8 @@ namespace llarp return false; // store it in nodedb async - async_verify_RC(newrc, nullptr); + if(!async_verify_RC(newrc)) + return false; // update dht if required if(dht()->impl->Nodes()->HasNode(dht::Key_t{newrc.pubkey})) { @@ -1086,15 +1086,18 @@ namespace llarp LogError("we have no bootstrap nodes specified"); } - if(inboundLinks.size() == 0) + if(!IsServiceNode()) { + size_t connected = NumberOfConnectedRouters(); + if(connected < minConnectedRouters) + { + size_t dlt = minConnectedRouters - connected; + LogInfo("connecting to ", dlt, " random routers to keep alive"); + ConnectToRandomRouters(dlt); + } paths.BuildPaths(now); _hiddenServiceContext.Tick(now); } - if(NumberOfConnectedRouters() < minConnectedRouters) - { - ConnectToRandomRouters(minConnectedRouters); - } _exitContext.Tick(now); if(rpcCaller) rpcCaller->Tick(now); @@ -1222,21 +1225,22 @@ namespace llarp return false; } - void - Router::async_verify_RC(const RouterContact &rc, ILinkLayer *link) + bool + Router::async_verify_RC(const RouterContact &rc) { - if(pendingVerifyRC.count(rc.pubkey)) - return; - if(rc.IsPublicRouter() && whitelistRouters) + + if(rc.IsPublicRouter() && whitelistRouters && IsServiceNode()) { if(lokinetRouters.find(rc.pubkey) == lokinetRouters.end()) { RouterID sn(rc.pubkey); LogInfo(sn, " is NOT a valid service node, rejecting"); - link->CloseSessionTo(rc.pubkey); return; } } + if(pendingVerifyRC.count(rc.pubkey)) + return true; + LogInfo("session with ", RouterID(rc.pubkey), " established"); llarp_async_verify_rc *job = &pendingVerifyRC[rc.pubkey]; async_verify_context *ctx = new async_verify_context(); ctx->router = this; @@ -1261,6 +1265,7 @@ namespace llarp job->hook = &Router::on_verify_client_rc; llarp_nodedb_async_verify(job); + return true; } void diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index ba83b68ee..67629e104 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -211,7 +211,7 @@ namespace llarp uint16_t m_OutboundPort = 0; /// always maintain this many connections to other routers - size_t minConnectedRouters = 1; + size_t minConnectedRouters = 3; /// hard upperbound limit on the number of router to router connections size_t maxConnectedRouters = 2000; @@ -321,8 +321,8 @@ namespace llarp ~Router(); - void - OnSessionEstablished(RouterContact rc) override; + bool + OnSessionEstablished(ILinkSession *) override; bool HandleRecvLinkMessageBuffer(ILinkSession *from, @@ -493,7 +493,7 @@ namespace llarp const PathID_t &rxid) override; void - ConnectToRandomRouters(int N); + ConnectToRandomRouters(int N) override; size_t NumberOfConnectedRouters() const override; @@ -504,8 +504,8 @@ namespace llarp bool GetRandomConnectedRouter(RouterContact &result) const override; - void - async_verify_RC(const RouterContact &rc, ILinkLayer *link); + bool + async_verify_RC(const RouterContact &rc); void HandleDHTLookupForSendTo(RouterID remote, diff --git a/llarp/router_contact.hpp b/llarp/router_contact.hpp index c939461b8..8681f87bf 100644 --- a/llarp/router_contact.hpp +++ b/llarp/router_contact.hpp @@ -130,6 +130,12 @@ namespace llarp && nickname == other.nickname && last_updated == other.last_updated && netID == other.netID; } + + bool + operator!=(const RouterContact & other) const + { + return !(*this == other); + } void Clear(); diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 17c78bc95..12bf459bd 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -1011,9 +1011,6 @@ namespace llarp p->SetDropHandler(std::bind( &Endpoint::OutboundContext::HandleDataDrop, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); - p->SetDeadChecker(std::bind(&Endpoint::CheckPathIsDead, m_Endpoint, - std::placeholders::_1, - std::placeholders::_2)); } void @@ -1024,10 +1021,15 @@ namespace llarp } bool - Endpoint::CheckPathIsDead(__attribute__((unused)) path::Path* p, - __attribute__((unused)) llarp_time_t latency) - { - return false; + Endpoint::CheckPathIsDead(path::Path*, llarp_time_t) + { + RouterLogic()->call_later( + {100, this, [](void* u, uint64_t, uint64_t left) { + if(left) + return; + HandlePathDead(u); + }}); + return true; } bool diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index 3b953f85a..76781c4c3 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -31,7 +31,7 @@ namespace llarp { /// minimum interval for publishing introsets static const llarp_time_t INTROSET_PUBLISH_INTERVAL = - DEFAULT_PATH_LIFETIME / 4; + DEFAULT_PATH_LIFETIME / 8; static const llarp_time_t INTROSET_PUBLISH_RETRY_INTERVAL = 5000; diff --git a/test/link/test_llarp_link.cpp b/test/link/test_llarp_link.cpp index 1caf95780..0d48d85bd 100644 --- a/test/link/test_llarp_link.cpp +++ b/test/link/test_llarp_link.cpp @@ -189,9 +189,9 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob) return true; } }, - [&](llarp::RouterContact rc) { - ASSERT_EQ(rc, Bob.GetRC()); - llarp::LogInfo("alice established with bob"); + [&](llarp::ILinkSession * s) -> bool { + const auto rc = s->GetRemoteRC(); + return rc.pubkey == Bob.GetRC().pubkey; }, [&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; }, [&](llarp::Signature& sig, const llarp_buffer_t& buf) -> bool { @@ -228,10 +228,11 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob) Bob.gotLIM = true; return sendDiscardMessage(s); }, - [&](llarp::RouterContact rc) { - ASSERT_EQ(rc, Alice.GetRC()); + [&](llarp::ILinkSession * s) -> bool { + if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey) + return false; llarp::LogInfo("bob established with alice"); - Bob.link->VisitSessionByPubkey(Alice.GetRC().pubkey.as_array(), + return Bob.link->VisitSessionByPubkey(Alice.GetRC().pubkey.as_array(), sendDiscardMessage); }, [&](llarp::RouterContact newrc, llarp::RouterContact oldrc) -> bool { @@ -252,7 +253,6 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob) ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC())); RunMainloop(); - ASSERT_TRUE(Alice.gotLIM); ASSERT_TRUE(Bob.gotLIM); ASSERT_TRUE(success); } @@ -262,26 +262,14 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob) Alice.link = llarp::utp::NewServer( &crypto, Alice.encryptionKey, [&]() -> const llarp::RouterContact& { return Alice.GetRC(); }, - [&](llarp::ILinkSession* s, const llarp_buffer_t& buf) -> bool { - if(Alice.gotLIM) - { - return AliceGotMessage(buf); - } - else - { - llarp::LinkIntroMessage msg; - ManagedBuffer copy{buf}; - if(!msg.BDecode(©.underlying)) - return false; - if(!s->GotLIM(&msg)) - return false; - Alice.gotLIM = true; - return true; - } + [&](llarp::ILinkSession*, const llarp_buffer_t& buf) -> bool { + return AliceGotMessage(buf); }, - [&](llarp::RouterContact rc) { - ASSERT_EQ(rc, Bob.GetRC()); + [&](llarp::ILinkSession * s) -> bool { + if(s->GetRemoteRC().pubkey != Bob.GetRC().pubkey) + return false; llarp::LogInfo("alice established with bob"); + return true; }, [&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; }, [&](llarp::Signature& sig, const llarp_buffer_t& buf) -> bool { @@ -293,36 +281,28 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob) }, [&](llarp::RouterID router) { ASSERT_EQ(router, Bob.GetRouterID()); }); - auto sendDiscardMessage = [](llarp::ILinkSession* s) -> bool { - // send discard message in reply to complete unit test - std::array< byte_t, 32 > tmp; - llarp_buffer_t otherBuf(tmp); - llarp::DiscardMessage discard; - if(!discard.BEncode(&otherBuf)) - return false; - otherBuf.sz = otherBuf.cur - otherBuf.base; - otherBuf.cur = otherBuf.base; - return s->SendMessageBuffer(otherBuf); - }; - Bob.link = llarp::utp::NewServer( &crypto, Bob.encryptionKey, [&]() -> const llarp::RouterContact& { return Bob.GetRC(); }, - [&](llarp::ILinkSession* s, const llarp_buffer_t& buf) -> bool { - llarp::LinkIntroMessage msg; - ManagedBuffer copy{buf}; - if(!msg.BDecode(©.underlying)) - return false; - if(!s->GotLIM(&msg)) - return false; - Bob.gotLIM = true; + [&](llarp::ILinkSession*, const llarp_buffer_t& ) -> bool { return true; }, - [&](llarp::RouterContact rc) { - ASSERT_EQ(rc, Alice.GetRC()); + [&](llarp::ILinkSession * s) -> bool { + if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey) + return false; llarp::LogInfo("bob established with alice"); - Bob.link->VisitSessionByPubkey(Alice.GetRC().pubkey.as_array(), - sendDiscardMessage); + logic->queue_job({s, [](void * u) { + llarp::ILinkSession * self = static_cast(u); + std::array< byte_t, 32 > tmp; + llarp_buffer_t otherBuf(tmp); + llarp::DiscardMessage discard; + if(!discard.BEncode(&otherBuf)) + return; + otherBuf.sz = otherBuf.cur - otherBuf.base; + otherBuf.cur = otherBuf.base; + self->SendMessageBuffer(otherBuf); + }}); + return true; }, [&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; }, [&](llarp::Signature& sig, const llarp_buffer_t& buf) -> bool { @@ -339,18 +319,18 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob) ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC())); RunMainloop(); - ASSERT_TRUE(Alice.gotLIM); ASSERT_TRUE(Bob.gotLIM); ASSERT_TRUE(success); } +/* + TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) { - /* Alice.link = llarp::iwp::NewServer( &crypto, Alice.encryptionKey, [&]() -> const llarp::RouterContact& { return Alice.GetRC(); }, - [&](llarp::ILinkSession* s, llarp_buffer_t buf) -> bool { + [&](llarp::ILinkSession* s, const llarp_buffer_t& buf) -> bool { if(Alice.gotLIM) { return AliceGotMessage(buf); @@ -358,7 +338,8 @@ TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) else { llarp::LinkIntroMessage msg; - if(!msg.BDecode(&buf)) + ManagedBuffer copy{buf}; + if(!msg.BDecode(©.underlying)) return false; if(!s->GotLIM(&msg)) return false; @@ -371,7 +352,7 @@ TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) llarp::LogInfo("alice established with bob"); }, [&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; }, - [&](llarp::Signature& sig, llarp_buffer_t buf) -> bool { + [&](llarp::Signature& sig, const llarp_buffer_t& buf) -> bool { return crypto.sign(sig, Alice.signingKey, buf); }, [&](llarp::ILinkSession* session) { @@ -382,8 +363,8 @@ TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) auto sendDiscardMessage = [](llarp::ILinkSession* s) -> bool { // send discard message in reply to complete unit test - byte_t tmp[32] = {0}; - auto otherBuf = llarp::StackBuffer< decltype(tmp) >(tmp); + std::array< byte_t, 32 > tmp; + llarp_buffer_t otherBuf(tmp); llarp::DiscardMessage discard; if(!discard.BEncode(&otherBuf)) return false; @@ -395,9 +376,10 @@ TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) Bob.link = llarp::iwp::NewServer( &crypto, Bob.encryptionKey, [&]() -> const llarp::RouterContact& { return Bob.GetRC(); }, - [&](llarp::ILinkSession* s, llarp_buffer_t buf) -> bool { + [&](llarp::ILinkSession* s, const llarp_buffer_t& buf) -> bool { llarp::LinkIntroMessage msg; - if(!msg.BDecode(&buf)) + ManagedBuffer copy{buf}; + if(!msg.BDecode(©.underlying)) return false; if(!s->GotLIM(&msg)) return false; @@ -411,7 +393,7 @@ TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) sendDiscardMessage); }, [&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; }, - [&](llarp::Signature& sig, llarp_buffer_t buf) -> bool { + [&](llarp::Signature& sig, const llarp_buffer_t& buf) -> bool { return crypto.sign(sig, Bob.signingKey, buf); }, [&](llarp::ILinkSession* session) { @@ -428,5 +410,5 @@ TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) ASSERT_TRUE(Alice.gotLIM); ASSERT_TRUE(Bob.gotLIM); ASSERT_TRUE(success); - */ -} +}; +*/ \ No newline at end of file