From 0009d9b20e7b276df79b4e5eb99206b1b4c91641 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Tue, 20 Dec 2016 04:29:14 -0500 Subject: [PATCH] pgraph, resources: Integrate properly with the startup logic This signals which resources have to run their initial pokes, and removes the racy retry timer. We actually get a proper signal when things are running too! --- pgraph/actions.go | 48 +++++++++++++++++++++--------------------- resources/resources.go | 22 ++++++++++++++----- resources/sendrecv.go | 6 +++++- 3 files changed, 46 insertions(+), 30 deletions(-) diff --git a/pgraph/actions.go b/pgraph/actions.go index 1626cdd4..a1a1778b 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -453,10 +453,26 @@ func (g *Graph) Worker(v *Vertex) error { func (g *Graph) Start(first bool) { // start or continue log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState()) defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState()) + var wg sync.WaitGroup t, _ := g.TopologicalSort() // TODO: only calculate indegree if `first` is true to save resources indegree := g.InDegree() // compute all of the indegree's for _, v := range Reverse(t) { + // selective poke: here we reduce the number of initial pokes + // to the minimum required to activate every vertex in the + // graph, either by direct action, or by getting poked by a + // vertex that was previously activated. if we poke each vertex + // that has no incoming edges, then we can be sure to reach the + // whole graph. Please note: this may mask certain optimization + // failures, such as any poke limiting code in Poke() or + // BackPoke(). You might want to disable this selective start + // when experimenting with and testing those elements. + // if we are unpausing (since it's not the first run of this + // function) we need to poke to *unpause* every graph vertex, + // and not just selectively the subset with no indegree. 
+ if (!first) || indegree[v] == 0 { + v.Res.Starter(true) // let the startup code know to poke + } if !v.Res.IsWatching() { // if Watch() is not running... g.wg.Add(1) @@ -475,31 +491,15 @@ func (g *Graph) Start(first bool) { // start or continue }(v) } - // selective poke: here we reduce the number of initial pokes - // to the minimum required to activate every vertex in the - // graph, either by direct action, or by getting poked by a - // vertex that was previously activated. if we poke each vertex - // that has no incoming edges, then we can be sure to reach the - // whole graph. Please note: this may mask certain optimization - // failures, such as any poke limiting code in Poke() or - // BackPoke(). You might want to disable this selective start - // when experimenting with and testing those elements. - // if we are unpausing (since it's not the first run of this - // function) we need to poke to *unpause* every graph vertex, - // and not just selectively the subset with no indegree. - if (!first) || indegree[v] == 0 { - // ensure state is started before continuing on to next vertex - for !v.SendEvent(event.EventStart, true, false) { - if g.Flags.Debug { - // if SendEvent fails, we aren't up yet - log.Printf("%s[%s]: Retrying SendEvent(Start)", v.Kind(), v.GetName()) - // sleep here briefly or otherwise cause - // a different goroutine to be scheduled - time.Sleep(1 * time.Millisecond) - } - } - } + // let the vertices run their startup code in parallel + wg.Add(1) + go func(vv *Vertex) { + defer wg.Done() + <-vv.Res.Started() // block until the started channel is closed + }(v) } + + wg.Wait() // wait for everyone } // Wait waits for all the graph vertex workers to exit. 
diff --git a/resources/resources.go b/resources/resources.go index 959f6bb6..5adf8ad7 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -151,6 +151,8 @@ type Base interface { SetGroup([]Res) VarDir(string) (string, error) Running(chan event.Event) error // notify the engine that Watch started + Started() <-chan struct{} // returns when the resource has started + Starter(bool) } // Res is the minimum interface you need to implement to define a new resource. @@ -179,11 +181,13 @@ type BaseRes struct { prefix string // base prefix for this resource debug bool state ResState - watching bool // is Watch() loop running ? - isStateOK bool // whether the state is okay based on events or not - isGrouped bool // am i contained within a group? - grouped []Res // list of any grouped resources - refresh bool // does this resource have a refresh to run? + watching bool // is Watch() loop running ? + started chan struct{} // closed when worker is started/running + starter bool // does this have indegree == 0 ? XXX: usually? + isStateOK bool // whether the state is okay based on events or not + isGrouped bool // am i contained within a group? + grouped []Res // list of any grouped resources + refresh bool // does this resource have a refresh to run? //refreshState StatefulBool // TODO: future stateful bool } @@ -234,6 +238,7 @@ func (obj *BaseRes) Init() error { return fmt.Errorf("Resource did not set kind!") } obj.events = make(chan event.Event) // unbuffered chan to avoid stale events + obj.started = make(chan struct{}) // closes when started //dir, err := obj.VarDir("") //if err != nil { // return errwrap.Wrapf(err, "VarDir failed in Init()") @@ -426,6 +431,13 @@ func (obj *BaseRes) VarDir(extra string) (string, error) { return p, nil } +// Started returns a channel that closes when the resource has started up. +func (obj *BaseRes) Started() <-chan struct{} { return obj.started } + +// Starter sets the starter bool. 
This defines if a vertex has an indegree of 0. +// If we have an indegree of 0, we'll need to be a poke initiator in the graph. +func (obj *BaseRes) Starter(b bool) { obj.starter = b } + // ResToB64 encodes a resource to a base64 encoded string (after serialization) func ResToB64(res Res) (string, error) { b := bytes.Buffer{} diff --git a/resources/sendrecv.go b/resources/sendrecv.go index 7c89aaed..6be71286 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -114,10 +114,14 @@ func (obj *BaseRes) Running(processChan chan event.Event) error { obj.StateOK(false) // assume we're initially dirty cuid := obj.Converger() // get the converger uid used to report status cuid.SetConverged(false) // a reasonable initial assumption + close(obj.started) // send started signal // FIXME: exit return value is unused atm, so ignore it for now... //if exit, err := obj.DoSend(processChan, ""); exit || err != nil { - _, err := obj.DoSend(processChan, "") + var err error + if obj.starter { // vertices of indegree == 0 should send initial pokes + _, err = obj.DoSend(processChan, "") // trigger a CheckApply + } return err // bubble up any possible error (or nil) }