@ -171,11 +171,17 @@ namespace llarp
return false ;
return false ;
}
}
virtual void
flush_write ( )
{
flush_write_buffers ( 0 ) ;
}
/// called in event loop when fd is ready for writing
/// called in event loop when fd is ready for writing
/// requeues anything not written
/// requeues anything not written
/// this assumes fd is set to non blocking
/// this assumes fd is set to non blocking
virtual void
virtual void
flush_write ( )
flush_write _buffers ( size_t amount )
{
{
if ( m_LossyWriteQueue )
if ( m_LossyWriteQueue )
m_LossyWriteQueue - > Process ( [ & ] ( WriteBuffer & buffer ) {
m_LossyWriteQueue - > Process ( [ & ] ( WriteBuffer & buffer ) {
@ -185,34 +191,58 @@ namespace llarp
} ) ;
} ) ;
else if ( m_BlockingWriteQueue )
else if ( m_BlockingWriteQueue )
{
{
// write buffers
if ( amount )
while ( m_BlockingWriteQueue - > size ( ) )
{
{
auto & itr = m_BlockingWriteQueue - > front ( ) ;
while ( amount & & m_BlockingWriteQueue - > size ( ) )
ssize_t result = do_write ( itr . buf , itr . bufsz ) ;
if ( result = = - 1 )
return ;
ssize_t dlt = itr . bufsz - result ;
if ( dlt > 0 )
{
{
// queue remaining to front of queue
auto & itr = m_BlockingWriteQueue - > front ( ) ;
WriteBuffer buff ( itr . buf + dlt , itr . bufsz - dlt ) ;
ssize_t result = do_write ( itr . buf , std : : min ( amount , itr . bufsz ) ) ;
if ( result = = - 1 )
return ;
ssize_t dlt = itr . bufsz - result ;
if ( dlt > 0 )
{
// queue remaining to front of queue
WriteBuffer buff ( itr . buf + dlt , itr . bufsz - dlt ) ;
m_BlockingWriteQueue - > pop_front ( ) ;
m_BlockingWriteQueue - > push_front ( buff ) ;
// TODO: errno?
return ;
}
m_BlockingWriteQueue - > pop_front ( ) ;
m_BlockingWriteQueue - > pop_front ( ) ;
m_BlockingWriteQueue - > push_front ( buff ) ;
amount - = result ;
// TODO: errno?
return ;
}
}
m_BlockingWriteQueue - > pop_front ( ) ;
}
if ( errno = = EAGAIN | | errno = = EWOULDBLOCK )
else
{
// write buffers
while ( m_BlockingWriteQueue - > size ( ) )
{
{
errno = 0 ;
auto & itr = m_BlockingWriteQueue - > front ( ) ;
return ;
ssize_t result = do_write ( itr . buf , itr . bufsz ) ;
if ( result = = - 1 )
return ;
ssize_t dlt = itr . bufsz - result ;
if ( dlt > 0 )
{
// queue remaining to front of queue
WriteBuffer buff ( itr . buf + dlt , itr . bufsz - dlt ) ;
m_BlockingWriteQueue - > pop_front ( ) ;
m_BlockingWriteQueue - > push_front ( buff ) ;
// TODO: errno?
return ;
}
m_BlockingWriteQueue - > pop_front ( ) ;
if ( errno = = EAGAIN | | errno = = EWOULDBLOCK )
{
errno = 0 ;
return ;
}
}
}
}
}
}
}
/// reset errno
/// reset errno
errno = 0 ;
errno = 0 ;
SetLastError ( 0 ) ;
}
}
std : : unique_ptr < LossyWriteQueue_t > m_LossyWriteQueue ;
std : : unique_ptr < LossyWriteQueue_t > m_LossyWriteQueue ;
@ -439,6 +469,7 @@ namespace llarp
// finally create aliases by platform
// finally create aliases by platform
# ifdef _WIN32
# ifdef _WIN32
using ev_io = win32_ev_io ;
using ev_io = win32_ev_io ;
# define sizeof(sockaddr_un) 115
# else
# else
using ev_io = posix_ev_io ;
using ev_io = posix_ev_io ;
# endif
# endif