persist sessions between routers

pull/14/head
Jeff Becker 6 years ago
parent 51029f0f2f
commit 4e3acd0277
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -159,9 +159,8 @@ namespace llarp
[](const std::vector< service::IntroSet > &) -> bool { return true; },
[]() {});
pendingTX[ownerKey] = job;
auto msg = new llarp::DHTImmeidateMessage(peer);
msg->msgs.push_back(new PublishIntroMessage(introset, id, S, E));
router->SendToOrQueue(peer, msg);
router->dht->impl.DHTSendTo(peer,
new PublishIntroMessage(introset, id, S, E));
}
void
@ -180,14 +179,12 @@ namespace llarp
[j]() { delete j; });
pendingTX[ownerKey] = job;
auto msg = new llarp::DHTImmeidateMessage(askpeer);
auto dhtmsg = new FindIntroMessage(tag, id);
dhtmsg->R = 5;
j->R = 5;
msg->msgs.push_back(dhtmsg);
llarp::LogInfo("asking ", askpeer, " for tag ", tag.ToString(), " with ",
j->localIntroSets.size(), " local tags txid=", txid);
router->SendToOrQueue(askpeer, msg);
router->dht->impl.DHTSendTo(askpeer, dhtmsg);
}
void
@ -210,13 +207,12 @@ namespace llarp
[j]() { delete j; });
pendingTX[ownerKey] = job;
auto msg = new llarp::DHTImmeidateMessage(askpeer);
auto dhtmsg = new FindIntroMessage(id, addr);
dhtmsg->R = 5;
msg->msgs.push_back(dhtmsg);
llarp::LogInfo("asking ", askpeer, " for ", addr.ToString(),
" with txid=", id);
router->SendToOrQueue(askpeer, msg);
router->dht->impl.DHTSendTo(askpeer, dhtmsg);
}
std::set< service::IntroSet >
@ -397,6 +393,9 @@ namespace llarp
auto m = new llarp::DHTImmeidateMessage(peer);
m->msgs.push_back(msg);
router->SendToOrQueue(peer, m);
// keep alive for 10 more seconds for response
auto now = llarp_time_now_ms();
router->PersistSessionUntil(peer, now + 10000);
}
bool
@ -432,9 +431,7 @@ namespace llarp
void
Exausted()
{
auto msg = new llarp::DHTImmeidateMessage(whoasked);
msg->msgs.push_back(new GotIntroMessage({}, txid));
m_Router->SendToOrQueue(whoasked, msg);
m_Router->dht->impl.DHTSendTo(whoasked, new GotIntroMessage({}, txid));
m_Router->dht->impl.RemovePendingTX(whoasked, txid);
}
@ -468,9 +465,8 @@ namespace llarp
reply.push_back(std::move(introset));
}
localIntroSets.clear();
auto msg = new llarp::DHTImmeidateMessage(whoasked);
msg->msgs.push_back(new GotIntroMessage(reply, txid));
m_Router->SendToOrQueue(whoasked, msg);
m_Router->dht->impl.DHTSendTo(whoasked,
new GotIntroMessage(reply, txid));
}
else if(!target.IsZero())
{
@ -504,11 +500,9 @@ namespace llarp
[j]() { delete j; });
pendingTX[ownerKey] = job;
auto msg = new llarp::DHTImmeidateMessage(askpeer);
auto dhtmsg = new FindIntroMessage(tag, id);
dhtmsg->R = R;
msg->msgs.push_back(dhtmsg);
router->SendToOrQueue(askpeer, msg);
router->dht->impl.DHTSendTo(askpeer, dhtmsg);
}
void
@ -534,13 +528,12 @@ namespace llarp
[j]() { delete j; });
pendingTX[ownerKey] = job;
auto msg = new llarp::DHTImmeidateMessage(askpeer);
auto dhtmsg = new FindIntroMessage(id, addr);
dhtmsg->R = R;
msg->msgs.push_back(dhtmsg);
llarp::LogInfo("asking ", askpeer, " for ", addr.ToString(),
" on request of ", whoasked);
router->SendToOrQueue(askpeer, msg);
router->dht->impl.DHTSendTo(askpeer, dhtmsg);
}
void
@ -563,11 +556,9 @@ namespace llarp
pendingTX[ownerKey] = j;
llarp::LogInfo("Asking ", askpeer, " for router ", target, " for ",
whoasked);
auto msg = new llarp::DHTImmeidateMessage(askpeer);
auto dhtmsg = new FindRouterMessage(askpeer, target, id);
dhtmsg->iterative = iterative;
msg->msgs.push_back(dhtmsg);
router->SendToOrQueue(askpeer, msg);
router->dht->impl.DHTSendTo(askpeer, dhtmsg);
}
void

@ -70,7 +70,11 @@ namespace llarp
}
if(reply->msgs.size())
{
return result && router->SendToOrQueue(remote.data(), reply);
if(result)
{
result = router->SendToOrQueue(remote.data(), reply);
}
return result;
}
else
{
@ -78,4 +82,4 @@ namespace llarp
return result;
}
}
} // namespace llarp
} // namespace llarp

@ -65,7 +65,7 @@ namespace llarp
std::deque< EncryptedFrame >& frames)
{
llarp::LogDebug("fowarding LRCM to ", nextHop);
LR_CommitMessage* msg = new LR_CommitMessage;
LR_CommitMessage* msg = new LR_CommitMessage();
while(frames.size())
{
msg->frames.push_back(frames.front());
@ -506,6 +506,10 @@ namespace llarp
m_BuiltHook(this);
m_BuiltHook = nullptr;
// persist session with upstream router until the path is done
r->PersistSessionUntil(Upstream(), intro.expiresAt);
// send path latency test
llarp::routing::PathLatencyMessage latency;
latency.T = llarp_randint();
m_LastLatencyTestID = latency.T;

@ -186,23 +186,31 @@ namespace llarp
delete decrypter;
}
/// this must be done from logic thread
/// this is done from logic thread
static void
SendLRCM(void* user)
{
LRCMFrameDecrypt* self = static_cast< LRCMFrameDecrypt* >(user);
// persist sessions to upstream and downstream routers until the commit
// ends
self->context->Router()->PersistSessionUntil(self->hop->info.downstream,
self->hop->ExpireTime());
self->context->Router()->PersistSessionUntil(self->hop->info.upstream,
self->hop->ExpireTime());
// forward to next hop
self->context->ForwardLRCM(self->hop->info.upstream, self->frames);
delete self;
}
// this is called from the logic thread
static void
SendPathConfirm(void* user)
{
LRCMFrameDecrypt* self = static_cast< LRCMFrameDecrypt* >(user);
// persist session to downstream until path expiration
self->context->Router()->PersistSessionUntil(self->hop->info.downstream,
self->hop->ExpireTime());
// send path confirmation
llarp::routing::PathConfirmMessage confirm(self->hop->lifetime);
if(!self->hop->SendRoutingMessage(&confirm, self->context->Router()))
{

Loading…
Cancel
Save