From 21cbe6ab5d395fb8874e22f6e352518a1a61c3a9 Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Sun, 12 Apr 2020 11:33:11 +0100 Subject: [PATCH] Implement WriterTo for receive buffers --- internal/multiplex/bufferedPipe.go | 42 ++++++++++++++++++++++---- internal/multiplex/datagramBuffer.go | 45 +++++++++++++++++++++++++--- internal/multiplex/recvBuffer.go | 1 + 3 files changed, 79 insertions(+), 9 deletions(-) diff --git a/internal/multiplex/bufferedPipe.go b/internal/multiplex/bufferedPipe.go index 6ab9f49..4d2c4eb 100644 --- a/internal/multiplex/bufferedPipe.go +++ b/internal/multiplex/bufferedPipe.go @@ -7,7 +7,6 @@ import ( "errors" "io" "sync" - "sync/atomic" "time" ) @@ -20,7 +19,7 @@ type bufferedPipe struct { // only alloc when on first Read or Write buf *bytes.Buffer - closed uint32 + closed bool rwCond *sync.Cond rDeadline time.Time } @@ -39,7 +38,7 @@ func (p *bufferedPipe) Read(target []byte) (int, error) { p.buf = new(bytes.Buffer) } for { - if atomic.LoadUint32(&p.closed) == 1 && p.buf.Len() == 0 { + if p.closed && p.buf.Len() == 0 { return 0, io.EOF } if !p.rDeadline.IsZero() { @@ -60,6 +59,36 @@ func (p *bufferedPipe) Read(target []byte) (int, error) { return n, err } +func (p *bufferedPipe) WriteTo(w io.Writer) (n int64, err error) { + p.rwCond.L.Lock() + defer p.rwCond.L.Unlock() + if p.buf == nil { + p.buf = new(bytes.Buffer) + } + for { + if p.closed && p.buf.Len() == 0 { + return 0, io.EOF + } + if !p.rDeadline.IsZero() { + d := time.Until(p.rDeadline) + if d <= 0 { + return 0, ErrTimeout + } + time.AfterFunc(d, p.rwCond.Broadcast) + } + if p.buf.Len() > 0 { + written, er := p.buf.WriteTo(w) + n += written + if er != nil { + p.rwCond.Broadcast() + return n, er + } + p.rwCond.Broadcast() + } + p.rwCond.Wait() + } +} + func (p *bufferedPipe) Write(input []byte) (int, error) { p.rwCond.L.Lock() defer p.rwCond.L.Unlock() @@ -67,7 +96,7 @@ func (p *bufferedPipe) Write(input []byte) (int, error) { p.buf = new(bytes.Buffer) } for { - if atomic.LoadUint32(&p.closed) == 1 { + if p.closed { return 0, io.ErrClosedPipe } if p.buf.Len() <= BUF_SIZE_LIMIT { @@ -83,7 +112,10 @@ func (p *bufferedPipe) Write(input []byte) (int, error) { } func (p *bufferedPipe) Close() error { - atomic.StoreUint32(&p.closed, 1) + p.rwCond.L.Lock() + defer p.rwCond.L.Unlock() + + p.closed = true p.rwCond.Broadcast() return nil } diff --git a/internal/multiplex/datagramBuffer.go b/internal/multiplex/datagramBuffer.go index 96e650f..d7664e0 100644 --- a/internal/multiplex/datagramBuffer.go +++ b/internal/multiplex/datagramBuffer.go @@ -16,7 +16,7 @@ import ( type datagramBuffer struct { pLens []int buf *bytes.Buffer - closed uint32 + closed bool rwCond *sync.Cond rDeadline time.Time } @@ -63,6 +63,40 @@ func (d *datagramBuffer) Read(target []byte) (int, error) { return dataLen, nil } +func (d *datagramBuffer) WriteTo(w io.Writer) (n int64, err error) { + d.rwCond.L.Lock() + defer d.rwCond.L.Unlock() + if d.buf == nil { + d.buf = new(bytes.Buffer) + } + for { + if d.closed && len(d.pLens) == 0 { + return 0, io.EOF + } + + if !d.rDeadline.IsZero() { + delta := time.Until(d.rDeadline) + if delta <= 0 { + return 0, ErrTimeout + } + time.AfterFunc(delta, d.rwCond.Broadcast) + } + + if len(d.pLens) > 0 { + var dataLen int + dataLen, d.pLens = d.pLens[0], d.pLens[1:] + written, er := w.Write(d.buf.Next(dataLen)) + n += int64(written) + if er != nil { + d.rwCond.Broadcast() + return n, er + } + d.rwCond.Broadcast() + } + d.rwCond.Wait() + } +} + func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) { d.rwCond.L.Lock() defer d.rwCond.L.Unlock() @@ -70,7 +104,7 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) { d.buf = new(bytes.Buffer) } for { - if atomic.LoadUint32(&d.closed) == 1 { + if d.closed { return true, io.ErrClosedPipe } if d.buf.Len() <= BUF_SIZE_LIMIT { @@ -81,7 +115,7 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) { } if f.Closing != C_NOOP { - atomic.StoreUint32(&d.closed, 1) + d.closed = true d.rwCond.Broadcast() return true, nil } @@ -95,7 +129,10 @@ func (d *datagramBuffer) Write(f Frame) (toBeClosed bool, err error) { } func (d *datagramBuffer) Close() error { - atomic.StoreUint32(&d.closed, 1) + d.rwCond.L.Lock() + defer d.rwCond.L.Unlock() + + d.closed = true d.rwCond.Broadcast() return nil } diff --git a/internal/multiplex/recvBuffer.go b/internal/multiplex/recvBuffer.go index 0a2cbcf..9697867 100644 --- a/internal/multiplex/recvBuffer.go +++ b/internal/multiplex/recvBuffer.go @@ -8,6 +8,7 @@ import ( type recvBuffer interface { // Read calls' err must be nil | io.EOF | io.ErrShortBuffer io.ReadCloser + io.WriterTo Write(Frame) (toBeClosed bool, err error) SetReadDeadline(time time.Time) }