memrory pool for SSU2IncompleteMessage

pull/1854/head
orignal 1 year ago
parent 7196db09d6
commit e8be39af17

@ -851,6 +851,7 @@ namespace transport
m_PacketsPool.CleanUpMt (); m_PacketsPool.CleanUpMt ();
m_SentPacketsPool.CleanUp (); m_SentPacketsPool.CleanUp ();
m_IncompleteMessagesPool.CleanUp ();
m_FragmentsPool.CleanUp (); m_FragmentsPool.CleanUp ();
ScheduleCleanup (); ScheduleCleanup ();
} }

@ -98,8 +98,9 @@ namespace transport
void RescheduleIntroducersUpdateTimerV6 (); void RescheduleIntroducersUpdateTimerV6 ();
i2p::util::MemoryPool<SSU2SentPacket>& GetSentPacketsPool () { return m_SentPacketsPool; }; i2p::util::MemoryPool<SSU2SentPacket>& GetSentPacketsPool () { return m_SentPacketsPool; };
i2p::util::MemoryPool<SSU2IncompleteMessage>& GetIncompleteMessagesPool () { return m_IncompleteMessagesPool; };
i2p::util::MemoryPool<SSU2IncompleteMessage::Fragment>& GetFragmentsPool () { return m_FragmentsPool; }; i2p::util::MemoryPool<SSU2IncompleteMessage::Fragment>& GetFragmentsPool () { return m_FragmentsPool; };
private: private:
boost::asio::ip::udp::socket& OpenSocket (const boost::asio::ip::udp::endpoint& localEndpoint); boost::asio::ip::udp::socket& OpenSocket (const boost::asio::ip::udp::endpoint& localEndpoint);
@ -152,6 +153,7 @@ namespace transport
std::list<i2p::data::IdentHash> m_Introducers, m_IntroducersV6; // introducers we are connected to std::list<i2p::data::IdentHash> m_Introducers, m_IntroducersV6; // introducers we are connected to
i2p::util::MemoryPoolMt<Packet> m_PacketsPool; i2p::util::MemoryPoolMt<Packet> m_PacketsPool;
i2p::util::MemoryPool<SSU2SentPacket> m_SentPacketsPool; i2p::util::MemoryPool<SSU2SentPacket> m_SentPacketsPool;
i2p::util::MemoryPool<SSU2IncompleteMessage> m_IncompleteMessagesPool;
i2p::util::MemoryPool<SSU2IncompleteMessage::Fragment> m_FragmentsPool; i2p::util::MemoryPool<SSU2IncompleteMessage::Fragment> m_FragmentsPool;
boost::asio::deadline_timer m_TerminationTimer, m_CleanupTimer, m_ResendTimer, boost::asio::deadline_timer m_TerminationTimer, m_CleanupTimer, m_ResendTimer,
m_IntroducersUpdateTimer, m_IntroducersUpdateTimerV6; m_IntroducersUpdateTimer, m_IntroducersUpdateTimerV6;

@ -1772,7 +1772,7 @@ namespace transport
} }
else else
{ {
m = std::make_shared<SSU2IncompleteMessage>(); m = m_Server.GetIncompleteMessagesPool ().AcquireShared ();
m_IncompleteMessages.emplace (msgID, m); m_IncompleteMessages.emplace (msgID, m);
} }
m->msg = msg; m->msg = msg;
@ -1791,11 +1791,17 @@ namespace transport
{ {
if (len < 5) return; if (len < 5) return;
uint8_t fragmentNum = buf[0] >> 1; uint8_t fragmentNum = buf[0] >> 1;
if (!fragmentNum || fragmentNum >= SSU2_MAX_NUM_FRAGMENTS)
{
LogPrint (eLogWarning, "SSU2: Invalid follow-on fragment num ", fragmentNum);
return;
}
bool isLast = buf[0] & 0x01; bool isLast = buf[0] & 0x01;
uint32_t msgID; memcpy (&msgID, buf + 1, 4); uint32_t msgID; memcpy (&msgID, buf + 1, 4);
auto it = m_IncompleteMessages.find (msgID); auto it = m_IncompleteMessages.find (msgID);
if (it != m_IncompleteMessages.end ()) if (it != m_IncompleteMessages.end ())
{ {
if (fragmentNum < it->second->nextFragmentNum) return; // duplicate
if (it->second->nextFragmentNum == fragmentNum && fragmentNum < SSU2_MAX_NUM_FRAGMENTS && if (it->second->nextFragmentNum == fragmentNum && fragmentNum < SSU2_MAX_NUM_FRAGMENTS &&
it->second->msg) it->second->msg)
{ {
@ -1823,16 +1829,11 @@ namespace transport
else else
{ {
// follow-on fragment before first fragment // follow-on fragment before first fragment
auto msg = std::make_shared<SSU2IncompleteMessage> (); auto msg = m_Server.GetIncompleteMessagesPool ().AcquireShared ();
msg->nextFragmentNum = 0; msg->nextFragmentNum = 0;
it = m_IncompleteMessages.emplace (msgID, msg).first; it = m_IncompleteMessages.emplace (msgID, msg).first;
} }
// insert out of sequence fragment // insert out of sequence fragment
if (fragmentNum >= SSU2_MAX_NUM_FRAGMENTS)
{
LogPrint (eLogWarning, "SSU2: Fragment number ", fragmentNum, " exceeds ", SSU2_MAX_NUM_FRAGMENTS);
return;
}
auto fragment = m_Server.GetFragmentsPool ().AcquireShared (); auto fragment = m_Server.GetFragmentsPool ().AcquireShared ();
memcpy (fragment->buf, buf + 5, len -5); memcpy (fragment->buf, buf + 5, len -5);
fragment->len = len - 5; fragment->len = len - 5;

@ -345,7 +345,7 @@ namespace transport
uint32_t m_SendPacketNum, m_ReceivePacketNum; uint32_t m_SendPacketNum, m_ReceivePacketNum;
std::set<uint32_t> m_OutOfSequencePackets; // packet nums > receive packet num std::set<uint32_t> m_OutOfSequencePackets; // packet nums > receive packet num
std::map<uint32_t, std::shared_ptr<SSU2SentPacket> > m_SentPackets; // packetNum -> packet std::map<uint32_t, std::shared_ptr<SSU2SentPacket> > m_SentPackets; // packetNum -> packet
std::map<uint32_t, std::shared_ptr<SSU2IncompleteMessage> > m_IncompleteMessages; // I2NP std::unordered_map<uint32_t, std::shared_ptr<SSU2IncompleteMessage> > m_IncompleteMessages; // msgID -> I2NP
std::map<uint32_t, std::pair <std::shared_ptr<SSU2Session>, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice std::map<uint32_t, std::pair <std::shared_ptr<SSU2Session>, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice
std::map<uint32_t, std::pair <std::shared_ptr<SSU2Session>, uint64_t > > m_PeerTests; // same as for relay sessions std::map<uint32_t, std::pair <std::shared_ptr<SSU2Session>, uint64_t > > m_PeerTests; // same as for relay sessions
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue; std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;

Loading…
Cancel
Save