// lokinet/llarp/service/endpoint.cpp

#include <service/endpoint.hpp>
#include <dht/messages/findintro.hpp>
#include <dht/messages/findrouter.hpp>
#include <dht/messages/gotintro.hpp>
#include <dht/messages/gotrouter.hpp>
#include <dht/messages/pubintro.hpp>
#include <messages/dht.hpp>
#include <messages/path_transfer.hpp>
#include <nodedb.hpp>
#include <profiling.hpp>
#include <router/abstractrouter.hpp>
#include <service/endpoint_util.hpp>
#include <service/hidden_service_address_lookup.hpp>
#include <service/outbound_context.hpp>
#include <service/protocol.hpp>
#include <util/logic.hpp>
#include <util/str.hpp>
#include <util/buffer.hpp>
#include <util/memfn.hpp>
#include <hook/shell.hpp>
namespace llarp
{
namespace service
{
Endpoint::Endpoint(const std::string& name, AbstractRouter* r,
Context* parent)
: path::Builder(r, r->dht(), 3, path::default_len)
, context(parent)
, m_Router(r)
, m_Name(name)
{
m_Tag.Zero();
}
bool
Endpoint::SetOption(const std::string& k, const std::string& v)
{
if(k == "keyfile")
{
m_Keyfile = v;
}
if(k == "tag")
{
m_Tag = v;
LogInfo("Setting tag to ", v);
}
if(k == "prefetch-tag")
{
m_PrefetchTags.insert(v);
}
if(k == "prefetch-addr")
{
Address addr;
if(addr.FromString(v))
m_PrefetchAddrs.insert(addr);
}
if(k == "min-latency")
{
auto val = atoi(v.c_str());
if(val > 0)
m_MinPathLatency = val;
}
if(k == "bundle-rc")
{
m_BundleRC = IsTrueValue(v.c_str());
}
if(k == "blacklist-snode")
{
RouterID snode;
if(!snode.FromString(v))
{
LogError(Name(), " invalid snode value: ", v);
return false;
}
const auto result = m_SnodeBlacklist.insert(snode);
if(!result.second)
{
LogError(Name(), " duplicate blacklist-snode: ", snode.ToString());
return false;
}
LogInfo(Name(), " adding ", snode.ToString(), " to blacklist");
}
if(k == "on-up")
{
m_OnUp = hooks::ExecShellBackend(v);
if(m_OnUp)
LogInfo(Name(), " added on up script: ", v);
else
LogError(Name(), " failed to add on up script");
}
if(k == "on-down")
{
m_OnDown = hooks::ExecShellBackend(v);
if(m_OnDown)
LogInfo(Name(), " added on down script: ", v);
else
LogError(Name(), " failed to add on down script");
}
if(k == "on-ready")
{
m_OnReady = hooks::ExecShellBackend(v);
if(m_OnReady)
LogInfo(Name(), " added on ready script: ", v);
else
LogError(Name(), " failed to add on ready script");
}
return true;
}
llarp_ev_loop_ptr
Endpoint::EndpointNetLoop()
{
if(m_IsolatedNetLoop)
return m_IsolatedNetLoop;
else
return m_Router->netloop();
}
bool
Endpoint::NetworkIsIsolated() const
{
return m_IsolatedLogic.get() != nullptr && m_IsolatedNetLoop != nullptr;
}
bool
Endpoint::HasPendingPathToService(const Address& addr) const
{
return m_PendingServiceLookups.find(addr)
!= m_PendingServiceLookups.end();
}
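    // Rebuild the introset from introductions that will stay valid for at
    // least two more minutes, sign it, and publish it. If there are not
    // enough usable intros, optionally trigger a manual path rebuild.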
void
Endpoint::RegenAndPublishIntroSet(llarp_time_t now, bool forceRebuild)
{
std::set< Introduction > I;
if(!GetCurrentIntroductionsWithFilter(
I, [now](const service::Introduction& intro) -> bool {
return now < intro.expiresAt
&& intro.expiresAt - now > (2 * 60 * 1000);
}))
{
LogWarn("could not publish descriptors for endpoint ", Name(),
" because we couldn't get enough valid introductions");
if(ShouldBuildMore(now) || forceRebuild)
ManualRebuild(1);
return;
}
m_IntroSet.I.clear();
for(auto& intro : I)
{
m_IntroSet.I.emplace_back(std::move(intro));
}
if(m_IntroSet.I.size() == 0)
{
LogWarn("not enough intros to publish introset for ", Name());
if(ShouldBuildMore(now) || forceRebuild)
ManualRebuild(1);
return;
}
m_IntroSet.topic = m_Tag;
if(!m_Identity.SignIntroSet(m_IntroSet, now))
{
LogWarn("failed to sign introset for endpoint ", Name());
return;
}
if(PublishIntroSet(m_Router))
{
LogInfo("(re)publishing introset for endpoint ", Name());
}
else
{
LogWarn("failed to publish intro set for endpoint ", Name());
}
}
bool
Endpoint::IsReady() const
{
const auto now = Now();
if(m_IntroSet.I.size() == 0)
return false;
if(m_IntroSet.IsExpired(now))
return false;
return true;
}
bool
Endpoint::HasPendingRouterLookup(const RouterID remote) const
{
return m_PendingRouters.find(remote) != m_PendingRouters.end();
}
bool
Endpoint::IntrosetIsStale() const
{
return m_IntroSet.HasExpiredIntros(Now());
}
util::StatusObject
Endpoint::ExtractStatus() const
{
auto obj = path::Builder::ExtractStatus();
obj.Put("identity", m_Identity.pub.Addr().ToString());
obj.Put("lastPublished", m_LastPublish);
obj.Put("lastPublishAttempt", m_LastPublishAttempt);
obj.Put("introset", m_IntroSet.ExtractStatus());
if(!m_Tag.IsZero())
obj.Put("tag", m_Tag.ToString());
obj.PutContainer("deadSessions", m_DeadSessions);
obj.PutContainer("remoteSessions", m_RemoteSessions);
obj.PutContainer("snodeSessions", m_SNodeSessions);
obj.PutContainer("lookups", m_PendingLookups);
util::StatusObject sessionObj{};
for(const auto& item : m_Sessions)
{
std::string k = item.first.ToHex();
sessionObj.Put(k, item.second.ExtractStatus());
}
obj.Put("converstations", sessionObj);
return obj;
}
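    // Periodic housekeeping: republish descriptors when due, expire snode
    // sessions, pending lookups and convo tags, keep prefetched addresses
    // reachable, and tick/deregister remote sessions.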
void
Endpoint::Tick(llarp_time_t now)
{
path::Builder::Tick(now);
// publish descriptors
if(ShouldPublishDescriptors(now))
{
RegenAndPublishIntroSet(now);
}
else if(NumInStatus(path::ePathEstablished) < 3)
{
if(m_IntroSet.HasExpiredIntros(now))
ManualRebuild(1);
}
// expire snode sessions
EndpointUtil::ExpireSNodeSessions(now, m_SNodeSessions);
// expire pending tx
EndpointUtil::ExpirePendingTx(now, m_PendingLookups);
// expire pending router lookups
EndpointUtil::ExpirePendingRouterLookups(now, m_PendingRouters);
// prefetch addrs
for(const auto& addr : m_PrefetchAddrs)
{
if(!EndpointUtil::HasPathToService(addr, m_RemoteSessions))
{
if(!EnsurePathToService(
addr,
[](ABSL_ATTRIBUTE_UNUSED Address addr,
ABSL_ATTRIBUTE_UNUSED OutboundContext* ctx) {},
10000))
{
LogWarn("failed to ensure path to ", addr);
}
}
}
#ifdef TESTNET
// prefetch tags
for(const auto& tag : m_PrefetchTags)
{
auto itr = m_PrefetchedTags.find(tag);
if(itr == m_PrefetchedTags.end())
{
itr = m_PrefetchedTags.emplace(tag, CachedTagResult(tag, this)).first;
}
for(const auto& introset : itr->second.result)
{
if(HasPendingPathToService(introset.A.Addr()))
continue;
std::array< byte_t, 128 > tmp = {0};
llarp_buffer_t buf(tmp);
if(SendToServiceOrQueue(introset.A.Addr().data(), buf,
eProtocolControl))
LogInfo(Name(), " send message to ", introset.A.Addr(), " for tag ",
tag.ToString());
else
LogWarn(Name(), " failed to send/queue data to ", introset.A.Addr(),
" for tag ", tag.ToString());
}
itr->second.Expire(now);
if(itr->second.ShouldRefresh(now))
2018-07-18 03:10:21 +00:00
{
auto path = PickRandomEstablishedPath();
if(path)
{
auto job = new TagLookupJob(this, &itr->second);
if(!job->SendRequestViaPath(path, Router()))
LogError(Name(), " failed to send tag lookup");
}
else
{
LogError(Name(), " has no paths for tag lookup");
}
}
}
#endif
// deregister dead sessions
EndpointUtil::DeregisterDeadSessions(now, m_DeadSessions);
// tick remote sessions
EndpointUtil::TickRemoteSessions(now, m_RemoteSessions, m_DeadSessions);
// expire convotags
EndpointUtil::ExpireConvoSessions(now, m_Sessions);
}
bool
Endpoint::Stop()
{
// stop remote sessions
EndpointUtil::StopRemoteSessions(m_RemoteSessions);
// stop snode sessions
EndpointUtil::StopSnodeSessions(m_SNodeSessions);
if(m_OnDown)
m_OnDown->NotifyAsync(NotifyParams());
return path::Builder::Stop();
}
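    // Pick a transaction id not already used by a pending lookup: start at
    // a random value and probe linearly on collision.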
uint64_t
Endpoint::GenTXID()
{
uint64_t txid = randint();
while(m_PendingLookups.find(txid) != m_PendingLookups.end())
++txid;
return txid;
}
std::string
Endpoint::Name() const
{
return m_Name + ":" + m_Identity.pub.Name();
}
void
Endpoint::PutLookup(IServiceLookup* lookup, uint64_t txid)
{
// std::unique_ptr< service::IServiceLookup > ptr(lookup);
// m_PendingLookups.emplace(txid, ptr);
// m_PendingLookups[txid] = std::move(ptr);
m_PendingLookups.emplace(txid, std::unique_ptr< IServiceLookup >(lookup));
}
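    // A GotIntroMessage either acknowledges publication of our own introset
    // (matched via m_CurrentPublishTX) or answers a pending lookup; replies
    // with an unknown txid are logged and ignored.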
bool
Endpoint::HandleGotIntroMessage(dht::GotIntroMessage_constptr msg)
{
std::set< IntroSet > remote;
for(const auto& introset : msg->I)
{
if(!introset.Verify(Now()))
{
if(m_Identity.pub == introset.A && m_CurrentPublishTX == msg->T)
IntroSetPublishFail();
return true;
}
if(m_Identity.pub == introset.A && m_CurrentPublishTX == msg->T)
{
LogInfo(
"got introset publish confirmation for hidden service endpoint ",
Name());
IntroSetPublished();
return true;
}
else
{
remote.insert(introset);
}
}
auto itr = m_PendingLookups.find(msg->T);
if(itr == m_PendingLookups.end())
{
LogWarn("invalid lookup response for hidden service endpoint ", Name(),
" txid=", msg->T);
return true;
}
std::unique_ptr< IServiceLookup > lookup = std::move(itr->second);
m_PendingLookups.erase(itr);
lookup->HandleResponse(remote);
return true;
}
void
Endpoint::PutSenderFor(const ConvoTag& tag, const ServiceInfo& info)
{
auto itr = m_Sessions.find(tag);
if(itr == m_Sessions.end())
{
itr = m_Sessions.emplace(tag, Session{}).first;
}
itr->second.remote = info;
itr->second.lastUsed = Now();
}
bool
Endpoint::GetSenderFor(const ConvoTag& tag, ServiceInfo& si) const
{
auto itr = m_Sessions.find(tag);
if(itr == m_Sessions.end())
return false;
si = itr->second.remote;
return true;
}
void
Endpoint::PutIntroFor(const ConvoTag& tag, const Introduction& intro)
{
auto itr = m_Sessions.find(tag);
if(itr == m_Sessions.end())
{
itr = m_Sessions.emplace(tag, Session{}).first;
}
itr->second.intro = intro;
itr->second.lastUsed = Now();
}
bool
Endpoint::GetIntroFor(const ConvoTag& tag, Introduction& intro) const
{
auto itr = m_Sessions.find(tag);
if(itr == m_Sessions.end())
return false;
intro = itr->second.intro;
return true;
}
void
Endpoint::PutReplyIntroFor(const ConvoTag& tag, const Introduction& intro)
{
auto itr = m_Sessions.find(tag);
if(itr == m_Sessions.end())
{
itr = m_Sessions.emplace(tag, Session{}).first;
}
itr->second.replyIntro = intro;
itr->second.lastUsed = Now();
}
bool
Endpoint::GetReplyIntroFor(const ConvoTag& tag, Introduction& intro) const
{
auto itr = m_Sessions.find(tag);
if(itr == m_Sessions.end())
return false;
intro = itr->second.replyIntro;
return true;
}
bool
Endpoint::GetConvoTagsForService(const ServiceInfo& info,
std::set< ConvoTag >& tags) const
{
return EndpointUtil::GetConvoTagsForService(m_Sessions, info, tags);
}
bool
Endpoint::GetCachedSessionKeyFor(const ConvoTag& tag,
SharedSecret& secret) const
{
auto itr = m_Sessions.find(tag);
if(itr == m_Sessions.end())
return false;
secret = itr->second.sharedKey;
return true;
}
void
Endpoint::PutCachedSessionKeyFor(const ConvoTag& tag, const SharedSecret& k)
{
auto itr = m_Sessions.find(tag);
if(itr == m_Sessions.end())
{
itr = m_Sessions.emplace(tag, Session{}).first;
}
itr->second.sharedKey = k;
itr->second.lastUsed = Now();
}
bool
Endpoint::LoadKeyFile()
{
if(!m_Keyfile.empty())
{
if(!m_Identity.EnsureKeys(m_Keyfile))
{
LogError("Can't ensure keyfile [", m_Keyfile, "]");
return false;
}
}
else
{
m_Identity.RegenerateKeys();
}
return true;
}
bool
Endpoint::Start()
{
      // how can I tell if m_Identity isn't loaded?
if(!m_DataHandler)
{
m_DataHandler = this;
}
// this does network isolation
while(m_OnInit.size())
{
if(m_OnInit.front()())
m_OnInit.pop_front();
else
{
LogWarn("Can't call init of network isolation");
return false;
}
}
return true;
}
Endpoint::~Endpoint()
{
if(m_OnUp)
m_OnUp->Stop();
if(m_OnDown)
m_OnDown->Stop();
if(m_OnReady)
m_OnReady->Stop();
}
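    // Publish our introset over the established path whose endpoint is
    // closest to our service address.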
bool
Endpoint::PublishIntroSet(AbstractRouter* r)
{
      // publish introset via the router closest to our service address
RouterID location = m_Identity.pub.Addr().as_array();
auto path = GetEstablishedPathClosestTo(location);
return path && PublishIntroSetVia(r, path);
}
struct PublishIntroSetJob : public IServiceLookup
{
IntroSet m_IntroSet;
Endpoint* m_Endpoint;
PublishIntroSetJob(Endpoint* parent, uint64_t id,
const IntroSet& introset)
: IServiceLookup(parent, id, "PublishIntroSet")
, m_IntroSet(introset)
, m_Endpoint(parent)
{
}
std::shared_ptr< routing::IMessage >
BuildRequestMessage()
{
auto msg = std::make_shared< routing::DHTMessage >();
msg->M.emplace_back(
std::make_unique< dht::PublishIntroMessage >(m_IntroSet, txid, 1));
return msg;
}
bool
HandleResponse(const std::set< IntroSet >& response)
{
if(response.size())
m_Endpoint->IntroSetPublished();
else
m_Endpoint->IntroSetPublishFail();
return true;
}
};
void
Endpoint::IntroSetPublishFail()
{
auto now = Now();
if(ShouldPublishDescriptors(now))
{
RegenAndPublishIntroSet(now);
}
else if(NumInStatus(path::ePathEstablished) < 3)
{
if(m_IntroSet.HasExpiredIntros(now))
ManualRebuild(1);
}
}
bool
Endpoint::PublishIntroSetVia(AbstractRouter* r, path::Path_ptr path)
{
auto job = new PublishIntroSetJob(this, GenTXID(), m_IntroSet);
if(job->SendRequestViaPath(path, r))
{
m_LastPublishAttempt = Now();
return true;
}
return false;
}
void
Endpoint::ResetInternalState()
{
path::Builder::ResetInternalState();
static auto resetState = [](auto& container) {
std::for_each(container.begin(), container.end(),
[](auto& item) { item.second->ResetInternalState(); });
};
resetState(m_RemoteSessions);
resetState(m_SNodeSessions);
}
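    // Only publish once we have at least three established paths. Use the
    // shorter retry interval when the current introset has expired intros
    // or is missing the intro of a ready path, otherwise the normal one.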
bool
Endpoint::ShouldPublishDescriptors(llarp_time_t now) const
{
if(NumInStatus(path::ePathEstablished) < 3)
return false;
// make sure we have all paths that are established
// in our introset
bool should = false;
ForEachPath([&](const path::Path_ptr& p) {
if(!p->IsReady())
return;
for(const auto& i : m_IntroSet.I)
{
if(i == p->intro)
return;
}
should = true;
});
if(m_IntroSet.HasExpiredIntros(now) || should)
return now - m_LastPublishAttempt >= INTROSET_PUBLISH_RETRY_INTERVAL;
return now - m_LastPublishAttempt >= INTROSET_PUBLISH_INTERVAL;
}
void
Endpoint::IntroSetPublished()
{
m_LastPublish = Now();
LogInfo(Name(), " IntroSet publish confirmed");
if(m_OnReady)
m_OnReady->NotifyAsync(NotifyParams());
m_OnReady = nullptr;
}
void
Endpoint::IsolatedNetworkMainLoop()
{
m_IsolatedNetLoop = llarp_make_ev_loop();
m_IsolatedLogic = std::make_shared< llarp::Logic >();
if(SetupNetworking())
llarp_ev_loop_run_single_process(
m_IsolatedNetLoop, m_IsolatedLogic->thread, m_IsolatedLogic);
else
{
m_IsolatedNetLoop.reset();
m_IsolatedLogic.reset();
}
}
bool
Endpoint::SelectHop(llarp_nodedb* db, const std::set< RouterID >& prev,
RouterContact& cur, size_t hop, path::PathRole roles)
{
std::set< RouterID > exclude = prev;
for(const auto& snode : m_SnodeBlacklist)
exclude.insert(snode);
return path::Builder::SelectHop(db, exclude, cur, hop, roles);
}
bool
Endpoint::ShouldBundleRC() const
{
return m_BundleRC;
}
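    // Create an OutboundContext for this introset unless we already hold
    // the maximum number for that address, then inform any callbacks
    // waiting in m_PendingServiceLookups.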
void
Endpoint::PutNewOutboundContext(const service::IntroSet& introset)
{
Address addr;
introset.A.CalculateAddress(addr.as_array());
if(m_RemoteSessions.count(addr) >= MAX_OUTBOUND_CONTEXT_COUNT)
{
auto itr = m_RemoteSessions.find(addr);
auto range = m_PendingServiceLookups.equal_range(addr);
auto i = range.first;
if(i != range.second)
{
i->second(addr, itr->second.get());
++i;
}
m_PendingServiceLookups.erase(addr);
return;
}
auto it = m_RemoteSessions.emplace(
addr, std::make_shared< OutboundContext >(introset, this));
LogInfo("Created New outbound context for ", addr.ToString());
// inform pending
auto range = m_PendingServiceLookups.equal_range(addr);
auto itr = range.first;
if(itr != range.second)
{
itr->second(addr, it->second.get());
++itr;
}
m_PendingServiceLookups.erase(addr);
}
void
Endpoint::HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg,
llarp_async_verify_rc* j)
{
auto itr = m_PendingRouters.find(msg->R[0].pubkey);
if(itr != m_PendingRouters.end())
{
if(j->valid)
itr->second.InformResult(msg->R);
else
itr->second.InformResult({});
m_PendingRouters.erase(itr);
}
delete j;
}
bool
Endpoint::HandleGotRouterMessage(dht::GotRouterMessage_constptr msg)
{
if(msg->R.size())
{
llarp_async_verify_rc* job = new llarp_async_verify_rc;
job->nodedb = m_Router->nodedb();
job->cryptoworker = m_Router->threadpool();
job->diskworker = m_Router->diskworker();
job->logic = m_Router->logic();
job->hook = std::bind(&Endpoint::HandleVerifyGotRouter, this, msg,
std::placeholders::_1);
job->rc = msg->R[0];
llarp_nodedb_async_verify(job);
}
else
{
auto itr = m_PendingRouters.begin();
while(itr != m_PendingRouters.end())
{
if(itr->second.txid == msg->txid)
{
itr->second.InformResult({});
itr = m_PendingRouters.erase(itr);
}
else
++itr;
}
}
return true;
}
void
Endpoint::EnsureRouterIsKnown(const RouterID& router)
{
if(router.IsZero())
return;
if(!m_Router->nodedb()->Has(router))
{
LookupRouterAnon(router, nullptr);
}
}
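    // Look up a router's RC anonymously by sending a FindRouterMessage over
    // the established path closest to it; the result is delivered through
    // the pending entry in m_PendingRouters.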
bool
Endpoint::LookupRouterAnon(RouterID router, RouterLookupHandler handler)
{
if(m_PendingRouters.find(router) == m_PendingRouters.end())
{
auto path = GetEstablishedPathClosestTo(router);
routing::DHTMessage msg;
auto txid = GenTXID();
msg.M.emplace_back(
std::make_unique< dht::FindRouterMessage >(txid, router));
if(path && path->SendRoutingMessage(msg, m_Router))
{
LogInfo(Name(), " looking up ", router);
m_PendingRouters.emplace(router, RouterLookupJob(this, handler));
return true;
}
else
LogError("failed to send request for router lookup");
}
return false;
}
void
Endpoint::HandlePathBuilt(path::Path_ptr p)
{
p->SetDataHandler(util::memFn(&Endpoint::HandleHiddenServiceFrame, this));
p->SetDropHandler(util::memFn(&Endpoint::HandleDataDrop, this));
p->SetDeadChecker(util::memFn(&Endpoint::CheckPathIsDead, this));
path::Builder::HandlePathBuilt(p);
}
bool
Endpoint::HandleDataDrop(path::Path_ptr p, const PathID_t& dst,
uint64_t seq)
{
LogWarn(Name(), " message ", seq, " dropped by endpoint ", p->Endpoint(),
" via ", dst);
return true;
}
std::unordered_map< std::string, std::string >
Endpoint::NotifyParams() const
{
return {{"LOKINET_ADDR", m_Identity.pub.Addr().ToString()}};
}
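    // Record the sender, intro and reply intro for this conversation tag,
    // ensure we have a reply path for the sender, then hand the message to
    // ProcessDataMessage.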
bool
Endpoint::HandleDataMessage(const PathID_t& src,
std::shared_ptr< ProtocolMessage > msg)
{
msg->sender.UpdateAddr();
auto path = GetPathByID(src);
if(path)
PutReplyIntroFor(msg->tag, path->intro);
PutSenderFor(msg->tag, msg->sender);
PutIntroFor(msg->tag, msg->introReply);
EnsureReplyPath(msg->sender);
return ProcessDataMessage(msg);
}
bool
Endpoint::HasPathToSNode(const RouterID& ident) const
{
auto range = m_SNodeSessions.equal_range(ident);
auto itr = range.first;
while(itr != range.second)
{
if(itr->second->IsReady())
{
return true;
}
++itr;
}
return false;
}
bool
Endpoint::ProcessDataMessage(std::shared_ptr< ProtocolMessage > msg)
{
if(msg->proto == eProtocolTrafficV4 || msg->proto == eProtocolTrafficV6)
{
util::Lock l(&m_InboundTrafficQueueMutex);
m_InboundTrafficQueue.emplace(msg);
return true;
}
else if(msg->proto == eProtocolControl)
{
// TODO: implement me (?)
// right now it's just random noise
return true;
}
return false;
}
void
Endpoint::RemoveConvoTag(const ConvoTag& t)
{
m_Sessions.erase(t);
}
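    // Frames with R set are rejections: verify the sender and drop the
    // convo tag. Otherwise decrypt and verify asynchronously; if that
    // fails, queue a signed discard frame back along the path.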
bool
Endpoint::HandleHiddenServiceFrame(path::Path_ptr p,
const ProtocolFrame& frame)
{
if(frame.R)
{
// handle discard
ServiceInfo si;
if(!GetSenderFor(frame.T, si))
return false;
// verify source
if(!frame.Verify(si))
return false;
        // remove the convotag; it does not exist on the other end
LogWarn("remove convotag T=", frame.T);
RemoveConvoTag(frame.T);
return true;
}
if(!frame.AsyncDecryptAndVerify(EndpointLogic(), p, CryptoWorker(),
m_Identity, m_DataHandler))
{
// send discard
ProtocolFrame f;
f.R = 1;
f.T = frame.T;
f.F = p->intro.pathID;
if(!f.Sign(m_Identity))
return false;
{
util::Lock lock(&m_SendQueueMutex);
m_SendQueue.emplace_back(
std::make_shared< const routing::PathTransferMessage >(f,
frame.F),
p);
}
return true;
}
return true;
}
void Endpoint::HandlePathDied(path::Path_ptr)
{
RegenAndPublishIntroSet(Now(), true);
}
bool
Endpoint::CheckPathIsDead(path::Path_ptr, llarp_time_t dlt)
{
return dlt > path::alive_timeout;
}
bool
Endpoint::OnLookup(const Address& addr, const IntroSet* introset,
const RouterID& endpoint)
{
auto now = Now();
if(introset == nullptr || introset->IsExpired(now))
{
LogError(Name(), " failed to lookup ", addr.ToString(), " from ",
endpoint);
m_ServiceLookupFails[endpoint] = m_ServiceLookupFails[endpoint] + 1;
// inform one
auto itr = m_PendingServiceLookups.find(addr);
if(itr != m_PendingServiceLookups.end())
{
itr->second(addr, nullptr);
m_PendingServiceLookups.erase(itr);
}
return false;
}
else
PutNewOutboundContext(*introset);
return true;
}
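    // Ensure we have an outbound session to the given hidden service:
    // reuse an existing OutboundContext when possible, otherwise launch a
    // HiddenServiceAddressLookup and invoke the hook once it resolves.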
bool
Endpoint::EnsurePathToService(const Address& remote, PathEnsureHook hook,
ABSL_ATTRIBUTE_UNUSED llarp_time_t timeoutMS,
bool randomPath)
{
path::Path_ptr path = nullptr;
if(randomPath)
path = PickRandomEstablishedPath();
else
path = GetEstablishedPathClosestTo(remote.ToRouter());
if(!path)
{
LogWarn("No outbound path for lookup yet");
BuildOne();
return false;
}
LogInfo(Name(), " Ensure Path to ", remote.ToString());
{
auto itr = m_RemoteSessions.find(remote);
if(itr != m_RemoteSessions.end())
{
hook(itr->first, itr->second.get());
return true;
}
}
if(m_PendingServiceLookups.count(remote) >= MaxConcurrentLookups)
{
LogWarn(Name(), " has too many pending service lookups for ",
remote.ToString());
return false;
}
using namespace std::placeholders;
HiddenServiceAddressLookup* job = new HiddenServiceAddressLookup(
this, util::memFn(&Endpoint::OnLookup, this), remote, GenTXID());
LogInfo("doing lookup for ", remote, " via ", path->Endpoint());
if(job->SendRequestViaPath(path, Router()))
{
m_PendingServiceLookups.emplace(remote, hook);
2018-08-10 21:34:11 +00:00
return true;
}
LogError("send via path failed");
return false;
}
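    // Ensure we have an exit::SNodeSession to the given service node,
    // creating one if needed; the hook fires immediately for ready
    // sessions or once a path to the snode is built.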
void
Endpoint::EnsurePathToSNode(const RouterID& snode, SNodeEnsureHook h)
{
using namespace std::placeholders;
if(m_SNodeSessions.count(snode) == 0)
{
auto themIP = ObtainIPForAddr(snode, true);
auto session = std::make_shared< exit::SNodeSession >(
snode,
std::bind(&Endpoint::HandleWriteIPPacket, this, _1,
[themIP]() -> huint128_t { return themIP; }),
m_Router, m_NumPaths, numHops, false, ShouldBundleRC());
m_SNodeSessions.emplace(snode, session);
}
EnsureRouterIsKnown(snode);
auto range = m_SNodeSessions.equal_range(snode);
auto itr = range.first;
while(itr != range.second)
{
if(itr->second->IsReady())
h(snode, itr->second);
else
{
itr->second->AddReadyHook(std::bind(h, snode, _1));
itr->second->BuildOne();
}
++itr;
}
}
bool
Endpoint::SendToSNodeOrQueue(const RouterID& addr,
const llarp_buffer_t& buf)
{
auto pkt = std::make_shared< net::IPPacket >();
if(!pkt->Load(buf))
return false;
EnsurePathToSNode(addr, [pkt](RouterID, exit::BaseSession_ptr s) {
if(s)
s->QueueUpstreamTraffic(*pkt, routing::ExitPadSize);
});
return true;
}
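    // Flush queued traffic: deliver inbound hidden-service and snode
    // payloads to the user on the endpoint logic thread, flush upstream
    // traffic for all sessions, and send any queued routing messages.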
void Endpoint::Pump(llarp_time_t)
{
EndpointLogic()->queue_func([&]() {
// send downstream packets to user for snode
for(const auto& item : m_SNodeSessions)
item.second->FlushDownstream();
// send downstream traffic to user for hidden service
util::Lock lock(&m_InboundTrafficQueueMutex);
while(m_InboundTrafficQueue.size())
{
const auto& msg = m_InboundTrafficQueue.top();
llarp_buffer_t buf(msg->payload);
HandleWriteIPPacket(buf, [&]() -> huint128_t {
return ObtainIPForAddr(msg->sender.Addr(), false);
});
m_InboundTrafficQueue.pop();
}
});
auto router = Router();
// TODO: locking on this container
for(const auto& item : m_RemoteSessions)
item.second->FlushUpstream();
// TODO: locking on this container
for(const auto& item : m_SNodeSessions)
item.second->FlushUpstream();
util::Lock lock(&m_SendQueueMutex);
// send outbound traffic
for(const auto& item : m_SendQueue)
item.second->SendRoutingMessage(*item.first, router);
m_SendQueue.clear();
}
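    // Send data to a hidden service: prefer an existing inbound convo tag
    // (encrypt and queue a PathTransferMessage directly), then a ready
    // outbound session; otherwise queue the traffic and ensure a path to
    // the service first.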
bool
Endpoint::SendToServiceOrQueue(const service::Address& remote,
const llarp_buffer_t& data, ProtocolType t)
{
      // inbound conversation
auto now = Now();
{
auto itr = m_AddressToService.find(remote);
if(itr != m_AddressToService.end())
{
auto transfer = std::make_shared< routing::PathTransferMessage >();
ProtocolFrame& f = transfer->T;
std::shared_ptr< path::Path > p;
std::set< ConvoTag > tags;
if(GetConvoTagsForService(itr->second, tags))
{
Introduction remoteIntro;
SharedSecret K;
// pick tag
for(const auto& tag : tags)
{
if(tag.IsZero())
continue;
if(!GetCachedSessionKeyFor(tag, K))
continue;
if(GetIntroFor(tag, remoteIntro))
{
if(!remoteIntro.ExpiresSoon(now))
p = GetNewestPathByRouter(remoteIntro.router);
if(p)
{
f.T = tag;
}
}
}
if(p)
{
// TODO: check expiration of our end
ProtocolMessage m(f.T);
m.PutBuffer(data);
f.N.Randomize();
f.C.Zero();
transfer->Y.Randomize();
m.proto = t;
m.introReply = p->intro;
PutReplyIntroFor(f.T, m.introReply);
m.sender = m_Identity.pub;
m.seqno = GetSeqNoForConvo(f.T);
f.S = 1;
f.F = m.introReply.pathID;
transfer->P = remoteIntro.pathID;
if(!f.EncryptAndSign(m, K, m_Identity))
{
LogError("failed to encrypt and sign");
return false;
}
LogDebug(Name(), " send ", data.sz, " via ", remoteIntro.router);
{
util::Lock lock(&m_SendQueueMutex);
m_SendQueue.emplace_back(transfer, p);
}
return true;
}
}
}
}
      // outbound conversation
if(EndpointUtil::HasPathToService(remote, m_RemoteSessions))
{
auto range = m_RemoteSessions.equal_range(remote);
auto itr = range.first;
while(itr != range.second)
{
if(itr->second->ReadyToSend())
{
itr->second->AsyncEncryptAndSendTo(data, t);
return true;
}
++itr;
}
}
m_PendingTraffic[remote].emplace_back(data, t);
      // no conversation
return EnsurePathToService(
remote,
[&](Address r, OutboundContext* c) {
if(c)
{
c->UpdateIntroSet(true);
for(auto& pending : m_PendingTraffic[r])
c->AsyncEncryptAndSendTo(pending.Buffer(), pending.protocol);
}
m_PendingTraffic.erase(r);
},
5000, true);
}
void
Endpoint::EnsureReplyPath(const ServiceInfo& ident)
{
m_AddressToService[ident.Addr()] = ident;
}
bool
Endpoint::HasConvoTag(const ConvoTag& t) const
{
return m_Sessions.find(t) != m_Sessions.end();
}
uint64_t
Endpoint::GetSeqNoForConvo(const ConvoTag& tag)
{
auto itr = m_Sessions.find(tag);
if(itr == m_Sessions.end())
return 0;
return ++(itr->second.seqno);
}
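    // Build more paths when the base builder wants to, or when the newest
    // intro expires within a quarter of the default path lifetime and we
    // are not already building our configured number of paths.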
bool
Endpoint::ShouldBuildMore(llarp_time_t now) const
{
const bool should = path::Builder::ShouldBuildMore(now);
// determine newest intro
Introduction intro;
if(!GetNewestIntro(intro))
return should;
// time from now that the newest intro expires at
if(intro.ExpiresSoon(now))
return should;
const auto dlt = intro.expiresAt - now;
return should
|| ( // try spacing tunnel builds out evenly in time
(dlt <= (path::default_lifetime / 4))
&& (NumInStatus(path::ePathBuilding) < m_NumPaths));
}
std::shared_ptr< Logic >
Endpoint::RouterLogic()
{
return m_Router->logic();
}
std::shared_ptr< Logic >
Endpoint::EndpointLogic()
{
return m_IsolatedLogic ? m_IsolatedLogic : m_Router->logic();
}
llarp_threadpool*
Endpoint::CryptoWorker()
{
return m_Router->threadpool();
}
} // namespace service
} // namespace llarp