Implement WriterTo for receive buffers

This commit is contained in:
Andy Wang 2020-04-12 11:33:11 +01:00
parent 029da207ce
commit 21cbe6ab5d
3 changed files with 79 additions and 9 deletions

View File

@ -7,7 +7,6 @@ import (
"errors" "errors"
"io" "io"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@ -20,7 +19,7 @@ type bufferedPipe struct {
// only alloc when on first Read or Write // only alloc when on first Read or Write
buf *bytes.Buffer buf *bytes.Buffer
closed uint32 closed bool
rwCond *sync.Cond rwCond *sync.Cond
rDeadline time.Time rDeadline time.Time
} }
@ -39,7 +38,7 @@ func (p *bufferedPipe) Read(target []byte) (int, error) {
p.buf = new(bytes.Buffer) p.buf = new(bytes.Buffer)
} }
for { for {
if atomic.LoadUint32(&p.closed) == 1 && p.buf.Len() == 0 { if p.closed && p.buf.Len() == 0 {
return 0, io.EOF return 0, io.EOF
} }
if !p.rDeadline.IsZero() { if !p.rDeadline.IsZero() {
@ -60,6 +59,36 @@ func (p *bufferedPipe) Read(target []byte) (int, error) {
return n, err return n, err
} }
func (p *bufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()
if p.buf == nil {
p.buf = new(bytes.Buffer)
}
for {
if p.closed && p.buf.Len() == 0 {
return 0, io.EOF
}
if !p.rDeadline.IsZero() {
d := time.Until(p.rDeadline)
if d <= 0 {
return 0, ErrTimeout
}
time.AfterFunc(d, p.rwCond.Broadcast)
}
if p.buf.Len() > 0 {
written, er := p.buf.WriteTo(w)
n += written
if er != nil {
p.rwCond.Broadcast()
return n, er
}
p.rwCond.Broadcast()
}
p.rwCond.Wait()
}
}
func (p *bufferedPipe) Write(input []byte) (int, error) { func (p *bufferedPipe) Write(input []byte) (int, error) {
p.rwCond.L.Lock() p.rwCond.L.Lock()
defer p.rwCond.L.Unlock() defer p.rwCond.L.Unlock()
@ -67,7 +96,7 @@ func (p *bufferedPipe) Write(input []byte) (int, error) {
p.buf = new(bytes.Buffer) p.buf = new(bytes.Buffer)
} }
for { for {
if atomic.LoadUint32(&p.closed) == 1 { if p.closed {
return 0, io.ErrClosedPipe return 0, io.ErrClosedPipe
} }
if p.buf.Len() <= BUF_SIZE_LIMIT { if p.buf.Len() <= BUF_SIZE_LIMIT {
@ -83,7 +112,10 @@ func (p *bufferedPipe) Write(input []byte) (int, error) {
} }
func (p *bufferedPipe) Close() error { func (p *bufferedPipe) Close() error {
atomic.StoreUint32(&p.closed, 1) p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()
p.closed = true
p.rwCond.Broadcast() p.rwCond.Broadcast()
return nil return nil
} }

View File

@ -16,7 +16,7 @@ import (
type datagramBuffer struct { type datagramBuffer struct {
pLens []int pLens []int
buf *bytes.Buffer buf *bytes.Buffer
closed uint32 closed bool
rwCond *sync.Cond rwCond *sync.Cond
rDeadline time.Time rDeadline time.Time
} }
@ -63,6 +63,40 @@ func (d *datagramBuffer) Read(target []byte) (int, error) {
return dataLen, nil return dataLen, nil
} }
func (d *datagramBuffer) WriteTo(w io.Writer) (n int64, err error) {
d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()
if d.buf == nil {
d.buf = new(bytes.Buffer)
}
for {
if d.closed && len(d.pLens) == 0 {
return 0, io.EOF
}
if !d.rDeadline.IsZero() {
delta := time.Until(d.rDeadline)
if delta <= 0 {
return 0, ErrTimeout
}
time.AfterFunc(delta, d.rwCond.Broadcast)
}
if len(d.pLens) > 0 {
var dataLen int
dataLen, d.pLens = d.pLens[0], d.pLens[1:]
written, er := w.Write(d.buf.Next(dataLen))
n += int64(written)
if er != nil {
d.rwCond.Broadcast()
return n, er
}
d.rwCond.Broadcast()
}
d.rwCond.Wait()
}
}
func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) { func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) {
d.rwCond.L.Lock() d.rwCond.L.Lock()
defer d.rwCond.L.Unlock() defer d.rwCond.L.Unlock()
@ -70,7 +104,7 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) {
d.buf = new(bytes.Buffer) d.buf = new(bytes.Buffer)
} }
for { for {
if atomic.LoadUint32(&d.closed) == 1 { if d.closed {
return true, io.ErrClosedPipe return true, io.ErrClosedPipe
} }
if d.buf.Len() <= BUF_SIZE_LIMIT { if d.buf.Len() <= BUF_SIZE_LIMIT {
@ -81,7 +115,7 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) {
} }
if f.Closing != C_NOOP { if f.Closing != C_NOOP {
atomic.StoreUint32(&d.closed, 1) d.closed = true
d.rwCond.Broadcast() d.rwCond.Broadcast()
return true, nil return true, nil
} }
@ -95,7 +129,10 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) {
} }
func (d *datagramBuffer) Close() error { func (d *datagramBuffer) Close() error {
atomic.StoreUint32(&d.closed, 1) d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()
d.closed = true
d.rwCond.Broadcast() d.rwCond.Broadcast()
return nil return nil
} }

View File

@ -8,6 +8,7 @@ import (
type recvBuffer interface { type recvBuffer interface {
// Read calls' err must be nil | io.EOF | io.ErrShortBuffer // Read calls' err must be nil | io.EOF | io.ErrShortBuffer
io.ReadCloser io.ReadCloser
io.WriterTo
Write(Frame) (toBeClosed bool, err error) Write(Frame) (toBeClosed bool, err error)
SetReadDeadline(time time.Time) SetReadDeadline(time time.Time)
} }