hive can now instantiate and start relays/clients separately

pull/1184/head
Thomas Winget 4 years ago
parent 2bbb274131
commit ee7b7e917d

@ -31,7 +31,7 @@ def MakeEndpoint(router, after):
if after is not None:
router.CallSafe(lambda : after(ep))
def AddRouter(hive, index, netid="hive"):
def AddRelay(hive, index, netid="hive"):
dirname = "%s/routers/%d" % (tmpdir, index)
makedirs("%s/netdb" % dirname, exist_ok=True)
@ -70,7 +70,7 @@ def AddRouter(hive, index, netid="hive"):
if index != 1:
config.bootstrap.routers = ["%s/routers/1/rc.signed" % tmpdir]
hive.AddRouter(config)
hive.AddRelay(config)
def AddClient(hive, index, netid="hive"):
@ -101,9 +101,9 @@ def AddClient(hive, index, netid="hive"):
config.bootstrap.routers = ["%s/routers/1/rc.signed" % tmpdir]
hive.AddRouter(config)
hive.AddClient(config)
def main(n_routers=10, n_clients=10):
def main(n_relays=10, n_clients=10):
pyllarp.EnableDebug()
running = True
if not RemoveTmpDir(tmpdir):
@ -117,14 +117,14 @@ def main(n_routers=10, n_clients=10):
signal(SIGINT, handle_sigint)
hive = pyllarp.RouterHive()
AddRouter(hive, 1)
hive.StartAll()
AddRelay(hive, 1)
hive.StartRelays()
print("sleeping 2 sec to give plenty of time to save bootstrap rc")
for i in range(2):
print(i+1)
sleep(1)
print("Resetting hive. Creating %d routers and %d clients" % (n_routers, n_clients))
print("Resetting hive. Creating %d relays and %d clients" % (n_relays, n_clients))
hive.StopAll()
hive = pyllarp.RouterHive()
@ -144,15 +144,26 @@ def main(n_routers=10, n_clients=10):
def broadcastTo(addr, pkt):
hive.ForEachRouter(lambda r : sendToAddress(r, addr, pkt))
for i in range(1, n_routers + 1):
AddRouter(hive, i)
for i in range(1, n_relays + 1):
AddRelay(hive, i)
for i in range(1, n_clients + 1):
AddClient(hive, i)
hive.StartAll()
print("Starting relays")
hive.StartRelays()
sleep(1)
hive.ForEachRelay(lambda r: MakeEndpoint(r, onGotEndpoint))
print("Sleeping 5 seconds before starting clients")
sleep(5)
hive.StartClients()
sleep(1)
hive.ForEachRouter(lambda r: MakeEndpoint(r, onGotEndpoint))
hive.ForEachClient(lambda r: MakeEndpoint(r, onGotEndpoint))
total_events = 0

