diff --git a/.vscode/settings.json b/.vscode/settings.json index f974c0315..e360b8362 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -57,6 +57,7 @@ "io": "cpp", "strstream": "cpp", "numeric": "cpp", - "valarray": "cpp" + "valarray": "cpp", + "*.ipp": "cpp" } } \ No newline at end of file diff --git a/include/llarp/link.h b/include/llarp/link.h index 59c53a823..1dffe646e 100644 --- a/include/llarp/link.h +++ b/include/llarp/link.h @@ -22,8 +22,8 @@ extern "C" { struct llarp_link; /** - * wire layer transport session for point to point communication between us and - * another + * wire layer transport session for point to point communication between us + * and another */ struct llarp_link_session; @@ -75,8 +75,9 @@ struct llarp_link const char *(*name)(void); void (*get_our_address)(struct llarp_link *, struct llarp_ai *); /* - int (*register_listener)(struct llarp_link *, struct llarp_link_ev_listener); - void (*deregister_listener)(struct llarp_link *, int); + int (*register_listener)(struct llarp_link *, struct + llarp_link_ev_listener); void (*deregister_listener)(struct llarp_link *, + int); */ bool (*configure)(struct llarp_link *, struct llarp_ev_loop *, const char *, int, uint16_t); @@ -101,7 +102,8 @@ llarp_link_initialized(struct llarp_link *link); struct llarp_link_session { void *impl; - /** send an entire message, splits up into smaller pieces and does encryption + /** send an entire message, splits up into smaller pieces and does + * encryption */ bool (*sendto)(struct llarp_link_session *, llarp_buffer_t); /** return true if this session is timed out */ diff --git a/include/llarp/nodedb.h b/include/llarp/nodedb.h index b04e36fa6..5c798fc10 100644 --- a/include/llarp/nodedb.h +++ b/include/llarp/nodedb.h @@ -94,7 +94,7 @@ struct llarp_async_verify_rc struct llarp_threadpool *cryptoworker; struct llarp_threadpool *diskworker; - /// router contact (should this be a pointer?) + /// router contact struct llarp_rc rc; /// result bool valid; diff --git a/include/llarp/router_contact.h b/include/llarp/router_contact.h index cd8c262bd..f9b3e12ca 100644 --- a/include/llarp/router_contact.h +++ b/include/llarp/router_contact.h @@ -53,6 +53,9 @@ llarp_rc_verify_sig(struct llarp_crypto *crypto, struct llarp_rc *rc); void llarp_rc_copy(struct llarp_rc *dst, const struct llarp_rc *src); +bool +llarp_rc_is_public_router(const struct llarp_rc *const rc); + void llarp_rc_set_addrs(struct llarp_rc *rc, struct llarp_alloc *mem, struct llarp_ai_list *addr); diff --git a/include/llarp/version.h b/include/llarp/version.h index 7eacf3a42..e7e0484db 100644 --- a/include/llarp/version.h +++ b/include/llarp/version.h @@ -10,7 +10,7 @@ #endif #ifndef LLARP_VERSION_PATCH -#define LLARP_VERSION_PATCH "0" +#define LLARP_VERSION_PATCH "1" #endif #ifndef LLARP_VERSION_NUM diff --git a/llarp/address_info.cpp b/llarp/address_info.cpp index 47d56ff8a..10b678f88 100644 --- a/llarp/address_info.cpp +++ b/llarp/address_info.cpp @@ -218,6 +218,16 @@ llarp_ai_list_pushback(struct llarp_ai_list *l, struct llarp_ai *ai) l->list.push_back(*ai); } +size_t +llarp_ai_list_size(struct llarp_ai_list *l) +{ + if(l) + { + return l->list.size(); + } + return 0; +} + void llarp_ai_list_iterate(struct llarp_ai_list *l, struct llarp_ai_list_iter *itr) { diff --git a/llarp/codel.hpp b/llarp/codel.hpp index 98e65453d..0fbf22b29 100644 --- a/llarp/codel.hpp +++ b/llarp/codel.hpp @@ -15,12 +15,10 @@ namespace llarp { }; - template < typename Mutex_t > struct DummyLock { - DummyLock(const Mutex_t& mtx){ + DummyLock(const DummyMutex& mtx){}; - }; ~DummyLock() { } @@ -64,8 +62,9 @@ namespace llarp firstPut = GetTime()(i); } + template < typename Queue_t > void - Process(std::queue< T >& result) + Process(Queue_t& result) { llarp_time_t lowest = 0xFFFFFFFFFFFFFFFFUL; // auto start = llarp_time_now_ms(); diff --git a/llarp/iwp_link.cpp b/llarp/iwp_link.cpp index 9b4ea0c5e..e43e06281 100644 --- a/llarp/iwp_link.cpp +++ b/llarp/iwp_link.cpp @@ -443,9 +443,10 @@ namespace iwp } bool - operator<(const InboundMessage &other) const + operator>(const InboundMessage &other) const { - return msgid < other.msgid; + // order in ascending order for codel queue + return msgid > other.msgid; } llarp_buffer_t @@ -463,6 +464,15 @@ namespace iwp } }; + struct OrderCompare + { + bool + operator()(const InboundMessage *left, const InboundMessage *right) + { + return left->msgid < right->msgid; + } + }; + struct PutTime { void @@ -486,8 +496,7 @@ namespace iwp typedef std::queue< sendbuf_t * > sendqueue_t; typedef llarp::util::CoDelQueue< InboundMessage *, InboundMessage::GetTime, InboundMessage::PutTime, - llarp::util::DummyMutex, - llarp::util::DummyLock< llarp::util::DummyMutex > > + llarp::util::DummyMutex, llarp::util::DummyLock > recvqueue_t; llarp_router *router = nullptr; @@ -511,17 +520,27 @@ namespace iwp bool process_inbound_queue() { - std::queue< InboundMessage * > q; + std::priority_queue< InboundMessage *, std::vector< InboundMessage * >, + InboundMessage::OrderCompare > + q; recvqueue.Process(q); + bool increment = false; while(q.size()) { // TODO: is this right? - nextMsgID = std::max(nextMsgID, q.front()->msgid); - if(!router->HandleRecvLinkMessage(parent, q.front()->Buffer())) - llarp::Warn("failed to process inbound message ", q.front()->msgid); - delete q.front(); + auto &front = q.top(); + // the items are already sorted anyways so this doesn't really do much + nextMsgID = std::max(nextMsgID, front->msgid); + if(!router->HandleRecvLinkMessage(parent, front->Buffer())) + { + llarp::Warn("failed to process inbound message ", front->msgid); + } + delete front; q.pop(); + increment = true; } + if(increment) + ++nextMsgID; // TODO: this isn't right return true; } @@ -1557,11 +1576,11 @@ namespace iwp { server *serv = static_cast< server * >(l->impl); { - lock_t lock(serv->m_Connected_Mutex); + // lock_t lock(serv->m_Connected_Mutex); auto itr = serv->m_Connected.find(pubkey); if(itr != serv->m_Connected.end()) { - lock_t innerlock(serv->m_sessions_Mutex); + // lock_t innerlock(serv->m_sessions_Mutex); auto inner_itr = serv->m_sessions.find(itr->second); if(inner_itr != serv->m_sessions.end()) { @@ -1774,9 +1793,10 @@ namespace iwp if(id == nextMsgID) { session *impl = static_cast< session * >(parent->impl); - success = router->HandleRecvLinkMessage(parent, buf); + if(id == 0) { + success = router->HandleRecvLinkMessage(parent, buf); if(impl->CheckRCValid()) { if(!impl->IsEstablished()) @@ -1784,6 +1804,7 @@ namespace iwp impl->send_LIM(); impl->session_established(); } + ++nextMsgID; } else { @@ -1792,15 +1813,17 @@ namespace iwp impl->parent->close(impl->parent); success = false; } - ++nextMsgID; } - else if(recvqueue.Size() > 2) + else { - return process_inbound_queue(); + recvqueue.Put(new InboundMessage(id, msg)); + success = process_inbound_queue(); } } else { + llarp::Warn("out of order message expected ", nextMsgID, " but got ", + id); recvqueue.Put(new InboundMessage(id, msg)); success = true; } diff --git a/llarp/link_intro.cpp b/llarp/link_intro.cpp index 43ed32b43..38d394d1a 100644 --- a/llarp/link_intro.cpp +++ b/llarp/link_intro.cpp @@ -2,6 +2,7 @@ #include #include #include "logger.hpp" +#include "router.hpp" namespace llarp { @@ -71,7 +72,7 @@ namespace llarp bool LinkIntroMessage::HandleMessage(llarp_router* router) const { - llarp::Info("got LIM from ", remote); + router->async_verify_RC(RC, !llarp_rc_is_public_router(RC)); return true; } -} +} // namespace llarp diff --git a/llarp/path.cpp b/llarp/path.cpp index 041f5ba9a..a430fd8e8 100644 --- a/llarp/path.cpp +++ b/llarp/path.cpp @@ -309,7 +309,7 @@ namespace llarp Path::Tick(llarp_time_t now, llarp_router* r) { auto dlt = now - m_LastLatencyTestTime; - if(dlt > 5000) + if(dlt > 5000 && m_LastLatencyTestID == 0) { llarp::routing::PathLatencyMessage latency; latency.T = rand(); @@ -432,6 +432,7 @@ namespace llarp Latency = llarp_time_now_ms() - m_LastLatencyTestTime; llarp::Info("path latency is ", Latency, " ms for tx=", TXID(), " rx=", RXID()); + m_LastLatencyTestID = 0; return true; } return false; diff --git a/llarp/router.cpp b/llarp/router.cpp index 7a7f052f4..e0bd3d539 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -348,13 +348,15 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job) llarp_dht_put_peer(router->dht, &router->validRouters[pk]); // this was an outbound establish job - if(ctx->establish_job->session) + if(ctx->establish_job) { auto session = ctx->establish_job->session; router->FlushOutboundFor(pk, session->get_parent(session)); // this frees the job router->pendingEstablishJobs.erase(pk); } + else // this was an inbound session + router->FlushOutboundFor(pk, router->GetLinkWithSessionByPubkey(pk)); } void @@ -482,6 +484,19 @@ llarp_router::SessionClosed(const llarp::RouterID &remote) validRouters.erase(itr); } +llarp_link * +llarp_router::GetLinkWithSessionByPubkey(const llarp::RouterID &pubkey) +{ + for(auto &link : inboundLinks) + { + if(link->has_session_to(link, pubkey)) + return link; + } + if(outboundLink->has_session_to(outboundLink, pubkey)) + return outboundLink; + return nullptr; +} + void llarp_router::FlushOutboundFor(const llarp::RouterID &remote, llarp_link *chosen) @@ -492,6 +507,11 @@ llarp_router::FlushOutboundFor(const llarp::RouterID &remote, { return; } + if(!chosen) + { + DiscardOutboundFor(remote); + return; + } while(itr->second.size()) { auto buf = llarp::StackBuffer< decltype(linkmsg_buffer) >(linkmsg_buffer); @@ -526,7 +546,7 @@ llarp_router::on_try_connect_result(llarp_link_establish_job *job) { // llarp::Debug("try_connect got session"); auto session = job->session; - router->async_verify_RC(session, false, job); + router->async_verify_RC(session->get_remote_router(session), false, job); return; } // llarp::Debug("try_connect no session"); @@ -563,8 +583,7 @@ llarp_router::DiscardOutboundFor(const llarp::RouterID &remote) } void -llarp_router::async_verify_RC(llarp_link_session *session, - bool isExpectingClient, +llarp_router::async_verify_RC(llarp_rc *rc, bool isExpectingClient, llarp_link_establish_job *establish_job) { llarp_async_verify_rc *job = new llarp_async_verify_rc; @@ -579,7 +598,7 @@ llarp_router::async_verify_RC(llarp_link_session *session, job->cryptoworker = tp; job->diskworker = disk; - llarp_rc_copy(&job->rc, session->get_remote_router(session)); + llarp_rc_copy(&job->rc, rc); if(isExpectingClient) job->hook = &llarp_router::on_verify_client_rc; else diff --git a/llarp/router.hpp b/llarp/router.hpp index e7047ddfa..2b59a7ab7 100644 --- a/llarp/router.hpp +++ b/llarp/router.hpp @@ -190,8 +190,11 @@ struct llarp_router void ScheduleTicker(uint64_t i = 1000); + llarp_link * + GetLinkWithSessionByPubkey(const llarp::RouterID &remote); + void - async_verify_RC(llarp_link_session *session, bool isExpectingClient, + async_verify_RC(llarp_rc *rc, bool isExpectingClient, llarp_link_establish_job *job = nullptr); static bool diff --git a/llarp/router_contact.cpp b/llarp/router_contact.cpp index cf83b3bfb..d144164e0 100644 --- a/llarp/router_contact.cpp +++ b/llarp/router_contact.cpp @@ -101,6 +101,12 @@ llarp_rc_decode_dict(struct dict_reader *r, llarp_buffer_t *key) return false; } +bool +llarp_rc_is_public_router(const struct llarp_rc *const rc) +{ + return rc->addrs && llarp_ai_list_size(rc->addrs) > 0; +} + void llarp_rc_copy(struct llarp_rc *dst, const struct llarp_rc *src) {