Fixup state related items

* Fixup graph state readability
* Rename original SetState() to SetConvergedState() and friends...
* Add type state management for proper BackPoke() commands...
* Add better DEBUG logging

This is an important optimization that prevents running a BackPoke on a
parent which is in the process of running and will most certainly poke
the caller back in a moment. This avoids unnecessary roundtrips.
Unfortunately, there are still other algorithms required so that races
can't cause the graph to run for longer than necessary.
This commit is contained in:
James Shubin
2016-01-12 04:30:35 -05:00
parent c57946e29b
commit 4c6647d807
8 changed files with 156 additions and 68 deletions

20
etcd.go
View File

@@ -37,11 +37,11 @@ const (
etcdBar
)
//go:generate stringer -type=etcdState -output=etcdstate_stringer.go
type etcdState int
//go:generate stringer -type=etcdConvergedState -output=etcdconvergedstate_stringer.go
type etcdConvergedState int
const (
etcdNil etcdState = iota
etcdConvergedNil etcdConvergedState = iota
//etcdConverged
etcdConvergedTimeout
)
@@ -51,15 +51,15 @@ type EtcdWObject struct { // etcd wrapper object
ctimeout int
converged chan bool
kapi etcd.KeysAPI
state etcdState
convergedState etcdConvergedState
}
func (obj *EtcdWObject) GetState() etcdState {
return obj.state
func (obj *EtcdWObject) GetConvergedState() etcdConvergedState {
return obj.convergedState
}
func (obj *EtcdWObject) SetState(state etcdState) {
obj.state = state
func (obj *EtcdWObject) SetConvergedState(state etcdConvergedState) {
obj.convergedState = state
}
func (etcdO *EtcdWObject) GetKAPI() etcd.KeysAPI {
@@ -134,11 +134,11 @@ func (etcdO *EtcdWObject) EtcdWatch() chan etcdMsg {
var err error = nil
select {
case out := <-etcdch:
etcdO.SetState(etcdNil)
etcdO.SetConvergedState(etcdConvergedNil)
resp, err = out.resp, out.err
case _ = <-TimeAfterOrBlock(ctimeout):
etcdO.SetState(etcdConvergedTimeout)
etcdO.SetConvergedState(etcdConvergedTimeout)
converged <- true
continue
}

32
examples/graph8a.yaml Normal file
View File

@@ -0,0 +1,32 @@
---
graph: mygraph
types:
exec:
- name: exec1
cmd: sleep 10s
shell: ''
timeout: 0
watchcmd: ''
watchshell: ''
ifcmd: ''
ifshell: ''
pollint: 0
state: present
- name: exec2
cmd: sleep 10s
shell: ''
timeout: 0
watchcmd: ''
watchshell: ''
ifcmd: ''
ifshell: ''
pollint: 0
state: present
edges:
- name: e1
from:
type: exec
name: exec1
to:
type: exec
name: exec2

10
exec.go
View File

@@ -144,10 +144,10 @@ func (obj *ExecType) Watch() {
}
for {
obj.SetState(typeWatching) // reset
select {
case text := <-bufioch:
obj.SetState(typeNil)
obj.SetConvergedState(typeConvergedNil)
// each time we get a line of output, we loop!
log.Printf("%v[%v]: Watch output: %s", obj.GetType(), obj.GetName(), text)
if text != "" {
@@ -155,7 +155,7 @@ func (obj *ExecType) Watch() {
}
case err := <-errch:
obj.SetState(typeNil) // XXX ?
obj.SetConvergedState(typeConvergedNil) // XXX ?
if err == nil { // EOF
// FIXME: add an "if watch command ends/crashes"
// restart or generate error option
@@ -167,14 +167,14 @@ func (obj *ExecType) Watch() {
// XXX: how should we handle errors?
case event := <-obj.events:
obj.SetState(typeNil)
obj.SetConvergedState(typeConvergedNil)
if ok := obj.ReadEvent(&event); !ok {
return // exit
}
send = true
case _ = <-TimeAfterOrBlock(obj.ctimeout):
obj.SetState(typeConvergedTimeout)
obj.SetConvergedState(typeConvergedTimeout)
obj.converged <- true
continue
}

20
file.go
View File

@@ -126,11 +126,15 @@ func (obj *FileType) Watch() {
if current == "" { // the empty string top is the root dir ("/")
current = "/"
}
log.Printf("Watching: %v", current) // attempting to watch...
if DEBUG {
log.Printf("File[%v]: Watching: %v", obj.GetName(), current) // attempting to watch...
}
// initialize in the loop so that we can reset on rm-ed handles
err = watcher.Add(current)
if err != nil {
if DEBUG {
log.Printf("File[%v]: watcher.Add(%v): Error: %v", obj.GetName(), current, err)
}
if err == syscall.ENOENT {
index-- // usually not found, move up one dir
} else if err == syscall.ENOSPC {
@@ -146,9 +150,13 @@ func (obj *FileType) Watch() {
continue
}
obj.SetState(typeWatching) // reset
select {
case event := <-watcher.Events:
obj.SetState(typeNil) // XXX: technically i can detect is the event is erroneous or not first
if DEBUG {
log.Printf("File[%v]: Watch(%v), Event(%v): %v", obj.GetName(), current, event.Name, event)
}
obj.SetConvergedState(typeConvergedNil) // XXX: technically i can detect if the event is erroneous or not first
// the deeper you go, the bigger the delta_depth is...
// this is the difference between what we're watching,
// and the event... doesn't mean we can't watch deeper
@@ -214,20 +222,20 @@ func (obj *FileType) Watch() {
}
case err := <-watcher.Errors:
obj.SetState(typeNil) // XXX ?
obj.SetConvergedState(typeConvergedNil) // XXX ?
log.Println("error:", err)
log.Fatal(err)
//obj.events <- fmt.Sprintf("file: %v", "error") // XXX: how should we handle errors?
case event := <-obj.events:
obj.SetState(typeNil)
obj.SetConvergedState(typeConvergedNil)
if ok := obj.ReadEvent(&event); !ok {
return // exit
}
send = true
case _ = <-TimeAfterOrBlock(obj.ctimeout):
obj.SetState(typeConvergedTimeout)
obj.SetConvergedState(typeConvergedTimeout)
obj.converged <- true
continue
}

20
main.go
View File

@@ -133,11 +133,9 @@ func run(c *cli.Context) {
// run graph vertex LOCK...
if !first { // XXX: we can flatten this check out I think
G.SetState(graphPausing)
log.Printf("State: %v", G.GetState())
log.Printf("State: %v -> %v", G.SetState(graphPausing), G.GetState())
G.Pause() // sync
G.SetState(graphPaused)
log.Printf("State: %v", G.GetState())
log.Printf("State: %v -> %v", G.SetState(graphPaused), G.GetState())
}
// build the graph from a config file
@@ -159,12 +157,9 @@ func run(c *cli.Context) {
// some are not ready yet and the EtcdWatch
// loops, we'll cause G.Pause(...) before we
// even got going, thus causing nil pointer errors
G.SetState(graphStarting)
log.Printf("State: %v", G.GetState())
log.Printf("State: %v -> %v", G.SetState(graphStarting), G.GetState())
G.Start(&wg) // sync
G.SetState(graphStarted)
log.Printf("State: %v", G.GetState())
log.Printf("State: %v -> %v", G.SetState(graphStarted), G.GetState())
first = false
}
}()
@@ -175,11 +170,11 @@ func run(c *cli.Context) {
for {
<-converged // when anyone says they have converged
if etcdO.GetState() != etcdConvergedTimeout {
if etcdO.GetConvergedState() != etcdConvergedTimeout {
continue
}
for v := range G.GetVerticesChan() {
if v.Type.GetState() != typeConvergedTimeout {
if v.Type.GetConvergedState() != typeConvergedTimeout {
continue ConvergedLoop
}
}
@@ -202,9 +197,6 @@ func run(c *cli.Context) {
G.Exit() // tell all the children to exit
if DEBUG {
for i := range G.GetVerticesChan() {
log.Printf("Vertex: %v", i)
}
log.Printf("Graph: %v", G)
}

View File

@@ -96,15 +96,18 @@ func (g *Graph) SetName(name string) {
}
func (g *Graph) GetState() graphState {
g.mutex.Lock()
defer g.mutex.Unlock()
//g.mutex.Lock()
//defer g.mutex.Unlock()
return g.state
}
func (g *Graph) SetState(state graphState) {
// set graph state and return previous state
func (g *Graph) SetState(state graphState) graphState {
g.mutex.Lock()
defer g.mutex.Unlock()
prev := g.GetState()
g.state = state
return prev
}
// store a pointer in the type to it's parent vertex

View File

@@ -121,20 +121,21 @@ func (obj *ServiceType) Watch() {
set.Remove(service) // no return value should ever occur
}
obj.SetState(typeWatching) // reset
select {
case _ = <-buschan: // XXX wait for new units event to unstick
obj.SetState(typeNil)
obj.SetConvergedState(typeConvergedNil)
// loop so that we can see the changed invalid signal
log.Printf("Service[%v]->DaemonReload()", service)
case event := <-obj.events:
obj.SetState(typeNil)
obj.SetConvergedState(typeConvergedNil)
if ok := obj.ReadEvent(&event); !ok {
return // exit
}
send = true
case _ = <-TimeAfterOrBlock(obj.ctimeout):
obj.SetState(typeConvergedTimeout)
obj.SetConvergedState(typeConvergedTimeout)
obj.converged <- true
continue
}
@@ -145,6 +146,7 @@ func (obj *ServiceType) Watch() {
}
log.Printf("Watching: %v", service) // attempting to watch...
obj.SetState(typeWatching) // reset
select {
case event := <-subChannel:
@@ -166,13 +168,13 @@ func (obj *ServiceType) Watch() {
send = true
case err := <-subErrors:
obj.SetState(typeNil) // XXX ?
obj.SetConvergedState(typeConvergedNil) // XXX ?
log.Println("error:", err)
log.Fatal(err)
//vertex.events <- fmt.Sprintf("service: %v", "error") // XXX: how should we handle errors?
case event := <-obj.events:
obj.SetState(typeNil)
obj.SetConvergedState(typeConvergedNil)
if ok := obj.ReadEvent(&event); !ok {
return // exit
}

View File

@@ -27,6 +27,17 @@ type typeState int
const (
typeNil typeState = iota
typeWatching
typeEvent // an event has happened, but we haven't poked yet
typeApplying
typePoking
)
//go:generate stringer -type=typeConvergedState -output=typeconvergedstate_stringer.go
type typeConvergedState int
const (
typeConvergedNil typeConvergedState = iota
//typeConverged
typeConvergedTimeout
)
@@ -44,13 +55,15 @@ type Type interface {
SendEvent(eventName, bool)
IsWatching() bool
SetWatching(bool)
GetConvergedState() typeConvergedState
SetConvergedState(typeConvergedState)
GetState() typeState
SetState(typeState)
GetTimestamp() int64
UpdateTimestamp() int64
OKTimestamp() bool
Poke()
ParentPoke()
BackPoke()
}
type BaseType struct {
@@ -59,6 +72,7 @@ type BaseType struct {
events chan Event
vertex *Vertex
state typeState
convergedState typeConvergedState
watching bool // is Watch() loop running ?
ctimeout int // converged timeout
converged chan bool
@@ -118,11 +132,22 @@ func (obj *BaseType) SetWatching(b bool) {
obj.watching = b
}
func (obj *BaseType) GetConvergedState() typeConvergedState {
return obj.convergedState
}
func (obj *BaseType) SetConvergedState(state typeConvergedState) {
obj.convergedState = state
}
func (obj *BaseType) GetState() typeState {
return obj.state
}
func (obj *BaseType) SetState(state typeState) {
if DEBUG {
log.Printf("%v[%v]: State: %v -> %v", obj.GetType(), obj.GetName(), obj.GetState(), state)
}
obj.state = state
}
@@ -148,6 +173,9 @@ func (obj *BaseType) OKTimestamp() bool {
// if they're equal (eg: on init of 0) then we also can't run
// b/c we should let our pre-req's go first...
x, y := obj.GetTimestamp(), n.Type.GetTimestamp()
if DEBUG {
log.Printf("%v[%v]: OKTimestamp: (%v) >= %v[%v](%v): %v", obj.GetType(), obj.GetName(), x, n.GetType(), n.GetName(), y, !(x >= y))
}
if x >= y {
return false
}
@@ -162,22 +190,43 @@ func (obj *BaseType) Poke() {
g := v.GetGraph()
// these are all the vertices pointing AWAY FROM v, eg: v -> ???
for _, n := range g.OutgoingGraphEdges(v) {
// if we're in state event and haven't been cancelled by apply,
// then we can cancel a poke to a child XXX: right?
if n.Type.GetState() != typeEvent {
if DEBUG {
log.Printf("%v[%v]: Poke: %v[%v]", v.GetType(), v.GetName(), n.GetType(), n.GetName())
}
n.SendEvent(eventPoke, false) // XXX: should this be sync or not? XXX: try it as async for now, but switch to sync and see if we deadlock -- maybe it's possible, i don't know for sure yet
} else {
if DEBUG {
log.Printf("%v[%v]: Poke: %v[%v]: Skipped!", v.GetType(), v.GetName(), n.GetType(), n.GetName())
}
}
}
}
// poke the pre-requisites that are stale and need to run before I can run...
func (obj *BaseType) ParentPoke() {
func (obj *BaseType) BackPoke() {
v := obj.GetVertex()
g := v.GetGraph()
// these are all the vertices pointing TO v, eg: ??? -> v
for _, n := range g.IncomingGraphEdges(v) {
x, y := obj.GetTimestamp(), n.Type.GetTimestamp()
if x >= y {
x, y, s := obj.GetTimestamp(), n.Type.GetTimestamp(), n.Type.GetState()
// if the parent timestamp needs poking AND it's not in state
// typeEvent, then poke it. If the parent is in typeEvent it
// means that an event is pending, so we'll be expecting a poke
// back soon, so we can safely discard the extra parent poke...
// TODO: implement a stateLT (less than) to tell if something
// happens earlier in the state cycle and that doesn't wrap nil
if x >= y && (s != typeEvent && s != typeApplying) {
if DEBUG {
log.Printf("ParentPoke: From(%v): To(%v)", v.GetName(), n.GetName())
log.Printf("%v[%v]: BackPoke: %v[%v]", v.GetType(), v.GetName(), n.GetType(), n.GetName())
}
n.SendEvent(eventPoke, false) // XXX: should this be sync or not? XXX: try it as async for now, but switch to sync and see if we deadlock -- maybe it's possible, i don't know for sure yet
} else {
if DEBUG {
log.Printf("%v[%v]: BackPoke: %v[%v]: Skipped!", v.GetType(), v.GetName(), n.GetType(), n.GetName())
}
}
}
}
@@ -203,7 +252,6 @@ func (obj *BaseType) SendEvent(event eventName, sync bool) {
// process events when a select gets one
// this handles the pause code too!
func (obj *BaseType) ReadEvent(event *Event) bool {
event.ACK()
switch event.Name {
case eventStart:
@@ -240,6 +288,7 @@ func Process(obj Type) {
if DEBUG {
log.Printf("%v[%v]: Process()", obj.GetType(), obj.GetName())
}
obj.SetState(typeEvent)
var ok bool = true
// is it okay to run dependency wise right now?
// if not, that's okay because when the dependency runs, it will poke
@@ -254,6 +303,7 @@ func Process(obj Type) {
}
// throw an error if apply fails...
// if this fails, don't UpdateTimestamp()
obj.SetState(typeApplying)
if !obj.Apply() { // check for error
ok = false
}
@@ -263,12 +313,13 @@ func Process(obj Type) {
// update this timestamp *before* we poke or the poked
// nodes might fail due to having a too old timestamp!
obj.UpdateTimestamp() // this was touched...
obj.SetState(typePoking) // can't cancel parent poke
obj.Poke()
}
// poke at our pre-req's instead since they need to refresh/run...
} else {
// only poke at the pre-req's that need to run
go obj.ParentPoke()
go obj.BackPoke()
}
}
@@ -286,16 +337,17 @@ func (obj *NoopType) Watch() {
//vertex := obj.vertex // stored with SetVertex
var send = false // send event?
for {
obj.SetState(typeWatching) // reset
select {
case event := <-obj.events:
obj.SetState(typeNil)
obj.SetConvergedState(typeConvergedNil)
if ok := obj.ReadEvent(&event); !ok {
return // exit
}
send = true
case _ = <-TimeAfterOrBlock(obj.ctimeout):
obj.SetState(typeConvergedTimeout)
obj.SetConvergedState(typeConvergedTimeout)
obj.converged <- true
continue
}
@@ -303,7 +355,6 @@ func (obj *NoopType) Watch() {
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
Process(obj) // XXX: rename this function
}
}