resources: Overhaul legacy code around the resource API

This patch makes a number of changes in the engine surrounding the
resource API. In particular:

* Cleanup of send/read event.
* Cleanup of DoSend (now Event) in the Watch method.
* Events are now more consistently pointers.
* Exiting within Watch is now done in a single place.
* Multiple incoming events will be combined into a single action.
* Events in flight during an action are played back after CheckApply.
* Addition of Close method to API

This gets things ready for rate limiting and semaphore metaparams!
This commit is contained in:
James Shubin
2017-01-04 16:22:19 -05:00
parent 74435aac76
commit 51c83116a2
17 changed files with 268 additions and 222 deletions

View File

@@ -33,6 +33,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
* [Default - Get an empty resource with defaults](#default) * [Default - Get an empty resource with defaults](#default)
* [Validate - Validate the values of a resource struct](#validate) * [Validate - Validate the values of a resource struct](#validate)
* [Init - Initialize the resource](#init) * [Init - Initialize the resource](#init)
* [Close - Cleanup the resource](#close)
* [CheckApply - Check and apply resource state](#checkapply) * [CheckApply - Check and apply resource state](#checkapply)
* [Watch - Detect resource changes](#watch) * [Watch - Detect resource changes](#watch)
* [Compare - Compare resource with another](#compare) * [Compare - Compare resource with another](#compare)
@@ -133,6 +134,31 @@ this. In other words, you should expect `Validate` to have run first, but you
shouldn't allow `Init` to dangerously `rm -rf /$the_world` if your code only shouldn't allow `Init` to dangerously `rm -rf /$the_world` if your code only
checks `$the_world` in `Validate`. Remember to always program safely! checks `$the_world` in `Validate`. Remember to always program safely!
### Close
```golang
Close() error
```
This is called to cleanup after the resource. It is usually not necessary, but
can be useful if you'd like to properly close a persistent connection that you
opened in the `Init` method and were using throughout the resource.
#### Example
```golang
// Close runs some cleanup code for this resource.
func (obj *FooRes) Close() error {
obj.Conn.Close() // ignore error in this case
return obj.BaseRes.Close() // call base close, b/c we're overriding
}
```
You should probably check the return errors of your internal methods, and pass
on an error if something went wrong. Remember to always call the base `Close`
method! If you plan to return early if you hit an internal error, then at least
call it with a defer!
### CheckApply ### CheckApply
```golang ```golang
CheckApply(apply bool) (checkOK bool, err error) CheckApply(apply bool) (checkOK bool, err error)
@@ -210,12 +236,12 @@ will likely find the state to now be correct.
### Watch ### Watch
```golang ```golang
Watch(chan Event) error Watch(chan *Event) error
``` ```
`Watch` is a main loop that runs and sends messages when it detects that the `Watch` is a main loop that runs and sends messages when it detects that the
state of the resource might have changed. To send a message you should write to state of the resource might have changed. To send a message you should write to
the input `Event` channel using the `DoSend` helper method. The Watch function the input event channel using the `Event` helper method. The Watch function
should run continuously until a shutdown message is received. If at any time should run continuously until a shutdown message is received. If at any time
something goes wrong, you should return an error, and the `mgmt` engine will something goes wrong, you should return an error, and the `mgmt` engine will
handle possibly restarting the main loop based on the `retry` meta parameters. handle possibly restarting the main loop based on the `retry` meta parameters.
@@ -250,7 +276,7 @@ itself!
If we receive an internal event from the `<-obj.Events()` method, we can read it If we receive an internal event from the `<-obj.Events()` method, we can read it
with the ReadEvent helper function. This function tells us if we should shutdown with the ReadEvent helper function. This function tells us if we should shutdown
our resource, and if we should generate an event. When we want to send an event, our resource, and if we should generate an event. When we want to send an event,
we use the `DoSend` helper function. It is also important to mark the resource we use the `Event` helper function. It is also important to mark the resource
state as `dirty` if we believe it might have changed. We do this with the state as `dirty` if we believe it might have changed. We do this with the
`StateOK(false)` function. `StateOK(false)` function.
@@ -278,7 +304,7 @@ thing, but provide a `select`-free interface for different coding situations.
#### Example #### Example
```golang ```golang
// Watch is the listener and main loop for this resource. // Watch is the listener and main loop for this resource.
func (obj *FooRes) Watch(processChan chan event.Event) error { func (obj *FooRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid := obj.ConvergerUID() // get the converger uid used to report status
// setup the Foo resource // setup the Foo resource
@@ -294,15 +320,15 @@ func (obj *FooRes) Watch(processChan chan event.Event) error {
} }
var send = false // send event? var send = false // send event?
var exit = false var exit *error
for { for {
obj.SetState(ResStateWatching) // reset obj.SetState(ResStateWatching) // reset
select { select {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
// we avoid sending events on unpause // we avoid sending events on unpause
if exit, send = obj.ReadEvent(&event); exit { if exit, send = obj.ReadEvent(event); exit != nil {
return nil // exit return *exit // exit
} }
// the actual events! // the actual events!
@@ -326,9 +352,7 @@ func (obj *FooRes) Watch(processChan chan event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { obj.Event(processChan)
return err // we exit or bubble up a NACK...
}
} }
} }
} }

View File

@@ -46,8 +46,7 @@ type Event struct {
Name EventName Name EventName
Resp Resp // channel to send an ack response on, nil to skip Resp Resp // channel to send an ack response on, nil to skip
//Wg *sync.WaitGroup // receiver barrier to Wait() for everyone else on //Wg *sync.WaitGroup // receiver barrier to Wait() for everyone else on
Msg string // some words for fun Err error // store an error in our event
Activity bool // did something interesting happen?
} }
// ACK sends a single acknowledgement on the channel if one was requested. // ACK sends a single acknowledgement on the channel if one was requested.
@@ -80,7 +79,7 @@ func NewResp() Resp {
// ACK sends a true value to resp. // ACK sends a true value to resp.
func (resp Resp) ACK() { func (resp Resp) ACK() {
if resp != nil { if resp != nil {
resp <- nil resp <- nil // TODO: close instead?
} }
} }
@@ -114,7 +113,7 @@ func (resp Resp) ACKWait() {
} }
} }
// GetActivity returns the activity value. // Error returns the stored error value.
func (event *Event) GetActivity() bool { func (event *Event) Error() error {
return event.Activity return event.Err
} }

View File

@@ -27,6 +27,7 @@ import (
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/resources" "github.com/purpleidea/mgmt/resources"
multierr "github.com/hashicorp/go-multierror"
errwrap "github.com/pkg/errors" errwrap "github.com/pkg/errors"
) )
@@ -76,11 +77,9 @@ func (g *Graph) Poke(v *Vertex, activity bool) error {
wg.Add(1) wg.Add(1)
go func(nn *Vertex) error { go func(nn *Vertex) error {
defer wg.Done() defer wg.Done()
edge := g.Adjacency[v][nn] // lookup //edge := g.Adjacency[v][nn] // lookup
notify := edge.Notify && edge.Refresh() //notify := edge.Notify && edge.Refresh()
nn.SendEvent(event.EventPoke, nil)
// FIXME: is it okay that this is sync?
nn.SendEvent(event.EventPoke, true, notify)
// TODO: check return value? // TODO: check return value?
return nil // never error for now... return nil // never error for now...
}(n) }(n)
@@ -110,8 +109,7 @@ func (g *Graph) BackPoke(v *Vertex) {
if g.Flags.Debug { if g.Flags.Debug {
log.Printf("%s[%s]: BackPoke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) log.Printf("%s[%s]: BackPoke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
} }
// FIXME: is it okay that this is sync? n.SendEvent(event.EventBackPoke, nil)
n.SendEvent(event.EventBackPoke, true, false)
} else { } else {
if g.Flags.Debug { if g.Flags.Debug {
log.Printf("%s[%s]: BackPoke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) log.Printf("%s[%s]: BackPoke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName())
@@ -311,45 +309,53 @@ func (g *Graph) Worker(v *Vertex) error {
// the Watch() function about which graph it is // the Watch() function about which graph it is
// running on, which isolates things nicely... // running on, which isolates things nicely...
obj := v.Res obj := v.Res
// TODO: is there a better system for the `Watching` flag? obj.SetWorking(true) // gets set to false in Res.Close() method at end...
obj.SetWatching(true)
defer obj.SetWatching(false) lock := &sync.Mutex{} // lock around processChan closing and sending
processChan := make(chan event.Event) finished := false // did we close processChan ?
processChan := make(chan *event.Event)
go func() { go func() {
running := false running := false
done := make(chan struct{})
playback := false // do we need to run another one?
waiting := false
var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration
if !timer.Stop() { if !timer.Stop() {
<-timer.C // unnecessary, shouldn't happen <-timer.C // unnecessary, shouldn't happen
} }
var delay = time.Duration(v.Meta().Delay) * time.Millisecond var delay = time.Duration(v.Meta().Delay) * time.Millisecond
var retry = v.Meta().Retry // number of tries left, -1 for infinite var retry = v.Meta().Retry // number of tries left, -1 for infinite
var saved event.Event
Loop: Loop:
for { for {
// this has to be synchronous, because otherwise the Res // this has to be synchronous, because otherwise the Res
// event loop will keep running and change state, // event loop will keep running and change state,
// causing the converged timeout to fire! // causing the converged timeout to fire!
select { select {
case event, ok := <-processChan: // must use like this case ev, ok := <-processChan: // must use like this
if running && ok {
// we got an event that wasn't a close,
// while we were waiting for the timer!
// if this happens, it might be a bug:(
log.Fatalf("%s[%s]: Worker: Unexpected event: %+v", v.Kind(), v.GetName(), event)
}
if !ok { // processChan closed, let's exit if !ok { // processChan closed, let's exit
break Loop // no event, so no ack! break Loop // no event, so no ack!
} }
// the above mentioned synchronous part, is the // if running, we skip running a new execution!
// running of this function, paired with an ack. // if waiting, we skip running a new execution!
if running || waiting {
playback = true
ev.ACK() // ready for next message
continue
}
running = true
go func(ev *event.Event) {
if e := g.Process(v); e != nil { if e := g.Process(v); e != nil {
saved = event playback = true
log.Printf("%s[%s]: CheckApply errored: %v", v.Kind(), v.GetName(), e) log.Printf("%s[%s]: CheckApply errored: %v", v.Kind(), v.GetName(), e)
if retry == 0 { if retry == 0 {
// wrap the error in the sentinel // wrap the error in the sentinel
event.ACKNACK(&SentinelErr{e}) // fail the Watch() v.SendEvent(event.EventExit, &SentinelErr{e})
break Loop return
} }
if retry > 0 { // don't decrement the -1 if retry > 0 { // don't decrement the -1
retry-- retry--
@@ -357,22 +363,44 @@ func (g *Graph) Worker(v *Vertex) error {
log.Printf("%s[%s]: CheckApply: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), delay.Seconds(), retry) log.Printf("%s[%s]: CheckApply: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), delay.Seconds(), retry)
// start the timer... // start the timer...
timer.Reset(delay) timer.Reset(delay)
running = true waiting = true // waiting for retry timer
continue return
} }
retry = v.Meta().Retry // reset on success retry = v.Meta().Retry // reset on success
event.ACK() // sync close(done) // trigger
}(ev)
ev.ACK() // sync (now mostly useless)
case <-timer.C: case <-timer.C:
waiting = false
if !timer.Stop() { if !timer.Stop() {
//<-timer.C // blocks, docs are wrong! //<-timer.C // blocks, docs are wrong!
} }
running = false
log.Printf("%s[%s]: CheckApply delay expired!", v.Kind(), v.GetName()) log.Printf("%s[%s]: CheckApply delay expired!", v.Kind(), v.GetName())
// re-send this failed event, to trigger a CheckApply() close(done)
go func() { processChan <- saved }()
// TODO: should we send a fake event instead? // a CheckApply run (with possibly retry pause) finished
//saved = nil case <-done:
if g.Flags.Debug {
log.Printf("%s[%s]: CheckApply finished!", v.Kind(), v.GetName())
}
done = make(chan struct{}) // reset
// re-send this event, to trigger a CheckApply()
if playback {
playback = false
// this lock avoids us sending to
// channel after we've closed it!
lock.Lock()
go func() {
if !finished {
// TODO: can this experience indefinite postponement ?
// see: https://github.com/golang/go/issues/11506
obj.Event(processChan) // replay a new event
}
lock.Unlock()
}()
}
running = false
} }
} }
}() }()
@@ -403,8 +431,14 @@ func (g *Graph) Worker(v *Vertex) error {
case event := <-obj.Events(): case event := <-obj.Events():
// NOTE: this code should match the similar Res code! // NOTE: this code should match the similar Res code!
//cuid.SetConverged(false) // TODO: ? //cuid.SetConverged(false) // TODO: ?
if exit, send := obj.ReadEvent(&event); exit { if exit, send := obj.ReadEvent(event); exit != nil {
return nil // exit err := *exit // exit err
if e := obj.Close(); err == nil {
err = e
} else if e != nil {
err = multierr.Append(err, e) // list of errors
}
return err // exit
} else if send { } else if send {
// if we dive down this rabbit hole, our // if we dive down this rabbit hole, our
// timer.C won't get seen until we get out! // timer.C won't get seen until we get out!
@@ -442,7 +476,7 @@ func (g *Graph) Worker(v *Vertex) error {
// TODO: reset the watch retry count after some amount of success // TODO: reset the watch retry count after some amount of success
v.Res.RegisterConverger() v.Res.RegisterConverger()
var e error var e error
if v.Meta().Poll > 0 { // poll instead of watching :( if v.Res.Meta().Poll > 0 { // poll instead of watching :(
cuid := v.Res.ConvergerUID() // get the converger uid used to report status cuid := v.Res.ConvergerUID() // get the converger uid used to report status
cuid.StartTimer() cuid.StartTimer()
e = v.Res.Poll(processChan) e = v.Res.Poll(processChan)
@@ -474,7 +508,17 @@ func (g *Graph) Worker(v *Vertex) error {
// by getting the Watch resource to send one event once it's up! // by getting the Watch resource to send one event once it's up!
//v.SendEvent(eventPoke, false, false) //v.SendEvent(eventPoke, false, false)
} }
lock.Lock() // lock to avoid a send when closed!
finished = true
close(processChan) close(processChan)
lock.Unlock()
// close resource and return possible errors if any
if e := obj.Close(); err == nil {
err = e
} else if e != nil {
err = multierr.Append(err, e) // list of errors
}
return err return err
} }
@@ -504,7 +548,7 @@ func (g *Graph) Start(first bool) { // start or continue
v.Res.Starter(true) // let the startup code know to poke v.Res.Starter(true) // let the startup code know to poke
} }
if !v.Res.IsWatching() { // if Watch() is not running... if !v.Res.IsWorking() { // if Worker() is not running...
g.wg.Add(1) g.wg.Add(1)
// must pass in value to avoid races... // must pass in value to avoid races...
// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
@@ -529,7 +573,7 @@ func (g *Graph) Start(first bool) { // start or continue
}(v) }(v)
if !first { // unpause! if !first { // unpause!
v.Res.SendEvent(event.EventStart, true, false) // sync! v.Res.SendEvent(event.EventStart, nil) // sync!
} }
} }
@@ -542,7 +586,7 @@ func (g *Graph) Pause() {
defer log.Printf("State: %v -> %v", g.setState(graphStatePaused), g.getState()) defer log.Printf("State: %v -> %v", g.setState(graphStatePaused), g.getState())
t, _ := g.TopologicalSort() t, _ := g.TopologicalSort()
for _, v := range t { // squeeze out the events... for _, v := range t { // squeeze out the events...
v.SendEvent(event.EventPause, true, false) v.SendEvent(event.EventPause, nil)
} }
} }
@@ -559,7 +603,7 @@ func (g *Graph) Exit() {
// when we hit the 'default' in the select statement! // when we hit the 'default' in the select statement!
// XXX: we can do this to quiesce, but it's not necessary now // XXX: we can do this to quiesce, but it's not necessary now
v.SendEvent(event.EventExit, true, false) v.SendEvent(event.EventExit, nil)
} }
g.wg.Wait() // for now, this doesn't need to be a separate Wait() method g.wg.Wait() // for now, this doesn't need to be a separate Wait() method
} }

View File

@@ -567,7 +567,7 @@ func (g *Graph) GraphSync(oldGraph *Graph) (*Graph, error) {
for v := range oldGraph.Adjacency { for v := range oldGraph.Adjacency {
if !VertexContains(v, vertexKeep) { if !VertexContains(v, vertexKeep) {
// wait for exit before starting new graph! // wait for exit before starting new graph!
v.SendEvent(event.EventExit, true, false) v.SendEvent(event.EventExit, nil) // sync
oldGraph.DeleteVertex(v) oldGraph.DeleteVertex(v)
} }
} }

View File

@@ -113,11 +113,11 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *ExecRes) Watch(processChan chan event.Event) error { func (obj *ExecRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid := obj.ConvergerUID() // get the converger uid used to report status
var send = false // send event? var send = false // send event?
var exit = false var exit *error
bufioch, errch := make(chan string), make(chan error) bufioch, errch := make(chan string), make(chan error)
if obj.WatchCmd != "" { if obj.WatchCmd != "" {
@@ -185,8 +185,8 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
if exit, send = obj.ReadEvent(&event); exit { if exit, send = obj.ReadEvent(event); exit != nil {
return nil // exit return *exit // exit
} }
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
@@ -199,9 +199,7 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
send = false send = false
// it is okay to invalidate the clean state on poke too // it is okay to invalidate the clean state on poke too
obj.StateOK(false) // something made state dirty obj.StateOK(false) // something made state dirty
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { obj.Event(processChan)
return err // we exit or bubble up a NACK...
}
} }
} }
} }

