From 2006e5971a799005658f56912daa4e5593aeb25d Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Thu, 22 Aug 2019 11:48:10 +0100 Subject: [PATCH] Refactor frameSorter and datagramBuffer under one interface --- internal/multiplex/datagramBuffer.go | 10 +- internal/multiplex/datagramBuffer_test.go | 16 +- internal/multiplex/frameSorter.go | 156 ------------------ internal/multiplex/recvBuffer.go | 9 + internal/multiplex/session.go | 4 +- internal/multiplex/stream.go | 33 ++-- internal/multiplex/streamBuffer.go | 144 ++++++++++++++++ ...ameSorter_test.go => streamBuffer_test.go} | 21 +-- 8 files changed, 183 insertions(+), 210 deletions(-) delete mode 100644 internal/multiplex/frameSorter.go create mode 100644 internal/multiplex/recvBuffer.go create mode 100644 internal/multiplex/streamBuffer.go rename internal/multiplex/{frameSorter_test.go => streamBuffer_test.go} (84%) diff --git a/internal/multiplex/datagramBuffer.go b/internal/multiplex/datagramBuffer.go index d5e8ad7..3689428 100644 --- a/internal/multiplex/datagramBuffer.go +++ b/internal/multiplex/datagramBuffer.go @@ -51,12 +51,12 @@ func (d *datagramBuffer) Read(target []byte) (int, error) { return len(data), nil } -func (d *datagramBuffer) Write(input []byte) (int, error) { +func (d *datagramBuffer) Write(f Frame) error { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() for { if d.closed { - return 0, io.ErrClosedPipe + return io.ErrClosedPipe } if len(d.buf) <= DATAGRAM_NUMBER_LIMIT { // if d.buf gets too large, write() will panic. We don't want this to happen @@ -64,12 +64,12 @@ func (d *datagramBuffer) Write(input []byte) (int, error) { } d.rwCond.Wait() } - data := make([]byte, len(input)) - copy(data, input) + data := make([]byte, len(f.Payload)) + copy(data, f.Payload) d.buf = append(d.buf, data) // err will always be nil d.rwCond.Broadcast() - return len(data), nil + return nil } func (d *datagramBuffer) Close() error { diff --git a/internal/multiplex/datagramBuffer_test.go b/internal/multiplex/datagramBuffer_test.go index 22ec4bd..6b31e75 100644 --- a/internal/multiplex/datagramBuffer_test.go +++ b/internal/multiplex/datagramBuffer_test.go @@ -9,15 +9,7 @@ import ( func TestDatagramBuffer_RW(t *testing.T) { pipe := NewDatagramBuffer() b := []byte{0x01, 0x02, 0x03} - n, err := pipe.Write(b) - if n != len(b) { - t.Error( - "For", "number of bytes written", - "expecting", len(b), - "got", n, - ) - return - } + err := pipe.Write(Frame{Payload: b}) if err != nil { t.Error( "For", "simple write", @@ -28,7 +20,7 @@ func TestDatagramBuffer_RW(t *testing.T) { } b2 := make([]byte, len(b)) - n, err = pipe.Read(b2) + n, err := pipe.Read(b2) if n != len(b) { t.Error( "For", "number of bytes read", @@ -64,7 +56,7 @@ func TestDatagramBuffer_BlockingRead(t *testing.T) { b := []byte{0x01, 0x02, 0x03} go func() { time.Sleep(10 * time.Millisecond) - pipe.Write(b) + pipe.Write(Frame{Payload: b}) }() b2 := make([]byte, len(b)) n, err := pipe.Read(b2) @@ -97,7 +89,7 @@ func TestDatagramBuffer_BlockingRead(t *testing.T) { func TestDatagramBuffer_CloseThenRead(t *testing.T) { pipe := NewDatagramBuffer() b := []byte{0x01, 0x02, 0x03} - pipe.Write(b) + pipe.Write(Frame{Payload: b}) b2 := make([]byte, len(b)) pipe.Close() n, err := pipe.Read(b2) diff --git a/internal/multiplex/frameSorter.go b/internal/multiplex/frameSorter.go deleted file mode 100644 index 236f3fe..0000000 --- a/internal/multiplex/frameSorter.go +++ /dev/null @@ -1,156 +0,0 @@ -package multiplex - -import ( - "container/heap" - "io" - - log "github.com/sirupsen/logrus" -) - -// The data is multiplexed through several TCP connections, therefore the -// order of arrival is not guaranteed. A stream's first packet may be sent through -// connection0 and its second packet may be sent through connection1. Although both -// packets are transmitted reliably (as TCP is reliable), packet1 may arrive to the -// remote side before packet0. Cloak have to therefore sequence the packets so that they -// arrive in order as they were sent by the proxy software -// -// Cloak packets will have a 32-bit sequence number on them, so we know in which order -// they should be sent to the proxy software. The code in this file provides buffering and sorting. -// -// Similar to TCP, the next seq number after 2^32-1 is 0. This is called wrap around. -// -// Note that in golang, integer overflow results in wrap around -// -// Stream.nextRecvSeq is the expected sequence number of the next packet -// Stream.rev counts the amount of time the sequence number gets wrapped - -type frameNode struct { - trueSeq uint64 - frame *Frame -} -type sorterHeap []*frameNode - -func (sh sorterHeap) Less(i, j int) bool { - return sh[i].trueSeq < sh[j].trueSeq -} -func (sh sorterHeap) Len() int { - return len(sh) -} -func (sh sorterHeap) Swap(i, j int) { - sh[i], sh[j] = sh[j], sh[i] -} - -func (sh *sorterHeap) Push(x interface{}) { - *sh = append(*sh, x.(*frameNode)) -} - -func (sh *sorterHeap) Pop() interface{} { - old := *sh - n := len(old) - x := old[n-1] - *sh = old[0 : n-1] - return x -} - -type frameSorter struct { - nextRecvSeq uint32 - rev int - sh sorterHeap - wrapMode bool - - // New frames are received through newFrameCh by frameSorter - newFrameCh chan *Frame - - output io.WriteCloser -} - -func NewFrameSorter(output io.WriteCloser) *frameSorter { - fs := &frameSorter{ - sh: []*frameNode{}, - newFrameCh: make(chan *Frame, 1024), - rev: 0, - output: output, - } - go fs.recvNewFrame() - return fs -} - -func (fs *frameSorter) writeNewFrame(f *Frame) { - fs.newFrameCh <- f -} -func (fs *frameSorter) Close() error { - fs.newFrameCh <- nil - return nil -} - -// recvNewFrame is a forever running loop which receives frames unordered, -// cache and order them and send them into sortedBufCh -func (fs *frameSorter) recvNewFrame() { - // TODO: add timeout - defer log.Tracef("a recvNewFrame has returned gracefully") - for { - f := <-fs.newFrameCh - if f == nil { - return - } - - // when there'fs no ooo packages in heap and we receive the next package in order - if len(fs.sh) == 0 && f.Seq == fs.nextRecvSeq { - if f.Closing == 1 { - // empty data indicates closing signal - fs.output.Close() - return - } else { - fs.output.Write(f.Payload) - fs.nextRecvSeq += 1 - if fs.nextRecvSeq == 0 { // getting wrapped - fs.rev += 1 - fs.wrapMode = false - } - } - continue - } - - node := &frameNode{ - trueSeq: 0, - frame: f, - } - - if f.Seq < fs.nextRecvSeq { - // For the ease of demonstration, assume seq is uint8, i.e. it wraps around after 255 - // e.g. we are on rev=0 (wrap has not happened yet) - // and we get the order of recv as 253 254 0 1 - // after 254, nextN should be 255, but 0 is received and 0 < 255 - // now 0 should have a trueSeq of 256 - if !fs.wrapMode { - // wrapMode is true when the latest seq is wrapped but nextN is not - fs.wrapMode = true - } - node.trueSeq = uint64(1<<32)*uint64(fs.rev+1) + uint64(f.Seq) + 1 - // +1 because wrapped 0 should have trueSeq of 256 instead of 255 - // when this bit was run on 1, the trueSeq of 1 would become 256 - } else { - node.trueSeq = uint64(1<<32)*uint64(fs.rev) + uint64(f.Seq) - // when this bit was run on 255, the trueSeq of 255 would be 255 - } - - heap.Push(&fs.sh, node) - // Keep popping from the heap until empty or to the point that the wanted seq was not received - for len(fs.sh) > 0 && fs.sh[0].frame.Seq == fs.nextRecvSeq { - f = heap.Pop(&fs.sh).(*frameNode).frame - if f.Closing == 1 { - // empty data indicates closing signal - fs.output.Close() - return - } else { - fs.output.Write(f.Payload) - fs.nextRecvSeq += 1 - if fs.nextRecvSeq == 0 { // getting wrapped - fs.rev += 1 - fs.wrapMode = false - } - } - } - } - -} diff --git a/internal/multiplex/recvBuffer.go b/internal/multiplex/recvBuffer.go new file mode 100644 index 0000000..49a1f07 --- /dev/null +++ b/internal/multiplex/recvBuffer.go @@ -0,0 +1,9 @@ +package multiplex + +import "io" + +type recvBuffer interface { + io.ReadCloser + Write(Frame) error + Len() int +} diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index 4c51845..848f286 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -151,7 +151,7 @@ func (sesh *Session) recvDataFromRemote(data []byte) error { defer sesh.streamsM.Unlock() stream, existing := sesh.streams[frame.StreamID] if existing { - stream.writeFrame(frame) + stream.writeFrame(*frame) return nil } else { if frame.Closing == 1 { @@ -166,7 +166,7 @@ func (sesh *Session) recvDataFromRemote(data []byte) error { // we ignore the error here. If the switchboard is broken, it will be reflected upon stream.Write stream = makeStream(sesh, frame.StreamID, connId) sesh.acceptCh <- stream - stream.writeFrame(frame) + stream.writeFrame(*frame) return nil } } diff --git a/internal/multiplex/stream.go b/internal/multiplex/stream.go index bd9d305..8912cd9 100644 --- a/internal/multiplex/stream.go +++ b/internal/multiplex/stream.go @@ -27,9 +27,7 @@ type Stream struct { session *Session - buf ReadWriteCloseLener - - sorter *frameSorter + recvBuf recvBuffer // atomic nextSendSeq uint32 @@ -49,19 +47,18 @@ type Stream struct { } func makeStream(sesh *Session, id uint32, assignedConnId uint32) *Stream { - var buf ReadWriteCloseLener + var recvBuf recvBuffer if sesh.Unordered { - buf = NewDatagramBuffer() + recvBuf = NewDatagramBuffer() } else { - buf = NewBufferedPipe() + recvBuf = NewStreamBuffer() } stream := &Stream{ id: id, session: sesh, - buf: buf, + recvBuf: recvBuf, obfsBuf: make([]byte, 17000), - sorter: NewFrameSorter(buf), assignedConnId: assignedConnId, } @@ -70,13 +67,9 @@ func makeStream(sesh *Session, id uint32, assignedConnId uint32) *Stream { func (s *Stream) isClosed() bool { return atomic.LoadUint32(&s.closed) == 1 } -func (s *Stream) writeFrame(frame *Frame) { - // TODO: refactor this through an interface - if s.session.Unordered { - s.buf.Write(frame.Payload) - } else { - s.sorter.writeNewFrame(frame) - } +func (s *Stream) writeFrame(frame Frame) { + // TODO: Return error + s.recvBuf.Write(frame) } // Read implements io.Read @@ -92,15 +85,15 @@ func (s *Stream) Read(buf []byte) (n int, err error) { if s.isClosed() { // TODO: Len check may not be necessary as this can be offloaded to buffer implementation - if s.buf.Len() == 0 { + if s.recvBuf.Len() == 0 { return 0, ErrBrokenStream } else { - n, err = s.buf.Read(buf) + n, err = s.recvBuf.Read(buf) log.Tracef("%v read from stream %v with err %v", n, s.id, err) return } } else { - n, err = s.buf.Read(buf) + n, err = s.recvBuf.Read(buf) log.Tracef("%v read from stream %v with err %v", n, s.id, err) return } @@ -142,9 +135,9 @@ func (s *Stream) Write(in []byte) (n int, err error) { // the necessary steps to mark the stream as closed and to release resources func (s *Stream) _close() { + // TODO: return err here atomic.StoreUint32(&s.closed, 1) - s.sorter.Close() // this will trigger frameSorter to return - s.buf.Close() + s.recvBuf.Close() } // only close locally. Used when the stream close is notified by the remote diff --git a/internal/multiplex/streamBuffer.go b/internal/multiplex/streamBuffer.go new file mode 100644 index 0000000..eb8b469 --- /dev/null +++ b/internal/multiplex/streamBuffer.go @@ -0,0 +1,144 @@ +package multiplex + +// The data is multiplexed through several TCP connections, therefore the +// order of arrival is not guaranteed. A stream's first packet may be sent through +// connection0 and its second packet may be sent through connection1. Although both +// packets are transmitted reliably (as TCP is reliable), packet1 may arrive to the +// remote side before packet0. Cloak have to therefore sequence the packets so that they +// arrive in order as they were sent by the proxy software +// +// Cloak packets will have a 32-bit sequence number on them, so we know in which order +// they should be sent to the proxy software. The code in this file provides buffering and sorting. + +import ( + "container/heap" + "errors" + "sync" +) + +type frameNode struct { + trueSeq uint64 + frame Frame +} +type sorterHeap []*frameNode + +func (sh sorterHeap) Less(i, j int) bool { + return sh[i].trueSeq < sh[j].trueSeq +} +func (sh sorterHeap) Len() int { + return len(sh) +} +func (sh sorterHeap) Swap(i, j int) { + sh[i], sh[j] = sh[j], sh[i] +} + +func (sh *sorterHeap) Push(x interface{}) { + *sh = append(*sh, x.(*frameNode)) +} + +func (sh *sorterHeap) Pop() interface{} { + old := *sh + n := len(old) + x := old[n-1] + *sh = old[0 : n-1] + return x +} + +type streamBuffer struct { + recvM sync.Mutex + + nextRecvSeq uint32 + rev int + sh sorterHeap + wrapMode bool + + buf *bufferedPipe +} + +func NewStreamBuffer() *streamBuffer { + sb := &streamBuffer{ + sh: []*frameNode{}, + rev: 0, + buf: NewBufferedPipe(), + } + return sb +} + +var ClosingFrameReceived = errors.New("closed by closing frame") + +// recvNewFrame is a forever running loop which receives frames unordered, +// cache and order them and send them into sortedBufCh +func (sb *streamBuffer) Write(f Frame) error { + sb.recvM.Lock() + defer sb.recvM.Unlock() + // when there'fs no ooo packages in heap and we receive the next package in order + if len(sb.sh) == 0 && f.Seq == sb.nextRecvSeq { + if f.Closing == 1 { + // empty data indicates closing signal + sb.buf.Close() + return ClosingFrameReceived + } else { + sb.buf.Write(f.Payload) + sb.nextRecvSeq += 1 + if sb.nextRecvSeq == 0 { // getting wrapped + sb.rev += 1 + sb.wrapMode = false + } + } + return nil + } + + node := &frameNode{ + trueSeq: 0, + frame: f, + } + + if f.Seq < sb.nextRecvSeq { + // For the ease of demonstration, assume seq is uint8, i.e. it wraps around after 255 + // e.g. we are on rev=0 (wrap has not happened yet) + // and we get the order of recv as 253 254 0 1 + // after 254, nextN should be 255, but 0 is received and 0 < 255 + // now 0 should have a trueSeq of 256 + if !sb.wrapMode { + // wrapMode is true when the latest seq is wrapped but nextN is not + sb.wrapMode = true + } + node.trueSeq = uint64(1<<32)*uint64(sb.rev+1) + uint64(f.Seq) + 1 + // +1 because wrapped 0 should have trueSeq of 256 instead of 255 + // when this bit was run on 1, the trueSeq of 1 would become 256 + } else { + node.trueSeq = uint64(1<<32)*uint64(sb.rev) + uint64(f.Seq) + // when this bit was run on 255, the trueSeq of 255 would be 255 + } + + heap.Push(&sb.sh, node) + // Keep popping from the heap until empty or to the point that the wanted seq was not received + for len(sb.sh) > 0 && sb.sh[0].frame.Seq == sb.nextRecvSeq { + f = heap.Pop(&sb.sh).(*frameNode).frame + if f.Closing == 1 { + // empty data indicates closing signal + sb.buf.Close() + return ClosingFrameReceived + } else { + sb.buf.Write(f.Payload) + sb.nextRecvSeq += 1 + if sb.nextRecvSeq == 0 { // getting wrapped + sb.rev += 1 + sb.wrapMode = false + } + } + } + return nil +} + +func (sb *streamBuffer) Read(buf []byte) (int, error) { + return sb.buf.Read(buf) +} + +func (sb *streamBuffer) Close() error { + return sb.buf.Close() +} + +func (sb *streamBuffer) Len() int { + return sb.buf.Len() +} diff --git a/internal/multiplex/frameSorter_test.go b/internal/multiplex/streamBuffer_test.go similarity index 84% rename from internal/multiplex/frameSorter_test.go rename to internal/multiplex/streamBuffer_test.go index 8671090..b4f8e3e 100644 --- a/internal/multiplex/frameSorter_test.go +++ b/internal/multiplex/streamBuffer_test.go @@ -1,7 +1,6 @@ package multiplex import ( - "bytes" "encoding/binary" "time" @@ -10,31 +9,23 @@ import ( "testing" ) -type BufferReaderWriterCloser struct { - *bytes.Buffer -} - -func (b *BufferReaderWriterCloser) Close() error { - return nil -} func TestRecvNewFrame(t *testing.T) { inOrder := []uint64{5, 6, 7, 8, 9, 10, 11} outOfOrder0 := []uint64{5, 7, 8, 6, 11, 10, 9} outOfOrder1 := []uint64{1, 96, 47, 2, 29, 18, 60, 8, 74, 22, 82, 58, 44, 51, 57, 71, 90, 94, 68, 83, 61, 91, 39, 97, 85, 63, 46, 73, 54, 84, 76, 98, 93, 79, 75, 50, 67, 37, 92, 99, 42, 77, 17, 16, 38, 3, 100, 24, 31, 7, 36, 40, 86, 64, 34, 45, 12, 5, 9, 27, 21, 26, 35, 6, 65, 69, 53, 4, 48, 28, 30, 56, 32, 11, 80, 66, 25, 41, 78, 13, 88, 62, 15, 70, 49, 43, 72, 23, 10, 55, 52, 95, 14, 59, 87, 33, 19, 20, 81, 89} outOfOrderWrap0 := []uint64{1<<32 - 5, 1<<32 + 3, 1 << 32, 1<<32 - 3, 1<<32 - 4, 1<<32 + 2, 1<<32 - 2, 1<<32 - 1, 1<<32 + 1} - sortedBuf := &BufferReaderWriterCloser{new(bytes.Buffer)} test := func(set []uint64, ct *testing.T) { - fs := NewFrameSorter(sortedBuf) - fs.nextRecvSeq = uint32(set[0]) + sb := NewStreamBuffer() + sb.nextRecvSeq = uint32(set[0]) for _, n := range set { bu64 := make([]byte, 8) binary.BigEndian.PutUint64(bu64, n) - frame := &Frame{ + frame := Frame{ Seq: uint32(n), Payload: bu64, } - fs.writeNewFrame(frame) + sb.Write(frame) } time.Sleep(100 * time.Millisecond) @@ -42,7 +33,7 @@ func TestRecvNewFrame(t *testing.T) { var sortedResult []uint64 for x := 0; x < len(set); x++ { oct := make([]byte, 8) - n, err := sortedBuf.Read(oct) + n, err := sb.Read(oct) if n != 8 || err != nil { ct.Error("failed to read from sorted Buf", n, err) return @@ -59,7 +50,7 @@ func TestRecvNewFrame(t *testing.T) { goto fail } } - fs.Close() + sb.Close() return fail: ct.Error(