initial epoll event loop implementation for tcp

pull/35/head
Jeff Becker 6 years ago
parent 2e511dc414
commit 957a5ed833
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -7,6 +7,8 @@
"${workspaceFolder}/llarp",
"${workspaceFolder}/daemon",
"${workspaceFolder}/include",
"${workspaceFolder}/libabyss/src",
"${workspaceFolder}/crypto",
"${workspaceFolder}/vendor/cppbackport-master/lib"
],
"limitSymbolsToIncludedHeaders": true
@ -14,7 +16,9 @@
"includePath": [
"${workspaceFolder}/include",
"${workspaceFolder}/llarp",
"${workspaceFolder}/vendor/cppbackport-master/lib"
"${workspaceFolder}/vendor/cppbackport-master/lib",
"${workspaceFolder}/libabyss/include",
"${workspaceFolder}/crypto/include"
],
"defines": [],
"compilerPath": "/usr/bin/clang",

@ -65,6 +65,7 @@
"vector": "cpp",
"new": "cpp",
"shared_mutex": "cpp",
"complex": "cpp"
"complex": "cpp",
"variant": "cpp"
}
}

@ -478,11 +478,6 @@ set(CLIENT_SRC
client/main.cpp
)
set(ALL_SRC ${CLIENT_SRC} ${RC_SRC} ${EXE_SRC} ${DNS_SRC} ${LIB_PLATFORM_SRC} ${LIB_SRC} ${TEST_SRC})
foreach(F ${ALL_SRC})
set_source_files_properties(${F} PROPERTIES COMPILE_FLAGS -DLOG_TAG=\\\"${F}\\\")
endforeach(F)
# TODO: exclude this from includes and expose stuff properly for rcutil
include_directories(llarp)
@ -491,10 +486,12 @@ include_directories(include)
set(RC_EXE rcutil)
set(DNS_EXE dns)
set(ABYSS ${CMAKE_SOURCE_DIR}/libabyss)
set(ABYSS libabyss)
set(ABYSS_LIB abyss)
set(ABYSS_EXE ${ABYSS_LIB}-main)
include_directories(${ABYSS}/include)
set(ABYSS_SRC
@ -505,6 +502,13 @@ set(ABYSS_SRC
add_library(${ABYSS_LIB} ${ABYSS_SRC})
set(ALL_SRC ${CLIENT_SRC} ${RC_SRC} ${EXE_SRC} ${DNS_SRC} ${LIB_PLATFORM_SRC} ${LIB_SRC} ${TEST_SRC} ${ABYSS_SRC} ${ABYSS}/main.cpp)
foreach(F ${ALL_SRC})
set_source_files_properties(${F} PROPERTIES COMPILE_FLAGS -DLOG_TAG=\\\"${F}\\\")
endforeach(F)
if(SHADOW)
add_shadow_plugin(shadow-plugin-${SHARED_LIB} ${EXE_SRC} ${LIB_SRC} ${LIB_PLATFORM_SRC} ${CPP_BACKPORT_SRC} ${ABYSS_SRC} ${CRYPTOGRAPHY_SRC})
target_link_libraries(shadow-plugin-${SHARED_LIB} ${LIBS})
@ -513,6 +517,7 @@ else()
add_executable(${RC_EXE} ${RC_SRC})
add_executable(${EXE} ${EXE_SRC})
add_executable(${CLIENT_EXE} ${CLIENT_SRC})
add_executable(${ABYSS_EXE} ${ABYSS}/main.cpp)
add_executable(${DNS_EXE} ${DNS_SRC})
add_subdirectory(${GTEST_DIR})
@ -528,6 +533,7 @@ if(WITH_STATIC)
target_link_libraries(${PLATFORM_LIB} -lcap)
endif()
target_link_libraries(${STATIC_LIB} ${CRYPTOGRAPHY_LIB} ${LIBS} ${PLATFORM_LIB})
target_link_libraries(${ABYSS_EXE} ${STATIC_LIB})
if(NOT WITH_SHARED)
target_link_libraries(${EXE} ${STATIC_LINK_LIBS} ${STATIC_LIB} ${PLATFORM_LIB})

@ -37,7 +37,7 @@ TARGETS = $(REPO)/lokinet
SIGS = $(TARGETS:=.sig)
EXE = $(BUILD_ROOT)/lokinet
TEST_EXE = $(BUILD_ROOT)/testAll
ABYSS_EXE = $(BUILD_ROOT)/abyss-main
DNS_PORT ?= 53
@ -118,6 +118,9 @@ testnet:
test: debug
$(TEST_EXE)
abyss: debug
$(ABYSS_EXE)
format:
clang-format -i $$(find daemon llarp include | grep -E '\.[h,c](pp)?$$')

@ -99,7 +99,8 @@ struct llarp_tcp_conn
};
/// queue async write a buffer in full
void
/// return if we queueed it or not
bool
llarp_tcp_conn_async_write(struct llarp_tcp_conn *, const void *, size_t);
/// close a tcp connection
@ -114,8 +115,12 @@ struct llarp_tcp_acceptor
void *impl;
/// parent event loop (dont set me)
struct llarp_ev_loop *loop;
/// handle tick
void (*tick)(struct llarp_tcp_acceptor *);
/// handle inbound connection
void (*accepted)(struct llarp_tcp_acceptor *, struct llarp_tcp_conn *);
/// handle after server socket closed (free-ing is handled by event loop)
void (*closed)(struct llarp_tcp_acceptor *);
};
/// bind to an address and start serving async

@ -0,0 +1,14 @@
#ifndef __ABYSS_JSON_JSON_HPP
#define __ABYSS_JSON_JSON_HPP
#include <rapidjson/document.h>
namespace abyss
{
namespace json
{
typedef rapidjson::Document Object;
}
} // namespace abyss
#endif

@ -0,0 +1,74 @@
#ifndef __ABYSS_SERVER_HPP__
#define __ABYSS_SERVER_HPP__
#include <llarp/ev.h>
#include <llarp/logic.h>
#include <llarp/time.h>
#include <list>
#include <memory>
#include <string>
#include <abyss/json.hpp>
namespace abyss
{
namespace http
{
struct ConnImpl;
struct IRPCHandler
{
typedef std::string Method_t;
typedef abyss::json::Object Params;
typedef abyss::json::Object Response;
IRPCHandler(ConnImpl* impl);
virtual bool
HandleJSONRPC(const Method_t& method, const Params& params,
Response& response) = 0;
~IRPCHandler();
bool
ShouldClose(llarp_time_t now) const;
private:
ConnImpl* m_Impl;
};
struct BaseReqHandler
{
BaseReqHandler(llarp_time_t req_timeout);
~BaseReqHandler();
bool
ServeAsync(llarp_ev_loop* loop, llarp_logic* logic,
const sockaddr* bindaddr);
void
RemoveConn(IRPCHandler* handler);
protected:
virtual IRPCHandler*
CreateHandler(ConnImpl* connimpl) const = 0;
private:
static void
OnTick(llarp_tcp_acceptor*);
void
Tick();
static void
OnAccept(struct llarp_tcp_acceptor*, struct llarp_tcp_conn*);
llarp_ev_loop* m_loop;
llarp_logic* m_Logic;
llarp_tcp_acceptor m_acceptor;
std::list< std::unique_ptr< IRPCHandler > > m_Conns;
llarp_time_t m_ReqTimeout;
};
} // namespace http
} // namespace abyss
#endif

@ -1,86 +1,5 @@
#ifndef __LIB_ABYSS_HPP__
#define __LIB_ABYSS_HPP__
#include <llarp/ev.h>
#include <llarp/logic.h>
#include <llarp/time.h>
#include <list>
#include <memory>
#include <string>
#ifdef USE_RAPIDJSON
#include <rapidjson/document.h>
namespace json = rapidjson;
#else
namespace json
{
struct Document;
}
#endif
namespace abyss
{
namespace http
{
struct ConnImpl;
struct IRPCHandler
{
typedef std::string Method_t;
typedef ::json::Document Params;
typedef ::json::Document Response;
IRPCHandler(ConnImpl* impl);
virtual bool
HandleJSONRPC(const Method_t& method, const Params& params,
Response& response) = 0;
~IRPCHandler();
bool
ShouldClose(llarp_time_t now) const;
private:
ConnImpl* m_Impl;
};
struct BaseReqHandler
{
BaseReqHandler(llarp_time_t req_timeout);
~BaseReqHandler();
bool
ServeAsync(llarp_ev_loop* loop, llarp_logic* logic,
const sockaddr* bindaddr);
void
RemoveConn(IRPCHandler* handler);
protected:
virtual IRPCHandler*
CreateHandler(ConnImpl* connimpl) const = 0;
private:
void
ScheduleTick(llarp_time_t ms);
static void
OnTick(void* user, llarp_time_t orig, llarp_time_t left);
void
Tick();
static void
OnAccept(struct llarp_tcp_acceptor*, struct llarp_tcp_conn*);
llarp_ev_loop* m_loop;
llarp_logic* m_Logic;
llarp_tcp_acceptor m_acceptor;
std::list< std::unique_ptr< IRPCHandler > > m_Conns;
llarp_time_t m_ReqTimeout;
};
} // namespace http
} // namespace abyss
#include <abyss/server.hpp>
#include <abyss/json.hpp>
#endif

@ -0,0 +1,52 @@
#include <libabyss.hpp>
#include <llarp/net.hpp>
#include <sys/signal.h>
struct DemoHandler : public abyss::http::IRPCHandler
{
DemoHandler(abyss::http::ConnImpl* impl) : abyss::http::IRPCHandler(impl)
{
}
bool
HandleJSONRPC(const Method_t& method, const Params& params, Response& resp)
{
resp.SetObject().AddMember("test", "value", resp.GetAllocator());
return true;
}
};
struct DemoServer : public abyss::http::BaseReqHandler
{
DemoServer() : abyss::http::BaseReqHandler(1000)
{
}
abyss::http::IRPCHandler*
CreateHandler(abyss::http::ConnImpl* impl) const
{
return new DemoHandler(impl);
}
};
int
main(int argc, char* argv[])
{
signal(SIGPIPE, SIG_IGN);
llarp_threadpool* threadpool = llarp_init_same_process_threadpool();
llarp_ev_loop* loop = nullptr;
llarp_ev_loop_alloc(&loop);
llarp_logic* logic = llarp_init_single_process_logic(threadpool);
sockaddr_in addr;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = htons(1222);
addr.sin_family = AF_INET;
DemoServer serv;
llarp::Addr a(addr);
llarp::LogInfo("bind to ", a);
if(serv.ServeAsync(loop, logic, a))
llarp_ev_loop_run_single_process(loop, threadpool, logic);
else
llarp::LogError("Failed to serve: ", strerror(errno));
return 0;
}

@ -1,9 +1,10 @@
#include <libabyss.hpp>
#include <abyss/server.hpp>
#include <llarp/time.h>
#include <sstream>
#include <unordered_map>
// #include <string_view>
#include <string>
#include <llarp/logger.hpp>
#include <algorithm>
namespace abyss
{
@ -12,9 +13,9 @@ namespace abyss
struct RequestHeader
{
typedef std::unordered_multimap< std::string, std::string > Headers_t;
Headers_t m_Headers;
Headers_t Headers;
std::string Method;
std::string Version;
std::string Path;
};
struct ConnImpl
@ -47,9 +48,11 @@ namespace abyss
handler = nullptr;
m_LastActive = llarp_time_now_ms();
m_ReadTimeout = readtimeout;
c->read = &OnRead;
c->tick = &OnTick;
c->closed = nullptr;
// set up tcp members
_conn->user = this;
_conn->read = &ConnImpl::OnRead;
_conn->tick = &ConnImpl::OnTick;
_conn->closed = &ConnImpl::OnClosed;
m_Bad = false;
m_State = eReadHTTPMethodLine;
}
@ -59,18 +62,166 @@ namespace abyss
}
bool
FeedLine(const std::string& line)
FeedLine(std::string& line)
{
switch(m_State)
{
case eReadHTTPMethodLine:
return ProcessMethodLine(line);
case eReadHTTPHeaders:
return ProcessHeaderLine(line);
default:
return false;
}
}
bool
ProcessMethodLine(std::string& line)
{
// TODO: implement me
auto idx = line.find_first_of(' ');
if(idx == std::string::npos)
return false;
m_Header.Method = line.substr(0, idx);
line = line.substr(idx + 1);
idx = line.find_first_of(' ');
if(idx == std::string::npos)
return false;
m_Header.Path = line.substr(0, idx);
m_State = eReadHTTPHeaders;
return true;
}
bool
ShouldProcessHeader(const std::string& name) const
{
// TODO: header whitelist
return true;
}
bool
ProcessHeaderLine(std::string& line)
{
// TODO: implement me
auto idx = line.find_first_of(':');
if(idx == std::string::npos)
return false;
std::string header = line.substr(0, idx);
std::string val = line.substr(idx);
// to lowercase
std::transform(header.begin(), header.end(), header.begin(),
[](char ch) -> char { return ::tolower(ch); });
if(ShouldProcessHeader(header))
{
val = val.substr(val.find_first_not_of(' '));
m_Header.Headers.insert(std::make_pair(header, val));
}
return true;
}
bool
WriteStatusLine(int code, const std::string& message)
{
char buf[128] = {0};
int sz = snprintf(buf, sizeof(buf), "HTTP/1.0 %d %s\r\n", code,
message.c_str());
if(sz > 0)
{
llarp::LogInfo("HTTP ", code, " ", message);
return llarp_tcp_conn_async_write(_conn, buf, sz);
}
else
return false;
}
bool
WriteResponseSimple(int code, const std::string& msg,
const char* contentType, const char* content)
{
if(!WriteStatusLine(code, msg))
return false;
char buf[128] = {0};
int sz =
snprintf(buf, sizeof(buf), "Content-Type: %s\r\n", contentType);
if(sz <= 0)
return false;
if(!llarp_tcp_conn_async_write(_conn, buf, sz))
return false;
size_t contentLength = strlen(content);
sz = snprintf(buf, sizeof(buf), "Content-Length: %zu\r\n\r\n",
contentLength);
if(sz <= 0)
return false;
if(!llarp_tcp_conn_async_write(_conn, buf, sz))
return false;
if(!llarp_tcp_conn_async_write(_conn, content, contentLength))
return false;
m_State = eWriteHTTPBody;
return true;
}
bool
FeedBody(const char* buf, size_t sz)
{
return false;
llarp::LogInfo("HTTP ", m_Header.Method, " ", m_Header.Path, " ", sz);
if(sz == 0)
{
return WriteResponseSimple(400, "Bad Request", "text/plain", "nope");
}
if(m_Header.Method != "POST")
{
return WriteResponseSimple(400, "Bad Request", "text/plain", "nope");
}
return WriteResponseSimple(200, "OK", "text/json", "{}");
}
bool
ProcessRead(const char* buf, size_t sz)
{
if(m_Bad)
{
llarp::LogInfo("we bad");
return false;
}
m_LastActive = llarp_time_now_ms();
if(m_State < eReadHTTPBody)
{
if(strstr(buf, "\r\n") == nullptr)
{
// probably too big or small
return false;
}
m_ReadBuf << std::string(buf, sz);
std::string line;
while(std::getline(m_ReadBuf, line, '\n'))
{
if(line[0] == '\r')
{
m_State = eReadHTTPBody;
line = m_ReadBuf.str();
const char* ptr = strstr(line.c_str(), "\r\n\r\n");
if(ptr == nullptr)
return false;
line = std::string(ptr + 4);
m_ReadBuf.clear();
return FeedBody(line.c_str(), line.size());
}
auto pos = line.find_first_of('\r');
if(pos == std::string::npos)
{
return false;
}
line = line.substr(0, pos);
if(!FeedLine(line))
return false;
}
m_ReadBuf.str(line);
}
else
return FeedBody(buf, sz);
m_ReadBuf << std::string(buf, sz);
return true;
}
@ -82,6 +233,13 @@ namespace abyss
self->MarkBad();
}
static void
OnClosed(llarp_tcp_conn* conn)
{
ConnImpl* self = static_cast< ConnImpl* >(conn->user);
self->_conn = nullptr;
}
static void
OnTick(llarp_tcp_conn* conn)
{
@ -104,13 +262,18 @@ namespace abyss
bool
ShouldClose(llarp_time_t now) const
{
return now - m_LastActive > m_ReadTimeout || m_Bad;
return now - m_LastActive > m_ReadTimeout || m_Bad
|| m_State == eCloseMe;
}
void
Close()
{
llarp_tcp_conn_close(_conn);
if(_conn)
{
llarp_tcp_conn_close(_conn);
_conn = nullptr;
}
}
};
@ -137,6 +300,8 @@ namespace abyss
m_Logic = nullptr;
m_acceptor.accepted = &BaseReqHandler::OnAccept;
m_acceptor.user = this;
m_acceptor.tick = &OnTick;
m_acceptor.closed = nullptr;
}
bool
@ -145,22 +310,15 @@ namespace abyss
{
m_loop = loop;
m_Logic = logic;
if(!llarp_tcp_serve(m_loop, &m_acceptor, bindaddr))
return false;
ScheduleTick(1000);
return true;
return llarp_tcp_serve(m_loop, &m_acceptor, bindaddr);
}
void
BaseReqHandler::OnTick(void* user, llarp_time_t orig, llarp_time_t left)
BaseReqHandler::OnTick(llarp_tcp_acceptor* tcp)
{
if(left)
return;
BaseReqHandler* self = static_cast< BaseReqHandler* >(user);
BaseReqHandler* self = static_cast< BaseReqHandler* >(tcp->user);
self->Tick();
self->ScheduleTick(orig);
}
void
@ -177,12 +335,6 @@ namespace abyss
}
}
void
BaseReqHandler::ScheduleTick(llarp_time_t timeout)
{
llarp_logic_call_later(m_Logic, {timeout, this, &BaseReqHandler::OnTick});
}
BaseReqHandler::~BaseReqHandler()
{
llarp_tcp_acceptor_close(&m_acceptor);
@ -201,7 +353,6 @@ namespace abyss
return;
}
connimpl->handler = rpcHandler;
conn->user = connimpl;
self->m_Conns.emplace_back(rpcHandler);
}
} // namespace http

