pass I2NP message to transport session as shared_ptr

pull/210/head
orignal 9 years ago
parent d65257c7b0
commit 3a63f6775a

@ -83,8 +83,6 @@ namespace transport
m_Socket.close ();
transports.PeerDisconnected (shared_from_this ());
m_Server.RemoveNTCPSession (shared_from_this ());
for (auto it: m_SendQueue)
DeleteI2NPMessage (it);
m_SendQueue.clear ();
if (m_NextMessage)
{
@ -107,7 +105,7 @@ namespace transport
m_DHKeysPair = nullptr;
SendTimeSyncMessage ();
PostI2NPMessage (CreateDatabaseStoreMsg ()); // we tell immediately who we are
PostI2NPMessage (ToSharedI2NPMessage(CreateDatabaseStoreMsg ())); // we tell immediately who we are
transports.PeerConnected (shared_from_this ());
}
@ -600,14 +598,14 @@ namespace transport
return true;
}
void NTCPSession::Send (i2p::I2NPMessage * msg)
void NTCPSession::Send (std::shared_ptr<i2p::I2NPMessage> msg)
{
m_IsSending = true;
boost::asio::async_write (m_Socket, CreateMsgBuffer (msg), boost::asio::transfer_all (),
std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::vector<I2NPMessage *>{ msg }));
std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::vector<std::shared_ptr<I2NPMessage> >{ msg }));
}
boost::asio::const_buffers_1 NTCPSession::CreateMsgBuffer (I2NPMessage * msg)
boost::asio::const_buffers_1 NTCPSession::CreateMsgBuffer (std::shared_ptr<I2NPMessage> msg)
{
uint8_t * sendBuffer;
int len;
@ -616,10 +614,7 @@ namespace transport
{
// regular I2NP
if (msg->offset < 2)
{
LogPrint (eLogError, "Malformed I2NP message");
i2p::DeleteI2NPMessage (msg);
}
LogPrint (eLogError, "Malformed I2NP message"); // TODO:
sendBuffer = msg->GetBuffer () - 2;
len = msg->GetLength ();
htobe16buf (sendBuffer, len);
@ -644,7 +639,7 @@ namespace transport
}
void NTCPSession::Send (const std::vector<I2NPMessage *>& msgs)
void NTCPSession::Send (const std::vector<std::shared_ptr<I2NPMessage> >& msgs)
{
m_IsSending = true;
std::vector<boost::asio::const_buffer> bufs;
@ -654,11 +649,9 @@ namespace transport
std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, msgs));
}
void NTCPSession::HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector<I2NPMessage *> msgs)
void NTCPSession::HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector<std::shared_ptr<I2NPMessage> > msgs)
{
m_IsSending = false;
for (auto it: msgs)
if (it) i2p::DeleteI2NPMessage (it);
if (ecode)
{
LogPrint (eLogWarning, "Couldn't send msgs: ", ecode.message ());
@ -686,20 +679,16 @@ namespace transport
Send (nullptr);
}
void NTCPSession::SendI2NPMessage (I2NPMessage * msg)
void NTCPSession::SendI2NPMessage (std::shared_ptr<I2NPMessage> msg)
{
m_Server.GetService ().post (std::bind (&NTCPSession::PostI2NPMessage, shared_from_this (), msg));
}
void NTCPSession::PostI2NPMessage (I2NPMessage * msg)
void NTCPSession::PostI2NPMessage (std::shared_ptr<I2NPMessage> msg)
{
if (msg)
{
if (m_IsTerminated)
{
DeleteI2NPMessage (msg);
return;
}
if (m_IsTerminated) return;
if (m_IsSending)
m_SendQueue.push_back (msg);
else
@ -707,19 +696,14 @@ namespace transport
}
}
void NTCPSession::SendI2NPMessages (const std::vector<I2NPMessage *>& msgs)
void NTCPSession::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs)
{
m_Server.GetService ().post (std::bind (&NTCPSession::PostI2NPMessages, shared_from_this (), msgs));
}
void NTCPSession::PostI2NPMessages (std::vector<I2NPMessage *> msgs)
void NTCPSession::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs)
{
if (m_IsTerminated)
{
for (auto it: msgs)
DeleteI2NPMessage (it);
return;
}
if (m_IsTerminated) return;
if (m_IsSending)
{
for (auto it: msgs)

@ -61,13 +61,13 @@ namespace transport
void ClientLogin ();
void ServerLogin ();
void SendI2NPMessage (I2NPMessage * msg);
void SendI2NPMessages (const std::vector<I2NPMessage *>& msgs);
void SendI2NPMessage (std::shared_ptr<I2NPMessage> msg);
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs);
private:
void PostI2NPMessage (I2NPMessage * msg);
void PostI2NPMessages (std::vector<I2NPMessage *> msgs);
void PostI2NPMessage (std::shared_ptr<I2NPMessage> msg);
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs);
void Connected ();
void SendTimeSyncMessage ();
void SetIsEstablished (bool isEstablished) { m_IsEstablished = isEstablished; }
@ -96,10 +96,10 @@ namespace transport
void HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred);
bool DecryptNextBlock (const uint8_t * encrypted);
void Send (i2p::I2NPMessage * msg);
boost::asio::const_buffers_1 CreateMsgBuffer (I2NPMessage * msg);
void Send (const std::vector<I2NPMessage *>& msgs);
void HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector<I2NPMessage *> msgs);
void Send (std::shared_ptr<i2p::I2NPMessage> msg);
boost::asio::const_buffers_1 CreateMsgBuffer (std::shared_ptr<I2NPMessage> msg);
void Send (const std::vector<std::shared_ptr<I2NPMessage> >& msgs);
void HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector<std::shared_ptr<I2NPMessage> > msgs);
// timer
@ -131,7 +131,7 @@ namespace transport
i2p::I2NPMessagesHandler m_Handler;
bool m_IsSending;
std::vector<I2NPMessage *> m_SendQueue;
std::vector<std::shared_ptr<I2NPMessage> > m_SendQueue;
boost::asio::ip::address m_ConnectedFrom; // for ban
};

