2022-10-27 22:33:20 +00:00
|
|
|
package watch
|
2018-11-09 17:25:50 +00:00
|
|
|
|
2022-10-27 22:33:20 +00:00
|
|
|
import "time"
|
2018-11-09 17:25:50 +00:00
|
|
|
|
2018-11-13 18:57:07 +00:00
|
|
|
// Run reducer in its own thread when the watcher is started
|
2018-11-09 17:25:50 +00:00
|
|
|
// It receives a struct{event, func} and runs the func only once in the interval
|
2018-11-30 03:15:08 +00:00
|
|
|
func ReduceEvents(interval time.Duration,
|
2023-09-15 01:17:12 +00:00
|
|
|
w WatchRunner) {
|
|
|
|
watch := w.Watch()
|
|
|
|
log.Debugf("starting reducer service for %s", watch.ID)
|
2018-11-09 17:25:50 +00:00
|
|
|
|
2023-02-18 22:08:12 +00:00
|
|
|
eventsIn := w.Watch().eventsChan
|
2018-11-09 17:25:50 +00:00
|
|
|
timer := time.NewTimer(interval)
|
2023-09-15 01:17:12 +00:00
|
|
|
beat := time.NewTicker(1 * time.Second).C
|
2018-11-09 17:25:50 +00:00
|
|
|
var events []bool
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
2022-10-27 22:33:20 +00:00
|
|
|
case <-eventsIn:
|
2022-09-28 20:08:21 +00:00
|
|
|
// log.Debug("[reducuer] received event, resetting watch interval !")
|
2018-11-09 17:25:50 +00:00
|
|
|
timer.Reset(interval)
|
|
|
|
events = append(events, true)
|
|
|
|
|
2023-09-15 01:17:12 +00:00
|
|
|
case <-beat:
|
|
|
|
// log.Debugf("reducer beat %s", watch.ID)
|
|
|
|
|
2018-11-09 17:25:50 +00:00
|
|
|
case <-timer.C:
|
|
|
|
if len(events) > 0 {
|
2023-09-15 01:17:12 +00:00
|
|
|
log.Debug("<reduce>: calling Run()")
|
2018-11-09 17:25:50 +00:00
|
|
|
w.Run()
|
|
|
|
|
|
|
|
// Empty events queue
|
|
|
|
events = make([]bool, 0)
|
|
|
|
}
|
2023-09-15 01:17:12 +00:00
|
|
|
case _, ok := <-eventsIn:
|
|
|
|
if !ok {
|
|
|
|
log.Warningf("Events channel closed for %s", watch.ID)
|
|
|
|
return // Exit the function or handle the closed channel case
|
|
|
|
}
|
2018-11-09 17:25:50 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|