From fd7cec6600288a8f073035d03abcbe77a6b3b4b6 Mon Sep 17 00:00:00 2001 From: Chakib Benziane Date: Thu, 4 Apr 2019 00:12:03 +0200 Subject: [PATCH] Working lnd grpc with invoice polling --- bus/messages.go | 1 + ln/api.go | 51 +++++++++--- ln/invoice.go | 22 ++++- lndrpc/commands.go | 10 +-- lndrpc/grpc.go | 2 +- lndrpc/invoice_watcher.go | 77 ------------------ main.go | 16 ++-- storage/upload_ctrl.go | 4 +- storage/upload_model.go | 9 ++- watchers/invoices.go | 164 ++++++++++++++++++++++++++++++++++++++ 10 files changed, 253 insertions(+), 103 deletions(-) delete mode 100644 lndrpc/invoice_watcher.go create mode 100644 watchers/invoices.go diff --git a/bus/messages.go b/bus/messages.go index abc9531..faa06eb 100644 --- a/bus/messages.go +++ b/bus/messages.go @@ -5,6 +5,7 @@ const ( WatchInvoicesChannelName = "watch_invoices" WebsocketPubSubPrefix = "websocket_update" UploadUpdateChannelPrefix = "upload_update" + InvoicePaidChannelPrefix = "invoice:paid" ) // PubSub event types diff --git a/ln/api.go b/ln/api.go index a19fbd7..4866c66 100644 --- a/ln/api.go +++ b/ln/api.go @@ -8,12 +8,16 @@ import ( "net/http" "net/url" "strconv" + "strings" "git.sp4ke.com/sp4ke/bit4sat/btc" + "git.sp4ke.com/sp4ke/bit4sat/bus" "git.sp4ke.com/sp4ke/bit4sat/config" + "git.sp4ke.com/sp4ke/bit4sat/db" "git.sp4ke.com/sp4ke/bit4sat/lndrpc" "git.sp4ke.com/sp4ke/bit4sat/utils" "github.com/gin-gonic/gin" + "github.com/mediocregopher/radix/v3" ) var ( @@ -34,25 +38,52 @@ func Info() (gin.H, error) { return result, nil } -// Shoudl be called in goroutine +// Quick check if invoice was paid func CheckInvoice(id string) (*Invoice, error) { - invoice := Invoice{} - reqUri := fmt.Sprintf("%s/%s", getUrl(InvoiceEndpoint), id) - log.Printf("checking invoice %s", reqUri) - resp, err := http.Get(reqUri) + lnInvoice, err := lndrpc.LookupInvoiceRhashStr(id) if err != nil { return nil, err } - jsonDec := json.NewDecoder(resp.Body) - err = jsonDec.Decode(&invoice) + invoice := InvoiceFromLndIn(lnInvoice) - return &invoice, err + return invoice, nil } -// This will watch on the pubsub for paid invoices -func PollPaidInvoice(invoiceId string, invoiceChan chan<- *Invoice, errorChan chan<- error) { +// This will watch on the pubsub for paid invoices given an invoice id +func PollPaidInvoice(invoiceId string, invoicePaidChan chan<- *Invoice, errorChan chan<- error) { + + watchPaidChannel := make(chan radix.PubSubMessage) + + busName := fmt.Sprintf("%s_*", bus.InvoicePaidChannelPrefix) + + err := db.DB.RedisPubSub.PSubscribe(watchPaidChannel, busName) + if err != nil { + errorChan <- err + return + } + + for { + msg := <-watchPaidChannel + + targetInvoiceId := strings.Split(msg.Channel, "_")[1] + if targetInvoiceId == invoiceId { + + invoice := Invoice{} + + err := json.Unmarshal(msg.Message, &invoice) + if err != nil { + errorChan <- err + return + } + + invoicePaidChan <- &invoice + return + + } + + } } func PollPaidInvoiceTMP(invoiceId string, invoiceChan chan<- *Invoice, errorChan chan<- error) { diff --git a/ln/invoice.go b/ln/invoice.go index 8422498..5b9d8ed 100644 --- a/ln/invoice.go +++ b/ln/invoice.go @@ -87,6 +87,7 @@ func (t timestamp) String() string { type Invoice struct { AddIndex uint64 `json:"-"` + SettleIndex uint64 `json:"-"` Description string `json:"description"` Msatoshi float64 `json:"msatoshi"` Payreq string `json:"payreq"` @@ -108,6 +109,7 @@ func (i *Invoice) UnmarshalBinary(b []byte) error { return json.Unmarshal(b, &i) } +// Create new Invoice from lnrpc.Invoice func InvoiceFromLndIn(lndInvoice *lnrpc.Invoice) *Invoice { invoice := Invoice{ @@ -117,7 +119,6 @@ func InvoiceFromLndIn(lndInvoice *lnrpc.Invoice) *Invoice { Payreq: lndInvoice.PaymentRequest, RHash: hex.EncodeToString(lndInvoice.RHash), CreatedAt: timestamp(time.Unix(lndInvoice.CreationDate, 0)), - PaidAt: timestamp(time.Unix(lndInvoice.SettleDate, 0)), Status: InvoiceStatus[UnPaid], } @@ -135,3 +136,22 @@ func InvoiceFromLndIn(lndInvoice *lnrpc.Invoice) *Invoice { return &invoice } + +// Update stored invoice from lnd invoice +func UpdateInvoiceFromLnd(storedIn *Invoice, newIn *lnrpc.Invoice) *Invoice { + storedIn.SettleIndex = newIn.SettleIndex + storedIn.PaidAt = timestamp(time.Unix(newIn.SettleDate, 0)) + + // Calculate status + if newIn.Settled { + storedIn.Status = InvoiceStatus[Paid] + } + + // Handle expired status + if time.Now().After(time.Time(storedIn.ExpiresAt)) { + storedIn.Status = InvoiceStatus[Expired] + storedIn.Expired = true + } + + return storedIn +} diff --git a/lndrpc/commands.go b/lndrpc/commands.go index d8a8c3d..2d6054c 100644 --- a/lndrpc/commands.go +++ b/lndrpc/commands.go @@ -11,7 +11,7 @@ import ( func GetInfo() error { log.Println("get info") ctxb := context.Background() - client, cleanUp := getClient() + client, cleanUp := GetClient() defer cleanUp() req := &lnrpc.GetInfoRequest{} @@ -27,7 +27,7 @@ func GetInfo() error { func lookupInvoiceRhash(rhash []byte) (*lnrpc.Invoice, error) { ctxb := context.Background() - client, cleanUp := getClient() + client, cleanUp := GetClient() defer cleanUp() req := &lnrpc.PaymentHash{ @@ -43,10 +43,10 @@ func lookupInvoiceRhash(rhash []byte) (*lnrpc.Invoice, error) { return createdInvoice, nil } -func lookupInvoiceRhashStr(rhash string) (*lnrpc.Invoice, error) { +func LookupInvoiceRhashStr(rhash string) (*lnrpc.Invoice, error) { ctxb := context.Background() - client, cleanUp := getClient() + client, cleanUp := GetClient() defer cleanUp() req := &lnrpc.PaymentHash{ @@ -65,7 +65,7 @@ func lookupInvoiceRhashStr(rhash string) (*lnrpc.Invoice, error) { func AddInvoiceSat(desc string, satVal int64) (*lnrpc.Invoice, error) { ctxb := context.Background() - client, cleanUp := getClient() + client, cleanUp := GetClient() defer cleanUp() invoice := &lnrpc.Invoice{ diff --git a/lndrpc/grpc.go b/lndrpc/grpc.go index b02e069..5110d87 100644 --- a/lndrpc/grpc.go +++ b/lndrpc/grpc.go @@ -20,7 +20,7 @@ func fatal(err error) { log.Fatal(err) } -func getClient() (lnrpc.LightningClient, func()) { +func GetClient() (lnrpc.LightningClient, func()) { conn := getClientConn() cleanUp := func() { diff --git a/lndrpc/invoice_watcher.go b/lndrpc/invoice_watcher.go deleted file mode 100644 index feddf3c..0000000 --- a/lndrpc/invoice_watcher.go +++ /dev/null @@ -1,77 +0,0 @@ -package lndrpc - -import ( - "io" - "log" - - "git.sp4ke.com/sp4ke/bit4sat/db" - lnrpc "github.com/lightningnetwork/lnd/lnrpc" - "github.com/mediocregopher/radix/v3" - "golang.org/x/net/context" -) - -const ( - LastSettledInvoiceIndexKey = "last_invoice_settled_index_cursor" -) - -// Last watched invoice -func getLastSettledInvoiceIndex() uint64 { - var cursor uint64 - - err := db.DB.Redis.Do(radix.FlatCmd(&cursor, "GET", LastSettledInvoiceIndexKey)) - if err != nil { - log.Fatal(err) - } - - return cursor -} - -func setLastSettledInvoiceIndex(index uint64) { - err := db.DB.Redis.Do(radix.FlatCmd(nil, "SET", LastSettledInvoiceIndexKey, index)) - if err != nil { - log.Fatal(err) - } -} - -// Runs in a goroutine and keep swatching invoices -func WatchInvoice(settleIndex uint64) { - - // Get the last invoice index cursor - cursor := getLastSettledInvoiceIndex() - log.Println(cursor) - - ctxb := context.Background() - client, cleanUp := getClient() - defer cleanUp() - - subscription := &lnrpc.InvoiceSubscription{} - - invoiceStream, err := client.SubscribeInvoices(ctxb, subscription) - if err != nil { - //return nil, err - } - - for { - invoice, err := invoiceStream.Recv() - if err == io.EOF { - break - } - - if err != nil { - log.Printf("error invoice stream %s", err) - break - } - - log.Printf("Received invoice \n%s\n", invoice) - - // We are only interested in settled invoices - if !invoice.Settled { - continue - } - - // Invoice was settled we need to save it and also notify the pubsub channel - // Save last settled index - setLastSettledInvoiceIndex(invoice.SettleIndex) - } - -} diff --git a/main.go b/main.go index a51a861..c9b8fb5 100644 --- a/main.go +++ b/main.go @@ -1,14 +1,18 @@ package main -import "git.sp4ke.com/sp4ke/bit4sat/lndrpc" +import ( + "git.sp4ke.com/sp4ke/bit4sat/api" + "git.sp4ke.com/sp4ke/bit4sat/db" + "git.sp4ke.com/sp4ke/bit4sat/watchers" +) func main() { - go lndrpc.WatchInvoice(0) - <-make(chan bool) + go watchers.WatchInvoice() + //<-make(chan bool) - //defer db.DB.Sql.Close() + defer db.DB.Sql.Close() - //api := api.NewAPI() - //api.Run() + api := api.NewAPI() + api.Run() } diff --git a/storage/upload_ctrl.go b/storage/upload_ctrl.go index 69ad986..8a82a4d 100644 --- a/storage/upload_ctrl.go +++ b/storage/upload_ctrl.go @@ -162,7 +162,7 @@ func (ctrl UploadCtrl) CheckStatus(c *gin.Context) { } // Get invoice id for upload - invoiceId, err := GetUploadInvoiceId(uploadId) + invoiceId, err := GetUploadIdInvoice(uploadId) if err != nil { utils.JSONErrPriv(c, http.StatusInternalServerError, err) return @@ -222,7 +222,7 @@ func (ctrl UploadCtrl) PollStatus(c *gin.Context) { invoiceChan := make(chan *ln.Invoice) errorChan := make(chan error) - log.Println("starting go routine") + log.Printf("starting invoice poll for %s", invoice.RHash) // If waiting payment, wait until invoice is paid go ln.PollPaidInvoice(invoice.RHash, invoiceChan, errorChan) diff --git a/storage/upload_model.go b/storage/upload_model.go index 09eb2cf..7aa8599 100644 --- a/storage/upload_model.go +++ b/storage/upload_model.go @@ -219,7 +219,7 @@ func GetUploadInvoice(uploadId string) (*ln.Invoice, error) { return &invoice, nil } -func GetUploadInvoiceId(uploadId string) (string, error) { +func GetUploadIdInvoice(uploadId string) (string, error) { invoice, err := GetUploadInvoice(uploadId) return invoice.RHash, err @@ -279,6 +279,13 @@ func SetUploadInvoice(uploadId string, invoice *ln.Invoice) error { return DB.Redis.Do(radix.FlatCmd(nil, "SET", invoiceUploadKey, uploadId)) } +func GetUploadIdForInvoice(invoiceId string) (string, error) { + key := fmt.Sprintf("invoice_%s_upload", invoiceId) + + err := DB.Redis.Do(radix.FlatCmd(&invoiceId, "GET", key)) + return invoiceId, err +} + // Returns true if id exists in DB func IdExists(id string) (exists bool, err error) { key := fmt.Sprintf("upload_status_%s", id) diff --git a/watchers/invoices.go b/watchers/invoices.go new file mode 100644 index 0000000..6d89355 --- /dev/null +++ b/watchers/invoices.go @@ -0,0 +1,164 @@ +package watchers + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "io" + "log" + "time" + + "git.sp4ke.com/sp4ke/bit4sat/bus" + "git.sp4ke.com/sp4ke/bit4sat/db" + "git.sp4ke.com/sp4ke/bit4sat/ln" + "git.sp4ke.com/sp4ke/bit4sat/lndrpc" + "git.sp4ke.com/sp4ke/bit4sat/storage" + lnrpc "github.com/lightningnetwork/lnd/lnrpc" + "github.com/mediocregopher/radix/v3" + "golang.org/x/net/context" +) + +const ( + LastSettledInvoiceIndexKey = "last_invoice_settled_index_cursor" +) + +// Last watched invoice +func getLastSettledInvoiceIndex() uint64 { + var cursor uint64 + + err := db.DB.Redis.Do(radix.FlatCmd(&cursor, "GET", LastSettledInvoiceIndexKey)) + if err != nil { + log.Fatal(err) + } + + return cursor +} + +func setLastSettledInvoiceIndex(index uint64) { + err := db.DB.Redis.Do(radix.FlatCmd(nil, "SET", LastSettledInvoiceIndexKey, index)) + if err != nil { + log.Fatal(err) + } +} + +// Runs in a goroutine and keep swatching invoices +func WatchInvoice() { + + // Get the last invoice index cursor + cursor := getLastSettledInvoiceIndex() + log.Println(cursor) + + ctxb := context.Background() + client, cleanUp := lndrpc.GetClient() + defer cleanUp() + + subscription := &lnrpc.InvoiceSubscription{} + + invoiceStream, err := client.SubscribeInvoices(ctxb, subscription) + if err != nil { + //return nil, err + } + + for { + invoice, err := invoiceStream.Recv() + if err == io.EOF { + break + } + + if err != nil { + log.Printf("error invoice stream %s", err) + break + } + + log.Printf("Received invoice \n%s\n", invoice) + + // We are only interested in settled invoices + if !invoice.Settled { + continue + } + + // Invoice expired + if expiredInvoice(invoice.CreationDate, invoice.Expiry) { + handleExpiredInvoice(invoice) + } + + // Invoice was paid, we handle it + handleSettledInvoice(invoice) + } +} + +//TODO +func handleExpiredInvoice(invoice *lnrpc.Invoice) { + + // Update upload status to "expired" + uploadId, err := storage.GetUploadIdForInvoice(hex.EncodeToString(invoice.RHash)) + if err != nil { + log.Fatal(fmt.Errorf("handleExpiredInvoice: %s", err)) + } + + err = storage.SetUploadStatus(uploadId, storage.UpPayExpired) + if err != nil { + log.Fatal(fmt.Errorf("handleExpiredInvoice %s", err)) + } + + // No need to store the invoice + // TODO + // TODO: flush all invoices and uploads + // TODO +} + +func expiredInvoice(creation, expiry int64) bool { + createdAt := time.Unix(int64(creation), 0) + expiresAt := createdAt.Add(time.Second * time.Duration(expiry)) + + return time.Now().After(expiresAt) + +} + +func handleSettledInvoice(invoice *lnrpc.Invoice) { + + // we need to save it and also notify the pubsub channel + + // Save last settled index + setLastSettledInvoiceIndex(invoice.SettleIndex) + + // Update upload status to "paid" + uploadId, err := storage.GetUploadIdForInvoice(hex.EncodeToString(invoice.RHash)) + if err != nil { + log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err)) + } + + err = storage.SetUploadStatus(uploadId, storage.UpPaid) + if err != nil { + log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err)) + } + + // Get stored invoice for upload + storedInvoice, err := storage.GetUploadInvoice(uploadId) + if err != nil { + log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err)) + } + + // Update stored invoice fields + newInvoice := ln.UpdateInvoiceFromLnd(storedInvoice, invoice) + + // Set invoice for upload + err = storage.SetUploadInvoice(uploadId, newInvoice) + if err != nil { + log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err)) + } + + newInvoiceJson, err := json.Marshal(newInvoice) + if err != nil { + log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err)) + } + + // publish invoice was updated to upload_id_paid channel + log.Printf("Notifying invoice paid %s", uploadId) + key := fmt.Sprintf("%s_%s", bus.InvoicePaidChannelPrefix, + newInvoice.RHash) + + err = db.DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH", + key, newInvoiceJson)) + +}