From f8ced20d1c7267826f9976e4190a49119f3fb878 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 10 Aug 2014 18:27:23 -0400 Subject: [PATCH] acks and resends --- Streaming.cpp | 71 ++++++++++++++++++++++++++++++++++++++++++++++----- Streaming.h | 9 ++++++- 2 files changed, 72 insertions(+), 8 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index 774f475f..0df28a7e 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -21,7 +21,7 @@ namespace stream const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), - m_RemoteLeaseSet (&remote), m_ReceiveTimer (m_Service) + m_RemoteLeaseSet (&remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); UpdateCurrentRemoteLease (); @@ -30,7 +30,7 @@ namespace stream Stream::Stream (boost::asio::io_service& service, StreamingDestination * local): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), - m_RemoteLeaseSet (nullptr), m_ReceiveTimer (m_Service) + m_RemoteLeaseSet (nullptr), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); } @@ -38,6 +38,7 @@ namespace stream Stream::~Stream () { m_ReceiveTimer.cancel (); + m_ResendTimer.cancel (); while (!m_ReceiveQueue.empty ()) { auto packet = m_ReceiveQueue.front (); @@ -46,6 +47,8 @@ namespace stream } for (auto it: m_SavedPackets) delete it; + for (auto it: m_SentPackets) + delete it; } void Stream::HandleNextPacket (Packet * packet) @@ -53,6 +56,9 @@ namespace stream if (!m_SendStreamID) m_SendStreamID = packet->GetReceiveStreamID (); + if (!packet->IsNoAck ()) // ack received + ProcessAck (packet); + int32_t receivedSeqn = packet->GetSeqn (); bool isSyn = packet->IsSYN (); if (!receivedSeqn && !isSyn) @@ -172,9 +178,30 @@ namespace stream m_ReceiveTimer.cancel (); } } + + void Stream::ProcessAck (Packet * packet) + { + uint32_t ackThrough = packet->GetAckThrough (); + // TODO: handle NACKs + for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();) + { + if ((*it)->GetSeqn () <= ackThrough) + { + auto sentPacket = *it; + LogPrint ("Packet ", sentPacket->GetSeqn (), " acknowledged"); + m_SentPackets.erase (it++); + delete sentPacket; + } + else + break; + } + if (m_SentPackets.empty ()) + m_ResendTimer.cancel (); + } size_t Stream::Send (const uint8_t * buf, size_t len, int timeout) { + bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet Packet * p = new Packet (); uint8_t * packet = p->GetBuffer (); // TODO: implement setters @@ -185,7 +212,10 @@ namespace stream size += 4; // receiveStreamID *(uint32_t *)(packet + size) = htobe32 (m_SequenceNumber++); size += 4; // sequenceNum - *(uint32_t *)(packet + size) = 0; // TODO + if (isNoAck) + *(uint32_t *)(packet + size) = htobe32 (m_LastReceivedSequenceNumber); + else + *(uint32_t *)(packet + size) = 0; size += 4; // ack Through packet[size] = 0; size++; // NACK count @@ -194,9 +224,10 @@ namespace stream { // initial packet m_IsOpen = true; - *(uint16_t *)(packet + size) = htobe16 (PACKET_FLAG_SYNCHRONIZE | - PACKET_FLAG_FROM_INCLUDED | PACKET_FLAG_SIGNATURE_INCLUDED | - PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED | PACKET_FLAG_NO_ACK); + uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED | + PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED; + if (isNoAck) flags |= PACKET_FLAG_NO_ACK; + *(uint16_t *)(packet + size) = htobe16 (flags); size += 2; // flags *(uint16_t *)(packet + size) = htobe16 (i2p::data::DEFAULT_IDENTITY_SIZE + 40 + 2); // identity + signature + packet size size += 2; // options size @@ -310,7 +341,15 @@ namespace stream if (packet) { bool ret = SendPacket (packet->GetBuffer (), packet->GetLength ()); - delete packet; + if (ret) + { + bool isEmpty = m_SentPackets.empty (); + m_SentPackets.insert (packet); + if (isEmpty) + ScheduleResend (); + } + else + delete packet; return ret; } else @@ -364,6 +403,24 @@ namespace stream return false; } + void Stream::ScheduleResend () + { + m_ResendTimer.cancel (); + m_ResendTimer.expires_from_now (boost::posix_time::seconds(RESEND_TIMEOUT)); + m_ResendTimer.async_wait (boost::bind (&Stream::HandleResendTimer, + this, boost::asio::placeholders::error)); + } + + void Stream::HandleResendTimer (const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + for (auto it : m_SentPackets) + SendPacket (it->GetBuffer (), it->GetLength ()); + ScheduleResend (); + } + } + void Stream::UpdateCurrentRemoteLease () { if (!m_RemoteLeaseSet) diff --git a/Streaming.h b/Streaming.h index 288332e7..42df5278 100644 --- a/Streaming.h +++ b/Streaming.h @@ -36,6 +36,7 @@ namespace stream const size_t STREAMING_MTU = 1730; const size_t MAX_PACKET_SIZE = 4096; const size_t COMPRESSION_THRESHOLD_SIZE = 66; + const int RESEND_TIMEOUT = 10; // in seconds struct Packet { @@ -58,6 +59,7 @@ namespace stream const uint8_t * GetPayload () const { return GetOptionData () + GetOptionSize (); }; bool IsSYN () const { return GetFlags () & PACKET_FLAG_SYNCHRONIZE; }; + bool IsNoAck () const { return GetFlags () & PACKET_FLAG_NO_ACK; }; }; struct PacketCmp @@ -102,6 +104,7 @@ namespace stream void SavePacket (Packet * packet); void ProcessPacket (Packet * packet); + void ProcessAck (Packet * packet); size_t ConcatenatePackets (uint8_t * buf, size_t len); void UpdateCurrentRemoteLease (); @@ -109,6 +112,9 @@ namespace stream template void HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler); + void ScheduleResend (); + void HandleResendTimer (const boost::system::error_code& ecode); + private: boost::asio::io_service& m_Service; @@ -121,7 +127,8 @@ namespace stream i2p::data::Lease m_CurrentRemoteLease; std::queue m_ReceiveQueue; std::set m_SavedPackets; - boost::asio::deadline_timer m_ReceiveTimer; + std::set m_SentPackets; + boost::asio::deadline_timer m_ReceiveTimer, m_ResendTimer; }; class StreamingDestination: public i2p::data::LocalDestination