@ -294,13 +294,12 @@ namespace transport
ProcessFragments (buf);
}
void SSUData::Send (i2p::I2NPMessage * msg)
void SSUData::Send (std::shared_ptr<i2p::I2NPMessage> msg)
{
uint32_t msgID = msg->ToSSU ();
if (m_SentMessages.count (msgID) > 0)
{
LogPrint (eLogWarning, "SSU message ", msgID, " already sent");
DeleteI2NPMessage (msg);
return;
}
if (m_SentMessages.empty ()) // schedule resend at first message only
@ -368,7 +367,6 @@ namespace transport
len = 0;
fragmentNum++;
}
DeleteI2NPMessage (msg);
}
void SSUData::SendMsgAck (uint32_t msgID)

@ -89,7 +89,7 @@ namespace transport
void ProcessMessage (uint8_t * buf, size_t len);
void FlushReceivedMessage ();
void Send (i2p::I2NPMessage * msg);
void Send (std::shared_ptr<i2p::I2NPMessage> msg);
void UpdatePacketSize (const i2p::data::IdentHash& remoteIdent);

@ -262,7 +262,7 @@ namespace transport
if (paddingSize > 0) paddingSize = 16 - paddingSize;
payload += paddingSize;
// TODO: verify signature (need data from session request), payload points to signature
m_Data.Send (CreateDeliveryStatusMsg (0));
m_Data.Send (ToSharedI2NPMessage(CreateDeliveryStatusMsg (0)));
Established ();
}
@ -783,7 +783,7 @@ namespace transport
m_DHKeysPair = nullptr;
}
m_Data.Start ();
m_Data.Send (CreateDatabaseStoreMsg ());
m_Data.Send (ToSharedI2NPMessage(CreateDatabaseStoreMsg ()));
transports.PeerConnected (shared_from_this ());
if (m_PeerTest && (m_RemoteRouter && m_RemoteRouter->IsPeerTesting ()))
SendPeerTest ();
@ -832,39 +832,29 @@ namespace transport
}
}
void SSUSession::SendI2NPMessage (I2NPMessage * msg)
void SSUSession::SendI2NPMessage (std::shared_ptr<I2NPMessage> msg)
{
GetService ().post (std::bind (&SSUSession::PostI2NPMessage, shared_from_this (), msg));
}
void SSUSession::PostI2NPMessage (I2NPMessage * msg)
void SSUSession::PostI2NPMessage (std::shared_ptr<I2NPMessage> msg)
{
if (msg)
{
if (m_State == eSessionStateEstablished)
m_Data.Send (msg);
else
DeleteI2NPMessage (msg);
}
if (msg &&m_State == eSessionStateEstablished)
m_Data.Send (msg);
}
void SSUSession::SendI2NPMessages (const std::vector<I2NPMessage *>& msgs)
void SSUSession::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs)
{
GetService ().post (std::bind (&SSUSession::PostI2NPMessages, shared_from_this (), msgs));
}
void SSUSession::PostI2NPMessages (std::vector<I2NPMessage *> msgs)
void SSUSession::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs)
{
if (m_State == eSessionStateEstablished)
{
for (auto it: msgs)
if (it) m_Data.Send (it);
}
else
{
for (auto it: msgs)
DeleteI2NPMessage (it);
}
}
void SSUSession::ProcessData (uint8_t * buf, size_t len)