@ -119,23 +119,41 @@ llarp_ev_add_tun(struct llarp_ev_loop *loop, struct llarp_tun_io *tun)
bool
llarp_ev_tun_async_write(struct llarp_tun_io *tun, const void *pkt, size_t sz)
{
// TODO: queue write
return static_cast< llarp::ev_io * >(tun->impl)->do_write((void *)pkt, sz);
return static_cast< llarp::ev_io * >(tun->impl)->queue_write(
(const byte_t *)pkt, sz);
}
bool
llarp_tcp_serve(struct llarp_ev_loop *loop, struct llarp_tcp_acceptor *tcp,
const struct sockaddr *bindaddr)
{
tcp->loop = loop;
// TODO: implement me
tcp->loop = loop;
llarp::ev_io *impl = loop->bind_tcp(tcp, bindaddr);
if(impl)
{
tcp->impl = impl;
return loop->add_ev(impl);
}
return false;
}
void
llarp_tcp_acceptor_close(struct llarp_tcp_acceptor *tcp)
{
// TODO: implement me
llarp::ev_io *impl = static_cast< llarp::ev_io * >(tcp->user);
tcp->impl = nullptr;
tcp->loop->close_ev(impl);
if(tcp->closed)
tcp->closed(tcp);
// dont free acceptor because it may be stack allocated
}
bool
llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const void *buf,
size_t sz)
{
return static_cast< llarp::ev_io * >(conn->impl)
->queue_write((const byte_t *)buf, sz);
}
void
@ -143,15 +161,17 @@ llarp_tcp_conn_close(struct llarp_tcp_conn *conn)
{
if(!conn)
return;
llarp::ev_io *impl = static_cast< llarp::ev_io * >(conn->impl);
conn->impl = nullptr;
// deregister
conn->loop->close_ev(impl);
// close fd and delete impl
delete impl;
if(conn->impl)
{
llarp::ev_io *impl = static_cast< llarp::ev_io * >(conn->impl);
// deregister and dealloc
conn->loop->close_ev(impl);
conn->impl = nullptr;
}
// call hook if needed
if(conn->closed)
conn->closed(conn);
// delete
// delete conn
delete conn;
}

