Synchronise stream.Write

pull/110/head
Andy Wang 4 years ago
parent 008fd8f0a3
commit 58e0797578

@ -165,7 +165,9 @@ func (sesh *Session) closeStream(s *Stream, active bool) error {
Payload: pad, Payload: pad,
} }
s.allocIdempot.Do(func() { s.obfsBuf = make([]byte, s.session.SendBufferSize) }) if s.obfsBuf == nil {
s.obfsBuf = make([]byte, s.session.SendBufferSize)
}
i, err := s.session.Obfs(f, s.obfsBuf) i, err := s.session.Obfs(f, s.obfsBuf)
if err != nil { if err != nil {
return err return err

@ -23,7 +23,7 @@ type Stream struct {
// atomic // atomic
nextSendSeq uint64 nextSendSeq uint64
writingM sync.RWMutex writingM sync.Mutex
// atomic // atomic
closed uint32 closed uint32
@ -92,14 +92,15 @@ func (s *Stream) Write(in []byte) (n int, err error) {
// to be sent before the data frame and cause loss of packet. // to be sent before the data frame and cause loss of packet.
//log.Tracef("attempting to write %v bytes to stream %v",len(in),s.id) //log.Tracef("attempting to write %v bytes to stream %v",len(in),s.id)
// todo: forbid concurrent write // todo: forbid concurrent write
s.writingM.RLock() s.writingM.Lock()
defer s.writingM.RUnlock() defer s.writingM.Unlock()
if s.isClosed() { if s.isClosed() {
return 0, ErrBrokenStream return 0, ErrBrokenStream
} }
s.allocIdempot.Do(func() { s.obfsBuf = make([]byte, s.session.SendBufferSize) }) if s.obfsBuf == nil {
s.obfsBuf = make([]byte, s.session.SendBufferSize)
}
for n < len(in) { for n < len(in) {
var framePayload []byte var framePayload []byte
if len(in)-n <= s.session.maxStreamUnitWrite { if len(in)-n <= s.session.maxStreamUnitWrite {

Loading…
Cancel
Save