use shared_ptr

pull/579/head
Jeff Becker 5 years ago
parent 986e831579
commit d423ee02d2
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -13,7 +13,7 @@ namespace llarp
struct MessageDecoder
{
const Key_t &From;
std::unique_ptr< IMessage > msg;
IMessage::Ptr_t msg;
bool firstKey = true;
bool relayed = false;
@ -83,7 +83,7 @@ namespace llarp
}
};
std::unique_ptr< IMessage >
IMessage::Ptr_t
DecodeMesssage(const Key_t &from, llarp_buffer_t *buf, bool relayed)
{
MessageDecoder dec(from);
@ -94,20 +94,19 @@ namespace llarp
if(!bencode_read_dict(buf, &r))
return nullptr;
return std::unique_ptr< IMessage >(std::move(dec.msg));
return std::move(dec.msg);
}
struct ListDecoder
{
ListDecoder(const Key_t &from,
std::vector< std::unique_ptr< IMessage > > &list)
ListDecoder(const Key_t &from, std::vector< IMessage::Ptr_t > &list)
: From(from), l(list)
{
}
bool relayed = false;
const Key_t &From;
std::vector< std::unique_ptr< IMessage > > &l;
std::vector< IMessage::Ptr_t > &l;
static bool
on_item(list_reader *r, bool has)
@ -128,8 +127,7 @@ namespace llarp
bool
DecodeMesssageList(Key_t from, llarp_buffer_t *buf,
std::vector< std::unique_ptr< IMessage > > &list,
bool relayed)
std::vector< IMessage::Ptr_t > &list, bool relayed)
{
ListDecoder dec(from, list);
dec.relayed = relayed;

@ -25,21 +25,22 @@ namespace llarp
{
}
using Ptr_t = std::unique_ptr< IMessage >;
virtual bool
HandleMessage(
struct llarp_dht_context* dht,
std::vector< std::unique_ptr< IMessage > >& replies) const = 0;
HandleMessage(struct llarp_dht_context* dht,
std::vector< Ptr_t >& replies) const = 0;
Key_t From;
PathID_t pathID;
};
std::unique_ptr< IMessage >
IMessage::Ptr_t
DecodeMessage(const Key_t& from, llarp_buffer_t* buf, bool relayed = false);
bool
DecodeMesssageList(Key_t from, llarp_buffer_t* buf,
std::vector< std::unique_ptr< IMessage > >& dst,
std::vector< IMessage::Ptr_t >& dst,
bool relayed = false);
} // namespace dht
} // namespace llarp

@ -73,8 +73,7 @@ namespace llarp
bool
FindIntroMessage::HandleMessage(
llarp_dht_context* ctx,
std::vector< std::unique_ptr< IMessage > >& replies) const
llarp_dht_context* ctx, std::vector< IMessage::Ptr_t >& replies) const
{
if(R > 5)
{

@ -54,9 +54,8 @@ namespace llarp
DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* val) override;
bool
HandleMessage(
llarp_dht_context* ctx,
std::vector< std::unique_ptr< IMessage > >& replies) const override;
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage::Ptr_t >& replies) const override;
};
} // namespace dht
} // namespace llarp

@ -56,9 +56,8 @@ namespace llarp
/// the path of the result
/// TODO: smart path expiration logic needs to be implemented
virtual bool
HandleMessage(
llarp_dht_context* ctx,
std::vector< std::unique_ptr< IMessage > >& replies) const override;
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage::Ptr_t >& replies) const override;
};
} // namespace dht
} // namespace llarp

@ -75,7 +75,8 @@ namespace llarp
ctx->impl->GetRouter()->pathContext().GetLocalPathSet(pathID);
if(pathset)
{
return pathset->HandleGotIntroMessage(this);
auto copy = std::make_shared< const RelayedGotIntroMessage >(*this);
return pathset->HandleGotIntroMessage(copy);
}
llarp::LogWarn("No path for got intro message pathid=", pathID);
return false;

