Resources: Prototype retry and retry delay meta parameters

This was the initial cut of the retry and delay meta parameters.
Instead, I decided to move the delay action into the common space
outside of the Watch resource. This is more complicated in the short
term, but will be more beneficial in the long run as each resource won't
have to implement this part itself (even if it uses boiler plate).

This is the first version of this patch without this fix. I decided to
include it because I think it has more correct event processing.
This commit is contained in:
James Shubin
2016-09-14 04:33:03 -04:00
parent 2b1e8cdbee
commit 53cabd5ee4
11 changed files with 618 additions and 131 deletions

20
etcd.go
View File

@@ -919,8 +919,8 @@ func (obj *EmbdEtcd) loopProcessAW(ctx context.Context, aw *AW) {
func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error {
resp := NewResp()
obj.setq <- &KV{key: key, value: value, opts: opts, resp: resp}
if !resp.Wait() { // wait for ack/nack
return fmt.Errorf("Etcd: Set: Probably received an exit...")
if err := resp.Wait(); err != nil { // wait for ack/nack
return fmt.Errorf("Etcd: Set: Probably received an exit: %v", err)
}
return nil
}
@@ -954,8 +954,8 @@ func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOptio
resp := NewResp()
gq := &GQ{path: path, skipConv: skipConv, opts: opts, resp: resp, data: nil}
obj.getq <- gq // send
if !resp.Wait() { // wait for ack/nack
return nil, fmt.Errorf("Etcd: Get: Probably received an exit...")
if err := resp.Wait(); err != nil { // wait for ack/nack
return nil, fmt.Errorf("Etcd: Get: Probably received an exit: %v", err)
}
return gq.data, nil
}
@@ -988,8 +988,8 @@ func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error) {
resp := NewResp()
dl := &DL{path: path, opts: opts, resp: resp, data: -1}
obj.delq <- dl // send
if !resp.Wait() { // wait for ack/nack
return -1, fmt.Errorf("Etcd: Delete: Probably received an exit...")
if err := resp.Wait(); err != nil { // wait for ack/nack
return -1, fmt.Errorf("Etcd: Delete: Probably received an exit: %v", err)
}
return dl.data, nil
}
@@ -1016,8 +1016,8 @@ func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.T
resp := NewResp()
tn := &TN{ifcmps: ifcmps, thenops: thenops, elseops: elseops, resp: resp, data: nil}
obj.txnq <- tn // send
if !resp.Wait() { // wait for ack/nack
return nil, fmt.Errorf("Etcd: Txn: Probably received an exit...")
if err := resp.Wait(); err != nil { // wait for ack/nack
return nil, fmt.Errorf("Etcd: Txn: Probably received an exit: %v", err)
}
return tn.data, nil
}
@@ -1041,8 +1041,8 @@ func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errChe
resp := NewResp()
awq := &AW{path: path, opts: opts, callback: callback, errCheck: errCheck, skipConv: skipConv, cancelFunc: nil, resp: resp}
obj.awq <- awq // send
if !resp.Wait() { // wait for ack/nack
return nil, fmt.Errorf("Etcd: AddWatcher: Got NACK!")
if err := resp.Wait(); err != nil { // wait for ack/nack
return nil, fmt.Errorf("Etcd: AddWatcher: Got NACK: %v", err)
}
return awq.cancelFunc, nil
}

View File

@@ -17,6 +17,10 @@
package main
import (
"fmt"
)
//go:generate stringer -type=eventName -output=eventname_stringer.go
type eventName int
@@ -29,8 +33,9 @@ const (
eventBackPoke
)
// Resp is a channel to be used for boolean responses.
type Resp chan bool
// Resp is a channel to be used for boolean responses. A nil represents an ACK,
// and a non-nil represents a NACK (false). This also lets us use custom errors.
type Resp chan error
// Event is the main struct that stores event information and responses.
type Event struct {
@@ -55,28 +60,43 @@ func (event *Event) NACK() {
}
}
// ACKNACK sends a custom ACK or NACK message on the channel if one was requested.
func (event *Event) ACKNACK(err error) {
if event.Resp != nil { // if they've requested a NACK
event.Resp.ACKNACK(err)
}
}
// NewResp is just a helper to return the right type of response channel.
func NewResp() Resp {
resp := make(chan bool)
resp := make(chan error)
return resp
}
// ACK sends a true value to resp.
func (resp Resp) ACK() {
if resp != nil {
resp <- true
resp <- nil
}
}
// NACK sends a false value to resp.
func (resp Resp) NACK() {
if resp != nil {
resp <- false
resp <- fmt.Errorf("NACK")
}
}
// ACKNACK sends a custom ACK or NACK. The ACK value is always nil, the NACK can
// be any non-nil error value.
func (resp Resp) ACKNACK(err error) {
if resp != nil {
resp <- err
}
}
// Wait waits for any response from a Resp channel and returns it.
func (resp Resp) Wait() bool {
func (resp Resp) Wait() error {
return <-resp
}
@@ -84,7 +104,7 @@ func (resp Resp) Wait() bool {
func (resp Resp) ACKWait() {
for {
// wait until true value
if resp.Wait() {
if resp.Wait() == nil {
return
}
}

85
exec.go
View File

@@ -22,9 +22,11 @@ import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"log"
"os/exec"
"strings"
"time"
)
func init() {
@@ -105,15 +107,70 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan
}
// Watch is the primary listener for this resource and it outputs events.
func (obj *ExecRes) Watch(processChan chan Event) {
func (obj *ExecRes) Watch(processChan chan Event, delay time.Duration) error {
if obj.IsWatching() {
return
return nil
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuuid := obj.converger.Register()
defer cuuid.Unregister()
var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas
doSend = func() (bool, error) {
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
select {
case e := <-resp: // wait for the ACK()
if e != nil { // we got a NACK
return true, e // exit with error
}
case event := <-obj.events:
// NOTE: this code should match the similar code below!
cuuid.SetConverged(false)
if exit, send := obj.ReadEvent(&event); exit {
return true, nil // exit, without error
} else if send {
return doSend() // recurse
}
}
return false, nil // return, no error or exit signal
}
// if a retry-delay was requested, wait, but don't block our events!
if delay > 0 {
var pendingSendEvent bool
timer := time.NewTimer(delay)
Loop:
for {
select {
case <-timer.C: // the wait is over
break Loop // critical
case event := <-obj.events:
// NOTE: this code should match the similar code below!
cuuid.SetConverged(false)
if exit, send := obj.ReadEvent(&event); exit {
return nil // exit
} else if send {
// NOTE: see long comment in the file resource
//if exit, err := doSend(); exit || err != nil {
// return err // we exit or bubble up a NACK...
//}
pendingSendEvent = true // all events are identical for now...
}
}
}
timer.Stop() // it's nice to cleanup
log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName())
if pendingSendEvent { // TODO: should this become a list in the future?
if exit, err := doSend(); exit || err != nil {
return err // we exit or bubble up a NACK...
}
}
}
var send = false // send event?
var exit = false
bufioch, errch := make(chan string), make(chan error)
@@ -138,8 +195,7 @@ func (obj *ExecRes) Watch(processChan chan Event) {
cmdReader, err := cmd.StdoutPipe()
if err != nil {
log.Printf("%v[%v]: Error creating StdoutPipe for Cmd: %v", obj.Kind(), obj.GetName(), err)
log.Fatal(err) // XXX: how should we handle errors?
return fmt.Errorf("%s[%s]: Error creating StdoutPipe for Cmd: %v", obj.Kind(), obj.GetName(), err)
}
scanner := bufio.NewScanner(cmdReader)
@@ -150,8 +206,7 @@ func (obj *ExecRes) Watch(processChan chan Event) {
cmd.Process.Kill() // TODO: is this necessary?
}()
if err := cmd.Start(); err != nil {
log.Printf("%v[%v]: Error starting Cmd: %v", obj.Kind(), obj.GetName(), err)
log.Fatal(err) // XXX: how should we handle errors?
return fmt.Errorf("%s[%s]: Error starting Cmd: %v", obj.Kind(), obj.GetName(), err)
}
bufioch, errch = obj.BufioChanScanner(scanner)
@@ -169,21 +224,19 @@ func (obj *ExecRes) Watch(processChan chan Event) {
}
case err := <-errch:
cuuid.SetConverged(false) // XXX ?
cuuid.SetConverged(false)
if err == nil { // EOF
// FIXME: add an "if watch command ends/crashes"
// restart or generate error option
log.Printf("%v[%v]: Reached EOF", obj.Kind(), obj.GetName())
return
return fmt.Errorf("%s[%s]: Reached EOF", obj.Kind(), obj.GetName())
}
log.Printf("%v[%v]: Error reading input?: %v", obj.Kind(), obj.GetName(), err)
log.Fatal(err)
// XXX: how should we handle errors?
// error reading input?
return fmt.Errorf("Unknown %s[%s] error: %v", obj.Kind(), obj.GetName(), err)
case event := <-obj.events:
cuuid.SetConverged(false)
if exit, send = obj.ReadEvent(&event); exit {
return // exit
return nil // exit
}
case <-cuuid.ConvergedTimer():
@@ -196,9 +249,9 @@ func (obj *ExecRes) Watch(processChan chan Event) {
send = false
// it is okay to invalidate the clean state on poke too
obj.isStateOK = false // something made state dirty
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
resp.ACKWait() // wait for the ACK()
if exit, err := doSend(); exit || err != nil {
return err // we exit or bubble up a NACK...
}
}
}
}

120
file.go
View File

@@ -34,6 +34,7 @@ import (
"path/filepath"
"strings"
"syscall"
"time"
)
func init() {
@@ -143,7 +144,7 @@ func (obj *FileRes) addSubFolders(p string) error {
// look at all subfolders...
walkFn := func(path string, info os.FileInfo, err error) error {
if DEBUG {
log.Printf("File[%v]: Walk: %s (%v): %v", obj.GetName(), path, info, err)
log.Printf("%s[%s]: Walk: %s (%v): %v", obj.Kind(), obj.GetName(), path, info, err)
}
if err != nil {
return nil
@@ -164,22 +165,94 @@ func (obj *FileRes) addSubFolders(p string) error {
// Watch is the primary listener for this resource and it outputs events.
// This one is a file watcher for files and directories.
// Modify with caution, it is probably important to write some test cases first!
// If the Watch returns an error, it means that something has gone wrong, and it
// must be restarted. On a clean exit it returns nil. The delay parameter asks
// it to respect this pause duration before trying to watch again.
// FIXME: Also watch the source directory when using obj.Source !!!
func (obj *FileRes) Watch(processChan chan Event) {
func (obj *FileRes) Watch(processChan chan Event, delay time.Duration) error {
if obj.IsWatching() {
return
return nil // TODO: should this be an error?
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuuid := obj.converger.Register()
defer cuuid.Unregister()
var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas
doSend = func() (bool, error) {
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
select {
case e := <-resp: // wait for the ACK()
if e != nil { // we got a NACK
return true, e // exit with error
}
case event := <-obj.events:
// NOTE: this code should match the similar code below!
cuuid.SetConverged(false)
if exit, send := obj.ReadEvent(&event); exit {
return true, nil // exit, without error
} else if send {
return doSend() // recurse
}
}
return false, nil // return, no error or exit signal
}
// if a retry-delay was requested, wait, but don't block our events!
if delay > 0 {
var pendingSendEvent bool
timer := time.NewTimer(delay)
Loop:
for {
select {
case <-timer.C: // the wait is over
break Loop // critical
case event := <-obj.events:
// NOTE: this code should match the similar code below!
cuuid.SetConverged(false)
if exit, send := obj.ReadEvent(&event); exit {
return nil // exit
} else if send {
// if we dive down this rabbit hole, our
// timer.C won't get seen until we get out!
// in this situation, the Watch() is blocked
// from performing until CheckApply returns
// successfully, or errors out. This isn't
// so bad, but we should document it. Is it
// possible that some resource *needs* Watch
// to run to be able to execute a CheckApply?
// That situation shouldn't be common, and
// should probably not be allowed. Can we
// avoid it though?
//if exit, err := doSend(); exit || err != nil {
// return err // we exit or bubble up a NACK...
//}
// Instead of doing the above, we can
// add events to a pending list, and
// when we finish the delay, we can run
// them.
pendingSendEvent = true // all events are identical for now...
}
}
}
timer.Stop() // it's nice to cleanup
log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName())
if pendingSendEvent { // TODO: should this become a list in the future?
if exit, err := doSend(); exit || err != nil {
return err // we exit or bubble up a NACK...
}
}
}
var safename = path.Clean(obj.path) // no trailing slash
var err error
obj.watcher, err = fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
return err
}
defer obj.watcher.Close()
@@ -201,7 +274,7 @@ func (obj *FileRes) Watch(processChan chan Event) {
if obj.isDir {
if err := obj.addSubFolders(safename); err != nil {
log.Fatal(err) // TODO: temporary until we support errors
return err
}
}
for {
@@ -210,24 +283,25 @@ func (obj *FileRes) Watch(processChan chan Event) {
current = "/"
}
if DEBUG {
log.Printf("File[%v]: Watching: %v", obj.GetName(), current) // attempting to watch...
log.Printf("%s[%s]: Watching: %v", obj.Kind(), obj.GetName(), current) // attempting to watch...
}
// initialize in the loop so that we can reset on rm-ed handles
err = obj.watcher.Add(current)
if err != nil {
if DEBUG {
log.Printf("File[%v]: watcher.Add(%v): Error: %v", obj.GetName(), current, err)
log.Printf("%s[%s]: watcher.Add(%v): Error: %v", obj.Kind(), obj.GetName(), current, err)
}
if err == syscall.ENOENT {
index-- // usually not found, move up one dir
} else if err == syscall.ENOSPC {
// XXX: occasionally: no space left on device,
// XXX: probably due to lack of inotify watches
log.Printf("%v[%v]: Out of inotify watches!", obj.Kind(), obj.GetName())
log.Fatal(err)
// no space left on device, out of inotify watches
// TODO: consider letting the user fall back to
// polling if they hit this error very often...
return fmt.Errorf("%s[%s]: Out of inotify watches: %v", obj.Kind(), obj.GetName(), err)
} else if os.IsPermission(err) {
return fmt.Errorf("%s[%s]: Permission denied to add a watch: %v", obj.Kind(), obj.GetName(), err)
} else {
log.Printf("Unknown file[%v] error:", obj.Name)
log.Fatal(err)
return fmt.Errorf("Unknown %s[%s] error: %v", obj.Kind(), obj.GetName(), err)
}
index = int(math.Max(1, float64(index)))
continue
@@ -237,7 +311,7 @@ func (obj *FileRes) Watch(processChan chan Event) {
select {
case event := <-obj.watcher.Events:
if DEBUG {
log.Printf("File[%v]: Watch(%v), Event(%v): %v", obj.GetName(), current, event.Name, event.Op)
log.Printf("%s[%s]: Watch(%s), Event(%s): %v", obj.Kind(), obj.GetName(), current, event.Name, event.Op)
}
cuuid.SetConverged(false) // XXX: technically i can detect if the event is erroneous or not first
// the deeper you go, the bigger the deltaDepth is...
@@ -263,7 +337,7 @@ func (obj *FileRes) Watch(processChan chan Event) {
obj.watcher.Add(event.Name)
obj.watches[event.Name] = struct{}{}
if err := obj.addSubFolders(event.Name); err != nil {
log.Fatal(err) // TODO: temporary until we support errors
return err
}
}
}
@@ -286,7 +360,7 @@ func (obj *FileRes) Watch(processChan chan Event) {
if obj.isDir {
if err := obj.addSubFolders(safename); err != nil {
log.Fatal(err) // TODO: temporary until we support errors
return err
}
}
@@ -331,15 +405,13 @@ func (obj *FileRes) Watch(processChan chan Event) {
}
case err := <-obj.watcher.Errors:
cuuid.SetConverged(false) // XXX ?
log.Printf("error: %v", err)
log.Fatal(err)
//obj.events <- fmt.Sprintf("file: %v", "error") // XXX: how should we handle errors?
cuuid.SetConverged(false)
return fmt.Errorf("Unknown %s[%s] watcher error: %v", obj.Kind(), obj.GetName(), err)
case event := <-obj.events:
cuuid.SetConverged(false)
if exit, send = obj.ReadEvent(&event); exit {
return // exit
return nil // exit
}
//dirty = false // these events don't invalidate state
@@ -356,9 +428,9 @@ func (obj *FileRes) Watch(processChan chan Event) {
dirty = false
obj.isStateOK = false // something made state dirty
}
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
resp.ACKWait() // wait for the ACK()
if exit, err := doSend(); exit || err != nil {
return err // we exit or bubble up a NACK...
}
}
}
}

68
noop.go
View File

@@ -20,6 +20,7 @@ package main
import (
"encoding/gob"
"log"
"time"
)
func init() {
@@ -57,15 +58,70 @@ func (obj *NoopRes) Validate() bool {
}
// Watch is the primary listener for this resource and it outputs events.
func (obj *NoopRes) Watch(processChan chan Event) {
func (obj *NoopRes) Watch(processChan chan Event, delay time.Duration) error {
if obj.IsWatching() {
return
return nil // TODO: should this be an error?
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuuid := obj.converger.Register()
defer cuuid.Unregister()
var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas
doSend = func() (bool, error) {
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
select {
case e := <-resp: // wait for the ACK()
if e != nil { // we got a NACK
return true, e // exit with error
}
case event := <-obj.events:
// NOTE: this code should match the similar code below!
cuuid.SetConverged(false)
if exit, send := obj.ReadEvent(&event); exit {
return true, nil // exit, without error
} else if send {
return doSend() // recurse
}
}
return false, nil // return, no error or exit signal
}
// if a retry-delay was requested, wait, but don't block our events!
if delay > 0 {
var pendingSendEvent bool
timer := time.NewTimer(delay)
Loop:
for {
select {
case <-timer.C: // the wait is over
break Loop // critical
case event := <-obj.events:
// NOTE: this code should match the similar code below!
cuuid.SetConverged(false)
if exit, send := obj.ReadEvent(&event); exit {
return nil // exit
} else if send {
// NOTE: see long comment in the file resource
//if exit, err := doSend(); exit || err != nil {
// return err // we exit or bubble up a NACK...
//}
pendingSendEvent = true // all events are identical for now...
}
}
}
timer.Stop() // it's nice to cleanup
log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName())
if pendingSendEvent { // TODO: should this become a list in the future?
if exit, err := doSend(); exit || err != nil {
return err // we exit or bubble up a NACK...
}
}
}
var send = false // send event?
var exit = false
for {
@@ -75,7 +131,7 @@ func (obj *NoopRes) Watch(processChan chan Event) {
cuuid.SetConverged(false)
// we avoid sending events on unpause
if exit, send = obj.ReadEvent(&event); exit {
return // exit
return nil // exit
}
case <-cuuid.ConvergedTimer():
@@ -88,9 +144,9 @@ func (obj *NoopRes) Watch(processChan chan Event) {
send = false
// only do this on certain types of events
//obj.isStateOK = false // something made state dirty
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
resp.ACKWait() // wait for the ACK()
if exit, err := doSend(); exit || err != nil {
return err // we exit or bubble up a NACK...
}
}
}
}

149
pgraph.go
View File

@@ -23,6 +23,7 @@ import (
"fmt"
"io/ioutil"
"log"
"math"
"os"
"os/exec"
"sort"
@@ -718,7 +719,7 @@ func (g *Graph) BackPoke(v *Vertex) {
// Process is the primary function to execute for a particular vertex in the graph.
// XXX: rename this function
func (g *Graph) Process(v *Vertex) {
func (g *Graph) Process(v *Vertex) error {
obj := v.Res
if DEBUG {
log.Printf("%v[%v]: Process()", obj.Kind(), obj.GetName())
@@ -765,10 +766,132 @@ func (g *Graph) Process(v *Vertex) {
g.Poke(v, apply)
}
// poke at our pre-req's instead since they need to refresh/run...
return err
} else {
// only poke at the pre-req's that need to run
go g.BackPoke(v)
}
return nil
}
// SentinelErr is a sentinal as an error type that wraps an arbitrary error.
type SentinelErr struct {
err error
}
// Error is the required method to fulfill the error type.
func (obj *SentinelErr) Error() string {
return obj.err.Error()
}
// Worker is the common run frontend of the vertex. It handles all of the retry
// and retry delay common code, and ultimately returns the final status of this
// vertex execution.
func (g *Graph) Worker(v *Vertex) error {
// listen for chan events from Watch() and run
// the Process() function when they're received
// this avoids us having to pass the data into
// the Watch() function about which graph it is
// running on, which isolates things nicely...
chanProcess := make(chan Event)
go func() {
running := false
var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration
if !timer.Stop() {
<-timer.C // unnecessary, shouldn't happen
}
var delay = time.Duration(v.Meta().Delay) * time.Millisecond
var retry int16 = v.Meta().Retry // number of tries left, -1 for infinite
var saved Event
Loop:
for {
// this has to be synchronous, because otherwise the Res
// event loop will keep running and change state,
// causing the converged timeout to fire!
select {
case event, ok := <-chanProcess: // must use like this
if running && ok {
// we got an event that wasn't a close,
// while we were waiting for the timer!
// if this happens, it might be a bug:(
log.Fatalf("%v[%v]: Worker: Unexpected event: %+v", v.Kind(), v.GetName(), event)
}
if !ok { // chanProcess closed, let's exit
break Loop // no event, so no ack!
}
// the above mentioned synchronous part, is the
// running of this function, paired with an ack.
if e := g.Process(v); e != nil {
saved = event
log.Printf("%v[%v]: CheckApply errored: %v", v.Kind(), v.GetName(), e)
if retry == 0 {
// wrap the error in the sentinel
event.ACKNACK(&SentinelErr{e}) // fail the Watch()
break Loop
}
if retry > 0 { // don't decrement the -1
retry--
}
log.Printf("%v[%v]: CheckApply: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), delay.Seconds(), retry)
// start the timer...
timer.Reset(delay)
running = true
continue
}
retry = v.Meta().Retry // reset on success
event.ACK() // sync
case <-timer.C:
if !timer.Stop() {
//<-timer.C // blocks, docs are wrong!
}
running = false
// re-send this failed event, to trigger a CheckApply()
go func() { chanProcess <- saved }()
// TODO: should we send a fake event instead?
//saved = nil
}
}
}()
var err error // propagate the error up (this is a permanent BAD error!)
// the watch delay runs inside of the Watch resource loop, so that it
// can still process signals and exit if needed. It shouldn't run any
// resource specific code since this is supposed to be a retry delay.
// NOTE: we're using the same retry and delay metaparams that CheckApply
// uses. This is for practicality. We can separate them later if needed!
var watchDelay time.Duration
var watchRetry int16 = v.Meta().Retry // number of tries left, -1 for infinite
// watch blocks until it ends, & errors to retry
for {
// TODO: reset the watch retry count after some amount of success
e := v.Res.Watch(chanProcess, watchDelay)
if e == nil { // exit signal
err = nil // clean exit
break
}
if sentinelErr, ok := e.(*SentinelErr); ok { // unwrap the sentinel
err = sentinelErr.err
break // sentinel means, perma-exit
}
log.Printf("%v[%v]: Watch errored: %v", v.Kind(), v.GetName(), e)
if watchRetry == 0 {
err = fmt.Errorf("Permanent watch error: %v", e)
break
}
if watchRetry > 0 { // don't decrement the -1
watchRetry--
}
watchDelay = time.Duration(v.Meta().Delay) * time.Millisecond
log.Printf("%v[%v]: Watch: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), watchDelay.Seconds(), watchRetry)
// We trigger a CheckApply if watch restarts, so that we catch
// any possible events that happened while down. NOTE: this is
// only flood-safe if the Watch resource de-duplicates similar
// send event messages. It does for now, rethink this later...
v.SendEvent(eventPoke, false, false)
}
close(chanProcess)
return err
}
// Start is a main kick to start the graph. It goes through in reverse topological
@@ -787,25 +910,13 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
go func(vv *Vertex) {
defer wg.Done()
// listen for chan events from Watch() and run
// the Process() function when they're received
// this avoids us having to pass the data into
// the Watch() function about which graph it is
// running on, which isolates things nicely...
chanProcess := make(chan Event)
go func() {
for event := range chanProcess {
// this has to be synchronous,
// because otherwise the Res
// event loop will keep running
// and change state, causing the
// converged timeout to fire!
g.Process(vv)
event.ACK() // sync
// TODO: if a sufficient number of workers error,
// should something be done? Will these restart
// after perma-failure if we have a graph change?
if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops
log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err)
return
}
}()
vv.Res.Watch(chanProcess) // i block until i end
close(chanProcess)
log.Printf("%v[%v]: Exited", vv.Kind(), vv.GetName())
}(v)
}

View File

@@ -25,6 +25,7 @@ import (
"sort"
"strings"
"testing"
"time"
)
func TestPgraphT1(t *testing.T) {
@@ -1282,3 +1283,13 @@ func TestPgraphGroupingConnected1(t *testing.T) {
}
runGraphCmp(t, g1, g2)
}
func TestDurationAssumptions(t *testing.T) {
var d time.Duration
if (d == 0) != true {
t.Errorf("Empty time.Duration is no longer equal to zero!")
}
if (d > 0) != false {
t.Errorf("Empty time.Duration is now greater than zero!")
}
}

68
pkg.go
View File

@@ -25,6 +25,7 @@ import (
"log"
"path"
"strings"
"time"
)
func init() {
@@ -108,15 +109,70 @@ func (obj *PkgRes) Validate() bool {
// It uses the PackageKit UpdatesChanged signal to watch for changes.
// TODO: https://github.com/hughsie/PackageKit/issues/109
// TODO: https://github.com/hughsie/PackageKit/issues/110
func (obj *PkgRes) Watch(processChan chan Event) {
func (obj *PkgRes) Watch(processChan chan Event, delay time.Duration) error {
if obj.IsWatching() {
return
return nil
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuuid := obj.converger.Register()
defer cuuid.Unregister()
var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas
doSend = func() (bool, error) {
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
select {
case e := <-resp: // wait for the ACK()
if e != nil { // we got a NACK
return true, e // exit with error
}
case event := <-obj.events:
// NOTE: this code should match the similar code below!
cuuid.SetConverged(false)
if exit, send := obj.ReadEvent(&event); exit {
return true, nil // exit, without error
} else if send {
return doSend() // recurse
}
}
return false, nil // return, no error or exit signal
}
// if a retry-delay was requested, wait, but don't block our events!
if delay > 0 {
var pendingSendEvent bool
timer := time.NewTimer(delay)
Loop:
for {
select {
case <-timer.C: // the wait is over
break Loop // critical
case event := <-obj.events:
// NOTE: this code should match the similar code below!
cuuid.SetConverged(false)
if exit, send := obj.ReadEvent(&event); exit {
return nil // exit
} else if send {
// NOTE: see long comment in the file resource
//if exit, err := doSend(); exit || err != nil {
// return err // we exit or bubble up a NACK...
//}
pendingSendEvent = true // all events are identical for now...
}
}
}
timer.Stop() // it's nice to cleanup
log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName())
if pendingSendEvent { // TODO: should this become a list in the future?
if exit, err := doSend(); exit || err != nil {
return err // we exit or bubble up a NACK...
}
}
}
bus := NewBus()
if bus == nil {
log.Fatal("Can't connect to PackageKit bus.")
@@ -159,7 +215,7 @@ func (obj *PkgRes) Watch(processChan chan Event) {
case event := <-obj.events:
cuuid.SetConverged(false)
if exit, send = obj.ReadEvent(&event); exit {
return // exit
return nil // exit
}
dirty = false // these events don't invalidate state
@@ -176,9 +232,9 @@ func (obj *PkgRes) Watch(processChan chan Event) {
dirty = false
obj.isStateOK = false // something made state dirty
}
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
resp.ACKWait() // wait for the ACK()
if exit, err := doSend(); exit || err != nil {
return err // we exit or bubble up a NACK...
}
}
}
}

View File

@@ -23,6 +23,7 @@ import (
"encoding/gob"
"fmt"
"log"
"time"
)
//go:generate stringer -type=resState -output=resstate_stringer.go
@@ -64,6 +65,11 @@ type MetaParams struct {
AutoEdge bool `yaml:"autoedge"` // metaparam, should we generate auto edges? // XXX should default to true
AutoGroup bool `yaml:"autogroup"` // metaparam, should we auto group? // XXX should default to true
Noop bool `yaml:"noop"`
// NOTE: there are separate Watch and CheckApply retry and delay values,
// but I've decided to use the same ones for both until there's a proper
// reason to want to do something differently for the Watch errors.
Retry int16 `yaml:"retry"` // metaparam, number of times to retry on error. -1 for infinite
Delay uint64 `yaml:"delay"` // metaparam, number of milliseconds to wait between retries
}
// The Base interface is everything that is common to all resources.
@@ -95,7 +101,7 @@ type Res interface {
Init()
//Validate() bool // TODO: this might one day be added
GetUUIDs() []ResUUID // most resources only return one
Watch(chan Event) // send on channel to signal process() events
Watch(chan Event, time.Duration) error // send on channel to signal process() events
CheckApply(bool) (bool, error)
AutoEdges() AutoEdge
Compare(Res) bool
@@ -226,15 +232,10 @@ func (obj *BaseRes) SendEvent(event eventName, sync bool, activity bool) bool {
return true
}
resp := make(chan bool)
resp := NewResp()
obj.events <- Event{event, resp, "", activity}
for {
value := <-resp
// wait until true value
if value {
resp.ACKWait() // waits until true (nil) value
return true
}
}
}
// ReadEvent processes events when a select gets one, and handles the pause

85
svc.go
View File

@@ -27,6 +27,7 @@ import (
systemdUtil "github.com/coreos/go-systemd/util"
"github.com/godbus/dbus" // namespace collides with systemd wrapper
"log"
"time"
)
func init() {
@@ -71,30 +72,85 @@ func (obj *SvcRes) Validate() bool {
}
// Watch is the primary listener for this resource and it outputs events.
func (obj *SvcRes) Watch(processChan chan Event) {
func (obj *SvcRes) Watch(processChan chan Event, delay time.Duration) error {
if obj.IsWatching() {
return
return nil
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuuid := obj.converger.Register()
defer cuuid.Unregister()
var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas
doSend = func() (bool, error) {
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
select {
case e := <-resp: // wait for the ACK()
if e != nil { // we got a NACK
return true, e // exit with error
}
case event := <-obj.events:
// NOTE: this code should match the similar code below!
cuuid.SetConverged(false)
if exit, send := obj.ReadEvent(&event); exit {
return true, nil // exit, without error
} else if send {
return doSend() // recurse
}
}
return false, nil // return, no error or exit signal
}
// if a retry-delay was requested, wait, but don't block our events!
if delay > 0 {
var pendingSendEvent bool
timer := time.NewTimer(delay)
Loop:
for {
select {
case <-timer.C: // the wait is over
break Loop // critical
case event := <-obj.events:
// NOTE: this code should match the similar code below!
cuuid.SetConverged(false)
if exit, send := obj.ReadEvent(&event); exit {
return nil // exit
} else if send {
// NOTE: see long comment in the file resource
//if exit, err := doSend(); exit || err != nil {
// return err // we exit or bubble up a NACK...
//}
pendingSendEvent = true // all events are identical for now...
}
}
}
timer.Stop() // it's nice to cleanup
log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName())
if pendingSendEvent { // TODO: should this become a list in the future?
if exit, err := doSend(); exit || err != nil {
return err // we exit or bubble up a NACK...
}
}
}
// obj.Name: svc name
if !systemdUtil.IsRunningSystemd() {
log.Fatal("Systemd is not running.")
return fmt.Errorf("Systemd is not running.")
}
conn, err := systemd.NewSystemdConnection() // needs root access
if err != nil {
log.Fatal("Failed to connect to systemd: ", err)
return fmt.Errorf("Failed to connect to systemd: %s", err)
}
defer conn.Close()
// if we share the bus with others, we will get each others messages!!
bus, err := SystemBusPrivateUsable() // don't share the bus connection!
if err != nil {
log.Fatal("Failed to connect to bus: ", err)
return fmt.Errorf("Failed to connect to bus: %s", err)
}
// XXX: will this detect new units?
@@ -157,7 +213,7 @@ func (obj *SvcRes) Watch(processChan chan Event) {
case event := <-obj.events:
cuuid.SetConverged(false)
if exit, send = obj.ReadEvent(&event); exit {
return // exit
return nil // exit
}
if event.GetActivity() {
dirty = true
@@ -187,7 +243,7 @@ func (obj *SvcRes) Watch(processChan chan Event) {
} else if event[svc].ActiveState == "inactive" {
log.Printf("Svc[%v]->Stopped!()", svc)
} else {
log.Fatal("Unknown svc state: ", event[svc].ActiveState)
log.Fatalf("Unknown svc state: %s", event[svc].ActiveState)
}
} else {
// svc stopped (and ActiveState is nil...)
@@ -197,15 +253,13 @@ func (obj *SvcRes) Watch(processChan chan Event) {
dirty = true
case err := <-subErrors:
cuuid.SetConverged(false) // XXX ?
log.Printf("error: %v", err)
log.Fatal(err)
//vertex.events <- fmt.Sprintf("svc: %v", "error") // XXX: how should we handle errors?
cuuid.SetConverged(false)
return fmt.Errorf("Unknown %s[%s] error: %v", obj.Kind(), obj.GetName(), err)
case event := <-obj.events:
cuuid.SetConverged(false)
if exit, send = obj.ReadEvent(&event); exit {
return // exit
return nil // exit
}
if event.GetActivity() {
dirty = true
@@ -223,11 +277,10 @@ func (obj *SvcRes) Watch(processChan chan Event) {
dirty = false
obj.isStateOK = false // something made state dirty
}
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
resp.ACKWait() // wait for the ACK()
if exit, err := doSend(); exit || err != nil {
return err // we exit or bubble up a NACK...
}
}
}
}

View File

@@ -65,20 +65,74 @@ func (obj *TimerRes) Validate() bool {
}
// Watch is the primary listener for this resource and it outputs events.
func (obj *TimerRes) Watch(processChan chan Event) {
func (obj *TimerRes) Watch(processChan chan Event, delay time.Duration) error {
if obj.IsWatching() {
return
return nil
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuuid := obj.converger.Register()
defer cuuid.Unregister()
var doSend func(string) (bool, error) // lol, golang doesn't support recursive lambdas
doSend = func(comment string) (bool, error) {
resp := NewResp()
processChan <- Event{eventNil, resp, comment, true} // trigger process
select {
case e := <-resp: // wait for the ACK()
if e != nil { // we got a NACK
return true, e // exit with error
}
case event := <-obj.events:
// NOTE: this code should match the similar code below!
cuuid.SetConverged(false)
if exit, send := obj.ReadEvent(&event); exit {
return true, nil // exit, without error
} else if send {
return doSend(comment) // recurse
}
}
return false, nil // return, no error or exit signal
}
// if a retry-delay was requested, wait, but don't block our events!
if delay > 0 {
var pendingSendEvent bool
timer := time.NewTimer(delay)
Loop:
for {
select {
case <-timer.C: // the wait is over
break Loop // critical
case event := <-obj.events:
// NOTE: this code should match the similar code below!
cuuid.SetConverged(false)
if exit, send := obj.ReadEvent(&event); exit {
return nil // exit
} else if send {
// NOTE: see long comment in the file resource
//if exit, err := doSend(); exit || err != nil {
// return err // we exit or bubble up a NACK...
//}
pendingSendEvent = true // all events are identical for now...
}
}
}
timer.Stop() // it's nice to cleanup
log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName())
if pendingSendEvent { // TODO: should this become a list in the future?
if exit, err := doSend("pending delayed event"); exit || err != nil {
return err // we exit or bubble up a NACK...
}
}
}
// Create a time.Ticker for the given interval
ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second)
defer ticker.Stop()
obj.SetWatching(true)
defer obj.SetWatching(false)
cuuid := obj.converger.Register()
defer cuuid.Unregister()
var send = false
for {
@@ -90,7 +144,7 @@ func (obj *TimerRes) Watch(processChan chan Event) {
case event := <-obj.events:
cuuid.SetConverged(false)
if exit, _ := obj.ReadEvent(&event); exit {
return
return nil
}
case <-cuuid.ConvergedTimer():
cuuid.SetConverged(true)
@@ -99,9 +153,9 @@ func (obj *TimerRes) Watch(processChan chan Event) {
if send {
send = false
obj.isStateOK = false
resp := NewResp()
processChan <- Event{eventNil, resp, "timer ticked", true}
resp.ACKWait()
if exit, err := doSend("timer ticked"); exit || err != nil {
return err // we exit or bubble up a NACK...
}
}
}
}