@ -8,7 +8,8 @@
#include <unistd.h>
#include <llarp/buffer.h>
#include <llarp/codel.hpp>
#include <vector>
#include <list>
#include <deque>
#ifdef _WIN32
#include <variant>
@ -26,62 +27,6 @@ namespace llarp
{
struct ev_io
{
#ifndef _WIN32
int fd;
ev_io(int f) : fd(f), m_writeq("writequeue"){};
#else
// on windows, udp event loops are socket fds
// and TUN device is a plain old fd
std::variant< SOCKET, HANDLE > fd;
// the unique completion key that helps us to
// identify the object instance for which we receive data
// Here, we'll use the address of the udp_listener instance, converted
// to its literal int/int64 representation.
ULONG_PTR listener_id = 0;
ev_io(SOCKET f) : fd(f), m_writeq("writequeue"){};
ev_io(HANDLE t)
: fd(t), m_writeq("writequeue"){}; // overload for TUN device, which
// _is_ a regular file descriptor
#endif
virtual int
read(void* buf, size_t sz) = 0;
virtual int
sendto(const sockaddr* dst, const void* data, size_t sz) = 0;
virtual void
tick(){};
/// used for tun interface and tcp conn
virtual bool
do_write(void* data, size_t sz)
{
#ifndef _WIN32
return write(fd, data, sz) != -1;
#else
DWORD w;
return WriteFile(std::get< HANDLE >(fd), data, sz, &w, nullptr);
#endif
}
/// called in event loop when fd is ready for writing
/// requeues anything not written
/// this assumes fd is set to non blocking
virtual void
flush_write()
{
m_writeq.Process([&](WriteBuffer& buffer) {
do_write(buffer.buf, buffer.bufsz);
// if we would block we save the entries for later
// discard entry
});
/// reset errno
errno = 0;
#if _WIN32
SetLastError(0);
#endif
}
struct WriteBuffer
{
llarp_time_t timestamp = 0;
@ -129,11 +74,135 @@ namespace llarp
};
};
llarp::util::CoDelQueue< WriteBuffer, WriteBuffer::GetTime,
WriteBuffer::PutTime, WriteBuffer::Compare,
llarp::util::NullMutex, llarp::util::NullLock >
m_writeq;
typedef llarp::util::CoDelQueue< WriteBuffer, WriteBuffer::GetTime,
WriteBuffer::PutTime, WriteBuffer::Compare,
llarp::util::NullMutex,
llarp::util::NullLock, 5, 100, 128 >
LossyWriteQueue_t;
typedef std::deque< WriteBuffer > LosslessWriteQueue_t;
#ifndef _WIN32
int fd;
ev_io(int f) : fd(f)
{
}
/// for tun
ev_io(int f, LossyWriteQueue_t* lossyqueue)
: fd(f), m_LossyWriteQueue(lossyqueue)
{
}
/// for tcp
ev_io(int f, LosslessWriteQueue_t* q) : fd(f), m_BlockingWriteQueue(q)
{
}
#else
// on windows, udp event loops are socket fds
// and TUN device is a plain old fd
std::variant< SOCKET, HANDLE > fd;
// the unique completion key that helps us to
// identify the object instance for which we receive data
// Here, we'll use the address of the udp_listener instance, converted
// to its literal int/int64 representation.
ULONG_PTR listener_id = 0;
ev_io(SOCKET f) : fd(f), m_writeq("writequeue"){};
ev_io(HANDLE t)
: fd(t), m_writeq("writequeue"){}; // overload for TUN device, which
// _is_ a regular file descriptor
#endif
virtual int
read(void* buf, size_t sz) = 0;
virtual int
sendto(const sockaddr* dst, const void* data, size_t sz)
{
return -1;
};
virtual void
tick(){};
/// used for tun interface and tcp conn
ssize_t
do_write(void* data, size_t sz)
{
#ifndef _WIN32
return write(fd, data, sz);
#else
DWORD w;
WriteFile(std::get< HANDLE >(fd), data, sz, &w, nullptr);
return w;
#endif
}
bool
queue_write(const byte_t* buf, size_t sz)
{
if(m_LossyWriteQueue)
{
m_LossyWriteQueue->Emplace(buf, sz);
return true;
}
else if(m_BlockingWriteQueue)
{
m_BlockingWriteQueue->emplace_back(buf, sz);
return true;
}
else
return false;
}
/// called in event loop when fd is ready for writing
/// requeues anything not written
/// this assumes fd is set to non blocking
virtual void
flush_write()
{
if(m_LossyWriteQueue)
m_LossyWriteQueue->Process([&](WriteBuffer& buffer) {
do_write(buffer.buf, buffer.bufsz);
// if we would block we save the entries for later
// discard entry
});
else if(m_BlockingWriteQueue)
{
// write buffers
while(m_BlockingWriteQueue->size())
{
auto& itr = m_BlockingWriteQueue->front();
ssize_t result = do_write(itr.buf, itr.bufsz);
if(result == -1)
return;
ssize_t dlt = itr.bufsz - result;
if(dlt > 0)
{
// queue remaining to front of queue
WriteBuffer buff(itr.buf + dlt, itr.bufsz - dlt);
m_BlockingWriteQueue->pop_front();
m_BlockingWriteQueue->push_front(buff);
// TODO: errno?
return;
}
m_BlockingWriteQueue->pop_front();
if(errno == EAGAIN || errno == EWOULDBLOCK)
{
errno = 0;
return;
}
}
}
/// reset errno
errno = 0;
#if _WIN32
SetLastError(0);
#endif
}
std::unique_ptr< LossyWriteQueue_t > m_LossyWriteQueue;
std::unique_ptr< LosslessWriteQueue_t > m_BlockingWriteQueue;
virtual ~ev_io()
{
#ifndef _WIN32
@ -186,6 +255,9 @@ struct llarp_ev_loop
virtual llarp::ev_io*
create_tun(llarp_tun_io* tun) = 0;
virtual llarp::ev_io*
bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* addr) = 0;
virtual bool
add_ev(llarp::ev_io* ev, bool write = false) = 0;
@ -194,7 +266,7 @@ struct llarp_ev_loop
virtual ~llarp_ev_loop(){};
std::vector< std::unique_ptr< llarp::ev_io > > handlers;
std::list< std::unique_ptr< llarp::ev_io > > handlers;
void
tick_listeners()

@ -5,6 +5,7 @@
#include <llarp/net.h>
#include <signal.h>
#include <sys/epoll.h>
#include <sys/un.h>
#include <tuntap.h>
#include <unistd.h>
#include <cstdio>
@ -13,15 +14,106 @@
#include "llarp/net.hpp"
#include "logger.hpp"
#include "mem.hpp"
#include <cassert>
namespace llarp
{
struct tcp_serv : public ev_io
struct tcp_conn : public ev_io
{
llarp_tcp_conn* tcp;
tcp_conn(int fd, llarp_tcp_conn* conn)
: ev_io(fd, new LosslessWriteQueue_t()), tcp(conn)
{
}
virtual int
do_write(const void* buf, size_t sz)
{
return ::send(fd, buf, sz, MSG_NOSIGNAL); // ignore sigpipe
}
int
read(void* buf, size_t sz)
{
ssize_t amount = ::read(fd, buf, sz);
if(amount > 0)
{
if(tcp->read)
tcp->read(tcp, buf, amount);
}
else
{
// error
llarp_tcp_conn_close(tcp);
return -1;
}
return 0;
}
void
tick()
{
if(tcp->tick)
tcp->tick(tcp);
}
int
sendto(const sockaddr*, const void*, size_t)
{
return -1;
}
};
struct tcp_conn : public ev_io
struct tcp_serv : public ev_io
{
llarp_ev_loop* loop;
llarp_tcp_acceptor* tcp;
tcp_serv(llarp_ev_loop* l, int fd, llarp_tcp_acceptor* t)
: ev_io(fd), loop(l), tcp(t)
{
// TODO: handle fail
assert(listen(fd, 5) != -1);
}
void
tick()
{
if(tcp->tick)
tcp->tick(tcp);
}
/// actually does accept() :^)
int
read(void*, size_t)
{
int new_fd = ::accept(fd, nullptr, nullptr);
if(new_fd == -1)
{
llarp::LogError("failed to accept on ", fd, ":", strerror(errno));
return -1;
}
llarp_tcp_conn* conn = new llarp_tcp_conn;
// zero out callbacks
conn->tick = nullptr;
conn->closed = nullptr;
conn->read = nullptr;
// build handler
llarp::tcp_conn* connimpl = new tcp_conn(new_fd, conn);
conn->impl = connimpl;
conn->loop = loop;
if(loop->add_ev(connimpl, true))
{
// call callback
if(tcp->accepted)
tcp->accepted(tcp, conn);
return 0;
}
// cleanup error
delete conn;
delete connimpl;
return -1;
}
};
struct udp_listener : public ev_io
@ -34,14 +126,14 @@ namespace llarp
{
}
virtual void
void
tick()
{
if(udp->tick)
udp->tick(udp);
}
virtual int
int
read(void* buf, size_t sz)
{
sockaddr_in6 src;
@ -56,7 +148,7 @@ namespace llarp
return 0;
}
virtual int
int
sendto(const sockaddr* to, const void* data, size_t sz)
{
socklen_t slen;
@ -85,7 +177,7 @@ namespace llarp
llarp_tun_io* t;
device* tunif;
tun(llarp_tun_io* tio)
: ev_io(-1)
: ev_io(-1, new LossyWriteQueue_t("tun_write_queue"))
, t(tio)
, tunif(tuntap_init())
@ -99,7 +191,7 @@ namespace llarp
return -1;
}
virtual void
void
tick()
{
if(t->tick)
@ -205,9 +297,16 @@ struct llarp_epoll_loop : public llarp_ev_loop
while(idx < result)
{
llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr);
if(events[idx].events & EPOLLIN)
if(ev)
{
ev->read(readbuf, sizeof(readbuf));
if(events[idx].events & EPOLLOUT)
{
ev->flush_write();
}
if(events[idx].events & EPOLLIN)
{
ev->read(readbuf, sizeof(readbuf));
}
}
++idx;
}
@ -231,9 +330,16 @@ struct llarp_epoll_loop : public llarp_ev_loop
while(idx < result)
{
llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr);
if(events[idx].events & EPOLLIN)
if(ev)
{
ev->read(readbuf, sizeof(readbuf));
if(events[idx].events & EPOLLOUT)
{
ev->flush_write();
}
if(events[idx].events & EPOLLIN)
{
ev->read(readbuf, sizeof(readbuf));
}
}
++idx;
}
@ -296,13 +402,39 @@ struct llarp_epoll_loop : public llarp_ev_loop
if(epoll_ctl(epollfd, EPOLL_CTL_DEL, ev->fd, nullptr) == -1)
return false;
// deallocate
std::remove_if(handlers.begin(), handlers.end(),
[ev](const std::unique_ptr< llarp::ev_io >& i) -> bool {
return i.get() == ev;
});
handlers.erase(
std::remove_if(handlers.begin(), handlers.end(),
[ev](const std::unique_ptr< llarp::ev_io >& i) -> bool {
return i.get() == ev;
}));
return true;
}
llarp::ev_io*
bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* bindaddr)
{
int fd = ::socket(bindaddr->sa_family, SOCK_STREAM, 0);
if(fd == -1)
return nullptr;
socklen_t sz = sizeof(sockaddr_in);
if(bindaddr->sa_family == AF_INET6)
{
sz = sizeof(sockaddr_in6);
}
else if(bindaddr->sa_family == AF_UNIX)
{
sz = sizeof(sockaddr_un);
}
if(bind(fd, bindaddr, sz) == -1)
{
::close(fd);
return nullptr;
}
llarp::ev_io* serv = new llarp::tcp_serv(this, fd, tcp);
tcp->impl = serv;
return serv;
}
llarp::ev_io*
create_tun(llarp_tun_io* tun)
{
@ -332,8 +464,8 @@ struct llarp_epoll_loop : public llarp_ev_loop
epoll_event ev;
ev.data.ptr = e;
ev.events = EPOLLIN;
// if(write)
// ev.events |= EPOLLOUT;
if(write)
ev.events |= EPOLLOUT;
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, e->fd, &ev) == -1)
{
delete e;

@ -34,7 +34,8 @@ namespace llarp
{
}
void tick()
void
tick()
{
if(udp->tick)
udp->tick(udp);
@ -81,7 +82,7 @@ namespace llarp
ssize_t sent = ::sendto(fd, data, sz, 0, to, slen);
if(sent == -1 || errno)
{
llarp::LogError("failed to send udp: ",strerror(errno));
llarp::LogError("failed to send udp: ", strerror(errno));
errno = 0;
}
return sent;
@ -130,13 +131,14 @@ namespace llarp
}
}
void tick()
void
tick()
{
if(t->tick)
t->tick(t);
flush_write();
}
int
read(void* buf, size_t sz)
{
@ -339,10 +341,11 @@ struct llarp_kqueue_loop : public llarp_ev_loop
EV_SET(&change, ev->fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
if(kevent(kqueuefd, &change, 1, nullptr, 0, nullptr) != -1)
{
std::remove_if(handlers.begin(), handlers.end(),
[ev](const std::unique_ptr<llarp::ev_io> & i) -> bool {
return i.get() == ev;
});
handlers.erase(std::remove_if(
handlers.begin(), handlers.end(),
[ev](const std::unique_ptr< llarp::ev_io >& i) -> bool {
return i.get() == ev;
}));
return true;
}
return false;
@ -355,7 +358,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop
if(fd == -1)
return nullptr;
llarp::udp_listener* listener = new llarp::udp_listener(fd, l);
l->impl = listener;
l->impl = listener;
return listener;
}
@ -385,7 +388,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop
// printf("Calling close_ev for [%x] fd[%d]\n", listener, listener->fd);
ret = close_ev(listener);
l->impl = nullptr;
ret = true;
ret = true;
}
return ret;
}

Loading…
Cancel
Save