resources: Fix poke/refresh race
Clearly the use of errgroup is flawed. 1) You can't pass in variables, so this is likely to race. 2) You can't get a set of errors, so this is a bad API. For the second problem, it would be much more sane to return a multierr or a list of errors. If there's no fix for the first, I think it should be removed from the lib.
This commit is contained in:
@@ -29,7 +29,6 @@ import (
|
|||||||
"github.com/purpleidea/mgmt/resources"
|
"github.com/purpleidea/mgmt/resources"
|
||||||
|
|
||||||
errwrap "github.com/pkg/errors"
|
errwrap "github.com/pkg/errors"
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetTimestamp returns the timestamp of a vertex
|
// GetTimestamp returns the timestamp of a vertex
|
||||||
@@ -65,7 +64,7 @@ func (g *Graph) OKTimestamp(v *Vertex) bool {
|
|||||||
// Poke notifies nodes after me in the dependency graph that they need refreshing...
|
// Poke notifies nodes after me in the dependency graph that they need refreshing...
|
||||||
// NOTE: this assumes that this can never fail or need to be rescheduled
|
// NOTE: this assumes that this can never fail or need to be rescheduled
|
||||||
func (g *Graph) Poke(v *Vertex, activity bool) error {
|
func (g *Graph) Poke(v *Vertex, activity bool) error {
|
||||||
var eg errgroup.Group
|
var wg sync.WaitGroup
|
||||||
// these are all the vertices pointing AWAY FROM v, eg: v -> ???
|
// these are all the vertices pointing AWAY FROM v, eg: v -> ???
|
||||||
for _, n := range g.OutgoingGraphVertices(v) {
|
for _, n := range g.OutgoingGraphVertices(v) {
|
||||||
// XXX: if we're in state event and haven't been cancelled by
|
// XXX: if we're in state event and haven't been cancelled by
|
||||||
@@ -75,17 +74,17 @@ func (g *Graph) Poke(v *Vertex, activity bool) error {
|
|||||||
if global.DEBUG {
|
if global.DEBUG {
|
||||||
log.Printf("%s[%s]: Poke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
|
log.Printf("%s[%s]: Poke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
|
||||||
}
|
}
|
||||||
//wg.Add(1)
|
wg.Add(1)
|
||||||
eg.Go(func() error {
|
go func(nn *Vertex) error {
|
||||||
//defer wg.Done()
|
defer wg.Done()
|
||||||
edge := g.Adjacency[v][n] // lookup
|
edge := g.Adjacency[v][nn] // lookup
|
||||||
notify := edge.Notify && edge.Refresh()
|
notify := edge.Notify && edge.Refresh()
|
||||||
|
|
||||||
// FIXME: is it okay that this is sync?
|
// FIXME: is it okay that this is sync?
|
||||||
n.SendEvent(event.EventPoke, true, notify)
|
nn.SendEvent(event.EventPoke, true, notify)
|
||||||
// TODO: check return value?
|
// TODO: check return value?
|
||||||
return nil // never error for now...
|
return nil // never error for now...
|
||||||
})
|
}(n)
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if global.DEBUG {
|
if global.DEBUG {
|
||||||
@@ -93,7 +92,8 @@ func (g *Graph) Poke(v *Vertex, activity bool) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return eg.Wait() // wait for all the pokes to complete
|
wg.Wait() // wait for all the pokes to complete
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// BackPoke pokes the pre-requisites that are stale and need to run before I can run.
|
// BackPoke pokes the pre-requisites that are stale and need to run before I can run.
|
||||||
@@ -219,7 +219,10 @@ func (g *Graph) Process(v *Vertex) error {
|
|||||||
// if CheckApply ran without noop and without error, state should be good
|
// if CheckApply ran without noop and without error, state should be good
|
||||||
if !noop && err == nil { // aka !noop || checkOK
|
if !noop && err == nil { // aka !noop || checkOK
|
||||||
obj.StateOK(true) // reset
|
obj.StateOK(true) // reset
|
||||||
|
if refresh {
|
||||||
g.SetUpstreamRefresh(v, false) // refresh happened, clear the request
|
g.SetUpstreamRefresh(v, false) // refresh happened, clear the request
|
||||||
|
obj.SetRefresh(false)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !checkOK { // if state *was* not ok, we had to have apply'ed
|
if !checkOK { // if state *was* not ok, we had to have apply'ed
|
||||||
|
|||||||
@@ -65,7 +65,7 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) {
|
|||||||
poke = true // poke!
|
poke = true // poke!
|
||||||
// XXX: this should be elsewhere in case Watch isn't used (eg: Polling instead...)
|
// XXX: this should be elsewhere in case Watch isn't used (eg: Polling instead...)
|
||||||
// XXX: unless this is used in our "fallback" polling implementation???
|
// XXX: unless this is used in our "fallback" polling implementation???
|
||||||
obj.SetRefresh(true)
|
//obj.SetRefresh(true) // TODO: is this redundant?
|
||||||
}
|
}
|
||||||
|
|
||||||
switch ev.Name {
|
switch ev.Name {
|
||||||
|
|||||||
Reference in New Issue
Block a user