From eee652cefe36421ef3626c061f2745b774e9c993 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Tue, 30 Aug 2016 03:56:22 -0400 Subject: [PATCH] converger: Add new timer system for determining convergence This adds a new method of marking whether a particular UUID has converged or not. You can now Start, Stop, or Reset a convergence timer on the individual UUID's. This wraps the existing SetConverged calls with a hidden go routine. It is not recommended to use the SetConverged calls and the Timer calls on the same UUID. --- converger.go | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 9 ++++--- 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/converger.go b/converger.go index 5bd9671e..49e43008 100644 --- a/converger.go +++ b/converger.go @@ -23,6 +23,9 @@ import ( "time" ) +// TODO: we could make a new function that masks out the state of certain +// UUID's, but at the moment the new Timer code has obsoleted the need... + // Converger is the general interface for implementing a convergence watcher type Converger interface { // TODO: need a better name Register() ConvergerUUID @@ -50,6 +53,9 @@ type ConvergerUUID interface { SetConverged(bool) error Unregister() ConvergedTimer() <-chan time.Time + StartTimer() (func() error, error) // cancellable is the same as StopTimer() + ResetTimer() error // resets counter to zero + StopTimer() error } // converger is an implementation of the Converger interface @@ -69,6 +75,9 @@ type convergerUUID struct { converger Converger id uint64 name string // user defined, friendly name + mutex sync.Mutex + timer chan struct{} + running bool // is the above timer running? } // NewConverger builds a new converger struct @@ -93,6 +102,8 @@ func (obj *converger) Register() ConvergerUUID { converger: obj, id: obj.lastid, name: fmt.Sprintf("%d", obj.lastid), // some default + timer: nil, + running: false, } } @@ -147,6 +158,7 @@ func (obj *converger) Unregister(uuid ConvergerUUID) { panic(fmt.Sprintf("Id of ConvergerUUID(%s) is nil!", uuid.Name())) } obj.mutex.Lock() + uuid.StopTimer() // ignore any errors delete(obj.status, uuid.ID()) obj.mutex.Unlock() uuid.InvalidateID() @@ -308,3 +320,60 @@ func (obj *convergerUUID) Unregister() { func (obj *convergerUUID) ConvergedTimer() <-chan time.Time { return obj.converger.ConvergedTimer(obj) } + +// StartTimer runs an invisible timer that automatically converges on timeout. +func (obj *convergerUUID) StartTimer() (func() error, error) { + obj.mutex.Lock() + if !obj.running { + obj.timer = make(chan struct{}) + obj.running = true + } else { + obj.mutex.Unlock() + return obj.StopTimer, fmt.Errorf("Timer already started!") + } + obj.mutex.Unlock() + go func() { + for { + select { + case _, ok := <-obj.timer: // reset signal channel + if !ok { // channel is closed + return // false to exit + } + obj.SetConverged(false) + + case <-obj.ConvergedTimer(): + obj.SetConverged(true) // converged! + select { + case _, ok := <-obj.timer: // reset signal channel + if !ok { // channel is closed + return // false to exit + } + } + } + } + }() + return obj.StopTimer, nil +} + +// ResetTimer resets the counter to zero if using a StartTimer internally. +func (obj *convergerUUID) ResetTimer() error { + obj.mutex.Lock() + defer obj.mutex.Unlock() + if obj.running { + obj.timer <- struct{}{} // send the reset message + return nil + } + return fmt.Errorf("Timer hasn't been started!") +} + +// StopTimer stops the running timer permanently until a StartTimer is run. +func (obj *convergerUUID) StopTimer() error { + obj.mutex.Lock() + defer obj.mutex.Unlock() + if !obj.running { + return fmt.Errorf("Timer isn't running!") + } + close(obj.timer) + obj.running = false + return nil +} diff --git a/main.go b/main.go index 7bb339b5..dc40d8fb 100644 --- a/main.go +++ b/main.go @@ -161,9 +161,12 @@ func run(c *cli.Context) error { // setup converger converger := NewConverger( c.Int("converged-timeout"), - func() { // lambda to run when converged - log.Printf("Converged for %d seconds, exiting!", c.Int("converged-timeout")) - exit <- true // trigger an exit! + func(b bool) error { // lambda to run when converged + if b { + log.Printf("Converged for %d seconds, exiting!", c.Int("converged-timeout")) + exit <- true // trigger an exit! + } + return nil }, ) go converger.Loop(true) // main loop for converger, true to start paused