From eabe113547757640d1107f52330f89d9b56888ba Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Mon, 19 Aug 2019 23:23:41 +0100 Subject: [PATCH] Add Stream Timeout --- cmd/ck-client/ck-client.go | 128 ++++++++++++++++++---------------- cmd/ck-server/ck-server.go | 8 +-- example_config/ckclient.json | 3 +- example_config/ckserver.json | 3 +- internal/client/state.go | 5 +- internal/multiplex/session.go | 5 +- internal/server/state.go | 17 +++-- internal/util/util.go | 17 +++-- 8 files changed, 104 insertions(+), 82 deletions(-) diff --git a/cmd/ck-client/ck-client.go b/cmd/ck-client/ck-client.go index 5e1c91c..79d021d 100644 --- a/cmd/ck-client/ck-client.go +++ b/cmd/ck-client/ck-client.go @@ -97,82 +97,86 @@ func routeUDP(sta *client.State, adminUID []byte) { if err != nil { log.Fatal(err) } +start: localConn, err := net.ListenUDP("udp", localUDPAddr) if err != nil { log.Fatal(err) } - for { - var otherEnd atomic.Value - data := make([]byte, 10240) - i, oe, err := localConn.ReadFromUDP(data) - if err != nil { - log.Errorf("Failed to read first packet from proxy client: %v", err) - localConn.Close() - return - } - otherEnd.Store(oe) - - if sesh == nil || sesh.IsClosed() { - sesh = makeSession(sta, adminUID != nil, true) - } - log.Debugf("proxy local address %v", otherEnd.Load().(*net.UDPAddr).String()) - stream, err := sesh.OpenStream() - if err != nil { - log.Errorf("Failed to open stream: %v", err) - localConn.Close() - //localConnWrite.Close() - return - } - _, err = stream.Write(data[:i]) - if err != nil { - log.Errorf("Failed to write to stream: %v", err) - localConn.Close() - //localConnWrite.Close() - stream.Close() - return - } + var otherEnd atomic.Value + data := make([]byte, 10240) + i, oe, err := localConn.ReadFromUDP(data) + if err != nil { + log.Errorf("Failed to read first packet from proxy client: %v", err) + localConn.Close() + return + } + otherEnd.Store(oe) - // stream to proxy - go func() { - buf := make([]byte, 16380) - for { - i, err := io.ReadAtLeast(stream, buf, 1) - if err != nil { - log.Print(err) - go localConn.Close() - go stream.Close() - return - } - i, err = localConn.WriteToUDP(buf[:i], otherEnd.Load().(*net.UDPAddr)) - if err != nil { - log.Print(err) - go localConn.Close() - go stream.Close() - return - } - } - }() + if sesh == nil || sesh.IsClosed() { + sesh = makeSession(sta, adminUID != nil, true) + } + log.Debugf("proxy local address %v", otherEnd.Load().(*net.UDPAddr).String()) + stream, err := sesh.OpenStream() + if err != nil { + log.Errorf("Failed to open stream: %v", err) + localConn.Close() + //localConnWrite.Close() + return + } + _, err = stream.Write(data[:i]) + if err != nil { + log.Errorf("Failed to write to stream: %v", err) + localConn.Close() + //localConnWrite.Close() + stream.Close() + return + } - // proxy to stream + // stream to proxy + go func() { buf := make([]byte, 16380) for { - i, oe, err := localConn.ReadFromUDP(buf) + i, err := io.ReadAtLeast(stream, buf, 1) if err != nil { log.Print(err) - go localConn.Close() - go stream.Close() - return + localConn.Close() + stream.Close() + break } - otherEnd.Store(oe) - i, err = stream.Write(buf[:i]) + i, err = localConn.WriteToUDP(buf[:i], otherEnd.Load().(*net.UDPAddr)) if err != nil { log.Print(err) - go localConn.Close() - go stream.Close() - return + localConn.Close() + stream.Close() + break } } + }() + + // proxy to stream + buf := make([]byte, 16380) + if sta.Timeout != 0 { + localConn.SetReadDeadline(time.Now().Add(sta.Timeout)) + } + for { + if sta.Timeout != 0 { + localConn.SetReadDeadline(time.Now().Add(sta.Timeout)) + } + i, oe, err := localConn.ReadFromUDP(buf) + if err != nil { + localConn.Close() + stream.Close() + break + } + otherEnd.Store(oe) + i, err = stream.Write(buf[:i]) + if err != nil { + localConn.Close() + stream.Close() + break + } } + goto start } @@ -212,8 +216,8 @@ func routeTCP(sta *client.State, adminUID []byte) { stream.Close() return } - go util.Pipe(localConn, stream) - util.Pipe(stream, localConn) + go util.Pipe(localConn, stream, 0) + util.Pipe(stream, localConn, sta.Timeout) }() } diff --git a/cmd/ck-server/ck-server.go b/cmd/ck-server/ck-server.go index 2d5e55d..809ba04 100644 --- a/cmd/ck-server/ck-server.go +++ b/cmd/ck-server/ck-server.go @@ -48,8 +48,8 @@ func dispatchConnection(conn net.Conn, sta *server.State) { if err != nil { log.Error("Failed to send first packet to redirection server", err) } - go util.Pipe(webConn, conn) - go util.Pipe(conn, webConn) + go util.Pipe(webConn, conn, 0) + go util.Pipe(conn, webConn, 0) } ci, finishHandshake, err := server.PrepareConnection(data, sta, conn) @@ -177,8 +177,8 @@ func dispatchConnection(conn net.Conn, sta *server.State) { } log.Debugf("%v endpoint has been successfully connected", ci.ProxyMethod) - go util.Pipe(localConn, newStream) - go util.Pipe(newStream, localConn) + go util.Pipe(localConn, newStream, 0) + go util.Pipe(newStream, localConn, sta.Timeout) } diff --git a/example_config/ckclient.json b/example_config/ckclient.json index f1f0826..eb0fd21 100644 --- a/example_config/ckclient.json +++ b/example_config/ckclient.json @@ -5,5 +5,6 @@ "PublicKey":"IYoUzkle/T/kriE+Ufdm7AHQtIeGnBWbhhlTbmDpUUI=", "ServerName":"www.bing.com", "NumConn":4, - "BrowserSig":"chrome" + "BrowserSig": "chrome", + "StreamTimeout": 300 } diff --git a/example_config/ckserver.json b/example_config/ckserver.json index 2e88c97..3db9983 100644 --- a/example_config/ckserver.json +++ b/example_config/ckserver.json @@ -19,5 +19,6 @@ "RedirAddr": "204.79.197.200:443", "PrivateKey": "EN5aPEpNBO+vw+BtFQY2OnK9bQU7rvEj5qmnmgwEtUc=", "AdminUID": "5nneblJy6lniPJfr81LuYQ==", - "DatabasePath": "userinfo.db" + "DatabasePath": "userinfo.db", + "StreamTimeout": 300 } diff --git a/internal/client/state.go b/internal/client/state.go index d3827e0..28d87cc 100644 --- a/internal/client/state.go +++ b/internal/client/state.go @@ -20,6 +20,7 @@ type rawConfig struct { PublicKey string BrowserSig string NumConn int + StreamTimeout int } // State stores global variables @@ -41,6 +42,7 @@ type State struct { EncryptionMethod byte ServerName string NumConn int + Timeout time.Duration } func InitState(localHost, localPort, remoteHost, remotePort string, nowFunc func() time.Time) *State { @@ -73,7 +75,7 @@ func ssvToJson(ssv string) (ret []byte) { value := sp[1] // JSON doesn't like quotation marks around int // Yes this is extremely ugly but it's still better than writing a tokeniser - if key == "NumConn" || key == "Unordered" { + if key == "NumConn" || key == "Unordered" || key == "StreamTimeout" { ret = append(ret, []byte(`"`+key+`":`+value+`,`)...) } else { ret = append(ret, []byte(`"`+key+`":"`+value+`",`)...) @@ -124,6 +126,7 @@ func (sta *State) ParseConfig(conf string) (err error) { sta.ProxyMethod = preParse.ProxyMethod sta.ServerName = preParse.ServerName sta.NumConn = preParse.NumConn + sta.Timeout = time.Duration(preParse.StreamTimeout) * time.Second uid, err := base64.StdEncoding.DecodeString(preParse.UID) if err != nil { diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index ddede6a..f17189e 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -187,7 +187,10 @@ func (sesh *Session) TerminalMsg() string { func (sesh *Session) Close() error { log.Debugf("attempting to close session %v", sesh.id) - atomic.StoreUint32(&sesh.closed, 1) + if atomic.SwapUint32(&sesh.closed, 1) == 1 { + log.Debugf("session %v has already been closed", sesh.id) + return errRepeatSessionClosing + } sesh.streamsM.Lock() sesh.acceptCh <- nil for id, stream := range sesh.streams { diff --git a/internal/server/state.go b/internal/server/state.go index 318a02c..2343f24 100644 --- a/internal/server/state.go +++ b/internal/server/state.go @@ -17,13 +17,14 @@ import ( ) type rawConfig struct { - ProxyBook map[string][]string - BypassUID [][]byte - RedirAddr string - PrivateKey string - AdminUID string - DatabasePath string - CncMode bool + ProxyBook map[string][]string + BypassUID [][]byte + RedirAddr string + PrivateKey string + AdminUID string + DatabasePath string + StreamTimeout int + CncMode bool } // State type stores the global state of the program @@ -35,6 +36,7 @@ type State struct { Now func() time.Time AdminUID []byte + Timeout time.Duration BypassUID map[[16]byte]struct{} staticPv crypto.PrivateKey @@ -92,6 +94,7 @@ func (sta *State) ParseConfig(conf string) (err error) { } sta.RedirAddr = preParse.RedirAddr + sta.Timeout = time.Duration(preParse.StreamTimeout) * time.Second for name, pair := range preParse.ProxyBook { if len(pair) != 2 { diff --git a/internal/util/util.go b/internal/util/util.go index 0393775..44ca514 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -8,6 +8,7 @@ import ( "io" "net" "strconv" + "time" ) func AESGCMEncrypt(nonce []byte, key []byte, plaintext []byte) ([]byte, error) { @@ -86,22 +87,28 @@ func AddRecordLayer(input []byte, typ []byte, ver []byte) []byte { return ret } -func Pipe(dst io.ReadWriteCloser, src io.ReadWriteCloser) { +func Pipe(dst net.Conn, src net.Conn, srcReadTimeout time.Duration) { // The maximum size of TLS message will be 16380+12+16. 12 because of the stream header and 16 // because of the salt/mac // 16408 is the max TLS message size on Firefox buf := make([]byte, 16380) + if srcReadTimeout != 0 { + src.SetReadDeadline(time.Now().Add(srcReadTimeout)) + } for { + if srcReadTimeout != 0 { + src.SetReadDeadline(time.Now().Add(srcReadTimeout)) + } i, err := io.ReadAtLeast(src, buf, 1) if err != nil { - go dst.Close() - go src.Close() + dst.Close() + src.Close() return } i, err = dst.Write(buf[:i]) if err != nil { - go dst.Close() - go src.Close() + dst.Close() + src.Close() return } }