some more changes to pybind/hive code, read below

hive.py is currently largely for testing the pybind stuff, so changes to it will likely
be frequent and arbitrary for now.

Added pybind for llarp::path::PathHopConfig, but not every member -- just rc and upstream routerID

Hive now uses std::queue with mutex instead of our lockless queue.

Removed some functions from Hive that will not be necessary as things are being handled from python.
pull/1184/head
Thomas Winget 4 years ago
parent e0187fd690
commit c9a278c0de

@ -22,11 +22,9 @@ endpointName = "pyllarp"
def MakeEndpoint(router, after):
if router.IsRelay():
print("discarding make endpoint")
return
ep = pyllarp.Endpoint(endpointName, router)
router.AddEndpoint(ep)
print("made endpoint: {}".format(ep.OurAddress()))
if after is not None:
router.CallSafe(lambda : after(ep))
@ -102,7 +100,7 @@ def AddClient(hive, index, netid="hive"):
hive.AddRouter(config)
def main():
def main(n_routers=10, n_clients=10):
pyllarp.EnableDebug()
running = True
RemoveTmpDir(tmpdir)
@ -117,22 +115,20 @@ def main():
hive = pyllarp.RouterHive()
AddRouter(hive, 1)
hive.StartAll()
sleep(5)
print("sleeping 2 sec to give plenty of time to save bootstrap rc")
for i in range(2):
print(i+1)
sleep(1)
print('stop')
print("Resetting hive. Creating %d routers and %d clients" % (n_routers, n_clients))
hive.StopAll()
r = pyllarp.RouterContact()
r.ReadFile("/tmp/lokinet_hive/routers/1/rc.signed")
print(r.ToString())
hive = pyllarp.RouterHive()
addrs = []
def onGotEndpoint(ep):
addr = ep.OurAddress()
print("got endpoint: {}".format(addr))
addrs.append(pyllarp.ServiceAddress(addr))
def sendToAddress(router, toaddr, pkt):
@ -144,10 +140,10 @@ def main():
def broadcastTo(addr, pkt):
hive.ForEachRouter(lambda r : sendToAddress(r, addr, pkt))
for i in range(1, 11):
for i in range(1, n_routers + 1):
AddRouter(hive, i)
for i in range(1, 11):
for i in range(1, n_clients + 1):
AddClient(hive, i)
hive.StartAll()
@ -158,7 +154,12 @@ def main():
while running:
event = hive.GetNextEvent()
if event:
print("Event: %s -- Triggered: %s" % (event.__class__.__name__, event.triggered))
print(event)
hops = getattr(event, "hops", None)
if hops:
for hop in hops:
print(hop)
print('stopping')
hive.StopAll()

@ -447,8 +447,6 @@ namespace llarp
std::move(path_shortName));
LogInfo(Name(), " build ", path->ShortName(), ": ", path->HopsString());
tooling::RouterEventPtr event = std::make_unique<tooling::PathBuildAttemptEvent>(m_router->pubkey(), path->hops);
m_router->NotifyRouterEvent(std::move(event));
path->SetBuildResultHook(
[self](Path_ptr p) { self->HandlePathBuilt(p); });
@ -461,7 +459,11 @@ namespace llarp
{
buildIntervalLimit = MIN_PATH_BUILD_INTERVAL;
m_router->routerProfiling().MarkPathSuccess(p.get());
LogInfo(p->Name(), " built latency=", p->intro.latency);
tooling::RouterEventPtr event = std::make_unique<tooling::PathBuildAttemptEvent>(m_router->pubkey(), p->hops);
m_router->NotifyRouterEvent(std::move(event));
m_BuildStats.success++;
}

