resources, pgraph: split logical chunks into separate files
pgraph/actions.go (new file, 425 lines)
@@ -0,0 +1,425 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package pgraph

import (
	"fmt"
	"log"
	"math"
	"sync"
	"time"

	"github.com/purpleidea/mgmt/event"
	"github.com/purpleidea/mgmt/global"
	"github.com/purpleidea/mgmt/resources"

	errwrap "github.com/pkg/errors"
)

// GetTimestamp returns the timestamp of a vertex
func (v *Vertex) GetTimestamp() int64 {
	return v.timestamp
}

// UpdateTimestamp updates the timestamp on a vertex and returns the new value
func (v *Vertex) UpdateTimestamp() int64 {
	v.timestamp = time.Now().UnixNano() // update
	return v.timestamp
}

// OKTimestamp returns true if this element can run right now.
func (g *Graph) OKTimestamp(v *Vertex) bool {
	// these are all the vertices pointing TO v, eg: ??? -> v
	for _, n := range g.IncomingGraphEdges(v) {
		// if the vertex has a greater timestamp than any pre-req (n)
		// then we can't run right now...
		// if they're equal (eg: on init of 0) then we also can't run
		// b/c we should let our pre-req's go first...
		x, y := v.GetTimestamp(), n.GetTimestamp()
		if global.DEBUG {
			log.Printf("%s[%s]: OKTimestamp: (%v) >= %s[%s](%v): !%v", v.Kind(), v.GetName(), x, n.Kind(), n.GetName(), y, x >= y)
		}
		if x >= y {
			return false
		}
	}
	return true
}

// Poke notifies nodes after me in the dependency graph that they need refreshing...
// NOTE: this assumes that this can never fail or need to be rescheduled
func (g *Graph) Poke(v *Vertex, activity bool) {
	// these are all the vertices pointing AWAY FROM v, eg: v -> ???
	for _, n := range g.OutgoingGraphEdges(v) {
		// XXX: if we're in state event and haven't been cancelled by
		// apply, then we can cancel a poke to a child, right? XXX
		// XXX: if n.Res.getState() != resources.ResStateEvent { // is this correct?
		if true { // XXX
			if global.DEBUG {
				log.Printf("%s[%s]: Poke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
			}
			n.SendEvent(event.EventPoke, false, activity) // XXX: can this be switched to sync?
		} else {
			if global.DEBUG {
				log.Printf("%s[%s]: Poke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName())
			}
		}
	}
}

// BackPoke pokes the pre-requisites that are stale and need to run before I can run.
func (g *Graph) BackPoke(v *Vertex) {
	// these are all the vertices pointing TO v, eg: ??? -> v
	for _, n := range g.IncomingGraphEdges(v) {
		x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState()
		// if the parent timestamp needs poking AND it's not in state
		// ResStateEvent, then poke it. If the parent is in ResStateEvent it
		// means that an event is pending, so we'll be expecting a poke
		// back soon, so we can safely discard the extra parent poke...
		// TODO: implement a stateLT (less than) to tell if something
		// happens earlier in the state cycle and that doesn't wrap nil
		if x >= y && (s != resources.ResStateEvent && s != resources.ResStateCheckApply) {
			if global.DEBUG {
				log.Printf("%s[%s]: BackPoke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
			}
			n.SendEvent(event.EventBackPoke, false, false) // XXX: can this be switched to sync?
		} else {
			if global.DEBUG {
				log.Printf("%s[%s]: BackPoke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName())
			}
		}
	}
}

// Process is the primary function to execute for a particular vertex in the graph.
func (g *Graph) Process(v *Vertex) error {
	obj := v.Res
	if global.DEBUG {
		log.Printf("%s[%s]: Process()", obj.Kind(), obj.GetName())
	}
	obj.SetState(resources.ResStateEvent)
	var ok = true
	var apply = false // did we run an apply?
	// is it okay to run dependency wise right now?
	// if not, that's okay because when the dependency runs, it will poke
	// us back and we will run if needed then!
	if g.OKTimestamp(v) {
		if global.DEBUG {
			log.Printf("%s[%s]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp())
		}

		obj.SetState(resources.ResStateCheckApply)

		// connect any senders to receivers and detect if values changed
		if changed, err := obj.SendRecv(obj); err != nil {
			return errwrap.Wrapf(err, "could not SendRecv in Process")
		} else if changed {
			obj.StateOK(false) // invalidate cache
		}

		// if this fails, don't UpdateTimestamp()
		checkok, err := obj.CheckApply(!obj.Meta().Noop)
		if checkok && err != nil { // should never return this way
			log.Fatalf("%s[%s]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), checkok, err)
		}
		if global.DEBUG {
			log.Printf("%s[%s]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), checkok, err)
		}

		if !checkok { // if state *was* not ok, we had to have apply'ed
			if err != nil { // error during check or apply
				ok = false
			} else {
				apply = true
			}
		}

		// when noop is true we always want to update timestamp
		if obj.Meta().Noop && err == nil {
			ok = true
		}

		if ok {
			// update this timestamp *before* we poke or the poked
			// nodes might fail due to having a too old timestamp!
			v.UpdateTimestamp()                    // this was touched...
			obj.SetState(resources.ResStatePoking) // can't cancel parent poke
			g.Poke(v, apply)
		}
		// poke at our pre-req's instead since they need to refresh/run...
		return err
	}
	// else... only poke at the pre-req's that need to run
	go g.BackPoke(v)
	return nil
}

// SentinelErr is a sentinel error type that wraps an arbitrary error.
type SentinelErr struct {
	err error
}

// Error is the required method to fulfill the error type.
func (obj *SentinelErr) Error() string {
	return obj.err.Error()
}

// Worker is the common run frontend of the vertex. It handles all of the retry
// and retry delay common code, and ultimately returns the final status of this
// vertex execution.
func (g *Graph) Worker(v *Vertex) error {
	// listen for chan events from Watch() and run
	// the Process() function when they're received
	// this avoids us having to pass the data into
	// the Watch() function about which graph it is
	// running on, which isolates things nicely...
	obj := v.Res
	chanProcess := make(chan event.Event)
	go func() {
		running := false
		var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration
		if !timer.Stop() {
			<-timer.C // unnecessary, shouldn't happen
		}
		var delay = time.Duration(v.Meta().Delay) * time.Millisecond
		var retry = v.Meta().Retry // number of tries left, -1 for infinite
		var saved event.Event
	Loop:
		for {
			// this has to be synchronous, because otherwise the Res
			// event loop will keep running and change state,
			// causing the converged timeout to fire!
			select {
			case event, ok := <-chanProcess: // must use like this
				if running && ok {
					// we got an event that wasn't a close,
					// while we were waiting for the timer!
					// if this happens, it might be a bug:(
					log.Fatalf("%s[%s]: Worker: Unexpected event: %+v", v.Kind(), v.GetName(), event)
				}
				if !ok { // chanProcess closed, let's exit
					break Loop // no event, so no ack!
				}

				// the above mentioned synchronous part, is the
				// running of this function, paired with an ack.
				if e := g.Process(v); e != nil {
					saved = event
					log.Printf("%s[%s]: CheckApply errored: %v", v.Kind(), v.GetName(), e)
					if retry == 0 {
						// wrap the error in the sentinel
						event.ACKNACK(&SentinelErr{e}) // fail the Watch()
						break Loop
					}
					if retry > 0 { // don't decrement the -1
						retry--
					}
					log.Printf("%s[%s]: CheckApply: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), delay.Seconds(), retry)
					// start the timer...
					timer.Reset(delay)
					running = true
					continue
				}
				retry = v.Meta().Retry // reset on success
				event.ACK()            // sync

			case <-timer.C:
				if !timer.Stop() {
					//<-timer.C // blocks, docs are wrong!
				}
				running = false
				log.Printf("%s[%s]: CheckApply delay expired!", v.Kind(), v.GetName())
				// re-send this failed event, to trigger a CheckApply()
				go func() { chanProcess <- saved }()
				// TODO: should we send a fake event instead?
				//saved = nil
			}
		}
	}()
	var err error // propagate the error up (this is a permanent BAD error!)
	// the watch delay runs inside of the Watch resource loop, so that it
	// can still process signals and exit if needed. It shouldn't run any
	// resource specific code since this is supposed to be a retry delay.
	// NOTE: we're using the same retry and delay metaparams that CheckApply
	// uses. This is for practicality. We can separate them later if needed!
	var watchDelay time.Duration
	var watchRetry = v.Meta().Retry // number of tries left, -1 for infinite
	// watch blocks until it ends, & errors to retry
	for {
		// TODO: do we have to stop the converged-timeout when in this block (perhaps we're in the delay block!)
		// TODO: should we setup/manage some of the converged timeout stuff in here anyways?

		// if a retry-delay was requested, wait, but don't block our events!
		if watchDelay > 0 {
			//var pendingSendEvent bool
			timer := time.NewTimer(watchDelay)
		Loop:
			for {
				select {
				case <-timer.C: // the wait is over
					break Loop // critical

				// TODO: resources could have a separate exit channel to avoid this complexity!?
				case event := <-obj.Events():
					// NOTE: this code should match the similar Res code!
					//cuid.SetConverged(false) // TODO: ?
					if exit, send := obj.ReadEvent(&event); exit {
						return nil // exit
					} else if send {
						// if we dive down this rabbit hole, our
						// timer.C won't get seen until we get out!
						// in this situation, the Watch() is blocked
						// from performing until CheckApply returns
						// successfully, or errors out. This isn't
						// so bad, but we should document it. Is it
						// possible that some resource *needs* Watch
						// to run to be able to execute a CheckApply?
						// That situation shouldn't be common, and
						// should probably not be allowed. Can we
						// avoid it though?
						//if exit, err := doSend(); exit || err != nil {
						//	return err // we exit or bubble up a NACK...
						//}
						// Instead of doing the above, we can
						// add events to a pending list, and
						// when we finish the delay, we can run
						// them.
						//pendingSendEvent = true // all events are identical for now...
					}
				}
			}
			timer.Stop() // it's nice to cleanup
			log.Printf("%s[%s]: Watch delay expired!", v.Kind(), v.GetName())
			// NOTE: we can avoid the send if running Watch guarantees
			// one CheckApply event on startup!
			//if pendingSendEvent { // TODO: should this become a list in the future?
			//	if exit, err := obj.DoSend(chanProcess, ""); exit || err != nil {
			//		return err // we exit or bubble up a NACK...
			//	}
			//}
		}

		// TODO: reset the watch retry count after some amount of success
		e := v.Res.Watch(chanProcess)
		if e == nil { // exit signal
			err = nil // clean exit
			break
		}
		if sentinelErr, ok := e.(*SentinelErr); ok { // unwrap the sentinel
			err = sentinelErr.err
			break // sentinel means, perma-exit
		}
		log.Printf("%s[%s]: Watch errored: %v", v.Kind(), v.GetName(), e)
		if watchRetry == 0 {
			err = fmt.Errorf("Permanent watch error: %v", e)
			break
		}
		if watchRetry > 0 { // don't decrement the -1
			watchRetry--
		}
		watchDelay = time.Duration(v.Meta().Delay) * time.Millisecond
		log.Printf("%s[%s]: Watch: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), watchDelay.Seconds(), watchRetry)
		// We need to trigger a CheckApply after Watch restarts, so that
		// we catch any lost events that happened while down. We do this
		// by getting the Watch resource to send one event once it's up!
		//v.SendEvent(eventPoke, false, false)
	}
	close(chanProcess)
	return err
}

// Start is a main kick to start the graph. It goes through in reverse topological
// sort order so that events can't hit un-started vertices.
func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
	log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState())
	defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState())
	t, _ := g.TopologicalSort()
	// TODO: only calculate indegree if `first` is true to save resources
	indegree := g.InDegree() // compute all of the indegree's
	for _, v := range Reverse(t) {

		if !v.Res.IsWatching() { // if Watch() is not running...
			wg.Add(1)
			// must pass in value to avoid races...
			// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
			go func(vv *Vertex) {
				defer wg.Done()
				// TODO: if a sufficient number of workers error,
				// should something be done? Will these restart
				// after perma-failure if we have a graph change?
				if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops
					log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err)
					return
				}
				log.Printf("%s[%s]: Exited", vv.Kind(), vv.GetName())
			}(v)
		}

		// selective poke: here we reduce the number of initial pokes
		// to the minimum required to activate every vertex in the
		// graph, either by direct action, or by getting poked by a
		// vertex that was previously activated. if we poke each vertex
		// that has no incoming edges, then we can be sure to reach the
		// whole graph. Please note: this may mask certain optimization
		// failures, such as any poke limiting code in Poke() or
		// BackPoke(). You might want to disable this selective start
		// when experimenting with and testing those elements.
		// if we are unpausing (since it's not the first run of this
		// function) we need to poke to *unpause* every graph vertex,
		// and not just selectively the subset with no indegree.
		if (!first) || indegree[v] == 0 {
			// ensure state is started before continuing on to next vertex
			for !v.SendEvent(event.EventStart, true, false) {
				if global.DEBUG {
					// if SendEvent fails, we aren't up yet
					log.Printf("%s[%s]: Retrying SendEvent(Start)", v.Kind(), v.GetName())
					// sleep here briefly or otherwise cause
					// a different goroutine to be scheduled
					time.Sleep(1 * time.Millisecond)
				}
			}
		}
	}
}

// Pause sends pause events to the graph in a topological sort order.
func (g *Graph) Pause() {
	log.Printf("State: %v -> %v", g.setState(graphStatePausing), g.getState())
	defer log.Printf("State: %v -> %v", g.setState(graphStatePaused), g.getState())
	t, _ := g.TopologicalSort()
	for _, v := range t { // squeeze out the events...
		v.SendEvent(event.EventPause, true, false)
	}
}

// Exit sends exit events to the graph in a topological sort order.
func (g *Graph) Exit() {
	if g == nil {
		return
	} // empty graph that wasn't populated yet
	t, _ := g.TopologicalSort()
	for _, v := range t { // squeeze out the events...
		// turn off the taps...
		// XXX: consider instead doing this by closing the Res.events channel instead?
		// XXX: do this by sending an exit signal, and then returning
		// when we hit the 'default' in the select statement!
		// XXX: we can do this to quiesce, but it's not necessary now

		v.SendEvent(event.EventExit, true, false)
	}
}

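The scheduling in this new file is driven entirely by the per-vertex timestamps: OKTimestamp refuses to run a vertex until every incoming dependency carries a strictly newer timestamp, and a successful CheckApply bumps the vertex's own timestamp before it pokes downstream. The following standalone sketch illustrates just that comparison rule; it uses a hypothetical node type rather than the real Vertex/Graph API, so treat it as an illustration, not project code.

package main

import (
	"fmt"
	"time"
)

// node is a hypothetical stand-in for a graph vertex with a timestamp.
type node struct {
	name      string
	timestamp int64
	deps      []*node // incoming edges: deps -> node
}

// okTimestamp mirrors the rule in OKTimestamp: refuse to run if any
// dependency's timestamp is not strictly newer than ours.
func (n *node) okTimestamp() bool {
	for _, d := range n.deps {
		if n.timestamp >= d.timestamp {
			return false
		}
	}
	return true
}

// run bumps the timestamp, the way UpdateTimestamp does after a CheckApply.
func (n *node) run() {
	n.timestamp = time.Now().UnixNano()
}

func main() {
	parent := &node{name: "parent"}
	child := &node{name: "child", deps: []*node{parent}}

	fmt.Println("child can run before parent:", child.okTimestamp()) // false: timestamps are equal (both 0)
	parent.run()                                                     // the parent runs first...
	fmt.Println("child can run after parent:", child.okTimestamp())  // true: the parent is now newer
	child.run()                                                      // ...then the child catches up
	fmt.Println("child can run again:", child.okTimestamp())         // false, until the parent changes again
}

This is also why BackPoke exists: if a child finds itself newer than a parent, it asks the parent to run first instead of running itself.
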
pgraph/graphviz.go (new file, 110 lines)
@@ -0,0 +1,110 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package pgraph

import (
	"fmt"
	"io/ioutil"
	"os"
	"os/exec"
	"strconv"
	"syscall"
)

// Graphviz outputs the graph in graphviz format.
// https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29
func (g *Graph) Graphviz() (out string) {
	//digraph g {
	//	label="hello world";
	//	node [shape=box];
	//	A [label="A"];
	//	B [label="B"];
	//	C [label="C"];
	//	D [label="D"];
	//	E [label="E"];
	//	A -> B [label=f];
	//	B -> C [label=g];
	//	D -> E [label=h];
	//}
	out += fmt.Sprintf("digraph %s {\n", g.GetName())
	out += fmt.Sprintf("\tlabel=\"%s\";\n", g.GetName())
	//out += "\tnode [shape=box];\n"
	str := ""
	for i := range g.Adjacency { // reverse paths
		out += fmt.Sprintf("\t%s [label=\"%s[%s]\"];\n", i.GetName(), i.Kind(), i.GetName())
		for j := range g.Adjacency[i] {
			k := g.Adjacency[i][j]
			// use str for clearer output ordering
			str += fmt.Sprintf("\t%s -> %s [label=%s];\n", i.GetName(), j.GetName(), k.Name)
		}
	}
	out += str
	out += "}\n"
	return
}

// ExecGraphviz writes out the graphviz data and runs the correct graphviz
// filter command.
func (g *Graph) ExecGraphviz(program, filename string) error {

	switch program {
	case "dot", "neato", "twopi", "circo", "fdp":
	default:
		return fmt.Errorf("Invalid graphviz program selected!")
	}

	if filename == "" {
		return fmt.Errorf("No filename given!")
	}

	// run as a normal user if possible when run with sudo
	uid, err1 := strconv.Atoi(os.Getenv("SUDO_UID"))
	gid, err2 := strconv.Atoi(os.Getenv("SUDO_GID"))

	err := ioutil.WriteFile(filename, []byte(g.Graphviz()), 0644)
	if err != nil {
		return fmt.Errorf("Error writing to filename!")
	}

	if err1 == nil && err2 == nil {
		if err := os.Chown(filename, uid, gid); err != nil {
			return fmt.Errorf("Error changing file owner!")
		}
	}

	path, err := exec.LookPath(program)
	if err != nil {
		return fmt.Errorf("Graphviz is missing!")
	}

	out := fmt.Sprintf("%s.png", filename)
	cmd := exec.Command(path, "-Tpng", fmt.Sprintf("-o%s", out), filename)

	if err1 == nil && err2 == nil {
		cmd.SysProcAttr = &syscall.SysProcAttr{}
		cmd.SysProcAttr.Credential = &syscall.Credential{
			Uid: uint32(uid),
			Gid: uint32(gid),
		}
	}
	_, err = cmd.Output()
	if err != nil {
		return fmt.Errorf("Error writing to image!")
	}
	return nil
}

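For reference, the DOT text that Graphviz() assembles from those Sprintf calls has the following shape. The graph name, resource kinds, names and the edge label below are made up for illustration; only the layout follows the format strings above.

digraph mgmt {
	label="mgmt";
	file1 [label="file[file1]"];
	svc1 [label="svc[svc1]"];
	file1 -> svc1 [label=e1];
}

ExecGraphviz then writes this text to the given filename and shells out to the chosen graphviz program (dot, neato, twopi, circo or fdp) to render a PNG next to it, dropping back to the invoking user's uid/gid when run under sudo.
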
pgraph/pgraph.go (485 lines changed)
@@ -20,19 +20,10 @@ package pgraph

import (
	"fmt"
-	"io/ioutil"
-	"log"
-	"math"
-	"os"
-	"os/exec"
	"sort"
-	"strconv"
	"sync"
-	"syscall"
-	"time"

	"github.com/purpleidea/mgmt/event"
-	"github.com/purpleidea/mgmt/global"
	"github.com/purpleidea/mgmt/resources"

	errwrap "github.com/pkg/errors"

@@ -258,89 +249,6 @@ func (v *Vertex) String() string {
	return fmt.Sprintf("%s[%s]", v.Res.Kind(), v.Res.GetName())
}
	[83 lines removed: the Graphviz() and ExecGraphviz() methods, which now live in pgraph/graphviz.go above]
// IncomingGraphEdges returns an array (slice) of all directed vertices to
// vertex v (??? -> v). OKTimestamp should probably use this.
func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex {

@@ -566,399 +474,6 @@ func (g *Graph) Reachability(a, b *Vertex) []*Vertex {
	return result
}
	[393 lines removed: GetTimestamp, UpdateTimestamp, OKTimestamp, Poke, BackPoke, Process, SentinelErr, Worker, Start, Pause and Exit, which now live in pgraph/actions.go above]
// GraphSync updates the oldGraph so that it matches the newGraph receiver. It
// leaves identical elements alone so that they don't need to be refreshed.
// FIXME: add test cases

@@ -284,87 +284,6 @@ func (obj *BaseRes) SetState(state ResState) {
	obj.state = state
}
	[81 lines removed: DoSend, SendEvent and ReadEvent, which reappear in the resources hunk below]
// IsStateOK returns the cached state value.
func (obj *BaseRes) IsStateOK() bool {
	return obj.isStateOK

@@ -22,12 +22,94 @@ import (
	"log"
	"reflect"

+	"github.com/purpleidea/mgmt/event"
	"github.com/purpleidea/mgmt/global"

	multierr "github.com/hashicorp/go-multierror"
	errwrap "github.com/pkg/errors"
)

+// DoSend sends off an event, but doesn't block the incoming event queue. It can
+// also recursively call itself when events need processing during the wait.
+// I'm not completely comfortable with this fn, but it will have to do for now.
+func (obj *BaseRes) DoSend(processChan chan event.Event, comment string) (bool, error) {
+	resp := event.NewResp()
+	processChan <- event.Event{Name: event.EventNil, Resp: resp, Msg: comment, Activity: true} // trigger process
+	e := resp.Wait()
+	return false, e // XXX: at the moment, we don't use the exit bool.
+	// XXX: this can cause a deadlock. do we need to recursively send? fix event stuff!
+	//select {
+	//case e := <-resp: // wait for the ACK()
+	//	if e != nil { // we got a NACK
+	//		return true, e // exit with error
+	//	}
+	//case event := <-obj.events:
+	//	// NOTE: this code should match the similar code below!
+	//	//cuid.SetConverged(false) // TODO: ?
+	//	if exit, send := obj.ReadEvent(&event); exit {
+	//		return true, nil // exit, without error
+	//	} else if send {
+	//		return obj.DoSend(processChan, comment) // recurse
+	//	}
+	//}
+	//return false, nil // return, no error or exit signal
+}
+
+// SendEvent pushes an event into the message queue for a particular vertex
+func (obj *BaseRes) SendEvent(ev event.EventName, sync bool, activity bool) bool {
+	// TODO: isn't this race-y ?
+	if !obj.IsWatching() { // element has already exited
+		return false // if we don't return, we'll block on the send
+	}
+	if !sync {
+		obj.events <- event.Event{Name: ev, Resp: nil, Msg: "", Activity: activity}
+		return true
+	}
+
+	resp := event.NewResp()
+	obj.events <- event.Event{Name: ev, Resp: resp, Msg: "", Activity: activity}
+	resp.ACKWait() // waits until true (nil) value
+	return true
+}
+
+// ReadEvent processes events when a select gets one, and handles the pause
+// code too! The return values specify if we should exit and poke respectively.
+func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, poke bool) {
+	ev.ACK()
+	switch ev.Name {
+	case event.EventStart:
+		return false, true
+
+	case event.EventPoke:
+		return false, true
+
+	case event.EventBackPoke:
+		return false, true // forward poking in response to a back poke!
+
+	case event.EventExit:
+		return true, false
+
+	case event.EventPause:
+		// wait for next event to continue
+		select {
+		case e := <-obj.Events():
+			e.ACK()
+			if e.Name == event.EventExit {
+				return true, false
+			} else if e.Name == event.EventStart { // eventContinue
+				return false, false // don't poke on unpause!
+			} else {
+				// if we get a poke event here, it's a bug!
+				log.Fatalf("%s[%s]: Unknown event: %v, while paused!", obj.Kind(), obj.GetName(), e)
+			}
+		}
+
+	default:
+		log.Fatal("Unknown event: ", ev)
+	}
+	return true, false // required to keep the stupid go compiler happy
+}

// Send points to a value that a resource will send.
type Send struct {
	Res Res // a handle to the resource which is sending a value
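Both DoSend and the synchronous path of SendEvent rely on the same handshake: the sender attaches a response channel to the event, pushes it into the queue, and blocks until the receiver ACKs (nil) or NACKs (an error). Here is a minimal, self-contained sketch of that pattern using plain channels; the names are hypothetical and it does not use the real event package API.

package main

import (
	"errors"
	"fmt"
)

// msg is a hypothetical stand-in for event.Event: a name plus a response channel.
type msg struct {
	name string
	resp chan error // nil means ACK, non-nil means NACK with an error
}

// worker drains the queue and acknowledges each message, NACKing "bad" ones,
// roughly the way Worker ACKs or ACKNACKs each processed event.
func worker(queue chan msg) {
	for m := range queue {
		if m.name == "bad" {
			m.resp <- errors.New("refused") // NACK, like ACKNACK(&SentinelErr{...})
			continue
		}
		m.resp <- nil // ACK, like event.ACK()
	}
}

// send mirrors the synchronous path of SendEvent: block until the ACK/NACK arrives.
func send(queue chan msg, name string) error {
	resp := make(chan error)
	queue <- msg{name: name, resp: resp}
	return <-resp // like resp.ACKWait() or resp.Wait()
}

func main() {
	queue := make(chan msg)
	go worker(queue)
	fmt.Println("start:", send(queue, "start")) // <nil>
	fmt.Println("bad:  ", send(queue, "bad"))   // refused
	close(queue)
}

The asynchronous branch of SendEvent simply skips the response channel, which is why it cannot report failure beyond the "is the resource still watching" check at the top.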