incoming streams support

This commit is contained in:
orignal 2014-08-01 14:54:14 -04:00
parent acd21a04ae
commit ed0af7b937
2 changed files with 74 additions and 32 deletions

View File

@ -10,6 +10,7 @@
#include "Timestamp.h" #include "Timestamp.h"
#include "CryptoConst.h" #include "CryptoConst.h"
#include "Garlic.h" #include "Garlic.h"
#include "NetDb.h"
#include "Streaming.h" #include "Streaming.h"
namespace i2p namespace i2p
@ -19,13 +20,21 @@ namespace stream
Stream::Stream (boost::asio::io_service& service, StreamingDestination * local, Stream::Stream (boost::asio::io_service& service, StreamingDestination * local,
const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0), const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0),
m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false),
m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (remote), m_IsOutgoing(true), m_LeaseSetUpdated (true), m_LocalDestination (local),
m_ReceiveTimer (m_Service) m_RemoteLeaseSet (&remote), m_ReceiveTimer (m_Service)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
UpdateCurrentRemoteLease (); UpdateCurrentRemoteLease ();
} }
Stream::Stream (boost::asio::io_service& service, StreamingDestination * local):
m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0),
m_IsOpen (false), m_IsOutgoing(true), m_LeaseSetUpdated (true), m_LocalDestination (local),
m_RemoteLeaseSet (nullptr), m_ReceiveTimer (m_Service)
{
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
}
Stream::~Stream () Stream::~Stream ()
{ {
m_ReceiveTimer.cancel (); m_ReceiveTimer.cancel ();
@ -115,6 +124,13 @@ namespace stream
if (flags & PACKET_FLAG_FROM_INCLUDED) if (flags & PACKET_FLAG_FROM_INCLUDED)
{ {
LogPrint ("From identity"); LogPrint ("From identity");
if (!m_RemoteLeaseSet)
{
i2p::data::Identity * identity = (i2p::data::Identity *)optionData;
m_RemoteLeaseSet = i2p::data::netdb.FindLeaseSet (identity->Hash ());
if (!m_RemoteLeaseSet)
LogPrint ("LeaseSet ", identity->Hash ().ToBase64 (), " not found");
}
optionData += sizeof (i2p::data::Identity); optionData += sizeof (i2p::data::Identity);
} }
@ -281,6 +297,12 @@ namespace stream
bool Stream::SendPacket (const uint8_t * buf, size_t len) bool Stream::SendPacket (const uint8_t * buf, size_t len)
{ {
if (!m_RemoteLeaseSet)
{
LogPrint ("Can't send packet. Missing remote LeaseSet");
return false;
}
I2NPMessage * leaseSet = nullptr; I2NPMessage * leaseSet = nullptr;
if (m_LeaseSetUpdated) if (m_LeaseSetUpdated)
@ -289,7 +311,7 @@ namespace stream
m_LeaseSetUpdated = false; m_LeaseSetUpdated = false;
} }
I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, I2NPMessage * msg = i2p::garlic::routing.WrapMessage (*m_RemoteLeaseSet,
CreateDataMessage (this, buf, len), leaseSet); CreateDataMessage (this, buf, len), leaseSet);
auto outboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (); auto outboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel ();
if (outboundTunnel) if (outboundTunnel)
@ -318,7 +340,9 @@ namespace stream
void Stream::UpdateCurrentRemoteLease () void Stream::UpdateCurrentRemoteLease ()
{ {
auto leases = m_RemoteLeaseSet.GetNonExpiredLeases (); if (m_RemoteLeaseSet)
{
auto leases = m_RemoteLeaseSet->GetNonExpiredLeases ();
if (!leases.empty ()) if (!leases.empty ())
{ {
uint32_t i = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (0, leases.size () - 1); uint32_t i = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (0, leases.size () - 1);
@ -327,9 +351,13 @@ namespace stream
else else
m_CurrentRemoteLease.endDate = 0; m_CurrentRemoteLease.endDate = 0;
} }
else
m_CurrentRemoteLease.endDate = 0;
}
StreamingDestination::StreamingDestination (): m_LeaseSet (nullptr) StreamingDestination::StreamingDestination (boost::asio::io_service& service):
m_Service (service), m_LeaseSet (nullptr)
{ {
m_Keys = i2p::data::CreateRandomKeys (); m_Keys = i2p::data::CreateRandomKeys ();
@ -341,7 +369,8 @@ namespace stream
m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel
} }
StreamingDestination::StreamingDestination (const std::string& fullPath): m_LeaseSet (nullptr) StreamingDestination::StreamingDestination (boost::asio::io_service& service, const std::string& fullPath):
m_Service (service), m_LeaseSet (nullptr)
{ {
std::ifstream s(fullPath.c_str (), std::ifstream::binary); std::ifstream s(fullPath.c_str (), std::ifstream::binary);
if (s.is_open ()) if (s.is_open ())
@ -378,17 +407,23 @@ namespace stream
delete packet; delete packet;
} }
} }
else else // new incoming stream
{ {
LogPrint ("Uncoming stream is not implemented yet"); auto incomingStream = CreateNewIncomingStream ();
delete packet; incomingStream->HandleNextPacket (packet);
} }
} }
Stream * StreamingDestination::CreateNewStream (boost::asio::io_service& service, Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote)
const i2p::data::LeaseSet& remote)
{ {
Stream * s = new Stream (service, this, remote); Stream * s = new Stream (m_Service, this, remote);
m_Streams[s->GetRecvStreamID ()] = s;
return s;
}
Stream * StreamingDestination::CreateNewIncomingStream ()
{
Stream * s = new Stream (m_Service, this);
m_Streams[s->GetRecvStreamID ()] = s; m_Streams[s->GetRecvStreamID ()] = s;
return s; return s;
} }
@ -429,7 +464,7 @@ namespace stream
{ {
if (!m_SharedLocalDestination) if (!m_SharedLocalDestination)
{ {
m_SharedLocalDestination = new StreamingDestination (); m_SharedLocalDestination = new StreamingDestination (m_Service);
m_Destinations[m_SharedLocalDestination->GetIdentHash ()] = m_SharedLocalDestination; m_Destinations[m_SharedLocalDestination->GetIdentHash ()] = m_SharedLocalDestination;
} }
LoadLocalDestinations (); LoadLocalDestinations ();
@ -475,7 +510,7 @@ namespace stream
#else #else
it->path(); it->path();
#endif #endif
auto localDestination = new StreamingDestination (fullPath); auto localDestination = new StreamingDestination (m_Service, fullPath);
m_Destinations[localDestination->GetIdentHash ()] = localDestination; m_Destinations[localDestination->GetIdentHash ()] = localDestination;
numDestinations++; numDestinations++;
} }
@ -487,7 +522,7 @@ namespace stream
Stream * StreamingDestinations::CreateClientStream (const i2p::data::LeaseSet& remote) Stream * StreamingDestinations::CreateClientStream (const i2p::data::LeaseSet& remote)
{ {
if (!m_SharedLocalDestination) return nullptr; if (!m_SharedLocalDestination) return nullptr;
return m_SharedLocalDestination->CreateNewStream (m_Service, remote); return m_SharedLocalDestination->CreateNewOutgoingStream (remote);
} }
void StreamingDestinations::DeleteClientStream (Stream * stream) void StreamingDestinations::DeleteClientStream (Stream * stream)

