pgraph, resources: Add proper start/stop signals
We need to perform some operations in lock step between graph transitions. This should help with that!
This commit is contained in:
@@ -600,7 +600,6 @@ func (g *Graph) Worker(v *Vertex) error {
|
||||
func (g *Graph) Start(first bool) { // start or continue
|
||||
log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState())
|
||||
defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState())
|
||||
var wg sync.WaitGroup
|
||||
t, _ := g.TopologicalSort()
|
||||
// TODO: only calculate indegree if `first` is true to save resources
|
||||
indegree := g.InDegree() // compute all of the indegree's
|
||||
@@ -622,11 +621,13 @@ func (g *Graph) Start(first bool) { // start or continue
|
||||
v.Res.Starter((!first) || indegree[v] == 0)
|
||||
|
||||
if !v.Res.IsWorking() { // if Worker() is not running...
|
||||
v.Res.Setup()
|
||||
g.wg.Add(1)
|
||||
// must pass in value to avoid races...
|
||||
// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
|
||||
go func(vv *Vertex) {
|
||||
defer g.wg.Done()
|
||||
defer v.Res.Reset()
|
||||
// TODO: if a sufficient number of workers error,
|
||||
// should something be done? Should these restart
|
||||
// after perma-failure if we have a graph change?
|
||||
@@ -639,19 +640,17 @@ func (g *Graph) Start(first bool) { // start or continue
|
||||
}(v)
|
||||
}
|
||||
|
||||
// let the vertices run their startup code in parallel
|
||||
wg.Add(1)
|
||||
go func(vv *Vertex) {
|
||||
defer wg.Done()
|
||||
vv.Res.Started() // block until started
|
||||
}(v)
|
||||
select {
|
||||
case <-v.Res.Started(): // block until started
|
||||
case <-v.Res.Stopped(): // we failed on init
|
||||
// if the resource Init() fails, we don't hang!
|
||||
}
|
||||
|
||||
if !first { // unpause!
|
||||
v.Res.SendEvent(event.EventStart, nil) // sync!
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait() // wait for everyone
|
||||
// we wait for everyone to start before exiting!
|
||||
}
|
||||
|
||||
// Pause sends pause events to the graph in a topological sort order.
|
||||
@@ -660,7 +659,7 @@ func (g *Graph) Pause() {
|
||||
defer log.Printf("State: %v -> %v", g.setState(graphStatePaused), g.getState())
|
||||
t, _ := g.TopologicalSort()
|
||||
for _, v := range t { // squeeze out the events...
|
||||
v.SendEvent(event.EventPause, nil)
|
||||
v.SendEvent(event.EventPause, nil) // sync
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -140,6 +140,8 @@ type Base interface {
|
||||
Events() chan *event.Event
|
||||
AssociateData(*Data)
|
||||
IsWorking() bool
|
||||
Setup()
|
||||
Reset()
|
||||
Converger() converger.Converger
|
||||
ConvergerUIDs() (converger.ConvergerUID, converger.ConvergerUID, converger.ConvergerUID)
|
||||
GetState() ResState
|
||||
@@ -161,6 +163,7 @@ type Base interface {
|
||||
VarDir(string) (string, error)
|
||||
Running() error // notify the engine that Watch started
|
||||
Started() <-chan struct{} // returns when the resource has started
|
||||
Stopped() <-chan struct{} // returns when the resource has stopped
|
||||
Starter(bool)
|
||||
Poll() error // poll alternative to watching :(
|
||||
ProcessChan() chan *event.Event
|
||||
@@ -202,6 +205,7 @@ type BaseRes struct {
|
||||
state ResState
|
||||
working bool // is the Worker() loop running ?
|
||||
started chan struct{} // closed when worker is started/running
|
||||
stopped chan struct{} // closed when worker is stopped/exited
|
||||
isStarted bool // did the started chan already close?
|
||||
starter bool // does this have indegree == 0 ? XXX: usually?
|
||||
isStateOK bool // whether the state is okay based on events or not
|
||||
@@ -303,7 +307,6 @@ func (obj *BaseRes) Init() error {
|
||||
obj.mutex = &sync.Mutex{}
|
||||
obj.working = true // Worker method should now be running...
|
||||
obj.events = make(chan *event.Event) // unbuffered chan to avoid stale events
|
||||
obj.started = make(chan struct{}) // closes when started
|
||||
|
||||
// FIXME: force a sane default until UnmarshalYAML on *BaseRes works...
|
||||
if obj.Meta().Burst == 0 && obj.Meta().Limit == 0 { // blocked
|
||||
@@ -347,6 +350,8 @@ func (obj *BaseRes) Close() error {
|
||||
obj.wcuid.Unregister()
|
||||
obj.cuid.Unregister()
|
||||
|
||||
close(obj.stopped)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -393,6 +398,19 @@ func (obj *BaseRes) IsWorking() bool {
|
||||
return obj.working
|
||||
}
|
||||
|
||||
// Setup does some work which must happen before the Worker starts. It happens
|
||||
// once per Worker startup.
|
||||
func (obj *BaseRes) Setup() {
|
||||
obj.started = make(chan struct{}) // closes when started
|
||||
obj.stopped = make(chan struct{}) // closes when stopped
|
||||
return
|
||||
}
|
||||
|
||||
// Reset from Setup.
|
||||
func (obj *BaseRes) Reset() {
|
||||
return
|
||||
}
|
||||
|
||||
// Converger returns the converger object used by the system. It can be used to
|
||||
// register new convergers if needed.
|
||||
func (obj *BaseRes) Converger() converger.Converger {
|
||||
@@ -541,6 +559,9 @@ func (obj *BaseRes) VarDir(extra string) (string, error) {
|
||||
// Started returns a channel that closes when the resource has started up.
|
||||
func (obj *BaseRes) Started() <-chan struct{} { return obj.started }
|
||||
|
||||
// Stopped returns a channel that closes when the worker has finished running.
|
||||
func (obj *BaseRes) Stopped() <-chan struct{} { return obj.stopped }
|
||||
|
||||
// Starter sets the starter bool. This defines if a vertex has an indegree of 0.
|
||||
// If we have an indegree of 0, we'll need to be a poke initiator in the graph.
|
||||
func (obj *BaseRes) Starter(b bool) { obj.starter = b }
|
||||
|
||||
188
test/shell/libmgmt-change1.go
Normal file
188
test/shell/libmgmt-change1.go
Normal file
@@ -0,0 +1,188 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/gapi"
|
||||
mgmt "github.com/purpleidea/mgmt/lib"
|
||||
"github.com/purpleidea/mgmt/pgraph"
|
||||
"github.com/purpleidea/mgmt/resources"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// MyGAPI implements the main GAPI interface.
|
||||
type MyGAPI struct {
|
||||
Name string // graph name
|
||||
Interval uint // refresh interval, 0 to never refresh
|
||||
|
||||
data gapi.Data
|
||||
initialized bool
|
||||
closeChan chan struct{}
|
||||
wg sync.WaitGroup // sync group for tunnel go routines
|
||||
}
|
||||
|
||||
// NewMyGAPI creates a new MyGAPI struct and calls Init().
|
||||
func NewMyGAPI(data gapi.Data, name string, interval uint) (*MyGAPI, error) {
|
||||
obj := &MyGAPI{
|
||||
Name: name,
|
||||
Interval: interval,
|
||||
}
|
||||
return obj, obj.Init(data)
|
||||
}
|
||||
|
||||
// Init initializes the MyGAPI struct.
|
||||
func (obj *MyGAPI) Init(data gapi.Data) error {
|
||||
if obj.initialized {
|
||||
return fmt.Errorf("Already initialized!")
|
||||
}
|
||||
if obj.Name == "" {
|
||||
return fmt.Errorf("The graph name must be specified!")
|
||||
}
|
||||
|
||||
obj.data = data // store for later
|
||||
obj.closeChan = make(chan struct{})
|
||||
obj.initialized = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Graph returns a current Graph.
|
||||
func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
|
||||
if !obj.initialized {
|
||||
return nil, fmt.Errorf("%s: MyGAPI is not initialized", obj.Name)
|
||||
}
|
||||
// FIXME: these are being specified temporarily until it's the default!
|
||||
metaparams := resources.MetaParams{
|
||||
Limit: rate.Inf,
|
||||
Burst: 0,
|
||||
}
|
||||
g := pgraph.NewGraph(obj.Name)
|
||||
|
||||
n0 := &resources.NoopRes{
|
||||
BaseRes: resources.BaseRes{
|
||||
Name: "noop1",
|
||||
MetaParams: metaparams,
|
||||
},
|
||||
}
|
||||
v := pgraph.NewVertex(n0)
|
||||
|
||||
g.AddVertex(v)
|
||||
//g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.World, obj.data.Noop)
|
||||
return g, nil
|
||||
}
|
||||
|
||||
// Next returns nil errors every time there could be a new graph.
|
||||
func (obj *MyGAPI) Next() chan error {
|
||||
if obj.data.NoWatch || obj.Interval <= 0 {
|
||||
return nil
|
||||
}
|
||||
ch := make(chan error)
|
||||
obj.wg.Add(1)
|
||||
go func() {
|
||||
defer obj.wg.Done()
|
||||
defer close(ch) // this will run before the obj.wg.Done()
|
||||
if !obj.initialized {
|
||||
ch <- fmt.Errorf("%s: MyGAPI is not initialized", obj.Name)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("%s: Generating a bunch of new graphs...", obj.Name)
|
||||
ch <- nil
|
||||
log.Printf("%s: New graph...", obj.Name)
|
||||
ch <- nil
|
||||
log.Printf("%s: New graph...", obj.Name)
|
||||
ch <- nil
|
||||
log.Printf("%s: New graph...", obj.Name)
|
||||
ch <- nil
|
||||
log.Printf("%s: New graph...", obj.Name)
|
||||
ch <- nil
|
||||
log.Printf("%s: New graph...", obj.Name)
|
||||
ch <- nil
|
||||
log.Printf("%s: New graph...", obj.Name)
|
||||
ch <- nil
|
||||
log.Printf("%s: New graph...", obj.Name)
|
||||
ch <- nil
|
||||
log.Printf("%s: New graph...", obj.Name)
|
||||
ch <- nil
|
||||
time.Sleep(1 * time.Second)
|
||||
log.Printf("%s: Done generating graphs!", obj.Name)
|
||||
}()
|
||||
return ch
|
||||
}
|
||||
|
||||
// Close shuts down the MyGAPI.
|
||||
func (obj *MyGAPI) Close() error {
|
||||
if !obj.initialized {
|
||||
return fmt.Errorf("%s: MyGAPI is not initialized", obj.Name)
|
||||
}
|
||||
close(obj.closeChan)
|
||||
obj.wg.Wait()
|
||||
obj.initialized = false // closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run runs an embedded mgmt server.
|
||||
func Run() error {
|
||||
|
||||
obj := &mgmt.Main{}
|
||||
obj.Program = "libmgmt" // TODO: set on compilation
|
||||
obj.Version = "0.0.1" // TODO: set on compilation
|
||||
obj.TmpPrefix = true
|
||||
obj.IdealClusterSize = -1
|
||||
obj.ConvergedTimeout = 5
|
||||
obj.Noop = false // does stuff!
|
||||
|
||||
obj.GAPI = &MyGAPI{ // graph API
|
||||
Name: obj.Program, // graph name
|
||||
Interval: 15, // arbitrarily change graph every 15 seconds
|
||||
}
|
||||
|
||||
if err := obj.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// install the exit signal handler
|
||||
exit := make(chan struct{})
|
||||
defer close(exit)
|
||||
go func() {
|
||||
signals := make(chan os.Signal, 1)
|
||||
signal.Notify(signals, os.Interrupt) // catch ^C
|
||||
//signal.Notify(signals, os.Kill) // catch signals
|
||||
signal.Notify(signals, syscall.SIGTERM)
|
||||
|
||||
select {
|
||||
case sig := <-signals: // any signal will do
|
||||
if sig == os.Interrupt {
|
||||
log.Println("Interrupted by ^C")
|
||||
obj.Exit(nil)
|
||||
return
|
||||
}
|
||||
log.Println("Interrupted by signal")
|
||||
obj.Exit(fmt.Errorf("Killed by %v", sig))
|
||||
return
|
||||
case <-exit:
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
if err := obj.Run(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
log.Printf("Hello!")
|
||||
if err := Run(); err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
return
|
||||
}
|
||||
log.Printf("Goodbye!")
|
||||
}
|
||||
9
test/shell/libmgmt-change1.sh
Executable file
9
test/shell/libmgmt-change1.sh
Executable file
@@ -0,0 +1,9 @@
|
||||
#!/bin/bash -e
|
||||
|
||||
go build libmgmt-change1.go
|
||||
# this example should change graphs frequently, and then shutdown...
|
||||
$timeout --kill-after=30s 20s ./libmgmt-change1 &
|
||||
pid=$!
|
||||
wait $pid # get exit status
|
||||
e=$?
|
||||
exit $e
|
||||
@@ -14,7 +14,7 @@ done < "$FILE"
|
||||
cd "${ROOT}"
|
||||
|
||||
find_files() {
|
||||
git ls-files | grep '\.go$' | grep -v '^examples/'
|
||||
git ls-files | grep '\.go$' | grep -v '^examples/' | grep -v '^test/'
|
||||
}
|
||||
|
||||
bad_files=$(
|
||||
|
||||
Reference in New Issue
Block a user