* kill dht feedback loop

* add dht exploration for discovering new routers

* tweak loopback testnet paramters to not be initially fully connected
pull/15/head
Jeff Becker 6 years ago
parent 1e233fe5ad
commit 199dad09dd
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -47,12 +47,13 @@ def main():
'dir': 'netdb'
}
config['connect'] = {}
for otherid in range(args.svc):
if otherid != nodeid:
name = svcNodeName(otherid)
config['connect'][name] = os.path.join(
basedir, name, 'rc.signed')
for otherid in range(args.connect):
otherid = (nodeid + otherid) % args.svc
name = svcNodeName(otherid)
config['connect'][name] = os.path.join(
basedir, name, 'rc.signed')
d = os.path.join(args.dir, svcNodeName(nodeid))
if not os.path.exists(d):
os.mkdir(d)

@ -106,6 +106,12 @@ sent in reply to FRCM only
V: 0
}
* send a GRCM with R to requesters in all linked transactions
* terminate transaction with id T
in response to an exploritory FRCM if the target router is not found the form
{
A: "S",
N: [list, of, router, publickeys, near, target],
R: [],
T: transaction_id_uint64,
V: 0
}

@ -50,6 +50,24 @@ namespace llarp
return nodes.size() > 0;
}
bool
GetManyNearExcluding(const Key_t& target, std::set< Key_t >& result,
size_t N, const std::set< Key_t >& exclude) const
{
std::set< Key_t > s;
for(const auto& k : exclude)
s.insert(k);
Key_t peer;
while(N--)
{
if(!FindCloseExcluding(target, peer, s))
return false;
s.insert(peer);
result.insert(peer);
}
return true;
}
bool
FindCloseExcluding(const Key_t& target, Key_t& result,
std::set< Key_t > exclude) const
@ -91,4 +109,4 @@ namespace llarp
};
} // namespace dht
} // namespace llarp
#endif
#endif

@ -51,7 +51,6 @@ namespace llarp
uint64_t whoaskedTX, const Key_t& askpeer,
const std::set< service::IntroSet >& include = {},
uint64_t R = 0);
void
LookupRouterViaJob(llarp_router_lookup_job* job);
@ -98,6 +97,11 @@ namespace llarp
void
DHTSendTo(const Key_t& peer, IMessage* msg);
bool
LookupRouterExploritory(const Key_t& requester, uint64_t txid,
const RouterID& target,
std::vector< IMessage* >& reply);
void
LookupIntroSetRelayed(const Key_t& requester, uint64_t txid,
const service::Address& addr, bool recursive,
@ -121,7 +125,7 @@ namespace llarp
uint64_t S, const std::set< Key_t >& exclude);
void
Init(const Key_t& us, llarp_router* router);
Init(const Key_t& us, llarp_router* router, llarp_time_t exploreInterval);
const llarp::service::IntroSet*
GetIntroSetByServiceAddress(const llarp::service::Address& addr) const;
@ -132,6 +136,13 @@ namespace llarp
static void
handle_cleaner_timer(void* user, uint64_t orig, uint64_t left);
static void
handle_explore_timer(void* user, uint64_t orig, uint64_t left);
/// explore dht for new routers
void
Explore();
static void
queue_router_lookup(void* user);
@ -153,6 +164,9 @@ namespace llarp
void
ScheduleCleanupTimer();
void
HandleExploreResult(const std::vector< RouterID >& result);
void
CleanupTX();
@ -199,4 +213,4 @@ struct llarp_dht_context
llarp_dht_context(llarp_router* router);
};
#endif
#endif

@ -17,6 +17,13 @@ namespace llarp
{
}
// exploritory
FindRouterMessage(const Key_t& from, uint64_t id)
: IMessage(from), exploritory(true), txid(id)
{
K.Randomize();
}
~FindRouterMessage();
bool
@ -31,6 +38,7 @@ namespace llarp
Key_t K;
bool iterative = false;
bool exploritory = false;
uint64_t txid = 0;
uint64_t version = 0;
};
@ -51,4 +59,4 @@ namespace llarp
};
} // namespace dht
} // namespace llarp
#endif
#endif

