From 0dc70f63f7d8ed9a4793c70c2e5ba2b218193b1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alo=C3=AFs=20Micard?= Date: Wed, 16 Dec 2020 12:40:38 +0100 Subject: [PATCH] Refactor to use RabbitMQ --- deployments/docker/docker-compose.yml | 20 ++++---- go.mod | 6 +-- go.sum | 33 +------------ internal/api/api.go | 4 +- internal/api/service.go | 6 +-- internal/crawler/crawler.go | 12 ++--- internal/extractor/extractor.go | 12 ++--- internal/extractor/extractor_test.go | 8 +-- internal/messaging/message.go | 7 --- internal/messaging/messaging.go | 6 +++ internal/messaging/publisher.go | 26 ++++++---- internal/messaging/subscriber.go | 71 ++++++++++++++++----------- internal/scheduler/scheduler.go | 12 ++--- internal/scheduler/scheduler_test.go | 26 +++++----- internal/util/amqp.go | 12 +++++ internal/util/nats.go | 12 ----- 16 files changed, 132 insertions(+), 141 deletions(-) delete mode 100644 internal/messaging/message.go create mode 100644 internal/util/amqp.go delete mode 100644 internal/util/nats.go diff --git a/deployments/docker/docker-compose.yml b/deployments/docker/docker-compose.yml index 6c5eff0..3484c0f 100644 --- a/deployments/docker/docker-compose.yml +++ b/deployments/docker/docker-compose.yml @@ -1,8 +1,10 @@ version: '3' services: - nats: - image: nats:2.1.9-alpine3.12 + rabbitmq: + image: rabbitmq:3.8.9-management-alpine + ports: + - 15672:15672 torproxy: image: dperson/torproxy:latest elasticsearch: @@ -22,17 +24,17 @@ services: image: creekorful/tdsh-crawler:latest command: > --log-level debug - --nats-uri nats + --event-srv-uri amqp://guest:guest@rabbitmq:5672 --tor-uri torproxy:9050 restart: always depends_on: - - nats + - rabbitmq - torproxy scheduler: image: creekorful/tdsh-scheduler:latest command: > --log-level debug - --nats-uri nats + --event-srv-uri amqp://guest:guest@rabbitmq:5672 --api-uri http://api:8080 --api-token eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6InNjaGVkdWxlciIsInJpZ2h0cyI6eyJHRVQiOlsiL3YxL3Jlc291cmNlcyJdfX0.dBR6KLQp2h2srY-By3zikEznhQplLCtDrvOkcXP6USY --forbidden-extensions png @@ -42,24 +44,24 @@ services: --forbidden-extensions bmp restart: always depends_on: - - nats + - rabbitmq - api extractor: image: creekorful/tdsh-extractor:latest command: > --log-level debug - --nats-uri nats + --event-srv-uri amqp://guest:guest@rabbitmq:5672 --api-uri http://api:8080 --api-token eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImV4dHJhY3RvciIsInJpZ2h0cyI6eyJQT1NUIjpbIi92MS9yZXNvdXJjZXMiXX19.mytGd_9zyK8y_T3fsWAmH8FnaBNr6qWefwCPDOx4in0 restart: always depends_on: - - nats + - rabbitmq - api api: image: creekorful/tdsh-api:latest command: > --log-level debug - --nats-uri nats + --event-srv-uri amqp://guest:guest@rabbitmq:5672 --elasticsearch-uri http://elasticsearch:9200 --signing-key K==M5RsU_DQa4_XSbkX?L27s^xWmde25 restart: always diff --git a/go.mod b/go.mod index 4a7e949..1e0ec40 100644 --- a/go.mod +++ b/go.mod @@ -9,16 +9,14 @@ require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/go-resty/resty/v2 v2.3.0 github.com/golang/mock v1.4.4 - github.com/golang/protobuf v1.4.2 // indirect github.com/labstack/echo/v4 v4.1.16 - github.com/nats-io/nats-server/v2 v2.1.8 // indirect - github.com/nats-io/nats.go v1.10.0 github.com/olekukonko/tablewriter v0.0.4 github.com/olivere/elastic/v7 v7.0.20 github.com/rs/zerolog v1.20.0 + github.com/streadway/amqp v1.0.0 github.com/urfave/cli/v2 v2.2.0 github.com/valyala/fasthttp v1.9.0 github.com/xhit/go-str2duration/v2 v2.0.0 - golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 + golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 // indirect mvdan.cc/xurls/v2 v2.1.0 ) diff --git a/go.sum b/go.sum index e3319bd..d923ee5 100644 --- a/go.sum +++ b/go.sum @@ -30,17 +30,7 @@ github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= -github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= -github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= -github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= -github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= -github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= @@ -65,17 +55,6 @@ github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHX github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54= github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= -github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= -github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= -github.com/nats-io/nats-server/v2 v2.1.8 h1:d5GoJA6W7vQkmt99Nfdeie3pEFFUEjIwt1YZp50DkIQ= -github.com/nats-io/nats-server/v2 v2.1.8/go.mod h1:rbRrRE/Iv93O/rUvZ9dh4NfT0Cm9HWjW/BqOWLGgYiE= -github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY= -github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= -github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= -github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA= -github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= -github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= -github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/olekukonko/tablewriter v0.0.4 h1:vHD/YYe1Wolo78koG299f7V/VAS08c6IpCLn+Ejf/w8= github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= github.com/olivere/elastic/v7 v7.0.20 h1:5FFpGPVJlBSlWBOdict406Y3yNTIpVpAiUvdFZeSbAo= @@ -96,6 +75,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM= github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= @@ -116,7 +97,6 @@ github.com/xhit/go-str2duration/v2 v2.0.0 h1:uFtk6FWB375bP7ewQl+/1wBcn840GPhnySO github.com/xhit/go-str2duration/v2 v2.0.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -148,7 +128,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= @@ -175,14 +154,6 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= -google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= -google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= -google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= -google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= -google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= -google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= diff --git a/internal/api/api.go b/internal/api/api.go index 25ef969..88ad817 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -22,7 +22,7 @@ func GetApp() *cli.App { Usage: "Trandoshan API component", Flags: []cli.Flag{ logging.GetLogFlag(), - util.GetNATSURIFlag(), + util.GetEventSrvURI(), &cli.StringFlag{ Name: "elasticsearch-uri", Usage: "URI to the Elasticsearch server", @@ -55,7 +55,7 @@ func execute(c *cli.Context) error { log.Info().Str("ver", c.App.Version). Str("elasticsearch-uri", c.String("elasticsearch-uri")). - Str("nats-uri", c.String("nats-uri")). + Str("event-srv-uri", c.String("event-srv-uri")). Msg("Starting tdsh-api") signingKey := []byte(c.String("signing-key")) diff --git a/internal/api/service.go b/internal/api/service.go index 7e0d827..a900d01 100644 --- a/internal/api/service.go +++ b/internal/api/service.go @@ -21,10 +21,10 @@ type svc struct { } func newService(c *cli.Context) (service, error) { - // Connect to the NATS server - pub, err := messaging.NewPublisher(c.String("nats-uri")) + // Connect to the messaging server + pub, err := messaging.NewPublisher(c.String("event-srv-uri")) if err != nil { - log.Err(err).Str("uri", c.String("nats-uri")).Msg("Error while connecting to NATS server") + log.Err(err).Str("uri", c.String("event-srv-uri")).Msg("Error while connecting to event server") return nil, err } diff --git a/internal/crawler/crawler.go b/internal/crawler/crawler.go index ee8926b..5581130 100644 --- a/internal/crawler/crawler.go +++ b/internal/crawler/crawler.go @@ -6,11 +6,11 @@ import ( "github.com/creekorful/trandoshan/internal/logging" "github.com/creekorful/trandoshan/internal/messaging" "github.com/creekorful/trandoshan/internal/util" - "github.com/nats-io/nats.go" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" "github.com/valyala/fasthttp" "github.com/valyala/fasthttp/fasthttpproxy" + "io" "strings" "time" ) @@ -25,7 +25,7 @@ func GetApp() *cli.App { Usage: "Trandoshan crawler component", Flags: []cli.Flag{ logging.GetLogFlag(), - util.GetNATSURIFlag(), + util.GetEventSrvURI(), &cli.StringFlag{ Name: "tor-uri", Usage: "URI to the TOR SOCKS proxy", @@ -51,7 +51,7 @@ func execute(ctx *cli.Context) error { log.Info(). Str("ver", ctx.App.Version). - Str("nats-uri", ctx.String("nats-uri")). + Str("event-srv-uri", ctx.String("event-srv-uri")). Str("tor-uri", ctx.String("tor-uri")). Strs("allowed-content-types", ctx.StringSlice("allowed-ct")). Msg("Starting tdsh-crawler") @@ -67,8 +67,8 @@ func execute(ctx *cli.Context) error { Name: ctx.String("user-agent"), } - // Create the NATS subscriber - sub, err := messaging.NewSubscriber(ctx.String("nats-uri")) + // Create the subscriber + sub, err := messaging.NewSubscriber(ctx.String("event-srv-uri")) if err != nil { return err } @@ -85,7 +85,7 @@ func execute(ctx *cli.Context) error { } func handleMessage(httpClient *fasthttp.Client, allowedContentTypes []string) messaging.MsgHandler { - return func(sub messaging.Subscriber, msg *nats.Msg) error { + return func(sub messaging.Subscriber, msg io.Reader) error { var urlMsg messaging.URLTodoMsg if err := sub.ReadMsg(msg, &urlMsg); err != nil { return err diff --git a/internal/extractor/extractor.go b/internal/extractor/extractor.go index 1bb1da9..8acb8d4 100644 --- a/internal/extractor/extractor.go +++ b/internal/extractor/extractor.go @@ -8,9 +8,9 @@ import ( "github.com/creekorful/trandoshan/internal/logging" "github.com/creekorful/trandoshan/internal/messaging" "github.com/creekorful/trandoshan/internal/util" - "github.com/nats-io/nats.go" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" + "io" "mvdan.cc/xurls/v2" "regexp" "strings" @@ -29,7 +29,7 @@ func GetApp() *cli.App { Usage: "Trandoshan extractor component", Flags: []cli.Flag{ logging.GetLogFlag(), - util.GetNATSURIFlag(), + util.GetEventSrvURI(), util.GetAPIURIFlag(), util.GetAPITokenFlag(), }, @@ -42,14 +42,14 @@ func execute(ctx *cli.Context) error { log.Info(). Str("ver", ctx.App.Version). - Str("nats-uri", ctx.String("nats-uri")). + Str("event-srv-uri", ctx.String("event-srv-uri")). Str("api-uri", ctx.String("api-uri")). Msg("Starting tdsh-extractor") apiClient := util.GetAPIClient(ctx) - // Create the NATS subscriber - sub, err := messaging.NewSubscriber(ctx.String("nats-uri")) + // Create the event subscriber + sub, err := messaging.NewSubscriber(ctx.String("event-srv-uri")) if err != nil { return err } @@ -66,7 +66,7 @@ func execute(ctx *cli.Context) error { } func handleMessage(apiClient api.Client) messaging.MsgHandler { - return func(sub messaging.Subscriber, msg *nats.Msg) error { + return func(sub messaging.Subscriber, msg io.Reader) error { var resMsg messaging.NewResourceMsg if err := sub.ReadMsg(msg, &resMsg); err != nil { return err diff --git a/internal/extractor/extractor_test.go b/internal/extractor/extractor_test.go index 605f022..185d95b 100644 --- a/internal/extractor/extractor_test.go +++ b/internal/extractor/extractor_test.go @@ -1,12 +1,12 @@ package extractor import ( + "bytes" "github.com/creekorful/trandoshan/api" "github.com/creekorful/trandoshan/api_mock" "github.com/creekorful/trandoshan/internal/messaging" "github.com/creekorful/trandoshan/internal/messaging_mock" "github.com/golang/mock/gomock" - "github.com/nats-io/nats.go" "testing" ) @@ -90,9 +90,9 @@ This is sparta apiClientMock := api_mock.NewMockClient(mockCtrl) subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl) - msg := nats.Msg{} + msg := bytes.NewReader(nil) subscriberMock.EXPECT(). - ReadMsg(&msg, &messaging.NewResourceMsg{}). + ReadMsg(msg, &messaging.NewResourceMsg{}). SetArg(1, messaging.NewResourceMsg{URL: "https://example.onion", Body: body}). Return(nil) @@ -113,7 +113,7 @@ This is sparta PublishMsg(&messaging.URLFoundMsg{URL: "https://google.com/test?test=test"}). Return(nil) - if err := handleMessage(apiClientMock)(subscriberMock, &msg); err != nil { + if err := handleMessage(apiClientMock)(subscriberMock, msg); err != nil { t.FailNow() } } diff --git a/internal/messaging/message.go b/internal/messaging/message.go deleted file mode 100644 index b57b1d9..0000000 --- a/internal/messaging/message.go +++ /dev/null @@ -1,7 +0,0 @@ -package messaging - -// Msg represent a message send-able trough NATS -type Msg interface { - // Subject returns the subject where message should be push - Subject() string -} diff --git a/internal/messaging/messaging.go b/internal/messaging/messaging.go index ef402c8..7501846 100644 --- a/internal/messaging/messaging.go +++ b/internal/messaging/messaging.go @@ -11,6 +11,12 @@ const ( NewResourceSubject = "resource.new" ) +// Msg represent a message send-able trough queuing +type Msg interface { + // Subject returns the subject where message should be push + Subject() string +} + // URLTodoMsg represent an URL to crawl type URLTodoMsg struct { URL string `json:"url"` diff --git a/internal/messaging/publisher.go b/internal/messaging/publisher.go index cef3e26..ecaf93a 100644 --- a/internal/messaging/publisher.go +++ b/internal/messaging/publisher.go @@ -3,7 +3,7 @@ package messaging import ( "encoding/json" "fmt" - "github.com/nats-io/nats.go" + "github.com/streadway/amqp" ) // Publisher is something that push msg to an event queue @@ -13,34 +13,42 @@ type Publisher interface { } type publisher struct { - nc *nats.Conn + rc *amqp.Channel } // NewPublisher create a new Publisher instance -func NewPublisher(natsURI string) (Publisher, error) { - nc, err := nats.Connect(natsURI) +func NewPublisher(amqpURI string) (Publisher, error) { + conn, err := amqp.Dial(amqpURI) + if err != nil { + return nil, err + } + + c, err := conn.Channel() if err != nil { return nil, err } return &publisher{ - nc: nc, + rc: c, }, nil } func (p *publisher) PublishMsg(msg Msg) error { - return publishJSON(p.nc, msg.Subject(), msg) + return publishJSON(p.rc, msg.Subject(), msg) } func (p *publisher) Close() { - p.nc.Close() + _ = p.rc.Close() } -func publishJSON(nc *nats.Conn, subject string, msg interface{}) error { +func publishJSON(rc *amqp.Channel, subject string, msg interface{}) error { msgBytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("error while encoding message: %s", err) } - return nc.Publish(subject, msgBytes) + return rc.Publish("", subject, false, false, amqp.Publishing{ + ContentType: "application/json", + Body: msgBytes, + }) } diff --git a/internal/messaging/subscriber.go b/internal/messaging/subscriber.go index 483fed4..ec727ea 100644 --- a/internal/messaging/subscriber.go +++ b/internal/messaging/subscriber.go @@ -1,79 +1,92 @@ package messaging import ( - "context" + "bytes" "encoding/json" "fmt" - "github.com/nats-io/nats.go" "github.com/rs/zerolog/log" + "github.com/streadway/amqp" + "io" ) -// MsgHandler represent an handler for a NATS subscriber -type MsgHandler func(s Subscriber, msg *nats.Msg) error +// MsgHandler represent an handler for a subscriber +type MsgHandler func(s Subscriber, body io.Reader) error // Subscriber is something that read msg from an event queue type Subscriber interface { Publisher - ReadMsg(natsMsg *nats.Msg, msg Msg) error + ReadMsg(body io.Reader, msg Msg) error QueueSubscribe(subject, queue string, handler MsgHandler) error Close() } -// Subscriber represent a NATS subscriber +// Subscriber represent a subscriber type subscriber struct { - nc *nats.Conn + rc *amqp.Channel } -// NewSubscriber create a new subscriber and connect it to given NATS server -func NewSubscriber(address string) (Subscriber, error) { - nc, err := nats.Connect(address) +// NewSubscriber create a new subscriber and connect it to given server +func NewSubscriber(amqpURI string) (Subscriber, error) { + conn, err := amqp.Dial(amqpURI) if err != nil { return nil, err } + c, err := conn.Channel() + if err != nil { + return nil, err + } + if err := c.Qos(1, 0, false); err != nil { + return nil, err + } + return &subscriber{ - nc: nc, + rc: c, }, nil } -func (s *subscriber) ReadMsg(natsMsg *nats.Msg, msg Msg) error { - return readJSON(natsMsg, msg) +func (s *subscriber) ReadMsg(body io.Reader, msg Msg) error { + return readJSON(body, msg) } func (s *subscriber) QueueSubscribe(subject, queue string, handler MsgHandler) error { - // Create the subscriber - sub, err := s.nc.QueueSubscribeSync(subject, queue) + q, err := s.rc.QueueDeclare(subject, true, false, false, false, nil) if err != nil { - return err + return fmt.Errorf("error while declaring queue: %s", err) } - for { - // Read incoming message - msg, err := sub.NextMsgWithContext(context.Background()) - if err != nil { - log.Warn().Str("err", err.Error()).Msg("error while reading incoming message, skipping it") - continue - } + deliveries, err := s.rc.Consume(q.Name, "", false, false, false, false, nil) + if err != nil { + return fmt.Errorf("error while consuming queue: %s", err) + } - // ... And process it - if err := handler(s, msg); err != nil { + for d := range deliveries { + if err := handler(s, bytes.NewReader(d.Body)); err != nil { log.Err(err).Msg("error while processing message") continue } + + // Ack no matter what since we doesn't care about failing messages + if err := d.Ack(false); err != nil { + log.Err(err).Msg("error while ack`ing message") + continue + } } + + return nil } func (s *subscriber) PublishMsg(msg Msg) error { - return publishJSON(s.nc, msg.Subject(), msg) + return publishJSON(s.rc, msg.Subject(), msg) } func (s *subscriber) Close() { - s.nc.Close() + _ = s.rc.Close() } -func readJSON(msg *nats.Msg, body interface{}) error { - if err := json.Unmarshal(msg.Data, body); err != nil { +func readJSON(body io.Reader, where interface{}) error { + if err := json.NewDecoder(body).Decode(where); err != nil { return fmt.Errorf("error while decoding message: %s", err) } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index b6ae038..d0503ef 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -6,10 +6,10 @@ import ( "github.com/creekorful/trandoshan/internal/logging" "github.com/creekorful/trandoshan/internal/messaging" "github.com/creekorful/trandoshan/internal/util" - "github.com/nats-io/nats.go" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" "github.com/xhit/go-str2duration/v2" + "io" "net/url" "strings" "time" @@ -23,7 +23,7 @@ func GetApp() *cli.App { Usage: "Trandoshan scheduler component", Flags: []cli.Flag{ logging.GetLogFlag(), - util.GetNATSURIFlag(), + util.GetEventSrvURI(), util.GetAPIURIFlag(), util.GetAPITokenFlag(), &cli.StringFlag{ @@ -46,7 +46,7 @@ func execute(ctx *cli.Context) error { log.Info(). Str("ver", ctx.App.Version). - Str("nats-uri", ctx.String("nats-uri")). + Str("event-srv-uri", ctx.String("event-srv-uri")). Str("api-uri", ctx.String("api-uri")). Strs("forbidden-exts", ctx.StringSlice("forbidden-extensions")). Dur("refresh-delay", refreshDelay). @@ -55,8 +55,8 @@ func execute(ctx *cli.Context) error { // Create the API client apiClient := util.GetAPIClient(ctx) - // Create the NATS subscriber - sub, err := messaging.NewSubscriber(ctx.String("nats-uri")) + // Create the subscriber + sub, err := messaging.NewSubscriber(ctx.String("event-srv-uri")) if err != nil { return err } @@ -73,7 +73,7 @@ func execute(ctx *cli.Context) error { } func handleMessage(apiClient api.Client, refreshDelay time.Duration, forbiddenExtensions []string) messaging.MsgHandler { - return func(sub messaging.Subscriber, msg *nats.Msg) error { + return func(sub messaging.Subscriber, msg io.Reader) error { var urlMsg messaging.URLFoundMsg if err := sub.ReadMsg(msg, &urlMsg); err != nil { return err diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 341e67b..333741a 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -1,12 +1,12 @@ package scheduler import ( + "bytes" "github.com/creekorful/trandoshan/api" "github.com/creekorful/trandoshan/api_mock" "github.com/creekorful/trandoshan/internal/messaging" "github.com/creekorful/trandoshan/internal/messaging_mock" "github.com/golang/mock/gomock" - "github.com/nats-io/nats.go" "testing" "time" ) @@ -36,13 +36,13 @@ func TestHandleMessageNotOnion(t *testing.T) { apiClientMock := api_mock.NewMockClient(mockCtrl) subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl) - msg := nats.Msg{} + msg := bytes.NewReader(nil) subscriberMock.EXPECT(). - ReadMsg(&msg, &messaging.URLFoundMsg{}). + ReadMsg(msg, &messaging.URLFoundMsg{}). SetArg(1, messaging.URLFoundMsg{URL: "https://example.org"}). Return(nil) - if err := handleMessage(apiClientMock, -1, []string{})(subscriberMock, &msg); err != nil { + if err := handleMessage(apiClientMock, -1, []string{})(subscriberMock, msg); err != nil { t.FailNow() } } @@ -54,9 +54,9 @@ func TestHandleMessageAlreadyCrawled(t *testing.T) { apiClientMock := api_mock.NewMockClient(mockCtrl) subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl) - msg := nats.Msg{} + msg := bytes.NewReader(nil) subscriberMock.EXPECT(). - ReadMsg(&msg, &messaging.URLFoundMsg{}). + ReadMsg(msg, &messaging.URLFoundMsg{}). SetArg(1, messaging.URLFoundMsg{URL: "https://example.onion"}). Return(nil) @@ -64,7 +64,7 @@ func TestHandleMessageAlreadyCrawled(t *testing.T) { SearchResources("https://example.onion", "", time.Time{}, time.Time{}, 1, 1). Return([]api.ResourceDto{}, int64(1), nil) - if err := handleMessage(apiClientMock, -1, []string{"png"})(subscriberMock, &msg); err != nil { + if err := handleMessage(apiClientMock, -1, []string{"png"})(subscriberMock, msg); err != nil { t.FailNow() } } @@ -76,13 +76,13 @@ func TestHandleMessageForbiddenExtensions(t *testing.T) { apiClientMock := api_mock.NewMockClient(mockCtrl) subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl) - msg := nats.Msg{} + msg := bytes.NewReader(nil) subscriberMock.EXPECT(). - ReadMsg(&msg, &messaging.URLFoundMsg{}). + ReadMsg(msg, &messaging.URLFoundMsg{}). SetArg(1, messaging.URLFoundMsg{URL: "https://example.onion/image.png?id=12&test=2"}). Return(nil) - if err := handleMessage(apiClientMock, -1, []string{"png"})(subscriberMock, &msg); err != nil { + if err := handleMessage(apiClientMock, -1, []string{"png"})(subscriberMock, msg); err != nil { t.FailNow() } } @@ -94,9 +94,9 @@ func TestHandleMessage(t *testing.T) { apiClientMock := api_mock.NewMockClient(mockCtrl) subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl) - msg := nats.Msg{} + msg := bytes.NewReader(nil) subscriberMock.EXPECT(). - ReadMsg(&msg, &messaging.URLFoundMsg{}). + ReadMsg(msg, &messaging.URLFoundMsg{}). SetArg(1, messaging.URLFoundMsg{URL: "https://example.onion"}). Return(nil) @@ -108,7 +108,7 @@ func TestHandleMessage(t *testing.T) { PublishMsg(&messaging.URLTodoMsg{URL: "https://example.onion"}). Return(nil) - if err := handleMessage(apiClientMock, -1, []string{})(subscriberMock, &msg); err != nil { + if err := handleMessage(apiClientMock, -1, []string{})(subscriberMock, msg); err != nil { t.FailNow() } } diff --git a/internal/util/amqp.go b/internal/util/amqp.go new file mode 100644 index 0000000..8507f1d --- /dev/null +++ b/internal/util/amqp.go @@ -0,0 +1,12 @@ +package util + +import "github.com/urfave/cli/v2" + +// GetEventSrvURI return the URI of the event server +func GetEventSrvURI() *cli.StringFlag { + return &cli.StringFlag{ + Name: "event-srv-uri", + Usage: "URI to the event server", + Required: true, + } +} diff --git a/internal/util/nats.go b/internal/util/nats.go deleted file mode 100644 index 678fb38..0000000 --- a/internal/util/nats.go +++ /dev/null @@ -1,12 +0,0 @@ -package util - -import "github.com/urfave/cli/v2" - -// GetNATSURIFlag return the nats uri from cli flag -func GetNATSURIFlag() *cli.StringFlag { - return &cli.StringFlag{ - Name: "nats-uri", - Usage: "URI to the NATS server", - Required: true, - } -}