Cloak/internal/multiplex/session.go

199 lines
4.9 KiB
Go
Raw Normal View History

2018-10-05 22:44:20 +00:00
package multiplex
import (
2018-10-23 19:47:58 +00:00
"errors"
2018-10-05 22:44:20 +00:00
"net"
"sync"
2018-10-23 19:47:58 +00:00
"sync/atomic"
"time"
2019-08-05 13:33:20 +00:00
log "github.com/sirupsen/logrus"
2018-10-05 22:44:20 +00:00
)
// acceptBacklog is the buffer capacity of Session.acceptCh, i.e. how many
// streams can be queued for Accept before a send on that channel blocks.
const acceptBacklog = 1024
2018-10-27 14:27:43 +00:00
var (
	// ErrBrokenSession is returned by OpenStream and Accept when the session
	// has been closed.
	ErrBrokenSession = errors.New("broken session")

	// errRepeatSessionClosing describes an attempt to close a session that
	// is already closed.
	errRepeatSessionClosing = errors.New("trying to close a closed session")
)
2019-08-02 15:37:48 +00:00
// Obfuscator bundles the obfuscation and deobfuscation routines together with
// a session key. A Session embeds a pointer to one, so these fields are
// reachable directly on the session (e.g. sesh.Deobfs).
type Obfuscator struct {
// Obfs is used in Stream.Write: it adds multiplexing headers, encrypts and
// adds the TLS header.
Obfs Obfser
// Deobfs removes the TLS header, decrypts and unmarshals frames. It returns
// an error when the data cannot be turned back into a frame.
Deobfs Deobfser
// NOTE(review): presumably the key material the Obfs/Deobfs pair operates
// with — confirm against the code that constructs Obfuscator.
SessionKey []byte
}
2018-10-05 22:44:20 +00:00
type Session struct {
id uint32
2018-10-05 22:44:20 +00:00
2019-08-02 15:37:48 +00:00
*Obfuscator
2018-10-05 22:44:20 +00:00
2019-08-02 15:37:48 +00:00
// This is supposed to read one TLS message, the same as GoQuiet's ReadTillDrain
unitRead func(net.Conn, []byte) (int, error)
2019-08-02 00:01:19 +00:00
2018-10-27 22:35:46 +00:00
// atomic
2018-10-23 19:47:58 +00:00
nextStreamID uint32
2018-10-05 22:44:20 +00:00
streamsM sync.Mutex
2018-10-05 22:44:20 +00:00
streams map[uint32]*Stream
// Switchboard manages all connections to remote
sb *switchboard
addrs atomic.Value
2018-10-05 22:44:20 +00:00
// For accepting new streams
acceptCh chan *Stream
2018-10-23 19:47:58 +00:00
2019-07-28 22:27:59 +00:00
closed uint32
terminalMsg atomic.Value
2018-10-05 22:44:20 +00:00
}
2019-08-02 15:37:48 +00:00
func MakeSession(id uint32, valve *Valve, obfuscator *Obfuscator, unitReader func(net.Conn, []byte) (int, error)) *Session {
2018-10-05 22:44:20 +00:00
sesh := &Session{
id: id,
2019-08-02 15:37:48 +00:00
unitRead: unitReader,
2018-10-09 20:53:55 +00:00
nextStreamID: 1,
2019-08-02 15:37:48 +00:00
Obfuscator: obfuscator,
2018-10-05 22:44:20 +00:00
streams: make(map[uint32]*Stream),
acceptCh: make(chan *Stream, acceptBacklog),
}
sesh.addrs.Store([]net.Addr{nil, nil})
2018-11-07 21:16:13 +00:00
sesh.sb = makeSwitchboard(sesh, valve)
2019-06-16 13:30:35 +00:00
go sesh.timeoutAfter(30 * time.Second)
2018-10-05 22:44:20 +00:00
return sesh
}
2018-10-07 17:09:45 +00:00
func (sesh *Session) AddConnection(conn net.Conn) {
2018-10-28 21:22:38 +00:00
sesh.sb.addConn(conn)
addrs := []net.Addr{conn.LocalAddr(), conn.RemoteAddr()}
sesh.addrs.Store(addrs)
2018-10-07 17:09:45 +00:00
}
2018-10-05 22:44:20 +00:00
func (sesh *Session) OpenStream() (*Stream, error) {
2019-07-28 22:27:59 +00:00
if sesh.IsClosed() {
2018-11-07 21:16:13 +00:00
return nil, ErrBrokenSession
}
id := atomic.AddUint32(&sesh.nextStreamID, 1) - 1
// Because atomic.AddUint32 returns the value after incrementation
connId, err := sesh.sb.assignRandomConn()
if err != nil {
return nil, err
}
stream := makeStream(sesh, id, connId)
2018-10-05 22:44:20 +00:00
sesh.streamsM.Lock()
sesh.streams[id] = stream
sesh.streamsM.Unlock()
2019-08-06 10:19:47 +00:00
log.Tracef("stream %v of session %v opened", id, sesh.id)
2018-10-05 22:44:20 +00:00
return stream, nil
}
2019-07-23 10:06:49 +00:00
func (sesh *Session) Accept() (net.Conn, error) {
2019-07-28 22:27:59 +00:00
if sesh.IsClosed() {
2018-10-27 14:27:43 +00:00
return nil, ErrBrokenSession
2018-10-23 19:47:58 +00:00
}
2019-07-28 22:27:59 +00:00
stream := <-sesh.acceptCh
if stream == nil {
return nil, ErrBrokenSession
}
2019-08-06 10:19:47 +00:00
log.Tracef("stream %v of session %v accepted", stream.id, sesh.id)
2019-07-28 22:27:59 +00:00
return stream, nil
2018-10-05 22:44:20 +00:00
}
func (sesh *Session) delStream(id uint32) {
2018-10-16 20:13:19 +00:00
sesh.streamsM.Lock()
2018-10-05 22:44:20 +00:00
delete(sesh.streams, id)
if len(sesh.streams) == 0 {
2019-08-06 10:19:47 +00:00
log.Tracef("session %v has no active stream left", sesh.id)
go sesh.timeoutAfter(30 * time.Second)
}
2018-10-16 20:13:19 +00:00
sesh.streamsM.Unlock()
2018-10-05 22:44:20 +00:00
}
2019-08-05 13:33:20 +00:00
func (sesh *Session) recvDataFromRemote(data []byte) {
frame, err := sesh.Deobfs(data)
if err != nil {
log.Debugf("Failed to decrypt a frame for session %v: %v", sesh.id, err)
}
2018-11-24 00:55:26 +00:00
sesh.streamsM.Lock()
2019-07-28 11:52:57 +00:00
defer sesh.streamsM.Unlock()
2019-08-05 13:33:20 +00:00
stream, existing := sesh.streams[frame.StreamID]
if existing {
stream.writeFrame(frame)
2018-11-24 00:55:26 +00:00
} else {
2019-08-05 13:33:20 +00:00
if frame.Closing == 1 {
// If the stream has been closed and the current frame is a closing frame, we do noop
return
2018-11-24 00:55:26 +00:00
} else {
// it may be tempting to use the connId from which the frame was received. However it doesn't make
// any difference because we only care to send the data from the same stream through the same
// TCP connection. The remote may use a different connection to send the same stream than the one the client
// use to send.
connId, _ := sesh.sb.assignRandomConn()
// we ignore the error here. If the switchboard is broken, it will be reflected upon stream.Write
stream = makeStream(sesh, frame.StreamID, connId)
2019-08-05 13:33:20 +00:00
sesh.streams[frame.StreamID] = stream
2018-11-24 00:55:26 +00:00
sesh.acceptCh <- stream
2019-08-05 13:33:20 +00:00
stream.writeFrame(frame)
return
2018-11-24 00:55:26 +00:00
}
}
2019-08-05 13:33:20 +00:00
2018-11-24 00:55:26 +00:00
}
// SetTerminalMsg stores msg as the session's terminal message, replacing any
// previously stored one.
func (sesh *Session) SetTerminalMsg(msg string) {
	sesh.terminalMsg.Store(msg)
}
// TerminalMsg returns the terminal message stored on the session, or the
// empty string if none has been stored yet.
func (sesh *Session) TerminalMsg() string {
	if stored := sesh.terminalMsg.Load(); stored != nil {
		return stored.(string)
	}
	return ""
}
2018-10-23 19:47:58 +00:00
func (sesh *Session) Close() error {
2019-08-06 10:19:47 +00:00
log.Debugf("attempting to close session %v", sesh.id)
2019-07-28 22:27:59 +00:00
atomic.StoreUint32(&sesh.closed, 1)
2018-10-23 19:47:58 +00:00
sesh.streamsM.Lock()
2019-07-28 22:27:59 +00:00
sesh.acceptCh <- nil
2018-10-23 19:47:58 +00:00
for id, stream := range sesh.streams {
// If we call stream.Close() here, streamsM will result in a deadlock
// because stream.Close calls sesh.delStream, which locks the mutex.
// so we need to implement a method of stream that closes the stream without calling
// sesh.delStream
go stream.closeNoDelMap()
delete(sesh.streams, id)
}
sesh.streamsM.Unlock()
2018-12-31 11:30:39 +00:00
sesh.sb.closeAll()
2019-08-05 13:33:20 +00:00
log.Debugf("session %v closed gracefully", sesh.id)
2018-10-23 19:47:58 +00:00
return nil
}
2019-01-20 12:13:29 +00:00
2019-07-28 22:27:59 +00:00
func (sesh *Session) IsClosed() bool {
return atomic.LoadUint32(&sesh.closed) == 1
2019-01-20 12:13:29 +00:00
}
func (sesh *Session) timeoutAfter(to time.Duration) {
time.Sleep(to)
sesh.streamsM.Lock()
2019-07-28 22:27:59 +00:00
if len(sesh.streams) == 0 && !sesh.IsClosed() {
sesh.streamsM.Unlock()
sesh.SetTerminalMsg("timeout")
sesh.Close()
} else {
sesh.streamsM.Unlock()
}
}
2019-07-23 10:06:49 +00:00
// Addr returns the first entry of the stored address pair: the local address
// of the most recently added connection, or nil if none has been added.
func (sesh *Session) Addr() net.Addr {
	pair := sesh.addrs.Load().([]net.Addr)
	return pair[0]
}