From e832734d94b77b436c52add9d3702a45116920eb Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 12 Dec 2013 21:36:24 -0500 Subject: [PATCH] Streaming added --- Makefile | 2 +- Streaming.cpp | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++ Streaming.h | 35 ++++++++++++++++++++++++ 3 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 Streaming.cpp create mode 100644 Streaming.h diff --git a/Makefile b/Makefile index e363a5b8..a03d5f45 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ CC = g++ CFLAGS = -g -Wall -std=c++0x OBJECTS = i2p.o base64.o NTCPSession.o RouterInfo.o Transports.o RouterContext.o \ NetDb.o LeaseSet.o Tunnel.o TunnelEndpoint.o TunnelGateway.o TransitTunnel.o \ - I2NPProtocol.o Log.o Garlic.o HTTPServer.o + I2NPProtocol.o Log.o Garlic.o HTTPServer.o Streaming.o INCFLAGS = LDFLAGS = -Wl,-rpath,/usr/local/lib -lcryptopp -lboost_system -lboost_filesystem LIBS = diff --git a/Streaming.cpp b/Streaming.cpp new file mode 100644 index 00000000..f9d253ac --- /dev/null +++ b/Streaming.cpp @@ -0,0 +1,74 @@ +#include +#include +#include +#include "Log.h" +#include "RouterInfo.h" +#include "Streaming.h" + +namespace i2p +{ +namespace stream +{ + void StreamingDestination::HandleNextPacket (const uint8_t * buf, size_t len) + { + const uint8_t * end = buf + len; + buf += 4; // sendStreamID + buf += 4; // receiveStreamID + buf += 4; // sequenceNum + buf += 4; // ackThrough + int nackCount = buf[0]; + buf++; // NACK count + buf += 4*nackCount; // NACKs + buf++; // resendDelay + uint16_t flags = be16toh (*(uint16_t *)buf); + buf += 2; // flags + uint16_t optionalSize = be16toh (*(uint16_t *)buf); + buf += 2; // optional size + const uint8_t * optionalData = buf; + buf += optionalSize; + + // process flags + if (flags & PACKET_FLAG_SYNCHRONIZE) + { + LogPrint ("Synchronize"); + } + + if (flags & PACKET_FLAG_SIGNATURE_INCLUDED) + { + LogPrint ("Signature"); + optionalData += 40; + } + + if (flags & PACKET_FLAG_FROM_INCLUDED) + { + LogPrint ("From identity"); + optionalData += sizeof (i2p::data::RouterIdentity); + } + + // we have reached payload section + std::string str((const char *)buf, end-buf); + LogPrint ("Payload: ", str); + } + + + void HandleDataMessage (i2p::data::IdentHash * destination, const uint8_t * buf, size_t len) + { + uint32_t length = be32toh (*(uint32_t *)buf); + buf += 4; + // we assume I2CP payload + if (buf[9] == 6) // streaming protocol + { + // unzip it + CryptoPP::Gunzip decompressor; + decompressor.Put (buf, length); + decompressor.MessageEnd(); + uint8_t uncompressed[2048]; + int uncomressedSize = decompressor.MaxRetrievable (); + decompressor.Get (uncompressed, uncomressedSize); + // then forward to streaming engine + } + else + LogPrint ("Data: protocol ", buf[9], " is not supported"); + } +} +} diff --git a/Streaming.h b/Streaming.h new file mode 100644 index 00000000..a52c440e --- /dev/null +++ b/Streaming.h @@ -0,0 +1,35 @@ +#ifndef STREAMING_H__ +#define STREAMING_H__ + +#include +#include "LeaseSet.h" + +namespace i2p +{ +namespace stream +{ + const uint16_t PACKET_FLAG_SYNCHRONIZE = 0x0001; + const uint16_t PACKET_FLAG_CLOSE = 0x0002; + const uint16_t PACKET_FLAG_RESET = 0x0004; + const uint16_t PACKET_FLAG_SIGNATURE_INCLUDED = 0x0008; + const uint16_t PACKET_FLAG_SIGNATURE_REQUESTED = 0x0010; + const uint16_t PACKET_FLAG_FROM_INCLUDED = 0x0020; + const uint16_t PACKET_FLAG_DELAY_REQUESTED = 0x0040; + const uint16_t PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED = 0x0080; + const uint16_t PACKET_FLAG_PROFILE_INTERACTIVE = 0x0100; + const uint16_t PACKET_FLAG_ECHO = 0x0200; + const uint16_t PACKET_FLAG_NO_ACK = 0x0400; + + class StreamingDestination + { + public: + + void HandleNextPacket (const uint8_t * buf, size_t len); + }; + + // assuming data is I2CP message + void HandleDataMessage (i2p::data::IdentHash * destination, const uint8_t * buf, size_t len); +} +} + +#endif