From c1900d52a303ae09d44c2326f8c82285b1f4955c Mon Sep 17 00:00:00 2001 From: Chakib Benziane Date: Sat, 30 Mar 2019 19:42:41 +0100 Subject: [PATCH] implemented invoice watcher --- api.go | 2 +- bus/messages.go | 18 +++++++ db/db.go | 7 ++- ln/{charge.go => api.go} | 103 ++++++++++++--------------------------- ln/invoice.go | 102 +++++++++++++++----------------------- main.go | 4 +- storage/upload_ctrl.go | 7 +-- storage/upload_model.go | 26 ++-------- watchers/invoices.go | 79 ++++++++++++++++++++++++++++++ web/Caddyfile | 9 ++-- web/src/Websocket.js | 6 ++- web/src/index.js | 1 - web/src/worker.js | 21 +++++++- ws/server.go | 5 +- 14 files changed, 218 insertions(+), 172 deletions(-) create mode 100644 bus/messages.go rename ln/{charge.go => api.go} (65%) create mode 100644 watchers/invoices.go diff --git a/api.go b/api.go index a506789..040356e 100644 --- a/api.go +++ b/api.go @@ -32,7 +32,7 @@ func (api *API) Run() { wsRoute.GET("/WSS", ws.Serve) } - api.router.Run(":8880") + api.router.Run("0.0.0.0:8880") } func NewAPI() *API { diff --git a/bus/messages.go b/bus/messages.go new file mode 100644 index 0000000..f4baec1 --- /dev/null +++ b/bus/messages.go @@ -0,0 +1,18 @@ +package bus + +// PubSub Update Types +const ( + UpdatePaymentReceived = iota +) + +// Channel names +const ( + WatchInvoicesChannelName = "watch_invoices" +) + +// Simple message used for update notifications in pub/sub +type UpdateMessage struct { + UploadId string + Status int // update type + Data interface{} +} diff --git a/db/db.go b/db/db.go index 5b182cd..f8d427a 100644 --- a/db/db.go +++ b/db/db.go @@ -25,8 +25,9 @@ var ( ) type Database struct { - Sql *sqlx.DB - Redis *radix.Pool + Sql *sqlx.DB + Redis *radix.Pool + RedisPubSub radix.PubSubConn } func (d *Database) Open() error { @@ -63,6 +64,8 @@ func (d *Database) Open() error { log.Fatal(err) } + d.RedisPubSub = radix.PersistentPubSub("tcp", "redis:6379", nil) + return nil } diff --git a/ln/charge.go b/ln/api.go similarity index 65% rename from ln/charge.go rename to ln/api.go index e628d6d..eabe0fb 100644 --- a/ln/charge.go +++ b/ln/api.go @@ -15,72 +15,14 @@ import ( "github.com/gin-gonic/gin" ) -type Currency int - -const ( - CurMSat Currency = iota - CurSat - CurBTC - CurUSD - CurEur -) - -const ( - LNChargeAPIEnv = "LN_CHARGE_API" - LNChargeTokenEnv = "LN_CHARGE_TOKEN" - InfoEndpoint = "info" - InvoiceEndpoint = "invoice" - PollInvoiceEndpoint = "invoice" -) - var ( - LNCEndpoint string - LNChargeToken string + Client *http.Client ) -var ( - CurrencyString = map[Currency]string{ - CurUSD: "USD", - CurSat: "SAT", - CurBTC: "BTC", - CurMSat: "MSAT", - CurEur: "EUR", - } - - CurrencyID = map[string]Currency{ - "USD": CurUSD, - "SAT": CurSat, - "MSAT": CurMSat, - "BTC": CurBTC, - "EUR": CurEur, - } -) - -type Charge struct { - client *http.Client -} - -func NewCharge() *Charge { - netTransport := &http.Transport{ - Dial: (&net.Dialer{ - Timeout: 5 * time.Second, - }).Dial, - TLSHandshakeTimeout: 5 * time.Second, - } - c := &http.Client{ - Timeout: time.Second * 10, - Transport: netTransport, - } - - return &Charge{ - client: c, - } -} - -func (c *Charge) Info() (gin.H, error) { +func Info() (gin.H, error) { result := make(gin.H) - resp, err := c.client.Get(getUrl(InfoEndpoint)) + resp, err := Client.Get(getUrl(InfoEndpoint)) if err != nil { return nil, err } @@ -91,14 +33,15 @@ func (c *Charge) Info() (gin.H, error) { return result, nil } -func (c *Charge) PollInvoice(id string) (*Invoice, error) { +// Shoudl be called in goroutine +func PollPaidInvoice(in *Invoice, paidChan chan<- *Invoice) { reqParams := url.Values{} // Timeout in seconds reqParams.Set("timeout", strconv.Itoa(30)) reqURI := fmt.Sprintf("%s/%s/wait?%s", getUrl(PollInvoiceEndpoint), - id, + in.Id, reqParams.Encode()) log.Printf("polling to %s", reqURI) @@ -110,16 +53,16 @@ func (c *Charge) PollInvoice(id string) (*Invoice, error) { log.Println("new poll") resp, err = http.Get(reqURI) if err != nil { - return nil, err + log.Printf("Error: ", err) } if resp.StatusCode == http.StatusPaymentRequired { + log.Printf("invoice %s not yet paid", in.Id) continue } if resp.StatusCode == http.StatusGone { - log.Printf("invoice expired") - return nil, fmt.Errorf("invoice expired") + log.Printf("invoice expired ", in.Id) } if resp.StatusCode == http.StatusOK { @@ -129,8 +72,7 @@ func (c *Charge) PollInvoice(id string) (*Invoice, error) { //unknownData := gin.H{} //jsonDec := json.NewDecoder(resp.Body) //jsonDec.Decode(&data) - log.Printf("unknown resopnse %s", resp.Status) - return nil, fmt.Errorf("unknown response %s", resp.Status) + log.Printf("InvoicePoll Error: unknown resopnse %s", resp.Status) } @@ -139,10 +81,10 @@ func (c *Charge) PollInvoice(id string) (*Invoice, error) { jsonDec.Decode(&invoice) log.Printf("Invoice paid %s", invoice) - return &invoice, nil + paidChan <- &invoice } -func (c *Charge) Invoice(amount float32, curr Currency, uploadId string) (*Invoice, error) { +func UploadInvoice(amount float32, curr Currency, uploadId string) (*Invoice, error) { reqData := gin.H{ "description": fmt.Sprintf("bit4sat upload: %s", uploadId), @@ -161,7 +103,7 @@ func (c *Charge) Invoice(amount float32, curr Currency, uploadId string) (*Invoi return nil, err } - resp, err := c.client.Post(getUrl(InvoiceEndpoint), + resp, err := Client.Post(getUrl(InvoiceEndpoint), "application/json", bytes.NewReader(jsonEnc)) @@ -203,3 +145,22 @@ func init() { } } + +func newClient() *http.Client { + netTransport := &http.Transport{ + Dial: (&net.Dialer{ + Timeout: 5 * time.Second, + }).Dial, + TLSHandshakeTimeout: 5 * time.Second, + } + c := &http.Client{ + Timeout: time.Second * 10, + Transport: netTransport, + } + + return c +} + +func init() { + Client = newClient() +} diff --git a/ln/invoice.go b/ln/invoice.go index 1387639..aca3c62 100644 --- a/ln/invoice.go +++ b/ln/invoice.go @@ -1,12 +1,48 @@ package ln import ( - "encoding/json" - "log" "time" +) + +type Currency int + +const ( + CurMSat Currency = iota + CurSat + CurBTC + CurUSD + CurEur +) + +const ( + LNChargeAPIEnv = "LN_CHARGE_API" + LNChargeTokenEnv = "LN_CHARGE_TOKEN" + InfoEndpoint = "info" + InvoiceEndpoint = "invoice" + PollInvoiceEndpoint = "invoice" +) + +var ( + LNCEndpoint string + LNChargeToken string +) + +var ( + CurrencyString = map[Currency]string{ + CurUSD: "USD", + CurSat: "SAT", + CurBTC: "BTC", + CurMSat: "MSAT", + CurEur: "EUR", + } - "git.sp4ke.com/sp4ke/bit4sat/db" - "github.com/mediocregopher/radix/v3" + CurrencyID = map[string]Currency{ + "USD": CurUSD, + "SAT": CurSat, + "MSAT": CurMSat, + "BTC": CurBTC, + "EUR": CurEur, + } ) type Invoice struct { @@ -20,61 +56,3 @@ type Invoice struct { Status string `json:"status"` UploadId string `json:"upload_id"` } - -type InvoiceWatcher struct{} - -func (w *InvoiceWatcher) Run() { - watchedKey := "watch_invoices" - - for { - var invoiceBlob []byte - - popTimeout := "0" //block until ready - err := db.DB.Redis.Do(radix.Cmd(&invoiceBlob, - "BLPOP", - watchedKey, - popTimeout)) - - if err != nil { - log.Println("error in InvoiceWatcher ", err) - continue - } - - invoice := Invoice{} - err = json.Unmarshal(invoiceBlob, &invoice) - if err != nil { - log.Println("error unmarshaling in InvoiceWatcher ", err) - } - - go func() { - - log.Println("Starting goroutine to watch invoice") - time.Sleep(10 * time.Second) - //charge := ln.NewCharge() - - // Blocking function - //rawInvoice, err := charge.PollInvoice(invoice.Id) - //if err != nil { - //return err - //} - - //err = SetUploadStatus(uploadId, UpWaitingStorage) - //if err != nil { - //log.Printf("Error setting upload status", err) - //} - - // Set paid upload invoice id - //err = SetPaidUploadInvoice(uploadId, invoice) - //if err != nil { - //return err - //} - - }() - - time.Sleep(1 * time.Second) - } -} - -func NewInvoiceWatcher() *InvoiceWatcher { - return &InvoiceWatcher{} -} diff --git a/main.go b/main.go index b163d92..1ae9321 100644 --- a/main.go +++ b/main.go @@ -2,14 +2,14 @@ package main import ( "git.sp4ke.com/sp4ke/bit4sat/db" - "git.sp4ke.com/sp4ke/bit4sat/ln" + "git.sp4ke.com/sp4ke/bit4sat/watchers" ) func main() { defer db.DB.Sql.Close() api := NewAPI() - invoiceWatcher := ln.NewInvoiceWatcher() + invoiceWatcher := watchers.NewInvoiceWatcher() go invoiceWatcher.Run() api.Run() diff --git a/storage/upload_ctrl.go b/storage/upload_ctrl.go index 03b77ca..e5609db 100644 --- a/storage/upload_ctrl.go +++ b/storage/upload_ctrl.go @@ -75,11 +75,8 @@ func (ctrl UploadCtrl) New(c *gin.Context) { // TODO: check if upload fee is free if true { - // Get a new lightning invoice - charge := ln.NewCharge() - // New invoice for 10$ - invoice, err := charge.Invoice(100, ln.CurSat, uploadId) + invoice, err := ln.UploadInvoice(100, ln.CurSat, uploadId) //info, err := ln.Info() if err != nil { @@ -93,7 +90,7 @@ func (ctrl UploadCtrl) New(c *gin.Context) { return } - err = PushWatchUploadPayment(invoice) + err = PublishWatchUploadPayment(invoice) if err != nil { log.Println("error watching payment ", err) utils.JSONErrPriv(c, http.StatusInternalServerError, err) diff --git a/storage/upload_model.go b/storage/upload_model.go index a3a2d44..900e4f9 100644 --- a/storage/upload_model.go +++ b/storage/upload_model.go @@ -7,6 +7,7 @@ import ( "fmt" "log" + "git.sp4ke.com/sp4ke/bit4sat/bus" "git.sp4ke.com/sp4ke/bit4sat/db" "git.sp4ke.com/sp4ke/bit4sat/ln" "github.com/jmoiron/sqlx" @@ -18,6 +19,7 @@ var DB = db.DB const ( //TODO: sync upload status from redis + // TODO: status is currently handled in cache not here DBUploadSchema = ` CREATE TABLE IF NOT EXISTS upload ( id serial PRIMARY KEY, @@ -129,34 +131,14 @@ func SetUploadInvoice(uploadId string, invoice *ln.Invoice) error { return DB.Redis.Do(radix.FlatCmd(nil, "SET", key, invoiceJson)) } -func PushWatchUploadPayment(invoice *ln.Invoice) error { - key := "watch_invoices" +func PublishWatchUploadPayment(invoice *ln.Invoice) error { invoiceJson, err := json.Marshal(invoice) if err != nil { return err } - return DB.Redis.Do(radix.FlatCmd(nil, "RPUSH", key, invoiceJson)) - - //log.Printf("Will watch upload payment for uID %s", uploadId) - - //charge := ln.NewCharge() - //invoice, err := charge.PollInvoice(invoiceId) - //if err != nil { - //return err - //} - - //err = SetUploadStatus(uploadId, UpWaitingStorage) - //if err != nil { - //return err - //} - - //// Set paid upload invoice id - //err = SetPaidUploadInvoice(uploadId, invoice) - //if err != nil { - //return err - //} + return DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH", bus.WatchInvoicesChannelName, invoiceJson)) //TODO: Notify client on websocket diff --git a/watchers/invoices.go b/watchers/invoices.go new file mode 100644 index 0000000..2d2614e --- /dev/null +++ b/watchers/invoices.go @@ -0,0 +1,79 @@ +package watchers + +import ( + "encoding/json" + "fmt" + "log" + + "git.sp4ke.com/sp4ke/bit4sat/bus" + "git.sp4ke.com/sp4ke/bit4sat/db" + "git.sp4ke.com/sp4ke/bit4sat/ln" + "git.sp4ke.com/sp4ke/bit4sat/storage" + "github.com/mediocregopher/radix/v3" +) + +type InvoiceWatcher struct { + listenWatchInvoice chan radix.PubSubMessage + invoicePaid chan *ln.Invoice +} + +func (w *InvoiceWatcher) Run() { + if err := db.DB.RedisPubSub.Subscribe(w.listenWatchInvoice, + bus.WatchInvoicesChannelName); err != nil { + panic(err) + } + + for { + select { + case msg := <-w.listenWatchInvoice: + log.Printf("checking new invoice %s", msg.Message) + + invoice := ln.Invoice{} + err := json.Unmarshal(msg.Message, &invoice) + if err != nil { + panic(err) + } + + go ln.PollPaidInvoice(&invoice, w.invoicePaid) + + case invoice := <-w.invoicePaid: + // Set upload status to UpWaitingStorage (paid) + err := storage.SetUploadStatus(invoice.UploadId, storage.UpWaitingStorage) + if err != nil { + panic(err) + } + + // Update Upload Invoice + err = storage.SetUploadInvoice(invoice.UploadId, invoice) + if err != nil { + panic(err) + } + + // publish invoice was updated to upload_id_paid channel + updateMsg := bus.UpdateMessage{} + updateMsg.Status = bus.UpdatePaymentReceived + updateMsg.UploadId = invoice.UploadId + updateMsg.Data = invoice + + updateMsgJson, err := json.Marshal(updateMsg) + if err != nil { + panic(err) + } + + log.Printf("Notifying upload %s paid", invoice.UploadId) + key := fmt.Sprintf("upload_update_%s", invoice.UploadId) + + err = db.DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH", + key, updateMsgJson)) + + } + } + +} + +func NewInvoiceWatcher() *InvoiceWatcher { + return &InvoiceWatcher{ + listenWatchInvoice: make(chan radix.PubSubMessage), + invoicePaid: make(chan *ln.Invoice), + } +} diff --git a/web/Caddyfile b/web/Caddyfile index 78c3c26..6c9d5cd 100644 --- a/web/Caddyfile +++ b/web/Caddyfile @@ -1,19 +1,22 @@ +#172.29.195.31 localhost root dist ext .html header / { - Content-Security-Policy "default-src * 'unsafe-eval' 'unsafe-inline' data:;" + Content-Security-Policy "default-src * 'unsafe-eval' 'unsafe-inline' data: ;" } proxy /api localhost:8880 -log /api stdout +#log /api stdout +log / stdout proxy /ws localhost:8880 { websocket transparent } -proxy /wss localhost:8880 + +#tls self_signed diff --git a/web/src/Websocket.js b/web/src/Websocket.js index 6a17cbd..e2fed75 100644 --- a/web/src/Websocket.js +++ b/web/src/Websocket.js @@ -57,9 +57,13 @@ export class WS { let msg = JSON.parse(ev.data); switch (msg.type) { - case "hello": + case 'hello': console.log(msg.data); break; + + case 'upload-accepted': + console.log(msg) + break; } } diff --git a/web/src/index.js b/web/src/index.js index 3bbfa53..69704f5 100644 --- a/web/src/index.js +++ b/web/src/index.js @@ -5,7 +5,6 @@ import App from './App.vue'; import GetWorker from './workerInterface.js'; import { WS } from './Websocket.js'; -window.ws = new WS() window.app = new Vue({ diff --git a/web/src/worker.js b/web/src/worker.js index 047630f..35ad5ed 100644 --- a/web/src/worker.js +++ b/web/src/worker.js @@ -2,9 +2,20 @@ //to keep the UI free import Api from './api.js' +import { WS } from './Websocket.js'; + +self.websocket = new WS() const name = 'main' +// TODO +//if (window.isSecureContext) { + //// Page is a secure context so service workers are now available + //navigator.serviceWorker.register("/offline-worker.js").then(function () { + //... + //}); +//} + function hexString(buffer) { const byteArray = new Uint8Array(buffer); @@ -50,10 +61,15 @@ async function newUpload(files){ let upload = new Api.Upload(filesMetadata, files) + // Ask permission to send a new upload await upload.create() .then(data => { // Get the upload id - let { result: { upload_id: id, invoice: invoice } } = data + let { result: { upload_id: id, invoice: invoice } } = data; + + + // Register to updates on our uploadId from websocket + //self.websocket.listenTo('upload-accepted') //postMessage({ //msg: 'upload-invoice', @@ -64,6 +80,9 @@ async function newUpload(files){ // Notify the UI postMessage({msg: 'upload-invoice', id: id, invoice: invoice}) + + + // Send the files //upload.send() //.then(resp =>{ diff --git a/ws/server.go b/ws/server.go index 874e858..4880b23 100644 --- a/ws/server.go +++ b/ws/server.go @@ -43,6 +43,8 @@ type Client struct { // Buffered channel of outbound messages send chan []byte + + // Listen to updates from redis } func (c *Client) readPump() { @@ -72,7 +74,6 @@ func (c *Client) readPump() { } func (c *Client) writePump() { - //testTicker := time.NewTicker(5 * time.Second) pingTicker := time.NewTicker(pingPeriod) defer func() { log.Println("quit ws server") @@ -81,6 +82,8 @@ func (c *Client) writePump() { c.conn.Close() }() + // Subscribe to upload notifications + for { select { //case <-testTicker.C: