module interval worker
This commit is contained in:
parent
a3e1aae827
commit
8db7234694
@ -117,15 +117,16 @@ func runBrowserModule(m *manager.Manager,
|
||||
unitName = fmt.Sprintf("%s(%s)", unitName, profileName)
|
||||
}
|
||||
|
||||
|
||||
//BUG: last worker is the only instance that is run
|
||||
worker := watch.Worker(runner)
|
||||
worker := watch.WatchWork{
|
||||
WatchRunner: runner,
|
||||
}
|
||||
|
||||
m.AddUnit(worker, unitName)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
func startDaemon(c *cli.Context) error {
|
||||
defer utils.CleanFiles()
|
||||
manager := manager.NewManager()
|
||||
@ -138,6 +139,27 @@ func startDaemon(c *cli.Context) error {
|
||||
// Initialize sqlite database available in global `cacheDB` variable
|
||||
db.Init()
|
||||
|
||||
// Handle normal modules
|
||||
mods := modules.GetModules()
|
||||
for _, mod := range mods {
|
||||
name := mod.ModInfo().ID
|
||||
log.Debugf("starting <%s>", name)
|
||||
modInstance := mod.ModInfo().New()
|
||||
|
||||
ir, ok := modInstance.(watch.IntervalFetcher)
|
||||
if !ok {
|
||||
log.Criticalf("module <%s> does not implement watch.IntervalRunner", name)
|
||||
continue
|
||||
}
|
||||
|
||||
worker := watch.IntervalWork{
|
||||
Name: string(name),
|
||||
IntervalFetcher: ir,
|
||||
}
|
||||
manager.AddUnit(worker, string(name))
|
||||
}
|
||||
|
||||
// start all registered browser modules
|
||||
registeredBrowsers := modules.GetBrowserModules()
|
||||
|
||||
// instanciate all browsers
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/blob42/gosuki"
|
||||
"github.com/blob42/gosuki/internal/database"
|
||||
"github.com/blob42/gosuki/internal/logging"
|
||||
"github.com/blob42/gosuki/pkg/manager"
|
||||
|
||||
@ -38,6 +39,17 @@ type WatchRunner interface {
|
||||
Runner
|
||||
}
|
||||
|
||||
type IntervalFetcher interface {
|
||||
Fetcher
|
||||
Interval() time.Duration
|
||||
}
|
||||
|
||||
// Fetcher is an interface for modules that fetches data from some source
|
||||
// and produces a list of bookmarks.
|
||||
type Fetcher interface {
|
||||
Fetch() ([]*gosuki.Bookmark, error)
|
||||
}
|
||||
|
||||
// If the browser needs the watcher to be reset for each new event
|
||||
type ResetWatcher interface {
|
||||
ResetWatcher() error // resets a new watcher
|
||||
@ -129,18 +141,14 @@ type Watch struct {
|
||||
}
|
||||
|
||||
// Implement work unit for watchers
|
||||
type WatcherWork struct {
|
||||
wr WatchRunner
|
||||
type WatchWork struct {
|
||||
WatchRunner
|
||||
}
|
||||
|
||||
func Worker(wr WatchRunner) WatcherWork {
|
||||
return WatcherWork{wr}
|
||||
}
|
||||
|
||||
func (w WatcherWork) Run(m manager.UnitManager) {
|
||||
watcher := w.wr.Watch()
|
||||
func (w WatchWork) Run(m manager.UnitManager) {
|
||||
watcher := w.Watch()
|
||||
if ! watcher.isWatching {
|
||||
go WatchLoop(w.wr)
|
||||
go WatchLoop(w.WatchRunner)
|
||||
watcher.isWatching = true
|
||||
|
||||
for _, watch := range watcher.Watches{
|
||||
@ -150,7 +158,7 @@ func (w WatcherWork) Run(m manager.UnitManager) {
|
||||
|
||||
// wait for stop signal
|
||||
<-m.ShouldStop()
|
||||
sht, ok := w.wr.(Shutdowner)
|
||||
sht, ok := w.WatchRunner.(Shutdowner)
|
||||
if ok {
|
||||
if err := sht.Shutdown(); err != nil {
|
||||
m.Panic(err)
|
||||
@ -159,6 +167,60 @@ func (w WatcherWork) Run(m manager.UnitManager) {
|
||||
m.Done()
|
||||
}
|
||||
|
||||
// Implement work unit for interval runners
|
||||
type IntervalWork struct {
|
||||
Name string
|
||||
IntervalFetcher
|
||||
}
|
||||
|
||||
func (iw IntervalWork) Run(m manager.UnitManager) {
|
||||
go IntervalLoop(iw.IntervalFetcher, iw.Name)
|
||||
// wait for stop signal
|
||||
<-m.ShouldStop()
|
||||
m.Done()
|
||||
}
|
||||
|
||||
// Main thread for fetching bookmarks at regular intervals
|
||||
// One goroutine spawned per module
|
||||
func IntervalLoop(ir IntervalFetcher, modName string) {
|
||||
var err error
|
||||
var buffer *database.DB
|
||||
|
||||
// prepare buffer for module
|
||||
buffer, err = database.NewBuffer(modName)
|
||||
if err != nil {
|
||||
log.Criticalf("could not create buffer for <%s>: %s", modName, err)
|
||||
return
|
||||
}
|
||||
defer buffer.Close()
|
||||
|
||||
beat := time.NewTicker(ir.Interval()).C
|
||||
for range beat {
|
||||
marks, err := ir.Fetch()
|
||||
if err != nil {
|
||||
log.Errorf("error fetching bookmarks: %s", err)
|
||||
}
|
||||
|
||||
if len(marks) == 0 {
|
||||
log.Warningf("no bookmarks fetched")
|
||||
continue
|
||||
}
|
||||
|
||||
for _, mark := range marks {
|
||||
log.Debugf("Fetched bookmark: %s", mark.URL)
|
||||
buffer.UpsertBookmark(mark)
|
||||
}
|
||||
buffer.PrintBookmarks()
|
||||
err = buffer.SyncToCache()
|
||||
if err != nil {
|
||||
log.Errorf("error syncing buffer to cache: %s", err)
|
||||
continue
|
||||
}
|
||||
database.ScheduleSyncToDisk()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Main thread for watching file changes
|
||||
func WatchLoop(w WatchRunner) {
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user