View File

@@ -147,7 +147,7 @@ func (obj *FileRes) GetPath() string {
// If the Watch returns an error, it means that something has gone wrong, and it // If the Watch returns an error, it means that something has gone wrong, and it
// must be restarted. On a clean exit it returns nil. // must be restarted. On a clean exit it returns nil.
// FIXME: Also watch the source directory when using obj.Source !!! // FIXME: Also watch the source directory when using obj.Source !!!
func (obj *FileRes) Watch(processChan chan event.Event) error { func (obj *FileRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid := obj.ConvergerUID() // get the converger uid used to report status
var err error var err error
@@ -163,7 +163,7 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
} }
var send = false // send event? var send = false // send event?
var exit = false var exit *error
for { for {
if obj.debug { if obj.debug {
@@ -188,8 +188,8 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
if exit, send = obj.ReadEvent(&event); exit { if exit, send = obj.ReadEvent(event); exit != nil {
return nil // exit return *exit // exit
} }
//obj.StateOK(false) // dirty // these events don't invalidate state //obj.StateOK(false) // dirty // these events don't invalidate state
@@ -201,9 +201,7 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { obj.Event(processChan)
return err // we exit or bubble up a NACK...
}
} }
} }
} }

View File

@@ -111,7 +111,7 @@ func (obj *HostnameRes) Init() error {
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *HostnameRes) Watch(processChan chan event.Event) error { func (obj *HostnameRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid := obj.ConvergerUID() // get the converger uid used to report status
// if we share the bus with others, we will get each others messages!! // if we share the bus with others, we will get each others messages!!
@@ -148,8 +148,8 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
// we avoid sending events on unpause // we avoid sending events on unpause
if exit, _ := obj.ReadEvent(&event); exit { if exit, _ := obj.ReadEvent(event); exit != nil {
return nil // exit return *exit // exit
} }
send = true send = true
obj.StateOK(false) // dirty obj.StateOK(false) // dirty
@@ -162,10 +162,7 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
obj.Event(processChan)
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK...
}
} }
} }
} }

