exponential backoff

pull/1/head
Jeff Becker 6 years ago
parent 11f9c3532b
commit 0d85577fac
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -60,6 +60,12 @@ llarp_nodedb_iterate_all(struct llarp_nodedb *n, struct llarp_nodedb_iter i);
bool
llarp_nodedb_put_rc(struct llarp_nodedb *n, struct llarp_rc *rc);
/**
return a pointer to an already loaded RC or nullptr if it's not there
*/
struct llarp_rc *
llarp_nodedb_get_rc(struct llarp_nodedb *n, const byte_t *pk);
/**
struct for async rc verification
*/

@ -20,8 +20,8 @@ struct llarp_nodedb
}
llarp_crypto *crypto;
// std::map< llarp::pubkey, llarp_rc * > entries;
std::unordered_map< llarp::PubKey, llarp_rc *, llarp::PubKeyHash > entries;
// std::map< llarp::pubkey, llarp_rc > entries;
std::unordered_map< llarp::PubKey, llarp_rc, llarp::PubKeyHash > entries;
fs::path nodePath;
void
@ -30,15 +30,21 @@ struct llarp_nodedb
auto itr = entries.begin();
while(itr != entries.end())
{
delete itr->second;
llarp_rc_clear(&itr->second);
itr = entries.erase(itr);
}
}
llarp_rc *
getRC(llarp::PubKey pk)
getRC(const llarp::PubKey &pk)
{
return entries[pk];
return &entries[pk];
}
bool
Has(const llarp::PubKey &pk)
{
return entries.find(pk) != entries.end();
}
bool
@ -65,16 +71,14 @@ struct llarp_nodedb
// serialize both and memcmp
byte_t nodetmp[MAX_RC_SIZE];
auto nodebuf = llarp::StackBuffer< decltype(nodetmp) >(nodetmp);
if(llarp_rc_bencode(entries[pk], &nodebuf))
if(llarp_rc_bencode(&entries[pk], &nodebuf))
{
byte_t paramtmp[MAX_RC_SIZE];
auto parambuf = llarp::StackBuffer< decltype(paramtmp) >(paramtmp);
if(llarp_rc_bencode(rc, &parambuf))
{
if(memcmp(&parambuf, &nodebuf, MAX_RC_SIZE) == 0)
{
return true;
}
if(nodebuf.sz == parambuf.sz)
return memcmp(&parambuf, &nodebuf, parambuf.sz) == 0;
}
}
return false;
@ -105,13 +109,16 @@ struct llarp_nodedb
// extract pk from rc
llarp::PubKey pk = rc->pubkey;
// set local db
entries[pk] = rc;
// set local db entry to have a copy we own
llarp_rc entry;
llarp::Zero(&entry, sizeof(entry));
llarp_rc_copy(&entry, rc);
entries[pk] = entry;
if(llarp_rc_bencode(rc, &buf))
if(llarp_rc_bencode(&entry, &buf))
{
buf.sz = buf.cur - buf.base;
auto filepath = getRCFilePath(rc->pubkey);
auto filepath = getRCFilePath(pk);
llarp::Debug("saving RC.pubkey ", filepath);
std::ofstream ofs(
filepath,
@ -185,19 +192,18 @@ struct llarp_nodedb
f.read((char *)buf.base, sz);
buf.sz = sz;
llarp_rc *rc = new llarp_rc;
llarp::Zero(rc, sizeof(llarp_rc));
if(llarp_rc_bdecode(rc, &buf))
llarp_rc rc;
llarp::Zero(&rc, sizeof(llarp_rc));
if(llarp_rc_bdecode(&rc, &buf))
{
if(llarp_rc_verify_sig(crypto, rc))
if(llarp_rc_verify_sig(crypto, &rc))
{
llarp::PubKey pk(rc->pubkey);
llarp::PubKey pk(rc.pubkey);
entries[pk] = rc;
return true;
}
}
llarp_rc_free(rc);
delete rc;
llarp_rc_free(&rc);
return false;
}
@ -355,4 +361,13 @@ llarp_nodedb_async_load_rc(struct llarp_async_load_rc *job)
llarp_threadpool_queue_job(job->diskworker, {job, &nodedb_async_load_rc});
}
struct llarp_rc *
llarp_nodedb_get_rc(struct llarp_nodedb *n, const byte_t *pk)
{
if(n->Has(pk))
return n->getRC(pk);
else
return nullptr;
}
} // end extern

