db: improved sync scheduler and sync utils
This commit is contained in:
parent
eadb3aef86
commit
e2f65af6c9
@ -224,24 +224,44 @@ func (src *DB) SyncTo(dst *DB) {
|
||||
var syncQueue = make(chan interface{})
|
||||
|
||||
// Sync all databases to disk in a goroutine using a debouncer
|
||||
//TODO: add `force` param to force sync
|
||||
func cacheSyncScheduler(input <-chan interface{}) {
|
||||
log.Debug("starting cache sync scheduler")
|
||||
|
||||
queue := make(chan interface{}, 100)
|
||||
|
||||
|
||||
intervalOpt, err := config.GetModOpt("database", "sync-interval")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
intervalInt, ok := intervalOpt.(int)
|
||||
|
||||
if !ok {
|
||||
log.Fatalf("sync-interval is not an int")
|
||||
} else {
|
||||
log.Debugf("db sync-interval: %d seconds", intervalInt)
|
||||
}
|
||||
|
||||
interval := time.Duration(intervalInt) * time.Second
|
||||
|
||||
|
||||
// TODO: set as global options
|
||||
// debounce interval
|
||||
queue := make(chan<- interface{}, 100)
|
||||
interval := 4 * time.Second
|
||||
timer := time.NewTimer(0)
|
||||
for {
|
||||
select {
|
||||
case <-input:
|
||||
log.Debug("debouncing sync to disk")
|
||||
// log.Debug("debouncing sync to disk")
|
||||
timer.Reset(interval)
|
||||
// log.Debugf("sync que len is %d", len(queue))
|
||||
select {
|
||||
case queue <- true:
|
||||
// Writing to queue would not block
|
||||
// Writing to queue will not block
|
||||
// log.Debug("pushed sync to disk to queue")
|
||||
default:
|
||||
log.Critical("cache sync queue is full, something is wrong")
|
||||
log.Debug("queue is full, dropping sync to disk request")
|
||||
continue
|
||||
}
|
||||
case <-timer.C:
|
||||
if len(queue) > 0 {
|
||||
@ -252,12 +272,17 @@ func cacheSyncScheduler(input <-chan interface{}) {
|
||||
if err := Cache.DB.SyncToDisk(GetDBFullPath()); err != nil {
|
||||
log.Fatalf("failed to sync cache to disk: %s", err)
|
||||
}
|
||||
queue = make(chan<- interface{})
|
||||
|
||||
// empty the queue
|
||||
for len(queue) > 0 {
|
||||
<-queue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//TODO: add `force` param to force sync
|
||||
func ScheduleSyncToDisk() {
|
||||
go func() {
|
||||
log.Debug("received sync to disk request")
|
||||
@ -371,6 +396,8 @@ func (dst *DB) SyncFromDisk(dbpath string) error {
|
||||
func (src *DB) CopyTo(dst *DB) {
|
||||
|
||||
log.Debugf("Copying <%s> to <%s>", src.Name, dst.Name)
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
srcDB, err := sqlx.Open(DriverBackupMode, src.Path)
|
||||
defer func() {
|
||||
@ -405,3 +432,27 @@ func (src *DB) CopyTo(dst *DB) {
|
||||
|
||||
bk.Finish()
|
||||
}
|
||||
|
||||
func (src *DB) SyncToCache() error {
|
||||
if Cache.DB == nil {
|
||||
return fmt.Errorf("cache db is nil")
|
||||
}
|
||||
|
||||
empty, err := Cache.DB.IsEmpty()
|
||||
|
||||
|
||||
//TODO!: if the error is table is not "non existant table" return the error
|
||||
// otherwise move on and check if error is table does not exist
|
||||
sql3err, isSQL3Err := err.(sqlite3.Error)
|
||||
if err != nil && sql3err.Code != sqlite3.ErrError {
|
||||
return fmt.Errorf("error checking if cache is empty: %w", err)
|
||||
}
|
||||
if empty || ( isSQL3Err && sql3err.Code == sqlite3.ErrError) {
|
||||
log.Debugf("cache is empty, copying <%s> to <%s>", src.Name, CacheName)
|
||||
src.CopyTo(Cache.DB)
|
||||
} else {
|
||||
log.Debugf("syncing <%s> to cache", src.Name)
|
||||
src.SyncTo(Cache.DB)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user