Make sure to unpause all elements when resuming

The indegree code added a regression because elements with an indegree
would not be unpaused! This is now corrected. Time to add more tests :)
This commit is contained in:
James Shubin
2016-01-19 22:01:51 -05:00
parent 9c18972af4
commit 8308680a50
6 changed files with 30 additions and 25 deletions

View File

@@ -104,6 +104,7 @@ func (obj *ExecType) Watch() {
defer obj.SetWatching(false) defer obj.SetWatching(false)
var send = false // send event? var send = false // send event?
var exit = false
bufioch, errch := make(chan string), make(chan error) bufioch, errch := make(chan string), make(chan error)
//vertex := obj.GetVertex() // stored with SetVertex //vertex := obj.GetVertex() // stored with SetVertex
@@ -171,10 +172,9 @@ func (obj *ExecType) Watch() {
case event := <-obj.events: case event := <-obj.events:
obj.SetConvergedState(typeConvergedNil) obj.SetConvergedState(typeConvergedNil)
if ok := obj.ReadEvent(&event); !ok { if exit, send = obj.ReadEvent(&event); exit {
return // exit return // exit
} }
send = true
case _ = <-TimeAfterOrBlock(obj.ctimeout): case _ = <-TimeAfterOrBlock(obj.ctimeout):
obj.SetConvergedState(typeConvergedTimeout) obj.SetConvergedState(typeConvergedTimeout)

View File

@@ -120,6 +120,7 @@ func (obj *FileType) Watch() {
var current string // current "watcher" location var current string // current "watcher" location
var delta_depth int // depth delta between watcher and event var delta_depth int // depth delta between watcher and event
var send = false // send event? var send = false // send event?
var exit = false
var dirty = false var dirty = false
for { for {
@@ -234,10 +235,9 @@ func (obj *FileType) Watch() {
case event := <-obj.events: case event := <-obj.events:
obj.SetConvergedState(typeConvergedNil) obj.SetConvergedState(typeConvergedNil)
if ok := obj.ReadEvent(&event); !ok { if exit, send = obj.ReadEvent(&event); exit {
return // exit return // exit
} }
send = true
//dirty = false // these events don't invalidate state //dirty = false // these events don't invalidate state
case _ = <-TimeAfterOrBlock(obj.ctimeout): case _ = <-TimeAfterOrBlock(obj.ctimeout):

View File

@@ -158,7 +158,7 @@ func run(c *cli.Context) {
// loops, we'll cause G.Pause(...) before we // loops, we'll cause G.Pause(...) before we
// even got going, thus causing nil pointer errors // even got going, thus causing nil pointer errors
log.Printf("State: %v -> %v", G.SetState(graphStarting), G.GetState()) log.Printf("State: %v -> %v", G.SetState(graphStarting), G.GetState())
G.Start(&wg) // sync G.Start(&wg, first) // sync
log.Printf("State: %v -> %v", G.SetState(graphStarted), G.GetState()) log.Printf("State: %v -> %v", G.SetState(graphStarted), G.GetState())
first = false first = false
} }

View File

@@ -543,8 +543,9 @@ func HeisenbergCount(ch chan *Vertex) int {
} }
// main kick to start the graph // main kick to start the graph
func (g *Graph) Start(wg *sync.WaitGroup) { // start or continue func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
t, _ := g.TopologicalSort() t, _ := g.TopologicalSort()
// TODO: only calculate indegree if `first` is true to save resources
indegree := g.InDegree() // compute all of the indegree's indegree := g.InDegree() // compute all of the indegree's
for _, v := range Reverse(t) { for _, v := range Reverse(t) {
@@ -568,7 +569,10 @@ func (g *Graph) Start(wg *sync.WaitGroup) { // start or continue
// failures, such as any poke limiting code in Poke() or // failures, such as any poke limiting code in Poke() or
// BackPoke(). You might want to disable this selective start // BackPoke(). You might want to disable this selective start
// when experimenting with and testing those elements. // when experimenting with and testing those elements.
if indegree[v] == 0 { // if we are unpausing (since it's not the first run of this
// function) we need to poke to *unpause* every graph vertex,
// and not just selectively the subset with no indegree.
if (!first) || indegree[v] == 0 {
// ensure state is started before continuing on to next vertex // ensure state is started before continuing on to next vertex
for !v.Type.SendEvent(eventStart, true, false) { for !v.Type.SendEvent(eventStart, true, false) {
if DEBUG { if DEBUG {

View File

@@ -82,6 +82,7 @@ func (obj *ServiceType) Watch() {
var service = fmt.Sprintf("%v.service", obj.Name) // systemd name var service = fmt.Sprintf("%v.service", obj.Name) // systemd name
var send = false // send event? var send = false // send event?
var exit = false
var dirty = false var dirty = false
var invalid = false // does the service exist or not? var invalid = false // does the service exist or not?
var previous bool // previous invalid value var previous bool // previous invalid value
@@ -132,13 +133,13 @@ func (obj *ServiceType) Watch() {
case event := <-obj.events: case event := <-obj.events:
obj.SetConvergedState(typeConvergedNil) obj.SetConvergedState(typeConvergedNil)
if ok := obj.ReadEvent(&event); !ok { if exit, send = obj.ReadEvent(&event); exit {
return // exit return // exit
} }
if event.GetActivity() { if event.GetActivity() {
dirty = true dirty = true
} }
send = true
case _ = <-TimeAfterOrBlock(obj.ctimeout): case _ = <-TimeAfterOrBlock(obj.ctimeout):
obj.SetConvergedState(typeConvergedTimeout) obj.SetConvergedState(typeConvergedTimeout)
obj.converged <- true obj.converged <- true
@@ -181,13 +182,12 @@ func (obj *ServiceType) Watch() {
case event := <-obj.events: case event := <-obj.events:
obj.SetConvergedState(typeConvergedNil) obj.SetConvergedState(typeConvergedNil)
if ok := obj.ReadEvent(&event); !ok { if exit, send = obj.ReadEvent(&event); exit {
return // exit return // exit
} }
if event.GetActivity() { if event.GetActivity() {
dirty = true dirty = true
} }
send = true
} }
} }

View File

@@ -255,22 +255,22 @@ func (obj *BaseType) SendEvent(event eventName, sync bool, activity bool) bool {
} }
} }
// process events when a select gets one // process events when a select gets one, this handles the pause code too!
// this handles the pause code too! // the return values specify if we should exit and poke respectively
func (obj *BaseType) ReadEvent(event *Event) bool { func (obj *BaseType) ReadEvent(event *Event) (exit, poke bool) {
event.ACK() event.ACK()
switch event.Name { switch event.Name {
case eventStart: case eventStart:
return true return false, true
case eventPoke: case eventPoke:
return true return false, true
case eventBackPoke: case eventBackPoke:
return true return false, true // forward poking in response to a back poke!
case eventExit: case eventExit:
return false return true, false
case eventPause: case eventPause:
// wait for next event to continue // wait for next event to continue
@@ -278,18 +278,19 @@ func (obj *BaseType) ReadEvent(event *Event) bool {
case e := <-obj.events: case e := <-obj.events:
e.ACK() e.ACK()
if e.Name == eventExit { if e.Name == eventExit {
return false return true, false
} else if e.Name == eventStart { // eventContinue } else if e.Name == eventStart { // eventContinue
return true return false, false // don't poke on unpause!
} else { } else {
log.Fatal("Unknown event: ", e) // if we get a poke event here, it's a bug!
log.Fatalf("%v[%v]: Unknown event: %v, while paused!", obj.GetType(), obj.GetName(), e)
} }
} }
default: default:
log.Fatal("Unknown event: ", event) log.Fatal("Unknown event: ", event)
} }
return false // required to keep the stupid go compiler happy return true, false // required to keep the stupid go compiler happy
} }
// useful for using as: return CleanState() in the StateOK functions when there // useful for using as: return CleanState() in the StateOK functions when there
@@ -355,16 +356,16 @@ func (obj *NoopType) Watch() {
//vertex := obj.vertex // stored with SetVertex //vertex := obj.vertex // stored with SetVertex
var send = false // send event? var send = false // send event?
var exit = false
for { for {
obj.SetState(typeWatching) // reset obj.SetState(typeWatching) // reset
select { select {
case event := <-obj.events: case event := <-obj.events:
obj.SetConvergedState(typeConvergedNil) obj.SetConvergedState(typeConvergedNil)
if ok := obj.ReadEvent(&event); !ok { // we avoid sending events on unpause
if exit, send = obj.ReadEvent(&event); exit {
return // exit return // exit
} }
// XXX: should we avoid sending events on UNPAUSE ?
send = true
case _ = <-TimeAfterOrBlock(obj.ctimeout): case _ = <-TimeAfterOrBlock(obj.ctimeout):
obj.SetConvergedState(typeConvergedTimeout) obj.SetConvergedState(typeConvergedTimeout)