From cf77be0eeb403155a1b3c440f3709add13e6f001 Mon Sep 17 00:00:00 2001 From: Vort Date: Tue, 19 Mar 2024 06:37:21 +0200 Subject: [PATCH 1/4] add lower limit for stream RTO --- libi2pd/Streaming.cpp | 4 ++-- libi2pd/Streaming.h | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index aad39db8..bc165478 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -438,7 +438,7 @@ namespace stream else rttUpdated = false; if (rttUpdated) - m_RTO = m_RTT * 1.5; // TODO: implement it better + m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5)); // TODO: implement it better LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); m_SentPackets.erase (it++); m_LocalDestination.DeletePacket (sentPacket); @@ -882,7 +882,7 @@ namespace stream m_CurrentOutboundTunnel = routingPath->outboundTunnel; m_CurrentRemoteLease = routingPath->remoteLease; m_RTT = routingPath->rtt; - m_RTO = m_RTT*1.5; // TODO: implement it better + m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5)); // TODO: implement it better } } diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 9865ba95..35f642bc 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -58,6 +58,7 @@ namespace stream const int MAX_WINDOW_SIZE = 128; const int WINDOW_SIZE_DROP_FRACTION = 10; // 1/10 const double RTT_EWMA_ALPHA = 0.125; + const int MIN_RTO = 20; // in milliseconds const int INITIAL_RTT = 8000; // in milliseconds const int INITIAL_RTO = 9000; // in milliseconds const int MIN_SEND_ACK_TIMEOUT = 2; // in milliseconds From 83f0b9c0418d6f96950dd7f2c85505b77d360b58 Mon Sep 17 00:00:00 2001 From: Vort Date: Tue, 19 Mar 2024 08:28:34 +0200 Subject: [PATCH 2/4] extract single RTT sample from stream ACK --- libi2pd/Streaming.cpp | 50 +++++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index bc165478..a190f1fb 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -404,6 +404,8 @@ namespace stream LogPrint (eLogError, "Streaming: Unexpected ackThrough=", ackThrough, " > seqn=", m_SequenceNumber); return; } + int rttSample = INT_MAX; + bool firstRttSample = false; int nackCount = packet->GetNACKCount (); for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();) { @@ -430,39 +432,45 @@ namespace stream int64_t rtt = (int64_t)ts - (int64_t)sentPacket->sendTime; if (rtt < 0) LogPrint (eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime); - bool rttUpdated = true; if (!seqn) - m_RTT = rtt < 0 ? 1 : rtt; + { + firstRttSample = true; + rttSample = rtt < 0 ? 1 : rtt; + } else if (!sentPacket->resent && rtt >= 0) - m_RTT = RTT_EWMA_ALPHA * rtt + (1.0 - RTT_EWMA_ALPHA) * m_RTT; - else - rttUpdated = false; - if (rttUpdated) - m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5)); // TODO: implement it better + rttSample = std::min (rttSample, (int)rtt); LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); m_SentPackets.erase (it++); m_LocalDestination.DeletePacket (sentPacket); acknowledged = true; if (m_WindowSize < WINDOW_SIZE) m_WindowSize++; // slow start - else - { - // linear growth - if (ts > m_LastWindowSizeIncreaseTime + m_RTT) - { - m_WindowSize++; - if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE; - m_LastWindowSizeIncreaseTime = ts; - } - } - if (!seqn && m_RoutingSession) // first message confirmed - m_RoutingSession->SetSharedRoutingPath ( - std::make_shared ( - i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, (int)m_RTT, 0, 0})); } else break; } + if (rttSample != INT_MAX) + { + if (firstRttSample) + m_RTT = rttSample; + else + m_RTT = RTT_EWMA_ALPHA * rttSample + (1.0 - RTT_EWMA_ALPHA) * m_RTT; + m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5)); // TODO: implement it better + } + if (acknowledged && m_WindowSize >= WINDOW_SIZE) + { + // linear growth + if (ts > m_LastWindowSizeIncreaseTime + m_RTT) + { + m_WindowSize++; + if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE; + m_LastWindowSizeIncreaseTime = ts; + } + } + if (firstRttSample && m_RoutingSession) + m_RoutingSession->SetSharedRoutingPath ( + std::make_shared ( + i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, (int)m_RTT, 0, 0})); if (m_SentPackets.empty ()) m_ResendTimer.cancel (); if (acknowledged) From 4f8f3a386f6d278f598d65327c789dc008dd1299 Mon Sep 17 00:00:00 2001 From: Vort Date: Tue, 19 Mar 2024 08:43:49 +0200 Subject: [PATCH 3/4] restart stream resend timer after updating initial RTO --- libi2pd/Streaming.cpp | 13 ++++++++++--- libi2pd/Streaming.h | 1 + 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index a190f1fb..575d5b5c 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -68,7 +68,8 @@ namespace stream Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, std::shared_ptr remote, int port): m_Service (service), - m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), + m_SendStreamID (0), m_SequenceNumber (0), + m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), @@ -81,7 +82,8 @@ namespace stream } Stream::Stream (boost::asio::io_service& service, StreamingDestination& local): - m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), + m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), + m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_RTT (INITIAL_RTT), @@ -437,7 +439,7 @@ namespace stream firstRttSample = true; rttSample = rtt < 0 ? 1 : rtt; } - else if (!sentPacket->resent && rtt >= 0) + else if (!sentPacket->resent && seqn > m_TunnelsChangeSequenceNumber && rtt >= 0) rttSample = std::min (rttSample, (int)rtt); LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); m_SentPackets.erase (it++); @@ -455,7 +457,10 @@ namespace stream m_RTT = rttSample; else m_RTT = RTT_EWMA_ALPHA * rttSample + (1.0 - RTT_EWMA_ALPHA) * m_RTT; + bool wasInitial = m_RTO == INITIAL_RTO; m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5)); // TODO: implement it better + if (wasInitial) + ScheduleResend (); } if (acknowledged && m_WindowSize >= WINDOW_SIZE) { @@ -1016,6 +1021,7 @@ namespace stream if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; break; case 2: + m_TunnelsChangeSequenceNumber = m_SequenceNumber; m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change first time #if (__cplusplus >= 201703L) // C++ 17 or higher [[fallthrough]]; @@ -1028,6 +1034,7 @@ namespace stream break; case 3: // pick another outbound tunnel + m_TunnelsChangeSequenceNumber = m_SequenceNumber; if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream with sSID=", m_SendStreamID); diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 35f642bc..7e12b8cb 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -238,6 +238,7 @@ namespace stream boost::asio::io_service& m_Service; uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; + uint32_t m_TunnelsChangeSequenceNumber; int32_t m_LastReceivedSequenceNumber; StreamStatus m_Status; bool m_IsAckSendScheduled; From a703d318939a3430e0d4e5c8205379ae288524cc Mon Sep 17 00:00:00 2001 From: Vort Date: Tue, 19 Mar 2024 10:48:42 +0200 Subject: [PATCH 4/4] don't double initial RTO --- libi2pd/Streaming.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index 575d5b5c..322285c3 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -1013,7 +1013,8 @@ namespace stream if (packets.size () > 0) { m_NumResendAttempts++; - m_RTO *= 2; + if (m_RTO != INITIAL_RTO) + m_RTO *= 2; switch (m_NumResendAttempts) { case 1: // congestion avoidance