From d73462653d09ab066d27a147a2ea41b8c1c07316 Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Wed, 14 Aug 2019 11:46:33 +0100 Subject: [PATCH] Refactor routing --- cmd/ck-client/ck-client.go | 264 +++++++++++++++++++------------------ 1 file changed, 136 insertions(+), 128 deletions(-) diff --git a/cmd/ck-client/ck-client.go b/cmd/ck-client/ck-client.go index add2de2..fbcce43 100644 --- a/cmd/ck-client/ck-client.go +++ b/cmd/ck-client/ck-client.go @@ -91,6 +91,134 @@ func makeSession(sta *client.State, isAdmin bool, unordered bool) *mux.Session { return sesh } +func routeUDP(sta *client.State, adminUID []byte) { + var sesh *mux.Session + localUDPAddr, err := net.ResolveUDPAddr("udp", sta.LocalHost+":"+sta.LocalPort) + if err != nil { + log.Fatal(err) + } + localConn, err := net.ListenUDP("udp", localUDPAddr) + if err != nil { + log.Fatal(err) + } + for { + var otherEnd atomic.Value + data := make([]byte, 10240) + i, oe, err := localConn.ReadFromUDP(data) + if err != nil { + log.Errorf("Failed to read first packet from proxy client: %v", err) + localConn.Close() + return + } + otherEnd.Store(oe) + + if sesh == nil || sesh.IsClosed() { + sesh = makeSession(sta, adminUID != nil, true) + } + log.Debugf("proxy local address %v", otherEnd.Load().(*net.UDPAddr).String()) + stream, err := sesh.OpenStream() + if err != nil { + log.Errorf("Failed to open stream: %v", err) + localConn.Close() + //localConnWrite.Close() + return + } + _, err = stream.Write(data[:i]) + if err != nil { + log.Errorf("Failed to write to stream: %v", err) + localConn.Close() + //localConnWrite.Close() + stream.Close() + return + } + + // stream to proxy + go func() { + buf := make([]byte, 16380) + for { + i, err := io.ReadAtLeast(stream, buf, 1) + if err != nil { + log.Print(err) + go localConn.Close() + go stream.Close() + return + } + i, err = localConn.WriteToUDP(buf[:i], otherEnd.Load().(*net.UDPAddr)) + if err != nil { + log.Print(err) + go localConn.Close() + go stream.Close() + return + } + } + }() + + // proxy to stream + buf := make([]byte, 16380) + for { + i, oe, err := localConn.ReadFromUDP(buf) + if err != nil { + log.Print(err) + go localConn.Close() + go stream.Close() + return + } + otherEnd.Store(oe) + i, err = stream.Write(buf[:i]) + if err != nil { + log.Print(err) + go localConn.Close() + go stream.Close() + return + } + } + } + +} + +func routeTCP(sta *client.State, adminUID []byte) { + tcpListener, err := net.Listen("tcp", sta.LocalHost+":"+sta.LocalPort) + if err != nil { + log.Fatal(err) + } + var sesh *mux.Session + for { + localConn, err := tcpListener.Accept() + if err != nil { + log.Fatal(err) + continue + } + if sesh == nil || sesh.IsClosed() { + sesh = makeSession(sta, adminUID != nil, false) + } + go func() { + data := make([]byte, 10240) + i, err := io.ReadAtLeast(localConn, data, 1) + if err != nil { + log.Errorf("Failed to read first packet from proxy client: %v", err) + localConn.Close() + return + } + stream, err := sesh.OpenStream() + if err != nil { + log.Errorf("Failed to open stream: %v", err) + localConn.Close() + return + } + _, err = stream.Write(data[:i]) + if err != nil { + log.Errorf("Failed to write to stream: %v", err) + localConn.Close() + stream.Close() + return + } + go util.Pipe(localConn, stream) + util.Pipe(stream, localConn) + }() + } + +} + func main() { // Should be 127.0.0.1 to listen to a proxy client on this machine var localHost string @@ -176,144 +304,24 @@ func main() { } } - var tcpListener net.Listener - var network string - if udp { - network = "udp" - } else { - network = "tcp" - // TODO use the local variable instead fo sta.LocalPort - tcpListener, err = net.Listen("tcp", listeningIP+":"+sta.LocalPort) - if err != nil { - log.Fatal(err) - } - } - if adminUID != nil { log.Infof("API base is %v:%v", listeningIP, sta.LocalPort) sta.SessionID = 0 sta.UID = adminUID sta.NumConn = 1 } else { + var network string + if udp { + network = "udp" + } else { + network = "tcp" + } log.Infof("Listening on %v %v:%v for proxy clients", network, listeningIP, sta.LocalPort) } - var sesh *mux.Session - if udp { - localUDPAddr, err := net.ResolveUDPAddr("udp", listeningIP+":"+localPort) - if err != nil { - log.Fatal(err) - } - localConn, err := net.ListenUDP("udp", localUDPAddr) - if err != nil { - log.Fatal(err) - } - for { - var otherEnd atomic.Value - data := make([]byte, 10240) - i, oe, err := localConn.ReadFromUDP(data) - if err != nil { - log.Errorf("Failed to read first packet from proxy client: %v", err) - localConn.Close() - return - } - otherEnd.Store(oe) - - if sesh == nil || sesh.IsClosed() { - sesh = makeSession(sta, adminUID != nil, true) - } - log.Debugf("proxy local address %v", otherEnd.Load().(*net.UDPAddr).String()) - stream, err := sesh.OpenStream() - if err != nil { - log.Errorf("Failed to open stream: %v", err) - localConn.Close() - //localConnWrite.Close() - return - } - _, err = stream.Write(data[:i]) - if err != nil { - log.Errorf("Failed to write to stream: %v", err) - localConn.Close() - //localConnWrite.Close() - stream.Close() - return - } - - go func() { - buf := make([]byte, 16380) - for { - i, err := io.ReadAtLeast(stream, buf, 1) - if err != nil { - log.Print(err) - go localConn.Close() - go stream.Close() - return - } - i, err = localConn.WriteToUDP(buf[:i], otherEnd.Load().(*net.UDPAddr)) - if err != nil { - log.Print(err) - go localConn.Close() - go stream.Close() - return - } - } - }() - - buf := make([]byte, 16380) - for { - i, oe, err := localConn.ReadFromUDP(buf) - if err != nil { - log.Print(err) - go localConn.Close() - go stream.Close() - return - } - otherEnd.Store(oe) - i, err = stream.Write(buf[:i]) - if err != nil { - log.Print(err) - go localConn.Close() - go stream.Close() - return - } - } - } + routeUDP(sta, adminUID) } else { - for { - localConn, err := tcpListener.Accept() - if err != nil { - log.Fatal(err) - continue - } - if sesh == nil || sesh.IsClosed() { - sesh = makeSession(sta, adminUID != nil, false) - } - go func() { - data := make([]byte, 10240) - i, err := io.ReadAtLeast(localConn, data, 1) - if err != nil { - log.Errorf("Failed to read first packet from proxy client: %v", err) - localConn.Close() - return - } - stream, err := sesh.OpenStream() - if err != nil { - log.Errorf("Failed to open stream: %v", err) - localConn.Close() - return - } - _, err = stream.Write(data[:i]) - if err != nil { - log.Errorf("Failed to write to stream: %v", err) - localConn.Close() - stream.Close() - return - } - go util.Pipe(localConn, stream) - util.Pipe(stream, localConn) - }() - } + routeTCP(sta, adminUID) } - }