Working lnd grpc with invoice polling

master
Chakib Benziane 5 years ago
parent 23abc0b369
commit fd7cec6600

@ -5,6 +5,7 @@ const (
WatchInvoicesChannelName = "watch_invoices"
WebsocketPubSubPrefix = "websocket_update"
UploadUpdateChannelPrefix = "upload_update"
InvoicePaidChannelPrefix = "invoice:paid"
)
// PubSub event types

@ -8,12 +8,16 @@ import (
"net/http"
"net/url"
"strconv"
"strings"
"git.sp4ke.com/sp4ke/bit4sat/btc"
"git.sp4ke.com/sp4ke/bit4sat/bus"
"git.sp4ke.com/sp4ke/bit4sat/config"
"git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/lndrpc"
"git.sp4ke.com/sp4ke/bit4sat/utils"
"github.com/gin-gonic/gin"
"github.com/mediocregopher/radix/v3"
)
var (
@ -34,25 +38,52 @@ func Info() (gin.H, error) {
return result, nil
}
// Shoudl be called in goroutine
// Quick check if invoice was paid
func CheckInvoice(id string) (*Invoice, error) {
invoice := Invoice{}
reqUri := fmt.Sprintf("%s/%s", getUrl(InvoiceEndpoint), id)
log.Printf("checking invoice %s", reqUri)
resp, err := http.Get(reqUri)
lnInvoice, err := lndrpc.LookupInvoiceRhashStr(id)
if err != nil {
return nil, err
}
jsonDec := json.NewDecoder(resp.Body)
err = jsonDec.Decode(&invoice)
invoice := InvoiceFromLndIn(lnInvoice)
return &invoice, err
return invoice, nil
}
// This will watch on the pubsub for paid invoices
func PollPaidInvoice(invoiceId string, invoiceChan chan<- *Invoice, errorChan chan<- error) {
// This will watch on the pubsub for paid invoices given an invoice id
func PollPaidInvoice(invoiceId string, invoicePaidChan chan<- *Invoice, errorChan chan<- error) {
watchPaidChannel := make(chan radix.PubSubMessage)
busName := fmt.Sprintf("%s_*", bus.InvoicePaidChannelPrefix)
err := db.DB.RedisPubSub.PSubscribe(watchPaidChannel, busName)
if err != nil {
errorChan <- err
return
}
for {
msg := <-watchPaidChannel
targetInvoiceId := strings.Split(msg.Channel, "_")[1]
if targetInvoiceId == invoiceId {
invoice := Invoice{}
err := json.Unmarshal(msg.Message, &invoice)
if err != nil {
errorChan <- err
return
}
invoicePaidChan <- &invoice
return
}
}
}
func PollPaidInvoiceTMP(invoiceId string, invoiceChan chan<- *Invoice, errorChan chan<- error) {

@ -87,6 +87,7 @@ func (t timestamp) String() string {
type Invoice struct {
AddIndex uint64 `json:"-"`
SettleIndex uint64 `json:"-"`
Description string `json:"description"`
Msatoshi float64 `json:"msatoshi"`
Payreq string `json:"payreq"`
@ -108,6 +109,7 @@ func (i *Invoice) UnmarshalBinary(b []byte) error {
return json.Unmarshal(b, &i)
}
// Create new Invoice from lnrpc.Invoice
func InvoiceFromLndIn(lndInvoice *lnrpc.Invoice) *Invoice {
invoice := Invoice{
@ -117,7 +119,6 @@ func InvoiceFromLndIn(lndInvoice *lnrpc.Invoice) *Invoice {
Payreq: lndInvoice.PaymentRequest,
RHash: hex.EncodeToString(lndInvoice.RHash),
CreatedAt: timestamp(time.Unix(lndInvoice.CreationDate, 0)),
PaidAt: timestamp(time.Unix(lndInvoice.SettleDate, 0)),
Status: InvoiceStatus[UnPaid],
}
@ -135,3 +136,22 @@ func InvoiceFromLndIn(lndInvoice *lnrpc.Invoice) *Invoice {
return &invoice
}
// Update stored invoice from lnd invoice
func UpdateInvoiceFromLnd(storedIn *Invoice, newIn *lnrpc.Invoice) *Invoice {
storedIn.SettleIndex = newIn.SettleIndex
storedIn.PaidAt = timestamp(time.Unix(newIn.SettleDate, 0))
// Calculate status
if newIn.Settled {
storedIn.Status = InvoiceStatus[Paid]
}
// Handle expired status
if time.Now().After(time.Time(storedIn.ExpiresAt)) {
storedIn.Status = InvoiceStatus[Expired]
storedIn.Expired = true
}
return storedIn
}

@ -11,7 +11,7 @@ import (
func GetInfo() error {
log.Println("get info")
ctxb := context.Background()
client, cleanUp := getClient()
client, cleanUp := GetClient()
defer cleanUp()
req := &lnrpc.GetInfoRequest{}
@ -27,7 +27,7 @@ func GetInfo() error {
func lookupInvoiceRhash(rhash []byte) (*lnrpc.Invoice, error) {
ctxb := context.Background()
client, cleanUp := getClient()
client, cleanUp := GetClient()
defer cleanUp()
req := &lnrpc.PaymentHash{
@ -43,10 +43,10 @@ func lookupInvoiceRhash(rhash []byte) (*lnrpc.Invoice, error) {
return createdInvoice, nil
}
func lookupInvoiceRhashStr(rhash string) (*lnrpc.Invoice, error) {
func LookupInvoiceRhashStr(rhash string) (*lnrpc.Invoice, error) {
ctxb := context.Background()
client, cleanUp := getClient()
client, cleanUp := GetClient()
defer cleanUp()
req := &lnrpc.PaymentHash{
@ -65,7 +65,7 @@ func lookupInvoiceRhashStr(rhash string) (*lnrpc.Invoice, error) {
func AddInvoiceSat(desc string, satVal int64) (*lnrpc.Invoice, error) {
ctxb := context.Background()
client, cleanUp := getClient()
client, cleanUp := GetClient()
defer cleanUp()
invoice := &lnrpc.Invoice{

@ -20,7 +20,7 @@ func fatal(err error) {
log.Fatal(err)
}
func getClient() (lnrpc.LightningClient, func()) {
func GetClient() (lnrpc.LightningClient, func()) {
conn := getClientConn()
cleanUp := func() {

@ -1,77 +0,0 @@
package lndrpc
import (
"io"
"log"
"git.sp4ke.com/sp4ke/bit4sat/db"
lnrpc "github.com/lightningnetwork/lnd/lnrpc"
"github.com/mediocregopher/radix/v3"
"golang.org/x/net/context"
)
const (
LastSettledInvoiceIndexKey = "last_invoice_settled_index_cursor"
)
// Last watched invoice
func getLastSettledInvoiceIndex() uint64 {
var cursor uint64
err := db.DB.Redis.Do(radix.FlatCmd(&cursor, "GET", LastSettledInvoiceIndexKey))
if err != nil {
log.Fatal(err)
}
return cursor
}
func setLastSettledInvoiceIndex(index uint64) {
err := db.DB.Redis.Do(radix.FlatCmd(nil, "SET", LastSettledInvoiceIndexKey, index))
if err != nil {
log.Fatal(err)
}
}
// Runs in a goroutine and keep swatching invoices
func WatchInvoice(settleIndex uint64) {
// Get the last invoice index cursor
cursor := getLastSettledInvoiceIndex()
log.Println(cursor)
ctxb := context.Background()
client, cleanUp := getClient()
defer cleanUp()
subscription := &lnrpc.InvoiceSubscription{}
invoiceStream, err := client.SubscribeInvoices(ctxb, subscription)
if err != nil {
//return nil, err
}
for {
invoice, err := invoiceStream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("error invoice stream %s", err)
break
}
log.Printf("Received invoice \n%s\n", invoice)
// We are only interested in settled invoices
if !invoice.Settled {
continue
}
// Invoice was settled we need to save it and also notify the pubsub channel
// Save last settled index
setLastSettledInvoiceIndex(invoice.SettleIndex)
}
}

@ -1,14 +1,18 @@
package main
import "git.sp4ke.com/sp4ke/bit4sat/lndrpc"
import (
"git.sp4ke.com/sp4ke/bit4sat/api"
"git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/watchers"
)
func main() {
go lndrpc.WatchInvoice(0)
<-make(chan bool)
go watchers.WatchInvoice()
//<-make(chan bool)
//defer db.DB.Sql.Close()
defer db.DB.Sql.Close()
//api := api.NewAPI()
//api.Run()
api := api.NewAPI()
api.Run()
}

@ -162,7 +162,7 @@ func (ctrl UploadCtrl) CheckStatus(c *gin.Context) {
}
// Get invoice id for upload
invoiceId, err := GetUploadInvoiceId(uploadId)
invoiceId, err := GetUploadIdInvoice(uploadId)
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return
@ -222,7 +222,7 @@ func (ctrl UploadCtrl) PollStatus(c *gin.Context) {
invoiceChan := make(chan *ln.Invoice)
errorChan := make(chan error)
log.Println("starting go routine")
log.Printf("starting invoice poll for %s", invoice.RHash)
// If waiting payment, wait until invoice is paid
go ln.PollPaidInvoice(invoice.RHash, invoiceChan, errorChan)

@ -219,7 +219,7 @@ func GetUploadInvoice(uploadId string) (*ln.Invoice, error) {
return &invoice, nil
}
func GetUploadInvoiceId(uploadId string) (string, error) {
func GetUploadIdInvoice(uploadId string) (string, error) {
invoice, err := GetUploadInvoice(uploadId)
return invoice.RHash, err
@ -279,6 +279,13 @@ func SetUploadInvoice(uploadId string, invoice *ln.Invoice) error {
return DB.Redis.Do(radix.FlatCmd(nil, "SET", invoiceUploadKey, uploadId))
}
func GetUploadIdForInvoice(invoiceId string) (string, error) {
key := fmt.Sprintf("invoice_%s_upload", invoiceId)
err := DB.Redis.Do(radix.FlatCmd(&invoiceId, "GET", key))
return invoiceId, err
}
// Returns true if id exists in DB
func IdExists(id string) (exists bool, err error) {
key := fmt.Sprintf("upload_status_%s", id)

@ -0,0 +1,164 @@
package watchers
import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log"
"time"
"git.sp4ke.com/sp4ke/bit4sat/bus"
"git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/ln"
"git.sp4ke.com/sp4ke/bit4sat/lndrpc"
"git.sp4ke.com/sp4ke/bit4sat/storage"
lnrpc "github.com/lightningnetwork/lnd/lnrpc"
"github.com/mediocregopher/radix/v3"
"golang.org/x/net/context"
)
const (
LastSettledInvoiceIndexKey = "last_invoice_settled_index_cursor"
)
// Last watched invoice
func getLastSettledInvoiceIndex() uint64 {
var cursor uint64
err := db.DB.Redis.Do(radix.FlatCmd(&cursor, "GET", LastSettledInvoiceIndexKey))
if err != nil {
log.Fatal(err)
}
return cursor
}
func setLastSettledInvoiceIndex(index uint64) {
err := db.DB.Redis.Do(radix.FlatCmd(nil, "SET", LastSettledInvoiceIndexKey, index))
if err != nil {
log.Fatal(err)
}
}
// Runs in a goroutine and keep swatching invoices
func WatchInvoice() {
// Get the last invoice index cursor
cursor := getLastSettledInvoiceIndex()
log.Println(cursor)
ctxb := context.Background()
client, cleanUp := lndrpc.GetClient()
defer cleanUp()
subscription := &lnrpc.InvoiceSubscription{}
invoiceStream, err := client.SubscribeInvoices(ctxb, subscription)
if err != nil {
//return nil, err
}
for {
invoice, err := invoiceStream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("error invoice stream %s", err)
break
}
log.Printf("Received invoice \n%s\n", invoice)
// We are only interested in settled invoices
if !invoice.Settled {
continue
}
// Invoice expired
if expiredInvoice(invoice.CreationDate, invoice.Expiry) {
handleExpiredInvoice(invoice)
}
// Invoice was paid, we handle it
handleSettledInvoice(invoice)
}
}
//TODO
func handleExpiredInvoice(invoice *lnrpc.Invoice) {
// Update upload status to "expired"
uploadId, err := storage.GetUploadIdForInvoice(hex.EncodeToString(invoice.RHash))
if err != nil {
log.Fatal(fmt.Errorf("handleExpiredInvoice: %s", err))
}
err = storage.SetUploadStatus(uploadId, storage.UpPayExpired)
if err != nil {
log.Fatal(fmt.Errorf("handleExpiredInvoice %s", err))
}
// No need to store the invoice
// TODO
// TODO: flush all invoices and uploads
// TODO
}
func expiredInvoice(creation, expiry int64) bool {
createdAt := time.Unix(int64(creation), 0)
expiresAt := createdAt.Add(time.Second * time.Duration(expiry))
return time.Now().After(expiresAt)
}
func handleSettledInvoice(invoice *lnrpc.Invoice) {
// we need to save it and also notify the pubsub channel
// Save last settled index
setLastSettledInvoiceIndex(invoice.SettleIndex)
// Update upload status to "paid"
uploadId, err := storage.GetUploadIdForInvoice(hex.EncodeToString(invoice.RHash))
if err != nil {
log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err))
}
err = storage.SetUploadStatus(uploadId, storage.UpPaid)
if err != nil {
log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err))
}
// Get stored invoice for upload
storedInvoice, err := storage.GetUploadInvoice(uploadId)
if err != nil {
log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err))
}
// Update stored invoice fields
newInvoice := ln.UpdateInvoiceFromLnd(storedInvoice, invoice)
// Set invoice for upload
err = storage.SetUploadInvoice(uploadId, newInvoice)
if err != nil {
log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err))
}
newInvoiceJson, err := json.Marshal(newInvoice)
if err != nil {
log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err))
}
// publish invoice was updated to upload_id_paid channel
log.Printf("Notifying invoice paid %s", uploadId)
key := fmt.Sprintf("%s_%s", bus.InvoicePaidChannelPrefix,
newInvoice.RHash)
err = db.DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH",
key, newInvoiceJson))
}
Loading…
Cancel
Save