make testnet no longer crash

pull/13/head
Jeff Becker 6 years ago
parent 0d0a3357f7
commit 1d3e9f6adc
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -119,20 +119,13 @@ namespace llarp
bool
IsZero() const
{
size_t idx = sz / 8;
while(idx)
{
if(l[--idx])
return false;
}
return true;
return sodium_is_zero(b, sz) != 0;
}
void
Zero()
{
for(size_t idx = 0; idx * 8 < sz; ++idx)
l[idx] = 0;
sodium_memzero(l, sz);
}
void

@ -26,7 +26,7 @@ namespace llarp
FindPendingTX(const Key_t& owner, uint64_t txid);
void
RemovePendingLookup(const Key_t& owner, uint64_t txid);
RemovePendingTX(const Key_t& owner, uint64_t txid);
void
LookupServiceDirect(const Key_t& target, const Key_t& whoasked,
@ -64,7 +64,7 @@ namespace llarp
const llarp::PathID_t& path, Key_t askpeer);
template < typename Job, typename Result >
void
bool
TryLookupAgain(Job* j, Result r, uint64_t R)
{
const Key_t targetKey = j->target.ToKey();
@ -73,14 +73,12 @@ namespace llarp
if(!nodes->FindCloseExcluding(targetKey, askpeer, exclude))
{
j->Exausted();
delete j;
return;
return true;
}
if((OurKey() ^ targetKey) < (askpeer ^ targetKey))
{
j->Exausted();
delete j;
return;
return true;
}
auto id = ++ids;
TXOwner ownerKey;
@ -94,6 +92,7 @@ namespace llarp
" with txid=", id);
DHTSendTo(askpeer, msg);
j->asked.insert(std::move(askpeer));
return false;
}
void

@ -24,14 +24,24 @@ namespace llarp
return bencode_write_bytestring(buf, _data, _sz);
}
Encrypted&
operator=(const Encrypted& other)
{
return (*this) = other.Buffer();
}
Encrypted&
operator=(llarp_buffer_t buf)
{
if(_data)
delete[] _data;
_data = nullptr;
_sz = buf.sz;
_data = new byte_t[_sz];
memcpy(_data, buf.base, _sz);
if(_sz)
{
_data = new byte_t[_sz];
memcpy(_data, buf.base, _sz);
}
UpdateBuffer();
return *this;
}

@ -72,6 +72,14 @@ struct llarp_link
void
MapAddr(const llarp::Addr &src, const llarp::PubKey &identity);
/// does nothing if we have no session already established
void
KeepAliveSessionTo(const byte_t *pubkey);
/// does nothing if we have no session already established
void
CloseSessionTo(const byte_t *pubkey);
bool
has_session_to(const byte_t *pubkey);

@ -87,6 +87,9 @@ struct llarp_link_session
bool
Tick(llarp_time_t now);
void
keepalive();
void
PumpCryptoOutbound();

@ -5,7 +5,7 @@
#define MAXHOPS (8)
#define DEFAULT_PATH_LIFETIME (10 * 60 * 1000)
#define PATH_BUILD_TIMEOUT (30 * 1000)
#define PATH_BUILD_TIMEOUT (10 * 1000)
#define MESSAGE_PAD_SIZE (1024)
struct llarp_path_hop

@ -235,6 +235,12 @@ namespace llarp
m_DataHandler = func;
}
llarp_time_t
ExpireTime() const
{
return buildStarted + hops[0].lifetime;
}
bool
Expired(llarp_time_t now) const;

