diff --git a/docs/resource-guide.md b/docs/resource-guide.md index 841ebd0e..bf27be6e 100644 --- a/docs/resource-guide.md +++ b/docs/resource-guide.md @@ -307,21 +307,18 @@ running. The lifetime of most resources `Watch` method should be spent in an infinite loop that is bounded by a `select` call. The `select` call is the point where our method hands back control to the engine (and the kernel) so that we can -sleep until something of interest wakes us up. In this loop we must process -events from the engine via the `<-obj.init.Events` channel, and receive events -for our resource itself! +sleep until something of interest wakes us up. In this loop we must wait until +we get a shutdown event from the engine via the `<-obj.init.Done` channel, which +closes when we'd like to shut everything down. At this point you should clean up, +and let `Watch` close. #### Events -If we receive an internal event from the `<-obj.init.Events` channel, we should -read it with the `obj.init.Read` helper function. This function tells us if we -should shutdown our resource. It also handles pause functionality which blocks -our resource temporarily in this method. If this channel shuts down, then we -should treat that as an exit signal. - -When we want to send an event, we use the `Event` helper function. It is also -important to mark the resource state as `dirty` if we believe it might have -changed. We do this by calling the `obj.init.Dirty` function. +If the `<-obj.init.Done` channel closes, we should shut down our resource. +When we want to send an event, we use the `Event` helper function. This +automatically marks the resource state as `dirty`. If you're unsure, it's not +harmful to send the event. This will ultimately cause `CheckApply` to run. This +method can block if the resource is being paused. #### Startup @@ -330,8 +327,7 @@ to generate one event to notify the `mgmt` engine that we're now listening successfully, so that it can run an initial `CheckApply` to ensure we're safely tracking a healthy state and that we didn't miss anything when `Watch` was down or from before `mgmt` was running. You must do this by calling the -`obj.init.Running` method. If it returns an error, you must exit and return that -error. +`obj.init.Running` method. #### Converged @@ -358,41 +354,29 @@ func (obj *FooRes) Watch() error { defer obj.whatever.CloseFoo() // shutdown our Foo // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? for { select { - case event, ok := <-obj.init.Events: - if !ok { - // shutdown engine - // (it is okay if some `defer` code runs first) - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } - // the actual events! case event := <-obj.foo.Events: if is_an_event { send = true - obj.init.Dirty() // dirty } // event errors case err := <-obj.foo.Errors: return err // will cause a retry or permanent failure + + case <-obj.init.Done: // signal for shutdown request + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() } } } @@ -567,23 +551,10 @@ ready to detect changes. Event sends an event notifying the engine of a possible state change. It is only called from within `Watch`.
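As a rough sketch only (mirroring the `FooRes` example above, where the hypothetical `obj.foo.Events` channel stands in for whatever your resource actually watches), the shutdown and send idioms reduce to:

```golang
	var send = false // coalesce multiple wake-ups into a single Event
	for {
		select {
		case <-obj.foo.Events: // hypothetical source of resource events
			send = true

		case <-obj.init.Done: // the engine wants us to shut down
			return nil // any deferred cleanup runs now
		}

		// do all our event sending all together to avoid duplicate msgs
		if send {
			send = false
			obj.init.Event() // marks us dirty; can block while paused
		}
	}
```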
-### Events +### Done -Events is a channel that we must watch for messages from the engine. When it closes, this is a signal to shutdown. It is -only called from within `Watch`. - -### Read - -Read processes messages that come in from the `Events` channel. It is a helper -method that knows how to handle the pause mechanism correctly. It is -only called from within `Watch`. - -### Dirty - -Dirty marks the resource state as dirty. This signals to the engine that -CheckApply will have some work to do in order to converge it. It is -only called from within `Watch`. +Done is a channel that closes when the engine wants us to shut down. It is only +read from within `Watch`. ### Refresh diff --git a/engine/error.go b/engine/error.go index e1339dde..74b3fb30 100644 --- a/engine/error.go +++ b/engine/error.go @@ -24,9 +24,6 @@ type Error string func (e Error) Error() string { return string(e) } const ( - // ErrWatchExit represents an exit from the Watch loop via chan closure. - ErrWatchExit = Error("watch exit") - - // ErrSignalExit represents an exit from the Watch loop via exit signal. - ErrSignalExit = Error("signal exit") + // ErrClosed means we couldn't complete a task because we had closed. + ErrClosed = Error("closed") ) diff --git a/engine/event/event.go b/engine/event/event.go deleted file mode 100644 index 54ec5ff6..00000000 --- a/engine/event/event.go +++ /dev/null @@ -1,83 +0,0 @@ -// Mgmt -// Copyright (C) 2013-2018+ James Shubin and the project contributors -// Written by James Shubin and the project contributors -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -// Package event provides some primitives that are used for message passing. -package event - -//go:generate stringer -type=Kind -output=kind_stringer.go - -// Kind represents the type of event being passed. -type Kind int - -// The different event kinds are used in different contexts. -const ( - KindNil Kind = iota - KindStart - KindPause - KindPoke - KindExit ) - -// Pre-built messages so they can be used directly without having to use NewMsg. -// These are useful when we don't want a response via ACK(). -var ( - Start = &Msg{Kind: KindStart} - Pause = &Msg{Kind: KindPause} // probably unused b/c we want a resp - Poke = &Msg{Kind: KindPoke} - Exit = &Msg{Kind: KindExit} -) - -// Msg is an event primitive that represents a kind of event, and optionally a -// request for an ACK. -type Msg struct { - Kind Kind - - resp chan struct{} -} - -// NewMsg builds a new message struct. It will want an ACK. If you don't want an -// ACK then use the pre-built messages in the package variable globals. -func NewMsg(kind Kind) *Msg { - return &Msg{ - Kind: kind, - resp: make(chan struct{}), - } -} - -// CanACK determines if an ACK is possible for this message. It does not say -// whether one has already been sent or not. -func (obj *Msg) CanACK() bool { - return obj.resp != nil -} - -// ACK acknowledges the event.
It must not be called more than once for the same -// event. It unblocks the past and future calls of Wait for this event. -func (obj *Msg) ACK() { - close(obj.resp) -} - -// Wait on ACK for this event. It doesn't matter if this runs before or after -// the ACK. It will unblock either way. -// TODO: consider adding a context if it's ever useful. -func (obj *Msg) Wait() error { - select { - //case <-ctx.Done(): - // return ctx.Err() - case <-obj.resp: - return nil - } -} diff --git a/engine/graph/actions.go b/engine/graph/actions.go index 253c02d6..31d73c6b 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -24,12 +24,10 @@ import ( "time" "github.com/purpleidea/mgmt/engine" - "github.com/purpleidea/mgmt/engine/event" "github.com/purpleidea/mgmt/pgraph" - //multierr "github.com/hashicorp/go-multierror" + multierr "github.com/hashicorp/go-multierror" errwrap "github.com/pkg/errors" - "golang.org/x/time/rate" ) // OKTimestamp returns true if this vertex can run right now. @@ -67,26 +65,24 @@ func (obj *Engine) Process(vertex pgraph.Vertex) error { return fmt.Errorf("vertex is not a Res") } - // Engine Guarantee: Do not allow CheckApply to run while we are paused. - // This makes the resource able to know that synchronous channel sending - // to the main loop select in Watch from within CheckApply, will succeed - // without blocking because the resource went into a paused state. If we - // are using the Poll metaparam, then Watch will (of course) not be run. - // FIXME: should this lock be here, or wrapped right around CheckApply ? - obj.state[vertex].eventsLock.Lock() // this lock is taken within Event() - defer obj.state[vertex].eventsLock.Unlock() - // backpoke! (can be async) if vs := obj.BadTimestamps(vertex); len(vs) > 0 { // back poke in parallel (sync b/c of waitgroup) + wg := &sync.WaitGroup{} for _, v := range obj.graph.IncomingGraphVertices(vertex) { if !pgraph.VertexContains(v, vs) { // only poke what's needed continue } - go obj.state[v].Poke() // async + // doesn't really need to be in parallel, but we can... + wg.Add(1) + go func(vv pgraph.Vertex) { + defer wg.Done() + obj.state[vv].Poke() // async + }(v) } + wg.Wait() return nil // can't continue until timestamp is in sequence } @@ -244,14 +240,17 @@ func (obj *Engine) Process(vertex pgraph.Vertex) error { // Worker is the common run frontend of the vertex. It handles all of the retry // and retry delay common code, and ultimately returns the final status of this -// vertex execution. +// vertex execution. This function cannot be "re-run" for the same vertex. The +// retry mechanism stuff happens inside of this. To actually "re-run" you need +// to remove the vertex and build a new one. The engine guarantees that we do +// not allow CheckApply to run while we are paused. That is enforced here. 
func (obj *Engine) Worker(vertex pgraph.Vertex) error { res, isRes := vertex.(engine.Res) if !isRes { return fmt.Errorf("vertex is not a resource") } - defer close(obj.state[vertex].stopped) // done signal + //defer close(obj.state[vertex].stopped) // done signal obj.state[vertex].cuid = obj.Converger.Register() obj.state[vertex].tuid = obj.Converger.Register() @@ -265,214 +264,140 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { obj.state[vertex].wg.Add(1) go func() { defer obj.state[vertex].wg.Done() - defer close(obj.state[vertex].outputChan) // we close this on behalf of res + defer close(obj.state[vertex].eventsChan) // we close this on behalf of res - var err error - var retry = res.MetaParams().Retry // lookup the retry value - var delay uint64 - for { // retry loop - // a retry-delay was requested, wait, but don't block events! - if delay > 0 { - errDelayExpired := engine.Error("delay exit") - err = func() error { // slim watch main loop - timer := time.NewTimer(time.Duration(delay) * time.Millisecond) - defer obj.state[vertex].init.Logf("the Watch delay expired!") - defer timer.Stop() // it's nice to cleanup - for { - select { - case <-timer.C: // the wait is over - return errDelayExpired // special + // This is a close reverse-multiplexer. If any of the channels + // close, then it will cause the doneChan to close. That way, + // multiple different folks can send a close signal, without + // ever worrying about duplicate channel close panics. + obj.state[vertex].wg.Add(1) + go func() { + defer obj.state[vertex].wg.Done() - case event, ok := <-obj.state[vertex].init.Events: - if !ok { - return nil - } - if err := obj.state[vertex].init.Read(event); err != nil { - return err - } - } - } - }() - if err == errDelayExpired { - delay = 0 // reset - continue - } - } else if interval := res.MetaParams().Poll; interval > 0 { // poll instead of watching :( - obj.state[vertex].cuid.StartTimer() - err = obj.state[vertex].poll(interval) - obj.state[vertex].cuid.StopTimer() // clean up nicely - } else { - obj.state[vertex].cuid.StartTimer() - obj.Logf("Watch(%s)", vertex) - err = res.Watch() // run the watch normally - obj.Logf("Watch(%s): Exited(%+v)", vertex, err) - obj.state[vertex].cuid.StopTimer() // clean up nicely + // reverse-multiplexer: any close, causes *the* close! + select { + case <-obj.state[vertex].processDone: + case <-obj.state[vertex].watchDone: + case <-obj.state[vertex].removeDone: + case <-obj.state[vertex].eventsDone: } - if err == nil || err == engine.ErrWatchExit || err == engine.ErrSignalExit { - return // exited cleanly, we're done - } - // we've got an error... - delay = res.MetaParams().Delay - if retry < 0 { // infinite retries - obj.state[vertex].reset() - continue - } - if retry > 0 { // don't decrement past 0 - retry-- - obj.state[vertex].init.Logf("retrying Watch after %.4f seconds (%d left)", float64(delay)/1000, retry) - obj.state[vertex].reset() - continue - } - //if retry == 0 { // optional - // err = errwrap.Wrapf(err, "permanent watch error") - //} - break // break out of this and send the error + // the main "done" signal gets activated here! + close(obj.state[vertex].doneChan) + }() + + obj.Logf("Watch(%s)", vertex) + err := res.Watch() // run the watch normally + obj.Logf("Watch(%s): Exited(%+v)", vertex, err) + if err == nil { // || err == engine.ErrClosed + return // exited cleanly, we're done } + // this section sends an error...
// If the CheckApply loop exits and THEN the Watch fails with an // error, then we'd be stuck here if exit signal didn't unblock! select { - case obj.state[vertex].outputChan <- errwrap.Wrapf(err, "watch failed"): + case obj.state[vertex].eventsChan <- errwrap.Wrapf(err, "watch failed"): // send - case <-obj.state[vertex].exit.Signal(): - // pass } }() - // bonus safety check - if res.MetaParams().Burst == 0 && !(res.MetaParams().Limit == rate.Inf) { // blocked - return fmt.Errorf("permanently limited (rate != Inf, burst = 0)") - } - var limiter = rate.NewLimiter(res.MetaParams().Limit, res.MetaParams().Burst) - // It is important that we shutdown the Watch loop if this exits. - // Example, if Process errors permanently, we should ask Watch to exit. - defer obj.state[vertex].Event(event.Exit) // signal an exit - for { + // If this exits cleanly, we must unblock the reverse-multiplexer. + // I think this additional close is unnecessary, but it's not harmful. + defer close(obj.state[vertex].eventsDone) // causes doneChan to close + var reterr error + var failed bool // has Process permanently failed? +Loop: + for { // process loop select { - case err, ok := <-obj.state[vertex].outputChan: // read from watch channel + case err, ok := <-obj.state[vertex].eventsChan: // read from watch channel if !ok { - return nil + return reterr // we only return when chan closes } + // If the Watch method exits with an error, then this + // channel will get that error propagated to it, which + // we then save so we can return it to the caller of us. if err != nil { - return err // permanent failure + failed = true + close(obj.state[vertex].watchDone) // causes doneChan to close + reterr = multierr.Append(reterr, err) // permanent failure + continue + } + if obj.Debug { + obj.Logf("event received") } - // safe to go run the process... - case <-obj.state[vertex].exit.Signal(): // TODO: is this needed? - return nil + case _, ok := <-obj.state[vertex].pokeChan: // read from buffered poke channel + if !ok { // we never close it + panic("unexpected close of poke channel") + } + if obj.Debug { + obj.Logf("poke received") + } + } + if failed { // don't Process anymore if we've already failed... + continue Loop } - now := time.Now() - r := limiter.ReserveN(now, 1) // one event - // r.OK() seems to always be true here! - d := r.DelayFrom(now) - if d > 0 { // delay - obj.state[vertex].init.Logf("limited (rate: %v/sec, burst: %d, next: %v)", res.MetaParams().Limit, res.MetaParams().Burst, d) - var count int - timer := time.NewTimer(time.Duration(d) * time.Millisecond) - LimitWait: - for { - select { - case <-timer.C: // the wait is over - break LimitWait - - // consume other events while we're waiting... - case e, ok := <-obj.state[vertex].outputChan: // read from watch channel - if !ok { - // FIXME: is this logic correct? - if count == 0 { - return nil - } - // loop, because we have - // the previous event to - // run process on first! - continue - } - if e != nil { - return e // permanent failure - } - count++ // count the events... - limiter.ReserveN(time.Now(), 1) // one event - } + // drop redundant pokes + for len(obj.state[vertex].pokeChan) > 0 { + select { + case <-obj.state[vertex].pokeChan: + default: + // race, someone else read one! } - timer.Stop() // it's nice to cleanup - obj.state[vertex].init.Logf("rate limiting expired!") + } + + // pause if one was requested... 
+ select { + case <-obj.state[vertex].pauseSignal: // channel closes + // NOTE: If we allowed a doneChan below to let us out + // of the resumeSignal wait, then we could loop around + // and run this again, causing a panic. Instead of this + // being made safe with a sync.Once, we instead run a + // Resume() call inside of the vertexRemoveFn function, + // which should unblock it when we're going to need to. + obj.state[vertex].pausedAck.Ack() // send ack + // we are paused now, and waiting for resume or exit... + select { + case <-obj.state[vertex].resumeSignal: // channel closes + // resumed! + // pass through to allow a Process to try to run + // TODO: consider adding this fast pause here... + //if obj.fastPause { + // obj.Logf("fast pausing on resume") + // continue + //} + } + default: + // no pause requested, keep going... + } var err error - var retry = res.MetaParams().Retry // lookup the retry value - var delay uint64 - Loop: - for { // retry loop - if delay > 0 { - var count int - timer := time.NewTimer(time.Duration(delay) * time.Millisecond) - RetryWait: - for { - select { - case <-timer.C: // the wait is over - break RetryWait - - // consume other events while we're waiting... - case e, ok := <-obj.state[vertex].outputChan: // read from watch channel - if !ok { - // FIXME: is this logic correct? - if count == 0 { - // last process error - return err - } - // loop, because we have - // the previous event to - // run process on first! - continue - } - if e != nil { - return e // permanent failure - } - count++ // count the events... - limiter.ReserveN(time.Now(), 1) // one event - } - } - timer.Stop() // it's nice to cleanup - delay = 0 // reset - obj.state[vertex].init.Logf("the CheckApply delay expired!") - } - - if obj.Debug { - obj.Logf("Process(%s)", vertex) - } - err = obj.Process(vertex) - if obj.Debug { - obj.Logf("Process(%s): Return(%+v)", vertex, err) - } - if err == nil { - break Loop - } - // we've got an error... - delay = res.MetaParams().Delay - - if retry < 0 { // infinite retries - continue - } - if retry > 0 { // don't decrement past 0 - retry-- - obj.state[vertex].init.Logf("retrying CheckApply after %.4f seconds (%d left)", float64(delay)/1000, retry) - continue - } - //if retry == 0 { // optional - // err = errwrap.Wrapf(err, "permanent process error") - //} - - // If this exits, defer calls: obj.Event(event.Exit), - // which will cause the Watch loop to shutdown. Also, - // if the Watch loop shuts down, that will cause this - // Process loop to shut down. Also the graph sync can - // run an: obj.Event(event.Exit) which causes this to - // shutdown as well. Lastly, it is possible that more - // that one of these scenarios happens simultaneously. - return err + if obj.Debug { + obj.Logf("Process(%s)", vertex) } - } + err = obj.Process(vertex) + if obj.Debug { + obj.Logf("Process(%s): Return(%+v)", vertex, err) + } + + // It is important that we shut down the Watch loop if this dies. + // If Process fails permanently, we ask it to exit right here... + if err != nil { + failed = true + close(obj.state[vertex].processDone) // causes doneChan to close + reterr = multierr.Append(reterr, err) // permanent failure + continue + } + + // When this Process loop exits, it's because something has + // caused Watch() to shut down (even if it's our permanent + // failure from Process), which caused this channel to close. + // One or more exit signals are possible, and more than one can + // happen simultaneously.
+ + } // process loop + //return nil // unreachable } diff --git a/engine/graph/engine.go b/engine/graph/engine.go index f47289b1..a3495b6b 100644 --- a/engine/graph/engine.go +++ b/engine/graph/engine.go @@ -25,7 +25,6 @@ import ( "github.com/purpleidea/mgmt/converger" "github.com/purpleidea/mgmt/engine" - "github.com/purpleidea/mgmt/engine/event" "github.com/purpleidea/mgmt/pgraph" "github.com/purpleidea/mgmt/util/semaphore" @@ -50,13 +49,14 @@ type Engine struct { graph *pgraph.Graph nextGraph *pgraph.Graph state map[pgraph.Vertex]*State - waits map[pgraph.Vertex]*sync.WaitGroup + waits map[pgraph.Vertex]*sync.WaitGroup // wg for the Worker func slock *sync.Mutex // semaphore lock semas map[string]*semaphore.Semaphore - wg *sync.WaitGroup + wg *sync.WaitGroup // wg for the whole engine (only used for close) + paused bool // are we paused? fastPause bool } @@ -84,6 +84,8 @@ func (obj *Engine) Init() error { obj.wg = &sync.WaitGroup{} + obj.paused = true // start off true, so we can Resume after first Commit + return nil } @@ -137,6 +139,7 @@ func (obj *Engine) Apply(fn func(*pgraph.Graph) error) error { func (obj *Engine) Commit() error { // TODO: Does this hurt performance or graph changes ? + start := []func() error{} // functions to run after graphsync to start... vertexAddFn := func(vertex pgraph.Vertex) error { // some of these validation steps happen before this Commit step // in Validate() to avoid erroring here. These are redundant. @@ -192,12 +195,36 @@ func (obj *Engine) Commit() error { if err := obj.state[vertex].Init(); err != nil { return errwrap.Wrapf(err, "the Res did not Init") } + + fn := func() error { + // start the Worker + obj.wg.Add(1) + obj.waits[vertex].Add(1) + go func(v pgraph.Vertex) { + defer obj.wg.Done() + defer obj.waits[v].Done() + + obj.Logf("Worker(%s)", v) + // contains the Watch and CheckApply loops + err := obj.Worker(v) + obj.Logf("Worker(%s): Exited(%+v)", v, err) + obj.state[v].workerErr = err // store the error + // If the Rewatch metaparam is true, then this will get + // restarted if we do a graph cmp swap. This is why the + // graph cmp function runs the removes before the adds. + // XXX: This should feed into an $error var in the lang. + }(vertex) + return nil + } + start = append(start, fn) // do this at the end, if it's needed return nil } + free := []func() error{} // functions to run after graphsync to reset... vertexRemoveFn := func(vertex pgraph.Vertex) error { // wait for exit before starting new graph! 
- obj.state[vertex].Event(event.Exit) // signal an exit + close(obj.state[vertex].removeDone) // causes doneChan to close + obj.state[vertex].Resume() // unblock from resume obj.waits[vertex].Wait() // sync // close the state and resource @@ -216,15 +243,58 @@ func (obj *Engine) Commit() error { return nil } + // add the Worker swap (reload) on error decision into this vertexCmpFn + vertexCmpFn := func(v1, v2 pgraph.Vertex) (bool, error) { + r1, ok1 := v1.(engine.Res) + r2, ok2 := v2.(engine.Res) + if !ok1 || !ok2 { // should not happen, previously validated + return false, fmt.Errorf("not a Res") + } + m1 := r1.MetaParams() + m2 := r2.MetaParams() + swap1, swap2 := true, true // assume default of true + if m1 != nil { + swap1 = m1.Rewatch + } + if m2 != nil { + swap2 = m2.Rewatch + } + + s1, ok1 := obj.state[v1] + s2, ok2 := obj.state[v2] + x1, x2 := false, false + if ok1 { + x1 = s1.workerErr != nil && swap1 + } + if ok2 { + x2 = s2.workerErr != nil && swap2 + } + + if x1 || x2 { + // We swap, even if they're the same, so that we reload! + // This causes an add and remove of the "same" vertex... + return false, nil + } + + return engine.VertexCmpFn(v1, v2) // do the normal cmp otherwise + } + // If GraphSync succeeds, it updates the receiver graph accordingly... // Running the shutdown in vertexRemoveFn does not need to happen in a // topologically sorted order because it already paused in that order. obj.Logf("graph sync...") - if err := obj.graph.GraphSync(obj.nextGraph, engine.VertexCmpFn, vertexAddFn, vertexRemoveFn, engine.EdgeCmpFn); err != nil { + if err := obj.graph.GraphSync(obj.nextGraph, vertexCmpFn, vertexAddFn, vertexRemoveFn, engine.EdgeCmpFn); err != nil { return errwrap.Wrapf(err, "error running graph sync") } - // we run these afterwards, so that the state structs (that might get - // referenced) aren't destroyed while someone might poke or use one. + // We run these afterwards, so that we don't unnecessarily start anyone + // if GraphSync failed in some way. Otherwise we'd have to do clean up! + for _, fn := range start { + if err := fn(); err != nil { + return errwrap.Wrapf(err, "error running start fn") + } + } + // We run these afterwards, so that the state structs (that might get + // referenced) are not destroyed while someone might poke or use one. for _, fn := range free { if err := fn(); err != nil { return errwrap.Wrapf(err, "error running free fn") @@ -248,50 +318,28 @@ func (obj *Engine) Commit() error { return nil } -// Start runs the currently active graph. It also un-pauses the graph if it was -// paused. -func (obj *Engine) Start() error { +// Resume runs the currently active graph. It also un-pauses the graph if it was +// paused. Very little that is interesting should happen here. It all happens in +// the Commit method. After Commit, new things are already started, but we still +// need to Resume any pre-existing resources. +func (obj *Engine) Resume() error { + if !obj.paused { + return fmt.Errorf("already resumed") + } + topoSort, err := obj.graph.TopologicalSort() if err != nil { return err } - indegree := obj.graph.InDegree() // compute all of the indegree's + //indegree := obj.graph.InDegree() // compute all of the indegree's reversed := pgraph.Reverse(topoSort) for _, vertex := range reversed { - state := obj.state[vertex] - state.starter = (indegree[vertex] == 0) - var unpause = true // assume true - - if !state.working { // if not running... 
- state.working = true - unpause = false // doesn't need unpausing if starting - obj.wg.Add(1) - obj.waits[vertex].Add(1) - go func(v pgraph.Vertex) { - defer obj.wg.Done() - defer obj.waits[vertex].Done() - defer func() { - obj.state[v].working = false - }() - - obj.Logf("Worker(%s)", v) - // contains the Watch and CheckApply loops - err := obj.Worker(v) - obj.Logf("Worker(%s): Exited(%+v)", v, err) - }(vertex) - } - - select { - case <-state.started: - case <-state.stopped: // we failed on Watch start - } - - if unpause { // unpause (if needed) - obj.state[vertex].Event(event.Start) - } + //obj.state[vertex].starter = (indegree[vertex] == 0) + obj.state[vertex].Resume() // doesn't error } // we wait for everyone to start before exiting! + obj.paused = false return nil } @@ -302,22 +350,32 @@ // This is because once you've started a fast pause, some dependencies might // have been skipped when fast pausing, and future resources might have missed a // poke. In general this is only called when you're trying to hurry up the exit. +// XXX: Not implemented func (obj *Engine) SetFastPause() { obj.fastPause = true } -// Pause the active, running graph. At the moment this cannot error. -func (obj *Engine) Pause(fastPause bool) { +// Pause the active, running graph. +func (obj *Engine) Pause(fastPause bool) error { + if obj.paused { + return fmt.Errorf("already paused") + } + obj.fastPause = fastPause topoSort, _ := obj.graph.TopologicalSort() for _, vertex := range topoSort { // squeeze out the events... // The Event is sent to an unbuffered channel, so this event is // synchronous, and as a result it blocks until it is received. - obj.state[vertex].Event(event.Pause) + if err := obj.state[vertex].Pause(); err != nil && err != engine.ErrClosed { + return err + } } + obj.paused = true + // we are now completely paused... obj.fastPause = false // reset + return nil } // Close triggers a shutdown. Engine must be already paused before this is run. diff --git a/engine/graph/graph_test.go b/engine/graph/graph_test.go new file mode 100644 index 00000000..bb4a074e --- /dev/null +++ b/engine/graph/graph_test.go @@ -0,0 +1,37 @@ +// Mgmt +// Copyright (C) 2013-2018+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>.
+ +// +build !root + +package graph + +import ( + "fmt" + "testing" + + multierr "github.com/hashicorp/go-multierror" +) + +func TestMultiErr(t *testing.T) { + var err error + e := fmt.Errorf("some error") + err = multierr.Append(err, e) // build an error from a nil base + // ensure that this lib allows us to append to a nil + if err == nil { + t.Errorf("missing error") + } +} diff --git a/engine/graph/state.go b/engine/graph/state.go index 6f714591..4c98e0ba 100644 --- a/engine/graph/state.go +++ b/engine/graph/state.go @@ -19,14 +19,11 @@ package graph import ( "fmt" - "os" - "path" "sync" "time" "github.com/purpleidea/mgmt/converger" "github.com/purpleidea/mgmt/engine" - "github.com/purpleidea/mgmt/engine/event" "github.com/purpleidea/mgmt/pgraph" "github.com/purpleidea/mgmt/util" @@ -61,29 +58,49 @@ type State struct { timestamp int64 // last updated timestamp isStateOK bool // is state OK or do we need to run CheckApply ? + workerErr error // did the Worker error? - // events is a channel of incoming events which is read by the Watch - // loop for that resource. It receives events like pause, start, and - // poke. The channel shuts down to signal for Watch to exit. - eventsChan chan *event.Msg // incoming to resource - eventsLock *sync.Mutex // lock around sending and closing of events channel - eventsDone bool // is channel closed? + // doneChan closes when Watch should shut down. When any of the + // following channels close, it causes this to close. + doneChan chan struct{} - // outputChan is the channel that the engine listens on for events from + // processDone is closed when the Process/CheckApply function fails + // permanently, and wants to cause Watch to exit. + processDone chan struct{} + // watchDone is closed when the Watch function fails permanently, and we + // close this to signal we should definitely exit. (Often redundant.) + watchDone chan struct{} + // removeDone is closed when the vertexRemoveFn method asks for an exit. + // This happens when we're switching graphs. The switch to an "empty" + // graph is the equivalent of asking for a final shutdown. + removeDone chan struct{} + // eventsDone is closed when we shut down the Process loop because we + // closed without error. In theory this shouldn't happen, but it could + // if Watch returns without error for some reason. + eventsDone chan struct{} + + // eventsChan is the channel that the engine listens on for events from // the Watch loop for that resource. The event is nil normally, except // when events are sent on this channel from the engine. This only // happens as a signaling mechanism when Watch has shutdown and we want // to notify the Process loop which reads from this. - outputChan chan error // outgoing from resource + eventsChan chan error // outgoing from resource - wg *sync.WaitGroup - exit *util.EasyExit + // pokeChan is a separate channel that the Process loop listens on to + // know when we might need to run Process. It never closes, and is safe + // to send on since it is buffered. + pokeChan chan struct{} // outgoing from resource - started chan struct{} // closes when it's started - stopped chan struct{} // closes when it's stopped + // paused represents if this particular res is paused or not. + paused bool + // pauseSignal closes to request a pause of this resource. + pauseSignal chan struct{} + // resumeSignal closes to request a resume of this resource. + resumeSignal chan struct{} + // pausedAck is used to send an ack message saying that we've paused.
+ pausedAck *util.EasyAck - starter bool // do we have an indegree of 0 ? - working bool // is the Main() loop running ? + wg *sync.WaitGroup // used for all vertex specific processes cuid *converger.UID // primary converger tuid *converger.UID // secondary converger @@ -93,17 +110,6 @@ type State struct { // Init initializes structures like channels. func (obj *State) Init() error { - obj.eventsChan = make(chan *event.Msg) - obj.eventsLock = &sync.Mutex{} - - obj.outputChan = make(chan error) - - obj.wg = &sync.WaitGroup{} - obj.exit = util.NewEasyExit() - - obj.started = make(chan struct{}) - obj.stopped = make(chan struct{}) - res, isRes := obj.Vertex.(engine.Res) if !isRes { return fmt.Errorf("vertex is not a Res") @@ -121,6 +127,24 @@ func (obj *State) Init() error { return fmt.Errorf("the Logf function is missing") } + obj.doneChan = make(chan struct{}) + + obj.processDone = make(chan struct{}) + obj.watchDone = make(chan struct{}) + obj.removeDone = make(chan struct{}) + obj.eventsDone = make(chan struct{}) + + obj.eventsChan = make(chan error) + + obj.pokeChan = make(chan struct{}, 1) // must be buffered + + //obj.paused = false // starts off as started + obj.pauseSignal = make(chan struct{}) + //obj.resumeSignal = make(chan struct{}) // happens on pause + //obj.pausedAck = util.NewEasyAck() // happens on pause + + obj.wg = &sync.WaitGroup{} + //obj.cuid = obj.Converger.Register() // gets registered in Worker() //obj.tuid = obj.Converger.Register() // gets registered in Worker() @@ -129,24 +153,9 @@ func (obj *State) Init() error { Hostname: obj.Hostname, // Watch: - Running: func() error { - obj.tuid.StopTimer() - close(obj.started) // this is reset in the reset func - obj.isStateOK = false // assume we're initially dirty - // optimization: skip the initial send if not a starter - // because we'll get poked from a starter soon anyways! - if !obj.starter { - return nil - } - return obj.event() - }, - Event: obj.event, - Events: obj.eventsChan, - Read: obj.read, - Dirty: func() { // TODO: should we rename this SetDirty? - obj.tuid.StopTimer() - obj.isStateOK = false - }, + Running: obj.event, + Event: obj.event, + Done: obj.doneChan, // CheckApply: Refresh: func() bool { @@ -231,187 +240,91 @@ func (obj *State) Close() error { return err } -// reset is run to reset the state so that Watch can run a second time. Thus is -// needed for the Watch retry in particular. -func (obj *State) reset() { - obj.started = make(chan struct{}) - obj.stopped = make(chan struct{}) -} - -// Poke sends a nil message on the outputChan. This channel is used by the -// resource to signal a possible change. This will cause the Process loop to -// run if it can. +// Poke sends a notification on the poke channel. This channel is used to notify +// the Worker to run the Process/CheckApply when it can. This is used when there +// is a need to schedule or reschedule some work which got postponed or dropped. +// This doesn't contain any internal synchronization primitives or wait groups, +// callers are expected to make sure that they don't leave any of these running +// by the time the Worker() shuts down. func (obj *State) Poke() { - // add a wait group on the vertex we're poking! - obj.wg.Add(1) - defer obj.wg.Done() - - // now that we've added to the wait group, obj.outputChan won't close... - // so see if there's an exit signal before we release the wait group! - // XXX: i don't think this is necessarily happening, but maybe it is? - // XXX: re-write some of the engine to ensure that: "the sender closes"! 
- select { - case <-obj.exit.Signal(): - return // skip sending the poke b/c we're closing - default: - } + // redundant + //if len(obj.pokeChan) > 0 { + // return + //} select { - case obj.outputChan <- nil: - - case <-obj.exit.Signal(): + case obj.pokeChan <- struct{}{}: + default: // if chan is now full because more than one poke happened... } } -// Event sends a Pause or Start event to the resource. It can also be used to -// send Poke events, but it's much more efficient to send them directly instead -// of passing them through the resource. -func (obj *State) Event(msg *event.Msg) { - // TODO: should these happen after the lock? - obj.wg.Add(1) - defer obj.wg.Done() +// Pause pauses this resource. It should not be called on any already paused +// resource. It will block until the resource pauses with an acknowledgment, or +// until an exit for that resource is seen. If the latter happens it will error. +// It is NOT thread-safe with the Resume() method so only call either one at a +// time. +func (obj *State) Pause() error { + if obj.paused { + return fmt.Errorf("already paused") + } - obj.eventsLock.Lock() - defer obj.eventsLock.Unlock() + obj.pausedAck = util.NewEasyAck() + obj.resumeSignal = make(chan struct{}) // build the resume signal + close(obj.pauseSignal) + obj.Poke() // unblock and notice the pause if necessary - if obj.eventsDone { // closing, skip events... + // wait for ack (or exit signal) + select { + case <-obj.pausedAck.Wait(): // we got it! + // we're paused + case <-obj.doneChan: + return engine.ErrClosed + } + obj.paused = true + + return nil +} + +// Resume unpauses this resource. It can be safely called on a brand-new +// resource that has just started running without incident. It is NOT +// thread-safe with the Pause() method, so only call either one at a time. +func (obj *State) Resume() { + // TODO: do we need a mutex around Resume? + if !obj.paused { // no need to unpause brand-new resources return } - if msg.Kind == event.KindExit { // set this so future events don't deadlock - obj.Logf("exit event...") - obj.eventsDone = true - close(obj.eventsChan) // causes resource Watch loop to close - obj.exit.Done(nil) // trigger exit signal to unblock some cases - return - } + obj.pauseSignal = make(chan struct{}) // rebuild for next pause + close(obj.resumeSignal) + //obj.Poke() // not needed, we're already waiting for resume + + obj.paused = false + + // no need to wait for it to resume + //return // implied +} + +// event is a helper function to send an event to the CheckApply process loop. +// It can be used for the initial `running` event, or any regular event. You +// should instead use Poke() to "schedule" a new Process/CheckApply loop when +// one might be needed. This method will block until we're unpaused and ready to +// receive on the events channel. +func (obj *State) event() { + obj.setDirty() // assume we're initially dirty select { - case obj.eventsChan <- msg: - - case <-obj.exit.Signal(): + case obj.eventsChan <- nil: + // send! } + + //return // implied } -// read is a helper function used inside the main select statement of resources. -// If it returns an error, then this is a signal for the resource to exit. -func (obj *State) read(msg *event.Msg) error { - switch msg.Kind { - case event.KindPoke: - return obj.event() // a poke needs to cause an event... 
- case event.KindStart: - return fmt.Errorf("unexpected start") - case event.KindPause: - // pass - case event.KindExit: - return engine.ErrSignalExit - - default: - return fmt.Errorf("unhandled event: %+v", msg.Kind) - } - - // we're paused now - select { - case msg, ok := <-obj.eventsChan: - if !ok { - return engine.ErrWatchExit - } - switch msg.Kind { - case event.KindPoke: - return fmt.Errorf("unexpected poke") - case event.KindPause: - return fmt.Errorf("unexpected pause") - case event.KindStart: - // resumed - return nil - case event.KindExit: - return engine.ErrSignalExit - - default: - return fmt.Errorf("unhandled event: %+v", msg.Kind) - } - } -} - -// event is a helper function to send an event from the resource Watch loop. It -// can be used for the initial `running` event, or any regular event. If it -// returns an error, then the Watch loop must return this error and shutdown. -func (obj *State) event() error { - // loop until we sent on obj.outputChan or exit with error - for { - select { - // send "activity" event - case obj.outputChan <- nil: - return nil // sent event! - - // make sure to keep handling incoming - case msg, ok := <-obj.eventsChan: - if !ok { - return engine.ErrWatchExit - } - switch msg.Kind { - case event.KindPoke: - // we're trying to send an event, so swallow the - // poke: it's what we wanted to have happen here - continue - case event.KindStart: - return fmt.Errorf("unexpected start") - case event.KindPause: - // pass - case event.KindExit: - return engine.ErrSignalExit - - default: - return fmt.Errorf("unhandled event: %+v", msg.Kind) - } - } - - // we're paused now - select { - case msg, ok := <-obj.eventsChan: - if !ok { - return engine.ErrWatchExit - } - switch msg.Kind { - case event.KindPoke: - return fmt.Errorf("unexpected poke") - case event.KindPause: - return fmt.Errorf("unexpected pause") - case event.KindStart: - // resumed - case event.KindExit: - return engine.ErrSignalExit - - default: - return fmt.Errorf("unhandled event: %+v", msg.Kind) - } - } - } -} - -// varDir returns the path to a working directory for the resource. It will try -// and create the directory first, and return an error if this failed. The dir -// should be cleaned up by the resource on Close if it wishes to discard the -// contents. If it does not, then a future resource with the same kind and name -// may see those contents in that directory. The resource should clean up the -// contents before use if it is important that nothing exist. It is always -// possible that contents could remain after an abrupt crash, so do not store -// overly sensitive data unless you're aware of the risks. -func (obj *State) varDir(extra string) (string, error) { - // Using extra adds additional dirs onto our namespace. An empty extra - // adds no additional directories. - if obj.Prefix == "" { // safety - return "", fmt.Errorf("the VarDir prefix is empty") - } - - // an empty string at the end has no effect - p := fmt.Sprintf("%s/", path.Join(obj.Prefix, extra)) - if err := os.MkdirAll(p, 0770); err != nil { - return "", errwrap.Wrapf(err, "can't create prefix in: %s", p) - } - - // returns with a trailing slash as per the mgmt file res convention - return p, nil +// setDirty marks the resource state as dirty. This signals to the engine that +// CheckApply will have some work to do in order to converge it. +func (obj *State) setDirty() { + obj.tuid.StopTimer() + obj.isStateOK = false } // poll is a replacement for Watch when the Poll metaparameter is used. 
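The Pause/Resume handshake above is easier to follow in isolation. Below is a self-contained sketch of the same pattern, under the assumption that `util.EasyAck` behaves like a one-shot channel that is closed by `Ack()`; the names here are illustrative stand-ins, not the real engine types:

```golang
package main

import (
	"fmt"
	"sync"
)

// state holds the per-resource channels, loosely mirroring the State struct.
type state struct {
	pokeChan     chan struct{} // buffered; schedules a process run
	pauseSignal  chan struct{} // closed to request a pause
	resumeSignal chan struct{} // closed to request a resume
	pausedAck    chan struct{} // closed by the worker to ack the pause
}

// worker mimics the engine's process loop: wait for a poke, honour a pending
// pause request, then do the work.
func (s *state) worker(done chan struct{}) {
	for {
		select {
		case <-s.pokeChan: // something might need processing
		case <-done: // closed to request a shutdown
			return
		}

		// pause if one was requested...
		select {
		case <-s.pauseSignal: // closed by the pause step below
			close(s.pausedAck) // send ack: we are now paused
			<-s.resumeSignal   // block here until resumed
		default: // no pause requested, keep going...
		}

		fmt.Println("process!") // stand-in for Process/CheckApply
	}
}

func main() {
	s := &state{
		pokeChan:    make(chan struct{}, 1),
		pauseSignal: make(chan struct{}),
	}
	done := make(chan struct{})
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.worker(done)
	}()

	s.pokeChan <- struct{}{} // poke: schedule one process run

	// pause: build the ack and resume channels, then signal...
	s.pausedAck = make(chan struct{})
	s.resumeSignal = make(chan struct{})
	close(s.pauseSignal)
	s.pokeChan <- struct{}{} // poke so the worker notices the pause
	<-s.pausedAck            // wait for the acknowledgment

	// resume: rebuild the pause signal for next time, then unblock...
	// (the worker is parked on resumeSignal, so this write is safe)
	s.pauseSignal = make(chan struct{})
	close(s.resumeSignal)

	close(done) // ask the worker to exit, and wait for it
	wg.Wait()
}
```

As in the engine, a pause request does not interrupt a `Process` that is already running; it is only honoured between iterations, which is why the pause step also pokes.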
@@ -420,34 +333,17 @@ func (obj *State) poll(interval uint32) error { ticker := time.NewTicker(time.Duration(interval) * time.Second) defer ticker.Stop() - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running - var send = false // send event? for { select { case <-ticker.C: // received the timer event obj.init.Logf("polling...") - send = true - obj.init.Dirty() // dirty - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // signal for shutdown request + return nil } - // do all our event sending all together to avoid duplicate msgs - if send { - send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } - } + obj.init.Event() // notify engine of an event (this can block) } } diff --git a/engine/graph/vardir.go b/engine/graph/vardir.go new file mode 100644 index 00000000..9fcd8f21 --- /dev/null +++ b/engine/graph/vardir.go @@ -0,0 +1,51 @@ +// Mgmt +// Copyright (C) 2013-2018+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package graph + +import ( + "fmt" + "os" + "path" + + errwrap "github.com/pkg/errors" +) + +// varDir returns the path to a working directory for the resource. It will try +// and create the directory first, and return an error if this failed. The dir +// should be cleaned up by the resource on Close if it wishes to discard the +// contents. If it does not, then a future resource with the same kind and name +// may see those contents in that directory. The resource should clean up the +// contents before use if it is important that nothing exist. It is always +// possible that contents could remain after an abrupt crash, so do not store +// overly sensitive data unless you're aware of the risks. +func (obj *State) varDir(extra string) (string, error) { + // Using extra adds additional dirs onto our namespace. An empty extra + // adds no additional directories.
+ if obj.Prefix == "" { // safety + return "", fmt.Errorf("the VarDir prefix is empty") + } + + // an empty string at the end has no effect + p := fmt.Sprintf("%s/", path.Join(obj.Prefix, extra)) + if err := os.MkdirAll(p, 0770); err != nil { + return "", errwrap.Wrapf(err, "can't create prefix in: %s", p) + } + + // returns with a trailing slash as per the mgmt file res convention + return p, nil +} diff --git a/engine/metaparams.go b/engine/metaparams.go index 23ad2d0d..773cd4bc 100644 --- a/engine/metaparams.go +++ b/engine/metaparams.go @@ -37,6 +37,8 @@ var DefaultMetaParams = &MetaParams{ Limit: rate.Inf, // defaults to no limit Burst: 0, // no burst needed on an infinite rate //Sema: []string{}, + Rewatch: true, + Realize: false, // true would be more awesome, but unexpected for users } // MetaRes is the interface a resource must implement to support meta params. @@ -81,6 +83,24 @@ type MetaParams struct { // has a count equal to 1, is different from a sema named `foo:1` which // also has a count equal to 1, but is a different semaphore. Sema []string `yaml:"sema"` + + // Rewatch specifies whether we re-run the Watch worker during a swap + // if it has errored. When doing a GraphCmp to swap the graphs, if this + // is true, and this particular worker has errored, then we'll remove it + // and add it back as a new vertex, thus causing it to run again. This + // is different from the Retry metaparam which applies during the normal + // execution. It is only when Retry is exhausted that we're in permanent + // worker failure, and only then can we rely on this metaparam. + Rewatch bool `yaml:"rewatch"` + + // Realize ensures that the resource is guaranteed to converge at least + // once before a potential graph swap removes or changes it. This + // guarantee is useful for fast changing graphs, to ensure that the + // brief creation of a resource is seen. This guarantee does not protect + // against the engine quitting normally, and it can't guarantee it if + // the resource is blocked because of a failed pre-requisite resource. + // XXX: Not implemented! + Realize bool `yaml:"realize"` } // Cmp compares two AutoGroupMeta structs and determines if they're equivalent. @@ -118,6 +138,13 @@ func (obj *MetaParams) Cmp(meta *MetaParams) error { return errwrap.Wrapf(err, "values for Sema are different") } + if obj.Rewatch != meta.Rewatch { + return fmt.Errorf("values for Rewatch are different") + } + if obj.Realize != meta.Realize { + return fmt.Errorf("values for Realize are different") + } + return nil } @@ -147,13 +174,15 @@ func (obj *MetaParams) Copy() *MetaParams { copy(sema, obj.Sema) } return &MetaParams{ - Noop: obj.Noop, - Retry: obj.Retry, - Delay: obj.Delay, - Poll: obj.Poll, - Limit: obj.Limit, // FIXME: can we copy this type like this? test me! - Burst: obj.Burst, - Sema: sema, + Noop: obj.Noop, + Retry: obj.Retry, + Delay: obj.Delay, + Poll: obj.Poll, + Limit: obj.Limit, // FIXME: can we copy this type like this? test me! + Burst: obj.Burst, + Sema: sema, + Rewatch: obj.Rewatch, + Realize: obj.Realize, } } diff --git a/engine/resources.go b/engine/resources.go index fec342ed..abb76b05 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -21,8 +21,6 @@ import ( "encoding/gob" "fmt" - "github.com/purpleidea/mgmt/engine/event" - errwrap "github.com/pkg/errors" "gopkg.in/yaml.v2" ) @@ -93,22 +91,14 @@ type Init struct { // Called from within Watch: // Running must be called after your watches are all started and ready.
- Running func() error + Running func() // Event sends an event notifying the engine of a possible state change. - Event func() error + Event func() - // Events returns a channel that we must watch for messages from the - // engine. When it closes, this is a signal to shutdown. - Events chan *event.Msg - - // Read processes messages that come in from the Events channel. It is a - // helper method that knows how to handle the pause mechanism correctly. - Read func(*event.Msg) error - - // Dirty marks the resource state as dirty. This signals to the engine - // that CheckApply will have some work to do in order to converge it. - Dirty func() + // Done is a channel that will close to signal to us that it's time + // for us to shut down. + Done chan struct{} // Called from within CheckApply: diff --git a/engine/resources/augeas.go b/engine/resources/augeas.go index 7e67d1be..bb4e3aed 100644 --- a/engine/resources/augeas.go +++ b/engine/resources/augeas.go @@ -135,10 +135,7 @@ func (obj *AugeasRes) Watch() error { } defer obj.recWatcher.Close() - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? for { @@ -158,23 +155,15 @@ obj.init.Logf("Event(%s): %v", event.Body.Name, event.Body.Op) } send = true - obj.init.Dirty() // dirty - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/aws_ec2.go b/engine/resources/aws_ec2.go index a7a30e18..7dbe831e 100644 --- a/engine/resources/aws_ec2.go +++ b/engine/resources/aws_ec2.go @@ -423,9 +423,7 @@ func (obj *AwsEc2Res) longpollWatch() error { // We tell the engine that we're running right away. This is not correct, // but the api doesn't have a way to signal when the waiters are ready.
- if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running // cancellable context used for exiting cleanly ctx, cancel := context.WithCancel(context.TODO()) @@ -488,14 +486,6 @@ func (obj *AwsEc2Res) longpollWatch() error { // process events from the goroutine for { select { - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } - case msg, ok := <-obj.awsChan: if !ok { return nil @@ -509,15 +499,16 @@ func (obj *AwsEc2Res) longpollWatch() error { continue default: obj.init.Logf("State: %v", msg.state) - obj.init.Dirty() // dirty send = true } + + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } + if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } @@ -587,14 +578,6 @@ func (obj *AwsEc2Res) snsWatch() error { // process events for { select { - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } - case msg, ok := <-obj.awsChan: if !ok { return nil @@ -607,20 +590,19 @@ func (obj *AwsEc2Res) snsWatch() error { // is confirmed, we are ready to receive events, so we // can notify the engine that we're running. if msg.event == awsEc2EventWatchReady { - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running continue } obj.init.Logf("State: %v", msg.event) - obj.init.Dirty() // dirty send = true + + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } + if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/cron.go b/engine/resources/cron.go index 568d0603..195c3885 100644 --- a/engine/resources/cron.go +++ b/engine/resources/cron.go @@ -271,10 +271,7 @@ func (obj *CronRes) Watch() error { } defer obj.recWatcher.Close() - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? 
for { @@ -285,7 +282,7 @@ func (obj *CronRes) Watch() error { obj.init.Logf("%+v", event) } send = true - obj.init.Dirty() // dirty + case event, ok := <-obj.recWatcher.Events(): // process unit file recwatch events if !ok { // channel shutdown @@ -298,21 +295,14 @@ func (obj *CronRes) Watch() error { obj.init.Logf("Event(%s): %v", event.Body.Name, event.Body.Op) } send = true - obj.init.Dirty() // dirty - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/docker_container.go b/engine/resources/docker_container.go index 399e3513..ed9acb21 100644 --- a/engine/resources/docker_container.go +++ b/engine/resources/docker_container.go @@ -168,10 +168,7 @@ func (obj *DockerContainerRes) Watch() error { eventChan, errChan := obj.client.Events(ctx, types.EventsOptions{}) - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? for { @@ -184,27 +181,21 @@ func (obj *DockerContainerRes) Watch() error { obj.init.Logf("%+v", event) } send = true - obj.init.Dirty() // dirty + case err, ok := <-errChan: if !ok { return nil } return err - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/exec.go b/engine/resources/exec.go index b70e5957..4c697a54 100644 --- a/engine/resources/exec.go +++ b/engine/resources/exec.go @@ -157,10 +157,7 @@ func (obj *ExecRes) Watch() error { ioChan = obj.bufioChanScanner(scanner) } - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? for { @@ -180,24 +177,16 @@ func (obj *ExecRes) Watch() error { obj.init.Logf("watch output: %s", data.text) if data.text != "" { send = true - obj.init.Dirty() // dirty } - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/exec_test.go b/engine/resources/exec_test.go index 7adb0054..d63e33f4 100644 --- a/engine/resources/exec_test.go +++ b/engine/resources/exec_test.go @@ -31,9 +31,6 @@ func fakeInit(t *testing.T) *engine.Init { t.Logf("test: "+format, v...) 
} return &engine.Init{ - Running: func() error { - return nil - }, Debug: debug, Logf: logf, } diff --git a/engine/resources/file.go b/engine/resources/file.go index bc31075d..97782f9e 100644 --- a/engine/resources/file.go +++ b/engine/resources/file.go @@ -194,10 +194,7 @@ func (obj *FileRes) Watch() error { } defer obj.recWatcher.Close() - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? for { @@ -217,23 +214,15 @@ func (obj *FileRes) Watch() error { obj.init.Logf("Event(%s): %v", event.Body.Name, event.Body.Op) } send = true - obj.init.Dirty() // dirty - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/group.go b/engine/resources/group.go index a5efaee0..38cc11cd 100644 --- a/engine/resources/group.go +++ b/engine/resources/group.go @@ -85,10 +85,7 @@ func (obj *GroupRes) Watch() error { } defer obj.recWatcher.Close() - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? for { @@ -108,23 +105,15 @@ func (obj *GroupRes) Watch() error { obj.init.Logf("Event(%s): %v", event.Body.Name, event.Body.Op) } send = true - obj.init.Dirty() // dirty - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/hostname.go b/engine/resources/hostname.go index 15e8c902..f26a191d 100644 --- a/engine/resources/hostname.go +++ b/engine/resources/hostname.go @@ -127,33 +127,22 @@ func (obj *HostnameRes) Watch() error { signals := make(chan *dbus.Signal, 10) // closed by dbus package bus.Signal(signals) - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? 
for { select { case <-signals: send = true - obj.init.Dirty() // dirty - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/kv.go b/engine/resources/kv.go index 0a6a063b..65a0ecec 100644 --- a/engine/resources/kv.go +++ b/engine/resources/kv.go @@ -102,11 +102,7 @@ func (obj *KVRes) Close() error { // Watch is the primary listener for this resource and it outputs events. func (obj *KVRes) Watch() error { - - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running ch := obj.init.World.StrMapWatch(obj.Key) // get possible events! @@ -125,23 +121,15 @@ func (obj *KVRes) Watch() error { obj.init.Logf("Event!") } send = true - obj.init.Dirty() // dirty - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/mount.go b/engine/resources/mount.go index f5cfcc29..4fe20de3 100644 --- a/engine/resources/mount.go +++ b/engine/resources/mount.go @@ -224,10 +224,7 @@ func (obj *MountRes) Watch() error { // close the recwatcher when we're done defer recWatcher.Close() - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // bubble up a NACK... - } + obj.init.Running() // when started, notify engine that we're running var send bool var done bool @@ -248,7 +245,6 @@ func (obj *MountRes) Watch() error { obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op) } - obj.init.Dirty() send = true case event, ok := <-ch: @@ -263,24 +259,16 @@ func (obj *MountRes) Watch() error { obj.init.Logf("event: %+v", event) } - obj.init.Dirty() send = true - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/msg.go b/engine/resources/msg.go index 1588bf37..6b82d541 100644 --- a/engine/resources/msg.go +++ b/engine/resources/msg.go @@ -94,30 +94,20 @@ func (obj *MsgRes) Close() error { // Watch is the primary listener for this resource and it outputs events. func (obj *MsgRes) Watch() error { - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running - var send = false // send event? + //var send = false // send event? 
for { select { - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs - if send { - send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } - } + //if send { + // send = false + // obj.init.Event() // notify engine of an event (this can block) + //} } } @@ -137,7 +127,7 @@ func (obj *MsgRes) isAllStateOK() bool { func (obj *MsgRes) updateStateOK() { // XXX: this resource doesn't entirely make sense to me at the moment. if !obj.isAllStateOK() { - obj.init.Dirty() + //obj.init.Dirty() // XXX: removed with API cleanup } } diff --git a/engine/resources/net.go b/engine/resources/net.go index 20279d4d..1ab58f22 100644 --- a/engine/resources/net.go +++ b/engine/resources/net.go @@ -247,10 +247,7 @@ func (obj *NetRes) Watch() error { } }() - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? var done bool @@ -272,7 +269,6 @@ func (obj *NetRes) Watch() error { } send = true - obj.init.Dirty() // dirty case event, ok := <-recWatcher.Events(): if !ok { @@ -290,23 +286,15 @@ func (obj *NetRes) Watch() error { } send = true - obj.init.Dirty() // dirty - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/noop.go b/engine/resources/noop.go index ddfadf5a..990d0df1 100644 --- a/engine/resources/noop.go +++ b/engine/resources/noop.go @@ -63,31 +63,15 @@ func (obj *NoopRes) Close() error { // Watch is the primary listener for this resource and it outputs events. func (obj *NoopRes) Watch() error { - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested + obj.init.Running() // when started, notify engine that we're running + + select { + case <-obj.init.Done: // closed by the engine to signal shutdown } - var send = false // send event? - for { - select { - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } - } + //obj.init.Event() // notify engine of an event (this can block) - // do all our event sending all together to avoid duplicate msgs - if send { - send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } - } - } + return nil } // CheckApply method for Noop resource. Does nothing, returns happy! 
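The noop and print resources above reduce the new contract to its essence: block on `obj.init.Done`, then return. They also show why the engine replaced the old `Events` message channel with a `Done` channel that is merely closed: closing a channel is a broadcast that unblocks every receiver, present and future, with no message to read and no send that can be missed. A standalone sketch of that semantics (plain Go, not mgmt code; all names here are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	done := make(chan struct{}) // plays the role of obj.init.Done
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-done // blocks here; a receive on a closed channel returns immediately
			fmt.Printf("watcher %d: shutting down\n", id)
		}(i)
	}
	close(done) // one close wakes all three watchers at once
	wg.Wait()
}
```

This is also why none of the converted `Watch` methods need the old `ok` check: there is nothing to read from `Done`, so a plain `<-obj.init.Done` case is sufficient.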
diff --git a/engine/resources/nspawn.go b/engine/resources/nspawn.go index cbd473ad..31b03708 100644 --- a/engine/resources/nspawn.go +++ b/engine/resources/nspawn.go @@ -167,10 +167,7 @@ func (obj *NspawnRes) Watch() error { bus.Signal(busChan) defer bus.RemoveSignal(busChan) // not needed here, but nice for symmetry - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? for { @@ -187,24 +184,16 @@ func (obj *NspawnRes) Watch() error { return fmt.Errorf("unknown event: %s", event.Name) } send = true - obj.init.Dirty() // dirty } - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/password.go b/engine/resources/password.go index 2676a583..ef28b3bc 100644 --- a/engine/resources/password.go +++ b/engine/resources/password.go @@ -182,10 +182,7 @@ func (obj *PasswordRes) Watch() error { } defer obj.recWatcher.Close() - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? for { @@ -199,23 +196,15 @@ func (obj *PasswordRes) Watch() error { return errwrap.Wrapf(err, "unknown %s watcher error", obj) } send = true - obj.init.Dirty() // dirty - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/pkg.go b/engine/resources/pkg.go index 33a87268..c1c84d8a 100644 --- a/engine/resources/pkg.go +++ b/engine/resources/pkg.go @@ -67,7 +67,7 @@ type PkgRes struct { // Default returns some sensible defaults for this resource. func (obj *PkgRes) Default() engine.Res { return &PkgRes{ - State: PkgStateInstalled, // i think this is preferable to "newest" + State: PkgStateInstalled, // this *is* preferable to "newest" } } @@ -121,10 +121,7 @@ func (obj *PkgRes) Watch() error { return errwrap.Wrapf(err, "error adding signal match") } - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? 
for { @@ -146,23 +143,15 @@ func (obj *PkgRes) Watch() error { } send = true - obj.init.Dirty() // dirty - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/print.go b/engine/resources/print.go index dac863f0..55fbcafe 100644 --- a/engine/resources/print.go +++ b/engine/resources/print.go @@ -66,31 +66,15 @@ func (obj *PrintRes) Close() error { // Watch is the primary listener for this resource and it outputs events. func (obj *PrintRes) Watch() error { - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested + obj.init.Running() // when started, notify engine that we're running + + select { + case <-obj.init.Done: // closed by the engine to signal shutdown } - var send = false // send event? - for { - select { - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } - } + //obj.init.Event() // notify engine of an event (this can block) - // do all our event sending all together to avoid duplicate msgs - if send { - send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } - } - } + return nil } // CheckApply method for Print resource. Does nothing, returns happy! diff --git a/engine/resources/resources_test.go b/engine/resources/resources_test.go index 66f9d706..8c594db7 100644 --- a/engine/resources/resources_test.go +++ b/engine/resources/resources_test.go @@ -28,7 +28,6 @@ import ( "time" "github.com/purpleidea/mgmt/engine" - "github.com/purpleidea/mgmt/engine/event" "github.com/purpleidea/mgmt/util" ) @@ -220,36 +219,33 @@ func TestResources1(t *testing.T) { readyChan := make(chan struct{}) eventChan := make(chan struct{}) - eventsChan := make(chan *event.Msg) + doneChan := make(chan struct{}) debug := testing.Verbose() // set via the -test.v flag to `go test` logf := func(format string, v ...interface{}) { t.Logf(fmt.Sprintf("test #%d: Res: ", index)+format, v...) } init := &engine.Init{ - Running: func() error { + Running: func() { close(readyChan) select { // this always sends one! case eventChan <- struct{}{}: } - return nil }, // Watch runs this to send a changed event. - Event: func() error { + Event: func() { select { case eventChan <- struct{}{}: } - return nil }, // Watch listens on this for close/pause events. 
- Events: eventsChan, - Debug: debug, - Logf: logf, + Done: doneChan, + Debug: debug, + Logf: logf, // unused - Dirty: func() {}, Recv: func() map[string]*engine.Send { return map[string]*engine.Send{} }, @@ -341,7 +337,7 @@ func TestResources1(t *testing.T) { } } t.Logf("test #%d: shutting down Watch", index) - close(eventsChan) // send Watch shutdown command + close(doneChan) // send Watch shutdown command }() Loop: for { diff --git a/engine/resources/svc.go b/engine/resources/svc.go index b0e2107c..77eedc79 100644 --- a/engine/resources/svc.go +++ b/engine/resources/svc.go @@ -120,10 +120,7 @@ func (obj *SvcRes) Watch() error { bus.Signal(buschan) defer bus.RemoveSignal(buschan) // not needed here, but nice for symmetry - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var svc = fmt.Sprintf("%s.service", obj.Name()) // systemd name var send = false // send event? @@ -161,7 +158,6 @@ func (obj *SvcRes) Watch() error { if previous != invalid { // if invalid changed, send signal send = true - obj.init.Dirty() // dirty } if invalid { @@ -176,13 +172,8 @@ func (obj *SvcRes) Watch() error { // loop so that we can see the changed invalid signal obj.init.Logf("daemon reload") - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } } else { if !activeSet { @@ -220,26 +211,18 @@ func (obj *SvcRes) Watch() error { obj.init.Logf("stopped") } send = true - obj.init.Dirty() // dirty case err := <-subErrors: return errwrap.Wrapf(err, "unknown %s error", obj) - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } } if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/test.go b/engine/resources/test.go index daece3b2..7594d089 100644 --- a/engine/resources/test.go +++ b/engine/resources/test.go @@ -125,31 +125,15 @@ func (obj *TestRes) Close() error { // Watch is the primary listener for this resource and it outputs events. func (obj *TestRes) Watch() error { - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested + obj.init.Running() // when started, notify engine that we're running + + select { + case <-obj.init.Done: // closed by the engine to signal shutdown } - var send = false // send event? - for { - select { - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } - } + //obj.init.Event() // notify engine of an event (this can block) - // do all our event sending all together to avoid duplicate msgs - if send { - send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } - } - } + return nil } // CheckApply method for Test resource. Does nothing, returns happy! 
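The `resources_test.go` changes above double as a recipe for exercising any `Watch` implementation under the new API: construct an `engine.Init` whose `Done` channel you control, start `Watch` in a goroutine, and close the channel to request shutdown. A condensed sketch under those assumptions; the `runWatchBriefly` helper and the `res.Init(init)`/`res.Close()` setup and teardown calls are illustrative, not part of this patch:

```go
// runWatchBriefly starts Watch, immediately requests shutdown by closing the
// Done channel, and fails the test if Watch returns an error.
func runWatchBriefly(t *testing.T, res engine.Res) {
	doneChan := make(chan struct{})
	init := &engine.Init{
		Running: func() {}, // no-op: readiness isn't tracked in this sketch
		Event:   func() {}, // no-op: discard change notifications
		Done:    doneChan,  // Watch returns when this channel closes
		Logf:    func(format string, v ...interface{}) {},
	}
	if err := res.Init(init); err != nil { // assumed resource setup call
		t.Fatalf("test: init failed: %+v", err)
	}
	defer res.Close()

	errChan := make(chan error)
	go func() { errChan <- res.Watch() }()

	close(doneChan) // send Watch shutdown command
	if err := <-errChan; err != nil {
		t.Errorf("test: watch failed: %+v", err)
	}
}
```

Because `Done` is closed rather than sent on, the shutdown request cannot be lost even if `Watch` happens to be busy in another `select` case when it arrives.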
diff --git a/engine/resources/timer.go b/engine/resources/timer.go index d5308c52..58738a6e 100644 --- a/engine/resources/timer.go +++ b/engine/resources/timer.go @@ -75,10 +75,7 @@ func (obj *TimerRes) Watch() error { obj.ticker = obj.newTicker() defer obj.ticker.Stop() - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? for { @@ -87,20 +84,13 @@ func (obj *TimerRes) Watch() error { send = true obj.init.Logf("received tick") - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/user.go b/engine/resources/user.go index b13a537a..1508bf53 100644 --- a/engine/resources/user.go +++ b/engine/resources/user.go @@ -119,10 +119,7 @@ func (obj *UserRes) Watch() error { } defer obj.recWatcher.Close() - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? for { @@ -142,23 +139,15 @@ func (obj *UserRes) Watch() error { obj.init.Logf("Event(%s): %v", event.Body.Name, event.Body.Op) } send = true - obj.init.Dirty() // dirty - case event, ok := <-obj.init.Events: - if !ok { - return nil - } - if err := obj.init.Read(event); err != nil { - return err - } + case <-obj.init.Done: // closed by the engine to signal shutdown + return nil } // do all our event sending all together to avoid duplicate msgs if send { send = false - if err := obj.init.Event(); err != nil { - return err // exit if requested - } + obj.init.Event() // notify engine of an event (this can block) } } } diff --git a/engine/resources/virt.go b/engine/resources/virt.go index 5ae96ae6..500166a3 100644 --- a/engine/resources/virt.go +++ b/engine/resources/virt.go @@ -326,10 +326,7 @@ func (obj *VirtRes) Watch() error { } defer obj.conn.DomainEventDeregister(gaCallbackID) - // notify engine that we're running - if err := obj.init.Running(); err != nil { - return err // exit if requested - } + obj.init.Running() // when started, notify engine that we're running var send = false // send event? for { @@ -340,31 +337,26 @@ func (obj *VirtRes) Watch() error { switch event { case libvirt.DOMAIN_EVENT_DEFINED: if obj.Transient { - obj.init.Dirty() // dirty send = true } case libvirt.DOMAIN_EVENT_UNDEFINED: if !obj.Transient { - obj.init.Dirty() // dirty send = true } case libvirt.DOMAIN_EVENT_STARTED: fallthrough case libvirt.DOMAIN_EVENT_RESUMED: if obj.State != "running" { - obj.init.Dirty() // dirty send = true } case libvirt.DOMAIN_EVENT_SUSPENDED: if obj.State != "paused" { - obj.init.Dirty() // dirty send = true } case libvirt.DOMAIN_EVENT_STOPPED: fallthrough case libvirt.DOMAIN_EVENT_SHUTDOWN: if obj.State != "shutoff" { - obj.init.Dirty() // dirty send = true } processExited = true @@ -375,7 +367,6 @@ func (obj *VirtRes) Watch() error { // verify, detect and patch appropriately! fallthrough case libvirt.DOMAIN_EVENT_CRASHED: - obj.init.Dirty() // dirty send = true processExited = true // FIXME: is this okay for PMSUSPENDED ? 
 			}
@@ -390,7 +381,6 @@ func (obj *VirtRes) Watch() error {
 			if state == libvirt.CONNECT_DOMAIN_EVENT_AGENT_LIFECYCLE_STATE_CONNECTED {
 				obj.guestAgentConnected = true
-				obj.init.Dirty() // dirty
 				send = true
 				obj.init.Logf("Guest agent connected")
@@ -409,21 +399,14 @@ func (obj *VirtRes) Watch() error {
 		case err := <-errorChan:
 			return fmt.Errorf("unknown %s libvirt error: %s", obj, err)
-		case event, ok := <-obj.init.Events:
-			if !ok {
-				return nil
-			}
-			if err := obj.init.Read(event); err != nil {
-				return err
-			}
+		case <-obj.init.Done: // closed by the engine to signal shutdown
+			return nil
 		}

 		// do all our event sending all together to avoid duplicate msgs
 		if send {
 			send = false
-			if err := obj.init.Event(); err != nil {
-				return err // exit if requested
-			}
+			obj.init.Event() // notify engine of an event (this can block)
 		}
 	}
 }
diff --git a/engine/reverse.go b/engine/reverse.go
new file mode 100644
index 00000000..61befbf3
--- /dev/null
+++ b/engine/reverse.go
@@ -0,0 +1,65 @@
+// Mgmt
+// Copyright (C) 2013-2018+ James Shubin and the project contributors
+// Written by James Shubin and the project contributors
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package engine
+
+import (
+	"fmt"
+)
+
+// ReversibleRes is an interface that a resource can implement if it wants to
+// have some resource run when it disappears. A disappearance happens when a
+// resource is defined in one instance of the graph, and is gone in the
+// subsequent one. This is helpful for building robust programs with the engine.
+// Default implementations for most of the methods declared in this interface
+// can be obtained for your resource by anonymously adding the traits.Reversible
+// struct to your resource implementation.
+type ReversibleRes interface {
+	Res
+
+	// ReversibleMeta lets you get or set meta params for the reversible
+	// trait.
+	ReversibleMeta() *ReversibleMeta
+
+	// SetReversibleMeta lets you set all of the meta params for the
+	// reversible trait in a single call.
+	SetReversibleMeta(*ReversibleMeta)
+
+	// Reversed returns the "reverse" or "reciprocal" resource. This is used
+	// to "clean" up after a previously defined resource has been removed.
+	// Interestingly, this returns the core Res interface instead of a
+	// ReversibleRes, because there is no requirement that the reverse of a
+	// Res be the same kind of Res, and the reverse might not be reversible!
+	Reversed() (Res, error)
+}
+
+// ReversibleMeta provides some parameters specific to reversible resources.
+type ReversibleMeta struct {
+	// Disabled specifies that reversing should be disabled for this
+	// resource.
+	Disabled bool
+
+	// TODO: add options here, including whether to reverse edges, etc...
+}
+
+// Cmp compares two ReversibleMeta structs and determines if they're equivalent.
+func (obj *ReversibleMeta) Cmp(rm *ReversibleMeta) error {
+	if obj.Disabled != rm.Disabled {
+		return fmt.Errorf("values for Disabled are different")
+	}
+	return nil
+}
diff --git a/engine/traits/reverse.go b/engine/traits/reverse.go
new file mode 100644
index 00000000..0ae44e13
--- /dev/null
+++ b/engine/traits/reverse.go
@@ -0,0 +1,48 @@
+// Mgmt
+// Copyright (C) 2013-2018+ James Shubin and the project contributors
+// Written by James Shubin and the project contributors
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package traits
+
+import (
+	"github.com/purpleidea/mgmt/engine"
+)
+
+// Reversible contains a general implementation with most of the properties and
+// methods needed to support reversing resources. It may be used as a starting
+// point to avoid re-implementing the straightforward methods.
+type Reversible struct {
+	meta *engine.ReversibleMeta
+
+	// Bug5819 works around issue https://github.com/golang/go/issues/5819
+	Bug5819 interface{} // XXX: workaround
+}
+
+// ReversibleMeta lets you get or set meta params for the reversing trait.
+func (obj *Reversible) ReversibleMeta() *engine.ReversibleMeta {
+	if obj.meta == nil { // set the defaults if previously empty
+		obj.meta = &engine.ReversibleMeta{
+			Disabled: true, // by default we're disabled
+		}
+	}
+	return obj.meta
+}
+
+// SetReversibleMeta lets you set all of the meta params for the reversing trait
+// in a single call.
+func (obj *Reversible) SetReversibleMeta(meta *engine.ReversibleMeta) { + obj.meta = meta +} diff --git a/lang/interpret_test.go b/lang/interpret_test.go index b5644d07..a7446877 100644 --- a/lang/interpret_test.go +++ b/lang/interpret_test.go @@ -1387,13 +1387,15 @@ func TestAstInterpret0(t *testing.T) { stringptr := "this is meta" x.StringPtr = &stringptr m := &engine.MetaParams{ - Noop: true, // overwritten - Retry: -1, - Delay: 0, - Poll: 5, - Limit: 4.2, - Burst: 3, - Sema: []string{"foo:1", "bar:3"}, + Noop: true, // overwritten + Retry: -1, + Delay: 0, + Poll: 5, + Limit: 4.2, + Burst: 3, + Sema: []string{"foo:1", "bar:3"}, + Rewatch: false, + Realize: true, } x.SetMetaParams(m) graph.AddVertex(t1) @@ -1411,10 +1413,14 @@ func TestAstInterpret0(t *testing.T) { limit => 4.2, burst => 3, sema => ["foo:1", "bar:3",], + rewatch => false, + realize => true, + reverse => true, autoedge => true, autogroup => true, }, Meta:noop => true, + Meta:reverse => true, Meta:autoedge => true, Meta:autogroup => true, } diff --git a/lang/interpret_test/TestAstFunc1/metaparams0.graph b/lang/interpret_test/TestAstFunc1/metaparams0.graph index a045351a..599993bc 100644 --- a/lang/interpret_test/TestAstFunc1/metaparams0.graph +++ b/lang/interpret_test/TestAstFunc1/metaparams0.graph @@ -1,20 +1,26 @@ -Edge: bool(false) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); autoedge: bool(true); autogroup: bool(true)) # noop -Edge: bool(true) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); autoedge: bool(true); autogroup: bool(true)) # autoedge -Edge: bool(true) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); autoedge: bool(true); autogroup: bool(true)) # autogroup +Edge: bool(false) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); rewatch: bool(false); realize: bool(true); reverse: bool(true); autoedge: bool(true); autogroup: bool(true)) # noop +Edge: bool(false) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); rewatch: bool(false); realize: bool(true); reverse: bool(true); autoedge: bool(true); autogroup: bool(true)) # rewatch +Edge: bool(true) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); rewatch: bool(false); realize: bool(true); reverse: bool(true); autoedge: bool(true); autogroup: bool(true)) # autoedge +Edge: bool(true) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); rewatch: bool(false); realize: bool(true); reverse: bool(true); autoedge: bool(true); autogroup: bool(true)) # autogroup +Edge: bool(true) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); rewatch: bool(false); realize: bool(true); reverse: bool(true); autoedge: bool(true); autogroup: bool(true)) # realize +Edge: bool(true) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); rewatch: bool(false); 
realize: bool(true); reverse: bool(true); autoedge: bool(true); autogroup: bool(true)) # reverse Edge: bool(true) -> var(b) # b Edge: bool(true) -> var(b) # b -Edge: float(4.2) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); autoedge: bool(true); autogroup: bool(true)) # limit -Edge: int(-1) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); autoedge: bool(true); autogroup: bool(true)) # retry -Edge: int(0) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); autoedge: bool(true); autogroup: bool(true)) # delay -Edge: int(3) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); autoedge: bool(true); autogroup: bool(true)) # burst -Edge: int(5) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); autoedge: bool(true); autogroup: bool(true)) # poll -Edge: list(str(foo:1), str(bar:3)) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); autoedge: bool(true); autogroup: bool(true)) # sema +Edge: float(4.2) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); rewatch: bool(false); realize: bool(true); reverse: bool(true); autoedge: bool(true); autogroup: bool(true)) # limit +Edge: int(-1) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); rewatch: bool(false); realize: bool(true); reverse: bool(true); autoedge: bool(true); autogroup: bool(true)) # retry +Edge: int(0) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); rewatch: bool(false); realize: bool(true); reverse: bool(true); autoedge: bool(true); autogroup: bool(true)) # delay +Edge: int(3) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); rewatch: bool(false); realize: bool(true); reverse: bool(true); autoedge: bool(true); autogroup: bool(true)) # burst +Edge: int(5) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); rewatch: bool(false); realize: bool(true); reverse: bool(true); autoedge: bool(true); autogroup: bool(true)) # poll +Edge: list(str(foo:1), str(bar:3)) -> struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); rewatch: bool(false); realize: bool(true); reverse: bool(true); autoedge: bool(true); autogroup: bool(true)) # sema Edge: str(bar:3) -> list(str(foo:1), str(bar:3)) # 1 Edge: str(foo:1) -> list(str(foo:1), str(bar:3)) # 0 Edge: str(hello world) -> call:fmt.printf(str(hello world)) # a Vertex: bool(false) Vertex: bool(false) Vertex: bool(false) +Vertex: bool(false) +Vertex: bool(true) +Vertex: bool(true) Vertex: bool(true) Vertex: bool(true) Vertex: bool(true) @@ -32,6 +38,6 @@ Vertex: str(bar:3) Vertex: str(foo:1) Vertex: str(greeting) 
Vertex: str(hello world) -Vertex: struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); autoedge: bool(true); autogroup: bool(true)) +Vertex: struct(noop: bool(false); retry: int(-1); delay: int(0); poll: int(5); limit: float(4.2); burst: int(3); sema: list(str(foo:1), str(bar:3)); rewatch: bool(false); realize: bool(true); reverse: bool(true); autoedge: bool(true); autogroup: bool(true)) Vertex: var(b) Vertex: var(b) diff --git a/lang/interpret_test/TestAstFunc1/metaparams0/main.mcl b/lang/interpret_test/TestAstFunc1/metaparams0/main.mcl index e3f7ccbe..167f10d4 100644 --- a/lang/interpret_test/TestAstFunc1/metaparams0/main.mcl +++ b/lang/interpret_test/TestAstFunc1/metaparams0/main.mcl @@ -13,6 +13,9 @@ test "greeting" { limit => 4.2, burst => 3, sema => ["foo:1", "bar:3",], + rewatch => false, + realize => true, + reverse => true, autoedge => true, autogroup => true, }, diff --git a/lang/structs.go b/lang/structs.go index e41af842..b85fec22 100644 --- a/lang/structs.go +++ b/lang/structs.go @@ -638,6 +638,10 @@ func (obj *StmtRes) edges(resName string) ([]*interfaces.Edge, error) { func (obj *StmtRes) metaparams(res engine.Res) error { meta := engine.DefaultMetaParams.Copy() // defaults + var rm *engine.ReversibleMeta + if r, ok := res.(engine.ReversibleRes); ok { + rm = r.ReversibleMeta() // get a struct with the defaults + } var aem *engine.AutoEdgeMeta if r, ok := res.(engine.EdgeableRes); ok { aem = r.AutoEdgeMeta() // get a struct with the defaults @@ -706,6 +710,21 @@ func (obj *StmtRes) metaparams(res engine.Res) error { } meta.Sema = values + case "rewatch": + meta.Rewatch = v.Bool() // must not panic + + case "realize": + meta.Realize = v.Bool() // must not panic + + case "reverse": + if v.Type().Cmp(types.TypeBool) == nil { + if rm != nil { + rm.Disabled = !v.Bool() // must not panic + } + } else { + // TODO: read values from struct into rm.XXX + } + case "autoedge": if aem != nil { aem.Disabled = !v.Bool() // must not panic @@ -752,6 +771,19 @@ func (obj *StmtRes) metaparams(res engine.Res) error { } meta.Sema = values } + if val, exists := v.Struct()["rewatch"]; exists { + meta.Rewatch = val.Bool() // must not panic + } + if val, exists := v.Struct()["realize"]; exists { + meta.Realize = val.Bool() // must not panic + } + if val, exists := v.Struct()["reverse"]; exists && rm != nil { + if val.Type().Cmp(types.TypeBool) == nil { + rm.Disabled = !val.Bool() // must not panic + } else { + // TODO: read values from struct into rm.XXX + } + } if val, exists := v.Struct()["autoedge"]; exists && aem != nil { aem.Disabled = !val.Bool() // must not panic } @@ -765,6 +797,9 @@ func (obj *StmtRes) metaparams(res engine.Res) error { } res.SetMetaParams(meta) // set it! + if r, ok := res.(engine.ReversibleRes); ok { + r.SetReversibleMeta(rm) // set + } if r, ok := res.(engine.EdgeableRes); ok { r.SetAutoEdgeMeta(aem) // set } @@ -1139,6 +1174,9 @@ func (obj *StmtResMeta) Init(data *interfaces.Data) error { case "limit": case "burst": case "sema": + case "rewatch": + case "realize": + case "reverse": case "autoedge": case "autogroup": case MetaField: @@ -1225,50 +1263,83 @@ func (obj *StmtResMeta) Unify(kind string) ([]interfaces.Invariant, error) { } // add additional invariants based on what's in obj.Property !!! 
- var typ *types.Type + var invar interfaces.Invariant + static := func(typ *types.Type) interfaces.Invariant { + return &unification.EqualsInvariant{ + Expr: obj.MetaExpr, + Type: typ, + } + } switch p := strings.ToLower(obj.Property); p { // TODO: we could add these fields dynamically if we were fancy! case "noop": - typ = types.TypeBool + invar = static(types.TypeBool) case "retry": - typ = types.TypeInt + invar = static(types.TypeInt) case "delay": - typ = types.TypeInt + invar = static(types.TypeInt) case "poll": - typ = types.TypeInt + invar = static(types.TypeInt) case "limit": // rate.Limit - typ = types.TypeFloat + invar = static(types.TypeFloat) case "burst": - typ = types.TypeInt + invar = static(types.TypeInt) case "sema": - typ = types.NewType("[]str") + invar = static(types.NewType("[]str")) + + case "rewatch": + invar = static(types.TypeBool) + + case "realize": + invar = static(types.TypeBool) + + case "reverse": + ors := []interfaces.Invariant{} + + invarBool := static(types.TypeBool) + ors = append(ors, invarBool) + + // TODO: decide what fields we might want here + //invarStruct := static(types.NewType("struct{edges str}")) + //ors = append(ors, invarStruct) + + invar = &unification.ExclusiveInvariant{ + Invariants: ors, // one and only one of these should be true + } case "autoedge": - typ = types.TypeBool + invar = static(types.TypeBool) case "autogroup": - typ = types.TypeBool + invar = static(types.TypeBool) // autoedge and autogroup aren't part of the `MetaRes` interface, but we // can merge them in here for simplicity in the public user interface... case MetaField: // FIXME: allow partial subsets of this struct, and in any order // FIXME: we might need an updated unification engine to do this - typ = types.NewType("struct{noop bool; retry int; delay int; poll int; limit float; burst int; sema []str; autoedge bool; autogroup bool}") + wrap := func(reverse *types.Type) *types.Type { + return types.NewType(fmt.Sprintf("struct{noop bool; retry int; delay int; poll int; limit float; burst int; sema []str; rewatch bool; realize bool; reverse %s; autoedge bool; autogroup bool}", reverse.String())) + } + ors := []interfaces.Invariant{} + invarBool := static(wrap(types.TypeBool)) + ors = append(ors, invarBool) + // TODO: decide what fields we might want here + //invarStruct := static(wrap(types.NewType("struct{edges str}"))) + //ors = append(ors, invarStruct) + invar = &unification.ExclusiveInvariant{ + Invariants: ors, // one and only one of these should be true + } default: return nil, fmt.Errorf("unknown property: %s", p) } - invar := &unification.EqualsInvariant{ - Expr: obj.MetaExpr, - Type: typ, - } invariants = append(invariants, invar) return invariants, nil diff --git a/lib/main.go b/lib/main.go index 86867826..233ee8a9 100644 --- a/lib/main.go +++ b/lib/main.go @@ -647,8 +647,10 @@ func (obj *Main) Run() error { // Start needs to be synchronous because we don't want // to loop around and cause a pause before we unpaused. - if err := obj.ge.Start(); err != nil { // sync - Logf("error starting graph: %+v", err) + // Commit already starts things, but we still need to + // resume anything that was pre-existing and was paused. 
+ if err := obj.ge.Resume(); err != nil { // sync + Logf("error resuming graph: %+v", err) continue } converger.Resume() // after Start() diff --git a/pgraph/graphsync.go b/pgraph/graphsync.go index 6e1c207a..8a6c553b 100644 --- a/pgraph/graphsync.go +++ b/pgraph/graphsync.go @@ -40,10 +40,10 @@ func strEdgeCmpFn(e1, e2 Edge) (bool, error) { // GraphSync updates the Graph so that it matches the newGraph. It leaves // identical elements alone so that they don't need to be refreshed. // It tries to mutate existing elements into new ones, if they support this. -// This updates the Graph on success only. +// This updates the Graph on success only. If it fails, then the graph won't +// have been modified. // FIXME: should we do this with copies of the vertex resources? func (obj *Graph) GraphSync(newGraph *Graph, vertexCmpFn func(Vertex, Vertex) (bool, error), vertexAddFn func(Vertex) error, vertexRemoveFn func(Vertex) error, edgeCmpFn func(Edge, Edge) (bool, error)) error { - oldGraph := obj.Copy() // work on a copy of the old graph if oldGraph == nil { var err error @@ -69,8 +69,11 @@ func (obj *Graph) GraphSync(newGraph *Graph, vertexCmpFn func(Vertex, Vertex) (b var lookup = make(map[Vertex]Vertex) var vertexKeep []Vertex // list of vertices which are the same in new graph - var edgeKeep []Edge // list of vertices which are the same in new graph + var vertexDels []Vertex // list of vertices which are to be removed + var vertexAdds []Vertex // list of vertices which are to be added + var edgeKeep []Edge // list of edges which are the same in new graph + // XXX: run this as a topological sort or reverse topological sort? for v := range newGraph.Adjacency() { // loop through the vertices (resources) var vertex Vertex // step one, direct compare with res.Cmp @@ -92,27 +95,44 @@ func (obj *Graph) GraphSync(newGraph *Graph, vertexCmpFn func(Vertex, Vertex) (b // vertex = oldGraph.MutateMatch(res) //} + // run the removes BEFORE the adds, so don't do the add here... if vertex == nil { // no match found yet - if err := vertexAddFn(v); err != nil { - return errwrap.Wrapf(err, "vertexAddFn failed") - } + vertexAdds = append(vertexAdds, v) // append vertex = v - oldGraph.AddVertex(vertex) // call standalone in case not part of an edge } lookup[v] = vertex // used for constructing edges vertexKeep = append(vertexKeep, vertex) // append } - // get rid of any vertices we shouldn't keep (that aren't in new graph) for v := range oldGraph.Adjacency() { if !VertexContains(v, vertexKeep) { - if err := vertexRemoveFn(v); err != nil { - return errwrap.Wrapf(err, "vertexRemoveFn failed") - } - oldGraph.DeleteVertex(v) + vertexDels = append(vertexDels, v) // append } } + // see if any of the add/remove functions actually fail first + // XXX: run this as a reverse topological sort or topological sort? + for _, vertex := range vertexDels { + if err := vertexRemoveFn(vertex); err != nil { + return errwrap.Wrapf(err, "vertexRemoveFn failed") + } + } + for _, vertex := range vertexAdds { + if err := vertexAddFn(vertex); err != nil { + return errwrap.Wrapf(err, "vertexAddFn failed") + } + } + + // no add/remove functions failed, so we can actually modify the graph! + for _, vertex := range vertexDels { + oldGraph.DeleteVertex(vertex) + } + for _, vertex := range vertexAdds { + oldGraph.AddVertex(vertex) // call standalone in case not part of an edge + } + + // XXX: fixup this part so the CmpFn stuff fails early, and THEN we edit + // the graph at the end, if no errors happened... 
// compare edges for v1 := range newGraph.Adjacency() { // loop through the vertices (resources) for v2, e := range newGraph.Adjacency()[v1] {