diff --git a/internal/database/sync.go b/internal/database/sync.go index bb9aced..728ad3d 100644 --- a/internal/database/sync.go +++ b/internal/database/sync.go @@ -224,24 +224,44 @@ func (src *DB) SyncTo(dst *DB) { var syncQueue = make(chan interface{}) // Sync all databases to disk in a goroutine using a debouncer -//TODO: add `force` param to force sync func cacheSyncScheduler(input <-chan interface{}) { log.Debug("starting cache sync scheduler") + queue := make(chan interface{}, 100) + + + intervalOpt, err := config.GetModOpt("database", "sync-interval") + if err != nil { + log.Fatal(err) + } + + intervalInt, ok := intervalOpt.(int) + + if !ok { + log.Fatalf("sync-interval is not an int") + } else { + log.Debugf("db sync-interval: %d seconds", intervalInt) + } + + interval := time.Duration(intervalInt) * time.Second + + + // TODO: set as global options // debounce interval - queue := make(chan<- interface{}, 100) - interval := 4 * time.Second timer := time.NewTimer(0) for { select { case <-input: - log.Debug("debouncing sync to disk") + // log.Debug("debouncing sync to disk") timer.Reset(interval) + // log.Debugf("sync que len is %d", len(queue)) select { case queue <- true: - // Writing to queue would not block + // Writing to queue will not block + // log.Debug("pushed sync to disk to queue") default: - log.Critical("cache sync queue is full, something is wrong") + log.Debug("queue is full, dropping sync to disk request") + continue } case <-timer.C: if len(queue) > 0 { @@ -252,12 +272,17 @@ func cacheSyncScheduler(input <-chan interface{}) { if err := Cache.DB.SyncToDisk(GetDBFullPath()); err != nil { log.Fatalf("failed to sync cache to disk: %s", err) } - queue = make(chan<- interface{}) + + // empty the queue + for len(queue) > 0 { + <-queue + } } } } } +//TODO: add `force` param to force sync func ScheduleSyncToDisk() { go func() { log.Debug("received sync to disk request") @@ -371,6 +396,8 @@ func (dst *DB) SyncFromDisk(dbpath string) error { func (src *DB) CopyTo(dst *DB) { log.Debugf("Copying <%s> to <%s>", src.Name, dst.Name) + mu.Lock() + defer mu.Unlock() srcDB, err := sqlx.Open(DriverBackupMode, src.Path) defer func() { @@ -405,3 +432,27 @@ func (src *DB) CopyTo(dst *DB) { bk.Finish() } + +func (src *DB) SyncToCache() error { + if Cache.DB == nil { + return fmt.Errorf("cache db is nil") + } + + empty, err := Cache.DB.IsEmpty() + + + //TODO!: if the error is table is not "non existant table" return the error + // otherwise move on and check if error is table does not exist + sql3err, isSQL3Err := err.(sqlite3.Error) + if err != nil && sql3err.Code != sqlite3.ErrError { + return fmt.Errorf("error checking if cache is empty: %w", err) + } + if empty || ( isSQL3Err && sql3err.Code == sqlite3.ErrError) { + log.Debugf("cache is empty, copying <%s> to <%s>", src.Name, CacheName) + src.CopyTo(Cache.DB) + } else { + log.Debugf("syncing <%s> to cache", src.Name) + src.SyncTo(Cache.DB) + } + return nil +}