2018-10-05 22:44:20 +00:00
|
|
|
package multiplex
|
|
|
|
|
|
|
|
import (
|
2018-10-23 19:47:58 +00:00
|
|
|
"errors"
|
2019-08-16 22:47:15 +00:00
|
|
|
"fmt"
|
2020-02-01 23:46:46 +00:00
|
|
|
"github.com/cbeuw/Cloak/internal/util"
|
2018-10-05 22:44:20 +00:00
|
|
|
"net"
|
|
|
|
"sync"
|
2018-10-23 19:47:58 +00:00
|
|
|
"sync/atomic"
|
2019-03-23 12:45:12 +00:00
|
|
|
"time"
|
2019-08-05 13:33:20 +00:00
|
|
|
|
|
|
|
log "github.com/sirupsen/logrus"
|
2018-10-05 22:44:20 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// Session-wide tunables.
const (
	// acceptBacklog is the capacity of Session.acceptCh, i.e. how many
	// streams can be queued before Accept picks them up.
	acceptBacklog = 1 << 10

	// defaultSendRecvBufSize is what MakeSession uses for SendBufferSize and
	// ReceiveBufferSize when the supplied SessionConfig leaves them <= 0.
	defaultSendRecvBufSize = 20 * 1024
)
|
|
|
|
|
2018-10-27 14:27:43 +00:00
|
|
|
var (
	// ErrBrokenSession is returned by OpenStream and Accept once the session
	// has been closed.
	ErrBrokenSession = errors.New("broken session")

	// errRepeatSessionClosing is returned by Close and passiveClose when the
	// session's closed flag was already set.
	errRepeatSessionClosing = errors.New("trying to close a closed session")
)
|
|
|
|
|
2019-08-12 21:43:16 +00:00
|
|
|
// switchboardStrategy is an integer-backed enumeration.
// NOTE(review): presumably the type of the UNIFORM_SPREAD / FIXED_CONN_MAPPING
// values that MakeSession assigns to switchboardConfig.strategy — confirm
// against the switchboard definition.
type switchboardStrategy int
|
2019-08-11 23:48:20 +00:00
|
|
|
|
2019-08-11 23:22:15 +00:00
|
|
|
type SessionConfig struct {
|
2019-08-02 15:37:48 +00:00
|
|
|
*Obfuscator
|
2018-10-05 22:44:20 +00:00
|
|
|
|
2019-08-11 23:22:15 +00:00
|
|
|
Valve
|
|
|
|
|
2019-08-20 21:43:04 +00:00
|
|
|
// This is supposed to read one TLS message.
|
2019-08-11 23:22:15 +00:00
|
|
|
UnitRead func(net.Conn, []byte) (int, error)
|
2019-08-11 23:48:20 +00:00
|
|
|
|
|
|
|
Unordered bool
|
2020-04-08 14:07:35 +00:00
|
|
|
|
|
|
|
SendBufferSize int
|
|
|
|
ReceiveBufferSize int
|
2019-08-11 23:22:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type Session struct {
|
|
|
|
id uint32
|
|
|
|
|
2020-04-08 14:07:35 +00:00
|
|
|
SessionConfig
|
2019-08-02 00:01:19 +00:00
|
|
|
|
2018-10-27 22:35:46 +00:00
|
|
|
// atomic
|
2018-10-23 19:47:58 +00:00
|
|
|
nextStreamID uint32
|
2018-10-05 22:44:20 +00:00
|
|
|
|
2020-01-23 20:30:31 +00:00
|
|
|
// atomic
|
|
|
|
activeStreamCount uint32
|
|
|
|
streams sync.Map
|
2018-10-05 22:44:20 +00:00
|
|
|
|
|
|
|
// Switchboard manages all connections to remote
|
|
|
|
sb *switchboard
|
|
|
|
|
2019-10-08 22:11:16 +00:00
|
|
|
// Used for LocalAddr() and RemoteAddr() etc.
|
2019-07-25 11:17:29 +00:00
|
|
|
addrs atomic.Value
|
|
|
|
|
2018-10-05 22:44:20 +00:00
|
|
|
// For accepting new streams
|
|
|
|
acceptCh chan *Stream
|
2018-10-23 19:47:58 +00:00
|
|
|
|
2019-07-28 22:27:59 +00:00
|
|
|
closed uint32
|
2019-07-24 13:25:57 +00:00
|
|
|
|
|
|
|
terminalMsg atomic.Value
|
2018-10-05 22:44:20 +00:00
|
|
|
}
|
|
|
|
|
2020-04-08 14:07:35 +00:00
|
|
|
func MakeSession(id uint32, config SessionConfig) *Session {
|
2018-10-05 22:44:20 +00:00
|
|
|
sesh := &Session{
|
2019-08-11 23:22:15 +00:00
|
|
|
id: id,
|
|
|
|
SessionConfig: config,
|
|
|
|
nextStreamID: 1,
|
|
|
|
acceptCh: make(chan *Stream, acceptBacklog),
|
2018-10-05 22:44:20 +00:00
|
|
|
}
|
2019-07-25 11:17:29 +00:00
|
|
|
sesh.addrs.Store([]net.Addr{nil, nil})
|
2019-08-11 23:22:15 +00:00
|
|
|
|
|
|
|
if config.Valve == nil {
|
2020-04-08 14:13:49 +00:00
|
|
|
sesh.Valve = UNLIMITED_VALVE
|
2019-08-11 23:22:15 +00:00
|
|
|
}
|
2020-04-08 14:07:35 +00:00
|
|
|
if config.SendBufferSize <= 0 {
|
2020-04-08 14:13:49 +00:00
|
|
|
sesh.SendBufferSize = defaultSendRecvBufSize
|
2020-04-08 14:07:35 +00:00
|
|
|
}
|
|
|
|
if config.ReceiveBufferSize <= 0 {
|
2020-04-08 14:13:49 +00:00
|
|
|
sesh.ReceiveBufferSize = defaultSendRecvBufSize
|
2020-04-08 14:07:35 +00:00
|
|
|
}
|
2019-08-11 23:48:20 +00:00
|
|
|
|
2020-04-08 14:07:35 +00:00
|
|
|
sbConfig := switchboardConfig{
|
2020-04-08 14:13:49 +00:00
|
|
|
Valve: sesh.Valve,
|
|
|
|
recvBufferSize: sesh.ReceiveBufferSize,
|
2019-08-12 21:43:16 +00:00
|
|
|
}
|
2020-04-08 14:13:49 +00:00
|
|
|
if sesh.Unordered {
|
2019-08-12 22:13:13 +00:00
|
|
|
log.Debug("Connection is unordered")
|
2019-08-12 21:43:16 +00:00
|
|
|
sbConfig.strategy = UNIFORM_SPREAD
|
|
|
|
} else {
|
|
|
|
sbConfig.strategy = FIXED_CONN_MAPPING
|
2019-08-11 23:48:20 +00:00
|
|
|
}
|
|
|
|
sesh.sb = makeSwitchboard(sesh, sbConfig)
|
2019-06-16 13:30:35 +00:00
|
|
|
go sesh.timeoutAfter(30 * time.Second)
|
2018-10-05 22:44:20 +00:00
|
|
|
return sesh
|
|
|
|
}
|
|
|
|
|
2020-01-23 20:30:31 +00:00
|
|
|
func (sesh *Session) streamCountIncr() uint32 {
|
|
|
|
return atomic.AddUint32(&sesh.activeStreamCount, 1)
|
|
|
|
}
|
|
|
|
func (sesh *Session) streamCountDecr() uint32 {
|
|
|
|
return atomic.AddUint32(&sesh.activeStreamCount, ^uint32(0))
|
|
|
|
}
|
|
|
|
func (sesh *Session) streamCount() uint32 {
|
|
|
|
return atomic.LoadUint32(&sesh.activeStreamCount)
|
|
|
|
}
|
|
|
|
|
2018-10-07 17:09:45 +00:00
|
|
|
func (sesh *Session) AddConnection(conn net.Conn) {
|
2018-10-28 21:22:38 +00:00
|
|
|
sesh.sb.addConn(conn)
|
2019-07-25 11:17:29 +00:00
|
|
|
addrs := []net.Addr{conn.LocalAddr(), conn.RemoteAddr()}
|
|
|
|
sesh.addrs.Store(addrs)
|
2018-10-07 17:09:45 +00:00
|
|
|
}
|
|
|
|
|
2018-10-05 22:44:20 +00:00
|
|
|
func (sesh *Session) OpenStream() (*Stream, error) {
|
2019-07-28 22:27:59 +00:00
|
|
|
if sesh.IsClosed() {
|
2018-11-07 21:16:13 +00:00
|
|
|
return nil, ErrBrokenSession
|
|
|
|
}
|
|
|
|
id := atomic.AddUint32(&sesh.nextStreamID, 1) - 1
|
|
|
|
// Because atomic.AddUint32 returns the value after incrementation
|
2019-11-03 12:22:12 +00:00
|
|
|
connId, _, err := sesh.sb.pickRandConn()
|
2019-08-05 21:14:11 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
stream := makeStream(sesh, id, connId)
|
2019-11-03 12:22:12 +00:00
|
|
|
sesh.streams.Store(id, stream)
|
2020-01-23 20:30:31 +00:00
|
|
|
sesh.streamCountIncr()
|
2019-08-06 10:19:47 +00:00
|
|
|
log.Tracef("stream %v of session %v opened", id, sesh.id)
|
2018-10-05 22:44:20 +00:00
|
|
|
return stream, nil
|
|
|
|
}
|
|
|
|
|
2019-07-23 10:06:49 +00:00
|
|
|
func (sesh *Session) Accept() (net.Conn, error) {
|
2019-07-28 22:27:59 +00:00
|
|
|
if sesh.IsClosed() {
|
2018-10-27 14:27:43 +00:00
|
|
|
return nil, ErrBrokenSession
|
2018-10-23 19:47:58 +00:00
|
|
|
}
|
2019-07-28 22:27:59 +00:00
|
|
|
stream := <-sesh.acceptCh
|
|
|
|
if stream == nil {
|
|
|
|
return nil, ErrBrokenSession
|
|
|
|
}
|
2019-08-06 10:19:47 +00:00
|
|
|
log.Tracef("stream %v of session %v accepted", stream.id, sesh.id)
|
2019-07-28 22:27:59 +00:00
|
|
|
return stream, nil
|
2018-10-05 22:44:20 +00:00
|
|
|
}
|
|
|
|
|
2019-10-14 14:34:14 +00:00
|
|
|
func (sesh *Session) closeStream(s *Stream, active bool) error {
|
2019-10-15 21:06:11 +00:00
|
|
|
if s.isClosed() {
|
2020-04-04 23:51:28 +00:00
|
|
|
return fmt.Errorf("stream %v is already closed", s.id)
|
2019-10-15 21:06:11 +00:00
|
|
|
}
|
2019-10-14 14:34:14 +00:00
|
|
|
atomic.StoreUint32(&s.closed, 1)
|
|
|
|
_ = s.recvBuf.Close() // both datagramBuffer and streamBuffer won't return err on Close()
|
|
|
|
|
|
|
|
if active {
|
|
|
|
s.writingM.Lock()
|
|
|
|
defer s.writingM.Unlock()
|
|
|
|
// Notify remote that this stream is closed
|
|
|
|
pad := genRandomPadding()
|
|
|
|
f := &Frame{
|
|
|
|
StreamID: s.id,
|
|
|
|
Seq: atomic.AddUint64(&s.nextSendSeq, 1) - 1,
|
2019-11-03 20:28:43 +00:00
|
|
|
Closing: C_STREAM,
|
2019-10-14 14:34:14 +00:00
|
|
|
Payload: pad,
|
|
|
|
}
|
|
|
|
i, err := s.session.Obfs(f, s.obfsBuf)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
_, err = s.session.sb.send(s.obfsBuf[:i], &s.assignedConnId)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
log.Tracef("stream %v actively closed", s.id)
|
|
|
|
} else {
|
|
|
|
log.Tracef("stream %v passively closed", s.id)
|
|
|
|
}
|
|
|
|
|
2020-03-15 23:56:45 +00:00
|
|
|
sesh.streams.Store(s.id, nil) // id may or may not exist. if we use Delete(s.id) here it will panic
|
2020-01-23 20:30:31 +00:00
|
|
|
if sesh.streamCountDecr() == 0 {
|
|
|
|
log.Debugf("session %v has no active stream left", sesh.id)
|
2019-03-23 12:45:12 +00:00
|
|
|
go sesh.timeoutAfter(30 * time.Second)
|
|
|
|
}
|
2019-10-14 14:34:14 +00:00
|
|
|
return nil
|
2018-10-05 22:44:20 +00:00
|
|
|
}
|
|
|
|
|
2020-01-22 21:12:32 +00:00
|
|
|
// recvDataFromRemote deobfuscate the frame and read the Closing field. If it is a closing frame, it writes the frame
|
|
|
|
// to the stream buffer, otherwise it fetches the desired stream instance, or creates and stores one if it's a new
|
|
|
|
// stream and then writes to the stream buffer
|
2019-08-16 22:47:15 +00:00
|
|
|
func (sesh *Session) recvDataFromRemote(data []byte) error {
|
2019-08-05 13:33:20 +00:00
|
|
|
frame, err := sesh.Deobfs(data)
|
|
|
|
if err != nil {
|
2019-08-16 22:47:15 +00:00
|
|
|
return fmt.Errorf("Failed to decrypt a frame for session %v: %v", sesh.id, err)
|
2019-08-05 13:33:20 +00:00
|
|
|
}
|
|
|
|
|
2020-01-23 20:30:31 +00:00
|
|
|
if frame.Closing == C_SESSION {
|
2020-01-22 21:30:30 +00:00
|
|
|
sesh.SetTerminalMsg("Received a closing notification frame")
|
|
|
|
return sesh.passiveClose()
|
2020-01-23 20:30:31 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
connId, _, _ := sesh.sb.pickRandConn()
|
|
|
|
// we ignore the error here. If the switchboard is broken, it will be reflected upon stream.Write
|
|
|
|
newStream := makeStream(sesh, frame.StreamID, connId)
|
|
|
|
existingStreamI, existing := sesh.streams.LoadOrStore(frame.StreamID, newStream)
|
|
|
|
if existing {
|
|
|
|
if existingStreamI == nil {
|
|
|
|
// this is when the stream existed before but has since been closed. We do nothing
|
|
|
|
return nil
|
2018-11-24 00:55:26 +00:00
|
|
|
}
|
2020-01-23 20:30:31 +00:00
|
|
|
return existingStreamI.(*Stream).writeFrame(*frame)
|
|
|
|
} else {
|
2020-03-15 23:56:45 +00:00
|
|
|
// new stream
|
2020-01-23 20:30:31 +00:00
|
|
|
sesh.streamCountIncr()
|
|
|
|
sesh.acceptCh <- newStream
|
|
|
|
return newStream.writeFrame(*frame)
|
2018-11-24 00:55:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-07-24 13:25:57 +00:00
|
|
|
func (sesh *Session) SetTerminalMsg(msg string) {
|
|
|
|
sesh.terminalMsg.Store(msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sesh *Session) TerminalMsg() string {
|
|
|
|
msg := sesh.terminalMsg.Load()
|
|
|
|
if msg != nil {
|
|
|
|
return msg.(string)
|
|
|
|
} else {
|
|
|
|
return ""
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-10-14 14:34:14 +00:00
|
|
|
func (sesh *Session) passiveClose() error {
|
|
|
|
log.Debugf("attempting to passively close session %v", sesh.id)
|
2019-08-19 22:23:41 +00:00
|
|
|
if atomic.SwapUint32(&sesh.closed, 1) == 1 {
|
|
|
|
log.Debugf("session %v has already been closed", sesh.id)
|
|
|
|
return errRepeatSessionClosing
|
|
|
|
}
|
2019-07-28 22:27:59 +00:00
|
|
|
sesh.acceptCh <- nil
|
2019-10-14 14:34:14 +00:00
|
|
|
|
2019-11-03 12:22:12 +00:00
|
|
|
sesh.streams.Range(func(key, streamI interface{}) bool {
|
2020-01-23 20:30:31 +00:00
|
|
|
if streamI == nil {
|
|
|
|
return true
|
|
|
|
}
|
2019-11-03 12:22:12 +00:00
|
|
|
stream := streamI.(*Stream)
|
2019-10-14 14:34:14 +00:00
|
|
|
atomic.StoreUint32(&stream.closed, 1)
|
2019-11-03 12:22:12 +00:00
|
|
|
_ = stream.recvBuf.Close() // will not block
|
|
|
|
sesh.streams.Delete(key)
|
2020-01-23 20:30:31 +00:00
|
|
|
sesh.streamCountDecr()
|
2019-11-03 12:22:12 +00:00
|
|
|
return true
|
|
|
|
})
|
2018-10-23 19:47:58 +00:00
|
|
|
|
2018-12-31 11:30:39 +00:00
|
|
|
sesh.sb.closeAll()
|
2019-08-05 13:33:20 +00:00
|
|
|
log.Debugf("session %v closed gracefully", sesh.id)
|
2018-10-23 19:47:58 +00:00
|
|
|
return nil
|
2019-10-14 14:34:14 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func genRandomPadding() []byte {
|
|
|
|
lenB := make([]byte, 1)
|
2020-02-01 23:46:46 +00:00
|
|
|
util.CryptoRandRead(lenB)
|
2019-10-14 14:34:14 +00:00
|
|
|
pad := make([]byte, lenB[0])
|
2020-02-01 23:46:46 +00:00
|
|
|
util.CryptoRandRead(pad)
|
2019-10-14 14:34:14 +00:00
|
|
|
return pad
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sesh *Session) Close() error {
|
|
|
|
log.Debugf("attempting to actively close session %v", sesh.id)
|
|
|
|
if atomic.SwapUint32(&sesh.closed, 1) == 1 {
|
|
|
|
log.Debugf("session %v has already been closed", sesh.id)
|
|
|
|
return errRepeatSessionClosing
|
|
|
|
}
|
|
|
|
sesh.acceptCh <- nil
|
|
|
|
|
2019-11-03 12:22:12 +00:00
|
|
|
sesh.streams.Range(func(key, streamI interface{}) bool {
|
2020-01-23 20:30:31 +00:00
|
|
|
if streamI == nil {
|
|
|
|
return true
|
|
|
|
}
|
2019-11-03 12:22:12 +00:00
|
|
|
stream := streamI.(*Stream)
|
2019-10-14 14:34:14 +00:00
|
|
|
atomic.StoreUint32(&stream.closed, 1)
|
2019-11-03 12:22:12 +00:00
|
|
|
_ = stream.recvBuf.Close() // will not block
|
|
|
|
sesh.streams.Delete(key)
|
2020-01-23 20:30:31 +00:00
|
|
|
sesh.streamCountDecr()
|
2019-11-03 12:22:12 +00:00
|
|
|
return true
|
|
|
|
})
|
2019-10-14 14:34:14 +00:00
|
|
|
|
|
|
|
pad := genRandomPadding()
|
|
|
|
f := &Frame{
|
|
|
|
StreamID: 0xffffffff,
|
|
|
|
Seq: 0,
|
2019-11-03 20:28:43 +00:00
|
|
|
Closing: C_SESSION,
|
2019-10-14 14:34:14 +00:00
|
|
|
Payload: pad,
|
|
|
|
}
|
|
|
|
obfsBuf := make([]byte, len(pad)+64)
|
|
|
|
i, err := sesh.Obfs(f, obfsBuf)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
_, err = sesh.sb.send(obfsBuf[:i], new(uint32))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2018-10-23 19:47:58 +00:00
|
|
|
|
2019-10-14 14:34:14 +00:00
|
|
|
sesh.sb.closeAll()
|
|
|
|
log.Debugf("session %v closed gracefully", sesh.id)
|
|
|
|
return nil
|
2018-10-23 19:47:58 +00:00
|
|
|
}
|
2019-01-20 12:13:29 +00:00
|
|
|
|
2019-07-28 22:27:59 +00:00
|
|
|
func (sesh *Session) IsClosed() bool {
|
|
|
|
return atomic.LoadUint32(&sesh.closed) == 1
|
2019-01-20 12:13:29 +00:00
|
|
|
}
|
2019-03-23 12:45:12 +00:00
|
|
|
|
|
|
|
func (sesh *Session) timeoutAfter(to time.Duration) {
|
|
|
|
time.Sleep(to)
|
2020-01-23 20:30:31 +00:00
|
|
|
|
|
|
|
if sesh.streamCount() == 0 && !sesh.IsClosed() {
|
2019-07-25 19:57:02 +00:00
|
|
|
sesh.SetTerminalMsg("timeout")
|
2019-03-23 12:45:12 +00:00
|
|
|
sesh.Close()
|
|
|
|
}
|
|
|
|
}
|
2019-07-23 10:06:49 +00:00
|
|
|
|
2019-07-25 11:17:29 +00:00
|
|
|
// Addr returns the first element of the session's stored address pair: nil
// until a connection has been added, afterwards the local address of the
// connection most recently passed to AddConnection.
func (sesh *Session) Addr() net.Addr {
	pair := sesh.addrs.Load().([]net.Addr)
	return pair[0]
}
|