Add server response to status update

This commit is contained in:
Qian Wang 2019-07-24 15:25:09 +01:00
parent 9b552f55a4
commit 98a772b6ee
4 changed files with 73 additions and 8 deletions

View File

@ -48,9 +48,13 @@ func (u *ActiveUser) GetSession(sessionID uint32, obfs mux.Obfser, deobfs mux.De
}
}
func (u *ActiveUser) Terminate() {
// TODO: allow message to pass through this and sesh.Close()
func (u *ActiveUser) Terminate(reason string) {
u.sessionsM.Lock()
for _, sesh := range u.sessions {
if reason != "" {
sesh.SetTerminalMsg(reason)
}
go sesh.Close()
}
u.sessionsM.Unlock()

View File

@ -96,20 +96,53 @@ func i64ToB(value int64) []byte {
return oct
}
func (manager *localManager) uploadStatus(uploads []statusUpdate) error {
func (manager *localManager) uploadStatus(uploads []statusUpdate) ([]statusResponse, error) {
var responses []statusResponse
err := manager.db.Update(func(tx *bolt.Tx) error {
for _, status := range uploads {
var resp statusResponse
bucket := tx.Bucket(status.UID)
if bucket == nil {
log.Printf("%x doesn't exist\n", status.UID)
continue
}
oldUp := int64(Uint64(bucket.Get([]byte("UpCredit"))))
bucket.Put([]byte("UpCredit"), i64ToB(oldUp-status.upUsage))
newUp := oldUp - status.upUsage
if newUp <= 0 {
resp = statusResponse{
status.UID,
TERMINATE,
"No upload credit left",
}
}
bucket.Put([]byte("UpCredit"), i64ToB(newUp))
oldDown := int64(Uint64(bucket.Get([]byte("DownCredit"))))
bucket.Put([]byte("DownCredit"), i64ToB(oldDown-status.downUsage))
newDown := oldDown - status.downUsage
if newDown <= 0 {
resp = statusResponse{
status.UID,
TERMINATE,
"No download credit left",
}
}
bucket.Put([]byte("DownCredit"), i64ToB(newDown))
expiry := int64(Uint64(bucket.Get([]byte("ExpiryTime"))))
if time.Now().Unix()>expiry{
resp = statusResponse{
status.UID,
TERMINATE,
"User has expired",
}
}
if resp.UID != nil {
responses = append(responses, resp)
}
}
return nil
})
return err
return responses, err
}

View File

@ -14,8 +14,19 @@ type statusUpdate struct {
timestamp int64
}
type statusResponse struct {
UID []byte
action int
message string
}
const (
TERMINATE = iota + 1
)
var ErrUserNotFound = errors.New("UID does not correspond to a user")
var ErrSessionsCapReached = errors.New("Sessions cap has reached")
var ErrNoUpCredit = errors.New("No upload credit left")
var ErrNoDownCredit = errors.New("No download credit left")
var ErrUserExpired = errors.New("User has expired")
@ -23,6 +34,5 @@ var ErrUserExpired = errors.New("User has expired")
type UserManager interface {
authenticateUser([]byte) (int64, int64, error)
authoriseNewSession(*ActiveUser) error
// TODO: fetch update's response
uploadStatus([]statusUpdate) error
uploadStatus([]statusUpdate) ([]statusResponse, error)
}

View File

@ -1,6 +1,7 @@
package server
import (
"log"
"sync"
"sync/atomic"
"time"
@ -122,7 +123,24 @@ func (panel *userPanel) commitUpdate() {
}
statuses = append(statuses, status)
}
panel.manager.uploadStatus(statuses)
responses, err := panel.manager.uploadStatus(statuses)
if err != nil {
log.Println(err)
}
for _, resp := range responses {
var arrUID [16]byte
copy(arrUID[:], resp.UID)
switch resp.action {
case TERMINATE:
panel.activeUsersM.RLock()
user := panel.activeUsers[arrUID]
panel.activeUsersM.RUnlock()
if user != nil {
user.Terminate(resp.message)
}
}
}
panel.usageUpdateQueue = make(map[[16]byte]*usagePair)
panel.usageUpdateQueueM.Unlock()
}