From d958de993828a680931e83202a9b58581fe48647 Mon Sep 17 00:00:00 2001 From: Andy Wang Date: Fri, 17 Apr 2020 01:04:26 +0100 Subject: [PATCH] Improve switchboard connscount --- internal/multiplex/switchboard.go | 12 +++++------ internal/multiplex/switchboard_test.go | 30 ++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index 407d273..3a2b6e7 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -27,6 +27,7 @@ type switchboard struct { switchboardConfig conns sync.Map + numConns uint32 nextConnId uint32 broken uint32 @@ -46,17 +47,12 @@ func makeSwitchboard(sesh *Session, config switchboardConfig) *switchboard { var errBrokenSwitchboard = errors.New("the switchboard is broken") func (sb *switchboard) connsCount() int { - // count the number of entries in conns - var count int - sb.conns.Range(func(_, _ interface{}) bool { - count += 1 - return true - }) - return count + return int(atomic.LoadUint32(&sb.numConns)) } func (sb *switchboard) addConn(conn net.Conn) { connId := atomic.AddUint32(&sb.nextConnId, 1) - 1 + atomic.AddUint32(&sb.numConns, 1) sb.conns.Store(connId, conn) go sb.deplex(connId, conn) } @@ -66,6 +62,7 @@ func (sb *switchboard) send(data []byte, connId *uint32) (n int, err error) { writeAndRegUsage := func(conn net.Conn, d []byte) (int, error) { n, err = conn.Write(d) if err != nil { + sb.conns.Delete(*connId) sb.close("failed to write to remote " + err.Error()) return n, err } @@ -162,6 +159,7 @@ func (sb *switchboard) deplex(connId uint32, conn net.Conn) { if err != nil { log.Debugf("a connection for session %v has closed: %v", sb.session.id, err) sb.conns.Delete(connId) + atomic.AddUint32(&sb.numConns, ^uint32(0)) sb.close("a connection has dropped unexpectedly") return } diff --git a/internal/multiplex/switchboard_test.go b/internal/multiplex/switchboard_test.go index 395aa59..694c6b2 100644 --- a/internal/multiplex/switchboard_test.go +++ b/internal/multiplex/switchboard_test.go @@ -3,6 +3,7 @@ package multiplex import ( "github.com/cbeuw/connutil" "math/rand" + "sync" "testing" "time" ) @@ -154,3 +155,32 @@ func TestSwitchboard_CloseOnOneDisconn(t *testing.T) { return } } + +func TestSwitchboard_ConnsCount(t *testing.T) { + seshConfig := SessionConfig{ + Valve: MakeValve(1<<20, 1<<20), + } + sesh := MakeSession(0, seshConfig) + + var wg sync.WaitGroup + for i := 0; i < 1000; i++ { + wg.Add(1) + go func() { + sesh.AddConnection(connutil.Discard()) + wg.Done() + }() + } + wg.Wait() + + if sesh.sb.connsCount() != 1000 { + t.Error("connsCount incorrect") + } + + sesh.sb.closeAll() + + time.Sleep(100 * time.Millisecond) + if sesh.sb.connsCount() != 0 { + t.Error("connsCount incorrect") + } + +}