websocket ready, wip invoice watcher

master
Chakib Benziane 5 years ago
parent b1e4ee4196
commit bcd3160cfd

@ -2,6 +2,7 @@ package main
import ( import (
"git.sp4ke.com/sp4ke/bit4sat/storage" "git.sp4ke.com/sp4ke/bit4sat/storage"
"git.sp4ke.com/sp4ke/bit4sat/ws"
"github.com/gin-contrib/cors" "github.com/gin-contrib/cors"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
@ -24,11 +25,11 @@ func (api *API) Run() {
} }
websocket := api.router.Group("/ws") wsRoute := api.router.Group("/ws")
{ {
websocket.GET("/", serveWebsocket) wsRoute.GET("/", ws.Serve)
websocket.GET("/WS", serveWebsocket) wsRoute.GET("/WS", ws.Serve)
websocket.GET("/WSS", serveWebsocket) wsRoute.GET("/WSS", ws.Serve)
} }
api.router.Run(":8880") api.router.Run(":8880")

@ -18,7 +18,7 @@ require (
github.com/stretchr/testify v1.3.0 // indirect github.com/stretchr/testify v1.3.0 // indirect
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf
github.com/ugorji/go/codec v0.0.0-20190320090025-2dc34c0b8780 // indirect github.com/ugorji/go/codec v0.0.0-20190320090025-2dc34c0b8780 // indirect
golang.org/x/net v0.0.0-20190322120337-addf6b3196f6 // indirect golang.org/x/net v0.0.0-20190322120337-addf6b3196f6
golang.org/x/sys v0.0.0-20190322080309-f49334f85ddc // indirect golang.org/x/sys v0.0.0-20190322080309-f49334f85ddc // indirect
google.golang.org/appengine v1.5.0 // indirect google.golang.org/appengine v1.5.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect

@ -9,22 +9,28 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
"strconv"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
type Currency int
const ( const (
CurMSat = iota CurMSat Currency = iota
CurSat CurSat
CurBTC
CurUSD CurUSD
CurEur
) )
const ( const (
LNChargeAPIEnv = "LN_CHARGE_API" LNChargeAPIEnv = "LN_CHARGE_API"
LNChargeTokenEnv = "LN_CHARGE_TOKEN" LNChargeTokenEnv = "LN_CHARGE_TOKEN"
InfoEndpoint = "info" InfoEndpoint = "info"
InvoiceEndpoint = "invoice" InvoiceEndpoint = "invoice"
PollInvoiceEndpoint = "invoice"
) )
var ( var (
@ -32,6 +38,24 @@ var (
LNChargeToken string LNChargeToken string
) )
var (
CurrencyString = map[Currency]string{
CurUSD: "USD",
CurSat: "SAT",
CurBTC: "BTC",
CurMSat: "MSAT",
CurEur: "EUR",
}
CurrencyID = map[string]Currency{
"USD": CurUSD,
"SAT": CurSat,
"MSAT": CurMSat,
"BTC": CurBTC,
"EUR": CurEur,
}
)
type Charge struct { type Charge struct {
client *http.Client client *http.Client
} }
@ -67,13 +91,69 @@ func (c *Charge) Info() (gin.H, error) {
return result, nil return result, nil
} }
func (c *Charge) Invoice(amount float32, id string) (gin.H, error) { func (c *Charge) PollInvoice(id string) (*Invoice, error) {
reqParams := url.Values{}
// Timeout in seconds
reqParams.Set("timeout", strconv.Itoa(30))
reqURI := fmt.Sprintf("%s/%s/wait?%s",
getUrl(PollInvoiceEndpoint),
id,
reqParams.Encode())
log.Printf("polling to %s", reqURI)
var err error
resp := &http.Response{}
for {
log.Println("new poll")
resp, err = http.Get(reqURI)
if err != nil {
return nil, err
}
if resp.StatusCode == http.StatusPaymentRequired {
continue
}
if resp.StatusCode == http.StatusGone {
log.Printf("invoice expired")
return nil, fmt.Errorf("invoice expired")
}
if resp.StatusCode == http.StatusOK {
break
}
//unknownData := gin.H{}
//jsonDec := json.NewDecoder(resp.Body)
//jsonDec.Decode(&data)
log.Printf("unknown resopnse %s", resp.Status)
return nil, fmt.Errorf("unknown response %s", resp.Status)
}
invoice := Invoice{}
jsonDec := json.NewDecoder(resp.Body)
jsonDec.Decode(&invoice)
log.Printf("Invoice paid %s", invoice)
return &invoice, nil
}
func (c *Charge) Invoice(amount float32, curr Currency, uploadId string) (*Invoice, error) {
result := make(gin.H)
reqData := gin.H{ reqData := gin.H{
"amount": amount, "description": fmt.Sprintf("bit4sat upload: %s", uploadId),
"currency": "USD", }
"description": fmt.Sprintf("bit4sat upload: %s", id),
// If Satoshi convert to millisat
if curr == CurSat {
reqData["msatoshi"] = amount * 1000
} else {
reqData["currency"] = CurrencyString[curr]
reqData["amount"] = amount
} }
jsonEnc, err := json.Marshal(reqData) jsonEnc, err := json.Marshal(reqData)
@ -89,11 +169,12 @@ func (c *Charge) Invoice(amount float32, id string) (gin.H, error) {
return nil, err return nil, err
} }
result = make(gin.H) invoice := Invoice{}
invoice.UploadId = uploadId
jsonDec := json.NewDecoder(resp.Body) jsonDec := json.NewDecoder(resp.Body)
jsonDec.Decode(&result) jsonDec.Decode(&invoice)
return result, nil return &invoice, nil
} }
func getUrl(endpoint string) string { func getUrl(endpoint string) string {

@ -0,0 +1,80 @@
package ln
import (
"encoding/json"
"log"
"time"
"git.sp4ke.com/sp4ke/bit4sat/db"
"github.com/mediocregopher/radix/v3"
)
type Invoice struct {
Id string `json:"id"`
CreatedAt time.Time `json:"created_at"`
Description string `json:"description"`
ExpiresAt time.Time `json:"expires_at"`
Msatoshi int `json:"msatoshi"`
Payreq string `json:"payreq"`
RHash string `json:"r_hash"`
Status string `json:"status"`
UploadId string `json:"upload_id"`
}
type InvoiceWatcher struct{}
func (w *InvoiceWatcher) Run() {
watchedKey := "watch_invoices"
for {
var invoiceBlob []byte
popTimeout := "0" //block until ready
err := db.DB.Redis.Do(radix.Cmd(&invoiceBlob,
"BLPOP",
watchedKey,
popTimeout))
if err != nil {
log.Println("error in InvoiceWatcher ", err)
continue
}
invoice := Invoice{}
err = json.Unmarshal(invoiceBlob, &invoice)
if err != nil {
log.Println("error unmarshaling in InvoiceWatcher ", err)
}
go func() {
log.Println("Starting goroutine to watch invoice")
time.Sleep(10 * time.Second)
//charge := ln.NewCharge()
// Blocking function
//rawInvoice, err := charge.PollInvoice(invoice.Id)
//if err != nil {
//return err
//}
//err = SetUploadStatus(uploadId, UpWaitingStorage)
//if err != nil {
//log.Printf("Error setting upload status", err)
//}
// Set paid upload invoice id
//err = SetPaidUploadInvoice(uploadId, invoice)
//if err != nil {
//return err
//}
}()
time.Sleep(1 * time.Second)
}
}
func NewInvoiceWatcher() *InvoiceWatcher {
return &InvoiceWatcher{}
}

@ -2,13 +2,16 @@ package main
import ( import (
"git.sp4ke.com/sp4ke/bit4sat/db" "git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/ln"
) )
func main() { func main() {
defer db.DB.Sql.Close() defer db.DB.Sql.Close()
api := NewAPI() api := NewAPI()
invoiceWatcher := ln.NewInvoiceWatcher()
go invoiceWatcher.Run()
api.Run() api.Run()
} }

@ -28,7 +28,7 @@ func (ctrl UploadCtrl) New(c *gin.Context) {
} }
// Create unique id // Create unique id
id, err := GetSIDGenerator().Generate() uploadId, err := GetSIDGenerator().Generate()
if err != nil { if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err) utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return return
@ -42,7 +42,7 @@ func (ctrl UploadCtrl) New(c *gin.Context) {
for _, file := range uploadForm.Files { for _, file := range uploadForm.Files {
up := &Upload{} up := &Upload{}
up.ID = id up.ID = uploadId
up.FileName, up.FileExt = utils.CleanFileName(file.Name) up.FileName, up.FileExt = utils.CleanFileName(file.Name)
@ -65,27 +65,75 @@ func (ctrl UploadCtrl) New(c *gin.Context) {
return return
} }
err = CacheSetUploadStatus(id, UpNew) err = SetUploadStatus(uploadId, UpNew)
if err != nil { if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err) utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return return
} }
log.Println("new upload created") // If not free upload generate a an invoice
c.JSON(http.StatusOK, gin.H{ // TODO: check if upload fee is free
"status": http.StatusOK, if true {
"result": gin.H{
"id": id, // Get a new lightning invoice
}, charge := ln.NewCharge()
})
// New invoice for 10$
invoice, err := charge.Invoice(100, ln.CurSat, uploadId)
//info, err := ln.Info()
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return
}
err = SetUploadStatus(uploadId, UpWaitingPayment)
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return
}
err = PushWatchUploadPayment(invoice)
if err != nil {
log.Println("error watching payment ", err)
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return
}
result := gin.H{
"upload_id": uploadId,
"invoice": invoice,
}
c.JSON(http.StatusOK, gin.H{
"status": "waiting for payment",
"result": result,
})
return
} else {
//Else, free upload accepted
log.Println("new upload created")
c.JSON(http.StatusOK, gin.H{
"status": "ready for upload",
"result": gin.H{
"id": uploadId,
},
})
}
} }
func (ctrl UploadCtrl) Upload(c *gin.Context) { func (ctrl UploadCtrl) Upload(c *gin.Context) {
// Check that upload ID exists // Check that upload ID exists
id := c.Param("id") uploadId := c.Param("id")
exists, err := IdExists(id) exists, err := IdExists(uploadId)
if err != nil { if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err) utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return return
@ -140,7 +188,7 @@ func (ctrl UploadCtrl) Upload(c *gin.Context) {
sum256 := hex.EncodeToString(hasher.Sum(nil)) sum256 := hex.EncodeToString(hasher.Sum(nil))
// Get the file's metadata from upload table // Get the file's metadata from upload table
up, err := GetByHashID(sum256, id) up, err := GetByHashID(sum256, uploadId)
if err != nil { if err != nil {
utils.JSONErrPriv(c, http.StatusNotFound, err) utils.JSONErrPriv(c, http.StatusNotFound, err)
db.RollbackTx(tx, nil) db.RollbackTx(tx, nil)
@ -191,31 +239,15 @@ func (ctrl UploadCtrl) Upload(c *gin.Context) {
return return
} }
err = CacheSetUploadStatus(id, UpStored) err = SetUploadStatus(uploadId, UpStored)
if err != nil { if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err) utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return return
} }
// Get a new lightning invoice
charge := ln.NewCharge()
// New invoice for 10$
invoice, err := charge.Invoice(float32(0.01), id)
//info, err := ln.Info()
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return
}
result := gin.H{
"message": "upload OK. waiting for payment",
"invoice": invoice,
}
c.JSON(http.StatusOK, gin.H{ c.JSON(http.StatusOK, gin.H{
"status": http.StatusOK, "status": http.StatusOK,
"result": result, "result": "uploaded",
}) })
} }

