unit tests for llarp_ev_pkt_pipe

pull/495/head
Jeff Becker 5 years ago
parent 48254c8ea0
commit ac69213dd7
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -448,7 +448,7 @@ namespace llarp
{
auto& itr = m_BlockingWriteQueue->front();
ssize_t result = do_write(itr.buf, std::min(amount, itr.bufsz));
if(result == -1)
if(result <= 0)
return;
ssize_t dlt = itr.bufsz - result;
if(dlt > 0)
@ -471,7 +471,7 @@ namespace llarp
{
auto& itr = m_BlockingWriteQueue->front();
ssize_t result = do_write(itr.buf, itr.bufsz);
if(result == -1)
if(result <= 0)
{
errno = 0;
return;

@ -1 +1,59 @@
#include <ev/pipe.hpp>
#include <unistd.h>
#include <fcntl.h>
llarp_ev_pkt_pipe::llarp_ev_pkt_pipe(llarp_ev_loop_ptr loop)
: llarp::ev_io(-1, new LosslessWriteQueue_t()), m_Loop(loop)
{
}
bool
llarp_ev_pkt_pipe::Start()
{
int _fds[2];
if(pipe2(_fds, O_DIRECT | O_NONBLOCK) == -1)
return false;
fd = _fds[0];
writefd = _fds[1];
return true;
}
int
llarp_ev_pkt_pipe::read(byte_t* pkt, size_t sz)
{
auto res = ::read(fd, pkt, sz);
if(res <= 0)
return res;
llarp::LogDebug("read ", res, " on pipe");
llarp_buffer_t buf(pkt, res);
OnRead(buf);
return res;
}
ssize_t
llarp_ev_pkt_pipe::do_write(void* buf, size_t sz)
{
llarp::LogInfo("pipe write ", sz);
return ::write(writefd, buf, sz);
}
bool
llarp_ev_pkt_pipe::Write(const llarp_buffer_t& pkt)
{
const ssize_t sz = pkt.sz;
llarp::LogDebug("write ", sz, " on pipe");
if(::write(writefd, pkt.base, pkt.sz) != sz)
{
llarp::LogDebug("queue write ", pkt.sz);
return queue_write(pkt.base, pkt.sz);
}
return true;
}
bool
llarp_ev_pkt_pipe::tick()
{
llarp::ev_io::flush_write();
return true;
}

@ -3,13 +3,37 @@
#include <ev/ev.hpp>
struct llarp_ev_pipe
/// a unidirectional packet pipe
struct llarp_ev_pkt_pipe : public llarp::ev_io
{
llarp_ev_pipe(llarp_ev_loop_ptr loop);
llarp_ev_pkt_pipe(llarp_ev_loop_ptr loop);
/// start the pipe, initialize fds
bool
Start();
/// write to the pipe from outside the event loop
/// returns true on success
/// returns false on failure
bool
Write(const llarp_buffer_t& buf);
/// override me to handle a packet from the other side in the owned event loop
virtual void
OnRead(const llarp_buffer_t& buf) = 0;
ssize_t
do_write(void* buf, size_t sz) override;
virtual bool
tick() override;
int
read(byte_t* buf, size_t sz) override;
private:
std::pair< int, int > m_EventFDs;
std::pair< int, int > m_ExternFDs;
llarp_ev_loop_ptr m_Loop;
int writefd;
};
#endif

@ -15,6 +15,7 @@ list(APPEND TEST_SRC
dht/test_llarp_dht_tx.cpp
dht/test_llarp_dht_txowner.cpp
dns/test_llarp_dns_dns.cpp
ev/test_ev_loop.cpp
exit/test_llarp_exit_context.cpp
link/test_llarp_link.cpp
net/test_llarp_net_inaddr.cpp

@ -0,0 +1,134 @@
#include <crypto/crypto_libsodium.hpp>
#include <ev/ev.h>
#include <ev/pipe.hpp>
#include <util/aligned.hpp>
#include <util/logic.hpp>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include <functional>
using TestPipeReadFunc = std::function< bool(const llarp_buffer_t) >;
struct EventLoopTest : public ::testing::Test
{
llarp_ev_loop_ptr loop;
llarp::Logic _logic;
llarp::sodium::CryptoLibSodium crypto;
static void
OnTimeout(void* u, uint64_t, uint64_t left)
{
if(left)
return;
static_cast< EventLoopTest* >(u)->StopFail();
}
void
SetUp()
{
loop = llarp_make_ev_loop();
_logic.call_later({10000, this, &OnTimeout});
}
void
StopFail()
{
Stop();
ASSERT_FALSE(true);
}
void
Stop()
{
llarp_ev_loop_stop(loop.get());
_logic.stop();
}
void
TearDown()
{
Stop();
loop.reset();
}
void
RunLoop()
{
llarp_ev_loop_run_single_process(loop, _logic.thread, &_logic);
}
};
TEST_F(EventLoopTest, PipeWriteOne)
{
llarp::AlignedBuffer< 32 > data;
data.Randomize();
llarp_buffer_t other(data);
struct TestPipe : public llarp_ev_pkt_pipe
{
const llarp_buffer_t& other;
std::function< void(void) > stop;
TestPipe(llarp_buffer_t& buf, llarp_ev_loop_ptr l,
std::function< void(void) > _stop)
: llarp_ev_pkt_pipe(l), other(buf), stop(_stop)
{
}
void
OnRead(const llarp_buffer_t& buf) override
{
ASSERT_EQ(buf.sz, other.sz);
ASSERT_EQ(memcmp(buf.base, other.base, other.sz), 0);
stop();
}
};
TestPipe* testpipe = new TestPipe(other, loop, [&]() { Stop(); });
ASSERT_TRUE(testpipe->Start());
ASSERT_TRUE(loop->add_ev(testpipe, false));
ASSERT_TRUE(testpipe->Write(other));
RunLoop();
}
TEST_F(EventLoopTest, PipeWrite1K)
{
struct TestPipe : public llarp_ev_pkt_pipe
{
using Data_t = std::vector< llarp::AlignedBuffer< 1500 > >;
Data_t data;
size_t idx = 0;
std::function< void(void) > stop;
TestPipe(size_t num, llarp_ev_loop_ptr l, std::function< void(void) > _stop)
: llarp_ev_pkt_pipe(l), stop(_stop)
{
data.resize(num);
for(auto& d : data)
d.Randomize();
}
void
OnRead(const llarp_buffer_t& buf) override
{
llarp_buffer_t other(data[idx]);
ASSERT_EQ(buf.sz, other.sz);
ASSERT_EQ(memcmp(buf.base, other.base, other.sz), 0);
++idx;
if(idx < data.size())
PumpIt();
else
stop();
}
void
PumpIt()
{
llarp_buffer_t buf(data[idx]);
ASSERT_TRUE(Write(buf));
}
};
TestPipe* testpipe = new TestPipe(1000, loop, [&]() { Stop(); });
ASSERT_TRUE(testpipe->Start());
ASSERT_TRUE(loop->add_ev(testpipe, false));
testpipe->PumpIt();
RunLoop();
};
Loading…
Cancel
Save