diff --git a/etcd.go b/etcd.go index 0580af1e..ffda4bb3 100644 --- a/etcd.go +++ b/etcd.go @@ -919,8 +919,8 @@ func (obj *EmbdEtcd) loopProcessAW(ctx context.Context, aw *AW) { func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error { resp := NewResp() obj.setq <- &KV{key: key, value: value, opts: opts, resp: resp} - if !resp.Wait() { // wait for ack/nack - return fmt.Errorf("Etcd: Set: Probably received an exit...") + if err := resp.Wait(); err != nil { // wait for ack/nack + return fmt.Errorf("Etcd: Set: Probably received an exit: %v", err) } return nil } @@ -953,9 +953,9 @@ func (obj *EmbdEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string, func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOption) (map[string]string, error) { resp := NewResp() gq := &GQ{path: path, skipConv: skipConv, opts: opts, resp: resp, data: nil} - obj.getq <- gq // send - if !resp.Wait() { // wait for ack/nack - return nil, fmt.Errorf("Etcd: Get: Probably received an exit...") + obj.getq <- gq // send + if err := resp.Wait(); err != nil { // wait for ack/nack + return nil, fmt.Errorf("Etcd: Get: Probably received an exit: %v", err) } return gq.data, nil } @@ -987,9 +987,9 @@ func (obj *EmbdEtcd) rawGet(ctx context.Context, gq *GQ) (result map[string]stri func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error) { resp := NewResp() dl := &DL{path: path, opts: opts, resp: resp, data: -1} - obj.delq <- dl // send - if !resp.Wait() { // wait for ack/nack - return -1, fmt.Errorf("Etcd: Delete: Probably received an exit...") + obj.delq <- dl // send + if err := resp.Wait(); err != nil { // wait for ack/nack + return -1, fmt.Errorf("Etcd: Delete: Probably received an exit: %v", err) } return dl.data, nil } @@ -1015,9 +1015,9 @@ func (obj *EmbdEtcd) rawDelete(ctx context.Context, dl *DL) (count int64, err er func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.TxnResponse, error) { resp := 
NewResp() tn := &TN{ifcmps: ifcmps, thenops: thenops, elseops: elseops, resp: resp, data: nil} - obj.txnq <- tn // send - if !resp.Wait() { // wait for ack/nack - return nil, fmt.Errorf("Etcd: Txn: Probably received an exit...") + obj.txnq <- tn // send + if err := resp.Wait(); err != nil { // wait for ack/nack + return nil, fmt.Errorf("Etcd: Txn: Probably received an exit: %v", err) } return tn.data, nil } @@ -1040,9 +1040,9 @@ func (obj *EmbdEtcd) rawTxn(ctx context.Context, tn *TN) (*etcd.TxnResponse, err func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, opts ...etcd.OpOption) (func(), error) { resp := NewResp() awq := &AW{path: path, opts: opts, callback: callback, errCheck: errCheck, skipConv: skipConv, cancelFunc: nil, resp: resp} - obj.awq <- awq // send - if !resp.Wait() { // wait for ack/nack - return nil, fmt.Errorf("Etcd: AddWatcher: Got NACK!") + obj.awq <- awq // send + if err := resp.Wait(); err != nil { // wait for ack/nack + return nil, fmt.Errorf("Etcd: AddWatcher: Got NACK: %v", err) } return awq.cancelFunc, nil } diff --git a/event.go b/event.go index b4f58433..bf2868d4 100644 --- a/event.go +++ b/event.go @@ -17,6 +17,10 @@ package main +import ( + "fmt" +) + //go:generate stringer -type=eventName -output=eventname_stringer.go type eventName int @@ -29,8 +33,9 @@ const ( eventBackPoke ) -// Resp is a channel to be used for boolean responses. -type Resp chan bool +// Resp is a channel to be used for boolean responses. A nil represents an ACK, +// and a non-nil represents a NACK (false). This also lets us use custom errors. +type Resp chan error // Event is the main struct that stores event information and responses. type Event struct { @@ -55,28 +60,43 @@ func (event *Event) NACK() { } } +// ACKNACK sends a custom ACK or NACK message on the channel if one was requested. 
+func (event *Event) ACKNACK(err error) { + if event.Resp != nil { // if they've requested a NACK + event.Resp.ACKNACK(err) + } +} + // NewResp is just a helper to return the right type of response channel. func NewResp() Resp { - resp := make(chan bool) + resp := make(chan error) return resp } // ACK sends a true value to resp. func (resp Resp) ACK() { if resp != nil { - resp <- true + resp <- nil } } // NACK sends a false value to resp. func (resp Resp) NACK() { if resp != nil { - resp <- false + resp <- fmt.Errorf("NACK") + } +} + +// ACKNACK sends a custom ACK or NACK. The ACK value is always nil, the NACK can +// be any non-nil error value. +func (resp Resp) ACKNACK(err error) { + if resp != nil { + resp <- err } } // Wait waits for any response from a Resp channel and returns it. -func (resp Resp) Wait() bool { +func (resp Resp) Wait() error { return <-resp } @@ -84,7 +104,7 @@ func (resp Resp) Wait() bool { func (resp Resp) ACKWait() { for { // wait until true value - if resp.Wait() { + if resp.Wait() == nil { return } } diff --git a/exec.go b/exec.go index a57da1bf..b63c7a42 100644 --- a/exec.go +++ b/exec.go @@ -22,9 +22,11 @@ import ( "bytes" "encoding/gob" "errors" + "fmt" "log" "os/exec" "strings" + "time" ) func init() { @@ -105,15 +107,70 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan } // Watch is the primary listener for this resource and it outputs events. 
-func (obj *ExecRes) Watch(processChan chan Event) { +func (obj *ExecRes) Watch(processChan chan Event, delay time.Duration) error { if obj.IsWatching() { - return + return nil } obj.SetWatching(true) defer obj.SetWatching(false) cuuid := obj.converger.Register() defer cuuid.Unregister() + var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas + doSend = func() (bool, error) { + resp := NewResp() + processChan <- Event{eventNil, resp, "", true} // trigger process + select { + case e := <-resp: // wait for the ACK() + if e != nil { // we got a NACK + return true, e // exit with error + } + + case event := <-obj.events: + // NOTE: this code should match the similar code below! + cuuid.SetConverged(false) + if exit, send := obj.ReadEvent(&event); exit { + return true, nil // exit, without error + } else if send { + return doSend() // recurse + } + } + return false, nil // return, no error or exit signal + } + + // if a retry-delay was requested, wait, but don't block our events! + if delay > 0 { + var pendingSendEvent bool + timer := time.NewTimer(delay) + Loop: + for { + select { + case <-timer.C: // the wait is over + break Loop // critical + + case event := <-obj.events: + // NOTE: this code should match the similar code below! + cuuid.SetConverged(false) + if exit, send := obj.ReadEvent(&event); exit { + return nil // exit + } else if send { + // NOTE: see long comment in the file resource + //if exit, err := doSend(); exit || err != nil { + // return err // we exit or bubble up a NACK... + //} + pendingSendEvent = true // all events are identical for now... + } + } + } + timer.Stop() // it's nice to cleanup + log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName()) + if pendingSendEvent { // TODO: should this become a list in the future? + if exit, err := doSend(); exit || err != nil { + return err // we exit or bubble up a NACK... + } + } + } + var send = false // send event? 
var exit = false bufioch, errch := make(chan string), make(chan error) @@ -138,8 +195,7 @@ func (obj *ExecRes) Watch(processChan chan Event) { cmdReader, err := cmd.StdoutPipe() if err != nil { - log.Printf("%v[%v]: Error creating StdoutPipe for Cmd: %v", obj.Kind(), obj.GetName(), err) - log.Fatal(err) // XXX: how should we handle errors? + return fmt.Errorf("%s[%s]: Error creating StdoutPipe for Cmd: %v", obj.Kind(), obj.GetName(), err) } scanner := bufio.NewScanner(cmdReader) @@ -150,8 +206,7 @@ func (obj *ExecRes) Watch(processChan chan Event) { cmd.Process.Kill() // TODO: is this necessary? }() if err := cmd.Start(); err != nil { - log.Printf("%v[%v]: Error starting Cmd: %v", obj.Kind(), obj.GetName(), err) - log.Fatal(err) // XXX: how should we handle errors? + return fmt.Errorf("%s[%s]: Error starting Cmd: %v", obj.Kind(), obj.GetName(), err) } bufioch, errch = obj.BufioChanScanner(scanner) @@ -169,21 +224,19 @@ func (obj *ExecRes) Watch(processChan chan Event) { } case err := <-errch: - cuuid.SetConverged(false) // XXX ? - if err == nil { // EOF + cuuid.SetConverged(false) + if err == nil { // EOF // FIXME: add an "if watch command ends/crashes" // restart or generate error option - log.Printf("%v[%v]: Reached EOF", obj.Kind(), obj.GetName()) - return + return fmt.Errorf("%s[%s]: Reached EOF", obj.Kind(), obj.GetName()) } - log.Printf("%v[%v]: Error reading input?: %v", obj.Kind(), obj.GetName(), err) - log.Fatal(err) - // XXX: how should we handle errors? + // error reading input? 
+ return fmt.Errorf("Unknown %s[%s] error: %v", obj.Kind(), obj.GetName(), err) case event := <-obj.events: cuuid.SetConverged(false) if exit, send = obj.ReadEvent(&event); exit { - return // exit + return nil // exit } case <-cuuid.ConvergedTimer(): @@ -196,9 +249,9 @@ func (obj *ExecRes) Watch(processChan chan Event) { send = false // it is okay to invalidate the clean state on poke too obj.isStateOK = false // something made state dirty - resp := NewResp() - processChan <- Event{eventNil, resp, "", true} // trigger process - resp.ACKWait() // wait for the ACK() + if exit, err := doSend(); exit || err != nil { + return err // we exit or bubble up a NACK... + } } } } diff --git a/file.go b/file.go index 11ed6fcd..999cbccd 100644 --- a/file.go +++ b/file.go @@ -34,6 +34,7 @@ import ( "path/filepath" "strings" "syscall" + "time" ) func init() { @@ -143,7 +144,7 @@ func (obj *FileRes) addSubFolders(p string) error { // look at all subfolders... walkFn := func(path string, info os.FileInfo, err error) error { if DEBUG { - log.Printf("File[%v]: Walk: %s (%v): %v", obj.GetName(), path, info, err) + log.Printf("%s[%s]: Walk: %s (%v): %v", obj.Kind(), obj.GetName(), path, info, err) } if err != nil { return nil @@ -164,22 +165,94 @@ func (obj *FileRes) addSubFolders(p string) error { // Watch is the primary listener for this resource and it outputs events. // This one is a file watcher for files and directories. // Modify with caution, it is probably important to write some test cases first! +// If the Watch returns an error, it means that something has gone wrong, and it +// must be restarted. On a clean exit it returns nil. The delay parameter asks +// it to respect this pause duration before trying to watch again. // FIXME: Also watch the source directory when using obj.Source !!! 
-func (obj *FileRes) Watch(processChan chan Event) { +func (obj *FileRes) Watch(processChan chan Event, delay time.Duration) error { if obj.IsWatching() { - return + return nil // TODO: should this be an error? } obj.SetWatching(true) defer obj.SetWatching(false) cuuid := obj.converger.Register() defer cuuid.Unregister() + var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas + doSend = func() (bool, error) { + resp := NewResp() + processChan <- Event{eventNil, resp, "", true} // trigger process + select { + case e := <-resp: // wait for the ACK() + if e != nil { // we got a NACK + return true, e // exit with error + } + + case event := <-obj.events: + // NOTE: this code should match the similar code below! + cuuid.SetConverged(false) + if exit, send := obj.ReadEvent(&event); exit { + return true, nil // exit, without error + } else if send { + return doSend() // recurse + } + } + return false, nil // return, no error or exit signal + } + + // if a retry-delay was requested, wait, but don't block our events! + if delay > 0 { + var pendingSendEvent bool + timer := time.NewTimer(delay) + Loop: + for { + select { + case <-timer.C: // the wait is over + break Loop // critical + + case event := <-obj.events: + // NOTE: this code should match the similar code below! + cuuid.SetConverged(false) + if exit, send := obj.ReadEvent(&event); exit { + return nil // exit + } else if send { + // if we dive down this rabbit hole, our + // timer.C won't get seen until we get out! + // in this situation, the Watch() is blocked + // from performing until CheckApply returns + // successfully, or errors out. This isn't + // so bad, but we should document it. Is it + // possible that some resource *needs* Watch + // to run to be able to execute a CheckApply? + // That situation shouldn't be common, and + // should probably not be allowed. Can we + // avoid it though? 
+ //if exit, err := doSend(); exit || err != nil { + // return err // we exit or bubble up a NACK... + //} + // Instead of doing the above, we can + // add events to a pending list, and + // when we finish the delay, we can run + // them. + pendingSendEvent = true // all events are identical for now... + } + } + } + timer.Stop() // it's nice to cleanup + log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName()) + if pendingSendEvent { // TODO: should this become a list in the future? + if exit, err := doSend(); exit || err != nil { + return err // we exit or bubble up a NACK... + } + } + } + var safename = path.Clean(obj.path) // no trailing slash var err error obj.watcher, err = fsnotify.NewWatcher() if err != nil { - log.Fatal(err) + return err } defer obj.watcher.Close() @@ -201,7 +274,7 @@ func (obj *FileRes) Watch(processChan chan Event) { if obj.isDir { if err := obj.addSubFolders(safename); err != nil { - log.Fatal(err) // TODO: temporary until we support errors + return err } } for { @@ -210,24 +283,25 @@ func (obj *FileRes) Watch(processChan chan Event) { current = "/" } if DEBUG { - log.Printf("File[%v]: Watching: %v", obj.GetName(), current) // attempting to watch... + log.Printf("%s[%s]: Watching: %v", obj.Kind(), obj.GetName(), current) // attempting to watch... 
} // initialize in the loop so that we can reset on rm-ed handles err = obj.watcher.Add(current) if err != nil { if DEBUG { - log.Printf("File[%v]: watcher.Add(%v): Error: %v", obj.GetName(), current, err) + log.Printf("%s[%s]: watcher.Add(%v): Error: %v", obj.Kind(), obj.GetName(), current, err) } if err == syscall.ENOENT { index-- // usually not found, move up one dir } else if err == syscall.ENOSPC { - // XXX: occasionally: no space left on device, - // XXX: probably due to lack of inotify watches - log.Printf("%v[%v]: Out of inotify watches!", obj.Kind(), obj.GetName()) - log.Fatal(err) + // no space left on device, out of inotify watches + // TODO: consider letting the user fall back to + // polling if they hit this error very often... + return fmt.Errorf("%s[%s]: Out of inotify watches: %v", obj.Kind(), obj.GetName(), err) + } else if os.IsPermission(err) { + return fmt.Errorf("%s[%s]: Permission denied to add a watch: %v", obj.Kind(), obj.GetName(), err) } else { - log.Printf("Unknown file[%v] error:", obj.Name) - log.Fatal(err) + return fmt.Errorf("Unknown %s[%s] error: %v", obj.Kind(), obj.GetName(), err) } index = int(math.Max(1, float64(index))) continue @@ -237,7 +311,7 @@ func (obj *FileRes) Watch(processChan chan Event) { select { case event := <-obj.watcher.Events: if DEBUG { - log.Printf("File[%v]: Watch(%v), Event(%v): %v", obj.GetName(), current, event.Name, event.Op) + log.Printf("%s[%s]: Watch(%s), Event(%s): %v", obj.Kind(), obj.GetName(), current, event.Name, event.Op) } cuuid.SetConverged(false) // XXX: technically i can detect if the event is erroneous or not first // the deeper you go, the bigger the deltaDepth is... 
@@ -263,7 +337,7 @@ func (obj *FileRes) Watch(processChan chan Event) { obj.watcher.Add(event.Name) obj.watches[event.Name] = struct{}{} if err := obj.addSubFolders(event.Name); err != nil { - log.Fatal(err) // TODO: temporary until we support errors + return err } } } @@ -286,7 +360,7 @@ func (obj *FileRes) Watch(processChan chan Event) { if obj.isDir { if err := obj.addSubFolders(safename); err != nil { - log.Fatal(err) // TODO: temporary until we support errors + return err } } @@ -331,15 +405,13 @@ func (obj *FileRes) Watch(processChan chan Event) { } case err := <-obj.watcher.Errors: - cuuid.SetConverged(false) // XXX ? - log.Printf("error: %v", err) - log.Fatal(err) - //obj.events <- fmt.Sprintf("file: %v", "error") // XXX: how should we handle errors? + cuuid.SetConverged(false) + return fmt.Errorf("Unknown %s[%s] watcher error: %v", obj.Kind(), obj.GetName(), err) case event := <-obj.events: cuuid.SetConverged(false) if exit, send = obj.ReadEvent(&event); exit { - return // exit + return nil // exit } //dirty = false // these events don't invalidate state @@ -356,9 +428,9 @@ func (obj *FileRes) Watch(processChan chan Event) { dirty = false obj.isStateOK = false // something made state dirty } - resp := NewResp() - processChan <- Event{eventNil, resp, "", true} // trigger process - resp.ACKWait() // wait for the ACK() + if exit, err := doSend(); exit || err != nil { + return err // we exit or bubble up a NACK... + } } } } diff --git a/noop.go b/noop.go index b1547ed0..12bcb750 100644 --- a/noop.go +++ b/noop.go @@ -20,6 +20,7 @@ package main import ( "encoding/gob" "log" + "time" ) func init() { @@ -57,15 +58,70 @@ func (obj *NoopRes) Validate() bool { } // Watch is the primary listener for this resource and it outputs events. -func (obj *NoopRes) Watch(processChan chan Event) { +func (obj *NoopRes) Watch(processChan chan Event, delay time.Duration) error { if obj.IsWatching() { - return + return nil // TODO: should this be an error? 
} obj.SetWatching(true) defer obj.SetWatching(false) cuuid := obj.converger.Register() defer cuuid.Unregister() + var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas + doSend = func() (bool, error) { + resp := NewResp() + processChan <- Event{eventNil, resp, "", true} // trigger process + select { + case e := <-resp: // wait for the ACK() + if e != nil { // we got a NACK + return true, e // exit with error + } + + case event := <-obj.events: + // NOTE: this code should match the similar code below! + cuuid.SetConverged(false) + if exit, send := obj.ReadEvent(&event); exit { + return true, nil // exit, without error + } else if send { + return doSend() // recurse + } + } + return false, nil // return, no error or exit signal + } + + // if a retry-delay was requested, wait, but don't block our events! + if delay > 0 { + var pendingSendEvent bool + timer := time.NewTimer(delay) + Loop: + for { + select { + case <-timer.C: // the wait is over + break Loop // critical + + case event := <-obj.events: + // NOTE: this code should match the similar code below! + cuuid.SetConverged(false) + if exit, send := obj.ReadEvent(&event); exit { + return nil // exit + } else if send { + // NOTE: see long comment in the file resource + //if exit, err := doSend(); exit || err != nil { + // return err // we exit or bubble up a NACK... + //} + pendingSendEvent = true // all events are identical for now... + } + } + } + timer.Stop() // it's nice to cleanup + log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName()) + if pendingSendEvent { // TODO: should this become a list in the future? + if exit, err := doSend(); exit || err != nil { + return err // we exit or bubble up a NACK... + } + } + } + var send = false // send event? 
var exit = false for { @@ -75,7 +131,7 @@ func (obj *NoopRes) Watch(processChan chan Event) { cuuid.SetConverged(false) // we avoid sending events on unpause if exit, send = obj.ReadEvent(&event); exit { - return // exit + return nil // exit } case <-cuuid.ConvergedTimer(): @@ -88,9 +144,9 @@ func (obj *NoopRes) Watch(processChan chan Event) { send = false // only do this on certain types of events //obj.isStateOK = false // something made state dirty - resp := NewResp() - processChan <- Event{eventNil, resp, "", true} // trigger process - resp.ACKWait() // wait for the ACK() + if exit, err := doSend(); exit || err != nil { + return err // we exit or bubble up a NACK... + } } } } diff --git a/pgraph.go b/pgraph.go index 4f463242..f759001f 100644 --- a/pgraph.go +++ b/pgraph.go @@ -23,6 +23,7 @@ import ( "fmt" "io/ioutil" "log" + "math" "os" "os/exec" "sort" @@ -718,7 +719,7 @@ func (g *Graph) BackPoke(v *Vertex) { // Process is the primary function to execute for a particular vertex in the graph. // XXX: rename this function -func (g *Graph) Process(v *Vertex) { +func (g *Graph) Process(v *Vertex) error { obj := v.Res if DEBUG { log.Printf("%v[%v]: Process()", obj.Kind(), obj.GetName()) @@ -765,10 +766,132 @@ func (g *Graph) Process(v *Vertex) { g.Poke(v, apply) } // poke at our pre-req's instead since they need to refresh/run... + return err } else { // only poke at the pre-req's that need to run go g.BackPoke(v) } + return nil +} + +// SentinelErr is a sentinel as an error type that wraps an arbitrary error. +type SentinelErr struct { + err error +} + +// Error is the required method to fulfill the error type. +func (obj *SentinelErr) Error() string { + return obj.err.Error() +} + +// Worker is the common run frontend of the vertex. It handles all of the retry +// and retry delay common code, and ultimately returns the final status of this +// vertex execution. 
+func (g *Graph) Worker(v *Vertex) error { + // listen for chan events from Watch() and run + // the Process() function when they're received + // this avoids us having to pass the data into + // the Watch() function about which graph it is + // running on, which isolates things nicely... + chanProcess := make(chan Event) + go func() { + running := false + var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration + if !timer.Stop() { + <-timer.C // unnecessary, shouldn't happen + } + var delay = time.Duration(v.Meta().Delay) * time.Millisecond + var retry int16 = v.Meta().Retry // number of tries left, -1 for infinite + var saved Event + Loop: + for { + // this has to be synchronous, because otherwise the Res + // event loop will keep running and change state, + // causing the converged timeout to fire! + select { + case event, ok := <-chanProcess: // must use like this + if running && ok { + // we got an event that wasn't a close, + // while we were waiting for the timer! + // if this happens, it might be a bug:( + log.Fatalf("%v[%v]: Worker: Unexpected event: %+v", v.Kind(), v.GetName(), event) + } + if !ok { // chanProcess closed, let's exit + break Loop // no event, so no ack! + } + + // the above mentioned synchronous part, is the + // running of this function, paired with an ack. + if e := g.Process(v); e != nil { + saved = event + log.Printf("%v[%v]: CheckApply errored: %v", v.Kind(), v.GetName(), e) + if retry == 0 { + // wrap the error in the sentinel + event.ACKNACK(&SentinelErr{e}) // fail the Watch() + break Loop + } + if retry > 0 { // don't decrement the -1 + retry-- + } + log.Printf("%v[%v]: CheckApply: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), delay.Seconds(), retry) + // start the timer... + timer.Reset(delay) + running = true + continue + } + retry = v.Meta().Retry // reset on success + event.ACK() // sync + + case <-timer.C: + if !timer.Stop() { + //<-timer.C // blocks, docs are wrong! 
+ } + running = false + // re-send this failed event, to trigger a CheckApply() + go func() { chanProcess <- saved }() + // TODO: should we send a fake event instead? + //saved = nil + } + } + }() + var err error // propagate the error up (this is a permanent BAD error!) + // the watch delay runs inside of the Watch resource loop, so that it + // can still process signals and exit if needed. It shouldn't run any + // resource specific code since this is supposed to be a retry delay. + // NOTE: we're using the same retry and delay metaparams that CheckApply + // uses. This is for practicality. We can separate them later if needed! + var watchDelay time.Duration + var watchRetry int16 = v.Meta().Retry // number of tries left, -1 for infinite + // watch blocks until it ends, & errors to retry + for { + // TODO: reset the watch retry count after some amount of success + e := v.Res.Watch(chanProcess, watchDelay) + if e == nil { // exit signal + err = nil // clean exit + break + } + if sentinelErr, ok := e.(*SentinelErr); ok { // unwrap the sentinel + err = sentinelErr.err + break // sentinel means, perma-exit + } + log.Printf("%v[%v]: Watch errored: %v", v.Kind(), v.GetName(), e) + if watchRetry == 0 { + err = fmt.Errorf("Permanent watch error: %v", e) + break + } + if watchRetry > 0 { // don't decrement the -1 + watchRetry-- + } + watchDelay = time.Duration(v.Meta().Delay) * time.Millisecond + log.Printf("%v[%v]: Watch: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), watchDelay.Seconds(), watchRetry) + // We trigger a CheckApply if watch restarts, so that we catch + // any possible events that happened while down. NOTE: this is + // only flood-safe if the Watch resource de-duplicates similar + // send event messages. It does for now, rethink this later... + v.SendEvent(eventPoke, false, false) + } + close(chanProcess) + return err } // Start is a main kick to start the graph. 
It goes through in reverse topological @@ -787,25 +910,13 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ go func(vv *Vertex) { defer wg.Done() - // listen for chan events from Watch() and run - // the Process() function when they're received - // this avoids us having to pass the data into - // the Watch() function about which graph it is - // running on, which isolates things nicely... - chanProcess := make(chan Event) - go func() { - for event := range chanProcess { - // this has to be synchronous, - // because otherwise the Res - // event loop will keep running - // and change state, causing the - // converged timeout to fire! - g.Process(vv) - event.ACK() // sync - } - }() - vv.Res.Watch(chanProcess) // i block until i end - close(chanProcess) + // TODO: if a sufficient number of workers error, + // should something be done? Will these restart + // after perma-failure if we have a graph change? 
+ if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops + log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err) + return + } log.Printf("%v[%v]: Exited", vv.Kind(), vv.GetName()) }(v) } diff --git a/pgraph_test.go b/pgraph_test.go index d8872259..8e12f88a 100644 --- a/pgraph_test.go +++ b/pgraph_test.go @@ -25,6 +25,7 @@ import ( "sort" "strings" "testing" + "time" ) func TestPgraphT1(t *testing.T) { @@ -1282,3 +1283,13 @@ func TestPgraphGroupingConnected1(t *testing.T) { } runGraphCmp(t, g1, g2) } + +func TestDurationAssumptions(t *testing.T) { + var d time.Duration + if (d == 0) != true { + t.Errorf("Empty time.Duration is no longer equal to zero!") + } + if (d > 0) != false { + t.Errorf("Empty time.Duration is now greater than zero!") + } +} diff --git a/pkg.go b/pkg.go index c84e7f02..2d906835 100644 --- a/pkg.go +++ b/pkg.go @@ -25,6 +25,7 @@ import ( "log" "path" "strings" + "time" ) func init() { @@ -108,15 +109,70 @@ func (obj *PkgRes) Validate() bool { // It uses the PackageKit UpdatesChanged signal to watch for changes. // TODO: https://github.com/hughsie/PackageKit/issues/109 // TODO: https://github.com/hughsie/PackageKit/issues/110 -func (obj *PkgRes) Watch(processChan chan Event) { +func (obj *PkgRes) Watch(processChan chan Event, delay time.Duration) error { if obj.IsWatching() { - return + return nil } obj.SetWatching(true) defer obj.SetWatching(false) cuuid := obj.converger.Register() defer cuuid.Unregister() + var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas + doSend = func() (bool, error) { + resp := NewResp() + processChan <- Event{eventNil, resp, "", true} // trigger process + select { + case e := <-resp: // wait for the ACK() + if e != nil { // we got a NACK + return true, e // exit with error + } + + case event := <-obj.events: + // NOTE: this code should match the similar code below! 
+ cuuid.SetConverged(false) + if exit, send := obj.ReadEvent(&event); exit { + return true, nil // exit, without error + } else if send { + return doSend() // recurse + } + } + return false, nil // return, no error or exit signal + } + + // if a retry-delay was requested, wait, but don't block our events! + if delay > 0 { + var pendingSendEvent bool + timer := time.NewTimer(delay) + Loop: + for { + select { + case <-timer.C: // the wait is over + break Loop // critical + + case event := <-obj.events: + // NOTE: this code should match the similar code below! + cuuid.SetConverged(false) + if exit, send := obj.ReadEvent(&event); exit { + return nil // exit + } else if send { + // NOTE: see long comment in the file resource + //if exit, err := doSend(); exit || err != nil { + // return err // we exit or bubble up a NACK... + //} + pendingSendEvent = true // all events are identical for now... + } + } + } + timer.Stop() // it's nice to cleanup + log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName()) + if pendingSendEvent { // TODO: should this become a list in the future? + if exit, err := doSend(); exit || err != nil { + return err // we exit or bubble up a NACK... + } + } + } + bus := NewBus() if bus == nil { log.Fatal("Can't connect to PackageKit bus.") @@ -159,7 +215,7 @@ func (obj *PkgRes) Watch(processChan chan Event) { case event := <-obj.events: cuuid.SetConverged(false) if exit, send = obj.ReadEvent(&event); exit { - return // exit + return nil // exit } dirty = false // these events don't invalidate state @@ -176,9 +232,9 @@ func (obj *PkgRes) Watch(processChan chan Event) { dirty = false obj.isStateOK = false // something made state dirty } - resp := NewResp() - processChan <- Event{eventNil, resp, "", true} // trigger process - resp.ACKWait() // wait for the ACK() + if exit, err := doSend(); exit || err != nil { + return err // we exit or bubble up a NACK... 
+ } } } } diff --git a/resources.go b/resources.go index 8f72e386..2f1f643f 100644 --- a/resources.go +++ b/resources.go @@ -23,6 +23,7 @@ import ( "encoding/gob" "fmt" "log" + "time" ) //go:generate stringer -type=resState -output=resstate_stringer.go @@ -64,6 +65,11 @@ type MetaParams struct { AutoEdge bool `yaml:"autoedge"` // metaparam, should we generate auto edges? // XXX should default to true AutoGroup bool `yaml:"autogroup"` // metaparam, should we auto group? // XXX should default to true Noop bool `yaml:"noop"` + // NOTE: there are separate Watch and CheckApply retry and delay values, + // but I've decided to use the same ones for both until there's a proper + // reason to want to do something differently for the Watch errors. + Retry int16 `yaml:"retry"` // metaparam, number of times to retry on error. -1 for infinite + Delay uint64 `yaml:"delay"` // metaparam, number of milliseconds to wait between retries } // The Base interface is everything that is common to all resources. 
@@ -94,8 +100,8 @@ type Res interface { Base // include everything from the Base interface Init() //Validate() bool // TODO: this might one day be added - GetUUIDs() []ResUUID // most resources only return one - Watch(chan Event) // send on channel to signal process() events + GetUUIDs() []ResUUID // most resources only return one + Watch(chan Event, time.Duration) error // send on channel to signal process() events CheckApply(bool) (bool, error) AutoEdges() AutoEdge Compare(Res) bool @@ -226,15 +232,10 @@ func (obj *BaseRes) SendEvent(event eventName, sync bool, activity bool) bool { return true } - resp := make(chan bool) + resp := NewResp() obj.events <- Event{event, resp, "", activity} - for { - value := <-resp - // wait until true value - if value { - return true - } - } + resp.ACKWait() // waits until true (nil) value + return true } // ReadEvent processes events when a select gets one, and handles the pause diff --git a/svc.go b/svc.go index 19546a91..0bcacb9c 100644 --- a/svc.go +++ b/svc.go @@ -27,6 +27,7 @@ import ( systemdUtil "github.com/coreos/go-systemd/util" "github.com/godbus/dbus" // namespace collides with systemd wrapper "log" + "time" ) func init() { @@ -71,30 +72,85 @@ func (obj *SvcRes) Validate() bool { } // Watch is the primary listener for this resource and it outputs events. 
-func (obj *SvcRes) Watch(processChan chan Event) { +func (obj *SvcRes) Watch(processChan chan Event, delay time.Duration) error { if obj.IsWatching() { - return + return nil } obj.SetWatching(true) defer obj.SetWatching(false) cuuid := obj.converger.Register() defer cuuid.Unregister() + var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas + doSend = func() (bool, error) { + resp := NewResp() + processChan <- Event{eventNil, resp, "", true} // trigger process + select { + case e := <-resp: // wait for the ACK() + if e != nil { // we got a NACK + return true, e // exit with error + } + + case event := <-obj.events: + // NOTE: this code should match the similar code below! + cuuid.SetConverged(false) + if exit, send := obj.ReadEvent(&event); exit { + return true, nil // exit, without error + } else if send { + return doSend() // recurse + } + } + return false, nil // return, no error or exit signal + } + + // if a retry-delay was requested, wait, but don't block our events! + if delay > 0 { + var pendingSendEvent bool + timer := time.NewTimer(delay) + Loop: + for { + select { + case <-timer.C: // the wait is over + break Loop // critical + + case event := <-obj.events: + // NOTE: this code should match the similar code below! + cuuid.SetConverged(false) + if exit, send := obj.ReadEvent(&event); exit { + return nil // exit + } else if send { + // NOTE: see long comment in the file resource + //if exit, err := doSend(); exit || err != nil { + // return err // we exit or bubble up a NACK... + //} + pendingSendEvent = true // all events are identical for now... + } + } + } + timer.Stop() // it's nice to cleanup + log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName()) + if pendingSendEvent { // TODO: should this become a list in the future? + if exit, err := doSend(); exit || err != nil { + return err // we exit or bubble up a NACK... 
+ } + } + } + // obj.Name: svc name if !systemdUtil.IsRunningSystemd() { - log.Fatal("Systemd is not running.") + return fmt.Errorf("Systemd is not running.") } conn, err := systemd.NewSystemdConnection() // needs root access if err != nil { - log.Fatal("Failed to connect to systemd: ", err) + return fmt.Errorf("Failed to connect to systemd: %s", err) } defer conn.Close() // if we share the bus with others, we will get each others messages!! bus, err := SystemBusPrivateUsable() // don't share the bus connection! if err != nil { - log.Fatal("Failed to connect to bus: ", err) + return fmt.Errorf("Failed to connect to bus: %s", err) } // XXX: will this detect new units? @@ -157,7 +213,7 @@ func (obj *SvcRes) Watch(processChan chan Event) { case event := <-obj.events: cuuid.SetConverged(false) if exit, send = obj.ReadEvent(&event); exit { - return // exit + return nil // exit } if event.GetActivity() { dirty = true @@ -187,7 +243,7 @@ func (obj *SvcRes) Watch(processChan chan Event) { } else if event[svc].ActiveState == "inactive" { log.Printf("Svc[%v]->Stopped!()", svc) } else { - log.Fatal("Unknown svc state: ", event[svc].ActiveState) + log.Fatalf("Unknown svc state: %s", event[svc].ActiveState) } } else { // svc stopped (and ActiveState is nil...) @@ -197,15 +253,13 @@ func (obj *SvcRes) Watch(processChan chan Event) { dirty = true case err := <-subErrors: - cuuid.SetConverged(false) // XXX ? - log.Printf("error: %v", err) - log.Fatal(err) - //vertex.events <- fmt.Sprintf("svc: %v", "error") // XXX: how should we handle errors? 
+ cuuid.SetConverged(false) + return fmt.Errorf("Unknown %s[%s] error: %v", obj.Kind(), obj.GetName(), err) case event := <-obj.events: cuuid.SetConverged(false) if exit, send = obj.ReadEvent(&event); exit { - return // exit + return nil // exit } if event.GetActivity() { dirty = true @@ -223,11 +277,10 @@ func (obj *SvcRes) Watch(processChan chan Event) { dirty = false obj.isStateOK = false // something made state dirty } - resp := NewResp() - processChan <- Event{eventNil, resp, "", true} // trigger process - resp.ACKWait() // wait for the ACK() + if exit, err := doSend(); exit || err != nil { + return err // we exit or bubble up a NACK... + } } - } } diff --git a/timer.go b/timer.go index 689c2246..dea911f1 100644 --- a/timer.go +++ b/timer.go @@ -65,20 +65,74 @@ func (obj *TimerRes) Validate() bool { } // Watch is the primary listener for this resource and it outputs events. -func (obj *TimerRes) Watch(processChan chan Event) { +func (obj *TimerRes) Watch(processChan chan Event, delay time.Duration) error { if obj.IsWatching() { - return + return nil + } + obj.SetWatching(true) + defer obj.SetWatching(false) + cuuid := obj.converger.Register() + defer cuuid.Unregister() + + var doSend func(string) (bool, error) // lol, golang doesn't support recursive lambdas + doSend = func(comment string) (bool, error) { + resp := NewResp() + processChan <- Event{eventNil, resp, comment, true} // trigger process + select { + case e := <-resp: // wait for the ACK() + if e != nil { // we got a NACK + return true, e // exit with error + } + + case event := <-obj.events: + // NOTE: this code should match the similar code below! + cuuid.SetConverged(false) + if exit, send := obj.ReadEvent(&event); exit { + return true, nil // exit, without error + } else if send { + return doSend(comment) // recurse + } + } + return false, nil // return, no error or exit signal + } + + // if a retry-delay was requested, wait, but don't block our events! 
+ if delay > 0 { + var pendingSendEvent bool + timer := time.NewTimer(delay) + Loop: + for { + select { + case <-timer.C: // the wait is over + break Loop // critical + + case event := <-obj.events: + // NOTE: this code should match the similar code below! + cuuid.SetConverged(false) + if exit, send := obj.ReadEvent(&event); exit { + return nil // exit + } else if send { + // NOTE: see long comment in the file resource + //if exit, err := doSend(); exit || err != nil { + // return err // we exit or bubble up a NACK... + //} + pendingSendEvent = true // all events are identical for now... + } + } + } + timer.Stop() // it's nice to cleanup + log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName()) + if pendingSendEvent { // TODO: should this become a list in the future? + if exit, err := doSend("pending delayed event"); exit || err != nil { + return err // we exit or bubble up a NACK... + } + } } // Create a time.Ticker for the given interval ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second) defer ticker.Stop() - obj.SetWatching(true) - defer obj.SetWatching(false) - cuuid := obj.converger.Register() - defer cuuid.Unregister() - var send = false for { @@ -90,7 +144,7 @@ func (obj *TimerRes) Watch(processChan chan Event) { case event := <-obj.events: cuuid.SetConverged(false) if exit, _ := obj.ReadEvent(&event); exit { - return + return nil } case <-cuuid.ConvergedTimer(): cuuid.SetConverged(true) @@ -99,9 +153,9 @@ func (obj *TimerRes) Watch(processChan chan Event) { if send { send = false obj.isStateOK = false - resp := NewResp() - processChan <- Event{eventNil, resp, "timer ticked", true} - resp.ACKWait() + if exit, err := doSend("timer ticked"); exit || err != nil { + return err // we exit or bubble up a NACK... + } } } }