Differentiate concrete types of LimitedValve and UnlimitedValve to avoid unnecessary atomic.AddUint64 since it has non-trivial overhead on embedded systems

This commit is contained in:
Qian Wang 2019-08-07 15:43:42 +01:00
parent e7aa4cd04b
commit d99b4009b2
5 changed files with 35 additions and 32 deletions

View File

@ -8,7 +8,7 @@ import (
// Valve needs to be universal, across all sessions that belong to a user // Valve needs to be universal, across all sessions that belong to a user
// gabe please don't sue // gabe please don't sue
type Valve struct { type LimitedValve struct {
// traffic directions from the server's perspective are refered // traffic directions from the server's perspective are refered
// exclusively as rx and tx. // exclusively as rx and tx.
// rx is from client to server, tx is from server to client // rx is from client to server, tx is from server to client
@ -21,9 +21,11 @@ type Valve struct {
tx *int64 tx *int64
} }
func MakeValve(rxRate, txRate int64) *Valve { type UnlimitedValve struct{}
func MakeValve(rxRate, txRate int64) *LimitedValve {
var rx, tx int64 var rx, tx int64
v := &Valve{ v := &LimitedValve{
rxtb: ratelimit.NewLimitedBucketWithRate(float64(rxRate), rxRate), rxtb: ratelimit.NewLimitedBucketWithRate(float64(rxRate), rxRate),
txtb: ratelimit.NewLimitedBucketWithRate(float64(txRate), txRate), txtb: ratelimit.NewLimitedBucketWithRate(float64(txRate), txRate),
rx: &rx, rx: &rx,
@ -32,27 +34,34 @@ func MakeValve(rxRate, txRate int64) *Valve {
return v return v
} }
func MakeUnlimitedValve() *Valve { var UNLIMITED_VALVE = &UnlimitedValve{}
var rx, tx int64
v := &Valve{
rxtb: ratelimit.NewUnlimitedBucket(),
txtb: ratelimit.NewUnlimitedBucket(),
rx: &rx,
tx: &tx,
}
return v
}
var UNLIMITED_VALVE = MakeUnlimitedValve() func (v *LimitedValve) rxWait(n int) { v.rxtb.Wait(int64(n)) }
func (v *LimitedValve) txWait(n int) { v.txtb.Wait(int64(n)) }
func (v *Valve) rxWait(n int) { v.rxtb.Wait(int64(n)) } func (v *LimitedValve) AddRx(n int64) { atomic.AddInt64(v.rx, n) }
func (v *Valve) txWait(n int) { v.txtb.Wait(int64(n)) } func (v *LimitedValve) AddTx(n int64) { atomic.AddInt64(v.tx, n) }
func (v *Valve) AddRx(n int64) { atomic.AddInt64(v.rx, n) } func (v *LimitedValve) GetRx() int64 { return atomic.LoadInt64(v.rx) }
func (v *Valve) AddTx(n int64) { atomic.AddInt64(v.tx, n) } func (v *LimitedValve) GetTx() int64 { return atomic.LoadInt64(v.tx) }
func (v *Valve) GetRx() int64 { return atomic.LoadInt64(v.rx) } func (v *LimitedValve) Nullify() (int64, int64) {
func (v *Valve) GetTx() int64 { return atomic.LoadInt64(v.tx) }
func (v *Valve) Nullify() (int64, int64) {
rx := atomic.SwapInt64(v.rx, 0) rx := atomic.SwapInt64(v.rx, 0)
tx := atomic.SwapInt64(v.tx, 0) tx := atomic.SwapInt64(v.tx, 0)
return rx, tx return rx, tx
} }
func (v *UnlimitedValve) rxWait(n int) {}
func (v *UnlimitedValve) txWait(n int) {}
func (v *UnlimitedValve) AddRx(n int64) {}
func (v *UnlimitedValve) AddTx(n int64) {}
func (v *UnlimitedValve) GetRx() int64 { return 0 }
func (v *UnlimitedValve) GetTx() int64 { return 0 }
func (v *UnlimitedValve) Nullify() (int64, int64) { return 0, 0 }
type Valve interface {
rxWait(n int)
txWait(n int)
AddRx(n int64)
AddTx(n int64)
GetRx() int64
GetTx() int64
Nullify() (int64, int64)
}

View File

@ -52,7 +52,7 @@ type Session struct {
terminalMsg atomic.Value terminalMsg atomic.Value
} }
func MakeSession(id uint32, valve *Valve, obfuscator *Obfuscator, unitReader func(net.Conn, []byte) (int, error)) *Session { func MakeSession(id uint32, valve Valve, obfuscator *Obfuscator, unitReader func(net.Conn, []byte) (int, error)) *Session {
sesh := &Session{ sesh := &Session{
id: id, id: id,
unitRead: unitReader, unitRead: unitReader,

View File

@ -13,7 +13,7 @@ import (
type switchboard struct { type switchboard struct {
session *Session session *Session
*Valve Valve
connsM sync.RWMutex connsM sync.RWMutex
conns map[uint32]net.Conn conns map[uint32]net.Conn
@ -22,7 +22,7 @@ type switchboard struct {
broken uint32 broken uint32
} }
func makeSwitchboard(sesh *Session, valve *Valve) *switchboard { func makeSwitchboard(sesh *Session, valve Valve) *switchboard {
// rates are uint64 because in the usermanager we want the bandwidth to be atomically // rates are uint64 because in the usermanager we want the bandwidth to be atomically
// operated (so that the bandwidth can change on the fly). // operated (so that the bandwidth can change on the fly).
sb := &switchboard{ sb := &switchboard{

View File

@ -12,7 +12,7 @@ type ActiveUser struct {
arrUID [16]byte arrUID [16]byte
valve *mux.Valve valve mux.Valve
bypass bool bypass bool

View File

@ -27,18 +27,12 @@ func TestUserPanel_BypassUser(t *testing.T) {
}) })
t.Run("updateUsageQueue", func(t *testing.T) { t.Run("updateUsageQueue", func(t *testing.T) {
panel.updateUsageQueue() panel.updateUsageQueue()
if user.valve.GetRx() != 10 || user.valve.GetTx() != 10 {
t.Error("user rx or tx info altered")
}
if _, inQ := panel.usageUpdateQueue[user.arrUID]; inQ { if _, inQ := panel.usageUpdateQueue[user.arrUID]; inQ {
t.Error("user in update queue") t.Error("user in update queue")
} }
}) })
t.Run("updateUsageQueueForOne", func(t *testing.T) { t.Run("updateUsageQueueForOne", func(t *testing.T) {
panel.updateUsageQueueForOne(user) panel.updateUsageQueueForOne(user)
if user.valve.GetRx() != 10 || user.valve.GetTx() != 10 {
t.Error("user rx or tx info altered")
}
if _, inQ := panel.usageUpdateQueue[user.arrUID]; inQ { if _, inQ := panel.usageUpdateQueue[user.arrUID]; inQ {
t.Error("user in update queue") t.Error("user in update queue")
} }