Refactor endpoint state management to a new class

pull/706/head
Michael 5 years ago
parent 0a7021d827
commit e52492911d
No known key found for this signature in database
GPG Key ID: 2D51757B47E2434C

@ -26,7 +26,7 @@ namespace llarp
struct RouterContact;
namespace thread
{
struct ThreadPool;
class ThreadPool;
}
namespace metrics

@ -214,6 +214,7 @@ set(LIB_SRC
service/async_key_exchange.cpp
service/config.cpp
service/context.cpp
service/endpoint_state.cpp
service/endpoint_util.cpp
service/endpoint.cpp
service/handler.cpp
@ -226,6 +227,7 @@ set(LIB_SRC
service/outbound_context.cpp
service/pendingbuffer.cpp
service/protocol.cpp
service/router_lookup_job.cpp
service/sendcontext.cpp
service/session.cpp
service/tag_lookup_job.cpp

@ -552,7 +552,7 @@ namespace llarp
}
if(m_Exit)
{
for(const auto &snode : m_SnodeBlacklist)
for(const auto &snode : SnodeBlacklist())
m_Exit->BlacklistSnode(snode);
}
return SetupNetworking();

@ -96,9 +96,9 @@ namespace llarp
template < typename H >
friend H
AbslHashValue(H h, const huint_t< UInt_t >& i)
AbslHashValue(H hash, const huint_t< UInt_t >& i)
{
return H::combine(std::move(h), i.h);
return H::combine(std::move(hash), i.h);
}
using V6Container = std::vector< uint8_t >;

@ -5,6 +5,7 @@
#include <util/status.hpp>
#include <atomic>
#include <set>
namespace llarp
{

File diff suppressed because it is too large Load Diff

@ -32,6 +32,7 @@ namespace llarp
{
struct AsyncKeyExchange;
struct Context;
struct EndpointState;
struct OutboundContext;
struct IConvoEventListener
@ -133,10 +134,7 @@ namespace llarp
CryptoWorker();
AbstractRouter*
Router()
{
return m_Router;
}
Router();
virtual bool
LoadKeyFile();
@ -349,6 +347,9 @@ namespace llarp
uint64_t
GenTXID();
const std::set< RouterID >&
SnodeBlacklist() const;
protected:
bool
SendToServiceOrQueue(const service::Address& addr,
@ -413,9 +414,6 @@ namespace llarp
return false;
}
public:
std::set< RouterID > m_SnodeBlacklist;
protected:
IDataHandler* m_DataHandler = nullptr;
Identity m_Identity;
@ -427,119 +425,16 @@ namespace llarp
private:
friend struct EndpointUtil;
struct RouterLookupJob
{
RouterLookupJob(Endpoint* p, RouterLookupHandler h) : handler(h)
{
started = p->Now();
txid = p->GenTXID();
}
RouterLookupHandler handler;
uint64_t txid;
llarp_time_t started;
bool
IsExpired(llarp_time_t now) const
{
if(now < started)
return false;
return now - started > 30000;
}
void
InformResult(std::vector< RouterContact > result)
{
if(handler)
handler(result);
}
};
using Msg_ptr = std::shared_ptr< const routing::PathTransferMessage >;
using SendEvent_t = std::pair< Msg_ptr, path::Path_ptr >;
using PendingTraffic =
std::unordered_map< Address, PendingBufferQueue, Address::Hash >;
using ProtocolMessagePtr = std::shared_ptr< ProtocolMessage >;
using RecvPacketQueue_t =
std::priority_queue< ProtocolMessagePtr,
std::vector< ProtocolMessagePtr >,
ComparePtr< ProtocolMessagePtr > >;
util::Mutex m_InboundTrafficQueueMutex;
/// ordered queue for inbound hidden service traffic
RecvPacketQueue_t m_InboundTrafficQueue
GUARDED_BY(m_InboundTrafficQueueMutex);
using PendingRouters =
std::unordered_map< RouterID, RouterLookupJob, RouterID::Hash >;
using PendingLookups =
std::unordered_map< uint64_t,
std::unique_ptr< service::IServiceLookup > >;
using Sessions =
std::unordered_multimap< Address, std::shared_ptr< OutboundContext >,
Address::Hash >;
using SNodeSessionValue =
std::pair< std::shared_ptr< exit::BaseSession >, ConvoTag >;
using SNodeSessions =
std::unordered_multimap< RouterID, SNodeSessionValue,
RouterID::Hash >;
// clang-format off
const IntroSet& introSet() const;
IntroSet& introSet();
using ConvoMap = std::unordered_map< ConvoTag, Session, ConvoTag::Hash >;
const ConvoMap& Sessions() const;
ConvoMap& Sessions();
// clang-format on
AbstractRouter* m_Router;
std::shared_ptr< Logic > m_IsolatedLogic = nullptr;
llarp_ev_loop_ptr m_IsolatedNetLoop = nullptr;
std::string m_Keyfile;
std::string m_Name;
std::string m_NetNS;
bool m_BundleRC = false;
util::Mutex m_SendQueueMutex;
std::deque< SendEvent_t > m_SendQueue GUARDED_BY(m_SendQueueMutex);
PendingTraffic m_PendingTraffic;
Sessions m_RemoteSessions;
Sessions m_DeadSessions;
std::set< ConvoTag > m_InboundConvos;
SNodeSessions m_SNodeSessions;
std::unordered_multimap< Address, PathEnsureHook, Address::Hash >
m_PendingServiceLookups;
std::unordered_map< RouterID, uint32_t, RouterID::Hash >
m_ServiceLookupFails;
PendingRouters m_PendingRouters;
uint64_t m_CurrentPublishTX = 0;
llarp_time_t m_LastPublish = 0;
llarp_time_t m_LastPublishAttempt = 0;
llarp_time_t m_MinPathLatency = (5 * 1000);
/// our introset
service::IntroSet m_IntroSet;
/// pending remote service lookups by id
PendingLookups m_PendingLookups;
/// prefetch remote address list
std::set< Address > m_PrefetchAddrs;
/// hidden service tag
Tag m_Tag;
/// prefetch descriptors for these hidden service tags
std::set< Tag > m_PrefetchTags;
/// on initialize functions
std::list< std::function< bool(void) > > m_OnInit;
/// conversations
ConvoMap m_Sessions;
std::unordered_map< Tag, CachedTagResult, Tag::Hash > m_PrefetchedTags;
std::unique_ptr< EndpointState > m_state;
};
using Endpoint_ptr = std::shared_ptr< Endpoint >;

@ -0,0 +1,119 @@
#include <service/endpoint_state.hpp>
#include <exit/session.hpp>
#include <hook/shell.hpp>
#include <service/outbound_context.hpp>
#include <util/str.hpp>
namespace llarp
{
namespace service
{
bool
EndpointState::SetOption(const std::string& k, const std::string& v,
const std::string& name)
{
if(k == "keyfile")
{
m_Keyfile = v;
}
if(k == "tag")
{
m_Tag = v;
LogInfo("Setting tag to ", v);
}
if(k == "prefetch-tag")
{
m_PrefetchTags.insert(v);
}
if(k == "prefetch-addr")
{
Address addr;
if(addr.FromString(v))
m_PrefetchAddrs.insert(addr);
}
if(k == "min-latency")
{
auto val = atoi(v.c_str());
if(val > 0)
m_MinPathLatency = val;
}
if(k == "bundle-rc")
{
m_BundleRC = IsTrueValue(v.c_str());
}
if(k == "blacklist-snode")
{
RouterID snode;
if(!snode.FromString(v))
{
LogError(name, " invalid snode value: ", v);
return false;
}
const auto result = m_SnodeBlacklist.insert(snode);
if(!result.second)
{
LogError(name, " duplicate blacklist-snode: ", snode.ToString());
return false;
}
LogInfo(name, " adding ", snode.ToString(), " to blacklist");
}
if(k == "on-up")
{
m_OnUp = hooks::ExecShellBackend(v);
if(m_OnUp)
LogInfo(name, " added on up script: ", v);
else
LogError(name, " failed to add on up script");
}
if(k == "on-down")
{
m_OnDown = hooks::ExecShellBackend(v);
if(m_OnDown)
LogInfo(name, " added on down script: ", v);
else
LogError(name, " failed to add on down script");
}
if(k == "on-ready")
{
m_OnReady = hooks::ExecShellBackend(v);
if(m_OnReady)
LogInfo(name, " added on ready script: ", v);
else
LogError(name, " failed to add on ready script");
}
return true;
}
util::StatusObject
EndpointState::ExtractStatus(util::StatusObject& obj) const
{
obj.Put("lastPublished", m_LastPublish);
obj.Put("lastPublishAttempt", m_LastPublishAttempt);
obj.Put("introset", m_IntroSet.ExtractStatus());
if(!m_Tag.IsZero())
obj.Put("tag", m_Tag.ToString());
static auto getSecond = [](const auto& item) -> const auto&
{
return item.second;
};
obj.PutContainer("deadSessions", m_DeadSessions, getSecond);
obj.PutContainer("remoteSessions", m_RemoteSessions, getSecond);
obj.PutContainer("lookups", m_PendingLookups, getSecond);
obj.PutContainer("snodeSessions", m_SNodeSessions,
[](const auto& item) { return item.second.first; });
util::StatusObject sessionObj{};
for(const auto& item : m_Sessions)
{
std::string k = item.first.ToHex();
sessionObj.Put(k, item.second.ExtractStatus());
}
obj.Put("converstations", sessionObj);
return obj;
}
} // namespace service
} // namespace llarp

@ -0,0 +1,109 @@
#ifndef LLARP_SERVICE_ENDPOINT_STATE_HPP
#define LLARP_SERVICE_ENDPOINT_STATE_HPP
#include <hook/ihook.hpp>
#include <router_id.hpp>
#include <service/address.hpp>
#include <service/pendingbuffer.hpp>
#include <service/router_lookup_job.hpp>
#include <service/session.hpp>
#include <service/tag_lookup_job.hpp>
#include <service/endpoint_types.hpp>
#include <util/compare_ptr.hpp>
#include <util/status.hpp>
#include <memory>
#include <queue>
#include <set>
#include <unordered_map>
struct llarp_ev_loop;
using llarp_ev_loop_ptr = std::shared_ptr< llarp_ev_loop >;
namespace llarp
{
// clang-format off
namespace exit { struct BaseSession; }
namespace path { struct Path; using Path_ptr = std::shared_ptr< Path >; }
namespace routing { struct PathTransferMessage; }
// clang-format on
namespace service
{
struct IServiceLookup;
struct OutboundContext;
struct EndpointState
{
hooks::Backend_ptr m_OnUp;
hooks::Backend_ptr m_OnDown;
hooks::Backend_ptr m_OnReady;
util::Mutex m_InboundTrafficQueueMutex;
/// ordered queue for inbound hidden service traffic
RecvPacketQueue_t m_InboundTrafficQueue
GUARDED_BY(m_InboundTrafficQueueMutex);
std::set< RouterID > m_SnodeBlacklist;
AbstractRouter* m_Router;
std::shared_ptr< Logic > m_IsolatedLogic = nullptr;
llarp_ev_loop_ptr m_IsolatedNetLoop = nullptr;
std::string m_Keyfile;
std::string m_Name;
std::string m_NetNS;
bool m_BundleRC = false;
util::Mutex m_SendQueueMutex;
std::deque< SendEvent_t > m_SendQueue GUARDED_BY(m_SendQueueMutex);
PendingTraffic m_PendingTraffic;
Sessions m_RemoteSessions;
Sessions m_DeadSessions;
std::set< ConvoTag > m_InboundConvos;
SNodeSessions m_SNodeSessions;
std::unordered_multimap< Address, PathEnsureHook, Address::Hash >
m_PendingServiceLookups;
std::unordered_map< RouterID, uint32_t, RouterID::Hash >
m_ServiceLookupFails;
PendingRouters m_PendingRouters;
uint64_t m_CurrentPublishTX = 0;
llarp_time_t m_LastPublish = 0;
llarp_time_t m_LastPublishAttempt = 0;
llarp_time_t m_MinPathLatency = (5 * 1000);
/// our introset
IntroSet m_IntroSet;
/// pending remote service lookups by id
PendingLookups m_PendingLookups;
/// prefetch remote address list
std::set< Address > m_PrefetchAddrs;
/// hidden service tag
Tag m_Tag;
/// prefetch descriptors for these hidden service tags
std::set< Tag > m_PrefetchTags;
/// on initialize functions
std::list< std::function< bool(void) > > m_OnInit;
/// conversations
ConvoMap m_Sessions;
std::unordered_map< Tag, CachedTagResult, Tag::Hash > m_PrefetchedTags;
bool
SetOption(const std::string& k, const std::string& v,
const std::string& name);
util::StatusObject
ExtractStatus(util::StatusObject& obj) const;
};
} // namespace service
} // namespace llarp
#endif

@ -0,0 +1,62 @@
#ifndef LLARP_SERVICE_ENDPOINT_TYPES_HPP
#define LLARP_SERVICE_ENDPOINT_TYPES_HPP
#include <service/pendingbuffer.hpp>
#include <service/router_lookup_job.hpp>
#include <service/session.hpp>
#include <util/compare_ptr.hpp>
#include <deque>
#include <memory>
#include <queue>
#include <unordered_map>
namespace llarp
{
// clang-format off
namespace exit { struct BaseSession; }
namespace path { struct Path; using Path_ptr = std::shared_ptr< Path >; }
namespace routing { struct PathTransferMessage; }
// clang-format on
namespace service
{
struct IServiceLookup;
struct OutboundContext;
using Msg_ptr = std::shared_ptr< const routing::PathTransferMessage >;
using SendEvent_t = std::pair< Msg_ptr, path::Path_ptr >;
using PendingBufferQueue = std::deque< PendingBuffer >;
using PendingTraffic =
std::unordered_map< Address, PendingBufferQueue, Address::Hash >;
using ProtocolMessagePtr = std::shared_ptr< ProtocolMessage >;
using RecvPacketQueue_t =
std::priority_queue< ProtocolMessagePtr,
std::vector< ProtocolMessagePtr >,
ComparePtr< ProtocolMessagePtr > >;
using PendingRouters =
std::unordered_map< RouterID, RouterLookupJob, RouterID::Hash >;
using PendingLookups =
std::unordered_map< uint64_t, std::unique_ptr< IServiceLookup > >;
using Sessions =
std::unordered_multimap< Address, std::shared_ptr< OutboundContext >,
Address::Hash >;
using SNodeSessionValue =
std::pair< std::shared_ptr< exit::BaseSession >, ConvoTag >;
using SNodeSessions =
std::unordered_multimap< RouterID, SNodeSessionValue, RouterID::Hash >;
using ConvoMap = std::unordered_map< ConvoTag, Session, ConvoTag::Hash >;
using PathEnsureHook = std::function< void(Address, OutboundContext*) >;
} // namespace service
} // namespace llarp
#endif

@ -1,6 +1,8 @@
#include <service/endpoint_util.hpp>
#include <exit/session.hpp>
#include <service/outbound_context.hpp>
#include <service/lookup.hpp>
#include <util/logger.hpp>
namespace llarp
@ -8,8 +10,7 @@ namespace llarp
namespace service
{
void
EndpointUtil::ExpireSNodeSessions(llarp_time_t now,
Endpoint::SNodeSessions& sessions)
EndpointUtil::ExpireSNodeSessions(llarp_time_t now, SNodeSessions& sessions)
{
auto itr = sessions.begin();
while(itr != sessions.end())
@ -34,8 +35,7 @@ namespace llarp
}
void
EndpointUtil::ExpirePendingTx(llarp_time_t now,
Endpoint::PendingLookups& lookups)
EndpointUtil::ExpirePendingTx(llarp_time_t now, PendingLookups& lookups)
{
for(auto itr = lookups.begin(); itr != lookups.end();)
{
@ -54,7 +54,7 @@ namespace llarp
void
EndpointUtil::ExpirePendingRouterLookups(llarp_time_t now,
Endpoint::PendingRouters& routers)
PendingRouters& routers)
{
for(auto itr = routers.begin(); itr != routers.end();)
{
@ -70,8 +70,7 @@ namespace llarp
}
void
EndpointUtil::DeregisterDeadSessions(llarp_time_t now,
Endpoint::Sessions& sessions)
EndpointUtil::DeregisterDeadSessions(llarp_time_t now, Sessions& sessions)
{
auto itr = sessions.begin();
while(itr != sessions.end())
@ -88,9 +87,8 @@ namespace llarp
}
void
EndpointUtil::TickRemoteSessions(llarp_time_t now,
Endpoint::Sessions& remoteSessions,
Endpoint::Sessions& deadSessions)
EndpointUtil::TickRemoteSessions(llarp_time_t now, Sessions& remoteSessions,
Sessions& deadSessions)
{
auto itr = remoteSessions.begin();
while(itr != remoteSessions.end())
@ -110,8 +108,7 @@ namespace llarp
}
void
EndpointUtil::ExpireConvoSessions(llarp_time_t now,
Endpoint::ConvoMap& sessions)
EndpointUtil::ExpireConvoSessions(llarp_time_t now, ConvoMap& sessions)
{
auto itr = sessions.begin();
while(itr != sessions.end())
@ -126,7 +123,7 @@ namespace llarp
}
void
EndpointUtil::StopRemoteSessions(Endpoint::Sessions& remoteSessions)
EndpointUtil::StopRemoteSessions(Sessions& remoteSessions)
{
for(auto& item : remoteSessions)
{
@ -135,7 +132,7 @@ namespace llarp
}
void
EndpointUtil::StopSnodeSessions(Endpoint::SNodeSessions& sessions)
EndpointUtil::StopSnodeSessions(SNodeSessions& sessions)
{
for(auto& item : sessions)
{
@ -145,7 +142,7 @@ namespace llarp
bool
EndpointUtil::HasPathToService(const Address& addr,
const Endpoint::Sessions& remoteSessions)
const Sessions& remoteSessions)
{
auto range = remoteSessions.equal_range(addr);
auto itr = range.first;
@ -159,7 +156,7 @@ namespace llarp
}
bool
EndpointUtil::GetConvoTagsForService(const Endpoint::ConvoMap& sessions,
EndpointUtil::GetConvoTagsForService(const ConvoMap& sessions,
const Address& info,
std::set< ConvoTag >& tags)
{

@ -1,7 +1,7 @@
#ifndef LLARP_SERVICE_ENDPOINT_UTIL_HPP
#define LLARP_SERVICE_ENDPOINT_UTIL_HPP
#include <service/endpoint.hpp>
#include <service/endpoint_types.hpp>
namespace llarp
{
@ -10,38 +10,36 @@ namespace llarp
struct EndpointUtil
{
static void
ExpireSNodeSessions(llarp_time_t now, Endpoint::SNodeSessions& sessions);
ExpireSNodeSessions(llarp_time_t now, SNodeSessions& sessions);
static void
ExpirePendingTx(llarp_time_t now, Endpoint::PendingLookups& lookups);
ExpirePendingTx(llarp_time_t now, PendingLookups& lookups);
static void
ExpirePendingRouterLookups(llarp_time_t now,
Endpoint::PendingRouters& routers);
ExpirePendingRouterLookups(llarp_time_t now, PendingRouters& routers);
static void
DeregisterDeadSessions(llarp_time_t now, Endpoint::Sessions& sessions);
DeregisterDeadSessions(llarp_time_t now, Sessions& sessions);
static void
TickRemoteSessions(llarp_time_t now, Endpoint::Sessions& remoteSessions,
Endpoint::Sessions& deadSessions);
TickRemoteSessions(llarp_time_t now, Sessions& remoteSessions,
Sessions& deadSessions);
static void
ExpireConvoSessions(llarp_time_t now, Endpoint::ConvoMap& sessions);
ExpireConvoSessions(llarp_time_t now, ConvoMap& sessions);
static void
StopRemoteSessions(Endpoint::Sessions& remoteSessions);
StopRemoteSessions(Sessions& remoteSessions);
static void
StopSnodeSessions(Endpoint::SNodeSessions& sessions);
StopSnodeSessions(SNodeSessions& sessions);
static bool
HasPathToService(const Address& addr,
const Endpoint::Sessions& remoteSessions);
HasPathToService(const Address& addr, const Sessions& remoteSessions);
static bool
GetConvoTagsForService(const Endpoint::ConvoMap& sessions,
const Address& addr, std::set< ConvoTag >& tags);
GetConvoTagsForService(const ConvoMap& sessions, const Address& addr,
std::set< ConvoTag >& tags);
};
} // namespace service

@ -5,7 +5,9 @@
#include <path/path.hpp>
#include <service/intro_set.hpp>
#include <util/aligned.hpp>
#include <memory>
#include <set>
namespace llarp
{

@ -303,7 +303,7 @@ namespace llarp
}
std::set< RouterID > exclude = prev;
exclude.insert(m_NextIntro.router);
for(const auto& snode : m_Endpoint->m_SnodeBlacklist)
for(const auto& snode : m_Endpoint->SnodeBlacklist())
exclude.insert(snode);
if(hop == numHops - 1)
{
@ -384,7 +384,7 @@ namespace llarp
{
if(intro.ExpiresSoon(now))
continue;
if(m_Endpoint->m_SnodeBlacklist.count(intro.router))
if(m_Endpoint->SnodeBlacklist().count(intro.router))
continue;
if(m_BadIntros.find(intro) == m_BadIntros.end()
&& remoteIntro.router == intro.router)
@ -402,7 +402,7 @@ namespace llarp
/// pick newer intro not on same router
for(const auto& intro : currentIntroSet.I)
{
if(m_Endpoint->m_SnodeBlacklist.count(intro.router))
if(m_Endpoint->SnodeBlacklist().count(intro.router))
continue;
m_Endpoint->EnsureRouterIsKnown(intro.router);
if(intro.ExpiresSoon(now))

@ -0,0 +1,15 @@
#include <service/router_lookup_job.hpp>
#include <service/endpoint.hpp>
namespace llarp
{
namespace service
{
RouterLookupJob::RouterLookupJob(Endpoint* p, RouterLookupHandler h)
: handler(h), txid(p->GenTXID()), started(p->Now())
{
}
} // namespace service
} // namespace llarp

@ -0,0 +1,37 @@
#ifndef LLARP_SERVICE_ROUTER_LOOKUP_JOB_HPP
#define LLARP_SERVICE_ROUTER_LOOKUP_JOB_HPP
#include <router_contact.hpp>
namespace llarp
{
namespace service
{
struct Endpoint;
struct RouterLookupJob
{
RouterLookupJob(Endpoint* p, RouterLookupHandler h);
RouterLookupHandler handler;
uint64_t txid;
llarp_time_t started;
bool
IsExpired(llarp_time_t now) const
{
if(now < started)
return false;
return now - started > 30000;
}
void
InformResult(std::vector< RouterContact > result)
{
if(handler)
handler(result);
}
};
} // namespace service
} // namespace llarp
#endif

@ -278,18 +278,11 @@ tuntap_read(struct device *dev, void *buf, size_t size)
return 0;
}
#ifdef Darwin
unsigned int pktinfo = 0;
unsigned int pktinfo = 0;
const struct iovec vecs[2] = {
{
.iov_base = &pktinfo,
.iov_len = sizeof(unsigned int)
},
{
.iov_base = buf,
.iov_len = size
}
};
n = readv(dev->tun_fd, vecs, 2);
{.iov_base = &pktinfo, .iov_len = sizeof(unsigned int)},
{.iov_base = buf, .iov_len = size}};
n = readv(dev->tun_fd, vecs, 2);
if(n >= (int)(sizeof(unsigned int)))
n -= sizeof(unsigned int);
#else
@ -320,17 +313,13 @@ tuntap_write(struct device *dev, void *buf, size_t size)
static unsigned int af6 = htonl(AF_INET6);
const struct iovec vecs[2] = {
{
.iov_base = (((unsigned char*)buf)[0] & 0x60) == 0x60 ? &af6 : &af4, .iov_len = sizeof(unsigned int)
},
{
.iov_base = buf,
.iov_len = size
}
};
{.iov_base = (((unsigned char *)buf)[0] & 0x60) == 0x60 ? &af6 : &af4,
.iov_len = sizeof(unsigned int)},
{.iov_base = buf, .iov_len = size}};
n = writev(dev->tun_fd, &vecs, 2);
if (n >= sizeof(unsigned int)) n -= sizeof(unsigned int);
n = writev(dev->tun_fd, vecs, 2);
if(n >= (int)sizeof(unsigned int))
n -= sizeof(unsigned int);
#else
n = write(dev->tun_fd, buf, size);

Loading…
Cancel
Save