From 98a772b6ee02551058b06ca3b01f26cf60e58489 Mon Sep 17 00:00:00 2001 From: Qian Wang Date: Wed, 24 Jul 2019 15:25:09 +0100 Subject: [PATCH] Add server response to status update --- internal/server/activeuser.go | 6 ++++- internal/server/um_local.go | 41 ++++++++++++++++++++++++++++++---- internal/server/usermanager.go | 14 ++++++++++-- internal/server/userpanel.go | 20 ++++++++++++++++- 4 files changed, 73 insertions(+), 8 deletions(-) diff --git a/internal/server/activeuser.go b/internal/server/activeuser.go index def2c3c..f702fa4 100644 --- a/internal/server/activeuser.go +++ b/internal/server/activeuser.go @@ -48,9 +48,13 @@ func (u *ActiveUser) GetSession(sessionID uint32, obfs mux.Obfser, deobfs mux.De } } -func (u *ActiveUser) Terminate() { +// TODO: allow message to pass through this and sesh.Close() +func (u *ActiveUser) Terminate(reason string) { u.sessionsM.Lock() for _, sesh := range u.sessions { + if reason != "" { + sesh.SetTerminalMsg(reason) + } go sesh.Close() } u.sessionsM.Unlock() diff --git a/internal/server/um_local.go b/internal/server/um_local.go index 59785f3..64342f4 100644 --- a/internal/server/um_local.go +++ b/internal/server/um_local.go @@ -96,20 +96,53 @@ func i64ToB(value int64) []byte { return oct } -func (manager *localManager) uploadStatus(uploads []statusUpdate) error { +func (manager *localManager) uploadStatus(uploads []statusUpdate) ([]statusResponse, error) { + var responses []statusResponse err := manager.db.Update(func(tx *bolt.Tx) error { for _, status := range uploads { + var resp statusResponse bucket := tx.Bucket(status.UID) if bucket == nil { log.Printf("%x doesn't exist\n", status.UID) continue } + oldUp := int64(Uint64(bucket.Get([]byte("UpCredit")))) - bucket.Put([]byte("UpCredit"), i64ToB(oldUp-status.upUsage)) + newUp := oldUp - status.upUsage + if newUp <= 0 { + resp = statusResponse{ + status.UID, + TERMINATE, + "No upload credit left", + } + } + bucket.Put([]byte("UpCredit"), i64ToB(newUp)) + oldDown := int64(Uint64(bucket.Get([]byte("DownCredit")))) - bucket.Put([]byte("DownCredit"), i64ToB(oldDown-status.downUsage)) + newDown := oldDown - status.downUsage + if newDown <= 0 { + resp = statusResponse{ + status.UID, + TERMINATE, + "No download credit left", + } + } + bucket.Put([]byte("DownCredit"), i64ToB(newDown)) + + expiry := int64(Uint64(bucket.Get([]byte("ExpiryTime")))) + if time.Now().Unix()>expiry{ + resp = statusResponse{ + status.UID, + TERMINATE, + "User has expired", + } + } + + if resp.UID != nil { + responses = append(responses, resp) + } } return nil }) - return err + return responses, err } diff --git a/internal/server/usermanager.go b/internal/server/usermanager.go index f62a0b1..1889647 100644 --- a/internal/server/usermanager.go +++ b/internal/server/usermanager.go @@ -14,8 +14,19 @@ type statusUpdate struct { timestamp int64 } +type statusResponse struct { + UID []byte + action int + message string +} + +const ( + TERMINATE = iota + 1 +) + var ErrUserNotFound = errors.New("UID does not correspond to a user") var ErrSessionsCapReached = errors.New("Sessions cap has reached") + var ErrNoUpCredit = errors.New("No upload credit left") var ErrNoDownCredit = errors.New("No download credit left") var ErrUserExpired = errors.New("User has expired") @@ -23,6 +34,5 @@ var ErrUserExpired = errors.New("User has expired") type UserManager interface { authenticateUser([]byte) (int64, int64, error) authoriseNewSession(*ActiveUser) error - // TODO: fetch update's response - uploadStatus([]statusUpdate) error + uploadStatus([]statusUpdate) ([]statusResponse, error) } diff --git a/internal/server/userpanel.go b/internal/server/userpanel.go index d4c888e..ee33a48 100644 --- a/internal/server/userpanel.go +++ b/internal/server/userpanel.go @@ -1,6 +1,7 @@ package server import ( + "log" "sync" "sync/atomic" "time" @@ -122,7 +123,24 @@ func (panel *userPanel) commitUpdate() { } statuses = append(statuses, status) } - panel.manager.uploadStatus(statuses) + + responses, err := panel.manager.uploadStatus(statuses) + if err != nil { + log.Println(err) + } + for _, resp := range responses { + var arrUID [16]byte + copy(arrUID[:], resp.UID) + switch resp.action { + case TERMINATE: + panel.activeUsersM.RLock() + user := panel.activeUsers[arrUID] + panel.activeUsersM.RUnlock() + if user != nil { + user.Terminate(resp.message) + } + } + } panel.usageUpdateQueue = make(map[[16]byte]*usagePair) panel.usageUpdateQueueM.Unlock() }