You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
329 lines
5.7 KiB
Go
329 lines
5.7 KiB
Go
package main
|
|
|
|
import (
|
|
"git.blob42.xyz/blob42/hugobot/v3/export"
|
|
"git.blob42.xyz/blob42/hugobot/v3/feeds"
|
|
"git.blob42.xyz/blob42/hugobot/v3/handlers"
|
|
"git.blob42.xyz/blob42/hugobot/v3/posts"
|
|
"git.blob42.xyz/blob42/hugobot/v3/utils"
|
|
"bytes"
|
|
"encoding/gob"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/beeker1121/goque"
|
|
"github.com/gofrs/uuid"
|
|
"github.com/syndtr/goleveldb/leveldb"
|
|
)
|
|
|
|
type JobStatus int
|
|
type JobType int
|
|
|
|
const (
|
|
JobStatusNew JobStatus = iota
|
|
JobStatusQueued
|
|
JobStatusDone
|
|
JobStatusFailed
|
|
)
|
|
|
|
const (
|
|
JobTypeFetch JobType = iota
|
|
JobTypeExport
|
|
)
|
|
|
|
var (
|
|
JobTypeMap = map[JobType]string{
|
|
JobTypeFetch: "fetch",
|
|
JobTypeExport: "export",
|
|
}
|
|
|
|
JobStatusMap = map[JobStatus]string{
|
|
JobStatusNew: "new",
|
|
JobStatusQueued: "queued",
|
|
JobStatusDone: "done",
|
|
JobStatusFailed: "failed",
|
|
}
|
|
)
|
|
|
|
func (js JobStatus) String() string {
|
|
return JobStatusMap[js]
|
|
}
|
|
|
|
type Prioritizer interface {
|
|
// Return job priority
|
|
GetPriority() uint8
|
|
}
|
|
|
|
// Represents a Job to be done on a feed
|
|
// It could be any of: Poll, Fetch, Store
|
|
// Should implement Poller
|
|
type Job struct {
|
|
ID uuid.UUID
|
|
Feed *feeds.Feed
|
|
Status JobStatus
|
|
Data []*posts.Post
|
|
|
|
Priority uint8
|
|
JobType JobType
|
|
Serial bool // Should be run in a serial manner
|
|
|
|
Err error
|
|
|
|
Prioritizer
|
|
}
|
|
|
|
type Handler interface {
|
|
Handle()
|
|
}
|
|
|
|
// GoRoutine method
|
|
func (job *Job) Handle() {
|
|
var err error
|
|
|
|
if job.JobType == JobTypeFetch {
|
|
handler := handlers.GetFormatHandler(*job.Feed)
|
|
err = handler.Handle(*job.Feed)
|
|
} else if job.JobType == JobTypeExport {
|
|
handler := export.NewHugoExporter()
|
|
err = handler.Handle(*job.Feed)
|
|
}
|
|
|
|
if err != nil {
|
|
job.Failed(err)
|
|
return
|
|
}
|
|
//log.Println("Done for job type ", job.JobType)
|
|
job.Done()
|
|
}
|
|
|
|
func (job *Job) Failed(err error) {
|
|
errr := job.Feed.UpdateRefreshTime(time.Now())
|
|
if errr != nil {
|
|
log.Fatal(errr)
|
|
}
|
|
|
|
job.Status = JobStatusFailed
|
|
job.Err = err
|
|
NotifyScheduler(job)
|
|
}
|
|
|
|
func (job *Job) Done() {
|
|
//TODO: only update refresh time after actual fetching
|
|
//
|
|
err := job.Feed.UpdateRefreshTime(time.Now())
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
job.Status = JobStatusDone
|
|
NotifyScheduler(job)
|
|
}
|
|
|
|
func (job *Job) GetPriority() uint8 {
|
|
return job.Priority
|
|
}
|
|
|
|
func (job *Job) String() string {
|
|
exp := map[string]interface{}{
|
|
"jobId": job.ID,
|
|
"feed": job.Feed.Name,
|
|
"priority": job.Priority,
|
|
"jobType": JobTypeMap[job.JobType],
|
|
"serial": job.Serial,
|
|
"err": job.Err,
|
|
}
|
|
|
|
b, err := json.MarshalIndent(exp, "", " ")
|
|
if err != nil {
|
|
log.Printf("error printing job %s\n", err)
|
|
return ""
|
|
}
|
|
return fmt.Sprintf(string(b))
|
|
|
|
}
|
|
|
|
// Decode object from []byte
|
|
func JobFromBytes(value []byte) (*Job, error) {
|
|
buffer := bytes.NewBuffer(value)
|
|
dec := gob.NewDecoder(buffer)
|
|
|
|
j := &Job{}
|
|
|
|
err := dec.Decode(j)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return j, nil
|
|
}
|
|
|
|
// helper function for jobs that accepts any
|
|
// value type, which is then encoded into a byte slice using
|
|
// encoding/gob.
|
|
func (job *Job) ToBytes() ([]byte, error) {
|
|
var buffer bytes.Buffer
|
|
enc := gob.NewEncoder(&buffer)
|
|
if err := enc.Encode(job); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return buffer.Bytes(), nil
|
|
}
|
|
|
|
func NewFetchJob(feed *feeds.Feed,
|
|
priority uint8) (*Job, error) {
|
|
|
|
uuid, err := uuid.NewV4()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
job := &Job{
|
|
ID: uuid,
|
|
Feed: feed,
|
|
Status: JobStatusNew,
|
|
JobType: JobTypeFetch,
|
|
Priority: priority,
|
|
Serial: feed.Serial,
|
|
}
|
|
|
|
return job, nil
|
|
}
|
|
|
|
func NewExportJob(feed *feeds.Feed,
|
|
priority uint8) (*Job, error) {
|
|
|
|
uuid, err := uuid.NewV4()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
job := &Job{
|
|
ID: uuid,
|
|
Feed: feed,
|
|
Status: JobStatusNew,
|
|
Priority: priority,
|
|
JobType: JobTypeExport,
|
|
}
|
|
|
|
return job, nil
|
|
}
|
|
|
|
type Queuer interface {
|
|
Enqueue(job *Job) (*Job, error)
|
|
Dequeue() (*Job, error)
|
|
Close() error
|
|
Drop() error // Clsoe and delete all jobs
|
|
Length() uint64
|
|
//Peek() (*Job, error)
|
|
//PeekByID(id uint64) (*Job, error)
|
|
|
|
// Returns item located at given offset starting from head
|
|
// of queue without removing it
|
|
//PeekByOffset(offset uint64) (*Job, error)
|
|
}
|
|
|
|
// Represents the queue of fetching todo jobs
|
|
type JobPool struct {
|
|
// Actual jobs queue
|
|
Q *goque.PriorityQueue
|
|
|
|
// Handle queuing mechanics
|
|
Queuer
|
|
|
|
maxJobs int
|
|
|
|
feedJobMap *leveldb.DB
|
|
}
|
|
|
|
func (jp *JobPool) Close() error {
|
|
jp.Q.Close()
|
|
|
|
err := jp.feedJobMap.Close()
|
|
return err
|
|
}
|
|
|
|
func (jp *JobPool) Dequeue() (*Job, error) {
|
|
item, err := jp.Q.Dequeue()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
j := &Job{}
|
|
item.ToObject(j)
|
|
|
|
//TODO: This is done when the job is done
|
|
//feedId := utils.IntToBytes(j.Feed.ID)
|
|
//err = jp.feedJobMap.Delete(feedId, nil)
|
|
//if err != nil {
|
|
//return nil, err
|
|
//}
|
|
|
|
return j, nil
|
|
}
|
|
|
|
func (jp *JobPool) DeleteMarkedJob(job *Job) error {
|
|
var err error
|
|
|
|
feedId := utils.IntToBytes(job.Feed.FeedID)
|
|
err = jp.feedJobMap.Delete(feedId, nil)
|
|
|
|
return err
|
|
}
|
|
|
|
// Mark a job in feedJobMap to avoid duplicates
|
|
func (jp *JobPool) MarkUniqJob(job *Job) error {
|
|
|
|
// Mark the feed in the feedJobMap to avoid creating duplicates
|
|
feedId := utils.IntToBytes(job.Feed.FeedID)
|
|
|
|
jobData, err := job.ToBytes()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = jp.feedJobMap.Put(feedId, jobData, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (jp *JobPool) Enqueue(job *Job) error {
|
|
|
|
// Update job status
|
|
job.Status = JobStatusQueued
|
|
|
|
// Enqueue the job in the jobpool
|
|
item, err := jp.Q.EnqueueObject(job.GetPriority(), job)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Recode item to job
|
|
j := &Job{}
|
|
item.ToObject(j)
|
|
|
|
return nil
|
|
}
|
|
func (jp *JobPool) Drop() {
|
|
jp.Q.Drop()
|
|
}
|
|
|
|
func (jp *JobPool) Length() uint64 {
|
|
return jp.Q.Length()
|
|
}
|
|
|
|
func (jp *JobPool) Peek() (*Job, error) {
|
|
item, err := jp.Q.Peek()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
j := &Job{}
|
|
item.ToObject(j)
|
|
return j, err
|
|
}
|