@ -146,9 +146,6 @@ namespace llarp
bool
HandleHiddenServiceFrame(const ProtocolFrame* frame);
void
PutLookup(IServiceLookup* lookup, uint64_t txid);
std::string
Name() const;
@ -157,10 +154,10 @@ namespace llarp
OnIntroSetUpdate(const IntroSet* i);
void
EncryptAndSendTo(llarp_buffer_t payload);
EncryptAndSendTo(path::Path* p, llarp_buffer_t payload, ProtocolType t);
void
AsyncGenIntro(llarp_buffer_t payload);
AsyncGenIntro(path::Path* p, llarp_buffer_t payload, ProtocolType t);
/// send a fully encrypted hidden service frame
void
@ -263,7 +260,28 @@ namespace llarp
std::unordered_map< Address, PathEnsureHook, Address::Hash >
m_PendingServiceLookups;
std::unordered_map< RouterID, uint64_t, RouterID::Hash > m_PendingRouters;
struct RouterLookupJob
{
RouterLookupJob(Endpoint* p)
{
started = llarp_time_now_ms();
txid = p->GenTXID();
}
uint64_t txid;
llarp_time_t started;
bool
IsExpired(llarp_time_t now) const
{
if(now < started)
return false;
return now - started > 5000;
}
};
std::unordered_map< RouterID, RouterLookupJob, RouterID::Hash >
m_PendingRouters;
uint64_t m_CurrentPublishTX = 0;
llarp_time_t m_LastPublish = 0;
@ -271,7 +289,8 @@ namespace llarp
/// our introset
service::IntroSet m_IntroSet;
/// pending remote service lookups by id
std::unordered_map< uint64_t, service::IServiceLookup* > m_PendingLookups;
std::unordered_map< uint64_t, std::unique_ptr< service::IServiceLookup > >
m_PendingLookups;
/// prefetch remote address list
std::set< Address > m_PrefetchAddrs;
/// hidden service tag
@ -293,7 +312,7 @@ namespace llarp
/// sessions
std::unordered_map< ConvoTag, Session, ConvoTag::Hash > m_Sessions;
struct CachedTagResult : public IServiceLookup
struct CachedTagResult
{
const static llarp_time_t TTL = 10000;
llarp_time_t lastRequest = 0;
@ -301,12 +320,13 @@ namespace llarp
std::set< IntroSet > result;
Tag tag;
CachedTagResult(Endpoint* p, const Tag& t, uint64_t tx)
: IServiceLookup(p, tx), tag(t)
CachedTagResult(const Tag& t) : tag(t)
{
}
~CachedTagResult();
~CachedTagResult()
{
}
void
Expire(llarp_time_t now);
@ -320,12 +340,39 @@ namespace llarp
}
llarp::routing::IMessage*
BuildRequestMessage();
BuildRequestMessage(uint64_t txid);
bool
HandleResponse(const std::set< IntroSet >& results);
};
struct TagLookupJob : public IServiceLookup
{
TagLookupJob(Endpoint* parent, CachedTagResult* result)
: IServiceLookup(parent, parent->GenTXID(), "taglookup")
, m_result(result)
{
}
~TagLookupJob()
{
}
llarp::routing::IMessage*
BuildRequestMessage()
{
return m_result->BuildRequestMessage(txid);
}
bool
HandleResponse(const std::set< IntroSet >& results)
{
return m_result->HandleResponse(results);
}
CachedTagResult* m_result;
};
std::unordered_map< Tag, CachedTagResult, Tag::Hash > m_PrefetchedTags;
};
} // namespace service

@ -20,13 +20,15 @@ namespace llarp
struct IServiceLookup
{
IServiceLookup(ILookupHolder* parent, uint64_t tx);
IServiceLookup() = delete;
virtual ~IServiceLookup(){};
/// handle lookup result
virtual bool
HandleResponse(const std::set< IntroSet >& results) = 0;
HandleResponse(const std::set< IntroSet >& results)
{
return false;
}
/// determine if this request has timed out
bool
@ -47,8 +49,12 @@ namespace llarp
ILookupHolder* parent;
uint64_t txid;
const std::string name;
protected:
IServiceLookup(ILookupHolder* parent, uint64_t tx,
const std::string& name);
llarp_time_t m_created;
};

@ -59,10 +59,12 @@ namespace llarp
llarp::service::ConvoTag T;
ProtocolFrame();
ProtocolFrame(const ProtocolFrame& other);
~ProtocolFrame();
ProtocolFrame&
operator=(const ProtocolFrame& other);
bool
EncryptAndSign(llarp_crypto* c, const ProtocolMessage& msg,
const byte_t* sharedkey, const Identity& localIdent);

