Using callback no need for watchers

master
Chakib Benziane 5 years ago
parent e4ca0e2701
commit 89fc1b2894

@ -0,0 +1,80 @@
package api
import (
"encoding/json"
"fmt"
"log"
"net/http"
"git.sp4ke.com/sp4ke/bit4sat/bus"
"git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/ln"
"git.sp4ke.com/sp4ke/bit4sat/storage"
"git.sp4ke.com/sp4ke/bit4sat/utils"
"github.com/gin-gonic/gin"
"github.com/mediocregopher/radix/v3"
)
func invoiceCbHandler(c *gin.Context) {
invoice := ln.Invoice{}
//dec := json.Decoder(c.Request.Body)
if err := c.ShouldBindJSON(&invoice); err != nil {
log.Println(err)
utils.JSONErr(c, http.StatusNotAcceptable,
"callback: could not parse request from charge api")
return
}
log.Printf("received invoice paid from ln charge\n%s", invoice)
c.Status(http.StatusOK)
// get upload id related to invoice
var uploadId string
invoiceUploadKey := fmt.Sprintf("invoice_%s_upload", invoice.Id)
err := db.DB.Redis.Do(radix.FlatCmd(&uploadId, "GET", invoiceUploadKey))
if err != nil {
panic(err)
}
if uploadId == "" {
log.Printf("cannot find uploadId for invoice %s", invoice)
return
}
invoice.UploadId = uploadId
//
// Set upload status to UpWaitingStorage (paid)
err = storage.SetUploadStatus(invoice.UploadId, storage.UpWaitingStorage)
if err != nil {
panic(err)
}
//
// Update Upload Invoice
err = storage.SetUploadInvoice(invoice.UploadId, &invoice)
if err != nil {
panic(err)
}
// publish invoice was updated to upload_id_paid channel
msg := bus.Message{}
msg.Type = bus.PaymentReceived
msg.UploadId = invoice.UploadId
msg.Data = gin.H{
"invoice": invoice,
}
msgJson, err := json.Marshal(msg)
if err != nil {
panic(err)
}
log.Printf("Notifying upload paid %s", invoice.UploadId)
key := fmt.Sprintf("%s_%s", bus.UploadUpdateChannelPrefix,
invoice.UploadId)
err = db.DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH",
key, msgJson))
return
}

