make tcp work with libuv probably

pull/1334/head
Jeff Becker 5 years ago
parent 4c7f8e8351
commit 35bb7444fe
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -4,4 +4,4 @@ add_library(${ABYSS_LIB} "${CMAKE_CURRENT_SOURCE_DIR}/src/md5.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/src/server.cpp")
target_include_directories(${ABYSS_LIB} PUBLIC include)
target_link_libraries(${ABYSS_LIB} PUBLIC ${PLATFORM_LIB})
target_link_libraries(${ABYSS_LIB} PUBLIC ${PLATFORM_LIB} uv)

@ -117,7 +117,7 @@ main(ABSL_ATTRIBUTE_UNUSED int argc, ABSL_ATTRIBUTE_UNUSED char* argv[])
#endif
llarp::SetLogLevel(llarp::eLogDebug);
llarp_threadpool* threadpool = llarp_init_same_process_threadpool();
llarp_ev_loop_ptr loop = llarp_make_ev_loop();
llarp_ev_loop_ptr loop = llarp_make_uv_loop();
auto logic = std::make_shared< llarp::Logic >(threadpool);
sockaddr_in addr;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

@ -63,6 +63,7 @@ namespace abyss
static void
OnClosed(llarp_tcp_conn* conn)
{
llarp::LogDebug("connection closed");
ConnImpl* self = static_cast< ConnImpl* >(conn->user);
self->state = eCloseMe;
}

@ -212,6 +212,7 @@ namespace abyss
bool
ProcessRead(const char* buf, size_t sz)
{
llarp::LogDebug("http read ", sz, " bytes");
if(m_Bad)
{
return false;
@ -266,6 +267,7 @@ namespace abyss
static void
OnClosed(llarp_tcp_conn* conn)
{
llarp::LogDebug("connection closed");
ConnImpl* self = static_cast< ConnImpl* >(conn->user);
self->_conn = nullptr;
}
@ -280,6 +282,8 @@ namespace abyss
void
Tick()
{
if(m_Bad)
Close();
}
/// mark bad so next tick we are closed

@ -186,11 +186,14 @@ llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const llarp_buffer_t &b)
{
ssize_t amount = conn->write(conn, buf.underlying.cur, EV_WRITE_BUF_SZ);
if(amount <= 0)
{
llarp::LogError("write underrun");
return false;
}
buf.underlying.cur += amount;
sz -= amount;
}
return conn->write(conn, buf.underlying.cur, sz);
return conn->write(conn, buf.underlying.cur, sz) > 0;
}
void
@ -241,12 +244,7 @@ llarp_tcp_serve(struct llarp_ev_loop *loop, struct llarp_tcp_acceptor *tcp,
void
llarp_tcp_acceptor_close(struct llarp_tcp_acceptor *tcp)
{
llarp::ev_io *impl = static_cast< llarp::ev_io * >(tcp->user);
tcp->impl = nullptr;
tcp->loop->close_ev(impl);
if(tcp->closed)
tcp->closed(tcp);
// dont free acceptor because it may be stack allocated
tcp->close(tcp);
}
void

@ -178,6 +178,8 @@ struct llarp_tcp_acceptor
void (*accepted)(struct llarp_tcp_acceptor *, struct llarp_tcp_conn *);
/// handle after server socket closed (free-ing is handled by event loop)
void (*closed)(struct llarp_tcp_acceptor *);
/// set by impl
void (*close)(struct llarp_tcp_acceptor *);
};
/// bind to an address and start serving async