@ -67,13 +67,13 @@ namespace llarp
whoasked = r->dht->impl.OurKey();
}
void
bool
TryAgain()
{
--m_TriesLeft;
auto &dht = m_router->dht->impl;
llarp::LogInfo("try lookup again");
dht.TryLookupAgain(
return dht.TryLookupAgain(
this,
std::bind(&PathLookupJob::OnResult, this, std::placeholders::_1),
R);
@ -94,6 +94,7 @@ namespace llarp
}
else
llarp::LogError("no path for lookup pathid=", pathID);
m_router->dht->impl.RemovePendingTX(whoasked, txid);
}
bool
@ -120,14 +121,11 @@ namespace llarp
llarp::LogInfo("found ", sz, " introsets for txid=", txid);
msg.M.push_back(new llarp::dht::GotIntroMessage(intros, txid));
path->SendRoutingMessage(&msg, m_router);
m_router->dht->impl.RemovePendingTX(whoasked, txid);
}
else if(!target.IsZero())
{
if(m_TriesLeft)
{
TryAgain();
return false;
}
return m_TriesLeft && TryAgain();
}
}
else
@ -333,7 +331,7 @@ namespace llarp
}
void
Context::RemovePendingLookup(const Key_t &owner, uint64_t id)
Context::RemovePendingTX(const Key_t &owner, uint64_t id)
{
TXOwner search;
search.node = owner;
@ -437,18 +435,19 @@ namespace llarp
auto msg = new llarp::DHTImmeidateMessage(whoasked);
msg->msgs.push_back(new GotIntroMessage({}, txid));
m_Router->SendToOrQueue(whoasked, msg);
m_Router->dht->impl.RemovePendingTX(whoasked, txid);
}
void
bool
TryAgain()
{
--m_TriesLeft;
llarp::LogInfo("try lookup again");
auto &dht = m_Router->dht->impl;
dht.TryLookupAgain(this,
std::bind(&IntroSetInformJob::OnResult, this,
std::placeholders::_1),
R);
return dht.TryLookupAgain(this,
std::bind(&IntroSetInformJob::OnResult, this,
std::placeholders::_1),
R);
}
bool
@ -475,11 +474,7 @@ namespace llarp
}
else if(!target.IsZero())
{
if(m_TriesLeft)
{
TryAgain();
return false;
}
return m_TriesLeft && TryAgain();
}
}
else

@ -111,6 +111,7 @@ namespace llarp
llarp::LogInfo("introset found locally");
service::IntroSet i = *introset;
replies.push_back(new GotIntroMessage({i}, T));
return true;
}
else
{
@ -119,6 +120,7 @@ namespace llarp
// we don't have it, reply with a direct reply
llarp::LogInfo("dont have intro set and no recursion");
replies.push_back(new GotIntroMessage({}, T));
return true;
}
else
{
@ -136,6 +138,7 @@ namespace llarp
// we are not closer than our peer to the target so don't
// revurse
replies.push_back(new GotIntroMessage({}, T));
return true;
}
else if(R >= 1)
dht.LookupIntroSet(S, From, T, peer, R - 1, exclude);
@ -177,6 +180,7 @@ namespace llarp
}
// we are iterative and don't have it, reply with a direct reply
replies.push_back(new GotIntroMessage(reply, T));
return true;
}
else
{

@ -41,7 +41,7 @@ namespace llarp
{
if(pending->FoundIntros(I))
{
dht.RemovePendingLookup(From, T);
dht.RemovePendingTX(From, T);
llarp::LogInfo("removed pending tx from ", From, " txid=", T);
}
return true;

@ -113,7 +113,7 @@ namespace llarp
}
}
}
dht.RemovePendingLookup(From, txid);
dht.RemovePendingTX(From, txid);
return true;
}
llarp::LogWarn(

@ -39,6 +39,46 @@ llarp_link::remove_intro_from(const llarp::Addr& from)
m_PendingSessions.erase(from);
}
void
llarp_link::CloseSessionTo(const byte_t* pubkey)
{
llarp::Addr addr;
llarp::PubKey pk(pubkey);
{
lock_t lock(m_Connected_Mutex);
auto itr = m_Connected.find(pk);
if(itr == m_Connected.end())
return;
addr = itr->second;
}
{
lock_t lock(m_sessions_Mutex);
auto itr = m_sessions.find(addr);
if(itr != m_sessions.end())
itr->second->close();
}
}
void
llarp_link::KeepAliveSessionTo(const byte_t* pubkey)
{
llarp::Addr addr;
llarp::PubKey pk(pubkey);
{
lock_t lock(m_Connected_Mutex);
auto itr = m_Connected.find(pk);
if(itr == m_Connected.end())
return;
addr = itr->second;
}
{
lock_t lock(m_sessions_Mutex);
auto itr = m_sessions.find(addr);
if(itr != m_sessions.end())
itr->second->keepalive();
}
}
void
llarp_link::MapAddr(const llarp::Addr& src, const llarp::PubKey& identity)
{

@ -126,8 +126,7 @@ llarp_link_session::close()
// set our side invalidated and close async when the other side also marks
// as session invalidated
frame.txflags |= eSessionInvalidated;
// TODO: add timer for session invalidation
llarp_logic_queue_job(serv->logic, {this, &send_keepalive});
keepalive();
}
void
@ -445,6 +444,12 @@ llarp_link_session::Tick(llarp_time_t now)
return false;
}
void
llarp_link_session::keepalive()
{
llarp_logic_queue_job(serv->logic, {this, &send_keepalive});
}
void
llarp_link_session::EncryptOutboundFrames()
{

@ -141,8 +141,12 @@ namespace llarp
llarp::LogError("failed to send LRCM");
return;
}
ctx->path->status = llarp::path::ePathBuilding;
ctx->path->buildStarted = llarp_time_now_ms();
// persist session with router until this path is done
router->PersistSessionUntil(remote, ctx->path->ExpireTime());
// add own path
router->paths.AddOwnPath(ctx->pathset, ctx->path);
ctx->user->pathBuildStarted(ctx->user);
}

