resources: Update state checks
The mgmt graph depends on state tracking to eliminate redundant pokes. With the Watch loop now able to produce events quickly, it should no longer play a part in determining the vertex state. This simplifies the resource API as well!
This commit is contained in:
@@ -333,7 +333,6 @@ func (obj *FooRes) Watch(processChan chan *event.Event) error {
|
|||||||
var send = false // send event?
|
var send = false // send event?
|
||||||
var exit *error
|
var exit *error
|
||||||
for {
|
for {
|
||||||
obj.SetState(ResStateWatching) // reset
|
|
||||||
select {
|
select {
|
||||||
case event := <-obj.Events():
|
case event := <-obj.Events():
|
||||||
cuid.SetConverged(false)
|
cuid.SetConverged(false)
|
||||||
|
|||||||
@@ -62,16 +62,15 @@ func (g *Graph) OKTimestamp(v *Vertex) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Poke notifies nodes after me in the dependency graph that they need refreshing...
|
// Poke tells nodes after me in the dependency graph that they need to refresh.
|
||||||
// NOTE: this assumes that this can never fail or need to be rescheduled
|
func (g *Graph) Poke(v *Vertex) error {
|
||||||
func (g *Graph) Poke(v *Vertex, activity bool) error {
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
// these are all the vertices pointing AWAY FROM v, eg: v -> ???
|
// these are all the vertices pointing AWAY FROM v, eg: v -> ???
|
||||||
for _, n := range g.OutgoingGraphVertices(v) {
|
for _, n := range g.OutgoingGraphVertices(v) {
|
||||||
// XXX: if we're in state event and haven't been cancelled by
|
// we can skip this poke if resource hasn't done work yet... it
|
||||||
// apply, then we can cancel a poke to a child, right? XXX
|
// needs to be poked if already running, or not running though!
|
||||||
// XXX: if n.Res.getState() != resources.ResStateEvent || activity { // is this correct?
|
// TODO: does this need an || activity flag?
|
||||||
if true || activity { // XXX: ???
|
if n.Res.GetState() != resources.ResStateProcess {
|
||||||
if g.Flags.Debug {
|
if g.Flags.Debug {
|
||||||
log.Printf("%s[%s]: Poke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
|
log.Printf("%s[%s]: Poke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
|
||||||
}
|
}
|
||||||
@@ -80,9 +79,7 @@ func (g *Graph) Poke(v *Vertex, activity bool) error {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
//edge := g.Adjacency[v][nn] // lookup
|
//edge := g.Adjacency[v][nn] // lookup
|
||||||
//notify := edge.Notify && edge.Refresh()
|
//notify := edge.Notify && edge.Refresh()
|
||||||
nn.SendEvent(event.EventPoke, nil)
|
return nn.SendEvent(event.EventPoke, nil)
|
||||||
// TODO: check return value?
|
|
||||||
return nil // never error for now...
|
|
||||||
}(n)
|
}(n)
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
@@ -91,6 +88,7 @@ func (g *Graph) Poke(v *Vertex, activity bool) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// TODO: do something with return values?
|
||||||
wg.Wait() // wait for all the pokes to complete
|
wg.Wait() // wait for all the pokes to complete
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -100,13 +98,13 @@ func (g *Graph) BackPoke(v *Vertex) {
|
|||||||
// these are all the vertices pointing TO v, eg: ??? -> v
|
// these are all the vertices pointing TO v, eg: ??? -> v
|
||||||
for _, n := range g.IncomingGraphVertices(v) {
|
for _, n := range g.IncomingGraphVertices(v) {
|
||||||
x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState()
|
x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState()
|
||||||
// if the parent timestamp needs poking AND it's not in state
|
// If the parent timestamp needs poking AND it's not running
|
||||||
// ResStateEvent, then poke it. If the parent is in ResStateEvent it
|
// Process, then poke it. If the parent is in ResStateProcess it
|
||||||
// means that an event is pending, so we'll be expecting a poke
|
// means that an event is pending, so we'll be expecting a poke
|
||||||
// back soon, so we can safely discard the extra parent poke...
|
// back soon, so we can safely discard the extra parent poke...
|
||||||
// TODO: implement a stateLT (less than) to tell if something
|
// TODO: implement a stateLT (less than) to tell if something
|
||||||
// happens earlier in the state cycle and that doesn't wrap nil
|
// happens earlier in the state cycle and that doesn't wrap nil
|
||||||
if x >= y && (s != resources.ResStateEvent && s != resources.ResStateCheckApply) {
|
if x >= y && (s != resources.ResStateProcess && s != resources.ResStateCheckApply) {
|
||||||
if g.Flags.Debug {
|
if g.Flags.Debug {
|
||||||
log.Printf("%s[%s]: BackPoke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
|
log.Printf("%s[%s]: BackPoke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
|
||||||
}
|
}
|
||||||
@@ -158,7 +156,8 @@ func (g *Graph) Process(v *Vertex) error {
|
|||||||
if g.Flags.Debug {
|
if g.Flags.Debug {
|
||||||
log.Printf("%s[%s]: Process()", obj.Kind(), obj.GetName())
|
log.Printf("%s[%s]: Process()", obj.Kind(), obj.GetName())
|
||||||
}
|
}
|
||||||
obj.SetState(resources.ResStateEvent)
|
defer obj.SetState(resources.ResStateNil) // reset state when finished
|
||||||
|
obj.SetState(resources.ResStateProcess)
|
||||||
var ok = true
|
var ok = true
|
||||||
var applied = false // did we run an apply?
|
var applied = false // did we run an apply?
|
||||||
// is it okay to run dependency wise right now?
|
// is it okay to run dependency wise right now?
|
||||||
@@ -174,8 +173,6 @@ func (g *Graph) Process(v *Vertex) error {
|
|||||||
log.Printf("%s[%s]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp())
|
log.Printf("%s[%s]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp())
|
||||||
}
|
}
|
||||||
|
|
||||||
obj.SetState(resources.ResStateCheckApply)
|
|
||||||
|
|
||||||
// connect any senders to receivers and detect if values changed
|
// connect any senders to receivers and detect if values changed
|
||||||
if updated, err := obj.SendRecv(obj); err != nil {
|
if updated, err := obj.SendRecv(obj); err != nil {
|
||||||
return errwrap.Wrapf(err, "could not SendRecv in Process")
|
return errwrap.Wrapf(err, "could not SendRecv in Process")
|
||||||
@@ -201,6 +198,9 @@ func (g *Graph) Process(v *Vertex) error {
|
|||||||
refresh = g.RefreshPending(v) // do i need to perform a refresh?
|
refresh = g.RefreshPending(v) // do i need to perform a refresh?
|
||||||
obj.SetRefresh(refresh) // tell the resource
|
obj.SetRefresh(refresh) // tell the resource
|
||||||
|
|
||||||
|
// changes can occur after this...
|
||||||
|
obj.SetState(resources.ResStateCheckApply)
|
||||||
|
|
||||||
// check cached state, to skip CheckApply; can't skip if refreshing
|
// check cached state, to skip CheckApply; can't skip if refreshing
|
||||||
if !refresh && obj.IsStateOK() {
|
if !refresh && obj.IsStateOK() {
|
||||||
checkOK, err = true, nil
|
checkOK, err = true, nil
|
||||||
@@ -283,7 +283,7 @@ func (g *Graph) Process(v *Vertex) error {
|
|||||||
// nodes might fail due to having a too old timestamp!
|
// nodes might fail due to having a too old timestamp!
|
||||||
v.UpdateTimestamp() // this was touched...
|
v.UpdateTimestamp() // this was touched...
|
||||||
obj.SetState(resources.ResStatePoking) // can't cancel parent poke
|
obj.SetState(resources.ResStatePoking) // can't cancel parent poke
|
||||||
if err := g.Poke(v, activity); err != nil {
|
if err := g.Poke(v); err != nil {
|
||||||
return errwrap.Wrapf(err, "the Poke() failed")
|
return errwrap.Wrapf(err, "the Poke() failed")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -343,6 +343,12 @@ func (g *Graph) Worker(v *Vertex) error {
|
|||||||
break Loop // no event, so no ack!
|
break Loop // no event, so no ack!
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if process started, but no action yet, skip!
|
||||||
|
if v.Res.GetState() == resources.ResStateProcess {
|
||||||
|
ev.ACK() // ready for next message
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// if running, we skip running a new execution!
|
// if running, we skip running a new execution!
|
||||||
// if waiting, we skip running a new execution!
|
// if waiting, we skip running a new execution!
|
||||||
if running || waiting {
|
if running || waiting {
|
||||||
|
|||||||
@@ -163,7 +163,6 @@ func (obj *ExecRes) Watch(processChan chan *event.Event) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
obj.SetState(ResStateWatching) // reset
|
|
||||||
select {
|
select {
|
||||||
case text := <-bufioch:
|
case text := <-bufioch:
|
||||||
cuid.SetConverged(false)
|
cuid.SetConverged(false)
|
||||||
|
|||||||
@@ -170,7 +170,6 @@ func (obj *FileRes) Watch(processChan chan *event.Event) error {
|
|||||||
log.Printf("%s[%s]: Watching: %s", obj.Kind(), obj.GetName(), obj.Path) // attempting to watch...
|
log.Printf("%s[%s]: Watching: %s", obj.Kind(), obj.GetName(), obj.Path) // attempting to watch...
|
||||||
}
|
}
|
||||||
|
|
||||||
obj.SetState(ResStateWatching) // reset
|
|
||||||
select {
|
select {
|
||||||
case event, ok := <-obj.recWatcher.Events():
|
case event, ok := <-obj.recWatcher.Events():
|
||||||
if !ok { // channel shutdown
|
if !ok { // channel shutdown
|
||||||
|
|||||||
@@ -138,7 +138,6 @@ func (obj *HostnameRes) Watch(processChan chan *event.Event) error {
|
|||||||
var send = false // send event?
|
var send = false // send event?
|
||||||
|
|
||||||
for {
|
for {
|
||||||
obj.SetState(ResStateWatching) // reset
|
|
||||||
select {
|
select {
|
||||||
case <-signals:
|
case <-signals:
|
||||||
cuid.SetConverged(false)
|
cuid.SetConverged(false)
|
||||||
|
|||||||
@@ -150,7 +150,6 @@ func (obj *MsgRes) Watch(processChan chan *event.Event) error {
|
|||||||
var send = false // send event?
|
var send = false // send event?
|
||||||
var exit *error
|
var exit *error
|
||||||
for {
|
for {
|
||||||
obj.SetState(ResStateWatching) // reset
|
|
||||||
select {
|
select {
|
||||||
case event := <-obj.Events():
|
case event := <-obj.Events():
|
||||||
cuid.SetConverged(false)
|
cuid.SetConverged(false)
|
||||||
|
|||||||
@@ -74,7 +74,6 @@ func (obj *NoopRes) Watch(processChan chan *event.Event) error {
|
|||||||
var send = false // send event?
|
var send = false // send event?
|
||||||
var exit *error
|
var exit *error
|
||||||
for {
|
for {
|
||||||
obj.SetState(ResStateWatching) // reset
|
|
||||||
select {
|
select {
|
||||||
case event := <-obj.Events():
|
case event := <-obj.Events():
|
||||||
cuid.SetConverged(false)
|
cuid.SetConverged(false)
|
||||||
|
|||||||
@@ -140,7 +140,6 @@ func (obj *NspawnRes) Watch(processChan chan *event.Event) error {
|
|||||||
var exit *error
|
var exit *error
|
||||||
|
|
||||||
for {
|
for {
|
||||||
obj.SetState(ResStateWatching)
|
|
||||||
select {
|
select {
|
||||||
case event := <-buschan:
|
case event := <-buschan:
|
||||||
// process org.freedesktop.machine1 events for this resource's name
|
// process org.freedesktop.machine1 events for this resource's name
|
||||||
|
|||||||
@@ -191,7 +191,6 @@ func (obj *PasswordRes) Watch(processChan chan *event.Event) error {
|
|||||||
var send = false // send event?
|
var send = false // send event?
|
||||||
var exit *error
|
var exit *error
|
||||||
for {
|
for {
|
||||||
obj.SetState(ResStateWatching) // reset
|
|
||||||
select {
|
select {
|
||||||
// NOTE: this part is very similar to the file resource code
|
// NOTE: this part is very similar to the file resource code
|
||||||
case event, ok := <-obj.recWatcher.Events():
|
case event, ok := <-obj.recWatcher.Events():
|
||||||
|
|||||||
@@ -142,7 +142,6 @@ func (obj *PkgRes) Watch(processChan chan *event.Event) error {
|
|||||||
log.Printf("%s: Watching...", obj.fmtNames(obj.getNames()))
|
log.Printf("%s: Watching...", obj.fmtNames(obj.getNames()))
|
||||||
}
|
}
|
||||||
|
|
||||||
obj.SetState(ResStateWatching) // reset
|
|
||||||
select {
|
select {
|
||||||
case event := <-ch:
|
case event := <-ch:
|
||||||
cuid.SetConverged(false)
|
cuid.SetConverged(false)
|
||||||
|
|||||||
@@ -44,11 +44,10 @@ type ResState int
|
|||||||
|
|
||||||
// Each ResState should be set properly in the relevant part of the resource.
|
// Each ResState should be set properly in the relevant part of the resource.
|
||||||
const (
|
const (
|
||||||
ResStateNil ResState = iota
|
ResStateNil ResState = iota
|
||||||
ResStateWatching
|
ResStateProcess // we're in process, but we haven't done much yet
|
||||||
ResStateEvent // an event has happened, but we haven't poked yet
|
ResStateCheckApply // we're about to run CheckApply
|
||||||
ResStateCheckApply
|
ResStatePoking // we're done CheckApply, and we're about to poke
|
||||||
ResStatePoking
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const refreshPathToken = "refresh"
|
const refreshPathToken = "refresh"
|
||||||
@@ -535,7 +534,6 @@ func (obj *BaseRes) Poll(processChan chan *event.Event) error {
|
|||||||
var send = false
|
var send = false
|
||||||
var exit *error
|
var exit *error
|
||||||
for {
|
for {
|
||||||
obj.SetState(ResStateWatching)
|
|
||||||
select {
|
select {
|
||||||
case <-ticker.C: // received the timer event
|
case <-ticker.C: // received the timer event
|
||||||
log.Printf("%s[%s]: polling...", obj.Kind(), obj.GetName())
|
log.Printf("%s[%s]: polling...", obj.Kind(), obj.GetName())
|
||||||
|
|||||||
@@ -153,7 +153,6 @@ func (obj *SvcRes) Watch(processChan chan *event.Event) error {
|
|||||||
set.Remove(svc) // no return value should ever occur
|
set.Remove(svc) // no return value should ever occur
|
||||||
}
|
}
|
||||||
|
|
||||||
obj.SetState(ResStateWatching) // reset
|
|
||||||
select {
|
select {
|
||||||
case <-buschan: // XXX: wait for new units event to unstick
|
case <-buschan: // XXX: wait for new units event to unstick
|
||||||
cuid.SetConverged(false)
|
cuid.SetConverged(false)
|
||||||
@@ -177,7 +176,6 @@ func (obj *SvcRes) Watch(processChan chan *event.Event) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Watching: %s", svc) // attempting to watch...
|
log.Printf("Watching: %s", svc) // attempting to watch...
|
||||||
obj.SetState(ResStateWatching) // reset
|
|
||||||
select {
|
select {
|
||||||
case event := <-subChannel:
|
case event := <-subChannel:
|
||||||
|
|
||||||
|
|||||||
@@ -92,7 +92,6 @@ func (obj *TimerRes) Watch(processChan chan *event.Event) error {
|
|||||||
var send = false
|
var send = false
|
||||||
|
|
||||||
for {
|
for {
|
||||||
obj.SetState(ResStateWatching)
|
|
||||||
select {
|
select {
|
||||||
case <-obj.ticker.C: // received the timer event
|
case <-obj.ticker.C: // received the timer event
|
||||||
send = true
|
send = true
|
||||||
|
|||||||
7
test/shell/t5b.sh
Executable file
7
test/shell/t5b.sh
Executable file
@@ -0,0 +1,7 @@
|
|||||||
|
#!/bin/bash -e
|
||||||
|
|
||||||
|
# should take slightly more than 35s, but fail if we take 45s)
|
||||||
|
timeout --kill-after=45s 40s ./mgmt run --yaml t5.yaml --converged-timeout=5 --no-watch --tmp-prefix &
|
||||||
|
pid=$!
|
||||||
|
wait $pid # get exit status
|
||||||
|
exit $?
|
||||||
84
test/shell/t5b.yaml
Normal file
84
test/shell/t5b.yaml
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
---
|
||||||
|
graph: mygraph
|
||||||
|
comment: simpler exec fan in to fan out example to demonstrate optimization
|
||||||
|
resources:
|
||||||
|
exec:
|
||||||
|
- name: exec1
|
||||||
|
cmd: sleep 10s
|
||||||
|
shell: ''
|
||||||
|
timeout: 0
|
||||||
|
watchcmd: ''
|
||||||
|
watchshell: ''
|
||||||
|
ifcmd: ''
|
||||||
|
ifshell: ''
|
||||||
|
pollint: 0
|
||||||
|
state: present
|
||||||
|
- name: exec2
|
||||||
|
cmd: sleep 10s
|
||||||
|
shell: ''
|
||||||
|
timeout: 0
|
||||||
|
watchcmd: ''
|
||||||
|
watchshell: ''
|
||||||
|
ifcmd: ''
|
||||||
|
ifshell: ''
|
||||||
|
pollint: 0
|
||||||
|
state: present
|
||||||
|
- name: exec3
|
||||||
|
cmd: sleep 10s
|
||||||
|
shell: ''
|
||||||
|
timeout: 0
|
||||||
|
watchcmd: ''
|
||||||
|
watchshell: ''
|
||||||
|
ifcmd: ''
|
||||||
|
ifshell: ''
|
||||||
|
pollint: 0
|
||||||
|
state: present
|
||||||
|
- name: exec4
|
||||||
|
cmd: sleep 10s
|
||||||
|
shell: ''
|
||||||
|
timeout: 0
|
||||||
|
watchcmd: ''
|
||||||
|
watchshell: ''
|
||||||
|
ifcmd: ''
|
||||||
|
ifshell: ''
|
||||||
|
pollint: 0
|
||||||
|
state: present
|
||||||
|
- name: exec5
|
||||||
|
cmd: sleep 10s
|
||||||
|
shell: ''
|
||||||
|
timeout: 0
|
||||||
|
watchcmd: ''
|
||||||
|
watchshell: ''
|
||||||
|
ifcmd: ''
|
||||||
|
ifshell: ''
|
||||||
|
pollint: 0
|
||||||
|
state: present
|
||||||
|
edges:
|
||||||
|
- name: e1
|
||||||
|
from:
|
||||||
|
kind: exec
|
||||||
|
name: exec1
|
||||||
|
to:
|
||||||
|
kind: exec
|
||||||
|
name: exec3
|
||||||
|
- name: e2
|
||||||
|
from:
|
||||||
|
kind: exec
|
||||||
|
name: exec2
|
||||||
|
to:
|
||||||
|
kind: exec
|
||||||
|
name: exec3
|
||||||
|
- name: e3
|
||||||
|
from:
|
||||||
|
kind: exec
|
||||||
|
name: exec3
|
||||||
|
to:
|
||||||
|
kind: exec
|
||||||
|
name: exec4
|
||||||
|
- name: e4
|
||||||
|
from:
|
||||||
|
kind: exec
|
||||||
|
name: exec3
|
||||||
|
to:
|
||||||
|
kind: exec
|
||||||
|
name: exec5
|
||||||
Reference in New Issue
Block a user