From c57946e29b4811f65b270bde31b6dcdfa810a14e Mon Sep 17 00:00:00 2001 From: James Shubin Date: Mon, 11 Jan 2016 16:20:32 -0500 Subject: [PATCH] Fix dependency issue * Fix Process() object calling * Add PokeParent() to poke upwards * Break linear exec chains :( This was the issue where in a graph f1 -> f2, if you were to rm f2 && cat f2, then f2 would not come back because we didn't poke upwards to refresh the timestamp. Unfortunately this adds another bug which we solve in a later patch. --- examples/graph1a.yaml | 22 +++++++++++++++++++ examples/graph1b.yaml | 22 +++++++++++++++++++ exec.go | 2 +- file.go | 2 +- service.go | 2 +- types.go | 51 ++++++++++++++++++++++++++++++------------- 6 files changed, 83 insertions(+), 18 deletions(-) create mode 100644 examples/graph1a.yaml create mode 100644 examples/graph1b.yaml diff --git a/examples/graph1a.yaml b/examples/graph1a.yaml new file mode 100644 index 00000000..77db8396 --- /dev/null +++ b/examples/graph1a.yaml @@ -0,0 +1,22 @@ +--- +graph: mygraph +types: + file: + - name: file1 + path: "/tmp/mgmt/f1" + content: | + i am f1 + state: exists + - name: file2 + path: "/tmp/mgmt/f2" + content: | + i am f2 + state: exists +edges: +- name: e1 + from: + type: file + name: file1 + to: + type: file + name: file2 diff --git a/examples/graph1b.yaml b/examples/graph1b.yaml new file mode 100644 index 00000000..32650283 --- /dev/null +++ b/examples/graph1b.yaml @@ -0,0 +1,22 @@ +--- +graph: mygraph +types: + file: + - name: file2 + path: "/tmp/mgmt/f2" + content: | + i am f2 + state: exists + - name: file3 + path: "/tmp/mgmt/f3" + content: | + i am f3 + state: exists +edges: +- name: e2 + from: + type: file + name: file2 + to: + type: file + name: file3 diff --git a/exec.go b/exec.go index 5a45f50b..3d7dfdd0 100644 --- a/exec.go +++ b/exec.go @@ -183,7 +183,7 @@ func (obj *ExecType) Watch() { if send { send = false obj.isStateOK = false // something made state dirty - obj.Process(obj) // XXX: rename this function + Process(obj) // XXX: rename this function } } } diff --git a/file.go b/file.go index e7382e25..42d5a614 100644 --- a/file.go +++ b/file.go @@ -235,7 +235,7 @@ func (obj *FileType) Watch() { // do all our event sending all together to avoid duplicate msgs if send { send = false - obj.Process(obj) // XXX: rename this function + Process(obj) // XXX: rename this function } } } diff --git a/service.go b/service.go index 140804aa..16fddc7f 100644 --- a/service.go +++ b/service.go @@ -182,7 +182,7 @@ func (obj *ServiceType) Watch() { if send { send = false - obj.Process(obj) // XXX: rename this function + Process(obj) // XXX: rename this function } } diff --git a/types.go b/types.go index bbd24b27..8d02a7e8 100644 --- a/types.go +++ b/types.go @@ -48,7 +48,9 @@ type Type interface { SetState(typeState) GetTimestamp() int64 UpdateTimestamp() int64 - //Process() + OKTimestamp() bool + Poke() + ParentPoke() } type BaseType struct { @@ -145,21 +147,39 @@ func (obj *BaseType) OKTimestamp() bool { // then we can't run right now... // if they're equal (eg: on init of 0) then we also can't run // b/c we should let our pre-req's go first... - if obj.GetTimestamp() >= n.Type.GetTimestamp() { + x, y := obj.GetTimestamp(), n.Type.GetTimestamp() + if x >= y { return false } } return true } -func (obj *BaseType) Poke() bool { // XXX: how can this ever fail and return false? eg: when is a poke not possible and should be rescheduled? +// notify nodes after me in the dependency graph that they need refreshing... +// NOTE: this assumes that this can never fail or need to be rescheduled +func (obj *BaseType) Poke() { v := obj.GetVertex() g := v.GetGraph() // these are all the vertices pointing AWAY FROM v, eg: v -> ??? for _, n := range g.OutgoingGraphEdges(v) { n.SendEvent(eventPoke, false) // XXX: should this be sync or not? XXX: try it as async for now, but switch to sync and see if we deadlock -- maybe it's possible, i don't know for sure yet } - return true +} + +// poke the pre-requisites that are stale and need to run before I can run... +func (obj *BaseType) ParentPoke() { + v := obj.GetVertex() + g := v.GetGraph() + // these are all the vertices pointing TO v, eg: ??? -> v + for _, n := range g.IncomingGraphEdges(v) { + x, y := obj.GetTimestamp(), n.Type.GetTimestamp() + if x >= y { + if DEBUG { + log.Printf("ParentPoke: From(%v): To(%v)", v.GetName(), n.GetName()) + } + n.SendEvent(eventPoke, false) // XXX: should this be sync or not? XXX: try it as async for now, but switch to sync and see if we deadlock -- maybe it's possible, i don't know for sure yet + } + } } // push an event into the message queue for a particular type vertex @@ -216,7 +236,7 @@ func (obj *BaseType) ReadEvent(event *Event) bool { } // XXX: rename this function -func (obj *BaseType) Process(typ Type) { +func Process(obj Type) { if DEBUG { log.Printf("%v[%v]: Process()", obj.GetType(), obj.GetName()) } @@ -228,27 +248,28 @@ func (obj *BaseType) Process(typ Type) { if DEBUG { log.Printf("%v[%v]: OKTimestamp(%v)", obj.GetType(), obj.GetName(), obj.GetTimestamp()) } - // XXX XXX: why does this have to be typ instead of just obj! "obj.StateOK undefined (type *BaseType has no field or method StateOK)" - if !typ.StateOK() { // TODO: can we rename this to something better? + if !obj.StateOK() { // TODO: can we rename this to something better? if DEBUG { log.Printf("%v[%v]: !StateOK()", obj.GetType(), obj.GetName()) } // throw an error if apply fails... // if this fails, don't UpdateTimestamp() - if !typ.Apply() { // check for error + if !obj.Apply() { // check for error ok = false } } if ok { - // if poke fails, don't update timestamp - // since we didn't propagate the pokes! - if obj.Poke() { - obj.UpdateTimestamp() // this was touched... - } + // update this timestamp *before* we poke or the poked + // nodes might fail due to having a too old timestamp! + obj.UpdateTimestamp() // this was touched... + obj.Poke() } + // poke at our pre-req's instead since they need to refresh/run... + } else { + // only poke at the pre-req's that need to run + go obj.ParentPoke() } - } func (obj *NoopType) GetType() string { @@ -283,7 +304,7 @@ func (obj *NoopType) Watch() { if send { send = false - obj.Process(obj) // XXX: rename this function + Process(obj) // XXX: rename this function } } }