From f23ed98c33d7205cff017993f27d33583ecd9863 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 17 Jul 2018 16:17:13 +1000 Subject: [PATCH] make it work --- include/llarp/iwp/server.hpp | 5 +++++ include/llarp/iwp/session.hpp | 3 ++- include/llarp/pathset.hpp | 16 +++++++++++++--- include/llarp/routing/message.hpp | 1 + include/llarp/service/IntroSet.hpp | 3 +++ llarp/iwp/session.cpp | 8 +++++++- llarp/pathset.cpp | 27 +++++++++++++++++++++------ llarp/routing/message_parser.cpp | 5 +++-- llarp/service.cpp | 10 ++++++++++ llarp/service/endpoint.cpp | 9 +++++---- 10 files changed, 70 insertions(+), 17 deletions(-) diff --git a/include/llarp/iwp/server.hpp b/include/llarp/iwp/server.hpp index 62d79f914..697b2d312 100644 --- a/include/llarp/iwp/server.hpp +++ b/include/llarp/iwp/server.hpp @@ -46,6 +46,7 @@ struct llarp_link SessionMap_t m_Connected; mtx_t m_Connected_Mutex; + bool pumpingLogic = false; typedef std::unordered_map< llarp::Addr, llarp_link_session *, llarp::addrhash > @@ -234,11 +235,15 @@ struct llarp_link s->TickLogic(); return true; }); + self->pumpingLogic = false; } void PumpLogic() { + if(pumpingLogic) + return; + pumpingLogic = true; llarp_logic_queue_job(logic, {this, &handle_logic_pump}); } diff --git a/include/llarp/iwp/session.hpp b/include/llarp/iwp/session.hpp index 94edb827d..ecc4c7979 100644 --- a/include/llarp/iwp/session.hpp +++ b/include/llarp/iwp/session.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include "codel.hpp" #include "frame_state.hpp" #include "llarp/buffer.h" @@ -114,7 +115,7 @@ struct llarp_link_session llarp_time_t lastKeepalive = 0; uint32_t establish_job_id = 0; uint32_t frames = 0; - bool working = false; + std::atomic< bool > working; llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime > outboundFrames; diff --git a/include/llarp/pathset.hpp b/include/llarp/pathset.hpp index b7580c457..40e2b4be5 100644 --- a/include/llarp/pathset.hpp +++ b/include/llarp/pathset.hpp @@ -65,6 +65,7 @@ namespace llarp bool ShouldPublishDescriptors() const; + /// override me in subtype virtual bool HandleGotIntroMessage(const llarp::dht::GotIntroMessage* msg) { @@ -79,8 +80,7 @@ namespace llarp std::list< llarp::service::Introduction >& intros) const; bool - PublishIntroSet(const llarp::service::IntroSet& introset, - llarp_router* r); + PublishIntroSet(llarp_router* r); typedef std::function< void(const llarp::service::IntroSet*) > ServiceLookupHandler; @@ -90,14 +90,24 @@ namespace llarp LookupService(const llarp::service::Address& addr, ServiceLookupHandler handler); + protected: + /// our introset + service::IntroSet m_Introset; + protected: void IssueServiceLookup(const llarp::service::Address& addr); + void + IntroSetPublished(); + + void + IntroSetPublishFail(); + private: typedef std::pair< RouterID, PathID_t > PathInfo_t; typedef std::map< PathInfo_t, Path* > PathMap_t; - + bool m_PublishedIntroSet = false; size_t m_NumPaths; PathMap_t m_Paths; uint64_t m_CurrentPublishTX = 0; diff --git a/include/llarp/routing/message.hpp b/include/llarp/routing/message.hpp index f7b59d701..f3e010bb9 100644 --- a/include/llarp/routing/message.hpp +++ b/include/llarp/routing/message.hpp @@ -32,6 +32,7 @@ namespace llarp static bool OnKey(dict_reader* r, llarp_buffer_t* key); bool firstKey; + char key; dict_reader reader; IMessage* msg; }; diff --git a/include/llarp/service/IntroSet.hpp b/include/llarp/service/IntroSet.hpp index 27032118c..0fa7b91de 100644 --- a/include/llarp/service/IntroSet.hpp +++ b/include/llarp/service/IntroSet.hpp @@ -48,6 +48,9 @@ namespace llarp return out << "] V=" << i.version << " Z=" << i.Z; } + bool + HasExpiredIntros() const; + bool BEncode(llarp_buffer_t* buf) const; diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index a21056eb1..f55da3528 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -45,6 +45,7 @@ llarp_link_session::llarp_link_session(llarp_link *l, const byte_t *seckey, crypto->randbytes(token, 32); llarp::LogInfo("session created"); frame.alive(); + working.store(false); } llarp_link_session::~llarp_link_session() @@ -62,6 +63,9 @@ llarp_link_session::Router() bool llarp_link_session::sendto(llarp_buffer_t msg) { + auto now = llarp_time_now_ms(); + if(timedout(now)) + return false; auto id = ++frame.txids; // llarp::LogDebug("session sending to, number", id); llarp::ShortHash digest; @@ -432,7 +436,8 @@ llarp_link_session::Tick(llarp_time_t now) // we are timed out // when we are done doing stuff with all of our frames from the crypto // workers we are done - llarp::LogWarn("Tick - ", addr, " timed out with ", frames, " frames left"); + llarp::LogWarn("Tick - ", addr, " timed out with ", frames, + " frames left, working=", working); return !working; } if(is_invalidated()) @@ -501,6 +506,7 @@ static void handle_introack_generated(iwp_async_introack *i) { llarp_link_session *link = static_cast< llarp_link_session * >(i->user); + link->working = false; if(i->buf && link->serv->has_intro_from(link->addr)) { // track it with the server here diff --git a/llarp/pathset.cpp b/llarp/pathset.cpp index 56b0fa76a..9e59eac6c 100644 --- a/llarp/pathset.cpp +++ b/llarp/pathset.cpp @@ -45,6 +45,13 @@ namespace llarp } } + void + PathSet::IntroSetPublished() + { + m_CurrentPublishTX = 0; + m_PublishedIntroSet = true; + } + size_t PathSet::NumInStatus(PathStatus st) const { @@ -92,6 +99,7 @@ namespace llarp PathSet::GetCurrentIntroductions( std::list< llarp::service::Introduction >& intros) const { + intros.clear(); size_t count = 0; auto itr = m_Paths.begin(); while(itr != m_Paths.end()) @@ -109,8 +117,16 @@ namespace llarp bool PathSet::ShouldPublishDescriptors() const { - // TODO: implement me - return m_CurrentPublishTX == 0 || true; + if(m_PublishedIntroSet) + return m_Introset.I.size() == 0 + || (m_Introset.HasExpiredIntros() && m_CurrentPublishTX == 0); + return true; + } + + void + PathSet::IntroSetPublishFail() + { + m_CurrentPublishTX = 0; } Path* @@ -134,16 +150,15 @@ namespace llarp } bool - PathSet::PublishIntroSet(const llarp::service::IntroSet& introset, - llarp_router* r) + PathSet::PublishIntroSet(llarp_router* r) { auto path = PickRandomEstablishedPath(); if(path) { m_CurrentPublishTX = rand(); llarp::routing::DHTMessage msg; - msg.M.push_back( - new llarp::dht::PublishIntroMessage(introset, m_CurrentPublishTX)); + msg.M.push_back(new llarp::dht::PublishIntroMessage( + m_Introset, m_CurrentPublishTX)); return path->SendRoutingMessage(&msg, r); } else diff --git a/llarp/routing/message_parser.cpp b/llarp/routing/message_parser.cpp index 898481bf2..5c44425af 100644 --- a/llarp/routing/message_parser.cpp +++ b/llarp/routing/message_parser.cpp @@ -37,7 +37,8 @@ namespace llarp return false; if(strbuf.sz != 1) return false; - switch(*strbuf.cur) + self->key = *strbuf.cur; + switch(self->key) { case 'L': self->msg = new PathLatencyMessage; @@ -76,7 +77,7 @@ namespace llarp msg->from = from; result = msg->HandleMessage(h, r); if(!result) - llarp::LogWarn("Failed to handle inbound routing message"); + llarp::LogWarn("Failed to handle inbound routing message ", key); delete msg; } else diff --git a/llarp/service.cpp b/llarp/service.cpp index 9c8ca1630..f5a4c8be0 100644 --- a/llarp/service.cpp +++ b/llarp/service.cpp @@ -64,6 +64,16 @@ namespace llarp return bencode_end(buf); } + bool + IntroSet::HasExpiredIntros() const + { + auto now = llarp_time_now_ms(); + for(const auto& i : I) + if(now >= i.expiresAt) + return true; + return false; + } + Introduction::~Introduction() { } diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 5bb0be06a..37fc728c4 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -25,19 +25,18 @@ namespace llarp { if(ShouldPublishDescriptors()) { - IntroSet introset; - if(!GetCurrentIntroductions(introset.I)) + if(!GetCurrentIntroductions(m_Introset.I)) { llarp::LogWarn("could not publish descriptors for endpoint ", Name(), " because we couldn't get any introductions"); return; } - if(!m_Identity.SignIntroSet(introset, &m_Router->crypto)) + if(!m_Identity.SignIntroSet(m_Introset, &m_Router->crypto)) { llarp::LogWarn("failed to sign introset for endpoint ", Name()); return; } - if(PublishIntroSet(introset, m_Router)) + if(PublishIntroSet(m_Router)) { llarp::LogInfo("publishing introset for endpoint ", Name()); } @@ -65,6 +64,7 @@ namespace llarp llarp::LogWarn( "invalid signature in got intro message for service endpoint ", Name()); + IntroSetPublishFail(); return false; } if(m_Identity.pub == introset.A) @@ -72,6 +72,7 @@ namespace llarp llarp::LogInfo( "got introset publish confirmation for hidden service endpoint ", Name()); + IntroSetPublished(); } else {