View File

@@ -139,7 +139,7 @@ func (obj *MsgRes) journalPriority() journal.Priority {
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *MsgRes) Watch(processChan chan event.Event) error { func (obj *MsgRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid := obj.ConvergerUID() // get the converger uid used to report status
// notify engine that we're running // notify engine that we're running
@@ -148,15 +148,15 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error {
} }
var send = false // send event? var send = false // send event?
var exit = false var exit *error
for { for {
obj.SetState(ResStateWatching) // reset obj.SetState(ResStateWatching) // reset
select { select {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
// we avoid sending events on unpause // we avoid sending events on unpause
if exit, send = obj.ReadEvent(&event); exit { if exit, send = obj.ReadEvent(event); exit != nil {
return nil // exit return *exit // exit
} }
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
@@ -167,9 +167,7 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { obj.Event(processChan)
return err // we exit or bubble up a NACK...
}
} }
} }
} }

View File

@@ -63,7 +63,7 @@ func (obj *NoopRes) Init() error {
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *NoopRes) Watch(processChan chan event.Event) error { func (obj *NoopRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid := obj.ConvergerUID() // get the converger uid used to report status
// notify engine that we're running // notify engine that we're running
@@ -72,15 +72,15 @@ func (obj *NoopRes) Watch(processChan chan event.Event) error {
} }
var send = false // send event? var send = false // send event?
var exit = false var exit *error
for { for {
obj.SetState(ResStateWatching) // reset obj.SetState(ResStateWatching) // reset
select { select {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
// we avoid sending events on unpause // we avoid sending events on unpause
if exit, send = obj.ReadEvent(&event); exit { if exit, send = obj.ReadEvent(event); exit != nil {
return nil // exit return *exit // exit
} }
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
@@ -91,9 +91,7 @@ func (obj *NoopRes) Watch(processChan chan event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { obj.Event(processChan)
return err // we exit or bubble up a NACK...
}
} }
} }
} }

