From 54ff87a130b92fd6230ab470cd5bb46ba8b663cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alo=C3=AFs=20Micard?= Date: Thu, 24 Dec 2020 00:30:07 +0100 Subject: [PATCH] event: add PublishJson method --- internal/api/service/service.go | 2 +- internal/api/service/service_test.go | 2 +- internal/crawler/crawler.go | 2 +- internal/crawler/crawler_test.go | 2 +- internal/event/publisher.go | 25 +++++++++++++------------ internal/event/subscriber.go | 17 +++++++++++++++-- internal/extractor/extractor.go | 2 +- internal/extractor/extractor_test.go | 4 ++-- internal/scheduler/scheduler.go | 2 +- internal/scheduler/scheduler_test.go | 2 +- 10 files changed, 37 insertions(+), 23 deletions(-) diff --git a/internal/api/service/service.go b/internal/api/service/service.go index ca51bd5..cba97c6 100644 --- a/internal/api/service/service.go +++ b/internal/api/service/service.go @@ -121,7 +121,7 @@ func (s *Service) AddResource(res api.ResourceDto) (api.ResourceDto, error) { // ScheduleURL schedule given url for crawling func (s *Service) ScheduleURL(url string) error { // Publish the URL - if err := s.pub.Publish(&event.FoundURLEvent{URL: url}); err != nil { + if err := s.pub.PublishEvent(&event.FoundURLEvent{URL: url}); err != nil { log.Err(err).Msg("Unable to publish URL") return err } diff --git a/internal/api/service/service_test.go b/internal/api/service/service_test.go index 2296e51..5aaa476 100644 --- a/internal/api/service/service_test.go +++ b/internal/api/service/service_test.go @@ -177,7 +177,7 @@ func TestScheduleURL(t *testing.T) { s := Service{pub: pubMock} - pubMock.EXPECT().Publish(&event.FoundURLEvent{URL: "https://example.onion"}) + pubMock.EXPECT().PublishEvent(&event.FoundURLEvent{URL: "https://example.onion"}) if err := s.ScheduleURL("https://example.onion"); err != nil { t.FailNow() diff --git a/internal/crawler/crawler.go b/internal/crawler/crawler.go index d84b634..d3c7a74 100644 --- a/internal/crawler/crawler.go +++ b/internal/crawler/crawler.go @@ -130,7 +130,7 @@ func (state *state) handleNewURLEvent(subscriber event.Subscriber, body io.Reade Time: state.clock.Now(), } - if err := subscriber.Publish(&res); err != nil { + if err := subscriber.PublishEvent(&res); err != nil { return err } diff --git a/internal/crawler/crawler_test.go b/internal/crawler/crawler_test.go index fb12930..947f728 100644 --- a/internal/crawler/crawler_test.go +++ b/internal/crawler/crawler_test.go @@ -112,7 +112,7 @@ func TestHandleNewURLEvent(t *testing.T) { tn := time.Now() clockMock.EXPECT().Now().Return(tn) - subscriberMock.EXPECT().Publish(&event.NewResourceEvent{ + subscriberMock.EXPECT().PublishEvent(&event.NewResourceEvent{ URL: "https://example.onion/image.png?id=12&test=2", Body: "Hello", Headers: map[string]string{"Content-Type": "text/plain", "Server": "Debian"}, diff --git a/internal/event/publisher.go b/internal/event/publisher.go index 46e0ca0..945c799 100644 --- a/internal/event/publisher.go +++ b/internal/event/publisher.go @@ -8,7 +8,8 @@ import ( // Publisher is something that push an event type Publisher interface { - Publish(event Event) error + PublishEvent(event Event) error + PublishJSON(exchange string, event []byte) error Close() error } @@ -33,23 +34,23 @@ func NewPublisher(amqpURI string) (Publisher, error) { }, nil } -func (p *publisher) Publish(event Event) error { - return publishJSON(p.channel, event.Exchange(), event) -} - -func (p *publisher) Close() error { - return p.channel.Close() -} - -func publishJSON(rc *amqp.Channel, exchange string, event interface{}) error { +func (p *publisher) PublishEvent(event Event) error { evtBytes, err := json.Marshal(event) if err != nil { return fmt.Errorf("error while encoding event: %s", err) } - return rc.Publish(exchange, "", false, false, amqp.Publishing{ + return p.PublishJSON(event.Exchange(), evtBytes) +} + +func (p *publisher) PublishJSON(exchange string, event []byte) error { + return p.channel.Publish(exchange, "", false, false, amqp.Publishing{ ContentType: "application/json", - Body: evtBytes, + Body: event, DeliveryMode: amqp.Persistent, }) } + +func (p *publisher) Close() error { + return p.channel.Close() +} diff --git a/internal/event/subscriber.go b/internal/event/subscriber.go index 85b6f35..97d1e55 100644 --- a/internal/event/subscriber.go +++ b/internal/event/subscriber.go @@ -45,8 +45,21 @@ func NewSubscriber(amqpURI string) (Subscriber, error) { }, nil } -func (s *subscriber) Publish(event Event) error { - return publishJSON(s.channel, event.Exchange(), event) +func (s *subscriber) PublishEvent(event Event) error { + evtBytes, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("error while encoding event: %s", err) + } + + return s.PublishJSON(event.Exchange(), evtBytes) +} + +func (p *subscriber) PublishJSON(exchange string, event []byte) error { + return p.channel.Publish(exchange, "", false, false, amqp.Publishing{ + ContentType: "application/json", + Body: event, + DeliveryMode: amqp.Persistent, + }) } func (s *subscriber) Close() error { diff --git a/internal/extractor/extractor.go b/internal/extractor/extractor.go index fc197ad..d317613 100644 --- a/internal/extractor/extractor.go +++ b/internal/extractor/extractor.go @@ -118,7 +118,7 @@ func (state *state) handleNewResourceEvent(subscriber event.Subscriber, body io. Str("url", url). Msg("Publishing found URL") - if err := subscriber.Publish(&event.FoundURLEvent{URL: url}); err != nil { + if err := subscriber.PublishEvent(&event.FoundURLEvent{URL: url}); err != nil { log.Warn(). Str("url", url). Str("err", err.Error()). diff --git a/internal/extractor/extractor_test.go b/internal/extractor/extractor_test.go index c8cdd67..41d54ea 100644 --- a/internal/extractor/extractor_test.go +++ b/internal/extractor/extractor_test.go @@ -121,10 +121,10 @@ This is sparta (hosted on https://example.org) // should be called only one time subscriberMock.EXPECT(). - Publish(&event.FoundURLEvent{URL: "https://example.org"}). + PublishEvent(&event.FoundURLEvent{URL: "https://example.org"}). Return(nil) subscriberMock.EXPECT(). - Publish(&event.FoundURLEvent{URL: "https://google.com/test?test=test"}). + PublishEvent(&event.FoundURLEvent{URL: "https://google.com/test?test=test"}). Return(nil) s := state{apiClient: apiClientMock} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 290afcd..fb2601b 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -172,7 +172,7 @@ func (state *state) handleURLFoundEvent(subscriber event.Subscriber, body io.Rea // No matches: schedule! log.Debug().Stringer("url", u).Msg("URL should be scheduled") - if err := subscriber.Publish(&event.NewURLEvent{URL: evt.URL}); err != nil { + if err := subscriber.PublishEvent(&event.NewURLEvent{URL: evt.URL}); err != nil { return fmt.Errorf("error while publishing URL: %s", err) } diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index a59bbba..1e8d7be 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -205,7 +205,7 @@ func TestHandleMessage(t *testing.T) { Return([]api.ResourceDto{}, int64(0), nil) subscriberMock.EXPECT(). - Publish(&event.NewURLEvent{URL: "https://example.onion"}). + PublishEvent(&event.NewURLEvent{URL: "https://example.onion"}). Return(nil) configClientMock.EXPECT().GetForbiddenMimeTypes().Return([]client.ForbiddenMimeType{}, nil)