implement tcp evloop on win32

(with its now-obsolete distinction between socketfd and fd)

sockets and file descriptors used to be distinct objects....back in the
16-bit Winsock 1.1 era, which needlessly complicated the 32-bit port
back then. these days one can use [Read|Write]File(2) to operate on
sockfds...which also have some of the semantics of [read|write]v(2)
i.e. the scatter-gather thing it's known for is done in async handler

-rick

variants are nice

added note to self

clang-format

link abyss properly

oops

shut up

*shrugs*

oops forgot to start winsock

moved our async io status flags to the base class

let derived classes override them as needed

this is probably a synchronous op _anyway_

fix typo

wtf
pull/36/head
despair 6 years ago
parent e5ef2cdd27
commit d425b5d308

@ -131,6 +131,8 @@ if(JEMALLOC)
endif()
if (WIN32)
add_cflags("-Wno-unused-function")
add_cxxflags("-Wno-unused-function")
add_cxxflags("-std=c++17")
set(FS_LIB stdc++fs)
endif(WIN32)
@ -511,9 +513,9 @@ if(USE_LIBABYSS)
${ABYSS}/src/server.cpp
${ABYSS}/src/json.cpp)
add_library(${ABYSS_LIB} STATIC ${ABYSS_SRC})
set(ALL_SRC ${ALL_SRC} ${ABYSS_SRC} ${ABYSS}/main.cpp)
endif()
add_library(${ABYSS_LIB} STATIC ${ABYSS_SRC})
set(ALL_SRC ${CLIENT_SRC} ${RC_SRC} ${EXE_SRC} ${DNS_SRC} ${LIB_PLATFORM_SRC} ${LIB_SRC} ${TEST_SRC} ${ABYSS_SRC} ${ABYSS}/main.cpp)
foreach(F ${ALL_SRC})
set_source_files_properties(${F} PROPERTIES COMPILE_FLAGS -DLOG_TAG=\\\"${F}\\\")
@ -551,12 +553,14 @@ if(WITH_STATIC)
target_link_libraries(${RC_EXE} ${STATIC_LINK_LIBS} ${STATIC_LIB} ${PLATFORM_LIB})
target_link_libraries(${TEST_EXE} ${STATIC_LINK_LIBS} gtest_main ${STATIC_LIB} ${PLATFORM_LIB})
target_link_libraries(${DNS_EXE} ${STATIC_LIB} ${PLATFORM_LIB} ${THREAD_LIB})
target_link_libraries(${ABYSS_EXE} ${STATIC_LIB})
if (WIN32)
target_link_libraries(${EXE} ${STATIC_LINK_LIBS} ${STATIC_LIB} ${PLATFORM_LIB} ws2_32 iphlpapi)
target_link_libraries(${CLIENT_EXE} ${STATIC_LINK_LIBS} ${STATIC_LIB} ${PLATFORM_LIB} ws2_32 iphlpapi)
target_link_libraries(${RC_EXE} ${STATIC_LINK_LIBS} ${STATIC_LIB} ${PLATFORM_LIB} ws2_32 iphlpapi)
target_link_libraries(${TEST_EXE} ${STATIC_LINK_LIBS} gtest_main ${STATIC_LIB} ${PLATFORM_LIB} ws2_32 iphlpapi)
target_link_libraries(${DNS_EXE} ${STATIC_LIB} ${PLATFORM_LIB} ${THREAD_LIB} ws2_32 iphlpapi)
target_link_libraries(${ABYSS_EXE} ${STATIC_LINK_LIBS} ${STATIC_LIB} ${PLATFORM_LIB} ws2_32 iphlpapi)
endif(WIN32)
endif(NOT WITH_SHARED)
endif(WITH_STATIC)

@ -129,7 +129,7 @@ abyss: debug
$(ABYSS_EXE)
format:
clang-format -i $$(find daemon llarp include | grep -E '\.[h,c](pp)?$$')
clang-format -i $$(find daemon llarp include libabyss | grep -E '\.[h,c](pp)?$$')
analyze: clean
mkdir -p '$(BUILD_ROOT)'

@ -18,7 +18,7 @@ namespace llarp
}
EncryptedFrame(const EncryptedFrame& other)
: EncryptedFrame(other.data(), other.size())
: EncryptedFrame(other.data(), other.size())
{
}