View File

@@ -102,7 +102,7 @@ func (obj *NspawnRes) Init() error {
} }
// Watch for state changes and sends a message to the bus if there is a change // Watch for state changes and sends a message to the bus if there is a change
func (obj *NspawnRes) Watch(processChan chan event.Event) error { func (obj *NspawnRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid := obj.ConvergerUID() // get the converger uid used to report status
// this resource depends on systemd ensure that it's running // this resource depends on systemd ensure that it's running
@@ -133,7 +133,7 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error {
} }
var send = false var send = false
var exit = false var exit *error
for { for {
obj.SetState(ResStateWatching) obj.SetState(ResStateWatching)
@@ -155,8 +155,8 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
if exit, send = obj.ReadEvent(&event); exit { if exit, send = obj.ReadEvent(event); exit != nil {
return nil // exit return *exit // exit
} }
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
@@ -167,9 +167,7 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { obj.Event(processChan)
return err // we exit or bubble up a NACK...
}
} }
} }
} }

View File

@@ -173,7 +173,7 @@ Loop:
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *PasswordRes) Watch(processChan chan event.Event) error { func (obj *PasswordRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid := obj.ConvergerUID() // get the converger uid used to report status
var err error var err error
@@ -189,7 +189,7 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error {
} }
var send = false // send event? var send = false // send event?
var exit = false var exit *error
for { for {
obj.SetState(ResStateWatching) // reset obj.SetState(ResStateWatching) // reset
select { select {
@@ -208,8 +208,8 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
// we avoid sending events on unpause // we avoid sending events on unpause
if exit, send = obj.ReadEvent(&event); exit { if exit, send = obj.ReadEvent(event); exit != nil {
return nil // exit return *exit // exit
} }
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
@@ -220,9 +220,7 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { obj.Event(processChan)
return err // we exit or bubble up a NACK...
}
} }
} }
} }

