make it work

pull/6/head^2
Jeff Becker 6 years ago
parent bb8532a281
commit f23ed98c33

@ -46,6 +46,7 @@ struct llarp_link
SessionMap_t m_Connected; SessionMap_t m_Connected;
mtx_t m_Connected_Mutex; mtx_t m_Connected_Mutex;
bool pumpingLogic = false;
typedef std::unordered_map< llarp::Addr, llarp_link_session *, typedef std::unordered_map< llarp::Addr, llarp_link_session *,
llarp::addrhash > llarp::addrhash >
@ -234,11 +235,15 @@ struct llarp_link
s->TickLogic(); s->TickLogic();
return true; return true;
}); });
self->pumpingLogic = false;
} }
void void
PumpLogic() PumpLogic()
{ {
if(pumpingLogic)
return;
pumpingLogic = true;
llarp_logic_queue_job(logic, {this, &handle_logic_pump}); llarp_logic_queue_job(logic, {this, &handle_logic_pump});
} }

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <atomic>
#include "codel.hpp" #include "codel.hpp"
#include "frame_state.hpp" #include "frame_state.hpp"
#include "llarp/buffer.h" #include "llarp/buffer.h"
@ -114,7 +115,7 @@ struct llarp_link_session
llarp_time_t lastKeepalive = 0; llarp_time_t lastKeepalive = 0;
uint32_t establish_job_id = 0; uint32_t establish_job_id = 0;
uint32_t frames = 0; uint32_t frames = 0;
bool working = false; std::atomic< bool > working;
llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime > llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime >
outboundFrames; outboundFrames;

@ -65,6 +65,7 @@ namespace llarp
bool bool
ShouldPublishDescriptors() const; ShouldPublishDescriptors() const;
/// override me in subtype
virtual bool virtual bool
HandleGotIntroMessage(const llarp::dht::GotIntroMessage* msg) HandleGotIntroMessage(const llarp::dht::GotIntroMessage* msg)
{ {
@ -79,8 +80,7 @@ namespace llarp
std::list< llarp::service::Introduction >& intros) const; std::list< llarp::service::Introduction >& intros) const;
bool bool
PublishIntroSet(const llarp::service::IntroSet& introset, PublishIntroSet(llarp_router* r);
llarp_router* r);
typedef std::function< void(const llarp::service::IntroSet*) > typedef std::function< void(const llarp::service::IntroSet*) >
ServiceLookupHandler; ServiceLookupHandler;
@ -90,14 +90,24 @@ namespace llarp
LookupService(const llarp::service::Address& addr, LookupService(const llarp::service::Address& addr,
ServiceLookupHandler handler); ServiceLookupHandler handler);
protected:
/// our introset
service::IntroSet m_Introset;
protected: protected:
void void
IssueServiceLookup(const llarp::service::Address& addr); IssueServiceLookup(const llarp::service::Address& addr);
void
IntroSetPublished();
void
IntroSetPublishFail();
private: private:
typedef std::pair< RouterID, PathID_t > PathInfo_t; typedef std::pair< RouterID, PathID_t > PathInfo_t;
typedef std::map< PathInfo_t, Path* > PathMap_t; typedef std::map< PathInfo_t, Path* > PathMap_t;
bool m_PublishedIntroSet = false;
size_t m_NumPaths; size_t m_NumPaths;
PathMap_t m_Paths; PathMap_t m_Paths;
uint64_t m_CurrentPublishTX = 0; uint64_t m_CurrentPublishTX = 0;

@ -32,6 +32,7 @@ namespace llarp
static bool static bool
OnKey(dict_reader* r, llarp_buffer_t* key); OnKey(dict_reader* r, llarp_buffer_t* key);
bool firstKey; bool firstKey;
char key;
dict_reader reader; dict_reader reader;
IMessage* msg; IMessage* msg;
}; };

@ -48,6 +48,9 @@ namespace llarp
return out << "] V=" << i.version << " Z=" << i.Z; return out << "] V=" << i.version << " Z=" << i.Z;
} }
bool
HasExpiredIntros() const;
bool bool
BEncode(llarp_buffer_t* buf) const; BEncode(llarp_buffer_t* buf) const;

