meek-lite: combine small writes at request dispatch time.

This dramatically improves bulk upload performance, from totally shit
to just shit.
merge-requests/3/head
Yawning Angel 9 years ago
parent 611205be68
commit 43cdc20e7e

@ -161,33 +161,22 @@ func (c *meekConn) Write(b []byte) (n int, err error) {
return 0, io.ErrClosedPipe
}
if len(b) > 0 {
// Copy the data to be written to a new slice, since
// we return immediately after queuing and the peer can
// happily reuse `b` before data has been sent.
toWrite := len(b)
b2 := make([]byte, toWrite)
copy(b2, b)
offset := 0
for toWrite > 0 {
// Chunk up the writes to keep them under the maximum
// payload length.
sz := toWrite
if sz > maxPayloadLength {
sz = maxPayloadLength
}
if len(b) == 0 {
return 0, nil
}
// Enqueue a properly sized subslice of our copy.
if ok := c.enqueueWrite(b2[offset : offset+sz]); !ok {
// Technically we did enqueue data, but the worker's
// got closed out from under us.
return 0, io.ErrClosedPipe
}
toWrite -= sz
offset += sz
runtime.Gosched()
}
// Copy the data to be written to a new slice, since
// we return immediately after queuing and the peer can
// happily reuse `b` before data has been sent.
toWrite := len(b)
b2 := make([]byte, toWrite)
copy(b2, b)
if ok := c.enqueueWrite(b2); !ok {
// Technically we did enqueue data, but the worker's
// got closed out from under us.
return 0, io.ErrClosedPipe
}
runtime.Gosched()
return len(b), nil
}
@ -269,9 +258,11 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
func (c *meekConn) ioWorker() {
interval := initPollInterval
var sndBuf, leftBuf []byte
loop:
for {
var sndBuf []byte
sndBuf = nil
select {
case <-time.After(interval):
// If the poll interval has elapsed, issue a request.
@ -281,19 +272,42 @@ loop:
break loop
}
// Combine short writes as long as data is available to be
// sent immediately and it will not put us over the max
// payload limit. Any excess data is stored and dispatched
// as the next request).
sndBuf = append(leftBuf, sndBuf...)
wrSz := len(sndBuf)
for len(c.workerWrChan) > 0 && wrSz < maxPayloadLength {
b := <-c.workerWrChan
sndBuf = append(sndBuf, b...)
wrSz = len(sndBuf)
}
if wrSz > maxPayloadLength {
wrSz = maxPayloadLength
}
// Issue a request.
rdBuf, err := c.roundTrip(sndBuf)
rdBuf, err := c.roundTrip(sndBuf[:wrSz])
if err != nil {
// Welp, something went horrifically wrong.
break loop
}
// Stash the remaining payload if any.
leftBuf = sndBuf[wrSz:] // Store the remaining data
if len(leftBuf) == 0 {
leftBuf = nil
}
// Determine the next poll interval.
if len(rdBuf) > 0 {
// Received data, enqueue the read.
c.workerRdChan <- rdBuf
// And poll immediately.
interval = 0
} else if sndBuf != nil {
} else if wrSz > 0 {
// Sent data, poll immediately.
interval = 0
} else if interval == 0 {

Loading…
Cancel
Save