diff --git a/internal/multiplex/qos.go b/internal/multiplex/qos.go index 6a96f78..355c58e 100644 --- a/internal/multiplex/qos.go +++ b/internal/multiplex/qos.go @@ -8,7 +8,7 @@ import ( // Valve needs to be universal, across all sessions that belong to a user // gabe please don't sue -type Valve struct { +type LimitedValve struct { // traffic directions from the server's perspective are refered // exclusively as rx and tx. // rx is from client to server, tx is from server to client @@ -21,9 +21,11 @@ type Valve struct { tx *int64 } -func MakeValve(rxRate, txRate int64) *Valve { +type UnlimitedValve struct{} + +func MakeValve(rxRate, txRate int64) *LimitedValve { var rx, tx int64 - v := &Valve{ + v := &LimitedValve{ rxtb: ratelimit.NewLimitedBucketWithRate(float64(rxRate), rxRate), txtb: ratelimit.NewLimitedBucketWithRate(float64(txRate), txRate), rx: &rx, @@ -32,27 +34,34 @@ func MakeValve(rxRate, txRate int64) *Valve { return v } -func MakeUnlimitedValve() *Valve { - var rx, tx int64 - v := &Valve{ - rxtb: ratelimit.NewUnlimitedBucket(), - txtb: ratelimit.NewUnlimitedBucket(), - rx: &rx, - tx: &tx, - } - return v -} +var UNLIMITED_VALVE = &UnlimitedValve{} -var UNLIMITED_VALVE = MakeUnlimitedValve() - -func (v *Valve) rxWait(n int) { v.rxtb.Wait(int64(n)) } -func (v *Valve) txWait(n int) { v.txtb.Wait(int64(n)) } -func (v *Valve) AddRx(n int64) { atomic.AddInt64(v.rx, n) } -func (v *Valve) AddTx(n int64) { atomic.AddInt64(v.tx, n) } -func (v *Valve) GetRx() int64 { return atomic.LoadInt64(v.rx) } -func (v *Valve) GetTx() int64 { return atomic.LoadInt64(v.tx) } -func (v *Valve) Nullify() (int64, int64) { +func (v *LimitedValve) rxWait(n int) { v.rxtb.Wait(int64(n)) } +func (v *LimitedValve) txWait(n int) { v.txtb.Wait(int64(n)) } +func (v *LimitedValve) AddRx(n int64) { atomic.AddInt64(v.rx, n) } +func (v *LimitedValve) AddTx(n int64) { atomic.AddInt64(v.tx, n) } +func (v *LimitedValve) GetRx() int64 { return atomic.LoadInt64(v.rx) } +func (v *LimitedValve) GetTx() int64 { return atomic.LoadInt64(v.tx) } +func (v *LimitedValve) Nullify() (int64, int64) { rx := atomic.SwapInt64(v.rx, 0) tx := atomic.SwapInt64(v.tx, 0) return rx, tx } + +func (v *UnlimitedValve) rxWait(n int) {} +func (v *UnlimitedValve) txWait(n int) {} +func (v *UnlimitedValve) AddRx(n int64) {} +func (v *UnlimitedValve) AddTx(n int64) {} +func (v *UnlimitedValve) GetRx() int64 { return 0 } +func (v *UnlimitedValve) GetTx() int64 { return 0 } +func (v *UnlimitedValve) Nullify() (int64, int64) { return 0, 0 } + +type Valve interface { + rxWait(n int) + txWait(n int) + AddRx(n int64) + AddTx(n int64) + GetRx() int64 + GetTx() int64 + Nullify() (int64, int64) +} diff --git a/internal/multiplex/session.go b/internal/multiplex/session.go index ee729fa..414d6d4 100644 --- a/internal/multiplex/session.go +++ b/internal/multiplex/session.go @@ -52,7 +52,7 @@ type Session struct { terminalMsg atomic.Value } -func MakeSession(id uint32, valve *Valve, obfuscator *Obfuscator, unitReader func(net.Conn, []byte) (int, error)) *Session { +func MakeSession(id uint32, valve Valve, obfuscator *Obfuscator, unitReader func(net.Conn, []byte) (int, error)) *Session { sesh := &Session{ id: id, unitRead: unitReader, diff --git a/internal/multiplex/switchboard.go b/internal/multiplex/switchboard.go index fc4edd1..270b4ac 100644 --- a/internal/multiplex/switchboard.go +++ b/internal/multiplex/switchboard.go @@ -13,7 +13,7 @@ import ( type switchboard struct { session *Session - *Valve + Valve connsM sync.RWMutex conns map[uint32]net.Conn @@ -22,7 +22,7 @@ type switchboard struct { broken uint32 } -func makeSwitchboard(sesh *Session, valve *Valve) *switchboard { +func makeSwitchboard(sesh *Session, valve Valve) *switchboard { // rates are uint64 because in the usermanager we want the bandwidth to be atomically // operated (so that the bandwidth can change on the fly). sb := &switchboard{ diff --git a/internal/server/activeuser.go b/internal/server/activeuser.go index b8de9a7..d741899 100644 --- a/internal/server/activeuser.go +++ b/internal/server/activeuser.go @@ -12,7 +12,7 @@ type ActiveUser struct { arrUID [16]byte - valve *mux.Valve + valve mux.Valve bypass bool diff --git a/internal/server/userpanel_test.go b/internal/server/userpanel_test.go index 0d3ea65..be10ddb 100644 --- a/internal/server/userpanel_test.go +++ b/internal/server/userpanel_test.go @@ -27,18 +27,12 @@ func TestUserPanel_BypassUser(t *testing.T) { }) t.Run("updateUsageQueue", func(t *testing.T) { panel.updateUsageQueue() - if user.valve.GetRx() != 10 || user.valve.GetTx() != 10 { - t.Error("user rx or tx info altered") - } if _, inQ := panel.usageUpdateQueue[user.arrUID]; inQ { t.Error("user in update queue") } }) t.Run("updateUsageQueueForOne", func(t *testing.T) { panel.updateUsageQueueForOne(user) - if user.valve.GetRx() != 10 || user.valve.GetTx() != 10 { - t.Error("user rx or tx info altered") - } if _, inQ := panel.usageUpdateQueue[user.arrUID]; inQ { t.Error("user in update queue") }