|
|
|
@ -11,6 +11,8 @@ namespace llarp
|
|
|
|
|
{
|
|
|
|
|
namespace service
|
|
|
|
|
{
|
|
|
|
|
static constexpr size_t SendContextQueueSize = 512;
|
|
|
|
|
|
|
|
|
|
SendContext::SendContext(ServiceInfo ident, const Introduction& intro,
|
|
|
|
|
path::PathSet* send, Endpoint* ep)
|
|
|
|
|
: remoteIdent(std::move(ident))
|
|
|
|
@ -18,6 +20,7 @@ namespace llarp
|
|
|
|
|
, m_PathSet(send)
|
|
|
|
|
, m_DataHandler(ep)
|
|
|
|
|
, m_Endpoint(ep)
|
|
|
|
|
, m_SendQueue(SendContextQueueSize)
|
|
|
|
|
{
|
|
|
|
|
createdAt = ep->Now();
|
|
|
|
|
}
|
|
|
|
@ -25,11 +28,15 @@ namespace llarp
|
|
|
|
|
bool
|
|
|
|
|
SendContext::Send(std::shared_ptr< ProtocolFrame > msg, path::Path_ptr path)
|
|
|
|
|
{
|
|
|
|
|
util::Lock lock(&m_SendQueueMutex);
|
|
|
|
|
m_SendQueue.emplace_back(
|
|
|
|
|
std::make_shared< const routing::PathTransferMessage >(
|
|
|
|
|
*msg, remoteIntro.pathID),
|
|
|
|
|
path);
|
|
|
|
|
if(m_SendQueue.empty() or m_SendQueue.full())
|
|
|
|
|
{
|
|
|
|
|
LogicCall(m_Endpoint->RouterLogic(),
|
|
|
|
|
[self = this]() { self->FlushUpstream(); });
|
|
|
|
|
}
|
|
|
|
|
m_SendQueue.pushBack(
|
|
|
|
|
std::make_pair(std::make_shared< const routing::PathTransferMessage >(
|
|
|
|
|
*msg, remoteIntro.pathID),
|
|
|
|
|
path));
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -39,17 +46,19 @@ namespace llarp
|
|
|
|
|
auto r = m_Endpoint->Router();
|
|
|
|
|
std::unordered_set< path::Path_ptr, path::Path::Ptr_Hash > flushpaths;
|
|
|
|
|
{
|
|
|
|
|
util::Lock lock(&m_SendQueueMutex);
|
|
|
|
|
for(const auto& item : m_SendQueue)
|
|
|
|
|
do
|
|
|
|
|
{
|
|
|
|
|
auto maybe = m_SendQueue.tryPopFront();
|
|
|
|
|
if(not maybe.has_value())
|
|
|
|
|
break;
|
|
|
|
|
auto& item = maybe.value();
|
|
|
|
|
if(item.second->SendRoutingMessage(*item.first, r))
|
|
|
|
|
{
|
|
|
|
|
lastGoodSend = r->Now();
|
|
|
|
|
flushpaths.emplace(item.second);
|
|
|
|
|
m_Endpoint->MarkConvoTagActive(item.first->T.T);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
m_SendQueue.clear();
|
|
|
|
|
} while(not m_SendQueue.empty());
|
|
|
|
|
}
|
|
|
|
|
// flush the select path's upstream
|
|
|
|
|
for(const auto& path : flushpaths)
|
|
|
|
|