diff --git a/converger/converger.go b/converger/converger.go index 429d0a6b..8b5ae058 100644 --- a/converger/converger.go +++ b/converger/converger.go @@ -29,135 +29,248 @@ import ( multierr "github.com/hashicorp/go-multierror" ) -// TODO: we could make a new function that masks out the state of certain -// UID's, but at the moment the new Timer code has obsoleted the need... +// New builds a new converger coordinator. +func New(timeout int64) *Coordinator { + return &Coordinator{ + timeout: timeout, -// Converger is the general interface for implementing a convergence watcher. -type Converger interface { // TODO: need a better name - Register() UID - IsConverged(UID) bool // is the UID converged ? - SetConverged(UID, bool) error // set the converged state of the UID - Unregister(UID) - Start() - Pause() - Loop(bool) - ConvergedTimer(UID) <-chan time.Time - Status() map[uint64]bool - Timeout() int // returns the timeout that this was created with - AddStateFn(string, func(bool) error) error // adds a stateFn with a name - RemoveStateFn(string) error // remove a stateFn with a given name -} + mutex: &sync.RWMutex{}, -// UID is the interface resources can use to notify with if converged. You'll -// need to use part of the Converger interface to Register initially too. -type UID interface { - ID() uint64 // get Id - Name() string // get a friendly name - SetName(string) - IsValid() bool // has Id been initialized ? - InvalidateID() // set Id to nil - IsConverged() bool - SetConverged(bool) error - Unregister() - ConvergedTimer() <-chan time.Time - StartTimer() (func() error, error) // cancellable is the same as StopTimer() - ResetTimer() error // resets counter to zero - StopTimer() error -} + //lastid: 0, + status: make(map[*UID]struct{}), -// converger is an implementation of the Converger interface. -type converger struct { - timeout int // must be zero (instant) or greater seconds to run - converged bool // did we converge (state changes of this run Fn) - channel chan struct{} // signal here to run an isConverged check - control chan bool // control channel for start/pause - mutex *sync.RWMutex // used for controlling access to status and lastid - lastid uint64 - status map[uint64]bool - stateFns map[string]func(bool) error // run on converged state changes with state bool - smutex *sync.RWMutex // used for controlling access to stateFns -} + //converged: false, // initial state -// cuid is an implementation of the UID interface. -type cuid struct { - converger Converger - id uint64 - name string // user defined, friendly name - mutex *sync.Mutex - timer chan struct{} - running bool // is the above timer running? - wg *sync.WaitGroup -} + pokeChan: make(chan struct{}, 1), // must be buffered + + readyChan: make(chan struct{}), // ready signal + + //paused: false, // starts off as started + pauseSignal: make(chan struct{}), + //resumeSignal: make(chan struct{}), // happens on pause + //pausedAck: util.NewEasyAck(), // happens on pause -// NewConverger builds a new converger struct. -func NewConverger(timeout int) Converger { - return &converger{ - timeout: timeout, - channel: make(chan struct{}), - control: make(chan bool), - mutex: &sync.RWMutex{}, - lastid: 0, - status: make(map[uint64]bool), stateFns: make(map[string]func(bool) error), smutex: &sync.RWMutex{}, - } -} -// Register assigns a UID to the caller. -func (obj *converger) Register() UID { - obj.mutex.Lock() - defer obj.mutex.Unlock() - obj.lastid++ - obj.status[obj.lastid] = false // initialize as not converged - return &cuid{ - converger: obj, - id: obj.lastid, - name: fmt.Sprintf("%d", obj.lastid), // some default - mutex: &sync.Mutex{}, - timer: nil, - running: false, + closeChan: make(chan struct{}), wg: &sync.WaitGroup{}, } } -// IsConverged gets the converged status of a uid. -func (obj *converger) IsConverged(uid UID) bool { - if !uid.IsValid() { - panic(fmt.Sprintf("the ID of UID(%s) is nil", uid.Name())) - } - obj.mutex.RLock() - isConverged, found := obj.status[uid.ID()] // lookup - obj.mutex.RUnlock() - if !found { - panic("the ID of UID is unregistered") - } - return isConverged +// Coordinator is the central converger engine. +type Coordinator struct { + // timeout must be zero (instant) or greater seconds to run. If it's -1 + // then this is disabled, and we never run stateFns. + timeout int64 + + // mutex is used for controlling access to status and lastid. + mutex *sync.RWMutex + + // lastid contains the last uid we used for registration. + //lastid uint64 + // status contains a reference to each active UID. + status map[*UID]struct{} + + // converged stores the last convergence state. When this changes, we + // run the stateFns. + converged bool + + // pokeChan receives a message every time we might need to re-calculate. + pokeChan chan struct{} + + // readyChan closes to notify any interested parties that the main loop + // is running. + readyChan chan struct{} + + // paused represents if this coordinator is paused or not. + paused bool + // pauseSignal closes to request a pause of this coordinator. + pauseSignal chan struct{} + // resumeSignal closes to request a resume of this coordinator. + resumeSignal chan struct{} + // pausedAck is used to send an ack message saying that we've paused. + pausedAck *util.EasyAck + + // stateFns run on converged state changes. + stateFns map[string]func(bool) error + // smutex is used for controlling access to the stateFns map. + smutex *sync.RWMutex + + // closeChan closes when we've been requested to shutdown. + closeChan chan struct{} + // wg waits for everything to finish. + wg *sync.WaitGroup } -// SetConverged updates the converger with the converged state of the UID. -func (obj *converger) SetConverged(uid UID, isConverged bool) error { - if !uid.IsValid() { - return fmt.Errorf("the ID of UID(%s) is nil", uid.Name()) - } +// Register creates a new UID which can be used to report converged state. You +// must Unregister each UID before Shutdown will be able to finish running. +func (obj *Coordinator) Register() *UID { + obj.wg.Add(1) // additional tracking for each UID obj.mutex.Lock() - if _, found := obj.status[uid.ID()]; !found { - panic("the ID of UID is unregistered") + defer obj.mutex.Unlock() + //obj.lastid++ + uid := &UID{ + timeout: obj.timeout, // copy the timeout here + //id: obj.lastid, + //name: fmt.Sprintf("%d", obj.lastid), // some default + + poke: obj.poke, + + // timer + mutex: &sync.Mutex{}, + timer: nil, + running: false, + wg: &sync.WaitGroup{}, } - obj.status[uid.ID()] = isConverged // set - obj.mutex.Unlock() // unlock *before* poke or deadlock! - if isConverged != obj.converged { // only poke if it would be helpful - // run in a go routine so that we never block... just queue up! - // this allows us to send events, even if we haven't started... - go func() { obj.channel <- struct{}{} }() + uid.unregister = func() { obj.Unregister(uid) } // add unregister func + obj.status[uid] = struct{}{} // TODO: add converged state here? + return uid +} + +// Unregister removes the UID from the converger coordinator. If you supply an +// invalid or unregistered uid to this function, it will panic. An unregistered +// UID is no longer part of the convergence checking. +func (obj *Coordinator) Unregister(uid *UID) { + defer obj.wg.Done() // additional tracking for each UID + obj.mutex.Lock() + defer obj.mutex.Unlock() + + if _, exists := obj.status[uid]; !exists { + panic("uid is not registered") } + uid.StopTimer() // ignore any errors + delete(obj.status, uid) +} + +// Run starts the main loop for the converger coordinator. It is commonly run +// from a go routine. It blocks until the Shutdown method is run to close it. +// NOTE: when we have very short timeouts, if we start before all the resources +// have joined the map, then it might appear as if we converged before we did! +func (obj *Coordinator) Run(startPaused bool) { + obj.wg.Add(1) + wg := &sync.WaitGroup{} // needed for the startPaused + defer wg.Wait() // don't leave any leftover go routines running + if startPaused { + wg.Add(1) + go func() { + defer wg.Done() + obj.Pause() // ignore any errors + close(obj.readyChan) + }() + } else { + close(obj.readyChan) // we must wait till the wg.Add(1) has happened... + } + defer obj.wg.Done() + for { + // pause if one was requested... + select { + case <-obj.pauseSignal: // channel closes + obj.pausedAck.Ack() // send ack + // we are paused now, and waiting for resume or exit... + select { + case <-obj.resumeSignal: // channel closes + // resumed! + + case <-obj.closeChan: // we can always escape + return + } + + case _, ok := <-obj.pokeChan: // we got an event (re-calculate) + if !ok { + return + } + + if err := obj.test(); err != nil { + // FIXME: what to do on error ? + } + + case <-obj.closeChan: // we can always escape + return + } + } +} + +// Ready blocks until the Run loop has started up. This is useful so that we +// don't run Shutdown before we've even started up properly. +func (obj *Coordinator) Ready() { + select { + case <-obj.readyChan: + } +} + +// Shutdown sends a signal to the Run loop that it should exit. This blocks +// until it does. +func (obj *Coordinator) Shutdown() { + close(obj.closeChan) + obj.wg.Wait() + close(obj.pokeChan) // free memory? +} + +// Pause pauses the coordinator. It should not be called on an already paused +// coordinator. It will block until the coordinator pauses with an +// acknowledgment, or until an exit is requested. If the latter happens it will +// error. It is NOT thread-safe with the Resume() method so only call either one +// at a time. +func (obj *Coordinator) Pause() error { + if obj.paused { + return fmt.Errorf("already paused") + } + + obj.pausedAck = util.NewEasyAck() + obj.resumeSignal = make(chan struct{}) // build the resume signal + close(obj.pauseSignal) + + // wait for ack (or exit signal) + select { + case <-obj.pausedAck.Wait(): // we got it! + // we're paused + case <-obj.closeChan: + return fmt.Errorf("closing") + } + obj.paused = true + return nil } -// isConverged returns true if *every* registered uid has converged. -func (obj *converger) isConverged() bool { - obj.mutex.RLock() // take a read lock - defer obj.mutex.RUnlock() - for _, v := range obj.status { +// Resume unpauses the coordinator. It can be safely called on a brand-new +// coordinator that has just started running without incident. It is NOT +// thread-safe with the Pause() method, so only call either one at a time. +func (obj *Coordinator) Resume() { + // TODO: do we need a mutex around Resume? + if !obj.paused { // no need to unpause brand-new resources + return + } + + obj.pauseSignal = make(chan struct{}) // rebuild for next pause + close(obj.resumeSignal) + obj.poke() // unblock and notice the resume if necessary + + obj.paused = false + + // no need to wait for it to resume + //return // implied +} + +// poke sends a message to the coordinator telling it that it should re-evaluate +// whether we're converged or not. This does not block. Do not run this in a +// goroutine. It must not be called after Shutdown has been called. +func (obj *Coordinator) poke() { + // redundant + //if len(obj.pokeChan) > 0 { + // return + //} + + select { + case obj.pokeChan <- struct{}{}: + default: // if chan is now full because more than one poke happened... + } +} + +// IsConverged returns true if *every* registered uid has converged. If there +// are no registered UID's, then this will return true. +func (obj *Coordinator) IsConverged() bool { + for _, v := range obj.Status() { if !v { // everyone must be converged for this to be true return false } @@ -165,145 +278,40 @@ func (obj *converger) isConverged() bool { return true } -// Unregister dissociates the ConvergedUID from the converged checking. -func (obj *converger) Unregister(uid UID) { - if !uid.IsValid() { - panic(fmt.Sprintf("the ID of UID(%s) is nil", uid.Name())) +// test evaluates whether we're converged or not and runs the state change. It +// is NOT thread-safe. +func (obj *Coordinator) test() error { + // TODO: add these checks elsewhere to prevent anything from running? + if obj.timeout < 0 { + return nil // nothing to do (only run if timeout is valid) } - obj.mutex.Lock() - uid.StopTimer() // ignore any errors - delete(obj.status, uid.ID()) - obj.mutex.Unlock() - uid.InvalidateID() -} -// Start causes a Converger object to start or resume running. -func (obj *converger) Start() { - obj.control <- true -} + converged := obj.IsConverged() + defer func() { + obj.converged = converged // set this only at the end... + }() -// Pause causes a Converger object to stop running temporarily. -func (obj *converger) Pause() { // FIXME: add a sync ACK on pause before return - obj.control <- false -} - -// Loop is the main loop for a Converger object. It usually runs in a goroutine. -// TODO: we could eventually have each resource tell us as soon as it converges, -// and then keep track of the time delays here, to avoid callers needing select. -// NOTE: when we have very short timeouts, if we start before all the resources -// have joined the map, then it might appear as if we converged before we did! -func (obj *converger) Loop(startPaused bool) { - if obj.control == nil { - panic("converger not initialized correctly") - } - if startPaused { // start paused without racing - select { - case e := <-obj.control: - if !e { - panic("converger expected true") - } + if !converged { + if !obj.converged { // were we previously also not converged? + return nil // nothing to do } - } - for { - select { - case e := <-obj.control: // expecting "false" which means pause! - if e { - panic("converger expected false") - } - // now i'm paused... - select { - case e := <-obj.control: - if !e { - panic("converger expected true") - } - // restart - // kick once to refresh the check... - go func() { obj.channel <- struct{}{} }() - continue - } - case <-obj.channel: - if !obj.isConverged() { - if obj.converged { // we're doing a state change - // call the arbitrary functions (takes a read lock!) - if err := obj.runStateFns(false); err != nil { - // FIXME: what to do on error ? - } - } - obj.converged = false - continue - } - - // we have converged! - if obj.timeout >= 0 { // only run if timeout is valid - if !obj.converged { // we're doing a state change - // call the arbitrary functions (takes a read lock!) - if err := obj.runStateFns(true); err != nil { - // FIXME: what to do on error ? - } - } - } - obj.converged = true - // loop and wait again... - } + // we're doing a state change + // call the arbitrary functions (takes a read lock!) + return obj.runStateFns(false) } + + // we have converged! + if obj.converged { // were we previously also converged? + return nil // nothing to do + } + + // call the arbitrary functions (takes a read lock!) + return obj.runStateFns(true) } -// ConvergedTimer adds a timeout to a select call and blocks until then. -// TODO: this means we could eventually have per resource converged timeouts -func (obj *converger) ConvergedTimer(uid UID) <-chan time.Time { - // be clever: if i'm already converged, this timeout should block which - // avoids unnecessary new signals being sent! this avoids fast loops if - // we have a low timeout, or in particular a timeout == 0 - if uid.IsConverged() { - // blocks the case statement in select forever! - return util.TimeAfterOrBlock(-1) - } - return util.TimeAfterOrBlock(obj.timeout) -} - -// Status returns a map of the converged status of each UID. -func (obj *converger) Status() map[uint64]bool { - status := make(map[uint64]bool) - obj.mutex.RLock() // take a read lock - defer obj.mutex.RUnlock() - for k, v := range obj.status { // make a copy to avoid the mutex - status[k] = v - } - return status -} - -// Timeout returns the timeout in seconds that converger was created with. This -// is useful to avoid passing in the timeout value separately when you're -// already passing in the Converger struct. -func (obj *converger) Timeout() int { - return obj.timeout -} - -// AddStateFn adds a state function to be run on change of converged state. -func (obj *converger) AddStateFn(name string, stateFn func(bool) error) error { - obj.smutex.Lock() - defer obj.smutex.Unlock() - if _, exists := obj.stateFns[name]; exists { - return fmt.Errorf("a stateFn with that name already exists") - } - obj.stateFns[name] = stateFn - return nil -} - -// RemoveStateFn adds a state function to be run on change of converged state. -func (obj *converger) RemoveStateFn(name string) error { - obj.smutex.Lock() - defer obj.smutex.Unlock() - if _, exists := obj.stateFns[name]; !exists { - return fmt.Errorf("a stateFn with that name doesn't exist") - } - delete(obj.stateFns, name) - return nil -} - -// runStateFns runs the listed of stored state functions. -func (obj *converger) runStateFns(converged bool) error { +// runStateFns runs the list of stored state functions. +func (obj *Coordinator) runStateFns(converged bool) error { obj.smutex.RLock() defer obj.smutex.RUnlock() var keys []string @@ -322,70 +330,119 @@ func (obj *converger) runStateFns(converged bool) error { return err } -// ID returns the unique id of this UID object. -func (obj *cuid) ID() uint64 { - return obj.id +// AddStateFn adds a state function to be run on change of converged state. +func (obj *Coordinator) AddStateFn(name string, stateFn func(bool) error) error { + obj.smutex.Lock() + defer obj.smutex.Unlock() + if _, exists := obj.stateFns[name]; exists { + return fmt.Errorf("a stateFn with that name already exists") + } + obj.stateFns[name] = stateFn + return nil } -// Name returns a user defined name for the specific cuid. -func (obj *cuid) Name() string { - return obj.name +// RemoveStateFn removes a state function from running on change of converged +// state. +func (obj *Coordinator) RemoveStateFn(name string) error { + obj.smutex.Lock() + defer obj.smutex.Unlock() + if _, exists := obj.stateFns[name]; !exists { + return fmt.Errorf("a stateFn with that name doesn't exist") + } + delete(obj.stateFns, name) + return nil } -// SetName sets a user defined name for the specific cuid. -func (obj *cuid) SetName(name string) { - obj.name = name +// Status returns a map of the converged status of each UID. +func (obj *Coordinator) Status() map[*UID]bool { + status := make(map[*UID]bool) + obj.mutex.RLock() // take a read lock + defer obj.mutex.RUnlock() + for k := range obj.status { + status[k] = k.IsConverged() + } + return status } -// IsValid tells us if the id is valid or has already been destroyed. -func (obj *cuid) IsValid() bool { - return obj.id != 0 // an id of 0 is invalid +// Timeout returns the timeout in seconds that converger was created with. This +// is useful to avoid passing in the timeout value separately when you're +// already passing in the Coordinator struct. +func (obj *Coordinator) Timeout() int64 { + return obj.timeout } -// InvalidateID marks the id as no longer valid. -func (obj *cuid) InvalidateID() { - obj.id = 0 // an id of 0 is invalid +// UID represents one of the probes for the converger coordinator. It is created +// by calling the Register method of the Coordinator struct. It should be freed +// after use with Unregister. +type UID struct { + // timeout is a copy of the main timeout. It could eventually be used + // for per-UID timeouts too. + timeout int64 + // isConverged stores the convergence state of this particular UID. + isConverged bool + + // poke stores a reference to the main poke function. + poke func() + // unregister stores a reference to the unregister function. + unregister func() + + // timer + mutex *sync.Mutex + timer chan struct{} + running bool // is the timer running? + wg *sync.WaitGroup } -// IsConverged is a helper function to the regular IsConverged method. -func (obj *cuid) IsConverged() bool { - return obj.converger.IsConverged(obj) +// Unregister removes this UID from the converger coordinator. An unregistered +// UID is no longer part of the convergence checking. +func (obj *UID) Unregister() { + obj.unregister() } -// SetConverged is a helper function to the regular SetConverged notification. -func (obj *cuid) SetConverged(isConverged bool) error { - return obj.converger.SetConverged(obj, isConverged) +// IsConverged reports whether this UID is converged or not. +func (obj *UID) IsConverged() bool { + return obj.isConverged } -// Unregister is a helper function to unregister myself. -func (obj *cuid) Unregister() { - obj.converger.Unregister(obj) +// SetConverged sets the convergence state of this UID. This is used by the +// running timer if one is started. The timer will overwrite any value set by +// this method. +func (obj *UID) SetConverged(isConverged bool) { + obj.isConverged = isConverged + obj.poke() // notify of change } -// ConvergedTimer is a helper around the regular ConvergedTimer method. -func (obj *cuid) ConvergedTimer() <-chan time.Time { - return obj.converger.ConvergedTimer(obj) +// ConvergedTimer adds a timeout to a select call and blocks until then. +// TODO: this means we could eventually have per resource converged timeouts +func (obj *UID) ConvergedTimer() <-chan time.Time { + // be clever: if i'm already converged, this timeout should block which + // avoids unnecessary new signals being sent! this avoids fast loops if + // we have a low timeout, or in particular a timeout == 0 + if obj.IsConverged() { + // blocks the case statement in select forever! + return util.TimeAfterOrBlock(-1) + } + return util.TimeAfterOrBlock(int(obj.timeout)) } -// StartTimer runs an invisible timer that automatically converges on timeout. -func (obj *cuid) StartTimer() (func() error, error) { +// StartTimer runs a timer that sets us as converged on timeout. It also returns +// a handle to the StopTimer function which should be run before exit. +func (obj *UID) StartTimer() (func() error, error) { obj.mutex.Lock() - if !obj.running { - obj.timer = make(chan struct{}) - obj.running = true - } else { - obj.mutex.Unlock() + defer obj.mutex.Unlock() + if obj.running { return obj.StopTimer, fmt.Errorf("timer already started") } - obj.mutex.Unlock() + obj.timer = make(chan struct{}) + obj.running = true obj.wg.Add(1) go func() { defer obj.wg.Done() for { select { case _, ok := <-obj.timer: // reset signal channel - if !ok { // channel is closed - return // false to exit + if !ok { + return } obj.SetConverged(false) @@ -393,8 +450,8 @@ func (obj *cuid) StartTimer() (func() error, error) { obj.SetConverged(true) // converged! select { case _, ok := <-obj.timer: // reset signal channel - if !ok { // channel is closed - return // false to exit + if !ok { + return } } } @@ -403,8 +460,8 @@ func (obj *cuid) StartTimer() (func() error, error) { return obj.StopTimer, nil } -// ResetTimer resets the counter to zero if using a StartTimer internally. -func (obj *cuid) ResetTimer() error { +// ResetTimer resets the timer to zero. +func (obj *UID) ResetTimer() error { obj.mutex.Lock() defer obj.mutex.Unlock() if obj.running { @@ -414,8 +471,8 @@ func (obj *cuid) ResetTimer() error { return fmt.Errorf("timer hasn't been started") } -// StopTimer stops the running timer permanently until a StartTimer is run. -func (obj *cuid) StopTimer() error { +// StopTimer stops the running timer. +func (obj *UID) StopTimer() error { obj.mutex.Lock() defer obj.mutex.Unlock() if !obj.running { diff --git a/converger/converger_test.go b/converger/converger_test.go new file mode 100644 index 00000000..6da2497a --- /dev/null +++ b/converger/converger_test.go @@ -0,0 +1,31 @@ +// Mgmt +// Copyright (C) 2013-2018+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +// +build !root + +package converger + +import ( + "testing" +) + +func TestBufferedChan1(t *testing.T) { + ch := make(chan bool, 1) + ch <- true + close(ch) // closing a channel that's not empty should not block + // must be able to exit without blocking anywhere +} diff --git a/engine/graph/engine.go b/engine/graph/engine.go index 3950efaa..f47289b1 100644 --- a/engine/graph/engine.go +++ b/engine/graph/engine.go @@ -42,7 +42,7 @@ type Engine struct { // Prefix is a unique directory prefix which can be used. It should be // created if needed. Prefix string - Converger converger.Converger + Converger *converger.Coordinator Debug bool Logf func(format string, v ...interface{}) diff --git a/engine/graph/state.go b/engine/graph/state.go index e3fdc05d..6f714591 100644 --- a/engine/graph/state.go +++ b/engine/graph/state.go @@ -51,7 +51,7 @@ type State struct { // created if needed. Prefix string - //Converger converger.Converger + //Converger *converger.Coordinator // Debug turns on additional output and behaviours. Debug bool @@ -85,8 +85,8 @@ type State struct { starter bool // do we have an indegree of 0 ? working bool // is the Main() loop running ? - cuid converger.UID // primary converger - tuid converger.UID // secondary converger + cuid *converger.UID // primary converger + tuid *converger.UID // secondary converger init *engine.Init // a copy of the init struct passed to res Init } diff --git a/etcd/etcd.go b/etcd/etcd.go index ffc44bf8..840c113f 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -210,8 +210,8 @@ type EmbdEtcd struct { // EMBeddeD etcd txnq chan *TN // txn queue flags Flags - prefix string // folder prefix to use for misc storage - converger converger.Converger // converged tracking + prefix string // folder prefix to use for misc storage + converger *converger.Coordinator // converged tracking // etcd server related serverwg sync.WaitGroup // wait for server to shutdown @@ -221,7 +221,7 @@ type EmbdEtcd struct { // EMBeddeD etcd } // NewEmbdEtcd creates the top level embedded etcd struct client and server obj. -func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs, advertiseClientURLs, advertiseServerURLs etcdtypes.URLs, noServer bool, noNetwork bool, idealClusterSize uint16, flags Flags, prefix string, converger converger.Converger) *EmbdEtcd { +func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs, advertiseClientURLs, advertiseServerURLs etcdtypes.URLs, noServer bool, noNetwork bool, idealClusterSize uint16, flags Flags, prefix string, converger *converger.Coordinator) *EmbdEtcd { endpoints := make(etcdtypes.URLsMap) if hostname == seedSentinel { // safety return nil @@ -764,7 +764,6 @@ func (obj *EmbdEtcd) CbLoop() { obj.exitwg.Add(1) defer obj.exitwg.Done() cuid := obj.converger.Register() - cuid.SetName("Etcd: CbLoop") defer cuid.Unregister() if e := obj.Connect(false); e != nil { return // fatal @@ -833,7 +832,6 @@ func (obj *EmbdEtcd) Loop() { obj.exitwg.Add(1) // TODO: add these to other go routines? defer obj.exitwg.Done() cuid := obj.converger.Register() - cuid.SetName("Etcd: Loop") defer cuid.Unregister() if e := obj.Connect(false); e != nil { return // fatal diff --git a/lib/cli.go b/lib/cli.go index daf8cdfb..9456a237 100644 --- a/lib/cli.go +++ b/lib/cli.go @@ -104,7 +104,7 @@ func CLI(program, version string, flags Flags) error { Value: "", Usage: "graphviz filter to use", }, - cli.IntFlag{ + cli.Int64Flag{ Name: "converged-timeout, t", Value: -1, Usage: "after approximately this many seconds without activity, we're considered to be in a converged state", diff --git a/lib/main.go b/lib/main.go index 6d596602..86867826 100644 --- a/lib/main.go +++ b/lib/main.go @@ -77,7 +77,7 @@ type Main struct { Sema int // add a semaphore with this lock count to each resource Graphviz string // output file for graphviz data GraphvizFilter string // graphviz filter to use - ConvergedTimeout int // approximately this many seconds of inactivity means we're in a converged state; -1 to disable + ConvergedTimeout int64 // approximately this many seconds of inactivity means we're in a converged state; -1 to disable ConvergedTimeoutNoExit bool // don't exit on converged timeout ConvergedStatusFile string // file to append converged status to MaxRuntime uint // exit after a maximum of approximately this many seconds @@ -313,7 +313,7 @@ func (obj *Main) Run() error { } // setup converger - converger := converger.NewConverger( + converger := converger.New( obj.ConvergedTimeout, ) if obj.ConvergedStatusFile != "" { @@ -334,10 +334,12 @@ func (obj *Main) Run() error { } // XXX: should this be moved to later in the code? - go converger.Loop(true) // main loop for converger, true to start paused + go converger.Run(true) // main loop for converger, true to start paused + converger.Ready() // block until ready obj.cleanup = append(obj.cleanup, func() error { // TODO: shutdown converger, but make sure that using it in a // still running embdEtcd struct doesn't block waiting on it... + converger.Shutdown() return nil }) @@ -649,7 +651,7 @@ func (obj *Main) Run() error { Logf("error starting graph: %+v", err) continue } - converger.Start() // after Start() + converger.Resume() // after Start() started = true Logf("graph: %+v", obj.ge.Graph()) // show graph diff --git a/lib/run.go b/lib/run.go index 90130238..b5ef26ab 100644 --- a/lib/run.go +++ b/lib/run.go @@ -95,7 +95,7 @@ func run(c *cli.Context, name string, gapiObj gapi.GAPI) error { obj.Sema = cliContext.Int("sema") obj.Graphviz = cliContext.String("graphviz") obj.GraphvizFilter = cliContext.String("graphviz-filter") - obj.ConvergedTimeout = cliContext.Int("converged-timeout") + obj.ConvergedTimeout = cliContext.Int64("converged-timeout") obj.ConvergedTimeoutNoExit = cliContext.Bool("converged-timeout-no-exit") obj.ConvergedStatusFile = cliContext.String("converged-status-file") obj.MaxRuntime = uint(cliContext.Int("max-runtime"))