diff --git a/pgraph/actions.go b/pgraph/actions.go index d38e829e..37f91750 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -29,7 +29,6 @@ import ( "github.com/purpleidea/mgmt/resources" errwrap "github.com/pkg/errors" - "golang.org/x/sync/errgroup" ) // GetTimestamp returns the timestamp of a vertex @@ -65,7 +64,7 @@ func (g *Graph) OKTimestamp(v *Vertex) bool { // Poke notifies nodes after me in the dependency graph that they need refreshing... // NOTE: this assumes that this can never fail or need to be rescheduled func (g *Graph) Poke(v *Vertex, activity bool) error { - var eg errgroup.Group + var wg sync.WaitGroup // these are all the vertices pointing AWAY FROM v, eg: v -> ??? for _, n := range g.OutgoingGraphVertices(v) { // XXX: if we're in state event and haven't been cancelled by @@ -75,17 +74,17 @@ func (g *Graph) Poke(v *Vertex, activity bool) error { if global.DEBUG { log.Printf("%s[%s]: Poke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) } - //wg.Add(1) - eg.Go(func() error { - //defer wg.Done() - edge := g.Adjacency[v][n] // lookup + wg.Add(1) + go func(nn *Vertex) error { + defer wg.Done() + edge := g.Adjacency[v][nn] // lookup notify := edge.Notify && edge.Refresh() // FIXME: is it okay that this is sync? - n.SendEvent(event.EventPoke, true, notify) + nn.SendEvent(event.EventPoke, true, notify) // TODO: check return value? return nil // never error for now... - }) + }(n) } else { if global.DEBUG { @@ -93,7 +92,8 @@ func (g *Graph) Poke(v *Vertex, activity bool) error { } } } - return eg.Wait() // wait for all the pokes to complete + wg.Wait() // wait for all the pokes to complete + return nil } // BackPoke pokes the pre-requisites that are stale and need to run before I can run. @@ -218,8 +218,11 @@ func (g *Graph) Process(v *Vertex) error { // if CheckApply ran without noop and without error, state should be good if !noop && err == nil { // aka !noop || checkOK - obj.StateOK(true) // reset - g.SetUpstreamRefresh(v, false) // refresh happened, clear the request + obj.StateOK(true) // reset + if refresh { + g.SetUpstreamRefresh(v, false) // refresh happened, clear the request + obj.SetRefresh(false) + } } if !checkOK { // if state *was* not ok, we had to have apply'ed diff --git a/resources/sendrecv.go b/resources/sendrecv.go index 96fac117..8f209202 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -65,7 +65,7 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) { poke = true // poke! // XXX: this should be elsewhere in case Watch isn't used (eg: Polling instead...) // XXX: unless this is used in our "fallback" polling implementation??? - obj.SetRefresh(true) + //obj.SetRefresh(true) // TODO: is this redundant? } switch ev.Name {