converger: Add new timer system for determining convergence

This adds a new method of marking whether a particular UUID has
converged or not. You can now Start, Stop, or Reset a convergence timer
on the individual UUID's. This wraps the existing SetConverged calls
with a hidden go routine. It is not recommended to use the SetConverged
calls and the Timer calls on the same UUID.
This commit is contained in:
James Shubin
2016-08-30 03:56:22 -04:00
parent 6d45cd45d1
commit eee652cefe
2 changed files with 75 additions and 3 deletions

View File

@@ -23,6 +23,9 @@ import (
"time"
)
// TODO: we could make a new function that masks out the state of certain
// UUID's, but at the moment the new Timer code has obsoleted the need...
// Converger is the general interface for implementing a convergence watcher
type Converger interface { // TODO: need a better name
Register() ConvergerUUID
@@ -50,6 +53,9 @@ type ConvergerUUID interface {
SetConverged(bool) error
Unregister()
ConvergedTimer() <-chan time.Time
StartTimer() (func() error, error) // cancellable is the same as StopTimer()
ResetTimer() error // resets counter to zero
StopTimer() error
}
// converger is an implementation of the Converger interface
@@ -69,6 +75,9 @@ type convergerUUID struct {
converger Converger
id uint64
name string // user defined, friendly name
mutex sync.Mutex
timer chan struct{}
running bool // is the above timer running?
}
// NewConverger builds a new converger struct
@@ -93,6 +102,8 @@ func (obj *converger) Register() ConvergerUUID {
converger: obj,
id: obj.lastid,
name: fmt.Sprintf("%d", obj.lastid), // some default
timer: nil,
running: false,
}
}
@@ -147,6 +158,7 @@ func (obj *converger) Unregister(uuid ConvergerUUID) {
panic(fmt.Sprintf("Id of ConvergerUUID(%s) is nil!", uuid.Name()))
}
obj.mutex.Lock()
uuid.StopTimer() // ignore any errors
delete(obj.status, uuid.ID())
obj.mutex.Unlock()
uuid.InvalidateID()
@@ -308,3 +320,60 @@ func (obj *convergerUUID) Unregister() {
func (obj *convergerUUID) ConvergedTimer() <-chan time.Time {
return obj.converger.ConvergedTimer(obj)
}
// StartTimer runs an invisible timer that automatically converges on timeout.
func (obj *convergerUUID) StartTimer() (func() error, error) {
obj.mutex.Lock()
if !obj.running {
obj.timer = make(chan struct{})
obj.running = true
} else {
obj.mutex.Unlock()
return obj.StopTimer, fmt.Errorf("Timer already started!")
}
obj.mutex.Unlock()
go func() {
for {
select {
case _, ok := <-obj.timer: // reset signal channel
if !ok { // channel is closed
return // false to exit
}
obj.SetConverged(false)
case <-obj.ConvergedTimer():
obj.SetConverged(true) // converged!
select {
case _, ok := <-obj.timer: // reset signal channel
if !ok { // channel is closed
return // false to exit
}
}
}
}
}()
return obj.StopTimer, nil
}
// ResetTimer resets the counter to zero if using a StartTimer internally.
func (obj *convergerUUID) ResetTimer() error {
obj.mutex.Lock()
defer obj.mutex.Unlock()
if obj.running {
obj.timer <- struct{}{} // send the reset message
return nil
}
return fmt.Errorf("Timer hasn't been started!")
}
// StopTimer stops the running timer permanently until a StartTimer is run.
func (obj *convergerUUID) StopTimer() error {
obj.mutex.Lock()
defer obj.mutex.Unlock()
if !obj.running {
return fmt.Errorf("Timer isn't running!")
}
close(obj.timer)
obj.running = false
return nil
}

View File

@@ -161,9 +161,12 @@ func run(c *cli.Context) error {
// setup converger
converger := NewConverger(
c.Int("converged-timeout"),
func() { // lambda to run when converged
func(b bool) error { // lambda to run when converged
if b {
log.Printf("Converged for %d seconds, exiting!", c.Int("converged-timeout"))
exit <- true // trigger an exit!
}
return nil
},
)
go converger.Loop(true) // main loop for converger, true to start paused