@ -12,6 +12,13 @@ type File struct {
type UploadForm struct { type UploadForm struct {
Files []File `form:"files" binding:"required"` Files []File `form:"files" binding:"required"`
// Ask payment
RequestPayment bool `form:"request_payment"`
RequestPaymentAmount int `form:"request_payment_amount"`
PaymentCurrency string `form:"payment_currency"`
// Default Date().toJSON() from js // Default Date().toJSON() from js
TimeStamp time.Time `form:"timestamp"` TimeStamp time.Time `form:"timestamp"`
} }

@ -2,19 +2,22 @@ package storage
import ( import (
"database/sql" "database/sql"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"log" "log"
"git.sp4ke.com/sp4ke/bit4sat/db" "git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/ln"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/mattn/go-sqlite3" "github.com/lib/pq"
"github.com/mediocregopher/radix/v3" "github.com/mediocregopher/radix/v3"
) )
var DB = db.DB var DB = db.DB
const ( const (
//TODO: sync upload status from redis
DBUploadSchema = ` DBUploadSchema = `
CREATE TABLE IF NOT EXISTS upload ( CREATE TABLE IF NOT EXISTS upload (
id serial PRIMARY KEY, id serial PRIMARY KEY,
@ -24,6 +27,7 @@ const (
file_type varchar(255) DEFAULT '', file_type varchar(255) DEFAULT '',
file_size integer NOT NULL, file_size integer NOT NULL,
file_ext varchar(255) DEFAULT '', file_ext varchar(255) DEFAULT '',
fee integer NOT NULL DEFAULT 0,
status integer DEFAULT 0 REFERENCES upload_status(type), status integer DEFAULT 0 REFERENCES upload_status(type),
UNIQUE (upload_id, sha256) UNIQUE (upload_id, sha256)
); );
@ -38,7 +42,8 @@ AS SELECT
upload.file_name, upload.file_name,
upload.file_type, upload.file_type,
upload.file_size, upload.file_size,
upload.file_ext upload.file_ext,
upload.fee
FROM upload FROM upload
JOIN upload_status ON JOIN upload_status ON
upload.status = upload_status.type upload.status = upload_status.type
@ -52,9 +57,9 @@ upload.status = upload_status.type
` `
QNewUpload = `INSERT INTO upload QNewUpload = `INSERT INTO upload
(upload_id, sha256, file_name, file_type, file_size, file_ext, status) (upload_id, sha256, file_name, file_type, file_size, file_ext, status, fee)
VALUES VALUES
(:upload_id, :sha256, :file_name, :file_type, :file_size, :file_ext, :status)` (:upload_id, :sha256, :file_name, :file_type, :file_size, :file_ext, :status, :fee)`
QSetStatus = `UPDATE upload SET status = :status WHERE upload_id = :upload_id ` QSetStatus = `UPDATE upload SET status = :status WHERE upload_id = :upload_id `
@ -64,21 +69,26 @@ upload.status = upload_status.type
file_type, file_type,
file_size, file_size,
file_ext, file_ext,
fee,
status status
FROM upload WHERE sha256 = $1 AND upload_id = $2` FROM upload WHERE sha256 = $1 AND upload_id = $2`
) )
const ( const (
UpNew = iota UpNew = iota
UpStored
UpWaitingPayment UpWaitingPayment
UpWaitingStorage
UpStored
UpReady UpReady
) )
// new --> waiting payment (optional) --> waiting storage (client upload)
// --> Sotred --> Ready (sharing url sent to client and acked)
var UploadStatus = map[int]string{ var UploadStatus = map[int]string{
UpNew: "new upload", UpNew: "new upload",
UpStored: "stored",
UpWaitingPayment: "waiting payment", UpWaitingPayment: "waiting payment",
UpWaitingStorage: "waiting storage",
UpStored: "stored",
UpReady: "ready", UpReady: "ready",
} }
@ -95,14 +105,64 @@ type Upload struct {
FileSize int64 `db:"file_size"` FileSize int64 `db:"file_size"`
FileExt string `db:"file_ext"` FileExt string `db:"file_ext"`
Status int `db:"status"` Status int `db:"status"`
Fee int `db:"fee"`
} }
func CacheSetUploadStatus(id string, status int) error { // TODO: sync from redis to db
key := fmt.Sprintf("upload_status_%s", id) //func SyncUploadStatusToDB(){
//}
func SetUploadStatus(id string, status int) error {
key := fmt.Sprintf("upload_status_%s", id)
return DB.Redis.Do(radix.FlatCmd(nil, "SET", key, status)) return DB.Redis.Do(radix.FlatCmd(nil, "SET", key, status))
} }
func SetUploadInvoice(uploadId string, invoice *ln.Invoice) error {
key := fmt.Sprintf("upload_%s_invoice", uploadId)
invoiceJson, err := json.Marshal(invoice)
if err != nil {
return err
}
return DB.Redis.Do(radix.FlatCmd(nil, "SET", key, invoiceJson))
}
func PushWatchUploadPayment(invoice *ln.Invoice) error {
key := "watch_invoices"
invoiceJson, err := json.Marshal(invoice)
if err != nil {
return err
}
return DB.Redis.Do(radix.FlatCmd(nil, "RPUSH", key, invoiceJson))
//log.Printf("Will watch upload payment for uID %s", uploadId)
//charge := ln.NewCharge()
//invoice, err := charge.PollInvoice(invoiceId)
//if err != nil {
//return err
//}
//err = SetUploadStatus(uploadId, UpWaitingStorage)
//if err != nil {
//return err
//}
//// Set paid upload invoice id
//err = SetPaidUploadInvoice(uploadId, invoice)
//if err != nil {
//return err
//}
//TODO: Notify client on websocket
return nil
}
// Returns true if id exists in DB // Returns true if id exists in DB
func IdExists(id string) (exists bool, err error) { func IdExists(id string) (exists bool, err error) {
key := fmt.Sprintf("upload_status_%s", id) key := fmt.Sprintf("upload_status_%s", id)
@ -143,10 +203,11 @@ func (u *Upload) TxWrite(tx *sqlx.Tx) error {
_, err := tx.NamedExec(QNewUpload, u) _, err := tx.NamedExec(QNewUpload, u)
sqlErr, isSqlErr := err.(sqlite3.Error) if pqError, ok := err.(*pq.Error); ok {
// unique constraint
if isSqlErr && sqlErr.Code == sqlite3.ErrConstraint { if pqError.Code == "23505" {
return ErrAlreadyExists return ErrAlreadyExists
}
} }
if err != nil { if err != nil {
@ -158,9 +219,12 @@ func (u *Upload) TxWrite(tx *sqlx.Tx) error {
func (u *Upload) Write() error { func (u *Upload) Write() error {
_, err := DB.Sql.NamedExec(QNewUpload, u) _, err := DB.Sql.NamedExec(QNewUpload, u)
sqlErr, isSqlErr := err.(sqlite3.Error)
if isSqlErr && sqlErr.Code == sqlite3.ErrConstraint { if pqError, ok := err.(*pq.Error); ok {
return ErrAlreadyExists // unique constraint
if pqError.Code == "23505" {
return ErrAlreadyExists
}
} }
if err != nil { if err != nil {
@ -193,14 +257,7 @@ func init() {
for k, v := range UploadStatus { for k, v := range UploadStatus {
_, err := DB.Sql.Exec(query, k, v, v) _, err := DB.Sql.Exec(query, k, v, v)
if err != nil { if err != nil {
sqlErr, ok := err.(sqlite3.Error) log.Panic(err)
if ok && sqlErr.ExtendedCode == sqlite3.ErrConstraintUnique {
log.Panic(err)
}
if !ok {
log.Panic(err)
}
} }
} }

@ -4,7 +4,7 @@ root dist
ext .html ext .html
header / { header / {
Content-Security-Policy "default-src * 'unsafe-eval' 'unsafe-inline';" Content-Security-Policy "default-src * 'unsafe-eval' 'unsafe-inline' data:;"
} }
proxy /api localhost:8880 proxy /api localhost:8880

@ -1,18 +1,68 @@
import { apiPort } from './api.js'; import { apiPort } from './api.js';
let singleton = false let singleton = false;
export class WS { export class WS {
constructor(){ constructor(){
if (singleton) { console.log("starting new websocket")
if (singleton && singleton.open) {
return singleton; return singleton;
} }
const endpoint = `ws://localhost:2015/ws/`; const endpoint = `ws://localhost:2015/ws/`;
console.log(`endpoint is ${endpoint}`);
this.conn = new WebSocket(endpoint); this.conn = new WebSocket(endpoint);
singleton = this; this.conn.onerror = this.onerror();
this.conn.onclose = this.onclose();
this.conn.onmessage = this.onmessage();
this.conn.onopen = this.onopen();
}
send(data) {
this.conn.send(data);
}
onopen(){
let self = this;
return (ev) => {
self.opened = true;
console.log("websocket opened");
}
}
onerror (){
let self = this;
return (ev) => {
console.error(ev);
}
}
onclose () {
let self = this;
let instance = singleton
return (ev) => {
console.warn("websocket closed");
//TODO: restore on prod
//setTimeout(() =>{
//console.log("websocket: trying to reconnect")
//instance = new WS()
//}, 5000)
}
}
onmessage() {
let self = this;
return (ev) => {
let msg = JSON.parse(ev.data);
switch (msg.type) {
case "hello":
console.log(msg.data);
break;
}
}
} }
} }

@ -6,7 +6,6 @@ import GetWorker from './workerInterface.js';
import { WS } from './Websocket.js'; import { WS } from './Websocket.js';
window.ws = new WS() window.ws = new WS()
new WS()
window.app = new Vue({ window.app = new Vue({

@ -17,6 +17,7 @@ const Worker = GetWorker('main');
export default { export default {
data() { data() {
return { return {
uploadId: 0,
invoice: {}, invoice: {},
payreqURI: "", payreqURI: "",
} }
@ -25,13 +26,11 @@ export default {
let self = this let self = this
this.worker = Worker; this.worker = Worker;
//let canvas = this.$el.querySelector('#canvas')
this.worker.listenTo('upload-invoice', (e) => { this.worker.listenTo('upload-invoice', (e) => {
console.log("received invoice ", e.data) console.log("received invoice ", e.data)
self.invoice = e.data.invoice self.invoice = e.data.invoice
self.uploadId = e.data.id
}) })
}, },

@ -18,9 +18,10 @@
import GetWorker from './workerInterface.js'; import GetWorker from './workerInterface.js';
const w = GetWorker('main'); const w = GetWorker('main');
w.listenTo('upload-id', (e) => { //w.listenTo('upload-id', (e) => {
console.log('vue received upload id ', e.data.id) // console.log('vue received upload id ', e.data.id)
}) // console.log('vue received upload id ', e.data.invoice)
//})
export default { export default {

@ -53,23 +53,29 @@ async function newUpload(files){
await upload.create() await upload.create()
.then(data => { .then(data => {
// Get the upload id // Get the upload id
let { result: { id: id } } = data let { result: { upload_id: id, invoice: invoice } } = data
//postMessage({
//msg: 'upload-invoice',
//id: id,
//invoice: resp.result.invoice
//})
// Notify the UI // Notify the UI
postMessage({msg: 'upload-id', id: id}) postMessage({msg: 'upload-invoice', id: id, invoice: invoice})
// Send the files // Send the files
upload.send() //upload.send()
.then(resp =>{ //.then(resp =>{
postMessage({ //postMessage({
msg: 'upload-invoice', //msg: 'upload-invoice',
id: id, //id: id,
invoice: resp.result.invoice //invoice: resp.result.invoice
}) //})
}) //})
.catch(err =>{ //.catch(err =>{
console.error(err) //console.error(err)
}) //})
}) })
} }

@ -1,108 +0,0 @@
package main
import (
"bytes"
"log"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
const (
// Time allowed to write message to peer
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the client.
pongWait = 5 * time.Second
// Send pings to client with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
//pingPeriod = 5 * time.Second
// Maximum message size
maxMessageSize = 512
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
var (
conn *websocket.Conn
)
var (
newline = []byte{'\n'}
space = []byte{' '}
)
func reader() {
defer conn.Close()
conn.SetReadLimit(maxMessageSize)
conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(string) error {
//log.Println("pong")
conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway,
websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
}
}
func writer() {
pingTicker := time.NewTicker(pingPeriod)
defer func() {
log.Println("quit ws server")
pingTicker.Stop()
conn.Close()
}()
for {
select {
case <-pingTicker.C:
//log.Println("ping")
conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
log.Println(err)
return
}
}
}
}
func serveWebsocket(c *gin.Context) {
var err error
log.Println("websocket request")
conn, err = upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
if _, ok := err.(websocket.HandshakeError); !ok {
log.Printf("handshake error: %s", err)
}
log.Println(err)
return
}
go writer()
reader()
}

@ -0,0 +1,142 @@
package ws
import (
"encoding/json"
"log"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
const (
// Time allowed to write message to peer
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the client.
pongWait = 5 * time.Second
// Send pings to client with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
//pingPeriod = 5 * time.Second
// Maximum message size
maxMessageSize = 512
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
var C *Client
// Interface to talk to websocket
type Client struct {
// websocket connection
conn *websocket.Conn
// Buffered channel of outbound messages
send chan []byte
}
func (c *Client) readPump() {
defer c.conn.Close()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error {
//log.Println("pong")
c.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway,
websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
recvData := make(map[string]interface{})
json.Unmarshal(message, &recvData)
log.Println(recvData)
}
}
func (c *Client) writePump() {
//testTicker := time.NewTicker(5 * time.Second)
pingTicker := time.NewTicker(pingPeriod)
defer func() {
log.Println("quit ws server")
pingTicker.Stop()
c.conn.Close()
}()
for {
select {
//case <-testTicker.C:
//c.conn.SetWriteDeadline(time.Now().Add(writeWait))
//msg := gin.H{
//"msg": "hello",
//"data": "from server",
//}
//w, err := c.conn.NextWriter(websocket.TextMessage)
//if err != nil {
//log.Println(err)
//return
//}
//enc := json.NewEncoder(w)
//err = enc.Encode(msg)
//if err != nil {
//log.Println(err)
//return
//}
//if err = w.Close(); err != nil {
//log.Println(err)
//return
//}
case <-pingTicker.C:
//log.Println("ping")
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
log.Println(err)
return
}
}
}
}
func Serve(c *gin.Context) {
log.Println("websocket request")
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
if _, ok := err.(websocket.HandshakeError); !ok {
log.Printf("handshake error: %s", err)
}
log.Println(err)
return
}
C = &Client{conn: conn, send: make(chan []byte, 256)}
go C.writePump()
go C.readPump()
}
Loading…
Cancel
Save