@ -3,6 +3,7 @@
#include <dht/message.hpp>
#include <service/intro_set.hpp>
#include <util/copy_or_nullptr.hpp>
#include <vector>
@ -24,6 +25,15 @@ namespace llarp
{
}
GotIntroMessage(const GotIntroMessage& other)
: IMessage(other.From)
, I(other.I)
, T(other.T)
, K(copy_or_nullptr(other.K))
{
version = other.version;
}
/// for iterative reply
GotIntroMessage(const Key_t& from, const Key_t& closer, uint64_t txid)
: IMessage(from), T(txid), K(new Key_t(closer))
@ -43,9 +53,8 @@ namespace llarp
DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* val) override;
virtual bool
HandleMessage(
llarp_dht_context* ctx,
std::vector< std::unique_ptr< IMessage > >& replies) const override;
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage::Ptr_t >& replies) const override;
};
struct RelayedGotIntroMessage final : public GotIntroMessage
@ -55,10 +64,11 @@ namespace llarp
}
bool
HandleMessage(
llarp_dht_context* ctx,
std::vector< std::unique_ptr< IMessage > >& replies) const override;
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage::Ptr_t >& replies) const override;
};
using GotIntroMessage_constptr = std::shared_ptr< const GotIntroMessage >;
} // namespace dht
} // namespace llarp
#endif

@ -90,7 +90,8 @@ namespace llarp
{
auto pathset =
ctx->impl->GetRouter()->pathContext().GetLocalPathSet(pathID);
return pathset && pathset->HandleGotRouterMessage(this);
auto copy = std::make_shared< const GotRouterMessage >(*this);
return pathset && pathset->HandleGotRouterMessage(copy);
}
// not relayed
const TXOwner owner(From, txid);

@ -2,6 +2,7 @@
#define LLARP_DHT_MESSAGES_GOT_ROUTER_HPP
#include <dht/message.hpp>
#include <router_contact.hpp>
#include <util/copy_or_nullptr.hpp>
namespace llarp
{
@ -32,6 +33,17 @@ namespace llarp
{
}
GotRouterMessage(const GotRouterMessage& other)
: IMessage(other.From)
, R(other.R)
, N(other.N)
, K(copy_or_nullptr(other.K))
, txid(other.txid)
, relayed(other.relayed)
{
version = other.version;
}
~GotRouterMessage();
bool
@ -48,10 +60,11 @@ namespace llarp
std::vector< RouterContact > R;
std::vector< RouterID > N;
std::unique_ptr< Key_t > K;
uint64_t txid = 0;
uint64_t version = 0;
bool relayed = false;
uint64_t txid = 0;
bool relayed = false;
};
using GotRouterMessage_constptr = std::shared_ptr< const GotRouterMessage >;
} // namespace dht
} // namespace llarp
#endif

@ -67,7 +67,7 @@ namespace llarp
{
if(db->Get(m_ExitRouter, cur))
return true;
router->LookupRouter(m_ExitRouter);
router->LookupRouter(m_ExitRouter, nullptr);
return false;
}
else if(hop == numHops - 2)
@ -263,6 +263,15 @@ namespace llarp
for(auto& item : m_Upstream)
item.second.clear();
m_Upstream.clear();
if(numHops == 1)
{
auto r = router;
r->LookupRouter(m_ExitRouter,
[r](const std::vector< RouterContact > results) {
if(results.size())
r->TryConnectAsync(results[0], 5);
});
}
}
return true;
}

@ -28,7 +28,8 @@ namespace llarp
BaseSession(const llarp::RouterID& exitRouter,
std::function< bool(const llarp_buffer_t&) > writepkt,
AbstractRouter* r, size_t numpaths, size_t hoplen, bool bundleRC);
AbstractRouter* r, size_t numpaths, size_t hoplen,
bool bundleRC);
virtual ~BaseSession();
@ -157,7 +158,8 @@ namespace llarp
{
ExitSession(const llarp::RouterID& snodeRouter,
std::function< bool(const llarp_buffer_t&) > writepkt,
AbstractRouter* r, size_t numpaths, size_t hoplen, bool bundleRC)
AbstractRouter* r, size_t numpaths, size_t hoplen,
bool bundleRC)
: BaseSession(snodeRouter, writepkt, r, numpaths, hoplen, bundleRC)
{
}

