transports/meeklite: Cleanups, bugfixes and improvements

* Properly close the response body on HTTP error.
 * Cleanup close signaling.
 * Write() should return faster on closed connections.
merge-requests/3/head
Yawning Angel 5 years ago
parent f8bf80479f
commit 816cff15f4

@ -39,6 +39,7 @@ import (
"net"
"net/http"
gourl "net/url"
"os"
"runtime"
"sync"
"time"
@ -107,16 +108,14 @@ func newClientArgs(args *pt.Args) (ca *meekClientArgs, err error) {
}
type meekConn struct {
sync.Mutex
args *meekClientArgs
sessionID string
transport *http.Transport
workerRunning bool
closeOnce sync.Once
workerWrChan chan []byte
workerRdChan chan []byte
workerCloseChan chan bool
workerCloseChan chan struct{}
rdBuf *bytes.Buffer
}
@ -154,11 +153,10 @@ func (c *meekConn) Read(p []byte) (n int, err error) {
func (c *meekConn) Write(b []byte) (n int, err error) {
// Check to see if the connection is actually open.
c.Lock()
closed := !c.workerRunning
c.Unlock()
if closed {
select {
case <-c.workerCloseChan:
return 0, io.ErrClosedPipe
default:
}
if len(b) == 0 {
@ -168,9 +166,7 @@ func (c *meekConn) Write(b []byte) (n int, err error) {
// Copy the data to be written to a new slice, since
// we return immediately after queuing and the peer can
// happily reuse `b` before data has been sent.
toWrite := len(b)
b2 := make([]byte, toWrite)
copy(b2, b)
b2 := append([]byte{}, b...)
if ok := c.enqueueWrite(b2); !ok {
// Technically we did enqueue data, but the worker's
// got closed out from under us.
@ -181,18 +177,15 @@ func (c *meekConn) Write(b []byte) (n int, err error) {
}
func (c *meekConn) Close() error {
// Ensure that we do this once and only once.
c.Lock()
defer c.Unlock()
if !c.workerRunning {
return nil
}
err := os.ErrClosed
// Tear down the worker.
c.workerRunning = false
c.workerCloseChan <- true
c.closeOnce.Do(func() {
// Tear down the worker, if it is still running.
close(c.workerCloseChan)
err = nil
})
return nil
return err
}
func (c *meekConn) LocalAddr() net.Addr {
@ -216,7 +209,11 @@ func (c *meekConn) SetWriteDeadline(t time.Time) error {
}
func (c *meekConn) enqueueWrite(b []byte) (ok bool) {
defer func() { _ = recover() }()
defer func() {
if err := recover(); err != nil {
ok = false
}
}()
c.workerWrChan <- b
return true
}
@ -249,14 +246,16 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("status code was %d, not %d", resp.StatusCode, http.StatusOK)
time.Sleep(retryDelay)
} else {
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
recvBuf, err = ioutil.ReadAll(io.LimitReader(resp.Body, maxPayloadLength))
resp.Body.Close()
return
}
resp.Body.Close()
err = fmt.Errorf("status code was %d, not %d", resp.StatusCode, http.StatusOK)
time.Sleep(retryDelay)
}
return
}
@ -264,8 +263,8 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
func (c *meekConn) ioWorker() {
interval := initPollInterval
var sndBuf, leftBuf []byte
loop:
loop:
for {
sndBuf = nil
select {
@ -316,7 +315,7 @@ loop:
// Sent data, poll immediately.
interval = 0
} else if interval == 0 {
// Neither sent nor received data, initialize the delay.
// Neither sent nor received data after a poll, re-initialize the delay.
interval = initPollInterval
} else {
// Apply a multiplicative backoff.
@ -334,11 +333,8 @@ loop:
close(c.workerRdChan)
close(c.workerWrChan)
// In case the close was done on an error condition, update the state
// variable so that further calls to Write() will fail.
c.Lock()
defer c.Unlock()
c.workerRunning = false
// Close the connection (extra calls to Close() are harmless).
_ = c.Close()
}
func newMeekConn(network, addr string, dialFn base.DialFunc, ca *meekClientArgs) (net.Conn, error) {
@ -347,15 +343,13 @@ func newMeekConn(network, addr string, dialFn base.DialFunc, ca *meekClientArgs)
return nil, err
}
tr := &http.Transport{Dial: dialFn}
conn := &meekConn{
args: ca,
sessionID: id,
transport: tr,
workerRunning: true,
transport: &http.Transport{Dial: dialFn},
workerWrChan: make(chan []byte, maxChanBacklog),
workerRdChan: make(chan []byte, maxChanBacklog),
workerCloseChan: make(chan bool),
workerCloseChan: make(chan struct{}),
}
// Start the I/O worker.

Loading…
Cancel
Save