From e7e4cd57269e1c06a85e8d67f4585580e5b5d83c Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Thu, 9 Jan 2020 10:22:40 +0000 Subject: [PATCH] Refactor frame reception processor --- go.mod | 1 + internal/multiplex/session.go | 40 ++++++++++++++++++++--------------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index edc231a..e37b172 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/gorilla/websocket v1.4.1 github.com/juju/ratelimit v1.0.1 github.com/kr/pretty v0.1.0 // indirect + github.com/mitchellh/gox v1.0.1 // indirect github.com/refraction-networking/utls v0.0.0-20190824032329-cc2996c81813 github.com/sirupsen/logrus v1.4.2 golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index 2bb46a4..321b02d 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -179,29 +179,35 @@ func (sesh *Session) recvDataFromRemote(data []byte) error { return fmt.Errorf("Failed to decrypt a frame for session %v: %v", sesh.id, err) } - streamI, existing := sesh.streams.Load(frame.StreamID) - if existing { - stream := streamI.(*Stream) - return stream.writeFrame(*frame) - } else { - if frame.Closing == C_STREAM { + if frame.Closing == C_STREAM { + streamI, existing := sesh.streams.Load(frame.StreamID) + if existing { // If the stream has been closed and the current frame is a closing frame, we do noop + stream := streamI.(*Stream) + return stream.writeFrame(*frame) + } else { return nil - } else if frame.Closing == C_SESSION { + } + } else if frame.Closing == C_SESSION { + streamI, existing := sesh.streams.Load(frame.StreamID) + if existing { + stream := streamI.(*Stream) + return stream.writeFrame(*frame) + } else { // Closing session sesh.SetTerminalMsg("Received a closing notification frame") return sesh.passiveClose() + } + } else { + connId, _, _ := sesh.sb.pickRandConn() + // we ignore the error here. If the switchboard is broken, it will be reflected upon stream.Write + newStream := makeStream(sesh, frame.StreamID, connId) + existingStreamI, existing := sesh.streams.LoadOrStore(frame.StreamID, newStream) + if existing { + return existingStreamI.(*Stream).writeFrame(*frame) } else { - // it may be tempting to use the connId from which the frame was received. However it doesn't make - // any difference because we only care to send the data from the same stream through the same - // TCP connection. The remote may use a different connection to send the same stream than the one the client - // use to send. - connId, _, _ := sesh.sb.pickRandConn() - // we ignore the error here. If the switchboard is broken, it will be reflected upon stream.Write - stream := makeStream(sesh, frame.StreamID, connId) - sesh.streams.Store(stream.id, stream) - sesh.acceptCh <- stream - return stream.writeFrame(*frame) + sesh.acceptCh <- newStream + return newStream.writeFrame(*frame) } } }