resources: Improve the state/cache system

Refactor the state cache into the engine. This makes resource writing
less error prone, and paves the way for better notifications.
This commit is contained in:
James Shubin
2016-11-25 12:30:29 -05:00
parent ba6044e9e8
commit b0a8fc165c
13 changed files with 95 additions and 176 deletions

View File

@@ -129,19 +129,35 @@ func (g *Graph) Process(v *Vertex) error {
if changed, err := obj.SendRecv(obj); err != nil {
return errwrap.Wrapf(err, "could not SendRecv in Process")
} else if changed {
obj.StateOK(false) // invalidate cache
obj.StateOK(false) // invalidate cache, mark as dirty
}
// if this fails, don't UpdateTimestamp()
checkok, err := obj.CheckApply(!obj.Meta().Noop)
if checkok && err != nil { // should never return this way
log.Fatalf("%s[%s]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), checkok, err)
if global.DEBUG {
log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), !obj.Meta().Noop)
}
var checkOK bool
var err error
if obj.IsStateOK() { // check cached state, to skip CheckApply
checkOK, err = true, nil
} else {
// if this fails, don't UpdateTimestamp()
checkOK, err = obj.CheckApply(!obj.Meta().Noop)
}
if checkOK && err != nil { // should never return this way
log.Fatalf("%s[%s]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), checkOK, err)
}
if global.DEBUG {
log.Printf("%s[%s]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), checkok, err)
log.Printf("%s[%s]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), checkOK, err)
}
if !checkok { // if state *was* not ok, we had to have apply'ed
// if CheckApply ran without noop and without error, state should be good
if !obj.Meta().Noop && err == nil { // aka !obj.Meta().Noop || checkOK
obj.StateOK(true) // reset
}
if !checkOK { // if state *was* not ok, we had to have apply'ed
if err != nil { // error during check or apply
ok = false
} else {
@@ -189,7 +205,7 @@ func (g *Graph) Worker(v *Vertex) error {
// the Watch() function about which graph it is
// running on, which isolates things nicely...
obj := v.Res
chanProcess := make(chan event.Event)
processChan := make(chan event.Event)
go func() {
running := false
var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration
@@ -205,14 +221,14 @@ func (g *Graph) Worker(v *Vertex) error {
// event loop will keep running and change state,
// causing the converged timeout to fire!
select {
case event, ok := <-chanProcess: // must use like this
case event, ok := <-processChan: // must use like this
if running && ok {
// we got an event that wasn't a close,
// while we were waiting for the timer!
// if this happens, it might be a bug:(
log.Fatalf("%s[%s]: Worker: Unexpected event: %+v", v.Kind(), v.GetName(), event)
}
if !ok { // chanProcess closed, let's exit
if !ok { // processChan closed, let's exit
break Loop // no event, so no ack!
}
@@ -245,7 +261,7 @@ func (g *Graph) Worker(v *Vertex) error {
running = false
log.Printf("%s[%s]: CheckApply delay expired!", v.Kind(), v.GetName())
// re-send this failed event, to trigger a CheckApply()
go func() { chanProcess <- saved }()
go func() { processChan <- saved }()
// TODO: should we send a fake event instead?
//saved = nil
}
@@ -308,14 +324,14 @@ func (g *Graph) Worker(v *Vertex) error {
// NOTE: we can avoid the send if running Watch guarantees
// one CheckApply event on startup!
//if pendingSendEvent { // TODO: should this become a list in the future?
// if exit, err := obj.DoSend(chanProcess, ""); exit || err != nil {
// if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
// return err // we exit or bubble up a NACK...
// }
//}
}
// TODO: reset the watch retry count after some amount of success
e := v.Res.Watch(chanProcess)
e := v.Res.Watch(processChan)
if e == nil { // exit signal
err = nil // clean exit
break
@@ -339,7 +355,7 @@ func (g *Graph) Worker(v *Vertex) error {
// by getting the Watch resource to send one event once it's up!
//v.SendEvent(eventPoke, false, false)
}
close(chanProcess)
close(processChan)
return err
}