diff --git a/ChangeLog b/ChangeLog index 6534a9b5..f7be41ab 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,6 +1,67 @@ # for this file format description, # see https://github.com/olivierlacan/keep-a-changelog +## [2.51.0] - 2024-04-06 +### Added +- Non-blocking mode for UDP sockets +- Set SSU2 socket buffer size based on bandwidth limit +- Encrypted tunnel tests +- Support for multiple UDP server tunnels on one destination +- Publish medium congestion indication +- Local domain sockets for SOCKS proxy upstream +- Tunnel status "declined" in web console +- SAM error reply "Incompatible crypto" if remote destination has incompatible crypto +- Reduce amount of traffic by handling local message drops +- Keep SSU2 socket open even if it fails to bind +- Lower SSU2 resend traffic spikes +- Expiration for messages in SSU2 send queue +- Use EWMA for stream RTT estimation +- Request choking delay if too many NACKs in stream +- Allow 0ms latency for tunnel +- Randomize tunnels selection for tests +### Changed +- Upstream SOCKS proxy from SOCKS4 to SOCKS5 +- Transit tunnels limit to 4 bytes. Default value to 10K +- Reply CANT_REACH_PEER if connect to ourselves in SAM +- Don't send already expired I2NP messages +- Use monotonic timer to measure tunnel test latency +- Standard NTCP2 frame doesn't exceed 16K +- Always send request through tunnels in case of restricted routes +- Don't delete connected routers from NetDb +- Send lookup reply directly to reply tunnel gateway if possible +- Reduce unreachable router ban interval to 8 minutes +- Don't request banned routers / don't try to connect to unreachable router +- Consider 'M' routers as low bandwidth +- Limit minimal received SSU2 packet size to 40 bytes +- Bob picks peer test session only if Charlie's address supports peer testing +- Reject peer test msg 2 if peer testing is not supported +- Don't request termination if SSU2 session was not established +- Set maximum SSU2 queue size depending on RTT value +- New streaming RTT calculation algorithm +- Don't double initial RTO for streams when changing tunnels +- Restore failed tunnel if test or data for inbound tunnel received +- Don't fail last remaining tunnel in pool +- Publish LeasetSet again if local destination was not ready or no tunnels +- Make more attempts to pick high bandwidth hop for client tunnel +- Reduced SSU2 session termination timeout to 165 seconds +- Reseeds list +### Fixed +- ECIESx25519 symmetric key tagset early expiration +- Encrypted LeaseSet lookup +- Outbound tunnel build fails if it's endpoint is the same as reply tunnel gateway +- I2PControl RouterManager returns invalid JSON when unknown params are passed +- Mix of data between different UDP sessions on the same server +- TARGET_OS_SIMULATOR check +- Handling of "reservedrange" param +- New NTCP2 session gets teminated upon termination of old one +- New SSU2 session gets teminated upon termination of old one +- Peer test to non-supporting router +- Streaming ackThrough off 1 if number of NACKs exceeds 255 +- Race condition in ECIESx25519 tags table +- Good tunnel becomes failed +- Crash when packet comes to terminated stream +- Stream hangs during LeaseSet update + ## [2.50.2] - 2024-01-06 ###Fixed - Crash with OpenSSL 3.2.0 diff --git a/Makefile.bsd b/Makefile.bsd index 00543193..14449381 100644 --- a/Makefile.bsd +++ b/Makefile.bsd @@ -6,7 +6,12 @@ CXXFLAGS ?= ${CXX_DEBUG} -Wall -Wextra -Wno-unused-parameter -pedantic -Wno-misl ## (e.g. -fstack-protector-strong -Wformat -Werror=format-security), we do not want to remove ## -std=c++11. If you want to remove this variable please do so in a way that allows setting ## custom FLAGS to work at build-time. -NEEDED_CXXFLAGS = -std=c++11 +CXXVER := $(shell $(CXX) -dumpversion) +ifeq (${CXXVER}, "4.2.1") # older clang always returned 4.2.1 + NEEDED_CXXFLAGS = -std=c++11 +else # newer versions support C++17 + NEEDED_CXXFLAGS = -std=c++17 +endif DEFINES = -D_GLIBCXX_USE_NANOSLEEP=1 INCFLAGS = -I/usr/include/ -I/usr/local/include/ LDFLAGS = ${LD_DEBUG} -Wl,-rpath,/usr/local/lib -L/usr/local/lib diff --git a/Win32/Win32App.cpp b/Win32/Win32App.cpp index 9f750c4c..742ad30d 100644 --- a/Win32/Win32App.cpp +++ b/Win32/Win32App.cpp @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2022, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -145,7 +145,7 @@ namespace win32 s << bytes << " Bytes\n"; } - static void ShowNetworkStatus (std::stringstream& s, RouterStatus status, bool testing) + static void ShowNetworkStatus (std::stringstream& s, RouterStatus status, bool testing, RouterError error) { switch (status) { @@ -158,18 +158,24 @@ namespace win32 }; if (testing) s << " (Test)"; - if (i2p::context.GetError () != eRouterErrorNone) + if (error != eRouterErrorNone) { - switch (i2p::context.GetError ()) + switch (error) { case eRouterErrorClockSkew: - s << " - Clock skew"; + s << " - " << tr("Clock skew"); break; case eRouterErrorOffline: - s << " - Offline"; + s << " - " << tr("Offline"); break; case eRouterErrorSymmetricNAT: - s << " - Symmetric NAT"; + s << " - " << tr("Symmetric NAT"); + break; + case eRouterErrorFullConeNAT: + s << " - " << tr("Full cone NAT"); + break; + case eRouterErrorNoDescriptors: + s << " - " << tr("No Descriptors"); break; default: ; } @@ -180,11 +186,11 @@ namespace win32 { s << "\n"; s << "Status: "; - ShowNetworkStatus (s, i2p::context.GetStatus (), i2p::context.GetTesting ()); + ShowNetworkStatus (s, i2p::context.GetStatus (), i2p::context.GetTesting(), i2p::context.GetError ()); if (i2p::context.SupportsV6 ()) { s << " / "; - ShowNetworkStatus (s, i2p::context.GetStatusV6 (), i2p::context.GetTestingV6 ()); + ShowNetworkStatus (s, i2p::context.GetStatusV6 (), i2p::context.GetTestingV6(), i2p::context.GetErrorV6 ()); } s << "; "; s << "Success Rate: " << i2p::tunnel::tunnels.GetTunnelCreationSuccessRate() << "%\n"; diff --git a/contrib/rpm/i2pd-git.spec b/contrib/rpm/i2pd-git.spec index 0bc46eab..e43d751b 100644 --- a/contrib/rpm/i2pd-git.spec +++ b/contrib/rpm/i2pd-git.spec @@ -1,7 +1,7 @@ %define git_hash %(git rev-parse HEAD | cut -c -7) Name: i2pd-git -Version: 2.50.2 +Version: 2.51.0 Release: git%{git_hash}%{?dist} Summary: I2P router written in C++ Conflicts: i2pd @@ -144,6 +144,9 @@ getent passwd i2pd >/dev/null || \ %changelog +* Sat Apr 06 2024 orignal - 2.51.0 +- update to 2.51.0 + * Sat Jan 06 2024 orignal - 2.50.2 - update to 2.50.2 diff --git a/contrib/rpm/i2pd.spec b/contrib/rpm/i2pd.spec index 1de4076e..9e270708 100644 --- a/contrib/rpm/i2pd.spec +++ b/contrib/rpm/i2pd.spec @@ -1,5 +1,5 @@ Name: i2pd -Version: 2.50.2 +Version: 2.51.0 Release: 1%{?dist} Summary: I2P router written in C++ Conflicts: i2pd-git @@ -142,6 +142,9 @@ getent passwd i2pd >/dev/null || \ %changelog +* Sat Apr 06 2024 orignal - 2.51.0 +- update to 2.51.0 + * Sat Jan 06 2024 orignal - 2.50.2 - update to 2.50.2 diff --git a/daemon/I2PControl.cpp b/daemon/I2PControl.cpp index da2443fd..e2ce7be4 100644 --- a/daemon/I2PControl.cpp +++ b/daemon/I2PControl.cpp @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2022, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -338,10 +338,11 @@ namespace client { for (auto it = params.begin (); it != params.end (); it++) { - if (it != params.begin ()) results << ","; LogPrint (eLogDebug, "I2PControl: RouterManager request: ", it->first); auto it1 = m_RouterManagerHandlers.find (it->first); - if (it1 != m_RouterManagerHandlers.end ()) { + if (it1 != m_RouterManagerHandlers.end ()) + { + if (it != params.begin ()) results << ","; (this->*(it1->second))(results); } else LogPrint (eLogError, "I2PControl: RouterManager unknown request: ", it->first); diff --git a/debian/changelog b/debian/changelog index 56ab7c16..9a410c14 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,8 +1,14 @@ -i2pd (2.50.2) unstable; urgency=medium +i2pd (2.51.0-1) unstable; urgency=medium + + * updated to version 2.51.0/0.9.62 + + -- orignal Sat, 06 Apr 2024 16:00:00 +0000 + +i2pd (2.50.2-1) unstable; urgency=medium * updated to version 2.50.2/0.9.61 --- orignal Sat, 06 Jan 2024 16:00:00 +0000 + -- orignal Sat, 06 Jan 2024 16:00:00 +0000 i2pd (2.50.1-1) unstable; urgency=medium diff --git a/libi2pd/ECIESX25519AEADRatchetSession.cpp b/libi2pd/ECIESX25519AEADRatchetSession.cpp index b4730d0e..a51f047e 100644 --- a/libi2pd/ECIESX25519AEADRatchetSession.cpp +++ b/libi2pd/ECIESX25519AEADRatchetSession.cpp @@ -863,7 +863,7 @@ namespace garlic payloadLen += msg->GetPayloadLength () + 13; if (m_Destination) payloadLen += 32; } - if (GetLeaseSetUpdateStatus () == eLeaseSetSubmitted && ts > GetLeaseSetSubmissionTime () + LEASET_CONFIRMATION_TIMEOUT) + if (GetLeaseSetUpdateStatus () == eLeaseSetSubmitted && ts > GetLeaseSetSubmissionTime () + LEASESET_CONFIRMATION_TIMEOUT) { // resubmit non-confirmed LeaseSet SetLeaseSetUpdateStatus (eLeaseSetUpdated); diff --git a/libi2pd/FS.cpp b/libi2pd/FS.cpp index d38bcc2f..233f5766 100644 --- a/libi2pd/FS.cpp +++ b/libi2pd/FS.cpp @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2022, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -9,6 +9,11 @@ #include #include +#if defined(MAC_OSX) +#include +#include +#endif + #ifdef _WIN32 #include #include @@ -251,8 +256,22 @@ namespace fs { auto p = root + i2p::fs::dirSep + prefix1 + chars[i]; if (boost::filesystem::exists(p)) continue; - if (boost::filesystem::create_directory(p)) +#if TARGET_OS_SIMULATOR + // ios simulator fs says it is case sensitive, but it is not + boost::system::error_code ec; + if (boost::filesystem::create_directory(p, ec)) + continue; + switch (ec.value()) { + case boost::system::errc::file_exists: + case boost::system::errc::success: + continue; + default: + throw boost::system::system_error( ec, __func__ ); + } +#else + if (boost::filesystem::create_directory(p)) continue; /* ^ throws exception on failure */ +#endif return false; } return true; diff --git a/libi2pd/Garlic.cpp b/libi2pd/Garlic.cpp index dd66af60..d5ea6730 100644 --- a/libi2pd/Garlic.cpp +++ b/libi2pd/Garlic.cpp @@ -80,7 +80,7 @@ namespace garlic void GarlicRoutingSession::CleanupUnconfirmedLeaseSet (uint64_t ts) { - if (m_LeaseSetUpdateMsgID && ts*1000LL > m_LeaseSetSubmissionTime + LEASET_CONFIRMATION_TIMEOUT) + if (m_LeaseSetUpdateMsgID && ts*1000LL > m_LeaseSetSubmissionTime + LEASESET_CONFIRMATION_TIMEOUT) { if (GetOwner ()) GetOwner ()->RemoveDeliveryStatusSession (m_LeaseSetUpdateMsgID); @@ -232,7 +232,7 @@ namespace garlic if (GetOwner ()) { // resubmit non-confirmed LeaseSet - if (GetLeaseSetUpdateStatus () == eLeaseSetSubmitted && ts > GetLeaseSetSubmissionTime () + LEASET_CONFIRMATION_TIMEOUT) + if (GetLeaseSetUpdateStatus () == eLeaseSetSubmitted && ts > GetLeaseSetSubmissionTime () + LEASESET_CONFIRMATION_TIMEOUT) { SetLeaseSetUpdateStatus (eLeaseSetUpdated); SetSharedRoutingPath (nullptr); // invalidate path since leaseset was not confirmed diff --git a/libi2pd/Garlic.h b/libi2pd/Garlic.h index 83e3b050..2c40f74a 100644 --- a/libi2pd/Garlic.h +++ b/libi2pd/Garlic.h @@ -50,7 +50,7 @@ namespace garlic const int INCOMING_TAGS_EXPIRATION_TIMEOUT = 960; // 16 minutes const int OUTGOING_TAGS_EXPIRATION_TIMEOUT = 720; // 12 minutes const int OUTGOING_TAGS_CONFIRMATION_TIMEOUT = 10; // 10 seconds - const int LEASET_CONFIRMATION_TIMEOUT = 4000; // in milliseconds + const int LEASESET_CONFIRMATION_TIMEOUT = 4000; // in milliseconds const int ROUTING_PATH_EXPIRATION_TIMEOUT = 30; // 30 seconds const int ROUTING_PATH_MAX_NUM_TIMES_USED = 100; // how many times might be used diff --git a/libi2pd/I2NPProtocol.h b/libi2pd/I2NPProtocol.h index b1e2d170..f110100e 100644 --- a/libi2pd/I2NPProtocol.h +++ b/libi2pd/I2NPProtocol.h @@ -152,6 +152,9 @@ namespace tunnel const size_t I2NP_MAX_MESSAGE_SIZE = 62708; const size_t I2NP_MAX_SHORT_MESSAGE_SIZE = 4096; const size_t I2NP_MAX_MEDIUM_MESSAGE_SIZE = 16384; + const unsigned int I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_FACTOR = 3; // multiples of RTT + const unsigned int I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_MIN = 200000; // in microseconds + const unsigned int I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_MAX = 2000000; // in microseconds const unsigned int I2NP_MESSAGE_EXPIRATION_TIMEOUT = 8000; // in milliseconds (as initial RTT) const unsigned int I2NP_MESSAGE_CLOCK_SKEW = 60*1000; // 1 minute in milliseconds @@ -161,9 +164,10 @@ namespace tunnel size_t len, offset, maxLen; std::shared_ptr from; std::function onDrop; - - I2NPMessage (): buf (nullptr),len (I2NP_HEADER_SIZE + 2), - offset(2), maxLen (0), from (nullptr) {}; // reserve 2 bytes for NTCP header + uint64_t enqueueTime; // monotonic microseconds + + I2NPMessage (): buf (nullptr), len (I2NP_HEADER_SIZE + 2), + offset(2), maxLen (0), from (nullptr), enqueueTime (0) {}; // reserve 2 bytes for NTCP header // header accessors uint8_t * GetHeader () { return GetBuffer (); }; @@ -173,7 +177,9 @@ namespace tunnel void SetMsgID (uint32_t msgID) { htobe32buf (GetHeader () + I2NP_HEADER_MSGID_OFFSET, msgID); }; uint32_t GetMsgID () const { return bufbe32toh (GetHeader () + I2NP_HEADER_MSGID_OFFSET); }; void SetExpiration (uint64_t expiration) { htobe64buf (GetHeader () + I2NP_HEADER_EXPIRATION_OFFSET, expiration); }; + void SetEnqueueTime (uint64_t mts) { enqueueTime = mts; }; uint64_t GetExpiration () const { return bufbe64toh (GetHeader () + I2NP_HEADER_EXPIRATION_OFFSET); }; + uint64_t GetEnqueueTime () const { return enqueueTime; }; void SetSize (uint16_t size) { htobe16buf (GetHeader () + I2NP_HEADER_SIZE_OFFSET, size); }; uint16_t GetSize () const { return bufbe16toh (GetHeader () + I2NP_HEADER_SIZE_OFFSET); }; void UpdateSize () { SetSize (GetPayloadLength ()); }; diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 812f49d3..a41619d6 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -436,7 +436,11 @@ namespace transport { auto ident = it->second->GetRemoteIdentity (); if (ident) - m_SessionsByRouterHash.erase (ident->GetIdentHash ()); + { + auto it1 = m_SessionsByRouterHash.find (ident->GetIdentHash ()); + if (it1 != m_SessionsByRouterHash.end () && it->second == it1->second) + m_SessionsByRouterHash.erase (it1); + } if (m_LastSession == it->second) m_LastSession = nullptr; m_Sessions.erase (it); diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 60e70e7a..16dd7a20 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -21,8 +21,8 @@ namespace transport { const int SSU2_TERMINATION_CHECK_TIMEOUT = 25; // in seconds const int SSU2_CLEANUP_INTERVAL = 72; // in seconds - const int SSU2_RESEND_CHECK_TIMEOUT = 400; // in milliseconds - const int SSU2_RESEND_CHECK_TIMEOUT_VARIANCE = 100; // in milliseconds + const int SSU2_RESEND_CHECK_TIMEOUT = 40; // in milliseconds + const int SSU2_RESEND_CHECK_TIMEOUT_VARIANCE = 10; // in milliseconds const int SSU2_RESEND_CHECK_MORE_TIMEOUT = 10; // in milliseconds const size_t SSU2_MAX_RESEND_PACKETS = 128; // packets to resend at the time const uint64_t SSU2_SOCKET_MIN_BUFFER_SIZE = 128 * 1024; diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index 2c499753..42a26c5c 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -84,9 +84,12 @@ namespace transport m_Server (server), m_Address (addr), m_RemoteTransports (0), m_RemotePeerTestTransports (0), m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown), m_SendPacketNum (0), m_ReceivePacketNum (0), m_LastDatetimeSentPacketNum (0), - m_IsDataReceived (false), m_WindowSize (SSU2_MIN_WINDOW_SIZE), - m_RTT (SSU2_RESEND_INTERVAL), m_RTO (SSU2_RESEND_INTERVAL*SSU2_kAPPA), m_RelayTag (0), - m_ConnectTimer (server.GetService ()), m_TerminationReason (eSSU2TerminationReasonNormalClose), + m_IsDataReceived (false), m_RTT (SSU2_UNKNOWN_RTT), + m_MsgLocalExpirationTimeout (I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_MAX), + m_MsgLocalSemiExpirationTimeout (I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_MAX / 2), + m_WindowSize (SSU2_MIN_WINDOW_SIZE), + m_RTO (SSU2_INITIAL_RTO), m_RelayTag (0),m_ConnectTimer (server.GetService ()), + m_TerminationReason (eSSU2TerminationReasonNormalClose), m_MaxPayloadSize (SSU2_MIN_PACKET_SIZE - IPV6_HEADER_SIZE - UDP_HEADER_SIZE - 32) // min size { m_NoiseState.reset (new i2p::crypto::NoiseSymmetricState); @@ -294,8 +297,10 @@ namespace transport { m_TerminationReason = reason; SendTermination (); + m_State = eSSU2SessionStateClosing; } - m_State = eSSU2SessionStateClosing; + else + Done (); } void SSU2Session::Established () @@ -353,25 +358,33 @@ namespace transport void SSU2Session::PostI2NPMessages (std::vector > msgs) { if (m_State == eSSU2SessionStateTerminated) return; - bool isSemiFull = m_SendQueue.size () > SSU2_MAX_OUTGOING_QUEUE_SIZE/2; + uint64_t mts = i2p::util::GetMonotonicMicroseconds (); + bool isSemiFull = false; + if (m_SendQueue.size ()) + { + int64_t queueLag = (int64_t)mts - (int64_t)m_SendQueue.front ()->GetEnqueueTime (); + isSemiFull = queueLag > m_MsgLocalSemiExpirationTimeout; + if (isSemiFull) + { + LogPrint (eLogWarning, "SSU2: Outgoing messages queue to ", + i2p::data::GetIdentHashAbbreviation (GetRemoteIdentity ()->GetIdentHash ()), + " is semi-full (size = ", m_SendQueue.size (), ", lag = ", queueLag / 1000, ", rtt = ", (int)m_RTT, ")"); + } + } for (auto it: msgs) + { if (isSemiFull && it->onDrop) it->Drop (); // drop earlier because we can handle it else + { + it->SetEnqueueTime (mts); m_SendQueue.push_back (std::move (it)); + } + } SendQueue (); if (m_SendQueue.size () > 0) // windows is full - { - if (m_SendQueue.size () <= SSU2_MAX_OUTGOING_QUEUE_SIZE) - Resend (i2p::util::GetMillisecondsSinceEpoch ()); - else - { - LogPrint (eLogWarning, "SSU2: Outgoing messages queue size to ", - GetIdentHashBase64(), " exceeds ", SSU2_MAX_OUTGOING_QUEUE_SIZE); - RequestTermination (eSSU2TerminationReasonTimeout); - } - } + Resend (i2p::util::GetMillisecondsSinceEpoch ()); SetSendQueueSize (m_SendQueue.size ()); } @@ -380,6 +393,7 @@ namespace transport if (!m_SendQueue.empty () && m_SentPackets.size () <= m_WindowSize) { auto ts = i2p::util::GetMillisecondsSinceEpoch (); + uint64_t mts = i2p::util::GetMonotonicMicroseconds (); auto packet = m_Server.GetSentPacketsPool ().AcquireShared (); size_t ackBlockSize = CreateAckBlock (packet->payload, m_MaxPayloadSize); bool ackBlockSent = false; @@ -387,7 +401,7 @@ namespace transport while (!m_SendQueue.empty () && m_SentPackets.size () <= m_WindowSize) { auto msg = m_SendQueue.front (); - if (!msg || msg->IsExpired (ts)) + if (!msg || msg->IsExpired (ts) || msg->GetEnqueueTime() + m_MsgLocalExpirationTimeout < mts) { // drop null or expired message if (msg) msg->Drop (); @@ -527,7 +541,7 @@ namespace transport if (m_SentPackets.empty ()) return 0; std::map > resentPackets; for (auto it = m_SentPackets.begin (); it != m_SentPackets.end (); ) - if (ts >= it->second->sendTime + it->second->numResends*m_RTO) + if (ts >= it->second->sendTime + (it->second->numResends + 1) * m_RTO) { if (it->second->numResends > SSU2_MAX_NUM_RESENDS) { @@ -1743,8 +1757,15 @@ namespace transport if (ts > it1->second->sendTime) { auto rtt = ts - it1->second->sendTime; - m_RTT = std::round ((m_RTT*m_SendPacketNum + rtt)/(m_SendPacketNum + 1.0)); + if (m_RTT != SSU2_UNKNOWN_RTT) + m_RTT = SSU2_RTT_EWMA_ALPHA * rtt + (1.0 - SSU2_RTT_EWMA_ALPHA) * m_RTT; + else + m_RTT = rtt; m_RTO = m_RTT*SSU2_kAPPA; + m_MsgLocalExpirationTimeout = std::max (I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_MIN, + std::min (I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_MAX, + (unsigned int)(m_RTT * 1000 * I2NP_MESSAGE_LOCAL_EXPIRATION_TIMEOUT_FACTOR))); + m_MsgLocalSemiExpirationTimeout = m_MsgLocalExpirationTimeout / 2; if (m_RTO < SSU2_MIN_RTO) m_RTO = SSU2_MIN_RTO; if (m_RTO > SSU2_MAX_RTO) m_RTO = SSU2_MAX_RTO; } diff --git a/libi2pd/SSU2Session.h b/libi2pd/SSU2Session.h index ed17a6f4..6505b233 100644 --- a/libi2pd/SSU2Session.h +++ b/libi2pd/SSU2Session.h @@ -36,7 +36,6 @@ namespace transport const size_t SSU2_MAX_PACKET_SIZE = 1500; const size_t SSU2_MIN_PACKET_SIZE = 1280; const int SSU2_HANDSHAKE_RESEND_INTERVAL = 1000; // in milliseconds - const int SSU2_RESEND_INTERVAL = 300; // in milliseconds const int SSU2_MAX_NUM_RESENDS = 5; const int SSU2_INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds const int SSU2_MAX_NUM_RECEIVED_I2NP_MSGIDS = 5000; // how many msgID we store for duplicates check @@ -45,9 +44,11 @@ namespace transport const size_t SSU2_MIN_WINDOW_SIZE = 16; // in packets const size_t SSU2_MAX_WINDOW_SIZE = 256; // in packets const size_t SSU2_MIN_RTO = 100; // in milliseconds + const size_t SSU2_INITIAL_RTO = 540; // in milliseconds const size_t SSU2_MAX_RTO = 2500; // in milliseconds + const double SSU2_UNKNOWN_RTT = -1; + const double SSU2_RTT_EWMA_ALPHA = 0.125; const float SSU2_kAPPA = 1.8; - const size_t SSU2_MAX_OUTGOING_QUEUE_SIZE = 500; // in messages const int SSU2_MAX_NUM_ACNT = 255; // acnt, acks or nacks const int SSU2_MAX_NUM_ACK_PACKETS = 511; // ackthrough + acnt + 1 range const int SSU2_MAX_NUM_ACK_RANGES = 32; // to send @@ -357,7 +358,10 @@ namespace transport std::list > m_SendQueue; i2p::I2NPMessagesHandler m_Handler; bool m_IsDataReceived; - size_t m_WindowSize, m_RTT, m_RTO; + double m_RTT; + int m_MsgLocalExpirationTimeout; + int m_MsgLocalSemiExpirationTimeout; + size_t m_WindowSize, m_RTO; uint32_t m_RelayTag; // between Bob and Charlie OnEstablished m_OnEstablished; // callback from Established boost::asio::deadline_timer m_ConnectTimer; diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index f6518163..14fc6407 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -68,11 +68,12 @@ namespace stream Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, std::shared_ptr remote, int port): m_Service (service), - m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), + m_SendStreamID (0), m_SequenceNumber (0), + m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), - m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), + m_RTT (INITIAL_RTT), m_WindowSize (MIN_WINDOW_SIZE), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0), m_MTU (STREAMING_MTU) { @@ -81,11 +82,12 @@ namespace stream } Stream::Stream (boost::asio::io_service& service, StreamingDestination& local): - m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), + m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), + m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), - m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE), - m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), + m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_RTT (INITIAL_RTT), + m_WindowSize (MIN_WINDOW_SIZE), m_RTO (INITIAL_RTO), m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0), m_MTU (STREAMING_MTU) { RAND_bytes ((uint8_t *)&m_RecvStreamID, 4); @@ -287,6 +289,8 @@ namespace stream m_AckSendTimer.async_wait (std::bind (&Stream::HandleAckSendTimer, shared_from_this (), std::placeholders::_1)); } + if (delayRequested >= DELAY_CHOKING) + m_WindowSize = 1; } optionData += 2; } @@ -404,6 +408,8 @@ namespace stream LogPrint (eLogError, "Streaming: Unexpected ackThrough=", ackThrough, " > seqn=", m_SequenceNumber); return; } + int rttSample = INT_MAX; + bool firstRttSample = false; int nackCount = packet->GetNACKCount (); for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();) { @@ -427,41 +433,51 @@ namespace stream } } auto sentPacket = *it; - uint64_t rtt = ts - sentPacket->sendTime; - if(ts < sentPacket->sendTime) + int64_t rtt = (int64_t)ts - (int64_t)sentPacket->sendTime; + if (rtt < 0) + LogPrint (eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime); + if (!seqn) { - LogPrint(eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime); - rtt = 1; + firstRttSample = true; + rttSample = rtt < 0 ? 1 : rtt; } - if (seqn) - m_RTT = std::round (RTT_EWMA_ALPHA * m_RTT + (1.0 - RTT_EWMA_ALPHA) * rtt); - else - m_RTT = rtt; - m_RTO = m_RTT*1.5; // TODO: implement it better + else if (!sentPacket->resent && seqn > m_TunnelsChangeSequenceNumber && rtt >= 0) + rttSample = std::min (rttSample, (int)rtt); LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); m_SentPackets.erase (it++); m_LocalDestination.DeletePacket (sentPacket); acknowledged = true; if (m_WindowSize < WINDOW_SIZE) m_WindowSize++; // slow start - else - { - // linear growth - if (ts > m_LastWindowSizeIncreaseTime + m_RTT) - { - m_WindowSize++; - if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE; - m_LastWindowSizeIncreaseTime = ts; - } - } - if (!seqn && m_RoutingSession) // first message confirmed - m_RoutingSession->SetSharedRoutingPath ( - std::make_shared ( - i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, m_RTT, 0, 0})); } else break; } + if (rttSample != INT_MAX) + { + if (firstRttSample) + m_RTT = rttSample; + else + m_RTT = RTT_EWMA_ALPHA * rttSample + (1.0 - RTT_EWMA_ALPHA) * m_RTT; + bool wasInitial = m_RTO == INITIAL_RTO; + m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5)); // TODO: implement it better + if (wasInitial) + ScheduleResend (); + } + if (acknowledged && m_WindowSize >= WINDOW_SIZE) + { + // linear growth + if (ts > m_LastWindowSizeIncreaseTime + m_RTT) + { + m_WindowSize++; + if (m_WindowSize > MAX_WINDOW_SIZE) m_WindowSize = MAX_WINDOW_SIZE; + m_LastWindowSizeIncreaseTime = ts; + } + } + if (firstRttSample && m_RoutingSession) + m_RoutingSession->SetSharedRoutingPath ( + std::make_shared ( + i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, (int)m_RTT, 0, 0})); if (m_SentPackets.empty ()) m_ResendTimer.cancel (); if (acknowledged) @@ -677,6 +693,7 @@ namespace stream htobe32buf (packet + size, lastReceivedSeqn); size += 4; // ack Through uint8_t numNacks = 0; + bool choking = false; if (lastReceivedSeqn > m_LastReceivedSequenceNumber) { // fill NACKs @@ -688,7 +705,8 @@ namespace stream if (numNacks + (seqn - nextSeqn) >= 256) { LogPrint (eLogError, "Streaming: Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn); - htobe32buf (packet + 12, nextSeqn); // change ack Through + htobe32buf (packet + 12, nextSeqn - 1); // change ack Through back + choking = true; break; } for (uint32_t i = nextSeqn; i < seqn; i++) @@ -710,10 +728,17 @@ namespace stream size++; // NACK count } packet[size] = 0; - size++; // resend delay - htobuf16 (packet + size, 0); // no flags set + size++; // resend delay + htobuf16 (packet + size, choking ? PACKET_FLAG_DELAY_REQUESTED : 0); // no flags set or delay size += 2; // flags - htobuf16 (packet + size, 0); // no options + if (choking) + { + htobuf16 (packet + size, 2); // 2 bytes delay interval + htobuf16 (packet + size + 2, DELAY_CHOKING); // set choking interval + size += 2; + } + else + htobuf16 (packet + size, 0); // no options size += 2; // options size p.len = size; @@ -881,7 +906,7 @@ namespace stream m_CurrentOutboundTunnel = routingPath->outboundTunnel; m_CurrentRemoteLease = routingPath->remoteLease; m_RTT = routingPath->rtt; - m_RTO = m_RTT*1.5; // TODO: implement it better + m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5)); // TODO: implement it better } } @@ -891,20 +916,32 @@ namespace stream UpdateCurrentRemoteLease (true); if (m_CurrentRemoteLease && ts < m_CurrentRemoteLease->endDate + i2p::data::LEASE_ENDDATE_THRESHOLD) { + bool freshTunnel = false; if (!m_CurrentOutboundTunnel) { auto leaseRouter = i2p::data::netdb.FindRouter (m_CurrentRemoteLease->tunnelGateway); m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (nullptr, leaseRouter ? leaseRouter->GetCompatibleTransports (false) : (i2p::data::RouterInfo::CompatibleTransports)i2p::data::RouterInfo::eAllTransports); + freshTunnel = true; } else if (!m_CurrentOutboundTunnel->IsEstablished ()) + { + auto oldOutboundTunnel = m_CurrentOutboundTunnel; m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel); + if (m_CurrentOutboundTunnel && oldOutboundTunnel->GetEndpointIdentHash() != m_CurrentOutboundTunnel->GetEndpointIdentHash()) + freshTunnel = true; + } if (!m_CurrentOutboundTunnel) { LogPrint (eLogError, "Streaming: No outbound tunnels in the pool, sSID=", m_SendStreamID); m_CurrentRemoteLease = nullptr; return; } + if (freshTunnel) + { + m_RTO = INITIAL_RTO; + m_TunnelsChangeSequenceNumber = m_SequenceNumber; // should be determined more precisely + } std::vector msgs; for (const auto& it: packets) @@ -936,10 +973,10 @@ namespace stream if (m_RoutingSession->IsLeaseSetNonConfirmed ()) { auto ts = i2p::util::GetMillisecondsSinceEpoch (); - if (ts > m_RoutingSession->GetLeaseSetSubmissionTime () + i2p::garlic::LEASET_CONFIRMATION_TIMEOUT) + if (ts > m_RoutingSession->GetLeaseSetSubmissionTime () + i2p::garlic::LEASESET_CONFIRMATION_TIMEOUT) { // LeaseSet was not confirmed, should try other tunnels - LogPrint (eLogWarning, "Streaming: LeaseSet was not confirmed in ", i2p::garlic::LEASET_CONFIRMATION_TIMEOUT, " milliseconds. Trying to resubmit"); + LogPrint (eLogWarning, "Streaming: LeaseSet was not confirmed in ", i2p::garlic::LEASESET_CONFIRMATION_TIMEOUT, " milliseconds. Trying to resubmit"); m_RoutingSession->SetSharedRoutingPath (nullptr); m_CurrentOutboundTunnel = nullptr; m_CurrentRemoteLease = nullptr; @@ -989,6 +1026,7 @@ namespace stream { if (ts >= it->sendTime + m_RTO) { + it->resent = true; it->sendTime = ts; packets.push_back (it); } @@ -998,31 +1036,31 @@ namespace stream if (packets.size () > 0) { m_NumResendAttempts++; - m_RTO *= 2; - switch (m_NumResendAttempts) + if (m_NumResendAttempts == 1 && m_RTO != INITIAL_RTO) { - case 1: // congestion avoidance - m_WindowSize -= (m_WindowSize + WINDOW_SIZE_DROP_FRACTION) / WINDOW_SIZE_DROP_FRACTION; // adjustment >= 1 - if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; - break; - case 2: - m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change first time -#if (__cplusplus >= 201703L) // C++ 17 or higher - [[fallthrough]]; -#endif - // no break here - case 4: - if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); - UpdateCurrentRemoteLease (); // pick another lease - LogPrint (eLogWarning, "Streaming: Another remote lease has been selected for stream with rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID); - break; - case 3: + // congestion avoidance + m_RTO *= 2; + m_WindowSize -= (m_WindowSize + WINDOW_SIZE_DROP_FRACTION) / WINDOW_SIZE_DROP_FRACTION; // adjustment >= 1 + if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; + } + else + { + m_TunnelsChangeSequenceNumber = m_SequenceNumber; + m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change + if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); + if (m_NumResendAttempts & 1) + { // pick another outbound tunnel - if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); - LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream with sSID=", m_SendStreamID); - break; - default: ; + LogPrint (eLogWarning, "Streaming: Resend #", m_NumResendAttempts, + ", another outbound tunnel has been selected for stream with sSID=", m_SendStreamID); + } + else + { + UpdateCurrentRemoteLease (); // pick another lease + LogPrint (eLogWarning, "Streaming: Resend #", m_NumResendAttempts, + ", another remote lease has been selected for stream with rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID); + } } SendPackets (packets); } @@ -1056,9 +1094,13 @@ namespace stream { if (m_RoutingSession && m_RoutingSession->IsLeaseSetNonConfirmed ()) { - // seems something went wrong and we should re-select tunnels - m_CurrentOutboundTunnel = nullptr; - m_CurrentRemoteLease = nullptr; + auto ts = i2p::util::GetMillisecondsSinceEpoch (); + if (ts > m_RoutingSession->GetLeaseSetSubmissionTime () + i2p::garlic::LEASESET_CONFIRMATION_TIMEOUT) + { + // seems something went wrong and we should re-select tunnels + m_CurrentOutboundTunnel = nullptr; + m_CurrentRemoteLease = nullptr; + } } SendQuickAck (); } diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 1980b6fd..e27deb9c 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -52,12 +52,13 @@ namespace stream const size_t STREAMING_MTU_RATCHETS = 1812; const size_t MAX_PACKET_SIZE = 4096; const size_t COMPRESSION_THRESHOLD_SIZE = 66; - const int MAX_NUM_RESEND_ATTEMPTS = 6; + const int MAX_NUM_RESEND_ATTEMPTS = 9; const int WINDOW_SIZE = 6; // in messages const int MIN_WINDOW_SIZE = 1; const int MAX_WINDOW_SIZE = 128; const int WINDOW_SIZE_DROP_FRACTION = 10; // 1/10 - const double RTT_EWMA_ALPHA = 0.5; + const double RTT_EWMA_ALPHA = 0.125; + const int MIN_RTO = 20; // in milliseconds const int INITIAL_RTT = 8000; // in milliseconds const int INITIAL_RTO = 9000; // in milliseconds const int MIN_SEND_ACK_TIMEOUT = 2; // in milliseconds @@ -65,14 +66,16 @@ namespace stream const size_t MAX_PENDING_INCOMING_BACKLOG = 128; const int PENDING_INCOMING_TIMEOUT = 10; // in seconds const int MAX_RECEIVE_TIMEOUT = 20; // in seconds + const uint16_t DELAY_CHOKING = 60000; // in milliseconds struct Packet { size_t len, offset; uint8_t buf[MAX_PACKET_SIZE]; uint64_t sendTime; + bool resent; - Packet (): len (0), offset (0), sendTime (0) {}; + Packet (): len (0), offset (0), sendTime (0), resent (false) {}; uint8_t * GetBuffer () { return buf + offset; }; size_t GetLength () const { return len - offset; }; @@ -236,6 +239,7 @@ namespace stream boost::asio::io_service& m_Service; uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; + uint32_t m_TunnelsChangeSequenceNumber; int32_t m_LastReceivedSequenceNumber; StreamStatus m_Status; bool m_IsAckSendScheduled; @@ -254,7 +258,8 @@ namespace stream uint16_t m_Port; SendBufferQueue m_SendBuffer; - int m_WindowSize, m_RTT, m_RTO, m_AckDelay; + double m_RTT; + int m_WindowSize, m_RTO, m_AckDelay; uint64_t m_LastWindowSizeIncreaseTime; int m_NumResendAttempts; size_t m_MTU; diff --git a/libi2pd/version.h b/libi2pd/version.h index 7b9e20c8..e05e6f3a 100644 --- a/libi2pd/version.h +++ b/libi2pd/version.h @@ -18,8 +18,8 @@ #define MAKE_VERSION_NUMBER(a,b,c) ((a*100+b)*100+c) #define I2PD_VERSION_MAJOR 2 -#define I2PD_VERSION_MINOR 50 -#define I2PD_VERSION_MICRO 2 +#define I2PD_VERSION_MINOR 51 +#define I2PD_VERSION_MICRO 0 #define I2PD_VERSION_PATCH 0 #ifdef GITVER #define I2PD_VERSION XSTRINGIZE(GITVER) @@ -33,7 +33,7 @@ #define I2P_VERSION_MAJOR 0 #define I2P_VERSION_MINOR 9 -#define I2P_VERSION_MICRO 61 +#define I2P_VERSION_MICRO 62 #define I2P_VERSION_PATCH 0 #define I2P_VERSION MAKE_VERSION(I2P_VERSION_MAJOR, I2P_VERSION_MINOR, I2P_VERSION_MICRO) #define I2P_VERSION_NUMBER MAKE_VERSION_NUMBER(I2P_VERSION_MAJOR, I2P_VERSION_MINOR, I2P_VERSION_MICRO)