From 9363547205844a7613af14e162c6a09f6c3bfcb4 Mon Sep 17 00:00:00 2001 From: orignal Date: Fri, 10 Jan 2014 22:23:17 -0500 Subject: [PATCH] send quick ack --- Streaming.cpp | 49 +++++++++++++++++++++++++++++++++++++++++++------ Streaming.h | 8 +++++++- 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index d0df63de..e1a155fd 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -16,7 +16,8 @@ namespace i2p namespace stream { Stream::Stream (StreamingDestination * local, const i2p::data::LeaseSet * remote): - m_SendStreamID (0), m_SequenceNumber (0), m_LocalDestination (local), m_RemoteLeaseSet (remote) + m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), + m_LocalDestination (local), m_RemoteLeaseSet (remote), m_OutboundTunnel (nullptr) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); } @@ -33,7 +34,8 @@ namespace stream buf += 4; // sendStreamID if (!m_SendStreamID) m_SendStreamID = be32toh (*(uint32_t *)buf); - buf += 4; // receiveStreamID + buf += 4; // receiveStreamID + m_LastReceivedSequenceNumber = be32toh (*(uint32_t *)buf); buf += 4; // sequenceNum buf += 4; // ackThrough int nackCount = buf[0]; @@ -64,13 +66,15 @@ namespace stream LogPrint ("From identity"); optionalData += sizeof (i2p::data::Identity); } - + // we have reached payload section std::string str((const char *)buf, end-buf); LogPrint ("Payload: ", str); packet->offset = buf - packet->buf; m_ReceiveQueue.Put (packet); + + SendQuickAck (); } size_t Stream::Send (uint8_t * buf, size_t len, int timeout) @@ -105,17 +109,50 @@ namespace stream I2NPMessage * msg = i2p::garlic::routing.WrapSingleMessage (m_RemoteLeaseSet, CreateDataMessage (this, packet, size), m_LocalDestination->GetLeaseSet ()); - auto outbound = i2p::tunnel::tunnels.GetNextOutboundTunnel (); - if (outbound) + if (!m_OutboundTunnel) + m_OutboundTunnel = i2p::tunnel::tunnels.GetNextOutboundTunnel (); + if (m_OutboundTunnel) { auto& lease = m_RemoteLeaseSet->GetLeases ()[0]; // TODO: - outbound->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); + m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); } else DeleteI2NPMessage (msg); return len; } + void Stream::SendQuickAck () + { + uint8_t packet[STREAMING_MTU]; + size_t size = 0; + *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); + size += 4; // sendStreamID + *(uint32_t *)(packet + size) = htobe32 (m_RecvStreamID); + size += 4; // receiveStreamID + *(uint32_t *)(packet + size) = 0; // this is plain Ack message + size += 4; // sequenceNum + *(uint32_t *)(packet + size) = htobe32 (m_LastReceivedSequenceNumber); + size += 4; // ack Through + packet[size] = 0; + size++; // NACK count + size++; // resend delay + *(uint16_t *)(packet + size) = 0; // nof flags set + size += 2; // flags + *(uint16_t *)(packet + size) = 0; // nof flags set + size += 2; // options size + + I2NPMessage * msg = i2p::garlic::routing.WrapSingleMessage (m_RemoteLeaseSet, + CreateDataMessage (this, packet, size)); + if (m_OutboundTunnel) + { + auto& lease = m_RemoteLeaseSet->GetLeases ()[0]; // TODO: + m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); + LogPrint ("Quick Ack sent"); + } + else + DeleteI2NPMessage (msg); + } + size_t Stream::Receive (uint8_t * buf, size_t len, int timeout) { if (m_ReceiveQueue.IsEmpty ()) diff --git a/Streaming.h b/Streaming.h index 5f0ab5f4..7851589c 100644 --- a/Streaming.h +++ b/Streaming.h @@ -8,6 +8,7 @@ #include "Identity.h" #include "LeaseSet.h" #include "I2NPProtocol.h" +#include "Tunnel.h" namespace i2p { @@ -52,13 +53,18 @@ namespace stream void HandleNextPacket (Packet * packet); size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds size_t Receive (uint8_t * buf, size_t len, int timeout); // returns 0 if timeout expired + + private: + + void SendQuickAck (); private: - uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; + uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber; StreamingDestination * m_LocalDestination; const i2p::data::LeaseSet * m_RemoteLeaseSet; i2p::util::Queue m_ReceiveQueue; + i2p::tunnel::OutboundTunnel * m_OutboundTunnel; }; class StreamingDestination