@ -12,7 +12,7 @@ namespace llarp
{
struct DHTMessage final : public IMessage
{
std::vector< std::unique_ptr< llarp::dht::IMessage > > M;
std::vector< llarp::dht::IMessage::Ptr_t > M;
uint64_t V = 0;
~DHTMessage();

@ -159,16 +159,14 @@ namespace llarp
/// override me in subtype
virtual bool
HandleGotIntroMessage(__attribute__((unused))
const dht::GotIntroMessage* msg)
HandleGotIntroMessage(std::shared_ptr< const dht::GotIntroMessage >)
{
return false;
}
/// override me in subtype
virtual bool
HandleGotRouterMessage(__attribute__((unused))
const dht::GotRouterMessage* msg)
HandleGotRouterMessage(std::shared_ptr< const dht::GotRouterMessage >)
{
return false;
}

@ -5,6 +5,8 @@
#include <util/status.hpp>
#include <vector>
#include <ev/ev.h>
#include <functional>
#include <router_contact.hpp>
struct llarp_buffer_t;
struct llarp_dht_context;
@ -16,7 +18,6 @@ namespace llarp
class Logic;
struct Config;
struct Crypto;
struct RouterContact;
struct RouterID;
struct ILinkMessage;
struct ILinkSession;
@ -137,6 +138,9 @@ namespace llarp
virtual bool
Reconfigure(Config *conf) = 0;
virtual bool
TryConnectAsync(RouterContact rc, uint16_t tries) = 0;
/// validate new configuration against old one
/// return true on 100% valid
/// return false if not 100% valid
@ -183,7 +187,7 @@ namespace llarp
/// if we are a service node this is done direct otherwise it's done via
/// path
virtual void
LookupRouter(RouterID remote) = 0;
LookupRouter(RouterID remote, RouterLookupHandler resultHandler) = 0;
/// check if newRc matches oldRC and update local rc for this remote contact
/// if valid

@ -1124,17 +1124,20 @@ namespace llarp
}
void
Router::LookupRouter(RouterID remote)
Router::LookupRouter(RouterID remote, RouterLookupHandler resultHandler)
{
if(IsServiceNode())
{
ServiceNodeLookupRouterWhenExpired(remote);
if(resultHandler)
dht()->impl->LookupRouter(remote, resultHandler);
else
ServiceNodeLookupRouterWhenExpired(remote);
return;
}
_hiddenServiceContext.ForEachService(
[=](const std::string &,
const std::shared_ptr< service::Endpoint > &ep) -> bool {
return !ep->LookupRouterAnon(remote);
return !ep->LookupRouterAnon(remote, resultHandler);
});
}

@ -442,7 +442,7 @@ namespace llarp
FlushOutboundFor(RouterID remote, ILinkLayer *chosen = nullptr);
void
LookupRouter(RouterID remote) override;
LookupRouter(RouterID remote, RouterLookupHandler handler) override;
/// manually discard all pending messages to remote router
void
@ -517,7 +517,7 @@ namespace llarp
NumberOfConnectedRouters() const override;
bool
TryConnectAsync(RouterContact rc, uint16_t tries);
TryConnectAsync(RouterContact rc, uint16_t tries) override;
bool
GetRandomConnectedRouter(RouterContact &result) const override;

@ -107,7 +107,7 @@ namespace llarp
// TODO: we need to stop looking up service nodes that are gone forever
// how do?
for(const auto &k : expired)
if(!ep->LookupRouterAnon(k))
if(!ep->LookupRouterAnon(k, nullptr))
return false;
return true;
});

