You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
hugobot/scheduler.go

380 lines
7.6 KiB
Go

package main
import (
"git.blob42.xyz/blob42/hugobot/v3/export"
"git.blob42.xyz/blob42/hugobot/v3/feeds"
"git.blob42.xyz/blob42/hugobot/v3/static"
"git.blob42.xyz/blob42/hugobot/v3/utils"
"errors"
"log"
"math"
"path"
"time"
gum "git.blob42.xyz/blob42/gum.git"
"github.com/syndtr/goleveldb/leveldb"
"github.com/beeker1121/goque"
)
const (
StaticDataExportInterval = 3600 * time.Second
JobSchedulerInterval = 60 * time.Second
QDataDir = "./.data"
MaxQueueJobs = 100
MaxSerialJob = 100
// Used in JobPool to avoid duplicate jobs for the same feed
MapFeedJobFile = "map_feed_job"
)
var (
SourcePriorityRange = Range{Min: 0, Max: 336 * time.Hour.Seconds()} // 2 weeks
TargetPriorityRange = Range{Min: 0, Max: math.MaxUint8}
SchedulerUpdates = make(chan *Job)
)
type Range struct {
Min float64
Max float64
}
func (r Range) Val() float64 {
return r.Max - r.Min
}
// JobPool schedluer. Priodically schedule new jobs
type Scheduler struct {
jobs *JobPool
jobUpdates chan *Job
serialJobs chan *Job
}
func serialRun(inputJobs <-chan *Job) {
for {
j := <-inputJobs
log.Printf("serial run %v", j)
j.Handle()
}
}
func (s *Scheduler) Run(m gum.UnitManager) {
go serialRun(s.serialJobs) // These jobs run in series
jobTimer := time.NewTicker(JobSchedulerInterval)
staticExportTimer := time.NewTicker(StaticDataExportInterval)
for {
select {
case <-jobTimer.C:
log.Println("job heartbeat !")
j, _ := s.jobs.Peek()
if j != nil {
log.Printf("peeking job: %s\n", j)
}
// If max pool jobs reached clean the pool
if s.jobs.Length() >= MaxQueueJobs {
s.jobs.Drop()
s.panicAndShutdown(m)
return
}
s.updateJobs()
// Spawn job works
s.dispatchJobs()
case job := <-s.jobUpdates:
log.Printf("job update recieved: %s", JobStatusMap[job.Status])
switch job.Status {
case JobStatusDone:
log.Println("Job is done, removing from feedJobMap. New jobs for this feed can be added now.")
// Remove job from feedJobMap
err := s.jobs.DeleteMarkedJob(job)
if err != nil {
log.Fatal(err)
}
// Create export job for successful fetch jobs
if job.JobType == JobTypeFetch {
log.Println("Creating an export job")
expJob, err := NewExportJob(job.Feed, 0)
if err != nil {
log.Fatal(err)
}
//log.Printf("export job: %+v", expJob)
log.Printf("export job: %s\n", expJob)
err = s.jobs.Enqueue(expJob)
if err != nil {
log.Fatal(err)
}
}
case JobStatusFailed:
//TODO: Store all failed jobs somewhere
err := s.jobs.DeleteMarkedJob(job)
if err != nil {
log.Fatal(err)
}
log.Printf("Job %s failed with error: %s", job.Feed.Name, job.Err)
}
case <-staticExportTimer.C:
log.Println("-------- export tick --------")
log.Println("Exporting static data ...")
err := static.HugoExportData()
if err != nil {
log.Fatal(err)
}
log.Println("Exporting weeks ...")
err = export.ExportWeeks()
if err != nil {
log.Fatal(err)
}
log.Println("Exporting btc ...")
err = export.ExportBTCAddresses()
if err != nil {
log.Fatal(err)
}
case <-m.ShouldStop():
s.Shutdown()
m.Done()
}
}
}
func (s *Scheduler) panicAndShutdown(m gum.UnitManager) {
//err := s.jobs.Drop()
//if err != nil {
//log.Fatal(err)
//}
//TODO
m.Panic(errors.New("max job queue exceeded"))
s.Shutdown()
m.Done()
}
func (s *Scheduler) Shutdown() {
// Flush ongoing jobs back to job queue
iter := s.jobs.feedJobMap.NewIterator(nil, nil)
var markedDelete [][]byte
for iter.Next() {
key := iter.Key()
log.Printf("Putting job %s back to queue", key)
value := iter.Value()
//log.Println("value ", value)
job, err := JobFromBytes(value)
if err != nil {
log.Fatal(err)
}
err = s.jobs.Enqueue(job)
if err != nil {
log.Fatal(err)
}
markedDelete = append(markedDelete, key)
}
iter.Release()
err := iter.Error()
if err != nil {
log.Fatal(err)
}
for _, k := range markedDelete {
err := s.jobs.feedJobMap.Delete(k, nil)
if err != nil {
log.Fatal(err)
}
}
// Close jobpool queue
err = s.jobs.Close()
if err != nil {
panic(err)
}
}
// Dispatchs all jobs in the job pool to task workers
func (s *Scheduler) dispatchJobs() {
//log.Println("dispatching ...")
jobsLength := int(s.jobs.Length())
for i := 0; i < jobsLength; i++ {
log.Printf("Dequeing %d/%d", i, s.jobs.Length())
j, err := s.jobs.Dequeue()
if err != nil {
log.Fatal(err)
}
log.Println("Dispatching ", j)
if j.Serial {
s.serialJobs <- j
} else {
go j.Handle()
}
}
}
// Gets all available feeds and creates
// a new Job if time.Now() - feed.last_refresh >= feed.interval
func (s *Scheduler) updateJobs() {
//
// Get all feeds
//
// For each feed compare Now() vs last refresh
// If now() - last_refresh >= refresh interval -> create job
feeds, err := feeds.ListFeeds()
if err != nil {
log.Fatal(err)
}
//log.Printf("updating jobs for %d feeds\n", len(feeds))
// Check all jobs
for _, f := range feeds {
//log.Printf("checking feed %s: %s\n", f.Name, f.Url)
//log.Printf("Seconds since last refresh %f", time.Since(f.LastRefresh.Time).Seconds())
//log.Printf("Refresh interval %f", f.Interval)
if delta, ok := f.ShouldRefresh(); ok {
log.Printf("Refreshing %s -- %f seconds since last.", f.Name, delta)
// If there is already a job with this feed id skip and return an empty job
feedId := utils.IntToBytes(f.FeedID)
_, err := s.jobs.feedJobMap.Get(feedId, nil)
if err == nil {
log.Println("Job already exists for feed")
} else if err != leveldb.ErrNotFound {
log.Fatal(err)
} else {
// Priority is based on the delta time since last refresh
// bigger delta == higher priority
// Convert priority to smaller range priority
// We use original range of `0 - 1 month` in seconds
// Target range is uint8 `0 - 255`
prio := MapPriority(delta)
job, err := NewFetchJob(f, prio)
if err != nil {
panic(err)
}
err = s.jobs.Enqueue(job)
if err != nil {
panic(err)
}
err = s.jobs.MarkUniqJob(job)
if err != nil {
panic(err)
}
}
}
}
jLen := s.jobs.Length()
if jLen > 0 {
log.Printf("jobs length = %+v\n", jLen)
}
}
func NewScheduler() gum.WorkUnit {
// Priority queue for jobs
q, err := goque.OpenPriorityQueue(QDataDir, goque.DESC)
if err != nil {
panic(err)
}
// map[FEED_ID][JOB]
// Used to avoid duplicate jobs in the queue for the same feed
feedJobMapDB, err := leveldb.OpenFile(path.Join(QDataDir, MapFeedJobFile), nil)
if err != nil {
panic(err)
}
jobPool := &JobPool{
Q: q,
maxJobs: MaxQueueJobs,
feedJobMap: feedJobMapDB,
}
// Restore all ongoing jobs in feedJobMap to the pool
iter := feedJobMapDB.NewIterator(nil, nil)
var markedDelete [][]byte
for iter.Next() {
key := iter.Key()
value := iter.Value()
job, err := JobFromBytes(value)
if err != nil {
log.Fatal(err)
}
log.Printf("Restoring uncomplete job %s back to job queue", job.ID)
err = jobPool.Enqueue(job)
if err != nil {
log.Fatal(err)
}
markedDelete = append(markedDelete, key)
}
iter.Release()
err = iter.Error()
if err != nil {
log.Fatal(err)
}
serialJobs := make(chan *Job, MaxSerialJob)
return &Scheduler{
jobs: jobPool,
jobUpdates: SchedulerUpdates,
serialJobs: serialJobs,
}
}
func NotifyScheduler(job *Job) {
SchedulerUpdates <- job
}
func MapPriority(val float64) uint8 {
newVal := (((val - SourcePriorityRange.Min) * TargetPriorityRange.Val()) /
SourcePriorityRange.Val()) + TargetPriorityRange.Min
return uint8(newVal)
}