diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index aad39db8..322285c3 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -68,7 +68,8 @@ namespace stream Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, std::shared_ptr remote, int port): m_Service (service), - m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), + m_SendStreamID (0), m_SequenceNumber (0), + m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), @@ -81,7 +82,8 @@ namespace stream } Stream::Stream (boost::asio::io_service& service, StreamingDestination& local): - m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), + m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), + m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_RTT (INITIAL_RTT), @@ -404,6 +406,8 @@ namespace stream LogPrint (eLogError, "Streaming: Unexpected ackThrough=", ackThrough, " > seqn=", m_SequenceNumber); return; } + int rttSample = INT_MAX; + bool firstRttSample = false; int nackCount = packet->GetNACKCount (); for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();) { @@ -430,39 +434,48 @@ namespace stream int64_t rtt = (int64_t)ts - (int64_t)sentPacket->sendTime; if (rtt < 0) LogPrint (eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime); - bool rttUpdated = true; if (!seqn) - m_RTT = rtt < 0 ? 1 : rtt; - else if (!sentPacket->resent && rtt >= 0) - m_RTT = RTT_EWMA_ALPHA * rtt + (1.0 - RTT_EWMA_ALPHA) * m_RTT; - else - rttUpdated = false; - if (rttUpdated) - m_RTO = m_RTT * 1.5; // TODO: implement it better + { + firstRttSample = true; + rttSample = rtt < 0 ? 1 : rtt; + } + else if (!sentPacket->resent && seqn > m_TunnelsChangeSequenceNumber && rtt >= 0) + rttSample = std::min (rttSample, (int)rtt); LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); m_SentPackets.erase (it++); m_LocalDestination.DeletePacket (sentPacket); acknowledged = true; if (m_WindowSize < WINDOW_SIZE) m_WindowSize++; // slow start - else - { - // linear growth - if (ts > m_LastWindowSizeIncreaseTime + m_RTT) - { - m_WindowSize++; - if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE; - m_LastWindowSizeIncreaseTime = ts; - } - } - if (!seqn && m_RoutingSession) // first message confirmed - m_RoutingSession->SetSharedRoutingPath ( - std::make_shared ( - i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, (int)m_RTT, 0, 0})); } else break; } + if (rttSample != INT_MAX) + { + if (firstRttSample) + m_RTT = rttSample; + else + m_RTT = RTT_EWMA_ALPHA * rttSample + (1.0 - RTT_EWMA_ALPHA) * m_RTT; + bool wasInitial = m_RTO == INITIAL_RTO; + m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5)); // TODO: implement it better + if (wasInitial) + ScheduleResend (); + } + if (acknowledged && m_WindowSize >= WINDOW_SIZE) + { + // linear growth + if (ts > m_LastWindowSizeIncreaseTime + m_RTT) + { + m_WindowSize++; + if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE; + m_LastWindowSizeIncreaseTime = ts; + } + } + if (firstRttSample && m_RoutingSession) + m_RoutingSession->SetSharedRoutingPath ( + std::make_shared ( + i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, (int)m_RTT, 0, 0})); if (m_SentPackets.empty ()) m_ResendTimer.cancel (); if (acknowledged) @@ -882,7 +895,7 @@ namespace stream m_CurrentOutboundTunnel = routingPath->outboundTunnel; m_CurrentRemoteLease = routingPath->remoteLease; m_RTT = routingPath->rtt; - m_RTO = m_RTT*1.5; // TODO: implement it better + m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5)); // TODO: implement it better } } @@ -1000,7 +1013,8 @@ namespace stream if (packets.size () > 0) { m_NumResendAttempts++; - m_RTO *= 2; + if (m_RTO != INITIAL_RTO) + m_RTO *= 2; switch (m_NumResendAttempts) { case 1: // congestion avoidance @@ -1008,6 +1022,7 @@ namespace stream if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; break; case 2: + m_TunnelsChangeSequenceNumber = m_SequenceNumber; m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change first time #if (__cplusplus >= 201703L) // C++ 17 or higher [[fallthrough]]; @@ -1020,6 +1035,7 @@ namespace stream break; case 3: // pick another outbound tunnel + m_TunnelsChangeSequenceNumber = m_SequenceNumber; if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream with sSID=", m_SendStreamID); diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 9865ba95..7e12b8cb 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -58,6 +58,7 @@ namespace stream const int MAX_WINDOW_SIZE = 128; const int WINDOW_SIZE_DROP_FRACTION = 10; // 1/10 const double RTT_EWMA_ALPHA = 0.125; + const int MIN_RTO = 20; // in milliseconds const int INITIAL_RTT = 8000; // in milliseconds const int INITIAL_RTO = 9000; // in milliseconds const int MIN_SEND_ACK_TIMEOUT = 2; // in milliseconds @@ -237,6 +238,7 @@ namespace stream boost::asio::io_service& m_Service; uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; + uint32_t m_TunnelsChangeSequenceNumber; int32_t m_LastReceivedSequenceNumber; StreamStatus m_Status; bool m_IsAckSendScheduled;