diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index bd06299..b94677b 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -15,7 +15,8 @@ import ( const ( acceptBacklog = 1024 // TODO: will this be a signature? - defaultSendRecvBufSize = 20480 + defaultSendRecvBufSize = 20480 + defaultInactivityTimeout = 30 * time.Second ) var ErrBrokenSession = errors.New("broken session") @@ -37,11 +38,14 @@ type SessionConfig struct { // maximum size of an obfuscated frame, including headers and overhead MsgOnWireSizeLimit int - // this sets the buffer size used to send data from a Stream (Stream.obfsBuf) + // StreamSendBufferSize sets the buffer size used to send data from a Stream (Stream.obfsBuf) StreamSendBufferSize int - // this sets the buffer size used to receive data from an underlying Conn (allocated in + // ConnReceiveBufferSize sets the buffer size used to receive data from an underlying Conn (allocated in // switchboard.deplex) ConnReceiveBufferSize int + + // InactivityTimeout sets the duration a Session waits while it has no active streams before it closes itself + InactivityTimeout time.Duration } type Session struct { @@ -95,11 +99,14 @@ func MakeSession(id uint32, config SessionConfig) *Session { if config.MsgOnWireSizeLimit <= 0 { sesh.MsgOnWireSizeLimit = defaultSendRecvBufSize - 1024 } + if config.InactivityTimeout == 0 { + sesh.InactivityTimeout = defaultInactivityTimeout + } // todo: validation. this must be smaller than StreamSendBufferSize sesh.maxStreamUnitWrite = sesh.MsgOnWireSizeLimit - HEADER_LEN - sesh.Obfuscator.maxOverhead sesh.sb = makeSwitchboard(sesh) - go sesh.timeoutAfter(30 * time.Second) + go sesh.timeoutAfter(sesh.InactivityTimeout) return sesh } @@ -187,7 +194,7 @@ func (sesh *Session) closeStream(s *Stream, active bool) error { return sesh.Close() } else { log.Debugf("session %v has no active stream left", sesh.id) - go sesh.timeoutAfter(30 * time.Second) + go sesh.timeoutAfter(sesh.InactivityTimeout) } } return nil diff --git a/internal/multiplex/session_test.go b/internal/multiplex/session_test.go index 3fa6f8f..8f4b207 100644 --- a/internal/multiplex/session_test.go +++ b/internal/multiplex/session_test.go @@ -150,10 +150,8 @@ func TestRecvDataFromRemote_Closing_InOrder(t *testing.T) { var sessionKey [32]byte rand.Read(sessionKey[:]) - obfuscator, _ := MakeObfuscator(E_METHOD_PLAIN, sessionKey) seshConfigOrdered.Obfuscator = obfuscator - sesh := MakeSession(0, seshConfigOrdered) f1 := &Frame{ @@ -281,10 +279,8 @@ func TestRecvDataFromRemote_Closing_OutOfOrder(t *testing.T) { var sessionKey [32]byte rand.Read(sessionKey[:]) - obfuscator, _ := MakeObfuscator(E_METHOD_PLAIN, sessionKey) seshConfigOrdered.Obfuscator = obfuscator - sesh := MakeSession(0, seshConfigOrdered) // receive stream 1 closing first @@ -336,11 +332,8 @@ func TestRecvDataFromRemote_Closing_OutOfOrder(t *testing.T) { } func TestParallelStreams(t *testing.T) { - rand.Seed(0) - var sessionKey [32]byte rand.Read(sessionKey[:]) - obfuscator, _ := MakeObfuscator(E_METHOD_PLAIN, sessionKey) seshConfigOrdered.Obfuscator = obfuscator sesh := MakeSession(0, seshConfigOrdered) @@ -444,6 +437,19 @@ func TestStream_SetReadDeadline(t *testing.T) { testReadDeadline(sesh) } +func TestSession_timeoutAfter(t *testing.T) { + var sessionKey [32]byte + rand.Read(sessionKey[:]) + obfuscator, _ := MakeObfuscator(E_METHOD_PLAIN, sessionKey) + seshConfigOrdered.Obfuscator = obfuscator + seshConfigOrdered.InactivityTimeout = 100 * time.Millisecond + sesh := MakeSession(0, seshConfigOrdered) + time.Sleep(200 * time.Millisecond) + if !sesh.IsClosed() { + t.Error("session should have timed out") + } +} + func BenchmarkRecvDataFromRemote_Ordered(b *testing.B) { testPayloadLen := 1024 testPayload := make([]byte, testPayloadLen)