// Cloak/internal/multiplex/stream.go

package multiplex

import (
	"errors"
	//"log"
	"math"
	prand "math/rand"
	"sync"
	"sync/atomic"
)
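
// ErrBrokenStream is returned by Read and Write once the stream has been closed.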
var ErrBrokenStream = errors.New("broken stream")

type Stream struct {
	id      uint32
	session *Session

	// Explanations of the following 4 fields can be found in frameSorter.go
	nextRecvSeq uint32
	rev         int
	sh          sorterHeap
	wrapMode    bool

	// New frames are received through newFrameCh by frameSorter
	newFrameCh chan *Frame
	// sortedBufCh carries data that has already been sorted into order, ready to be read
	sortedBufCh chan []byte

	// accessed atomically
	nextSendSeq uint32

	writingM sync.RWMutex

	// close(die) is used to notify different goroutines that this stream is closing
	die chan struct{}

	heliumMask sync.Once // my personal fav
}
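
// makeStream creates a new Stream under the given Session and starts the
// goroutine (recvNewFrame) that receives and sorts its incoming frames.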
func makeStream(id uint32, sesh *Session) *Stream {
	stream := &Stream{
		id:          id,
		session:     sesh,
		die:         make(chan struct{}),
		sh:          []*frameNode{},
		newFrameCh:  make(chan *Frame, 1024),
		sortedBufCh: make(chan []byte, 1024),
	}
	go stream.recvNewFrame()
	return stream
}
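
// Read blocks until a sorted chunk of data arrives on sortedBufCh or the
// stream is closed. A zero-length chunk from the remote is treated as a
// close notification. The caller's buffer must be large enough to hold the
// whole chunk.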
func (stream *Stream) Read(buf []byte) (n int, err error) {
	if len(buf) == 0 {
		select {
		case <-stream.die:
			return 0, ErrBrokenStream
		default:
			return 0, nil
		}
	}
	select {
	case <-stream.die:
		return 0, ErrBrokenStream
	case data := <-stream.sortedBufCh:
		if len(data) == 0 {
			stream.passiveClose()
			return 0, ErrBrokenStream
		}
		if len(buf) < len(data) {
			return 0, errors.New("buf too small")
		}
		copy(buf, data)
		return len(data), nil
	}
}
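
// Write obfuscates in into a data frame and sends it through the session.
// It returns ErrBrokenStream if the stream has already been closed.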
func (stream *Stream) Write(in []byte) (n int, err error) {
	// The RWMutex here isn't really used for reader/writer separation.
	// We use it to exploit the fact that RLock doesn't create contention:
	// holding it prevents the stream from being actively closed in the
	// middle of a Write, which could otherwise cause the closing frame to
	// be sent before the data frame and result in packet loss.
	stream.writingM.RLock()
	select {
	case <-stream.die:
		stream.writingM.RUnlock()
		return 0, ErrBrokenStream
	default:
	}

	f := &Frame{
		StreamID: stream.id,
		Seq:      atomic.AddUint32(&stream.nextSendSeq, 1) - 1,
		Closing:  0,
		Payload:  in,
	}
	tlsRecord, err := stream.session.obfs(f)
	if err != nil {
		stream.writingM.RUnlock()
		return 0, err
	}
	n, err = stream.session.sb.send(tlsRecord)
	stream.writingM.RUnlock()

	return
}

// passiveClose closes the stream locally only. It is used when the stream
// close is notified by the remote end.
func (stream *Stream) passiveClose() {
	stream.heliumMask.Do(func() { close(stream.die) })
	stream.session.delStream(stream.id)
	//log.Printf("%v passive closing\n", stream.id)
}

// Close actively closes the stream: it closes locally and tells the remote
// that this stream is being closed.
func (stream *Stream) Close() error {
	stream.writingM.Lock()
	select {
	case <-stream.die:
		stream.writingM.Unlock()
		return errors.New("already closed")
	default:
	}
	stream.heliumMask.Do(func() { close(stream.die) })

	// Notify the remote that this stream is closed. The payload is 300-500
	// bytes of random padding, seeded with the stream ID.
	prand.Seed(int64(stream.id))
	padLen := int(math.Floor(prand.Float64()*200 + 300))
	pad := make([]byte, padLen)
	prand.Read(pad)
	f := &Frame{
		StreamID: stream.id,
		Seq:      atomic.AddUint32(&stream.nextSendSeq, 1) - 1,
		Closing:  1,
		Payload:  pad,
	}
	tlsRecord, _ := stream.session.obfs(f)
	stream.session.sb.send(tlsRecord)

	stream.session.delStream(stream.id)
	//log.Printf("%v actively closed\n", stream.id)
	stream.writingM.Unlock()
	return nil
}

// closeNoDelMap is the same as passiveClose() but without the call to
// session.delStream. It is called in session.Close() to avoid a mutex
// deadlock. We don't notify the remote because session.Close() is always
// called when the session is passively closed.
func (stream *Stream) closeNoDelMap() {
	stream.heliumMask.Do(func() { close(stream.die) })
}