View File

@@ -115,7 +115,7 @@ func (obj *PkgRes) Init() error {
// It uses the PackageKit UpdatesChanged signal to watch for changes. // It uses the PackageKit UpdatesChanged signal to watch for changes.
// TODO: https://github.com/hughsie/PackageKit/issues/109 // TODO: https://github.com/hughsie/PackageKit/issues/109
// TODO: https://github.com/hughsie/PackageKit/issues/110 // TODO: https://github.com/hughsie/PackageKit/issues/110
func (obj *PkgRes) Watch(processChan chan event.Event) error { func (obj *PkgRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid := obj.ConvergerUID() // get the converger uid used to report status
bus := packagekit.NewBus() bus := packagekit.NewBus()
@@ -135,7 +135,7 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
} }
var send = false // send event? var send = false // send event?
var exit = false var exit *error
for { for {
if obj.debug { if obj.debug {
@@ -163,8 +163,8 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
if exit, send = obj.ReadEvent(&event); exit { if exit, send = obj.ReadEvent(event); exit != nil {
return nil // exit return *exit // exit
} }
//obj.StateOK(false) // these events don't invalidate state //obj.StateOK(false) // these events don't invalidate state
@@ -176,9 +176,7 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { obj.Event(processChan)
return err // we exit or bubble up a NACK...
}
} }
} }
} }

View File

