You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lokinet/llarp/service/endpoint.cpp

427 lines
11 KiB
C++

6 years ago
#include <llarp/dht/messages/findintro.hpp>
#include <llarp/messages/dht.hpp>
#include <llarp/service/endpoint.hpp>
#include <llarp/service/protocol.hpp>
#include "buffer.hpp"
#include "router.hpp"
namespace llarp
{
namespace service
{
Endpoint::Endpoint(const std::string& name, llarp_router* r)
6 years ago
: llarp_pathbuilder_context(r, r->dht, 2), m_Router(r), m_Name(name)
{
6 years ago
m_Tag.Zero();
}
bool
Endpoint::SetOption(const std::string& k, const std::string& v)
{
if(k == "keyfile")
{
m_Keyfile = v;
}
6 years ago
if(k == "tag")
{
m_Tag = v;
llarp::LogInfo("Setting tag to ", v);
6 years ago
}
if(k == "prefetch-tag")
{
m_PrefetchTags.insert(v);
}
return true;
}
struct PathAlignJob
{
Address remote;
PathAlignJob(const Address& addr) : remote(addr)
{
}
void
HandleResult(Endpoint::OutboundContext* context)
{
if(context)
{
byte_t tmp[128] = {0};
memcpy(tmp, "BEEP", 4);
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
context->AsyncEncryptAndSendTo(buf, eProtocolText);
}
else
{
llarp::LogWarn("PathAlignJob timed out");
delete this;
}
}
};
void
6 years ago
Endpoint::Tick(llarp_time_t now)
{
// publish descriptors
6 years ago
if(ShouldPublishDescriptors(now))
{
std::set< Introduction > I;
6 years ago
if(!GetCurrentIntroductions(I))
{
6 years ago
llarp::LogWarn("could not publish descriptors for endpoint ", Name(),
" because we couldn't get any introductions");
return;
}
6 years ago
m_IntroSet.I = I;
m_IntroSet.topic = m_Tag;
6 years ago
if(!m_Identity.SignIntroSet(m_IntroSet, &m_Router->crypto))
{
6 years ago
llarp::LogWarn("failed to sign introset for endpoint ", Name());
return;
}
6 years ago
if(PublishIntroSet(m_Router))
{
6 years ago
llarp::LogInfo("publishing introset for endpoint ", Name());
}
else
{
6 years ago
llarp::LogWarn("failed to publish intro set for endpoint ", Name());
}
}
// prefetch tags
6 years ago
for(const auto& tag : m_PrefetchTags)
{
auto itr = m_PrefetchedTags.find(tag);
if(itr == m_PrefetchedTags.end())
{
itr = m_PrefetchedTags
.insert(std::make_pair(tag, CachedTagResult(tag, now)))
.first;
}
for(const auto& introset : itr->second.result)
{
PathAlignJob* j = new PathAlignJob(introset.A.Addr());
if(!EnsurePathToService(j->remote,
std::bind(&PathAlignJob::HandleResult, j,
std::placeholders::_1),
10000))
delete j;
6 years ago
}
6 years ago
itr->second.Expire(now);
if(itr->second.ShouldRefresh(now))
6 years ago
{
auto path = PickRandomEstablishedPath();
if(path)
{
6 years ago
itr->second.pendingTX = GenTXID();
m_PendingLookups[itr->second.pendingTX] = &itr->second;
itr->second.SendRequestViaPath(path, m_Router);
6 years ago
}
}
}
}
uint64_t
Endpoint::GenTXID()
{
uint64_t txid = rand();
while(m_PendingLookups.find(txid) != m_PendingLookups.end())
++txid;
return txid;
}
6 years ago
std::string
Endpoint::Name() const
{
return m_Name + ":" + m_Identity.pub.Name();
}
bool
Endpoint::HandleGotIntroMessage(const llarp::dht::GotIntroMessage* msg)
{
auto crypto = &m_Router->crypto;
6 years ago
std::set< IntroSet > remote;
for(const auto& introset : msg->I)
{
6 years ago
if(!introset.VerifySignature(crypto))
{
6 years ago
llarp::LogInfo("invalid introset signature for ", introset,
" on endpoint ", Name());
if(m_Identity.pub == introset.A && m_CurrentPublishTX == msg->T)
{
IntroSetPublishFail();
}
6 years ago
return false;
}
if(m_Identity.pub == introset.A && m_CurrentPublishTX == msg->T)
6 years ago
{
llarp::LogInfo(
"got introset publish confirmation for hidden service endpoint ",
6 years ago
Name());
6 years ago
IntroSetPublished();
6 years ago
return true;
}
else
{
6 years ago
remote.insert(introset);
}
}
6 years ago
auto itr = m_PendingLookups.find(msg->T);
if(itr == m_PendingLookups.end())
{
llarp::LogWarn("invalid lookup response for hidden service endpoint ",
Name(), " txid=", msg->T);
return false;
}
bool result = itr->second->HandleResponse(remote);
m_PendingLookups.erase(itr);
return result;
}
bool
Endpoint::Start()
{
auto crypto = &m_Router->crypto;
if(m_Keyfile.size())
{
if(!m_Identity.EnsureKeys(m_Keyfile, crypto))
return false;
}
else
{
m_Identity.RegenerateKeys(crypto);
}
return true;
}
Endpoint::~Endpoint()
{
}
6 years ago
6 years ago
Endpoint::CachedTagResult::~CachedTagResult()
{
}
bool
Endpoint::CachedTagResult::HandleResponse(
6 years ago
const std::set< IntroSet >& introsets)
6 years ago
{
auto now = llarp_time_now_ms();
pendingTX = 0;
6 years ago
for(const auto& introset : introsets)
if(result.insert(introset).second)
lastModified = now;
llarp::LogInfo("Tag result for ", tag.ToString(), " got ",
introsets.size(), " results from lookup, have ",
result.size(), " cached last modified at ", lastModified,
" is ", now - lastModified, "ms old");
6 years ago
return true;
}
6 years ago
void
Endpoint::CachedTagResult::Expire(llarp_time_t now)
{
auto itr = result.begin();
while(itr != result.end())
{
if(itr->HasExpiredIntros(now))
{
llarp::LogInfo("Removing expired tag Entry ", itr->A.Name());
itr = result.erase(itr);
lastModified = now;
6 years ago
}
else
{
++itr;
}
}
}
6 years ago
llarp::routing::IMessage*
Endpoint::CachedTagResult::BuildRequestMessage()
{
llarp::routing::DHTMessage* msg = new llarp::routing::DHTMessage();
msg->M.push_back(new llarp::dht::FindIntroMessage(tag, pendingTX));
lastRequest = llarp_time_now_ms();
6 years ago
return msg;
}
bool
Endpoint::PublishIntroSet(llarp_router* r)
{
auto path = PickRandomEstablishedPath();
if(path)
{
m_CurrentPublishTX = rand();
llarp::routing::DHTMessage msg;
msg.M.push_back(new llarp::dht::PublishIntroMessage(
6 years ago
m_IntroSet, m_CurrentPublishTX, 3));
6 years ago
if(path->SendRoutingMessage(&msg, r))
{
m_LastPublishAttempt = llarp_time_now_ms();
llarp::LogInfo(Name(), " publishing introset");
return true;
}
}
llarp::LogWarn(Name(), " publish introset failed");
return false;
6 years ago
}
void
Endpoint::IntroSetPublishFail()
{
llarp::LogWarn("failed to publish introset for ", Name());
m_CurrentPublishTX = 0;
}
bool
6 years ago
Endpoint::ShouldPublishDescriptors(llarp_time_t now) const
6 years ago
{
6 years ago
if(m_IntroSet.HasExpiredIntros(now))
6 years ago
return m_CurrentPublishTX == 0
&& now - m_LastPublishAttempt >= INTROSET_PUBLISH_RETRY_INTERVAL;
return m_CurrentPublishTX == 0
&& now - m_LastPublish >= INTROSET_PUBLISH_INTERVAL;
}
void
Endpoint::IntroSetPublished()
{
m_CurrentPublishTX = 0;
m_LastPublish = llarp_time_now_ms();
llarp::LogInfo(Name(), " IntroSet publish confirmed");
}
bool
Endpoint::EnsurePathToService(const Address& remote, PathEnsureHook hook,
llarp_time_t timeoutMS)
{
// TODO: implement me
return false;
}
6 years ago
Endpoint::OutboundContext::OutboundContext(Endpoint* parent)
: llarp_pathbuilder_context(parent->m_Router, parent->m_Router->dht, 2)
, m_SendQueue(parent->Name() + "::outbound_queue")
6 years ago
, m_Parent(parent)
{
}
Endpoint::OutboundContext::~OutboundContext()
{
}
bool
Endpoint::OutboundContext::HandleGotIntroMessage(
const llarp::dht::GotIntroMessage* msg)
{
// TODO: implement me
6 years ago
6 years ago
return false;
}
void
Endpoint::OutboundContext::AsyncEncryptAndSendTo(llarp_buffer_t data,
ProtocolType protocol)
{
auto sendto =
std::bind(&OutboundContext::SendMessage, this, std::placeholders::_1);
ProtocolMessage* msg = new ProtocolMessage(protocol);
msg->PutBuffer(data);
if(sequenceNo)
{
AsyncEncrypt(msg, sendto);
}
else
{
AsyncGenIntro(msg, sendto);
}
}
struct AsyncKeyExchange
{
llarp_logic* logic;
llarp_crypto* crypto;
byte_t* sharedKey;
byte_t* remotePubkey;
byte_t* localSeckey;
byte_t* nonce;
ProtocolMessage* msg = nullptr;
std::function< void(ProtocolMessage*) > hook;
AsyncKeyExchange(llarp_logic* l, llarp_crypto* c, byte_t* key,
byte_t* remote, byte_t* localSecret, byte_t* n)
: logic(l)
, crypto(c)
, sharedKey(key)
, remotePubkey(remote)
, localSeckey(localSecret)
, nonce(n)
{
}
static void
Work(void* user)
{
AsyncKeyExchange* self = static_cast< AsyncKeyExchange* >(user);
self->crypto->dh_server(self->sharedKey, self->remotePubkey,
self->localSeckey, self->nonce);
}
};
void
Endpoint::OutboundContext::AsyncGenIntro(
ProtocolMessage* msg, std::function< void(ProtocolMessage*) > result)
{
msg->N.Randomize();
AsyncKeyExchange* ex = new AsyncKeyExchange(
m_Parent->Logic(), m_Parent->Crypto(), sharedKey,
currentIntroSet.A.enckey, m_Parent->GetEncryptionSecretKey(), msg->N);
llarp_threadpool_queue_job(m_Parent->Worker(),
{ex, &AsyncKeyExchange::Work});
}
void
Endpoint::OutboundContext::SendMessage(ProtocolMessage* msg)
{
// TODO: delete msg
// TODO: implement me
}
void
Endpoint::OutboundContext::AsyncEncrypt(
ProtocolMessage* msg, std::function< void(ProtocolMessage*) > result)
{
// TODO: implement me
}
llarp_logic*
Endpoint::Logic()
{
return m_Router->logic;
}
llarp_crypto*
Endpoint::Crypto()
{
return &m_Router->crypto;
}
llarp_threadpool*
Endpoint::Worker()
{
return m_Router->tp;
}
byte_t*
Endpoint::GetEncryptionSecretKey()
{
return m_Identity.enckey;
}
6 years ago
} // namespace service
6 years ago
} // namespace llarp