@ -55,6 +55,13 @@ llarp_router::HandleRecvLinkMessage(llarp_link_session *session,
return inbound_link_msg_parser.ProcessFrom(session, buf);
}
void
llarp_router::PersistSessionUntil(const llarp::RouterID &remote,
llarp_time_t until)
{
m_PersistingSessions[remote] = until;
}
bool
llarp_router::SendToOrQueue(const llarp::RouterID &remote,
const llarp::ILinkMessage *msg)
@ -158,6 +165,7 @@ llarp_router::HandleDHTLookupForSendTo(llarp_router_lookup_job *job)
{
self->DiscardOutboundFor(job->target);
}
llarp_rc_free(&job->result);
delete job;
}
@ -355,6 +363,39 @@ llarp_router::handle_router_ticker(void *user, uint64_t orig, uint64_t left)
self->ScheduleTicker(orig);
}
void
llarp_router::TryEstablishTo(const llarp::RouterID &remote)
{
auto rc = llarp_nodedb_get_rc(nodedb, remote);
if(rc)
{
// try connecting
llarp_router_try_connect(this, rc, 5);
}
else
{
// dht lookup as we don't know it
llarp_router_lookup_job *lookup = new llarp_router_lookup_job();
lookup->user = this;
llarp_rc_clear(&lookup->result);
memcpy(lookup->target, remote, PUBKEYSIZE);
lookup->hook = &HandleDHTLookupForTryEstablishTo;
llarp_dht_lookup_router(this->dht, lookup);
}
}
void
llarp_router::HandleDHTLookupForTryEstablishTo(llarp_router_lookup_job *job)
{
if(job->found)
{
llarp_router_try_connect(static_cast< llarp_router * >(job->user),
&job->result, 5);
}
llarp_rc_free(&job->result);
delete job;
}
void
llarp_router::HandleExploritoryPathBuildStarted(llarp_pathbuild_job *job)
{
@ -365,12 +406,38 @@ void
llarp_router::Tick()
{
// llarp::LogDebug("tick router");
auto now = llarp_time_now_ms();
paths.ExpirePaths();
// TODO: don't do this if we have enough paths already
// FIXME: build paths even if we have inbound links
if(inboundLinks.size() == 0)
{
{
auto itr = m_PersistingSessions.begin();
while(itr != m_PersistingSessions.end())
{
auto link = GetLinkWithSessionByPubkey(itr->first);
if(now <= itr->second)
{
// persisting ended
if(link)
link->CloseSessionTo(itr->first);
itr = m_PersistingSessions.erase(itr);
}
else
{
if(link)
{
link->KeepAliveSessionTo(itr->first);
}
else
{
TryEstablishTo(itr->first);
}
++itr;
}
}
}
auto N = llarp_nodedb_num_loaded(nodedb);
if(N > 3)
{

@ -110,8 +110,12 @@ struct llarp_router
/// loki verified routers
std::map< llarp::RouterID, llarp_rc > validRouters;
// pending establishing session with routers
std::map< llarp::PubKey, llarp_link_establish_job > pendingEstablishJobs;
// sessions to persist -> timestamp to end persist at
std::map< llarp::RouterID, llarp_time_t > m_PersistingSessions;
llarp_router();
virtual ~llarp_router();
@ -143,6 +147,9 @@ struct llarp_router
void
Run();
void
PersistSessionUntil(const llarp::RouterID &remote, llarp_time_t until);
static void
ConnectAll(void *user, uint64_t orig, uint64_t left);
@ -188,6 +195,10 @@ struct llarp_router
void
DiscardOutboundFor(const llarp::RouterID &remote);
/// try establishing a session to a remote router
void
TryEstablishTo(const llarp::RouterID &remote);
/// flush outbound message queue
void
FlushOutbound();
@ -245,6 +256,9 @@ struct llarp_router
static void
HandleExploritoryPathBuildStarted(llarp_pathbuild_job *job);
static void
HandleDHTLookupForTryEstablishTo(llarp_router_lookup_job *job);
};
#endif

@ -57,12 +57,6 @@ namespace llarp
struct PathAlignJob
{
Address remote;
PathAlignJob(const Address& addr) : remote(addr)
{
}
void
HandleResult(Endpoint::OutboundContext* context)
{
@ -71,6 +65,7 @@ namespace llarp
byte_t tmp[128] = {0};
memcpy(tmp, "BEEP", 4);
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
buf.sz = 4;
context->AsyncEncryptAndSendTo(buf, eProtocolText);
}
else
@ -108,6 +103,8 @@ namespace llarp
{
llarp::LogWarn("could not publish descriptors for endpoint ", Name(),
" because we couldn't get any introductions");
if(ShouldBuildMore())
ManualRebuild(1);
return;
}
m_IntroSet.I.clear();
@ -130,12 +127,16 @@ namespace llarp
}
// expire pending tx
{
std::set< service::IntroSet > empty;
auto itr = m_PendingLookups.begin();
while(itr != m_PendingLookups.end())
{
if(itr->second->IsTimedOut(now))
{
itr->second->HandleResponse({});
std::unique_ptr< IServiceLookup > lookup = std::move(itr->second);
llarp::LogInfo(lookup->name, " timed out txid=", lookup->txid);
lookup->HandleResponse(empty);
itr = m_PendingLookups.erase(itr);
}
else
@ -143,12 +144,25 @@ namespace llarp
}
}
// expire pending router lookups
{
auto itr = m_PendingRouters.begin();
while(itr != m_PendingRouters.end())
{
if(itr->second.IsExpired(now))
itr = m_PendingRouters.erase(itr);
else
++itr;
}
}
// prefetch addrs
for(const auto& addr : m_PrefetchAddrs)
{
if(!HasPathToService(addr))
{
PathAlignJob* j = new PathAlignJob(addr);
if(!EnsurePathToService(j->remote,
PathAlignJob* j = new PathAlignJob();
if(!EnsurePathToService(addr,
std::bind(&PathAlignJob::HandleResult, j,
std::placeholders::_1),
10000))
@ -165,17 +179,16 @@ namespace llarp
auto itr = m_PrefetchedTags.find(tag);
if(itr == m_PrefetchedTags.end())
{
itr = m_PrefetchedTags
.insert(std::make_pair(
tag, CachedTagResult(this, tag, GenTXID())))
.first;
itr =
m_PrefetchedTags.insert(std::make_pair(tag, CachedTagResult(tag)))
.first;
}
for(const auto& introset : itr->second.result)
{
if(HasPendingPathToService(introset.A.Addr()))
continue;
PathAlignJob* j = new PathAlignJob(introset.A.Addr());
if(!EnsurePathToService(j->remote,
PathAlignJob* j = new PathAlignJob();
if(!EnsurePathToService(introset.A.Addr(),
std::bind(&PathAlignJob::HandleResult, j,
std::placeholders::_1),
10000))
@ -191,8 +204,8 @@ namespace llarp
auto path = PickRandomEstablishedPath();
if(path)
{
itr->second.txid = GenTXID();
itr->second.SendRequestViaPath(path, m_Router);
auto job = new TagLookupJob(this, &itr->second);
job->SendRequestViaPath(path, Router());
}
}
}
@ -277,9 +290,10 @@ namespace llarp
Name(), " txid=", msg->T);
return true;
}
bool result = itr->second->HandleResponse(remote);
std::unique_ptr< IServiceLookup > lookup = std::move(itr->second);
m_PendingLookups.erase(itr);
return result;
lookup->HandleResponse(remote);
return true;
}
void
@ -395,18 +409,12 @@ namespace llarp
Endpoint::~Endpoint()
{
}
Endpoint::CachedTagResult::~CachedTagResult()
{
}
bool
Endpoint::CachedTagResult::HandleResponse(
const std::set< IntroSet >& introsets)
{
auto now = llarp_time_now_ms();
txid = 0;
for(const auto& introset : introsets)
if(result.insert(introset).second)
lastModified = now;
@ -437,12 +445,11 @@ namespace llarp
}
llarp::routing::IMessage*
Endpoint::CachedTagResult::BuildRequestMessage()
Endpoint::CachedTagResult::BuildRequestMessage(uint64_t txid)
{
llarp::routing::DHTMessage* msg = new llarp::routing::DHTMessage();
msg->M.push_back(new llarp::dht::FindIntroMessage(tag, txid));
lastRequest = llarp_time_now_ms();
parent->PutLookup(this, txid);
return msg;
}
@ -494,13 +501,17 @@ namespace llarp
struct HiddenServiceAddressLookup : public IServiceLookup
{
~HiddenServiceAddressLookup()
{
}
Address remote;
typedef std::function< bool(const IntroSet*) > HandlerFunc;
HandlerFunc handle;
HiddenServiceAddressLookup(Endpoint* p, HandlerFunc h,
const Address& addr, uint64_t tx)
: IServiceLookup(p, tx), remote(addr), handle(h)
: IServiceLookup(p, tx, "HSLookup"), remote(addr), handle(h)
{
}
@ -520,8 +531,7 @@ namespace llarp
remote.ToString());
handle(nullptr);
}
delete this;
return true;
return false;
}
llarp::routing::IMessage*
@ -561,8 +571,9 @@ namespace llarp
auto itr = m_PendingServiceLookups.find(addr);
if(itr != m_PendingServiceLookups.end())
{
itr->second(m_RemoteSessions.at(addr));
auto f = itr->second;
m_PendingServiceLookups.erase(itr);
f(m_RemoteSessions.at(addr));
}
}
@ -602,10 +613,11 @@ namespace llarp
msg.M.push_back(
new dht::FindRouterMessage({}, dht::Key_t(router), txid));
if(path->SendRoutingMessage(&msg, m_Router))
if(path && path->SendRoutingMessage(&msg, m_Router))
{
llarp::LogInfo(Name(), " looking up ", router);
m_PendingRouters.insert(std::make_pair(router, txid));
m_PendingRouters.insert(
std::make_pair(router, RouterLookupJob(this)));
}
else
{
@ -637,12 +649,6 @@ namespace llarp
std::placeholders::_1));
}
void
Endpoint::OutboundContext::PutLookup(IServiceLookup* lookup, uint64_t txid)
{
m_Parent->PutLookup(lookup, txid);
}
bool
Endpoint::OutboundContext::HandleHiddenServiceFrame(
const ProtocolFrame* frame)
@ -737,20 +743,26 @@ namespace llarp
selectedIntro = intro;
}
}
ManualRebuild(numHops);
ManualRebuild(2);
}
void
Endpoint::OutboundContext::AsyncEncryptAndSendTo(llarp_buffer_t data,
ProtocolType protocol)
{
auto path = GetPathByRouter(selectedIntro.router);
if(!path)
{
llarp::LogError("No Path to ", selectedIntro.router, " yet");
return;
}
if(sequenceNo)
{
EncryptAndSendTo(data);
EncryptAndSendTo(path, data, protocol);
}
else
{
AsyncGenIntro(data);
AsyncGenIntro(path, data, protocol);
}
}
@ -764,19 +776,21 @@ namespace llarp
ProtocolMessage msg;
ProtocolFrame frame;
Introduction intro;
PQPubKey introPubKey;
const PQPubKey introPubKey;
std::function< void(ProtocolFrame&) > hook;
IDataHandler* handler;
AsyncIntroGen(llarp_logic* l, llarp_crypto* c, byte_t* key,
const ServiceInfo& r, const Identity& localident,
const Introduction& us, IDataHandler* h)
const PQPubKey& introsetPubKey, const Introduction& us,
IDataHandler* h)
: logic(l)
, crypto(c)
, sharedKey(key)
, remote(r)
, m_LocalIdentity(localident)
, intro(us)
, introPubKey(introsetPubKey)
, handler(h)
{
}
@ -829,16 +843,18 @@ namespace llarp
};
void
Endpoint::OutboundContext::AsyncGenIntro(llarp_buffer_t payload)
Endpoint::OutboundContext::AsyncGenIntro(path::Path* p,
llarp_buffer_t payload,
ProtocolType t)
{
AsyncIntroGen* ex = new AsyncIntroGen(
m_Parent->RouterLogic(), m_Parent->Crypto(), sharedKey,
currentIntroSet.A, m_Parent->GetIdentity(), selectedIntro,
m_Parent->m_DataHandler);
currentIntroSet.A, m_Parent->GetIdentity(), currentIntroSet.K,
selectedIntro, m_Parent->m_DataHandler);
ex->hook = std::bind(&Endpoint::OutboundContext::Send, this,
std::placeholders::_1);
ex->msg.PutBuffer(payload);
ex->msg.introReply = p->intro;
llarp_threadpool_queue_job(m_Parent->Worker(),
{ex, &AsyncIntroGen::Work});
}
@ -856,13 +872,16 @@ namespace llarp
{
ShiftIntroduction();
}
// XXX: this may be a different path that that was put into the protocol
// message inside the protocol frame
auto path = GetPathByRouter(selectedIntro.router);
if(path)
{
routing::PathTransferMessage transfer(msg, selectedIntro.pathID);
llarp::LogInfo("sending frame via ", path->Upstream(), " to ",
path->Endpoint(), " for ", Name());
path->SendRoutingMessage(&transfer, m_Parent->Router());
llarp::LogDebug("sending frame via ", path->Upstream(), " to ",
path->Endpoint(), " for ", Name());
if(!path->SendRoutingMessage(&transfer, m_Parent->Router()))
llarp::LogError("Failed to send frame on path");
}
else
{
@ -909,7 +928,8 @@ namespace llarp
{
UpdateIntroSet();
}
// TODO: check for expiration
m_Parent->EnsureRouterIsKnown(selectedIntro.router);
// TODO: check for expiration of outbound context
return false;
}
@ -950,7 +970,9 @@ namespace llarp
}
void
Endpoint::OutboundContext::EncryptAndSendTo(llarp_buffer_t payload)
Endpoint::OutboundContext::EncryptAndSendTo(path::Path* p,
llarp_buffer_t payload,
ProtocolType t)
{
auto path = GetPathByRouter(selectedIntro.router);
if(path)
@ -973,7 +995,8 @@ namespace llarp
if(m_Parent->m_DataHandler->GetCachedSessionKeyFor(f.T, shared))
{
ProtocolMessage m;
m.introReply = selectedIntro;
m.proto = t;
m.introReply = path->intro;
m.sender = m_Parent->m_Identity.pub;
m.PutBuffer(payload);

@ -6,9 +6,11 @@ namespace llarp
{
namespace service
{
IServiceLookup::IServiceLookup(ILookupHolder *p, uint64_t tx)
: parent(p), txid(tx)
IServiceLookup::IServiceLookup(ILookupHolder *p, uint64_t tx,
const std::string &n)
: parent(p), txid(tx), name(n)
{
m_created = llarp_time_now_ms();
p->PutLookup(this, tx);
}

@ -156,10 +156,9 @@ namespace llarp
const byte_t* sharedkey,
ProtocolMessage& msg) const
{
auto buf = D.Buffer();
crypto->xchacha20(buf, sharedkey, N);
msg.PutBuffer(buf);
return true;
msg.PutBuffer(D.Buffer());
auto buf = llarp::Buffer(msg.payload);
return crypto->xchacha20(buf, sharedkey, N);
}
bool
@ -168,16 +167,23 @@ namespace llarp
const byte_t* sessionKey,
const Identity& localIdent)
{
// put payload and encrypt
D = llarp::ConstBuffer(msg.payload);
memcpy(D.data(), msg.payload.data(), D.size());
auto dbuf = D.Buffer();
crypto->xchacha20(*dbuf, sessionKey, N);
// zero out signature
Z.Zero();
// encode
byte_t tmp[MAX_PROTOCOL_MESSAGE_SIZE];
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
// encode message
if(!msg.BEncode(&buf))
return false;
// rewind
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// encrypt
crypto->xchacha20(buf, sessionKey, N);
// put encrypted buffer
D = buf;
// zero out signature
Z.Zero();
// reset size
buf.sz = sizeof(tmp);
// encode frame
if(!BEncode(&buf))
return false;
// rewind
@ -187,26 +193,23 @@ namespace llarp
return localIdent.Sign(crypto, Z, buf);
}
struct AsyncFrameDH
struct AsyncFrameDecrypt
{
llarp_crypto* crypto;
llarp_logic* logic;
ProtocolMessage* msg;
const Identity& m_LocalIdentity;
PQPubKey introPubKey;
IDataHandler* handler;
Address remote;
Encrypted D;
const ProtocolFrame* frame;
AsyncFrameDH(llarp_logic* l, llarp_crypto* c, const Identity& localIdent,
IDataHandler* h, ProtocolMessage* m, const ProtocolFrame* f)
AsyncFrameDecrypt(llarp_logic* l, llarp_crypto* c,
const Identity& localIdent, IDataHandler* h,
ProtocolMessage* m, const ProtocolFrame* f)
: crypto(c)
, logic(l)
, msg(m)
, m_LocalIdentity(localIdent)
, handler(h)
, D(f->D)
, frame(f)
{
}
@ -214,21 +217,24 @@ namespace llarp
static void
Work(void* user)
{
AsyncFrameDH* self = static_cast< AsyncFrameDH* >(user);
auto crypto = self->crypto;
AsyncFrameDecrypt* self = static_cast< AsyncFrameDecrypt* >(user);
auto crypto = self->crypto;
SharedSecret K;
SharedSecret sharedKey;
// copy
ProtocolFrame frame = *self->frame;
ProtocolFrame frame;
frame = *self->frame;
if(!crypto->pqe_decrypt(self->frame->C, K,
pq_keypair_to_secret(self->m_LocalIdentity.pq)))
{
llarp::LogError("pqke failed");
llarp::LogError("pqke failed C=", self->frame->C);
frame.Dump< MAX_PROTOCOL_MESSAGE_SIZE >();
delete self->msg;
delete self;
return;
}
auto buf = self->D.Buffer();
// decrypt
auto buf = frame.D.Buffer();
crypto->xchacha20(*buf, K, self->frame->N);
if(!self->msg->BDecode(buf))
{
@ -239,10 +245,11 @@ namespace llarp
return;
}
// verify signature of outer message after we parsed the inner message
if(!frame.Verify(crypto, self->msg->sender))
if(!self->frame->Verify(crypto, self->msg->sender))
{
llarp::LogError("intro frame has invalid signature");
frame.Dump< MAX_PROTOCOL_MESSAGE_SIZE >();
llarp::LogError("intro frame has invalid signature Z=",
self->frame->Z, " from ", self->msg->sender);
self->frame->Dump< MAX_PROTOCOL_MESSAGE_SIZE >();
delete self->msg;
delete self;
return;
@ -255,7 +262,7 @@ namespace llarp
crypto->dh_server, tmp + 32, self->msg->sender, self->frame->N))
{
llarp::LogError("x25519 key exchange failed");
frame.Dump< MAX_PROTOCOL_MESSAGE_SIZE >();
self->frame->Dump< MAX_PROTOCOL_MESSAGE_SIZE >();
delete self->msg;
delete self;
return;
@ -274,18 +281,30 @@ namespace llarp
}
};
ProtocolFrame&
ProtocolFrame::operator=(const ProtocolFrame& other)
{
C = other.C;
D = other.D;
N = other.N;
Z = other.Z;
T = other.T;
return *this;
}
bool
ProtocolFrame::AsyncDecryptAndVerify(llarp_logic* logic, llarp_crypto* c,
llarp_threadpool* worker,
const Identity& localIdent,
IDataHandler* handler) const
{
if(S == 0)
if(T.IsZero())
{
ProtocolMessage* msg = new ProtocolMessage();
// we need to dh
auto dh = new AsyncFrameDH(logic, c, localIdent, handler, msg, this);
llarp_threadpool_queue_job(worker, {dh, &AsyncFrameDH::Work});
auto dh =
new AsyncFrameDecrypt(logic, c, localIdent, handler, msg, this);
llarp_threadpool_queue_job(worker, {dh, &AsyncFrameDecrypt::Work});
return true;
}
const byte_t* shared = nullptr;
@ -322,15 +341,11 @@ namespace llarp
T.Zero();
}
ProtocolFrame::ProtocolFrame(const ProtocolFrame& other)
: C(other.C), D(other.D), N(other.N), Z(other.Z), T(other.T)
{
}
bool
ProtocolFrame::Verify(llarp_crypto* crypto, const ServiceInfo& from) const
{
ProtocolFrame copy(*this);
ProtocolFrame copy;
copy = *this;
// save signature
// zero out signature for verify
copy.Z.Zero();

@ -48,7 +48,7 @@ namespace llarp
llarp_router* r)
{
msg->S = m_SequenceNum++;
byte_t tmp[MAX_LINK_MSG_SIZE / 2];
byte_t tmp[MAX_LINK_MSG_SIZE - 1024];
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
if(!msg->BEncode(&buf))
{

Loading…
Cancel
Save