* fix dht feedback loop

* start on dht lookups for hidden services

* make debug logging confurable on runtime with env var LLARP_DEBUG=1

* make eventloop tick only when we get traffic

* make testnet parameters configurable on runtime
pull/5/head
Jeff Becker 6 years ago
parent bae4f746be
commit fe01c38d8e
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -21,6 +21,10 @@ TESTNET_ROOT=$(REPO)/testnet_tmp
TESTNET_CONF=$(TESTNET_ROOT)/supervisor.conf TESTNET_CONF=$(TESTNET_ROOT)/supervisor.conf
TESTNET_LOG=$(TESTNET_ROOT)/testnet.log TESTNET_LOG=$(TESTNET_ROOT)/testnet.log
TESTNET_CLIENTS ?= 50
TESTNET_SERVERS ?= 50
TESTNET_DEBUG ?= 0
clean: clean:
rm -f build.ninja rules.ninja cmake_install.cmake CMakeCache.txt rm -f build.ninja rules.ninja cmake_install.cmake CMakeCache.txt
rm -rf CMakeFiles rm -rf CMakeFiles
@ -70,8 +74,8 @@ testnet-build: testnet-configure
testnet: testnet-build testnet: testnet-build
mkdir -p $(TESTNET_ROOT) mkdir -p $(TESTNET_ROOT)
python3 contrib/testnet/genconf.py --bin=$(REPO)/llarpd --svc=30 --clients=1 --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF) python3 contrib/testnet/genconf.py --bin=$(REPO)/llarpd --svc=$(TESTNET_SERVERS) --clients=$(TESTNET_CLIENTS) --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF)
supervisord -n -d $(TESTNET_ROOT) -l $(TESTNET_LOG) -c $(TESTNET_CONF) LLARP_DEBUG=$(TESTNET_DEBUG) supervisord -n -d $(TESTNET_ROOT) -l $(TESTNET_LOG) -c $(TESTNET_CONF)
test: debug-configure test: debug-configure
ninja ninja

@ -112,8 +112,8 @@ namespace llarp
std::unordered_map< TXOwner, SearchJob, TXOwnerHash > pendingTX; std::unordered_map< TXOwner, SearchJob, TXOwnerHash > pendingTX;
Key_t ourKey; Key_t ourKey;
}; };
} } // namespace dht
} } // namespace llarp
struct llarp_dht_context struct llarp_dht_context
{ {

@ -13,7 +13,7 @@ namespace llarp
std::list< llarp::service::IntroSet > I; std::list< llarp::service::IntroSet > I;
uint64_t T = 0; uint64_t T = 0;
GotIntroMessage() : IMessage({}) GotIntroMessage(const Key_t& from) : IMessage(from)
{ {
} }
@ -27,10 +27,21 @@ namespace llarp
bool bool
DecodeKey(llarp_buffer_t key, llarp_buffer_t* val); DecodeKey(llarp_buffer_t key, llarp_buffer_t* val);
virtual bool
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage* >& replies) const;
};
struct RelayedGotIntroMessage : public GotIntroMessage
{
RelayedGotIntroMessage() : GotIntroMessage({})
{
}
bool bool
HandleMessage(llarp_dht_context* ctx, HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage* >& replies) const; std::vector< IMessage* >& replies) const;
}; };
} } // namespace dht
} } // namespace llarp
#endif #endif

