mirror of
https://github.com/creekorful/bathyscaphe
synced 2024-11-16 00:12:56 +00:00
Make extractor publish found URLs
This commit is contained in:
parent
560d7cb846
commit
ae5812c566
@ -8,6 +8,7 @@ import (
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/urfave/cli/v2"
|
||||
"mvdan.cc/xurls/v2"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
@ -78,7 +79,7 @@ func handleMessage(apiClient api.Client, apiURI string) natsutil.MsgHandler {
|
||||
log.Debug().Str("url", resMsg.URL).Msg("Processing new resource")
|
||||
|
||||
// Extract & process resource
|
||||
resDto, err := extractResource(resMsg)
|
||||
resDto, urls, err := extractResource(resMsg)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Error while extracting resource")
|
||||
return err
|
||||
@ -91,11 +92,25 @@ func handleMessage(apiClient api.Client, apiURI string) natsutil.MsgHandler {
|
||||
return err
|
||||
}
|
||||
|
||||
// Finally push found URLs
|
||||
for _, url := range urls {
|
||||
log.Trace().
|
||||
Str("url", url).
|
||||
Msg("Publishing found URL")
|
||||
|
||||
if err := natsutil.PublishMsg(nc, &messaging.URLFoundMsg{URL: url}); err != nil {
|
||||
log.Warn().
|
||||
Str("url", url).
|
||||
Str("err", err.Error()).
|
||||
Msg("Error while publishing URL")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func extractResource(msg messaging.NewResourceMsg) (api.ResourceDto, error) {
|
||||
func extractResource(msg messaging.NewResourceMsg) (api.ResourceDto, []string, error) {
|
||||
resDto := api.ResourceDto{
|
||||
URL: protocolRegex.ReplaceAllLiteralString(msg.URL, ""),
|
||||
Title: extractTitle(msg.Body),
|
||||
@ -103,7 +118,9 @@ func extractResource(msg messaging.NewResourceMsg) (api.ResourceDto, error) {
|
||||
Time: time.Now(),
|
||||
}
|
||||
|
||||
return resDto, nil
|
||||
// Extract URLs
|
||||
xu := xurls.Strict()
|
||||
return resDto, xu.FindAllString(msg.Body, -1), nil
|
||||
}
|
||||
|
||||
// extract title from html body
|
||||
|
@ -11,7 +11,7 @@ func TestExtractResource(t *testing.T) {
|
||||
Body: "<title>Creekorful Inc</title>This is sparta",
|
||||
}
|
||||
|
||||
resDto, err := extractResource(msg)
|
||||
resDto, _, err := extractResource(msg)
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ type Subscriber struct {
|
||||
func NewSubscriber(address string) (*Subscriber, error) {
|
||||
nc, err := nats.Connect(address)
|
||||
if err != nil {
|
||||
log.Err(err).Str("server-uri", address).Msg("Error while connecting to NATS server")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -32,7 +31,6 @@ func (qs *Subscriber) QueueSubscribe(subject, queue string, handler MsgHandler)
|
||||
// Create the subscriber
|
||||
sub, err := qs.nc.QueueSubscribeSync(subject, queue)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Error while reading message from NATS server")
|
||||
return err
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user