From 0a6846fbfc38fa6633985c29d9d76fca568fe222 Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Sun, 9 Oct 2022 22:02:00 +0200 Subject: [PATCH] Refactor singleplex handling --- README.md | 10 +++---- cmd/ck-client/ck-client.go | 11 ++++---- internal/cli_client/config.go | 21 ++++++++++----- internal/cli_client/config_test.go | 42 ++++++++++++++++++++++++++++-- internal/multiplex/session.go | 39 +++++++++++---------------- internal/multiplex/session_test.go | 4 +-- internal/multiplex/stream_test.go | 31 ++++++++++++++-------- internal/test/integration_test.go | 29 ++++++++++++--------- libcloak/client/config.go | 25 ++++++++++++------ libcloak/client/factory.go | 2 +- 10 files changed, 136 insertions(+), 78 deletions(-) diff --git a/README.md b/README.md index 48225a7..493d253 100644 --- a/README.md +++ b/README.md @@ -123,7 +123,7 @@ upstream proxy server. Zero or negative value disables it. Default is 0 (disable `UID` is your UID in base64. `Transport` can be either `direct` or `CDN`. If the server host wishes you to connect to it directly, use `direct`. If -instead a CDN is used, use `CDN`. +instead a CDN is used, use `CDN`. Defaults to `direct` `PublicKey` is the static curve25519 public key in base64, given by the server admin. @@ -131,7 +131,7 @@ instead a CDN is used, use `CDN`. server's `ProxyBook` exactly. `EncryptionMethod` is the name of the encryption algorithm you want Cloak to use. Options are `plain`, `aes-256-gcm` ( -synonymous to `aes-gcm`), `aes-128-gcm`, and `chacha20-poly1305`. Note: Cloak isn't intended to provide transport +synonymous to `aes-gcm`), `aes-128-gcm`, and `chacha20-poly1305`. Defaults to `aes-256-gcm` if empty. Note: Cloak isn't intended to provide transport security. The point of encryption is to hide fingerprints of proxy protocols and render the payload statistically random-like. **You may only leave it as `plain` if you are certain that your underlying proxy tool already provides BOTH encryption and authentication (via AEAD or similar techniques).** @@ -141,7 +141,7 @@ match `RedirAddr` in the server's configuration, a major site the censor allows, `AlternativeNames` is an array used alongside `ServerName` to shuffle between different ServerNames for every new connection. **This may conflict with `CDN` Transport mode** if the CDN provider prohibits domain fronting and rejects -the alternative domains. +the alternative domains. Default is empty. Example: @@ -165,8 +165,8 @@ requests under specific url path are forwarded. `NumConn` is the amount of underlying TCP connections you want to use. The default of 4 should be appropriate for most people. Setting it too high will hinder the performance. Setting it to 0 will disable connection multiplexing and each -TCP connection will spawn a separate short-lived session that will be closed after it is terminated. This makes it -behave like GoQuiet. This maybe useful for people with unstable connections. +TCP connection will spawn a separate short-lived session that will be closed after it is terminated. This maybe useful +for people with unstable connections. `BrowserSig` is the browser you want to **appear** to be using. It's not relevant to the browser you are actually using. Currently, `chrome`, `firefox` and `safari` are supported. diff --git a/cmd/ck-client/ck-client.go b/cmd/ck-client/ck-client.go index 19ea8b7..58ae46e 100644 --- a/cmd/ck-client/ck-client.go +++ b/cmd/ck-client/ck-client.go @@ -20,6 +20,10 @@ import ( var version string func main() { + log.SetFormatter(&log.TextFormatter{ + FullTimestamp: true, + }) + // Should be 127.0.0.1 to listen to a proxy client on this machine var localHost string // port used by proxy clients to communicate with cloak client @@ -74,9 +78,6 @@ func main() { log.Info("Starting standalone mode") } - log.SetFormatter(&log.TextFormatter{ - FullTimestamp: true, - }) lvl, err := log.ParseLevel(*verbosity) if err != nil { log.Fatal(err) @@ -194,12 +195,12 @@ func main() { return net.ListenUDP("udp", udpAddr) } - cli_client.RouteUDP(acceptor, localConfig.Timeout, remoteConfig.Singleplex, seshMaker) + cli_client.RouteUDP(acceptor, localConfig.Timeout, localConfig.Singleplex, seshMaker) } else { listener, err := net.Listen("tcp", localConfig.LocalAddr) if err != nil { log.Fatal(err) } - cli_client.RouteTCP(listener, localConfig.Timeout, remoteConfig.Singleplex, seshMaker) + cli_client.RouteTCP(listener, localConfig.Timeout, localConfig.Singleplex, seshMaker) } } diff --git a/internal/cli_client/config.go b/internal/cli_client/config.go index 0967625..4dcddcb 100644 --- a/internal/cli_client/config.go +++ b/internal/cli_client/config.go @@ -20,10 +20,11 @@ type CLIConfig struct { // LocalPort is the port to listen for incomig proxy client connections LocalPort string // jsonOptional // AlternativeNames is a list of ServerName Cloak may randomly pick from for different sessions + // Optional AlternativeNames []string // StreamTimeout is the duration, in seconds, for an incoming connection to be automatically closed after the last // piece of incoming data . - // Defaults to 300 + // Optional, Defaults to 300 StreamTimeout int } @@ -108,6 +109,7 @@ type LocalConnConfig struct { LocalAddr string Timeout time.Duration MockDomainList []string + Singleplex bool } func (raw *CLIConfig) ProcessCLIConfig(worldState common.WorldState) (local LocalConnConfig, remote client.RemoteConnConfig, auth client.AuthInfo, err error) { @@ -116,15 +118,18 @@ func (raw *CLIConfig) ProcessCLIConfig(worldState common.WorldState) (local Loca return } - var filteredAlternativeNames []string - for _, alternativeName := range raw.AlternativeNames { - if len(alternativeName) > 0 { - filteredAlternativeNames = append(filteredAlternativeNames, alternativeName) + if raw.AlternativeNames != nil && len(raw.AlternativeNames) > 0 { + var filteredAlternativeNames []string + for _, alternativeName := range raw.AlternativeNames { + if len(alternativeName) > 0 { + filteredAlternativeNames = append(filteredAlternativeNames, alternativeName) + } } + local.MockDomainList = raw.AlternativeNames + } else { + local.MockDomainList = []string{} } - raw.AlternativeNames = filteredAlternativeNames - local.MockDomainList = raw.AlternativeNames local.MockDomainList = append(local.MockDomainList, auth.MockDomain) if raw.LocalHost == "" { @@ -143,5 +148,7 @@ func (raw *CLIConfig) ProcessCLIConfig(worldState common.WorldState) (local Loca local.Timeout = time.Duration(raw.StreamTimeout) * time.Second } + local.Singleplex = raw.NumConn != nil && *raw.NumConn == 0 + return } diff --git a/internal/cli_client/config_test.go b/internal/cli_client/config_test.go index 6865e4e..0e99202 100644 --- a/internal/cli_client/config_test.go +++ b/internal/cli_client/config_test.go @@ -1,10 +1,11 @@ package cli_client import ( + "github.com/cbeuw/Cloak/internal/common" + "github.com/cbeuw/Cloak/libcloak/client" + "github.com/stretchr/testify/assert" "io/ioutil" "testing" - - "github.com/stretchr/testify/assert" ) func TestParseConfig(t *testing.T) { @@ -33,5 +34,42 @@ func TestParseConfig(t *testing.T) { _, err := ParseConfig(tmpConfig.Name()) assert.Error(t, err) }) +} + +func TestProcessCLIConfig(t *testing.T) { + config := CLIConfig{ + Config: client.Config{ + ServerName: "bbc.co.uk", + // ProxyMethod is the name of the underlying proxy you wish + // to connect to, as determined by your server. The value can + // be any string whose UTF-8 ENCODED byte length is no greater than + // 12 bytes + ProxyMethod: "ssh", + // UID is a 16-byte secret string unique to an authorised user + // The same UID can be used by the same user for multiple Cloak connections + UID: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + // PublicKey is the 32-byte public Curve25519 ECDH key of your server + PublicKey: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + // RemoteHost is the Cloak server's hostname or IP address + RemoteHost: "1.2.3.4", + }, + LocalHost: "0.0.0.0", + LocalPort: "1234", + } + + t.Run("Zero means singleplex", func(t *testing.T) { + zero := 0 + config := config + config.NumConn = &zero + local, _, _, err := config.ProcessCLIConfig(common.RealWorldState) + assert.NoError(t, err) + assert.True(t, local.Singleplex) + }) + t.Run("Empty means no singleplex", func(t *testing.T) { + config := config + local, _, _, err := config.ProcessCLIConfig(common.RealWorldState) + assert.NoError(t, err) + assert.False(t, local.Singleplex) + }) } diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index e176343..4b570f2 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -14,15 +14,13 @@ import ( ) const ( - acceptBacklog = 1024 - defaultInactivityTimeout = 30 * time.Second - defaultMaxOnWireSize = 1<<14 + 256 // https://tools.ietf.org/html/rfc8446#section-5.2 + acceptBacklog = 1024 + defaultMaxOnWireSize = 1<<14 + 256 // https://tools.ietf.org/html/rfc8446#section-5.2 ) var ErrBrokenSession = errors.New("broken session") var errRepeatSessionClosing = errors.New("trying to close a closed session") var errRepeatStreamClosing = errors.New("trying to close a closed stream") -var errNoMultiplex = errors.New("a singleplexing session can have only one stream") type SessionConfig struct { Obfuscator @@ -30,15 +28,15 @@ type SessionConfig struct { // Valve is used to limit transmission rates, and record and limit usage Valve + // Unordered determines whether stream packets' order is preserved Unordered bool - // A Singleplexing session always has just one stream - Singleplex bool - - // maximum size of an obfuscated frame, including headers and overhead + // MsgOnWireSizeLimit is maximum size of an obfuscated frame, including headers and overhead + // Optional MsgOnWireSizeLimit int // InactivityTimeout sets the duration a Session waits while it has no active streams before it closes itself + // Non-optional. 0 means the session closes immediately after the last stream is closed InactivityTimeout time.Duration } @@ -104,9 +102,6 @@ func MakeSession(id uint32, config SessionConfig) *Session { if config.MsgOnWireSizeLimit <= 0 { sesh.MsgOnWireSizeLimit = defaultMaxOnWireSize } - if config.InactivityTimeout == 0 { - sesh.InactivityTimeout = defaultInactivityTimeout - } sesh.maxStreamUnitWrite = sesh.MsgOnWireSizeLimit - frameHeaderLength - sesh.maxOverhead sesh.streamSendBufferSize = sesh.MsgOnWireSizeLimit @@ -118,7 +113,13 @@ func MakeSession(id uint32, config SessionConfig) *Session { }} sesh.sb = makeSwitchboard(sesh) - time.AfterFunc(sesh.InactivityTimeout, sesh.checkTimeout) + if sesh.InactivityTimeout > 0 { + time.AfterFunc(sesh.InactivityTimeout, sesh.checkTimeout) + } else { + // The user wants session to close immediately after the last stream closes, + // but we still give them some time to start a stream first + time.AfterFunc(10*time.Second, sesh.checkTimeout) + } return sesh } @@ -149,12 +150,6 @@ func (sesh *Session) OpenStream() (*Stream, error) { return nil, ErrBrokenSession } id := atomic.AddUint32(&sesh.nextStreamID, 1) - 1 - // Because atomic.AddUint32 returns the value after incrementation - if sesh.Singleplex && id > 1 { - // if there are more than one streams, which shouldn't happen if we are - // singleplexing - return nil, errNoMultiplex - } stream := makeStream(sesh, id) sesh.streamsM.Lock() sesh.streams[id] = stream @@ -213,12 +208,8 @@ func (sesh *Session) closeStream(s *Stream, active bool) error { sesh.streams[s.id] = nil sesh.streamsM.Unlock() if sesh.streamCountDecr() == 0 { - if sesh.Singleplex { - return sesh.Close() - } else { - log.Debugf("session %v has no active stream left", sesh.id) - time.AfterFunc(sesh.InactivityTimeout, sesh.checkTimeout) - } + log.Debugf("session %v has no active stream left", sesh.id) + time.AfterFunc(sesh.InactivityTimeout, sesh.checkTimeout) } return nil } diff --git a/internal/multiplex/session_test.go b/internal/multiplex/session_test.go index 0c8c7be..b59a79d 100644 --- a/internal/multiplex/session_test.go +++ b/internal/multiplex/session_test.go @@ -17,8 +17,8 @@ import ( ) var seshConfigs = map[string]SessionConfig{ - "ordered": {}, - "unordered": {Unordered: true}, + "ordered": {InactivityTimeout: 30 * time.Second}, + "unordered": {Unordered: true, InactivityTimeout: 30 * time.Second}, } var encryptionMethods = map[string]byte{ "plain": EncryptionMethodPlain, diff --git a/internal/multiplex/stream_test.go b/internal/multiplex/stream_test.go index 35a6684..f20e39a 100644 --- a/internal/multiplex/stream_test.go +++ b/internal/multiplex/stream_test.go @@ -80,19 +80,21 @@ func TestStream_WriteSync(t *testing.T) { // Close calls made after write MUST have a higher seq var sessionKey [32]byte rand.Read(sessionKey[:]) - clientSesh := setupSesh(false, sessionKey, EncryptionMethodPlain) - serverSesh := setupSesh(false, sessionKey, EncryptionMethodPlain) - w, r := connutil.AsyncPipe() - clientSesh.AddConnection(common.NewTLSConn(w)) - serverSesh.AddConnection(common.NewTLSConn(r)) testData := make([]byte, payloadLen) rand.Read(testData) t.Run("test single", func(t *testing.T) { + clientSesh := setupSesh(false, sessionKey, EncryptionMethodPlain) + serverSesh := setupSesh(false, sessionKey, EncryptionMethodPlain) + w, r := connutil.AsyncPipe() + clientSesh.AddConnection(common.NewTLSConn(w)) + serverSesh.AddConnection(common.NewTLSConn(r)) + go func() { - stream, _ := clientSesh.OpenStream() - stream.Write(testData) - stream.Close() + stream, err := clientSesh.OpenStream() + assert.NoError(t, err) + _, err = stream.Write(testData) + assert.NoError(t, err) }() recvBuf := make([]byte, payloadLen) @@ -104,12 +106,19 @@ func TestStream_WriteSync(t *testing.T) { }) t.Run("test multiple", func(t *testing.T) { + clientSesh := setupSesh(false, sessionKey, EncryptionMethodPlain) + serverSesh := setupSesh(false, sessionKey, EncryptionMethodPlain) + w, r := connutil.AsyncPipe() + clientSesh.AddConnection(common.NewTLSConn(w)) + serverSesh.AddConnection(common.NewTLSConn(r)) + const numStreams = 100 for i := 0; i < numStreams; i++ { go func() { - stream, _ := clientSesh.OpenStream() - stream.Write(testData) - stream.Close() + stream, err := clientSesh.OpenStream() + assert.NoError(t, err) + _, err = stream.Write(testData) + assert.NoError(t, err) }() } for i := 0; i < numStreams; i++ { diff --git a/internal/test/integration_test.go b/internal/test/integration_test.go index b1d162d..c8a2ce3 100644 --- a/internal/test/integration_test.go +++ b/internal/test/integration_test.go @@ -157,7 +157,7 @@ func (m *mockUDPDialer) Dial(network, address string) (net.Conn, error) { return net.DialUDP("udp", nil, m.raddr) } -func establishSession(rcc client.RemoteConnConfig, ai client.AuthInfo, serverState *server.State) (common.Dialer, *connutil.PipeListener, common.Dialer, net.Listener, error) { +func establishSession(rcc client.RemoteConnConfig, ai client.AuthInfo, serverState *server.State, singleplex bool) (common.Dialer, *connutil.PipeListener, common.Dialer, net.Listener, error) { // redirecting web server // ^ // | @@ -174,6 +174,9 @@ func establishSession(rcc client.RemoteConnConfig, ai client.AuthInfo, serverSta // | // whatever connection initiator (including a proper ck-client) + if singleplex && rcc.NumConn != 1 { + log.Fatal("NumConn must be 1 under singleplex") + } netToCkServerD, ckServerListener := connutil.DialerListener(10 * 1024) clientSeshMaker := func() *client.CloakClient { @@ -198,12 +201,12 @@ func establishSession(rcc client.RemoteConnConfig, ai client.AuthInfo, serverSta addrCh <- conn.LocalAddr().(*net.UDPAddr) return conn, err } - go cli_client.RouteUDP(acceptor, 300*time.Second, rcc.Singleplex, clientSeshMaker) + go cli_client.RouteUDP(acceptor, 300*time.Second, singleplex, clientSeshMaker) proxyToCkClientD = mDialer } else { var proxyToCkClientL *connutil.PipeListener proxyToCkClientD, proxyToCkClientL = connutil.DialerListener(10 * 1024) - go cli_client.RouteTCP(proxyToCkClientL, 300*time.Second, rcc.Singleplex, clientSeshMaker) + go cli_client.RouteTCP(proxyToCkClientL, 300*time.Second, singleplex, clientSeshMaker) } // set up server @@ -259,7 +262,7 @@ func TestUDP(t *testing.T) { sta := basicServerState(worldState) t.Run("simple send", func(t *testing.T) { - proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta) + proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta, false) if err != nil { t.Fatal(err) } @@ -296,7 +299,7 @@ func TestUDP(t *testing.T) { const echoMsgLen = 1024 t.Run("user echo", func(t *testing.T) { - proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta) + proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta, false) if err != nil { t.Fatal(err) } @@ -318,7 +321,7 @@ func TestTCPSingleplex(t *testing.T) { worldState := common.WorldOfTime(time.Unix(10, 0)) rcc, ai := generateClientConfigs(singleplexTCPConfig, worldState) sta := basicServerState(worldState) - proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta) + proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta, true) if err != nil { t.Fatal(err) } @@ -386,7 +389,7 @@ func TestTCPMultiplex(t *testing.T) { writeData := make([]byte, dataLen) rand.Read(writeData) t.Run(fmt.Sprintf("data length %v", dataLen), func(t *testing.T) { - proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta) + proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta, false) if err != nil { t.Fatal(err) } @@ -419,7 +422,7 @@ func TestTCPMultiplex(t *testing.T) { const echoMsgLen = 16384 t.Run("user echo", func(t *testing.T) { - proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta) + proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta, false) if err != nil { t.Fatal(err) } @@ -437,7 +440,7 @@ func TestTCPMultiplex(t *testing.T) { }) t.Run("redir echo", func(t *testing.T) { - _, _, netToCkServerD, redirFromCkServerL, err := establishSession(rcc, ai, sta) + _, _, netToCkServerD, redirFromCkServerL, err := establishSession(rcc, ai, sta, false) if err != nil { t.Fatal(err) } @@ -458,13 +461,13 @@ func TestClosingStreamsFromProxy(t *testing.T) { log.SetLevel(log.ErrorLevel) worldState := common.WorldOfTime(time.Unix(10, 0)) - for clientConfigName, clientConfig := range map[string]client.Config{"basic": basicTCPConfig, "singleplex": singleplexTCPConfig} { - clientConfig := clientConfig + for clientConfigName, clientConfig := range map[string]client.Config{"multiplex": basicTCPConfig, "singleplex": singleplexTCPConfig} { clientConfigName := clientConfigName + clientConfig := clientConfig t.Run(clientConfigName, func(t *testing.T) { rcc, ai := generateClientConfigs(clientConfig, worldState) sta := basicServerState(worldState) - proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta) + proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta, *clientConfig.NumConn == 0) if err != nil { t.Fatal(err) } @@ -538,7 +541,7 @@ func BenchmarkIntegration(b *testing.B) { for name, method := range encryptionMethods { b.Run(name, func(b *testing.B) { ai.EncryptionMethod = method - proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta) + proxyToCkClientD, proxyFromCkServerL, _, _, err := establishSession(rcc, ai, sta, false) if err != nil { b.Fatal(err) } diff --git a/libcloak/client/config.go b/libcloak/client/config.go index 34ad014..ffaf7f3 100644 --- a/libcloak/client/config.go +++ b/libcloak/client/config.go @@ -72,14 +72,18 @@ type Config struct { // RemotePort is the port Cloak server is listening to // Defaults to 443 RemotePort string + // InactivityTimeout is the number of seconds the client keeps the underlying connections to the server + // after the last proxy connection is disconnected. + // Defaults to 30. Always set to 0 under Singleplex mode (NumConn == 0) + InactivityTimeout *int } type RemoteConnConfig struct { - Singleplex bool - NumConn int - KeepAlive time.Duration - RemoteAddr string - TransportMaker func() transports.Transport + NumConn int + KeepAlive time.Duration + RemoteAddr string + TransportMaker func() transports.Transport + InactivityTimeout time.Duration } type AuthInfo = transports.AuthInfo @@ -142,15 +146,20 @@ func (raw *Config) Process(worldState common.WorldState) (remote RemoteConnConfi remotePort = raw.RemotePort } remote.RemoteAddr = net.JoinHostPort(raw.RemoteHost, remotePort) + + if raw.InactivityTimeout == nil { + remote.InactivityTimeout = 30 * time.Second + } else { + remote.InactivityTimeout = time.Duration(*raw.InactivityTimeout) * time.Second + } + if raw.NumConn == nil { remote.NumConn = 4 - remote.Singleplex = false } else if *raw.NumConn <= 0 { remote.NumConn = 1 - remote.Singleplex = true + remote.InactivityTimeout = 0 } else { remote.NumConn = *raw.NumConn - remote.Singleplex = false } // Transport and (if TLS mode), browser diff --git a/libcloak/client/factory.go b/libcloak/client/factory.go index 439eafe..f54dd56 100644 --- a/libcloak/client/factory.go +++ b/libcloak/client/factory.go @@ -65,11 +65,11 @@ func NewCloakClient(connConfig RemoteConnConfig, authInfo AuthInfo, dialer commo } seshConfig := mux.SessionConfig{ - Singleplex: connConfig.Singleplex, Obfuscator: obfuscator, Valve: nil, Unordered: authInfo.Unordered, MsgOnWireSizeLimit: appDataMaxLength, + InactivityTimeout: connConfig.InactivityTimeout, } session := mux.MakeSession(authInfo.SessionId, seshConfig)