Merge pull request #1 from dr7ana/liblokinet-cherrypick

Merging cherry-picks back to testnet branch:
-  #2164
-  #2134
pull/2141/head
dr7ana 1 year ago committed by GitHub
commit a2889174e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -144,6 +144,8 @@ namespace llarp
void
Session::EncryptAndSend(ILinkSession::Packet_t data)
{
if (m_State == State::Closed)
return;
m_EncryptNext.emplace_back(std::move(data));
TriggerPump();
if (!IsEstablished())
@ -179,12 +181,9 @@ namespace llarp
return;
auto close_msg = CreatePacket(Command::eCLOS, 0, 16, 16);
m_Parent->UnmapAddr(m_RemoteAddr);
m_State = State::Closed;
if (m_SentClosed.test_and_set())
return;
EncryptAndSend(std::move(close_msg));
LogInfo(m_Parent->PrintableName(), " closing connection to ", m_RemoteAddr);
m_State = State::Closed;
}
bool
@ -355,7 +354,7 @@ namespace llarp
bool
Session::TimedOut(llarp_time_t now) const
{
if (m_State == State::Ready)
if (m_State == State::Ready || m_State == State::LinkIntro)
{
return now > m_LastRX
&& now - m_LastRX

@ -206,7 +206,6 @@ namespace llarp
std::atomic_flag m_PlaintextEmpty;
llarp::thread::Queue<CryptoQueue_t> m_PlaintextRecv;
std::atomic_flag m_SentClosed;
void
EncryptWorker(CryptoQueue_t msgs);

@ -715,8 +715,7 @@ extern "C"
return;
}
auto on_open = [ctx, localAddr, remote, open_cb](
bool success, void* user_data) {
auto on_open = [ctx, localAddr, remote, open_cb](bool success, void* user_data) {
llarp::log::info(
logcat,
"Quic tunnel {}<->{}.",

@ -298,7 +298,13 @@ namespace llarp::quic
ngtcp2_pkt_info pi;
auto written = ngtcp2_conn_write_connection_close(
conn, &conn.path.path, &pi, u8data(conn.conn_buffer), conn.conn_buffer.size(), &err, get_timestamp());
conn,
&conn.path.path,
&pi,
u8data(conn.conn_buffer),
conn.conn_buffer.size(),
&err,
get_timestamp());
if (written <= 0)
{
log::warning(

@ -135,7 +135,7 @@ namespace llarp::quic
log::info(logcat, "EOF on connection to {}:{}", c.peer().ip, c.peer().port);
if (auto stream = c.data<Stream>())
{
stream->set_eof(); // CloseEvent will send graceful shutdown to other end
stream->set_eof(); // CloseEvent will send graceful shutdown to other end
}
c.close();
});

@ -72,6 +72,7 @@ namespace llarp
_lastTick = llarp::time_now_ms();
m_NextExploreAt = Clock_t::now();
m_Pump = _loop->make_waker([this]() { PumpLL(); });
m_Work = _loop->make_waker([this]() { submit_work(); });
}
Router::~Router()
@ -79,6 +80,15 @@ namespace llarp
llarp_dht_context_free(_dht);
}
void
Router::submit_work()
{
m_lmq->job([work = std::move(m_WorkJobs)]() {
for (const auto& job : work)
job();
});
}
void
Router::PumpLL()
{
@ -482,8 +492,8 @@ namespace llarp
LogError("RC is invalid, not saving");
return false;
}
if (m_isServiceNode)
_nodedb->Put(_rc);
if (IsServiceNode())
_nodedb->Put(rc());
QueueDiskIO([&]() { HandleSaveRC(); });
return true;
}
@ -1631,7 +1641,10 @@ namespace llarp
void
Router::QueueWork(std::function<void(void)> func)
{
m_lmq->job(std::move(func));
_loop->call([this, func = std::move(func)]() mutable {
m_WorkJobs.push_back(std::move(func));
m_Work->Trigger();
});
}
void

@ -78,6 +78,12 @@ namespace llarp
path::BuildLimiter m_PathBuildLimiter;
std::shared_ptr<EventLoopWakeup> m_Pump;
std::shared_ptr<EventLoopWakeup> m_Work;
std::vector<std::function<void()>> m_WorkJobs;
/// submits cpu heavy work from last event loop tick cycle to worker threads.
void
submit_work();
path::BuildLimiter&
pathBuildLimiter() override
@ -196,9 +202,11 @@ namespace llarp
return _vpnPlatform.get();
}
/// queue functionally pure cpu heavy work to be done in another thread.
void
QueueWork(std::function<void(void)> func) override;
/// queue disk io bound work to be done in the disk io thread.
void
QueueDiskIO(std::function<void(void)> func) override;

Loading…
Cancel
Save