Fix dependency issue
* Fix Process() object calling * Add PokeParent() to poke upwards * Break linear exec chains :( This was the issue where in a graph f1 -> f2, if you were to rm f2 && cat f2, then f2 would not come back because we didn't poke upwards to refresh the timestamp. Unfortunately this adds another bug which we solve in a later patch.
This commit is contained in:
22
examples/graph1a.yaml
Normal file
22
examples/graph1a.yaml
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
---
|
||||||
|
graph: mygraph
|
||||||
|
types:
|
||||||
|
file:
|
||||||
|
- name: file1
|
||||||
|
path: "/tmp/mgmt/f1"
|
||||||
|
content: |
|
||||||
|
i am f1
|
||||||
|
state: exists
|
||||||
|
- name: file2
|
||||||
|
path: "/tmp/mgmt/f2"
|
||||||
|
content: |
|
||||||
|
i am f2
|
||||||
|
state: exists
|
||||||
|
edges:
|
||||||
|
- name: e1
|
||||||
|
from:
|
||||||
|
type: file
|
||||||
|
name: file1
|
||||||
|
to:
|
||||||
|
type: file
|
||||||
|
name: file2
|
||||||
22
examples/graph1b.yaml
Normal file
22
examples/graph1b.yaml
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
---
|
||||||
|
graph: mygraph
|
||||||
|
types:
|
||||||
|
file:
|
||||||
|
- name: file2
|
||||||
|
path: "/tmp/mgmt/f2"
|
||||||
|
content: |
|
||||||
|
i am f2
|
||||||
|
state: exists
|
||||||
|
- name: file3
|
||||||
|
path: "/tmp/mgmt/f3"
|
||||||
|
content: |
|
||||||
|
i am f3
|
||||||
|
state: exists
|
||||||
|
edges:
|
||||||
|
- name: e2
|
||||||
|
from:
|
||||||
|
type: file
|
||||||
|
name: file2
|
||||||
|
to:
|
||||||
|
type: file
|
||||||
|
name: file3
|
||||||
2
exec.go
2
exec.go
@@ -183,7 +183,7 @@ func (obj *ExecType) Watch() {
|
|||||||
if send {
|
if send {
|
||||||
send = false
|
send = false
|
||||||
obj.isStateOK = false // something made state dirty
|
obj.isStateOK = false // something made state dirty
|
||||||
obj.Process(obj) // XXX: rename this function
|
Process(obj) // XXX: rename this function
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
2
file.go
2
file.go
@@ -235,7 +235,7 @@ func (obj *FileType) Watch() {
|
|||||||
// do all our event sending all together to avoid duplicate msgs
|
// do all our event sending all together to avoid duplicate msgs
|
||||||
if send {
|
if send {
|
||||||
send = false
|
send = false
|
||||||
obj.Process(obj) // XXX: rename this function
|
Process(obj) // XXX: rename this function
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -182,7 +182,7 @@ func (obj *ServiceType) Watch() {
|
|||||||
|
|
||||||
if send {
|
if send {
|
||||||
send = false
|
send = false
|
||||||
obj.Process(obj) // XXX: rename this function
|
Process(obj) // XXX: rename this function
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
51
types.go
51
types.go
@@ -48,7 +48,9 @@ type Type interface {
|
|||||||
SetState(typeState)
|
SetState(typeState)
|
||||||
GetTimestamp() int64
|
GetTimestamp() int64
|
||||||
UpdateTimestamp() int64
|
UpdateTimestamp() int64
|
||||||
//Process()
|
OKTimestamp() bool
|
||||||
|
Poke()
|
||||||
|
ParentPoke()
|
||||||
}
|
}
|
||||||
|
|
||||||
type BaseType struct {
|
type BaseType struct {
|
||||||
@@ -145,21 +147,39 @@ func (obj *BaseType) OKTimestamp() bool {
|
|||||||
// then we can't run right now...
|
// then we can't run right now...
|
||||||
// if they're equal (eg: on init of 0) then we also can't run
|
// if they're equal (eg: on init of 0) then we also can't run
|
||||||
// b/c we should let our pre-req's go first...
|
// b/c we should let our pre-req's go first...
|
||||||
if obj.GetTimestamp() >= n.Type.GetTimestamp() {
|
x, y := obj.GetTimestamp(), n.Type.GetTimestamp()
|
||||||
|
if x >= y {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (obj *BaseType) Poke() bool { // XXX: how can this ever fail and return false? eg: when is a poke not possible and should be rescheduled?
|
// notify nodes after me in the dependency graph that they need refreshing...
|
||||||
|
// NOTE: this assumes that this can never fail or need to be rescheduled
|
||||||
|
func (obj *BaseType) Poke() {
|
||||||
v := obj.GetVertex()
|
v := obj.GetVertex()
|
||||||
g := v.GetGraph()
|
g := v.GetGraph()
|
||||||
// these are all the vertices pointing AWAY FROM v, eg: v -> ???
|
// these are all the vertices pointing AWAY FROM v, eg: v -> ???
|
||||||
for _, n := range g.OutgoingGraphEdges(v) {
|
for _, n := range g.OutgoingGraphEdges(v) {
|
||||||
n.SendEvent(eventPoke, false) // XXX: should this be sync or not? XXX: try it as async for now, but switch to sync and see if we deadlock -- maybe it's possible, i don't know for sure yet
|
n.SendEvent(eventPoke, false) // XXX: should this be sync or not? XXX: try it as async for now, but switch to sync and see if we deadlock -- maybe it's possible, i don't know for sure yet
|
||||||
}
|
}
|
||||||
return true
|
}
|
||||||
|
|
||||||
|
// poke the pre-requisites that are stale and need to run before I can run...
|
||||||
|
func (obj *BaseType) ParentPoke() {
|
||||||
|
v := obj.GetVertex()
|
||||||
|
g := v.GetGraph()
|
||||||
|
// these are all the vertices pointing TO v, eg: ??? -> v
|
||||||
|
for _, n := range g.IncomingGraphEdges(v) {
|
||||||
|
x, y := obj.GetTimestamp(), n.Type.GetTimestamp()
|
||||||
|
if x >= y {
|
||||||
|
if DEBUG {
|
||||||
|
log.Printf("ParentPoke: From(%v): To(%v)", v.GetName(), n.GetName())
|
||||||
|
}
|
||||||
|
n.SendEvent(eventPoke, false) // XXX: should this be sync or not? XXX: try it as async for now, but switch to sync and see if we deadlock -- maybe it's possible, i don't know for sure yet
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// push an event into the message queue for a particular type vertex
|
// push an event into the message queue for a particular type vertex
|
||||||
@@ -216,7 +236,7 @@ func (obj *BaseType) ReadEvent(event *Event) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// XXX: rename this function
|
// XXX: rename this function
|
||||||
func (obj *BaseType) Process(typ Type) {
|
func Process(obj Type) {
|
||||||
if DEBUG {
|
if DEBUG {
|
||||||
log.Printf("%v[%v]: Process()", obj.GetType(), obj.GetName())
|
log.Printf("%v[%v]: Process()", obj.GetType(), obj.GetName())
|
||||||
}
|
}
|
||||||
@@ -228,27 +248,28 @@ func (obj *BaseType) Process(typ Type) {
|
|||||||
if DEBUG {
|
if DEBUG {
|
||||||
log.Printf("%v[%v]: OKTimestamp(%v)", obj.GetType(), obj.GetName(), obj.GetTimestamp())
|
log.Printf("%v[%v]: OKTimestamp(%v)", obj.GetType(), obj.GetName(), obj.GetTimestamp())
|
||||||
}
|
}
|
||||||
// XXX XXX: why does this have to be typ instead of just obj! "obj.StateOK undefined (type *BaseType has no field or method StateOK)"
|
if !obj.StateOK() { // TODO: can we rename this to something better?
|
||||||
if !typ.StateOK() { // TODO: can we rename this to something better?
|
|
||||||
if DEBUG {
|
if DEBUG {
|
||||||
log.Printf("%v[%v]: !StateOK()", obj.GetType(), obj.GetName())
|
log.Printf("%v[%v]: !StateOK()", obj.GetType(), obj.GetName())
|
||||||
}
|
}
|
||||||
// throw an error if apply fails...
|
// throw an error if apply fails...
|
||||||
// if this fails, don't UpdateTimestamp()
|
// if this fails, don't UpdateTimestamp()
|
||||||
if !typ.Apply() { // check for error
|
if !obj.Apply() { // check for error
|
||||||
ok = false
|
ok = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ok {
|
if ok {
|
||||||
// if poke fails, don't update timestamp
|
// update this timestamp *before* we poke or the poked
|
||||||
// since we didn't propagate the pokes!
|
// nodes might fail due to having a too old timestamp!
|
||||||
if obj.Poke() {
|
obj.UpdateTimestamp() // this was touched...
|
||||||
obj.UpdateTimestamp() // this was touched...
|
obj.Poke()
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
// poke at our pre-req's instead since they need to refresh/run...
|
||||||
|
} else {
|
||||||
|
// only poke at the pre-req's that need to run
|
||||||
|
go obj.ParentPoke()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (obj *NoopType) GetType() string {
|
func (obj *NoopType) GetType() string {
|
||||||
@@ -283,7 +304,7 @@ func (obj *NoopType) Watch() {
|
|||||||
if send {
|
if send {
|
||||||
send = false
|
send = false
|
||||||
|
|
||||||
obj.Process(obj) // XXX: rename this function
|
Process(obj) // XXX: rename this function
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user