@ -45,6 +45,7 @@ llarp_link_session::llarp_link_session(llarp_link *l, const byte_t *seckey,
crypto->randbytes(token, 32); crypto->randbytes(token, 32);
llarp::LogInfo("session created"); llarp::LogInfo("session created");
frame.alive(); frame.alive();
working.store(false);
} }
llarp_link_session::~llarp_link_session() llarp_link_session::~llarp_link_session()
@ -62,6 +63,9 @@ llarp_link_session::Router()
bool bool
llarp_link_session::sendto(llarp_buffer_t msg) llarp_link_session::sendto(llarp_buffer_t msg)
{ {
auto now = llarp_time_now_ms();
if(timedout(now))
return false;
auto id = ++frame.txids; auto id = ++frame.txids;
// llarp::LogDebug("session sending to, number", id); // llarp::LogDebug("session sending to, number", id);
llarp::ShortHash digest; llarp::ShortHash digest;
@ -432,7 +436,8 @@ llarp_link_session::Tick(llarp_time_t now)
// we are timed out // we are timed out
// when we are done doing stuff with all of our frames from the crypto // when we are done doing stuff with all of our frames from the crypto
// workers we are done // workers we are done
llarp::LogWarn("Tick - ", addr, " timed out with ", frames, " frames left"); llarp::LogWarn("Tick - ", addr, " timed out with ", frames,
" frames left, working=", working);
return !working; return !working;
} }
if(is_invalidated()) if(is_invalidated())
@ -501,6 +506,7 @@ static void
handle_introack_generated(iwp_async_introack *i) handle_introack_generated(iwp_async_introack *i)
{ {
llarp_link_session *link = static_cast< llarp_link_session * >(i->user); llarp_link_session *link = static_cast< llarp_link_session * >(i->user);
link->working = false;
if(i->buf && link->serv->has_intro_from(link->addr)) if(i->buf && link->serv->has_intro_from(link->addr))
{ {
// track it with the server here // track it with the server here

@ -45,6 +45,13 @@ namespace llarp
} }
} }
void
PathSet::IntroSetPublished()
{
m_CurrentPublishTX = 0;
m_PublishedIntroSet = true;
}
size_t size_t
PathSet::NumInStatus(PathStatus st) const PathSet::NumInStatus(PathStatus st) const
{ {
@ -92,6 +99,7 @@ namespace llarp
PathSet::GetCurrentIntroductions( PathSet::GetCurrentIntroductions(
std::list< llarp::service::Introduction >& intros) const std::list< llarp::service::Introduction >& intros) const
{ {
intros.clear();
size_t count = 0; size_t count = 0;
auto itr = m_Paths.begin(); auto itr = m_Paths.begin();
while(itr != m_Paths.end()) while(itr != m_Paths.end())
@ -109,8 +117,16 @@ namespace llarp
bool bool
PathSet::ShouldPublishDescriptors() const PathSet::ShouldPublishDescriptors() const
{ {
// TODO: implement me if(m_PublishedIntroSet)
return m_CurrentPublishTX == 0 || true; return m_Introset.I.size() == 0
|| (m_Introset.HasExpiredIntros() && m_CurrentPublishTX == 0);
return true;
}
void
PathSet::IntroSetPublishFail()
{
m_CurrentPublishTX = 0;
} }
Path* Path*
@ -134,16 +150,15 @@ namespace llarp
} }
bool bool
PathSet::PublishIntroSet(const llarp::service::IntroSet& introset, PathSet::PublishIntroSet(llarp_router* r)
llarp_router* r)
{ {
auto path = PickRandomEstablishedPath(); auto path = PickRandomEstablishedPath();
if(path) if(path)
{ {
m_CurrentPublishTX = rand(); m_CurrentPublishTX = rand();
llarp::routing::DHTMessage msg; llarp::routing::DHTMessage msg;
msg.M.push_back( msg.M.push_back(new llarp::dht::PublishIntroMessage(
new llarp::dht::PublishIntroMessage(introset, m_CurrentPublishTX)); m_Introset, m_CurrentPublishTX));
return path->SendRoutingMessage(&msg, r); return path->SendRoutingMessage(&msg, r);
} }
else else

@ -37,7 +37,8 @@ namespace llarp
return false; return false;
if(strbuf.sz != 1) if(strbuf.sz != 1)
return false; return false;
switch(*strbuf.cur) self->key = *strbuf.cur;
switch(self->key)
{ {
case 'L': case 'L':
self->msg = new PathLatencyMessage; self->msg = new PathLatencyMessage;
@ -76,7 +77,7 @@ namespace llarp
msg->from = from; msg->from = from;
result = msg->HandleMessage(h, r); result = msg->HandleMessage(h, r);
if(!result) if(!result)
llarp::LogWarn("Failed to handle inbound routing message"); llarp::LogWarn("Failed to handle inbound routing message ", key);
delete msg; delete msg;
} }
else else

@ -64,6 +64,16 @@ namespace llarp
return bencode_end(buf); return bencode_end(buf);
} }
bool
IntroSet::HasExpiredIntros() const
{
auto now = llarp_time_now_ms();
for(const auto& i : I)
if(now >= i.expiresAt)
return true;
return false;
}
Introduction::~Introduction() Introduction::~Introduction()
{ {
} }

@ -25,19 +25,18 @@ namespace llarp
{ {
if(ShouldPublishDescriptors()) if(ShouldPublishDescriptors())
{ {
IntroSet introset; if(!GetCurrentIntroductions(m_Introset.I))
if(!GetCurrentIntroductions(introset.I))
{ {
llarp::LogWarn("could not publish descriptors for endpoint ", Name(), llarp::LogWarn("could not publish descriptors for endpoint ", Name(),
" because we couldn't get any introductions"); " because we couldn't get any introductions");
return; return;
} }
if(!m_Identity.SignIntroSet(introset, &m_Router->crypto)) if(!m_Identity.SignIntroSet(m_Introset, &m_Router->crypto))
{ {
llarp::LogWarn("failed to sign introset for endpoint ", Name()); llarp::LogWarn("failed to sign introset for endpoint ", Name());
return; return;
} }
if(PublishIntroSet(introset, m_Router)) if(PublishIntroSet(m_Router))
{ {
llarp::LogInfo("publishing introset for endpoint ", Name()); llarp::LogInfo("publishing introset for endpoint ", Name());
} }
@ -65,6 +64,7 @@ namespace llarp
llarp::LogWarn( llarp::LogWarn(
"invalid signature in got intro message for service endpoint ", "invalid signature in got intro message for service endpoint ",
Name()); Name());
IntroSetPublishFail();
return false; return false;
} }
if(m_Identity.pub == introset.A) if(m_Identity.pub == introset.A)
@ -72,6 +72,7 @@ namespace llarp
llarp::LogInfo( llarp::LogInfo(
"got introset publish confirmation for hidden service endpoint ", "got introset publish confirmation for hidden service endpoint ",
Name()); Name());
IntroSetPublished();
} }
else else
{ {

Loading…
Cancel
Save