From ba6044e9e8d520f34fb81686cd8e48948f18f1b3 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Wed, 23 Nov 2016 23:22:27 -0500 Subject: [PATCH] resources, pgraph: split logical chunks into separate files --- pgraph/actions.go | 425 ++++++++++++++++++++++++++++++++++++ pgraph/graphviz.go | 110 ++++++++++ pgraph/pgraph.go | 485 ----------------------------------------- resources/resources.go | 81 ------- resources/sendrecv.go | 82 +++++++ 5 files changed, 617 insertions(+), 566 deletions(-) create mode 100644 pgraph/actions.go create mode 100644 pgraph/graphviz.go diff --git a/pgraph/actions.go b/pgraph/actions.go new file mode 100644 index 00000000..461dab47 --- /dev/null +++ b/pgraph/actions.go @@ -0,0 +1,425 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgraph + +import ( + "fmt" + "log" + "math" + "sync" + "time" + + "github.com/purpleidea/mgmt/event" + "github.com/purpleidea/mgmt/global" + "github.com/purpleidea/mgmt/resources" + + errwrap "github.com/pkg/errors" +) + +// GetTimestamp returns the timestamp of a vertex +func (v *Vertex) GetTimestamp() int64 { + return v.timestamp +} + +// UpdateTimestamp updates the timestamp on a vertex and returns the new value +func (v *Vertex) UpdateTimestamp() int64 { + v.timestamp = time.Now().UnixNano() // update + return v.timestamp +} + +// OKTimestamp returns true if this element can run right now? +func (g *Graph) OKTimestamp(v *Vertex) bool { + // these are all the vertices pointing TO v, eg: ??? -> v + for _, n := range g.IncomingGraphEdges(v) { + // if the vertex has a greater timestamp than any pre-req (n) + // then we can't run right now... + // if they're equal (eg: on init of 0) then we also can't run + // b/c we should let our pre-req's go first... + x, y := v.GetTimestamp(), n.GetTimestamp() + if global.DEBUG { + log.Printf("%s[%s]: OKTimestamp: (%v) >= %s[%s](%v): !%v", v.Kind(), v.GetName(), x, n.Kind(), n.GetName(), y, x >= y) + } + if x >= y { + return false + } + } + return true +} + +// Poke notifies nodes after me in the dependency graph that they need refreshing... +// NOTE: this assumes that this can never fail or need to be rescheduled +func (g *Graph) Poke(v *Vertex, activity bool) { + // these are all the vertices pointing AWAY FROM v, eg: v -> ??? + for _, n := range g.OutgoingGraphEdges(v) { + // XXX: if we're in state event and haven't been cancelled by + // apply, then we can cancel a poke to a child, right? XXX + // XXX: if n.Res.getState() != resources.ResStateEvent { // is this correct? + if true { // XXX + if global.DEBUG { + log.Printf("%s[%s]: Poke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) + } + n.SendEvent(event.EventPoke, false, activity) // XXX: can this be switched to sync? 
+ } else { + if global.DEBUG { + log.Printf("%s[%s]: Poke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) + } + } + } +} + +// BackPoke pokes the pre-requisites that are stale and need to run before I can run. +func (g *Graph) BackPoke(v *Vertex) { + // these are all the vertices pointing TO v, eg: ??? -> v + for _, n := range g.IncomingGraphEdges(v) { + x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState() + // if the parent timestamp needs poking AND it's not in state + // ResStateEvent, then poke it. If the parent is in ResStateEvent it + // means that an event is pending, so we'll be expecting a poke + // back soon, so we can safely discard the extra parent poke... + // TODO: implement a stateLT (less than) to tell if something + // happens earlier in the state cycle and that doesn't wrap nil + if x >= y && (s != resources.ResStateEvent && s != resources.ResStateCheckApply) { + if global.DEBUG { + log.Printf("%s[%s]: BackPoke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) + } + n.SendEvent(event.EventBackPoke, false, false) // XXX: can this be switched to sync? + } else { + if global.DEBUG { + log.Printf("%s[%s]: BackPoke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) + } + } + } +} + +// Process is the primary function to execute for a particular vertex in the graph. +func (g *Graph) Process(v *Vertex) error { + obj := v.Res + if global.DEBUG { + log.Printf("%s[%s]: Process()", obj.Kind(), obj.GetName()) + } + obj.SetState(resources.ResStateEvent) + var ok = true + var apply = false // did we run an apply? + // is it okay to run dependency wise right now? + // if not, that's okay because when the dependency runs, it will poke + // us back and we will run if needed then! + if g.OKTimestamp(v) { + if global.DEBUG { + log.Printf("%s[%s]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp()) + } + + obj.SetState(resources.ResStateCheckApply) + + // connect any senders to receivers and detect if values changed + if changed, err := obj.SendRecv(obj); err != nil { + return errwrap.Wrapf(err, "could not SendRecv in Process") + } else if changed { + obj.StateOK(false) // invalidate cache + } + + // if this fails, don't UpdateTimestamp() + checkok, err := obj.CheckApply(!obj.Meta().Noop) + if checkok && err != nil { // should never return this way + log.Fatalf("%s[%s]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), checkok, err) + } + if global.DEBUG { + log.Printf("%s[%s]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), checkok, err) + } + + if !checkok { // if state *was* not ok, we had to have apply'ed + if err != nil { // error during check or apply + ok = false + } else { + apply = true + } + } + + // when noop is true we always want to update timestamp + if obj.Meta().Noop && err == nil { + ok = true + } + + if ok { + // update this timestamp *before* we poke or the poked + // nodes might fail due to having a too old timestamp! + v.UpdateTimestamp() // this was touched... + obj.SetState(resources.ResStatePoking) // can't cancel parent poke + g.Poke(v, apply) + } + // poke at our pre-req's instead since they need to refresh/run... + return err + } + // else... only poke at the pre-req's that need to run + go g.BackPoke(v) + return nil +} + +// SentinelErr is a sentinal as an error type that wraps an arbitrary error. +type SentinelErr struct { + err error +} + +// Error is the required method to fulfill the error type. 
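+// It simply returns the message of the wrapped error.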
+func (obj *SentinelErr) Error() string { + return obj.err.Error() +} + +// Worker is the common run frontend of the vertex. It handles all of the retry +// and retry delay common code, and ultimately returns the final status of this +// vertex execution. +func (g *Graph) Worker(v *Vertex) error { + // listen for chan events from Watch() and run + // the Process() function when they're received + // this avoids us having to pass the data into + // the Watch() function about which graph it is + // running on, which isolates things nicely... + obj := v.Res + chanProcess := make(chan event.Event) + go func() { + running := false + var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration + if !timer.Stop() { + <-timer.C // unnecessary, shouldn't happen + } + var delay = time.Duration(v.Meta().Delay) * time.Millisecond + var retry = v.Meta().Retry // number of tries left, -1 for infinite + var saved event.Event + Loop: + for { + // this has to be synchronous, because otherwise the Res + // event loop will keep running and change state, + // causing the converged timeout to fire! + select { + case event, ok := <-chanProcess: // must use like this + if running && ok { + // we got an event that wasn't a close, + // while we were waiting for the timer! + // if this happens, it might be a bug:( + log.Fatalf("%s[%s]: Worker: Unexpected event: %+v", v.Kind(), v.GetName(), event) + } + if !ok { // chanProcess closed, let's exit + break Loop // no event, so no ack! + } + + // the above mentioned synchronous part, is the + // running of this function, paired with an ack. + if e := g.Process(v); e != nil { + saved = event + log.Printf("%s[%s]: CheckApply errored: %v", v.Kind(), v.GetName(), e) + if retry == 0 { + // wrap the error in the sentinel + event.ACKNACK(&SentinelErr{e}) // fail the Watch() + break Loop + } + if retry > 0 { // don't decrement the -1 + retry-- + } + log.Printf("%s[%s]: CheckApply: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), delay.Seconds(), retry) + // start the timer... + timer.Reset(delay) + running = true + continue + } + retry = v.Meta().Retry // reset on success + event.ACK() // sync + + case <-timer.C: + if !timer.Stop() { + //<-timer.C // blocks, docs are wrong! + } + running = false + log.Printf("%s[%s]: CheckApply delay expired!", v.Kind(), v.GetName()) + // re-send this failed event, to trigger a CheckApply() + go func() { chanProcess <- saved }() + // TODO: should we send a fake event instead? + //saved = nil + } + } + }() + var err error // propagate the error up (this is a permanent BAD error!) + // the watch delay runs inside of the Watch resource loop, so that it + // can still process signals and exit if needed. It shouldn't run any + // resource specific code since this is supposed to be a retry delay. + // NOTE: we're using the same retry and delay metaparams that CheckApply + // uses. This is for practicality. We can separate them later if needed! + var watchDelay time.Duration + var watchRetry = v.Meta().Retry // number of tries left, -1 for infinite + // watch blocks until it ends, & errors to retry + for { + // TODO: do we have to stop the converged-timeout when in this block (perhaps we're in the delay block!) + // TODO: should we setup/manage some of the converged timeout stuff in here anyways? + + // if a retry-delay was requested, wait, but don't block our events! 
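+ // The timer/select block below waits out the delay while still draining
+ // obj.Events(), so exit requests are still honoured during the wait.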
+ if watchDelay > 0 { + //var pendingSendEvent bool + timer := time.NewTimer(watchDelay) + Loop: + for { + select { + case <-timer.C: // the wait is over + break Loop // critical + + // TODO: resources could have a separate exit channel to avoid this complexity!? + case event := <-obj.Events(): + // NOTE: this code should match the similar Res code! + //cuid.SetConverged(false) // TODO: ? + if exit, send := obj.ReadEvent(&event); exit { + return nil // exit + } else if send { + // if we dive down this rabbit hole, our + // timer.C won't get seen until we get out! + // in this situation, the Watch() is blocked + // from performing until CheckApply returns + // successfully, or errors out. This isn't + // so bad, but we should document it. Is it + // possible that some resource *needs* Watch + // to run to be able to execute a CheckApply? + // That situation shouldn't be common, and + // should probably not be allowed. Can we + // avoid it though? + //if exit, err := doSend(); exit || err != nil { + // return err // we exit or bubble up a NACK... + //} + // Instead of doing the above, we can + // add events to a pending list, and + // when we finish the delay, we can run + // them. + //pendingSendEvent = true // all events are identical for now... + } + } + } + timer.Stop() // it's nice to cleanup + log.Printf("%s[%s]: Watch delay expired!", v.Kind(), v.GetName()) + // NOTE: we can avoid the send if running Watch guarantees + // one CheckApply event on startup! + //if pendingSendEvent { // TODO: should this become a list in the future? + // if exit, err := obj.DoSend(chanProcess, ""); exit || err != nil { + // return err // we exit or bubble up a NACK... + // } + //} + } + + // TODO: reset the watch retry count after some amount of success + e := v.Res.Watch(chanProcess) + if e == nil { // exit signal + err = nil // clean exit + break + } + if sentinelErr, ok := e.(*SentinelErr); ok { // unwrap the sentinel + err = sentinelErr.err + break // sentinel means, perma-exit + } + log.Printf("%s[%s]: Watch errored: %v", v.Kind(), v.GetName(), e) + if watchRetry == 0 { + err = fmt.Errorf("Permanent watch error: %v", e) + break + } + if watchRetry > 0 { // don't decrement the -1 + watchRetry-- + } + watchDelay = time.Duration(v.Meta().Delay) * time.Millisecond + log.Printf("%s[%s]: Watch: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), watchDelay.Seconds(), watchRetry) + // We need to trigger a CheckApply after Watch restarts, so that + // we catch any lost events that happened while down. We do this + // by getting the Watch resource to send one event once it's up! + //v.SendEvent(eventPoke, false, false) + } + close(chanProcess) + return err +} + +// Start is a main kick to start the graph. It goes through in reverse topological +// sort order so that events can't hit un-started vertices. +func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue + log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState()) + defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState()) + t, _ := g.TopologicalSort() + // TODO: only calculate indegree if `first` is true to save resources + indegree := g.InDegree() // compute all of the indegree's + for _, v := range Reverse(t) { + + if !v.Res.IsWatching() { // if Watch() is not running... + wg.Add(1) + // must pass in value to avoid races... 
+ // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ + go func(vv *Vertex) { + defer wg.Done() + // TODO: if a sufficient number of workers error, + // should something be done? Will these restart + // after perma-failure if we have a graph change? + if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops + log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err) + return + } + log.Printf("%s[%s]: Exited", vv.Kind(), vv.GetName()) + }(v) + } + + // selective poke: here we reduce the number of initial pokes + // to the minimum required to activate every vertex in the + // graph, either by direct action, or by getting poked by a + // vertex that was previously activated. if we poke each vertex + // that has no incoming edges, then we can be sure to reach the + // whole graph. Please note: this may mask certain optimization + // failures, such as any poke limiting code in Poke() or + // BackPoke(). You might want to disable this selective start + // when experimenting with and testing those elements. + // if we are unpausing (since it's not the first run of this + // function) we need to poke to *unpause* every graph vertex, + // and not just selectively the subset with no indegree. + if (!first) || indegree[v] == 0 { + // ensure state is started before continuing on to next vertex + for !v.SendEvent(event.EventStart, true, false) { + if global.DEBUG { + // if SendEvent fails, we aren't up yet + log.Printf("%s[%s]: Retrying SendEvent(Start)", v.Kind(), v.GetName()) + // sleep here briefly or otherwise cause + // a different goroutine to be scheduled + time.Sleep(1 * time.Millisecond) + } + } + } + } +} + +// Pause sends pause events to the graph in a topological sort order. +func (g *Graph) Pause() { + log.Printf("State: %v -> %v", g.setState(graphStatePausing), g.getState()) + defer log.Printf("State: %v -> %v", g.setState(graphStatePaused), g.getState()) + t, _ := g.TopologicalSort() + for _, v := range t { // squeeze out the events... + v.SendEvent(event.EventPause, true, false) + } +} + +// Exit sends exit events to the graph in a topological sort order. +func (g *Graph) Exit() { + if g == nil { + return + } // empty graph that wasn't populated yet + t, _ := g.TopologicalSort() + for _, v := range t { // squeeze out the events... + // turn off the taps... + // XXX: consider instead doing this by closing the Res.events channel instead? + // XXX: do this by sending an exit signal, and then returning + // when we hit the 'default' in the select statement! + // XXX: we can do this to quiesce, but it's not necessary now + + v.SendEvent(event.EventExit, true, false) + } +} diff --git a/pgraph/graphviz.go b/pgraph/graphviz.go new file mode 100644 index 00000000..110ec366 --- /dev/null +++ b/pgraph/graphviz.go @@ -0,0 +1,110 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. 
+// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgraph + +import ( + "fmt" + "io/ioutil" + "os" + "os/exec" + "strconv" + "syscall" +) + +// Graphviz outputs the graph in graphviz format. +// https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29 +func (g *Graph) Graphviz() (out string) { + //digraph g { + // label="hello world"; + // node [shape=box]; + // A [label="A"]; + // B [label="B"]; + // C [label="C"]; + // D [label="D"]; + // E [label="E"]; + // A -> B [label=f]; + // B -> C [label=g]; + // D -> E [label=h]; + //} + out += fmt.Sprintf("digraph %s {\n", g.GetName()) + out += fmt.Sprintf("\tlabel=\"%s\";\n", g.GetName()) + //out += "\tnode [shape=box];\n" + str := "" + for i := range g.Adjacency { // reverse paths + out += fmt.Sprintf("\t%s [label=\"%s[%s]\"];\n", i.GetName(), i.Kind(), i.GetName()) + for j := range g.Adjacency[i] { + k := g.Adjacency[i][j] + // use str for clearer output ordering + str += fmt.Sprintf("\t%s -> %s [label=%s];\n", i.GetName(), j.GetName(), k.Name) + } + } + out += str + out += "}\n" + return +} + +// ExecGraphviz writes out the graphviz data and runs the correct graphviz +// filter command. +func (g *Graph) ExecGraphviz(program, filename string) error { + + switch program { + case "dot", "neato", "twopi", "circo", "fdp": + default: + return fmt.Errorf("Invalid graphviz program selected!") + } + + if filename == "" { + return fmt.Errorf("No filename given!") + } + + // run as a normal user if possible when run with sudo + uid, err1 := strconv.Atoi(os.Getenv("SUDO_UID")) + gid, err2 := strconv.Atoi(os.Getenv("SUDO_GID")) + + err := ioutil.WriteFile(filename, []byte(g.Graphviz()), 0644) + if err != nil { + return fmt.Errorf("Error writing to filename!") + } + + if err1 == nil && err2 == nil { + if err := os.Chown(filename, uid, gid); err != nil { + return fmt.Errorf("Error changing file owner!") + } + } + + path, err := exec.LookPath(program) + if err != nil { + return fmt.Errorf("Graphviz is missing!") + } + + out := fmt.Sprintf("%s.png", filename) + cmd := exec.Command(path, "-Tpng", fmt.Sprintf("-o%s", out), filename) + + if err1 == nil && err2 == nil { + cmd.SysProcAttr = &syscall.SysProcAttr{} + cmd.SysProcAttr.Credential = &syscall.Credential{ + Uid: uint32(uid), + Gid: uint32(gid), + } + } + _, err = cmd.Output() + if err != nil { + return fmt.Errorf("Error writing to image!") + } + return nil +} diff --git a/pgraph/pgraph.go b/pgraph/pgraph.go index 8d254dc7..922ce025 100644 --- a/pgraph/pgraph.go +++ b/pgraph/pgraph.go @@ -20,19 +20,10 @@ package pgraph import ( "fmt" - "io/ioutil" - "log" - "math" - "os" - "os/exec" "sort" - "strconv" "sync" - "syscall" - "time" "github.com/purpleidea/mgmt/event" - "github.com/purpleidea/mgmt/global" "github.com/purpleidea/mgmt/resources" errwrap "github.com/pkg/errors" @@ -258,89 +249,6 @@ func (v *Vertex) String() string { return fmt.Sprintf("%s[%s]", v.Res.Kind(), v.Res.GetName()) } -// Graphviz outputs the graph in graphviz format. 
-// https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29 -func (g *Graph) Graphviz() (out string) { - //digraph g { - // label="hello world"; - // node [shape=box]; - // A [label="A"]; - // B [label="B"]; - // C [label="C"]; - // D [label="D"]; - // E [label="E"]; - // A -> B [label=f]; - // B -> C [label=g]; - // D -> E [label=h]; - //} - out += fmt.Sprintf("digraph %s {\n", g.GetName()) - out += fmt.Sprintf("\tlabel=\"%s\";\n", g.GetName()) - //out += "\tnode [shape=box];\n" - str := "" - for i := range g.Adjacency { // reverse paths - out += fmt.Sprintf("\t%s [label=\"%s[%s]\"];\n", i.GetName(), i.Kind(), i.GetName()) - for j := range g.Adjacency[i] { - k := g.Adjacency[i][j] - // use str for clearer output ordering - str += fmt.Sprintf("\t%s -> %s [label=%s];\n", i.GetName(), j.GetName(), k.Name) - } - } - out += str - out += "}\n" - return -} - -// ExecGraphviz writes out the graphviz data and runs the correct graphviz -// filter command. -func (g *Graph) ExecGraphviz(program, filename string) error { - - switch program { - case "dot", "neato", "twopi", "circo", "fdp": - default: - return fmt.Errorf("Invalid graphviz program selected!") - } - - if filename == "" { - return fmt.Errorf("No filename given!") - } - - // run as a normal user if possible when run with sudo - uid, err1 := strconv.Atoi(os.Getenv("SUDO_UID")) - gid, err2 := strconv.Atoi(os.Getenv("SUDO_GID")) - - err := ioutil.WriteFile(filename, []byte(g.Graphviz()), 0644) - if err != nil { - return fmt.Errorf("Error writing to filename!") - } - - if err1 == nil && err2 == nil { - if err := os.Chown(filename, uid, gid); err != nil { - return fmt.Errorf("Error changing file owner!") - } - } - - path, err := exec.LookPath(program) - if err != nil { - return fmt.Errorf("Graphviz is missing!") - } - - out := fmt.Sprintf("%s.png", filename) - cmd := exec.Command(path, "-Tpng", fmt.Sprintf("-o%s", out), filename) - - if err1 == nil && err2 == nil { - cmd.SysProcAttr = &syscall.SysProcAttr{} - cmd.SysProcAttr.Credential = &syscall.Credential{ - Uid: uint32(uid), - Gid: uint32(gid), - } - } - _, err = cmd.Output() - if err != nil { - return fmt.Errorf("Error writing to image!") - } - return nil -} - // IncomingGraphEdges returns an array (slice) of all directed vertices to // vertex v (??? -> v). OKTimestamp should probably use this. func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex { @@ -566,399 +474,6 @@ func (g *Graph) Reachability(a, b *Vertex) []*Vertex { return result } -// GetTimestamp returns the timestamp of a vertex -func (v *Vertex) GetTimestamp() int64 { - return v.timestamp -} - -// UpdateTimestamp updates the timestamp on a vertex and returns the new value -func (v *Vertex) UpdateTimestamp() int64 { - v.timestamp = time.Now().UnixNano() // update - return v.timestamp -} - -// OKTimestamp returns true if this element can run right now? -func (g *Graph) OKTimestamp(v *Vertex) bool { - // these are all the vertices pointing TO v, eg: ??? -> v - for _, n := range g.IncomingGraphEdges(v) { - // if the vertex has a greater timestamp than any pre-req (n) - // then we can't run right now... - // if they're equal (eg: on init of 0) then we also can't run - // b/c we should let our pre-req's go first... 
- x, y := v.GetTimestamp(), n.GetTimestamp() - if global.DEBUG { - log.Printf("%s[%s]: OKTimestamp: (%v) >= %s[%s](%v): !%v", v.Kind(), v.GetName(), x, n.Kind(), n.GetName(), y, x >= y) - } - if x >= y { - return false - } - } - return true -} - -// Poke notifies nodes after me in the dependency graph that they need refreshing... -// NOTE: this assumes that this can never fail or need to be rescheduled -func (g *Graph) Poke(v *Vertex, activity bool) { - // these are all the vertices pointing AWAY FROM v, eg: v -> ??? - for _, n := range g.OutgoingGraphEdges(v) { - // XXX: if we're in state event and haven't been cancelled by - // apply, then we can cancel a poke to a child, right? XXX - // XXX: if n.Res.getState() != resources.ResStateEvent { // is this correct? - if true { // XXX - if global.DEBUG { - log.Printf("%s[%s]: Poke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) - } - n.SendEvent(event.EventPoke, false, activity) // XXX: can this be switched to sync? - } else { - if global.DEBUG { - log.Printf("%s[%s]: Poke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) - } - } - } -} - -// BackPoke pokes the pre-requisites that are stale and need to run before I can run. -func (g *Graph) BackPoke(v *Vertex) { - // these are all the vertices pointing TO v, eg: ??? -> v - for _, n := range g.IncomingGraphEdges(v) { - x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState() - // if the parent timestamp needs poking AND it's not in state - // ResStateEvent, then poke it. If the parent is in ResStateEvent it - // means that an event is pending, so we'll be expecting a poke - // back soon, so we can safely discard the extra parent poke... - // TODO: implement a stateLT (less than) to tell if something - // happens earlier in the state cycle and that doesn't wrap nil - if x >= y && (s != resources.ResStateEvent && s != resources.ResStateCheckApply) { - if global.DEBUG { - log.Printf("%s[%s]: BackPoke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) - } - n.SendEvent(event.EventBackPoke, false, false) // XXX: can this be switched to sync? - } else { - if global.DEBUG { - log.Printf("%s[%s]: BackPoke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) - } - } - } -} - -// Process is the primary function to execute for a particular vertex in the graph. -func (g *Graph) Process(v *Vertex) error { - obj := v.Res - if global.DEBUG { - log.Printf("%s[%s]: Process()", obj.Kind(), obj.GetName()) - } - obj.SetState(resources.ResStateEvent) - var ok = true - var apply = false // did we run an apply? - // is it okay to run dependency wise right now? - // if not, that's okay because when the dependency runs, it will poke - // us back and we will run if needed then! 
- if g.OKTimestamp(v) { - if global.DEBUG { - log.Printf("%s[%s]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp()) - } - - obj.SetState(resources.ResStateCheckApply) - - // connect any senders to receivers and detect if values changed - if changed, err := obj.SendRecv(obj); err != nil { - return errwrap.Wrapf(err, "could not SendRecv in Process") - } else if changed { - obj.StateOK(false) // invalidate cache - } - - // if this fails, don't UpdateTimestamp() - checkok, err := obj.CheckApply(!obj.Meta().Noop) - if checkok && err != nil { // should never return this way - log.Fatalf("%s[%s]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), checkok, err) - } - if global.DEBUG { - log.Printf("%s[%s]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), checkok, err) - } - - if !checkok { // if state *was* not ok, we had to have apply'ed - if err != nil { // error during check or apply - ok = false - } else { - apply = true - } - } - - // when noop is true we always want to update timestamp - if obj.Meta().Noop && err == nil { - ok = true - } - - if ok { - // update this timestamp *before* we poke or the poked - // nodes might fail due to having a too old timestamp! - v.UpdateTimestamp() // this was touched... - obj.SetState(resources.ResStatePoking) // can't cancel parent poke - g.Poke(v, apply) - } - // poke at our pre-req's instead since they need to refresh/run... - return err - } - // else... only poke at the pre-req's that need to run - go g.BackPoke(v) - return nil -} - -// SentinelErr is a sentinal as an error type that wraps an arbitrary error. -type SentinelErr struct { - err error -} - -// Error is the required method to fulfill the error type. -func (obj *SentinelErr) Error() string { - return obj.err.Error() -} - -// Worker is the common run frontend of the vertex. It handles all of the retry -// and retry delay common code, and ultimately returns the final status of this -// vertex execution. -func (g *Graph) Worker(v *Vertex) error { - // listen for chan events from Watch() and run - // the Process() function when they're received - // this avoids us having to pass the data into - // the Watch() function about which graph it is - // running on, which isolates things nicely... - obj := v.Res - chanProcess := make(chan event.Event) - go func() { - running := false - var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration - if !timer.Stop() { - <-timer.C // unnecessary, shouldn't happen - } - var delay = time.Duration(v.Meta().Delay) * time.Millisecond - var retry = v.Meta().Retry // number of tries left, -1 for infinite - var saved event.Event - Loop: - for { - // this has to be synchronous, because otherwise the Res - // event loop will keep running and change state, - // causing the converged timeout to fire! - select { - case event, ok := <-chanProcess: // must use like this - if running && ok { - // we got an event that wasn't a close, - // while we were waiting for the timer! - // if this happens, it might be a bug:( - log.Fatalf("%s[%s]: Worker: Unexpected event: %+v", v.Kind(), v.GetName(), event) - } - if !ok { // chanProcess closed, let's exit - break Loop // no event, so no ack! - } - - // the above mentioned synchronous part, is the - // running of this function, paired with an ack. 
- if e := g.Process(v); e != nil { - saved = event - log.Printf("%s[%s]: CheckApply errored: %v", v.Kind(), v.GetName(), e) - if retry == 0 { - // wrap the error in the sentinel - event.ACKNACK(&SentinelErr{e}) // fail the Watch() - break Loop - } - if retry > 0 { // don't decrement the -1 - retry-- - } - log.Printf("%s[%s]: CheckApply: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), delay.Seconds(), retry) - // start the timer... - timer.Reset(delay) - running = true - continue - } - retry = v.Meta().Retry // reset on success - event.ACK() // sync - - case <-timer.C: - if !timer.Stop() { - //<-timer.C // blocks, docs are wrong! - } - running = false - log.Printf("%s[%s]: CheckApply delay expired!", v.Kind(), v.GetName()) - // re-send this failed event, to trigger a CheckApply() - go func() { chanProcess <- saved }() - // TODO: should we send a fake event instead? - //saved = nil - } - } - }() - var err error // propagate the error up (this is a permanent BAD error!) - // the watch delay runs inside of the Watch resource loop, so that it - // can still process signals and exit if needed. It shouldn't run any - // resource specific code since this is supposed to be a retry delay. - // NOTE: we're using the same retry and delay metaparams that CheckApply - // uses. This is for practicality. We can separate them later if needed! - var watchDelay time.Duration - var watchRetry = v.Meta().Retry // number of tries left, -1 for infinite - // watch blocks until it ends, & errors to retry - for { - // TODO: do we have to stop the converged-timeout when in this block (perhaps we're in the delay block!) - // TODO: should we setup/manage some of the converged timeout stuff in here anyways? - - // if a retry-delay was requested, wait, but don't block our events! - if watchDelay > 0 { - //var pendingSendEvent bool - timer := time.NewTimer(watchDelay) - Loop: - for { - select { - case <-timer.C: // the wait is over - break Loop // critical - - // TODO: resources could have a separate exit channel to avoid this complexity!? - case event := <-obj.Events(): - // NOTE: this code should match the similar Res code! - //cuid.SetConverged(false) // TODO: ? - if exit, send := obj.ReadEvent(&event); exit { - return nil // exit - } else if send { - // if we dive down this rabbit hole, our - // timer.C won't get seen until we get out! - // in this situation, the Watch() is blocked - // from performing until CheckApply returns - // successfully, or errors out. This isn't - // so bad, but we should document it. Is it - // possible that some resource *needs* Watch - // to run to be able to execute a CheckApply? - // That situation shouldn't be common, and - // should probably not be allowed. Can we - // avoid it though? - //if exit, err := doSend(); exit || err != nil { - // return err // we exit or bubble up a NACK... - //} - // Instead of doing the above, we can - // add events to a pending list, and - // when we finish the delay, we can run - // them. - //pendingSendEvent = true // all events are identical for now... - } - } - } - timer.Stop() // it's nice to cleanup - log.Printf("%s[%s]: Watch delay expired!", v.Kind(), v.GetName()) - // NOTE: we can avoid the send if running Watch guarantees - // one CheckApply event on startup! - //if pendingSendEvent { // TODO: should this become a list in the future? - // if exit, err := obj.DoSend(chanProcess, ""); exit || err != nil { - // return err // we exit or bubble up a NACK... 
- // } - //} - } - - // TODO: reset the watch retry count after some amount of success - e := v.Res.Watch(chanProcess) - if e == nil { // exit signal - err = nil // clean exit - break - } - if sentinelErr, ok := e.(*SentinelErr); ok { // unwrap the sentinel - err = sentinelErr.err - break // sentinel means, perma-exit - } - log.Printf("%s[%s]: Watch errored: %v", v.Kind(), v.GetName(), e) - if watchRetry == 0 { - err = fmt.Errorf("Permanent watch error: %v", e) - break - } - if watchRetry > 0 { // don't decrement the -1 - watchRetry-- - } - watchDelay = time.Duration(v.Meta().Delay) * time.Millisecond - log.Printf("%s[%s]: Watch: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), watchDelay.Seconds(), watchRetry) - // We need to trigger a CheckApply after Watch restarts, so that - // we catch any lost events that happened while down. We do this - // by getting the Watch resource to send one event once it's up! - //v.SendEvent(eventPoke, false, false) - } - close(chanProcess) - return err -} - -// Start is a main kick to start the graph. It goes through in reverse topological -// sort order so that events can't hit un-started vertices. -func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue - log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState()) - defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState()) - t, _ := g.TopologicalSort() - // TODO: only calculate indegree if `first` is true to save resources - indegree := g.InDegree() // compute all of the indegree's - for _, v := range Reverse(t) { - - if !v.Res.IsWatching() { // if Watch() is not running... - wg.Add(1) - // must pass in value to avoid races... - // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ - go func(vv *Vertex) { - defer wg.Done() - // TODO: if a sufficient number of workers error, - // should something be done? Will these restart - // after perma-failure if we have a graph change? - if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops - log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err) - return - } - log.Printf("%s[%s]: Exited", vv.Kind(), vv.GetName()) - }(v) - } - - // selective poke: here we reduce the number of initial pokes - // to the minimum required to activate every vertex in the - // graph, either by direct action, or by getting poked by a - // vertex that was previously activated. if we poke each vertex - // that has no incoming edges, then we can be sure to reach the - // whole graph. Please note: this may mask certain optimization - // failures, such as any poke limiting code in Poke() or - // BackPoke(). You might want to disable this selective start - // when experimenting with and testing those elements. - // if we are unpausing (since it's not the first run of this - // function) we need to poke to *unpause* every graph vertex, - // and not just selectively the subset with no indegree. - if (!first) || indegree[v] == 0 { - // ensure state is started before continuing on to next vertex - for !v.SendEvent(event.EventStart, true, false) { - if global.DEBUG { - // if SendEvent fails, we aren't up yet - log.Printf("%s[%s]: Retrying SendEvent(Start)", v.Kind(), v.GetName()) - // sleep here briefly or otherwise cause - // a different goroutine to be scheduled - time.Sleep(1 * time.Millisecond) - } - } - } - } -} - -// Pause sends pause events to the graph in a topological sort order. 
-func (g *Graph) Pause() { - log.Printf("State: %v -> %v", g.setState(graphStatePausing), g.getState()) - defer log.Printf("State: %v -> %v", g.setState(graphStatePaused), g.getState()) - t, _ := g.TopologicalSort() - for _, v := range t { // squeeze out the events... - v.SendEvent(event.EventPause, true, false) - } -} - -// Exit sends exit events to the graph in a topological sort order. -func (g *Graph) Exit() { - if g == nil { - return - } // empty graph that wasn't populated yet - t, _ := g.TopologicalSort() - for _, v := range t { // squeeze out the events... - // turn off the taps... - // XXX: consider instead doing this by closing the Res.events channel instead? - // XXX: do this by sending an exit signal, and then returning - // when we hit the 'default' in the select statement! - // XXX: we can do this to quiesce, but it's not necessary now - - v.SendEvent(event.EventExit, true, false) - } -} - // GraphSync updates the oldGraph so that it matches the newGraph receiver. It // leaves identical elements alone so that they don't need to be refreshed. // FIXME: add test cases diff --git a/resources/resources.go b/resources/resources.go index 089181b8..b3893a8a 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -284,87 +284,6 @@ func (obj *BaseRes) SetState(state ResState) { obj.state = state } -// DoSend sends off an event, but doesn't block the incoming event queue. It can -// also recursively call itself when events need processing during the wait. -// I'm not completely comfortable with this fn, but it will have to do for now. -func (obj *BaseRes) DoSend(processChan chan event.Event, comment string) (bool, error) { - resp := event.NewResp() - processChan <- event.Event{Name: event.EventNil, Resp: resp, Msg: comment, Activity: true} // trigger process - e := resp.Wait() - return false, e // XXX: at the moment, we don't use the exit bool. - // XXX: this can cause a deadlock. do we need to recursively send? fix event stuff! - //select { - //case e := <-resp: // wait for the ACK() - // if e != nil { // we got a NACK - // return true, e // exit with error - // } - //case event := <-obj.events: - // // NOTE: this code should match the similar code below! - // //cuid.SetConverged(false) // TODO: ? - // if exit, send := obj.ReadEvent(&event); exit { - // return true, nil // exit, without error - // } else if send { - // return obj.DoSend(processChan, comment) // recurse - // } - //} - //return false, nil // return, no error or exit signal -} - -// SendEvent pushes an event into the message queue for a particular vertex -func (obj *BaseRes) SendEvent(ev event.EventName, sync bool, activity bool) bool { - // TODO: isn't this race-y ? - if !obj.IsWatching() { // element has already exited - return false // if we don't return, we'll block on the send - } - if !sync { - obj.events <- event.Event{Name: ev, Resp: nil, Msg: "", Activity: activity} - return true - } - - resp := event.NewResp() - obj.events <- event.Event{Name: ev, Resp: resp, Msg: "", Activity: activity} - resp.ACKWait() // waits until true (nil) value - return true -} - -// ReadEvent processes events when a select gets one, and handles the pause -// code too! The return values specify if we should exit and poke respectively. -func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, poke bool) { - ev.ACK() - switch ev.Name { - case event.EventStart: - return false, true - - case event.EventPoke: - return false, true - - case event.EventBackPoke: - return false, true // forward poking in response to a back poke! 
- - case event.EventExit: - return true, false - - case event.EventPause: - // wait for next event to continue - select { - case e := <-obj.Events(): - e.ACK() - if e.Name == event.EventExit { - return true, false - } else if e.Name == event.EventStart { // eventContinue - return false, false // don't poke on unpause! - } else { - // if we get a poke event here, it's a bug! - log.Fatalf("%s[%s]: Unknown event: %v, while paused!", obj.Kind(), obj.GetName(), e) - } - } - - default: - log.Fatal("Unknown event: ", ev) - } - return true, false // required to keep the stupid go compiler happy -} - // IsStateOK returns the cached state value. func (obj *BaseRes) IsStateOK() bool { return obj.isStateOK diff --git a/resources/sendrecv.go b/resources/sendrecv.go index 9446d18c..f964ea42 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -22,12 +22,94 @@ import ( "log" "reflect" + "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/global" multierr "github.com/hashicorp/go-multierror" errwrap "github.com/pkg/errors" ) +// DoSend sends off an event, but doesn't block the incoming event queue. It can +// also recursively call itself when events need processing during the wait. +// I'm not completely comfortable with this fn, but it will have to do for now. +func (obj *BaseRes) DoSend(processChan chan event.Event, comment string) (bool, error) { + resp := event.NewResp() + processChan <- event.Event{Name: event.EventNil, Resp: resp, Msg: comment, Activity: true} // trigger process + e := resp.Wait() + return false, e // XXX: at the moment, we don't use the exit bool. + // XXX: this can cause a deadlock. do we need to recursively send? fix event stuff! + //select { + //case e := <-resp: // wait for the ACK() + // if e != nil { // we got a NACK + // return true, e // exit with error + // } + //case event := <-obj.events: + // // NOTE: this code should match the similar code below! + // //cuid.SetConverged(false) // TODO: ? + // if exit, send := obj.ReadEvent(&event); exit { + // return true, nil // exit, without error + // } else if send { + // return obj.DoSend(processChan, comment) // recurse + // } + //} + //return false, nil // return, no error or exit signal +} + +// SendEvent pushes an event into the message queue for a particular vertex +func (obj *BaseRes) SendEvent(ev event.EventName, sync bool, activity bool) bool { + // TODO: isn't this race-y ? + if !obj.IsWatching() { // element has already exited + return false // if we don't return, we'll block on the send + } + if !sync { + obj.events <- event.Event{Name: ev, Resp: nil, Msg: "", Activity: activity} + return true + } + + resp := event.NewResp() + obj.events <- event.Event{Name: ev, Resp: resp, Msg: "", Activity: activity} + resp.ACKWait() // waits until true (nil) value + return true +} + +// ReadEvent processes events when a select gets one, and handles the pause +// code too! The return values specify if we should exit and poke respectively. +func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, poke bool) { + ev.ACK() + switch ev.Name { + case event.EventStart: + return false, true + + case event.EventPoke: + return false, true + + case event.EventBackPoke: + return false, true // forward poking in response to a back poke! 
+ + case event.EventExit: + return true, false + + case event.EventPause: + // wait for next event to continue + select { + case e := <-obj.Events(): + e.ACK() + if e.Name == event.EventExit { + return true, false + } else if e.Name == event.EventStart { // eventContinue + return false, false // don't poke on unpause! + } else { + // if we get a poke event here, it's a bug! + log.Fatalf("%s[%s]: Unknown event: %v, while paused!", obj.Kind(), obj.GetName(), e) + } + } + + default: + log.Fatal("Unknown event: ", ev) + } + return true, false // required to keep the stupid go compiler happy +} + // Send points to a value that a resource will send. type Send struct { Res Res // a handle to the resource which is sending a value
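The retry plumbing that moves into pgraph/actions.go hinges on the SentinelErr wrapper: CheckApply and Watch errors are normally retried, but an error wrapped in SentinelErr tells the outer Worker loop to stop permanently and bubble the original error up. Below is a minimal, standalone sketch of that wrap-and-unwrap pattern; it is illustrative only and not part of this patch, and the runRetry helper and the retry count used here are invented for the example.

package main

import (
	"errors"
	"fmt"
)

// SentinelErr mirrors the wrapper in pgraph/actions.go: it marks an error as
// permanent so a retry loop knows to give up instead of trying again.
type SentinelErr struct {
	err error
}

// Error returns the message of the wrapped error.
func (obj *SentinelErr) Error() string {
	return obj.err.Error()
}

// runRetry is a hypothetical stand-in for the Worker retry loop: it retries
// work up to retry times (-1 means retry forever), but exits immediately when
// the returned error is wrapped in a SentinelErr.
func runRetry(retry int, work func() error) error {
	for {
		e := work()
		if e == nil {
			return nil // clean exit
		}
		if sentinelErr, ok := e.(*SentinelErr); ok {
			return sentinelErr.err // perma-exit, unwrap the sentinel
		}
		if retry == 0 {
			return fmt.Errorf("permanent error: %v", e)
		}
		if retry > 0 { // don't decrement the -1
			retry--
		}
	}
}

func main() {
	// a SentinelErr is returned on the first call, so no retries happen
	err := runRetry(3, func() error {
		return &SentinelErr{errors.New("unrecoverable")}
	})
	fmt.Println(err) // unrecoverable
}

In the actual Worker() in this patch, the wrapping happens when Process() fails with no retries left (event.ACKNACK(&SentinelErr{e})), and the unwrap happens when Watch() returns that error to the outer loop.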