fix up tcp connection logic

pull/1020/head
Jeff Becker 4 years ago
parent 989146f63a
commit c6d77e72f2
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -135,6 +135,7 @@ llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const llarp_buffer_t &b)
if(amount <= 0) if(amount <= 0)
{ {
llarp::LogError("write underrun"); llarp::LogError("write underrun");
llarp_tcp_conn_close(conn);
return false; return false;
} }
buf.underlying.cur += amount; buf.underlying.cur += amount;

@ -49,7 +49,7 @@ struct llarp_ev_pkt_pipe;
#define EV_READ_BUF_SZ (4 * 1024UL) #define EV_READ_BUF_SZ (4 * 1024UL)
#endif #endif
#ifndef EV_WRITE_BUF_SZ #ifndef EV_WRITE_BUF_SZ
#define EV_WRITE_BUF_SZ (2 * 1024UL) #define EV_WRITE_BUF_SZ (4 * 1024UL)
#endif #endif
/// do io and reset errno after /// do io and reset errno after

@ -29,9 +29,11 @@ namespace libuv
WriteEvent() = default; WriteEvent() = default;
explicit WriteEvent(WriteBuffer_t buf) : data(std::move(buf)) explicit WriteEvent(size_t sz, char* ptr)
{ {
request.data = this; request.data = this;
data.resize(sz);
std::copy_n(ptr, sz, data.begin());
} }
uv_buf_t uv_buf_t
@ -54,11 +56,9 @@ namespace libuv
llarp_tcp_acceptor* const m_Accept; llarp_tcp_acceptor* const m_Accept;
llarp_tcp_conn m_Conn; llarp_tcp_conn m_Conn;
llarp::Addr m_Addr; llarp::Addr m_Addr;
llarp::thread::Queue< WriteBuffer_t > m_WriteQueue;
uv_async_t m_WriteNotify;
conn_glue(uv_loop_t* loop, llarp_tcp_connecter* tcp, const sockaddr* addr) conn_glue(uv_loop_t* loop, llarp_tcp_connecter* tcp, const sockaddr* addr)
: m_TCP(tcp), m_Accept(nullptr), m_Addr(*addr), m_WriteQueue(32) : m_TCP(tcp), m_Accept(nullptr), m_Addr(*addr)
{ {
m_Connect.data = this; m_Connect.data = this;
m_Handle.data = this; m_Handle.data = this;
@ -66,29 +66,24 @@ namespace libuv
uv_tcp_init(loop, &m_Handle); uv_tcp_init(loop, &m_Handle);
m_Ticker.data = this; m_Ticker.data = this;
uv_check_init(loop, &m_Ticker); uv_check_init(loop, &m_Ticker);
m_Conn.close = &ExplicitClose; m_Conn.close = &ExplicitClose;
m_Conn.write = &ExplicitWrite; m_Conn.write = &ExplicitWrite;
m_WriteNotify.data = this;
uv_async_init(loop, &m_WriteNotify, &OnShouldWrite);
} }
conn_glue(uv_loop_t* loop, llarp_tcp_acceptor* tcp, const sockaddr* addr) conn_glue(uv_loop_t* loop, llarp_tcp_acceptor* tcp, const sockaddr* addr)
: m_TCP(nullptr), m_Accept(tcp), m_Addr(*addr), m_WriteQueue(32) : m_TCP(nullptr), m_Accept(tcp), m_Addr(*addr)
{ {
m_Connect.data = nullptr; m_Connect.data = nullptr;
m_Handle.data = this; m_Handle.data = this;
uv_tcp_init(loop, &m_Handle); uv_tcp_init(loop, &m_Handle);
m_Ticker.data = this; m_Ticker.data = this;
uv_check_init(loop, &m_Ticker); uv_check_init(loop, &m_Ticker);
m_Accept->close = &ExplicitCloseAccept; m_Accept->close = &ExplicitCloseAccept;
m_Conn.write = nullptr; m_Conn.write = nullptr;
m_Conn.closed = nullptr; m_Conn.closed = nullptr;
m_WriteNotify.data = this;
uv_async_init(loop, &m_WriteNotify, &OnShouldWrite);
} }
conn_glue(conn_glue* parent) conn_glue(conn_glue* parent) : m_TCP(nullptr), m_Accept(nullptr)
: m_TCP(nullptr), m_Accept(nullptr), m_WriteQueue(32)
{ {
m_Connect.data = nullptr; m_Connect.data = nullptr;
m_Conn.close = &ExplicitClose; m_Conn.close = &ExplicitClose;
@ -97,8 +92,6 @@ namespace libuv
uv_tcp_init(parent->m_Handle.loop, &m_Handle); uv_tcp_init(parent->m_Handle.loop, &m_Handle);
m_Ticker.data = this; m_Ticker.data = this;
uv_check_init(parent->m_Handle.loop, &m_Ticker); uv_check_init(parent->m_Handle.loop, &m_Ticker);
m_WriteNotify.data = this;
uv_async_init(parent->m_Handle.loop, &m_WriteNotify, &OnShouldWrite);
} }
static void static void
@ -221,45 +214,19 @@ namespace libuv
{ {
if(uv_is_closing((const uv_handle_t*)&m_Handle)) if(uv_is_closing((const uv_handle_t*)&m_Handle))
return -1; return -1;
if(uv_is_closing((const uv_handle_t*)&m_WriteNotify)) WriteEvent* ev = new WriteEvent(sz, data);
return -1; auto buf = ev->Buffer();
WriteBuffer_t buf(sz); if(uv_write(ev->Request(), Stream(), &buf, 1, &OnWritten) == 0)
std::copy_n(data, sz, buf.begin()); return sz;
int result = -1; delete ev;
if(m_WriteQueue.tryPushBack(std::move(buf)) return -1;
== llarp::thread::QueueReturn::Success)
{
result = sz;
}
else
{
WriteFail();
}
uv_async_send(&m_WriteNotify);
return result;
}
void
FlushWrite()
{
while(not m_WriteQueue.empty())
{
auto data = m_WriteQueue.popFront();
WriteEvent* ev = new WriteEvent(std::move(data));
auto buf = ev->Buffer();
if(uv_write(ev->Request(), Stream(), &buf, 1, &OnWritten) != 0)
{
Close();
return;
}
}
} }
static void static void
OnClosed(uv_handle_t* h) OnClosed(uv_handle_t* h)
{ {
conn_glue* conn = static_cast< conn_glue* >(h->data); conn_glue* conn = static_cast< conn_glue* >(h->data);
LoopCall(h, std::bind(&conn_glue::HandleClosed, conn)); conn->HandleClosed();
} }
static void static void
@ -299,25 +266,17 @@ namespace libuv
delete shut; delete shut;
} }
static void
OnWriteClosed(uv_handle_t* h)
{
conn_glue* conn = static_cast< conn_glue* >(h->data);
auto* shut = new uv_shutdown_t();
shut->data = conn;
uv_shutdown(shut, conn->Stream(), &OnShutdown);
}
void void
Close() override Close() override
{ {
if(uv_is_closing((uv_handle_t*)&m_WriteNotify)) if(uv_is_closing((uv_handle_t*)Stream()))
return; return;
llarp::LogDebug("close tcp connection"); llarp::LogDebug("close tcp connection");
m_WriteQueue.disable();
uv_close((uv_handle_t*)&m_WriteNotify, &OnWriteClosed);
uv_check_stop(&m_Ticker); uv_check_stop(&m_Ticker);
uv_read_stop(Stream()); uv_read_stop(Stream());
auto* shut = new uv_shutdown_t();
shut->data = this;
uv_shutdown(shut, Stream(), &OnShutdown);
} }
static void static void
@ -326,7 +285,7 @@ namespace libuv
if(status == 0) if(status == 0)
{ {
conn_glue* conn = static_cast< conn_glue* >(stream->data); conn_glue* conn = static_cast< conn_glue* >(stream->data);
LoopCall(stream, std::bind(&conn_glue::Accept, conn)); conn->Accept();
} }
else else
{ {
@ -334,17 +293,11 @@ namespace libuv
} }
} }
static void
OnShouldWrite(uv_async_t* h)
{
static_cast< conn_glue* >(h->data)->FlushWrite();
}
static void static void
OnTick(uv_check_t* t) OnTick(uv_check_t* t)
{ {
conn_glue* conn = static_cast< conn_glue* >(t->data); conn_glue* conn = static_cast< conn_glue* >(t->data);
LoopCall(t, std::bind(&conn_glue::Tick, conn)); conn->Tick();
} }
void void

Loading…
Cancel
Save