@@ -26,6 +26,7 @@ import (
"log" "log"
"os" "os"
"path" "path"
"sync"
"time" "time"
// TODO: should each resource be a sub-package? // TODO: should each resource be a sub-package?
@@ -129,19 +130,19 @@ type Base interface {
SetKind(string) SetKind(string)
Kind() string Kind() string
Meta() *MetaParams Meta() *MetaParams
Events() chan event.Event Events() chan *event.Event
AssociateData(*Data) AssociateData(*Data)
IsWatching() bool IsWorking() bool
SetWatching(bool) SetWorking(bool)
Converger() converger.Converger Converger() converger.Converger
RegisterConverger() RegisterConverger()
UnregisterConverger() UnregisterConverger()
ConvergerUID() converger.ConvergerUID ConvergerUID() converger.ConvergerUID
GetState() ResState GetState() ResState
SetState(ResState) SetState(ResState)
DoSend(chan event.Event, string) (bool, error) Event(chan *event.Event) error
SendEvent(event.EventName, bool, bool) bool SendEvent(event.EventName, error) error
ReadEvent(*event.Event) (bool, bool) // TODO: optional here? ReadEvent(*event.Event) (*error, bool)
Refresh() bool // is there a pending refresh to run? Refresh() bool // is there a pending refresh to run?
SetRefresh(bool) // set the refresh state of this resource SetRefresh(bool) // set the refresh state of this resource
SendRecv(Res) (map[string]bool, error) // send->recv data passing function SendRecv(Res) (map[string]bool, error) // send->recv data passing function
@@ -154,10 +155,10 @@ type Base interface {
GetGroup() []Res // return everyone grouped inside me GetGroup() []Res // return everyone grouped inside me
SetGroup([]Res) SetGroup([]Res)
VarDir(string) (string, error) VarDir(string) (string, error)
Running(chan event.Event) error // notify the engine that Watch started Running(chan *event.Event) error // notify the engine that Watch started
Started() <-chan struct{} // returns when the resource has started Started() <-chan struct{} // returns when the resource has started
Starter(bool) Starter(bool)
Poll(chan event.Event) error // poll alternative to watching :( Poll(chan *event.Event) error // poll alternative to watching :(
} }
// Res is the minimum interface you need to implement to define a new resource. // Res is the minimum interface you need to implement to define a new resource.
@@ -166,8 +167,9 @@ type Res interface {
Default() Res // return a struct with sane defaults as a Res Default() Res // return a struct with sane defaults as a Res
Validate() error Validate() error
Init() error Init() error
Close() error
GetUIDs() []ResUID // most resources only return one GetUIDs() []ResUID // most resources only return one
Watch(chan event.Event) error // send on channel to signal process() events Watch(chan *event.Event) error // send on channel to signal process() events
CheckApply(apply bool) (checkOK bool, err error) CheckApply(apply bool) (checkOK bool, err error)
AutoEdges() AutoEdge AutoEdges() AutoEdge
Compare(Res) bool Compare(Res) bool
@@ -182,13 +184,14 @@ type BaseRes struct {
Recv map[string]*Send // mapping of key to receive on from value Recv map[string]*Send // mapping of key to receive on from value
kind string kind string
events chan event.Event mutex *sync.Mutex // locks around sending and closing of events channel
events chan *event.Event
converger converger.Converger // converged tracking converger converger.Converger // converged tracking
cuid converger.ConvergerUID cuid converger.ConvergerUID
prefix string // base prefix for this resource prefix string // base prefix for this resource
debug bool debug bool
state ResState state ResState
watching bool // is Watch() loop running ? working bool // is the Worker() loop running ?
started chan struct{} // closed when worker is started/running started chan struct{} // closed when worker is started/running
starter bool // does this have indegree == 0 ? XXX: usually? starter bool // does this have indegree == 0 ? XXX: usually?
isStateOK bool // whether the state is okay based on events or not isStateOK bool // whether the state is okay based on events or not
@@ -244,7 +247,8 @@ func (obj *BaseRes) Init() error {
if obj.kind == "" { if obj.kind == "" {
return fmt.Errorf("Resource did not set kind!") return fmt.Errorf("Resource did not set kind!")
} }
obj.events = make(chan event.Event) // unbuffered chan to avoid stale events obj.mutex = &sync.Mutex{}
obj.events = make(chan *event.Event) // unbuffered chan to avoid stale events
obj.started = make(chan struct{}) // closes when started obj.started = make(chan struct{}) // closes when started
//dir, err := obj.VarDir("") //dir, err := obj.VarDir("")
//if err != nil { //if err != nil {
@@ -255,6 +259,15 @@ func (obj *BaseRes) Init() error {
return nil return nil
} }
// Close shuts down and performs any cleanup.
func (obj *BaseRes) Close() error {
obj.mutex.Lock()
obj.working = false // obj.SetWorking(false)
close(obj.events) // this is where we properly close this channel!
obj.mutex.Unlock()
return nil
}
// GetName is used by all the resources to Get the name. // GetName is used by all the resources to Get the name.
func (obj *BaseRes) GetName() string { func (obj *BaseRes) GetName() string {
return obj.Name return obj.Name
@@ -281,7 +294,7 @@ func (obj *BaseRes) Meta() *MetaParams {
} }
// Events returns the channel of events to listen on. // Events returns the channel of events to listen on.
func (obj *BaseRes) Events() chan event.Event { func (obj *BaseRes) Events() chan *event.Event {
return obj.events return obj.events
} }
@@ -292,14 +305,18 @@ func (obj *BaseRes) AssociateData(data *Data) {
obj.debug = data.Debug obj.debug = data.Debug
} }
// IsWatching tells us if the Worker() function is running. // IsWorking tells us if the Worker() function is running.
func (obj *BaseRes) IsWatching() bool { func (obj *BaseRes) IsWorking() bool {
return obj.watching obj.mutex.Lock()
defer obj.mutex.Unlock()
return obj.working
} }
// SetWatching stores the status of if the Worker() function is running. // SetWorking tracks the state of if Worker() function is running.
func (obj *BaseRes) SetWatching(b bool) { func (obj *BaseRes) SetWorking(b bool) {
obj.watching = b obj.mutex.Lock()
defer obj.mutex.Unlock()
obj.working = b
} }
// Converger returns the converger object used by the system. It can be used to // Converger returns the converger object used by the system. It can be used to
@@ -455,7 +472,7 @@ func (obj *BaseRes) Started() <-chan struct{} { return obj.started }
func (obj *BaseRes) Starter(b bool) { obj.starter = b } func (obj *BaseRes) Starter(b bool) { obj.starter = b }
// Poll is the watch replacement for when we want to poll, which outputs events. // Poll is the watch replacement for when we want to poll, which outputs events.
func (obj *BaseRes) Poll(processChan chan event.Event) error { func (obj *BaseRes) Poll(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid := obj.ConvergerUID() // get the converger uid used to report status
// create a time.Ticker for the given interval // create a time.Ticker for the given interval
@@ -468,7 +485,7 @@ func (obj *BaseRes) Poll(processChan chan event.Event) error {
} }
var send = false var send = false
var exit = false var exit *error
for { for {
obj.SetState(ResStateWatching) obj.SetState(ResStateWatching)
select { select {
@@ -479,16 +496,14 @@ func (obj *BaseRes) Poll(processChan chan event.Event) error {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.ResetTimer() // important cuid.ResetTimer() // important
if exit, send = obj.ReadEvent(&event); exit { if exit, send = obj.ReadEvent(event); exit != nil {
return nil return *exit // exit
} }
} }
if send { if send {
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { obj.Event(processChan)
return err // we exit or bubble up a NACK...
}
} }
} }
} }

View File

@@ -28,97 +28,84 @@ import (
errwrap "github.com/pkg/errors" errwrap "github.com/pkg/errors"
) )
// SendEvent pushes an event into the message queue for a particular vertex // Event sends off an event, but doesn't block the incoming event queue.
func (obj *BaseRes) SendEvent(ev event.EventName, sync bool, activity bool) bool { func (obj *BaseRes) Event(processChan chan *event.Event) error {
// TODO: isn't this race-y ? resp := event.NewResp()
if !obj.IsWatching() { // element has already exited processChan <- &event.Event{Name: event.EventNil, Resp: resp} // trigger process
return false // if we don't return, we'll block on the send return resp.Wait()
}
if !sync {
obj.events <- event.Event{Name: ev, Resp: nil, Msg: "", Activity: activity}
return true
} }
// SendEvent pushes an event into the message queue for a particular vertex.
func (obj *BaseRes) SendEvent(ev event.EventName, err error) error {
resp := event.NewResp() resp := event.NewResp()
obj.events <- event.Event{Name: ev, Resp: resp, Msg: "", Activity: activity} obj.mutex.Lock()
if !obj.working {
obj.mutex.Unlock()
return fmt.Errorf("resource worker is not running")
}
obj.events <- &event.Event{Name: ev, Resp: resp, Err: err}
obj.mutex.Unlock()
resp.ACKWait() // waits until true (nil) value resp.ACKWait() // waits until true (nil) value
return true return nil
}
// DoSend sends off an event, but doesn't block the incoming event queue.
func (obj *BaseRes) DoSend(processChan chan event.Event, comment string) (exit bool, err error) {
resp := event.NewResp()
processChan <- event.Event{Name: event.EventNil, Resp: resp, Activity: false, Msg: comment} // trigger process
e := resp.Wait()
return false, e // XXX: at the moment, we don't use the exit bool.
} }
// ReadEvent processes events when a select gets one, and handles the pause // ReadEvent processes events when a select gets one, and handles the pause
// code too! The return values specify if we should exit and poke respectively. // code too! The return values specify if we should exit and poke respectively.
func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) { func (obj *BaseRes) ReadEvent(ev *event.Event) (exit *error, send bool) {
ev.ACK() ev.ACK()
var poke bool err := ev.Error()
// ensure that a CheckApply runs by sending with a dirty state...
if ev.GetActivity() { // if previous node did work, and we were notified...
//obj.StateOK(false) // not necessarily
poke = true // poke!
//obj.SetRefresh(true) // TODO: is this redundant?
}
switch ev.Name { switch ev.Name {
case event.EventStart: case event.EventStart:
send = true || poke return nil, true
return
case event.EventPoke: case event.EventPoke:
send = true || poke return nil, true
return
case event.EventBackPoke: case event.EventBackPoke:
send = true || poke return nil, true // forward poking in response to a back poke!
return // forward poking in response to a back poke!
case event.EventExit: case event.EventExit:
// FIXME: what do we do if we have a pending refresh (poke) and an exit? // FIXME: what do we do if we have a pending refresh (poke) and an exit?
return true, false return &err, false
case event.EventPause: case event.EventPause:
// wait for next event to continue // wait for next event to continue
select { select {
case e, ok := <-obj.Events(): case e, ok := <-obj.Events():
if !ok { // shutdown if !ok { // shutdown
return true, false err := error(nil)
return &err, false
} }
e.ACK() e.ACK()
err := e.Error()
if e.Name == event.EventExit { if e.Name == event.EventExit {
return true, false return &err, false
} else if e.Name == event.EventStart { // eventContinue } else if e.Name == event.EventStart { // eventContinue
return false, false // don't poke on unpause! return nil, false // don't poke on unpause!
} else { }
// if we get a poke event here, it's a bug! // if we get a poke event here, it's a bug!
log.Fatalf("%s[%s]: Unknown event: %v, while paused!", obj.Kind(), obj.GetName(), e) err = fmt.Errorf("%s[%s]: Unknown event: %v, while paused!", obj.Kind(), obj.GetName(), e)
panic(err) // TODO: return a special sentinel instead?
//return &err, false
} }
} }
err = fmt.Errorf("Unknown event: %v", ev)
default: panic(err) // TODO: return a special sentinel instead?
log.Fatal("Unknown event: ", ev) //return &err, false
}
return true, false // required to keep the stupid go compiler happy
} }
// Running is called by the Watch method of the resource once it has started up. // Running is called by the Watch method of the resource once it has started up.
// This signals to the engine to kick off the initial CheckApply resource check. // This signals to the engine to kick off the initial CheckApply resource check.
func (obj *BaseRes) Running(processChan chan event.Event) error { func (obj *BaseRes) Running(processChan chan *event.Event) error {
obj.StateOK(false) // assume we're initially dirty obj.StateOK(false) // assume we're initially dirty
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid := obj.ConvergerUID() // get the converger uid used to report status
cuid.SetConverged(false) // a reasonable initial assumption cuid.SetConverged(false) // a reasonable initial assumption
close(obj.started) // send started signal close(obj.started) // send started signal
// FIXME: exit return value is unused atm, so ignore it for now...
//if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
var err error var err error
if obj.starter { // vertices of indegree == 0 should send initial pokes if obj.starter { // vertices of indegree == 0 should send initial pokes
_, err = obj.DoSend(processChan, "") // trigger a CheckApply err = obj.Event(processChan) // trigger a CheckApply
} }
return err // bubble up any possible error (or nil) return err // bubble up any possible error (or nil)
} }

