Refactor frame reception processor

This commit is contained in:
Andy Wang 2020-01-09 10:22:40 +00:00
parent 2acc174a05
commit e7e4cd5726
2 changed files with 24 additions and 17 deletions

1
go.mod
View File

@ -9,6 +9,7 @@ require (
github.com/gorilla/websocket v1.4.1 github.com/gorilla/websocket v1.4.1
github.com/juju/ratelimit v1.0.1 github.com/juju/ratelimit v1.0.1
github.com/kr/pretty v0.1.0 // indirect github.com/kr/pretty v0.1.0 // indirect
github.com/mitchellh/gox v1.0.1 // indirect
github.com/refraction-networking/utls v0.0.0-20190824032329-cc2996c81813 github.com/refraction-networking/utls v0.0.0-20190824032329-cc2996c81813
github.com/sirupsen/logrus v1.4.2 github.com/sirupsen/logrus v1.4.2
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4

View File

@ -179,29 +179,35 @@ func (sesh *Session) recvDataFromRemote(data []byte) error {
return fmt.Errorf("Failed to decrypt a frame for session %v: %v", sesh.id, err) return fmt.Errorf("Failed to decrypt a frame for session %v: %v", sesh.id, err)
} }
streamI, existing := sesh.streams.Load(frame.StreamID) if frame.Closing == C_STREAM {
if existing { streamI, existing := sesh.streams.Load(frame.StreamID)
stream := streamI.(*Stream) if existing {
return stream.writeFrame(*frame)
} else {
if frame.Closing == C_STREAM {
// If the stream has been closed and the current frame is a closing frame, we do noop // If the stream has been closed and the current frame is a closing frame, we do noop
stream := streamI.(*Stream)
return stream.writeFrame(*frame)
} else {
return nil return nil
} else if frame.Closing == C_SESSION { }
} else if frame.Closing == C_SESSION {
streamI, existing := sesh.streams.Load(frame.StreamID)
if existing {
stream := streamI.(*Stream)
return stream.writeFrame(*frame)
} else {
// Closing session // Closing session
sesh.SetTerminalMsg("Received a closing notification frame") sesh.SetTerminalMsg("Received a closing notification frame")
return sesh.passiveClose() return sesh.passiveClose()
}
} else {
connId, _, _ := sesh.sb.pickRandConn()
// we ignore the error here. If the switchboard is broken, it will be reflected upon stream.Write
newStream := makeStream(sesh, frame.StreamID, connId)
existingStreamI, existing := sesh.streams.LoadOrStore(frame.StreamID, newStream)
if existing {
return existingStreamI.(*Stream).writeFrame(*frame)
} else { } else {
// it may be tempting to use the connId from which the frame was received. However it doesn't make sesh.acceptCh <- newStream
// any difference because we only care to send the data from the same stream through the same return newStream.writeFrame(*frame)
// TCP connection. The remote may use a different connection to send the same stream than the one the client
// use to send.
connId, _, _ := sesh.sb.pickRandConn()
// we ignore the error here. If the switchboard is broken, it will be reflected upon stream.Write
stream := makeStream(sesh, frame.StreamID, connId)
sesh.streams.Store(stream.id, stream)
sesh.acceptCh <- stream
return stream.writeFrame(*frame)
} }
} }
} }