Limit the number of initial start poke's required
Every graph needs each vertex to have a change to run initially (after it has started up) so that initial state detection can be applied to fix anything that happened while the program was not running. We used to poke every vertex which was unnecessary, when in fact we only need to poke the set of vertices that are the minimum set of ultimate pre-requisites for every other vertex in the graph. That way, you're either poked directly, or poked by someone who was, etc... It turns out we don't need Dilworth's theorem, and that looking at vertices with an indegree of 0 is enough (I think it is a special case when we have a DAG). This also fixes a goroutine start scheduling race by ensuring the initial pokes are received!
This commit is contained in:
30
pgraph.go
30
pgraph.go
@@ -28,6 +28,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:generate stringer -type=graphState -output=graphstate_stringer.go
|
//go:generate stringer -type=graphState -output=graphstate_stringer.go
|
||||||
@@ -422,7 +423,8 @@ func (g *Graph) GetDisconnectedGraphs() chan *Graph {
|
|||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
// return the indegree for the graph
|
// return the indegree for the graph, IOW the count of vertices that point to me
|
||||||
|
// NOTE: this returns the values for all vertices in one big lookup table
|
||||||
func (g *Graph) InDegree() map[*Vertex]int {
|
func (g *Graph) InDegree() map[*Vertex]int {
|
||||||
result := make(map[*Vertex]int)
|
result := make(map[*Vertex]int)
|
||||||
for k := range g.Adjacency {
|
for k := range g.Adjacency {
|
||||||
@@ -437,7 +439,8 @@ func (g *Graph) InDegree() map[*Vertex]int {
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// return the outdegree for the graph
|
// return the outdegree for the graph, IOW the count of vertices that point away
|
||||||
|
// NOTE: this returns the values for all vertices in one big lookup table
|
||||||
func (g *Graph) OutDegree() map[*Vertex]int {
|
func (g *Graph) OutDegree() map[*Vertex]int {
|
||||||
result := make(map[*Vertex]int)
|
result := make(map[*Vertex]int)
|
||||||
|
|
||||||
@@ -542,6 +545,7 @@ func HeisenbergCount(ch chan *Vertex) int {
|
|||||||
// main kick to start the graph
|
// main kick to start the graph
|
||||||
func (g *Graph) Start(wg *sync.WaitGroup) { // start or continue
|
func (g *Graph) Start(wg *sync.WaitGroup) { // start or continue
|
||||||
t, _ := g.TopologicalSort()
|
t, _ := g.TopologicalSort()
|
||||||
|
indegree := g.InDegree() // compute all of the indegree's
|
||||||
for _, v := range Reverse(t) {
|
for _, v := range Reverse(t) {
|
||||||
|
|
||||||
if !v.Type.IsWatching() { // if Watch() is not running...
|
if !v.Type.IsWatching() { // if Watch() is not running...
|
||||||
@@ -555,9 +559,27 @@ func (g *Graph) Start(wg *sync.WaitGroup) { // start or continue
|
|||||||
}(v)
|
}(v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// selective poke: here we reduce the number of initial pokes
|
||||||
|
// to the minimum required to activate every vertex in the
|
||||||
|
// graph, either by direct action, or by getting poked by a
|
||||||
|
// vertex that was previously activated. if we poke each vertex
|
||||||
|
// that has no incoming edges, then we can be sure to reach the
|
||||||
|
// whole graph. Please note: this may mask certain optimization
|
||||||
|
// failures, such as any poke limiting code in Poke() or
|
||||||
|
// BackPoke(). You might want to disable this selective start
|
||||||
|
// when experimenting with and testing those elements.
|
||||||
|
if indegree[v] == 0 {
|
||||||
// ensure state is started before continuing on to next vertex
|
// ensure state is started before continuing on to next vertex
|
||||||
v.Type.SendEvent(eventStart, true, false)
|
for !v.Type.SendEvent(eventStart, true, false) {
|
||||||
|
if DEBUG {
|
||||||
|
// if SendEvent fails, we aren't up yet
|
||||||
|
log.Printf("%v[%v]: Retrying SendEvent(Start)", v.GetType(), v.GetName())
|
||||||
|
// sleep here briefly or otherwise cause
|
||||||
|
// a different goroutine to be scheduled
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user