refactor (not done)

pull/15/head
Jeff Becker 6 years ago
parent 199dad09dd
commit b122fc59f8

@ -346,7 +346,6 @@ set(LIB_SRC
llarp/dht/find_router.cpp
llarp/dht/got_intro.cpp
llarp/dht/got_router.cpp
llarp/dht/search_job.cpp
llarp/dht/publish_intro.cpp
llarp/handlers/tun.cpp
llarp/iwp/frame_header.cpp

@ -54,6 +54,17 @@ namespace llarp
}
};
template < typename Compare, typename T >
struct CoDelCompare
{
bool
operator()(const std::unique_ptr< T >& left,
const std::unique_ptr< T >& right) const
{
return Compare()(left.get(), right.get());
}
};
template < typename T, typename GetTime, typename PutTime, typename Compare,
typename Mutex_t = util::Mutex, typename Lock_t = util::Lock,
llarp_time_t dropMs = 5, llarp_time_t initialIntervalMs = 100 >
@ -85,7 +96,7 @@ namespace llarp
Lock_t lock(m_QueueMutex);
if(firstPut == 0)
firstPut = GetTime()(ptr);
m_Queue.push(ptr);
m_Queue.push(std::unique_ptr< T >(ptr));
}
return true;
}
@ -100,7 +111,7 @@ namespace llarp
Lock_t lock(m_QueueMutex);
if(firstPut == 0)
firstPut = GetTime()(ptr);
m_Queue.push(ptr);
m_Queue.push(std::unique_ptr< T >(ptr));
}
}
@ -112,27 +123,23 @@ namespace llarp
Lock_t lock(m_QueueMutex);
if(firstPut == 0)
firstPut = GetTime()(ptr);
m_Queue.push(ptr);
m_Queue.push(std::unique_ptr< T >(ptr));
}
}
/// visit returns true to discard entry otherwise the entry is
/// re-queued
template < typename Visit >
void
ProcessIf(Visit visitor)
Process(Visit visitor)
{
llarp_time_t lowest = 0xFFFFFFFFFFFFFFFFUL;
// auto start = llarp_time_now_ms();
// llarp::LogInfo("CoDelQueue::Process - start at ", start);
Lock_t lock(m_QueueMutex);
auto start = firstPut;
Queue_t requeue;
while(m_Queue.size())
{
llarp::LogDebug("CoDelQueue::Process - queue has ", m_Queue.size());
T* item = m_Queue.top();
auto dlt = start - GetTime()(item);
const auto& item = m_Queue.top();
auto dlt = start - GetTime()(item.get());
// llarp::LogInfo("CoDelQueue::Process - dlt ", dlt);
lowest = std::min(dlt, lowest);
if(m_Queue.size() == 1)
@ -143,8 +150,8 @@ namespace llarp
{
nextTickInterval += initialIntervalMs / std::sqrt(++dropNum);
m_Queue.pop();
delete item;
break;
firstPut = 0;
return;
}
else
{
@ -153,35 +160,20 @@ namespace llarp
}
}
// llarp::LogInfo("CoDelQueue::Process - passing");
if(visitor(item))
{
delete item;
}
else
{
requeue.push(item);
}
visitor(item.get());
m_Queue.pop();
}
m_Queue = std::move(requeue);
firstPut = 0;
}
template < typename Func >
void
Process(Func visitor)
{
ProcessIf([visitor](T* t) -> bool {
visitor(t);
return true;
});
}
llarp_time_t firstPut = 0;
size_t dropNum = 0;
llarp_time_t nextTickInterval = initialIntervalMs;
Mutex_t m_QueueMutex;
typedef std::priority_queue< T*, std::vector< T* >, Compare > Queue_t;
typedef std::priority_queue< std::unique_ptr< T >,
std::vector< std::unique_ptr< T > >,
CoDelCompare< Compare, T > >
Queue_t;
Queue_t m_Queue;
std::string m_name;
};
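Aside: the change above makes the CoDel queue own its entries — the priority_queue now stores std::unique_ptr< T > and CoDelCompare forwards to the existing raw-pointer Compare. A minimal standalone sketch of that adapter idea (Item, ItemCompare and OwningCompare are hypothetical names, not from the llarp tree):
#include <memory>
#include <queue>
#include <vector>
struct Item
{
  int priority;
};
// raw-pointer comparator, the shape the CoDel queues already supply
struct ItemCompare
{
  bool
  operator()(const Item* left, const Item* right) const
  {
    return left->priority < right->priority;
  }
};
// same idea as CoDelCompare: adapt Compare to the owning pointer type
template < typename Compare, typename T >
struct OwningCompare
{
  bool
  operator()(const std::unique_ptr< T >& left,
             const std::unique_ptr< T >& right) const
  {
    return Compare()(left.get(), right.get());
  }
};
int
main()
{
  std::priority_queue< std::unique_ptr< Item >,
                       std::vector< std::unique_ptr< Item > >,
                       OwningCompare< ItemCompare, Item > >
      q;
  q.push(std::unique_ptr< Item >(new Item{1}));
  q.push(std::unique_ptr< Item >(new Item{3}));
  // top() returns a const reference; the queue keeps ownership
  return q.top()->priority == 3 ? 0 : 1;
}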

@ -65,11 +65,11 @@ struct iwp_async_intro
uint8_t *buf;
size_t sz;
/// nonce parameter
uint8_t *nonce;
uint8_t nonce[32];
/// remote public key
uint8_t *remote_pubkey;
uint8_t remote_pubkey[32];
/// local private key
uint8_t *secretkey;
uint8_t secretkey[64];
/// callback
iwp_intro_hook hook;
};

@ -19,7 +19,8 @@ namespace llarp
Bucket(const Key_t& us) : nodes(XorMetric(us)){};
bool
GetRandomNodeExcluding(Key_t& result, std::set< Key_t > exclude) const
GetRandomNodeExcluding(Key_t& result,
const std::set< Key_t >& exclude) const
{
std::vector< Key_t > candidates;
for(const auto& item : nodes)
@ -50,6 +51,31 @@ namespace llarp
return nodes.size() > 0;
}
bool
GetManyRandom(std::set< Key_t >& result, size_t N) const
{
if(nodes.size() < N)
return false;
if(nodes.size() == N)
{
for(const auto& node : nodes)
{
result.insert(node.first);
}
return true;
}
size_t expecting = N;
size_t sz = nodes.size();
while(N)
{
auto itr = nodes.begin();
std::advance(itr, llarp_randint() % sz);
if(result.insert(itr->first).second)
--N;
}
return result.size() == expecting;
}
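Aside: GetManyRandom above draws random offsets into the node map until N distinct keys are collected, relying on std::set::insert to reject duplicates. A hedged standalone sketch of the same loop, with std::mt19937 standing in for llarp_randint and a plain std::map for the bucket's node storage:
#include <cstddef>
#include <iterator>
#include <map>
#include <random>
#include <set>
#include <string>
template < typename Map, typename Key >
bool
GetManyRandom(const Map& nodes, std::set< Key >& result, std::size_t N)
{
  if(nodes.size() < N)
    return false;
  std::mt19937 rng{std::random_device{}()};
  std::uniform_int_distribution< std::size_t > pick(0, nodes.size() - 1);
  while(N)
  {
    auto itr = nodes.begin();
    std::advance(itr, pick(rng));
    // only count keys that were not selected already
    if(result.insert(itr->first).second)
      --N;
  }
  return true;
}
int
main()
{
  std::map< int, std::string > nodes{{1, "a"}, {2, "b"}, {3, "c"}};
  std::set< int > out;
  return GetManyRandom(nodes, out, 2) && out.size() == 2 ? 0 : 1;
}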
bool
GetManyNearExcluding(const Key_t& target, std::set< Key_t >& result,
size_t N, const std::set< Key_t >& exclude) const
@ -70,7 +96,7 @@ namespace llarp
bool
FindCloseExcluding(const Key_t& target, Key_t& result,
std::set< Key_t > exclude) const
const std::set< Key_t >& exclude) const
{
Key_t maxdist;
maxdist.Fill(0xff);

@ -8,7 +8,6 @@
#include <llarp/dht/message.hpp>
#include <llarp/dht/messages/findintro.hpp>
#include <llarp/dht/node.hpp>
#include <llarp/dht/search_job.hpp>
#include <llarp/service/IntroSet.hpp>
#include <set>
@ -17,119 +16,184 @@ namespace llarp
{
namespace dht
{
struct TXOwner
{
Key_t node;
uint64_t txid = 0;
TXOwner() = default;
TXOwner(const Key_t& k, uint64_t id) : node(k), txid(id)
{
}
bool
operator==(const TXOwner& other) const
{
return txid == other.txid && node == other.node;
}
bool
operator<(const TXOwner& other) const
{
return txid < other.txid || node < other.node;
}
struct Hash
{
std::size_t
operator()(TXOwner const& o) const noexcept
{
std::size_t sz2;
memcpy(&sz2, &o.node[0], sizeof(std::size_t));
return o.txid ^ (sz2 << 1);
}
};
};
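Aside: TXOwner doubles as an unordered_map key via the nested Hash, which folds the first size_t worth of the node key into the txid. A self-contained sketch of that usage pattern (a hypothetical 32-byte NodeKey stands in for Key_t):
#include <array>
#include <cstdint>
#include <cstring>
#include <string>
#include <unordered_map>
using NodeKey = std::array< uint8_t, 32 >;
struct Owner
{
  NodeKey node{};
  uint64_t txid = 0;
  bool
  operator==(const Owner& other) const
  {
    return txid == other.txid && node == other.node;
  }
  struct Hash
  {
    std::size_t
    operator()(Owner const& o) const noexcept
    {
      // mix the leading bytes of the node key with the txid, as above
      std::size_t part;
      std::memcpy(&part, o.node.data(), sizeof(std::size_t));
      return o.txid ^ (part << 1);
    }
  };
};
int
main()
{
  std::unordered_map< Owner, std::string, Owner::Hash > pending;
  Owner who;
  who.txid = 7;
  pending[who] = "lookup";
  return pending.count(who) == 1 ? 0 : 1;
}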
struct Context;
template < typename K, typename V >
struct TX
{
TX(const TXOwner& asker, const K& k, Context* p)
: target(k), whoasked(asker)
{
parent = p;
}
K target;
Context* parent;
std::set< Key_t > peersAsked;
std::vector< V > valuesFound;
TXOwner whoasked;
void
OnFound(const Key_t& askedPeer, const V& value)
{
peersAsked.insert(askedPeer);
valuesFound.push_back(value);
}
virtual void
Start(const TXOwner& peer) = 0;
virtual bool
GetNextPeer(Key_t& next, const std::set< Key_t >& exclude) = 0;
virtual void
DoNextRequest(const Key_t& peer) = 0;
/// return true if we want to persist this tx
bool
AskNextPeer(const Key_t& prevPeer)
{
peersAsked.insert(prevPeer);
Key_t peer;
if(!GetNextPeer(peer, peersAsked))
{
// no more peers
SendReply();
return false;
}
DoNextRequest(peer);
return true;
}
virtual void
SendReply() = 0;
};
typedef std::function< void(const std::vector< service::IntroSet >&) >
IntroSetLookupHandler;
struct Context
{
Context();
~Context();
SearchJob*
FindPendingTX(const Key_t& owner, uint64_t txid);
/// on behalf of whoasked request introset for target from dht router with
/// key askpeer
void
RemovePendingTX(const Key_t& owner, uint64_t txid);
LookupIntroSetRecursive(const service::Address& target,
const Key_t& whoasked, uint64_t whoaskedTX,
const Key_t& askpeer, uint64_t R,
IntroSetLookupHandler result = nullptr);
void
LookupServiceDirect(const Key_t& target, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer,
SearchJob::IntroSetHookFunc handler,
bool iterateive = false,
std::set< Key_t > excludes = {});
LookupIntroSetIterative(const service::Address& target,
const Key_t& whoasked, uint64_t whoaskedTX,
const Key_t& askpeer,
IntroSetLookupHandler result = nullptr);
/// on behalf of whoasked request router with public key target from dht
/// router with key askpeer
void
LookupRouter(const Key_t& target, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer,
llarp_router_lookup_job* job = nullptr,
bool iterative = false, std::set< Key_t > excludes = {});
LookupRouterRecursive(const RouterID& target, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer,
llarp_router_lookup_job* job = nullptr);
/// on behalf of whoasked request introsets with tag from dht router with
/// key askpeer with Recursion depth R
void
LookupIntroSet(const service::Address& addr, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer, uint64_t R,
std::set< Key_t > excludes = {});
LookupTagRecursive(const service::Tag& tag, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer, uint64_t R);
void
LookupTag(const service::Tag& tag, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer,
const std::set< service::IntroSet >& include = {},
uint64_t R = 0);
void
LookupRouterViaJob(llarp_router_lookup_job* job);
/// issue dht lookup for tag via askpeer and send reply to local path
void
LookupTagForPath(const service::Tag& tag, uint64_t txid,
const llarp::PathID_t& path, const Key_t& askpeer);
/// issue dht lookup for introset for addr via askpeer and send reply to
/// local path
void
LookupIntroSetForPath(const service::Address& addr, uint64_t txid,
const llarp::PathID_t& path, Key_t askpeer);
template < typename Job, typename Result >
bool
TryLookupAgain(Job* j, Result r, uint64_t R)
{
const Key_t targetKey = j->target.ToKey();
Key_t askpeer;
std::set< Key_t > exclude = j->asked;
if(!nodes->FindCloseExcluding(targetKey, askpeer, exclude))
{
j->Exausted();
return true;
}
if((OurKey() ^ targetKey) < (askpeer ^ targetKey))
{
j->Exausted();
return true;
}
auto id = ++ids;
TXOwner ownerKey;
ownerKey.node = askpeer;
ownerKey.txid = id;
SearchJob job(j->whoasked, j->txid, r, [j]() { delete j; });
pendingTX[ownerKey] = job;
auto msg = new FindIntroMessage(id, j->target);
msg->R = R;
llarp::LogInfo("asking ", askpeer, " for ", j->target.ToString(),
" with txid=", id);
DHTSendTo(askpeer, msg);
j->asked.insert(std::move(askpeer));
return false;
}
const llarp::PathID_t& path, const Key_t& askpeer);
/// send a dht message to peer, if keepalive is true then keep the session
/// with that peer alive for 10 seconds
void
DHTSendTo(const Key_t& peer, IMessage* msg);
DHTSendTo(const Key_t& peer, IMessage* msg, bool keepalive = true);
/// get routers closest to target excluding requester
bool
LookupRouterExploritory(const Key_t& requester, uint64_t txid,
const RouterID& target,
std::vector< IMessage* >& reply);
void
LookupIntroSetRelayed(const Key_t& requester, uint64_t txid,
const service::Address& addr, bool recursive,
std::vector< IMessage* >& reply);
HandleExploritoryRouterLookup(const Key_t& requester, uint64_t txid,
const RouterID& target,
std::vector< IMessage* >& reply);
std::set< service::IntroSet >
FindRandomIntroSetsWithTag(const service::Tag& tag, size_t max = 2);
FindRandomIntroSetsWithTagExcluding(
const service::Tag& tag, size_t max = 2,
const std::set< service::IntroSet >& excludes = {});
/// handle rc lookup from requester for target
void
LookupRouterRelayed(const Key_t& requester, uint64_t txid,
const Key_t& target, bool recursive,
std::vector< IMessage* >& replies);
/// relay a dht message from a local path to the main network
bool
RelayRequestForPath(const llarp::PathID_t& localPath,
const IMessage* msg);
/// send introset to peer from source with S counter and excluding peers
void
PropagateIntroSetTo(const Key_t& from, uint64_t fromTX,
PropagateIntroSetTo(const Key_t& source, uint64_t sourceTX,
const service::IntroSet& introset, const Key_t& peer,
uint64_t S, const std::set< Key_t >& exclude);
/// initialize dht context and explore every exploreInterval milliseconds
void
Init(const Key_t& us, llarp_router* router, llarp_time_t exploreInterval);
/// get locally stored introset by service address
const llarp::service::IntroSet*
GetIntroSetByServiceAddress(const llarp::service::Address& addr) const;
/// queue lookup router via job
void
QueueRouterLookup(llarp_router_lookup_job* job);
@ -160,47 +224,144 @@ namespace llarp
return ourKey;
}
private:
void
ScheduleCleanupTimer();
template < typename K, typename V, typename K_Hash,
llarp_time_t requestTimeoutMS = 5000UL >
struct TXHolder
{
// txs that are waiting for a reply, per key
std::unordered_multimap< K, TXOwner, K_Hash > waiting;
// tx timeouts by key
std::unordered_map< K, llarp_time_t, K_Hash > timeouts;
// maps remote peer with tx to handle reply from them
std::unordered_map< TXOwner, std::unique_ptr< TX< K, V > >,
TXOwner::Hash >
tx;
const TX< K, V >*
GetPendingLookupFrom(const TXOwner& owner) const
{
auto itr = tx.find(owner);
if(itr == tx.end())
return nullptr;
else
return itr->second.get();
}
void
HandleExploreResult(const std::vector< RouterID >& result);
bool
HasPendingLookupFrom(const TXOwner& owner) const
{
return GetPendingLookupFrom(owner) != nullptr;
}
void
CleanupTX();
TX< K, V >*
NewTX(const TXOwner& owner, const K& k, TX< K, V >* t)
{
tx.emplace(owner, std::unique_ptr< TX< K, V > >(t));
waiting.insert(std::make_pair(k, owner));
auto itr = timeouts.find(k);
if(itr == timeouts.end())
timeouts.insert(
std::make_pair(k, llarp_time_now_ms() + requestTimeoutMS));
return t;
}
uint64_t ids;
/// mark tx as not found
void
NotFound(const TXOwner& from)
{
bool sendReply = true;
auto txitr = tx.find(from);
if(txitr == tx.end())
return;
// ask for next peer
if(txitr->second->AskNextPeer(from.node))
sendReply = false;
Inform(from, txitr->second->target, {}, sendReply, sendReply);
}
struct TXOwner
{
Key_t node;
uint64_t txid = 0;
void
Found(const TXOwner& from, const K& k, const std::vector< V >& values)
{
Inform(from, k, values, true);
}
bool
operator==(const TXOwner& other) const
/// inform all watchers for key of values found
void
Inform(const TXOwner& from, const K& key,
const std::vector< V >& values, bool sendreply = false,
bool removeTimeouts = true)
{
return txid == other.txid && node == other.node;
auto range = waiting.equal_range(key);
auto itr = range.first;
while(itr != range.second)
{
auto txitr = tx.find(itr->second);
if(txitr != tx.end())
{
for(const auto& value : values)
txitr->second->OnFound(from.node, value);
if(sendreply)
{
txitr->second->SendReply();
tx.erase(txitr);
}
}
++itr;
}
if(sendreply)
waiting.erase(key);
if(removeTimeouts)
timeouts.erase(key);
}
bool
operator<(const TXOwner& other) const
void
Expire(llarp_time_t now)
{
return txid < other.txid || node < other.node;
auto itr = timeouts.begin();
while(itr != timeouts.end())
{
if(now > itr->second && now - itr->second >= requestTimeoutMS)
{
Inform(TXOwner{}, itr->first, {}, true, false);
itr = timeouts.erase(itr);
}
else
++itr;
}
}
};
struct TXOwnerHash
TXHolder< service::Address, service::IntroSet, service::Address::Hash >
pendingIntrosetLookups;
TXHolder< service::Tag, service::IntroSet, service::Tag::Hash >
pendingTagLookups;
TXHolder< RouterID, llarp_rc, RouterID::Hash > pendingRouterLookups;
TXHolder< RouterID, RouterID, RouterID::Hash > pendingExploreLookups;
uint64_t
NextID()
{
std::size_t
operator()(TXOwner const& o) const noexcept
{
std::size_t sz2;
memcpy(&sz2, &o.node[0], sizeof(std::size_t));
return o.txid ^ (sz2 << 1);
}
}; // namespace dht
return ++ids;
}
private:
void
ExploreNetworkVia(const Key_t& peer);
void
ScheduleCleanupTimer();
void
CleanupTX();
uint64_t ids;
std::unordered_map< TXOwner, SearchJob, TXOwnerHash > pendingTX;
Key_t ourKey;
}; // namespace llarp
} // namespace dht

@ -38,4 +38,4 @@ namespace llarp
} // namespace dht
} // namespace llarp
#endif
#endif

@ -27,8 +27,9 @@ namespace llarp
S.Zero();
}
FindIntroMessage(uint64_t txid, const llarp::service::Address& addr)
: IMessage({}), S(addr), T(txid)
FindIntroMessage(uint64_t txid, const llarp::service::Address& addr,
uint64_t r)
: IMessage({}), R(r), S(addr), T(txid)
{
N.Zero();
}
@ -47,4 +48,4 @@ namespace llarp
};
} // namespace dht
} // namespace llarp
#endif
#endif

@ -12,7 +12,7 @@ namespace llarp
{
}
FindRouterMessage(const Key_t& from, const Key_t& target, uint64_t id)
FindRouterMessage(const Key_t& from, const RouterID& target, uint64_t id)
: IMessage(from), K(target), txid(id)
{
}
@ -36,7 +36,7 @@ namespace llarp
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage* >& replies) const;
Key_t K;
RouterID K;
bool iterative = false;
bool exploritory = false;
uint64_t txid = 0;

@ -46,4 +46,4 @@ namespace llarp
};
} // namespace dht
} // namespace llarp
#endif
#endif

@ -1,74 +0,0 @@
#ifndef LLARP_DHT_SEARCH_JOB_HPP
#define LLARP_DHT_SEARCH_JOB_HPP
#include <llarp/dht.h>
#include <llarp/time.h>
#include <functional>
#include <llarp/dht/key.hpp>
#include <llarp/service/IntroSet.hpp>
#include <set>
#include <vector>
namespace llarp
{
namespace dht
{
/// TODO: this should be made into a templated type
struct SearchJob
{
const static uint64_t JobTimeout = 30000;
typedef std::function< bool(
const std::vector< llarp::service::IntroSet >&) >
IntroSetHookFunc;
typedef std::function< void(const std::vector< RouterID >&) >
FoundNearFunc;
typedef std::function< void(void) > DoneFunc;
SearchJob();
/// for routers
SearchJob(const Key_t& requester, uint64_t requesterTX,
const Key_t& target, const std::set< Key_t >& excludes,
llarp_router_lookup_job* job);
/// for introsets
SearchJob(const Key_t& requester, uint64_t requesterTX,
const Key_t& target, const std::set< Key_t >& excludes,
IntroSetHookFunc found, DoneFunc done);
// for introsets via tag
SearchJob(const Key_t& requester, uint64_t requseterTX,
IntroSetHookFunc found, DoneFunc done);
// for network exploration
SearchJob(FoundNearFunc near, DoneFunc done);
void
FoundRouter(const llarp_rc* router) const;
bool
FoundIntros(
const std::vector< llarp::service::IntroSet >& introset) const;
void
Timeout() const;
bool
IsExpired(llarp_time_t now) const;
// only set if looking up router
llarp_router_lookup_job* job = nullptr;
IntroSetHookFunc foundIntroHook;
// hook for exploritory router lookups
FoundNearFunc foundNear;
DoneFunc onDone;
llarp_time_t started;
Key_t requester;
uint64_t requesterTX;
Key_t target;
std::set< Key_t > exclude;
};
} // namespace dht
} // namespace llarp
#endif

@ -45,12 +45,10 @@ struct frame_state
uint64_t txids = 0;
llarp_time_t lastEvent = 0;
std::unordered_map< uint64_t, llarp::ShortHash > rxIDs;
std::unordered_map< llarp::ShortHash, transit_message *,
std::unordered_map< llarp::ShortHash, std::unique_ptr< transit_message >,
llarp::ShortHash::Hash >
rx;
std::unordered_map< uint64_t, transit_message * > tx;
// typedef std::queue< sendbuf_t * > sendqueue_t;
std::unordered_map< uint64_t, std::unique_ptr< transit_message > > tx;
typedef llarp::util::CoDelQueue<
InboundMessage, InboundMessage::GetTime, InboundMessage::PutTime,

@ -25,9 +25,9 @@ struct InboundMessage
}
llarp_buffer_t
Buffer()
Buffer() const
{
return llarp::Buffer< decltype(msg) >(msg);
return llarp::ConstBuffer< decltype(msg) >(msg);
}
struct GetTime

@ -16,8 +16,8 @@
struct llarp_link
{
typedef llarp::util::NullMutex mtx_t;
typedef llarp::util::NullLock lock_t;
typedef llarp::util::Mutex mtx_t;
typedef llarp::util::Lock lock_t;
llarp_router *router;
llarp_crypto *crypto;
@ -39,8 +39,8 @@ struct llarp_link
const char *m_name;
typedef std::unordered_map< llarp::Addr, llarp_link_session *,
llarp::Addr::Hash >
typedef std::unordered_map<
llarp::Addr, std::unique_ptr< llarp_link_session >, llarp::Addr::Hash >
LinkMap_t;
LinkMap_t m_sessions;
@ -53,8 +53,8 @@ struct llarp_link
mtx_t m_Connected_Mutex;
std::atomic< bool > pumpingLogic;
typedef std::unordered_map< llarp::Addr, llarp_link_session *,
llarp::Addr::Hash >
typedef std::unordered_map<
llarp::Addr, std::unique_ptr< llarp_link_session >, llarp::Addr::Hash >
PendingSessionMap_t;
PendingSessionMap_t m_PendingSessions;
mtx_t m_PendingSessions_Mutex;
@ -68,16 +68,9 @@ struct llarp_link
bool
has_intro_from(const llarp::Addr &from);
void
put_intro_from(llarp_link_session *s);
void
remove_intro_from(const llarp::Addr &from);
// set that src address has identity pubkey
void
MapAddr(const llarp::Addr &src, const llarp::PubKey &identity);
/// does nothing if we have no session already established
void
KeepAliveSessionTo(const byte_t *pubkey);
@ -95,27 +88,26 @@ struct llarp_link
bool
sendto(const byte_t *pubkey, llarp_buffer_t buf);
void
UnmapAddr(const llarp::Addr &src);
llarp_link_session *
create_session(const llarp::Addr &src);
bool
has_session_via(const llarp::Addr &dst);
llarp_link_session *
find_session(const llarp::Addr &addr);
void
MapAddr(const llarp::Addr &addr, const llarp::PubKey &pk);
void
put_session(const llarp::Addr &src, llarp_link_session *impl);
visit_session(
const llarp::Addr &addr,
std::function< void(const std::unique_ptr< llarp_link_session > &) >
visit);
void
clear_sessions();
pending_session_active(const llarp::Addr &addr);
/// safe iterate sessions
void
iterate_sessions(std::function< bool(llarp_link_session *) > visitor);
clear_sessions();
static void
handle_logic_pump(void *user);
@ -123,9 +115,6 @@ struct llarp_link
void
PumpLogic();
void
RemoveSession(llarp_link_session *s);
const uint8_t *
pubkey();

@ -134,14 +134,14 @@ struct llarp_link_session
decryptedFrames;
llarp::Addr addr;
iwp_async_intro intro;
iwp_async_introack introack;
iwp_async_session_start start;
/// timestamp last intro packet sent at
llarp_time_t lastIntroSentAt = 0;
uint32_t intro_resend_job_id = 0;
iwp_async_session_start start;
iwp_async_introack introack;
byte_t token[32];
byte_t workbuf[MAX_PAD + 128];

@ -18,7 +18,8 @@ namespace llarp
public:
ServiceInfo() = default;
ServiceInfo(const ServiceInfo&& other)
ServiceInfo(ServiceInfo&& other) = delete;
/*
{
enckey = std::move(other.enckey);
signkey = std::move(other.signkey);
@ -26,14 +27,15 @@ namespace llarp
vanity = std::move(other.vanity);
m_CachedAddr = std::move(other.m_CachedAddr);
}
*/
ServiceInfo(const ServiceInfo& other)
: enckey(other.enckey)
, signkey(other.signkey)
, vanity(other.vanity)
, m_CachedAddr(other.m_CachedAddr)
{
enckey = other.enckey;
signkey = other.signkey;
version = other.version;
vanity = other.vanity;
m_CachedAddr = other.m_CachedAddr;
version = other.version;
}
void
@ -137,4 +139,4 @@ namespace llarp
} // namespace service
} // namespace llarp
#endif
#endif

@ -194,7 +194,7 @@ llarp_ai_list_bencode(struct llarp_ai_list *l, llarp_buffer_t *buff)
struct llarp_ai_list *
llarp_ai_list_new()
{
return new llarp_ai_list;
return new llarp_ai_list();
}
void
@ -240,10 +240,13 @@ llarp_ai_list_size(struct llarp_ai_list *l)
void
llarp_ai_list_iterate(struct llarp_ai_list *l, struct llarp_ai_list_iter *itr)
{
itr->list = l;
for(auto &ai : l->list)
if(!itr->visit(itr, &ai))
return;
if(l)
{
itr->list = l;
for(auto &ai : l->list)
if(!itr->visit(itr, &ai))
return;
}
}
bool

@ -17,22 +17,6 @@ struct llarp_async_iwp
namespace iwp
{
void
inform_keygen(void *user)
{
iwp_async_keygen *keygen = static_cast< iwp_async_keygen * >(user);
keygen->hook(keygen);
}
void
keygen(void *user)
{
iwp_async_keygen *keygen = static_cast< iwp_async_keygen * >(user);
keygen->iwp->crypto->encryption_keygen(keygen->keybuf);
keygen->hook(keygen);
// llarp_logic_queue_job(keygen->iwp->logic, job);
}
void
inform_intro(void *user)
{
@ -56,6 +40,8 @@ namespace iwp
memcpy(tmp, intro->remote_pubkey, 32);
memcpy(tmp + 32, intro->nonce, 32);
crypto->shorthash(e_k, buf);
// put nonce
memcpy(intro->buf + 32, intro->nonce, 32);
// e = SE(a.k, e_k, n[0:24])
memcpy(intro->buf + 64, llarp::seckey_topublic(intro->secretkey), 32);
buf.base = intro->buf + 64;
@ -91,9 +77,8 @@ namespace iwp
buf.base = intro->remote_pubkey;
buf.cur = buf.base;
buf.sz = 32;
memcpy(intro->remote_pubkey, intro->buf + 64, 32);
memcpy(buf.base, intro->buf + 64, 32);
crypto->xchacha20(buf, e_K, intro->nonce);
llarp::LogInfo("handshake from ", llarp::RouterID(intro->remote_pubkey));
// S = TKE(a.k, b.k, n)
crypto->transport_dh_server(sharedkey, intro->remote_pubkey,
intro->secretkey, intro->nonce);
@ -105,6 +90,7 @@ namespace iwp
if(memcmp(h, intro->buf, 32))
{
// hmac fail
delete[] intro->buf;
intro->buf = nullptr;
}
// inform result
@ -348,14 +334,6 @@ namespace iwp
}
} // namespace iwp
void
iwp_call_async_keygen(struct llarp_async_iwp *iwp,
struct iwp_async_keygen *keygen)
{
keygen->iwp = iwp;
llarp_threadpool_queue_job(iwp->worker, {keygen, &iwp::keygen});
}
bool
iwp_decrypt_frame(struct iwp_async_frame *frame)
{

@ -54,7 +54,6 @@ llarp_dht_lookup_router(struct llarp_dht_context *ctx,
{
job->dht = ctx;
job->found = false;
// TODO: check for reuse
llarp_rc_clear(&job->result);
llarp_logic_queue_job(ctx->parent->logic,
{job, &llarp::dht::Context::queue_router_lookup});

@ -12,6 +12,7 @@ namespace llarp
Context::Context()
{
randombytes((byte_t *)&ids, sizeof(uint64_t));
allowTransit = false;
}
Context::~Context()
@ -23,47 +24,69 @@ namespace llarp
}
void
Context::HandleExploreResult(const std::vector< RouterID > &result)
Context::Explore()
{
llarp::LogInfo("got ", result.size(), " routers from exploration");
for(const auto &pk : result)
// ask N random peers for new routers
llarp::LogInfo("Exploring network");
std::set< Key_t > peers;
if(nodes->GetManyRandom(peers, 3))
{
if(llarp_nodedb_get_rc(router->nodedb, pk) == nullptr)
{
// try connecting to it we don't know it
// this triggers a dht lookup
router->TryEstablishTo(pk);
}
for(const auto &peer : peers)
ExploreNetworkVia(peer);
}
else
llarp::LogError("failed to select random nodes for exploration");
}
void
Context::Explore()
struct ExploreNetworkJob : public TX< RouterID, RouterID >
{
// ask N random peers for new routers
llarp::LogInfo("Exploring network");
std::set< Key_t > peers;
Key_t peer;
size_t N = 5;
while(N--)
ExploreNetworkJob(const RouterID &peer, Context *ctx)
: TX< RouterID, RouterID >(TXOwner{}, peer, ctx)
{
}
void
Start(const TXOwner &peer)
{
parent->DHTSendTo(peer.node,
new FindRouterMessage(parent->OurKey(), peer.txid));
}
bool
GetNextPeer(Key_t &, const std::set< Key_t > &)
{
return false;
}
void
DoNextRequest(const Key_t &)
{
}
void
SendReply()
{
if(nodes->GetRandomNodeExcluding(peer, peers))
llarp::LogInfo("got ", valuesFound.size(), " routers from exploration");
for(const auto &pk : valuesFound)
{
peers.insert(peer);
uint64_t txid = ++ids;
TXOwner ownerKey;
ownerKey.node = peer;
ownerKey.txid = txid;
pendingTX.insert(
std::make_pair(ownerKey,
SearchJob(std::bind(&Context::HandleExploreResult,
this, std::placeholders::_1),
[]() {})));
DHTSendTo(peer, new FindRouterMessage(ourKey, txid));
if(llarp_nodedb_get_rc(parent->router->nodedb, pk) == nullptr)
{
// try connecting to it we don't know it
// this triggers a dht lookup
parent->router->TryEstablishTo(pk);
}
}
else
llarp::LogError("failed to select random nodes for exploration");
}
};
void
Context::ExploreNetworkVia(const Key_t &askpeer)
{
TXOwner peer(askpeer, ++ids);
auto tx = pendingExploreLookups.NewTX(
peer, askpeer, new ExploreNetworkJob(askpeer, this));
tx->Start(peer);
}
void
@ -106,173 +129,16 @@ namespace llarp
ctx->ScheduleCleanupTimer();
}
struct PathLookupJob
{
Key_t whoasked;
service::Address target;
uint64_t txid;
PathID_t pathID;
llarp_router *m_router;
std::set< service::IntroSet > localIntroSets;
std::set< Key_t > asked;
int m_TriesLeft = 5;
uint64_t R = 0;
PathLookupJob(llarp_router *r, const PathID_t &localpath, uint64_t tx)
: txid(tx), pathID(localpath), m_router(r)
{
whoasked = r->dht->impl.OurKey();
}
bool
TryAgain()
{
--m_TriesLeft;
auto &dht = m_router->dht->impl;
llarp::LogInfo("try lookup again");
return dht.TryLookupAgain(
this,
std::bind(&PathLookupJob::OnResult, this, std::placeholders::_1),
R);
}
void
Exausted()
{
llarp::LogWarn("Exausted peers for lookup");
auto path =
m_router->paths.GetByUpstream(m_router->dht->impl.OurKey(), pathID);
if(path)
{
llarp::routing::DHTMessage msg;
msg.M.push_back(new llarp::dht::GotIntroMessage(
std::vector< service::IntroSet >(), txid));
path->SendRoutingMessage(&msg, m_router);
}
else
llarp::LogError("no path for lookup pathid=", pathID);
m_router->dht->impl.RemovePendingTX(whoasked, txid);
}
bool
OnResult(const std::vector< service::IntroSet > &results)
{
auto path =
m_router->paths.GetByUpstream(m_router->dht->impl.OurKey(), pathID);
if(path)
{
for(const auto &introset : results)
{
localIntroSets.insert(std::move(introset));
}
auto sz = localIntroSets.size();
if(sz || target.IsZero() || m_TriesLeft == 0)
{
llarp::routing::DHTMessage msg;
std::vector< service::IntroSet > intros(sz);
for(const auto &i : localIntroSets)
{
intros[--sz] = i;
}
llarp::LogInfo("found ", sz, " introsets for txid=", txid);
msg.M.push_back(new llarp::dht::GotIntroMessage(intros, txid));
path->SendRoutingMessage(&msg, m_router);
m_router->dht->impl.RemovePendingTX(whoasked, txid);
}
else if(!target.IsZero())
{
return m_TriesLeft && TryAgain();
}
}
else
{
llarp::LogWarn("no local path for reply on PathTagLookupJob pathid=",
pathID);
}
return true;
}
};
void
Context::PropagateIntroSetTo(const Key_t &from, uint64_t txid,
const service::IntroSet &introset,
const Key_t &peer, uint64_t S,
const std::set< Key_t > &exclude)
{
llarp::LogInfo("Propagate Introset for ", introset.A.Name(), " to ",
peer);
auto id = ++ids;
std::vector< Key_t > E;
for(const auto &ex : exclude)
E.push_back(ex);
TXOwner ownerKey;
ownerKey.node = peer;
ownerKey.txid = id;
SearchJob job(
from, txid,
[](const std::vector< service::IntroSet > &) -> bool { return true; },
[]() {});
pendingTX[ownerKey] = job;
router->dht->impl.DHTSendTo(peer,
new PublishIntroMessage(introset, id, S, E));
}
void
Context::LookupTagForPath(const service::Tag &tag, uint64_t txid,
const llarp::PathID_t &path, const Key_t &askpeer)
{
auto id = ++ids;
TXOwner ownerKey;
ownerKey.node = askpeer;
ownerKey.txid = id;
PathLookupJob *j = new PathLookupJob(router, path, txid);
j->localIntroSets = FindRandomIntroSetsWithTag(tag);
SearchJob job(
OurKey(), txid,
std::bind(&PathLookupJob::OnResult, j, std::placeholders::_1),
[j]() { delete j; });
pendingTX[ownerKey] = job;
auto dhtmsg = new FindIntroMessage(tag, id);
dhtmsg->R = 5;
j->R = 5;
llarp::LogInfo("asking ", askpeer, " for tag ", tag.ToString(), " with ",
j->localIntroSets.size(), " local tags txid=", txid);
router->dht->impl.DHTSendTo(askpeer, dhtmsg);
}
void
Context::LookupIntroSetForPath(const service::Address &addr, uint64_t txid,
const llarp::PathID_t &path, Key_t askpeer)
{
auto id = ++ids;
TXOwner ownerKey;
ownerKey.node = askpeer;
ownerKey.txid = id;
PathLookupJob *j = new PathLookupJob(router, path, txid);
j->target = addr;
j->R = 5;
j->asked.insert(askpeer);
j->asked.insert(OurKey());
SearchJob job(
OurKey(), txid,
std::bind(&PathLookupJob::OnResult, j, std::placeholders::_1),
[j]() { delete j; });
pendingTX[ownerKey] = job;
auto dhtmsg = new FindIntroMessage(id, addr);
dhtmsg->R = 5;
llarp::LogInfo("asking ", askpeer, " for ", addr.ToString(),
" with txid=", id);
router->dht->impl.DHTSendTo(askpeer, dhtmsg);
}
std::set< service::IntroSet >
Context::FindRandomIntroSetsWithTag(const service::Tag &tag, size_t max)
Context::FindRandomIntroSetsWithTagExcluding(
const service::Tag &tag, size_t max,
const std::set< service::IntroSet > &exclude)
{
std::set< service::IntroSet > found;
auto &nodes = services->nodes;
@ -290,9 +156,12 @@ namespace llarp
{
if(itr->second.introset.topic.ToString() == tagname)
{
found.insert(itr->second.introset);
if(found.size() == max)
return found;
if(exclude.count(itr->second.introset) == 0)
{
found.insert(itr->second.introset);
if(found.size() == max)
return found;
}
}
++itr;
}
@ -301,9 +170,12 @@ namespace llarp
{
if(itr->second.introset.topic.ToString() == tagname)
{
found.insert(itr->second.introset);
if(found.size() == max)
return found;
if(exclude.count(itr->second.introset) == 0)
{
found.insert(itr->second.introset);
if(found.size() == max)
return found;
}
}
++itr;
}
@ -334,42 +206,30 @@ namespace llarp
}
else if(recursive) // are we doing a recursive lookup?
{
if((requester ^ target) < (ourKey ^ target))
// is the next peer we ask closer to the target than us?
if((next ^ target) < (ourKey ^ target))
{
// we aren't closer to the target than next hop
// so we won't ask neighbor recursively, tell them we don't have it
llarp::LogInfo("we aren't closer to ", target, " than ", next,
" so we end it here");
replies.push_back(
new GotRouterMessage(requester, txid, nullptr, false));
// yes it is closer, ask neighbor recursively
LookupRouterRecursive(target, requester, txid, next);
}
else
{
// yeah, ask neighbor recursively
// don't request with a new lookup if a pending job exists as this
// causes a dht feedback loop
auto pending = FindPendingTX(requester, txid);
if(pending)
LookupRouter(target, requester, txid, next, pending->job);
else
LookupRouter(target, requester, txid, next);
// no we are closer to the target so tell requester it's not there
// so they switch to iterative lookup
replies.push_back(
new GotRouterMessage(requester, txid, nullptr, false));
}
}
else // otherwise tell them we don't have it
else // iterative lookup and we don't have it tell them we don't have
// the target router
{
llarp::LogInfo("we don't have ", target,
" and this was an iterative request so telling ",
requester, " that we don't have it");
replies.push_back(
new GotRouterMessage(requester, txid, nullptr, false));
}
}
else
{
// we don't know it and have no closer peers
llarp::LogInfo("we don't have ", target,
" and have no closer peers so telling ", requester,
" that we don't have it");
// we don't know it and have no closer peers to ask
replies.push_back(
new GotRouterMessage(requester, txid, nullptr, false));
}
@ -385,48 +245,16 @@ namespace llarp
return &itr->second.introset;
}
void
Context::RemovePendingTX(const Key_t &owner, uint64_t id)
{
TXOwner search;
search.node = owner;
search.txid = id;
auto itr = pendingTX.find(search);
if(itr == pendingTX.end())
return;
pendingTX.erase(itr);
}
SearchJob *
Context::FindPendingTX(const Key_t &owner, uint64_t id)
{
TXOwner search;
search.node = owner;
search.txid = id;
auto itr = pendingTX.find(search);
if(itr == pendingTX.end())
return nullptr;
else
return &itr->second;
}
void
Context::CleanupTX()
{
auto now = llarp_time_now_ms();
llarp::LogDebug("DHT tick");
auto itr = pendingTX.begin();
while(itr != pendingTX.end())
{
if(itr->second.IsExpired(now))
{
itr->second.Timeout();
itr = pendingTX.erase(itr);
}
else
++itr;
}
pendingRouterLookups.Expire(now);
pendingIntrosetLookups.Expire(now);
pendingTagLookups.Expire(now);
pendingExploreLookups.Expire(now);
}
void
@ -452,14 +280,16 @@ namespace llarp
}
void
Context::DHTSendTo(const Key_t &peer, IMessage *msg)
Context::DHTSendTo(const Key_t &peer, IMessage *msg, bool keepalive)
{
auto m = new llarp::DHTImmeidateMessage(peer);
m->msgs.push_back(msg);
router->SendToOrQueue(peer, m);
// keep alive for 10 more seconds for response
auto now = llarp_time_now_ms();
router->PersistSessionUntil(peer, now + 10000);
if(keepalive)
{
auto now = llarp_time_now_ms();
router->PersistSessionUntil(peer, now + 10000);
}
}
bool
@ -476,133 +306,288 @@ namespace llarp
return true;
}
/// handles replying with a GIM for a lookup
struct IntroSetInformJob
struct ServiceAddressLookup
: public TX< service::Address, service::IntroSet >
{
service::Address target;
uint64_t R = 0;
int m_TriesLeft = 5;
std::set< service::IntroSet > localIntroSets;
std::set< Key_t > asked;
Key_t whoasked;
uint64_t txid;
llarp_router *m_Router;
IntroSetInformJob(llarp_router *r, const Key_t &replyTo, uint64_t id)
: whoasked(replyTo), txid(id), m_Router(r)
IntroSetLookupHandler handleResult;
uint64_t R;
ServiceAddressLookup(const TXOwner &asker, const service::Address &addr,
Context *ctx, uint64_t r,
IntroSetLookupHandler handler)
: TX< service::Address, service::IntroSet >(asker, addr, ctx)
, handleResult(handler)
, R(r)
{
peersAsked.insert(ctx->OurKey());
}
void
Exausted()
DoNextRequest(const Key_t &nextPeer)
{
m_Router->dht->impl.DHTSendTo(whoasked, new GotIntroMessage({}, txid));
m_Router->dht->impl.RemovePendingTX(whoasked, txid);
// iterate to next peer
parent->LookupIntroSetIterative(
target, whoasked.node, whoasked.txid, nextPeer,
std::bind(&ServiceAddressLookup::HandleNextRequestResult, this,
std::placeholders::_1));
}
void
HandleNextRequestResult(const std::vector< service::IntroSet > &results)
{
// merge results
std::set< service::IntroSet > found;
for(const auto &introset : valuesFound)
found.insert(introset);
for(const auto &introset : results)
found.insert(introset);
valuesFound.clear();
for(const auto &introset : found)
valuesFound.push_back(introset);
// send reply
SendReply();
}
bool
TryAgain()
GetNextPeer(Key_t &next, const std::set< Key_t > &exclude)
{
Key_t k = target.data();
return parent->nodes->FindCloseExcluding(k, next, exclude);
}
void
Start(const TXOwner &peer)
{
--m_TriesLeft;
llarp::LogInfo("try lookup again");
auto &dht = m_Router->dht->impl;
return dht.TryLookupAgain(this,
std::bind(&IntroSetInformJob::OnResult, this,
std::placeholders::_1),
R);
parent->DHTSendTo(peer.node,
new FindIntroMessage(peer.txid, target, R));
}
virtual void
SendReply()
{
if(handleResult)
handleResult(valuesFound);
else
parent->DHTSendTo(whoasked.node,
new GotIntroMessage(valuesFound, whoasked.txid));
}
};
struct LocalServiceAddressLookup : public ServiceAddressLookup
{
PathID_t localPath;
LocalServiceAddressLookup(const PathID_t &pathid, uint64_t txid,
const service::Address &addr, Context *ctx,
const Key_t &askpeer)
: ServiceAddressLookup(TXOwner{ctx->OurKey(), txid}, addr, ctx, 4,
nullptr)
, localPath(pathid)
{
}
void
SendReply()
{
auto path =
parent->router->paths.GetByUpstream(parent->OurKey(), localPath);
if(!path)
{
llarp::LogWarn(
"did not send reply for relayed dht request, no such local path "
"for pathid=",
localPath);
return;
}
routing::DHTMessage msg;
msg.M.push_back(new GotIntroMessage(valuesFound, whoasked.txid));
if(!path->SendRoutingMessage(&msg, parent->router))
{
llarp::LogWarn(
"failed to send routing message when informing result of dht "
"request, pathid=",
localPath);
}
}
};
void
Context::LookupIntroSetForPath(const service::Address &addr, uint64_t txid,
const llarp::PathID_t &path,
const Key_t &askpeer)
{
TXOwner asker(OurKey(), txid);
TXOwner peer(askpeer, ++ids);
auto tx = pendingIntrosetLookups.NewTX(
peer, addr,
new LocalServiceAddressLookup(path, txid, addr, this, askpeer));
tx->Start(peer);
}
struct PublishServiceJob : public TX< service::Address, service::IntroSet >
{
uint64_t S;
std::set< Key_t > dontTell;
service::IntroSet I;
PublishServiceJob(const TXOwner &asker, const service::IntroSet &introset,
Context *ctx, uint64_t s,
const std::set< Key_t > &exclude)
: TX< service::Address, service::IntroSet >(asker, introset.A.Addr(),
ctx)
, S(s)
, dontTell(exclude)
, I(introset)
{
}
void
Start(const TXOwner &peer)
{
std::vector< Key_t > exclude;
for(const auto &router : dontTell)
exclude.push_back(router);
parent->DHTSendTo(peer.node,
new PublishIntroMessage(I, peer.txid, S, exclude));
}
bool
OnResult(const std::vector< llarp::service::IntroSet > &results)
GetNextPeer(Key_t &, const std::set< Key_t > &)
{
for(const auto &introset : results)
return false;
}
void
DoNextRequest(const Key_t &)
{
}
void
SendReply()
{
// don't need this
}
};
void
Context::PropagateIntroSetTo(const Key_t &from, uint64_t txid,
const service::IntroSet &introset,
const Key_t &tellpeer, uint64_t S,
const std::set< Key_t > &exclude)
{
TXOwner asker(from, txid);
TXOwner peer(tellpeer, ++ids);
service::Address addr = introset.A.Addr();
auto tx = pendingIntrosetLookups.NewTX(
asker, addr,
new PublishServiceJob(asker, introset, this, S, exclude));
tx->Start(peer);
}
void
Context::LookupIntroSetRecursive(const service::Address &addr,
const Key_t &whoasked, uint64_t txid,
const Key_t &askpeer, uint64_t R,
IntroSetLookupHandler handler)
{
TXOwner asker(whoasked, txid);
TXOwner peer(askpeer, ++ids);
auto tx = pendingIntrosetLookups.NewTX(
peer, addr, new ServiceAddressLookup(asker, addr, this, R, handler));
tx->Start(peer);
}
void
Context::LookupIntroSetIterative(const service::Address &addr,
const Key_t &whoasked, uint64_t txid,
const Key_t &askpeer,
IntroSetLookupHandler handler)
{
TXOwner asker(whoasked, txid);
TXOwner peer(askpeer, ++ids);
auto tx = pendingIntrosetLookups.NewTX(
peer, addr, new ServiceAddressLookup(asker, addr, this, 0, handler));
tx->Start(peer);
}
struct TagLookup : public TX< service::Tag, service::IntroSet >
{
uint64_t R;
TagLookup(const TXOwner &asker, const service::Tag &tag, Context *ctx,
uint64_t r)
: TX< service::Tag, service::IntroSet >(asker, tag, ctx), R(r)
{
}
void
Start(const TXOwner &peer)
{
parent->DHTSendTo(peer.node, new FindIntroMessage(target, peer.txid));
}
bool
GetNextPeer(Key_t &nextpeer, const std::set< Key_t > &exclude)
{
return false;
}
void
DoNextRequest(const Key_t &nextPeer)
{
}
void
SendReply()
{
std::set< service::IntroSet > found;
for(const auto &remoteTag : valuesFound)
{
localIntroSets.insert(std::move(introset));
found.insert(remoteTag);
}
size_t sz = localIntroSets.size();
if(sz || target.IsZero() || m_TriesLeft == 0)
// collect our local values if we haven't hit a limit
if(found.size() < 8)
{
std::vector< service::IntroSet > reply;
for(const auto &introset : localIntroSets)
for(const auto &localTag :
parent->FindRandomIntroSetsWithTagExcluding(target, 2, found))
{
reply.push_back(std::move(introset));
found.insert(localTag);
}
localIntroSets.clear();
m_Router->dht->impl.DHTSendTo(whoasked,
new GotIntroMessage(reply, txid));
m_Router->dht->impl.RemovePendingTX(whoasked, txid);
}
else if(!target.IsZero())
std::vector< service::IntroSet > values;
for(const auto &introset : found)
{
return m_TriesLeft && TryAgain();
values.push_back(introset);
}
return true;
parent->DHTSendTo(whoasked.node,
new GotIntroMessage(values, whoasked.txid));
}
};
void
Context::LookupTag(const llarp::service::Tag &tag, const Key_t &whoasked,
uint64_t txid, const Key_t &askpeer,
const std::set< service::IntroSet > &include, uint64_t R)
{
auto id = ++ids;
if(txid == 0)
txid = id;
TXOwner ownerKey;
ownerKey.node = askpeer;
ownerKey.txid = id;
IntroSetInformJob *j = new IntroSetInformJob(router, whoasked, txid);
j->localIntroSets = include;
SearchJob job(
whoasked, txid,
std::bind(&IntroSetInformJob::OnResult, j, std::placeholders::_1),
[j]() { delete j; });
pendingTX[ownerKey] = job;
auto dhtmsg = new FindIntroMessage(tag, id);
dhtmsg->R = R;
router->dht->impl.DHTSendTo(askpeer, dhtmsg);
}
void
Context::LookupIntroSet(const service::Address &addr, const Key_t &whoasked,
uint64_t txid, const Key_t &askpeer, uint64_t R,
std::set< Key_t > excludes)
Context::LookupTagRecursive(const service::Tag &tag, const Key_t &whoasked,
uint64_t whoaskedTX, const Key_t &askpeer,
uint64_t R)
{
auto id = ++ids;
if(txid == 0)
txid = id;
TXOwner ownerKey;
ownerKey.node = askpeer;
ownerKey.txid = id;
IntroSetInformJob *j = new IntroSetInformJob(router, whoasked, txid);
j->target = addr;
for(const auto item : excludes)
j->asked.insert(item);
j->R = R;
SearchJob job(
whoasked, txid, addr.ToKey(), {},
std::bind(&IntroSetInformJob::OnResult, j, std::placeholders::_1),
[j]() { delete j; });
pendingTX[ownerKey] = job;
auto dhtmsg = new FindIntroMessage(id, addr);
dhtmsg->R = R;
llarp::LogInfo("asking ", askpeer, " for ", addr.ToString(),
" on request of ", whoasked);
router->dht->impl.DHTSendTo(askpeer, dhtmsg);
TXOwner asker(whoasked, whoaskedTX);
TXOwner peer(askpeer, ++ids);
auto tx = pendingTagLookups.NewTX(peer, tag,
new TagLookup(asker, tag, this, R));
tx->Start(peer);
}
bool
Context::LookupRouterExploritory(const Key_t &requester, uint64_t txid,
const RouterID &target,
std::vector< IMessage * > &reply)
Context::HandleExploritoryRouterLookup(const Key_t &requester,
uint64_t txid,
const RouterID &target,
std::vector< IMessage * > &reply)
{
std::vector< RouterID > closer;
Key_t t(target.data());
std::set< Key_t > found;
if(!nodes->GetManyNearExcluding(t, found, 2,
if(!nodes->GetManyNearExcluding(t, found, 4,
std::set< Key_t >{ourKey, requester}))
{
llarp::LogError(
@ -610,64 +595,114 @@ namespace llarp
return false;
}
for(const auto &f : found)
closer.push_back(f);
closer.emplace_back(f.data());
reply.push_back(new GotRouterMessage(txid, closer, false));
return true;
}
void
Context::LookupRouter(const Key_t &target, const Key_t &whoasked,
uint64_t txid, const Key_t &askpeer,
llarp_router_lookup_job *job, bool iterative,
std::set< Key_t > excludes)
struct RecursiveRouterLookup : public TX< RouterID, llarp_rc >
{
if(target.IsZero() || whoasked.IsZero() || askpeer.IsZero())
llarp_router_lookup_job *job;
RecursiveRouterLookup(const TXOwner &whoasked, const RouterID &target,
Context *ctx, llarp_router_lookup_job *j)
: TX< RouterID, llarp_rc >(whoasked, target, ctx)
{
return;
job = j;
peersAsked.insert(ctx->OurKey());
}
auto id = ++ids;
TXOwner ownerKey;
ownerKey.node = askpeer;
ownerKey.txid = id;
if(txid == 0)
txid = id;
SearchJob j(whoasked, txid, target, excludes, job);
pendingTX[ownerKey] = j;
llarp::LogInfo("Asking ", askpeer, " for router ", target, " for ",
whoasked);
auto dhtmsg = new FindRouterMessage(askpeer, target, id);
dhtmsg->iterative = iterative;
router->dht->impl.DHTSendTo(askpeer, dhtmsg);
bool
GetNextPeer(Key_t &next, const std::set< Key_t > &exclude)
{
// TODO: implement iterative (?)
return false;
}
void
DoNextRequest(const Key_t &next)
{
}
void
Start(const TXOwner &peer)
{
parent->DHTSendTo(
peer.node,
new FindRouterMessage(parent->OurKey(), target, peer.txid));
}
void
SendTo(const Key_t &peer, IMessage *msg) const
{
return parent->DHTSendTo(peer, msg);
}
void
SendReply()
{
if(job)
{
job->found = false;
if(valuesFound.size())
{
job->found =
memcmp(valuesFound[0].pubkey, job->target, PUBKEYSIZE) == 0;
if(job->found)
llarp_rc_copy(&job->result, &valuesFound[0]);
}
if(job->hook)
job->hook(job);
else
delete job;
}
else
{
llarp_rc *found = nullptr;
if(valuesFound.size())
found = &valuesFound[0];
parent->DHTSendTo(
whoasked.node,
new GotRouterMessage({}, whoasked.txid, found, false));
}
for(auto rc : valuesFound)
llarp_rc_free(&rc);
}
};
void
Context::LookupRouterRecursive(const RouterID &target,
const Key_t &whoasked, uint64_t txid,
const Key_t &askpeer,
llarp_router_lookup_job *job)
{
TXOwner asker(whoasked, txid);
TXOwner peer(askpeer, ++ids);
auto tx = pendingRouterLookups.NewTX(
peer, target, new RecursiveRouterLookup(asker, target, this, job));
tx->Start(peer);
}
void
Context::LookupRouterViaJob(llarp_router_lookup_job *job)
{
Key_t peer;
/*
llarp::LogInfo("LookupRouterViaJob dumping nodes");
for(const auto &item : nodes->nodes)
{
llarp::LogInfo("LookupRouterViaJob dumping node: ", item.first);
}
*/
llarp::LogInfo("LookupRouterViaJob node count: ", nodes->nodes.size());
llarp::LogInfo("LookupRouterViaJob recursive: ",
job->iterative ? "yes" : "no");
if(nodes->FindClosest(job->target, peer))
LookupRouter(job->target, ourKey, 0, peer, job, job->iterative);
LookupRouterRecursive(job->target, ourKey, 0, peer, job);
else if(job->hook)
{
job->found = false;
job->hook(job);
}
else
delete job;
}
void
Context::queue_router_lookup(void *user)
{
llarp_router_lookup_job *job =
struct llarp_router_lookup_job *job =
static_cast< llarp_router_lookup_job * >(user);
job->dht->impl.LookupRouterViaJob(job);
}

@ -95,7 +95,7 @@ namespace llarp
return false;
}
auto& dht = ctx->impl;
if((!relayed) && dht.FindPendingTX(From, T))
if(dht.pendingIntrosetLookups.HasPendingLookupFrom(TXOwner{From, T}))
{
llarp::LogWarn("duplicate FIM from ", From, " txid=", T);
return false;
@ -108,7 +108,6 @@ namespace llarp
const auto introset = dht.GetIntroSetByServiceAddress(S);
if(introset)
{
llarp::LogInfo("introset found locally");
service::IntroSet i = *introset;
replies.push_back(new GotIntroMessage({i}, T));
return true;
@ -118,7 +117,6 @@ namespace llarp
if(R == 0)
{
// we don't have it, reply with a direct reply
llarp::LogInfo("dont have intro set and no recursion");
replies.push_back(new GotIntroMessage({}, T));
return true;
}
@ -136,20 +134,22 @@ namespace llarp
if((us ^ target) < (peer ^ target))
{
// we are not closer than our peer to the target so don't
// revurse
// recurse farther
replies.push_back(new GotIntroMessage({}, T));
return true;
}
else if(R >= 1)
dht.LookupIntroSet(S, From, T, peer, R - 1, exclude);
else if(R > 0)
dht.LookupIntroSetRecursive(S, From, T, peer, R - 1);
else
dht.LookupIntroSet(S, From, T, peer, 0, exclude);
dht.LookupIntroSetIterative(S, From, T, peer);
}
return true;
}
else
{
llarp::LogError(
"cannot find closer peers for introset lookup for ", S);
return true;
}
}
}
@ -170,15 +170,15 @@ namespace llarp
}
else
{
auto introsets = dht.FindRandomIntroSetsWithTag(N);
if(R == 0)
{
// base case
auto introsets = dht.FindRandomIntroSetsWithTagExcluding(N, 2, {});
std::vector< service::IntroSet > reply;
for(const auto& introset : introsets)
{
reply.push_back(introset);
}
// we are iterative and don't have it, reply with a direct reply
replies.push_back(new GotIntroMessage(reply, T));
return true;
}
@ -187,7 +187,7 @@ namespace llarp
// tag lookup
if(dht.nodes->GetRandomNodeExcluding(peer, exclude))
{
dht.LookupTag(N, From, T, peer, introsets, R - 1);
dht.LookupTagRecursive(N, From, T, peer, R - 1);
}
}
}
@ -195,4 +195,4 @@ namespace llarp
return true;
}
} // namespace dht
} // namespace llarp
} // namespace llarp

@ -75,7 +75,7 @@ namespace llarp
if(path)
{
replies.push_back(
new GotRouterMessage(K, txid, &dht.router->rc, false));
new GotRouterMessage(K.data(), txid, &dht.router->rc, false));
return true;
}
return false;
@ -90,8 +90,9 @@ namespace llarp
job->dht = ctx;
memcpy(job->target, K, sizeof(job->target));
Key_t peer;
if(dht.nodes->FindClosest(K, peer))
dht.LookupRouter(K, dht.OurKey(), txid, peer, job);
Key_t k = K.data();
if(dht.nodes->FindClosest(k, peer))
dht.LookupRouterRecursive(K, dht.OurKey(), txid, peer, job);
return true;
}
@ -200,16 +201,15 @@ namespace llarp
" when we are not allowing dht transit");
return false;
}
auto pending = dht.FindPendingTX(From, txid);
if(pending)
if(dht.pendingRouterLookups.HasPendingLookupFrom({From, txid}))
{
llarp::LogWarn("Got duplicate DHT lookup from ", From, " txid=", txid);
llarp::LogWarn("Duplicate FRM from ", From, " txid=", txid);
return false;
}
if(exploritory)
return dht.LookupRouterExploritory(From, txid, K, replies);
return dht.HandleExploritoryRouterLookup(From, txid, K, replies);
else
dht.LookupRouterRelayed(From, txid, K, !iterative, replies);
dht.LookupRouterRelayed(From, txid, K.data(), !iterative, replies);
return true;
}
} // namespace dht

@ -36,22 +36,22 @@ namespace llarp
return false;
}
}
auto pending = dht.FindPendingTX(From, T);
if(pending)
TXOwner owner(From, T);
auto tagLookup = dht.pendingTagLookups.GetPendingLookupFrom(owner);
if(tagLookup)
{
if(pending->FoundIntros(I))
{
dht.RemovePendingTX(From, T);
llarp::LogInfo("removed pending tx from ", From, " txid=", T);
}
dht.pendingTagLookups.Inform(owner, tagLookup->target, I);
return true;
}
else
auto serviceLookup =
dht.pendingIntrosetLookups.GetPendingLookupFrom(owner);
if(serviceLookup)
{
llarp::LogWarn("got GIM from ", From,
" with no previous pending transaction, txid=", T);
return false;
dht.pendingIntrosetLookups.Inform(owner, serviceLookup->target, I);
return true;
}
llarp::LogError("no pending TX for GIM from ", From, " txid=", T);
return false;
}
bool
@ -99,4 +99,4 @@ namespace llarp
return bencode_end(buf);
}
} // namespace dht
} // namespace llarp
} // namespace llarp

@ -9,7 +9,7 @@ namespace llarp
{
GotRouterMessage::~GotRouterMessage()
{
for(auto &rc : R)
for(auto rc : R)
llarp_rc_free(&rc);
R.clear();
}
@ -81,60 +81,27 @@ namespace llarp
return pathset->HandleGotRouterMessage(this);
}
}
SearchJob *pending = dht.FindPendingTX(From, txid);
if(pending)
TXOwner owner(From, txid);
if(dht.pendingExploreLookups.HasPendingLookupFrom(owner))
{
if(R.size())
{
pending->FoundRouter(&R[0]);
if(pending->requester != dht.OurKey())
{
replies.push_back(new GotRouterMessage(
pending->target, pending->requesterTX, &R[0], false));
}
}
else if(N.empty())
{
// iterate to next closest peer
Key_t nextPeer;
pending->exclude.insert(From);
if(pending->exclude.size() < 3
&& dht.nodes->FindCloseExcluding(pending->target, nextPeer,
pending->exclude))
{
llarp::LogInfo(pending->target, " was not found via ", From,
" iterating to next peer ", nextPeer,
" already asked ", pending->exclude.size(),
" other peers");
// REVIEW: is this ok to relay the pending->job as the current job
// (seems to make things work)
dht.LookupRouter(pending->target, pending->requester,
pending->requesterTX, nextPeer, pending->job, true,
pending->exclude);
}
else
{
llarp::LogInfo(pending->target, " was not found via ", From,
" and we won't look it up");
pending->FoundRouter(nullptr);
if(pending->requester != dht.OurKey())
{
replies.push_back(new GotRouterMessage(
pending->target, pending->requesterTX, nullptr, false));
}
}
}
else if(pending->foundNear)
{
// near peers provided
pending->foundNear(N);
}
dht.RemovePendingTX(From, txid);
if(N.size() == 0)
dht.pendingExploreLookups.NotFound(owner);
else
dht.pendingExploreLookups.Found(owner, From, N);
return true;
}
llarp::LogWarn(
"Got response for DHT transaction we are not tracking, txid=", txid);
return false;
if(!dht.pendingRouterLookups.HasPendingLookupFrom(owner))
{
llarp::LogWarn("Unwarrented GRM from ", From, " txid=", txid);
return false;
}
if(R.size() == 1)
dht.pendingRouterLookups.Found(owner, R[0].pubkey, R);
else
dht.pendingRouterLookups.NotFound(owner);
return true;
}
} // namespace dht
} // namespace llarp

@ -1,103 +0,0 @@
#include <llarp/dht/search_job.hpp>
namespace llarp
{
namespace dht
{
SearchJob::SearchJob()
{
started = 0;
requester.Zero();
target.Zero();
}
SearchJob::SearchJob(const Key_t &asker, uint64_t tx, const Key_t &key,
const std::set< Key_t > &excludes,
llarp_router_lookup_job *j)
: job(j)
, started(llarp_time_now_ms())
, requester(asker)
, requesterTX(tx)
, target(key)
, exclude(excludes)
{
}
SearchJob::SearchJob(const Key_t &asker, uint64_t tx, const Key_t &key,
const std::set< Key_t > &excludes,
IntroSetHookFunc foundIntroset, DoneFunc done)
: foundIntroHook(foundIntroset)
, onDone(done)
, started(llarp_time_now_ms())
, requester(asker)
, requesterTX(tx)
, target(key)
, exclude(excludes)
{
}
SearchJob::SearchJob(const Key_t &asker, uint64_t tx,
IntroSetHookFunc found, DoneFunc done)
: foundIntroHook(found)
, onDone(done)
, started(llarp_time_now_ms())
, requester(asker)
, requesterTX(tx)
{
target.Zero();
}
SearchJob::SearchJob(FoundNearFunc near, DoneFunc done)
: foundNear(near), onDone(done)
{
target.Randomize();
started = llarp_time_now_ms();
}
bool
SearchJob::FoundIntros(
const std::vector< llarp::service::IntroSet > &introsets) const
{
if(foundIntroHook && foundIntroHook(introsets))
{
if(onDone)
onDone();
return true;
}
return foundIntroHook == nullptr;
}
void
SearchJob::FoundRouter(const llarp_rc *router) const
{
if(job && job->hook)
{
if(router)
{
job->found = true;
llarp_rc_copy(&job->result, router);
}
job->hook(job);
}
}
bool
SearchJob::IsExpired(llarp_time_t now) const
{
return now - started >= JobTimeout;
}
void
SearchJob::Timeout() const
{
if(job)
{
job->found = false;
job->hook(job);
}
else if(foundIntroHook)
{
foundIntroHook({});
}
}
} // namespace dht
} // namespace llarp

@ -42,7 +42,7 @@ llarp_ev_loop_free(struct llarp_ev_loop **ev)
int
llarp_ev_loop_run(struct llarp_ev_loop *ev, struct llarp_logic *logic)
{
while(true)
while(ev->running())
{
if(ev->tick(EV_TICK_INTERVAL) == -1)
return -1;
@ -59,7 +59,7 @@ llarp_ev_loop_run_single_process(struct llarp_ev_loop *ev,
while(ev->running())
{
ev->tick(EV_TICK_INTERVAL);
llarp_logic_tick(logic);
llarp_logic_tick_async(logic);
llarp_threadpool_tick(tp);
}
}

@ -53,19 +53,16 @@ namespace llarp
virtual void
flush_write()
{
m_writeq.ProcessIf([&](WriteBuffer* buffer) -> bool {
m_writeq.Process([&](WriteBuffer* buffer) {
// todo: wtf???
#ifndef _WIN32
if(write(fd, buffer->buf, buffer->bufsz) == -1)
{
// if we would block we save the entries for later
return errno == EWOULDBLOCK || errno == EAGAIN;
}
write(fd, buffer->buf, buffer->bufsz);
// if we would block we save the entries for later
// discard entry
return true;
#else
// writefile
return false;
// writefile
#endif
});
/// reset errno

@ -23,7 +23,7 @@ bool
frame_state::process_inbound_queue()
{
uint64_t last = 0;
recvqueue.Process([&](InboundMessage *msg) {
recvqueue.Process([&](const InboundMessage *msg) {
if(last != msg->msgid)
{
auto buffer = msg->Buffer();
@ -58,12 +58,6 @@ frame_state::either_has_flag(byte_t flag) const
void
frame_state::clear()
{
auto _rx = rx;
auto _tx = tx;
for(auto &item : _rx)
delete item.second;
for(auto &item : _tx)
delete item.second;
rx.clear();
tx.clear();
}
@ -100,9 +94,11 @@ frame_state::got_xmit(frame_header hdr, size_t sz)
auto itr = rx.find(h);
if(itr == rx.end())
{
auto msg = new transit_message(x);
rx[h] = msg;
rxIDs[id] = h;
auto msg = rx.insert(std::make_pair(h,
std::unique_ptr< transit_message >(
new transit_message(x))))
.first->second.get();
rxIDs.insert(std::make_pair(id, h));
llarp::LogDebug("got message XMIT with ", (int)x.numfrags(),
" fragment"
"s");
@ -199,7 +195,9 @@ frame_state::inbound_frame_complete(uint64_t id)
{
bool success = false;
std::vector< byte_t > msg;
auto rxmsg = rx[rxIDs[id]];
std::unique_ptr< transit_message > rxmsg = std::move(rx[rxIDs[id]]);
rx.erase(rxIDs[id]);
rxIDs.erase(id);
llarp::ShortHash digest;
if(rxmsg->reassemble(msg))
{
@ -243,10 +241,6 @@ frame_state::inbound_frame_complete(uint64_t id)
}
}
delete rxmsg;
rxIDs.erase(id);
rx.erase(digest);
if(!success)
llarp::LogWarn("Failed to process inbound message ", id);
@ -281,27 +275,23 @@ frame_state::got_acks(frame_header hdr, size_t sz)
auto now = llarp_time_now_ms();
transit_message *msg = itr->second;
if(bitmask == ~(0U))
{
tx.erase(msgid);
delete msg;
}
else
{
msg->ack(bitmask);
itr->second->ack(bitmask);
if(msg->completed())
if(itr->second->completed())
{
llarp::LogDebug("message transmitted msgid=", msgid);
tx.erase(msgid);
delete msg;
tx.erase(itr);
}
else if(msg->should_resend_frags(now))
else if(itr->second->should_resend_frags(now))
{
llarp::LogDebug("message ", msgid, " retransmit fragments");
msg->retransmit_frags(sendqueue, txflags);
itr->second->retransmit_frags(sendqueue, txflags);
}
}
return true;
@ -341,36 +331,10 @@ frame_state::process(byte_t *buf, size_t sz)
}
}
/*
bool
frame_state::next_frame(llarp_buffer_t *buf)
{
auto left = sendqueue.size();
if(left)
{
llarp::LogDebug("next frame, ", left, " frames left in send queue");
auto &send = sendqueue.front();
buf->base = send->data();
buf->cur = send->data();
buf->sz = send->size();
return true;
}
return false;
}
void
frame_state::pop_next_frame()
{
auto &buf = sendqueue.front();
delete buf;
sendqueue.pop();
}
*/
void
frame_state::queue_tx(uint64_t id, transit_message *msg)
{
tx.insert(std::make_pair(id, msg));
tx.insert(std::make_pair(id, std::unique_ptr< transit_message >(msg)));
msg->generate_xmit(sendqueue, txflags);
// msg->retransmit_frags(sendqueue, txflags);
}

@ -25,13 +25,6 @@ llarp_link::has_intro_from(const llarp::Addr& from)
return m_PendingSessions.find(from) != m_PendingSessions.end();
}
void
llarp_link::put_intro_from(llarp_link_session* s)
{
lock_t lock(m_PendingSessions_Mutex);
m_PendingSessions[s->addr] = s;
}
void
llarp_link::remove_intro_from(const llarp::Addr& from)
{
@ -105,7 +98,6 @@ llarp_link::TickSessions()
{
if(itr->second->timedout(now))
{
itr->second->done();
itr = m_PendingSessions.erase(itr);
}
else
@ -119,8 +111,8 @@ llarp_link::TickSessions()
{
if(itr->second->Tick(now))
{
itr->second->done();
delete itr->second;
if(itr->second->get_remote_router())
m_Connected.erase(itr->second->get_remote_router()->pubkey);
itr = m_sessions.erase(itr);
}
else
@ -142,32 +134,13 @@ llarp_link::sendto(const byte_t* pubkey, llarp_buffer_t buf)
auto inner_itr = m_sessions.find(itr->second);
if(inner_itr != m_sessions.end())
{
link = inner_itr->second;
link = inner_itr->second.get();
}
}
}
return link && link->sendto(buf);
}
void
llarp_link::UnmapAddr(const llarp::Addr& src)
{
lock_t lock(m_Connected_Mutex);
// std::unordered_map< llarp::pubkey, llarp::Addr, llarp::pubkeyhash >
auto itr = std::find_if(
m_Connected.begin(), m_Connected.end(),
[src](const std::pair< llarp::PubKey, llarp::Addr >& item) -> bool {
return src == item.second;
});
if(itr == std::end(m_Connected))
return;
// tell router we are done with this session
router->SessionClosed(itr->first);
m_Connected.erase(itr);
}
llarp_link_session*
llarp_link::create_session(const llarp::Addr& src)
{
@ -177,92 +150,38 @@ llarp_link::create_session(const llarp::Addr& src)
bool
llarp_link::has_session_via(const llarp::Addr& dst)
{
lock_t lock(m_sessions_Mutex);
return m_sessions.find(dst) != m_sessions.end();
}
llarp_link_session*
llarp_link::find_session(const llarp::Addr& addr)
{
lock_t lock(m_sessions_Mutex);
auto itr = m_sessions.find(addr);
if(itr == m_sessions.end())
return nullptr;
else
return itr->second;
}
void
llarp_link::put_session(const llarp::Addr& src, llarp_link_session* impl)
llarp_link::pending_session_active(const llarp::Addr& addr)
{
lock_t lock(m_sessions_Mutex);
impl->our_router = &router->rc;
m_sessions.insert(std::make_pair(src, impl));
lock_t lockpending(m_PendingSessions_Mutex);
auto itr = m_PendingSessions.find(addr);
if(itr == m_PendingSessions.end())
return;
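// move ownership of the handshaking session into the active session map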
itr->second->our_router = &router->rc;
m_sessions.insert(std::make_pair(addr, std::move(itr->second)));
m_PendingSessions.erase(itr);
}
void
llarp_link::clear_sessions()
{
lock_t lock(m_sessions_Mutex);
auto itr = m_sessions.begin();
while(itr != m_sessions.end())
{
delete itr->second;
itr = m_sessions.erase(itr);
}
}
void
llarp_link::iterate_sessions(std::function< bool(llarp_link_session*) > visitor)
{
auto now = llarp_time_now_ms();
std::list< llarp_link_session* > slist;
{
lock_t lock(m_sessions_Mutex);
auto itr = m_sessions.begin();
while(itr != m_sessions.end())
{
// if not timing out soon add to list to iterate on
if(!itr->second->timedout(now, 11500))
slist.push_back(itr->second);
++itr;
}
}
for(auto& s : slist)
if(!visitor(s))
return;
m_sessions.clear();
}
void
llarp_link::PumpLogic()
{
auto now = llarp_time_now_ms();
iterate_sessions([now](llarp_link_session* s) -> bool {
s->TickLogic(now);
return true;
});
}
void
llarp_link::RemoveSession(llarp_link_session* s)
{
{
lock_t lock(m_sessions_Mutex);
auto itr = m_sessions.find(s->addr);
if(itr != m_sessions.end())
{
UnmapAddr(s->addr);
s->done();
m_sessions.erase(itr);
}
}
auto itr = m_sessions.begin();
while(itr != m_sessions.end())
{
lock_t lock(m_PendingSessions_Mutex);
auto itr = m_PendingSessions.find(s->addr);
if(itr != m_PendingSessions.end())
m_PendingSessions.erase(itr);
itr->second->TickLogic(now);
++itr;
}
delete s;
}
const uint8_t*
@ -315,6 +234,30 @@ llarp_link::handle_cleanup_timer(void* l, uint64_t orig, uint64_t left)
link->issue_cleanup_timer(orig);
}
void
llarp_link::visit_session(
const llarp::Addr& fromaddr,
std::function< void(const std::unique_ptr< llarp_link_session >&) > visit)
{
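// prefer an existing active session; otherwise use, or lazily create, a
// pending session for this address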
{
auto itr = m_sessions.find(fromaddr);
if(itr == m_sessions.end())
{
auto pitr = m_PendingSessions.find(fromaddr);
if(pitr == m_PendingSessions.end())
visit(m_PendingSessions
.insert(std::make_pair(fromaddr,
std::unique_ptr< llarp_link_session >(
create_session(fromaddr))))
.first->second);
else
visit(pitr->second);
}
else
visit(itr->second);
}
}
void
llarp_link::handle_recvfrom(struct llarp_udp_io* udp,
const struct sockaddr* saddr, const void* buf,
@ -322,13 +265,10 @@ llarp_link::handle_recvfrom(struct llarp_udp_io* udp,
{
llarp_link* link = static_cast< llarp_link* >(udp->user);
llarp_link_session* s = link->find_session(*saddr);
if(s == nullptr)
{
// new inbound session
s = link->create_session(*saddr);
}
s->recv(buf, sz);
link->visit_session(
*saddr, [buf, sz](const std::unique_ptr< llarp_link_session >& s) {
s->recv(buf, sz);
});
}
void
@ -362,7 +302,9 @@ void
llarp_link::after_recv(llarp_udp_io* udp)
{
llarp_link* self = static_cast< llarp_link* >(udp->user);
self->PumpLogic();
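// queue PumpLogic onto the logic thread instead of running it on the
// netio callback thread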
llarp_logic_queue_job(
self->logic,
{self, [](void* u) { static_cast< llarp_link* >(u)->PumpLogic(); }});
}
bool
@ -461,18 +403,18 @@ bool
llarp_link::try_establish(struct llarp_link_establish_job* job)
{
llarp::Addr dst(job->ai);
llarp::LogDebug("establish session to ", dst);
llarp_link_session* s = find_session(dst);
if(s == nullptr)
if(has_session_via(dst))
return false;
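// only start a new handshake when there is no pending session for this
// address yet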
if(m_PendingSessions.find(dst) == m_PendingSessions.end())
{
s = create_session(dst);
put_session(dst, s);
llarp::LogDebug("establish session to ", dst);
visit_session(dst, [job](const std::unique_ptr< llarp_link_session >& s) {
s->establish_job = job;
s->frame.alive(); // mark it alive
s->introduce(job->ai.enc_key);
});
return true;
}
else
return false;
s->establish_job = job;
s->frame.alive(); // mark it alive
s->introduce(job->ai.enc_key);
return true;
}
}

@ -174,7 +174,6 @@ llarp_link_session::IsEstablished()
void
llarp_link_session::send_LIM()
{
llarp::LogDebug("send LIM");
llarp::ShortHash digest;
// 64 bytes overhead for link message
byte_t tmp[MAX_RC_SIZE + 64];
@ -204,8 +203,12 @@ handle_generated_session_start(iwp_async_session_start *start)
llarp_link_session *link = static_cast< llarp_link_session * >(start->user);
if(llarp_ev_udp_sendto(link->udp, link->addr, start->buf, start->sz) == -1)
{
llarp::LogError("sendto failed");
return;
}
link->EnterState(llarp_link_session::State::eSessionStartSent);
link->serv->pending_session_active(link->addr);
link->working = false;
}
@ -213,14 +216,15 @@ static void
handle_verify_intro(iwp_async_intro *intro)
{
llarp_link_session *self = static_cast< llarp_link_session * >(intro->user);
if(self == nullptr)
return;
self->working = false;
if(!intro->buf)
{
self->serv->remove_intro_from(self->addr);
llarp::LogError("intro verify failed from ", self->addr, " via ",
self->serv->addr);
delete self;
return;
}
delete[] intro->buf;
memcpy(self->remote, intro->remote_pubkey, 32);
self->intro_ack();
}
@ -236,6 +240,7 @@ handle_verify_introack(iwp_async_introack *introack)
{
// invalid signature
llarp::LogError("introack verify failed from ", link->addr);
link->serv->remove_intro_from(link->addr);
return;
}
// cancel resend
@ -273,6 +278,10 @@ handle_establish_timeout(void *user, uint64_t orig, uint64_t left)
void
llarp_link_session::done()
{
if(intro_resend_job_id)
llarp_logic_cancel_call(serv->logic, intro_resend_job_id);
if(establish_job_id)
llarp_logic_cancel_call(serv->logic, establish_job_id);
}
void
@ -314,38 +323,23 @@ llarp_link_session::EnterState(State st)
void
llarp_link_session::on_intro(const void *buf, size_t sz)
{
if(sz >= sizeof(workbuf))
{
// too big?
llarp::LogError("intro too big from ", addr);
delete this;
return;
}
if(serv->has_intro_from(addr))
{
llarp::LogError("duplicate intro from ", addr);
delete this;
if(sz < 32 * 3)
return;
}
serv->put_intro_from(this);
// copy so we own it
memcpy(workbuf, buf, sz);
intro.buf = workbuf;
intro.sz = sz;
// copy
auto intro = new iwp_async_intro;
intro->buf = new byte_t[sz];
memcpy(intro->buf, buf, sz);
memcpy(intro->nonce, intro->buf + 32, 32);
intro->sz = sz;
// give secret key
intro.secretkey = eph_seckey;
// and nonce
intro.nonce = intro.buf + 32;
intro.user = this;
memcpy(intro->secretkey, eph_seckey, 64);
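// the heap allocated intro owns copies of the buffer, nonce and secret key
// so it can outlive this call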
intro->user = this;
// set call back hook
intro.hook = &handle_verify_intro;
// put remote pubkey into this buffer
intro.remote_pubkey = remote;
intro->hook = &handle_verify_intro;
// call
EnterState(eIntroRecv);
working = true;
iwp_call_async_verify_intro(iwp, &intro);
iwp_call_async_verify_intro(iwp, intro);
}
void
@ -355,6 +349,7 @@ llarp_link_session::on_intro_ack(const void *buf, size_t sz)
{
// too big?
llarp::LogError("introack too big");
serv->remove_intro_from(addr);
return;
}
// copy buffer so we own it
@ -388,8 +383,7 @@ llarp_link_session::get_parent()
void
llarp_link_session::TickLogic(llarp_time_t now)
{
decryptedFrames.Process(
[&](iwp_async_frame *msg) { handle_frame_decrypt(msg); });
decryptedFrames.Process([=](iwp_async_frame *f) { handle_frame_decrypt(f); });
frame.process_inbound_queue();
frame.retransmit(now);
pump();
@ -444,37 +438,37 @@ static void
handle_verify_session_start(iwp_async_session_start *s)
{
llarp_link_session *self = static_cast< llarp_link_session * >(s->user);
self->working = false;
if(!s->buf)
{
// verify fail
// TODO: remove session?
llarp::LogWarn("session start verify failed from ", self->addr);
self->serv->RemoveSession(self);
return;
self->serv->remove_intro_from(self->addr);
}
else
{
self->serv->pending_session_active(self->addr);
self->send_LIM();
}
self->serv->remove_intro_from(self->addr);
self->send_LIM();
self->working = false;
}
static void
handle_introack_generated(iwp_async_introack *i)
{
llarp_link_session *link = static_cast< llarp_link_session * >(i->user);
if(i->buf && link->serv->has_intro_from(link->addr))
link->working = false;
if(i->buf)
{
// track it with the server here
if(link->serv->has_session_via(link->addr))
{
// duplicate session
llarp::LogWarn("duplicate session to ", link->addr);
link->working = false;
link->serv->remove_intro_from(link->addr);
return;
}
link->frame.alive();
link->EnterState(llarp_link_session::State::eIntroAckSent);
link->serv->put_session(link->addr, link);
llarp::LogDebug("send introack to ", link->addr, " via ", link->serv->addr);
llarp_ev_udp_sendto(link->udp, link->addr, i->buf, i->sz);
}
@ -483,7 +477,6 @@ handle_introack_generated(iwp_async_introack *i)
// failed to generate?
llarp::LogWarn("failed to generate introack");
}
link->working = false;
}
static void
@ -493,12 +486,8 @@ handle_generated_intro(iwp_async_intro *i)
link->working = false;
if(i->buf)
{
llarp::LogInfo("send intro to ", link->addr);
if(llarp_ev_udp_sendto(link->udp, link->addr, i->buf, i->sz) == -1)
{
llarp::LogWarn("send intro failed");
return;
}
llarp_ev_udp_sendto(link->udp, link->addr, i->buf, i->sz);
delete[] i->buf;
link->EnterState(llarp_link_session::eIntroSent);
link->lastIntroSentAt = llarp_time_now_ms();
auto dlt = (link->createdAt - link->lastIntroSentAt) + 500;
@ -509,7 +498,9 @@ handle_generated_intro(iwp_async_intro *i)
else
{
llarp::LogWarn("failed to generate intro");
link->serv->remove_intro_from(link->addr);
}
delete i;
}
void
@ -531,26 +522,26 @@ llarp_link_session::introduce(uint8_t *pub)
llarp::LogDebug("session introduce");
if(pub)
memcpy(remote, pub, PUBKEYSIZE);
intro.buf = workbuf;
auto intro = new iwp_async_intro;
intro->buf = new byte_t[1500];
size_t w0sz = (llarp_randint() % MAX_PAD);
intro.sz = (32 * 3) + w0sz;
intro->sz = (32 * 3) + w0sz;
// randomize w0
if(w0sz)
{
crypto->randbytes(intro.buf + (32 * 3), w0sz);
crypto->randbytes(intro->buf + (32 * 3), w0sz);
}
intro.nonce = intro.buf + 32;
intro.secretkey = eph_seckey;
memcpy(intro->secretkey, eph_seckey, 64);
// copy in pubkey
intro.remote_pubkey = remote;
memcpy(intro->remote_pubkey, remote, 32);
// randomize nonce
crypto->randbytes(intro.nonce, 32);
crypto->randbytes(intro->nonce, 32);
// async generate intro packet
intro.user = this;
intro.hook = &handle_generated_intro;
working = true;
iwp_call_async_gen_intro(iwp, &intro);
intro->user = this;
intro->hook = &handle_generated_intro;
working = true;
iwp_call_async_gen_intro(iwp, intro);
// start introduce timer
if(pub)
establish_job_id = llarp_logic_call_later(
@ -627,6 +618,8 @@ llarp_link_session::on_session_start(const void *buf, size_t sz)
if(sz > sizeof(workbuf))
{
llarp::LogDebug("session start too big");
working = false;
serv->remove_intro_from(addr);
return;
}
if(working)
@ -655,7 +648,9 @@ llarp_link_session::intro_ack()
{
if(serv->has_session_via(addr))
{
working = false;
llarp::LogWarn("won't ack intro for duplicate session from ", addr);
serv->remove_intro_from(addr);
return;
}
llarp::LogDebug("session introack");
@ -772,9 +767,8 @@ void
llarp_link_session::pump()
{
bool flush = false;
frame.sendqueue.Process([&](sendbuf_t *msg) {
llarp_buffer_t buf = msg->Buffer();
encrypt_frame_async_send(buf.base, buf.sz);
frame.sendqueue.Process([&, this](sendbuf_t *msg) {
encrypt_frame_async_send(msg->data(), msg->size());
flush = true;
});
if(flush)

@ -37,6 +37,7 @@ void
llarp_logic_tick_async(struct llarp_logic* logic)
{
llarp_timer_tick_all_async(logic->timer, logic->thread);
llarp_threadpool_tick(logic->thread);
}
void

@ -174,7 +174,7 @@ namespace llarp
LRCMFrameDecrypt(Context* ctx, Decrypter* dec,
const LR_CommitMessage* commit)
: decrypter(dec), context(ctx), hop(new Hop)
: decrypter(dec), context(ctx), hop(new Hop())
{
for(const auto& f : commit->frames)
frames.push_back(f);

@ -71,8 +71,10 @@ llarp_router::PersistSessionUntil(const llarp::RouterID &remote,
bool
llarp_router::SendToOrQueue(const llarp::RouterID &remote,
const llarp::ILinkMessage *msg)
const llarp::ILinkMessage *m)
{
std::unique_ptr< const llarp::ILinkMessage > msg =
std::unique_ptr< const llarp::ILinkMessage >(m);
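// take ownership of the message; it is either sent right away or queued
// until a session to the remote router exists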
llarp_link *chosen = nullptr;
if(!outboundLink->has_session_to(remote))
{
@ -91,16 +93,15 @@ llarp_router::SendToOrQueue(const llarp::RouterID &remote,
if(chosen)
{
SendTo(remote, msg, chosen);
delete msg;
return true;
}
// this will create an entry in the obmq if it's not already there
auto itr = outboundMesssageQueue.find(remote);
if(itr == outboundMesssageQueue.end())
auto itr = outboundMessageQueue.find(remote);
if(itr == outboundMessageQueue.end())
{
outboundMesssageQueue.insert(std::make_pair(remote, MessageQueue()));
outboundMessageQueue.insert(std::make_pair(remote, MessageQueue()));
}
outboundMesssageQueue[remote].push(msg);
outboundMessageQueue[remote].push(std::move(msg));
// we don't have an open session to that router right now
auto rc = llarp_nodedb_get_rc(nodedb, remote);
@ -111,68 +112,33 @@ llarp_router::SendToOrQueue(const llarp::RouterID &remote,
return true;
}
// this would never be true, as everything is in memory
// but we'll keep it around in case we ever need to swap them out of memory;
// for now it's best to keep the paradigm that everything is in memory at this
// point in development as it will reduce complexity
/*
// try requesting the rc from the disk
llarp_async_load_rc *job = new llarp_async_load_rc;
job->diskworker = disk;
job->nodedb = nodedb;
job->logic = logic;
job->user = this;
job->hook = &HandleAsyncLoadRCForSendTo;
memcpy(job->pubkey, remote, PUBKEYSIZE);
llarp_nodedb_async_load_rc(job);
*/
// we don't have the RC locally so do a dht lookup
llarp_router_lookup_job *lookup = new llarp_router_lookup_job;
llarp_router_lookup_job *lookup = new llarp_router_lookup_job();
lookup->user = this;
lookup->iterative = false;
llarp_rc_clear(&lookup->result);
memcpy(lookup->target, remote, PUBKEYSIZE);
lookup->hook = &HandleDHTLookupForSendTo;
llarp_dht_lookup_router(this->dht, lookup);
lookup->hook = &llarp_router::HandleDHTLookupForSendTo;
lookup->user = this;
llarp_dht_lookup_router(dht, lookup);
return true;
}
/*
void
llarp_router::HandleAsyncLoadRCForSendTo(llarp_async_load_rc *job)
{
llarp_router *router = static_cast< llarp_router * >(job->user);
if(job->loaded)
{
llarp_router_try_connect(router, &job->rc, 10);
}
else
{
// we don't have the RC locally so do a dht lookup
llarp_router_lookup_job *lookup = new llarp_router_lookup_job;
lookup->user = router;
memcpy(lookup->target, job->pubkey, PUBKEYSIZE);
lookup->hook = &HandleDHTLookupForSendTo;
llarp_dht_lookup_router(router->dht, lookup);
}
delete job;
}
*/
void
llarp_router::HandleDHTLookupForSendTo(llarp_router_lookup_job *job)
{
llarp_router *self = static_cast< llarp_router * >(job->user);
if(job->found)
{
llarp_nodedb_put_rc(self->nodedb, &job->result);
llarp_router_try_connect(self, &job->result, 10);
llarp_rc_free(&job->result);
}
else
{
self->DiscardOutboundFor(job->target);
}
llarp_rc_free(&job->result);
delete job;
}
@ -385,13 +351,13 @@ llarp_router::TryEstablishTo(const llarp::RouterID &remote)
else
{
// dht lookup as we don't know it
llarp_router_lookup_job *lookup = new llarp_router_lookup_job();
lookup->user = this;
llarp_router_lookup_job *lookup = new llarp_router_lookup_job;
llarp_rc_clear(&lookup->result);
memcpy(lookup->target, remote, PUBKEYSIZE);
lookup->hook = &HandleDHTLookupForTryEstablishTo;
lookup->hook = &llarp_router::HandleDHTLookupForTryEstablishTo;
lookup->iterative = false;
llarp_dht_lookup_router(this->dht, lookup);
lookup->user = this;
llarp_dht_lookup_router(dht, lookup);
}
}
@ -400,10 +366,11 @@ llarp_router::HandleDHTLookupForTryEstablishTo(llarp_router_lookup_job *job)
{
if(job->found)
{
llarp_router_try_connect(static_cast< llarp_router * >(job->user),
&job->result, 5);
llarp_router *self = static_cast< llarp_router * >(job->user);
llarp_nodedb_put_rc(self->nodedb, &job->result);
llarp_router_try_connect(self, &job->result, 5);
llarp_rc_free(&job->result);
}
llarp_rc_free(&job->result);
delete job;
}
@ -472,7 +439,8 @@ llarp_router::Tick()
}
void
llarp_router::SendTo(llarp::RouterID remote, const llarp::ILinkMessage *msg,
llarp_router::SendTo(llarp::RouterID remote,
std::unique_ptr< const llarp::ILinkMessage > &msg,
llarp_link *link)
{
llarp_buffer_t buf =
@ -543,8 +511,8 @@ llarp_router::FlushOutboundFor(const llarp::RouterID &remote,
llarp_link *chosen)
{
llarp::LogDebug("Flush outbound for ", remote);
auto itr = outboundMesssageQueue.find(remote);
if(itr == outboundMesssageQueue.end())
auto itr = outboundMessageQueue.find(remote);
if(itr == outboundMessageQueue.end())
{
return;
}
@ -557,13 +525,12 @@ llarp_router::FlushOutboundFor(const llarp::RouterID &remote,
{
auto buf = llarp::StackBuffer< decltype(linkmsg_buffer) >(linkmsg_buffer);
auto &msg = itr->second.front();
const auto &msg = itr->second.front();
if(!msg->BEncode(&buf))
{
llarp::LogWarn("failed to encode outbound message, buffer size left: ",
llarp_buffer_size_left(buf));
delete msg;
itr->second.pop();
continue;
}
@ -574,7 +541,6 @@ llarp_router::FlushOutboundFor(const llarp::RouterID &remote,
llarp::LogWarn("failed to send outboud message to ", remote, " via ",
chosen->name());
delete msg;
itr->second.pop();
}
}
@ -615,13 +581,7 @@ llarp_router::on_try_connect_result(llarp_link_establish_job *job)
void
llarp_router::DiscardOutboundFor(const llarp::RouterID &remote)
{
auto &queue = outboundMesssageQueue[remote];
while(queue.size())
{
delete queue.front();
queue.pop();
}
outboundMesssageQueue.erase(remote);
outboundMessageQueue.erase(remote);
}
bool
@ -643,7 +603,7 @@ void
llarp_router::async_verify_RC(llarp_rc *rc, bool isExpectingClient,
llarp_link_establish_job *establish_job)
{
llarp_async_verify_rc *job = new llarp_async_verify_rc;
llarp_async_verify_rc *job = new llarp_async_verify_rc();
job->user = new llarp::async_verify_context{this, establish_job};
job->rc = {};
job->valid = false;

@ -100,10 +100,11 @@ struct llarp_router
llarp_link *outboundLink = nullptr;
std::list< llarp_link * > inboundLinks;
typedef std::queue< const llarp::ILinkMessage * > MessageQueue;
typedef std::queue< std::unique_ptr< const llarp::ILinkMessage > >
MessageQueue;
/// outbound message queue
std::map< llarp::RouterID, MessageQueue > outboundMesssageQueue;
std::map< llarp::RouterID, MessageQueue > outboundMessageQueue;
/// loki verified routers
std::map< llarp::RouterID, llarp_rc > validRouters;
@ -182,7 +183,8 @@ struct llarp_router
/// sendto or drop
void
SendTo(llarp::RouterID remote, const llarp::ILinkMessage *msg,
SendTo(llarp::RouterID remote,
std::unique_ptr< const llarp::ILinkMessage > &msg,
llarp_link *chosen = nullptr);
/// manually flush outbound message queue for just 1 router
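The recurring pattern across these hunks is replacing raw-pointer containers, and their manual delete calls, with std::unique_ptr ownership. A minimal, self-contained sketch of that pattern, using a generic Message type rather than llarp's ILinkMessage, shows why the explicit deletes disappear:

#include <iostream>
#include <memory>
#include <queue>

struct Message
{
  int id;
  explicit Message(int i) : id(i)
  {
  }
};

int
main()
{
  // the queue owns every message; nothing is deleted by hand
  std::queue< std::unique_ptr< Message > > q;
  q.push(std::unique_ptr< Message >(new Message(1)));
  q.push(std::unique_ptr< Message >(new Message(2)));
  while(!q.empty())
  {
    std::cout << "processing message " << q.front()->id << std::endl;
    // popping destroys the unique_ptr, which frees the Message
    q.pop();
  }
  return 0;
}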

@ -530,9 +530,7 @@ namespace llarp
BuildRequestMessage()
{
llarp::routing::DHTMessage* msg = new llarp::routing::DHTMessage();
auto FIM = new llarp::dht::FindIntroMessage(txid, remote);
FIM->R = 5;
msg->M.push_back(FIM);
msg->M.push_back(new llarp::dht::FindIntroMessage(txid, remote, 5));
llarp::LogInfo("build request for ", remote);
return msg;
}

@ -49,7 +49,7 @@ namespace llarp
}
for(;;)
{
llarp_thread_job *job;
llarp_thread_job job;
{
lock_t lock(this->queue_mutex);
this->condition.WaitUntil(
@ -64,13 +64,13 @@ namespace llarp
}
return;
}
job = this->jobs.top().job;
job.user = this->jobs.top().job->user;
job.work = this->jobs.top().job->work;
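// the job is copied by value so the heap allocation can be freed while the
// lock is still held and the work run after it is released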
delete this->jobs.top().job;
this->jobs.pop();
}
// do work
job->work(job->user);
delete job;
job.work(job.user);
}
});
}
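A similar self-contained sketch of the worker-loop change above, with a hypothetical Job struct standing in for llarp_thread_job's user/work fields: the job is copied by value and the allocation freed while the lock is held, then the work runs after the lock is released:

#include <iostream>
#include <mutex>
#include <queue>

// hypothetical stand-in for llarp_thread_job: a user pointer plus a work function
struct Job
{
  void *user;
  void (*work)(void *);
};

int
main()
{
  std::mutex m;
  std::queue< Job * > jobs;
  int counter = 0;
  jobs.push(new Job{&counter, [](void *u) { ++(*static_cast< int * >(u)); }});
  Job job{nullptr, nullptr};
  {
    std::lock_guard< std::mutex > lock(m);
    // copy the fields by value and free the allocation while locked
    job.user = jobs.front()->user;
    job.work = jobs.front()->work;
    delete jobs.front();
    jobs.pop();
  }
  // run the work outside of the lock
  job.work(job.user);
  std::cout << counter << std::endl;  // prints 1
  return 0;
}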
