diff --git a/api.go b/api.go index c410223..a506789 100644 --- a/api.go +++ b/api.go @@ -2,6 +2,7 @@ package main import ( "git.sp4ke.com/sp4ke/bit4sat/storage" + "git.sp4ke.com/sp4ke/bit4sat/ws" "github.com/gin-contrib/cors" "github.com/gin-gonic/gin" ) @@ -24,11 +25,11 @@ func (api *API) Run() { } - websocket := api.router.Group("/ws") + wsRoute := api.router.Group("/ws") { - websocket.GET("/", serveWebsocket) - websocket.GET("/WS", serveWebsocket) - websocket.GET("/WSS", serveWebsocket) + wsRoute.GET("/", ws.Serve) + wsRoute.GET("/WS", ws.Serve) + wsRoute.GET("/WSS", ws.Serve) } api.router.Run(":8880") diff --git a/go.mod b/go.mod index a35ac7b..fe85dd1 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/stretchr/testify v1.3.0 // indirect github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf github.com/ugorji/go/codec v0.0.0-20190320090025-2dc34c0b8780 // indirect - golang.org/x/net v0.0.0-20190322120337-addf6b3196f6 // indirect + golang.org/x/net v0.0.0-20190322120337-addf6b3196f6 golang.org/x/sys v0.0.0-20190322080309-f49334f85ddc // indirect google.golang.org/appengine v1.5.0 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect diff --git a/ln/charge.go b/ln/charge.go index ece3f18..e628d6d 100644 --- a/ln/charge.go +++ b/ln/charge.go @@ -9,22 +9,28 @@ import ( "net/http" "net/url" "os" + "strconv" "time" "github.com/gin-gonic/gin" ) +type Currency int + const ( - CurMSat = iota + CurMSat Currency = iota CurSat + CurBTC CurUSD + CurEur ) const ( - LNChargeAPIEnv = "LN_CHARGE_API" - LNChargeTokenEnv = "LN_CHARGE_TOKEN" - InfoEndpoint = "info" - InvoiceEndpoint = "invoice" + LNChargeAPIEnv = "LN_CHARGE_API" + LNChargeTokenEnv = "LN_CHARGE_TOKEN" + InfoEndpoint = "info" + InvoiceEndpoint = "invoice" + PollInvoiceEndpoint = "invoice" ) var ( @@ -32,6 +38,24 @@ var ( LNChargeToken string ) +var ( + CurrencyString = map[Currency]string{ + CurUSD: "USD", + CurSat: "SAT", + CurBTC: "BTC", + CurMSat: "MSAT", + CurEur: "EUR", + } + + CurrencyID = map[string]Currency{ + "USD": CurUSD, + "SAT": CurSat, + "MSAT": CurMSat, + "BTC": CurBTC, + "EUR": CurEur, + } +) + type Charge struct { client *http.Client } @@ -67,13 +91,69 @@ func (c *Charge) Info() (gin.H, error) { return result, nil } -func (c *Charge) Invoice(amount float32, id string) (gin.H, error) { +func (c *Charge) PollInvoice(id string) (*Invoice, error) { + + reqParams := url.Values{} + // Timeout in seconds + reqParams.Set("timeout", strconv.Itoa(30)) + reqURI := fmt.Sprintf("%s/%s/wait?%s", + getUrl(PollInvoiceEndpoint), + id, + reqParams.Encode()) + + log.Printf("polling to %s", reqURI) + + var err error + resp := &http.Response{} + + for { + log.Println("new poll") + resp, err = http.Get(reqURI) + if err != nil { + return nil, err + } + + if resp.StatusCode == http.StatusPaymentRequired { + continue + } + + if resp.StatusCode == http.StatusGone { + log.Printf("invoice expired") + return nil, fmt.Errorf("invoice expired") + } + + if resp.StatusCode == http.StatusOK { + break + } + + //unknownData := gin.H{} + //jsonDec := json.NewDecoder(resp.Body) + //jsonDec.Decode(&data) + log.Printf("unknown resopnse %s", resp.Status) + return nil, fmt.Errorf("unknown response %s", resp.Status) + + } + + invoice := Invoice{} + jsonDec := json.NewDecoder(resp.Body) + jsonDec.Decode(&invoice) + log.Printf("Invoice paid %s", invoice) + + return &invoice, nil +} + +func (c *Charge) Invoice(amount float32, curr Currency, uploadId string) (*Invoice, error) { - result := make(gin.H) reqData := gin.H{ - "amount": amount, - "currency": "USD", - "description": fmt.Sprintf("bit4sat upload: %s", id), + "description": fmt.Sprintf("bit4sat upload: %s", uploadId), + } + + // If Satoshi convert to millisat + if curr == CurSat { + reqData["msatoshi"] = amount * 1000 + } else { + reqData["currency"] = CurrencyString[curr] + reqData["amount"] = amount } jsonEnc, err := json.Marshal(reqData) @@ -89,11 +169,12 @@ func (c *Charge) Invoice(amount float32, id string) (gin.H, error) { return nil, err } - result = make(gin.H) + invoice := Invoice{} + invoice.UploadId = uploadId jsonDec := json.NewDecoder(resp.Body) - jsonDec.Decode(&result) + jsonDec.Decode(&invoice) - return result, nil + return &invoice, nil } func getUrl(endpoint string) string { diff --git a/ln/invoice.go b/ln/invoice.go new file mode 100644 index 0000000..1387639 --- /dev/null +++ b/ln/invoice.go @@ -0,0 +1,80 @@ +package ln + +import ( + "encoding/json" + "log" + "time" + + "git.sp4ke.com/sp4ke/bit4sat/db" + "github.com/mediocregopher/radix/v3" +) + +type Invoice struct { + Id string `json:"id"` + CreatedAt time.Time `json:"created_at"` + Description string `json:"description"` + ExpiresAt time.Time `json:"expires_at"` + Msatoshi int `json:"msatoshi"` + Payreq string `json:"payreq"` + RHash string `json:"r_hash"` + Status string `json:"status"` + UploadId string `json:"upload_id"` +} + +type InvoiceWatcher struct{} + +func (w *InvoiceWatcher) Run() { + watchedKey := "watch_invoices" + + for { + var invoiceBlob []byte + + popTimeout := "0" //block until ready + err := db.DB.Redis.Do(radix.Cmd(&invoiceBlob, + "BLPOP", + watchedKey, + popTimeout)) + + if err != nil { + log.Println("error in InvoiceWatcher ", err) + continue + } + + invoice := Invoice{} + err = json.Unmarshal(invoiceBlob, &invoice) + if err != nil { + log.Println("error unmarshaling in InvoiceWatcher ", err) + } + + go func() { + + log.Println("Starting goroutine to watch invoice") + time.Sleep(10 * time.Second) + //charge := ln.NewCharge() + + // Blocking function + //rawInvoice, err := charge.PollInvoice(invoice.Id) + //if err != nil { + //return err + //} + + //err = SetUploadStatus(uploadId, UpWaitingStorage) + //if err != nil { + //log.Printf("Error setting upload status", err) + //} + + // Set paid upload invoice id + //err = SetPaidUploadInvoice(uploadId, invoice) + //if err != nil { + //return err + //} + + }() + + time.Sleep(1 * time.Second) + } +} + +func NewInvoiceWatcher() *InvoiceWatcher { + return &InvoiceWatcher{} +} diff --git a/main.go b/main.go index 5a46c6c..b163d92 100644 --- a/main.go +++ b/main.go @@ -2,13 +2,16 @@ package main import ( "git.sp4ke.com/sp4ke/bit4sat/db" + "git.sp4ke.com/sp4ke/bit4sat/ln" ) func main() { defer db.DB.Sql.Close() api := NewAPI() + invoiceWatcher := ln.NewInvoiceWatcher() + go invoiceWatcher.Run() api.Run() } diff --git a/storage/upload_ctrl.go b/storage/upload_ctrl.go index 87db633..03b77ca 100644 --- a/storage/upload_ctrl.go +++ b/storage/upload_ctrl.go @@ -28,7 +28,7 @@ func (ctrl UploadCtrl) New(c *gin.Context) { } // Create unique id - id, err := GetSIDGenerator().Generate() + uploadId, err := GetSIDGenerator().Generate() if err != nil { utils.JSONErrPriv(c, http.StatusInternalServerError, err) return @@ -42,7 +42,7 @@ func (ctrl UploadCtrl) New(c *gin.Context) { for _, file := range uploadForm.Files { up := &Upload{} - up.ID = id + up.ID = uploadId up.FileName, up.FileExt = utils.CleanFileName(file.Name) @@ -65,27 +65,75 @@ func (ctrl UploadCtrl) New(c *gin.Context) { return } - err = CacheSetUploadStatus(id, UpNew) + err = SetUploadStatus(uploadId, UpNew) if err != nil { utils.JSONErrPriv(c, http.StatusInternalServerError, err) return } - log.Println("new upload created") - c.JSON(http.StatusOK, gin.H{ - "status": http.StatusOK, - "result": gin.H{ - "id": id, - }, - }) + // If not free upload generate a an invoice + // TODO: check if upload fee is free + if true { + + // Get a new lightning invoice + charge := ln.NewCharge() + + // New invoice for 10$ + invoice, err := charge.Invoice(100, ln.CurSat, uploadId) + + //info, err := ln.Info() + if err != nil { + utils.JSONErrPriv(c, http.StatusInternalServerError, err) + return + } + + err = SetUploadStatus(uploadId, UpWaitingPayment) + if err != nil { + utils.JSONErrPriv(c, http.StatusInternalServerError, err) + return + } + + err = PushWatchUploadPayment(invoice) + if err != nil { + log.Println("error watching payment ", err) + utils.JSONErrPriv(c, http.StatusInternalServerError, err) + return + } + + result := gin.H{ + "upload_id": uploadId, + "invoice": invoice, + } + + c.JSON(http.StatusOK, gin.H{ + "status": "waiting for payment", + "result": result, + }) + + return + + } else { + + //Else, free upload accepted + + log.Println("new upload created") + c.JSON(http.StatusOK, gin.H{ + "status": "ready for upload", + "result": gin.H{ + "id": uploadId, + }, + }) + + } + } func (ctrl UploadCtrl) Upload(c *gin.Context) { // Check that upload ID exists - id := c.Param("id") + uploadId := c.Param("id") - exists, err := IdExists(id) + exists, err := IdExists(uploadId) if err != nil { utils.JSONErrPriv(c, http.StatusInternalServerError, err) return @@ -140,7 +188,7 @@ func (ctrl UploadCtrl) Upload(c *gin.Context) { sum256 := hex.EncodeToString(hasher.Sum(nil)) // Get the file's metadata from upload table - up, err := GetByHashID(sum256, id) + up, err := GetByHashID(sum256, uploadId) if err != nil { utils.JSONErrPriv(c, http.StatusNotFound, err) db.RollbackTx(tx, nil) @@ -191,31 +239,15 @@ func (ctrl UploadCtrl) Upload(c *gin.Context) { return } - err = CacheSetUploadStatus(id, UpStored) + err = SetUploadStatus(uploadId, UpStored) if err != nil { utils.JSONErrPriv(c, http.StatusInternalServerError, err) return } - // Get a new lightning invoice - charge := ln.NewCharge() - - // New invoice for 10$ - invoice, err := charge.Invoice(float32(0.01), id) - - //info, err := ln.Info() - if err != nil { - utils.JSONErrPriv(c, http.StatusInternalServerError, err) - return - } - - result := gin.H{ - "message": "upload OK. waiting for payment", - "invoice": invoice, - } c.JSON(http.StatusOK, gin.H{ "status": http.StatusOK, - "result": result, + "result": "uploaded", }) } diff --git a/storage/upload_form.go b/storage/upload_form.go index 5ac9a77..31ca6a5 100644 --- a/storage/upload_form.go +++ b/storage/upload_form.go @@ -12,6 +12,13 @@ type File struct { type UploadForm struct { Files []File `form:"files" binding:"required"` + // Ask payment + RequestPayment bool `form:"request_payment"` + + RequestPaymentAmount int `form:"request_payment_amount"` + + PaymentCurrency string `form:"payment_currency"` + // Default Date().toJSON() from js TimeStamp time.Time `form:"timestamp"` } diff --git a/storage/upload_model.go b/storage/upload_model.go index 58251c8..a3a2d44 100644 --- a/storage/upload_model.go +++ b/storage/upload_model.go @@ -2,19 +2,22 @@ package storage import ( "database/sql" + "encoding/json" "errors" "fmt" "log" "git.sp4ke.com/sp4ke/bit4sat/db" + "git.sp4ke.com/sp4ke/bit4sat/ln" "github.com/jmoiron/sqlx" - "github.com/mattn/go-sqlite3" + "github.com/lib/pq" "github.com/mediocregopher/radix/v3" ) var DB = db.DB const ( + //TODO: sync upload status from redis DBUploadSchema = ` CREATE TABLE IF NOT EXISTS upload ( id serial PRIMARY KEY, @@ -24,6 +27,7 @@ const ( file_type varchar(255) DEFAULT '', file_size integer NOT NULL, file_ext varchar(255) DEFAULT '', + fee integer NOT NULL DEFAULT 0, status integer DEFAULT 0 REFERENCES upload_status(type), UNIQUE (upload_id, sha256) ); @@ -38,7 +42,8 @@ AS SELECT upload.file_name, upload.file_type, upload.file_size, - upload.file_ext + upload.file_ext, + upload.fee FROM upload JOIN upload_status ON upload.status = upload_status.type @@ -52,9 +57,9 @@ upload.status = upload_status.type ` QNewUpload = `INSERT INTO upload - (upload_id, sha256, file_name, file_type, file_size, file_ext, status) + (upload_id, sha256, file_name, file_type, file_size, file_ext, status, fee) VALUES - (:upload_id, :sha256, :file_name, :file_type, :file_size, :file_ext, :status)` + (:upload_id, :sha256, :file_name, :file_type, :file_size, :file_ext, :status, :fee)` QSetStatus = `UPDATE upload SET status = :status WHERE upload_id = :upload_id ` @@ -64,21 +69,26 @@ upload.status = upload_status.type file_type, file_size, file_ext, + fee, status FROM upload WHERE sha256 = $1 AND upload_id = $2` ) const ( UpNew = iota - UpStored UpWaitingPayment + UpWaitingStorage + UpStored UpReady ) +// new --> waiting payment (optional) --> waiting storage (client upload) +// --> Sotred --> Ready (sharing url sent to client and acked) var UploadStatus = map[int]string{ UpNew: "new upload", - UpStored: "stored", UpWaitingPayment: "waiting payment", + UpWaitingStorage: "waiting storage", + UpStored: "stored", UpReady: "ready", } @@ -95,14 +105,64 @@ type Upload struct { FileSize int64 `db:"file_size"` FileExt string `db:"file_ext"` Status int `db:"status"` + Fee int `db:"fee"` } -func CacheSetUploadStatus(id string, status int) error { - key := fmt.Sprintf("upload_status_%s", id) +// TODO: sync from redis to db +//func SyncUploadStatusToDB(){ + +//} +func SetUploadStatus(id string, status int) error { + key := fmt.Sprintf("upload_status_%s", id) return DB.Redis.Do(radix.FlatCmd(nil, "SET", key, status)) } +func SetUploadInvoice(uploadId string, invoice *ln.Invoice) error { + key := fmt.Sprintf("upload_%s_invoice", uploadId) + + invoiceJson, err := json.Marshal(invoice) + if err != nil { + return err + } + + return DB.Redis.Do(radix.FlatCmd(nil, "SET", key, invoiceJson)) +} + +func PushWatchUploadPayment(invoice *ln.Invoice) error { + key := "watch_invoices" + + invoiceJson, err := json.Marshal(invoice) + if err != nil { + return err + } + + return DB.Redis.Do(radix.FlatCmd(nil, "RPUSH", key, invoiceJson)) + + //log.Printf("Will watch upload payment for uID %s", uploadId) + + //charge := ln.NewCharge() + //invoice, err := charge.PollInvoice(invoiceId) + //if err != nil { + //return err + //} + + //err = SetUploadStatus(uploadId, UpWaitingStorage) + //if err != nil { + //return err + //} + + //// Set paid upload invoice id + //err = SetPaidUploadInvoice(uploadId, invoice) + //if err != nil { + //return err + //} + + //TODO: Notify client on websocket + + return nil +} + // Returns true if id exists in DB func IdExists(id string) (exists bool, err error) { key := fmt.Sprintf("upload_status_%s", id) @@ -143,10 +203,11 @@ func (u *Upload) TxWrite(tx *sqlx.Tx) error { _, err := tx.NamedExec(QNewUpload, u) - sqlErr, isSqlErr := err.(sqlite3.Error) - - if isSqlErr && sqlErr.Code == sqlite3.ErrConstraint { - return ErrAlreadyExists + if pqError, ok := err.(*pq.Error); ok { + // unique constraint + if pqError.Code == "23505" { + return ErrAlreadyExists + } } if err != nil { @@ -158,9 +219,12 @@ func (u *Upload) TxWrite(tx *sqlx.Tx) error { func (u *Upload) Write() error { _, err := DB.Sql.NamedExec(QNewUpload, u) - sqlErr, isSqlErr := err.(sqlite3.Error) - if isSqlErr && sqlErr.Code == sqlite3.ErrConstraint { - return ErrAlreadyExists + + if pqError, ok := err.(*pq.Error); ok { + // unique constraint + if pqError.Code == "23505" { + return ErrAlreadyExists + } } if err != nil { @@ -193,14 +257,7 @@ func init() { for k, v := range UploadStatus { _, err := DB.Sql.Exec(query, k, v, v) if err != nil { - sqlErr, ok := err.(sqlite3.Error) - if ok && sqlErr.ExtendedCode == sqlite3.ErrConstraintUnique { - log.Panic(err) - } - - if !ok { - log.Panic(err) - } + log.Panic(err) } } diff --git a/web/Caddyfile b/web/Caddyfile index d01da91..78c3c26 100644 --- a/web/Caddyfile +++ b/web/Caddyfile @@ -4,7 +4,7 @@ root dist ext .html header / { - Content-Security-Policy "default-src * 'unsafe-eval' 'unsafe-inline';" + Content-Security-Policy "default-src * 'unsafe-eval' 'unsafe-inline' data:;" } proxy /api localhost:8880 diff --git a/web/src/Websocket.js b/web/src/Websocket.js index f051a92..6a17cbd 100644 --- a/web/src/Websocket.js +++ b/web/src/Websocket.js @@ -1,18 +1,68 @@ import { apiPort } from './api.js'; -let singleton = false +let singleton = false; export class WS { constructor(){ - if (singleton) { + console.log("starting new websocket") + if (singleton && singleton.open) { return singleton; } const endpoint = `ws://localhost:2015/ws/`; - console.log(`endpoint is ${endpoint}`); this.conn = new WebSocket(endpoint); - singleton = this; + this.conn.onerror = this.onerror(); + this.conn.onclose = this.onclose(); + this.conn.onmessage = this.onmessage(); + this.conn.onopen = this.onopen(); + } + + send(data) { + this.conn.send(data); + } + + onopen(){ + let self = this; + return (ev) => { + self.opened = true; + console.log("websocket opened"); + } + } + + onerror (){ + let self = this; + return (ev) => { + console.error(ev); + } + } + + onclose () { + let self = this; + let instance = singleton + return (ev) => { + console.warn("websocket closed"); + //TODO: restore on prod + //setTimeout(() =>{ + //console.log("websocket: trying to reconnect") + //instance = new WS() + //}, 5000) + } + } + + onmessage() { + let self = this; + + return (ev) => { + let msg = JSON.parse(ev.data); + + switch (msg.type) { + case "hello": + console.log(msg.data); + break; + } + } + } } diff --git a/web/src/index.js b/web/src/index.js index e637612..3bbfa53 100644 --- a/web/src/index.js +++ b/web/src/index.js @@ -6,7 +6,6 @@ import GetWorker from './workerInterface.js'; import { WS } from './Websocket.js'; window.ws = new WS() -new WS() window.app = new Vue({ diff --git a/web/src/pay.vue b/web/src/pay.vue index f9bf569..9d8e14e 100644 --- a/web/src/pay.vue +++ b/web/src/pay.vue @@ -17,6 +17,7 @@ const Worker = GetWorker('main'); export default { data() { return { + uploadId: 0, invoice: {}, payreqURI: "", } @@ -25,13 +26,11 @@ export default { let self = this this.worker = Worker; - //let canvas = this.$el.querySelector('#canvas') this.worker.listenTo('upload-invoice', (e) => { - console.log("received invoice ", e.data) self.invoice = e.data.invoice - + self.uploadId = e.data.id }) }, diff --git a/web/src/upload.vue b/web/src/upload.vue index 96da61c..16acacb 100644 --- a/web/src/upload.vue +++ b/web/src/upload.vue @@ -18,9 +18,10 @@ import GetWorker from './workerInterface.js'; const w = GetWorker('main'); -w.listenTo('upload-id', (e) => { - console.log('vue received upload id ', e.data.id) -}) +//w.listenTo('upload-id', (e) => { +// console.log('vue received upload id ', e.data.id) +// console.log('vue received upload id ', e.data.invoice) +//}) export default { diff --git a/web/src/worker.js b/web/src/worker.js index 1c1464c..047630f 100644 --- a/web/src/worker.js +++ b/web/src/worker.js @@ -53,23 +53,29 @@ async function newUpload(files){ await upload.create() .then(data => { // Get the upload id - let { result: { id: id } } = data + let { result: { upload_id: id, invoice: invoice } } = data + + //postMessage({ + //msg: 'upload-invoice', + //id: id, + //invoice: resp.result.invoice + //}) // Notify the UI - postMessage({msg: 'upload-id', id: id}) + postMessage({msg: 'upload-invoice', id: id, invoice: invoice}) // Send the files - upload.send() - .then(resp =>{ - postMessage({ - msg: 'upload-invoice', - id: id, - invoice: resp.result.invoice - }) - }) - .catch(err =>{ - console.error(err) - }) + //upload.send() + //.then(resp =>{ + //postMessage({ + //msg: 'upload-invoice', + //id: id, + //invoice: resp.result.invoice + //}) + //}) + //.catch(err =>{ + //console.error(err) + //}) }) } diff --git a/websocket.go b/websocket.go deleted file mode 100644 index dac7f93..0000000 --- a/websocket.go +++ /dev/null @@ -1,108 +0,0 @@ -package main - -import ( - "bytes" - "log" - "net/http" - "time" - - "github.com/gin-gonic/gin" - "github.com/gorilla/websocket" -) - -const ( - // Time allowed to write message to peer - writeWait = 10 * time.Second - - // Time allowed to read the next pong message from the client. - pongWait = 5 * time.Second - - // Send pings to client with this period. Must be less than pongWait. - pingPeriod = (pongWait * 9) / 10 - //pingPeriod = 5 * time.Second - - // Maximum message size - maxMessageSize = 512 -) - -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - return true - }, -} - -var ( - conn *websocket.Conn -) - -var ( - newline = []byte{'\n'} - space = []byte{' '} -) - -func reader() { - defer conn.Close() - conn.SetReadLimit(maxMessageSize) - conn.SetReadDeadline(time.Now().Add(pongWait)) - conn.SetPongHandler(func(string) error { - //log.Println("pong") - conn.SetReadDeadline(time.Now().Add(pongWait)) - return nil - }) - - for { - _, message, err := conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, - websocket.CloseAbnormalClosure) { - log.Printf("error: %v", err) - } - break - } - message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) - } -} - -func writer() { - pingTicker := time.NewTicker(pingPeriod) - defer func() { - log.Println("quit ws server") - - pingTicker.Stop() - conn.Close() - }() - - for { - select { - case <-pingTicker.C: - //log.Println("ping") - - conn.SetWriteDeadline(time.Now().Add(writeWait)) - if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { - log.Println(err) - return - } - - } - } -} - -func serveWebsocket(c *gin.Context) { - var err error - log.Println("websocket request") - - conn, err = upgrader.Upgrade(c.Writer, c.Request, nil) - if err != nil { - if _, ok := err.(websocket.HandshakeError); !ok { - log.Printf("handshake error: %s", err) - } - - log.Println(err) - return - } - - go writer() - reader() -} diff --git a/ws/server.go b/ws/server.go new file mode 100644 index 0000000..874e858 --- /dev/null +++ b/ws/server.go @@ -0,0 +1,142 @@ +package ws + +import ( + "encoding/json" + "log" + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" +) + +const ( + // Time allowed to write message to peer + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the client. + pongWait = 5 * time.Second + + // Send pings to client with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + //pingPeriod = 5 * time.Second + + // Maximum message size + maxMessageSize = 512 +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +var C *Client + +// Interface to talk to websocket +type Client struct { + + // websocket connection + conn *websocket.Conn + + // Buffered channel of outbound messages + send chan []byte +} + +func (c *Client) readPump() { + defer c.conn.Close() + c.conn.SetReadLimit(maxMessageSize) + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { + //log.Println("pong") + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + + for { + _, message, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, + websocket.CloseAbnormalClosure) { + log.Printf("error: %v", err) + } + break + } + + recvData := make(map[string]interface{}) + json.Unmarshal(message, &recvData) + log.Println(recvData) + } +} + +func (c *Client) writePump() { + //testTicker := time.NewTicker(5 * time.Second) + pingTicker := time.NewTicker(pingPeriod) + defer func() { + log.Println("quit ws server") + + pingTicker.Stop() + c.conn.Close() + }() + + for { + select { + //case <-testTicker.C: + //c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + + //msg := gin.H{ + //"msg": "hello", + //"data": "from server", + //} + + //w, err := c.conn.NextWriter(websocket.TextMessage) + //if err != nil { + //log.Println(err) + //return + //} + + //enc := json.NewEncoder(w) + //err = enc.Encode(msg) + //if err != nil { + //log.Println(err) + //return + //} + + //if err = w.Close(); err != nil { + //log.Println(err) + //return + //} + + case <-pingTicker.C: + //log.Println("ping") + + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + log.Println(err) + return + } + + } + } +} + +func Serve(c *gin.Context) { + log.Println("websocket request") + + conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + if _, ok := err.(websocket.HandshakeError); !ok { + log.Printf("handshake error: %s", err) + } + + log.Println(err) + return + } + + C = &Client{conn: conn, send: make(chan []byte, 256)} + + go C.writePump() + go C.readPump() +}