@ -76,8 +76,8 @@ namespace transport
void Done ();
boost::asio::ip::udp::endpoint& GetRemoteEndpoint () { return m_RemoteEndpoint; };
bool IsV6 () const { return m_RemoteEndpoint.address ().is_v6 (); };
void SendI2NPMessage (I2NPMessage * msg);
void SendI2NPMessages (const std::vector<I2NPMessage *>& msgs);
void SendI2NPMessage (std::shared_ptr<I2NPMessage> msg);
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs);
void SendPeerTest (); // Alice
SessionState GetState () const { return m_State; };
@ -95,8 +95,8 @@ namespace transport
boost::asio::io_service& GetService ();
void CreateAESandMacKey (const uint8_t * pubKey);
void PostI2NPMessage (I2NPMessage * msg);
void PostI2NPMessages (std::vector<I2NPMessage *> msgs);
void PostI2NPMessage (std::shared_ptr<I2NPMessage> msg);
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs);
void ProcessMessage (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); // call for established session
void ProcessSessionRequest (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint);
void SendSessionRequest ();

@ -71,8 +71,8 @@ namespace transport
size_t GetNumSentBytes () const { return m_NumSentBytes; };
size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; };
virtual void SendI2NPMessage (I2NPMessage * msg) = 0;
virtual void SendI2NPMessages (const std::vector<I2NPMessage *>& msgs) = 0;
virtual void SendI2NPMessage (std::shared_ptr<I2NPMessage> msg) = 0;
virtual void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) = 0;
protected:

@ -255,11 +255,17 @@ namespace transport
}
}
if (!it->second.sessions.empty ())
it->second.sessions.front ()->SendI2NPMessages (msgs);
{
// TODO: remove this copy operation later
std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs1;
for (auto it1: msgs)
msgs1.push_back (ToSharedI2NPMessage(it1));
it->second.sessions.front ()->SendI2NPMessages (msgs1);
}
else
{
for (auto it1: msgs)
it->second.delayedMessages.push_back (it1);
it->second.delayedMessages.push_back (ToSharedI2NPMessage(it1));
}
}

@ -62,19 +62,13 @@ namespace transport
std::shared_ptr<const i2p::data::RouterInfo> router;
std::list<std::shared_ptr<TransportSession> > sessions;
uint64_t creationTime;
std::vector<i2p::I2NPMessage *> delayedMessages;
std::vector<std::shared_ptr<i2p::I2NPMessage> > delayedMessages;
void Done ()
{
for (auto it: sessions)
it->Done ();
}
~Peer ()
{
for (auto it :delayedMessages)
i2p::DeleteI2NPMessage (it);
}
};
const size_t SESSION_CREATION_TIMEOUT = 10; // in seconds

Loading…
Cancel
Save