Control flow optimisations

pull/158/head
Andy Wang 3 years ago
parent 3ad04aa7e9
commit cbd71fae6d
No known key found for this signature in database
GPG Key ID: 181B49F9F38F3374

@ -13,8 +13,7 @@ import (
// instead of byte-oriented. The integrity of datagrams written into this buffer is preserved.
// it won't get chopped up into individual bytes
type datagramBufferedPipe struct {
pLens []int
// lazily allocated
pLens []int
buf *bytes.Buffer
closed bool
rwCond *sync.Cond
@ -27,6 +26,7 @@ type datagramBufferedPipe struct {
func NewDatagramBufferedPipe() *datagramBufferedPipe {
d := &datagramBufferedPipe{
rwCond: sync.NewCond(&sync.Mutex{}),
buf: new(bytes.Buffer),
}
return d
}
@ -34,9 +34,6 @@ func NewDatagramBufferedPipe() *datagramBufferedPipe {
func (d *datagramBufferedPipe) Read(target []byte) (int, error) {
d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()
if d.buf == nil {
d.buf = new(bytes.Buffer)
}
for {
if d.closed && len(d.pLens) == 0 {
return 0, io.EOF
@ -72,9 +69,6 @@ func (d *datagramBufferedPipe) Read(target []byte) (int, error) {
func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()
if d.buf == nil {
d.buf = new(bytes.Buffer)
}
for {
if d.closed && len(d.pLens) == 0 {
return 0, io.EOF
@ -115,9 +109,6 @@ func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
func (d *datagramBufferedPipe) Write(f *Frame) (toBeClosed bool, err error) {
d.rwCond.L.Lock()
defer d.rwCond.L.Unlock()
if d.buf == nil {
d.buf = new(bytes.Buffer)
}
for {
if d.closed {
return true, io.ErrClosedPipe

@ -11,7 +11,6 @@ import (
// The point of a streamBufferedPipe is that Read() will block until data is available
type streamBufferedPipe struct {
// only alloc when on first Read or Write
buf *bytes.Buffer
closed bool
@ -25,6 +24,7 @@ type streamBufferedPipe struct {
func NewStreamBufferedPipe() *streamBufferedPipe {
p := &streamBufferedPipe{
rwCond: sync.NewCond(&sync.Mutex{}),
buf: new(bytes.Buffer),
}
return p
}
@ -32,9 +32,6 @@ func NewStreamBufferedPipe() *streamBufferedPipe {
func (p *streamBufferedPipe) Read(target []byte) (int, error) {
p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()
if p.buf == nil {
p.buf = new(bytes.Buffer)
}
for {
if p.closed && p.buf.Len() == 0 {
return 0, io.EOF
@ -64,9 +61,6 @@ func (p *streamBufferedPipe) Read(target []byte) (int, error) {
func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()
if p.buf == nil {
p.buf = new(bytes.Buffer)
}
for {
if p.closed && p.buf.Len() == 0 {
return 0, io.EOF
@ -104,9 +98,6 @@ func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
func (p *streamBufferedPipe) Write(input []byte) (int, error) {
p.rwCond.L.Lock()
defer p.rwCond.L.Unlock()
if p.buf == nil {
p.buf = new(bytes.Buffer)
}
for {
if p.closed {
return 0, io.ErrClosedPipe

@ -72,46 +72,43 @@ func (sb *switchboard) addConn(conn net.Conn) {
// a pointer to connId is passed here so that the switchboard can reassign it if that connId isn't usable
func (sb *switchboard) send(data []byte, connId *uint32) (n int, err error) {
writeAndRegUsage := func(conn net.Conn, d []byte) (int, error) {
n, err = conn.Write(d)
if err != nil {
sb.conns.Delete(*connId)
sb.session.SetTerminalMsg("failed to write to remote " + err.Error())
sb.session.passiveClose()
return n, err
}
sb.valve.AddTx(int64(n))
return n, nil
}
sb.valve.txWait(len(data))
if atomic.LoadUint32(&sb.broken) == 1 || sb.connsCount() == 0 {
return 0, errBrokenSwitchboard
}
var conn net.Conn
switch sb.strategy {
case UNIFORM_SPREAD:
_, conn, err := sb.pickRandConn()
_, conn, err = sb.pickRandConn()
if err != nil {
return 0, errBrokenSwitchboard
}
return writeAndRegUsage(conn, data)
case FIXED_CONN_MAPPING:
connI, ok := sb.conns.Load(*connId)
if ok {
conn := connI.(net.Conn)
return writeAndRegUsage(conn, data)
conn = connI.(net.Conn)
} else {
newConnId, conn, err := sb.pickRandConn()
var newConnId uint32
newConnId, conn, err = sb.pickRandConn()
if err != nil {
return 0, errBrokenSwitchboard
}
*connId = newConnId
return writeAndRegUsage(conn, data)
}
default:
return 0, errors.New("unsupported traffic distribution strategy")
}
n, err = conn.Write(data)
if err != nil {
sb.conns.Delete(*connId)
sb.session.SetTerminalMsg("failed to write to remote " + err.Error())
sb.session.passiveClose()
return n, err
}
sb.valve.AddTx(int64(n))
return n, nil
}
// returns a random connId

Loading…
Cancel
Save