diff --git a/internal/multiplex/datagramBuffer.go b/internal/multiplex/datagramBuffer.go index 0870dd6..96e650f 100644 --- a/internal/multiplex/datagramBuffer.go +++ b/internal/multiplex/datagramBuffer.go @@ -3,19 +3,19 @@ package multiplex import ( + "bytes" "io" "sync" "sync/atomic" "time" ) -const DATAGRAM_NUMBER_LIMIT = 1024 - // datagramBuffer is the same as bufferedPipe with the exception that it's message-oriented, // instead of byte-oriented. The integrity of datagrams written into this buffer is preserved. // it won't get chopped up into individual bytes type datagramBuffer struct { - buf [][]byte + pLens []int + buf *bytes.Buffer closed uint32 rwCond *sync.Cond rDeadline time.Time @@ -23,7 +23,6 @@ type datagramBuffer struct { func NewDatagramBuffer() *datagramBuffer { d := &datagramBuffer{ - buf: make([][]byte, 0), rwCond: sync.NewCond(&sync.Mutex{}), } return d @@ -32,8 +31,11 @@ func NewDatagramBuffer() *datagramBuffer { func (d *datagramBuffer) Read(target []byte) (int, error) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() + if d.buf == nil { + d.buf = new(bytes.Buffer) + } for { - if atomic.LoadUint32(&d.closed) == 1 && len(d.buf) == 0 { + if atomic.LoadUint32(&d.closed) == 1 && len(d.pLens) == 0 { return 0, io.EOF } @@ -45,30 +47,33 @@ func (d *datagramBuffer) Read(target []byte) (int, error) { time.AfterFunc(delta, d.rwCond.Broadcast) } - if len(d.buf) > 0 { + if len(d.pLens) > 0 { break } d.rwCond.Wait() } - data := d.buf[0] - if len(target) < len(data) { + dataLen := d.pLens[0] + if len(target) < dataLen { return 0, io.ErrShortBuffer } - d.buf = d.buf[1:] - copy(target, data) + d.pLens = d.pLens[1:] + d.buf.Read(target[:dataLen]) // err will always be nil because we have already verified that buf.Len() != 0 d.rwCond.Broadcast() - return len(data), nil + return dataLen, nil } func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() + if d.buf == nil { + d.buf = new(bytes.Buffer) + } for { if atomic.LoadUint32(&d.closed) == 1 { return true, io.ErrClosedPipe } - if len(d.buf) <= DATAGRAM_NUMBER_LIMIT { + if d.buf.Len() <= BUF_SIZE_LIMIT { // if d.buf gets too large, write() will panic. We don't want this to happen break } @@ -81,9 +86,9 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) { return true, nil } - data := make([]byte, len(f.Payload)) - copy(data, f.Payload) - d.buf = append(d.buf, data) + dataLen := len(f.Payload) + d.pLens = append(d.pLens, dataLen) + d.buf.Write(f.Payload) // err will always be nil d.rwCond.Broadcast() return false, nil diff --git a/internal/multiplex/datagramBuffer_test.go b/internal/multiplex/datagramBuffer_test.go index 4907866..add8e86 100644 --- a/internal/multiplex/datagramBuffer_test.go +++ b/internal/multiplex/datagramBuffer_test.go @@ -47,7 +47,7 @@ func TestDatagramBuffer_RW(t *testing.T) { "got", b2, ) } - if len(pipe.buf) != 0 { + if pipe.buf.Len() != 0 { t.Error("buf len is not 0 after finished reading") return }