@ -14,14 +14,18 @@ namespace libuv
llarp_tcp_conn m_Conn;
llarp::Addr m_Addr;
std::deque< std::vector< char > > m_WriteQueue;
conn_glue(uv_loop_t* loop, llarp_tcp_connecter* tcp, const sockaddr* addr)
: m_TCP(tcp), m_Accept(nullptr), m_Addr(*addr)
{
m_Connect.data = this;
m_Handle.data = tcp;
m_Handle.data = this;
uv_tcp_init(loop, &m_Handle);
m_Ticker.data = this;
uv_timer_init(loop, &m_Ticker);
m_Conn.close = &ExplicitClose;
m_Conn.write = &ExplicitWrite;
}
conn_glue(uv_loop_t* loop, llarp_tcp_acceptor* tcp, const sockaddr* addr)
@ -31,10 +35,15 @@ namespace libuv
uv_tcp_init(loop, &m_Handle);
m_Ticker.data = this;
uv_timer_init(loop, &m_Ticker);
m_Accept->close = &ExplicitCloseAccept;
m_Conn.write = nullptr;
m_Conn.closed = nullptr;
}
conn_glue(conn_glue* parent) : m_TCP(nullptr), m_Accept(nullptr)
{
m_Conn.close = &ExplicitClose;
m_Conn.write = &ExplicitWrite;
m_Handle.data = this;
uv_tcp_init(parent->m_Handle.loop, &m_Handle);
m_Ticker.data = this;
@ -60,19 +69,25 @@ namespace libuv
{
static_cast< conn_glue* >(conn->impl)->Close();
}
static void
ExplicitCloseAccept(llarp_tcp_acceptor* tcp)
{
static_cast< conn_glue* >(tcp->impl)->Close();
}
static ssize_t
ExplicitWrite(llarp_tcp_conn* conn, const byte_t* ptr, size_t sz)
{
if(static_cast< conn_glue* >(conn->impl)->WriteAsync(ptr, sz))
return sz;
return 0;
return static_cast< conn_glue* >(conn->impl)->WriteAsync((char*)ptr, sz);
}
static void
OnRead(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
{
static_cast< conn_glue* >(stream->data)->Read(buf->base, nread);
if(nread >= 0)
static_cast< conn_glue* >(stream->data)->Read(buf->base, nread);
else if(nread < 0)
static_cast< conn_glue* >(stream->data)->Close();
delete[] buf->base;
}
@ -86,8 +101,12 @@ namespace libuv
void
Read(const char* ptr, ssize_t sz)
{
const llarp_buffer_t buf(ptr, sz);
m_Conn.read(&m_Conn, buf);
if(m_Conn.read)
{
llarp::LogDebug("tcp read ", sz, " bytes");
const llarp_buffer_t buf(ptr, sz);
m_Conn.read(&m_Conn, buf);
}
}
void
@ -105,38 +124,53 @@ namespace libuv
Start();
}
else if(m_TCP->error)
{
llarp::LogError("failed to connect tcp ", uv_strerror(status));
m_TCP->error(m_TCP);
}
}
}
void
WriteFail()
{
m_Conn.close(&m_Conn);
if(m_Conn.close)
m_Conn.close(&m_Conn);
}
uv_stream_t*
Stream()
{
return (uv_stream_t*)&m_Handle;
}
static void
OnWritten(uv_write_t* req, int status)
{
conn_glue* self = static_cast< conn_glue* >(req->data);
if(status)
self->WriteFail();
delete req;
{
llarp::LogError("write failed on tcp: ", uv_strerror(status));
static_cast< conn_glue* >(req->data)->Close();
return;
}
static_cast< conn_glue* >(req->data)->DrainOne();
}
uv_stream_t*
Stream()
void
DrainOne()
{
return (uv_stream_t*)&m_Handle;
m_WriteQueue.pop_front();
}
bool
WriteAsync(const void* data, size_t sz)
int
WriteAsync(char* data, size_t sz)
{
uv_write_t* request = new uv_write_t();
request->data = this;
auto buf = uv_buf_init((char*)data, sz);
return uv_write(request, Stream(), &buf, 1, &OnWritten) != -1;
m_WriteQueue.emplace_back(sz);
std::copy_n(data, sz, m_WriteQueue.back().begin());
auto buf = uv_buf_init(m_WriteQueue.back().data(), sz);
uv_write_t* req = new uv_write_t();
req->data = this;
return uv_write(req, Stream(), &buf, 1, &OnWritten) == 0 ? sz : 0;
}
static void
@ -150,15 +184,22 @@ namespace libuv
{
m_Handle.data = nullptr;
if(m_Accept && m_Accept->closed)
{
m_Accept->impl = nullptr;
m_Accept->closed(m_Accept);
}
m_Conn.impl = nullptr;
if(m_Conn.closed)
{
m_Conn.closed(&m_Conn);
}
delete this;
}
void
Close()
{
llarp::LogDebug("close tcp connection");
uv_timer_stop(&m_Ticker);
uv_close((uv_handle_t*)&m_Handle, &OnClosed);
}
@ -170,6 +211,10 @@ namespace libuv
{
static_cast< conn_glue* >(stream->data)->Accept();
}
else
{
llarp::LogError("tcp accept failed: ", uv_strerror(status));
}
}
static void
@ -191,8 +236,12 @@ namespace libuv
void
Start()
{
uv_timer_start((uv_timer_t*)&m_Ticker, &OnTick, 10, 10);
uv_read_start(Stream(), &Alloc, &OnRead);
auto result = uv_timer_start((uv_timer_t*)&m_Ticker, &OnTick, 10, 10);
if(result)
llarp::LogError("failed to start timer ", uv_strerror(result));
result = uv_read_start(Stream(), &Alloc, &OnRead);
if(result)
llarp::LogError("failed to start reader ", uv_strerror(result));
}
void
@ -201,11 +250,18 @@ namespace libuv
if(m_Accept && m_Accept->accepted)
{
conn_glue* child = new conn_glue(this);
uv_accept(Stream(), child->Stream());
child->m_Conn.impl = this;
llarp::LogDebug("accepted new connection");
child->m_Conn.impl = child;
child->m_Conn.loop = m_Accept->loop;
child->m_Conn.close = &ExplicitClose;
child->m_Conn.write = &ExplicitWrite;
auto res = uv_accept(Stream(), child->Stream());
if(res)
{
llarp::LogError("failed to accept tcp connection ", uv_strerror(res));
child->Close();
return;
}
m_Accept->accepted(m_Accept, &child->m_Conn);
child->Start();
}
@ -214,6 +270,7 @@ namespace libuv
bool
Server()
{
m_Accept->close = &ExplicitCloseAccept;
return uv_tcp_bind(&m_Handle, m_Addr, 0) == 0
&& uv_listen(Stream(), 5, &OnAccept) == 0;
}
@ -256,7 +313,7 @@ namespace libuv
void
RecvFrom(ssize_t sz, const uv_buf_t* buf, const struct sockaddr* fromaddr)
{
if(sz >= 0)
if(sz >= 0 && m_UDP && m_UDP->recvfrom)
{
const size_t pktsz = sz;
const llarp_buffer_t pkt{(const byte_t*)buf->base, pktsz};
@ -274,7 +331,6 @@ namespace libuv
void
Tick()
{
llarp::LogDebug("udp tick");
if(m_UDP && m_UDP->tick)
m_UDP->tick(m_UDP);
uv_timer_again(&m_Ticker);
@ -467,6 +523,7 @@ namespace libuv
m_Impl.reset(uv_loop_new());
if(uv_loop_init(m_Impl.get()) == -1)
return false;
uv_loop_configure(m_Impl.get(), UV_LOOP_BLOCK_SIGNAL, SIGPIPE);
m_TickTimer.data = this;
m_Run.store(true);
return uv_timer_init(m_Impl.get(), &m_TickTimer) != -1;

Loading…
Cancel
Save