@ -3,8 +3,9 @@
#define LLARP_DHT_SEARCH_JOB_HPP #define LLARP_DHT_SEARCH_JOB_HPP
#include <llarp/dht.h> #include <llarp/dht.h>
#include <llarp/time.h> #include <llarp/time.h>
#include <functional>
#include <llarp/dht/key.hpp> #include <llarp/dht/key.hpp>
#include <llarp/service/IntroSet.hpp>
#include <set> #include <set>
namespace llarp namespace llarp
@ -14,26 +15,39 @@ namespace llarp
struct SearchJob struct SearchJob
{ {
const static uint64_t JobTimeout = 30000; const static uint64_t JobTimeout = 30000;
typedef std::function< void(const llarp::service::IntroSet*) >
IntroSetHookFunc;
SearchJob(); SearchJob();
/// for routers
SearchJob(const Key_t& requester, uint64_t requesterTX,
const Key_t& target, const std::set< Key_t >& excludes,
llarp_router_lookup_job* job);
/// for introsets
SearchJob(const Key_t& requester, uint64_t requesterTX, SearchJob(const Key_t& requester, uint64_t requesterTX,
const Key_t& target, llarp_router_lookup_job* job, const Key_t& target, const std::set< Key_t >& excludes,
const std::set< Key_t >& excludes); IntroSetHookFunc found);
void
FoundRouter(const llarp_rc* router) const;
void
FoundIntro(const llarp::service::IntroSet* introset) const;
void void
Completed(const llarp_rc* router, bool timeout = false) const; Timeout() const;
bool bool
IsExpired(llarp_time_t now) const; IsExpired(llarp_time_t now) const;
// only set if looking up router
llarp_router_lookup_job* job = nullptr; llarp_router_lookup_job* job = nullptr;
IntroSetHookFunc foundIntroHook;
llarp_time_t started; llarp_time_t started;
Key_t requester; Key_t requester;
uint64_t requesterTX; uint64_t requesterTX;
Key_t target; Key_t target;
std::set< Key_t > exclude; std::set< Key_t > exclude;
}; };
} } // namespace dht
} } // namespace llarp
#endif #endif

@ -41,6 +41,9 @@ struct transit_message
bool bool
should_send_ack(llarp_time_t now) const; should_send_ack(llarp_time_t now) const;
bool
should_resend_frags(llarp_time_t now) const;
bool bool
should_resend_xmit(llarp_time_t now) const; should_resend_xmit(llarp_time_t now) const;
bool bool

@ -45,7 +45,7 @@ namespace llarp
/// calculate our address /// calculate our address
bool bool
CalculateAddress(Address& addr) const; CalculateAddress(byte_t* buf) const;
bool bool
BEncode(llarp_buffer_t* buf) const; BEncode(llarp_buffer_t* buf) const;

@ -14,9 +14,9 @@ namespace llarp
{ {
llarp::PubKey router; llarp::PubKey router;
llarp::PathID_t pathID; llarp::PathID_t pathID;
uint64_t latency = 0; uint64_t latency = 0;
uint64_t version = 0; uint64_t version = 0;
uint64_t expiresAt; uint64_t expiresAt = 0;
Introduction() = default; Introduction() = default;
Introduction(const Introduction& other) Introduction(const Introduction& other)

@ -1,17 +1,28 @@
#ifndef LLARP_SERVICE_ADDRESS_HPP #ifndef LLARP_SERVICE_ADDRESS_HPP
#define LLARP_SERVICE_ADDRESS_HPP #define LLARP_SERVICE_ADDRESS_HPP
#include <llarp/aligned.hpp> #include <llarp/aligned.hpp>
#include <llarp/dht/key.hpp>
#include <string> #include <string>
namespace llarp namespace llarp
{ {
namespace service namespace service
{ {
typedef llarp::AlignedBuffer< 32 > Address; struct Address : public llarp::AlignedBuffer< 32 >
{
std::string
ToString() const;
std::string Address() : llarp::AlignedBuffer< 32 >()
AddressToString(const Address& addr); {
} }
}
Address(const byte_t* data) : llarp::AlignedBuffer< 32 >(data)
{
}
};
} // namespace service
} // namespace llarp
#endif #endif

@ -1,4 +1,5 @@
#include <llarp.h> #include <llarp.h>
#include <llarp/logger.h>
#include <signal.h> #include <signal.h>
#include <llarp.hpp> #include <llarp.hpp>
#include "logger.hpp" #include "logger.hpp"
@ -308,7 +309,11 @@ llarp_main_init(const char *fname, bool multiProcess)
{ {
if(!fname) if(!fname)
fname = "daemon.ini"; fname = "daemon.ini";
char *var = getenv("LLARP_DEBUG");
if(var && *var == '1')
{
cSetLogLevel(eLogDebug);
}
llarp_main *m = new llarp_main; llarp_main *m = new llarp_main;
m->ctx.reset(new llarp::Context(std::cout, !multiProcess)); m->ctx.reset(new llarp::Context(std::cout, !multiProcess));
if(!m->ctx->LoadConfig(fname)) if(!m->ctx->LoadConfig(fname))

@ -123,7 +123,7 @@ namespace llarp
{ {
if(itr->second.IsExpired(now)) if(itr->second.IsExpired(now))
{ {
itr->second.Completed(nullptr, true); itr->second.Timeout();
itr = pendingTX.erase(itr); itr = pendingTX.erase(itr);
} }
else else
@ -174,9 +174,8 @@ namespace llarp
ownerKey.txid = id; ownerKey.txid = id;
if(txid == 0) if(txid == 0)
txid = id; txid = id;
SearchJob j(whoasked, txid, target, excludes, job);
pendingTX[ownerKey] = SearchJob(whoasked, txid, target, job, excludes); pendingTX[ownerKey] = j;
llarp::LogInfo("Asking ", askpeer, " for router ", target, " for ", llarp::LogInfo("Asking ", askpeer, " for router ", target, " for ",
whoasked); whoasked);
auto msg = new llarp::DHTImmeidateMessage(askpeer); auto msg = new llarp::DHTImmeidateMessage(askpeer);

@ -60,14 +60,13 @@ namespace llarp
case 'G': case 'G':
if(dec->relayed) if(dec->relayed)
{ {
dec->msg = new GotIntroMessage(); dec->msg = new RelayedGotIntroMessage();
break; break;
} }
else else
{ {
llarp::LogWarn( dec->msg = new GotIntroMessage(dec->From);
"GotIntroMessage found when parsing direct DHT message"); break;
return false;
} }
default: default:
llarp::LogWarn("unknown dht message type: ", (char)*strbuf.base); llarp::LogWarn("unknown dht message type: ", (char)*strbuf.base);
@ -91,7 +90,9 @@ namespace llarp
r.user = &dec; r.user = &dec;
r.on_key = &MessageDecoder::on_key; r.on_key = &MessageDecoder::on_key;
if(bencode_read_dict(buf, &r)) if(bencode_read_dict(buf, &r))
{
return dec.msg; return dec.msg;
}
else else
{ {
if(dec.msg) if(dec.msg)
@ -137,5 +138,5 @@ namespace llarp
r.on_item = &ListDecoder::on_item; r.on_item = &ListDecoder::on_item;
return bencode_read_list(buf, &r); return bencode_read_list(buf, &r);
} }
} } // namespace dht
} } // namespace llarp

@ -68,6 +68,14 @@ namespace llarp
{ {
result &= msg->HandleMessage(router->dht, reply->msgs); result &= msg->HandleMessage(router->dht, reply->msgs);
} }
return result && router->SendToOrQueue(remote.data(), reply); if(reply->msgs.size())
{
return result && router->SendToOrQueue(remote.data(), reply);
}
else
{
delete reply;
return result;
}
} }
} }

