lokinet/llarp/ev/ev_win32.cpp
Thomas Winget a91bb35dbf
Some Windows fixes (#1415)
* Should fix some windows service issues

* fix return condition inversion

* Add some Trace level logging

also make the logger actually respect the log level you set.

* event loop should not queue things to itself...

at present, the logic thread queue is drained until it is empty, so
queueing things onto itself is just wasteful.

* call_later(foreach thing) is better than foreach thing (call later)

also, if you already queued those things and they have not happened yet,
there is no sense in queueing them to happen again (see the sketch after these notes).

* do not queue read on write finish, only on read finish

* failure to start DNS server should be proper startup failure.

without the DNS server working lokinet is...kinda pointless, right?

* format

* don't queue stuff to logic thread if in logic thread
the thing that clears the queue...clears it.  So you're just delaying and adding overhead.

* windows unbound thread sleep instead of just busy-waiting

also clang-format decided I can't have a blank line for some reason...

* fix unbound async worker on windows
2020-10-21 09:06:43 -04:00
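
The pattern behind several of these notes (queue a single job that ticks every listener, and skip queueing another while one is still pending) is what tun_ev_loop below implements with its tick_queued flag. Here is a minimal standalone sketch of the same idea; the names Listener, logic_queue, and queue_tick are illustrative, not lokinet's actual types:

#include <atomic>
#include <functional>
#include <queue>
#include <vector>

struct Listener
{
  void tick() {}  // stand-in for a TUN listener's tick handler
};

std::queue<std::function<void()>> logic_queue;  // drained until empty by the logic thread
std::vector<Listener*> listeners;
std::atomic_flag tick_queued = ATOMIC_FLAG_INIT;

void
queue_tick()
{
  // a tick job is already waiting in the queue; adding another is pure overhead
  if (tick_queued.test_and_set())
    return;
  logic_queue.push([] {
    for (auto* l : listeners)  // one queued job ticks every listener...
      l->tick();
    tick_queued.clear();       // ...then allows the next tick to be queued
  });
}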


#include <ev/ev_win32.hpp>

#ifdef _WIN32

#include <util/logging/logger.hpp>

#include <atomic>

// a single event queue for the TUN interface
static HANDLE tun_event_queue = INVALID_HANDLE_VALUE;

// we hand the kernel our thread handles to process completion events
static HANDLE* kThreadPool;
static int poolSize;

// list of TUN listeners (useful for exits or other nodes with multiple TUNs)
std::list<win32_tun_io*> tun_listeners;
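
// spin up a pool of worker threads that service the TUN completion port;
// each thread runs tun_ev_loop() until exit_tun_loop() posts a shutdown event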
void
begin_tun_loop(int nThreads, llarp_ev_loop* loop)
{
  kThreadPool = new HANDLE[nThreads];
  for (int i = 0; i < nThreads; ++i)
  {
    kThreadPool[i] = CreateThread(nullptr, 0, &tun_ev_loop, loop, 0, nullptr);
  }
  llarp::LogInfo("created ", nThreads, " threads for TUN event queue");
  poolSize = nThreads;
}
// this one is called from the TUN handler
bool
win32_tun_io::queue_write(const byte_t* buf, size_t sz)
{
  return do_write((void*)buf, sz);
}
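
// bring the TUN interface up and configure its address, netmask, and DNS bind address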
bool
win32_tun_io::setup()
{
  if (tuntap_start(tunif, TUNTAP_MODE_TUNNEL, 0) == -1)
  {
    llarp::LogWarn("failed to start interface");
    return false;
  }
  if (tuntap_up(tunif) == -1)
  {
    char ebuf[1024];
    int err = GetLastError();
    FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, nullptr, err, LANG_NEUTRAL, ebuf, 1024, nullptr);
    llarp::LogWarn("failed to put interface up: ", ebuf);
    return false;
  }

  tunif->bindaddr = t->dnsaddr;
  if (tuntap_set_ip(tunif, t->ifaddr, t->ifaddr, t->netmask) == -1)
  {
    llarp::LogWarn("failed to set ip");
    return false;
  }

  if (tunif->tun_fd == INVALID_HANDLE_VALUE)
    return false;

  return true;
}
// first TUN device gets to set up the event port
bool
win32_tun_io::add_ev(llarp_ev_loop* loop)
{
  if (tun_event_queue == INVALID_HANDLE_VALUE)
  {
    SYSTEM_INFO sys_info;
    GetSystemInfo(&sys_info);
    unsigned long numCPU = sys_info.dwNumberOfProcessors;
    // let the system handle 2x the number of CPUs or hardware
    // threads
    tun_event_queue = CreateIoCompletionPort(tunif->tun_fd, nullptr, (ULONG_PTR)this, numCPU * 2);
    begin_tun_loop(numCPU * 2, loop);
  }
  else
    CreateIoCompletionPort(tunif->tun_fd, tun_event_queue, (ULONG_PTR)this, 0);

  // we're already non-blocking
  // add to list
  tun_listeners.push_back(this);
  byte_t* readbuf = (byte_t*)malloc(1500);
  read(readbuf, 1500);
  return true;
}
// places data in event queue for kernel to process
bool
win32_tun_io::do_write(void* data, size_t sz)
{
  DWORD code;
  asio_evt_pkt* pkt = new asio_evt_pkt;
  pkt->buf = data;
  pkt->sz = sz;
  pkt->write = true;
  memset(&pkt->pkt, '\0', sizeof(pkt->pkt));
  WriteFile(tunif->tun_fd, data, sz, nullptr, &pkt->pkt);
  code = GetLastError();
  // llarp::LogInfo("wrote data, error ", code);
  // 0 is ERROR_SUCCESS, 997 is ERROR_IO_PENDING (the overlapped write was queued)
  return (code == 0 || code == 997);
}
// while this one is called from the event loop
// eventually comes back and calls queue_write()
void
win32_tun_io::flush_write()
{
  if (t->before_write)
    t->before_write(t);
}
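
// post an overlapped read on the TUN device; its completion is handled in tun_ev_loop()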
void
win32_tun_io::read(byte_t* buf, size_t sz)
{
  asio_evt_pkt* pkt = new asio_evt_pkt;
  pkt->buf = buf;
  memset(&pkt->pkt, '\0', sizeof(OVERLAPPED));
  pkt->sz = sz;
  pkt->write = false;
  ReadFile(tunif->tun_fd, buf, sz, nullptr, &pkt->pkt);
}
// and now the event loop itself
extern "C" DWORD FAR PASCAL
tun_ev_loop(void* u)
{
  llarp_ev_loop* logic = static_cast<llarp_ev_loop*>(u);

  DWORD size = 0;
  OVERLAPPED* ovl = nullptr;
  ULONG_PTR listener = 0;
  asio_evt_pkt* pkt = nullptr;
  BOOL alert;
  // start clear so the first timeout on this thread can queue a tick
  std::atomic_flag tick_queued = ATOMIC_FLAG_INIT;

  while (true)
  {
    alert = GetQueuedCompletionStatus(tun_event_queue, &size, &listener, &ovl, EV_TICK_INTERVAL);

    if (!alert)
    {
      // tick listeners on io timeout, this is required to be done every tick
      // cycle regardless of any io being done, this manages the internal state
      // of the tun logic
      if (tick_queued.test_and_set())
        continue;  // if tick queued, don't queue another

      logic->call_soon([&]() {
        for (const auto& tun : tun_listeners)
        {
          tun->flush_write();
          if (tun->t->tick)
            tun->t->tick(tun->t);
        }
        tick_queued.clear();
      });
      continue;
    }

    // an all-ones completion key is the shutdown sentinel posted by exit_tun_loop()
    if (listener == (ULONG_PTR)~0)
      break;

    // if we're here, then we got something interesting :>
    pkt = (asio_evt_pkt*)ovl;
    win32_tun_io* ev = reinterpret_cast<win32_tun_io*>(listener);

    if (!pkt->write)
    {
      // llarp::LogInfo("read tun ", size, " bytes, pass to handler");
      logic->call_soon([pkt, size, ev]() {
        if (ev->t->recvpkt)
          ev->t->recvpkt(ev->t, llarp_buffer_t(pkt->buf, size));
        free(pkt->buf);
        delete pkt;
      });
      // only a finished read queues the next read; write completions do not
      byte_t* readbuf = (byte_t*)malloc(1500);
      ev->read(readbuf, 1500);
    }

    logic->call_soon([ev]() {
      ev->flush_write();
      if (ev->t->tick)
        ev->t->tick(ev->t);
    });
  }
  llarp::LogDebug("exit TUN event loop thread from system managed thread pool");
  return 0;
}
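
// tear down the IOCP worker pool and all registered TUN listeners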
void
exit_tun_loop()
{
  if (kThreadPool)
  {
    // kill the kernel's thread pool
    // int i = (&kThreadPool)[1] - kThreadPool; // get the size of our thread
    // pool
    llarp::LogInfo("closing ", poolSize, " threads");
    // if we get all-ones in the queue, thread exits, and we clean up
    for (int j = 0; j < poolSize; ++j)
      PostQueuedCompletionStatus(tun_event_queue, 0, ~0, nullptr);
    WaitForMultipleObjects(poolSize, kThreadPool, TRUE, INFINITE);
    for (int j = 0; j < poolSize; ++j)
      CloseHandle(kThreadPool[j]);
    delete[] kThreadPool;
    kThreadPool = nullptr;

    // the IOCP refcount is decreased each time an associated fd
    // is closed
    // the fds are closed in their destructors
    // once we get to zero, we can safely close the event port
    auto itr = tun_listeners.begin();
    while (itr != tun_listeners.end())
    {
      delete (*itr);
      itr = tun_listeners.erase(itr);
    }
    CloseHandle(tun_event_queue);
  }
}
#endif