@ -24,6 +24,12 @@ namespace llarp
}
}
GotRouterMessage(uint64_t id, const std::vector< RouterID >& near,
bool tunneled)
: IMessage({}), N(near), txid(id), relayed(tunneled)
{
}
~GotRouterMessage();
bool
@ -37,10 +43,11 @@ namespace llarp
std::vector< IMessage* >& replies) const;
std::vector< llarp_rc > R;
std::vector< RouterID > N;
uint64_t txid = 0;
uint64_t version = 0;
bool relayed = false;
};
} // namespace dht
} // namespace llarp
#endif
#endif

@ -22,8 +22,13 @@ namespace llarp
const std::vector< llarp::service::IntroSet >&) >
IntroSetHookFunc;
typedef std::function< void(const std::vector< RouterID >&) >
FoundNearFunc;
typedef std::function< void(void) > DoneFunc;
SearchJob();
/// for routers
SearchJob(const Key_t& requester, uint64_t requesterTX,
const Key_t& target, const std::set< Key_t >& excludes,
@ -36,6 +41,9 @@ namespace llarp
SearchJob(const Key_t& requester, uint64_t requseterTX,
IntroSetHookFunc found, DoneFunc done);
// for network exploration
SearchJob(FoundNearFunc near, DoneFunc done);
void
FoundRouter(const llarp_rc* router) const;
@ -52,6 +60,8 @@ namespace llarp
// only set if looking up router
llarp_router_lookup_job* job = nullptr;
IntroSetHookFunc foundIntroHook;
// hook for exploritory router lookups
FoundNearFunc foundNear;
DoneFunc onDone;
llarp_time_t started;
Key_t requester;
@ -61,4 +71,4 @@ namespace llarp
};
} // namespace dht
} // namespace llarp
#endif
#endif

@ -75,6 +75,9 @@ struct frame_state
bool
flags_agree(byte_t flags) const;
bool
either_has_flag(byte_t flag) const;
bool
process_inbound_queue();

@ -218,7 +218,7 @@ namespace llarp
{
in_addr_t addr = this->addr4()->s_addr;
unsigned byte = ntohl(addr);
unsigned byte1 = byte >> 24 & 0xff;
unsigned byte1 = (byte >> 24) & 0xff;
unsigned byte2 = (0x00ff0000 & byte) >> 16;
return (byte1 == 10 || (byte1 == 192 && byte2 == 168)
|| (byte1 == 172 && (byte2 >= 16 || byte2 <= 31)));
@ -239,7 +239,7 @@ namespace llarp
{
return a.port() ^ a.addr4()->s_addr;
}
uint8_t empty[16] = {0};
static const uint8_t empty[16] = {0};
return (a.af() + memcmp(a.addr6(), empty, 16)) ^ a.port();
}
};

@ -45,7 +45,7 @@ llarp_dht_allow_transit(llarp_dht_context *ctx)
void
llarp_dht_context_start(struct llarp_dht_context *ctx, const byte_t *key)
{
ctx->impl.Init(key, ctx->parent);
ctx->impl.Init(key, ctx->parent, 20000);
}
void

