|
|
|
package watchers
|
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/hex"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"log"
|
|
|
|
"regexp"
|
|
|
|
|
|
|
|
"git.sp4ke.com/sp4ke/bit4sat/bus"
|
|
|
|
"git.sp4ke.com/sp4ke/bit4sat/db"
|
|
|
|
"git.sp4ke.com/sp4ke/bit4sat/ln"
|
|
|
|
"git.sp4ke.com/sp4ke/bit4sat/lndrpc"
|
|
|
|
"git.sp4ke.com/sp4ke/bit4sat/storage"
|
|
|
|
lnrpc "github.com/lightningnetwork/lnd/lnrpc"
|
|
|
|
"github.com/mediocregopher/radix/v3"
|
|
|
|
"golang.org/x/net/context"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
LastSettledInvoiceIndexKey = "last_invoice_settled_index_cursor"
|
|
|
|
ReUploadInvoice = `bit4sat upload`
|
|
|
|
ReDownInvoice = `bit4sat download`
|
|
|
|
)
|
|
|
|
|
|
|
|
// Last watched invoice
|
|
|
|
func getLastSettledInvoiceIndex() uint64 {
|
|
|
|
var cursor uint64
|
|
|
|
|
|
|
|
err := db.DB.Redis.Do(radix.FlatCmd(&cursor, "GET", LastSettledInvoiceIndexKey))
|
|
|
|
if err != nil {
|
|
|
|
return uint64(0)
|
|
|
|
}
|
|
|
|
|
|
|
|
return cursor
|
|
|
|
}
|
|
|
|
|
|
|
|
func setLastSettledInvoiceIndex(index uint64) {
|
|
|
|
err := db.DB.Redis.Do(radix.FlatCmd(nil, "SET", LastSettledInvoiceIndexKey, index))
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Runs in a goroutine and keep swatching invoices
|
|
|
|
func WatchInvoice() {
|
|
|
|
|
|
|
|
// Get the last invoice index cursor
|
|
|
|
cursor := getLastSettledInvoiceIndex()
|
|
|
|
|
|
|
|
log.Printf("watching settled from index %d", cursor)
|
|
|
|
|
|
|
|
ctxb := context.Background()
|
|
|
|
client, cleanUp := lndrpc.GetClient()
|
|
|
|
defer cleanUp()
|
|
|
|
|
|
|
|
subscription := &lnrpc.InvoiceSubscription{
|
|
|
|
SettleIndex: cursor,
|
|
|
|
}
|
|
|
|
|
|
|
|
invoiceStream, err := client.SubscribeInvoices(ctxb, subscription)
|
|
|
|
if err != nil {
|
|
|
|
//return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
for {
|
|
|
|
invoice, err := invoiceStream.Recv()
|
|
|
|
if err == io.EOF {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("error invoice stream %s", err)
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Printf("watcher, received invoice \n%s\n", invoice)
|
|
|
|
|
|
|
|
// We are only interested in settled invoices
|
|
|
|
if !invoice.Settled {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// Invoice was paid, we handle it
|
|
|
|
handleSettledInvoice(invoice)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func handleSettledInvoice(invoice *lnrpc.Invoice) {
|
|
|
|
|
|
|
|
// we need to save it and also notify the pubsub channel
|
|
|
|
|
|
|
|
// Save last settled index
|
|
|
|
setLastSettledInvoiceIndex(invoice.SettleIndex)
|
|
|
|
|
|
|
|
matchUp, err := regexp.MatchString(ReUploadInvoice, invoice.Memo)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("Error regex match: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
matchDown, err := regexp.MatchString(ReDownInvoice, invoice.Memo)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("Error regex match: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Handle upload related invoices
|
|
|
|
if matchUp {
|
|
|
|
|
|
|
|
// Update upload status to "paid"
|
|
|
|
uploadId, err := storage.GetUploadIdForInvoice(hex.EncodeToString(invoice.RHash))
|
|
|
|
log.Printf("watcher: found upload id %s for invoice %s", uploadId,
|
|
|
|
hex.EncodeToString(invoice.RHash))
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("error handleSettledInvoice: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
err = storage.SetUploadStatus(uploadId, storage.UpPaid)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("error handleSettledInvoice: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get stored invoice for upload
|
|
|
|
storedInvoice, err := storage.GetUploadInvoice(uploadId)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("error handleSettledInvoice: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
//log.Printf("stored invoice %#v", storedInvoice)
|
|
|
|
|
|
|
|
// Update stored invoice fields
|
|
|
|
newInvoice := ln.UpdateInvoiceFromLnd(storedInvoice, invoice)
|
|
|
|
|
|
|
|
//log.Printf("new invoice %#v", newInvoice)
|
|
|
|
|
|
|
|
// Set invoice for upload
|
|
|
|
err = storage.SetUploadInvoice(uploadId, newInvoice)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("error handleSettledInvoice: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Printf("Notifying invoice paid for upload %s", uploadId)
|
|
|
|
// publish invoice was updated to upload_id_paid channel
|
|
|
|
key := fmt.Sprintf("%s:%s", bus.InvoicePaidChannelPrefix,
|
|
|
|
newInvoice.RHash)
|
|
|
|
|
|
|
|
err = db.DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH",
|
|
|
|
key, newInvoice))
|
|
|
|
}
|
|
|
|
|
|
|
|
// This is a download invoice
|
|
|
|
if matchDown {
|
|
|
|
|
|
|
|
// Store the full invoice
|
|
|
|
key := fmt.Sprintf("invoice_%s", hex.EncodeToString(invoice.RHash))
|
|
|
|
storedInvoice := ln.InvoiceFromLndIn(invoice)
|
|
|
|
err = db.DB.Redis.Do(radix.FlatCmd(nil, "SET", key, storedInvoice))
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("error handleSettledInvoice: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get upload_id related to this invoice
|
|
|
|
key = fmt.Sprintf("invoice_id_%s_upload_id", storedInvoice.RHash)
|
|
|
|
//log.Printf("looking for %s", key)
|
|
|
|
var uploadId string
|
|
|
|
err := db.DB.Redis.Do(radix.FlatCmd(&uploadId, "GET", key))
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("error handleSettledInvoice: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add payment to the corresponding upload_id (user account)
|
|
|
|
//log.Printf("updating payment for upload %s", uploadId)
|
|
|
|
err = storage.AddPaymentToUpload(uploadId, storedInvoice.RHash,
|
|
|
|
storedInvoice.Msatoshi)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("error handleSettledInvoice: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Printf("Notifying invoice paid for download")
|
|
|
|
key = fmt.Sprintf("%s:%s", bus.InvoicePaidChannelPrefix,
|
|
|
|
storedInvoice.RHash)
|
|
|
|
|
|
|
|
err = db.DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH",
|
|
|
|
key, storedInvoice))
|
|
|
|
}
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("error handleSettledInvoice: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|