@ -7,22 +7,16 @@
namespace tooling
{
RouterEvent::RouterEvent(llarp::RouterID routerID)
: routerID(routerID)
RouterEvent::RouterEvent(llarp::RouterID routerID, bool triggered)
: routerID(routerID), triggered(triggered)
{
}
PathBuildAttemptEvent::PathBuildAttemptEvent(const llarp::RouterID& routerID, std::vector<llarp::path::PathHopConfig> hops)
: RouterEvent(routerID), hops(hops)
: RouterEvent(routerID, false), hops(hops)
{
}
void
PathBuildAttemptEvent::Process(RouterHive& hive) const
{
hive.ProcessPathBuildAttempt(*this);
}
std::string
PathBuildAttemptEvent::ToString() const
{

@ -25,15 +25,15 @@ namespace tooling
struct RouterEvent
{
RouterEvent(llarp::RouterID);
RouterEvent(llarp::RouterID, bool triggered);
virtual ~RouterEvent() = default;
virtual void Process(RouterHive& hive) const = 0;
virtual std::string ToString() const = 0;
llarp::RouterID routerID;
bool triggered = false;
};
using RouterEventPtr = std::unique_ptr<RouterEvent>;
@ -43,8 +43,6 @@ namespace tooling
{
PathBuildAttemptEvent(const llarp::RouterID& routerID, std::vector<llarp::path::PathHopConfig> hops);
void Process(RouterHive& hive) const;
std::string ToString() const override;
std::vector<llarp::path::PathHopConfig> hops;

@ -11,12 +11,6 @@ using namespace std::chrono_literals;
namespace tooling
{
const size_t RouterHive::MAX_EVENT_QUEUE_SIZE = 200;
RouterHive::RouterHive(size_t eventQueueSize) : eventQueue(eventQueueSize)
{
}
void
RouterHive::AddRouter(const std::shared_ptr<llarp::Config> & config)
{
@ -44,80 +38,56 @@ namespace tooling
RouterHive::StopRouters()
{
llarp::LogWarn("Signalling all routers to stop");
llarp::LogInfo("Signalling all routers to stop");
for (llarp_main* ctx : routers)
{
llarp_main_signal(ctx, 2 /* SIGINT */);
}
size_t i=0;
llarp::LogWarn("Waiting on routers to be stopped");
llarp::LogInfo("Waiting on routers to be stopped");
for (llarp_main* ctx : routers)
{
while(llarp_main_is_running(ctx))
{
llarp::LogWarn("Waiting on router ", i, " to stop");
std::this_thread::sleep_for(10ms);
}
i++;
}
//llarp::LogWarn("Joining router threads");
//i=0;
llarp::LogInfo("Joining all router threads");
for (auto& thread : routerMainThreads)
{
//llarp::LogWarn("Attempting to join router thread ", i);
while (not thread.joinable())
{
//llarp::LogWarn("Waiting on router thread ", i, " to be joinable");
std::this_thread::sleep_for(500ms);
}
thread.join();
//llarp::LogWarn("Joined router thread ", i);
//i++;
}
llarp::LogWarn("RouterHive::StopRouters finished");
llarp::LogInfo("RouterHive::StopRouters finished");
}
void
RouterHive::NotifyEvent(RouterEventPtr event)
{
if(eventQueue.tryPushBack(std::move(event))
!= llarp::thread::QueueReturn::Success)
{
llarp::LogError("RouterHive Event Queue appears to be full. Either implement/change time dilation or increase the queue size.");
}
}
void
RouterHive::ProcessEventQueue()
{
while(not eventQueue.empty())
{
RouterEventPtr event = eventQueue.popFront();
std::lock_guard<std::mutex> guard{eventQueueMutex};
event->Process(*this);
}
eventQueue.push(std::move(event));
}
RouterEventPtr
RouterHive::GetNextEvent()
{
auto ptr = eventQueue.popFrontWithTimeout(50ms);
if (ptr)
std::lock_guard<std::mutex> guard{eventQueueMutex};
if (not eventQueue.empty())
{
return std::move(ptr.value());
auto ptr = std::move(eventQueue.front());
eventQueue.pop();
return ptr;
}
return nullptr;
}
void
RouterHive::ProcessPathBuildAttempt(const PathBuildAttemptEvent& event)
{
}
void
RouterHive::VisitRouter(size_t index, std::function<void(Context_ptr)> visit)
{

@ -4,10 +4,11 @@
#include <llarp.h>
#include <config/config.hpp>
#include <util/thread/queue.hpp>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
struct llarp_config;
struct llarp_main;
@ -22,9 +23,7 @@ namespace tooling
struct RouterHive
{
static const size_t MAX_EVENT_QUEUE_SIZE;
RouterHive(size_t eventQueueSize = MAX_EVENT_QUEUE_SIZE);
RouterHive() = default;
void
AddRouter(const std::shared_ptr<llarp::Config> & conf);
@ -38,9 +37,6 @@ namespace tooling
void
NotifyEvent(RouterEventPtr event);
void
ProcessEventQueue();
RouterEventPtr
GetNextEvent();
@ -61,19 +57,13 @@ namespace tooling
void
VisitRouter(size_t index, std::function<void(Context_ptr)> visit);
/*
* Event processing function declarations
*/
void
ProcessPathBuildAttempt(const PathBuildAttemptEvent& event);
std::vector<llarp_main *> routers;
std::vector<std::thread> routerMainThreads;
llarp::thread::Queue<RouterEventPtr> eventQueue;
std::mutex eventQueueMutex;
std::queue<RouterEventPtr> eventQueue;
};
} // namespace tooling

@ -107,14 +107,21 @@ namespace llarp
/** internal */
template < typename... TArgs >
inline static void
#ifndef LOKINET_HIVE
_Log(LogLevel lvl, const char* fname, int lineno, TArgs&&... args) noexcept
#else
_Log(LogLevel, const char*, int, TArgs&&...) noexcept
#endif
{
/* nop out logging for hive mode for now */
#ifndef LOKINET_HIVE
auto& log = LogContext::Instance();
if(log.curLevel > lvl)
return;
std::stringstream ss;
LogAppend(ss, std::forward< TArgs >(args)...);
log.logStream->AppendLog(lvl, fname, lineno, log.nodeName, ss.str());
#endif
}
/*
std::stringstream ss;

@ -4,9 +4,11 @@ find_package(pybind11 REQUIRED)
set(LLARP_PYBIND_SRC
module.cpp
llarp/context.cpp
llarp/router_id.cpp
llarp/router_contact.cpp
llarp/crypto/types.cpp
llarp/config.cpp
llarp/path/path_hop_config.cpp
llarp/handlers/pyhandler.cpp
llarp/tooling/router_hive.cpp
llarp/tooling/router_event.cpp

@ -40,12 +40,21 @@ namespace llarp
void
CryptoTypes_Init(py::module &mod);
void
RouterID_Init(py::module &mod);
void
RouterContact_Init(py::module &mod);
void
Config_Init(py::module & mod);
namespace path
{
void
PathHopConfig_Init(py::module & mod);
}
namespace handlers
{
void

@ -10,13 +10,6 @@ namespace llarp
.def(py::init<>())
.def("FromHex", &PubKey::FromString)
.def("__repr__", &PubKey::ToString);
py::class_< RouterID >(mod, "RouterID")
.def(py::init<>())
.def("FromHex",
[](RouterID* r, const std::string& hex) -> bool {
return HexDecode(hex.c_str(), r->data(), r->size());
})
.def("__repr__", &RouterID::ToString);
py::class_< SecretKey >(mod, "SecretKey")
.def(py::init<>())
.def("LoadFile", &SecretKey::LoadFromFile)

@ -12,10 +12,11 @@ namespace tooling
py::class_<RouterEvent>(mod, "RouterEvent")
.def("__repr__", &RouterEvent::ToString)
.def("__str__", &RouterEvent::ToString)
.def_readonly("routerID", &RouterEvent::routerID);
.def_readonly("routerID", &RouterEvent::routerID)
.def_readonly("triggered", &RouterEvent::triggered);
py::class_<PathBuildAttemptEvent, RouterEvent>(mod, "PathBuildAttemptEvent");
//.def_readonly("hops", &PathBuildAttemptEvent::hops);
py::class_<PathBuildAttemptEvent, RouterEvent>(mod, "PathBuildAttemptEvent")
.def_readonly("hops", &PathBuildAttemptEvent::hops);
}
} // namespace tooling

@ -5,10 +5,12 @@ PYBIND11_MODULE(pyllarp, m)
{
tooling::RouterHive_Init(m);
tooling::RouterEvent_Init(m);
llarp::RouterID_Init(m);
llarp::RouterContact_Init(m);
llarp::CryptoTypes_Init(m);
llarp::Context_Init(m);
llarp::Config_Init(m);
llarp::path::PathHopConfig_Init(m);
llarp::handlers::PyHandler_Init(m);
llarp::service::Address_Init(m);
m.def("EnableDebug", []() {

Loading…
Cancel
Save