View File

@@ -79,7 +79,7 @@ func (obj *SvcRes) Init() error {
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *SvcRes) Watch(processChan chan event.Event) error { func (obj *SvcRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid := obj.ConvergerUID() // get the converger uid used to report status
// obj.Name: svc name // obj.Name: svc name
@@ -112,7 +112,7 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
var svc = fmt.Sprintf("%s.service", obj.Name) // systemd name var svc = fmt.Sprintf("%s.service", obj.Name) // systemd name
var send = false // send event? var send = false // send event?
var exit = false var exit *error
var invalid = false // does the svc exist or not? var invalid = false // does the svc exist or not?
var previous bool // previous invalid value var previous bool // previous invalid value
set := conn.NewSubscriptionSet() // no error should be returned set := conn.NewSubscriptionSet() // no error should be returned
@@ -162,8 +162,8 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
if exit, send = obj.ReadEvent(&event); exit { if exit, send = obj.ReadEvent(event); exit != nil {
return nil // exit return *exit // exit
} }
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
@@ -209,8 +209,8 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
if exit, send = obj.ReadEvent(&event); exit { if exit, send = obj.ReadEvent(event); exit != nil {
return nil // exit return *exit // exit
} }
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
@@ -221,9 +221,7 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
if send { if send {
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { obj.Event(processChan)
return err // we exit or bubble up a NACK...
}
} }
} }
} }