View File

@ -69,11 +69,13 @@ namespace stream
{ {
public: public:
Stream (boost::asio::io_service& service, StreamingDestination * local, const i2p::data::LeaseSet& remote); Stream (boost::asio::io_service& service, StreamingDestination * local, const i2p::data::LeaseSet& remote); // outgoing
Stream (boost::asio::io_service& service, StreamingDestination * local); // incoming
~Stream (); ~Stream ();
uint32_t GetSendStreamID () const { return m_SendStreamID; }; uint32_t GetSendStreamID () const { return m_SendStreamID; };
uint32_t GetRecvStreamID () const { return m_RecvStreamID; }; uint32_t GetRecvStreamID () const { return m_RecvStreamID; };
const i2p::data::LeaseSet& GetRemoteLeaseSet () const { return m_RemoteLeaseSet; }; const i2p::data::LeaseSet * GetRemoteLeaseSet () const { return m_RemoteLeaseSet; };
bool IsOpen () const { return m_IsOpen; }; bool IsOpen () const { return m_IsOpen; };
bool IsEstablished () const { return m_SendStreamID; }; bool IsEstablished () const { return m_SendStreamID; };
@ -106,9 +108,9 @@ namespace stream
boost::asio::io_service& m_Service; boost::asio::io_service& m_Service;
uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber; uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber;
bool m_IsOpen, m_LeaseSetUpdated; bool m_IsOpen, m_IsOutgoing, m_LeaseSetUpdated;
StreamingDestination * m_LocalDestination; StreamingDestination * m_LocalDestination;
const i2p::data::LeaseSet& m_RemoteLeaseSet; const i2p::data::LeaseSet * m_RemoteLeaseSet;
i2p::data::Lease m_CurrentRemoteLease; i2p::data::Lease m_CurrentRemoteLease;
std::queue<Packet *> m_ReceiveQueue; std::queue<Packet *> m_ReceiveQueue;
std::set<Packet *, PacketCmp> m_SavedPackets; std::set<Packet *, PacketCmp> m_SavedPackets;
@ -119,15 +121,15 @@ namespace stream
{ {
public: public:
StreamingDestination (); StreamingDestination (boost::asio::io_service& service);
StreamingDestination (const std::string& fullPath); StreamingDestination (boost::asio::io_service& service, const std::string& fullPath);
~StreamingDestination (); ~StreamingDestination ();
const i2p::data::PrivateKeys& GetKeys () const { return m_Keys; }; const i2p::data::PrivateKeys& GetKeys () const { return m_Keys; };
I2NPMessage * GetLeaseSetMsg (); I2NPMessage * GetLeaseSetMsg ();
i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; }; i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; };
Stream * CreateNewStream (boost::asio::io_service& service, const i2p::data::LeaseSet& remote); Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote);
void DeleteStream (Stream * stream); void DeleteStream (Stream * stream);
void HandleNextPacket (Packet * packet); void HandleNextPacket (Packet * packet);
@ -140,6 +142,11 @@ namespace stream
private: private:
Stream * CreateNewIncomingStream ();
private:
boost::asio::io_service& m_Service;
std::map<uint32_t, Stream *> m_Streams; std::map<uint32_t, Stream *> m_Streams;
i2p::data::PrivateKeys m_Keys; i2p::data::PrivateKeys m_Keys;
i2p::data::IdentHash m_IdentHash; i2p::data::IdentHash m_IdentHash;