@ -357,7 +357,7 @@ namespace llarp
}
bool
Endpoint::HandleGotIntroMessage(const dht::GotIntroMessage* msg)
Endpoint::HandleGotIntroMessage(dht::GotIntroMessage_constptr msg)
{
auto crypto = m_Router->crypto();
std::set< IntroSet > remote;
@ -716,21 +716,33 @@ namespace llarp
}
bool
Endpoint::HandleGotRouterMessage(const dht::GotRouterMessage* msg)
Endpoint::HandleGotRouterMessage(dht::GotRouterMessage_constptr msg)
{
auto itr = m_PendingRouters.find(msg->R[0].pubkey);
if(itr == m_PendingRouters.end())
return false;
if(msg->R.size() == 1)
{
auto itr = m_PendingRouters.find(msg->R[0].pubkey);
if(itr == m_PendingRouters.end())
return false;
llarp_async_verify_rc* job = new llarp_async_verify_rc;
job->nodedb = m_Router->nodedb();
job->cryptoworker = m_Router->threadpool();
job->diskworker = m_Router->diskworker();
job->logic = m_Router->logic();
job->hook = nullptr;
job->rc = msg->R[0];
job->hook = [=](llarp_async_verify_rc* j) {
auto i = m_PendingRouters.find(msg->R[0].pubkey);
if(j->valid)
i->second.InformResult(msg->R);
else
i->second.InformResult({});
m_PendingRouters.erase(i);
delete j;
};
job->rc = msg->R[0];
llarp_nodedb_async_verify(job);
}
else
{
itr->second.InformResult({});
m_PendingRouters.erase(itr);
}
return true;
@ -743,12 +755,12 @@ namespace llarp
return;
if(!m_Router->nodedb()->Has(router))
{
LookupRouterAnon(router);
LookupRouterAnon(router, nullptr);
}
}
bool
Endpoint::LookupRouterAnon(RouterID router)
Endpoint::LookupRouterAnon(RouterID router, RouterLookupHandler handler)
{
if(m_PendingRouters.find(router) == m_PendingRouters.end())
{
@ -761,7 +773,7 @@ namespace llarp
if(path && path->SendRoutingMessage(msg, m_Router))
{
LogInfo(Name(), " looking up ", router);
m_PendingRouters.emplace(router, RouterLookupJob(this));
m_PendingRouters.emplace(router, RouterLookupJob(this, handler));
return true;
}
else
@ -797,7 +809,8 @@ namespace llarp
}
bool
Endpoint::HandleDataMessage(const PathID_t& src, ProtocolMessage* msg)
Endpoint::HandleDataMessage(const PathID_t& src,
std::shared_ptr< ProtocolMessage > msg)
{
auto path = GetPathByID(src);
if(path)
@ -825,7 +838,7 @@ namespace llarp
}
bool
Endpoint::ProcessDataMessage(ProtocolMessage* msg)
Endpoint::ProcessDataMessage(std::shared_ptr< ProtocolMessage > msg)
{
if(msg->proto == eProtocolTraffic)
{

@ -135,10 +135,12 @@ namespace llarp
PublishIntroSetVia(AbstractRouter* r, path::Path_ptr p);
bool
HandleGotIntroMessage(const dht::GotIntroMessage* msg) override;
HandleGotIntroMessage(
std::shared_ptr< const dht::GotIntroMessage > msg) override;
bool
HandleGotRouterMessage(const dht::GotRouterMessage* msg) override;
HandleGotRouterMessage(
std::shared_ptr< const dht::GotRouterMessage > msg) override;
bool
HandleHiddenServiceFrame(path::Path_ptr p,
@ -165,14 +167,15 @@ namespace llarp
ForgetPathToService(const Address& remote);
bool
HandleDataMessage(const PathID_t&, ProtocolMessage* msg) override;
HandleDataMessage(const PathID_t&,
std::shared_ptr< ProtocolMessage > msg) override;
virtual bool
HandleWriteIPPacket(const llarp_buffer_t& pkt,
std::function< huint32_t(void) > getFromIP) = 0;
bool
ProcessDataMessage(ProtocolMessage* msg);
ProcessDataMessage(std::shared_ptr< ProtocolMessage > msg);
/// ensure that we know a router, looks up if it doesn't
void
@ -180,7 +183,7 @@ namespace llarp
/// lookup a router via closest path
bool
LookupRouterAnon(RouterID router);
LookupRouterAnon(RouterID router, RouterLookupHandler handler);
/// called on event loop pump
virtual void
@ -397,12 +400,13 @@ namespace llarp
struct RouterLookupJob
{
RouterLookupJob(Endpoint* p)
RouterLookupJob(Endpoint* p, RouterLookupHandler h) : handler(h)
{
started = p->Now();
txid = p->GenTXID();
}
RouterLookupHandler handler;
uint64_t txid;
llarp_time_t started;
@ -413,6 +417,13 @@ namespace llarp
return false;
return now - started > 5000;
}
void
InformResult(const std::vector< RouterContact >& result)
{
if(handler)
handler(result);
}
};
using PendingRouters =

@ -5,6 +5,7 @@
#include <path/path_types.hpp>
#include <service/intro_set.hpp>
#include <util/aligned.hpp>
#include <memory>
namespace llarp
{
@ -16,7 +17,8 @@ namespace llarp
struct IDataHandler
{
virtual bool
HandleDataMessage(const PathID_t&, ProtocolMessage* msg) = 0;
HandleDataMessage(const PathID_t&,
std::shared_ptr< ProtocolMessage > msg) = 0;
virtual bool
GetCachedSessionKeyFor(const ConvoTag& remote,

@ -30,12 +30,10 @@ namespace llarp
}
void
ProtocolMessage::ProcessAsync(void* user)
ProtocolMessage::ProcessAsync(std::shared_ptr< ProtocolMessage > self)
{
ProtocolMessage* self = static_cast< ProtocolMessage* >(user);
if(!self->handler->HandleDataMessage(self->srcPath, self))
LogWarn("failed to handle data message from ", self->srcPath);
delete self;
}
bool
@ -245,14 +243,15 @@ namespace llarp
{
Crypto* crypto;
Logic* logic;
ProtocolMessage* msg;
std::shared_ptr< ProtocolMessage > msg;
const Identity& m_LocalIdentity;
IDataHandler* handler;
const ProtocolFrame frame;
const Introduction fromIntro;
AsyncFrameDecrypt(Logic* l, Crypto* c, const Identity& localIdent,
IDataHandler* h, ProtocolMessage* m,
IDataHandler* h,
const std::shared_ptr< ProtocolMessage >& m,
const ProtocolFrame& f, const Introduction& recvIntro)
: crypto(c)
, logic(l)
@ -277,7 +276,7 @@ namespace llarp
pq_keypair_to_secret(self->m_LocalIdentity.pq)))
{
LogError("pqke failed C=", self->frame.C);
delete self->msg;
self->msg.reset();
delete self;
return;
}
@ -288,7 +287,7 @@ namespace llarp
{
LogError("failed to decode inner protocol message");
DumpBuffer(*buf);
delete self->msg;
self->msg.reset();
delete self;
return;
}
@ -299,7 +298,7 @@ namespace llarp
" from ", self->msg->sender.Addr());
self->frame.Dump< MAX_PROTOCOL_MESSAGE_SIZE >();
self->msg->Dump< MAX_PROTOCOL_MESSAGE_SIZE >();
delete self->msg;
self->msg.reset();
delete self;
return;
}
@ -308,7 +307,7 @@ namespace llarp
{
LogError("dropping duplicate convo tag T=", self->msg->tag);
// TODO: send convotag reset
delete self->msg;
self->msg.reset();
delete self;
return;
}
@ -324,7 +323,7 @@ namespace llarp
{
LogError("x25519 key exchange failed");
self->frame.Dump< MAX_PROTOCOL_MESSAGE_SIZE >();
delete self->msg;
self->msg.reset();
delete self;
return;
}
@ -340,8 +339,9 @@ namespace llarp
self->handler->PutSenderFor(self->msg->tag, self->msg->sender);
self->handler->PutCachedSessionKeyFor(self->msg->tag, sharedKey);
self->msg->handler = self->handler;
self->logic->queue_job({self->msg, &ProtocolMessage::ProcessAsync});
self->msg->handler = self->handler;
std::shared_ptr< ProtocolMessage > msg = std::move(self->msg);
self->logic->queue_func([=]() { ProtocolMessage::ProcessAsync(msg); });
delete self;
}
};
@ -368,11 +368,11 @@ namespace llarp
const Identity& localIdent,
IDataHandler* handler) const
{
auto msg = std::make_shared< ProtocolMessage >();
if(T.IsZero())
{
LogInfo("Got protocol frame with new convo");
ProtocolMessage* msg = new ProtocolMessage();
msg->srcPath = recvPath->RXID();
msg->srcPath = recvPath->RXID();
// we need to dh
auto dh = new AsyncFrameDecrypt(logic, c, localIdent, handler, msg,
*this, recvPath->intro);
@ -396,16 +396,14 @@ namespace llarp
LogError("Signature failure from ", si.Addr());
return false;
}
ProtocolMessage* msg = new ProtocolMessage();
if(!DecryptPayloadInto(c, shared, *msg))
{
LogError("failed to decrypt message");
delete msg;
return false;
}
msg->srcPath = recvPath->RXID();
msg->handler = handler;
logic->queue_job({msg, &ProtocolMessage::ProcessAsync});
logic->queue_func([=]() { ProtocolMessage::ProcessAsync(msg); });
return true;
}