@ -1,6 +1,7 @@
package main
package api
import (
"git.sp4ke.com/sp4ke/bit4sat/config"
"git.sp4ke.com/sp4ke/bit4sat/storage"
"git.sp4ke.com/sp4ke/bit4sat/ws"
"github.com/gin-contrib/cors"
@ -21,20 +22,17 @@ func (api *API) Run() {
uploadRoute := api.router.Group("/api/upload")
{
uploadRoute.POST("", UploadCtrl.New)
uploadRoute.PUT(":id", UploadCtrl.Upload)
}
wsRoute := api.router.Group("/ws")
{
wsRoute.GET("/", ws.Serve)
wsRoute.GET("/WS", ws.Serve)
wsRoute.GET("/WSS", ws.Serve)
}
// Websocket server
api.router.GET("/ws", ws.Serve)
// LN charge callback
api.router.POST("/"+config.ChargeCallbackEndpoint, invoiceCbHandler)
api.router.Run("0.0.0.0:8880")
api.router.Run(config.ApiListen)
}
func NewAPI() *API {
@ -43,7 +41,7 @@ func NewAPI() *API {
// Setup Session
sessionStore, err := redis.NewStore(10, "tcp", "redis:6379",
"", []byte(SessionSecret))
"", []byte(config.SessionSecret))
if err != nil {
panic(err)
}

@ -0,0 +1,49 @@
package config
import (
"fmt"
"github.com/namsral/flag"
)
const (
ChargeCallbackEndpoint = "chargecallback"
)
var (
StoragePath,
ApiPort,
ApiInterface,
ApiListen,
ApiHost,
SessionSecret,
LnChargeApi,
LnChargeToken,
SqlDbHost,
SqlDbUser,
SqlDbPass string
)
func init() {
// Storage
flag.StringVar(&StoragePath, "bit4sat-storage-path", "file-storage", "base file storage path")
// Api
flag.StringVar(&ApiPort, "api-port", "8880", "api port number")
flag.StringVar(&ApiInterface, "api-interface", "", "api listening interface")
flag.StringVar(&ApiHost, "api-host", "", "reachable hostname for api (used by callbacks)")
ApiListen = fmt.Sprintf("%s:%s", ApiInterface, ApiPort)
flag.StringVar(&SessionSecret, "session-secret", "default-secret", "cookie sessions secret")
// Database
flag.StringVar(&SqlDbHost, "sql-db-host", "bit4sat", "sql db hostname")
flag.StringVar(&SqlDbUser, "sql-db-user", "bit4sat", "sql db username")
flag.StringVar(&SqlDbPass, "sql-db-pass", "bit4sat", "sql db pass")
// LN Charge
flag.StringVar(&LnChargeApi, "ln-charge-api", "", "ln charge api endpoint")
flag.StringVar(&LnChargeToken, "ln-charge-token", "", "ln charge api token")
flag.Parse()
}

@ -3,21 +3,17 @@ package db
import (
"fmt"
"log"
"os"
"git.sp4ke.com/sp4ke/bit4sat/config"
_ "github.com/lib/pq"
//_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"github.com/mediocregopher/radix/v3"
)
const (
//DBName = "bit4sat.sqlite"
DBName = "bit4sat"
DBHostEnv = "DB_HOST"
DBUserEnv = "DB_USER"
DBPassEnv = "DB_PASS"
//DBPragma = ` PRAGMA foreign_keys = ON; `
DBName = "bit4sat"
)
var (
@ -33,23 +29,11 @@ type Database struct {
func (d *Database) Open() error {
var err error
// Get env vars
host, set := os.LookupEnv(DBHostEnv)
if !set {
log.Fatal("undefined DB_HOST env")
}
user, set := os.LookupEnv(DBUserEnv)
if !set {
log.Fatal("undefined DB_USER env")
}
pass, set := os.LookupEnv(DBPassEnv)
if !set {
log.Fatal("undefined DB_PASS env")
}
dsn := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", user, pass, host, DBName)
dsn := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable",
config.SqlDbUser,
config.SqlDbPass,
config.SqlDbHost,
DBName)
log.Printf("Opening SQL at %s\n", dsn)

@ -11,14 +11,16 @@ volumes:
services:
app:
image: sp4ke/bit4sat
container_name: bit4sat-api
build: ./docker
environment:
- GO111MODULE=on
- API_HOST=bit4sat-api
- BIT4SAT_STORAGE_PATH=/storage
- GOPATH=/go
- DB_HOST=postgres
- DB_USER=bit4sat
- DB_PASS=bit4sat
- SQL_DB_HOST=postgres
- SQL_DB_USER=bit4sat
- SQL_DB_PASS=bit4sat
- LN_CHARGE_API=ln-charge-test:9112
- LN_CHARGE_TOKEN=3emU3Fy8VasHCzMaMXHSVJYpQSqH3yXQj8N5cQFBbq3botrudJuR7zQkBBmFSbAmgXs9GD4j4U3J4R2sMfgqPo8q
- SESSION_SECRET=Ai7fCy36UE5cb9wcmdAxxRXwYyQDsDMr6rYocA6Eava7pdiB29EusLbb9sTYWS1e

@ -15,6 +15,7 @@ require (
github.com/mattn/go-isatty v0.0.7 // indirect
github.com/mattn/go-sqlite3 v1.10.0
github.com/mediocregopher/radix/v3 v3.2.3
github.com/namsral/flag v1.7.4-pre
github.com/segmentio/ksuid v1.0.2
github.com/stretchr/testify v1.3.0 // indirect
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf

@ -62,6 +62,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/namsral/flag v1.7.4-pre h1:b2ScHhoCUkbsq0d2C15Mv+VU8bl8hAXV8arnWiOHNZs=
github.com/namsral/flag v1.7.4-pre/go.mod h1:OXldTctbM6SWH1K899kPZcf65KxJiD7MsceFUpB5yDo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quasoft/memstore v0.0.0-20180925164028-84a050167438/go.mod h1:wTPjTepVu7uJBYgZ0SdWHQlIas582j6cn2jgk4DDdlg=

@ -8,10 +8,10 @@ import (
"net"
"net/http"
"net/url"
"os"
"strconv"
"time"
"git.sp4ke.com/sp4ke/bit4sat/config"
"github.com/gin-gonic/gin"
)
@ -87,8 +87,12 @@ func PollPaidInvoice(in *Invoice, paidChan chan<- *Invoice) {
func UploadInvoice(amount float32, curr Currency, uploadId string) (*Invoice, error) {
webhookUrl := fmt.Sprintf("http://%s:%s/%s", config.ApiHost, config.ApiPort,
config.ChargeCallbackEndpoint)
reqData := gin.H{
"description": fmt.Sprintf("bit4sat upload: %s", uploadId),
"webhook": webhookUrl,
}
// If Satoshi convert to millisat
@ -124,8 +128,8 @@ func UploadInvoice(amount float32, curr Currency, uploadId string) (*Invoice, er
func getUrl(endpoint string) string {
url, err := url.Parse(fmt.Sprintf("http://api-token:%s@%s/%s",
LNChargeToken,
LNCEndpoint,
config.LnChargeToken,
config.LnChargeApi,
endpoint))
if err != nil {
log.Fatal(err)
@ -135,20 +139,6 @@ func getUrl(endpoint string) string {
}
func init() {
var set bool
LNCEndpoint, set = os.LookupEnv(LNChargeAPIEnv)
if !set {
log.Fatalf("%s not set", LNChargeAPIEnv)
}
LNChargeToken, set = os.LookupEnv(LNChargeTokenEnv)
if !set {
log.Fatalf("%s not set", LNChargeAPIEnv)
}
}
func newClient() *http.Client {
netTransport := &http.Transport{
Dial: (&net.Dialer{

@ -1,6 +1,8 @@
package ln
import (
"fmt"
"strconv"
"time"
)
@ -15,18 +17,11 @@ const (
)
const (
LNChargeAPIEnv = "LN_CHARGE_API"
LNChargeTokenEnv = "LN_CHARGE_TOKEN"
InfoEndpoint = "info"
InvoiceEndpoint = "invoice"
PollInvoiceEndpoint = "invoice"
)
var (
LNCEndpoint string
LNChargeToken string
)
var (
CurrencyString = map[Currency]string{
CurUSD: "USD",
@ -45,17 +40,35 @@ var (
}
)
type timestamp time.Time
func (t *timestamp) UnmarshalJSON(in []byte) error {
val, err := strconv.Atoi(string(in))
if err != nil {
return fmt.Errorf("cannot unmarshal timestamp int")
}
parsedTime := time.Unix(int64(val), 0)
*t = timestamp(parsedTime)
return nil
}
func (t timestamp) String() string {
return time.Time(t).Format(time.RFC3339)
}
type Invoice struct {
Id string `json:"id"`
CreatedAt time.Time `json:"created_at"`
UploadId string `json:"upload_id"`
Description string `json:"description"`
ExpiresAt time.Time `json:"expires_at"`
Msatoshi string `json:"msatoshi"`
Payreq string `json:"payreq"`
RHash string `json:"rhash"`
Status string `json:"status"`
QuotedCurrency string `json:"quoted_currency"`
QuotedAmount string `json:"quoted_amount"`
PaidAt time.Time `json:"paid_at"`
UploadId string `json:"upload_id"`
PaidAt timestamp `json:"paid_at"`
CreatedAt timestamp `json:"created_at"`
ExpiresAt timestamp `json:"expires_at"`
}

@ -1,17 +1,16 @@
package main
import (
"git.sp4ke.com/sp4ke/bit4sat/api"
"git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/watchers"
)
func main() {
defer db.DB.Sql.Close()
api := NewAPI()
invoiceWatcher := watchers.NewInvoiceWatcher()
go invoiceWatcher.Run()
api := api.NewAPI()
//invoiceWatcher := watchers.NewInvoiceWatcher()
//go invoiceWatcher.Run()
api.Run()
}

@ -1,24 +0,0 @@
package main
import (
"log"
"os"
)
const (
SessionSecretEnv = "SESSION_SECRET"
)
var (
SessionSecret string
)
func init() {
var set bool
SessionSecret, set = os.LookupEnv(SessionSecretEnv)
if !set {
log.Fatalf("%s not set", SessionSecretEnv)
}
}

@ -9,20 +9,12 @@ import (
"os"
"path/filepath"
"git.sp4ke.com/sp4ke/bit4sat/config"
"git.sp4ke.com/sp4ke/bit4sat/utils"
)
const (
StoragePath = "file-storage"
StoragePathEnv = "BIT4SAT_STORAGE_PATH"
)
var (
RuntimeStoragePath string
)
func GetStoreDestination(filename string) string {
return filepath.Join(RuntimeStoragePath, filename)
return filepath.Join(config.StoragePath, filename)
}
func StoreFormFile(src multipart.File, filename string) error {
@ -68,14 +60,7 @@ func CheckFileExists(file string) (bool, error) {
}
func init() {
path, set := os.LookupEnv(StoragePathEnv)
if !set {
path = StoragePath
}
RuntimeStoragePath = path
err := utils.Mkdir(path)
err := utils.Mkdir(config.StoragePath)
if err != nil {
log.Fatal(err)
}

@ -95,24 +95,25 @@ func (ctrl UploadCtrl) New(c *gin.Context) {
return
}
err = SetUploadStatus(uploadId, UpNew)
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return
}
// If not free upload generate a an invoice
// TODO: check if upload fee is free
if true {
// New invoice for 10$
invoice, err := ln.UploadInvoice(100, ln.CurSat, uploadId)
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return
}
invoice.UploadId = uploadId
//info, err := ln.Info()
// Update Upload Invoice
err = SetUploadInvoice(invoice.UploadId, invoice)
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return
}
err = SetUploadStatus(uploadId, UpWaitingPayment)
@ -123,12 +124,12 @@ func (ctrl UploadCtrl) New(c *gin.Context) {
// Start watching upload payment status for this client
err = WatchUploadPayment(invoice)
if err != nil {
log.Println("error watching payment ", err)
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return
}
//err = WatchUploadPayment(invoice)
//if err != nil {
//log.Println("error watching payment ", err)
//utils.JSONErrPriv(c, http.StatusInternalServerError, err)
//return
//}
result := gin.H{
"upload_id": uploadId,
@ -142,10 +143,9 @@ func (ctrl UploadCtrl) New(c *gin.Context) {
return
// Handle free uploads
} else {
//Else, free upload accepted
log.Println("new upload created")
c.JSON(http.StatusOK, gin.H{
"status": "ready for upload",

@ -121,14 +121,21 @@ func SetUploadStatus(id string, status int) error {
}
func SetUploadInvoice(uploadId string, invoice *ln.Invoice) error {
key := fmt.Sprintf("upload_%s_invoice", uploadId)
uploadInvoiceKey := fmt.Sprintf("upload_%s_invoice", uploadId)
invoiceJson, err := json.Marshal(invoice)
if err != nil {
return err
}
return DB.Redis.Do(radix.FlatCmd(nil, "SET", key, invoiceJson))
err = DB.Redis.Do(radix.FlatCmd(nil, "SET", uploadInvoiceKey, invoiceJson))
if err != nil {
return err
}
// Set inverse relation
invoiceUploadKey := fmt.Sprintf("invoice_%s_upload", invoice.Id)
return DB.Redis.Do(radix.FlatCmd(nil, "SET", invoiceUploadKey, uploadId))
}
func WatchUploadPayment(invoice *ln.Invoice) error {

@ -1,85 +0,0 @@
package watchers
import (
"encoding/json"
"fmt"
"log"
"git.sp4ke.com/sp4ke/bit4sat/bus"
"git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/ln"
"git.sp4ke.com/sp4ke/bit4sat/storage"
"github.com/gin-gonic/gin"
"github.com/mediocregopher/radix/v3"
)
type InvoiceWatcher struct {
listenWatchInvoice chan radix.PubSubMessage
invoicePaid chan *ln.Invoice
}
func (w *InvoiceWatcher) Run() {
if err := db.DB.RedisPubSub.Subscribe(w.listenWatchInvoice,
bus.WatchInvoicesChannelName); err != nil {
panic(err)
}
for {
select {
case msg := <-w.listenWatchInvoice:
log.Printf("checking new invoice %s", msg.Message)
invoice := ln.Invoice{}
err := json.Unmarshal(msg.Message, &invoice)
if err != nil {
panic(err)
}
//TODO: kill goroutine if listening websocket dies
go ln.PollPaidInvoice(&invoice, w.invoicePaid)
case invoice := <-w.invoicePaid:
log.Printf("recevied update for invoice <%s>", invoice.UploadId)
// Set upload status to UpWaitingStorage (paid)
err := storage.SetUploadStatus(invoice.UploadId, storage.UpWaitingStorage)
if err != nil {
panic(err)
}
// Update Upload Invoice
err = storage.SetUploadInvoice(invoice.UploadId, invoice)
if err != nil {
panic(err)
}
// publish invoice was updated to upload_id_paid channel
msg := bus.Message{}
msg.Type = bus.PaymentReceived
msg.UploadId = invoice.UploadId
msg.Data = gin.H{
"invoice": invoice,
}
msgJson, err := json.Marshal(msg)
if err != nil {
panic(err)
}
log.Printf("Notifying upload paid %s", invoice.UploadId)
key := fmt.Sprintf("%s_%s", bus.UploadUpdateChannelPrefix,
invoice.UploadId)
err = db.DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH",
key, msgJson))
}
}
}
func NewInvoiceWatcher() *InvoiceWatcher {
return &InvoiceWatcher{
listenWatchInvoice: make(chan radix.PubSubMessage),
invoicePaid: make(chan *ln.Invoice),
}
}

@ -10,7 +10,7 @@ export class WS {
return singleton;
}
const endpoint = `ws://localhost:2015/ws/`;
const endpoint = `ws://localhost:2015/ws`;
this.conn = new WebSocket(endpoint);
this.conn.onerror = this.onerror();
this.conn.onclose = this.onclose();
@ -43,10 +43,10 @@ export class WS {
return (ev) => {
console.warn("websocket closed");
//TODO: restore on prod
//setTimeout(() =>{
//console.log("websocket: trying to reconnect")
//instance = new WS()
//}, 5000)
setTimeout(() =>{
console.log("websocket: trying to reconnect")
instance = new WS()
}, 3000)
}
}

@ -19,10 +19,10 @@ import (
const (
// Time allowed to write message to peer
writeWait = 10 * time.Second
writeWait = 30 * time.Second
// Time allowed to read the next pong message from the client.
pongWait = 5 * time.Second
pongWait = 60 * time.Second
// Send pings to client with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
@ -101,8 +101,7 @@ func (c *Client) readPump() {
func (c *Client) writePump() {
pingTicker := time.NewTicker(pingPeriod)
defer func() {
log.Println("quit ws server")
log.Println("websocket closing")
pingTicker.Stop()
c.conn.Close()
}()
@ -126,14 +125,15 @@ func (c *Client) writePump() {
for {
select {
case msg := <-c.subChannel:
log.Printf("received msg %s on socket main channel", msg)
//log.Printf("received msg %s on socket main channel", msg)
jsonMsg := bus.Message{}
if err := json.Unmarshal(msg.Message, &jsonMsg); err != nil {
log.Printf("ws error reading from pubsub: %s", err)
continue
break
}
if jsonMsg.Type == bus.SetUploadId {
log.Printf("registering uploadId: %s to socket", jsonMsg.UploadId)
c.uploadId = jsonMsg.UploadId
}
@ -202,32 +202,6 @@ func (c *Client) writePump() {
}
//case <-testTicker.C:
//c.conn.SetWriteDeadline(time.Now().Add(writeWait))
//msg := gin.H{
//"msg": "hello",
//"data": "from server",
//}
//w, err := c.conn.NextWriter(websocket.TextMessage)
//if err != nil {
//log.Println(err)
//return
//}
//enc := json.NewEncoder(w)
//err = enc.Encode(msg)
//if err != nil {
//log.Println(err)
//return
//}
//if err = w.Close(); err != nil {
//log.Println(err)
//return
//}
case <-pingTicker.C:
//log.Println("ping")
@ -241,24 +215,39 @@ func (c *Client) writePump() {
}
func Serve(c *gin.Context) {
var err error
log.Println("websocket request")
session := sessions.Default(c)
// Create unique id for this session in order to store it
// in the websocket bus
// First check if cookie is already set with upload id in case this is a
// reconnection
//
var finalSessId ksuid.KSUID
socketSessionId, err := ksuid.NewRandomWithTime(time.Now())
if err != nil {
log.Println("error generating ksuid for websocket")
}
log.Printf("using socket id: %s", socketSessionId)
socketSessionId := session.Get(WebsocketIdName)
if socketSessionId == nil {
session.Set(WebsocketIdName, socketSessionId.String())
session.Save()
// Create unique id for this session in order to store it
// in the websocket bus
finalSessId, err = ksuid.NewRandomWithTime(time.Now())
if err != nil {
log.Println("error generating ksuid for websocket")
return
}
log.Printf("Writing socket session id to header: %s", c.Writer.Header())
log.Printf("using socket id: %s", finalSessId)
session.Set(WebsocketIdName, finalSessId.String())
session.Save()
log.Printf("Writing socket session id to header: %s", c.Writer.Header())
} else {
// Reuse socket session id
log.Printf("reusing websocket id %s", socketSessionId)
finalSessId, err = ksuid.Parse(socketSessionId.(string))
if err != nil {
log.Println("could not parse websocket session id")
}
}
conn, err := upgrader.Upgrade(c.Writer, c.Request, c.Writer.Header())
if err != nil {
@ -271,11 +260,11 @@ func Serve(c *gin.Context) {
}
client := &Client{
id: socketSessionId,
id: finalSessId,
conn: conn,
subChannel: make(chan radix.PubSubMessage),
uploadNotifChannel: make(chan radix.PubSubMessage),
channelName: fmt.Sprintf("%s_%s", bus.WebsocketPubSubPrefix, socketSessionId),
channelName: fmt.Sprintf("%s_%s", bus.WebsocketPubSubPrefix, finalSessId),
}
C[client] = true

Loading…
Cancel
Save