@ -25,6 +25,37 @@ namespace llarp
bool bool
GotIntroMessage::HandleMessage(llarp_dht_context *ctx, GotIntroMessage::HandleMessage(llarp_dht_context *ctx,
std::vector< IMessage * > &replies) const std::vector< IMessage * > &replies) const
{
auto &dht = ctx->impl;
auto crypto = &dht.router->crypto;
if(I.size() == 1)
{
const auto &introset = I.front();
if(!introset.VerifySignature(crypto))
{
llarp::LogWarn(
"Invalid introset signature while handling direct GotIntro from ",
From);
return false;
}
llarp::dht::Key_t addr;
if(!introset.A.CalculateAddress(addr))
{
llarp::LogWarn(
"failed to calculate hidden service address for direct GotIntro "
"message from ",
From);
return false;
}
// TODO: inform any pending tx
return true;
}
return false;
}
bool
RelayedGotIntroMessage::HandleMessage(
llarp_dht_context *ctx, std::vector< IMessage * > &replies) const
{ {
// TODO: implement me better? // TODO: implement me better?
auto pathset = ctx->impl.router->paths.GetLocalPathSet(pathID); auto pathset = ctx->impl.router->paths.GetLocalPathSet(pathID);
@ -66,5 +97,5 @@ namespace llarp
return false; return false;
return bencode_end(buf); return bencode_end(buf);
} }
} } // namespace dht
} } // namespace llarp

