Cloak/internal/multiplex/session.go

148 lines
3.5 KiB
Go
Raw Normal View History

2018-10-05 22:44:20 +00:00
package multiplex
import (
2018-10-23 19:47:58 +00:00
"errors"
2018-11-07 21:16:13 +00:00
"log"
2018-10-05 22:44:20 +00:00
"net"
"sync"
2018-10-23 19:47:58 +00:00
"sync/atomic"
2018-10-05 22:44:20 +00:00
)
const (
// Copied from smux
2018-10-23 19:47:58 +00:00
acceptBacklog = 1024
2018-10-05 22:44:20 +00:00
)
2018-10-27 14:27:43 +00:00
var ErrBrokenSession = errors.New("broken session")
var errRepeatSessionClosing = errors.New("trying to close a closed session")
2018-10-05 22:44:20 +00:00
type Session struct {
2018-11-07 21:16:13 +00:00
id uint32 // This field isn't acutally used
2018-10-05 22:44:20 +00:00
// Used in Stream.Write. Add multiplexing headers, encrypt and add TLS header
obfs func(*Frame) []byte
// Remove TLS header, decrypt and unmarshall multiplexing headers
deobfs func([]byte) *Frame
// This is supposed to read one TLS message, the same as GoQuiet's ReadTillDrain
2018-11-07 21:16:13 +00:00
obfsedRead func(net.Conn, []byte) (int, error)
2018-10-05 22:44:20 +00:00
2018-10-27 22:35:46 +00:00
// atomic
2018-10-23 19:47:58 +00:00
nextStreamID uint32
2018-10-05 22:44:20 +00:00
streamsM sync.RWMutex
streams map[uint32]*Stream
// Switchboard manages all connections to remote
sb *switchboard
// For accepting new streams
acceptCh chan *Stream
2018-10-23 19:47:58 +00:00
2018-11-07 21:16:13 +00:00
// TODO: use sync.Once for this
2018-10-23 19:47:58 +00:00
closingM sync.Mutex
die chan struct{}
closing bool
2018-10-05 22:44:20 +00:00
}
2018-10-07 17:09:45 +00:00
// 1 conn is needed to make a session
2018-11-07 21:16:13 +00:00
func MakeSession(id uint32, valve *Valve, obfs func(*Frame) []byte, deobfs func([]byte) *Frame, obfsedRead func(net.Conn, []byte) (int, error)) *Session {
2018-10-05 22:44:20 +00:00
sesh := &Session{
id: id,
2018-10-07 17:09:45 +00:00
obfs: obfs,
deobfs: deobfs,
2018-11-07 21:16:13 +00:00
obfsedRead: obfsedRead,
2018-10-09 20:53:55 +00:00
nextStreamID: 1,
2018-10-05 22:44:20 +00:00
streams: make(map[uint32]*Stream),
acceptCh: make(chan *Stream, acceptBacklog),
2018-10-27 14:27:43 +00:00
die: make(chan struct{}),
2018-10-05 22:44:20 +00:00
}
2018-11-07 21:16:13 +00:00
sesh.sb = makeSwitchboard(sesh, valve)
2018-10-05 22:44:20 +00:00
return sesh
}
2018-10-07 17:09:45 +00:00
func (sesh *Session) AddConnection(conn net.Conn) {
2018-10-28 21:22:38 +00:00
sesh.sb.addConn(conn)
2018-10-07 17:09:45 +00:00
}
2018-10-05 22:44:20 +00:00
func (sesh *Session) OpenStream() (*Stream, error) {
2018-11-07 21:16:13 +00:00
select {
case <-sesh.die:
return nil, ErrBrokenSession
default:
}
id := atomic.AddUint32(&sesh.nextStreamID, 1) - 1
// Because atomic.AddUint32 returns the value after incrementation
2018-10-05 22:44:20 +00:00
stream := makeStream(id, sesh)
sesh.streamsM.Lock()
sesh.streams[id] = stream
sesh.streamsM.Unlock()
2018-11-07 21:16:13 +00:00
log.Printf("Opening stream %v\n", id)
2018-10-05 22:44:20 +00:00
return stream, nil
}
func (sesh *Session) AcceptStream() (*Stream, error) {
2018-10-23 19:47:58 +00:00
select {
case <-sesh.die:
2018-10-27 14:27:43 +00:00
return nil, ErrBrokenSession
2018-10-23 19:47:58 +00:00
case stream := <-sesh.acceptCh:
return stream, nil
}
2018-10-09 20:53:55 +00:00
2018-10-05 22:44:20 +00:00
}
func (sesh *Session) delStream(id uint32) {
2018-10-16 20:13:19 +00:00
sesh.streamsM.Lock()
2018-10-05 22:44:20 +00:00
delete(sesh.streams, id)
2018-10-16 20:13:19 +00:00
sesh.streamsM.Unlock()
2018-10-05 22:44:20 +00:00
}
func (sesh *Session) isStream(id uint32) bool {
2018-10-23 19:47:58 +00:00
sesh.streamsM.RLock()
2018-10-05 22:44:20 +00:00
_, ok := sesh.streams[id]
2018-10-23 19:47:58 +00:00
sesh.streamsM.RUnlock()
2018-10-05 22:44:20 +00:00
return ok
}
func (sesh *Session) getStream(id uint32) *Stream {
2018-10-23 19:47:58 +00:00
sesh.streamsM.RLock()
defer sesh.streamsM.RUnlock()
2018-10-05 22:44:20 +00:00
return sesh.streams[id]
}
2018-10-09 20:53:55 +00:00
// addStream is used when the remote opened a new stream and we got notified
func (sesh *Session) addStream(id uint32) *Stream {
2018-10-05 22:44:20 +00:00
stream := makeStream(id, sesh)
2018-10-09 20:53:55 +00:00
sesh.streamsM.Lock()
sesh.streams[id] = stream
sesh.streamsM.Unlock()
2018-10-05 22:44:20 +00:00
sesh.acceptCh <- stream
2018-11-07 21:16:13 +00:00
log.Printf("Adding stream %v\n", id)
2018-10-09 20:53:55 +00:00
return stream
2018-10-05 22:44:20 +00:00
}
2018-10-23 19:47:58 +00:00
func (sesh *Session) Close() error {
// Because closing a closed channel causes panic
sesh.closingM.Lock()
defer sesh.closingM.Unlock()
if sesh.closing {
2018-10-27 14:27:43 +00:00
return errRepeatSessionClosing
2018-10-23 19:47:58 +00:00
}
sesh.closing = true
close(sesh.die)
sesh.streamsM.Lock()
for id, stream := range sesh.streams {
// If we call stream.Close() here, streamsM will result in a deadlock
// because stream.Close calls sesh.delStream, which locks the mutex.
// so we need to implement a method of stream that closes the stream without calling
// sesh.delStream
// This can also be seen in smux
go stream.closeNoDelMap()
delete(sesh.streams, id)
}
sesh.streamsM.Unlock()
2018-10-28 21:22:38 +00:00
sesh.sb.shutdown()
2018-10-23 19:47:58 +00:00
return nil
}