diff --git a/converger.go b/converger.go new file mode 100644 index 00000000..a475dc5e --- /dev/null +++ b/converger.go @@ -0,0 +1,261 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "log" + "sync" + "time" +) + +// Converger is the general interface for implementing a convergence watcher +type Converger interface { // TODO: need a better name + Register() ConvergerUUID + IsConverged(ConvergerUUID) bool // is the UUID converged ? + SetConverged(ConvergerUUID, bool) // set the converged state of the UUID + Unregister(ConvergerUUID) + Start() + Pause() + Loop(bool) + ConvergedTimer(ConvergerUUID) <-chan time.Time +} + +// ConvergerUUID is the interface resources can use to notify with if converged +// you'll need to use part of the Converger interface to Register initially too +type ConvergerUUID interface { + ID() uint64 // get Id + IsValid() bool // has Id been initialized ? + InvalidateID() // set Id to nil + IsConverged() bool + SetConverged(bool) + Unregister() + ConvergedTimer() <-chan time.Time +} + +// converger is an implementation of the Converger interface +type converger struct { + timeout int // must be zero (instant) or greater seconds to run + exitFn func() // TODO: generalize functionality eventually? + channel chan struct{} // signal here to run an isConverged check + control chan bool // control channel for start/pause + mutex sync.RWMutex // used for controlling access to status and lastid + lastid uint64 + status map[uint64]bool +} + +// convergerUUID is an implementation of the ConvergerUUID interface +type convergerUUID struct { + converger Converger + id uint64 +} + +// NewConverger builds a new converger struct +func NewConverger(timeout int, exitFn func()) *converger { + return &converger{ + timeout: timeout, + exitFn: exitFn, + channel: make(chan struct{}), + control: make(chan bool), + lastid: 0, + status: make(map[uint64]bool), + } +} + +// Register assigns a ConvergerUUID to the caller +func (obj *converger) Register() ConvergerUUID { + obj.mutex.Lock() + defer obj.mutex.Unlock() + obj.lastid++ + obj.status[obj.lastid] = false // initialize as not converged + return &convergerUUID{ + converger: obj, + id: obj.lastid, + } +} + +// IsConverged gets the converged status of a uuid +func (obj *converger) IsConverged(uuid ConvergerUUID) bool { + if !uuid.IsValid() { + log.Fatal("Id of ConvergerUUID is nil!") + } + obj.mutex.RLock() + isConverged, found := obj.status[uuid.ID()] // lookup + obj.mutex.RUnlock() + if !found { + log.Fatal("Id of ConvergerUUID is unregistered!") + } + return isConverged +} + +// SetConverged updates the converger with the converged state of the UUID +func (obj *converger) SetConverged(uuid ConvergerUUID, isConverged bool) { + if !uuid.IsValid() { + log.Fatal("Id of ConvergerUUID is nil!") + } + obj.mutex.Lock() + if _, found := obj.status[uuid.ID()]; !found { + log.Fatal("Id of ConvergerUUID is unregistered!") + } + obj.status[uuid.ID()] = isConverged // set + obj.mutex.Unlock() // unlock *before* poke or deadlock! + if isConverged { // only poke if it would be helpful + // run in a go routine so that we never block... just queue up! + // this allows us to send events, even if we haven't started... + go func() { obj.channel <- struct{}{} }() + } +} + +// isConverged returns true if *every* registered uuid has converged +func (obj *converger) isConverged() bool { + obj.mutex.RLock() // take a read lock + defer obj.mutex.RUnlock() + for _, v := range obj.status { + if !v { // everyone must be converged for this to be true + return false + } + } + return true +} + +// Unregister dissociates the ConvergedUUID from the converged checking +func (obj *converger) Unregister(uuid ConvergerUUID) { + if !uuid.IsValid() { + log.Fatal("Id of ConvergerUUID is nil!") + } + obj.mutex.Lock() + delete(obj.status, uuid.ID()) + obj.mutex.Unlock() + uuid.InvalidateID() +} + +// Start causes a Converger object to start or resume running +func (obj *converger) Start() { + obj.control <- true +} + +// Pause causes a Converger object to stop running temporarily +func (obj *converger) Pause() { // FIXME: add a sync ACK on pause before return + obj.control <- false +} + +// Loop is the main loop for a Converger object; it usually runs in a goroutine +// TODO: we could eventually have each resource tell us as soon as it converges +// and then keep track of the time delays here, to avoid callers needing select +// NOTE: when we have very short timeouts, if we start before all the resources +// have joined the map, then it might appears as if we converged before we did! +func (obj *converger) Loop(startPaused bool) { + if obj.control == nil { + log.Fatal("Converger not initialized correctly") + } + if startPaused { // start paused without racing + select { + case e := <-obj.control: + if !e { + log.Fatal("Converger expected true!") + } + } + } + for { + select { + case e := <-obj.control: // expecting "false" which means pause! + if e { + log.Fatal("Converger expected false!") + } + // now i'm paused... + select { + case e := <-obj.control: + if !e { + log.Fatal("Converger expected true!") + } + // restart + // kick once to refresh the check... + go func() { obj.channel <- struct{}{} }() + continue + } + + case _ = <-obj.channel: + if !obj.isConverged() { + continue + } + // we have converged! + + if obj.timeout >= 0 { // only run if timeout is valid + if obj.exitFn != nil { + // call an arbitrary function + obj.exitFn() + } + } + + for { // unblock/drain + <-obj.channel + } + //return + // TODO: would it be useful to loop and wait again ? + // we would need to reset or wait otherwise we'd be + // likely instantly already converged when we looped! + } + } +} + +// ConvergedTimer adds a timeout to a select call and blocks until then +// TODO: this means we could eventually have per resource converged timeouts +func (obj *converger) ConvergedTimer(uuid ConvergerUUID) <-chan time.Time { + // be clever: if i'm already converged, this timeout should block which + // avoids unnecessary new signals being sent! this avoids fast loops if + // we have a low timeout, or in particular a timeout == 0 + if uuid.IsConverged() { + // blocks the case statement in select forever! + return TimeAfterOrBlock(-1) + } + return TimeAfterOrBlock(obj.timeout) +} + +// Id returns the unique id of this UUID object +func (obj *convergerUUID) ID() uint64 { + return obj.id +} + +// IsValid tells us if the id is valid or has already been destroyed +func (obj *convergerUUID) IsValid() bool { + return obj.id != 0 // an id of 0 is invalid +} + +// InvalidateID marks the id as no longer valid +func (obj *convergerUUID) InvalidateID() { + obj.id = 0 // an id of 0 is invalid +} + +// IsConverged is a helper function to the regular IsConverged method +func (obj *convergerUUID) IsConverged() bool { + return obj.converger.IsConverged(obj) +} + +// SetConverged is a helper function to the regular SetConverged notification +func (obj *convergerUUID) SetConverged(isConverged bool) { + obj.converger.SetConverged(obj, isConverged) +} + +// Unregister is a helper function to unregister myself +func (obj *convergerUUID) Unregister() { + obj.converger.Unregister(obj) +} + +// ConvergedTimer is a helper around the regular ConvergedTimer method +func (obj *convergerUUID) ConvergedTimer() <-chan time.Time { + return obj.converger.ConvergedTimer(obj) +} diff --git a/etcd.go b/etcd.go index 2e7f347d..7f6d9b74 100644 --- a/etcd.go +++ b/etcd.go @@ -37,29 +37,10 @@ const ( etcdBar ) -//go:generate stringer -type=etcdConvergedState -output=etcdconvergedstate_stringer.go -type etcdConvergedState int - -const ( - etcdConvergedNil etcdConvergedState = iota - //etcdConverged - etcdConvergedTimeout -) - type EtcdWObject struct { // etcd wrapper object - seed string - ctimeout int - converged chan bool - kapi etcd.KeysAPI - convergedState etcdConvergedState -} - -func (etcdO *EtcdWObject) GetConvergedState() etcdConvergedState { - return etcdO.convergedState -} - -func (etcdO *EtcdWObject) SetConvergedState(state etcdConvergedState) { - etcdO.convergedState = state + seed string + converger Converger // converged tracking + kapi etcd.KeysAPI } func (etcdO *EtcdWObject) GetKAPI() etcd.KeysAPI { @@ -114,8 +95,6 @@ func (etcdO *EtcdWObject) EtcdChannelWatch(watcher etcd.Watcher, context etcd_co func (etcdO *EtcdWObject) EtcdWatch() chan etcdMsg { kapi := etcdO.GetKAPI() - ctimeout := etcdO.ctimeout - converged := etcdO.converged // XXX: i think we need this buffered so that when we're hanging on the // channel, which is inside the EtcdWatch main loop, we still want the // calls to Get/Set on etcd to succeed, so blocking them here would @@ -126,6 +105,8 @@ func (etcdO *EtcdWObject) EtcdWatch() chan etcdMsg { t := tmin // current time tmult := 2 // multiplier for exponential delay tmax := 16000 // max delay + cuuid := etcdO.converger.Register() + defer cuuid.Unregister() watcher := kapi.Watcher("/exported/", &etcd.WatcherOptions{Recursive: true}) etcdch := etcdO.EtcdChannelWatch(watcher, etcd_context.Background()) for { @@ -134,12 +115,11 @@ func (etcdO *EtcdWObject) EtcdWatch() chan etcdMsg { var err error select { case out := <-etcdch: - etcdO.SetConvergedState(etcdConvergedNil) + cuuid.SetConverged(false) resp, err = out.resp, out.err - case _ = <-TimeAfterOrBlock(ctimeout): - etcdO.SetConvergedState(etcdConvergedTimeout) - converged <- true + case _ = <-cuuid.ConvergedTimer(): + cuuid.SetConverged(true) // converged! continue } diff --git a/exec.go b/exec.go index 139b47d7..6f42caa5 100644 --- a/exec.go +++ b/exec.go @@ -108,11 +108,12 @@ func (obj *ExecRes) Watch(processChan chan Event) { } obj.SetWatching(true) defer obj.SetWatching(false) + cuuid := obj.converger.Register() + defer cuuid.Unregister() var send = false // send event? var exit = false bufioch, errch := make(chan string), make(chan error) - //vertex := obj.GetVertex() // stored with SetVertex if obj.WatchCmd != "" { var cmdName string @@ -157,7 +158,7 @@ func (obj *ExecRes) Watch(processChan chan Event) { obj.SetState(resStateWatching) // reset select { case text := <-bufioch: - obj.SetConvergedState(resConvergedNil) + cuuid.SetConverged(false) // each time we get a line of output, we loop! log.Printf("%v[%v]: Watch output: %s", obj.Kind(), obj.GetName(), text) if text != "" { @@ -165,8 +166,8 @@ func (obj *ExecRes) Watch(processChan chan Event) { } case err := <-errch: - obj.SetConvergedState(resConvergedNil) // XXX ? - if err == nil { // EOF + cuuid.SetConverged(false) // XXX ? + if err == nil { // EOF // FIXME: add an "if watch command ends/crashes" // restart or generate error option log.Printf("%v[%v]: Reached EOF", obj.Kind(), obj.GetName()) @@ -177,14 +178,13 @@ func (obj *ExecRes) Watch(processChan chan Event) { // XXX: how should we handle errors? case event := <-obj.events: - obj.SetConvergedState(resConvergedNil) + cuuid.SetConverged(false) if exit, send = obj.ReadEvent(&event); exit { return // exit } - case _ = <-TimeAfterOrBlock(obj.ctimeout): - obj.SetConvergedState(resConvergedTimeout) - obj.converged <- true + case _ = <-cuuid.ConvergedTimer(): + cuuid.SetConverged(true) // converged! continue } diff --git a/file.go b/file.go index 77413dca..16f53bf8 100644 --- a/file.go +++ b/file.go @@ -108,11 +108,12 @@ func (obj *FileRes) Watch(processChan chan Event) { } obj.SetWatching(true) defer obj.SetWatching(false) + cuuid := obj.converger.Register() + defer cuuid.Unregister() //var recursive bool = false //var isdir = (obj.GetPath()[len(obj.GetPath())-1:] == "/") // dirs have trailing slashes //log.Printf("IsDirectory: %v", isdir) - //vertex := obj.GetVertex() // stored with SetVertex var safename = path.Clean(obj.GetPath()) // no trailing slash watcher, err := fsnotify.NewWatcher() @@ -164,7 +165,7 @@ func (obj *FileRes) Watch(processChan chan Event) { if DEBUG { log.Printf("File[%v]: Watch(%v), Event(%v): %v", obj.GetName(), current, event.Name, event.Op) } - obj.SetConvergedState(resConvergedNil) // XXX: technically i can detect if the event is erroneous or not first + cuuid.SetConverged(false) // XXX: technically i can detect if the event is erroneous or not first // the deeper you go, the bigger the deltaDepth is... // this is the difference between what we're watching, // and the event... doesn't mean we can't watch deeper @@ -234,21 +235,20 @@ func (obj *FileRes) Watch(processChan chan Event) { } case err := <-watcher.Errors: - obj.SetConvergedState(resConvergedNil) // XXX ? + cuuid.SetConverged(false) // XXX ? log.Printf("error: %v", err) log.Fatal(err) //obj.events <- fmt.Sprintf("file: %v", "error") // XXX: how should we handle errors? case event := <-obj.events: - obj.SetConvergedState(resConvergedNil) + cuuid.SetConverged(false) if exit, send = obj.ReadEvent(&event); exit { return // exit } //dirty = false // these events don't invalidate state - case _ = <-TimeAfterOrBlock(obj.ctimeout): - obj.SetConvergedState(resConvergedTimeout) - obj.converged <- true + case _ = <-cuuid.ConvergedTimer(): + cuuid.SetConverged(true) // converged! continue } diff --git a/main.go b/main.go index 079673b9..4a670b44 100644 --- a/main.go +++ b/main.go @@ -59,8 +59,7 @@ func waitForSignal(exit chan bool) { func run(c *cli.Context) { var start = time.Now().UnixNano() var wg sync.WaitGroup - exit := make(chan bool) // exit signal - converged := make(chan bool) // converged signal + exit := make(chan bool) // exit signal log.Printf("This is: %v, version: %v", program, version) log.Printf("Main: Start: %v", start) var G, fullGraph *Graph @@ -73,6 +72,16 @@ func run(c *cli.Context) { }() } + // setup converger + converger := NewConverger( + c.Int("converged-timeout"), + func() { // lambda to run when converged + log.Printf("Converged for %d seconds, exiting!", c.Int("converged-timeout")) + exit <- true // trigger an exit! + }, + ) + go converger.Loop(true) // main loop for converger, true to start paused + // initial etcd peer endpoint seed := c.String("seed") if seed == "" { @@ -86,8 +95,7 @@ func run(c *cli.Context) { // etcd etcdO := &EtcdWObject{ seed: seed, - ctimeout: c.Int("converged-timeout"), - converged: converged, + converger: converger, } hostname := c.String("hostname") @@ -136,7 +144,8 @@ func run(c *cli.Context) { // run graph vertex LOCK... if !first { // TODO: we can flatten this check out I think - G.Pause() // sync + converger.Pause() // FIXME: add sync wait? + G.Pause() // sync } // build graph from yaml file on events (eg: from etcd) @@ -148,6 +157,7 @@ func run(c *cli.Context) { // unpause! if !first { G.Start(&wg, first) // sync + converger.Start() // after G.Start() } continue } @@ -165,44 +175,18 @@ func run(c *cli.Context) { } else { log.Printf("Graphviz: Successfully generated graph!") } - G.SetVertex() - G.SetConvergedCallback(c.Int("converged-timeout"), converged) + G.AssociateData(converger) // G.Start(...) needs to be synchronous or wait, // because if half of the nodes are started and // some are not ready yet and the EtcdWatch // loops, we'll cause G.Pause(...) before we // even got going, thus causing nil pointer errors G.Start(&wg, first) // sync + converger.Start() // after G.Start() first = false } }() - if i := c.Int("converged-timeout"); i >= 0 { - go func() { - ConvergedLoop: - for { - <-converged // when anyone says they have converged - - if etcdO.GetConvergedState() != etcdConvergedTimeout { - continue - } - for v := range G.GetVerticesChan() { - if v.Res.GetConvergedState() != resConvergedTimeout { - continue ConvergedLoop - } - } - - // if all have converged, exit - log.Printf("Converged for %d seconds, exiting!", i) - exit <- true - for { - <-converged - } // unblock/drain - //return - } - }() - } - log.Println("Main: Running...") waitForSignal(exit) // pass in exit channel to watch diff --git a/noop.go b/noop.go index 763d4628..5a96f1cf 100644 --- a/noop.go +++ b/noop.go @@ -59,23 +59,23 @@ func (obj *NoopRes) Watch(processChan chan Event) { } obj.SetWatching(true) defer obj.SetWatching(false) + cuuid := obj.converger.Register() + defer cuuid.Unregister() - //vertex := obj.vertex // stored with SetVertex var send = false // send event? var exit = false for { obj.SetState(resStateWatching) // reset select { case event := <-obj.events: - obj.SetConvergedState(resConvergedNil) + cuuid.SetConverged(false) // we avoid sending events on unpause if exit, send = obj.ReadEvent(&event); exit { return // exit } - case _ = <-TimeAfterOrBlock(obj.ctimeout): - obj.SetConvergedState(resConvergedTimeout) - obj.converged <- true + case _ = <-cuuid.ConvergedTimer(): + cuuid.SetConverged(true) // converged! continue } diff --git a/pgraph.go b/pgraph.go index 39805d53..9d62c427 100644 --- a/pgraph.go +++ b/pgraph.go @@ -122,13 +122,6 @@ func (g *Graph) SetState(state graphState) graphState { return prev } -// store a pointer in the resource to it's parent vertex -func (g *Graph) SetVertex() { - for v := range g.GetVerticesChan() { - v.Res.SetVertex(v) - } -} - // AddVertex uses variadic input to add all listed vertices to the graph func (g *Graph) AddVertex(xv ...*Vertex) { for _, v := range xv { @@ -855,9 +848,10 @@ func (g *Graph) Exit() { } } -func (g *Graph) SetConvergedCallback(ctimeout int, converged chan bool) { +// AssociateData associates some data with the object in the graph in question +func (g *Graph) AssociateData(converger Converger) { for v := range g.GetVerticesChan() { - v.Res.SetConvergedCallback(ctimeout, converged) + v.Res.AssociateData(converger) } } diff --git a/pkg.go b/pkg.go index cbe7f2d8..1ed95777 100644 --- a/pkg.go +++ b/pkg.go @@ -45,9 +45,7 @@ type PkgRes struct { func NewPkgRes(name, state string, allowuntrusted, allownonfree, allowunsupported bool) *PkgRes { obj := &PkgRes{ BaseRes: BaseRes{ - Name: name, - events: make(chan Event), - vertex: nil, + Name: name, }, State: state, AllowUntrusted: allowuntrusted, @@ -113,6 +111,8 @@ func (obj *PkgRes) Watch(processChan chan Event) { } obj.SetWatching(true) defer obj.SetWatching(false) + cuuid := obj.converger.Register() + defer cuuid.Unregister() bus := NewBus() if bus == nil { @@ -148,20 +148,19 @@ func (obj *PkgRes) Watch(processChan chan Event) { <-ch // discard } - obj.SetConvergedState(resConvergedNil) + cuuid.SetConverged(false) send = true dirty = true case event := <-obj.events: - obj.SetConvergedState(resConvergedNil) + cuuid.SetConverged(false) if exit, send = obj.ReadEvent(&event); exit { return // exit } //dirty = false // these events don't invalidate state - case _ = <-TimeAfterOrBlock(obj.ctimeout): - obj.SetConvergedState(resConvergedTimeout) - obj.converged <- true + case _ = <-cuuid.ConvergedTimer(): + cuuid.SetConverged(true) // converged! continue } diff --git a/resources.go b/resources.go index 5c87d26d..0c5f1359 100644 --- a/resources.go +++ b/resources.go @@ -36,15 +36,6 @@ const ( resStatePoking ) -//go:generate stringer -type=resConvergedState -output=resconvergedstate_stringer.go -type resConvergedState int - -const ( - resConvergedNil resConvergedState = iota - //resConverged - resConvergedTimeout -) - // a unique identifier for a resource, namely it's name, and the kind ("type") type ResUUID interface { GetName() string @@ -78,12 +69,9 @@ type Base interface { SetName(string) Kind() string GetMeta() MetaParams - SetVertex(*Vertex) - SetConvergedCallback(ctimeout int, converged chan bool) + AssociateData(Converger) IsWatching() bool SetWatching(bool) - GetConvergedState() resConvergedState - SetConvergedState(resConvergedState) GetState() resState SetState(resState) SendEvent(eventName, bool, bool) bool @@ -110,19 +98,16 @@ type Res interface { } type BaseRes struct { - Name string `yaml:"name"` - Meta MetaParams `yaml:"meta"` // struct of all the metaparams - kind string - events chan Event - vertex *Vertex - state resState - convergedState resConvergedState - watching bool // is Watch() loop running ? - ctimeout int // converged timeout - converged chan bool - isStateOK bool // whether the state is okay based on events or not - isGrouped bool // am i contained within a group? - grouped []Res // list of any grouped resources + Name string `yaml:"name"` + Meta MetaParams `yaml:"meta"` // struct of all the metaparams + kind string + events chan Event + converger Converger // converged tracking + state resState + watching bool // is Watch() loop running ? + isStateOK bool // whether the state is okay based on events or not + isGrouped bool // am i contained within a group? + grouped []Res // list of any grouped resources } // wraps the IFF method when used with a list of UUID's @@ -185,17 +170,9 @@ func (obj *BaseRes) GetMeta() MetaParams { return obj.Meta } -func (obj *BaseRes) GetVertex() *Vertex { - return obj.vertex -} - -func (obj *BaseRes) SetVertex(v *Vertex) { - obj.vertex = v -} - -func (obj *BaseRes) SetConvergedCallback(ctimeout int, converged chan bool) { - obj.ctimeout = ctimeout - obj.converged = converged +// AssociateData associates some data with the object in question +func (obj *BaseRes) AssociateData(converger Converger) { + obj.converger = converger } // is the Watch() function running? @@ -208,14 +185,6 @@ func (obj *BaseRes) SetWatching(b bool) { obj.watching = b } -func (obj *BaseRes) GetConvergedState() resConvergedState { - return obj.convergedState -} - -func (obj *BaseRes) SetConvergedState(state resConvergedState) { - obj.convergedState = state -} - func (obj *BaseRes) GetState() resState { return obj.state } diff --git a/svc.go b/svc.go index 1b308f4b..7576aefe 100644 --- a/svc.go +++ b/svc.go @@ -73,9 +73,10 @@ func (obj *SvcRes) Watch(processChan chan Event) { } obj.SetWatching(true) defer obj.SetWatching(false) + cuuid := obj.converger.Register() + defer cuuid.Unregister() // obj.Name: svc name - //vertex := obj.GetVertex() // stored with SetVertex if !systemdUtil.IsRunningSystemd() { log.Fatal("Systemd is not running.") } @@ -145,12 +146,12 @@ func (obj *SvcRes) Watch(processChan chan Event) { obj.SetState(resStateWatching) // reset select { case _ = <-buschan: // XXX wait for new units event to unstick - obj.SetConvergedState(resConvergedNil) + cuuid.SetConverged(false) // loop so that we can see the changed invalid signal log.Printf("Svc[%v]->DaemonReload()", svc) case event := <-obj.events: - obj.SetConvergedState(resConvergedNil) + cuuid.SetConverged(false) if exit, send = obj.ReadEvent(&event); exit { return // exit } @@ -158,9 +159,8 @@ func (obj *SvcRes) Watch(processChan chan Event) { dirty = true } - case _ = <-TimeAfterOrBlock(obj.ctimeout): - obj.SetConvergedState(resConvergedTimeout) - obj.converged <- true + case _ = <-cuuid.ConvergedTimer(): + cuuid.SetConverged(true) // converged! continue } } else { @@ -193,19 +193,23 @@ func (obj *SvcRes) Watch(processChan chan Event) { dirty = true case err := <-subErrors: - obj.SetConvergedState(resConvergedNil) // XXX ? + cuuid.SetConverged(false) // XXX ? log.Printf("error: %v", err) log.Fatal(err) //vertex.events <- fmt.Sprintf("svc: %v", "error") // XXX: how should we handle errors? case event := <-obj.events: - obj.SetConvergedState(resConvergedNil) + cuuid.SetConverged(false) if exit, send = obj.ReadEvent(&event); exit { return // exit } if event.GetActivity() { dirty = true } + + case _ = <-cuuid.ConvergedTimer(): + cuuid.SetConverged(true) // converged! + continue } }