From 90c8689d28060afab6d223fa6055377f844bf21d Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 18 Feb 2019 14:38:41 -0500 Subject: [PATCH 01/16] make wire protocol routable --- docs/wire-protocol.txt | 231 +++++++++++++++++++++++------------------ 1 file changed, 129 insertions(+), 102 deletions(-) diff --git a/docs/wire-protocol.txt b/docs/wire-protocol.txt index 6070cad2e..489eb9759 100644 --- a/docs/wire-protocol.txt +++ b/docs/wire-protocol.txt @@ -9,132 +9,128 @@ LLARP supports by default an authenticated message transport over a datagram based network layer. -outer message format: - -outer-header: - -<1 byte command> -<1 byte reserved, R, set to '=' (0x3d)> -<32 bytes flow id, B> +protocol phases: +first phase: proof of flow +second phase: session handshake +thrid phase: data transmission -command 'O' - obtain flow id +proof of flow: -obtain a handshake cookie +At any time before the data transfer phase a reject message +is sent the session is reset. - -<32 bytes random> -<8 bytes net id> - +Alice (A) is the sender and Bob (B) is the recipiant. -the if the network id differs from the current network's id a reject message -MUST be sent +A asks for a flow id from B. -MUST be replied to with a message rejected or a give handshake cookie +B MAY send a flow id to A or MAY reject the message from A. -command 'G' - give flow id - -<32 byte new flow-id, X> - +session handshake: -give a flow id to a remote endpoint that asks for one +an encrypted session is established using establish wire session messages +using a newly created flow id. -B is the B value from the request flow id message -X is a 32 byte the flow id, calcuated via: -r = RAND(32) -a = "::ffff." + ":" + "" -X = HS(a + B + r + net id) +outer message format: -resulting message: - "G=<32 byte flowid><32 bytes new flowid>" +every outer message MAY be obfsucated via symettric encryption for dpi +resistance reasons, this is not authenticated encryption. -after recieving a give flow id message a session negotiation can happen +the message is first assumed to be sent in clear first. +if parsing of clear variant fails then the recipiant MUST fall back to assuming +the protocol is in obfuscated mode. -command 'R' - message rejected - - +<16 bytes nounce, n> + -reject a message on a flow id +obfuscated via: -B is the flow id from the recipiant +K = HS(B_k) +N = HS(n + K) +X = SD(K, m, N[0:24]) -resulting message of reject with reason "no you" +outer-header: - "R=<32 byte flow-id>no you" +<1 byte command> +<1 byte reserved set to 0x3d> -command 'S' - session negotiation +command 'O' - obtain flow id -negotiate encrypted session +obtain a flow id -<24 bytes nounce, N> - - - -B is the flow id from the recipiant generated by the give flow id message (from outer header) -N is a random nounce -X is encrypted session negotiation data -Z is a keyed hash +<6 magic bytes "netid?"> +<8 bytes netid, I> +<8 bytes timestamp milliseconds since epoch, T> +<32 bytes ed25519 public key of sender, A_k> +<0-N bytes discarded> + -Z is generated via: - -msg.Z = '0x00' * 32 -msg.Z = MDS(msg, tx_K) - -session negotiation: - -The session starts out with each side having 2 session keys rx_K and tx_K for -decrypting inbound messages and encrypting outbound messages respectively. +the if the network id differs from the current network's id a reject message +MUST be sent -The initiator (alice) and the recipiant (bob) start out with static session keys +MUST be replied to with a message rejected or a give handshake cookie -k_a = HS(a.k) -k_b = HS(b.k) +command 'G' - give flow id -a.rx_K = k_a -b.rx_K = k_b + +<6 magic bytes "netid!"> +<16 bytes new flow id> +<32 bytes ed25519 public key of sender, A_k> +<0-N bytes discarded> + -a.tx_K = k_b -b.tx_K = k_a +after recieving a give flow id message a session negotiation can happen with that flow id. +command 'R' - flow rejected -decryption is done via: +reject new flow -SD(msg.X, rx_K, msg.N) + +<14 ascii bytes reason for rejection null padded> +<8 bytes timestamp> +<32 bytes ed25519 public key of sender, A_k> +<0-N bytes discarded> + -encrypted payload is bencoded LIM (see proto_v0.txt) +command 'E' - establish wire session -the initiator starts out by sending a LIM a_LIM to the recipiant. +establish an encrypted session using a flow id -the recipiant replies with a LIM b_LIM to the initiator. + +<16 bytes flow id, B> +<32 bytes ephemeral public encryption key, E> +<8 bytes packet counter starting at 0> + + -when the initiator gets a valid LIM from the recipiant the session keys for data -transmission are set to: +every time we try establishing a wire session we increment the counter +by 1 for the next message we send. -k_a = TKE(a.k, b.k, a.sk, a_LIM.n) -k_b = TKE(b.k, a.k, b.sk, b_LIM.n) +when we get an establish wire session message +we reply with an establish wire session message with counter being counter + 1 -a.rx_K = k_a -b.rx_K = k_b +if A is provided that is interpreted as being generated via: -a.tx_K = k_b -b.tx_K = k_a +h0 = HS('') +h1 = EDDH(us, them) +A = HS(B + h0 + h1) -afterwards data transmission may happen. -the intiator's remote address is permitted to change during data transmission. -remote address of the last successfully +each side establishes their own rx key using this message. +when each side has both established thier rx key data can be transmitted. -D - encrypted data transmission +command 'D' - encrypted data transmission -transmit encrypted data on session +transmit encrypted data on a wire session +<16 bytes flow-id, F> <24 bytes nonce, N> - + @@ -158,7 +154,7 @@ header: <1 byte command> -command: 'K' (keep alive) +command: 'k' (keep alive) tell other side to acknoledge they are alive @@ -171,7 +167,7 @@ tell other side to acknoledge they are alive <8 bytes milliseconds since epoch our current time> -command: 'L' (keep alive ack) +command: 'l' (keep alive ack) acknolege keep alive message @@ -183,7 +179,29 @@ acknolege keep alive message -command: 'C' (congestion) +command: 'n' (advertise neighboors) + +tell peer about neighboors, only sent by non service nodes to other non service +nodes. + +
+ +<0 or more intermediate routes> + + +route: + +<1 byte route version (currently 0)> +<1 byte flags, lsb set indicates src is a service node> +<2 bytes latency in ms> +<2 bytes backpressure> +<2 bytes number of connected peers> +<8 bytes publish timestamp ms since epoch> +<32 bytes pubkey neighboor> +<32 bytes pubkey src> +<64 bytes signature of entire route signed by src> + +command: 'c' (congestion) tell other side to slow down @@ -192,7 +210,7 @@ tell other side to slow down <4 bytes milliseconds slowdown lifetime> -command: 'D' (anti-congestion) +command: 'd' (anti-congestion) tell other side to speed up @@ -201,26 +219,35 @@ tell other side to speed up <4 bytes milliseconds speedup lifetime> -command: 'T' (transmit) +command: 't' (transmit data) + +transmit a message to a peer -transit fragment +if this fragment is not addressed to us we route it to the neighboor +with the shortest route to the recipiant as advertised by all neighboors.
-<1 byte number of 16 byte blocks offset from beginning of message, O> -<1 byte number of 16 byte blocks size of fragment data, N> -<4 bytes sequence number> -<32 bytes expected digest of message, present if O is 0, otherwise omitted> -<16 * N bytes of data> - +<32 bytes public identity key of recipiant> +<32 bytes public identity key of sender> +<24 bytes nounce, N> + + -command: 'A' (ack) +encrypted via: -acknoledge fragments +K = EDDH(recipiant, sender) +X = SE(msg, K, N) +Z = MDS(X, K) -
-<1 byte number of acks following, N> -<8 * N bytes acks> - +encrypted message format: + +<1 byte version, currently 0> +<1 byte number of acks following, aN> +<8 * aN bytes acks> +<4 byte sequence number of fragment or 0 if no fragment is included> +<2 byte 16 byte block offset in message of this fragment if it is included> + + ack format: @@ -231,7 +258,7 @@ ack format: <1 byte bitmask fragments posative ack (msb is fragment 0, lsb is fragment 7)> -command: 'R' (rotate keys) +command: 'r' (rotate keys) inform remote that their RX key should be rotated @@ -248,7 +275,7 @@ B.rx_K = n_K <32 bytes next public encryption key, K> -command: 'U' (upgrade) +command: 'u' (upgrade) request protocol upgrade @@ -257,7 +284,7 @@ request protocol upgrade <1 byte protocol max version to upgrade to> -command: 'V' (version upgrade) +command: 'v' (version upgrade) sent in response to upgrade message From 445ed05b32a4ab445cdfe016ca2cd117e9547e4a Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 18 Feb 2019 14:44:41 -0500 Subject: [PATCH 02/16] make format --- llarp/ev/ev.cpp | 2 +- llarp/link/iwp.cpp | 158 +++++++++++++- llarp/link/iwp_internal.hpp | 360 +++++++++++++------------------- llarp/link/server.cpp | 37 ++-- llarp/link/server.hpp | 5 +- llarp/link/utp_internal.hpp | 2 +- llarp/messages/relay_commit.cpp | 2 +- llarp/path/path.cpp | 47 ++--- llarp/router/abstractrouter.cpp | 1 - llarp/router/abstractrouter.hpp | 10 +- llarp/router/router.cpp | 4 +- llarp/router/router.hpp | 5 +- llarp/service/endpoint.cpp | 27 ++- llarp/service/protocol.hpp | 3 +- llarp/util/status.hpp | 2 +- llarp/util/threadpool.h | 4 +- test/link/test_llarp_link.cpp | 22 +- 17 files changed, 376 insertions(+), 315 deletions(-) diff --git a/llarp/ev/ev.cpp b/llarp/ev/ev.cpp index 9f9bf7e8a..4ca77cf99 100644 --- a/llarp/ev/ev.cpp +++ b/llarp/ev/ev.cpp @@ -26,7 +26,7 @@ llarp_ev_loop_alloc(struct llarp_ev_loop **ev) || (__APPLE__ && __MACH__) *ev = new llarp_kqueue_loop; #elif defined(_WIN32) || defined(_WIN64) || defined(__NT__) - *ev = new llarp_win32_loop; + *ev = new llarp_win32_loop; #else // TODO: fall back to a generic select-based event loop #error no event loop subclass diff --git a/llarp/link/iwp.cpp b/llarp/link/iwp.cpp index 91e7dccab..d545a2fd9 100644 --- a/llarp/link/iwp.cpp +++ b/llarp/link/iwp.cpp @@ -1,25 +1,165 @@ #include #include +#include namespace llarp { namespace iwp { + void + OuterMessage::Clear() + { + command = 0; + flow.Zero(); + netid.Zero(); + nextFlowID.Zero(); + rejectReason.clear(); + N.Zero(); + X.Zero(); + Xsize = 0; + Z.Zero(); + } + + bool + OuterMessage::Decode(llarp_buffer_t* buf) + { + if(buf->size_left() < 34) + return false; + command = *buf->cur; + ++buf->cur; + if(*buf->cur != '=') + return false; + std::copy_n(flow.begin(), 32, buf->cur); + buf->cur += 32; + switch(command) + { + case eOCMD_ObtainFlowID: + if(buf->size_left() < 40) + return false; + buf->cur += 32; + std::copy_n(netid.begin(), 8, buf->cur); + return true; + case eOCMD_GiveFlowID: + if(buf->size_left() < 32) + return false; + std::copy_n(nextFlowID.begin(), 32, buf->cur); + return true; + case eOCMD_Reject: + rejectReason = std::string(buf->cur, buf->base + buf->sz); + return true; + case eOCMD_SessionNegotiate: + // explicit fallthrough + case eOCMD_TransmitData: + if(buf->size_left() <= 56) + return false; + std::copy_n(N.begin(), N.size(), buf->cur); + buf->cur += N.size(); + Xsize = buf->size_left() - Z.size(); + if(Xsize > X.size()) + return false; + std::copy_n(X.begin(), Xsize, buf->cur); + buf->cur += Xsize; + std::copy_n(Z.begin(), Z.size(), buf->cur); + return true; + + default: + return false; + } + } + + LinkLayer::LinkLayer(Crypto* c, const SecretKey& enckey, GetRCFunc getrc, + LinkMessageHandler h, SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, SignBufferFunc sign, + TimeoutHandler t, SessionClosedHandler closed) + : ILinkLayer(enckey, getrc, h, sign, est, reneg, t, closed), crypto(c) + { + } + + LinkLayer::~LinkLayer() + { + } + + void + LinkLayer::Pump() + { + ILinkLayer::Pump(); + } + + const char* + LinkLayer::Name() const + { + return "iwp"; + } + + bool + LinkLayer::KeyGen(SecretKey& k) + { + k.Zero(); + crypto->encryption_keygen(k); + return !k.IsZero(); + } + + uint16_t + LinkLayer::Rank() const + { + return 2; + } + + bool + LinkLayer::Start(Logic* l) + { + if(!ILinkLayer::Start(l)) + return false; + /// TODO: change me to true when done + return false; + } + + void + LinkLayer::RecvFrom(const Addr& from, const void* pkt, size_t sz) + { + m_OuterMsg.Clear(); + llarp_buffer_t buf(pkt, sz); + if(!m_OuterMsg.Decode(&buf)) + { + LogError("failed to decode outer message"); + return; + } + NetID ourNetID; + switch(m_OuterMsg.command) + { + case eOCMD_ObtainFlowID: + if(!ShouldSendFlowID(from)) + return; // drop + + if(m_OuterMsg.netid == ourNetID) + SendFlowID(from, m_OuterMsg.flow); + else + SendReject(from, m_OuterMsg.flow, "bad net id"); + } + } + std::unique_ptr< ILinkLayer > - NewServerFromRouter(AbstractRouter*) + NewServerFromRouter(AbstractRouter* r) { - // TODO: implement me - return nullptr; + using namespace std::placeholders; + return NewServer( + r->crypto(), r->encryption(), std::bind(&AbstractRouter::rc, r), + std::bind(&AbstractRouter::HandleRecvLinkMessageBuffer, r, _1, _2), + std::bind(&AbstractRouter::OnSessionEstablished, r, _1), + std::bind(&AbstractRouter::CheckRenegotiateValid, r, _1, _2), + std::bind(&AbstractRouter::Sign, r, _1, _2), + std::bind(&AbstractRouter::OnConnectTimeout, r, _1), + std::bind(&AbstractRouter::SessionClosed, r, _1)); } std::unique_ptr< ILinkLayer > - NewServer(llarp::Crypto*, const SecretKey&, llarp::GetRCFunc, - llarp::LinkMessageHandler, llarp::SessionEstablishedHandler, - llarp::SessionRenegotiateHandler, llarp::SignBufferFunc, - llarp::TimeoutHandler, llarp::SessionClosedHandler) + NewServer(Crypto* c, const SecretKey& enckey, GetRCFunc getrc, + LinkMessageHandler h, SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, SignBufferFunc sign, + TimeoutHandler t, SessionClosedHandler closed) { - // TODO: implement me - return nullptr; + return std::unique_ptr< ILinkLayer >( + new LinkLayer(c, enckey, getrc, h, est, reneg, sign, t, closed)); } } // namespace iwp } // namespace llarp diff --git a/llarp/link/iwp_internal.hpp b/llarp/link/iwp_internal.hpp index 3c1977d49..46ffcf988 100644 --- a/llarp/link/iwp_internal.hpp +++ b/llarp/link/iwp_internal.hpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -17,6 +18,117 @@ namespace llarp { struct LinkLayer; + using FlowID_t = llarp::AlignedBuffer< 32 >; + + using OuterCommand_t = byte_t; + + constexpr OuterCommand_t eOCMD_ObtainFlowID = 'O'; + constexpr OuterCommand_t eOCMD_GiveFlowID = 'G'; + constexpr OuterCommand_t eOCMD_Reject = 'R'; + constexpr OuterCommand_t eOCMD_SessionNegotiate = 'S'; + constexpr OuterCommand_t eOCMD_TransmitData = 'D'; + + using InnerCommand_t = byte_t; + + constexpr InnerCommand_t eICMD_KeepAlive = 'k'; + constexpr InnerCommand_t eICMD_KeepAliveAck = 'l'; + constexpr InnerCommand_t eICMD_Congestion = 'c'; + constexpr InnerCommand_t eICMD_AntiCongestion = 'd'; + constexpr InnerCommand_t eICMD_Transmit = 't'; + constexpr InnerCommand_t eICMD_Ack = 'a'; + constexpr InnerCommand_t eICMD_RotateKeys = 'r'; + constexpr InnerCommand_t eICMD_UpgradeProtocol = 'u'; + constexpr InnerCommand_t eICMD_VersionUpgrade = 'v'; + + struct OuterMessage + { + // required memebers + byte_t command; + FlowID_t flow; + + // optional memebers follow + NetID netid; + FlowID_t nextFlowID; + std::string rejectReason; + AlignedBuffer< 24 > N; + // TODO: compute optimal size + AlignedBuffer< 1440 > X; + size_t Xsize; + ShortHash Z; + + /// encode to buffer + bool + Encode(llarp_buffer_t *buf) const; + + /// decode from buffer + bool + Decode(llarp_buffer_t *buf); + + /// verify signature if needed + bool + Verify(const SharedSecret &K) const; + + /// clear members + void + Clear(); + }; + + /// TODO: fixme + constexpr size_t MaxFrags = 8; + + using MessageBuffer_t = AlignedBuffer< MAX_LINK_MSG_SIZE >; + using FragmentLen_t = uint16_t; + using SequenceNum_t = uint32_t; + + using WritePacketFunc = std::function< void(const llarp_buffer_t &) >; + + struct MessageState + { + /// default + MessageState(); + /// inbound + MessageState(const ShortHash &digest, SequenceNum_t num); + /// outbound + MessageState(const ShortHash &digest, const llarp_buffer_t &buf, + SequenceNum_t num); + + /// the expected hash of the message + const ShortHash expectedHash; + + /// which fragments have we got + std::bitset< MaxFrags > acks; + /// the message buffer + MessageBuffer_t msg; + /// the message's size + FragmentLen_t sz; + /// the last activity we have had + llarp_time_t lastActiveAt; + // sequence number + const SequenceNum_t seqno; + + /// return true if this message is to be removed + /// because of inactivity + bool + IsExpired(llarp_time_t now) const; + + /// return true if we have recvieved or sent the underlying message in + /// full. + bool + IsDone() const; + + /// return true if we should retransmit some packets + bool + ShouldRetransmit(llarp_time_t now) const; + + /// transmit unacked fragments + bool + TransmitUnacked(WritePacketFunc write_pkt) const; + + /// transmit acks packet + bool + TransmitAcks(WritePacketFunc write_pkt); + }; + struct Session final : public llarp::ILinkSession { /// base @@ -35,12 +147,15 @@ namespace llarp return {}; } + /// pump ll io void PumpIO(); + /// tick every 1 s void TickIO(llarp_time_t now); + /// queue full message bool QueueMessageBuffer(const llarp_buffer_t &buf); @@ -57,6 +172,7 @@ namespace llarp void Close(); + /// start outbound handshake void Connect(); @@ -105,196 +221,6 @@ namespace llarp /// maximum fragment size static constexpr FragLen_t fragsize = MAX_LINK_MSG_SIZE / maxfrags; - struct FragmentHeader - { - /// protocol version, always LLARP_PROTO_VERSION - Proto_t version = LLARP_PROTO_VERSION; - /// fragment command type - Cmd_t cmd = 0; - /// if cmd is XMIT this is the number of additional fragments this - /// message has - /// if cmd is FACK this is the fragment bitfield of the - /// messages acked otherwise 0 - Flags_t flags = 0; - /// if cmd is XMIT this is the fragment index - /// if cmd is FACK this is set to 0xff to indicate message drop - /// otherwise set to 0 - /// any other cmd it is set to 0 - Fragno_t fragno = 0; - /// if cmd is XMIT then this is the size of the current fragment - /// if cmd is FACK then this MUST be set to 0 - FragLen_t fraglen = 0; - /// if cmd is XMIT or FACK this is the sequence number of the message - /// otherwise it's 0 - Seqno_t seqno = 0; - - bool - Decode(llarp_buffer_t *buf) - { - if(buf->size_left() < fragoverhead) - return false; - version = *buf->cur; - if(version != LLARP_PROTO_VERSION) - return false; - buf->cur++; - cmd = *buf->cur; - buf->cur++; - flags = *buf->cur; - buf->cur++; - fragno = *buf->cur; - buf->cur++; - buf->read_uint16(fraglen); - buf->read_uint32(seqno); - return fraglen <= fragsize; - } - - bool - Encode(llarp_buffer_t *buf, const llarp_buffer_t &body) - { - if(body.sz > fragsize) - return false; - fraglen = body.sz; - if(buf->size_left() < (fragoverhead + fraglen)) - return false; - *buf->cur = LLARP_PROTO_VERSION; - buf->cur++; - *buf->cur = cmd; - buf->cur++; - *buf->cur = flags; - buf->cur++; - *buf->cur = fragno; - buf->cur++; - buf->put_uint16(fraglen); - buf->put_uint32(seqno); - if(fraglen) - memcpy(buf->cur, body.base, fraglen); - buf->cur += fraglen; - return true; - } - }; - - struct MessageState - { - /// default - MessageState(){}; - /// inbound - MessageState(Flags_t numfrags) - { - acks.set(); - if(numfrags <= maxfrags) - { - while(numfrags) - acks.reset(maxfrags - (numfrags--)); - } - else // invalid value - return; - } - - /// outbound - MessageState(const llarp_buffer_t &buf) - { - sz = std::min(buf.sz, MAX_LINK_MSG_SIZE); - memcpy(msg.data(), buf.base, sz); - size_t idx = 0; - acks.set(); - while(idx * fragsize < sz) - acks.reset(idx++); - }; - - /// which fragments have we got - std::bitset< maxfrags > acks; - /// the message buffer - MessageBuffer_t msg; - /// the message's size - FragLen_t sz; - /// the last activity we have had - llarp_time_t lastActiveAt; - - /// return true if this message is to be removed - /// because of inactivity - bool - IsExpired(llarp_time_t now) const - { - return now > lastActiveAt && now - lastActiveAt > 2000; - } - - bool - IsDone() const - { - return acks.all(); - } - - bool - ShouldRetransmit(llarp_time_t now) const - { - if(IsDone()) - return false; - return now > lastActiveAt && now - lastActiveAt > 500; - } - - template < typename write_pkt_func > - bool - TransmitUnacked(write_pkt_func write_pkt, Seqno_t seqno) const - { - static FragLen_t maxfragsize = fragsize; - FragmentHeader hdr; - hdr.seqno = seqno; - hdr.cmd = XMIT; - AlignedBuffer< fragoverhead + fragsize > frag; - llarp_buffer_t buf(frag); - const byte_t *ptr = msg.data(); - Fragno_t idx = 0; - FragLen_t len = sz; - while(idx < maxfrags) - { - const FragLen_t l = std::min(len, maxfragsize); - if(!acks.test(idx)) - { - hdr.fragno = idx; - hdr.fraglen = l; - if(!hdr.Encode(&buf, llarp_buffer_t(ptr, l))) - return false; - buf.sz = buf.cur - buf.base; - buf.cur = buf.base; - len -= l; - if(write_pkt(buf.base, buf.sz) != int(buf.sz)) - return false; - } - ptr += l; - len -= l; - if(l >= fragsize) - ++idx; - else - break; - } - return true; - } - - template < typename write_pkt_func > - bool - TransmitAcks(write_pkt_func write_pkt, Seqno_t seqno) - { - FragmentHeader hdr; - hdr.seqno = seqno; - hdr.cmd = FACK; - hdr.flags = 0; - byte_t idx = 0; - while(idx < maxfrags) - { - if(acks.test(idx)) - hdr.flags |= 1 << idx; - ++idx; - } - hdr.fraglen = 0; - hdr.fragno = 0; - AlignedBuffer< fragoverhead > frag; - llarp_buffer_t buf(frag); - if(!hdr.Encode(&buf, llarp_buffer_t(nullptr, nullptr, 0))) - return false; - return write_pkt(buf.base, buf.sz) == int(buf.sz); - } - }; - using MessageHolder_t = std::unordered_map< Seqno_t, MessageState >; MessageHolder_t m_Inbound; @@ -310,11 +236,10 @@ namespace llarp struct LinkLayer final : public llarp::ILinkLayer { LinkLayer(llarp::Crypto *crypto, const SecretKey &encryptionSecretKey, - const SecretKey &identitySecretKey, llarp::GetRCFunc getrc, - llarp::LinkMessageHandler h, llarp::SignBufferFunc sign, + llarp::GetRCFunc getrc, llarp::LinkMessageHandler h, llarp::SessionEstablishedHandler established, llarp::SessionRenegotiateHandler reneg, - llarp::TimeoutHandler timeout, + llarp::SignBufferFunc sign, llarp::TimeoutHandler timeout, llarp::SessionClosedHandler closed); ~LinkLayer(); @@ -339,36 +264,37 @@ namespace llarp uint16_t Rank() const override; - const byte_t * - IndentityKey() const - { - return m_IdentityKey.data(); - } - - const AlignedBuffer< 32 > & - CookieSec() const - { - return m_CookieSec; - } + /// verify that a new flow id matches addresses and old flow id + bool + VerifyFlowID(const FlowID_t &newID, const Addr &from, + const FlowID_t &oldID) const; - RouterID - GetRouterID() const - { - return m_IdentityKey.toPublic(); - } + void + RecvFrom(const llarp::Addr &from, const void *buf, size_t sz) override; private: bool - SignBuffer(llarp::Signature &sig, llarp_buffer_t buf) const - { - return crypto->sign(sig, m_IdentityKey, buf); - } - const llarp::SecretKey m_IdentityKey; - AlignedBuffer< 32 > m_CookieSec; + ShouldSendFlowID(const Addr &from) const; - /// handle ll recv void - RecvFrom(const llarp::Addr &from, const void *buf, size_t sz) override; + SendReject(const Addr &to, const FlowID_t &flow, const char *msg); + + void + SendFlowID(const Addr &to, const FlowID_t &flow); + + using ActiveFlows_t = + std::unordered_map< FlowID_t, RouterID, FlowID_t::Hash >; + + ActiveFlows_t m_ActiveFlows; + + using PendingFlows_t = std::unordered_map< Addr, FlowID_t, Addr::Hash >; + /// flows that are pending authentication + PendingFlows_t m_PendingFlows; + + /// cookie used in flow id computation + AlignedBuffer< 32 > m_FlowCookie; + + OuterMessage m_OuterMsg; }; } // namespace iwp } // namespace llarp diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index c9ebfbef5..e4ee6c449 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -173,24 +173,25 @@ namespace llarp util::StatusObject ILinkLayer::ExtractStatus() const { - std::vector pending, established; - - std::transform(m_Pending.begin(), m_Pending.end(), std::back_inserter(pending), [](const auto & item) -> util::StatusObject { - return item.second->ExtractStatus(); - }); - std::transform(m_AuthedLinks.begin(), m_AuthedLinks.end(), std::back_inserter(established), [](const auto & item) -> util::StatusObject { - return item.second->ExtractStatus(); - }); - - return { - {"name", Name()}, - {"rank", uint64_t(Rank())}, - {"addr", m_ourAddr.ToString()}, - {"sessions", util::StatusObject{ - {"pending", pending}, - {"established", established} - }} - }; + std::vector< util::StatusObject > pending, established; + + std::transform(m_Pending.begin(), m_Pending.end(), + std::back_inserter(pending), + [](const auto& item) -> util::StatusObject { + return item.second->ExtractStatus(); + }); + std::transform(m_AuthedLinks.begin(), m_AuthedLinks.end(), + std::back_inserter(established), + [](const auto& item) -> util::StatusObject { + return item.second->ExtractStatus(); + }); + + return {{"name", Name()}, + {"rank", uint64_t(Rank())}, + {"addr", m_ourAddr.ToString()}, + {"sessions", + util::StatusObject{{"pending", pending}, + {"established", established}}}}; } bool diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index 7349d571f..9c38f26b4 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -124,7 +124,7 @@ namespace llarp virtual const char* Name() const = 0; - util::StatusObject + util::StatusObject ExtractStatus() const override; void @@ -180,7 +180,8 @@ namespace llarp bool MapAddr(const RouterID& pk, ILinkSession* s); - void Tick(llarp_time_t now); + void + Tick(llarp_time_t now); LinkMessageHandler HandleMessage; TimeoutHandler HandleTimeout; diff --git a/llarp/link/utp_internal.hpp b/llarp/link/utp_internal.hpp index 306e48d0b..ad4ea0436 100644 --- a/llarp/link/utp_internal.hpp +++ b/llarp/link/utp_internal.hpp @@ -139,7 +139,7 @@ namespace llarp void Alive(); - util::StatusObject + util::StatusObject ExtractStatus() const override; /// base diff --git a/llarp/messages/relay_commit.cpp b/llarp/messages/relay_commit.cpp index 5931a452a..b357b70ab 100644 --- a/llarp/messages/relay_commit.cpp +++ b/llarp/messages/relay_commit.cpp @@ -281,7 +281,7 @@ namespace llarp using namespace std::placeholders; if(self->record.work && self->record.work->IsValid( - std::bind(&Crypto::shorthash, crypto, _1, _2), now)) + std::bind(&Crypto::shorthash, crypto, _1, _2), now)) { llarp::LogDebug("LRCM extended lifetime by ", self->record.work->extendedLifetime, " seconds for ", diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp index b68d2ff14..7717bedfe 100644 --- a/llarp/path/path.cpp +++ b/llarp/path/path.cpp @@ -160,26 +160,24 @@ namespace llarp IHopHandler* PathContext::GetByUpstream(const RouterID& remote, const PathID_t& id) { - auto own = MapGet( - m_OurPaths, id, - [](__attribute__((unused)) const PathSet* s) -> bool { - // TODO: is this right? - return true; - }, - [remote, id](PathSet* p) -> IHopHandler* { - return p->GetByUpstream(remote, id); - }); + auto own = MapGet(m_OurPaths, id, + [](__attribute__((unused)) const PathSet* s) -> bool { + // TODO: is this right? + return true; + }, + [remote, id](PathSet* p) -> IHopHandler* { + return p->GetByUpstream(remote, id); + }); if(own) return own; - return MapGet( - m_TransitPaths, id, - [remote](const std::shared_ptr< TransitHop >& hop) -> bool { - return hop->info.upstream == remote; - }, - [](const std::shared_ptr< TransitHop >& h) -> IHopHandler* { - return h.get(); - }); + return MapGet(m_TransitPaths, id, + [remote](const std::shared_ptr< TransitHop >& hop) -> bool { + return hop->info.upstream == remote; + }, + [](const std::shared_ptr< TransitHop >& h) -> IHopHandler* { + return h.get(); + }); } bool @@ -196,14 +194,13 @@ namespace llarp IHopHandler* PathContext::GetByDownstream(const RouterID& remote, const PathID_t& id) { - return MapGet( - m_TransitPaths, id, - [remote](const std::shared_ptr< TransitHop >& hop) -> bool { - return hop->info.downstream == remote; - }, - [](const std::shared_ptr< TransitHop >& h) -> IHopHandler* { - return h.get(); - }); + return MapGet(m_TransitPaths, id, + [remote](const std::shared_ptr< TransitHop >& hop) -> bool { + return hop->info.downstream == remote; + }, + [](const std::shared_ptr< TransitHop >& h) -> IHopHandler* { + return h.get(); + }); } PathSet* diff --git a/llarp/router/abstractrouter.cpp b/llarp/router/abstractrouter.cpp index b56ffcd57..f70ba51b3 100644 --- a/llarp/router/abstractrouter.cpp +++ b/llarp/router/abstractrouter.cpp @@ -2,5 +2,4 @@ namespace llarp { - } // namespace llarp diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index 07fcea604..781479071 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -41,7 +41,6 @@ namespace llarp struct AbstractRouter : public util::IStateful { - virtual void OnSessionEstablished(RouterContact rc) = 0; @@ -139,12 +138,13 @@ namespace llarp CheckRenegotiateValid(RouterContact newRc, RouterContact oldRC) = 0; /// set router's service node whitelist - virtual void - SetRouterWhitelist(const std::vector & routers) =0 ; + virtual void + SetRouterWhitelist(const std::vector< RouterID > &routers) = 0; - /// visit each connected link session + /// visit each connected link session virtual void - ForEachPeer(std::function visit) const = 0; + ForEachPeer( + std::function< void(const ILinkSession *, bool) > visit) const = 0; }; } // namespace llarp diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 1f015fca2..a6d569718 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -679,9 +679,7 @@ namespace llarp Router::NumberOfConnectedRouters() const { size_t s = 0; - ForEachPeer([&s](const auto *, bool) { - ++s; - }); + ForEachPeer([&s](const auto *, bool) { ++s; }); return s; } diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index f84072ce2..35aca13cb 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -141,7 +141,7 @@ namespace llarp } void - SetRouterWhitelist(const std::vector & routers) override; + SetRouterWhitelist(const std::vector< RouterID > &routers) override; exit::Context & exitContext() override @@ -430,7 +430,8 @@ namespace llarp RouterID remote, const std::vector< RouterContact > &results) override; void - ForEachPeer(std::function< void(const ILinkSession *, bool) > visit) const override; + ForEachPeer( + std::function< void(const ILinkSession *, bool) > visit) const override; void ForEachPeer(std::function< void(ILinkSession *) > visit); diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index dc125a6dc..a42e2a1db 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -266,9 +266,7 @@ namespace llarp auto itr = m_PrefetchedTags.find(tag); if(itr == m_PrefetchedTags.end()) { - itr = m_PrefetchedTags - .emplace(tag, CachedTagResult(tag, this)) - .first; + itr = m_PrefetchedTags.emplace(tag, CachedTagResult(tag, this)).first; } for(const auto& introset : itr->second.result) { @@ -1196,11 +1194,11 @@ namespace llarp m_SNodeSessions.emplace( snode, std::make_unique< exit::SNodeSession >( - snode, - std::bind(&Endpoint::HandleWriteIPPacket, this, - std::placeholders::_1, - [themIP]() -> huint32_t { return themIP; }), - m_Router, 2, numHops)); + snode, + std::bind(&Endpoint::HandleWriteIPPacket, this, + std::placeholders::_1, + [themIP]() -> huint32_t { return themIP; }), + m_Router, 2, numHops)); } } @@ -1310,13 +1308,12 @@ namespace llarp } } // no converstation - return EnsurePathToService( - remote, - [](Address, OutboundContext* c) { - if(c) - c->UpdateIntroSet(true); - }, - 5000, false); + return EnsurePathToService(remote, + [](Address, OutboundContext* c) { + if(c) + c->UpdateIntroSet(true); + }, + 5000, false); } bool diff --git a/llarp/service/protocol.hpp b/llarp/service/protocol.hpp index 191c2a0b6..f962ac931 100644 --- a/llarp/service/protocol.hpp +++ b/llarp/service/protocol.hpp @@ -134,7 +134,8 @@ namespace llarp Verify(Crypto* c, const ServiceInfo& from) const; bool - HandleMessage(routing::IMessageHandler* h, AbstractRouter* r) const override; + HandleMessage(routing::IMessageHandler* h, + AbstractRouter* r) const override; }; } // namespace service } // namespace llarp diff --git a/llarp/util/status.hpp b/llarp/util/status.hpp index 02e06ea42..772101595 100644 --- a/llarp/util/status.hpp +++ b/llarp/util/status.hpp @@ -71,7 +71,7 @@ namespace llarp /// an entity that has a status that can be extracted struct IStateful { - virtual ~IStateful() {}; + virtual ~IStateful(){}; virtual StatusObject ExtractStatus() const = 0; diff --git a/llarp/util/threadpool.h b/llarp/util/threadpool.h index c7a068bbc..5adaff876 100644 --- a/llarp/util/threadpool.h +++ b/llarp/util/threadpool.h @@ -16,8 +16,8 @@ struct llarp_threadpool std::queue< std::function< void(void) > > jobs; llarp_threadpool(int workers, const char *name) - : impl( - std::make_unique< llarp::thread::ThreadPool >(workers, workers * 128)) + : impl(std::make_unique< llarp::thread::ThreadPool >(workers, + workers * 128)) { (void)name; } diff --git a/test/link/test_llarp_link.cpp b/test/link/test_llarp_link.cpp index 1caf95780..7ada763a0 100644 --- a/test/link/test_llarp_link.cpp +++ b/test/link/test_llarp_link.cpp @@ -346,11 +346,10 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob) TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) { - /* Alice.link = llarp::iwp::NewServer( &crypto, Alice.encryptionKey, [&]() -> const llarp::RouterContact& { return Alice.GetRC(); }, - [&](llarp::ILinkSession* s, llarp_buffer_t buf) -> bool { + [&](llarp::ILinkSession* s, const llarp_buffer_t& buf) -> bool { if(Alice.gotLIM) { return AliceGotMessage(buf); @@ -358,7 +357,8 @@ TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) else { llarp::LinkIntroMessage msg; - if(!msg.BDecode(&buf)) + ManagedBuffer copy{buf}; + if(!msg.BDecode(©.underlying)) return false; if(!s->GotLIM(&msg)) return false; @@ -371,7 +371,7 @@ TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) llarp::LogInfo("alice established with bob"); }, [&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; }, - [&](llarp::Signature& sig, llarp_buffer_t buf) -> bool { + [&](llarp::Signature& sig, const llarp_buffer_t& buf) -> bool { return crypto.sign(sig, Alice.signingKey, buf); }, [&](llarp::ILinkSession* session) { @@ -382,8 +382,8 @@ TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) auto sendDiscardMessage = [](llarp::ILinkSession* s) -> bool { // send discard message in reply to complete unit test - byte_t tmp[32] = {0}; - auto otherBuf = llarp::StackBuffer< decltype(tmp) >(tmp); + std::array< byte_t, 32 > tmp; + llarp_buffer_t otherBuf(tmp); llarp::DiscardMessage discard; if(!discard.BEncode(&otherBuf)) return false; @@ -395,9 +395,10 @@ TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) Bob.link = llarp::iwp::NewServer( &crypto, Bob.encryptionKey, [&]() -> const llarp::RouterContact& { return Bob.GetRC(); }, - [&](llarp::ILinkSession* s, llarp_buffer_t buf) -> bool { + [&](llarp::ILinkSession* s, const llarp_buffer_t& buf) -> bool { llarp::LinkIntroMessage msg; - if(!msg.BDecode(&buf)) + ManagedBuffer copy{buf}; + if(!msg.BDecode(©.underlying)) return false; if(!s->GotLIM(&msg)) return false; @@ -411,7 +412,7 @@ TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) sendDiscardMessage); }, [&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; }, - [&](llarp::Signature& sig, llarp_buffer_t buf) -> bool { + [&](llarp::Signature& sig, const llarp_buffer_t& buf) -> bool { return crypto.sign(sig, Bob.signingKey, buf); }, [&](llarp::ILinkSession* session) { @@ -428,5 +429,4 @@ TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) ASSERT_TRUE(Alice.gotLIM); ASSERT_TRUE(Bob.gotLIM); ASSERT_TRUE(success); - */ -} +}; From 41bb83aababdb4d14f66ad336e9b311edde193b2 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 19 Feb 2019 08:32:26 -0500 Subject: [PATCH 03/16] more --- llarp/link/iwp.cpp | 48 +++++++++++++++++++++++++++++++++++++++-- llarp/router/router.hpp | 2 +- 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/llarp/link/iwp.cpp b/llarp/link/iwp.cpp index d545a2fd9..72cf038e7 100644 --- a/llarp/link/iwp.cpp +++ b/llarp/link/iwp.cpp @@ -138,6 +138,41 @@ namespace llarp } } + ILinkSession* + LinkLayer::NewOutboundSession(const RouterContact& rc, + const AddressInfo& ai) + { + (void)rc; + (void)ai; + // TODO: implement me + return nullptr; + } + + void + LinkLayer::SendFlowID(const Addr& to, const FlowID_t& flow) + { + // TODO: implement me + (void)to; + (void)flow; + } + + bool + LinkLayer::ShouldSendFlowID(const Addr& to) const + { + (void)to; + // TODO: implement me + return false; + } + + void + LinkLayer::SendReject(const Addr& to, const FlowID_t& flow, const char* msg) + { + // TODO: implement me + (void)to; + (void)flow; + (void)msg; + } + std::unique_ptr< ILinkLayer > NewServerFromRouter(AbstractRouter* r) { @@ -158,8 +193,17 @@ namespace llarp SessionRenegotiateHandler reneg, SignBufferFunc sign, TimeoutHandler t, SessionClosedHandler closed) { - return std::unique_ptr< ILinkLayer >( - new LinkLayer(c, enckey, getrc, h, est, reneg, sign, t, closed)); + (void)c; + (void)enckey; + (void)getrc; + (void)h; + (void)est; + (void)reneg; + (void)sign; + (void)t; + (void)closed; + // TODO: implement me + return nullptr; } } // namespace iwp } // namespace llarp diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 35aca13cb..be758e7eb 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -211,7 +211,7 @@ namespace llarp uint16_t m_OutboundPort = 0; /// always maintain this many connections to other routers - size_t minConnectedRouters = 1; + size_t minConnectedRouters = 3; /// hard upperbound limit on the number of router to router connections size_t maxConnectedRouters = 2000; From 591f3c92a9b95a73bba2b735c302a3baa0419748 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 19 Feb 2019 10:06:39 -0500 Subject: [PATCH 04/16] make build records smaller --- llarp/crypto/encrypted_frame.hpp | 10 ++++++++++ llarp/path/pathbuilder.cpp | 9 +++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/llarp/crypto/encrypted_frame.hpp b/llarp/crypto/encrypted_frame.hpp index 1ab5efe21..49406191a 100644 --- a/llarp/crypto/encrypted_frame.hpp +++ b/llarp/crypto/encrypted_frame.hpp @@ -39,6 +39,16 @@ namespace llarp return *this; } + void + Resize(size_t sz) + { + if(sz <= EncryptedFrameSize) + { + _sz = sz; + UpdateBuffer(); + } + } + bool DecryptInPlace(const SecretKey& seckey, llarp::Crypto* crypto); diff --git a/llarp/path/pathbuilder.cpp b/llarp/path/pathbuilder.cpp index 037ae0c93..bb42c68fb 100644 --- a/llarp/path/pathbuilder.cpp +++ b/llarp/path/pathbuilder.cpp @@ -89,17 +89,18 @@ namespace llarp record.nextHop = hop.upstream; record.commkey = seckey_topublic(hop.commkey); - auto buf = frame.Buffer(); - buf->cur = buf->base + EncryptedFrameOverheadSize; + llarp_buffer_t buf(frame.data(), frame.size()); + buf.cur = buf.base + EncryptedFrameOverheadSize; // encode record - if(!record.BEncode(buf)) + if(!record.BEncode(&buf)) { // failed to encode? LogError("Failed to generate Commit Record"); - DumpBuffer(*buf); + DumpBuffer(buf); delete ctx; return; } + frame.Resize(buf.cur - buf.base); // use ephemeral keypair for frame SecretKey framekey; ctx->crypto->encryption_keygen(framekey); From 393e70dfa6c9ab290a084f8568605b5944440583 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 19 Feb 2019 11:58:58 -0500 Subject: [PATCH 05/16] publish new introset on path death --- llarp/service/endpoint.cpp | 9 +++------ llarp/service/endpoint.hpp | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index a42e2a1db..7e5ff71cd 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -972,9 +972,6 @@ namespace llarp p->SetDropHandler(std::bind( &Endpoint::OutboundContext::HandleDataDrop, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); - p->SetDeadChecker(std::bind(&Endpoint::CheckPathIsDead, m_Endpoint, - std::placeholders::_1, - std::placeholders::_2)); } void @@ -985,10 +982,10 @@ namespace llarp } bool - Endpoint::CheckPathIsDead(__attribute__((unused)) path::Path* p, - __attribute__((unused)) llarp_time_t latency) + Endpoint::CheckPathIsDead(path::Path*, llarp_time_t) { - return false; + Logic()->queue_job({this, &HandlePathDead}); + return true; } bool diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index 58b876fef..d6e235eb2 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -31,7 +31,7 @@ namespace llarp { /// minimum interval for publishing introsets static const llarp_time_t INTROSET_PUBLISH_INTERVAL = - DEFAULT_PATH_LIFETIME / 4; + DEFAULT_PATH_LIFETIME / 8; static const llarp_time_t INTROSET_PUBLISH_RETRY_INTERVAL = 5000; From b1dd10c00737a603f3c85ef26dfc03fc710c5584 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 19 Feb 2019 12:04:06 -0500 Subject: [PATCH 06/16] fix previous commit --- llarp/service/endpoint.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 7e5ff71cd..03d03c2b1 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -984,7 +984,12 @@ namespace llarp bool Endpoint::CheckPathIsDead(path::Path*, llarp_time_t) { - Logic()->queue_job({this, &HandlePathDead}); + RouterLogic()->call_later( + {100, this, [](void* u, uint64_t, uint64_t left) { + if(left) + return; + HandlePathDead(u); + }}); return true; } From be305169866ad99b55e7c6ba2cb1cba21dc0b81b Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Wed, 20 Feb 2019 07:09:00 -0500 Subject: [PATCH 07/16] use std::string for integers because i am tired of fixing formatting strings --- llarp/util/bencode.cpp | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/llarp/util/bencode.cpp b/llarp/util/bencode.cpp index e04a437dc..20fbfa490 100644 --- a/llarp/util/bencode.cpp +++ b/llarp/util/bencode.cpp @@ -73,18 +73,10 @@ bencode_write_bytestring(llarp_buffer_t* buff, const void* data, size_t sz) bool bencode_write_uint64(llarp_buffer_t* buff, uint64_t i) { -#ifndef __LP64__ - if(!buff->writef("i%llu", i)) -#else - if(!buff->write("i%lu", i)) -#endif - { - return false; - } - - static const char letter[1] = {'e'}; - assert(std::distance(std::begin(letter), std::end(letter)) == 1); - return buff->write(std::begin(letter), std::end(letter)); + std::string str = "i"; + str += std::to_string(i); + str += "e"; + return buff->write(str.begin(), str.end()); } bool From 7428855698c93f74752755ea7a2787ff0a5c099c Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Wed, 20 Feb 2019 07:09:18 -0500 Subject: [PATCH 08/16] try fixing timeout bug after 6 or so hours --- llarp/router/abstractrouter.hpp | 4 ++++ llarp/router/router.cpp | 13 ++++++++----- llarp/router/router.hpp | 2 +- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index 781479071..4f6acdee6 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -99,6 +99,10 @@ namespace llarp virtual void OnConnectTimeout(ILinkSession *session) = 0; + /// connect to N random routers + virtual void + ConnectToRandomRouters(int N) = 0; + /// called by link when a remote session has no more sessions open virtual void SessionClosed(RouterID remote) = 0; diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index a6d569718..01c6cb85f 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -1084,15 +1084,18 @@ namespace llarp LogError("we have no bootstrap nodes specified"); } - if(inboundLinks.size() == 0) + if(!IsServiceNode()) { + size_t connected = NumberOfConnectedRouters(); + if(connected < minConnectedRouters) + { + size_t dlt = connected - minConnectedRouters; + LogInfo("connecting to ", dlt, " random routers to keep alive"); + ConnectToRandomRouters(dlt); + } paths.BuildPaths(now); hiddenServiceContext.Tick(now); } - if(NumberOfConnectedRouters() < minConnectedRouters) - { - ConnectToRandomRouters(minConnectedRouters); - } _exitContext.Tick(now); if(rpcCaller) rpcCaller->Tick(now); diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index be758e7eb..ae542b8c2 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -478,7 +478,7 @@ namespace llarp const PathID_t &rxid) override; void - ConnectToRandomRouters(int N); + ConnectToRandomRouters(int N) override; size_t NumberOfConnectedRouters() const override; From f67ffaf900dc7459c0228dcb5ce200643ab091a3 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Wed, 20 Feb 2019 07:22:39 -0500 Subject: [PATCH 09/16] fix typo --- llarp/router/router.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 01c6cb85f..9292bd6c6 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -1089,7 +1089,7 @@ namespace llarp size_t connected = NumberOfConnectedRouters(); if(connected < minConnectedRouters) { - size_t dlt = connected - minConnectedRouters; + size_t dlt = minConnectedRouters - connected; LogInfo("connecting to ", dlt, " random routers to keep alive"); ConnectToRandomRouters(dlt); } From ee95c1ed0c8aeb07c9206c152c9e1913e2938239 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Wed, 20 Feb 2019 08:28:35 -0500 Subject: [PATCH 10/16] make dht work again ._. --- llarp/service/tag.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llarp/service/tag.hpp b/llarp/service/tag.hpp index da4353cd4..869e09a9d 100644 --- a/llarp/service/tag.hpp +++ b/llarp/service/tag.hpp @@ -65,7 +65,7 @@ namespace llarp bool Empty() const { - return ToString().empty(); + return data()[0] == 0; } using Hash = AlignedBuffer< SIZE >::Hash; From 262000570ba329d41365a0ce84d8a69e7aebee36 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 21 Feb 2019 11:21:41 -0500 Subject: [PATCH 11/16] use source path's intro as being able to reply on convo tags --- llarp/service/endpoint.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 03d03c2b1..5cc0798f2 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -895,11 +895,14 @@ namespace llarp } bool - Endpoint::HandleDataMessage(__attribute__((unused)) const PathID_t& src, - ProtocolMessage* msg) + Endpoint::HandleDataMessage(const PathID_t& src, ProtocolMessage* msg) { + auto path = GetPathByID(src); + if(path == nullptr) + return false; msg->sender.UpdateAddr(); PutIntroFor(msg->tag, msg->introReply); + PutSenderFor(msg->tag, path->intro); EnsureReplyPath(msg->sender); return ProcessDataMessage(msg); } From 3baf89e8ce572223973cf1114da9dcac752e283a Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 21 Feb 2019 11:45:33 -0500 Subject: [PATCH 12/16] more --- docs/proto_v0.txt | 2 +- llarp/CMakeLists.txt | 1 + llarp/service/endpoint.cpp | 41 ++++++++++++++++++++++++++++++++------ llarp/service/endpoint.hpp | 16 ++++++++++++--- llarp/service/handler.hpp | 6 ++++++ 5 files changed, 56 insertions(+), 10 deletions(-) diff --git a/docs/proto_v0.txt b/docs/proto_v0.txt index 2710ea397..114f0b120 100644 --- a/docs/proto_v0.txt +++ b/docs/proto_v0.txt @@ -639,7 +639,7 @@ sent inside a HSFM encrypted with a shared secret. n: uint_message_sequence_number, o: N seconds until this converstation plans terminate, s: SI of sender, - t: "<16 bytes converstation tag present only when n is 0>", + t: "<16 bytes converstation_tag>, v: 0 } diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index e2054623c..01ea34465 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -32,6 +32,7 @@ set(LIB_UTIL_SRC add_library(${UTIL_LIB} STATIC ${LIB_UTIL_SRC}) target_include_directories(${UTIL_LIB} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) +target_include_directories(${UTIL_LIB} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include) # cut back on fluff if (NOT WIN32) diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 5cc0798f2..0b6faff96 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -491,6 +491,28 @@ namespace llarp return true; } + void + Endpoint::PutReplyIntroFor(const ConvoTag& tag, const Introduction& intro) + { + auto itr = m_Sessions.find(tag); + if(itr == m_Sessions.end()) + { + itr = m_Sessions.emplace(tag, Session{}).first; + } + itr->second.replyIntro = intro; + itr->second.lastUsed = Now(); + } + + bool + Endpoint::GetReplyIntroFor(const ConvoTag& tag, Introduction& intro) const + { + auto itr = m_Sessions.find(tag); + if(itr == m_Sessions.end()) + return false; + intro = itr->second.replyIntro; + return true; + } + bool Endpoint::GetConvoTagsForService(const ServiceInfo& info, std::set< ConvoTag >& tags) const @@ -902,7 +924,7 @@ namespace llarp return false; msg->sender.UpdateAddr(); PutIntroFor(msg->tag, msg->introReply); - PutSenderFor(msg->tag, path->intro); + PutReplyIntroFor(msg->tag, path->intro); EnsureReplyPath(msg->sender); return ProcessDataMessage(msg); } @@ -1279,7 +1301,8 @@ namespace llarp ProtocolMessage m(f.T); m.proto = t; m.introReply = p->intro; - m.sender = m_Identity.pub; + PutReplyIntroFor(f.T, m.introReply); + m.sender = m_Identity.pub; m.PutBuffer(data); f.N.Randomize(); f.S = GetSeqNoForConvo(f.T); @@ -1617,7 +1640,7 @@ namespace llarp {ex, &AsyncKeyExchange::Encrypt}); } - void + bool Endpoint::SendContext::Send(ProtocolFrame& msg) { auto path = m_PathSet->GetPathByRouter(remoteIntro.router); @@ -1632,6 +1655,7 @@ namespace llarp llarp::LogDebug("sent data to ", remoteIntro.pathID, " on ", remoteIntro.router); lastGoodSend = m_Endpoint->Now(); + return true; } else llarp::LogError("Failed to send frame on path"); @@ -1639,6 +1663,7 @@ namespace llarp else llarp::LogError("cannot send because we have no path to ", remoteIntro.router); + return false; } std::string @@ -1845,8 +1870,12 @@ namespace llarp if(m_DataHandler->GetCachedSessionKeyFor(f.T, shared)) { ProtocolMessage m; - m.proto = t; - m.introReply = path->intro; + m.proto = t; + if(!m_DataHandler->GetReplyIntroFor(f.T, m.introReply)) + { + m_DataHandler->PutReplyIntroFor(f.T, path->intro); + m.introReply = path->intro; + } m_DataHandler->PutIntroFor(f.T, remoteIntro); m.sender = m_Endpoint->m_Identity.pub; m.PutBuffer(payload); @@ -1866,11 +1895,11 @@ namespace llarp msg.P = remoteIntro.pathID; msg.Y.Randomize(); - ++sequenceNo; if(path->SendRoutingMessage(&msg, m_Endpoint->Router())) { llarp::LogDebug("sent message via ", remoteIntro.pathID, " on ", remoteIntro.router); + ++sequenceNo; lastGoodSend = now; } else diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index d6e235eb2..76781c4c3 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -221,7 +221,7 @@ namespace llarp /// send a fully encrypted hidden service frame /// via a path on our pathset with path id p - void + bool Send(ProtocolFrame& f); llarp::SharedSecret sharedKey; @@ -390,6 +390,14 @@ namespace llarp bool GetIntroFor(const ConvoTag& remote, Introduction& intro) const override; + void + PutReplyIntroFor(const ConvoTag& remote, + const Introduction& intro) override; + + bool + GetReplyIntroFor(const ConvoTag& remote, + Introduction& intro) const override; + bool GetConvoTagsForService(const ServiceInfo& si, std::set< ConvoTag >& tag) const override; @@ -540,6 +548,7 @@ namespace llarp struct Session : public util::IStateful { + Introduction replyIntro; SharedSecret sharedKey; ServiceInfo remote; Introduction intro; @@ -550,9 +559,10 @@ namespace llarp ExtractStatus() const override { util::StatusObject obj{{"lastUsed", lastUsed}, + {"replyIntro", replyIntro.ExtractStatus()}, {"remote", remote.Addr().ToString()}, - {"seqno", seqno}}; - obj.Put("intro", intro.ExtractStatus()); + {"seqno", seqno}, + {"intro", intro.ExtractStatus()}}; return obj; }; diff --git a/llarp/service/handler.hpp b/llarp/service/handler.hpp index eea252b97..25e3a28d9 100644 --- a/llarp/service/handler.hpp +++ b/llarp/service/handler.hpp @@ -37,6 +37,12 @@ namespace llarp virtual bool GetIntroFor(const ConvoTag& remote, Introduction& intro) const = 0; + virtual void + PutReplyIntroFor(const ConvoTag& remote, const Introduction& intro) = 0; + + virtual bool + GetReplyIntroFor(const ConvoTag& remote, Introduction& intro) const = 0; + virtual bool GetConvoTagsForService(const ServiceInfo& si, std::set< ConvoTag >& tag) const = 0; From 762728c7eca82d3db10b9dd43ade01abc31e1a26 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 21 Feb 2019 12:13:27 -0500 Subject: [PATCH 13/16] fix --- llarp/service/endpoint.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 0b6faff96..83dcc8672 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -920,11 +920,10 @@ namespace llarp Endpoint::HandleDataMessage(const PathID_t& src, ProtocolMessage* msg) { auto path = GetPathByID(src); - if(path == nullptr) - return false; + if(path) + PutReplyIntroFor(msg->tag, path->intro); msg->sender.UpdateAddr(); PutIntroFor(msg->tag, msg->introReply); - PutReplyIntroFor(msg->tag, path->intro); EnsureReplyPath(msg->sender); return ProcessDataMessage(msg); } @@ -1636,6 +1635,7 @@ namespace llarp ex->msg.PutBuffer(payload); ex->msg.introReply = path->intro; + m_DataHandler->PutReplyIntroFor(currentConvoTag, path->intro); llarp_threadpool_queue_job(m_Endpoint->Worker(), {ex, &AsyncKeyExchange::Encrypt}); } From a6b9a19b3dc20dbbd5f9bad7af3b6b1ba036c0c3 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 26 Feb 2019 10:09:37 -0500 Subject: [PATCH 14/16] start working on 352 --- Makefile | 4 +- libabyss/main.cpp | 33 +++++++++++++--- libabyss/src/client.cpp | 85 +++++++++-------------------------------- 3 files changed, 46 insertions(+), 76 deletions(-) diff --git a/Makefile b/Makefile index 3784f6ebf..605ce7bc3 100644 --- a/Makefile +++ b/Makefile @@ -51,7 +51,7 @@ GRADLE ?= gradle JAVA_HOME ?= /usr/lib/jvm/default-java # jsonrpc server -JSONRPC ?= OFF +JSONRPC ?= ON # native avx2 code AVX2 ?= OFF # non x86 target @@ -63,7 +63,7 @@ NETNS ?= OFF # cross compile? CROSS ?= OFF # build liblokinet-shared.so -SHARED_LIB ?= ON +SHARED_LIB ?= OFF # enable generating coverage COVERAGE ?= OFF COVERAGE_OUTDIR ?= "$(TMPDIR)/lokinet-coverage" diff --git a/libabyss/main.cpp b/libabyss/main.cpp index 5d4d174d1..bd8a3672f 100644 --- a/libabyss/main.cpp +++ b/libabyss/main.cpp @@ -24,15 +24,28 @@ struct DemoHandler : public abyss::httpd::IRPCHandler struct DemoCall : public abyss::http::IRPCClientHandler { - DemoCall(abyss::http::ConnImpl* impl) : abyss::http::IRPCClientHandler(impl) + std::function< void(void) > m_Callback; + llarp::Logic* m_Logic; + + DemoCall(abyss::http::ConnImpl* impl, llarp::Logic* logic, + std::function< void(void) > callback) + : abyss::http::IRPCClientHandler(impl) + , m_Callback(callback) + , m_Logic(logic) { llarp::LogInfo("new call"); } - bool - HandleResponse(abyss::http::RPC_Response resp) override + static void + CallCallback(void* u) + { + static_cast< DemoCall* >(u)->m_Callback(); + } + + bool HandleResponse(abyss::http::RPC_Response) override { - llarp::json::ToString(resp, std::cout); + llarp::LogInfo("response get"); + m_Logic->queue_job({this, &CallCallback}); return true; } @@ -51,10 +64,18 @@ struct DemoCall : public abyss::http::IRPCClientHandler struct DemoClient : public abyss::http::JSONRPC { + llarp_ev_loop* m_Loop; + llarp::Logic* m_Logic; + + DemoClient(llarp_ev_loop* l, llarp::Logic* logic) + : abyss::http::JSONRPC(), m_Loop(l), m_Logic(logic) + { + } + abyss::http::IRPCClientHandler* NewConn(abyss::http::ConnImpl* impl) { - return new DemoCall(impl); + return new DemoCall(impl, m_Logic, std::bind(&llarp_ev_loop_stop, m_Loop)); } void @@ -109,7 +130,7 @@ main(__attribute__((unused)) int argc, __attribute__((unused)) char* argv[]) addr.sin_port = htons(1222); addr.sin_family = AF_INET; DemoServer serv; - DemoClient client; + DemoClient client(loop, logic); llarp::Addr a(addr); while(true) { diff --git a/libabyss/src/client.cpp b/libabyss/src/client.cpp index 5f0d00069..d2b31e680 100644 --- a/libabyss/src/client.cpp +++ b/libabyss/src/client.cpp @@ -70,7 +70,7 @@ namespace abyss ConnImpl* self = static_cast< ConnImpl* >(conn->user); if(!self->ProcessRead((const char*)buf.base, buf.sz)) { - self->CloseError(); + self->CloseError("on read failed"); } } @@ -138,8 +138,8 @@ namespace abyss Close(); return true; case json::IParser::eParseError: - handler->HandleError(); - return false; + CloseError("json parse error"); + return true; default: return false; } @@ -195,8 +195,9 @@ namespace abyss } void - CloseError() + CloseError(const char* msg) { + LogError("CloseError: ", msg); if(handler) handler->HandleError(); handler = nullptr; @@ -221,72 +222,20 @@ namespace abyss std::stringstream ss; json::ToString(m_RequestBody, ss); body = ss.str(); - // request base - char buf[512] = {0}; - int sz = snprintf(buf, sizeof(buf), - "POST /rpc HTTP/1.0\r\nContent-Type: " - "application/json\r\nContent-Length: %zu\r\nAccept: " - "application/json\r\n", - body.size()); - if(sz <= 0) - return; - if(!llarp_tcp_conn_async_write(m_Conn, llarp_buffer_t(buf, sz))) - { - llarp::LogError("failed to write first part of request"); - CloseError(); - return; - } - // header delimiter - buf[0] = ':'; - buf[1] = ' '; - // CRLF - buf[2] = '\r'; - buf[3] = '\n'; - // write extra request header + m_SendHeaders.emplace("Content-Type", "application/json"); + m_SendHeaders.emplace("Content-Length", std::to_string(body.size())); + m_SendHeaders.emplace("Accept", "application/json"); + std::stringstream request; + request << "POST /rpc HTTP/1.0\r\n"; for(const auto& item : m_SendHeaders) + request << item.first << ": " << item.second << "\r\n"; + request << "\r\n" << body; + std::string buf = request.str(); + + if(!llarp_tcp_conn_async_write(m_Conn, + llarp_buffer_t(buf.c_str(), buf.size()))) { - // header name - if(!llarp_tcp_conn_async_write( - m_Conn, llarp_buffer_t(item.first.c_str(), item.first.size()))) - { - CloseError(); - return; - } - // header delimiter - if(!llarp_tcp_conn_async_write(m_Conn, - llarp_buffer_t(buf, 2 * sizeof(char)))) - { - CloseError(); - return; - } - // header value - if(!llarp_tcp_conn_async_write( - m_Conn, - llarp_buffer_t(item.second.c_str(), item.second.size()))) - { - CloseError(); - return; - } - // CRLF - if(!llarp_tcp_conn_async_write( - m_Conn, llarp_buffer_t(buf + 2, 2 * sizeof(char)))) - { - CloseError(); - return; - } - } - // CRLF - if(!llarp_tcp_conn_async_write( - m_Conn, llarp_buffer_t(buf + 2, 2 * sizeof(char)))) - { - CloseError(); - return; - } - // request body - if(!llarp_tcp_conn_async_write( - m_Conn, llarp_buffer_t(body.c_str(), body.size()))) - { - CloseError(); + CloseError("failed to write request"); return; } llarp::LogDebug("request sent"); From f8d6becce8d30bb10d73c4199c925ef7617ee5cf Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Wed, 27 Feb 2019 07:55:26 -0500 Subject: [PATCH 15/16] make whitelist happy --- llarp/link/server.hpp | 4 +- llarp/link/utp.cpp | 5 +- llarp/path/path.cpp | 47 +++++++++---------- llarp/router/abstractrouter.hpp | 4 +- llarp/router/router.cpp | 26 ++++++----- llarp/router/router.hpp | 8 ++-- llarp/router_contact.hpp | 6 +++ llarp/service/endpoint.cpp | 13 +++--- llarp/util/threadpool.h | 4 +- test/link/test_llarp_link.cpp | 82 +++++++++++++-------------------- 10 files changed, 94 insertions(+), 105 deletions(-) diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index 9c38f26b4..97a460f5e 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -30,7 +30,9 @@ namespace llarp using GetRCFunc = std::function< const llarp::RouterContact&(void) >; /// handler of session established - using SessionEstablishedHandler = std::function< void(llarp::RouterContact) >; + /// return false to reject + /// return true to accept + using SessionEstablishedHandler = std::function< bool(ILinkSession*) >; /// f(new, old) /// handler of session renegotiation diff --git a/llarp/link/utp.cpp b/llarp/link/utp.cpp index cbec08545..40474ee64 100644 --- a/llarp/link/utp.cpp +++ b/llarp/link/utp.cpp @@ -777,10 +777,10 @@ namespace llarp Close(); return false; } - EnterState(eSessionReady); /// future LIM are used for session renegotiation GotLIM = std::bind(&Session::GotSessionRenegotiate, this, std::placeholders::_1); + EnterState(eSessionReady); return true; } @@ -982,7 +982,8 @@ namespace llarp if(st == eSessionReady) { parent->MapAddr(remoteRC.pubkey.as_array(), this); - parent->SessionEstablished(remoteRC); + if(!parent->SessionEstablished(this)) + Close(); } } diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp index b68d2ff14..7717bedfe 100644 --- a/llarp/path/path.cpp +++ b/llarp/path/path.cpp @@ -160,26 +160,24 @@ namespace llarp IHopHandler* PathContext::GetByUpstream(const RouterID& remote, const PathID_t& id) { - auto own = MapGet( - m_OurPaths, id, - [](__attribute__((unused)) const PathSet* s) -> bool { - // TODO: is this right? - return true; - }, - [remote, id](PathSet* p) -> IHopHandler* { - return p->GetByUpstream(remote, id); - }); + auto own = MapGet(m_OurPaths, id, + [](__attribute__((unused)) const PathSet* s) -> bool { + // TODO: is this right? + return true; + }, + [remote, id](PathSet* p) -> IHopHandler* { + return p->GetByUpstream(remote, id); + }); if(own) return own; - return MapGet( - m_TransitPaths, id, - [remote](const std::shared_ptr< TransitHop >& hop) -> bool { - return hop->info.upstream == remote; - }, - [](const std::shared_ptr< TransitHop >& h) -> IHopHandler* { - return h.get(); - }); + return MapGet(m_TransitPaths, id, + [remote](const std::shared_ptr< TransitHop >& hop) -> bool { + return hop->info.upstream == remote; + }, + [](const std::shared_ptr< TransitHop >& h) -> IHopHandler* { + return h.get(); + }); } bool @@ -196,14 +194,13 @@ namespace llarp IHopHandler* PathContext::GetByDownstream(const RouterID& remote, const PathID_t& id) { - return MapGet( - m_TransitPaths, id, - [remote](const std::shared_ptr< TransitHop >& hop) -> bool { - return hop->info.downstream == remote; - }, - [](const std::shared_ptr< TransitHop >& h) -> IHopHandler* { - return h.get(); - }); + return MapGet(m_TransitPaths, id, + [remote](const std::shared_ptr< TransitHop >& hop) -> bool { + return hop->info.downstream == remote; + }, + [](const std::shared_ptr< TransitHop >& h) -> IHopHandler* { + return h.get(); + }); } PathSet* diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index 49a44f20d..a159da4ea 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -47,8 +47,8 @@ namespace llarp struct AbstractRouter : public util::IStateful { - virtual void - OnSessionEstablished(RouterContact rc) = 0; + virtual bool + OnSessionEstablished(ILinkSession *) = 0; virtual bool HandleRecvLinkMessageBuffer(ILinkSession *from, diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 988ea0f6d..49acee723 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -192,11 +192,10 @@ namespace llarp return false; } - void - Router::OnSessionEstablished(RouterContact rc) + bool + Router::OnSessionEstablished(ILinkSession * s) { - async_verify_RC(rc, nullptr); - LogInfo("session with ", RouterID(rc.pubkey), " established"); + return async_verify_RC(s->GetRemoteRC()); } Router::Router(struct llarp_threadpool *_tp, struct llarp_ev_loop *__netloop, @@ -989,7 +988,8 @@ namespace llarp return false; // store it in nodedb async - async_verify_RC(newrc, nullptr); + if(!async_verify_RC(newrc)) + return false; // update dht if required if(dht()->impl->Nodes()->HasNode(dht::Key_t{newrc.pubkey})) { @@ -1225,20 +1225,21 @@ namespace llarp return false; } - void - Router::async_verify_RC(const RouterContact &rc, ILinkLayer *link) + bool + Router::async_verify_RC(const RouterContact &rc) { - if(pendingVerifyRC.count(rc.pubkey)) - return; - if(rc.IsPublicRouter() && whitelistRouters) + + if(rc.IsPublicRouter() && whitelistRouters && IsServiceNode()) { if(lokinetRouters.find(rc.pubkey) == lokinetRouters.end()) { LogInfo(rc.pubkey, " is NOT a valid service node, rejecting"); - link->CloseSessionTo(rc.pubkey); - return; + return false; } } + if(pendingVerifyRC.count(rc.pubkey)) + return true; + LogInfo("session with ", RouterID(rc.pubkey), " established"); llarp_async_verify_rc *job = &pendingVerifyRC[rc.pubkey]; async_verify_context *ctx = new async_verify_context(); ctx->router = this; @@ -1263,6 +1264,7 @@ namespace llarp job->hook = &Router::on_verify_client_rc; llarp_nodedb_async_verify(job); + return true; } void diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 62cb46218..67629e104 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -321,8 +321,8 @@ namespace llarp ~Router(); - void - OnSessionEstablished(RouterContact rc) override; + bool + OnSessionEstablished(ILinkSession *) override; bool HandleRecvLinkMessageBuffer(ILinkSession *from, @@ -504,8 +504,8 @@ namespace llarp bool GetRandomConnectedRouter(RouterContact &result) const override; - void - async_verify_RC(const RouterContact &rc, ILinkLayer *link); + bool + async_verify_RC(const RouterContact &rc); void HandleDHTLookupForSendTo(RouterID remote, diff --git a/llarp/router_contact.hpp b/llarp/router_contact.hpp index 54bf0c6e5..34d3b0a90 100644 --- a/llarp/router_contact.hpp +++ b/llarp/router_contact.hpp @@ -122,6 +122,12 @@ namespace llarp && nickname == other.nickname && last_updated == other.last_updated && netID == other.netID; } + + bool + operator!=(const RouterContact & other) const + { + return !(*this == other); + } void Clear(); diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 1ea143047..12bf459bd 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -1328,13 +1328,12 @@ namespace llarp } } // no converstation - return EnsurePathToService( - remote, - [](Address, OutboundContext* c) { - if(c) - c->UpdateIntroSet(true); - }, - 5000, false); + return EnsurePathToService(remote, + [](Address, OutboundContext* c) { + if(c) + c->UpdateIntroSet(true); + }, + 5000, false); } bool diff --git a/llarp/util/threadpool.h b/llarp/util/threadpool.h index c7a068bbc..5adaff876 100644 --- a/llarp/util/threadpool.h +++ b/llarp/util/threadpool.h @@ -16,8 +16,8 @@ struct llarp_threadpool std::queue< std::function< void(void) > > jobs; llarp_threadpool(int workers, const char *name) - : impl( - std::make_unique< llarp::thread::ThreadPool >(workers, workers * 128)) + : impl(std::make_unique< llarp::thread::ThreadPool >(workers, + workers * 128)) { (void)name; } diff --git a/test/link/test_llarp_link.cpp b/test/link/test_llarp_link.cpp index 7ada763a0..0d48d85bd 100644 --- a/test/link/test_llarp_link.cpp +++ b/test/link/test_llarp_link.cpp @@ -189,9 +189,9 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob) return true; } }, - [&](llarp::RouterContact rc) { - ASSERT_EQ(rc, Bob.GetRC()); - llarp::LogInfo("alice established with bob"); + [&](llarp::ILinkSession * s) -> bool { + const auto rc = s->GetRemoteRC(); + return rc.pubkey == Bob.GetRC().pubkey; }, [&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; }, [&](llarp::Signature& sig, const llarp_buffer_t& buf) -> bool { @@ -228,10 +228,11 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob) Bob.gotLIM = true; return sendDiscardMessage(s); }, - [&](llarp::RouterContact rc) { - ASSERT_EQ(rc, Alice.GetRC()); + [&](llarp::ILinkSession * s) -> bool { + if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey) + return false; llarp::LogInfo("bob established with alice"); - Bob.link->VisitSessionByPubkey(Alice.GetRC().pubkey.as_array(), + return Bob.link->VisitSessionByPubkey(Alice.GetRC().pubkey.as_array(), sendDiscardMessage); }, [&](llarp::RouterContact newrc, llarp::RouterContact oldrc) -> bool { @@ -252,7 +253,6 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob) ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC())); RunMainloop(); - ASSERT_TRUE(Alice.gotLIM); ASSERT_TRUE(Bob.gotLIM); ASSERT_TRUE(success); } @@ -262,26 +262,14 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob) Alice.link = llarp::utp::NewServer( &crypto, Alice.encryptionKey, [&]() -> const llarp::RouterContact& { return Alice.GetRC(); }, - [&](llarp::ILinkSession* s, const llarp_buffer_t& buf) -> bool { - if(Alice.gotLIM) - { - return AliceGotMessage(buf); - } - else - { - llarp::LinkIntroMessage msg; - ManagedBuffer copy{buf}; - if(!msg.BDecode(©.underlying)) - return false; - if(!s->GotLIM(&msg)) - return false; - Alice.gotLIM = true; - return true; - } + [&](llarp::ILinkSession*, const llarp_buffer_t& buf) -> bool { + return AliceGotMessage(buf); }, - [&](llarp::RouterContact rc) { - ASSERT_EQ(rc, Bob.GetRC()); + [&](llarp::ILinkSession * s) -> bool { + if(s->GetRemoteRC().pubkey != Bob.GetRC().pubkey) + return false; llarp::LogInfo("alice established with bob"); + return true; }, [&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; }, [&](llarp::Signature& sig, const llarp_buffer_t& buf) -> bool { @@ -293,36 +281,28 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob) }, [&](llarp::RouterID router) { ASSERT_EQ(router, Bob.GetRouterID()); }); - auto sendDiscardMessage = [](llarp::ILinkSession* s) -> bool { - // send discard message in reply to complete unit test - std::array< byte_t, 32 > tmp; - llarp_buffer_t otherBuf(tmp); - llarp::DiscardMessage discard; - if(!discard.BEncode(&otherBuf)) - return false; - otherBuf.sz = otherBuf.cur - otherBuf.base; - otherBuf.cur = otherBuf.base; - return s->SendMessageBuffer(otherBuf); - }; - Bob.link = llarp::utp::NewServer( &crypto, Bob.encryptionKey, [&]() -> const llarp::RouterContact& { return Bob.GetRC(); }, - [&](llarp::ILinkSession* s, const llarp_buffer_t& buf) -> bool { - llarp::LinkIntroMessage msg; - ManagedBuffer copy{buf}; - if(!msg.BDecode(©.underlying)) - return false; - if(!s->GotLIM(&msg)) - return false; - Bob.gotLIM = true; + [&](llarp::ILinkSession*, const llarp_buffer_t& ) -> bool { return true; }, - [&](llarp::RouterContact rc) { - ASSERT_EQ(rc, Alice.GetRC()); + [&](llarp::ILinkSession * s) -> bool { + if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey) + return false; llarp::LogInfo("bob established with alice"); - Bob.link->VisitSessionByPubkey(Alice.GetRC().pubkey.as_array(), - sendDiscardMessage); + logic->queue_job({s, [](void * u) { + llarp::ILinkSession * self = static_cast(u); + std::array< byte_t, 32 > tmp; + llarp_buffer_t otherBuf(tmp); + llarp::DiscardMessage discard; + if(!discard.BEncode(&otherBuf)) + return; + otherBuf.sz = otherBuf.cur - otherBuf.base; + otherBuf.cur = otherBuf.base; + self->SendMessageBuffer(otherBuf); + }}); + return true; }, [&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; }, [&](llarp::Signature& sig, const llarp_buffer_t& buf) -> bool { @@ -339,11 +319,12 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob) ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC())); RunMainloop(); - ASSERT_TRUE(Alice.gotLIM); ASSERT_TRUE(Bob.gotLIM); ASSERT_TRUE(success); } +/* + TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) { Alice.link = llarp::iwp::NewServer( @@ -430,3 +411,4 @@ TEST_F(LinkLayerTest, TestIWPAliceConnectToBob) ASSERT_TRUE(Bob.gotLIM); ASSERT_TRUE(success); }; +*/ \ No newline at end of file From 46b80e465dc79cc55d43fb143233958e98a7f30a Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Wed, 27 Feb 2019 10:19:55 -0500 Subject: [PATCH 16/16] * fix jsonrpc endpoint path for caller * make epoll not crash and leak * correct key name in config --- libabyss/src/client.cpp | 2 +- llarp/ev/ev.hpp | 23 +++++++++++++++++------ llarp/ev/ev_epoll.cpp | 25 +++++++++++++++++-------- llarp/router/router.cpp | 2 +- 4 files changed, 36 insertions(+), 16 deletions(-) diff --git a/libabyss/src/client.cpp b/libabyss/src/client.cpp index d2b31e680..9797924c1 100644 --- a/libabyss/src/client.cpp +++ b/libabyss/src/client.cpp @@ -226,7 +226,7 @@ namespace abyss m_SendHeaders.emplace("Content-Length", std::to_string(body.size())); m_SendHeaders.emplace("Accept", "application/json"); std::stringstream request; - request << "POST /rpc HTTP/1.0\r\n"; + request << "POST /json_rpc HTTP/1.0\r\n"; for(const auto& item : m_SendHeaders) request << item.first << ": " << item.second << "\r\n"; request << "\r\n" << body; diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index cdbeaf66d..20c97f651 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -555,13 +555,22 @@ namespace llarp void connected() { - // we are connected yeh boi - if(_conn) + sockaddr_storage st; + socklen_t sl; + if (getpeername(fd, (sockaddr*)&st, &sl) == 0) { - if(_conn->connected && !_calledConnected) - _conn->connected(_conn, &tcp); + // we are connected yeh boi + if(_conn) + { + if(_conn->connected && !_calledConnected) + _conn->connected(_conn, &tcp); + } + _calledConnected = true; + } + else + { + error(); } - _calledConnected = true; } void @@ -575,8 +584,9 @@ namespace llarp } void - error() + error() override { + _shouldClose = true; if(_conn) { #ifndef _WIN32 @@ -591,6 +601,7 @@ namespace llarp if(_conn->error) _conn->error(_conn); } + } virtual ssize_t diff --git a/llarp/ev/ev_epoll.cpp b/llarp/ev/ev_epoll.cpp index 12adf3061..3dbb4bf9d 100644 --- a/llarp/ev/ev_epoll.cpp +++ b/llarp/ev/ev_epoll.cpp @@ -20,7 +20,7 @@ namespace llarp if(tcp.read) tcp.read(&tcp, llarp_buffer_t(buf, amount)); } - else + else if(amount < 0) { // error _shouldClose = true; @@ -123,7 +123,7 @@ namespace llarp return -1; b.sz = ret; udp->recvfrom(udp, addr, ManagedBuffer{b}); - return 0; + return ret; } int @@ -323,6 +323,7 @@ llarp_epoll_loop::tick(int ms) epoll_event events[1024]; int result; result = epoll_wait(epollfd, events, 1024, ms); + bool didIO = false; if(result > 0) { int idx = 0; @@ -331,22 +332,27 @@ llarp_epoll_loop::tick(int ms) llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr); if(ev) { - llarp::LogDebug(idx, " of ", result, + llarp::LogDebug(idx, " of ", result, " on ", ev->fd, " events=", std::to_string(events[idx].events)); if(events[idx].events & EPOLLERR) { + llarp::LogDebug("epoll error"); ev->error(); } - else + else { - if(events[idx].events & EPOLLIN) - { - ev->read(readbuf, sizeof(readbuf)); - } + // write THEN READ don't revert me if(events[idx].events & EPOLLOUT) { + llarp::LogDebug("epoll out"); ev->flush_write(); } + if(events[idx].events & EPOLLIN) + { + llarp::LogDebug("epoll in"); + if(ev->read(readbuf, sizeof(readbuf)) > 0) + didIO = true; + } } } ++idx; @@ -354,6 +360,9 @@ llarp_epoll_loop::tick(int ms) } if(result != -1) tick_listeners(); + /// if we didn't get an io events we sleep to avoid 100% cpu use + if(!didIO) + std::this_thread::sleep_for(std::chrono::milliseconds(ms)); return result; } diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 49acee723..a2daf68b9 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -850,7 +850,7 @@ namespace llarp { whitelistRouters = IsTrueValue(val); } - if(StrEq(key, "jsonrpc")) + if(StrEq(key, "jsonrpc") || StrEq(key, "addr")) { lokidRPCAddr = val; }