mirror of
https://github.com/cbeuw/Cloak.git
synced 2024-11-15 18:13:29 +00:00
Refactor frameSorter and datagramBuffer under one interface
This commit is contained in:
parent
c3ff3f5d1a
commit
2006e5971a
@ -51,12 +51,12 @@ func (d *datagramBuffer) Read(target []byte) (int, error) {
|
|||||||
return len(data), nil
|
return len(data), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *datagramBuffer) Write(input []byte) (int, error) {
|
func (d *datagramBuffer) Write(f Frame) error {
|
||||||
d.rwCond.L.Lock()
|
d.rwCond.L.Lock()
|
||||||
defer d.rwCond.L.Unlock()
|
defer d.rwCond.L.Unlock()
|
||||||
for {
|
for {
|
||||||
if d.closed {
|
if d.closed {
|
||||||
return 0, io.ErrClosedPipe
|
return io.ErrClosedPipe
|
||||||
}
|
}
|
||||||
if len(d.buf) <= DATAGRAM_NUMBER_LIMIT {
|
if len(d.buf) <= DATAGRAM_NUMBER_LIMIT {
|
||||||
// if d.buf gets too large, write() will panic. We don't want this to happen
|
// if d.buf gets too large, write() will panic. We don't want this to happen
|
||||||
@ -64,12 +64,12 @@ func (d *datagramBuffer) Write(input []byte) (int, error) {
|
|||||||
}
|
}
|
||||||
d.rwCond.Wait()
|
d.rwCond.Wait()
|
||||||
}
|
}
|
||||||
data := make([]byte, len(input))
|
data := make([]byte, len(f.Payload))
|
||||||
copy(data, input)
|
copy(data, f.Payload)
|
||||||
d.buf = append(d.buf, data)
|
d.buf = append(d.buf, data)
|
||||||
// err will always be nil
|
// err will always be nil
|
||||||
d.rwCond.Broadcast()
|
d.rwCond.Broadcast()
|
||||||
return len(data), nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *datagramBuffer) Close() error {
|
func (d *datagramBuffer) Close() error {
|
||||||
|
@ -9,15 +9,7 @@ import (
|
|||||||
func TestDatagramBuffer_RW(t *testing.T) {
|
func TestDatagramBuffer_RW(t *testing.T) {
|
||||||
pipe := NewDatagramBuffer()
|
pipe := NewDatagramBuffer()
|
||||||
b := []byte{0x01, 0x02, 0x03}
|
b := []byte{0x01, 0x02, 0x03}
|
||||||
n, err := pipe.Write(b)
|
err := pipe.Write(Frame{Payload: b})
|
||||||
if n != len(b) {
|
|
||||||
t.Error(
|
|
||||||
"For", "number of bytes written",
|
|
||||||
"expecting", len(b),
|
|
||||||
"got", n,
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(
|
t.Error(
|
||||||
"For", "simple write",
|
"For", "simple write",
|
||||||
@ -28,7 +20,7 @@ func TestDatagramBuffer_RW(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
b2 := make([]byte, len(b))
|
b2 := make([]byte, len(b))
|
||||||
n, err = pipe.Read(b2)
|
n, err := pipe.Read(b2)
|
||||||
if n != len(b) {
|
if n != len(b) {
|
||||||
t.Error(
|
t.Error(
|
||||||
"For", "number of bytes read",
|
"For", "number of bytes read",
|
||||||
@ -64,7 +56,7 @@ func TestDatagramBuffer_BlockingRead(t *testing.T) {
|
|||||||
b := []byte{0x01, 0x02, 0x03}
|
b := []byte{0x01, 0x02, 0x03}
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
pipe.Write(b)
|
pipe.Write(Frame{Payload: b})
|
||||||
}()
|
}()
|
||||||
b2 := make([]byte, len(b))
|
b2 := make([]byte, len(b))
|
||||||
n, err := pipe.Read(b2)
|
n, err := pipe.Read(b2)
|
||||||
@ -97,7 +89,7 @@ func TestDatagramBuffer_BlockingRead(t *testing.T) {
|
|||||||
func TestDatagramBuffer_CloseThenRead(t *testing.T) {
|
func TestDatagramBuffer_CloseThenRead(t *testing.T) {
|
||||||
pipe := NewDatagramBuffer()
|
pipe := NewDatagramBuffer()
|
||||||
b := []byte{0x01, 0x02, 0x03}
|
b := []byte{0x01, 0x02, 0x03}
|
||||||
pipe.Write(b)
|
pipe.Write(Frame{Payload: b})
|
||||||
b2 := make([]byte, len(b))
|
b2 := make([]byte, len(b))
|
||||||
pipe.Close()
|
pipe.Close()
|
||||||
n, err := pipe.Read(b2)
|
n, err := pipe.Read(b2)
|
||||||
|
@ -1,156 +0,0 @@
|
|||||||
package multiplex
|
|
||||||
|
|
||||||
import (
|
|
||||||
"container/heap"
|
|
||||||
"io"
|
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
)
|
|
||||||
|
|
||||||
// The data is multiplexed through several TCP connections, therefore the
|
|
||||||
// order of arrival is not guaranteed. A stream's first packet may be sent through
|
|
||||||
// connection0 and its second packet may be sent through connection1. Although both
|
|
||||||
// packets are transmitted reliably (as TCP is reliable), packet1 may arrive to the
|
|
||||||
// remote side before packet0. Cloak have to therefore sequence the packets so that they
|
|
||||||
// arrive in order as they were sent by the proxy software
|
|
||||||
//
|
|
||||||
// Cloak packets will have a 32-bit sequence number on them, so we know in which order
|
|
||||||
// they should be sent to the proxy software. The code in this file provides buffering and sorting.
|
|
||||||
//
|
|
||||||
// Similar to TCP, the next seq number after 2^32-1 is 0. This is called wrap around.
|
|
||||||
//
|
|
||||||
// Note that in golang, integer overflow results in wrap around
|
|
||||||
//
|
|
||||||
// Stream.nextRecvSeq is the expected sequence number of the next packet
|
|
||||||
// Stream.rev counts the amount of time the sequence number gets wrapped
|
|
||||||
|
|
||||||
type frameNode struct {
|
|
||||||
trueSeq uint64
|
|
||||||
frame *Frame
|
|
||||||
}
|
|
||||||
type sorterHeap []*frameNode
|
|
||||||
|
|
||||||
func (sh sorterHeap) Less(i, j int) bool {
|
|
||||||
return sh[i].trueSeq < sh[j].trueSeq
|
|
||||||
}
|
|
||||||
func (sh sorterHeap) Len() int {
|
|
||||||
return len(sh)
|
|
||||||
}
|
|
||||||
func (sh sorterHeap) Swap(i, j int) {
|
|
||||||
sh[i], sh[j] = sh[j], sh[i]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sh *sorterHeap) Push(x interface{}) {
|
|
||||||
*sh = append(*sh, x.(*frameNode))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sh *sorterHeap) Pop() interface{} {
|
|
||||||
old := *sh
|
|
||||||
n := len(old)
|
|
||||||
x := old[n-1]
|
|
||||||
*sh = old[0 : n-1]
|
|
||||||
return x
|
|
||||||
}
|
|
||||||
|
|
||||||
type frameSorter struct {
|
|
||||||
nextRecvSeq uint32
|
|
||||||
rev int
|
|
||||||
sh sorterHeap
|
|
||||||
wrapMode bool
|
|
||||||
|
|
||||||
// New frames are received through newFrameCh by frameSorter
|
|
||||||
newFrameCh chan *Frame
|
|
||||||
|
|
||||||
output io.WriteCloser
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewFrameSorter(output io.WriteCloser) *frameSorter {
|
|
||||||
fs := &frameSorter{
|
|
||||||
sh: []*frameNode{},
|
|
||||||
newFrameCh: make(chan *Frame, 1024),
|
|
||||||
rev: 0,
|
|
||||||
output: output,
|
|
||||||
}
|
|
||||||
go fs.recvNewFrame()
|
|
||||||
return fs
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fs *frameSorter) writeNewFrame(f *Frame) {
|
|
||||||
fs.newFrameCh <- f
|
|
||||||
}
|
|
||||||
func (fs *frameSorter) Close() error {
|
|
||||||
fs.newFrameCh <- nil
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// recvNewFrame is a forever running loop which receives frames unordered,
|
|
||||||
// cache and order them and send them into sortedBufCh
|
|
||||||
func (fs *frameSorter) recvNewFrame() {
|
|
||||||
// TODO: add timeout
|
|
||||||
defer log.Tracef("a recvNewFrame has returned gracefully")
|
|
||||||
for {
|
|
||||||
f := <-fs.newFrameCh
|
|
||||||
if f == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// when there'fs no ooo packages in heap and we receive the next package in order
|
|
||||||
if len(fs.sh) == 0 && f.Seq == fs.nextRecvSeq {
|
|
||||||
if f.Closing == 1 {
|
|
||||||
// empty data indicates closing signal
|
|
||||||
fs.output.Close()
|
|
||||||
return
|
|
||||||
} else {
|
|
||||||
fs.output.Write(f.Payload)
|
|
||||||
fs.nextRecvSeq += 1
|
|
||||||
if fs.nextRecvSeq == 0 { // getting wrapped
|
|
||||||
fs.rev += 1
|
|
||||||
fs.wrapMode = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
node := &frameNode{
|
|
||||||
trueSeq: 0,
|
|
||||||
frame: f,
|
|
||||||
}
|
|
||||||
|
|
||||||
if f.Seq < fs.nextRecvSeq {
|
|
||||||
// For the ease of demonstration, assume seq is uint8, i.e. it wraps around after 255
|
|
||||||
// e.g. we are on rev=0 (wrap has not happened yet)
|
|
||||||
// and we get the order of recv as 253 254 0 1
|
|
||||||
// after 254, nextN should be 255, but 0 is received and 0 < 255
|
|
||||||
// now 0 should have a trueSeq of 256
|
|
||||||
if !fs.wrapMode {
|
|
||||||
// wrapMode is true when the latest seq is wrapped but nextN is not
|
|
||||||
fs.wrapMode = true
|
|
||||||
}
|
|
||||||
node.trueSeq = uint64(1<<32)*uint64(fs.rev+1) + uint64(f.Seq) + 1
|
|
||||||
// +1 because wrapped 0 should have trueSeq of 256 instead of 255
|
|
||||||
// when this bit was run on 1, the trueSeq of 1 would become 256
|
|
||||||
} else {
|
|
||||||
node.trueSeq = uint64(1<<32)*uint64(fs.rev) + uint64(f.Seq)
|
|
||||||
// when this bit was run on 255, the trueSeq of 255 would be 255
|
|
||||||
}
|
|
||||||
|
|
||||||
heap.Push(&fs.sh, node)
|
|
||||||
// Keep popping from the heap until empty or to the point that the wanted seq was not received
|
|
||||||
for len(fs.sh) > 0 && fs.sh[0].frame.Seq == fs.nextRecvSeq {
|
|
||||||
f = heap.Pop(&fs.sh).(*frameNode).frame
|
|
||||||
if f.Closing == 1 {
|
|
||||||
// empty data indicates closing signal
|
|
||||||
fs.output.Close()
|
|
||||||
return
|
|
||||||
} else {
|
|
||||||
fs.output.Write(f.Payload)
|
|
||||||
fs.nextRecvSeq += 1
|
|
||||||
if fs.nextRecvSeq == 0 { // getting wrapped
|
|
||||||
fs.rev += 1
|
|
||||||
fs.wrapMode = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
9
internal/multiplex/recvBuffer.go
Normal file
9
internal/multiplex/recvBuffer.go
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
package multiplex
|
||||||
|
|
||||||
|
import "io"
|
||||||
|
|
||||||
|
type recvBuffer interface {
|
||||||
|
io.ReadCloser
|
||||||
|
Write(Frame) error
|
||||||
|
Len() int
|
||||||
|
}
|
@ -151,7 +151,7 @@ func (sesh *Session) recvDataFromRemote(data []byte) error {
|
|||||||
defer sesh.streamsM.Unlock()
|
defer sesh.streamsM.Unlock()
|
||||||
stream, existing := sesh.streams[frame.StreamID]
|
stream, existing := sesh.streams[frame.StreamID]
|
||||||
if existing {
|
if existing {
|
||||||
stream.writeFrame(frame)
|
stream.writeFrame(*frame)
|
||||||
return nil
|
return nil
|
||||||
} else {
|
} else {
|
||||||
if frame.Closing == 1 {
|
if frame.Closing == 1 {
|
||||||
@ -166,7 +166,7 @@ func (sesh *Session) recvDataFromRemote(data []byte) error {
|
|||||||
// we ignore the error here. If the switchboard is broken, it will be reflected upon stream.Write
|
// we ignore the error here. If the switchboard is broken, it will be reflected upon stream.Write
|
||||||
stream = makeStream(sesh, frame.StreamID, connId)
|
stream = makeStream(sesh, frame.StreamID, connId)
|
||||||
sesh.acceptCh <- stream
|
sesh.acceptCh <- stream
|
||||||
stream.writeFrame(frame)
|
stream.writeFrame(*frame)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -27,9 +27,7 @@ type Stream struct {
|
|||||||
|
|
||||||
session *Session
|
session *Session
|
||||||
|
|
||||||
buf ReadWriteCloseLener
|
recvBuf recvBuffer
|
||||||
|
|
||||||
sorter *frameSorter
|
|
||||||
|
|
||||||
// atomic
|
// atomic
|
||||||
nextSendSeq uint32
|
nextSendSeq uint32
|
||||||
@ -49,19 +47,18 @@ type Stream struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func makeStream(sesh *Session, id uint32, assignedConnId uint32) *Stream {
|
func makeStream(sesh *Session, id uint32, assignedConnId uint32) *Stream {
|
||||||
var buf ReadWriteCloseLener
|
var recvBuf recvBuffer
|
||||||
if sesh.Unordered {
|
if sesh.Unordered {
|
||||||
buf = NewDatagramBuffer()
|
recvBuf = NewDatagramBuffer()
|
||||||
} else {
|
} else {
|
||||||
buf = NewBufferedPipe()
|
recvBuf = NewStreamBuffer()
|
||||||
}
|
}
|
||||||
|
|
||||||
stream := &Stream{
|
stream := &Stream{
|
||||||
id: id,
|
id: id,
|
||||||
session: sesh,
|
session: sesh,
|
||||||
buf: buf,
|
recvBuf: recvBuf,
|
||||||
obfsBuf: make([]byte, 17000),
|
obfsBuf: make([]byte, 17000),
|
||||||
sorter: NewFrameSorter(buf),
|
|
||||||
assignedConnId: assignedConnId,
|
assignedConnId: assignedConnId,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -70,13 +67,9 @@ func makeStream(sesh *Session, id uint32, assignedConnId uint32) *Stream {
|
|||||||
|
|
||||||
func (s *Stream) isClosed() bool { return atomic.LoadUint32(&s.closed) == 1 }
|
func (s *Stream) isClosed() bool { return atomic.LoadUint32(&s.closed) == 1 }
|
||||||
|
|
||||||
func (s *Stream) writeFrame(frame *Frame) {
|
func (s *Stream) writeFrame(frame Frame) {
|
||||||
// TODO: refactor this through an interface
|
// TODO: Return error
|
||||||
if s.session.Unordered {
|
s.recvBuf.Write(frame)
|
||||||
s.buf.Write(frame.Payload)
|
|
||||||
} else {
|
|
||||||
s.sorter.writeNewFrame(frame)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read implements io.Read
|
// Read implements io.Read
|
||||||
@ -92,15 +85,15 @@ func (s *Stream) Read(buf []byte) (n int, err error) {
|
|||||||
|
|
||||||
if s.isClosed() {
|
if s.isClosed() {
|
||||||
// TODO: Len check may not be necessary as this can be offloaded to buffer implementation
|
// TODO: Len check may not be necessary as this can be offloaded to buffer implementation
|
||||||
if s.buf.Len() == 0 {
|
if s.recvBuf.Len() == 0 {
|
||||||
return 0, ErrBrokenStream
|
return 0, ErrBrokenStream
|
||||||
} else {
|
} else {
|
||||||
n, err = s.buf.Read(buf)
|
n, err = s.recvBuf.Read(buf)
|
||||||
log.Tracef("%v read from stream %v with err %v", n, s.id, err)
|
log.Tracef("%v read from stream %v with err %v", n, s.id, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
n, err = s.buf.Read(buf)
|
n, err = s.recvBuf.Read(buf)
|
||||||
log.Tracef("%v read from stream %v with err %v", n, s.id, err)
|
log.Tracef("%v read from stream %v with err %v", n, s.id, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -142,9 +135,9 @@ func (s *Stream) Write(in []byte) (n int, err error) {
|
|||||||
|
|
||||||
// the necessary steps to mark the stream as closed and to release resources
|
// the necessary steps to mark the stream as closed and to release resources
|
||||||
func (s *Stream) _close() {
|
func (s *Stream) _close() {
|
||||||
|
// TODO: return err here
|
||||||
atomic.StoreUint32(&s.closed, 1)
|
atomic.StoreUint32(&s.closed, 1)
|
||||||
s.sorter.Close() // this will trigger frameSorter to return
|
s.recvBuf.Close()
|
||||||
s.buf.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// only close locally. Used when the stream close is notified by the remote
|
// only close locally. Used when the stream close is notified by the remote
|
||||||
|
144
internal/multiplex/streamBuffer.go
Normal file
144
internal/multiplex/streamBuffer.go
Normal file
@ -0,0 +1,144 @@
|
|||||||
|
package multiplex
|
||||||
|
|
||||||
|
// The data is multiplexed through several TCP connections, therefore the
|
||||||
|
// order of arrival is not guaranteed. A stream's first packet may be sent through
|
||||||
|
// connection0 and its second packet may be sent through connection1. Although both
|
||||||
|
// packets are transmitted reliably (as TCP is reliable), packet1 may arrive to the
|
||||||
|
// remote side before packet0. Cloak have to therefore sequence the packets so that they
|
||||||
|
// arrive in order as they were sent by the proxy software
|
||||||
|
//
|
||||||
|
// Cloak packets will have a 32-bit sequence number on them, so we know in which order
|
||||||
|
// they should be sent to the proxy software. The code in this file provides buffering and sorting.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/heap"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type frameNode struct {
|
||||||
|
trueSeq uint64
|
||||||
|
frame Frame
|
||||||
|
}
|
||||||
|
type sorterHeap []*frameNode
|
||||||
|
|
||||||
|
func (sh sorterHeap) Less(i, j int) bool {
|
||||||
|
return sh[i].trueSeq < sh[j].trueSeq
|
||||||
|
}
|
||||||
|
func (sh sorterHeap) Len() int {
|
||||||
|
return len(sh)
|
||||||
|
}
|
||||||
|
func (sh sorterHeap) Swap(i, j int) {
|
||||||
|
sh[i], sh[j] = sh[j], sh[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sh *sorterHeap) Push(x interface{}) {
|
||||||
|
*sh = append(*sh, x.(*frameNode))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sh *sorterHeap) Pop() interface{} {
|
||||||
|
old := *sh
|
||||||
|
n := len(old)
|
||||||
|
x := old[n-1]
|
||||||
|
*sh = old[0 : n-1]
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
|
||||||
|
type streamBuffer struct {
|
||||||
|
recvM sync.Mutex
|
||||||
|
|
||||||
|
nextRecvSeq uint32
|
||||||
|
rev int
|
||||||
|
sh sorterHeap
|
||||||
|
wrapMode bool
|
||||||
|
|
||||||
|
buf *bufferedPipe
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStreamBuffer() *streamBuffer {
|
||||||
|
sb := &streamBuffer{
|
||||||
|
sh: []*frameNode{},
|
||||||
|
rev: 0,
|
||||||
|
buf: NewBufferedPipe(),
|
||||||
|
}
|
||||||
|
return sb
|
||||||
|
}
|
||||||
|
|
||||||
|
var ClosingFrameReceived = errors.New("closed by closing frame")
|
||||||
|
|
||||||
|
// recvNewFrame is a forever running loop which receives frames unordered,
|
||||||
|
// cache and order them and send them into sortedBufCh
|
||||||
|
func (sb *streamBuffer) Write(f Frame) error {
|
||||||
|
sb.recvM.Lock()
|
||||||
|
defer sb.recvM.Unlock()
|
||||||
|
// when there'fs no ooo packages in heap and we receive the next package in order
|
||||||
|
if len(sb.sh) == 0 && f.Seq == sb.nextRecvSeq {
|
||||||
|
if f.Closing == 1 {
|
||||||
|
// empty data indicates closing signal
|
||||||
|
sb.buf.Close()
|
||||||
|
return ClosingFrameReceived
|
||||||
|
} else {
|
||||||
|
sb.buf.Write(f.Payload)
|
||||||
|
sb.nextRecvSeq += 1
|
||||||
|
if sb.nextRecvSeq == 0 { // getting wrapped
|
||||||
|
sb.rev += 1
|
||||||
|
sb.wrapMode = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
node := &frameNode{
|
||||||
|
trueSeq: 0,
|
||||||
|
frame: f,
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.Seq < sb.nextRecvSeq {
|
||||||
|
// For the ease of demonstration, assume seq is uint8, i.e. it wraps around after 255
|
||||||
|
// e.g. we are on rev=0 (wrap has not happened yet)
|
||||||
|
// and we get the order of recv as 253 254 0 1
|
||||||
|
// after 254, nextN should be 255, but 0 is received and 0 < 255
|
||||||
|
// now 0 should have a trueSeq of 256
|
||||||
|
if !sb.wrapMode {
|
||||||
|
// wrapMode is true when the latest seq is wrapped but nextN is not
|
||||||
|
sb.wrapMode = true
|
||||||
|
}
|
||||||
|
node.trueSeq = uint64(1<<32)*uint64(sb.rev+1) + uint64(f.Seq) + 1
|
||||||
|
// +1 because wrapped 0 should have trueSeq of 256 instead of 255
|
||||||
|
// when this bit was run on 1, the trueSeq of 1 would become 256
|
||||||
|
} else {
|
||||||
|
node.trueSeq = uint64(1<<32)*uint64(sb.rev) + uint64(f.Seq)
|
||||||
|
// when this bit was run on 255, the trueSeq of 255 would be 255
|
||||||
|
}
|
||||||
|
|
||||||
|
heap.Push(&sb.sh, node)
|
||||||
|
// Keep popping from the heap until empty or to the point that the wanted seq was not received
|
||||||
|
for len(sb.sh) > 0 && sb.sh[0].frame.Seq == sb.nextRecvSeq {
|
||||||
|
f = heap.Pop(&sb.sh).(*frameNode).frame
|
||||||
|
if f.Closing == 1 {
|
||||||
|
// empty data indicates closing signal
|
||||||
|
sb.buf.Close()
|
||||||
|
return ClosingFrameReceived
|
||||||
|
} else {
|
||||||
|
sb.buf.Write(f.Payload)
|
||||||
|
sb.nextRecvSeq += 1
|
||||||
|
if sb.nextRecvSeq == 0 { // getting wrapped
|
||||||
|
sb.rev += 1
|
||||||
|
sb.wrapMode = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sb *streamBuffer) Read(buf []byte) (int, error) {
|
||||||
|
return sb.buf.Read(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sb *streamBuffer) Close() error {
|
||||||
|
return sb.buf.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sb *streamBuffer) Len() int {
|
||||||
|
return sb.buf.Len()
|
||||||
|
}
|
@ -1,7 +1,6 @@
|
|||||||
package multiplex
|
package multiplex
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -10,31 +9,23 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
type BufferReaderWriterCloser struct {
|
|
||||||
*bytes.Buffer
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *BufferReaderWriterCloser) Close() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func TestRecvNewFrame(t *testing.T) {
|
func TestRecvNewFrame(t *testing.T) {
|
||||||
inOrder := []uint64{5, 6, 7, 8, 9, 10, 11}
|
inOrder := []uint64{5, 6, 7, 8, 9, 10, 11}
|
||||||
outOfOrder0 := []uint64{5, 7, 8, 6, 11, 10, 9}
|
outOfOrder0 := []uint64{5, 7, 8, 6, 11, 10, 9}
|
||||||
outOfOrder1 := []uint64{1, 96, 47, 2, 29, 18, 60, 8, 74, 22, 82, 58, 44, 51, 57, 71, 90, 94, 68, 83, 61, 91, 39, 97, 85, 63, 46, 73, 54, 84, 76, 98, 93, 79, 75, 50, 67, 37, 92, 99, 42, 77, 17, 16, 38, 3, 100, 24, 31, 7, 36, 40, 86, 64, 34, 45, 12, 5, 9, 27, 21, 26, 35, 6, 65, 69, 53, 4, 48, 28, 30, 56, 32, 11, 80, 66, 25, 41, 78, 13, 88, 62, 15, 70, 49, 43, 72, 23, 10, 55, 52, 95, 14, 59, 87, 33, 19, 20, 81, 89}
|
outOfOrder1 := []uint64{1, 96, 47, 2, 29, 18, 60, 8, 74, 22, 82, 58, 44, 51, 57, 71, 90, 94, 68, 83, 61, 91, 39, 97, 85, 63, 46, 73, 54, 84, 76, 98, 93, 79, 75, 50, 67, 37, 92, 99, 42, 77, 17, 16, 38, 3, 100, 24, 31, 7, 36, 40, 86, 64, 34, 45, 12, 5, 9, 27, 21, 26, 35, 6, 65, 69, 53, 4, 48, 28, 30, 56, 32, 11, 80, 66, 25, 41, 78, 13, 88, 62, 15, 70, 49, 43, 72, 23, 10, 55, 52, 95, 14, 59, 87, 33, 19, 20, 81, 89}
|
||||||
outOfOrderWrap0 := []uint64{1<<32 - 5, 1<<32 + 3, 1 << 32, 1<<32 - 3, 1<<32 - 4, 1<<32 + 2, 1<<32 - 2, 1<<32 - 1, 1<<32 + 1}
|
outOfOrderWrap0 := []uint64{1<<32 - 5, 1<<32 + 3, 1 << 32, 1<<32 - 3, 1<<32 - 4, 1<<32 + 2, 1<<32 - 2, 1<<32 - 1, 1<<32 + 1}
|
||||||
|
|
||||||
sortedBuf := &BufferReaderWriterCloser{new(bytes.Buffer)}
|
|
||||||
test := func(set []uint64, ct *testing.T) {
|
test := func(set []uint64, ct *testing.T) {
|
||||||
fs := NewFrameSorter(sortedBuf)
|
sb := NewStreamBuffer()
|
||||||
fs.nextRecvSeq = uint32(set[0])
|
sb.nextRecvSeq = uint32(set[0])
|
||||||
for _, n := range set {
|
for _, n := range set {
|
||||||
bu64 := make([]byte, 8)
|
bu64 := make([]byte, 8)
|
||||||
binary.BigEndian.PutUint64(bu64, n)
|
binary.BigEndian.PutUint64(bu64, n)
|
||||||
frame := &Frame{
|
frame := Frame{
|
||||||
Seq: uint32(n),
|
Seq: uint32(n),
|
||||||
Payload: bu64,
|
Payload: bu64,
|
||||||
}
|
}
|
||||||
fs.writeNewFrame(frame)
|
sb.Write(frame)
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
@ -42,7 +33,7 @@ func TestRecvNewFrame(t *testing.T) {
|
|||||||
var sortedResult []uint64
|
var sortedResult []uint64
|
||||||
for x := 0; x < len(set); x++ {
|
for x := 0; x < len(set); x++ {
|
||||||
oct := make([]byte, 8)
|
oct := make([]byte, 8)
|
||||||
n, err := sortedBuf.Read(oct)
|
n, err := sb.Read(oct)
|
||||||
if n != 8 || err != nil {
|
if n != 8 || err != nil {
|
||||||
ct.Error("failed to read from sorted Buf", n, err)
|
ct.Error("failed to read from sorted Buf", n, err)
|
||||||
return
|
return
|
||||||
@ -59,7 +50,7 @@ func TestRecvNewFrame(t *testing.T) {
|
|||||||
goto fail
|
goto fail
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fs.Close()
|
sb.Close()
|
||||||
return
|
return
|
||||||
fail:
|
fail:
|
||||||
ct.Error(
|
ct.Error(
|
Loading…
Reference in New Issue
Block a user