@ -22,13 +22,70 @@ namespace llarp
delete services;
}
void
Context::HandleExploreResult(const std::vector< RouterID > &result)
{
llarp::LogInfo("got ", result.size(), " routers from exploration");
for(const auto &pk : result)
{
if(llarp_nodedb_get_rc(router->nodedb, pk) == nullptr)
{
// try connecting to it we don't know it
// this triggers a dht lookup
router->TryEstablishTo(pk);
}
}
}
void
Context::Explore()
{
// ask N random peers for new routers
llarp::LogInfo("Exploring network");
std::set< Key_t > peers;
Key_t peer;
size_t N = 5;
while(N--)
{
if(nodes->GetRandomNodeExcluding(peer, peers))
{
peers.insert(peer);
uint64_t txid = ++ids;
TXOwner ownerKey;
ownerKey.node = peer;
ownerKey.txid = txid;
pendingTX.insert(
std::make_pair(ownerKey,
SearchJob(std::bind(&Context::HandleExploreResult,
this, std::placeholders::_1),
[]() {})));
DHTSendTo(peer, new FindRouterMessage(ourKey, txid));
}
else
llarp::LogError("failed to select random nodes for exploration");
}
}
void
Context::handle_explore_timer(void *u, uint64_t orig, uint64_t left)
{
if(left)
return;
Context *ctx = static_cast< Context * >(u);
ctx->Explore();
llarp_logic_call_later(ctx->router->logic,
{orig, ctx, &handle_explore_timer});
}
void
Context::handle_cleaner_timer(void *u, uint64_t orig, uint64_t left)
{
if(left)
return;
Context *ctx = static_cast< Context * >(u);
// clean up transactions
ctx->CleanupTX();
if(ctx->services)
{
// expire intro sets
@ -198,9 +255,8 @@ namespace llarp
PathLookupJob *j = new PathLookupJob(router, path, txid);
j->target = addr;
j->R = 5;
j->asked.emplace(askpeer);
Key_t us = OurKey();
j->asked.emplace(us);
j->asked.insert(askpeer);
j->asked.insert(OurKey());
SearchJob job(
OurKey(), txid,
std::bind(&PathLookupJob::OnResult, j, std::placeholders::_1),
@ -290,10 +346,13 @@ namespace llarp
else
{
// yeah, ask neighboor recursively
// FIXME: we may need to pass a job here...
// auto sj = FindPendingTX(requester, txid);
// LookupRouter(target, requester, txid, next, sj->job);
LookupRouter(target, requester, txid, next);
// don't request with a new lookup if a pending job exists as this
// causes a dht feedback loop
auto pending = FindPendingTX(requester, txid);
if(pending)
LookupRouter(target, requester, txid, next, pending->job);
else
LookupRouter(target, requester, txid, next);
}
}
else // otherwise tell them we don't have it
@ -371,13 +430,18 @@ namespace llarp
}
void
Context::Init(const Key_t &us, llarp_router *r)
Context::Init(const Key_t &us, llarp_router *r,
llarp_time_t exploreInterval)
{
router = r;
ourKey = us;
nodes = new Bucket< RCNode >(ourKey);
services = new Bucket< ISNode >(ourKey);
llarp::LogDebug("intialize dht with key ", ourKey);
// start exploring
llarp_logic_call_later(
r->logic,
{exploreInterval, this, &llarp::dht::Context::handle_explore_timer});
}
void
@ -513,8 +577,8 @@ namespace llarp
ownerKey.txid = id;
IntroSetInformJob *j = new IntroSetInformJob(router, whoasked, txid);
j->target = addr;
for(const auto &item : excludes)
j->asked.emplace(item);
for(const auto item : excludes)
j->asked.insert(item);
j->R = R;
SearchJob job(
whoasked, txid, addr.ToKey(), {},
@ -530,6 +594,27 @@ namespace llarp
router->dht->impl.DHTSendTo(askpeer, dhtmsg);
}
bool
Context::LookupRouterExploritory(const Key_t &requester, uint64_t txid,
const RouterID &target,
std::vector< IMessage * > &reply)
{
std::vector< RouterID > closer;
Key_t t(target.data());
std::set< Key_t > found;
if(!nodes->GetManyNearExcluding(t, found, 2,
std::set< Key_t >{ourKey, requester}))
{
llarp::LogError(
"not enough dht nodes to handle exploritory router lookup");
return false;
}
for(const auto &f : found)
closer.push_back(f);
reply.push_back(new GotRouterMessage(txid, closer, false));
return true;
}
void
Context::LookupRouter(const Key_t &target, const Key_t &whoasked,
uint64_t txid, const Key_t &askpeer,

@ -111,6 +111,12 @@ namespace llarp
if(!bencode_write_bytestring(buf, "R", 1))
return false;
// exploritory or not?
if(!bencode_write_bytestring(buf, "E", 1))
return false;
if(!bencode_write_uint64(buf, exploritory ? 1 : 0))
return false;
// iterative or not?
if(!bencode_write_bytestring(buf, "I", 1))
return false;
@ -143,6 +149,16 @@ namespace llarp
{
llarp_buffer_t strbuf;
if(llarp_buffer_eq(key, "E"))
{
uint64_t result;
if(!bencode_read_integer(val, &result))
return false;
exploritory = result != 0;
return true;
}
if(llarp_buffer_eq(key, "I"))
{
uint64_t result;
@ -190,8 +206,11 @@ namespace llarp
llarp::LogWarn("Got duplicate DHT lookup from ", From, " txid=", txid);
return false;
}
dht.LookupRouterRelayed(From, txid, K, !iterative, replies);
if(exploritory)
return dht.LookupRouterExploritory(From, txid, K, replies);
else
dht.LookupRouterRelayed(From, txid, K, !iterative, replies);
return true;
}
} // namespace dht
} // namespace llarp
} // namespace llarp

