Wip firefox watch process
This commit is contained in:
parent
95791ade34
commit
17e370be39
53
browsers.go
53
browsers.go
@ -58,7 +58,7 @@ type IBrowser interface {
|
||||
//
|
||||
// `BufferDB`: sqlite buffer used across jobs
|
||||
type BaseBrowser struct {
|
||||
watcher *fsnotify.Watcher
|
||||
watcher *Watcher
|
||||
eventsChan chan fsnotify.Event
|
||||
baseDir string
|
||||
bkFile string
|
||||
@ -84,9 +84,10 @@ type BaseBrowser struct {
|
||||
parseHooks []ParseHook
|
||||
}
|
||||
|
||||
func (bw *BaseBrowser) GetFileWatcher() *fsnotify.Watcher {
|
||||
fsnotifyWatcherType := reflect.TypeOf((*fsnotify.Watcher)(nil)).Elem()
|
||||
if reflect.TypeOf(bw.watcher) == reflect.PtrTo(fsnotifyWatcherType) {
|
||||
func (bw *BaseBrowser) GetWatcher() *Watcher {
|
||||
watcherType := reflect.TypeOf((*Watcher)(nil)).Elem()
|
||||
// In case we use other types of watchers/events
|
||||
if reflect.TypeOf(bw.watcher) == reflect.PtrTo(watcherType) {
|
||||
return bw.watcher
|
||||
}
|
||||
return nil
|
||||
@ -101,8 +102,9 @@ func (bw *BaseBrowser) Load() {
|
||||
log.Criticalf("<%s> Loading bookmarks while cache not yet initialized !", bw.name)
|
||||
}
|
||||
|
||||
// In case we use other types of watchers/events
|
||||
if bw.useFileWatcher && bw.watcher == nil {
|
||||
log.Warningf("<%s> watcher not initialized, use SetupWatcher() when creating the browser !", bw.name)
|
||||
log.Warningf("<%s> watcher not initialized, use SetupFileWatcher() when creating the browser !", bw.name)
|
||||
}
|
||||
|
||||
log.Debugf("<%s> preloading bookmarks", bw.name)
|
||||
@ -120,32 +122,51 @@ func (bw *BaseBrowser) GetDir() string {
|
||||
return bw.baseDir
|
||||
}
|
||||
|
||||
func (bw *BaseBrowser) SetupFileWatcher() {
|
||||
// Setup file watcher using the provided []Watch elements
|
||||
func (bw *BaseBrowser) SetupFileWatcher(watches ...*Watch) {
|
||||
var err error
|
||||
|
||||
if !bw.useFileWatcher {
|
||||
return
|
||||
}
|
||||
var err error
|
||||
bw.watcher, err = fsnotify.NewWatcher()
|
||||
|
||||
fswatcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
log.Critical(err)
|
||||
}
|
||||
err = bw.watcher.Add(bw.GetDir())
|
||||
|
||||
watchedMap := make(map[string]*Watch)
|
||||
for _, v := range watches {
|
||||
watchedMap[v.path] = v
|
||||
}
|
||||
|
||||
bw.watcher = &Watcher{
|
||||
w: fswatcher,
|
||||
watched: watchedMap,
|
||||
watches: watches,
|
||||
}
|
||||
|
||||
// Add all watched paths
|
||||
for _, v := range watches {
|
||||
|
||||
err = bw.watcher.w.Add(v.path)
|
||||
if err != nil {
|
||||
log.Critical(err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (bw *BaseBrowser) ResetWatcher() {
|
||||
err := bw.watcher.Close()
|
||||
err := bw.watcher.w.Close()
|
||||
if err != nil {
|
||||
log.Critical(err)
|
||||
}
|
||||
bw.SetupFileWatcher()
|
||||
bw.SetupFileWatcher(bw.watcher.watches...)
|
||||
}
|
||||
|
||||
func (bw *BaseBrowser) Close() error {
|
||||
err := bw.watcher.Close()
|
||||
err := bw.watcher.w.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -197,6 +218,14 @@ func (b *BaseBrowser) ResetStats() {
|
||||
b.Stats.currentUrlCount = 0
|
||||
}
|
||||
|
||||
func (b *BaseBrowser) HasReducer() bool {
|
||||
return b.eventsChan != nil
|
||||
}
|
||||
|
||||
func (b *BaseBrowser) Name() string {
|
||||
return b.name
|
||||
}
|
||||
|
||||
// Runs browsed defined hooks on bookmark
|
||||
func (b *BaseBrowser) RunParseHooks(node *Node) {
|
||||
for _, hook := range b.parseHooks {
|
||||
|
19
chrome.go
19
chrome.go
@ -7,11 +7,12 @@ import (
|
||||
|
||||
"github.com/OneOfOne/xxhash"
|
||||
"github.com/buger/jsonparser"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
)
|
||||
|
||||
var ChromeData = BrowserPaths{
|
||||
"Bookmarks",
|
||||
"/home/spike/.config/google-chrome-unstable/Default/",
|
||||
BookmarkFile: "Bookmarks",
|
||||
BookmarkDir: "/home/spike/.config/google-chrome-unstable/Default/",
|
||||
}
|
||||
|
||||
type ChromeBrowser struct {
|
||||
@ -75,19 +76,27 @@ func (rawNode *RawNode) getNode() *Node {
|
||||
}
|
||||
|
||||
func NewChromeBrowser() IBrowser {
|
||||
browser := &ChromeBrowser{}
|
||||
browser := new(ChromeBrowser)
|
||||
browser.name = "chrome"
|
||||
browser.bType = TChrome
|
||||
browser.baseDir = ChromeData.BookmarkDir
|
||||
browser.bkFile = ChromeData.BookmarkFile
|
||||
browser.Stats = &ParserStats{}
|
||||
browser.Stats = new(ParserStats)
|
||||
browser.NodeTree = &Node{Name: "root", Parent: nil, Type: "root"}
|
||||
browser.useFileWatcher = true
|
||||
|
||||
// Across jobs buffer
|
||||
browser.InitBuffer()
|
||||
|
||||
browser.SetupFileWatcher()
|
||||
// Create watch objects, we will watch the basedir for create events
|
||||
watchedEvents := []fsnotify.Op{fsnotify.Create}
|
||||
w := &Watch{
|
||||
path: browser.baseDir,
|
||||
eventTypes: watchedEvents,
|
||||
eventNames: []string{path.Join(browser.baseDir, browser.bkFile)},
|
||||
resetWatch: true,
|
||||
}
|
||||
browser.SetupFileWatcher(w)
|
||||
|
||||
return browser
|
||||
}
|
||||
|
53
firefox.go
53
firefox.go
@ -1,22 +1,22 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
sqlite3 "github.com/mattn/go-sqlite3"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
)
|
||||
|
||||
var Firefox = BrowserPaths{
|
||||
"places.sqlite",
|
||||
"/home/spike/.mozilla/firefox/p1rrgord.default/",
|
||||
BookmarkFile: "places.sqlite",
|
||||
BookmarkDir: "/home/spike/.mozilla/firefox/p1rrgord.default/",
|
||||
}
|
||||
|
||||
const (
|
||||
MozPlacesRootID = 1
|
||||
MozPlacesTagsRootID = 4
|
||||
MozPlacesMobileRootID = 6
|
||||
MozMinJobInterval = 2 * time.Second
|
||||
)
|
||||
|
||||
type FFBrowser struct {
|
||||
@ -40,47 +40,38 @@ func NewFFBrowser() IBrowser {
|
||||
browser.bType = TFirefox
|
||||
browser.baseDir = Firefox.BookmarkDir
|
||||
browser.bkFile = Firefox.BookmarkFile
|
||||
browser.useFileWatcher = false
|
||||
browser.useFileWatcher = true
|
||||
browser.Stats = &ParserStats{}
|
||||
browser.NodeTree = &Node{Name: "root", Parent: nil, Type: "root"}
|
||||
browser.urlMap = make(map[string]*Node)
|
||||
|
||||
// sqlite update hook
|
||||
sql.Register(DBUpdateMode,
|
||||
&sqlite3.SQLiteDriver{
|
||||
ConnectHook: func(conn *sqlite3.SQLiteConn) error {
|
||||
log.Warningf("registered connect hook <%s>", DBUpdateMode)
|
||||
conn.RegisterUpdateHook(
|
||||
func(op int, db string, table string, rowid int64) {
|
||||
switch op {
|
||||
case sqlite3.SQLITE_UPDATE:
|
||||
log.Warning("Notified of insert on db", db, "table", table, "rowid", rowid)
|
||||
}
|
||||
|
||||
// Update hook here
|
||||
log.Warningf("notified op %s", op)
|
||||
|
||||
})
|
||||
return nil
|
||||
},
|
||||
})
|
||||
|
||||
// Initialize `places.sqlite`
|
||||
bookmarkPath := path.Join(browser.baseDir, browser.bkFile)
|
||||
browser.places = DB{}.New("Places", bookmarkPath)
|
||||
browser.places.engineMode = DBUpdateMode
|
||||
browser.places.InitRO()
|
||||
|
||||
// Buffer that lives accross Run() jobs
|
||||
browser.InitBuffer()
|
||||
|
||||
// Setup watcher
|
||||
|
||||
w := &Watch{
|
||||
path: path.Join(browser.baseDir),
|
||||
eventTypes: []fsnotify.Op{fsnotify.Write},
|
||||
eventNames: []string{path.Join(browser.baseDir, "places.sqlite-wal")},
|
||||
resetWatch: false,
|
||||
}
|
||||
|
||||
browser.SetupFileWatcher(w)
|
||||
|
||||
/*
|
||||
*Run debouncer to avoid duplicate running of jobs
|
||||
*Run reducer to avoid duplicate running of jobs
|
||||
*when a batch of events is received
|
||||
*/
|
||||
|
||||
//browser.eventsChan = make(chan fsnotify.Event, EventsChanLen)
|
||||
//go debouncer(3000*time.Millisecond, browser.eventsChan, browser)
|
||||
browser.eventsChan = make(chan fsnotify.Event, EventsChanLen)
|
||||
|
||||
go reducer(MozMinJobInterval, browser.eventsChan, browser)
|
||||
|
||||
return browser
|
||||
}
|
||||
@ -105,11 +96,12 @@ func (bw *FFBrowser) Watch() bool {
|
||||
log.Debugf("<%s> TODO ... ", bw.name)
|
||||
|
||||
if !bw.isWatching {
|
||||
go WatcherThread(bw)
|
||||
bw.isWatching = true
|
||||
log.Infof("<%s> Watching %s", bw.name, bw.GetPath())
|
||||
return true
|
||||
}
|
||||
|
||||
//return false
|
||||
return false
|
||||
}
|
||||
|
||||
@ -248,5 +240,6 @@ func getFFBookmarks(bw *FFBrowser) {
|
||||
}
|
||||
|
||||
func (bw *FFBrowser) Run() {
|
||||
log.Debugf("<%s>", bw.name)
|
||||
|
||||
}
|
||||
|
47
utils.go
47
utils.go
@ -7,37 +7,38 @@ import (
|
||||
)
|
||||
|
||||
// TODO
|
||||
// Run debounce in it's own thread when the watcher is started
|
||||
// Run reducer in it's own thread when the watcher is started
|
||||
// It receives a struct{event, func} and runs the func only once in the interval
|
||||
func debouncer(interval time.Duration, input chan fsnotify.Event, w IWatchable) {
|
||||
log.Debug("Running debouncer")
|
||||
var event fsnotify.Event
|
||||
var isResting bool
|
||||
timer := time.NewTimer(interval)
|
||||
func reducer(interval time.Duration, input chan fsnotify.Event, w IWatchable) {
|
||||
var waiting bool
|
||||
|
||||
log.Debug("Running reducer")
|
||||
|
||||
ticker := time.NewTicker(interval)
|
||||
|
||||
for {
|
||||
select {
|
||||
case event = <-input:
|
||||
log.Debugf("received an event %v on the events channel", event.Op)
|
||||
case <-input:
|
||||
log.Debugf("received event, len(chan): %d ", len(input))
|
||||
|
||||
if !isResting {
|
||||
if !waiting {
|
||||
waiting = true
|
||||
// Run the job
|
||||
//log.Debug("Not resting, running job")
|
||||
time.AfterFunc(1*time.Second, func() {
|
||||
log.Debug("Not resting")
|
||||
w.Run()
|
||||
})
|
||||
//log.Debug("Restting timer")
|
||||
timer.Reset(interval)
|
||||
//log.Debug("Is resting now")
|
||||
isResting = true
|
||||
}
|
||||
//else {
|
||||
//log.Debug("Resting, will not run job")
|
||||
//}
|
||||
|
||||
case <-timer.C:
|
||||
//log.Debugf("timer done, not resting")
|
||||
isResting = false
|
||||
//ticker = time.NewTicker(interval)
|
||||
} else { // Ignore this event
|
||||
log.Debug("resting")
|
||||
break
|
||||
}
|
||||
|
||||
case <-ticker.C:
|
||||
//log.Debug("tick")
|
||||
ticker = time.NewTicker(interval)
|
||||
if waiting {
|
||||
waiting = false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
78
watcher.go
78
watcher.go
@ -7,29 +7,48 @@ import (
|
||||
// Used as input to WatcherThread
|
||||
// It does not have to be a browser as long is the interface is implemented
|
||||
type IWatchable interface {
|
||||
SetupFileWatcher() // Starts watching bookmarks and runs Load on change
|
||||
Name() string // Name of the watchable
|
||||
HasReducer() bool // Does the watchable has a reducer
|
||||
SetupFileWatcher(...*Watch) // Starts watching bookmarks and runs Load on change
|
||||
Watch() bool // starts watching linked watcher
|
||||
Run() // Callback fired on event
|
||||
GetFileWatcher() *fsnotify.Watcher // returns linked watcher
|
||||
GetWatcher() *Watcher // returns linked watcher
|
||||
ResetWatcher() // resets a new watcher
|
||||
GetPath() string // returns watched path
|
||||
GetDir() string // returns watched dir
|
||||
EventsChan() chan fsnotify.Event
|
||||
}
|
||||
|
||||
// Wrapper around fsnotify watcher
|
||||
type Watcher struct {
|
||||
w *fsnotify.Watcher // underlying fsnotify watcher
|
||||
watched map[string]*Watch // watched paths
|
||||
watches []*Watch // helper var
|
||||
}
|
||||
|
||||
// Details about the object being watched
|
||||
type Watch struct {
|
||||
path string // Path to watch for events
|
||||
eventTypes []fsnotify.Op // events to watch for
|
||||
eventNames []string // event names to watch for (file/dir names)
|
||||
resetWatch bool // Reset the watcher when the event happens (useful for create events)
|
||||
}
|
||||
|
||||
// Main thread for watching file changes
|
||||
func WatcherThread(w IWatchable) {
|
||||
|
||||
bookmarkPath := w.GetPath()
|
||||
log.Infof("watching %s", bookmarkPath)
|
||||
|
||||
log.Infof("<%s> Started watcher", w.Name())
|
||||
for {
|
||||
// Keep watcher here as it is reset from within
|
||||
// the select block
|
||||
watcher := w.GetFileWatcher()
|
||||
watcher := w.GetWatcher()
|
||||
resetWatch := false
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
case event := <-watcher.w.Events:
|
||||
|
||||
// Very verbose
|
||||
log.Debugf("event: %v | eventName: %v", event.Op, event.Name)
|
||||
|
||||
// On Chrome like browsers the bookmarks file is created
|
||||
// at every change.
|
||||
@ -42,28 +61,49 @@ func WatcherThread(w IWatchable) {
|
||||
* the newly created watcher to catch events even after rename/create
|
||||
*/
|
||||
|
||||
if event.Op&fsnotify.Create == fsnotify.Create &&
|
||||
event.Name == bookmarkPath {
|
||||
for _, watched := range watcher.watches {
|
||||
for _, watchedEv := range watched.eventTypes {
|
||||
for _, watchedName := range watched.eventNames {
|
||||
|
||||
w.Run()
|
||||
log.Debugf("event: %v | eventName: %v", event.Op, event.Name)
|
||||
if event.Op&watchedEv == watchedEv &&
|
||||
event.Name == watchedName {
|
||||
|
||||
// For watchers who need a reducer
|
||||
// to avoid spammy events
|
||||
if w.HasReducer() {
|
||||
ch := w.EventsChan()
|
||||
ch <- event
|
||||
} else {
|
||||
//w.Run()
|
||||
}
|
||||
|
||||
//log.Warning("event: %v | eventName: %v", event.Op, event.Name)
|
||||
|
||||
if watched.resetWatch {
|
||||
log.Debugf("resetting watchers")
|
||||
w.ResetWatcher()
|
||||
resetWatch = true // needed to break out of big loop
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if resetWatch {
|
||||
break
|
||||
}
|
||||
|
||||
// Firefox keeps the file open and makes changes on it
|
||||
// It needs a debouncer
|
||||
if event.Name == bookmarkPath {
|
||||
log.Debugf("event: %v | eventName: %v", event.Op, event.Name)
|
||||
//go debounce(1000*time.Millisecond, spammyEventsChannel, w)
|
||||
ch := w.EventsChan()
|
||||
ch <- event
|
||||
//w.Run()
|
||||
}
|
||||
case err := <-watcher.Errors:
|
||||
//if event.Name == bookmarkPath {
|
||||
//log.Debugf("event: %v | eventName: %v", event.Op, event.Name)
|
||||
////go debounce(1000*time.Millisecond, spammyEventsChannel, w)
|
||||
//ch := w.EventsChan()
|
||||
//ch <- event
|
||||
////w.Run()
|
||||
//}
|
||||
case err := <-watcher.w.Errors:
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user