diff --git a/Destination.cpp b/Destination.cpp index 26330c29..e6380ffd 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -140,7 +140,7 @@ namespace client m_IsRunning = true; m_Pool->SetLocalDestination (shared_from_this ()); m_Pool->SetActive (true); - m_Thread = new std::thread (std::bind (&ClientDestination::Run, this)); + m_Thread = new std::thread (std::bind (&ClientDestination::Run, shared_from_this ())); m_StreamingDestination = std::make_shared (shared_from_this ()); // TODO: m_StreamingDestination->Start (); for (auto it: m_StreamingDestinationsByPorts) @@ -148,7 +148,7 @@ namespace client m_CleanupTimer.expires_from_now (boost::posix_time::minutes (DESTINATION_CLEANUP_TIMEOUT)); m_CleanupTimer.async_wait (std::bind (&ClientDestination::HandleCleanupTimer, - this, std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); } } @@ -229,21 +229,22 @@ namespace client } data; memcpy (data.k, key, 32); memcpy (data.t, tag, 32); - m_Service.post ([this,data](void) + auto s = shared_from_this (); + m_Service.post ([s,data](void) { - this->AddSessionKey (data.k, data.t); + s->AddSessionKey (data.k, data.t); }); return true; } void ClientDestination::ProcessGarlicMessage (std::shared_ptr msg) { - m_Service.post (std::bind (&ClientDestination::HandleGarlicMessage, this, msg)); + m_Service.post (std::bind (&ClientDestination::HandleGarlicMessage, shared_from_this (), msg)); } void ClientDestination::ProcessDeliveryStatusMessage (std::shared_ptr msg) { - m_Service.post (std::bind (&ClientDestination::HandleDeliveryStatusMessage, this, msg)); + m_Service.post (std::bind (&ClientDestination::HandleDeliveryStatusMessage, shared_from_this (), msg)); } void ClientDestination::HandleI2NPMessage (const uint8_t * buf, size_t len, std::shared_ptr from) @@ -286,15 +287,20 @@ namespace client if (it != m_RemoteLeaseSets.end ()) { leaseSet = it->second; - leaseSet->Update (buf + offset, len - offset); - if (leaseSet->IsValid ()) - LogPrint (eLogDebug, "Remote LeaseSet updated"); - else - { - LogPrint (eLogDebug, "Remote LeaseSet update failed"); - m_RemoteLeaseSets.erase (it); - leaseSet = nullptr; + if (leaseSet->IsNewer (buf + offset, len - offset)) + { + leaseSet->Update (buf + offset, len - offset); + if (leaseSet->IsValid ()) + LogPrint (eLogDebug, "Remote LeaseSet updated"); + else + { + LogPrint (eLogDebug, "Remote LeaseSet update failed"); + m_RemoteLeaseSets.erase (it); + leaseSet = nullptr; + } } + else + LogPrint (eLogDebug, "Remote LeaseSet is older. Not updated"); } else { diff --git a/LeaseSet.cpp b/LeaseSet.cpp index 891b73d6..5d259b33 100644 --- a/LeaseSet.cpp +++ b/LeaseSet.cpp @@ -48,6 +48,7 @@ namespace data m_Buffer[m_BufferLen] = tunnels.size (); // num leases m_BufferLen++; // leases + auto currentTime = i2p::util::GetMillisecondsSinceEpoch (); for (auto it: tunnels) { memcpy (m_Buffer + m_BufferLen, it->GetNextIdentHash (), 32); @@ -56,8 +57,9 @@ namespace data m_BufferLen += 4; // tunnel id uint64_t ts = it->GetCreationTime () + i2p::tunnel::TUNNEL_EXPIRATION_TIMEOUT - i2p::tunnel::TUNNEL_EXPIRATION_THRESHOLD; // 1 minute before expiration ts *= 1000; // in milliseconds - ts += rand () % 6; // + random milliseconds 0-5 if (ts > m_ExpirationTime) m_ExpirationTime = ts; + // make sure leaseset is newer than previous, but adding some time to expiration date + ts += (currentTime - it->GetCreationTime ()*1000LL)*2/i2p::tunnel::TUNNEL_EXPIRATION_TIMEOUT; // up to 2 secs htobe64buf (m_Buffer + m_BufferLen, ts); m_BufferLen += 8; // end date } @@ -182,7 +184,35 @@ namespace data m_IsValid = false; } } - + + uint64_t LeaseSet::ExtractTimestamp (const uint8_t * buf, size_t len) const + { + if (!m_Identity) return 0; + size_t size = m_Identity->GetFullLen (); + if (size > len) return 0; + size += 256; // encryption key + size += m_Identity->GetSigningPublicKeyLen (); // unused signing key + if (size > len) return 0; + uint8_t num = buf[size]; + size++; // num + if (size + num*44 > len) return 0; + uint64_t timestamp= 0 ; + for (int i = 0; i < num; i++) + { + size += 36; // gateway (32) + tunnelId(4) + auto endDate = bufbe64toh (buf + size); + size += 8; // end date + if (!timestamp || endDate < timestamp) + timestamp = endDate; + } + return timestamp; + } + + bool LeaseSet::IsNewer (const uint8_t * buf, size_t len) const + { + return ExtractTimestamp (buf, len) > ExtractTimestamp (m_Buffer, m_BufferLen); + } + const std::vector > LeaseSet::GetNonExpiredLeases (bool withThreshold) const { auto ts = i2p::util::GetMillisecondsSinceEpoch (); diff --git a/LeaseSet.h b/LeaseSet.h index cffa60dd..0afca269 100644 --- a/LeaseSet.h +++ b/LeaseSet.h @@ -47,6 +47,7 @@ namespace data LeaseSet (std::shared_ptr pool); ~LeaseSet () { delete[] m_Buffer; }; void Update (const uint8_t * buf, size_t len); + bool IsNewer (const uint8_t * buf, size_t len) const; void PopulateLeases (); // from buffer std::shared_ptr GetIdentity () const { return m_Identity; }; @@ -69,6 +70,7 @@ namespace data private: void ReadFromBuffer (bool readIdentity = true); + uint64_t ExtractTimestamp (const uint8_t * buf, size_t len) const; // min expiration time private: diff --git a/NetDb.cpp b/NetDb.cpp index 9f0e4312..b4b4ef08 100644 --- a/NetDb.cpp +++ b/NetDb.cpp @@ -149,22 +149,30 @@ namespace data } } - void NetDb::AddRouterInfo (const uint8_t * buf, int len) + bool NetDb::AddRouterInfo (const uint8_t * buf, int len) { IdentityEx identity; if (identity.FromBuffer (buf, len)) - AddRouterInfo (identity.GetIdentHash (), buf, len); + return AddRouterInfo (identity.GetIdentHash (), buf, len); + return false; } - void NetDb::AddRouterInfo (const IdentHash& ident, const uint8_t * buf, int len) + bool NetDb::AddRouterInfo (const IdentHash& ident, const uint8_t * buf, int len) { + bool updated = true; auto r = FindRouter (ident); if (r) { - auto ts = r->GetTimestamp (); - r->Update (buf, len); - if (r->GetTimestamp () > ts) + if (r->IsNewer (buf, len)) + { + r->Update (buf, len); LogPrint (eLogInfo, "NetDb: RouterInfo updated: ", ident.ToBase64()); + } + else + { + LogPrint (eLogDebug, "NetDb: RouterInfo is older: ", ident.ToBase64()); + updated = false; + } } else { @@ -182,27 +190,39 @@ namespace data m_Floodfills.push_back (r); } } + else + updated = false; } // take care about requested destination m_Requests.RequestComplete (ident, r); + return updated; } - void NetDb::AddLeaseSet (const IdentHash& ident, const uint8_t * buf, int len, + bool NetDb::AddLeaseSet (const IdentHash& ident, const uint8_t * buf, int len, std::shared_ptr from) { + bool updated = false; if (!from) // unsolicited LS must be received directly { auto it = m_LeaseSets.find(ident); if (it != m_LeaseSets.end ()) { - it->second->Update (buf, len); - if (it->second->IsValid ()) - LogPrint (eLogInfo, "NetDb: LeaseSet updated: ", ident.ToBase64()); - else + if (it->second->IsNewer (buf, len)) { - LogPrint (eLogWarning, "NetDb: LeaseSet update failed: ", ident.ToBase64()); - m_LeaseSets.erase (it); - } + it->second->Update (buf, len); + if (it->second->IsValid ()) + { + LogPrint (eLogInfo, "NetDb: LeaseSet updated: ", ident.ToBase64()); + updated = true; + } + else + { + LogPrint (eLogWarning, "NetDb: LeaseSet update failed: ", ident.ToBase64()); + m_LeaseSets.erase (it); + } + } + else + LogPrint (eLogDebug, "NetDb: LeaseSet is older: ", ident.ToBase64()); } else { @@ -211,11 +231,13 @@ namespace data { LogPrint (eLogInfo, "NetDb: LeaseSet added: ", ident.ToBase64()); m_LeaseSets[ident] = leaseSet; + updated = true; } else LogPrint (eLogError, "NetDb: new LeaseSet validation failed: ", ident.ToBase64()); } } + return updated; } std::shared_ptr NetDb::FindRouter (const IdentHash& ident) const @@ -487,39 +509,14 @@ namespace data LogPrint (eLogError, "NetDb: no outbound tunnels for DatabaseStore reply found"); } offset += 32; - - if (context.IsFloodfill ()) - { - // flood it - auto floodMsg = NewI2NPShortMessage (); - uint8_t * payload = floodMsg->GetPayload (); - memcpy (payload, buf, 33); // key + type - htobe32buf (payload + DATABASE_STORE_REPLY_TOKEN_OFFSET, 0); // zero reply token - auto msgLen = len - offset; - floodMsg->len += DATABASE_STORE_HEADER_SIZE + msgLen; - if (floodMsg->len < floodMsg->maxLen) - { - memcpy (payload + DATABASE_STORE_HEADER_SIZE, buf + offset, msgLen); - floodMsg->FillI2NPMessageHeader (eI2NPDatabaseStore); - std::set excluded; - for (int i = 0; i < 3; i++) - { - auto floodfill = GetClosestFloodfill (ident, excluded, true); // we need a floodfill close than us only - if (floodfill) - transports.SendMessage (floodfill->GetIdentHash (), floodMsg); - else - break; - } - } - else - LogPrint (eLogError, "Database store message is too long ", floodMsg->len); - } } - + size_t payloadOffset = offset; + + bool updated = false; if (buf[DATABASE_STORE_TYPE_OFFSET]) // type { LogPrint (eLogDebug, "NetDb: store request: LeaseSet"); - AddLeaseSet (ident, buf + offset, len - offset, m->from); + updated = AddLeaseSet (ident, buf + offset, len - offset, m->from); } else { @@ -534,7 +531,34 @@ namespace data uint8_t uncompressed[2048]; size_t uncompressedSize = m_Inflator.Inflate (buf + offset, size, uncompressed, 2048); if (uncompressedSize) - AddRouterInfo (ident, uncompressed, uncompressedSize); + updated = AddRouterInfo (ident, uncompressed, uncompressedSize); + } + + if (replyToken && context.IsFloodfill () && updated) + { + // flood updated + auto floodMsg = NewI2NPShortMessage (); + uint8_t * payload = floodMsg->GetPayload (); + memcpy (payload, buf, 33); // key + type + htobe32buf (payload + DATABASE_STORE_REPLY_TOKEN_OFFSET, 0); // zero reply token + auto msgLen = len - payloadOffset; + floodMsg->len += DATABASE_STORE_HEADER_SIZE + msgLen; + if (floodMsg->len < floodMsg->maxLen) + { + memcpy (payload + DATABASE_STORE_HEADER_SIZE, buf + payloadOffset, msgLen); + floodMsg->FillI2NPMessageHeader (eI2NPDatabaseStore); + std::set excluded; + for (int i = 0; i < 3; i++) + { + auto floodfill = GetClosestFloodfill (ident, excluded); + if (floodfill) + transports.SendMessage (floodfill->GetIdentHash (), floodMsg); + else + break; + } + } + else + LogPrint (eLogError, "Database store message is too long ", floodMsg->len); } } diff --git a/NetDb.h b/NetDb.h index f1b97728..cad00aa7 100644 --- a/NetDb.h +++ b/NetDb.h @@ -34,9 +34,9 @@ namespace data void Start (); void Stop (); - void AddRouterInfo (const uint8_t * buf, int len); - void AddRouterInfo (const IdentHash& ident, const uint8_t * buf, int len); - void AddLeaseSet (const IdentHash& ident, const uint8_t * buf, int len, std::shared_ptr from); + bool AddRouterInfo (const uint8_t * buf, int len); + bool AddRouterInfo (const IdentHash& ident, const uint8_t * buf, int len); + bool AddLeaseSet (const IdentHash& ident, const uint8_t * buf, int len, std::shared_ptr from); std::shared_ptr FindRouter (const IdentHash& ident) const; std::shared_ptr FindLeaseSet (const IdentHash& destination) const; std::shared_ptr FindRouterProfile (const IdentHash& ident) const; diff --git a/RouterInfo.cpp b/RouterInfo.cpp index 56482234..3267392b 100644 --- a/RouterInfo.cpp +++ b/RouterInfo.cpp @@ -459,6 +459,14 @@ namespace data s.write (properties.str ().c_str (), properties.str ().size ()); } + bool RouterInfo::IsNewer (const uint8_t * buf, size_t len) const + { + if (!m_RouterIdentity) return false; + size_t size = m_RouterIdentity->GetFullLen (); + if (size + 8 > len) return false; + return bufbe64toh (buf + size) > m_Timestamp; + } + const uint8_t * RouterInfo::LoadBuffer () { if (!m_Buffer) diff --git a/RouterInfo.h b/RouterInfo.h index 2d32edef..4870eb50 100644 --- a/RouterInfo.h +++ b/RouterInfo.h @@ -157,7 +157,8 @@ namespace data void Update (const uint8_t * buf, int len); void DeleteBuffer () { delete[] m_Buffer; m_Buffer = nullptr; }; - + bool IsNewer (const uint8_t * buf, size_t len) const; + // implements RoutingDestination const IdentHash& GetIdentHash () const { return m_RouterIdentity->GetIdentHash (); }; const uint8_t * GetEncryptionPublicKey () const { return m_RouterIdentity->GetStandardIdentity ().publicKey; }; diff --git a/TunnelPool.cpp b/TunnelPool.cpp index 72d6735a..c74ff475 100644 --- a/TunnelPool.cpp +++ b/TunnelPool.cpp @@ -201,7 +201,14 @@ namespace tunnel void TunnelPool::TestTunnels () { - for (auto it: m_Tests) + decltype(m_Tests) tests; + { + std::unique_lock l(m_TestsMutex); + tests = m_Tests; + m_Tests.clear (); + } + + for (auto it: tests) { LogPrint (eLogWarning, "Tunnels: test of tunnel ", it.first, " failed"); // if test failed again with another tunnel we consider it failed @@ -232,7 +239,7 @@ namespace tunnel it.second.second->SetState (eTunnelStateTestFailed); } } - m_Tests.clear (); + // new tests auto it1 = m_OutboundTunnels.begin (); auto it2 = m_InboundTunnels.begin (); @@ -253,7 +260,10 @@ namespace tunnel { uint32_t msgID; RAND_bytes ((uint8_t *)&msgID, 4); - m_Tests[msgID] = std::make_pair (*it1, *it2); + { + std::unique_lock l(m_TestsMutex); + m_Tests[msgID] = std::make_pair (*it1, *it2); + } (*it1)->SendTunnelDataMsg ((*it2)->GetNextIdentHash (), (*it2)->GetNextTunnelID (), CreateDeliveryStatusMsg (msgID)); it1++; it2++; @@ -276,16 +286,26 @@ namespace tunnel buf += 4; uint64_t timestamp = bufbe64toh (buf); - auto it = m_Tests.find (msgID); - if (it != m_Tests.end ()) + decltype(m_Tests)::mapped_type test; + bool found = false; + { + std::unique_lock l(m_TestsMutex); + auto it = m_Tests.find (msgID); + if (it != m_Tests.end ()) + { + found = true; + test = it->second; + m_Tests.erase (it); + } + } + if (found) { // restore from test failed state if any - if (it->second.first->GetState () == eTunnelStateTestFailed) - it->second.first->SetState (eTunnelStateEstablished); - if (it->second.second->GetState () == eTunnelStateTestFailed) - it->second.second->SetState (eTunnelStateEstablished); - LogPrint (eLogDebug, "Tunnels: test of ", it->first, " successful. ", i2p::util::GetMillisecondsSinceEpoch () - timestamp, " milliseconds"); - m_Tests.erase (it); + if (test.first->GetState () == eTunnelStateTestFailed) + test.first->SetState (eTunnelStateEstablished); + if (test.second->GetState () == eTunnelStateTestFailed) + test.second->SetState (eTunnelStateEstablished); + LogPrint (eLogDebug, "Tunnels: test of ", msgID, " successful. ", i2p::util::GetMillisecondsSinceEpoch () - timestamp, " milliseconds"); } else { diff --git a/TunnelPool.h b/TunnelPool.h index 97a018b7..0cd2057d 100644 --- a/TunnelPool.h +++ b/TunnelPool.h @@ -77,6 +77,7 @@ namespace tunnel std::set, TunnelCreationTimeCmp> m_InboundTunnels; // recent tunnel appears first mutable std::mutex m_OutboundTunnelsMutex; std::set, TunnelCreationTimeCmp> m_OutboundTunnels; + mutable std::mutex m_TestsMutex; std::map, std::shared_ptr > > m_Tests; bool m_IsActive;