pgraph, resources: Integrate properly with the startup logic
This signals which resources have to run their initial pokes, and removes the racy retry timer. We actually get a proper signal when things are running too!
This commit is contained in:
@@ -453,10 +453,26 @@ func (g *Graph) Worker(v *Vertex) error {
|
|||||||
func (g *Graph) Start(first bool) { // start or continue
|
func (g *Graph) Start(first bool) { // start or continue
|
||||||
log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState())
|
log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState())
|
||||||
defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState())
|
defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState())
|
||||||
|
var wg sync.WaitGroup
|
||||||
t, _ := g.TopologicalSort()
|
t, _ := g.TopologicalSort()
|
||||||
// TODO: only calculate indegree if `first` is true to save resources
|
// TODO: only calculate indegree if `first` is true to save resources
|
||||||
indegree := g.InDegree() // compute all of the indegree's
|
indegree := g.InDegree() // compute all of the indegree's
|
||||||
for _, v := range Reverse(t) {
|
for _, v := range Reverse(t) {
|
||||||
|
// selective poke: here we reduce the number of initial pokes
|
||||||
|
// to the minimum required to activate every vertex in the
|
||||||
|
// graph, either by direct action, or by getting poked by a
|
||||||
|
// vertex that was previously activated. if we poke each vertex
|
||||||
|
// that has no incoming edges, then we can be sure to reach the
|
||||||
|
// whole graph. Please note: this may mask certain optimization
|
||||||
|
// failures, such as any poke limiting code in Poke() or
|
||||||
|
// BackPoke(). You might want to disable this selective start
|
||||||
|
// when experimenting with and testing those elements.
|
||||||
|
// if we are unpausing (since it's not the first run of this
|
||||||
|
// function) we need to poke to *unpause* every graph vertex,
|
||||||
|
// and not just selectively the subset with no indegree.
|
||||||
|
if (!first) || indegree[v] == 0 {
|
||||||
|
v.Res.Starter(true) // let the startup code know to poke
|
||||||
|
}
|
||||||
|
|
||||||
if !v.Res.IsWatching() { // if Watch() is not running...
|
if !v.Res.IsWatching() { // if Watch() is not running...
|
||||||
g.wg.Add(1)
|
g.wg.Add(1)
|
||||||
@@ -475,31 +491,15 @@ func (g *Graph) Start(first bool) { // start or continue
|
|||||||
}(v)
|
}(v)
|
||||||
}
|
}
|
||||||
|
|
||||||
// selective poke: here we reduce the number of initial pokes
|
// let the vertices run their startup code in parallel
|
||||||
// to the minimum required to activate every vertex in the
|
wg.Add(1)
|
||||||
// graph, either by direct action, or by getting poked by a
|
go func(vv *Vertex) {
|
||||||
// vertex that was previously activated. if we poke each vertex
|
defer wg.Done()
|
||||||
// that has no incoming edges, then we can be sure to reach the
|
vv.Res.Started() // block until started
|
||||||
// whole graph. Please note: this may mask certain optimization
|
}(v)
|
||||||
// failures, such as any poke limiting code in Poke() or
|
|
||||||
// BackPoke(). You might want to disable this selective start
|
|
||||||
// when experimenting with and testing those elements.
|
|
||||||
// if we are unpausing (since it's not the first run of this
|
|
||||||
// function) we need to poke to *unpause* every graph vertex,
|
|
||||||
// and not just selectively the subset with no indegree.
|
|
||||||
if (!first) || indegree[v] == 0 {
|
|
||||||
// ensure state is started before continuing on to next vertex
|
|
||||||
for !v.SendEvent(event.EventStart, true, false) {
|
|
||||||
if g.Flags.Debug {
|
|
||||||
// if SendEvent fails, we aren't up yet
|
|
||||||
log.Printf("%s[%s]: Retrying SendEvent(Start)", v.Kind(), v.GetName())
|
|
||||||
// sleep here briefly or otherwise cause
|
|
||||||
// a different goroutine to be scheduled
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wg.Wait() // wait for everyone
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait waits for all the graph vertex workers to exit.
|
// Wait waits for all the graph vertex workers to exit.
|
||||||
|
|||||||
@@ -151,6 +151,8 @@ type Base interface {
|
|||||||
SetGroup([]Res)
|
SetGroup([]Res)
|
||||||
VarDir(string) (string, error)
|
VarDir(string) (string, error)
|
||||||
Running(chan event.Event) error // notify the engine that Watch started
|
Running(chan event.Event) error // notify the engine that Watch started
|
||||||
|
Started() <-chan struct{} // returns when the resource has started
|
||||||
|
Starter(bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Res is the minimum interface you need to implement to define a new resource.
|
// Res is the minimum interface you need to implement to define a new resource.
|
||||||
@@ -179,11 +181,13 @@ type BaseRes struct {
|
|||||||
prefix string // base prefix for this resource
|
prefix string // base prefix for this resource
|
||||||
debug bool
|
debug bool
|
||||||
state ResState
|
state ResState
|
||||||
watching bool // is Watch() loop running ?
|
watching bool // is Watch() loop running ?
|
||||||
isStateOK bool // whether the state is okay based on events or not
|
started chan struct{} // closed when worker is started/running
|
||||||
isGrouped bool // am i contained within a group?
|
starter bool // does this have indegree == 0 ? XXX: usually?
|
||||||
grouped []Res // list of any grouped resources
|
isStateOK bool // whether the state is okay based on events or not
|
||||||
refresh bool // does this resource have a refresh to run?
|
isGrouped bool // am i contained within a group?
|
||||||
|
grouped []Res // list of any grouped resources
|
||||||
|
refresh bool // does this resource have a refresh to run?
|
||||||
//refreshState StatefulBool // TODO: future stateful bool
|
//refreshState StatefulBool // TODO: future stateful bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -234,6 +238,7 @@ func (obj *BaseRes) Init() error {
|
|||||||
return fmt.Errorf("Resource did not set kind!")
|
return fmt.Errorf("Resource did not set kind!")
|
||||||
}
|
}
|
||||||
obj.events = make(chan event.Event) // unbuffered chan to avoid stale events
|
obj.events = make(chan event.Event) // unbuffered chan to avoid stale events
|
||||||
|
obj.started = make(chan struct{}) // closes when started
|
||||||
//dir, err := obj.VarDir("")
|
//dir, err := obj.VarDir("")
|
||||||
//if err != nil {
|
//if err != nil {
|
||||||
// return errwrap.Wrapf(err, "VarDir failed in Init()")
|
// return errwrap.Wrapf(err, "VarDir failed in Init()")
|
||||||
@@ -426,6 +431,13 @@ func (obj *BaseRes) VarDir(extra string) (string, error) {
|
|||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Started returns a channel that closes when the resource has started up.
|
||||||
|
func (obj *BaseRes) Started() <-chan struct{} { return obj.started }
|
||||||
|
|
||||||
|
// Starter sets the starter bool. This defines if a vertex has an indegree of 0.
|
||||||
|
// If we have an indegree of 0, we'll need to be a poke initiator in the graph.
|
||||||
|
func (obj *BaseRes) Starter(b bool) { obj.starter = b }
|
||||||
|
|
||||||
// ResToB64 encodes a resource to a base64 encoded string (after serialization)
|
// ResToB64 encodes a resource to a base64 encoded string (after serialization)
|
||||||
func ResToB64(res Res) (string, error) {
|
func ResToB64(res Res) (string, error) {
|
||||||
b := bytes.Buffer{}
|
b := bytes.Buffer{}
|
||||||
|
|||||||
@@ -114,10 +114,14 @@ func (obj *BaseRes) Running(processChan chan event.Event) error {
|
|||||||
obj.StateOK(false) // assume we're initially dirty
|
obj.StateOK(false) // assume we're initially dirty
|
||||||
cuid := obj.Converger() // get the converger uid used to report status
|
cuid := obj.Converger() // get the converger uid used to report status
|
||||||
cuid.SetConverged(false) // a reasonable initial assumption
|
cuid.SetConverged(false) // a reasonable initial assumption
|
||||||
|
close(obj.started) // send started signal
|
||||||
|
|
||||||
// FIXME: exit return value is unused atm, so ignore it for now...
|
// FIXME: exit return value is unused atm, so ignore it for now...
|
||||||
//if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
//if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
|
||||||
_, err := obj.DoSend(processChan, "")
|
var err error
|
||||||
|
if obj.starter { // vertices of indegree == 0 should send initial pokes
|
||||||
|
_, err = obj.DoSend(processChan, "") // trigger a CheckApply
|
||||||
|
}
|
||||||
return err // bubble up any possible error (or nil)
|
return err // bubble up any possible error (or nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user