@ -79,6 +79,13 @@ llarp_router::SendToOrQueue(const llarp::RouterID &remote,
if(!chosen)
{
// we don't have an open session to that router right now
auto rc = llarp_nodedb_get_rc(nodedb, remote);
if(rc)
{
// try connecting directly as the rc is loaded from disk
llarp_router_try_connect(this, rc, 10);
return true;
}
// try requesting the rc from the disk
llarp_async_load_rc *job = new llarp_async_load_rc;
job->diskworker = disk;
@ -268,8 +275,10 @@ llarp_router::Close()
}
void
llarp_router::connect_job_retry(void *user)
llarp_router::connect_job_retry(void *user, uint64_t orig, uint64_t left)
{
if(left)
return;
llarp_link_establish_job *job =
static_cast< llarp_link_establish_job * >(user);
@ -283,7 +292,9 @@ llarp_router::on_verify_client_rc(llarp_async_verify_rc *job)
{
llarp::async_verify_context *ctx =
static_cast< llarp::async_verify_context * >(job->user);
llarp::PubKey pk = job->rc.pubkey;
llarp_rc_free(&job->rc);
ctx->router->pendingEstablishJobs.erase(pk);
delete ctx;
}
@ -294,6 +305,7 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job)
static_cast< llarp::async_verify_context * >(job->user);
auto router = ctx->router;
llarp::Debug("rc verified? ", job->valid ? "valid" : "invalid");
llarp::PubKey pk(job->rc.pubkey);
if(!job->valid)
{
llarp::Warn("invalid server RC");
@ -303,17 +315,15 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job)
auto session = ctx->establish_job->session;
if(session)
session->close(session);
delete ctx->establish_job;
}
llarp_rc_free(&job->rc);
delete job;
router->pendingEstablishJobs.erase(pk);
router->DiscardOutboundFor(pk);
return;
}
llarp::Debug("rc verified");
llarp::PubKey pk(job->rc.pubkey);
// refresh valid routers RC value if it's there
auto v = router->validRouters.find(pk);
if(v != router->validRouters.end())
@ -333,9 +343,9 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job)
{
auto session = ctx->establish_job->session;
router->FlushOutboundFor(pk, session->get_parent(session));
// this frees the job
router->pendingEstablishJobs.erase(pk);
}
llarp_rc_free(&job->rc);
delete job;
}
void
@ -470,18 +480,23 @@ llarp_router::on_try_connect_result(llarp_link_establish_job *job)
router->async_verify_RC(session, false, job);
return;
}
llarp::PubKey pk = job->pubkey;
if(job->retries > 0)
{
job->retries--;
llarp::Info("session not established");
llarp_logic_queue_job(router->logic,
{job, &llarp_router::connect_job_retry});
job->timeout *= 3;
job->timeout /= 2;
llarp::Info("session not established with ", pk, " relaxing timeout to ",
job->timeout);
// exponential backoff
llarp_logic_call_later(
router->logic, {job->timeout, job, &llarp_router::connect_job_retry});
}
else
{
llarp::PubKey pk = job->pubkey;
llarp::Warn("failed to connect to ", pk, " dropping all pending messages");
router->DiscardOutboundFor(pk);
router->pendingEstablishJobs.erase(pk);
}
}
@ -684,17 +699,27 @@ llarp_run_router(struct llarp_router *router, struct llarp_nodedb *nodedb)
router->Run();
}
bool
llarp_router::HasPendingConnectJob(const llarp::RouterID &remote)
{
return pendingEstablishJobs.find(remote) != pendingEstablishJobs.end();
}
bool
llarp_router_try_connect(struct llarp_router *router, struct llarp_rc *remote,
uint16_t numretries)
{
// do we already have a pending job for this remote?
if(router->HasPendingConnectJob(remote->pubkey))
return false;
// try first address only
llarp_ai addr;
if(llarp_ai_list_index(remote->addrs, 0, &addr))
{
auto link = router->outboundLink;
llarp_link_establish_job *job = new llarp_link_establish_job;
auto link = router->outboundLink;
auto itr = router->pendingEstablishJobs.emplace(
std::make_pair(remote->pubkey, llarp_link_establish_job{}));
auto job = &itr.first->second;
llarp_ai_copy(&job->ai, &addr);
memcpy(job->pubkey, remote->pubkey, PUBKEYSIZE);
job->retries = numretries;
@ -702,6 +727,7 @@ llarp_router_try_connect(struct llarp_router *router, struct llarp_rc *remote,
job->result = &llarp_router::on_try_connect_result;
// give router as user pointer
job->user = router;
// try establishing
link->try_establish(link, job);
return true;
}
@ -889,9 +915,6 @@ namespace llarp
af = AF_INET;
if(link->configure(link, self->netloop, key, af, proto))
{
llarp_ai ai;
link->get_our_address(link, &ai);
llarp::Addr addr = ai;
self->AddInboundLink(link);
return;
}

@ -83,6 +83,10 @@ struct llarp_router
/// loki verified routers
std::unordered_map< llarp::PubKey, llarp_rc, llarp::PubKeyHash > validRouters;
std::unordered_map< llarp::PubKey, llarp_link_establish_job,
llarp::PubKeyHash >
pendingEstablishJobs;
llarp_router();
~llarp_router();
@ -126,6 +130,9 @@ struct llarp_router
return llarp::seckey_topublic(identity);
}
bool
HasPendingConnectJob(const llarp::RouterID &remote);
void
try_connect(fs::path rcfile);
@ -176,7 +183,7 @@ struct llarp_router
on_try_connect_result(llarp_link_establish_job *job);
static void
connect_job_retry(void *user);
connect_job_retry(void *user, uint64_t orig, uint64_t left);
static void
on_verify_client_rc(llarp_async_verify_rc *context);

Loading…
Cancel
Save