From 11b40bf32f4e3cdf955861437a860a5eb54cf3d8 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Wed, 25 Jan 2017 09:03:50 -0500 Subject: [PATCH] resources: Update state checks The mgmt graph depends on state tracking to eliminate redundant pokes. With the Watch loop now able to produce events quickly, it should no longer play a part in determining the vertex state. This simplifies the resource API as well! --- docs/resource-guide.md | 1 - pgraph/actions.go | 40 +++++++++++--------- resources/exec.go | 1 - resources/file.go | 1 - resources/hostname.go | 1 - resources/msg.go | 1 - resources/noop.go | 1 - resources/nspawn.go | 1 - resources/password.go | 1 - resources/pkg.go | 1 - resources/resources.go | 10 ++--- resources/svc.go | 2 - resources/timer.go | 1 - test/shell/t5b.sh | 7 ++++ test/shell/t5b.yaml | 84 ++++++++++++++++++++++++++++++++++++++++++ 15 files changed, 118 insertions(+), 35 deletions(-) create mode 100755 test/shell/t5b.sh create mode 100644 test/shell/t5b.yaml diff --git a/docs/resource-guide.md b/docs/resource-guide.md index 04fc6881..82b7250c 100644 --- a/docs/resource-guide.md +++ b/docs/resource-guide.md @@ -333,7 +333,6 @@ func (obj *FooRes) Watch(processChan chan *event.Event) error { var send = false // send event? var exit *error for { - obj.SetState(ResStateWatching) // reset select { case event := <-obj.Events(): cuid.SetConverged(false) diff --git a/pgraph/actions.go b/pgraph/actions.go index a68a8354..ea19222b 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -62,16 +62,15 @@ func (g *Graph) OKTimestamp(v *Vertex) bool { return true } -// Poke notifies nodes after me in the dependency graph that they need refreshing... -// NOTE: this assumes that this can never fail or need to be rescheduled -func (g *Graph) Poke(v *Vertex, activity bool) error { +// Poke tells nodes after me in the dependency graph that they need to refresh. +func (g *Graph) Poke(v *Vertex) error { var wg sync.WaitGroup // these are all the vertices pointing AWAY FROM v, eg: v -> ??? for _, n := range g.OutgoingGraphVertices(v) { - // XXX: if we're in state event and haven't been cancelled by - // apply, then we can cancel a poke to a child, right? XXX - // XXX: if n.Res.getState() != resources.ResStateEvent || activity { // is this correct? - if true || activity { // XXX: ??? + // we can skip this poke if resource hasn't done work yet... it + // needs to be poked if already running, or not running though! + // TODO: does this need an || activity flag? + if n.Res.GetState() != resources.ResStateProcess { if g.Flags.Debug { log.Printf("%s[%s]: Poke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) } @@ -80,9 +79,7 @@ func (g *Graph) Poke(v *Vertex, activity bool) error { defer wg.Done() //edge := g.Adjacency[v][nn] // lookup //notify := edge.Notify && edge.Refresh() - nn.SendEvent(event.EventPoke, nil) - // TODO: check return value? - return nil // never error for now... + return nn.SendEvent(event.EventPoke, nil) }(n) } else { @@ -91,6 +88,7 @@ func (g *Graph) Poke(v *Vertex, activity bool) error { } } } + // TODO: do something with return values? wg.Wait() // wait for all the pokes to complete return nil } @@ -100,13 +98,13 @@ func (g *Graph) BackPoke(v *Vertex) { // these are all the vertices pointing TO v, eg: ??? -> v for _, n := range g.IncomingGraphVertices(v) { x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState() - // if the parent timestamp needs poking AND it's not in state - // ResStateEvent, then poke it. If the parent is in ResStateEvent it + // If the parent timestamp needs poking AND it's not running + // Process, then poke it. If the parent is in ResStateProcess it // means that an event is pending, so we'll be expecting a poke // back soon, so we can safely discard the extra parent poke... // TODO: implement a stateLT (less than) to tell if something // happens earlier in the state cycle and that doesn't wrap nil - if x >= y && (s != resources.ResStateEvent && s != resources.ResStateCheckApply) { + if x >= y && (s != resources.ResStateProcess && s != resources.ResStateCheckApply) { if g.Flags.Debug { log.Printf("%s[%s]: BackPoke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) } @@ -158,7 +156,8 @@ func (g *Graph) Process(v *Vertex) error { if g.Flags.Debug { log.Printf("%s[%s]: Process()", obj.Kind(), obj.GetName()) } - obj.SetState(resources.ResStateEvent) + defer obj.SetState(resources.ResStateNil) // reset state when finished + obj.SetState(resources.ResStateProcess) var ok = true var applied = false // did we run an apply? // is it okay to run dependency wise right now? @@ -174,8 +173,6 @@ func (g *Graph) Process(v *Vertex) error { log.Printf("%s[%s]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp()) } - obj.SetState(resources.ResStateCheckApply) - // connect any senders to receivers and detect if values changed if updated, err := obj.SendRecv(obj); err != nil { return errwrap.Wrapf(err, "could not SendRecv in Process") @@ -201,6 +198,9 @@ func (g *Graph) Process(v *Vertex) error { refresh = g.RefreshPending(v) // do i need to perform a refresh? obj.SetRefresh(refresh) // tell the resource + // changes can occur after this... + obj.SetState(resources.ResStateCheckApply) + // check cached state, to skip CheckApply; can't skip if refreshing if !refresh && obj.IsStateOK() { checkOK, err = true, nil @@ -283,7 +283,7 @@ func (g *Graph) Process(v *Vertex) error { // nodes might fail due to having a too old timestamp! v.UpdateTimestamp() // this was touched... obj.SetState(resources.ResStatePoking) // can't cancel parent poke - if err := g.Poke(v, activity); err != nil { + if err := g.Poke(v); err != nil { return errwrap.Wrapf(err, "the Poke() failed") } } @@ -343,6 +343,12 @@ func (g *Graph) Worker(v *Vertex) error { break Loop // no event, so no ack! } + // if process started, but no action yet, skip! + if v.Res.GetState() == resources.ResStateProcess { + ev.ACK() // ready for next message + continue + } + // if running, we skip running a new execution! // if waiting, we skip running a new execution! if running || waiting { diff --git a/resources/exec.go b/resources/exec.go index 20cb45c0..b09c7e25 100644 --- a/resources/exec.go +++ b/resources/exec.go @@ -163,7 +163,6 @@ func (obj *ExecRes) Watch(processChan chan *event.Event) error { } for { - obj.SetState(ResStateWatching) // reset select { case text := <-bufioch: cuid.SetConverged(false) diff --git a/resources/file.go b/resources/file.go index 56e3f885..b80f2341 100644 --- a/resources/file.go +++ b/resources/file.go @@ -170,7 +170,6 @@ func (obj *FileRes) Watch(processChan chan *event.Event) error { log.Printf("%s[%s]: Watching: %s", obj.Kind(), obj.GetName(), obj.Path) // attempting to watch... } - obj.SetState(ResStateWatching) // reset select { case event, ok := <-obj.recWatcher.Events(): if !ok { // channel shutdown diff --git a/resources/hostname.go b/resources/hostname.go index f13a5132..a8dbd2db 100644 --- a/resources/hostname.go +++ b/resources/hostname.go @@ -138,7 +138,6 @@ func (obj *HostnameRes) Watch(processChan chan *event.Event) error { var send = false // send event? for { - obj.SetState(ResStateWatching) // reset select { case <-signals: cuid.SetConverged(false) diff --git a/resources/msg.go b/resources/msg.go index 67daeaef..4b1ba42c 100644 --- a/resources/msg.go +++ b/resources/msg.go @@ -150,7 +150,6 @@ func (obj *MsgRes) Watch(processChan chan *event.Event) error { var send = false // send event? var exit *error for { - obj.SetState(ResStateWatching) // reset select { case event := <-obj.Events(): cuid.SetConverged(false) diff --git a/resources/noop.go b/resources/noop.go index b4341b6b..659b76f1 100644 --- a/resources/noop.go +++ b/resources/noop.go @@ -74,7 +74,6 @@ func (obj *NoopRes) Watch(processChan chan *event.Event) error { var send = false // send event? var exit *error for { - obj.SetState(ResStateWatching) // reset select { case event := <-obj.Events(): cuid.SetConverged(false) diff --git a/resources/nspawn.go b/resources/nspawn.go index 63230490..1d3ed245 100644 --- a/resources/nspawn.go +++ b/resources/nspawn.go @@ -140,7 +140,6 @@ func (obj *NspawnRes) Watch(processChan chan *event.Event) error { var exit *error for { - obj.SetState(ResStateWatching) select { case event := <-buschan: // process org.freedesktop.machine1 events for this resource's name diff --git a/resources/password.go b/resources/password.go index 859eb8ae..07cf5f62 100644 --- a/resources/password.go +++ b/resources/password.go @@ -191,7 +191,6 @@ func (obj *PasswordRes) Watch(processChan chan *event.Event) error { var send = false // send event? var exit *error for { - obj.SetState(ResStateWatching) // reset select { // NOTE: this part is very similar to the file resource code case event, ok := <-obj.recWatcher.Events(): diff --git a/resources/pkg.go b/resources/pkg.go index 8b1bcb8c..f2f68569 100644 --- a/resources/pkg.go +++ b/resources/pkg.go @@ -142,7 +142,6 @@ func (obj *PkgRes) Watch(processChan chan *event.Event) error { log.Printf("%s: Watching...", obj.fmtNames(obj.getNames())) } - obj.SetState(ResStateWatching) // reset select { case event := <-ch: cuid.SetConverged(false) diff --git a/resources/resources.go b/resources/resources.go index 94cb08fb..ac52f92d 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -44,11 +44,10 @@ type ResState int // Each ResState should be set properly in the relevant part of the resource. const ( - ResStateNil ResState = iota - ResStateWatching - ResStateEvent // an event has happened, but we haven't poked yet - ResStateCheckApply - ResStatePoking + ResStateNil ResState = iota + ResStateProcess // we're in process, but we haven't done much yet + ResStateCheckApply // we're about to run CheckApply + ResStatePoking // we're done CheckApply, and we're about to poke ) const refreshPathToken = "refresh" @@ -535,7 +534,6 @@ func (obj *BaseRes) Poll(processChan chan *event.Event) error { var send = false var exit *error for { - obj.SetState(ResStateWatching) select { case <-ticker.C: // received the timer event log.Printf("%s[%s]: polling...", obj.Kind(), obj.GetName()) diff --git a/resources/svc.go b/resources/svc.go index cdb3a765..c243a6b7 100644 --- a/resources/svc.go +++ b/resources/svc.go @@ -153,7 +153,6 @@ func (obj *SvcRes) Watch(processChan chan *event.Event) error { set.Remove(svc) // no return value should ever occur } - obj.SetState(ResStateWatching) // reset select { case <-buschan: // XXX: wait for new units event to unstick cuid.SetConverged(false) @@ -177,7 +176,6 @@ func (obj *SvcRes) Watch(processChan chan *event.Event) error { } log.Printf("Watching: %s", svc) // attempting to watch... - obj.SetState(ResStateWatching) // reset select { case event := <-subChannel: diff --git a/resources/timer.go b/resources/timer.go index 36c14ac4..5be83379 100644 --- a/resources/timer.go +++ b/resources/timer.go @@ -92,7 +92,6 @@ func (obj *TimerRes) Watch(processChan chan *event.Event) error { var send = false for { - obj.SetState(ResStateWatching) select { case <-obj.ticker.C: // received the timer event send = true diff --git a/test/shell/t5b.sh b/test/shell/t5b.sh new file mode 100755 index 00000000..48b6ee37 --- /dev/null +++ b/test/shell/t5b.sh @@ -0,0 +1,7 @@ +#!/bin/bash -e + +# should take slightly more than 35s, but fail if we take 45s) +timeout --kill-after=45s 40s ./mgmt run --yaml t5.yaml --converged-timeout=5 --no-watch --tmp-prefix & +pid=$! +wait $pid # get exit status +exit $? diff --git a/test/shell/t5b.yaml b/test/shell/t5b.yaml new file mode 100644 index 00000000..e21cf827 --- /dev/null +++ b/test/shell/t5b.yaml @@ -0,0 +1,84 @@ +--- +graph: mygraph +comment: simpler exec fan in to fan out example to demonstrate optimization +resources: + exec: + - name: exec1 + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present + - name: exec2 + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present + - name: exec3 + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present + - name: exec4 + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present + - name: exec5 + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present +edges: +- name: e1 + from: + kind: exec + name: exec1 + to: + kind: exec + name: exec3 +- name: e2 + from: + kind: exec + name: exec2 + to: + kind: exec + name: exec3 +- name: e3 + from: + kind: exec + name: exec3 + to: + kind: exec + name: exec4 +- name: e4 + from: + kind: exec + name: exec3 + to: + kind: exec + name: exec5