From c1b261c652135d1402f8669727c9244c81b48f6c Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Sat, 17 Oct 2020 19:28:03 +0100 Subject: [PATCH] Add stream tests --- internal/multiplex/stream_test.go | 142 ++++++++++++++++++++++-------- 1 file changed, 106 insertions(+), 36 deletions(-) diff --git a/internal/multiplex/stream_test.go b/internal/multiplex/stream_test.go index 6c89515..27541c3 100644 --- a/internal/multiplex/stream_test.go +++ b/internal/multiplex/stream_test.go @@ -175,46 +175,119 @@ func TestStream_WriteSync(t *testing.T) { func TestStream_Close(t *testing.T) { var sessionKey [32]byte rand.Read(sessionKey[:]) - sesh := setupSesh(false, sessionKey, E_METHOD_PLAIN) testPayload := []byte{42, 42, 42} - f := &Frame{ + dataFrame := &Frame{ 1, 0, 0, testPayload, } - conn, writingEnd := connutil.AsyncPipe() - sesh.AddConnection(conn) - obfsBuf := make([]byte, 512) - i, _ := sesh.Obfs(f, obfsBuf, 0) - writingEnd.Write(obfsBuf[:i]) - time.Sleep(100 * time.Microsecond) - stream, err := sesh.Accept() - if err != nil { - t.Error("failed to accept stream", err) - return - } - err = stream.Close() - if err != nil { - t.Error("failed to actively close stream", err) - return - } + t.Run("active closing", func(t *testing.T) { + sesh := setupSesh(false, sessionKey, E_METHOD_PLAIN) + rawConn, rawWritingEnd := connutil.AsyncPipe() + sesh.AddConnection(common.NewTLSConn(rawConn)) + writingEnd := common.NewTLSConn(rawWritingEnd) - if sI, _ := sesh.streams.Load(stream.(*Stream).id); sI != nil { - t.Error("stream still exists") - return - } + obfsBuf := make([]byte, 512) + i, _ := sesh.Obfs(dataFrame, obfsBuf, 0) + _, err := writingEnd.Write(obfsBuf[:i]) + if err != nil { + t.Error("failed to write from remote end") + } + stream, err := sesh.Accept() + if err != nil { + t.Error("failed to accept stream", err) + return + } + err = stream.Close() + if err != nil { + t.Error("failed to actively close stream", err) + return + } - readBuf := make([]byte, len(testPayload)) - _, err = io.ReadFull(stream, readBuf) - if err != nil { - t.Errorf("can't read residual data %v", err) - } - if !bytes.Equal(readBuf, testPayload) { - t.Errorf("read wrong data") - } + if sI, _ := sesh.streams.Load(stream.(*Stream).id); sI != nil { + t.Error("stream still exists") + return + } + + readBuf := make([]byte, len(testPayload)) + _, err = io.ReadFull(stream, readBuf) + if err != nil { + t.Errorf("can't read residual data %v", err) + } + if !bytes.Equal(readBuf, testPayload) { + t.Errorf("read wrong data") + } + }) + + t.Run("passive closing", func(t *testing.T) { + sesh := setupSesh(false, sessionKey, E_METHOD_PLAIN) + rawConn, rawWritingEnd := connutil.AsyncPipe() + sesh.AddConnection(common.NewTLSConn(rawConn)) + writingEnd := common.NewTLSConn(rawWritingEnd) + + obfsBuf := make([]byte, 512) + i, err := sesh.Obfs(dataFrame, obfsBuf, 0) + if err != nil { + t.Errorf("failed to obfuscate frame %v", err) + } + _, err = writingEnd.Write(obfsBuf[:i]) + if err != nil { + t.Error("failed to write from remote end") + } + + stream, err := sesh.Accept() + if err != nil { + t.Error("failed to accept stream", err) + return + } + + closingFrame := &Frame{ + 1, + dataFrame.Seq + 1, + C_STREAM, + testPayload, + } + + i, err = sesh.Obfs(closingFrame, obfsBuf, 0) + if err != nil { + t.Errorf("failed to obfuscate frame %v", err) + } + _, err = writingEnd.Write(obfsBuf[:i]) + if err != nil { + t.Errorf("failed to write from remote end %v", err) + } + + closingFrameDup := &Frame{ + 1, + dataFrame.Seq + 2, + C_STREAM, + testPayload, + } + + i, err = sesh.Obfs(closingFrameDup, obfsBuf, 0) + if err != nil { + t.Errorf("failed to obfuscate frame %v", err) + } + _, err = writingEnd.Write(obfsBuf[:i]) + if err != nil { + t.Errorf("failed to write from remote end %v", err) + } + + readBuf := make([]byte, len(testPayload)) + _, err = io.ReadFull(stream, readBuf) + if err != nil { + t.Errorf("can't read residual data %v", err) + } + time.Sleep(100 * time.Microsecond) + if sI, _ := sesh.streams.Load(stream.(*Stream).id); sI != nil { + t.Error("stream still exists") + return + } + + }) } func TestStream_Read(t *testing.T) { @@ -239,15 +312,15 @@ func TestStream_Read(t *testing.T) { for name, unordered := range seshes { sesh := setupSesh(unordered, emptyKey, E_METHOD_PLAIN) - conn, writingEnd := connutil.AsyncPipe() - sesh.AddConnection(conn) + rawConn, rawWritingEnd := connutil.AsyncPipe() + sesh.AddConnection(common.NewTLSConn(rawConn)) + writingEnd := common.NewTLSConn(rawWritingEnd) t.Run(name, func(t *testing.T) { t.Run("Plain read", func(t *testing.T) { f.StreamID = streamID i, _ := sesh.Obfs(f, obfsBuf, 0) streamID++ writingEnd.Write(obfsBuf[:i]) - time.Sleep(100 * time.Microsecond) stream, err := sesh.Accept() if err != nil { t.Error("failed to accept stream", err) @@ -273,7 +346,6 @@ func TestStream_Read(t *testing.T) { i, _ := sesh.Obfs(f, obfsBuf, 0) streamID++ writingEnd.Write(obfsBuf[:i]) - time.Sleep(100 * time.Microsecond) stream, _ := sesh.Accept() i, err := stream.Read(nil) if i != 0 || err != nil { @@ -286,7 +358,6 @@ func TestStream_Read(t *testing.T) { i, _ := sesh.Obfs(f, obfsBuf, 0) streamID++ writingEnd.Write(obfsBuf[:i]) - time.Sleep(100 * time.Microsecond) stream, _ := sesh.Accept() stream.Close() i, err := stream.Read(buf) @@ -311,7 +382,6 @@ func TestStream_Read(t *testing.T) { i, _ := sesh.Obfs(f, obfsBuf, 0) streamID++ writingEnd.Write(obfsBuf[:i]) - time.Sleep(100 * time.Microsecond) stream, _ := sesh.Accept() sesh.Close() i, err := stream.Read(buf)