@ -24,6 +24,13 @@ namespace llarp
if(!BEncodeWriteDictMsgType(buf, "A", "S"))
return false;
// near
if(N.size())
{
if(!BEncodeWriteDictList("N", N, buf))
return false;
}
if(!BEncodeWriteDictList("R", R, buf))
return false;
@ -41,6 +48,10 @@ namespace llarp
bool
GotRouterMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *val)
{
if(llarp_buffer_eq(key, "N"))
{
return BEncodeReadList(N, val);
}
if(llarp_buffer_eq(key, "R"))
{
return BEncodeReadList(R, val);
@ -82,7 +93,7 @@ namespace llarp
pending->target, pending->requesterTX, &R[0], false));
}
}
else
else if(N.empty())
{
// iterate to next closest peer
Key_t nextPeer;
@ -113,6 +124,11 @@ namespace llarp
}
}
}
else if(pending->foundNear)
{
// near peers provided
pending->foundNear(N);
}
dht.RemovePendingTX(From, txid);
return true;
}

@ -46,13 +46,21 @@ namespace llarp
target.Zero();
}
SearchJob::SearchJob(FoundNearFunc near, DoneFunc done)
: foundNear(near), onDone(done)
{
target.Randomize();
started = llarp_time_now_ms();
}
bool
SearchJob::FoundIntros(
const std::vector< llarp::service::IntroSet > &introsets) const
{
if(foundIntroHook && foundIntroHook(introsets))
{
onDone();
if(onDone)
onDone();
return true;
}
return foundIntroHook == nullptr;
@ -92,4 +100,4 @@ namespace llarp
}
}
} // namespace dht
} // namespace llarp
} // namespace llarp

@ -49,6 +49,12 @@ frame_state::flags_agree(byte_t flags) const
return ((rxflags & flags) & (txflags & flags)) == flags;
}
bool
frame_state::either_has_flag(byte_t flag) const
{
return (rxflags & flag) == flag || (txflags & flag) == flag;
}
void
frame_state::clear()
{

@ -19,15 +19,6 @@ handle_crypto_outbound(void *u)
self->working = false;
}
// TODO: move this orphan function?
static void
handle_frame_encrypt(iwp_async_frame *frame)
{
llarp_link_session *self = static_cast< llarp_link_session * >(frame->user);
if(llarp_ev_udp_sendto(self->udp, self->addr, frame->buf, frame->sz) == -1)
llarp::LogWarn("sendto failed");
}
llarp_link_session::llarp_link_session(llarp_link *l, const byte_t *seckey,
const llarp::Addr &a)
: udp(&l->udp)
@ -98,11 +89,8 @@ static void
send_keepalive(void *user)
{
llarp_link_session *self = static_cast< llarp_link_session * >(user);
// if both sides agree on invalidation
if(self->is_invalidated())
{
// don't send keepalive
llarp::LogInfo("session cant send keepalive because were invalid");
return;
}
// all zeros means keepalive
@ -424,8 +412,6 @@ llarp_link_session::Tick(llarp_time_t now)
// both sides agreeed to session invalidation
// terminate our session when all of our frames from the crypto workers
// are done
llarp::LogWarn("Tick - ", addr, " invaldiated session with ", frames,
" frames left");
return !working;
}
if(state == eLIMSent || state == eEstablished)
@ -439,15 +425,18 @@ llarp_link_session::Tick(llarp_time_t now)
void
llarp_link_session::keepalive()
{
llarp_logic_queue_job(serv->logic, {this, &send_keepalive});
send_keepalive(this);
}
void
llarp_link_session::EncryptOutboundFrames()
{
outboundFrames.Process([&](iwp_async_frame *frame) {
llarp_link_session *self = this;
outboundFrames.Process([self](iwp_async_frame *frame) {
if(iwp_encrypt_frame(frame))
handle_frame_encrypt(frame);
if(llarp_ev_udp_sendto(self->udp, self->addr, frame->buf, frame->sz)
== -1)
llarp::LogError("sendto ", self->addr, " failed");
});
}

@ -528,7 +528,7 @@ SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)
/* current win10 flights now have a new named-thread API, let's try to use
* that first! */
/* first, dlsym(2) the new call from system library */
hThread = NULL;
hThread = NULL;
_SetThreadDescription = (p_SetThreadDescription)GetProcAddress(
GetModuleHandle("kernel32"), "SetThreadDescription");
if(_SetThreadDescription)

Loading…
Cancel
Save