@ -7,9 +7,319 @@
# include <net.h>
# include <net.hpp>
# include <windows.h>
# include <process.h>
# include <cstdio>
// io packet for TUN read/write
struct asio_evt_pkt
{
OVERLAPPED pkt = {
0 , 0 , 0 , 0 , nullptr } ; // must be first, since this is part of the IO call
bool write = false ; // true, or false if read pkt
size_t sz ; // should match the queued data size, if not try again?
void * buf ; // must remain valid until we get notification; this is _supposed_
// to be zero-copy
} ;
struct win32_tun_io ;
extern " C " DWORD FAR PASCAL
tun_ev_loop ( void * unused ) ;
// list of TUN listeners (useful for exits or other nodes with multiple TUNs
std : : list < win32_tun_io * > tun_listeners ;
// a single event queue for the TUN interface
HANDLE tun_event_queue =
INVALID_HANDLE_VALUE ; // we pass this to the event loop thread procedure
// upon setup
// we hand the kernel our thread handles to process completion events
HANDLE * kThreadPool ;
void
begin_tun_loop ( int nThreads )
{
kThreadPool = new HANDLE [ nThreads ] ;
for ( int i = 0 ; i < nThreads ; + + i )
{
kThreadPool [ i ] =
CreateThread ( nullptr , 0 , & tun_ev_loop , nullptr , 0 , nullptr ) ;
}
llarp : : LogInfo ( " created " , nThreads , " threads for TUN event queue " ) ;
}
// A different kind of event loop,
// more suited for the native Windows NT
// event model
struct win32_tun_io
{
llarp_tun_io * t ;
device * tunif ;
byte_t readbuf [ EV_READ_BUF_SZ ] = { 0 } ;
struct WriteBuffer
{
llarp_time_t timestamp = 0 ;
size_t bufsz ;
byte_t buf [ EV_WRITE_BUF_SZ ] ;
WriteBuffer ( ) = default ;
WriteBuffer ( const byte_t * ptr , size_t sz )
{
if ( sz < = sizeof ( buf ) )
{
bufsz = sz ;
memcpy ( buf , ptr , bufsz ) ;
}
else
bufsz = 0 ;
}
struct GetTime
{
llarp_time_t
operator ( ) ( const WriteBuffer & buf ) const
{
return buf . timestamp ;
}
} ;
struct GetNow
{
void * loop ;
GetNow ( void * l ) : loop ( l )
{
}
llarp_time_t
operator ( ) ( ) const
{
return llarp : : time_now_ms ( ) ;
}
} ;
struct PutTime
{
void * loop ;
PutTime ( void * l ) : loop ( l )
{
}
void
operator ( ) ( WriteBuffer & buf )
{
buf . timestamp = llarp : : time_now_ms ( ) ;
}
} ;
struct Compare
{
bool
operator ( ) ( const WriteBuffer & left , const WriteBuffer & right ) const
{
return left . timestamp < right . timestamp ;
}
} ;
} ;
using LossyWriteQueue_t =
llarp : : util : : CoDelQueue < WriteBuffer , WriteBuffer : : GetTime ,
WriteBuffer : : PutTime , WriteBuffer : : Compare ,
WriteBuffer : : GetNow , llarp : : util : : NullMutex ,
llarp : : util : : NullLock , 5 , 100 , 128 > ;
std : : unique_ptr < LossyWriteQueue_t > m_LossyWriteQueue ;
win32_tun_io ( llarp_tun_io * tio ) : t ( tio ) , tunif ( tuntap_init ( ) )
{
// This is not your normal everyday event loop, this is _advanced_ event handling :>
m_LossyWriteQueue = std : : make_unique < LossyWriteQueue_t > ( " win32_tun_queue " , nullptr , nullptr ) ;
} ;
bool
queue_write ( const byte_t * buf , size_t sz )
{
if ( m_LossyWriteQueue )
{
m_LossyWriteQueue - > Emplace ( buf , sz ) ;
flush_write ( ) ;
return true ;
}
else
return false ;
}
bool
setup ( )
{
if ( tuntap_start ( tunif , TUNTAP_MODE_TUNNEL , 0 ) = = - 1 )
{
llarp : : LogWarn ( " failed to start interface " ) ;
return false ;
}
if ( tuntap_set_ip ( tunif , t - > ifaddr , t - > ifaddr , t - > netmask ) = = - 1 )
{
llarp : : LogWarn ( " failed to set ip " ) ;
return false ;
}
if ( tuntap_up ( tunif ) = = - 1 )
{
char ebuf [ 1024 ] ;
int err = GetLastError ( ) ;
FormatMessage ( FORMAT_MESSAGE_FROM_SYSTEM , nullptr , err , LANG_NEUTRAL ,
ebuf , 1024 , nullptr ) ;
llarp : : LogWarn ( " failed to put interface up: " , ebuf ) ;
return false ;
}
if ( tunif - > tun_fd = = INVALID_HANDLE_VALUE )
return false ;
return true ;
}
// first TUN device gets to set up the event port
bool
add_ev ( )
{
if ( tun_event_queue = = INVALID_HANDLE_VALUE )
{
SYSTEM_INFO sys_info ;
GetSystemInfo ( & sys_info ) ;
unsigned long numCPU = sys_info . dwNumberOfProcessors ;
// let the system handle 2x the number of CPUs or hardware
// threads
tun_event_queue = CreateIoCompletionPort ( tunif - > tun_fd , nullptr ,
( ULONG_PTR ) this , numCPU * 2 ) ;
begin_tun_loop ( numCPU * 2 ) ;
}
else
CreateIoCompletionPort ( tunif - > tun_fd , tun_event_queue , ( ULONG_PTR ) this ,
0 ) ;
// we're already non-blocking
// add to list
tun_listeners . push_back ( this ) ;
read ( readbuf , 4096 ) ;
return true ;
}
// places data in event queue for kernel to process
void
do_write ( void * data , size_t sz )
{
llarp : : LogInfo ( " writing some data " ) ;
asio_evt_pkt * pkt = new asio_evt_pkt ;
pkt - > buf = data ;
pkt - > sz = sz ;
pkt - > write = true ;
memset ( & pkt - > pkt , ' \0 ' , sizeof ( pkt - > pkt ) ) ;
WriteFile ( tunif - > tun_fd , data , sz , nullptr , & pkt - > pkt ) ;
}
// we call this one when we get a packet in the event port
// which then kicks off another write
void
flush_write ( )
{
if ( t - > before_write )
t - > before_write ( t ) ;
m_LossyWriteQueue - > Process ( [ & ] ( WriteBuffer & buffer ) {
do_write ( buffer . buf , buffer . bufsz ) ;
// we are NEVER going to block
// because Windows NT implements true async io
} ) ;
}
void
read ( byte_t * buf , size_t sz )
{
asio_evt_pkt * pkt = new asio_evt_pkt ;
pkt - > buf = buf ;
memset ( & pkt - > pkt , ' \0 ' , sizeof ( OVERLAPPED ) ) ;
pkt - > sz = sz ;
pkt - > write = false ;
ReadFile ( tunif - > tun_fd , buf , sz , nullptr , & pkt - > pkt ) ;
}
~ win32_tun_io ( )
{
CancelIo ( tunif - > tun_fd ) ;
if ( tunif - > tun_fd )
tuntap_destroy ( tunif ) ;
}
} ;
// and now the event loop itself
extern " C " DWORD FAR PASCAL
tun_ev_loop ( void * unused )
{
UNREFERENCED_PARAMETER ( unused ) ;
DWORD size = 0 ;
OVERLAPPED * ovl = nullptr ;
ULONG_PTR listener = 0 ;
asio_evt_pkt * pkt = nullptr ;
BOOL alert ;
while ( true )
{
alert =
GetQueuedCompletionStatus ( tun_event_queue , & size , & listener , & ovl , 100 ) ;
if ( ! alert )
continue ; // let's go at it once more
if ( listener = = ( ULONG_PTR ) ~ 0 )
break ;
// if we're here, then we got something interesting :>
pkt = ( asio_evt_pkt * ) ovl ;
win32_tun_io * ev = reinterpret_cast < win32_tun_io * > ( listener ) ;
if ( ! pkt - > write )
{
//llarp::LogInfo("read tun ", size, " bytes, pass to handler");
if ( ev - > t - > recvpkt )
ev - > t - > recvpkt ( ev - > t , llarp : : InitBuffer ( pkt - > buf , size ) ) ;
ev - > read ( ev - > readbuf , sizeof ( ev - > readbuf ) ) ;
}
else
{
llarp : : LogInfo ( " write " , size , " bytes to tunnel interface " ) ;
// ok let's queue another read!
ev - > read ( ev - > readbuf , sizeof ( ev - > readbuf ) ) ;
}
delete pkt ; // don't leak
}
llarp : : LogInfo ( " exit TUN event loop thread from system managed thread pool " ) ;
return 0 ;
}
void
exit_tun_loop ( )
{
// if we get all-ones in the queue, thread exits, and we clean up
PostQueuedCompletionStatus ( tun_event_queue , 0 , ~ 0 , nullptr ) ;
// kill the kernel's thread pool
int i = ( & kThreadPool ) [ 1 ] - kThreadPool ; // get the size of our thread pool
llarp : : LogInfo ( " closing " , i , " threads " ) ;
WaitForMultipleObjects ( i , kThreadPool , TRUE , INFINITE ) ;
for ( int j = 0 ; j < i ; + + j )
CloseHandle ( kThreadPool [ j ] ) ;
delete [ ] kThreadPool ;
// the IOCP refcount is decreased each time an associated fd
// is closed
// the fds are closed in their destructors
// once we get to zero, we can safely close the event port
auto itr = tun_listeners . begin ( ) ;
while ( itr ! = tun_listeners . end ( ) )
{
delete ( * itr ) ;
itr = tun_listeners . erase ( itr ) ;
}
CloseHandle ( tun_event_queue ) ;
}
namespace llarp
{
int
@ -18,7 +328,7 @@ namespace llarp
if ( _shouldClose )
return - 1 ;
ssize_t amount = uread ( fd .socket , ( char * ) buf , sz ) ;
ssize_t amount = uread ( fd , ( char * ) buf , sz ) ;
if ( amount > 0 )
{
@ -46,7 +356,7 @@ namespace llarp
{
if ( _shouldClose )
return - 1 ;
return uwrite ( fd .socket , ( char * ) buf , sz ) ;
return uwrite ( fd , ( char * ) buf , sz ) ;
}
void
@ -57,7 +367,7 @@ namespace llarp
slen = 115 ;
else if ( _addr . ss_family = = AF_INET6 )
slen = sizeof ( sockaddr_in6 ) ;
int result = : : connect ( fd .socket , ( const sockaddr * ) & _addr , slen ) ;
int result = : : connect ( fd , ( const sockaddr * ) & _addr , slen ) ;
if ( result = = 0 )
{
llarp : : LogDebug ( " connected immedidately " ) ;
@ -85,14 +395,14 @@ namespace llarp
int
tcp_serv : : read ( byte_t * , size_t )
{
int new_fd = : : accept ( fd .socket , nullptr , nullptr ) ;
int new_fd = : : accept ( fd , nullptr , nullptr ) ;
if ( new_fd = = - 1 )
{
char ebuf [ 1024 ] ;
int err = WSAGetLastError ( ) ;
FormatMessage ( FORMAT_MESSAGE_FROM_SYSTEM , nullptr , err , LANG_NEUTRAL ,
ebuf , 1024 , nullptr ) ;
llarp : : LogError ( " failed to accept on " , fd .socket , " : " , ebuf ) ;
llarp : : LogError ( " failed to accept on " , fd , " : " , ebuf ) ;
return - 1 ;
}
// build handler
@ -109,113 +419,6 @@ namespace llarp
return - 1 ;
}
struct tun : public ev_io
{
llarp_tun_io * t ;
device * tunif ;
tun ( llarp_tun_io * tio , llarp_ev_loop * l )
: ev_io ( INVALID_HANDLE_VALUE ,
new LossyWriteQueue_t ( " win32_tun_write_queue " , l , l ) )
, t ( tio )
, tunif ( tuntap_init ( ) )
{
this - > is_tun = true ;
} ;
int
sendto ( const sockaddr * to , const void * data , size_t sz )
{
UNREFERENCED_PARAMETER ( to ) ;
UNREFERENCED_PARAMETER ( data ) ;
UNREFERENCED_PARAMETER ( sz ) ;
return - 1 ;
}
void
flush_write ( )
{
if ( t - > before_write )
{
t - > before_write ( t ) ;
ev_io : : flush_write ( ) ;
}
}
bool
tick ( )
{
if ( t - > tick )
t - > tick ( t ) ;
flush_write ( ) ;
return true ;
}
virtual ssize_t
do_write ( void * buf , size_t sz )
{
DWORD x ;
bool r ;
asio_evt_pkt * pkt = new asio_evt_pkt ;
memset ( pkt , 0 , sizeof ( asio_evt_pkt ) ) ;
pkt - > sz = sz ;
pkt - > write = true ;
int e = 0 ;
r = WriteFile ( fd . tun , buf , sz , & x , & pkt - > pkt ) ;
if ( r ) // we returned immediately
return x ;
e = GetLastError ( ) ;
if ( e = = ERROR_IO_PENDING )
return sz ;
else
return - 1 ;
}
int
read ( byte_t * buf , size_t sz )
{
ssize_t ret = tuntap_read ( tunif , buf , sz ) ;
if ( ret > 0 & & t - > recvpkt )
t - > recvpkt ( t , llarp : : InitBuffer ( buf , ret ) ) ;
return ret ;
}
bool
setup ( )
{
if ( tuntap_start ( tunif , TUNTAP_MODE_TUNNEL , 0 ) = = - 1 )
{
llarp : : LogWarn ( " failed to start interface " ) ;
return false ;
}
if ( tuntap_set_ip ( tunif , t - > ifaddr , t - > ifaddr , t - > netmask ) = = - 1 )
{
llarp : : LogWarn ( " failed to set ip " ) ;
return false ;
}
if ( tuntap_up ( tunif ) = = - 1 )
{
char ebuf [ 1024 ] ;
int err = GetLastError ( ) ;
FormatMessage ( FORMAT_MESSAGE_FROM_SYSTEM , nullptr , err , LANG_NEUTRAL ,
ebuf , 1024 , nullptr ) ;
llarp : : LogWarn ( " failed to put interface up: " , ebuf ) ;
return false ;
}
fd . tun = tunif - > tun_fd ;
if ( fd . tun = = INVALID_HANDLE_VALUE )
return false ;
// we're already non-blocking
return true ;
}
~ tun ( )
{
}
} ;
struct udp_listener : public ev_io
{
llarp_udp_io * udp ;
@ -243,7 +446,7 @@ namespace llarp
sockaddr_in6 src ;
socklen_t slen = sizeof ( sockaddr_in6 ) ;
sockaddr * addr = ( sockaddr * ) & src ;
ssize_t ret = : : recvfrom ( fd .socket , ( char * ) b . base , sz , 0 , addr , & slen ) ;
ssize_t ret = : : recvfrom ( fd , ( char * ) b . base , sz , 0 , addr , & slen ) ;
if ( ret < 0 )
return - 1 ;
if ( static_cast < size_t > ( ret ) > sz )
@ -268,7 +471,7 @@ namespace llarp
default :
return - 1 ;
}
ssize_t sent = : : sendto ( fd .socket , ( char * ) data , sz , 0 , to , slen ) ;
ssize_t sent = : : sendto ( fd , ( char * ) data , sz , 0 , to , slen ) ;
if ( sent = = - 1 )
{
char ebuf [ 1024 ] ;
@ -286,9 +489,8 @@ namespace llarp
struct llarp_win32_loop : public llarp_ev_loop
{
upoll_t * upollfd ;
HANDLE tun_event_queue ;
llarp_win32_loop ( ) : upollfd ( nullptr ) , tun_event_queue ( INVALID_HANDLE_VALUE )
llarp_win32_loop ( ) : upollfd ( nullptr )
{
}
@ -320,10 +522,8 @@ struct llarp_win32_loop : public llarp_ev_loop
// for now, use the ID numbers directly until this comes out of
// beta
else if ( bindaddr - > sa_family = = AF_UNIX )
{
sz = sizeof ( sockaddr_un ) ; // current size in 10.0.17763, verify each time
// the beta PSDK is updated
}
sz = sizeof ( sockaddr_un ) ;
if ( : : bind ( fd , bindaddr , sz ) = = - 1 )
{
uclose ( fd ) ;
@ -342,7 +542,7 @@ struct llarp_win32_loop : public llarp_ev_loop
{
auto ev = create_udp ( l , src ) ;
if ( ev )
l - > fd = ev - > fd .socket ;
l - > fd = ev - > fd ;
return ev & & add_ev ( ev , false ) ;
}
@ -350,14 +550,12 @@ struct llarp_win32_loop : public llarp_ev_loop
{
if ( upollfd )
upoll_destroy ( upollfd ) ;
if ( tun_event_queue ! = INVALID_HANDLE_VALUE )
CloseHandle ( tun_event_queue ) ;
}
bool
running ( ) const
{
return ( upollfd ! = nullptr ) & & ( tun_event_queue ! = INVALID_HANDLE_VALUE ) ;
return ( upollfd ! = nullptr ) ;
}
bool
@ -365,16 +563,11 @@ struct llarp_win32_loop : public llarp_ev_loop
{
if ( ! upollfd )
upollfd = upoll_create ( 1 ) ;
if ( tun_event_queue = = INVALID_HANDLE_VALUE )
tun_event_queue =
CreateIoCompletionPort ( INVALID_HANDLE_VALUE , nullptr , 0 , 0 ) ;
return upollfd & & ( tun_event_queue ! = INVALID_HANDLE_VALUE ) ;
return upollfd ! = nullptr ;
}
// OK, the event loop, as it exists now, will _only_
// work on sockets (and not very efficiently at that).
// This will NOT work on device files like /dev/tun
// on Windows
int
tick ( int ms )
{
@ -409,32 +602,6 @@ struct llarp_win32_loop : public llarp_ev_loop
}
}
DWORD size = 0 ;
OVERLAPPED * ovl = nullptr ;
ULONG_PTR listener = 0 ;
asio_evt_pkt * pkt = nullptr ;
while (
GetQueuedCompletionStatus ( tun_event_queue , & size , & listener , & ovl , ms ) )
{
pkt = ( asio_evt_pkt * ) ovl ;
llarp : : ev_io * ev = reinterpret_cast < llarp : : ev_io * > ( listener ) ;
/*if(size != pkt->sz)
llarp : : LogWarn ( " incomplete async io operation: got " , size ,
" bytes, expected " , pkt - > sz , " bytes " ) ; */
if ( ! pkt - > write )
{
ev - > read ( readbuf , size ) ;
printf ( " read tun \n " ) ;
}
else
{
ev - > flush_write_buffers ( pkt - > sz ) ;
printf ( " write tun \n " ) ;
}
+ + result ;
delete pkt ; // don't leak
}
if ( result ! = - 1 )
tick_listeners ( ) ;
return result ;
@ -531,24 +698,14 @@ struct llarp_win32_loop : public llarp_ev_loop
bool
close_ev ( llarp : : ev_io * ev )
{
if ( ev - > is_tun )
{
CancelIo ( ev - > fd . tun ) ;
CloseHandle ( ev - > fd . tun ) ;
return true ;
}
return upoll_ctl ( upollfd , UPOLL_CTL_DEL , ev - > fd . socket , nullptr ) ! = - 1 ;
return upoll_ctl ( upollfd , UPOLL_CTL_DEL , ev - > fd , nullptr ) ! = - 1 ;
}
// no tunnels here
llarp : : ev_io *
create_tun ( llarp_tun_io * tun )
{
llarp : : tun * t = new llarp : : tun ( tun , this ) ;
if ( t - > setup ( ) )
{
return t ;
}
delete t ;
UNREFERENCED_PARAMETER ( tun ) ;
return nullptr ;
}
@ -566,26 +723,16 @@ struct llarp_win32_loop : public llarp_ev_loop
bool
add_ev ( llarp : : ev_io * e , bool write )
{
if ( e - > is_tun )
{
asio_evt_pkt * pkt = new asio_evt_pkt ;
memset ( pkt , 0 , sizeof ( asio_evt_pkt ) ) ;
pkt - > write = false ;
pkt - > sz = sizeof ( readbuf ) ;
CreateIoCompletionPort ( e - > fd . tun , tun_event_queue , ( ULONG_PTR ) e , 0 ) ;
goto add ;
}
upoll_event_t ev ;
ev . data . ptr = e ;
ev . events = UPOLLIN | UPOLLERR ;
if ( write )
ev . events | = UPOLLOUT ;
if ( upoll_ctl ( upollfd , UPOLL_CTL_ADD , e - > fd .socket , & ev ) = = - 1 )
if ( upoll_ctl ( upollfd , UPOLL_CTL_ADD , e - > fd , & ev ) = = - 1 )
{
delete e ;
return false ;
}
add :
handlers . emplace_back ( e ) ;
return true ;
}
@ -620,20 +767,7 @@ struct llarp_win32_loop : public llarp_ev_loop
if ( upollfd )
upoll_destroy ( upollfd ) ;
upollfd = nullptr ;
if ( tun_event_queue ! = INVALID_HANDLE_VALUE )
{
CloseHandle ( tun_event_queue ) ;
tun_event_queue = INVALID_HANDLE_VALUE ;
}
}
} ;
extern " C " asio_evt_pkt *
getTunEventPkt ( )
{
asio_evt_pkt * newpkt = new asio_evt_pkt ;
memset ( newpkt , 0 , sizeof ( asio_evt_pkt ) ) ;
return newpkt ;
}
# endif