Working invoice payment

master
Chakib Benziane 5 years ago
parent c1900d52a3
commit e4ca0e2701

@ -4,6 +4,8 @@ import (
"git.sp4ke.com/sp4ke/bit4sat/storage"
"git.sp4ke.com/sp4ke/bit4sat/ws"
"github.com/gin-contrib/cors"
"github.com/gin-contrib/sessions"
"github.com/gin-contrib/sessions/redis"
"github.com/gin-gonic/gin"
)
@ -38,6 +40,17 @@ func (api *API) Run() {
func NewAPI() *API {
router := gin.Default()
router.Use(cors.Default())
// Setup Session
sessionStore, err := redis.NewStore(10, "tcp", "redis:6379",
"", []byte(SessionSecret))
if err != nil {
panic(err)
}
router.Use(sessions.Sessions("websocket-session", sessionStore))
//
//
//router.Use(secure.New(secure.Config{
//ContentSecurityPolicy: "default-src 'self'; script-src *; worker-src *",

@ -1,18 +1,21 @@
package bus
// PubSub Update Types
// Channel names
const (
UpdatePaymentReceived = iota
WatchInvoicesChannelName = "watch_invoices"
WebsocketPubSubPrefix = "websocket_update"
UploadUpdateChannelPrefix = "upload_update"
)
// Channel names
// PubSub event types
const (
WatchInvoicesChannelName = "watch_invoices"
PaymentReceived = iota
SetUploadId
)
// Simple message used for update notifications in pub/sub
type UpdateMessage struct {
UploadId string
Status int // update type
Data interface{}
type Message struct {
UploadId string `json:"upload_id"`
Type int `json:"type"` // update type
Data interface{} `json:"data"`
}

@ -21,6 +21,7 @@ services:
- DB_PASS=bit4sat
- LN_CHARGE_API=ln-charge-test:9112
- LN_CHARGE_TOKEN=3emU3Fy8VasHCzMaMXHSVJYpQSqH3yXQj8N5cQFBbq3botrudJuR7zQkBBmFSbAmgXs9GD4j4U3J4R2sMfgqPo8q
- SESSION_SECRET=Ai7fCy36UE5cb9wcmdAxxRXwYyQDsDMr6rYocA6Eava7pdiB29EusLbb9sTYWS1e
#deploy:
#replicas: 1

@ -5,21 +5,21 @@ go 1.12
require (
github.com/gin-contrib/cors v0.0.0-20190301062745-f9e10995c85a
github.com/gin-contrib/secure v0.0.0-20190301062601-f9a5befa6106
github.com/gin-contrib/sessions v0.0.0-20190226023029-1532893d996f
github.com/gin-gonic/gin v1.3.0
github.com/go-sql-driver/mysql v1.4.1 // indirect
github.com/golang/protobuf v1.3.1 // indirect
github.com/gorilla/websocket v1.4.0
github.com/jmoiron/sqlx v1.2.0
github.com/kr/pretty v0.1.0 // indirect
github.com/lib/pq v1.0.0
github.com/mattn/go-isatty v0.0.7 // indirect
github.com/mattn/go-sqlite3 v1.10.0
github.com/mediocregopher/radix/v3 v3.2.3
github.com/segmentio/ksuid v1.0.2
github.com/stretchr/testify v1.3.0 // indirect
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf
github.com/ugorji/go/codec v0.0.0-20190320090025-2dc34c0b8780 // indirect
golang.org/x/net v0.0.0-20190322120337-addf6b3196f6
golang.org/x/sys v0.0.0-20190322080309-f49334f85ddc // indirect
google.golang.org/appengine v1.5.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)

@ -1,3 +1,7 @@
github.com/boj/redistore v0.0.0-20180917114910-cd5dcc76aeff h1:RmdPFa+slIr4SCBg4st/l/vZWVe9QJKMXGO60Bxbe04=
github.com/boj/redistore v0.0.0-20180917114910-cd5dcc76aeff/go.mod h1:+RTT1BOk5P97fT2CiHkbFQwkK3mjsFAP6zCYV2aXtjw=
github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737/go.mod h1:PmM6Mmwb0LSuEubjR8N7PtNe1KxZLtOUHtbeikc5h60=
github.com/bradleypeabody/gorilla-sessions-memcache v0.0.0-20181103040241-659414f458e1/go.mod h1:dkChI7Tbtx7H1Tj7TqGSZMOeGpMP5gLHtjroHd4agiI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -5,10 +9,13 @@ github.com/gin-contrib/cors v0.0.0-20190301062745-f9e10995c85a h1:zBycVvXa03SIX+
github.com/gin-contrib/cors v0.0.0-20190301062745-f9e10995c85a/go.mod h1:pL2kNE+DgDU+eQ+dary5bX0Z6LPP8nR6Mqs1iejILw4=
github.com/gin-contrib/secure v0.0.0-20190301062601-f9a5befa6106 h1:EvzHcDe2j4sFRLhCJR6sE1cwmt9LtrMMDdejcdjQ1Dg=
github.com/gin-contrib/secure v0.0.0-20190301062601-f9a5befa6106/go.mod h1:l2I5UUSmHxQp81CF1EmB6RIo22QgZx7yIa8bwgFwq3k=
github.com/gin-contrib/sessions v0.0.0-20190226023029-1532893d996f h1:f8TGHcU6cxOhMwW6YQhpRe+zlr05qNjVmdcK1qigr5I=
github.com/gin-contrib/sessions v0.0.0-20190226023029-1532893d996f/go.mod h1:8Xd9k6zfW7ekJjy3wJrgbgB2KWvP+GWywe6PanyMVwI=
github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7 h1:AzN37oI0cOS+cougNAV9szl6CVoj2RYwzS3DpUQNtlY=
github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
github.com/gin-gonic/gin v1.3.0 h1:kCmZyPklC0gVdL728E6Aj20uYBJV93nj/TkwBTKhFbs=
github.com/gin-gonic/gin v1.3.0/go.mod h1:7cKuhb5qV2ggCFctp2fJQ+ErvciLZrIeoOSOm6mUr7Y=
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
@ -16,12 +23,22 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.1.1/go.mod h1:8KCfur6+4Mqcc6S0FEfKuN15Vl5MgXW92AE8ovaJD0w=
github.com/gorilla/sessions v1.1.3 h1:uXoZdcdA5XdXF3QzuSlheVRUvjl+1rKY7zBXL68L9RU=
github.com/gorilla/sessions v1.1.3/go.mod h1:8KCfur6+4Mqcc6S0FEfKuN15Vl5MgXW92AE8ovaJD0w=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/json-iterator/go v1.1.5 h1:gL2yXlmiIo4+t+y32d4WGwOjKGYcGOuyrg46vadswDE=
github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/kidstuff/mongostore v0.0.0-20181113001930-e650cd85ee4b/go.mod h1:g2nVr8KZVXJSS97Jo8pJ0jgq29P6H7dG0oplUA86MQw=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@ -40,12 +57,16 @@ github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed h1:
github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg=
github.com/mediocregopher/radix/v3 v3.2.3 h1:TbcGCZdo9zfPYPgevsqRn+OjvCyfOK6TzuXhqzWdCt0=
github.com/mediocregopher/radix/v3 v3.2.3/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ=
github.com/memcachier/mc v2.0.1+incompatible/go.mod h1:7bkvFE61leUBvXz+yxsOnGBQSZpBSPIMUQSmmSHvuXc=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quasoft/memstore v0.0.0-20180925164028-84a050167438/go.mod h1:wTPjTepVu7uJBYgZ0SdWHQlIas582j6cn2jgk4DDdlg=
github.com/segmentio/ksuid v1.0.2 h1:9yBfKyw4ECGTdALaF09Snw3sLJmYIX6AbPJrAy6MrDc=
github.com/segmentio/ksuid v1.0.2/go.mod h1:BXuJDr2byAiHuQaQtSKoXh1J0YmUDurywOXgB2w+OSU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=

@ -50,7 +50,7 @@ func PollPaidInvoice(in *Invoice, paidChan chan<- *Invoice) {
resp := &http.Response{}
for {
log.Println("new poll")
//log.Println("new poll")
resp, err = http.Get(reqURI)
if err != nil {
log.Printf("Error: ", err)
@ -77,6 +77,7 @@ func PollPaidInvoice(in *Invoice, paidChan chan<- *Invoice) {
}
invoice := Invoice{}
invoice.UploadId = in.UploadId
jsonDec := json.NewDecoder(resp.Body)
jsonDec.Decode(&invoice)
log.Printf("Invoice paid %s", invoice)
@ -103,6 +104,8 @@ func UploadInvoice(amount float32, curr Currency, uploadId string) (*Invoice, er
return nil, err
}
log.Printf("Asking for invoice with: %s", jsonEnc)
resp, err := Client.Post(getUrl(InvoiceEndpoint),
"application/json",
bytes.NewReader(jsonEnc))

@ -46,13 +46,16 @@ var (
)
type Invoice struct {
Id string `json:"id"`
CreatedAt time.Time `json:"created_at"`
Description string `json:"description"`
ExpiresAt time.Time `json:"expires_at"`
Msatoshi int `json:"msatoshi"`
Payreq string `json:"payreq"`
RHash string `json:"r_hash"`
Status string `json:"status"`
UploadId string `json:"upload_id"`
Id string `json:"id"`
CreatedAt time.Time `json:"created_at"`
Description string `json:"description"`
ExpiresAt time.Time `json:"expires_at"`
Msatoshi string `json:"msatoshi"`
Payreq string `json:"payreq"`
RHash string `json:"rhash"`
Status string `json:"status"`
QuotedCurrency string `json:"quoted_currency"`
QuotedAmount string `json:"quoted_amount"`
PaidAt time.Time `json:"paid_at"`
UploadId string `json:"upload_id"`
}

@ -0,0 +1,24 @@
package main
import (
"log"
"os"
)
const (
SessionSecretEnv = "SESSION_SECRET"
)
var (
SessionSecret string
)
func init() {
var set bool
SessionSecret, set = os.LookupEnv(SessionSecretEnv)
if !set {
log.Fatalf("%s not set", SessionSecretEnv)
}
}

@ -3,16 +3,21 @@ package storage
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"git.sp4ke.com/sp4ke/bit4sat/bus"
"git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/ln"
"git.sp4ke.com/sp4ke/bit4sat/utils"
"git.sp4ke.com/sp4ke/bit4sat/ws"
"github.com/gin-contrib/sessions"
"github.com/gin-gonic/gin"
"github.com/mediocregopher/radix/v3"
)
type UploadCtrl struct{}
@ -65,6 +70,31 @@ func (ctrl UploadCtrl) New(c *gin.Context) {
return
}
// Register this uploadId to the client's websocket update channel
// websocket on the pubsub channel update_websocket_[websocket id]
session := sessions.Default(c)
webSocketId := session.Get(ws.WebsocketIdName)
if webSocketId == nil {
utils.JSONErrPriv(c, http.StatusInternalServerError,
fmt.Errorf("no session id found for web socket"))
return
}
msg := bus.Message{
UploadId: uploadId,
}
msg.Type = bus.SetUploadId
jsonMsg, err := json.Marshal(msg)
channelName := fmt.Sprintf("%s_%s", bus.WebsocketPubSubPrefix, webSocketId)
if err = db.DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH",
channelName, jsonMsg)); err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return
}
err = SetUploadStatus(uploadId, UpNew)
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
@ -77,6 +107,7 @@ func (ctrl UploadCtrl) New(c *gin.Context) {
// New invoice for 10$
invoice, err := ln.UploadInvoice(100, ln.CurSat, uploadId)
invoice.UploadId = uploadId
//info, err := ln.Info()
if err != nil {
@ -90,7 +121,9 @@ func (ctrl UploadCtrl) New(c *gin.Context) {
return
}
err = PublishWatchUploadPayment(invoice)
// Start watching upload payment status for this client
err = WatchUploadPayment(invoice)
if err != nil {
log.Println("error watching payment ", err)
utils.JSONErrPriv(c, http.StatusInternalServerError, err)

@ -131,7 +131,9 @@ func SetUploadInvoice(uploadId string, invoice *ln.Invoice) error {
return DB.Redis.Do(radix.FlatCmd(nil, "SET", key, invoiceJson))
}
func PublishWatchUploadPayment(invoice *ln.Invoice) error {
func WatchUploadPayment(invoice *ln.Invoice) error {
log.Println("watching invoice ", invoice)
invoiceJson, err := json.Marshal(invoice)
if err != nil {

@ -9,6 +9,7 @@ import (
"git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/ln"
"git.sp4ke.com/sp4ke/bit4sat/storage"
"github.com/gin-gonic/gin"
"github.com/mediocregopher/radix/v3"
)
@ -34,9 +35,11 @@ func (w *InvoiceWatcher) Run() {
panic(err)
}
//TODO: kill goroutine if listening websocket dies
go ln.PollPaidInvoice(&invoice, w.invoicePaid)
case invoice := <-w.invoicePaid:
log.Printf("recevied update for invoice <%s>", invoice.UploadId)
// Set upload status to UpWaitingStorage (paid)
err := storage.SetUploadStatus(invoice.UploadId, storage.UpWaitingStorage)
if err != nil {
@ -50,21 +53,24 @@ func (w *InvoiceWatcher) Run() {
}
// publish invoice was updated to upload_id_paid channel
updateMsg := bus.UpdateMessage{}
updateMsg.Status = bus.UpdatePaymentReceived
updateMsg.UploadId = invoice.UploadId
updateMsg.Data = invoice
msg := bus.Message{}
msg.Type = bus.PaymentReceived
msg.UploadId = invoice.UploadId
msg.Data = gin.H{
"invoice": invoice,
}
updateMsgJson, err := json.Marshal(updateMsg)
msgJson, err := json.Marshal(msg)
if err != nil {
panic(err)
}
log.Printf("Notifying upload %s paid", invoice.UploadId)
key := fmt.Sprintf("upload_update_%s", invoice.UploadId)
log.Printf("Notifying upload paid %s", invoice.UploadId)
key := fmt.Sprintf("%s_%s", bus.UploadUpdateChannelPrefix,
invoice.UploadId)
err = db.DB.Redis.Do(radix.FlatCmd(nil, "PUBLISH",
key, updateMsgJson))
key, msgJson))
}
}

@ -30,6 +30,7 @@ class Upload {
async create() {
let req = new Request(endPoints.upload , {
method: 'POST',
credentials: 'same-origin',
body: JSON.stringify(this.uploadMetadata)
})
@ -60,6 +61,7 @@ class Upload {
let req = new Request(endPoints.upload + '/' + this.uploadId, {
method: 'PUT',
credentials: 'same-origin',
body: formData
})

@ -0,0 +1,15 @@
package ws
// Websocket message types
const (
invoicePaid = iota
)
var messageTypes = map[int]string{
invoicePaid: "invoice-paid",
}
type ProtoMessage struct {
Type string `json:"type"`
Data interface{} `json:"data"`
}

@ -2,12 +2,19 @@ package ws
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"time"
"git.sp4ke.com/sp4ke/bit4sat/bus"
"git.sp4ke.com/sp4ke/bit4sat/db"
"github.com/gin-contrib/sessions"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/mediocregopher/radix/v3"
"github.com/segmentio/ksuid"
)
const (
@ -23,6 +30,8 @@ const (
// Maximum message size
maxMessageSize = 512
WebsocketIdName = "websocket-id"
)
var upgrader = websocket.Upgrader{
@ -33,22 +42,38 @@ var upgrader = websocket.Upgrader{
},
}
var C *Client
// Keep reference to clients so they can be used in goroutines
var C = make(map[*Client]bool)
// Interface to talk to websocket
type Client struct {
id ksuid.KSUID
// websocket connection
conn *websocket.Conn
// Buffered channel of outbound messages
send chan []byte
// PubSub message channel for this websocket
subChannel chan radix.PubSubMessage
// Name of main subscribed channel for this websocket
channelName string
// Listen to updates from redis
// upload ids registered to this client
uploadId string
// upload notifications channel
uploadNotifChannel chan radix.PubSubMessage
}
func (c *Client) readPump() {
defer c.conn.Close()
defer func() {
// Delete client from referenc list
delete(C, c)
// Close ws connection
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error {
@ -82,10 +107,101 @@ func (c *Client) writePump() {
c.conn.Close()
}()
// Subscribe to upload notifications
// Subscribe to general notifications for this client
log.Printf("socket subscribing to %s", c.channelName)
if err := db.DB.RedisPubSub.Subscribe(c.subChannel,
c.channelName); err != nil {
log.Println(err)
return
}
// subscribe to notifications related to this channel's registered upload
// ids
if err := db.DB.RedisPubSub.PSubscribe(c.uploadNotifChannel,
fmt.Sprintf("%s_*", bus.UploadUpdateChannelPrefix)); err != nil {
log.Println(err)
return
}
for {
select {
case msg := <-c.subChannel:
log.Printf("received msg %s on socket main channel", msg)
jsonMsg := bus.Message{}
if err := json.Unmarshal(msg.Message, &jsonMsg); err != nil {
log.Printf("ws error reading from pubsub: %s", err)
continue
}
if jsonMsg.Type == bus.SetUploadId {
c.uploadId = jsonMsg.UploadId
}
case msg := <-c.uploadNotifChannel:
log.Printf("websocket received upload paid notification on channel %s", msg.Channel)
log.Printf("our registered ids are %s", c.uploadId)
// If we have no upload id registered for this client break
if c.uploadId == "" {
continue
}
// Check if the message matches our upload id
slice := strings.SplitN(msg.Channel, "_", 3)
if len(slice) != 3 {
log.Printf("error decoding channel name %s", msg.Channel)
}
msgTarget := slice[2]
// If the target is any of our registered upload ids
// broadcast the message
if c.uploadId == msgTarget {
log.Println("this message is for us sending to client")
jsonMsg := bus.Message{}
err := json.Unmarshal(msg.Message, &jsonMsg)
if err != nil {
log.Printf("unmarshal error %s", err)
break
}
switch jsonMsg.Type {
case bus.PaymentReceived:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
log.Println(err)
return
}
socketMsg := ProtoMessage{}
socketMsg.Type = messageTypes[invoicePaid]
socketMsg.Data = jsonMsg.Data
enc := json.NewEncoder(w)
err = enc.Encode(socketMsg)
if err != nil {
log.Println(err)
return
}
// No need to encode the message it's already in json
if err = w.Close(); err != nil {
fmt.Println(err)
return
}
}
// Handle different message types
}
//case <-testTicker.C:
//c.conn.SetWriteDeadline(time.Now().Add(writeWait))
@ -117,10 +233,9 @@ func (c *Client) writePump() {
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
log.Println(err)
log.Printf("websocket ping error: %s", err)
return
}
}
}
}
@ -128,7 +243,24 @@ func (c *Client) writePump() {
func Serve(c *gin.Context) {
log.Println("websocket request")
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
session := sessions.Default(c)
// Create unique id for this session in order to store it
// in the websocket bus
socketSessionId, err := ksuid.NewRandomWithTime(time.Now())
if err != nil {
log.Println("error generating ksuid for websocket")
}
log.Printf("using socket id: %s", socketSessionId)
session.Set(WebsocketIdName, socketSessionId.String())
session.Save()
log.Printf("Writing socket session id to header: %s", c.Writer.Header())
conn, err := upgrader.Upgrade(c.Writer, c.Request, c.Writer.Header())
if err != nil {
if _, ok := err.(websocket.HandshakeError); !ok {
log.Printf("handshake error: %s", err)
@ -138,8 +270,16 @@ func Serve(c *gin.Context) {
return
}
C = &Client{conn: conn, send: make(chan []byte, 256)}
client := &Client{
id: socketSessionId,
conn: conn,
subChannel: make(chan radix.PubSubMessage),
uploadNotifChannel: make(chan radix.PubSubMessage),
channelName: fmt.Sprintf("%s_%s", bus.WebsocketPubSubPrefix, socketSessionId),
}
C[client] = true
go C.writePump()
go C.readPump()
go client.writePump()
go client.readPump()
}

Loading…
Cancel
Save