@ -10,22 +10,33 @@ using namespace std::chrono_literals;
namespace tooling
{
void
RouterHive::AddRouter(const std::shared_ptr<llarp::Config> & config)
RouterHive::AddRouter(const std::shared_ptr<llarp::Config> & config, std::vector<llarp_main *> *routers)
{
llarp_main* ctx = llarp_main_init_from_config(config->Copy());
if(llarp_main_setup(ctx) == 0)
{
llarp::Context::Get(ctx)->InjectHive(this);
routers.push_back(ctx);
routers->push_back(ctx);
}
}
void
RouterHive::StartRouters()
RouterHive::AddRelay(const std::shared_ptr<llarp::Config> & config)
{
AddRouter(config, &relays);
}
void
RouterHive::AddClient(const std::shared_ptr<llarp::Config> & config)
{
for (llarp_main* ctx : routers)
AddRouter(config, &clients);
}
void
RouterHive::StartRouters(std::vector<llarp_main *> *routers)
{
for (llarp_main* ctx : *routers)
{
routerMainThreads.emplace_back([ctx](){
llarp_main_run(ctx, llarp_main_runtime_opts{false, false, false});
@ -34,18 +45,41 @@ namespace tooling
}
}
void
RouterHive::StartRelays()
{
StartRouters(&relays);
}
void
RouterHive::StartClients()
{
StartRouters(&clients);
}
void
RouterHive::StopRouters()
{
llarp::LogInfo("Signalling all routers to stop");
for (llarp_main* ctx : routers)
for (llarp_main* ctx : relays)
{
llarp_main_signal(ctx, 2 /* SIGINT */);
}
for (llarp_main* ctx : clients)
{
llarp_main_signal(ctx, 2 /* SIGINT */);
}
llarp::LogInfo("Waiting on routers to be stopped");
for (llarp_main* ctx : routers)
for (llarp_main* ctx : relays)
{
while(llarp_main_is_running(ctx))
{
std::this_thread::sleep_for(10ms);
}
}
for (llarp_main* ctx : clients)
{
while(llarp_main_is_running(ctx))
{
@ -89,18 +123,34 @@ namespace tooling
}
void
RouterHive::VisitRouter(size_t index, std::function<void(Context_ptr)> visit)
RouterHive::VisitRouter(llarp_main *router, std::function<void(Context_ptr)> visit)
{
auto ctx = llarp::Context::Get(router);
LogicCall(ctx->logic, [visit, ctx]() {
visit(ctx);
});
}
void
RouterHive::VisitRelay(size_t index, std::function<void(Context_ptr)> visit)
{
if(index >= routers.size())
if(index >= relays.size())
{
visit(nullptr);
return;
}
auto * r = routers[index];
auto ctx = llarp::Context::Get(r);
LogicCall(ctx->logic, [visit, ctx]() {
visit(ctx);
});
VisitRouter(relays[index], visit);
}
void
RouterHive::VisitClient(size_t index, std::function<void(Context_ptr)> visit)
{
if(index >= clients.size())
{
visit(nullptr);
return;
}
VisitRouter(clients[index], visit);
}
} // namespace tooling

@ -13,7 +13,7 @@
struct llarp_config;
struct llarp_main;
namespace llarp
namespace llarp
{
struct Context;
}
@ -23,13 +23,42 @@ namespace tooling
struct RouterHive
{
using Context_ptr = std::shared_ptr<llarp::Context>;
private:
void
StartRouters(std::vector<llarp_main *> *routers);
void
AddRouter(const std::shared_ptr<llarp::Config> & config, std::vector<llarp_main *> *routers);
/// safely visit router
void
VisitRouter(llarp_main *router, std::function<void(Context_ptr)> visit);
/// safely visit relay at index N
void
VisitRelay(size_t index, std::function<void(Context_ptr)> visit);
/// safely visit client at index N
void
VisitClient(size_t index, std::function<void(Context_ptr)> visit);
public:
RouterHive() = default;
void
AddRouter(const std::shared_ptr<llarp::Config> & conf);
AddRelay(const std::shared_ptr<llarp::Config> & conf);
void
AddClient(const std::shared_ptr<llarp::Config> & conf);
void
StartRelays();
void
StartRouters();
StartClients();
void
StopRouters();
@ -40,25 +69,37 @@ namespace tooling
RouterEventPtr
GetNextEvent();
void
ForEachRelay(std::function<void(Context_ptr)> visit)
{
for(size_t idx = 0; idx < relays.size(); ++idx)
{
VisitRelay(idx, visit);
}
}
using Context_ptr = std::shared_ptr<llarp::Context>;
/// safely visit every router context
void
ForEachRouter(std::function<void(Context_ptr)> visit)
void
ForEachClient(std::function<void(Context_ptr)> visit)
{
for(size_t idx = 0; idx < routers.size(); ++idx)
for(size_t idx = 0; idx < clients.size(); ++idx)
{
VisitRouter(idx, visit);
VisitClient(idx, visit);
}
}
/// safely visit router at index N
/// safely visit every router context
void
VisitRouter(size_t index, std::function<void(Context_ptr)> visit);
ForEachRouter(std::function<void(Context_ptr)> visit)
{
ForEachRelay(visit);
ForEachClient(visit);
}
std::vector<llarp_main *> routers;
std::vector<llarp_main *> relays;
std::vector<llarp_main *> clients;
std::vector<std::thread> routerMainThreads;

@ -23,12 +23,12 @@ namespace tooling
.def_readonly("prevHop", &PathRequestReceivedEvent::prevHop)
.def_readonly("nextHop", &PathRequestReceivedEvent::nextHop)
.def_readonly("isEndpoint", &PathRequestReceivedEvent::isEndpoint);
py::class_<PubIntroReceivedEvent, RouterEvent>(mod, "DhtPubIntroReceievedEvent")
py::class_<PubIntroReceivedEvent, RouterEvent>(mod, "DhtPubIntroReceivedEvent")
.def_readonly("from", &PubIntroReceivedEvent::From)
.def_readonly("location", &PubIntroReceivedEvent::IntrosetLocation)
.def_readonly("relayOrder", &PubIntroReceivedEvent::RelayOrder)
.def_readonly("txid", &PubIntroReceivedEvent::TxID);
py::class_<GotIntroReceivedEvent, RouterEvent>(mod, "DhtGotIntroReceievedEvent")
py::class_<GotIntroReceivedEvent, RouterEvent>(mod, "DhtGotIntroReceivedEvent")
.def_readonly("from", &GotIntroReceivedEvent::From)
.def_readonly("location", &GotIntroReceivedEvent::Introset)
.def_readonly("relayOrder", &GotIntroReceivedEvent::RelayOrder)

@ -10,11 +10,14 @@ namespace tooling
using RouterHive_ptr = std::shared_ptr< RouterHive >;
py::class_< RouterHive, RouterHive_ptr >(mod, "RouterHive")
.def(py::init<>())
.def("AddRouter", &RouterHive::AddRouter)
.def("StartAll", &RouterHive::StartRouters)
.def("AddRelay", &RouterHive::AddRelay)
.def("AddClient", &RouterHive::AddClient)
.def("StartRelays", &RouterHive::StartRelays)
.def("StartClients", &RouterHive::StartClients)
.def("StopAll", &RouterHive::StopRouters)
.def("ForEachRelay", &RouterHive::ForEachRelay)
.def("ForEachClient", &RouterHive::ForEachClient)
.def("ForEachRouter", &RouterHive::ForEachRouter)
.def("VisitRouter", &RouterHive::VisitRouter)
.def("GetNextEvent", &RouterHive::GetNextEvent);
}

Loading…
Cancel
Save