converger: Add new methods to the API

This adds new helper methods to the API such as the ability to query the
set timeout, and to set the state change function after initialization.
The first makes it easier to pass in the timeout value to functions and
structs, because you don't need to pass it separately from the converger
object. The second makes it easy to specify the state change functon
when you don't know what it is at creation time. In addition, it is more
powerful now, and tells you when we converge or un-converge in case we
want to take different actions on each.
This commit is contained in:
James Shubin
2016-08-30 03:50:15 -04:00
parent f5fb135793
commit 6d45cd45d1

View File

@@ -33,6 +33,9 @@ type Converger interface { // TODO: need a better name
Pause() Pause()
Loop(bool) Loop(bool)
ConvergedTimer(ConvergerUUID) <-chan time.Time ConvergedTimer(ConvergerUUID) <-chan time.Time
Status() map[uint64]bool
Timeout() int // returns the timeout that this was created with
SetStateFn(func(bool) error) // sets the stateFn
} }
// ConvergerUUID is the interface resources can use to notify with if converged // ConvergerUUID is the interface resources can use to notify with if converged
@@ -52,7 +55,8 @@ type ConvergerUUID interface {
// converger is an implementation of the Converger interface // converger is an implementation of the Converger interface
type converger struct { type converger struct {
timeout int // must be zero (instant) or greater seconds to run timeout int // must be zero (instant) or greater seconds to run
exitFn func() // TODO: generalize functionality eventually? stateFn func(bool) error // run on converged state changes with state bool
converged bool // did we converge (state changes of this run Fn)
channel chan struct{} // signal here to run an isConverged check channel chan struct{} // signal here to run an isConverged check
control chan bool // control channel for start/pause control chan bool // control channel for start/pause
mutex sync.RWMutex // used for controlling access to status and lastid mutex sync.RWMutex // used for controlling access to status and lastid
@@ -68,10 +72,10 @@ type convergerUUID struct {
} }
// NewConverger builds a new converger struct // NewConverger builds a new converger struct
func NewConverger(timeout int, exitFn func()) *converger { func NewConverger(timeout int, stateFn func(bool) error) *converger {
return &converger{ return &converger{
timeout: timeout, timeout: timeout,
exitFn: exitFn, stateFn: stateFn,
channel: make(chan struct{}), channel: make(chan struct{}),
control: make(chan bool), control: make(chan bool),
lastid: 0, lastid: 0,
@@ -117,7 +121,7 @@ func (obj *converger) SetConverged(uuid ConvergerUUID, isConverged bool) error {
} }
obj.status[uuid.ID()] = isConverged // set obj.status[uuid.ID()] = isConverged // set
obj.mutex.Unlock() // unlock *before* poke or deadlock! obj.mutex.Unlock() // unlock *before* poke or deadlock!
if isConverged { // only poke if it would be helpful if isConverged != obj.converged { // only poke if it would be helpful
// run in a go routine so that we never block... just queue up! // run in a go routine so that we never block... just queue up!
// this allows us to send events, even if we haven't started... // this allows us to send events, even if we haven't started...
go func() { obj.channel <- struct{}{} }() go func() { obj.channel <- struct{}{} }()
@@ -195,24 +199,31 @@ func (obj *converger) Loop(startPaused bool) {
case _ = <-obj.channel: case _ = <-obj.channel:
if !obj.isConverged() { if !obj.isConverged() {
if obj.converged { // we're doing a state change
if obj.stateFn != nil {
// call an arbitrary function
if err := obj.stateFn(false); err != nil {
// FIXME: what to do on error ?
}
}
}
obj.converged = false
continue continue
} }
// we have converged! // we have converged!
if obj.timeout >= 0 { // only run if timeout is valid if obj.timeout >= 0 { // only run if timeout is valid
if obj.exitFn != nil { if !obj.converged { // we're doing a state change
if obj.stateFn != nil {
// call an arbitrary function // call an arbitrary function
obj.exitFn() if err := obj.stateFn(true); err != nil {
// FIXME: what to do on error ?
} }
} }
for { // unblock/drain
<-obj.channel
} }
//return }
// TODO: would it be useful to loop and wait again ? obj.converged = true
// we would need to reset or wait otherwise we'd be // loop and wait again...
// likely instantly already converged when we looped!
} }
} }
} }
@@ -230,6 +241,29 @@ func (obj *converger) ConvergedTimer(uuid ConvergerUUID) <-chan time.Time {
return TimeAfterOrBlock(obj.timeout) return TimeAfterOrBlock(obj.timeout)
} }
// Status returns a map of the converged status of each UUID.
func (obj *converger) Status() map[uint64]bool {
status := make(map[uint64]bool)
obj.mutex.RLock() // take a read lock
defer obj.mutex.RUnlock()
for k, v := range obj.status { // make a copy to avoid the mutex
status[k] = v
}
return status
}
// Timeout returns the timeout in seconds that converger was created with. This
// is useful to avoid passing in the timeout value separately when you're
// already passing in the Converger struct.
func (obj *converger) Timeout() int {
return obj.timeout
}
// SetStateFn sets the state function to be run on change of converged state.
func (obj *converger) SetStateFn(stateFn func(bool) error) {
obj.stateFn = stateFn
}
// Id returns the unique id of this UUID object // Id returns the unique id of this UUID object
func (obj *convergerUUID) ID() uint64 { func (obj *convergerUUID) ID() uint64 {
return obj.id return obj.id