View File

@@ -77,7 +77,7 @@ func (obj *TimerRes) newTicker() *time.Ticker {
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *TimerRes) Watch(processChan chan event.Event) error { func (obj *TimerRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid := obj.ConvergerUID() // get the converger uid used to report status
// create a time.Ticker for the given interval // create a time.Ticker for the given interval
@@ -100,8 +100,8 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
if exit, _ := obj.ReadEvent(&event); exit { if exit, _ := obj.ReadEvent(event); exit != nil {
return nil return *exit // exit
} }
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
@@ -111,9 +111,7 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error {
if send { if send {
send = false send = false
if exit, err := obj.DoSend(processChan, "timer ticked"); exit || err != nil { obj.Event(processChan)
return err // we exit or bubble up a NACK...
}
} }
} }
} }

View File

@@ -155,7 +155,7 @@ func (obj *VirtRes) connect() (conn *libvirt.Connect, err error) {
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *VirtRes) Watch(processChan chan event.Event) error { func (obj *VirtRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid := obj.ConvergerUID() // get the converger uid used to report status
conn, err := obj.connect() conn, err := obj.connect()
@@ -209,7 +209,7 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error {
} }
var send = false var send = false
var exit = false var exit *error // if ptr exists, that is the exit error to return
for { for {
select { select {
@@ -261,8 +261,8 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
if exit, send = obj.ReadEvent(&event); exit { if exit, send = obj.ReadEvent(event); exit != nil {
return nil // exit return *exit // exit
} }
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
@@ -272,9 +272,7 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error {
if send { if send {
send = false send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil { obj.Event(processChan)
return err // we exit or bubble up a NACK...
}
} }
} }
} }