From 8308680a50a74bbd726f3a14c6202f9e1e3e03e8 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Tue, 19 Jan 2016 22:01:51 -0500 Subject: [PATCH] Make sure to unpause all elements when resuming The indegree code added a regression because elements with an indegree would not be unpaused! This is now corrected. Time to add more tests :) --- exec.go | 4 ++-- file.go | 4 ++-- main.go | 2 +- pgraph.go | 8 ++++++-- service.go | 8 ++++---- types.go | 29 +++++++++++++++-------------- 6 files changed, 30 insertions(+), 25 deletions(-) diff --git a/exec.go b/exec.go index d84683e6..552781e2 100644 --- a/exec.go +++ b/exec.go @@ -104,6 +104,7 @@ func (obj *ExecType) Watch() { defer obj.SetWatching(false) var send = false // send event? + var exit = false bufioch, errch := make(chan string), make(chan error) //vertex := obj.GetVertex() // stored with SetVertex @@ -171,10 +172,9 @@ func (obj *ExecType) Watch() { case event := <-obj.events: obj.SetConvergedState(typeConvergedNil) - if ok := obj.ReadEvent(&event); !ok { + if exit, send = obj.ReadEvent(&event); exit { return // exit } - send = true case _ = <-TimeAfterOrBlock(obj.ctimeout): obj.SetConvergedState(typeConvergedTimeout) diff --git a/file.go b/file.go index b1734a54..1577aa2e 100644 --- a/file.go +++ b/file.go @@ -120,6 +120,7 @@ func (obj *FileType) Watch() { var current string // current "watcher" location var delta_depth int // depth delta between watcher and event var send = false // send event? + var exit = false var dirty = false for { @@ -234,10 +235,9 @@ func (obj *FileType) Watch() { case event := <-obj.events: obj.SetConvergedState(typeConvergedNil) - if ok := obj.ReadEvent(&event); !ok { + if exit, send = obj.ReadEvent(&event); exit { return // exit } - send = true //dirty = false // these events don't invalidate state case _ = <-TimeAfterOrBlock(obj.ctimeout): diff --git a/main.go b/main.go index 6faf34b0..b293059e 100644 --- a/main.go +++ b/main.go @@ -158,7 +158,7 @@ func run(c *cli.Context) { // loops, we'll cause G.Pause(...) before we // even got going, thus causing nil pointer errors log.Printf("State: %v -> %v", G.SetState(graphStarting), G.GetState()) - G.Start(&wg) // sync + G.Start(&wg, first) // sync log.Printf("State: %v -> %v", G.SetState(graphStarted), G.GetState()) first = false } diff --git a/pgraph.go b/pgraph.go index 57027843..e9890089 100644 --- a/pgraph.go +++ b/pgraph.go @@ -543,8 +543,9 @@ func HeisenbergCount(ch chan *Vertex) int { } // main kick to start the graph -func (g *Graph) Start(wg *sync.WaitGroup) { // start or continue +func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue t, _ := g.TopologicalSort() + // TODO: only calculate indegree if `first` is true to save resources indegree := g.InDegree() // compute all of the indegree's for _, v := range Reverse(t) { @@ -568,7 +569,10 @@ func (g *Graph) Start(wg *sync.WaitGroup) { // start or continue // failures, such as any poke limiting code in Poke() or // BackPoke(). You might want to disable this selective start // when experimenting with and testing those elements. - if indegree[v] == 0 { + // if we are unpausing (since it's not the first run of this + // function) we need to poke to *unpause* every graph vertex, + // and not just selectively the subset with no indegree. + if (!first) || indegree[v] == 0 { // ensure state is started before continuing on to next vertex for !v.Type.SendEvent(eventStart, true, false) { if DEBUG { diff --git a/service.go b/service.go index 0eff14a7..36d5dd7a 100644 --- a/service.go +++ b/service.go @@ -82,6 +82,7 @@ func (obj *ServiceType) Watch() { var service = fmt.Sprintf("%v.service", obj.Name) // systemd name var send = false // send event? + var exit = false var dirty = false var invalid = false // does the service exist or not? var previous bool // previous invalid value @@ -132,13 +133,13 @@ func (obj *ServiceType) Watch() { case event := <-obj.events: obj.SetConvergedState(typeConvergedNil) - if ok := obj.ReadEvent(&event); !ok { + if exit, send = obj.ReadEvent(&event); exit { return // exit } if event.GetActivity() { dirty = true } - send = true + case _ = <-TimeAfterOrBlock(obj.ctimeout): obj.SetConvergedState(typeConvergedTimeout) obj.converged <- true @@ -181,13 +182,12 @@ func (obj *ServiceType) Watch() { case event := <-obj.events: obj.SetConvergedState(typeConvergedNil) - if ok := obj.ReadEvent(&event); !ok { + if exit, send = obj.ReadEvent(&event); exit { return // exit } if event.GetActivity() { dirty = true } - send = true } } diff --git a/types.go b/types.go index 0aeaf9dd..0845a71d 100644 --- a/types.go +++ b/types.go @@ -255,22 +255,22 @@ func (obj *BaseType) SendEvent(event eventName, sync bool, activity bool) bool { } } -// process events when a select gets one -// this handles the pause code too! -func (obj *BaseType) ReadEvent(event *Event) bool { +// process events when a select gets one, this handles the pause code too! +// the return values specify if we should exit and poke respectively +func (obj *BaseType) ReadEvent(event *Event) (exit, poke bool) { event.ACK() switch event.Name { case eventStart: - return true + return false, true case eventPoke: - return true + return false, true case eventBackPoke: - return true + return false, true // forward poking in response to a back poke! case eventExit: - return false + return true, false case eventPause: // wait for next event to continue @@ -278,18 +278,19 @@ func (obj *BaseType) ReadEvent(event *Event) bool { case e := <-obj.events: e.ACK() if e.Name == eventExit { - return false + return true, false } else if e.Name == eventStart { // eventContinue - return true + return false, false // don't poke on unpause! } else { - log.Fatal("Unknown event: ", e) + // if we get a poke event here, it's a bug! + log.Fatalf("%v[%v]: Unknown event: %v, while paused!", obj.GetType(), obj.GetName(), e) } } default: log.Fatal("Unknown event: ", event) } - return false // required to keep the stupid go compiler happy + return true, false // required to keep the stupid go compiler happy } // useful for using as: return CleanState() in the StateOK functions when there @@ -355,16 +356,16 @@ func (obj *NoopType) Watch() { //vertex := obj.vertex // stored with SetVertex var send = false // send event? + var exit = false for { obj.SetState(typeWatching) // reset select { case event := <-obj.events: obj.SetConvergedState(typeConvergedNil) - if ok := obj.ReadEvent(&event); !ok { + // we avoid sending events on unpause + if exit, send = obj.ReadEvent(&event); exit { return // exit } - // XXX: should we avoid sending events on UNPAUSE ? - send = true case _ = <-TimeAfterOrBlock(obj.ctimeout): obj.SetConvergedState(typeConvergedTimeout)