implemented invoice watcher

master
Chakib Benziane 5 years ago
parent bcd3160cfd
commit c1900d52a3

@ -32,7 +32,7 @@ func (api *API) Run() {
wsRoute.GET("/WSS", ws.Serve)
}
api.router.Run(":8880")
api.router.Run("0.0.0.0:8880")
}
func NewAPI() *API {

@ -0,0 +1,18 @@
package bus
// PubSub Update Types
const (
UpdatePaymentReceived = iota
)
// Channel names
const (
WatchInvoicesChannelName = "watch_invoices"
)
// Simple message used for update notifications in pub/sub
type UpdateMessage struct {
UploadId string
Status int // update type
Data interface{}
}

@ -25,8 +25,9 @@ var (
)
type Database struct {
Sql *sqlx.DB
Redis *radix.Pool
Sql *sqlx.DB
Redis *radix.Pool
RedisPubSub radix.PubSubConn
}
func (d *Database) Open() error {
@ -63,6 +64,8 @@ func (d *Database) Open() error {
log.Fatal(err)
}
d.RedisPubSub = radix.PersistentPubSub("tcp", "redis:6379", nil)
return nil
}

@ -15,72 +15,14 @@ import (
"github.com/gin-gonic/gin"
)
type Currency int
const (
CurMSat Currency = iota
CurSat
CurBTC
CurUSD
CurEur
)
const (
LNChargeAPIEnv = "LN_CHARGE_API"
LNChargeTokenEnv = "LN_CHARGE_TOKEN"
InfoEndpoint = "info"
InvoiceEndpoint = "invoice"
PollInvoiceEndpoint = "invoice"
)
var (
LNCEndpoint string
LNChargeToken string
Client *http.Client
)
var (
CurrencyString = map[Currency]string{
CurUSD: "USD",
CurSat: "SAT",
CurBTC: "BTC",
CurMSat: "MSAT",
CurEur: "EUR",
}
CurrencyID = map[string]Currency{
"USD": CurUSD,
"SAT": CurSat,
"MSAT": CurMSat,
"BTC": CurBTC,
"EUR": CurEur,
}
)
type Charge struct {
client *http.Client
}
func NewCharge() *Charge {
netTransport := &http.Transport{
Dial: (&net.Dialer{
Timeout: 5 * time.Second,
}).Dial,
TLSHandshakeTimeout: 5 * time.Second,
}
c := &http.Client{
Timeout: time.Second * 10,
Transport: netTransport,
}
return &Charge{
client: c,
}
}
func (c *Charge) Info() (gin.H, error) {
func Info() (gin.H, error) {
result := make(gin.H)
resp, err := c.client.Get(getUrl(InfoEndpoint))
resp, err := Client.Get(getUrl(InfoEndpoint))
if err != nil {
return nil, err
}
@ -91,14 +33,15 @@ func (c *Charge) Info() (gin.H, error) {
return result, nil
}
func (c *Charge) PollInvoice(id string) (*Invoice, error) {
// Shoudl be called in goroutine
func PollPaidInvoice(in *Invoice, paidChan chan<- *Invoice) {
reqParams := url.Values{}
// Timeout in seconds
reqParams.Set("timeout", strconv.Itoa(30))
reqURI := fmt.Sprintf("%s/%s/wait?%s",
getUrl(PollInvoiceEndpoint),
id,
in.Id,
reqParams.Encode())
log.Printf("polling to %s", reqURI)
@ -110,16 +53,16 @@ func (c *Charge) PollInvoice(id string) (*Invoice, error) {
log.Println("new poll")
resp, err = http.Get(reqURI)
if err != nil {
return nil, err
log.Printf("Error: ", err)
}
if resp.StatusCode == http.StatusPaymentRequired {
log.Printf("invoice %s not yet paid", in.Id)
continue
}
if resp.StatusCode == http.StatusGone {
log.Printf("invoice expired")
return nil, fmt.Errorf("invoice expired")
log.Printf("invoice expired ", in.Id)
}
if resp.StatusCode == http.StatusOK {
@ -129,8 +72,7 @@ func (c *Charge) PollInvoice(id string) (*Invoice, error) {
//unknownData := gin.H{}
//jsonDec := json.NewDecoder(resp.Body)
//jsonDec.Decode(&data)
log.Printf("unknown resopnse %s", resp.Status)
return nil, fmt.Errorf("unknown response %s", resp.Status)
log.Printf("InvoicePoll Error: unknown resopnse %s", resp.Status)
}
@ -139,10 +81,10 @@ func (c *Charge) PollInvoice(id string) (*Invoice, error) {
jsonDec.Decode(&invoice)
log.Printf("Invoice paid %s", invoice)
return &invoice, nil
paidChan <- &invoice
}
func (c *Charge) Invoice(amount float32, curr Currency, uploadId string) (*Invoice, error) {
func UploadInvoice(amount float32, curr Currency, uploadId string) (*Invoice, error) {
reqData := gin.H{
"description": fmt.Sprintf("bit4sat upload: %s", uploadId),
@ -161,7 +103,7 @@ func (c *Charge) Invoice(amount float32, curr Currency, uploadId string) (*Invoi
return nil, err
}
resp, err := c.client.Post(getUrl(InvoiceEndpoint),
resp, err := Client.Post(getUrl(InvoiceEndpoint),
"application/json",
bytes.NewReader(jsonEnc))
@ -203,3 +145,22 @@ func init() {
}
}
func newClient() *http.Client {
netTransport := &http.Transport{
Dial: (&net.Dialer{
Timeout: 5 * time.Second,
}).Dial,
TLSHandshakeTimeout: 5 * time.Second,
}
c := &http.Client{
Timeout: time.Second * 10,
Transport: netTransport,
}
return c
}
func init() {
Client = newClient()
}

@ -1,12 +1,48 @@
package ln
import (
"encoding/json"
"log"
"time"
)
type Currency int
const (
CurMSat Currency = iota
CurSat
CurBTC
CurUSD
CurEur
)
const (
LNChargeAPIEnv = "LN_CHARGE_API"
LNChargeTokenEnv = "LN_CHARGE_TOKEN"
InfoEndpoint = "info"
InvoiceEndpoint = "invoice"
PollInvoiceEndpoint = "invoice"
)
var (
LNCEndpoint string
LNChargeToken string
)
var (
CurrencyString = map[Currency]string{
CurUSD: "USD",
CurSat: "SAT",
CurBTC: "BTC",
CurMSat: "MSAT",
CurEur: "EUR",
}
"git.sp4ke.com/sp4ke/bit4sat/db"
"github.com/mediocregopher/radix/v3"
CurrencyID = map[string]Currency{
"USD": CurUSD,
"SAT": CurSat,
"MSAT": CurMSat,
"BTC": CurBTC,
"EUR": CurEur,
}
)
type Invoice struct {
@ -20,61 +56,3 @@ type Invoice struct {
Status string `json:"status"`
UploadId string `json:"upload_id"`
}
type InvoiceWatcher struct{}
func (w *InvoiceWatcher) Run() {
watchedKey := "watch_invoices"
for {
var invoiceBlob []byte
popTimeout := "0" //block until ready
err := db.DB.Redis.Do(radix.Cmd(&invoiceBlob,
"BLPOP",
watchedKey,
popTimeout))
if err != nil {
log.Println("error in InvoiceWatcher ", err)
continue
}
invoice := Invoice{}
err = json.Unmarshal(invoiceBlob, &invoice)
if err != nil {
log.Println("error unmarshaling in InvoiceWatcher ", err)
}
go func() {
log.Println("Starting goroutine to watch invoice")
time.Sleep(10 * time.Second)
//charge := ln.NewCharge()
// Blocking function
//rawInvoice, err := charge.PollInvoice(invoice.Id)
//if err != nil {
//return err
//}
//err = SetUploadStatus(uploadId, UpWaitingStorage)
//if err != nil {
//log.Printf("Error setting upload status", err)
//}
// Set paid upload invoice id
//err = SetPaidUploadInvoice(uploadId, invoice)
//if err != nil {
//return err
//}
}()
time.Sleep(1 * time.Second)
}
}
func NewInvoiceWatcher() *InvoiceWatcher {
return &InvoiceWatcher{}
}

@ -2,14 +2,14 @@ package main
import (
"git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/ln"
"git.sp4ke.com/sp4ke/bit4sat/watchers"
)
func main() {
defer db.DB.Sql.Close()
api := NewAPI()
invoiceWatcher := ln.NewInvoiceWatcher()
invoiceWatcher := watchers.NewInvoiceWatcher()
go invoiceWatcher.Run()
api.Run()

@ -75,11 +75,8 @@ func (ctrl UploadCtrl) New(c *gin.Context) {
// TODO: check if upload fee is free
if true {
// Get a new lightning invoice
charge := ln.NewCharge()
// New invoice for 10$
invoice, err := charge.Invoice(100, ln.CurSat, uploadId)
invoice, err := ln.UploadInvoice(100, ln.CurSat, uploadId)
//info, err := ln.Info()
if err != nil {
@ -93,7 +90,7 @@ func (ctrl UploadCtrl) New(c *gin.Context) {
return
}
err = PushWatchUploadPayment(invoice)
err = PublishWatchUploadPayment(invoice)
if err != nil {
log.Println("error watching payment ", err)
utils.JSONErrPriv(c, http.StatusInternalServerError, err)

@ -7,6 +7,7 @@ import (
"fmt"
"log"
"git.sp4ke.com/sp4ke/bit4sat/bus"
"git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/ln"
"github.com/jmoiron/sqlx"
@ -18,6 +19,7 @@ var DB = db.DB
const (
//TODO: sync upload status from redis
// TODO: status is currently handled in cache not here
DBUploadSchema = `
CREATE TABLE IF NOT EXISTS upload (
id serial PRIMARY KEY,
@ -129,34 +131,14 @@ func SetUploadInvoice(uploadId string, invoice *ln.Invoice) error {
return DB.Redis.Do(radix.FlatCmd(nil, "SET", key, invoiceJson))
}
func PushWatchUploadPayment(invoice *ln.Invoice) error {
key := "watch_invoices"
func PublishWatchUploadPayment(invoice *ln.Invoice) error {
invoiceJson, err := json.Marshal(invoice)
if err != nil {
return err
}
return DB.Redis.Do(radix.FlatCmd(nil, "RPUSH", key, invoiceJson))
//log.Printf("Will watch upload payment for uID %s", uploadId)
//charge := ln.NewCharge()
//invoice, err := charge.PollInvoice(invoiceId)
//if err != nil {
//return err
//}
//err = SetUploadStatus(uploadId, UpWaitingStorage)
//if err != nil {
//return err
//}
//// Set paid upload invoice id
//err = SetPaidUploadInvoice(uploadId, invoice)
//if err != nil {
//return err
//}
return DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH", bus.WatchInvoicesChannelName, invoiceJson))
//TODO: Notify client on websocket

@ -0,0 +1,79 @@
package watchers
import (
"encoding/json"
"fmt"
"log"
"git.sp4ke.com/sp4ke/bit4sat/bus"
"git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/ln"
"git.sp4ke.com/sp4ke/bit4sat/storage"
"github.com/mediocregopher/radix/v3"
)
type InvoiceWatcher struct {
listenWatchInvoice chan radix.PubSubMessage
invoicePaid chan *ln.Invoice
}
func (w *InvoiceWatcher) Run() {
if err := db.DB.RedisPubSub.Subscribe(w.listenWatchInvoice,
bus.WatchInvoicesChannelName); err != nil {
panic(err)
}
for {
select {
case msg := <-w.listenWatchInvoice:
log.Printf("checking new invoice %s", msg.Message)
invoice := ln.Invoice{}
err := json.Unmarshal(msg.Message, &invoice)
if err != nil {
panic(err)
}
go ln.PollPaidInvoice(&invoice, w.invoicePaid)
case invoice := <-w.invoicePaid:
// Set upload status to UpWaitingStorage (paid)
err := storage.SetUploadStatus(invoice.UploadId, storage.UpWaitingStorage)
if err != nil {
panic(err)
}
// Update Upload Invoice
err = storage.SetUploadInvoice(invoice.UploadId, invoice)
if err != nil {
panic(err)
}
// publish invoice was updated to upload_id_paid channel
updateMsg := bus.UpdateMessage{}
updateMsg.Status = bus.UpdatePaymentReceived
updateMsg.UploadId = invoice.UploadId
updateMsg.Data = invoice
updateMsgJson, err := json.Marshal(updateMsg)
if err != nil {
panic(err)
}
log.Printf("Notifying upload %s paid", invoice.UploadId)
key := fmt.Sprintf("upload_update_%s", invoice.UploadId)
err = db.DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH",
key, updateMsgJson))
}
}
}
func NewInvoiceWatcher() *InvoiceWatcher {
return &InvoiceWatcher{
listenWatchInvoice: make(chan radix.PubSubMessage),
invoicePaid: make(chan *ln.Invoice),
}
}

@ -1,19 +1,22 @@
#172.29.195.31
localhost
root dist
ext .html
header / {
Content-Security-Policy "default-src * 'unsafe-eval' 'unsafe-inline' data:;"
Content-Security-Policy "default-src * 'unsafe-eval' 'unsafe-inline' data: ;"
}
proxy /api localhost:8880
log /api stdout
#log /api stdout
log / stdout
proxy /ws localhost:8880 {
websocket
transparent
}
proxy /wss localhost:8880
#tls self_signed

@ -57,9 +57,13 @@ export class WS {
let msg = JSON.parse(ev.data);
switch (msg.type) {
case "hello":
case 'hello':
console.log(msg.data);
break;
case 'upload-accepted':
console.log(msg)
break;
}
}

@ -5,7 +5,6 @@ import App from './App.vue';
import GetWorker from './workerInterface.js';
import { WS } from './Websocket.js';
window.ws = new WS()
window.app = new Vue({

@ -2,9 +2,20 @@
//to keep the UI free
import Api from './api.js'
import { WS } from './Websocket.js';
self.websocket = new WS()
const name = 'main'
// TODO
//if (window.isSecureContext) {
//// Page is a secure context so service workers are now available
//navigator.serviceWorker.register("/offline-worker.js").then(function () {
//...
//});
//}
function hexString(buffer) {
const byteArray = new Uint8Array(buffer);
@ -50,10 +61,15 @@ async function newUpload(files){
let upload = new Api.Upload(filesMetadata, files)
// Ask permission to send a new upload
await upload.create()
.then(data => {
// Get the upload id
let { result: { upload_id: id, invoice: invoice } } = data
let { result: { upload_id: id, invoice: invoice } } = data;
// Register to updates on our uploadId from websocket
//self.websocket.listenTo('upload-accepted')
//postMessage({
//msg: 'upload-invoice',
@ -64,6 +80,9 @@ async function newUpload(files){
// Notify the UI
postMessage({msg: 'upload-invoice', id: id, invoice: invoice})
// Send the files
//upload.send()
//.then(resp =>{

@ -43,6 +43,8 @@ type Client struct {
// Buffered channel of outbound messages
send chan []byte
// Listen to updates from redis
}
func (c *Client) readPump() {
@ -72,7 +74,6 @@ func (c *Client) readPump() {
}
func (c *Client) writePump() {
//testTicker := time.NewTicker(5 * time.Second)
pingTicker := time.NewTicker(pingPeriod)
defer func() {
log.Println("quit ws server")
@ -81,6 +82,8 @@ func (c *Client) writePump() {
c.conn.Close()
}()
// Subscribe to upload notifications
for {
select {
//case <-testTicker.C:

Loading…
Cancel
Save