From 89fc1b289494b38a595f25a3005f528441c2e349 Mon Sep 17 00:00:00 2001 From: Chakib Benziane Date: Mon, 1 Apr 2019 12:12:29 +0200 Subject: [PATCH] Using callback no need for watchers --- api/handlers.go | 80 ++++++++++++++++++++++++++++++++++++++ api.go => api/routes.go | 20 +++++----- config/vars.go | 49 ++++++++++++++++++++++++ db/db.go | 32 ++++------------ docker-compose.yml | 8 ++-- go.mod | 1 + go.sum | 2 + ln/api.go | 24 ++++-------- ln/invoice.go | 35 +++++++++++------ main.go | 9 ++--- session.go | 24 ------------ storage/storage.go | 21 ++-------- storage/upload_ctrl.go | 30 +++++++-------- storage/upload_model.go | 11 +++++- watchers/invoices.go | 85 ----------------------------------------- web/src/Websocket.js | 10 ++--- ws/server.go | 79 +++++++++++++++++--------------------- 17 files changed, 255 insertions(+), 265 deletions(-) create mode 100644 api/handlers.go rename api.go => api/routes.go (77%) create mode 100644 config/vars.go delete mode 100644 session.go delete mode 100644 watchers/invoices.go diff --git a/api/handlers.go b/api/handlers.go new file mode 100644 index 0000000..86d1419 --- /dev/null +++ b/api/handlers.go @@ -0,0 +1,80 @@ +package api + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + + "git.sp4ke.com/sp4ke/bit4sat/bus" + "git.sp4ke.com/sp4ke/bit4sat/db" + "git.sp4ke.com/sp4ke/bit4sat/ln" + "git.sp4ke.com/sp4ke/bit4sat/storage" + "git.sp4ke.com/sp4ke/bit4sat/utils" + "github.com/gin-gonic/gin" + "github.com/mediocregopher/radix/v3" +) + +func invoiceCbHandler(c *gin.Context) { + invoice := ln.Invoice{} + + //dec := json.Decoder(c.Request.Body) + + if err := c.ShouldBindJSON(&invoice); err != nil { + log.Println(err) + utils.JSONErr(c, http.StatusNotAcceptable, + "callback: could not parse request from charge api") + return + } + + log.Printf("received invoice paid from ln charge\n%s", invoice) + c.Status(http.StatusOK) + + // get upload id related to invoice + var uploadId string + invoiceUploadKey := fmt.Sprintf("invoice_%s_upload", invoice.Id) + err := db.DB.Redis.Do(radix.FlatCmd(&uploadId, "GET", invoiceUploadKey)) + if err != nil { + panic(err) + } + if uploadId == "" { + log.Printf("cannot find uploadId for invoice %s", invoice) + return + } + + invoice.UploadId = uploadId + // + // Set upload status to UpWaitingStorage (paid) + err = storage.SetUploadStatus(invoice.UploadId, storage.UpWaitingStorage) + if err != nil { + panic(err) + } + // + // Update Upload Invoice + err = storage.SetUploadInvoice(invoice.UploadId, &invoice) + if err != nil { + panic(err) + } + + // publish invoice was updated to upload_id_paid channel + msg := bus.Message{} + msg.Type = bus.PaymentReceived + msg.UploadId = invoice.UploadId + msg.Data = gin.H{ + "invoice": invoice, + } + + msgJson, err := json.Marshal(msg) + if err != nil { + panic(err) + } + + log.Printf("Notifying upload paid %s", invoice.UploadId) + key := fmt.Sprintf("%s_%s", bus.UploadUpdateChannelPrefix, + invoice.UploadId) + + err = db.DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH", + key, msgJson)) + + return +} diff --git a/api.go b/api/routes.go similarity index 77% rename from api.go rename to api/routes.go index 91539e5..ae6e6d2 100644 --- a/api.go +++ b/api/routes.go @@ -1,6 +1,7 @@ -package main +package api import ( + "git.sp4ke.com/sp4ke/bit4sat/config" "git.sp4ke.com/sp4ke/bit4sat/storage" "git.sp4ke.com/sp4ke/bit4sat/ws" "github.com/gin-contrib/cors" @@ -21,20 +22,17 @@ func (api *API) Run() { uploadRoute := api.router.Group("/api/upload") { - uploadRoute.POST("", UploadCtrl.New) uploadRoute.PUT(":id", UploadCtrl.Upload) - } - wsRoute := api.router.Group("/ws") - { - wsRoute.GET("/", ws.Serve) - wsRoute.GET("/WS", ws.Serve) - wsRoute.GET("/WSS", ws.Serve) - } + // Websocket server + api.router.GET("/ws", ws.Serve) + + // LN charge callback + api.router.POST("/"+config.ChargeCallbackEndpoint, invoiceCbHandler) - api.router.Run("0.0.0.0:8880") + api.router.Run(config.ApiListen) } func NewAPI() *API { @@ -43,7 +41,7 @@ func NewAPI() *API { // Setup Session sessionStore, err := redis.NewStore(10, "tcp", "redis:6379", - "", []byte(SessionSecret)) + "", []byte(config.SessionSecret)) if err != nil { panic(err) } diff --git a/config/vars.go b/config/vars.go new file mode 100644 index 0000000..900b81d --- /dev/null +++ b/config/vars.go @@ -0,0 +1,49 @@ +package config + +import ( + "fmt" + + "github.com/namsral/flag" +) + +const ( + ChargeCallbackEndpoint = "chargecallback" +) + +var ( + StoragePath, + ApiPort, + ApiInterface, + ApiListen, + ApiHost, + SessionSecret, + LnChargeApi, + LnChargeToken, + SqlDbHost, + SqlDbUser, + SqlDbPass string +) + +func init() { + // Storage + flag.StringVar(&StoragePath, "bit4sat-storage-path", "file-storage", "base file storage path") + + // Api + flag.StringVar(&ApiPort, "api-port", "8880", "api port number") + flag.StringVar(&ApiInterface, "api-interface", "", "api listening interface") + flag.StringVar(&ApiHost, "api-host", "", "reachable hostname for api (used by callbacks)") + ApiListen = fmt.Sprintf("%s:%s", ApiInterface, ApiPort) + + flag.StringVar(&SessionSecret, "session-secret", "default-secret", "cookie sessions secret") + + // Database + flag.StringVar(&SqlDbHost, "sql-db-host", "bit4sat", "sql db hostname") + flag.StringVar(&SqlDbUser, "sql-db-user", "bit4sat", "sql db username") + flag.StringVar(&SqlDbPass, "sql-db-pass", "bit4sat", "sql db pass") + + // LN Charge + flag.StringVar(&LnChargeApi, "ln-charge-api", "", "ln charge api endpoint") + flag.StringVar(&LnChargeToken, "ln-charge-token", "", "ln charge api token") + + flag.Parse() +} diff --git a/db/db.go b/db/db.go index f8d427a..49dab50 100644 --- a/db/db.go +++ b/db/db.go @@ -3,21 +3,17 @@ package db import ( "fmt" "log" - "os" + "git.sp4ke.com/sp4ke/bit4sat/config" _ "github.com/lib/pq" + //_ "github.com/go-sql-driver/mysql" "github.com/jmoiron/sqlx" "github.com/mediocregopher/radix/v3" ) const ( - //DBName = "bit4sat.sqlite" - DBName = "bit4sat" - DBHostEnv = "DB_HOST" - DBUserEnv = "DB_USER" - DBPassEnv = "DB_PASS" - //DBPragma = ` PRAGMA foreign_keys = ON; ` + DBName = "bit4sat" ) var ( @@ -33,23 +29,11 @@ type Database struct { func (d *Database) Open() error { var err error - // Get env vars - host, set := os.LookupEnv(DBHostEnv) - if !set { - log.Fatal("undefined DB_HOST env") - } - - user, set := os.LookupEnv(DBUserEnv) - if !set { - log.Fatal("undefined DB_USER env") - } - - pass, set := os.LookupEnv(DBPassEnv) - if !set { - log.Fatal("undefined DB_PASS env") - } - - dsn := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", user, pass, host, DBName) + dsn := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", + config.SqlDbUser, + config.SqlDbPass, + config.SqlDbHost, + DBName) log.Printf("Opening SQL at %s\n", dsn) diff --git a/docker-compose.yml b/docker-compose.yml index bf7c083..ad16e5d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,14 +11,16 @@ volumes: services: app: image: sp4ke/bit4sat + container_name: bit4sat-api build: ./docker environment: - GO111MODULE=on + - API_HOST=bit4sat-api - BIT4SAT_STORAGE_PATH=/storage - GOPATH=/go - - DB_HOST=postgres - - DB_USER=bit4sat - - DB_PASS=bit4sat + - SQL_DB_HOST=postgres + - SQL_DB_USER=bit4sat + - SQL_DB_PASS=bit4sat - LN_CHARGE_API=ln-charge-test:9112 - LN_CHARGE_TOKEN=3emU3Fy8VasHCzMaMXHSVJYpQSqH3yXQj8N5cQFBbq3botrudJuR7zQkBBmFSbAmgXs9GD4j4U3J4R2sMfgqPo8q - SESSION_SECRET=Ai7fCy36UE5cb9wcmdAxxRXwYyQDsDMr6rYocA6Eava7pdiB29EusLbb9sTYWS1e diff --git a/go.mod b/go.mod index d0e8b25..d376abd 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/mattn/go-isatty v0.0.7 // indirect github.com/mattn/go-sqlite3 v1.10.0 github.com/mediocregopher/radix/v3 v3.2.3 + github.com/namsral/flag v1.7.4-pre github.com/segmentio/ksuid v1.0.2 github.com/stretchr/testify v1.3.0 // indirect github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf diff --git a/go.sum b/go.sum index 8b05412..911c8a4 100644 --- a/go.sum +++ b/go.sum @@ -62,6 +62,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/namsral/flag v1.7.4-pre h1:b2ScHhoCUkbsq0d2C15Mv+VU8bl8hAXV8arnWiOHNZs= +github.com/namsral/flag v1.7.4-pre/go.mod h1:OXldTctbM6SWH1K899kPZcf65KxJiD7MsceFUpB5yDo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/quasoft/memstore v0.0.0-20180925164028-84a050167438/go.mod h1:wTPjTepVu7uJBYgZ0SdWHQlIas582j6cn2jgk4DDdlg= diff --git a/ln/api.go b/ln/api.go index ab73c2a..6c94a76 100644 --- a/ln/api.go +++ b/ln/api.go @@ -8,10 +8,10 @@ import ( "net" "net/http" "net/url" - "os" "strconv" "time" + "git.sp4ke.com/sp4ke/bit4sat/config" "github.com/gin-gonic/gin" ) @@ -87,8 +87,12 @@ func PollPaidInvoice(in *Invoice, paidChan chan<- *Invoice) { func UploadInvoice(amount float32, curr Currency, uploadId string) (*Invoice, error) { + webhookUrl := fmt.Sprintf("http://%s:%s/%s", config.ApiHost, config.ApiPort, + config.ChargeCallbackEndpoint) + reqData := gin.H{ "description": fmt.Sprintf("bit4sat upload: %s", uploadId), + "webhook": webhookUrl, } // If Satoshi convert to millisat @@ -124,8 +128,8 @@ func UploadInvoice(amount float32, curr Currency, uploadId string) (*Invoice, er func getUrl(endpoint string) string { url, err := url.Parse(fmt.Sprintf("http://api-token:%s@%s/%s", - LNChargeToken, - LNCEndpoint, + config.LnChargeToken, + config.LnChargeApi, endpoint)) if err != nil { log.Fatal(err) @@ -135,20 +139,6 @@ func getUrl(endpoint string) string { } -func init() { - var set bool - LNCEndpoint, set = os.LookupEnv(LNChargeAPIEnv) - if !set { - log.Fatalf("%s not set", LNChargeAPIEnv) - } - - LNChargeToken, set = os.LookupEnv(LNChargeTokenEnv) - if !set { - log.Fatalf("%s not set", LNChargeAPIEnv) - } - -} - func newClient() *http.Client { netTransport := &http.Transport{ Dial: (&net.Dialer{ diff --git a/ln/invoice.go b/ln/invoice.go index 867b5d8..9f1b490 100644 --- a/ln/invoice.go +++ b/ln/invoice.go @@ -1,6 +1,8 @@ package ln import ( + "fmt" + "strconv" "time" ) @@ -15,18 +17,11 @@ const ( ) const ( - LNChargeAPIEnv = "LN_CHARGE_API" - LNChargeTokenEnv = "LN_CHARGE_TOKEN" InfoEndpoint = "info" InvoiceEndpoint = "invoice" PollInvoiceEndpoint = "invoice" ) -var ( - LNCEndpoint string - LNChargeToken string -) - var ( CurrencyString = map[Currency]string{ CurUSD: "USD", @@ -45,17 +40,35 @@ var ( } ) +type timestamp time.Time + +func (t *timestamp) UnmarshalJSON(in []byte) error { + val, err := strconv.Atoi(string(in)) + if err != nil { + return fmt.Errorf("cannot unmarshal timestamp int") + } + + parsedTime := time.Unix(int64(val), 0) + *t = timestamp(parsedTime) + + return nil +} + +func (t timestamp) String() string { + return time.Time(t).Format(time.RFC3339) +} + type Invoice struct { Id string `json:"id"` - CreatedAt time.Time `json:"created_at"` + UploadId string `json:"upload_id"` Description string `json:"description"` - ExpiresAt time.Time `json:"expires_at"` Msatoshi string `json:"msatoshi"` Payreq string `json:"payreq"` RHash string `json:"rhash"` Status string `json:"status"` QuotedCurrency string `json:"quoted_currency"` QuotedAmount string `json:"quoted_amount"` - PaidAt time.Time `json:"paid_at"` - UploadId string `json:"upload_id"` + PaidAt timestamp `json:"paid_at"` + CreatedAt timestamp `json:"created_at"` + ExpiresAt timestamp `json:"expires_at"` } diff --git a/main.go b/main.go index 1ae9321..064e5fc 100644 --- a/main.go +++ b/main.go @@ -1,17 +1,16 @@ package main import ( + "git.sp4ke.com/sp4ke/bit4sat/api" "git.sp4ke.com/sp4ke/bit4sat/db" - "git.sp4ke.com/sp4ke/bit4sat/watchers" ) func main() { defer db.DB.Sql.Close() - api := NewAPI() - invoiceWatcher := watchers.NewInvoiceWatcher() - - go invoiceWatcher.Run() + api := api.NewAPI() + //invoiceWatcher := watchers.NewInvoiceWatcher() + //go invoiceWatcher.Run() api.Run() } diff --git a/session.go b/session.go deleted file mode 100644 index c851875..0000000 --- a/session.go +++ /dev/null @@ -1,24 +0,0 @@ -package main - -import ( - "log" - "os" -) - -const ( - SessionSecretEnv = "SESSION_SECRET" -) - -var ( - SessionSecret string -) - -func init() { - var set bool - - SessionSecret, set = os.LookupEnv(SessionSecretEnv) - if !set { - log.Fatalf("%s not set", SessionSecretEnv) - } - -} diff --git a/storage/storage.go b/storage/storage.go index 7538fc8..d77ac3d 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -9,20 +9,12 @@ import ( "os" "path/filepath" + "git.sp4ke.com/sp4ke/bit4sat/config" "git.sp4ke.com/sp4ke/bit4sat/utils" ) -const ( - StoragePath = "file-storage" - StoragePathEnv = "BIT4SAT_STORAGE_PATH" -) - -var ( - RuntimeStoragePath string -) - func GetStoreDestination(filename string) string { - return filepath.Join(RuntimeStoragePath, filename) + return filepath.Join(config.StoragePath, filename) } func StoreFormFile(src multipart.File, filename string) error { @@ -68,14 +60,7 @@ func CheckFileExists(file string) (bool, error) { } func init() { - path, set := os.LookupEnv(StoragePathEnv) - if !set { - path = StoragePath - } - - RuntimeStoragePath = path - - err := utils.Mkdir(path) + err := utils.Mkdir(config.StoragePath) if err != nil { log.Fatal(err) } diff --git a/storage/upload_ctrl.go b/storage/upload_ctrl.go index eb9476e..a144187 100644 --- a/storage/upload_ctrl.go +++ b/storage/upload_ctrl.go @@ -95,24 +95,25 @@ func (ctrl UploadCtrl) New(c *gin.Context) { return } - err = SetUploadStatus(uploadId, UpNew) - if err != nil { - utils.JSONErrPriv(c, http.StatusInternalServerError, err) - return - } - // If not free upload generate a an invoice // TODO: check if upload fee is free if true { // New invoice for 10$ invoice, err := ln.UploadInvoice(100, ln.CurSat, uploadId) + if err != nil { + utils.JSONErrPriv(c, http.StatusInternalServerError, err) + return + } + invoice.UploadId = uploadId - //info, err := ln.Info() + // Update Upload Invoice + err = SetUploadInvoice(invoice.UploadId, invoice) if err != nil { utils.JSONErrPriv(c, http.StatusInternalServerError, err) return + } err = SetUploadStatus(uploadId, UpWaitingPayment) @@ -123,12 +124,12 @@ func (ctrl UploadCtrl) New(c *gin.Context) { // Start watching upload payment status for this client - err = WatchUploadPayment(invoice) - if err != nil { - log.Println("error watching payment ", err) - utils.JSONErrPriv(c, http.StatusInternalServerError, err) - return - } + //err = WatchUploadPayment(invoice) + //if err != nil { + //log.Println("error watching payment ", err) + //utils.JSONErrPriv(c, http.StatusInternalServerError, err) + //return + //} result := gin.H{ "upload_id": uploadId, @@ -142,10 +143,9 @@ func (ctrl UploadCtrl) New(c *gin.Context) { return + // Handle free uploads } else { - //Else, free upload accepted - log.Println("new upload created") c.JSON(http.StatusOK, gin.H{ "status": "ready for upload", diff --git a/storage/upload_model.go b/storage/upload_model.go index d31d35d..2ced60a 100644 --- a/storage/upload_model.go +++ b/storage/upload_model.go @@ -121,14 +121,21 @@ func SetUploadStatus(id string, status int) error { } func SetUploadInvoice(uploadId string, invoice *ln.Invoice) error { - key := fmt.Sprintf("upload_%s_invoice", uploadId) + uploadInvoiceKey := fmt.Sprintf("upload_%s_invoice", uploadId) invoiceJson, err := json.Marshal(invoice) if err != nil { return err } - return DB.Redis.Do(radix.FlatCmd(nil, "SET", key, invoiceJson)) + err = DB.Redis.Do(radix.FlatCmd(nil, "SET", uploadInvoiceKey, invoiceJson)) + if err != nil { + return err + } + + // Set inverse relation + invoiceUploadKey := fmt.Sprintf("invoice_%s_upload", invoice.Id) + return DB.Redis.Do(radix.FlatCmd(nil, "SET", invoiceUploadKey, uploadId)) } func WatchUploadPayment(invoice *ln.Invoice) error { diff --git a/watchers/invoices.go b/watchers/invoices.go deleted file mode 100644 index 4786f56..0000000 --- a/watchers/invoices.go +++ /dev/null @@ -1,85 +0,0 @@ -package watchers - -import ( - "encoding/json" - "fmt" - "log" - - "git.sp4ke.com/sp4ke/bit4sat/bus" - "git.sp4ke.com/sp4ke/bit4sat/db" - "git.sp4ke.com/sp4ke/bit4sat/ln" - "git.sp4ke.com/sp4ke/bit4sat/storage" - "github.com/gin-gonic/gin" - "github.com/mediocregopher/radix/v3" -) - -type InvoiceWatcher struct { - listenWatchInvoice chan radix.PubSubMessage - invoicePaid chan *ln.Invoice -} - -func (w *InvoiceWatcher) Run() { - if err := db.DB.RedisPubSub.Subscribe(w.listenWatchInvoice, - bus.WatchInvoicesChannelName); err != nil { - panic(err) - } - - for { - select { - case msg := <-w.listenWatchInvoice: - log.Printf("checking new invoice %s", msg.Message) - - invoice := ln.Invoice{} - err := json.Unmarshal(msg.Message, &invoice) - if err != nil { - panic(err) - } - - //TODO: kill goroutine if listening websocket dies - go ln.PollPaidInvoice(&invoice, w.invoicePaid) - - case invoice := <-w.invoicePaid: - log.Printf("recevied update for invoice <%s>", invoice.UploadId) - // Set upload status to UpWaitingStorage (paid) - err := storage.SetUploadStatus(invoice.UploadId, storage.UpWaitingStorage) - if err != nil { - panic(err) - } - - // Update Upload Invoice - err = storage.SetUploadInvoice(invoice.UploadId, invoice) - if err != nil { - panic(err) - } - - // publish invoice was updated to upload_id_paid channel - msg := bus.Message{} - msg.Type = bus.PaymentReceived - msg.UploadId = invoice.UploadId - msg.Data = gin.H{ - "invoice": invoice, - } - - msgJson, err := json.Marshal(msg) - if err != nil { - panic(err) - } - - log.Printf("Notifying upload paid %s", invoice.UploadId) - key := fmt.Sprintf("%s_%s", bus.UploadUpdateChannelPrefix, - invoice.UploadId) - - err = db.DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH", - key, msgJson)) - - } - } - -} - -func NewInvoiceWatcher() *InvoiceWatcher { - return &InvoiceWatcher{ - listenWatchInvoice: make(chan radix.PubSubMessage), - invoicePaid: make(chan *ln.Invoice), - } -} diff --git a/web/src/Websocket.js b/web/src/Websocket.js index e2fed75..8755dbd 100644 --- a/web/src/Websocket.js +++ b/web/src/Websocket.js @@ -10,7 +10,7 @@ export class WS { return singleton; } - const endpoint = `ws://localhost:2015/ws/`; + const endpoint = `ws://localhost:2015/ws`; this.conn = new WebSocket(endpoint); this.conn.onerror = this.onerror(); this.conn.onclose = this.onclose(); @@ -43,10 +43,10 @@ export class WS { return (ev) => { console.warn("websocket closed"); //TODO: restore on prod - //setTimeout(() =>{ - //console.log("websocket: trying to reconnect") - //instance = new WS() - //}, 5000) + setTimeout(() =>{ + console.log("websocket: trying to reconnect") + instance = new WS() + }, 3000) } } diff --git a/ws/server.go b/ws/server.go index fb427e6..150e6fa 100644 --- a/ws/server.go +++ b/ws/server.go @@ -19,10 +19,10 @@ import ( const ( // Time allowed to write message to peer - writeWait = 10 * time.Second + writeWait = 30 * time.Second // Time allowed to read the next pong message from the client. - pongWait = 5 * time.Second + pongWait = 60 * time.Second // Send pings to client with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 @@ -101,8 +101,7 @@ func (c *Client) readPump() { func (c *Client) writePump() { pingTicker := time.NewTicker(pingPeriod) defer func() { - log.Println("quit ws server") - + log.Println("websocket closing") pingTicker.Stop() c.conn.Close() }() @@ -126,14 +125,15 @@ func (c *Client) writePump() { for { select { case msg := <-c.subChannel: - log.Printf("received msg %s on socket main channel", msg) + //log.Printf("received msg %s on socket main channel", msg) jsonMsg := bus.Message{} if err := json.Unmarshal(msg.Message, &jsonMsg); err != nil { log.Printf("ws error reading from pubsub: %s", err) - continue + break } if jsonMsg.Type == bus.SetUploadId { + log.Printf("registering uploadId: %s to socket", jsonMsg.UploadId) c.uploadId = jsonMsg.UploadId } @@ -202,32 +202,6 @@ func (c *Client) writePump() { } - //case <-testTicker.C: - //c.conn.SetWriteDeadline(time.Now().Add(writeWait)) - - //msg := gin.H{ - //"msg": "hello", - //"data": "from server", - //} - - //w, err := c.conn.NextWriter(websocket.TextMessage) - //if err != nil { - //log.Println(err) - //return - //} - - //enc := json.NewEncoder(w) - //err = enc.Encode(msg) - //if err != nil { - //log.Println(err) - //return - //} - - //if err = w.Close(); err != nil { - //log.Println(err) - //return - //} - case <-pingTicker.C: //log.Println("ping") @@ -241,24 +215,39 @@ func (c *Client) writePump() { } func Serve(c *gin.Context) { + var err error log.Println("websocket request") session := sessions.Default(c) - // Create unique id for this session in order to store it - // in the websocket bus + // First check if cookie is already set with upload id in case this is a + // reconnection + // + var finalSessId ksuid.KSUID - socketSessionId, err := ksuid.NewRandomWithTime(time.Now()) - if err != nil { - log.Println("error generating ksuid for websocket") - } - - log.Printf("using socket id: %s", socketSessionId) + socketSessionId := session.Get(WebsocketIdName) + if socketSessionId == nil { - session.Set(WebsocketIdName, socketSessionId.String()) - session.Save() + // Create unique id for this session in order to store it + // in the websocket bus + finalSessId, err = ksuid.NewRandomWithTime(time.Now()) + if err != nil { + log.Println("error generating ksuid for websocket") + return + } - log.Printf("Writing socket session id to header: %s", c.Writer.Header()) + log.Printf("using socket id: %s", finalSessId) + session.Set(WebsocketIdName, finalSessId.String()) + session.Save() + log.Printf("Writing socket session id to header: %s", c.Writer.Header()) + } else { + // Reuse socket session id + log.Printf("reusing websocket id %s", socketSessionId) + finalSessId, err = ksuid.Parse(socketSessionId.(string)) + if err != nil { + log.Println("could not parse websocket session id") + } + } conn, err := upgrader.Upgrade(c.Writer, c.Request, c.Writer.Header()) if err != nil { @@ -271,11 +260,11 @@ func Serve(c *gin.Context) { } client := &Client{ - id: socketSessionId, + id: finalSessId, conn: conn, subChannel: make(chan radix.PubSubMessage), uploadNotifChannel: make(chan radix.PubSubMessage), - channelName: fmt.Sprintf("%s_%s", bus.WebsocketPubSubPrefix, socketSessionId), + channelName: fmt.Sprintf("%s_%s", bus.WebsocketPubSubPrefix, finalSessId), } C[client] = true