diff --git a/converger.go b/converger.go index ecbf54b2..5bd9671e 100644 --- a/converger.go +++ b/converger.go @@ -33,6 +33,9 @@ type Converger interface { // TODO: need a better name Pause() Loop(bool) ConvergedTimer(ConvergerUUID) <-chan time.Time + Status() map[uint64]bool + Timeout() int // returns the timeout that this was created with + SetStateFn(func(bool) error) // sets the stateFn } // ConvergerUUID is the interface resources can use to notify with if converged @@ -51,13 +54,14 @@ type ConvergerUUID interface { // converger is an implementation of the Converger interface type converger struct { - timeout int // must be zero (instant) or greater seconds to run - exitFn func() // TODO: generalize functionality eventually? - channel chan struct{} // signal here to run an isConverged check - control chan bool // control channel for start/pause - mutex sync.RWMutex // used for controlling access to status and lastid - lastid uint64 - status map[uint64]bool + timeout int // must be zero (instant) or greater seconds to run + stateFn func(bool) error // run on converged state changes with state bool + converged bool // did we converge (state changes of this run Fn) + channel chan struct{} // signal here to run an isConverged check + control chan bool // control channel for start/pause + mutex sync.RWMutex // used for controlling access to status and lastid + lastid uint64 + status map[uint64]bool } // convergerUUID is an implementation of the ConvergerUUID interface @@ -68,10 +72,10 @@ type convergerUUID struct { } // NewConverger builds a new converger struct -func NewConverger(timeout int, exitFn func()) *converger { +func NewConverger(timeout int, stateFn func(bool) error) *converger { return &converger{ timeout: timeout, - exitFn: exitFn, + stateFn: stateFn, channel: make(chan struct{}), control: make(chan bool), lastid: 0, @@ -117,7 +121,7 @@ func (obj *converger) SetConverged(uuid ConvergerUUID, isConverged bool) error { } obj.status[uuid.ID()] = isConverged // set obj.mutex.Unlock() // unlock *before* poke or deadlock! - if isConverged { // only poke if it would be helpful + if isConverged != obj.converged { // only poke if it would be helpful // run in a go routine so that we never block... just queue up! // this allows us to send events, even if we haven't started... go func() { obj.channel <- struct{}{} }() @@ -195,24 +199,31 @@ func (obj *converger) Loop(startPaused bool) { case _ = <-obj.channel: if !obj.isConverged() { + if obj.converged { // we're doing a state change + if obj.stateFn != nil { + // call an arbitrary function + if err := obj.stateFn(false); err != nil { + // FIXME: what to do on error ? + } + } + } + obj.converged = false continue } - // we have converged! + // we have converged! if obj.timeout >= 0 { // only run if timeout is valid - if obj.exitFn != nil { - // call an arbitrary function - obj.exitFn() + if !obj.converged { // we're doing a state change + if obj.stateFn != nil { + // call an arbitrary function + if err := obj.stateFn(true); err != nil { + // FIXME: what to do on error ? + } + } } } - - for { // unblock/drain - <-obj.channel - } - //return - // TODO: would it be useful to loop and wait again ? - // we would need to reset or wait otherwise we'd be - // likely instantly already converged when we looped! + obj.converged = true + // loop and wait again... } } } @@ -230,6 +241,29 @@ func (obj *converger) ConvergedTimer(uuid ConvergerUUID) <-chan time.Time { return TimeAfterOrBlock(obj.timeout) } +// Status returns a map of the converged status of each UUID. +func (obj *converger) Status() map[uint64]bool { + status := make(map[uint64]bool) + obj.mutex.RLock() // take a read lock + defer obj.mutex.RUnlock() + for k, v := range obj.status { // make a copy to avoid the mutex + status[k] = v + } + return status +} + +// Timeout returns the timeout in seconds that converger was created with. This +// is useful to avoid passing in the timeout value separately when you're +// already passing in the Converger struct. +func (obj *converger) Timeout() int { + return obj.timeout +} + +// SetStateFn sets the state function to be run on change of converged state. +func (obj *converger) SetStateFn(stateFn func(bool) error) { + obj.stateFn = stateFn +} + // Id returns the unique id of this UUID object func (obj *convergerUUID) ID() uint64 { return obj.id