You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
190 lines
3.6 KiB
Go
190 lines
3.6 KiB
Go
package gum
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"reflect"
|
|
"strings"
|
|
)
|
|
|
|
var idGenerator = genID()
|
|
|
|
// The WorkUnit interface is used to define a unit of work.
|
|
// The Run method will be called in a goroutine.
|
|
type WorkUnit interface {
|
|
Run(UnitManager)
|
|
}
|
|
|
|
// The UnitManager interface is used to manage a unit of work.
|
|
// The ShouldStop method returns a channel that will be closed when the unit
|
|
// should stop.
|
|
// The Done method should be called when the unit is done.
|
|
type UnitManager interface {
|
|
ShouldStop() <-chan bool
|
|
Done()
|
|
Panic(err error)
|
|
}
|
|
|
|
type WorkUnitManager struct {
|
|
stop chan bool
|
|
workerQuit chan bool
|
|
unit WorkUnit
|
|
panic chan error
|
|
isPaniced bool
|
|
}
|
|
|
|
func (w *WorkUnitManager) ShouldStop() <-chan bool {
|
|
return w.stop
|
|
}
|
|
|
|
func (w *WorkUnitManager) Done() {
|
|
w.workerQuit <- true
|
|
}
|
|
|
|
func (w *WorkUnitManager) Panic(err error) {
|
|
w.panic <- err
|
|
w.isPaniced = true
|
|
w.workerQuit <- true
|
|
close(w.stop)
|
|
}
|
|
|
|
type Manager struct {
|
|
signalIn chan os.Signal
|
|
|
|
shutdownSigs []os.Signal
|
|
|
|
workers map[string]*WorkUnitManager
|
|
|
|
Quit chan bool
|
|
|
|
panic chan error // Used for panicing goroutines
|
|
}
|
|
|
|
func (m *Manager) Run() {
|
|
log.Println("Starting manager ...")
|
|
|
|
for unitName, w := range m.workers {
|
|
log.Printf("Starting <%s>\n", unitName)
|
|
go w.unit.Run(w)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case sig := <-m.signalIn:
|
|
|
|
if !in(m.shutdownSigs, sig) {
|
|
break
|
|
}
|
|
|
|
log.Println("shutting event received ... ")
|
|
|
|
// send shutdown event to all worker units
|
|
for name, w := range m.workers {
|
|
log.Printf("shutting down <%s>\n", name)
|
|
w.stop <- true
|
|
}
|
|
|
|
// Wait for all units to quit
|
|
for name, w := range m.workers {
|
|
<-w.workerQuit
|
|
log.Printf("<%s> down", name)
|
|
}
|
|
|
|
// All workers have shutdown
|
|
log.Println("All workers have shutdown, shutting down manager ...")
|
|
|
|
m.Quit <- true
|
|
|
|
case p := <-m.panic:
|
|
|
|
for name, w := range m.workers {
|
|
if w.isPaniced {
|
|
log.Printf("Panicing for <%s>: %s", name, p)
|
|
}
|
|
}
|
|
|
|
for name, w := range m.workers {
|
|
log.Printf("shuting down <%s>\n", name)
|
|
if !w.isPaniced {
|
|
w.stop <- true
|
|
}
|
|
}
|
|
|
|
// Wait for all units to quit
|
|
for name, w := range m.workers {
|
|
<-w.workerQuit
|
|
log.Printf("<%s> down", name)
|
|
}
|
|
|
|
// All workers have shutdown
|
|
log.Println("All workers have shutdown, shutting down manager ...")
|
|
|
|
m.Quit <- true
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) ShutdownOn(sig ...os.Signal) {
|
|
|
|
for _, s := range sig {
|
|
log.Printf("Registering shutdown signal: %s\n", s)
|
|
signal.Notify(m.signalIn, s)
|
|
}
|
|
|
|
m.shutdownSigs = append(m.shutdownSigs, sig...)
|
|
}
|
|
|
|
type IDGenerator func(string) int
|
|
|
|
func genID() IDGenerator {
|
|
ids := make(map[string]int)
|
|
|
|
return func(unit string) int {
|
|
ret := ids[unit]
|
|
ids[unit]++
|
|
return ret
|
|
}
|
|
}
|
|
|
|
func (m *Manager) AddUnit(unit WorkUnit, name string) {
|
|
|
|
workUnitManager := &WorkUnitManager{
|
|
workerQuit: make(chan bool, 1),
|
|
stop: make(chan bool, 1),
|
|
unit: unit,
|
|
panic: m.panic,
|
|
}
|
|
|
|
unitType := reflect.TypeOf(unit)
|
|
unitClass := strings.Split(unitType.String(), ".")[1]
|
|
unitName := fmt.Sprintf("%s[%s", name, unitClass)
|
|
unitID := idGenerator(unitName)
|
|
unitName = fmt.Sprintf("%s#%d]", unitName, unitID)
|
|
|
|
log.Println("Adding unit ", unitName)
|
|
|
|
m.workers[unitName] = workUnitManager
|
|
}
|
|
|
|
func NewManager() *Manager {
|
|
return &Manager{
|
|
signalIn: make(chan os.Signal, 1),
|
|
Quit: make(chan bool, 1),
|
|
workers: make(map[string]*WorkUnitManager),
|
|
panic: make(chan error, 1),
|
|
}
|
|
}
|
|
|
|
// Test if signal is in array
|
|
func in(arr []os.Signal, sig os.Signal) bool {
|
|
for _, s := range arr {
|
|
if s == sig {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|