* more hidden service code

* begin lokinet api
pull/6/head^2
Jeff Becker 6 years ago
parent d38646ed54
commit 19e3b9c642

@ -242,7 +242,6 @@ set(LIB_SRC
llarp/service/address.cpp
llarp/service/context.cpp
llarp/service/endpoint.cpp
llarp/service/frame.cpp
llarp/service/lookup.cpp
llarp/service/protocol.cpp
llarp/service/tag.cpp

@ -0,0 +1 @@
insert lokinet api overview here

@ -47,7 +47,7 @@ namespace llarp
template < typename T, typename GetTime, typename PutTime, typename Compare,
typename Mutex_t = std::mutex,
typename Lock_t = std::unique_lock< Mutex_t >,
typename Lock_t = std::lock_guard< std::mutex >,
llarp_time_t dropMs = 5, llarp_time_t initialIntervalMs = 100 >
struct CoDelQueue
{

@ -18,10 +18,10 @@ struct InboundMessage
}
bool
operator>(const InboundMessage &other) const
operator<(const InboundMessage &other) const
{
// order in ascending order for codel queue
return msgid > other.msgid;
return msgid < other.msgid;
}
llarp_buffer_t
@ -42,7 +42,7 @@ struct InboundMessage
struct OrderCompare
{
bool
operator()(const InboundMessage *left, const InboundMessage *right)
operator()(const InboundMessage *left, const InboundMessage *right) const
{
return left->msgid < right->msgid;
}

@ -114,20 +114,6 @@ struct llarp_link
TickSessions()
{
auto now = llarp_time_now_ms();
std::set< llarp::Addr > remove;
{
lock_t lock(m_sessions_Mutex);
for(auto &itr : m_sessions)
{
llarp_link_session *s = itr.second;
if(s && s->Tick(now))
remove.insert(itr.first);
}
}
for(const auto &addr : remove)
RemoveSessionByAddr(addr);
{
lock_t lock(m_PendingSessions_Mutex);
auto itr = m_PendingSessions.begin();
@ -135,31 +121,49 @@ struct llarp_link
{
if(itr->second->timedout(now))
{
itr->second->done();
delete itr->second;
itr = m_PendingSessions.erase(itr);
}
else
++itr;
}
}
{
lock_t lock(m_sessions_Mutex);
auto itr = m_sessions.begin();
while(itr != m_sessions.end())
{
if(itr->second->Tick(now))
{
itr->second->done();
delete itr->second;
itr = m_sessions.erase(itr);
}
else
++itr;
}
}
}
static bool
sendto(llarp_link *serv, const byte_t *pubkey, llarp_buffer_t buf)
{
// lock_t lock(serv->m_Connected_Mutex);
auto itr = serv->m_Connected.find(pubkey);
if(itr != serv->m_Connected.end())
llarp_link_session *link = nullptr;
{
// lock_t innerlock(serv->m_sessions_Mutex);
auto inner_itr = serv->m_sessions.find(itr->second);
if(inner_itr != serv->m_sessions.end())
lock_t lock(serv->m_Connected_Mutex);
auto itr = serv->m_Connected.find(pubkey);
if(itr != serv->m_Connected.end())
{
llarp_link_session *link = inner_itr->second;
return link->sendto(buf);
lock_t innerlock(serv->m_sessions_Mutex);
auto inner_itr = serv->m_sessions.find(itr->second);
if(inner_itr != serv->m_sessions.end())
{
link = inner_itr->second;
}
}
}
return false;
return link && link->sendto(buf);
}
void
@ -229,15 +233,20 @@ struct llarp_link
void
iterate_sessions(std::function< bool(llarp_link_session *) > visitor)
{
auto now = llarp_time_now_ms();
std::list< llarp_link_session * > slist;
{
lock_t lock(m_sessions_Mutex);
for(const auto &itr : m_sessions)
auto itr = m_sessions.begin();
while(itr != m_sessions.end())
{
slist.push_back(itr.second);
// if not timing out soon add to list to iterate on
if(!itr->second->timedout(now, 11500))
slist.push_back(itr->second);
++itr;
}
}
for(auto s : slist)
for(auto &s : slist)
if(!visitor(s))
return;
}
@ -246,32 +255,31 @@ struct llarp_link
handle_logic_pump(void *user)
{
llarp_link *self = static_cast< llarp_link * >(user);
self->iterate_sessions([](llarp_link_session *s) -> bool {
s->TickLogic();
auto now = llarp_time_now_ms();
self->iterate_sessions([now](llarp_link_session *s) -> bool {
s->TickLogic(now);
return true;
});
self->pumpingLogic = false;
// self->pumpingLogic = false;
}
void
PumpLogic()
{
if(pumpingLogic)
return;
pumpingLogic = true;
// if(pumpingLogic)
// return;
// pumpingLogic = true;
llarp_logic_queue_job(logic, {this, &handle_logic_pump});
}
void
RemoveSessionByAddr(const llarp::Addr &addr)
RemoveSession(llarp_link_session *s)
{
lock_t lock(m_sessions_Mutex);
auto itr = m_sessions.find(addr);
auto itr = m_sessions.find(s->addr);
if(itr != m_sessions.end())
{
llarp::LogDebug("removing session ", addr);
UnmapAddr(addr);
llarp_link_session *s = itr->second;
UnmapAddr(s->addr);
s->done();
m_sessions.erase(itr);
delete s;

@ -86,7 +86,7 @@ struct llarp_link_session
// process inbound and outbound queues (logic thread)
void
TickLogic();
TickLogic(llarp_time_t now);
// this is called from net thread
void

@ -78,9 +78,11 @@ namespace llarp
std::time_t t;
std::time(&t);
std::string tag = fname;
/*
auto pos = tag.rfind('/');
if(pos != std::string::npos)
tag = tag.substr(pos + 1);
*/
ss << std::put_time(std::localtime(&t), "%F %T") << " " << tag;
auto sz = tag.size() % 8;
while(sz--)

@ -1,16 +0,0 @@
#ifndef LLARP_MESSAGES_HIDDEN_SERIVCE_HPP
#define LLARP_MESSAGES_HIDDEN_SERIVCE_HPP
#include <llarp/routing/message.hpp>
namespace llarp
{
namespace routing
{
struct HiddenServiceFrame : public IMessage
{
};
} // namespace routing
} // namespace llarp
#endif

@ -259,6 +259,9 @@ namespace llarp
const PathID_t&
TXID() const;
RouterID
Endpoint() const;
const PathID_t&
RXID() const;

@ -4,7 +4,7 @@
#include <llarp/buffer.h>
#include <llarp/router.h>
#include <llarp/dht.hpp>
#include <llarp/messages/hidden_service.hpp>
#include <llarp/messages/path_confirm.hpp>
#include <llarp/messages/path_latency.hpp>
#include <llarp/messages/path_transfer.hpp>
@ -21,7 +21,7 @@ namespace llarp
llarp_router *r) = 0;
virtual bool
HandleHiddenServiceFrame(const HiddenServiceFrame *msg)
HandleHiddenServiceFrame(const service::ProtocolFrame *msg)
{
return false;
}

@ -1,7 +1,6 @@
#ifndef LLARP_SERVICE_ENDPOINT_HPP
#define LLARP_SERVICE_ENDPOINT_HPP
#include <llarp/codel.hpp>
#include <llarp/messages/hidden_service.hpp>
#include <llarp/pathbuilder.hpp>
#include <llarp/service/Identity.hpp>
#include <llarp/service/protocol.hpp>
@ -58,7 +57,7 @@ namespace llarp
HandleGotIntroMessage(const llarp::dht::GotIntroMessage* msg);
bool
HandleHiddenServiceFrame(const llarp::routing::HiddenServiceFrame* msg);
HandleHiddenServiceFrame(const llarp::service::ProtocolFrame* msg);
/// return true if we have an established path to a hidden service
bool
@ -90,6 +89,11 @@ namespace llarp
void
ShiftIntroduction();
/// tick internal state
/// return true to remove otherwise don't remove
bool
Tick(llarp_time_t now);
/// encrypt asynchronously and send to remote endpoint from us
void
AsyncEncryptAndSendTo(llarp_buffer_t D, ProtocolType protocol);

@ -1,28 +0,0 @@
#ifndef LLARP_SERVICE_FRAME_HPP
#define LLARP_SERVICE_FRAME_HPP
#include <llarp/bencode.hpp>
#include <llarp/crypto.hpp>
#include <llarp/encrypted.hpp>
namespace llarp
{
namespace service
{
struct DataFrame : public llarp::IBEncodeMessage
{
llarp::Encrypted D;
llarp::PubKey H;
llarp::KeyExchangeNonce N;
uint64_t S = 0;
llarp::Signature Z;
bool
BEncode(llarp_buffer_t* buf) const;
bool
DecodeKey(llarp_buffer_t key, llarp_buffer_t* val);
};
} // namespace service
} // namespace llarp
#endif

@ -4,6 +4,7 @@
#include <llarp/bencode.hpp>
#include <llarp/crypto.hpp>
#include <llarp/encrypted.hpp>
#include <llarp/routing/message.hpp>
#include <llarp/service/Info.hpp>
#include <llarp/service/Intro.hpp>
#include <vector>
@ -41,7 +42,7 @@ namespace llarp
};
/// outer message
struct ProtocolFrame : public llarp::IBEncodeMessage
struct ProtocolFrame : public llarp::routing::IMessage
{
llarp::Encrypted D;
uint64_t S = 0;
@ -67,6 +68,9 @@ namespace llarp
bool
Verify(llarp_crypto* c, byte_t* signingkey);
bool
HandleMessage(llarp::routing::IMessageHandler* h, llarp_router* r) const;
};
} // namespace service
} // namespace llarp

@ -0,0 +1,60 @@
#ifndef LOKINET_LOKINET_HPP
#define LOKINET_LOKINET_HPP
#include <cstdint>
#include <llarp/bencode.hpp>
#include <llarp/router_id.hpp>
#include <memory>
#include <vector>
namespace lokinet
{
struct API_PImpl;
struct RPCMessage : public llarp::IBEncodeMessage
{
};
/// a persisting anonymized session to a serive node
struct Handle
{
llarp::RouterID remote;
bool
Send(const RPCMessage* msg);
RPCMessage*
Recv();
typedef std::shared_ptr< Handle > Ptr;
private:
API_PImpl* m_Impl;
};
struct Client
{
void
Run();
};
struct IMessageHandler
{
virtual RPCMessage*
HandleMessage(const RPCMessage* inmsg) = 0;
};
struct Server_PImpl;
struct Server
{
Server(const uint8_t* secretkey, IMessageHandler* h);
~Server();
void
Run();
private:
Server_PImpl* m_Impl;
};
} // namespace lokinet
#endif

@ -21,18 +21,15 @@ frame_state::process_inbound_queue()
InboundMessage::OrderCompare >
q;
recvqueue.Process(q);
bool increment = false;
while(q.size())
{
// TODO: is this right?
auto &front = q.top();
// the items are already sorted anyways so this doesn't really do much
if(front->msgid < nextMsgID && nextMsgID - front->msgid > 1)
{
// re queue
// re-queue because of an ordering gap
recvqueue.Put(front);
nextMsgID = front->msgid;
}
else
{
@ -373,7 +370,7 @@ frame_state::queue_tx(uint64_t id, transit_message *msg)
{
tx.insert(std::make_pair(id, msg));
msg->generate_xmit(sendqueue, txflags);
msg->retransmit_frags(sendqueue, txflags);
// msg->retransmit_frags(sendqueue, txflags);
}
void

@ -246,7 +246,7 @@ handle_verify_introack(iwp_async_introack *introack)
// invalid signature
llarp::LogError("introack verify failed from ", link->addr);
link->serv->remove_intro_from(link->addr);
link->serv->RemoveSessionByAddr(link->addr);
link->serv->RemoveSession(link);
return;
}
link->EnterState(llarp_link_session::eIntroAckRecv);
@ -379,7 +379,7 @@ llarp_link_session::on_intro_ack(const void *buf, size_t sz)
{
// too big?
llarp::LogError("introack too big");
serv->RemoveSessionByAddr(addr);
serv->RemoveSession(this);
return;
}
serv->put_intro_from(this);
@ -412,7 +412,7 @@ llarp_link_session::get_parent()
}
void
llarp_link_session::TickLogic()
llarp_link_session::TickLogic(llarp_time_t now)
{
std::queue< iwp_async_frame * > q;
decryptedFrames.Process(q);
@ -489,8 +489,7 @@ handle_verify_session_start(iwp_async_session_start *s)
// verify fail
// TODO: remove session?
llarp::LogWarn("session start verify failed from ", self->addr);
self->serv->RemoveSessionByAddr(self->addr);
self->serv->RemoveSession(self);
return;
}
self->send_LIM();

@ -67,19 +67,21 @@ transit_message::should_send_ack(llarp_time_t now) const
{
if(msginfo.numfrags() == 0)
return true;
return now - lastRetransmit > 250;
if(status.count() == 0)
return true;
return now - lastRetransmit > 100;
}
bool
transit_message::should_resend_xmit(llarp_time_t now) const
{
return lastAck == 0 && now - started > 1000;
return lastAck == 0 && now - started > 500;
}
bool
transit_message::should_resend_frags(llarp_time_t now) const
{
return now - lastAck > 1000 && !completed();
return lastAck > 0 && now - lastAck > 250 && !completed();
}
bool

@ -304,6 +304,12 @@ namespace llarp
m_BuiltHook = func;
}
RouterID
Path::Endpoint() const
{
return hops[hops.size() - 1].router.pubkey;
}
const PathID_t&
Path::TXID() const
{
@ -331,6 +337,8 @@ namespace llarp
void
Path::Tick(llarp_time_t now, llarp_router* r)
{
if(Expired(now))
return;
auto dlt = now - m_LastLatencyTestTime;
if(dlt > 5000 && m_LastLatencyTestID == 0)
{
@ -404,6 +412,8 @@ namespace llarp
// make nonce
TunnelNonce N;
N.Randomize();
llarp::LogInfo("send ", buf.sz, " bytes via ", TXID(), " on ", Upstream(),
" to ", Endpoint());
return HandleUpstream(buf, N, r);
}
@ -446,14 +456,19 @@ namespace llarp
Path::HandlePathLatencyMessage(
const llarp::routing::PathLatencyMessage* msg, llarp_router* r)
{
if(msg->L == m_LastLatencyTestID)
if(msg->L == m_LastLatencyTestID && status == ePathEstablished)
{
intro.latency = llarp_time_now_ms() - m_LastLatencyTestTime;
llarp::LogInfo("path latency is ", intro.latency, " ms for tx=", TXID(),
" rx=", RXID());
m_LastLatencyTestID = 0;
return true;
}
else
{
llarp::LogWarn("unwarrented path latency message via ", Upstream());
return false;
}
return true;
}
bool

@ -51,8 +51,11 @@ namespace llarp
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{
if(itr->first.first == id)
return itr->second;
if(itr->second->IsReady())
{
if(itr->second->Endpoint() == id)
return itr->second;
}
++itr;
}
return nullptr;

@ -61,7 +61,7 @@ namespace llarp
PathTransferMessage::HandleMessage(IMessageHandler* h,
llarp_router* r) const
{
auto path = r->paths.GetByDownstream(r->pubkey(), P);
auto path = r->paths.GetByUpstream(r->pubkey(), P);
if(!path)
{
llarp::LogWarn("No such path for path transfer pathid=", P);
@ -84,6 +84,7 @@ namespace llarp
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send
llarp::LogInfo("Transfer ", buf.sz, " bytes", " to ", P);
return path->HandleDownstream(buf, Y, r);
}

@ -91,6 +91,21 @@ namespace llarp
llarp::LogWarn("failed to publish intro set for endpoint ", Name());
}
}
// expire pending tx
{
auto itr = m_PendingLookups.begin();
while(itr != m_PendingLookups.end())
{
if(itr->second->IsTimedOut(now))
{
itr->second->HandleResponse({});
itr = m_PendingLookups.erase(itr);
}
else
++itr;
}
}
// prefetch tags
for(const auto& tag : m_PrefetchTags)
{
@ -122,6 +137,21 @@ namespace llarp
}
}
}
// tick remote sessions
{
auto itr = m_RemoteSessions.begin();
while(itr != m_RemoteSessions.end())
{
if(itr->second->Tick(now))
{
delete itr->second;
itr = m_RemoteSessions.erase(itr);
}
else
++itr;
}
}
}
uint64_t
@ -306,6 +336,7 @@ namespace llarp
uint64_t tx)
: endpoint(parent), remote(addr), txid(tx)
{
llarp::LogInfo("New hidden service lookup for ", addr.ToString());
}
bool
@ -313,8 +344,15 @@ namespace llarp
{
if(results.size() == 1)
{
llarp::LogInfo("hidden service lookup for ", remote.ToString(),
" success");
endpoint->PutNewOutboundContext(*results.begin());
}
else
{
llarp::LogInfo("no response in hidden service lookup for ",
remote.ToString());
}
delete this;
return true;
}
@ -355,6 +393,12 @@ namespace llarp
Endpoint::EnsurePathToService(const Address& remote, PathEnsureHook hook,
llarp_time_t timeoutMS)
{
auto path = PickRandomEstablishedPath();
if(!path)
{
llarp::LogWarn("No outbound path for lookup yet");
return false;
}
{
auto itr = m_RemoteSessions.find(remote);
if(itr != m_RemoteSessions.end())
@ -369,11 +413,15 @@ namespace llarp
// duplicate
return false;
}
llarp::LogInfo(Name(), " Ensure Path to ", remote.ToString());
m_PendingServiceLookups.insert(std::make_pair(remote, hook));
HiddenServiceAddressLookup* job =
new HiddenServiceAddressLookup(this, remote, GenTXID());
m_PendingLookups.insert(std::make_pair(job->txid, job));
return true;
return job->SendRequestViaPath(path, Router());
}
Endpoint::OutboundContext::OutboundContext(const IntroSet& intro,
@ -385,6 +433,7 @@ namespace llarp
{
selectedIntro.Clear();
ShiftIntroduction();
}
Endpoint::OutboundContext::~OutboundContext()
@ -506,6 +555,15 @@ namespace llarp
Endpoint::OutboundContext::Send(ProtocolFrame& msg)
{
// in this context we assume the message contents are encrypted
auto now = llarp_time_now_ms();
if(currentIntroSet.HasExpiredIntros(now))
{
UpdateIntroSet();
}
if(selectedIntro.expiresAt <= now || now - selectedIntro.expiresAt > 1000)
{
ShiftIntroduction();
}
auto path = GetPathByRouter(selectedIntro.router);
if(path)
{
@ -513,8 +571,45 @@ namespace llarp
transfer.T = &msg;
transfer.Y.Randomize();
transfer.P = selectedIntro.pathID;
llarp::LogInfo("sending frame via ", path->Upstream(), " to ",
path->Endpoint());
path->SendRoutingMessage(&transfer, m_Parent->Router());
}
else
{
llarp::LogWarn("No path to ", selectedIntro.router);
}
}
void
Endpoint::OutboundContext::UpdateIntroSet()
{
auto path = PickRandomEstablishedPath();
if(path)
{
uint64_t txid = llarp_randint();
routing::DHTMessage msg;
msg.M.push_back(
new llarp::dht::FindIntroMessage(currentIntroSet.A.Addr(), txid));
path->SendRoutingMessage(&msg, m_Parent->Router());
}
else
{
llarp::LogWarn(
"Cannot update introset no path for outbound session to ",
currentIntroSet.A.Addr().ToString());
}
}
bool
Endpoint::OutboundContext::Tick(llarp_time_t now)
{
if(selectedIntro.expiresAt >= now || selectedIntro.expiresAt - now < 5000)
{
UpdateIntroSet();
}
// TODO: check for expiration
return false;
}
bool
@ -522,20 +617,10 @@ namespace llarp
llarp_rc* cur, size_t hop)
{
// TODO: don't hard code
llarp::LogInfo("Select hop ", hop);
if(hop == 3)
{
llarp_time_t lowest = 0xFFFFFFFFFFFFFFFFUL;
Introduction chosen;
// pick intro set with lowest latency
for(const auto& intro : currentIntroSet.I)
{
if(intro.latency < lowest)
{
chosen = intro;
lowest = intro.latency;
}
}
auto localcopy = llarp_nodedb_get_rc(db, chosen.router);
auto localcopy = llarp_nodedb_get_rc(db, selectedIntro.router);
if(localcopy)
{
llarp_rc_copy(cur, localcopy);
@ -546,7 +631,7 @@ namespace llarp
// we don't have it?
llarp::LogError(
"cannot build aligned path, don't have router for introduction ",
chosen);
selectedIntro);
return false;
}
}

@ -1,8 +0,0 @@
#include <llarp/service/frame.hpp>
namespace llarp
{
namespace service
{
}
} // namespace llarp

@ -1,3 +1,4 @@
#include <llarp/routing/handler.hpp>
#include <llarp/service/protocol.hpp>
#include "buffer.hpp"
@ -132,5 +133,13 @@ namespace llarp
return result;
}
bool
ProtocolFrame::HandleMessage(llarp::routing::IMessageHandler* h,
llarp_router* r) const
{
llarp::LogInfo("Got hidden service frame");
return h->HandleHiddenServiceFrame(this);
}
} // namespace service
} // namespace llarp

@ -59,6 +59,8 @@ namespace llarp
// rewind
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
llarp::LogInfo("Send ", buf.sz,
" bytes routing message from trasnit hop");
return HandleDownstream(buf, N, r);
}

Loading…
Cancel
Save