Replace gorilla with melody for websocket API (#1205)

pull/1216/head
NikkyAI 4 years ago committed by GitHub
parent 88d371c71c
commit 27c02549c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -6,9 +6,10 @@ import (
"sync"
"time"
"gopkg.in/olahol/melody.v1"
"github.com/42wim/matterbridge/bridge"
"github.com/42wim/matterbridge/bridge/config"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
ring "github.com/zfjagann/golang-ring"
@ -18,6 +19,7 @@ type API struct {
Messages ring.Ring
sync.RWMutex
*bridge.Config
mrouter *melody.Melody
}
type Message struct {
@ -33,6 +35,32 @@ func New(cfg *bridge.Config) bridge.Bridger {
e := echo.New()
e.HideBanner = true
e.HidePort = true
b.mrouter = melody.New()
b.mrouter.HandleMessage(func(s *melody.Session, msg []byte) {
message := config.Message{}
err := json.Unmarshal(msg, &message)
if err != nil {
b.Log.Errorf("failed to decode message from byte[] '%s'", string(msg))
return
}
b.handleWebsocketMessage(message)
})
b.mrouter.HandleConnect(func(session *melody.Session) {
greet := b.getGreeting()
data, err := json.Marshal(greet)
if err != nil {
b.Log.Errorf("failed to encode message '%v'", greet)
return
}
err = session.Write(data)
if err != nil {
b.Log.Errorf("failed to write message '%s'", string(data))
return
}
// TODO: send message history buffer from `b.Messages` here
})
b.Messages = ring.Ring{}
if b.GetInt("Buffer") != 0 {
b.Messages.SetCapacity(b.GetInt("Buffer"))
@ -67,13 +95,13 @@ func New(cfg *bridge.Config) bridge.Bridger {
func (b *API) Connect() error {
return nil
}
func (b *API) Disconnect() error {
return nil
}
func (b *API) JoinChannel(channel config.ChannelInfo) error {
return nil
}
func (b *API) Send(msg config.Message) (string, error) {
@ -83,7 +111,14 @@ func (b *API) Send(msg config.Message) (string, error) {
if msg.Event == config.EventMsgDelete {
return "", nil
}
b.Messages.Enqueue(&msg)
b.Log.Debugf("enqueueing message from %s on ring buffer", msg.Username)
b.Messages.Enqueue(msg)
data, err := json.Marshal(msg)
if err != nil {
b.Log.Errorf("failed to encode message '%s'", msg)
}
_ = b.mrouter.Broadcast(data)
return "", nil
}
@ -131,6 +166,7 @@ func (b *API) handleStream(c echo.Context) error {
}
c.Response().Flush()
for {
// TODO: this causes issues, messages should be broadcasted to all connected clients
msg := b.Messages.Dequeue()
if msg != nil {
if err := json.NewEncoder(c.Response()).Encode(msg); err != nil {
@ -153,40 +189,12 @@ func (b *API) handleWebsocketMessage(message config.Message) {
b.Remote <- message
}
func (b *API) writePump(conn *websocket.Conn) {
for {
msg := b.Messages.Dequeue()
if msg != nil {
err := conn.WriteJSON(msg)
if err != nil {
break
}
}
}
}
func (b *API) readPump(conn *websocket.Conn) {
for {
message := config.Message{}
err := conn.ReadJSON(&message)
if err != nil {
break
}
b.handleWebsocketMessage(message)
}
}
func (b *API) handleWebsocket(c echo.Context) error {
conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), nil, 1024, 1024)
err := b.mrouter.HandleRequest(c.Response(), c.Request())
if err != nil {
b.Log.Errorf("error in websocket handling '%v'", err)
return err
}
greet := b.getGreeting()
_ = conn.WriteJSON(greet)
go b.writePump(conn)
go b.readPump(conn)
return nil
}

@ -50,6 +50,7 @@ require (
golang.org/x/image v0.0.0-20200618115811-c13761719519
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
gomod.garykim.dev/nc-talk v0.0.2
gopkg.in/olahol/melody.v1 v1.0.0-20170518105555-d52139073376
)
go 1.13

@ -928,6 +928,8 @@ gopkg.in/ini.v1 v1.55.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/mail.v2 v2.3.1/go.mod h1:htwXN1Qh09vZJ1NVKxQqHPBaCBbzKhp5GzuJEA4VJWw=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/olahol/melody.v1 v1.0.0-20170518105555-d52139073376 h1:sY2a+y0j4iDrajJcorb+a0hJIQ6uakU5gybjfLWHlXo=
gopkg.in/olahol/melody.v1 v1.0.0-20170518105555-d52139073376/go.mod h1:BHKOc1m5wm8WwQkMqYBoo4vNxhmF7xg8+xhG8L+Cy3M=
gopkg.in/olivere/elastic.v6 v6.2.30/go.mod h1:2cTT8Z+/LcArSWpCgvZqBgt3VOqXiy7v00w12Lz8bd4=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=

@ -1540,6 +1540,8 @@ Buffer=1000
#Bearer token used for authentication
#curl -H "Authorization: Bearer token" http://localhost:4242/api/messages
# https://github.com/vi/websocat
# websocat -H="Authorization: Bearer token" ws://127.0.0.1:4242/api/websocket
#OPTIONAL (no authorization if token is empty)
Token="mytoken"

@ -0,0 +1,5 @@
.DS_Store
benchmark
*.swp
coverage.out
Makefile

@ -0,0 +1,10 @@
language: go
sudo: required
go:
- 1.6
- 1.7
- 1.8
install:
- go get github.com/gorilla/websocket
script:
- go test

@ -0,0 +1,44 @@
## 2017-05-18
* Fix `HandleSentMessageBinary`.
## 2017-04-11
* Allow any origin by default.
* Add `BroadcastMultiple`.
## 2017-04-09
* Add control message support.
* Add `IsClosed` to Session.
## 2017-02-10
* Return errors for some exposed methods.
* Add `HandleRequestWithKeys`.
* Add `HandleSentMessage` and `HandleSentMessageBinary`.
## 2017-01-20
* Add `Len()` to fetch number of connected sessions.
## 2016-12-09
* Add metadata management for sessions.
## 2016-05-09
* Add method `HandlePong` to melody instance.
## 2015-10-07
* Add broadcast methods for binary messages.
## 2015-09-03
* Add `Close` method to melody instance.
### 2015-06-10
* Support for binary messages.
* BroadcastOthers method.

@ -0,0 +1,22 @@
Copyright (c) 2015 Ola Holmström. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,185 @@
# melody
[![Build Status](https://travis-ci.org/olahol/melody.svg)](https://travis-ci.org/olahol/melody)
[![Coverage Status](https://img.shields.io/coveralls/olahol/melody.svg?style=flat)](https://coveralls.io/r/olahol/melody)
[![GoDoc](https://godoc.org/github.com/olahol/melody?status.svg)](https://godoc.org/github.com/olahol/melody)
> :notes: Minimalist websocket framework for Go.
Melody is websocket framework based on [github.com/gorilla/websocket](https://github.com/gorilla/websocket)
that abstracts away the tedious parts of handling websockets. It gets out of
your way so you can write real-time apps. Features include:
* [x] Clear and easy interface similar to `net/http` or Gin.
* [x] A simple way to broadcast to all or selected connected sessions.
* [x] Message buffers making concurrent writing safe.
* [x] Automatic handling of ping/pong and session timeouts.
* [x] Store data on sessions.
## Install
```bash
go get gopkg.in/olahol/melody.v1
```
## [Example: chat](https://github.com/olahol/melody/tree/master/examples/chat)
[![Chat](https://cdn.rawgit.com/olahol/melody/master/examples/chat/demo.gif "Demo")](https://github.com/olahol/melody/tree/master/examples/chat)
Using [Gin](https://github.com/gin-gonic/gin):
```go
package main
import (
"github.com/gin-gonic/gin"
"gopkg.in/olahol/melody.v1"
"net/http"
)
func main() {
r := gin.Default()
m := melody.New()
r.GET("/", func(c *gin.Context) {
http.ServeFile(c.Writer, c.Request, "index.html")
})
r.GET("/ws", func(c *gin.Context) {
m.HandleRequest(c.Writer, c.Request)
})
m.HandleMessage(func(s *melody.Session, msg []byte) {
m.Broadcast(msg)
})
r.Run(":5000")
}
```
Using [Echo](https://github.com/labstack/echo):
```go
package main
import (
"github.com/labstack/echo"
"github.com/labstack/echo/engine/standard"
"github.com/labstack/echo/middleware"
"gopkg.in/olahol/melody.v1"
"net/http"
)
func main() {
e := echo.New()
m := melody.New()
e.Use(middleware.Logger())
e.Use(middleware.Recover())
e.GET("/", func(c echo.Context) error {
http.ServeFile(c.Response().(*standard.Response).ResponseWriter, c.Request().(*standard.Request).Request, "index.html")
return nil
})
e.GET("/ws", func(c echo.Context) error {
m.HandleRequest(c.Response().(*standard.Response).ResponseWriter, c.Request().(*standard.Request).Request)
return nil
})
m.HandleMessage(func(s *melody.Session, msg []byte) {
m.Broadcast(msg)
})
e.Run(standard.New(":5000"))
}
```
## [Example: gophers](https://github.com/olahol/melody/tree/master/examples/gophers)
[![Gophers](https://cdn.rawgit.com/olahol/melody/master/examples/gophers/demo.gif "Demo")](https://github.com/olahol/melody/tree/master/examples/gophers)
```go
package main
import (
"github.com/gin-gonic/gin"
"gopkg.in/olahol/melody.v1"
"net/http"
"strconv"
"strings"
"sync"
)
type GopherInfo struct {
ID, X, Y string
}
func main() {
router := gin.Default()
mrouter := melody.New()
gophers := make(map[*melody.Session]*GopherInfo)
lock := new(sync.Mutex)
counter := 0
router.GET("/", func(c *gin.Context) {
http.ServeFile(c.Writer, c.Request, "index.html")
})
router.GET("/ws", func(c *gin.Context) {
mrouter.HandleRequest(c.Writer, c.Request)
})
mrouter.HandleConnect(func(s *melody.Session) {
lock.Lock()
for _, info := range gophers {
s.Write([]byte("set " + info.ID + " " + info.X + " " + info.Y))
}
gophers[s] = &GopherInfo{strconv.Itoa(counter), "0", "0"}
s.Write([]byte("iam " + gophers[s].ID))
counter += 1
lock.Unlock()
})
mrouter.HandleDisconnect(func(s *melody.Session) {
lock.Lock()
mrouter.BroadcastOthers([]byte("dis "+gophers[s].ID), s)
delete(gophers, s)
lock.Unlock()
})
mrouter.HandleMessage(func(s *melody.Session, msg []byte) {
p := strings.Split(string(msg), " ")
lock.Lock()
info := gophers[s]
if len(p) == 2 {
info.X = p[0]
info.Y = p[1]
mrouter.BroadcastOthers([]byte("set "+info.ID+" "+info.X+" "+info.Y), s)
}
lock.Unlock()
})
router.Run(":5000")
}
```
### [More examples](https://github.com/olahol/melody/tree/master/examples)
## [Documentation](https://godoc.org/github.com/olahol/melody)
## Contributors
* Ola Holmström (@olahol)
* Shogo Iwano (@shiwano)
* Matt Caldwell (@mattcaldwell)
* Heikki Uljas (@huljas)
* Robbie Trencheny (@robbiet480)
* yangjinecho (@yangjinecho)
## FAQ
If you are getting a `403` when trying to connect to your websocket you can [change allow all origin hosts](http://godoc.org/github.com/gorilla/websocket#hdr-Origin_Considerations):
```go
m := melody.New()
m.Upgrader.CheckOrigin = func(r *http.Request) bool { return true }
```

@ -0,0 +1,22 @@
package melody
import "time"
// Config melody configuration struct.
type Config struct {
WriteWait time.Duration // Milliseconds until write times out.
PongWait time.Duration // Timeout for waiting on pong.
PingPeriod time.Duration // Milliseconds between pings.
MaxMessageSize int64 // Maximum size in bytes of a message.
MessageBufferSize int // The max amount of messages that can be in a sessions buffer before it starts dropping them.
}
func newConfig() *Config {
return &Config{
WriteWait: 10 * time.Second,
PongWait: 60 * time.Second,
PingPeriod: (60 * time.Second * 9) / 10,
MaxMessageSize: 512,
MessageBufferSize: 256,
}
}

@ -0,0 +1,22 @@
// Copyright 2015 Ola Holmström. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package melody implements a framework for dealing with WebSockets.
//
// Example
//
// A broadcasting echo server:
//
// func main() {
// r := gin.Default()
// m := melody.New()
// r.GET("/ws", func(c *gin.Context) {
// m.HandleRequest(c.Writer, c.Request)
// })
// m.HandleMessage(func(s *melody.Session, msg []byte) {
// m.Broadcast(msg)
// })
// r.Run(":5000")
// }
package melody

@ -0,0 +1,7 @@
package melody
type envelope struct {
t int
msg []byte
filter filterFunc
}

@ -0,0 +1,80 @@
package melody
import (
"sync"
)
type hub struct {
sessions map[*Session]bool
broadcast chan *envelope
register chan *Session
unregister chan *Session
exit chan *envelope
open bool
rwmutex *sync.RWMutex
}
func newHub() *hub {
return &hub{
sessions: make(map[*Session]bool),
broadcast: make(chan *envelope),
register: make(chan *Session),
unregister: make(chan *Session),
exit: make(chan *envelope),
open: true,
rwmutex: &sync.RWMutex{},
}
}
func (h *hub) run() {
loop:
for {
select {
case s := <-h.register:
h.rwmutex.Lock()
h.sessions[s] = true
h.rwmutex.Unlock()
case s := <-h.unregister:
if _, ok := h.sessions[s]; ok {
h.rwmutex.Lock()
delete(h.sessions, s)
h.rwmutex.Unlock()
}
case m := <-h.broadcast:
h.rwmutex.RLock()
for s := range h.sessions {
if m.filter != nil {
if m.filter(s) {
s.writeMessage(m)
}
} else {
s.writeMessage(m)
}
}
h.rwmutex.RUnlock()
case m := <-h.exit:
h.rwmutex.Lock()
for s := range h.sessions {
s.writeMessage(m)
delete(h.sessions, s)
s.Close()
}
h.open = false
h.rwmutex.Unlock()
break loop
}
}
}
func (h *hub) closed() bool {
h.rwmutex.RLock()
defer h.rwmutex.RUnlock()
return !h.open
}
func (h *hub) len() int {
h.rwmutex.RLock()
defer h.rwmutex.RUnlock()
return len(h.sessions)
}

@ -0,0 +1,313 @@
package melody
import (
"errors"
"net/http"
"sync"
"github.com/gorilla/websocket"
)
// Close codes defined in RFC 6455, section 11.7.
// Duplicate of codes from gorilla/websocket for convenience.
const (
CloseNormalClosure = 1000
CloseGoingAway = 1001
CloseProtocolError = 1002
CloseUnsupportedData = 1003
CloseNoStatusReceived = 1005
CloseAbnormalClosure = 1006
CloseInvalidFramePayloadData = 1007
ClosePolicyViolation = 1008
CloseMessageTooBig = 1009
CloseMandatoryExtension = 1010
CloseInternalServerErr = 1011
CloseServiceRestart = 1012
CloseTryAgainLater = 1013
CloseTLSHandshake = 1015
)
// Duplicate of codes from gorilla/websocket for convenience.
var validReceivedCloseCodes = map[int]bool{
// see http://www.iana.org/assignments/websocket/websocket.xhtml#close-code-number
CloseNormalClosure: true,
CloseGoingAway: true,
CloseProtocolError: true,
CloseUnsupportedData: true,
CloseNoStatusReceived: false,
CloseAbnormalClosure: false,
CloseInvalidFramePayloadData: true,
ClosePolicyViolation: true,
CloseMessageTooBig: true,
CloseMandatoryExtension: true,
CloseInternalServerErr: true,
CloseServiceRestart: true,
CloseTryAgainLater: true,
CloseTLSHandshake: false,
}
type handleMessageFunc func(*Session, []byte)
type handleErrorFunc func(*Session, error)
type handleCloseFunc func(*Session, int, string) error
type handleSessionFunc func(*Session)
type filterFunc func(*Session) bool
// Melody implements a websocket manager.
type Melody struct {
Config *Config
Upgrader *websocket.Upgrader
messageHandler handleMessageFunc
messageHandlerBinary handleMessageFunc
messageSentHandler handleMessageFunc
messageSentHandlerBinary handleMessageFunc
errorHandler handleErrorFunc
closeHandler handleCloseFunc
connectHandler handleSessionFunc
disconnectHandler handleSessionFunc
pongHandler handleSessionFunc
hub *hub
}
// New creates a new melody instance with default Upgrader and Config.
func New() *Melody {
upgrader := &websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
}
hub := newHub()
go hub.run()
return &Melody{
Config: newConfig(),
Upgrader: upgrader,
messageHandler: func(*Session, []byte) {},
messageHandlerBinary: func(*Session, []byte) {},
messageSentHandler: func(*Session, []byte) {},
messageSentHandlerBinary: func(*Session, []byte) {},
errorHandler: func(*Session, error) {},
closeHandler: nil,
connectHandler: func(*Session) {},
disconnectHandler: func(*Session) {},
pongHandler: func(*Session) {},
hub: hub,
}
}
// HandleConnect fires fn when a session connects.
func (m *Melody) HandleConnect(fn func(*Session)) {
m.connectHandler = fn
}
// HandleDisconnect fires fn when a session disconnects.
func (m *Melody) HandleDisconnect(fn func(*Session)) {
m.disconnectHandler = fn
}
// HandlePong fires fn when a pong is received from a session.
func (m *Melody) HandlePong(fn func(*Session)) {
m.pongHandler = fn
}
// HandleMessage fires fn when a text message comes in.
func (m *Melody) HandleMessage(fn func(*Session, []byte)) {
m.messageHandler = fn
}
// HandleMessageBinary fires fn when a binary message comes in.
func (m *Melody) HandleMessageBinary(fn func(*Session, []byte)) {
m.messageHandlerBinary = fn
}
// HandleSentMessage fires fn when a text message is successfully sent.
func (m *Melody) HandleSentMessage(fn func(*Session, []byte)) {
m.messageSentHandler = fn
}
// HandleSentMessageBinary fires fn when a binary message is successfully sent.
func (m *Melody) HandleSentMessageBinary(fn func(*Session, []byte)) {
m.messageSentHandlerBinary = fn
}
// HandleError fires fn when a session has an error.
func (m *Melody) HandleError(fn func(*Session, error)) {
m.errorHandler = fn
}
// HandleClose sets the handler for close messages received from the session.
// The code argument to h is the received close code or CloseNoStatusReceived
// if the close message is empty. The default close handler sends a close frame
// back to the session.
//
// The application must read the connection to process close messages as
// described in the section on Control Frames above.
//
// The connection read methods return a CloseError when a close frame is
// received. Most applications should handle close messages as part of their
// normal error handling. Applications should only set a close handler when the
// application must perform some action before sending a close frame back to
// the session.
func (m *Melody) HandleClose(fn func(*Session, int, string) error) {
if fn != nil {
m.closeHandler = fn
}
}
// HandleRequest upgrades http requests to websocket connections and dispatches them to be handled by the melody instance.
func (m *Melody) HandleRequest(w http.ResponseWriter, r *http.Request) error {
return m.HandleRequestWithKeys(w, r, nil)
}
// HandleRequestWithKeys does the same as HandleRequest but populates session.Keys with keys.
func (m *Melody) HandleRequestWithKeys(w http.ResponseWriter, r *http.Request, keys map[string]interface{}) error {
if m.hub.closed() {
return errors.New("melody instance is closed")
}
conn, err := m.Upgrader.Upgrade(w, r, nil)
if err != nil {
return err
}
session := &Session{
Request: r,
Keys: keys,
conn: conn,
output: make(chan *envelope, m.Config.MessageBufferSize),
melody: m,
open: true,
rwmutex: &sync.RWMutex{},
}
m.hub.register <- session
m.connectHandler(session)
go session.writePump()
session.readPump()
if !m.hub.closed() {
m.hub.unregister <- session
}
session.close()
m.disconnectHandler(session)
return nil
}
// Broadcast broadcasts a text message to all sessions.
func (m *Melody) Broadcast(msg []byte) error {
if m.hub.closed() {
return errors.New("melody instance is closed")
}
message := &envelope{t: websocket.TextMessage, msg: msg}
m.hub.broadcast <- message
return nil
}
// BroadcastFilter broadcasts a text message to all sessions that fn returns true for.
func (m *Melody) BroadcastFilter(msg []byte, fn func(*Session) bool) error {
if m.hub.closed() {
return errors.New("melody instance is closed")
}
message := &envelope{t: websocket.TextMessage, msg: msg, filter: fn}
m.hub.broadcast <- message
return nil
}
// BroadcastOthers broadcasts a text message to all sessions except session s.
func (m *Melody) BroadcastOthers(msg []byte, s *Session) error {
return m.BroadcastFilter(msg, func(q *Session) bool {
return s != q
})
}
// BroadcastMultiple broadcasts a text message to multiple sessions given in the sessions slice.
func (m *Melody) BroadcastMultiple(msg []byte, sessions []*Session) error {
for _, sess := range sessions {
if writeErr := sess.Write(msg); writeErr != nil {
return writeErr
}
}
return nil
}
// BroadcastBinary broadcasts a binary message to all sessions.
func (m *Melody) BroadcastBinary(msg []byte) error {
if m.hub.closed() {
return errors.New("melody instance is closed")
}
message := &envelope{t: websocket.BinaryMessage, msg: msg}
m.hub.broadcast <- message
return nil
}
// BroadcastBinaryFilter broadcasts a binary message to all sessions that fn returns true for.
func (m *Melody) BroadcastBinaryFilter(msg []byte, fn func(*Session) bool) error {
if m.hub.closed() {
return errors.New("melody instance is closed")
}
message := &envelope{t: websocket.BinaryMessage, msg: msg, filter: fn}
m.hub.broadcast <- message
return nil
}
// BroadcastBinaryOthers broadcasts a binary message to all sessions except session s.
func (m *Melody) BroadcastBinaryOthers(msg []byte, s *Session) error {
return m.BroadcastBinaryFilter(msg, func(q *Session) bool {
return s != q
})
}
// Close closes the melody instance and all connected sessions.
func (m *Melody) Close() error {
if m.hub.closed() {
return errors.New("melody instance is already closed")
}
m.hub.exit <- &envelope{t: websocket.CloseMessage, msg: []byte{}}
return nil
}
// CloseWithMsg closes the melody instance with the given close payload and all connected sessions.
// Use the FormatCloseMessage function to format a proper close message payload.
func (m *Melody) CloseWithMsg(msg []byte) error {
if m.hub.closed() {
return errors.New("melody instance is already closed")
}
m.hub.exit <- &envelope{t: websocket.CloseMessage, msg: msg}
return nil
}
// Len return the number of connected sessions.
func (m *Melody) Len() int {
return m.hub.len()
}
// IsClosed returns the status of the melody instance.
func (m *Melody) IsClosed() bool {
return m.hub.closed()
}
// FormatCloseMessage formats closeCode and text as a WebSocket close message.
func FormatCloseMessage(closeCode int, text string) []byte {
return websocket.FormatCloseMessage(closeCode, text)
}

@ -0,0 +1,219 @@
package melody
import (
"errors"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
// Session wrapper around websocket connections.
type Session struct {
Request *http.Request
Keys map[string]interface{}
conn *websocket.Conn
output chan *envelope
melody *Melody
open bool
rwmutex *sync.RWMutex
}
func (s *Session) writeMessage(message *envelope) {
if s.closed() {
s.melody.errorHandler(s, errors.New("tried to write to closed a session"))
return
}
select {
case s.output <- message:
default:
s.melody.errorHandler(s, errors.New("session message buffer is full"))
}
}
func (s *Session) writeRaw(message *envelope) error {
if s.closed() {
return errors.New("tried to write to a closed session")
}
s.conn.SetWriteDeadline(time.Now().Add(s.melody.Config.WriteWait))
err := s.conn.WriteMessage(message.t, message.msg)
if err != nil {
return err
}
return nil
}
func (s *Session) closed() bool {
s.rwmutex.RLock()
defer s.rwmutex.RUnlock()
return !s.open
}
func (s *Session) close() {
if !s.closed() {
s.rwmutex.Lock()
s.open = false
s.conn.Close()
close(s.output)
s.rwmutex.Unlock()
}
}
func (s *Session) ping() {
s.writeRaw(&envelope{t: websocket.PingMessage, msg: []byte{}})
}
func (s *Session) writePump() {
ticker := time.NewTicker(s.melody.Config.PingPeriod)
defer ticker.Stop()
loop:
for {
select {
case msg, ok := <-s.output:
if !ok {
break loop
}
err := s.writeRaw(msg)
if err != nil {
s.melody.errorHandler(s, err)
break loop
}
if msg.t == websocket.CloseMessage {
break loop
}
if msg.t == websocket.TextMessage {
s.melody.messageSentHandler(s, msg.msg)
}
if msg.t == websocket.BinaryMessage {
s.melody.messageSentHandlerBinary(s, msg.msg)
}
case <-ticker.C:
s.ping()
}
}
}
func (s *Session) readPump() {
s.conn.SetReadLimit(s.melody.Config.MaxMessageSize)
s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
s.conn.SetPongHandler(func(string) error {
s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
s.melody.pongHandler(s)
return nil
})
if s.melody.closeHandler != nil {
s.conn.SetCloseHandler(func(code int, text string) error {
return s.melody.closeHandler(s, code, text)
})
}
for {
t, message, err := s.conn.ReadMessage()
if err != nil {
s.melody.errorHandler(s, err)
break
}
if t == websocket.TextMessage {
s.melody.messageHandler(s, message)
}
if t == websocket.BinaryMessage {
s.melody.messageHandlerBinary(s, message)
}
}
}
// Write writes message to session.
func (s *Session) Write(msg []byte) error {
if s.closed() {
return errors.New("session is closed")
}
s.writeMessage(&envelope{t: websocket.TextMessage, msg: msg})
return nil
}
// WriteBinary writes a binary message to session.
func (s *Session) WriteBinary(msg []byte) error {
if s.closed() {
return errors.New("session is closed")
}
s.writeMessage(&envelope{t: websocket.BinaryMessage, msg: msg})
return nil
}
// Close closes session.
func (s *Session) Close() error {
if s.closed() {
return errors.New("session is already closed")
}
s.writeMessage(&envelope{t: websocket.CloseMessage, msg: []byte{}})
return nil
}
// CloseWithMsg closes the session with the provided payload.
// Use the FormatCloseMessage function to format a proper close message payload.
func (s *Session) CloseWithMsg(msg []byte) error {
if s.closed() {
return errors.New("session is already closed")
}
s.writeMessage(&envelope{t: websocket.CloseMessage, msg: msg})
return nil
}
// Set is used to store a new key/value pair exclusivelly for this session.
// It also lazy initializes s.Keys if it was not used previously.
func (s *Session) Set(key string, value interface{}) {
if s.Keys == nil {
s.Keys = make(map[string]interface{})
}
s.Keys[key] = value
}
// Get returns the value for the given key, ie: (value, true).
// If the value does not exists it returns (nil, false)
func (s *Session) Get(key string) (value interface{}, exists bool) {
if s.Keys != nil {
value, exists = s.Keys[key]
}
return
}
// MustGet returns the value for the given key if it exists, otherwise it panics.
func (s *Session) MustGet(key string) interface{} {
if value, exists := s.Get(key); exists {
return value
}
panic("Key \"" + key + "\" does not exist")
}
// IsClosed returns the status of the connection.
func (s *Session) IsClosed() bool {
return s.closed()
}

@ -338,5 +338,7 @@ google.golang.org/protobuf/types/descriptorpb
gopkg.in/ini.v1
# gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/natefinch/lumberjack.v2
# gopkg.in/olahol/melody.v1 v1.0.0-20170518105555-d52139073376
gopkg.in/olahol/melody.v1
# gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v2

Loading…
Cancel
Save