Cloak/internal/multiplex/session.go

311 lines
7.3 KiB
Go
Raw Normal View History

2018-10-05 22:44:20 +00:00
package multiplex
import (
"crypto/rand"
2018-10-23 19:47:58 +00:00
"errors"
2019-08-16 22:47:15 +00:00
"fmt"
2018-10-05 22:44:20 +00:00
"net"
"sync"
2018-10-23 19:47:58 +00:00
"sync/atomic"
"time"
2019-08-05 13:33:20 +00:00
log "github.com/sirupsen/logrus"
2018-10-05 22:44:20 +00:00
)
const (
2018-10-23 19:47:58 +00:00
acceptBacklog = 1024
2018-10-05 22:44:20 +00:00
)
2018-10-27 14:27:43 +00:00
var ErrBrokenSession = errors.New("broken session")
var errRepeatSessionClosing = errors.New("trying to close a closed session")
2019-08-20 21:43:04 +00:00
// Obfuscator is responsible for the obfuscation and deobfuscation of frames
2019-08-02 15:37:48 +00:00
type Obfuscator struct {
// Used in Stream.Write. Add multiplexing headers, encrypt and add TLS header
Obfs Obfser
// Remove TLS header, decrypt and unmarshall frames
Deobfs Deobfser
SessionKey []byte
}
2019-08-12 21:43:16 +00:00
type switchboardStrategy int
2019-08-11 23:22:15 +00:00
type SessionConfig struct {
2019-09-01 19:23:45 +00:00
NoRecordLayer bool
2019-08-02 15:37:48 +00:00
*Obfuscator
2018-10-05 22:44:20 +00:00
2019-08-11 23:22:15 +00:00
Valve
2019-08-20 21:43:04 +00:00
// This is supposed to read one TLS message.
2019-08-11 23:22:15 +00:00
UnitRead func(net.Conn, []byte) (int, error)
Unordered bool
2019-08-11 23:22:15 +00:00
}
type Session struct {
id uint32
*SessionConfig
2019-08-02 00:01:19 +00:00
2018-10-27 22:35:46 +00:00
// atomic
2018-10-23 19:47:58 +00:00
nextStreamID uint32
2018-10-05 22:44:20 +00:00
streams sync.Map
2018-10-05 22:44:20 +00:00
// Switchboard manages all connections to remote
sb *switchboard
2019-10-08 22:11:16 +00:00
// Used for LocalAddr() and RemoteAddr() etc.
addrs atomic.Value
2018-10-05 22:44:20 +00:00
// For accepting new streams
acceptCh chan *Stream
2018-10-23 19:47:58 +00:00
2019-07-28 22:27:59 +00:00
closed uint32
terminalMsg atomic.Value
2018-10-05 22:44:20 +00:00
}
2019-08-11 23:22:15 +00:00
func MakeSession(id uint32, config *SessionConfig) *Session {
2018-10-05 22:44:20 +00:00
sesh := &Session{
2019-08-11 23:22:15 +00:00
id: id,
SessionConfig: config,
nextStreamID: 1,
acceptCh: make(chan *Stream, acceptBacklog),
2018-10-05 22:44:20 +00:00
}
sesh.addrs.Store([]net.Addr{nil, nil})
2019-08-11 23:22:15 +00:00
if config.Valve == nil {
config.Valve = UNLIMITED_VALVE
}
sbConfig := &switchboardConfig{
2019-08-12 21:43:16 +00:00
Valve: config.Valve,
}
if config.Unordered {
2019-08-12 22:13:13 +00:00
log.Debug("Connection is unordered")
2019-08-12 21:43:16 +00:00
sbConfig.strategy = UNIFORM_SPREAD
} else {
sbConfig.strategy = FIXED_CONN_MAPPING
}
sesh.sb = makeSwitchboard(sesh, sbConfig)
2019-06-16 13:30:35 +00:00
go sesh.timeoutAfter(30 * time.Second)
2018-10-05 22:44:20 +00:00
return sesh
}
2018-10-07 17:09:45 +00:00
func (sesh *Session) AddConnection(conn net.Conn) {
2018-10-28 21:22:38 +00:00
sesh.sb.addConn(conn)
addrs := []net.Addr{conn.LocalAddr(), conn.RemoteAddr()}
sesh.addrs.Store(addrs)
2018-10-07 17:09:45 +00:00
}
2018-10-05 22:44:20 +00:00
func (sesh *Session) OpenStream() (*Stream, error) {
2019-07-28 22:27:59 +00:00
if sesh.IsClosed() {
2018-11-07 21:16:13 +00:00
return nil, ErrBrokenSession
}
id := atomic.AddUint32(&sesh.nextStreamID, 1) - 1
// Because atomic.AddUint32 returns the value after incrementation
connId, _, err := sesh.sb.pickRandConn()
if err != nil {
return nil, err
}
stream := makeStream(sesh, id, connId)
sesh.streams.Store(id, stream)
2019-08-06 10:19:47 +00:00
log.Tracef("stream %v of session %v opened", id, sesh.id)
2018-10-05 22:44:20 +00:00
return stream, nil
}
2019-07-23 10:06:49 +00:00
func (sesh *Session) Accept() (net.Conn, error) {
2019-07-28 22:27:59 +00:00
if sesh.IsClosed() {
2018-10-27 14:27:43 +00:00
return nil, ErrBrokenSession
2018-10-23 19:47:58 +00:00
}
2019-07-28 22:27:59 +00:00
stream := <-sesh.acceptCh
if stream == nil {
return nil, ErrBrokenSession
}
2019-08-06 10:19:47 +00:00
log.Tracef("stream %v of session %v accepted", stream.id, sesh.id)
2019-07-28 22:27:59 +00:00
return stream, nil
2018-10-05 22:44:20 +00:00
}
func (sesh *Session) closeStream(s *Stream, active bool) error {
2019-10-15 21:06:11 +00:00
if s.isClosed() {
return errors.New("Already Closed")
}
atomic.StoreUint32(&s.closed, 1)
_ = s.recvBuf.Close() // both datagramBuffer and streamBuffer won't return err on Close()
if active {
s.writingM.Lock()
defer s.writingM.Unlock()
// Notify remote that this stream is closed
pad := genRandomPadding()
f := &Frame{
StreamID: s.id,
Seq: atomic.AddUint64(&s.nextSendSeq, 1) - 1,
2019-11-03 20:28:43 +00:00
Closing: C_STREAM,
Payload: pad,
}
i, err := s.session.Obfs(f, s.obfsBuf)
if err != nil {
return err
}
_, err = s.session.sb.send(s.obfsBuf[:i], &s.assignedConnId)
if err != nil {
return err
}
log.Tracef("stream %v actively closed", s.id)
} else {
log.Tracef("stream %v passively closed", s.id)
}
sesh.streams.Delete(s.id)
var count int
sesh.streams.Range(func(_, _ interface{}) bool {
count += 1
return true
})
if count == 0 {
2019-08-06 10:19:47 +00:00
log.Tracef("session %v has no active stream left", sesh.id)
go sesh.timeoutAfter(30 * time.Second)
}
return nil
2018-10-05 22:44:20 +00:00
}
// recvDataFromRemote deobfuscate the frame and read the Closing field. If it is a closing frame, it writes the frame
// to the stream buffer, otherwise it fetches the desired stream instance, or creates and stores one if it's a new
// stream and then writes to the stream buffer
2019-08-16 22:47:15 +00:00
func (sesh *Session) recvDataFromRemote(data []byte) error {
2019-08-05 13:33:20 +00:00
frame, err := sesh.Deobfs(data)
if err != nil {
2019-08-16 22:47:15 +00:00
return fmt.Errorf("Failed to decrypt a frame for session %v: %v", sesh.id, err)
2019-08-05 13:33:20 +00:00
}
2020-01-09 10:22:40 +00:00
if frame.Closing == C_STREAM {
streamI, existing := sesh.streams.Load(frame.StreamID)
if existing {
// DO NOT close the stream straight away here because the sequence number of this frame
// hasn't been checked. There may be later data frames which haven't arrived
2020-01-09 10:22:40 +00:00
stream := streamI.(*Stream)
return stream.writeFrame(*frame)
} else {
// If the stream has been closed and the current frame is a closing frame, we do noop
2019-08-16 22:47:15 +00:00
return nil
2020-01-09 10:22:40 +00:00
}
} else if frame.Closing == C_SESSION {
// Closing session
sesh.SetTerminalMsg("Received a closing notification frame")
return sesh.passiveClose()
2020-01-09 10:22:40 +00:00
} else {
connId, _, _ := sesh.sb.pickRandConn()
// we ignore the error here. If the switchboard is broken, it will be reflected upon stream.Write
newStream := makeStream(sesh, frame.StreamID, connId)
existingStreamI, existing := sesh.streams.LoadOrStore(frame.StreamID, newStream)
if existing {
return existingStreamI.(*Stream).writeFrame(*frame)
2018-11-24 00:55:26 +00:00
} else {
2020-01-09 10:22:40 +00:00
sesh.acceptCh <- newStream
return newStream.writeFrame(*frame)
2018-11-24 00:55:26 +00:00
}
}
}
func (sesh *Session) SetTerminalMsg(msg string) {
sesh.terminalMsg.Store(msg)
}
func (sesh *Session) TerminalMsg() string {
msg := sesh.terminalMsg.Load()
if msg != nil {
return msg.(string)
} else {
return ""
}
}
func (sesh *Session) passiveClose() error {
log.Debugf("attempting to passively close session %v", sesh.id)
2019-08-19 22:23:41 +00:00
if atomic.SwapUint32(&sesh.closed, 1) == 1 {
log.Debugf("session %v has already been closed", sesh.id)
return errRepeatSessionClosing
}
2019-07-28 22:27:59 +00:00
sesh.acceptCh <- nil
sesh.streams.Range(func(key, streamI interface{}) bool {
stream := streamI.(*Stream)
atomic.StoreUint32(&stream.closed, 1)
_ = stream.recvBuf.Close() // will not block
sesh.streams.Delete(key)
return true
})
2018-10-23 19:47:58 +00:00
2018-12-31 11:30:39 +00:00
sesh.sb.closeAll()
2019-08-05 13:33:20 +00:00
log.Debugf("session %v closed gracefully", sesh.id)
2018-10-23 19:47:58 +00:00
return nil
}
func genRandomPadding() []byte {
lenB := make([]byte, 1)
rand.Read(lenB)
pad := make([]byte, lenB[0])
rand.Read(pad)
return pad
}
func (sesh *Session) Close() error {
log.Debugf("attempting to actively close session %v", sesh.id)
if atomic.SwapUint32(&sesh.closed, 1) == 1 {
log.Debugf("session %v has already been closed", sesh.id)
return errRepeatSessionClosing
}
sesh.acceptCh <- nil
sesh.streams.Range(func(key, streamI interface{}) bool {
stream := streamI.(*Stream)
atomic.StoreUint32(&stream.closed, 1)
_ = stream.recvBuf.Close() // will not block
sesh.streams.Delete(key)
return true
})
pad := genRandomPadding()
f := &Frame{
StreamID: 0xffffffff,
Seq: 0,
2019-11-03 20:28:43 +00:00
Closing: C_SESSION,
Payload: pad,
}
obfsBuf := make([]byte, len(pad)+64)
i, err := sesh.Obfs(f, obfsBuf)
if err != nil {
return err
}
_, err = sesh.sb.send(obfsBuf[:i], new(uint32))
if err != nil {
return err
}
2018-10-23 19:47:58 +00:00
sesh.sb.closeAll()
log.Debugf("session %v closed gracefully", sesh.id)
return nil
2018-10-23 19:47:58 +00:00
}
2019-01-20 12:13:29 +00:00
2019-07-28 22:27:59 +00:00
func (sesh *Session) IsClosed() bool {
return atomic.LoadUint32(&sesh.closed) == 1
2019-01-20 12:13:29 +00:00
}
func (sesh *Session) timeoutAfter(to time.Duration) {
time.Sleep(to)
var count int
sesh.streams.Range(func(_, _ interface{}) bool {
count += 1
return true
})
if count == 0 && !sesh.IsClosed() {
sesh.SetTerminalMsg("timeout")
sesh.Close()
}
}
2019-07-23 10:06:49 +00:00
func (sesh *Session) Addr() net.Addr { return sesh.addrs.Load().([]net.Addr)[0] }