@ -67,7 +67,7 @@ namespace llarp
{ {
if(R.size()) if(R.size())
{ {
pending->Completed(&R[0]); pending->FoundRouter(&R[0]);
if(pending->requester != dht.OurKey()) if(pending->requester != dht.OurKey())
{ {
replies.push_back(new GotRouterMessage( replies.push_back(new GotRouterMessage(
@ -95,7 +95,7 @@ namespace llarp
{ {
llarp::LogInfo(pending->target, " was not found via ", From, llarp::LogInfo(pending->target, " was not found via ", From,
" and we won't look it up"); " and we won't look it up");
pending->Completed(nullptr); pending->FoundRouter(nullptr);
if(pending->requester != dht.OurKey()) if(pending->requester != dht.OurKey())
{ {
replies.push_back(new GotRouterMessage( replies.push_back(new GotRouterMessage(
@ -110,5 +110,5 @@ namespace llarp
"Got response for DHT transaction we are not tracking, txid=", txid); "Got response for DHT transaction we are not tracking, txid=", txid);
return false; return false;
} }
} } // namespace dht
} } // namespace llarp

@ -2,6 +2,7 @@
#include <llarp/dht/context.hpp> #include <llarp/dht/context.hpp>
#include <llarp/dht/messages/pubintro.hpp> #include <llarp/dht/messages/pubintro.hpp>
#include <llarp/messages/dht.hpp> #include <llarp/messages/dht.hpp>
#include <llarp/messages/dht_immediate.hpp>
#include "router.hpp" #include "router.hpp"
namespace llarp namespace llarp
@ -49,9 +50,27 @@ namespace llarp
llarp::LogWarn("proof of work not good enough for IntroSet"); llarp::LogWarn("proof of work not good enough for IntroSet");
return false; return false;
} }
// TODO: make this smarter (?) llarp::dht::Key_t addr;
if(!I.A.CalculateAddress(addr))
{
llarp::LogWarn(
"failed to calculate hidden service address for PubIntro message");
return false;
}
dht.services->PutNode(I); dht.services->PutNode(I);
replies.push_back(new GotIntroMessage(txID, &I)); replies.push_back(new GotIntroMessage(txID, &I));
Key_t peer;
std::set< Key_t > exclude;
exclude.insert(From);
if(txID && dht.nodes->FindCloseExcluding(addr, peer, exclude))
{
// we are not the closest to this address, send it to the closest node
llarp::LogInfo("telling closer peer ", peer, " we got an IntroSet for ",
addr);
auto msg = new llarp::DHTImmeidateMessage(peer);
msg->msgs.push_back(new PublishIntroMessage(I, 0));
dht.router->SendToOrQueue(peer, msg);
}
return true; return true;
} }
@ -77,5 +96,5 @@ namespace llarp
return false; return false;
return bencode_end(buf); return bencode_end(buf);
} }
} } // namespace dht
} } // namespace llarp

@ -11,8 +11,8 @@ namespace llarp
} }
SearchJob::SearchJob(const Key_t &asker, uint64_t tx, const Key_t &key, SearchJob::SearchJob(const Key_t &asker, uint64_t tx, const Key_t &key,
llarp_router_lookup_job *j, const std::set< Key_t > &excludes,
const std::set< Key_t > &excludes) llarp_router_lookup_job *j)
: job(j) : job(j)
, started(llarp_time_now_ms()) , started(llarp_time_now_ms())
, requester(asker) , requester(asker)
@ -22,8 +22,27 @@ namespace llarp
{ {
} }
SearchJob::SearchJob(const Key_t &asker, uint64_t tx, const Key_t &key,
const std::set< Key_t > &excludes,
IntroSetHookFunc foundIntroset)
: foundIntroHook(foundIntroset)
, started(llarp_time_now_ms())
, requester(asker)
, requesterTX(tx)
, target(key)
, exclude(excludes)
{
}
void
SearchJob::FoundIntro(const llarp::service::IntroSet *introset) const
{
if(foundIntroHook)
foundIntroHook(introset);
}
void void
SearchJob::Completed(const llarp_rc *router, bool timeout) const SearchJob::FoundRouter(const llarp_rc *router) const
{ {
if(job && job->hook) if(job && job->hook)
{ {
@ -41,5 +60,19 @@ namespace llarp
{ {
return now - started >= JobTimeout; return now - started >= JobTimeout;
} }
}
} void
SearchJob::Timeout() const
{
if(job)
{
job->found = false;
job->hook(job);
}
else if(foundIntroHook)
{
foundIntroHook(nullptr);
}
}
} // namespace dht
} // namespace llarp