@ -40,7 +40,7 @@ namespace llarp
bool firstKey;
char key;
dict_reader reader;
std::unique_ptr<IMessage> msg;
std::unique_ptr< IMessage > msg;
};
} // namespace routing
} // namespace llarp

@ -41,6 +41,15 @@ main(int argc, char* argv[])
// Microsoft libc only covers six signals
#ifndef _WIN32
signal(SIGPIPE, SIG_IGN);
#else
WSADATA wsockd;
int err;
err = ::WSAStartup(MAKEWORD(2, 2), &wsockd);
if(err)
{
perror("Failed to start Windows Sockets");
return err;
}
#endif
llarp_threadpool* threadpool = llarp_init_same_process_threadpool();
llarp_ev_loop* loop = nullptr;

@ -271,7 +271,7 @@ namespace abyss
const char* end = strstr(buf, "\r\n");
while(end)
{
string_view line(buf, end);
string_view line(buf, *end);
switch(m_State)
{
case eReadHTTPMethodLine:

@ -52,7 +52,7 @@ namespace llarp
dec->msg.reset(new GotRouterMessage(dec->From, dec->relayed));
break;
case 'I':
dec->msg.reset( new PublishIntroMessage());
dec->msg.reset(new PublishIntroMessage());
break;
case 'G':
if(dec->relayed)
@ -88,7 +88,7 @@ namespace llarp
r.on_key = &MessageDecoder::on_key;
if(!bencode_read_dict(buf, &r))
return nullptr;
return std::unique_ptr< IMessage >(std::move(dec.msg));
}

@ -605,8 +605,9 @@ llarp_host_resolved(dnsc_answer_request *const request)
dns_tracker *tracker = (dns_tracker *)request->context->tracker;
auto val = std::find_if(
tracker->client_request.begin(), tracker->client_request.end(),
[request](std::pair< const uint32_t, std::unique_ptr< dnsc_answer_request > >
&element) { return element.second.get() == request; });
[request](
std::pair< const uint32_t, std::unique_ptr< dnsc_answer_request > >
&element) { return element.second.get() == request; });
if(val != tracker->client_request.end())
{
tracker->client_request[val->first].reset();

@ -151,9 +151,9 @@ writesend_dnss_revresponse(std::string reverse, const struct sockaddr *from,
dnsd_question_request *request)
{
const size_t BUFFER_SIZE = 1500;
char buf[BUFFER_SIZE] = {0};
char *write_buffer = buf;
char *bufferBegin = buf;
char buf[BUFFER_SIZE] = {0};
char *write_buffer = buf;
char *bufferBegin = buf;
// build header
put16bits(write_buffer, request->id);
int fields = (1 << 15); // QR => message type, 1 = response

@ -11,7 +11,7 @@ namespace llarp
}
Encrypted::Encrypted(const Encrypted& other)
: Encrypted(other.data(), other.size())
: Encrypted(other.data(), other.size())
{
}

@ -205,13 +205,22 @@ namespace llarp
int
tcp_serv::read(void *, size_t)
{
#ifndef _WIN32
int new_fd = ::accept(fd, nullptr, nullptr);
if(new_fd == -1)
{
llarp::LogError("failed to accept on ", fd, ":", strerror(errno));
return -1;
}
#else
SOCKET new_fd = ::accept(std::get< SOCKET >(fd), nullptr, nullptr);
if(new_fd == INVALID_SOCKET)
{
llarp::LogError("failed to accept on ", std::get< SOCKET >(fd), ":",
strerror(errno));
return -1;
}
#endif
llarp_tcp_conn *conn = new llarp_tcp_conn;
// zero out callbacks
conn->tick = nullptr;

@ -102,18 +102,34 @@ namespace llarp
{
}
#else
// on windows, udp event loops are socket fds
// on windows, tcp/udp event loops are socket fds
// and TUN device is a plain old fd
std::variant< SOCKET, HANDLE > fd;
// the unique completion key that helps us to
// identify the object instance for which we receive data
// Here, we'll use the address of the udp_listener instance, converted
// to its literal int/int64 representation.
// These....shouldn't be here, but because of the distinction,
// coupled with the async events api, we have to add our file
// descriptors to the event queue at object construction,
// unlike UNIX where these can be separated
ULONG_PTR listener_id = 0;
ev_io(SOCKET f) : fd(f), m_writeq("writequeue"){};
ev_io(HANDLE t)
: fd(t), m_writeq("writequeue"){}; // overload for TUN device, which
// _is_ a regular file descriptor
bool isTCP = false;
bool write = false;
WSAOVERLAPPED portfd[2];
// for udp?
ev_io(SOCKET f) : fd(f)
{
memset((void*)&portfd[0], 0, sizeof(WSAOVERLAPPED) * 2);
};
// for tun
ev_io(HANDLE t, LossyWriteQueue_t* q) : fd(t), m_LossyWriteQueue(q)
{
}
// for tcp
ev_io(SOCKET f, LosslessWriteQueue_t* q) : fd(f), m_BlockingWriteQueue(q)
{
memset((void*)&portfd[0], 0, sizeof(WSAOVERLAPPED) * 2);
isTCP = true;
}
#endif
virtual int
read(void* buf, size_t sz) = 0;
@ -139,7 +155,8 @@ namespace llarp
return write(fd, data, sz);
#else
DWORD w;
WriteFile(std::get< HANDLE >(fd), data, sz, &w, nullptr);
WriteFile(std::get< HANDLE >(fd), data, sz, nullptr, &portfd[1]);
GetOverlappedResult(std::get< HANDLE >(fd), &portfd[1], &w, TRUE);
return w;
#endif
}
@ -241,7 +258,8 @@ namespace llarp
#ifdef __linux__
return ::send(fd, buf, sz, MSG_NOSIGNAL); // ignore sigpipe
#else
return ::send(fd, buf, sz, 0);
// TODO: make async
return ::send(std::get< SOCKET >(fd), (char*)buf, sz, 0);
#endif
}
@ -250,7 +268,12 @@ namespace llarp
{
if(_shouldClose)
return -1;
#ifndef _WIN32
ssize_t amount = ::read(fd, buf, sz);
#else
// TODO: make async
ssize_t amount = ::recv(std::get< SOCKET >(fd), (char*)buf, sz, 0);
#endif
if(amount > 0)
{
if(tcp->read)
@ -323,7 +346,7 @@ struct llarp_ev_loop
#ifdef _WIN32
l->fd = std::get< SOCKET >(ev->fd);
#else
l->fd = ev->fd;
l->fd = ev->fd;
#endif
}
return ev && add_ev(ev, false);

@ -14,25 +14,18 @@ namespace llarp
{
llarp_udp_io* udp;
// we receive queued data in the OVERLAPPED data field,
// much like the pipefds in the UNIX kqueue and loonix
// epoll handles
WSAOVERLAPPED portfd[2];
udp_listener(SOCKET fd, llarp_udp_io* u) : ev_io(fd), udp(u)
{
memset((void*)&portfd[0], 0, sizeof(WSAOVERLAPPED) * 2);
};
udp_listener(SOCKET fd, llarp_udp_io* u) : ev_io(fd), udp(u){};
~udp_listener()
{
}
virtual void
bool
tick()
{
if(udp->tick)
udp->tick(udp);
return true;
}
virtual int
@ -95,7 +88,7 @@ namespace llarp
device* tunif;
OVERLAPPED* tun_async[2];
tun(llarp_tun_io* tio)
: ev_io(INVALID_HANDLE_VALUE)
: ev_io(INVALID_HANDLE_VALUE, new LossyWriteQueue_t("tun_write_queue"))
, t(tio)
, tunif(tuntap_init())
@ -119,11 +112,13 @@ namespace llarp
ev_io::flush_write();
}
void
bool
tick()
{
if(t->tick)
t->tick(t);
flush_write();
return true;
}
bool
@ -136,9 +131,10 @@ namespace llarp
read(void* buf, size_t sz)
{
ssize_t ret = tuntap_read(tunif, buf, sz);
if(ret > 4 && t->recvpkt)
if(ret > 0 && t->recvpkt)
// should have pktinfo
t->recvpkt(t, ((byte_t*)buf) + 4, ret - 4);
// I have no idea...
t->recvpkt(t, (byte_t*)buf, ret);
return ret;
}
@ -192,6 +188,7 @@ struct llarp_win32_loop : public llarp_ev_loop
{
if(iocpfd != INVALID_HANDLE_VALUE)
::CloseHandle(iocpfd);
iocpfd = INVALID_HANDLE_VALUE;
}
bool
@ -212,7 +209,7 @@ struct llarp_win32_loop : public llarp_ev_loop
{
// The only field we really care about is
// the listener_id, as it contains the address
// of the udp_listener instance.
// of the ev_io instance.
DWORD iolen = 0;
// ULONG_PTR is guaranteed to be the same size
// as an arch-specific pointer value
@ -224,13 +221,19 @@ struct llarp_win32_loop : public llarp_ev_loop
if(result && qdata)
{
llarp::udp_listener* ev = reinterpret_cast< llarp::udp_listener* >(ev_id);
llarp::ev_io* ev = reinterpret_cast< llarp::ev_io* >(ev_id);
if(ev)
{
llarp::LogDebug("size: ", iolen, "\tev_id: ", ev_id,
"\tqdata: ", qdata);
if(iolen <= sizeof(readbuf))
if(ev->write)
{
ev->flush_write();
}
else
{
ev->read(readbuf, iolen);
}
}
++idx;
}
@ -286,6 +289,44 @@ struct llarp_win32_loop : public llarp_ev_loop
return result;
}
llarp::ev_io*
bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* bindaddr)
{
DWORD on = 1;
SOCKET fd = ::socket(bindaddr->sa_family, SOCK_STREAM, 0);
if(fd == INVALID_SOCKET)
return nullptr;
socklen_t sz = sizeof(sockaddr_in);
if(bindaddr->sa_family == AF_INET6)
{
sz = sizeof(sockaddr_in6);
}
// keep. inexplicably, windows now has unix domain sockets
// for now, use the ID numbers directly until this comes out of
// beta
else if(bindaddr->sa_family == AF_UNIX)
{
sz = 110; // current size in 10.0.17763, verify each time the beta PSDK
// is updated
}
if(::bind(fd, bindaddr, sz) == SOCKET_ERROR)
{
::closesocket(fd);
return nullptr;
}
if(::listen(fd, 5) == SOCKET_ERROR)
{
::closesocket(fd);
return nullptr;
}
llarp::ev_io* serv = new llarp::tcp_serv(this, fd, tcp);
tcp->impl = serv;
// We're non-blocking now, but can't really make use of it
// until we cut over to WSA* functions
ioctlsocket(fd, FIONBIO, &on);
return serv;
}
SOCKET
udp_bind(const sockaddr* addr)
{
@ -339,11 +380,29 @@ struct llarp_win32_loop : public llarp_ev_loop
bool
close_ev(llarp::ev_io* ev)
{
// On Windows, just close the socket to decrease the iocp refcount
// On Windows, just close the descriptor to decrease the iocp refcount
// and stop any pending I/O
BOOL stopped =
::CancelIo(reinterpret_cast< HANDLE >(std::get< SOCKET >(ev->fd)));
return closesocket(std::get< SOCKET >(ev->fd)) == 0 && stopped == TRUE;
BOOL stopped;
int close_fd;
switch(ev->fd.index())
{
case 0:
stopped =
::CancelIo(reinterpret_cast< HANDLE >(std::get< SOCKET >(ev->fd)));
close_fd = closesocket(std::get< SOCKET >(ev->fd));
break;
case 1:
stopped = ::CancelIo(std::get< HANDLE >(ev->fd));
close_fd = CloseHandle(std::get< HANDLE >(ev->fd));
if(close_fd)
close_fd = 0; // must be zero
else
close_fd = 1;
break;
default:
return false;
}
return close_fd == 0 && stopped == TRUE;
}
llarp::ev_io*
@ -372,23 +431,53 @@ struct llarp_win32_loop : public llarp_ev_loop
add_ev(llarp::ev_io* ev, bool write)
{
uint8_t buf[1024];
llarp::udp_listener* udp = nullptr;
llarp::tun* t = nullptr;
ev->listener_id = reinterpret_cast< ULONG_PTR >(ev);
llarp::tun* t = nullptr;
ev->listener_id = reinterpret_cast< ULONG_PTR >(ev);
memset(&buf, 0, 1024);
if(ev->isTCP)
{
if(!::CreateIoCompletionPort((HANDLE)std::get< SOCKET >(ev->fd), iocpfd,
ev->listener_id, 0))
{
delete ev;
return false;
}
if(write)
{
::WriteFile((HANDLE)std::get< SOCKET >(ev->fd), &buf, 1024, nullptr,
&ev->portfd[1]);
ev->write = true;
}
else
{
::ReadFile((HANDLE)std::get< SOCKET >(ev->fd), &buf, 1024, nullptr,
&ev->portfd[0]);
}
handlers.emplace_back(ev);
return true;
}
switch(ev->fd.index())
{
case 0:
udp = dynamic_cast< llarp::udp_listener* >(ev);
if(!::CreateIoCompletionPort((HANDLE)std::get< 0 >(ev->fd), iocpfd,
ev->listener_id, 0))
{
delete ev;
return false;
}
::ReadFile((HANDLE)std::get< 0 >(ev->fd), &buf, 1024, nullptr,
&udp->portfd[0]);
if(write)
{
::WriteFile((HANDLE)std::get< 0 >(ev->fd), &buf, 1024, nullptr,
&ev->portfd[1]);
ev->write = true;
}
else
{
::ReadFile((HANDLE)std::get< 0 >(ev->fd), &buf, 1024, nullptr,
&ev->portfd[0]);
}
break;
case 1:
t = dynamic_cast< llarp::tun* >(ev);
@ -398,7 +487,17 @@ struct llarp_win32_loop : public llarp_ev_loop
delete ev;
return false;
}
::ReadFile(std::get< 1 >(ev->fd), &buf, 1024, nullptr, t->tun_async[0]);
if(write)
{
::WriteFile(std::get< 1 >(ev->fd), &buf, 1024, nullptr,
t->tun_async[1]);
ev->write = true;
}
else
{
::ReadFile(std::get< 1 >(ev->fd), &buf, 1024, nullptr,
t->tun_async[0]);
}
break;
default:
return false;
@ -415,10 +514,18 @@ struct llarp_win32_loop : public llarp_ev_loop
static_cast< llarp::udp_listener* >(l->impl);
if(listener)
{
ret = close_ev(listener);
close_ev(listener);
// remove handler
auto itr = handlers.begin();
while(itr != handlers.end())
{
if(itr->get() == listener)
itr = handlers.erase(itr);
else
++itr;
}
l->impl = nullptr;
delete listener;
ret = true;
ret = true;
}
return ret;
}
@ -432,7 +539,12 @@ struct llarp_win32_loop : public llarp_ev_loop
void
stop()
{
// still does nothing
// Are we leaking any file descriptors?
// This was part of the reason I had this
// in the destructor.
if(iocpfd != INVALID_HANDLE_VALUE)
::CloseHandle(iocpfd);
iocpfd = INVALID_HANDLE_VALUE;
}
};

Loading…
Cancel
Save