package watchers import ( "encoding/hex" "encoding/json" "fmt" "io" "log" "time" "git.sp4ke.com/sp4ke/bit4sat/bus" "git.sp4ke.com/sp4ke/bit4sat/db" "git.sp4ke.com/sp4ke/bit4sat/ln" "git.sp4ke.com/sp4ke/bit4sat/lndrpc" "git.sp4ke.com/sp4ke/bit4sat/storage" lnrpc "github.com/lightningnetwork/lnd/lnrpc" "github.com/mediocregopher/radix/v3" "golang.org/x/net/context" ) const ( LastSettledInvoiceIndexKey = "last_invoice_settled_index_cursor" ) // Last watched invoice func getLastSettledInvoiceIndex() uint64 { var cursor uint64 err := db.DB.Redis.Do(radix.FlatCmd(&cursor, "GET", LastSettledInvoiceIndexKey)) if err != nil { return uint64(0) } return cursor } func setLastSettledInvoiceIndex(index uint64) { err := db.DB.Redis.Do(radix.FlatCmd(nil, "SET", LastSettledInvoiceIndexKey, index)) if err != nil { panic(err) } } // Runs in a goroutine and keep swatching invoices func WatchInvoice() { // Get the last invoice index cursor cursor := getLastSettledInvoiceIndex() log.Printf("watching settled from index %d", cursor) ctxb := context.Background() client, cleanUp := lndrpc.GetClient() defer cleanUp() subscription := &lnrpc.InvoiceSubscription{ SettleIndex: cursor, } invoiceStream, err := client.SubscribeInvoices(ctxb, subscription) if err != nil { //return nil, err } for { invoice, err := invoiceStream.Recv() if err == io.EOF { break } if err != nil { log.Printf("error invoice stream %s", err) break } log.Printf("watcher, received invoice \n%s\n", invoice) // We are only interested in settled invoices if !invoice.Settled { continue } // Invoice expired if expiredInvoice(invoice.CreationDate, invoice.Expiry) { handleExpiredInvoice(invoice) } // Invoice was paid, we handle it handleSettledInvoice(invoice) } } //TODO func handleExpiredInvoice(invoice *lnrpc.Invoice) { // Update upload status to "expired" uploadId, err := storage.GetUploadIdForInvoice(hex.EncodeToString(invoice.RHash)) if err != nil { log.Fatal(fmt.Errorf("handleExpiredInvoice: %s", err)) } err = storage.SetUploadStatus(uploadId, storage.UpPayExpired) if err != nil { log.Fatal(fmt.Errorf("handleExpiredInvoice %s", err)) } // No need to store the invoice // TODO // TODO: flush all invoices and uploads // TODO } func expiredInvoice(creation, expiry int64) bool { createdAt := time.Unix(int64(creation), 0) expiresAt := createdAt.Add(time.Second * time.Duration(expiry)) return time.Now().After(expiresAt) } func handleSettledInvoice(invoice *lnrpc.Invoice) { // we need to save it and also notify the pubsub channel // Save last settled index setLastSettledInvoiceIndex(invoice.SettleIndex) // Update upload status to "paid" uploadId, err := storage.GetUploadIdForInvoice(hex.EncodeToString(invoice.RHash)) if err != nil { log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err)) } err = storage.SetUploadStatus(uploadId, storage.UpPaid) if err != nil { log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err)) } // Get stored invoice for upload storedInvoice, err := storage.GetUploadInvoice(uploadId) if err != nil { log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err)) } // Update stored invoice fields newInvoice := ln.UpdateInvoiceFromLnd(storedInvoice, invoice) // Set invoice for upload err = storage.SetUploadInvoice(uploadId, newInvoice) if err != nil { log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err)) } newInvoiceJson, err := json.Marshal(newInvoice) if err != nil { log.Fatal(fmt.Errorf("handleSettledInvoice: %s", err)) } // publish invoice was updated to upload_id_paid channel log.Printf("Notifying invoice paid %s", uploadId) key := fmt.Sprintf("%s:%s", bus.InvoicePaidChannelPrefix, newInvoice.RHash) err = db.DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH", key, newInvoiceJson)) if err != nil { panic(err) } }