From 18ea05c837036288692490ca0a820c4b4072b007 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Tue, 21 Feb 2017 18:06:09 -0500 Subject: [PATCH] pgraph, resources: Add proper start/stop signals We need to perform some operations in lock step between graph transitions. This should help with that! --- pgraph/actions.go | 19 ++-- resources/resources.go | 23 ++++- test/shell/libmgmt-change1.go | 188 ++++++++++++++++++++++++++++++++++ test/shell/libmgmt-change1.sh | 9 ++ test/test-headerfmt.sh | 2 +- 5 files changed, 229 insertions(+), 12 deletions(-) create mode 100644 test/shell/libmgmt-change1.go create mode 100755 test/shell/libmgmt-change1.sh diff --git a/pgraph/actions.go b/pgraph/actions.go index 77ee5a8c..155dcd13 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -600,7 +600,6 @@ func (g *Graph) Worker(v *Vertex) error { func (g *Graph) Start(first bool) { // start or continue log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState()) defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState()) - var wg sync.WaitGroup t, _ := g.TopologicalSort() // TODO: only calculate indegree if `first` is true to save resources indegree := g.InDegree() // compute all of the indegree's @@ -622,11 +621,13 @@ func (g *Graph) Start(first bool) { // start or continue v.Res.Starter((!first) || indegree[v] == 0) if !v.Res.IsWorking() { // if Worker() is not running... + v.Res.Setup() g.wg.Add(1) // must pass in value to avoid races... // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ go func(vv *Vertex) { defer g.wg.Done() + defer v.Res.Reset() // TODO: if a sufficient number of workers error, // should something be done? Should these restart // after perma-failure if we have a graph change? @@ -639,19 +640,17 @@ func (g *Graph) Start(first bool) { // start or continue }(v) } - // let the vertices run their startup code in parallel - wg.Add(1) - go func(vv *Vertex) { - defer wg.Done() - vv.Res.Started() // block until started - }(v) + select { + case <-v.Res.Started(): // block until started + case <-v.Res.Stopped(): // we failed on init + // if the resource Init() fails, we don't hang! + } if !first { // unpause! v.Res.SendEvent(event.EventStart, nil) // sync! } } - - wg.Wait() // wait for everyone + // we wait for everyone to start before exiting! } // Pause sends pause events to the graph in a topological sort order. @@ -660,7 +659,7 @@ func (g *Graph) Pause() { defer log.Printf("State: %v -> %v", g.setState(graphStatePaused), g.getState()) t, _ := g.TopologicalSort() for _, v := range t { // squeeze out the events... - v.SendEvent(event.EventPause, nil) + v.SendEvent(event.EventPause, nil) // sync } } diff --git a/resources/resources.go b/resources/resources.go index 490d0032..0b0167bd 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -140,6 +140,8 @@ type Base interface { Events() chan *event.Event AssociateData(*Data) IsWorking() bool + Setup() + Reset() Converger() converger.Converger ConvergerUIDs() (converger.ConvergerUID, converger.ConvergerUID, converger.ConvergerUID) GetState() ResState @@ -161,6 +163,7 @@ type Base interface { VarDir(string) (string, error) Running() error // notify the engine that Watch started Started() <-chan struct{} // returns when the resource has started + Stopped() <-chan struct{} // returns when the resource has stopped Starter(bool) Poll() error // poll alternative to watching :( ProcessChan() chan *event.Event @@ -202,6 +205,7 @@ type BaseRes struct { state ResState working bool // is the Worker() loop running ? started chan struct{} // closed when worker is started/running + stopped chan struct{} // closed when worker is stopped/exited isStarted bool // did the started chan already close? starter bool // does this have indegree == 0 ? XXX: usually? isStateOK bool // whether the state is okay based on events or not @@ -303,7 +307,6 @@ func (obj *BaseRes) Init() error { obj.mutex = &sync.Mutex{} obj.working = true // Worker method should now be running... obj.events = make(chan *event.Event) // unbuffered chan to avoid stale events - obj.started = make(chan struct{}) // closes when started // FIXME: force a sane default until UnmarshalYAML on *BaseRes works... if obj.Meta().Burst == 0 && obj.Meta().Limit == 0 { // blocked @@ -347,6 +350,8 @@ func (obj *BaseRes) Close() error { obj.wcuid.Unregister() obj.cuid.Unregister() + close(obj.stopped) + return nil } @@ -393,6 +398,19 @@ func (obj *BaseRes) IsWorking() bool { return obj.working } +// Setup does some work which must happen before the Worker starts. It happens +// once per Worker startup. +func (obj *BaseRes) Setup() { + obj.started = make(chan struct{}) // closes when started + obj.stopped = make(chan struct{}) // closes when stopped + return +} + +// Reset from Setup. +func (obj *BaseRes) Reset() { + return +} + // Converger returns the converger object used by the system. It can be used to // register new convergers if needed. func (obj *BaseRes) Converger() converger.Converger { @@ -541,6 +559,9 @@ func (obj *BaseRes) VarDir(extra string) (string, error) { // Started returns a channel that closes when the resource has started up. func (obj *BaseRes) Started() <-chan struct{} { return obj.started } +// Stopped returns a channel that closes when the worker has finished running. +func (obj *BaseRes) Stopped() <-chan struct{} { return obj.stopped } + // Starter sets the starter bool. This defines if a vertex has an indegree of 0. // If we have an indegree of 0, we'll need to be a poke initiator in the graph. func (obj *BaseRes) Starter(b bool) { obj.starter = b } diff --git a/test/shell/libmgmt-change1.go b/test/shell/libmgmt-change1.go new file mode 100644 index 00000000..f1c3fa19 --- /dev/null +++ b/test/shell/libmgmt-change1.go @@ -0,0 +1,188 @@ +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/purpleidea/mgmt/gapi" + mgmt "github.com/purpleidea/mgmt/lib" + "github.com/purpleidea/mgmt/pgraph" + "github.com/purpleidea/mgmt/resources" + + "golang.org/x/time/rate" +) + +// MyGAPI implements the main GAPI interface. +type MyGAPI struct { + Name string // graph name + Interval uint // refresh interval, 0 to never refresh + + data gapi.Data + initialized bool + closeChan chan struct{} + wg sync.WaitGroup // sync group for tunnel go routines +} + +// NewMyGAPI creates a new MyGAPI struct and calls Init(). +func NewMyGAPI(data gapi.Data, name string, interval uint) (*MyGAPI, error) { + obj := &MyGAPI{ + Name: name, + Interval: interval, + } + return obj, obj.Init(data) +} + +// Init initializes the MyGAPI struct. +func (obj *MyGAPI) Init(data gapi.Data) error { + if obj.initialized { + return fmt.Errorf("Already initialized!") + } + if obj.Name == "" { + return fmt.Errorf("The graph name must be specified!") + } + + obj.data = data // store for later + obj.closeChan = make(chan struct{}) + obj.initialized = true + return nil +} + +// Graph returns a current Graph. +func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { + if !obj.initialized { + return nil, fmt.Errorf("%s: MyGAPI is not initialized", obj.Name) + } + // FIXME: these are being specified temporarily until it's the default! + metaparams := resources.MetaParams{ + Limit: rate.Inf, + Burst: 0, + } + g := pgraph.NewGraph(obj.Name) + + n0 := &resources.NoopRes{ + BaseRes: resources.BaseRes{ + Name: "noop1", + MetaParams: metaparams, + }, + } + v := pgraph.NewVertex(n0) + + g.AddVertex(v) + //g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.World, obj.data.Noop) + return g, nil +} + +// Next returns nil errors every time there could be a new graph. +func (obj *MyGAPI) Next() chan error { + if obj.data.NoWatch || obj.Interval <= 0 { + return nil + } + ch := make(chan error) + obj.wg.Add(1) + go func() { + defer obj.wg.Done() + defer close(ch) // this will run before the obj.wg.Done() + if !obj.initialized { + ch <- fmt.Errorf("%s: MyGAPI is not initialized", obj.Name) + return + } + + log.Printf("%s: Generating a bunch of new graphs...", obj.Name) + ch <- nil + log.Printf("%s: New graph...", obj.Name) + ch <- nil + log.Printf("%s: New graph...", obj.Name) + ch <- nil + log.Printf("%s: New graph...", obj.Name) + ch <- nil + log.Printf("%s: New graph...", obj.Name) + ch <- nil + log.Printf("%s: New graph...", obj.Name) + ch <- nil + log.Printf("%s: New graph...", obj.Name) + ch <- nil + log.Printf("%s: New graph...", obj.Name) + ch <- nil + log.Printf("%s: New graph...", obj.Name) + ch <- nil + time.Sleep(1 * time.Second) + log.Printf("%s: Done generating graphs!", obj.Name) + }() + return ch +} + +// Close shuts down the MyGAPI. +func (obj *MyGAPI) Close() error { + if !obj.initialized { + return fmt.Errorf("%s: MyGAPI is not initialized", obj.Name) + } + close(obj.closeChan) + obj.wg.Wait() + obj.initialized = false // closed = true + return nil +} + +// Run runs an embedded mgmt server. +func Run() error { + + obj := &mgmt.Main{} + obj.Program = "libmgmt" // TODO: set on compilation + obj.Version = "0.0.1" // TODO: set on compilation + obj.TmpPrefix = true + obj.IdealClusterSize = -1 + obj.ConvergedTimeout = 5 + obj.Noop = false // does stuff! + + obj.GAPI = &MyGAPI{ // graph API + Name: obj.Program, // graph name + Interval: 15, // arbitrarily change graph every 15 seconds + } + + if err := obj.Init(); err != nil { + return err + } + + // install the exit signal handler + exit := make(chan struct{}) + defer close(exit) + go func() { + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) // catch ^C + //signal.Notify(signals, os.Kill) // catch signals + signal.Notify(signals, syscall.SIGTERM) + + select { + case sig := <-signals: // any signal will do + if sig == os.Interrupt { + log.Println("Interrupted by ^C") + obj.Exit(nil) + return + } + log.Println("Interrupted by signal") + obj.Exit(fmt.Errorf("Killed by %v", sig)) + return + case <-exit: + return + } + }() + + if err := obj.Run(); err != nil { + return err + } + return nil +} + +func main() { + log.Printf("Hello!") + if err := Run(); err != nil { + fmt.Println(err) + os.Exit(1) + return + } + log.Printf("Goodbye!") +} diff --git a/test/shell/libmgmt-change1.sh b/test/shell/libmgmt-change1.sh new file mode 100755 index 00000000..9ae003e1 --- /dev/null +++ b/test/shell/libmgmt-change1.sh @@ -0,0 +1,9 @@ +#!/bin/bash -e + +go build libmgmt-change1.go +# this example should change graphs frequently, and then shutdown... +$timeout --kill-after=30s 20s ./libmgmt-change1 & +pid=$! +wait $pid # get exit status +e=$? +exit $e diff --git a/test/test-headerfmt.sh b/test/test-headerfmt.sh index 640ddb20..40a0982c 100755 --- a/test/test-headerfmt.sh +++ b/test/test-headerfmt.sh @@ -14,7 +14,7 @@ done < "$FILE" cd "${ROOT}" find_files() { - git ls-files | grep '\.go$' | grep -v '^examples/' + git ls-files | grep '\.go$' | grep -v '^examples/' | grep -v '^test/' } bad_files=$(