@ -63,7 +63,7 @@ namespace llarp
PutBuffer(const llarp_buffer_t& payload);
static void
ProcessAsync(void* user);
ProcessAsync(std::shared_ptr< ProtocolMessage > self);
};
/// outer message

@ -0,0 +1,14 @@
#ifndef LLARP_UTIL_COPY_OR_NULLPTR_HPP
#define LLARP_UTIL_COPY_OR_NULLPTR_HPP
#include <memory>
template < typename T >
static constexpr std::unique_ptr< T >
copy_or_nullptr(const std::unique_ptr< T >& other)
{
if(other)
return std::make_unique< T >(*other);
return nullptr;
}
#endif

@ -12,7 +12,7 @@ namespace llarp
std::string value = str;
std::transform(value.begin(), value.end(), value.begin(),
[](char ch) -> char { return std::tolower(ch); });
return value == "no" || value == "false" || value == "0" || value == "off";
return value == "no" || value == "false" || value == "0";
}
bool
@ -21,7 +21,7 @@ namespace llarp
std::string value = str;
std::transform(value.begin(), value.end(), value.begin(),
[](char ch) -> char { return std::tolower(ch); });
return value == "yes" || value == "true" || value == "1" || value == "on";
return value == "yes" || value == "true" || value == "1";
}
bool

Loading…
Cancel
Save