@ -130,10 +130,10 @@ struct llarp_epoll_loop : public llarp_ev_loop
} }
++idx; ++idx;
} }
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
} }
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
return result; return result;
} }
@ -168,10 +168,11 @@ struct llarp_epoll_loop : public llarp_ev_loop
} }
++idx; ++idx;
} }
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
} }
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
} while(epollfd != -1); } while(epollfd != -1);
return result; return result;
} }

@ -122,10 +122,11 @@ struct llarp_kqueue_loop : public llarp_ev_loop
} }
++idx; ++idx;
} }
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
} }
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
return result; return result;
} }
@ -153,10 +154,10 @@ struct llarp_kqueue_loop : public llarp_ev_loop
} }
++idx; ++idx;
} }
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
} }
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
} while(result != -1); } while(result != -1);
return result; return result;
} }

@ -276,7 +276,7 @@ frame_state::got_acks(frame_header hdr, size_t sz)
tx.erase(msgid); tx.erase(msgid);
delete msg; delete msg;
} }
else else if(msg->should_resend_frags(llarp_time_now_ms()))
{ {
llarp::LogDebug("message ", msgid, " retransmit fragments"); llarp::LogDebug("message ", msgid, " retransmit fragments");
msg->retransmit_frags(sendqueue, txflags); msg->retransmit_frags(sendqueue, txflags);
@ -348,6 +348,7 @@ frame_state::queue_tx(uint64_t id, transit_message *msg)
{ {
tx.insert(std::make_pair(id, msg)); tx.insert(std::make_pair(id, msg));
msg->generate_xmit(sendqueue, txflags); msg->generate_xmit(sendqueue, txflags);
msg->retransmit_frags(sendqueue, txflags);
} }
void void

@ -417,7 +417,6 @@ llarp_link_session::TickLogic()
q.pop(); q.pop();
} }
frame.process_inbound_queue(); frame.process_inbound_queue();
frame.retransmit(llarp_time_now_ms());
pump(); pump();
} }
@ -605,7 +604,10 @@ llarp_link_session::decrypt_frame(const void *buf, size_t sz)
decryptedFrames.Put(f); decryptedFrames.Put(f);
} }
else else
{
llarp::LogWarn("decrypt frame fail"); llarp::LogWarn("decrypt frame fail");
delete f;
}
// f->hook = &handle_frame_decrypt; // f->hook = &handle_frame_decrypt;
// iwp_call_async_frame_decrypt(iwp, f); // iwp_call_async_frame_decrypt(iwp, f);
} }

@ -76,6 +76,12 @@ transit_message::should_resend_xmit(llarp_time_t now) const
return lastAck == 0 && now - started > 1000; return lastAck == 0 && now - started > 1000;
} }
bool
transit_message::should_resend_frags(llarp_time_t now) const
{
return now - lastAck > 1000 && !completed();
}
bool bool
transit_message::completed() const transit_message::completed() const
{ {

@ -59,14 +59,14 @@ namespace llarp
} }
bool bool
ServiceInfo::CalculateAddress(Address& addr) const ServiceInfo::CalculateAddress(byte_t* addr) const
{ {
byte_t tmp[128]; byte_t tmp[128];
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
if(!BEncode(&buf)) if(!BEncode(&buf))
return false; return false;
return crypto_generichash(addr, addr.size(), buf.base, buf.cur - buf.base, return crypto_generichash(addr, 32, buf.base, buf.cur - buf.base, nullptr,
nullptr, 0) 0)
!= -1; != -1;
} }

@ -13,7 +13,6 @@ namespace llarp
auto itr = m_Endpoints.begin(); auto itr = m_Endpoints.begin();
while(itr != m_Endpoints.end()) while(itr != m_Endpoints.end())
{ {
delete itr->second;
itr = m_Endpoints.erase(itr); itr = m_Endpoints.erase(itr);
} }
} }
@ -46,6 +45,8 @@ namespace llarp
auto &v = option.second; auto &v = option.second;
if(!service->SetOption(k, v)) if(!service->SetOption(k, v))
{ {
llarp::LogError("failed to set ", k, "=", v,
" for hidden service endpoint ", conf.first);
delete service; delete service;
return false; return false;
} }

@ -16,9 +16,8 @@ namespace llarp
if(k == "keyfile") if(k == "keyfile")
{ {
m_Keyfile = v; m_Keyfile = v;
return true;
} }
return false; return true;
} }
void void

Loading…
Cancel
Save