pgraph, resources: Major refactoring continued
There was simply some technical debt I needed to kill off. Sorry for not splitting this up into more patches.
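The bulk of the diff below turns the package-level graph helpers (OKTimestamp, Poke, BackPoke, RefreshPending, SetUpstreamRefresh, SetDownstreamRefresh, Process, innerWorker, Worker) into methods on BaseRes, which now carries its Graph and Vertex, so callers stop threading (g, v) through every call. A condensed sketch of the pattern, trimmed from the first hunk below (debug logging is dropped and the final return is assumed, since the hunk cuts off inside the loop):

// Before: a free function that needs the graph and the vertex passed in.
func OKTimestamp(g *pgraph.Graph, v pgraph.Vertex) bool {
	for _, n := range g.IncomingGraphVertices(v) {
		if VtoR(v).Timestamp() >= VtoR(n).Timestamp() {
			return false // a pre-requisite still has to run first
		}
	}
	return true // assumed tail; the hunk ends before this line
}

// After: BaseRes already knows its graph and vertex, so the helper becomes a
// method and the call site shrinks to obj.OKTimestamp().
func (obj *BaseRes) OKTimestamp() bool {
	for _, n := range obj.Graph.IncomingGraphVertices(obj.Vertex) {
		if obj.Timestamp() >= VtoR(n).Timestamp() {
			return false
		}
	}
	return true // assumed tail, as above
}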
@@ -35,17 +35,27 @@ import (
"golang.org/x/time/rate"
)

// SentinelErr is a sentinal as an error type that wraps an arbitrary error.
type SentinelErr struct {
err error
}

// Error is the required method to fulfill the error type.
func (obj *SentinelErr) Error() string {
return obj.err.Error()
}

// OKTimestamp returns true if this element can run right now?
func OKTimestamp(g *pgraph.Graph, v pgraph.Vertex) bool {
func (obj *BaseRes) OKTimestamp() bool {
// these are all the vertices pointing TO v, eg: ??? -> v
for _, n := range g.IncomingGraphVertices(v) {
for _, n := range obj.Graph.IncomingGraphVertices(obj.Vertex) {
// if the vertex has a greater timestamp than any pre-req (n)
// then we can't run right now...
// if they're equal (eg: on init of 0) then we also can't run
// b/c we should let our pre-req's go first...
x, y := VtoR(v).Timestamp(), VtoR(n).Timestamp()
if b, ok := g.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: OKTimestamp: (%v) >= %s(%v): !%v", VtoR(v).String(), x, VtoR(n).String(), y, x >= y)
x, y := obj.Timestamp(), VtoR(n).Timestamp()
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: OKTimestamp: (%v) >= %s(%v): !%v", obj, x, n, y, x >= y)
}
if x >= y {
return false
@@ -55,36 +65,35 @@ func OKTimestamp(g *pgraph.Graph, v pgraph.Vertex) bool {
}

// Poke tells nodes after me in the dependency graph that they need to refresh.
func Poke(g *pgraph.Graph, v pgraph.Vertex) error {

func (obj *BaseRes) Poke() error {
// if we're pausing (or exiting) then we should suspend poke's so that
// the graph doesn't go on running forever until it's completely done!
// this is an optional feature which we can do by default on user exit
if b, ok := g.Value("fastpause"); ok && util.Bool(b) {
if b, ok := obj.Graph.Value("fastpause"); ok && util.Bool(b) {
return nil // TODO: should this be an error instead?
}

var wg sync.WaitGroup
// these are all the vertices pointing AWAY FROM v, eg: v -> ???
for _, n := range g.OutgoingGraphVertices(v) {
for _, n := range obj.Graph.OutgoingGraphVertices(obj.Vertex) {
// we can skip this poke if resource hasn't done work yet... it
// needs to be poked if already running, or not running though!
// TODO: does this need an || activity flag?
if VtoR(n).GetState() != ResStateProcess {
if b, ok := g.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: Poke: %s", VtoR(v).String(), VtoR(n).String())
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: Poke: %s", obj, n)
}
wg.Add(1)
go func(nn pgraph.Vertex) error {
defer wg.Done()
//edge := g.adjacency[v][nn] // lookup
//edge := obj.Graph.adjacency[v][nn] // lookup
//notify := edge.Notify && edge.Refresh()
return VtoR(nn).SendEvent(event.EventPoke, nil)
}(n)

} else {
if b, ok := g.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: Poke: %s: Skipped!", VtoR(v).String(), VtoR(n).String())
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: Poke: %s: Skipped!", obj, n)
}
}
}
@@ -94,11 +103,11 @@ func Poke(g *pgraph.Graph, v pgraph.Vertex) error {
}

// BackPoke pokes the pre-requisites that are stale and need to run before I can run.
func BackPoke(g *pgraph.Graph, v pgraph.Vertex) {
func (obj *BaseRes) BackPoke() {
var wg sync.WaitGroup
// these are all the vertices pointing TO v, eg: ??? -> v
for _, n := range g.IncomingGraphVertices(v) {
x, y, s := VtoR(v).Timestamp(), VtoR(n).Timestamp(), VtoR(n).GetState()
for _, n := range obj.Graph.IncomingGraphVertices(obj.Vertex) {
x, y, s := obj.Timestamp(), VtoR(n).Timestamp(), VtoR(n).GetState()
// If the parent timestamp needs poking AND it's not running
// Process, then poke it. If the parent is in ResStateProcess it
// means that an event is pending, so we'll be expecting a poke
@@ -106,8 +115,8 @@ func BackPoke(g *pgraph.Graph, v pgraph.Vertex) {
// TODO: implement a stateLT (less than) to tell if something
// happens earlier in the state cycle and that doesn't wrap nil
if x >= y && (s != ResStateProcess && s != ResStateCheckApply) {
if b, ok := g.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: BackPoke: %s", VtoR(v).String(), VtoR(n).String())
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: BackPoke: %s", obj, n)
}
wg.Add(1)
go func(nn pgraph.Vertex) error {
@@ -116,8 +125,8 @@ func BackPoke(g *pgraph.Graph, v pgraph.Vertex) {
}(n)

} else {
if b, ok := g.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: BackPoke: %s: Skipped!", VtoR(v).String(), VtoR(n).String())
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: BackPoke: %s: Skipped!", obj, n)
}
}
}
@@ -127,9 +136,9 @@ func BackPoke(g *pgraph.Graph, v pgraph.Vertex) {

// RefreshPending determines if any previous nodes have a refresh pending here.
// If this is true, it means I am expected to apply a refresh when I next run.
func RefreshPending(g *pgraph.Graph, v pgraph.Vertex) bool {
func (obj *BaseRes) RefreshPending() bool {
var refresh bool
for _, edge := range g.IncomingGraphEdges(v) {
for _, edge := range obj.Graph.IncomingGraphEdges(obj.Vertex) {
// if we asked for a notify *and* if one is pending!
if edge.Notify && edge.Refresh() {
refresh = true
@@ -140,8 +149,8 @@ func RefreshPending(g *pgraph.Graph, v pgraph.Vertex) bool {
}

// SetUpstreamRefresh sets the refresh value to any upstream vertices.
func SetUpstreamRefresh(g *pgraph.Graph, v pgraph.Vertex, b bool) {
for _, edge := range g.IncomingGraphEdges(v) {
func (obj *BaseRes) SetUpstreamRefresh(b bool) {
for _, edge := range obj.Graph.IncomingGraphEdges(obj.Vertex) {
if edge.Notify {
edge.SetRefresh(b)
}
@@ -149,8 +158,8 @@ func SetUpstreamRefresh(g *pgraph.Graph, v pgraph.Vertex, b bool) {
}

// SetDownstreamRefresh sets the refresh value to any downstream vertices.
func SetDownstreamRefresh(g *pgraph.Graph, v pgraph.Vertex, b bool) {
for _, edge := range g.OutgoingGraphEdges(v) {
func (obj *BaseRes) SetDownstreamRefresh(b bool) {
for _, edge := range obj.Graph.OutgoingGraphEdges(obj.Vertex) {
// if we asked for a notify *and* if one is pending!
if edge.Notify {
edge.SetRefresh(b)
@@ -159,10 +168,9 @@ func SetDownstreamRefresh(g *pgraph.Graph, v pgraph.Vertex, b bool) {
}

// Process is the primary function to execute for a particular vertex in the graph.
func Process(g *pgraph.Graph, v pgraph.Vertex) error {
obj := VtoR(v)
if b, ok := g.Value("debug"); ok && util.Bool(b) {
log.Printf("%s[%s]: Process()", obj.GetKind(), obj.GetName())
func (obj *BaseRes) Process() error {
if obj.debug {
log.Printf("%s: Process()", obj)
}
// FIXME: should these SetState methods be here or after the sema code?
defer obj.SetState(ResStateNil) // reset state when finished
@@ -171,13 +179,13 @@ func Process(g *pgraph.Graph, v pgraph.Vertex) error {
// is it okay to run dependency wise right now?
// if not, that's okay because when the dependency runs, it will poke
// us back and we will run if needed then!
if !OKTimestamp(g, v) {
go BackPoke(g, v)
if !obj.OKTimestamp() {
go obj.BackPoke()
return nil
}
// timestamp must be okay...
if b, ok := g.Value("debug"); ok && util.Bool(b) {
log.Printf("%s[%s]: OKTimestamp(%v)", obj.GetKind(), obj.GetName(), VtoR(v).Timestamp())
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: OKTimestamp(%v)", obj, obj.Timestamp())
}

// semaphores!
@@ -188,23 +196,23 @@ func Process(g *pgraph.Graph, v pgraph.Vertex) error {
// The exception is that semaphores with a zero count will always block!
// TODO: Add a close mechanism to close/unblock zero count semaphores...
semas := obj.Meta().Sema
if b, ok := g.Value("debug"); ok && util.Bool(b) && len(semas) > 0 {
log.Printf("%s[%s]: Sema: P(%s)", obj.GetKind(), obj.GetName(), strings.Join(semas, ", "))
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) && len(semas) > 0 {
log.Printf("%s: Sema: P(%s)", obj, strings.Join(semas, ", "))
}
if err := SemaLock(g, semas); err != nil { // lock
if err := SemaLock(obj.Graph, semas); err != nil { // lock
// NOTE: in practice, this might not ever be truly necessary...
return fmt.Errorf("shutdown of semaphores")
}
defer SemaUnlock(g, semas) // unlock
if b, ok := g.Value("debug"); ok && util.Bool(b) && len(semas) > 0 {
defer log.Printf("%s[%s]: Sema: V(%s)", obj.GetKind(), obj.GetName(), strings.Join(semas, ", "))
defer SemaUnlock(obj.Graph, semas) // unlock
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) && len(semas) > 0 {
defer log.Printf("%s: Sema: V(%s)", obj, strings.Join(semas, ", "))
}

var ok = true
var applied = false // did we run an apply?

// connect any senders to receivers and detect if values changed
if updated, err := obj.SendRecv(obj); err != nil {
if updated, err := obj.SendRecv(obj.Res); err != nil {
return errwrap.Wrapf(err, "could not SendRecv in Process")
} else if len(updated) > 0 {
for _, changed := range updated {
@@ -220,12 +228,12 @@ func Process(g *pgraph.Graph, v pgraph.Vertex) error {
var checkOK bool
var err error

if b, ok := g.Value("debug"); ok && util.Bool(b) {
log.Printf("%s[%s]: CheckApply(%t)", obj.GetKind(), obj.GetName(), !noop)
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: CheckApply(%t)", obj, !noop)
}

// lookup the refresh (notification) variable
refresh = RefreshPending(g, v) // do i need to perform a refresh?
refresh = obj.RefreshPending() // do i need to perform a refresh?
obj.SetRefresh(refresh) // tell the resource

// changes can occur after this...
@@ -244,38 +252,37 @@ func Process(g *pgraph.Graph, v pgraph.Vertex) error {
// run the CheckApply!
} else {
// if this fails, don't UpdateTimestamp()
checkOK, err = obj.CheckApply(!noop)
checkOK, err = obj.Res.CheckApply(!noop)

if promErr := obj.Prometheus().UpdateCheckApplyTotal(obj.GetKind(), !noop, !checkOK, err != nil); promErr != nil {
if promErr := obj.Data().Prometheus.UpdateCheckApplyTotal(obj.GetKind(), !noop, !checkOK, err != nil); promErr != nil {
// TODO: how to error correctly
log.Printf("%s: Prometheus.UpdateCheckApplyTotal() errored: %v", VtoR(v).String(), err)
log.Printf("%s: Prometheus.UpdateCheckApplyTotal() errored: %v", obj, err)
}
// TODO: Can the `Poll` converged timeout tracking be a
// more general method for all converged timeouts? this
// would simplify the resources by removing boilerplate
if VtoR(v).Meta().Poll > 0 {
if obj.Meta().Poll > 0 {
if !checkOK { // something changed, restart timer
cuid, _, _ := VtoR(v).ConvergerUIDs() // get the converger uid used to report status
cuid.ResetTimer() // activity!
if b, ok := g.Value("debug"); ok && util.Bool(b) {
log.Printf("%s[%s]: Converger: ResetTimer", obj.GetKind(), obj.GetName())
obj.cuid.ResetTimer() // activity!
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: Converger: ResetTimer", obj)
}
}
}
}

if checkOK && err != nil { // should never return this way
log.Fatalf("%s[%s]: CheckApply(): %t, %+v", obj.GetKind(), obj.GetName(), checkOK, err)
log.Fatalf("%s: CheckApply(): %t, %+v", obj, checkOK, err)
}
if b, ok := g.Value("debug"); ok && util.Bool(b) {
log.Printf("%s[%s]: CheckApply(): %t, %v", obj.GetKind(), obj.GetName(), checkOK, err)
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: CheckApply(): %t, %v", obj, checkOK, err)
}

// if CheckApply ran without noop and without error, state should be good
if !noop && err == nil { // aka !noop || checkOK
obj.StateOK(true) // reset
if refresh {
SetUpstreamRefresh(g, v, false) // refresh happened, clear the request
obj.SetUpstreamRefresh(false) // refresh happened, clear the request
obj.SetRefresh(false)
}
}
@@ -301,14 +308,14 @@ func Process(g *pgraph.Graph, v pgraph.Vertex) error {
}

if activity { // add refresh flag to downstream edges...
SetDownstreamRefresh(g, v, true)
obj.SetDownstreamRefresh(true)
}

// update this timestamp *before* we poke or the poked
// nodes might fail due to having a too old timestamp!
VtoR(v).UpdateTimestamp() // this was touched...
obj.UpdateTimestamp() // this was touched...
obj.SetState(ResStatePoking) // can't cancel parent poke
if err := Poke(g, v); err != nil {
if err := obj.Poke(); err != nil {
return errwrap.Wrapf(err, "the Poke() failed")
}
}
@@ -316,24 +323,11 @@ func Process(g *pgraph.Graph, v pgraph.Vertex) error {
return errwrap.Wrapf(err, "could not Process() successfully")
}

// SentinelErr is a sentinal as an error type that wraps an arbitrary error.
type SentinelErr struct {
err error
}

// Error is the required method to fulfill the error type.
func (obj *SentinelErr) Error() string {
return obj.err.Error()
}

// innerWorker is the CheckApply runner that reads from processChan.
// TODO: would it be better if this was a method on BaseRes that took in *pgraph.Graph?
func innerWorker(g *pgraph.Graph, v pgraph.Vertex) {
obj := VtoR(v)
func (obj *BaseRes) innerWorker() {
running := false
done := make(chan struct{})
playback := false // do we need to run another one?
_, wcuid, pcuid := obj.ConvergerUIDs() // get extra cuids (worker, process)
playback := false // do we need to run another one?

waiting := false
var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration
@@ -341,9 +335,9 @@ func innerWorker(g *pgraph.Graph, v pgraph.Vertex) {
<-timer.C // unnecessary, shouldn't happen
}

var delay = time.Duration(VtoR(v).Meta().Delay) * time.Millisecond
var retry = VtoR(v).Meta().Retry // number of tries left, -1 for infinite
var limiter = rate.NewLimiter(VtoR(v).Meta().Limit, VtoR(v).Meta().Burst)
var delay = time.Duration(obj.Meta().Delay) * time.Millisecond
var retry = obj.Meta().Retry // number of tries left, -1 for infinite
var limiter = rate.NewLimiter(obj.Meta().Limit, obj.Meta().Burst)
limited := false

wg := &sync.WaitGroup{} // wait for Process routine to exit
@@ -351,49 +345,49 @@
Loop:
for {
select {
case ev, ok := <-obj.ProcessChan(): // must use like this
case ev, ok := <-obj.processChan: // must use like this
if !ok { // processChan closed, let's exit
break Loop // no event, so no ack!
}
if VtoR(v).Meta().Poll == 0 { // skip for polling
wcuid.SetConverged(false)
if obj.Meta().Poll == 0 { // skip for polling
obj.wcuid.SetConverged(false)
}

// if process started, but no action yet, skip!
if VtoR(v).GetState() == ResStateProcess {
if b, ok := g.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: Skipped event!", VtoR(v).String())
if obj.GetState() == ResStateProcess {
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: Skipped event!", obj)
}
ev.ACK() // ready for next message
VtoR(v).QuiesceGroup().Done()
obj.quiesceGroup.Done()
continue
}

// if running, we skip running a new execution!
// if waiting, we skip running a new execution!
if running || waiting {
if b, ok := g.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: Playback added!", VtoR(v).String())
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: Playback added!", obj)
}
playback = true
ev.ACK() // ready for next message
VtoR(v).QuiesceGroup().Done()
obj.quiesceGroup.Done()
continue
}

// catch invalid rates
if VtoR(v).Meta().Burst == 0 && !(VtoR(v).Meta().Limit == rate.Inf) { // blocked
e := fmt.Errorf("%s: Permanently limited (rate != Inf, burst: 0)", VtoR(v).String())
if obj.Meta().Burst == 0 && !(obj.Meta().Limit == rate.Inf) { // blocked
e := fmt.Errorf("%s: Permanently limited (rate != Inf, burst: 0)", obj)
ev.ACK() // ready for next message
VtoR(v).QuiesceGroup().Done()
VtoR(v).SendEvent(event.EventExit, &SentinelErr{e})
obj.quiesceGroup.Done()
obj.SendEvent(event.EventExit, &SentinelErr{e})
continue
}

// rate limit
// FIXME: consider skipping rate limit check if
// the event is a poke instead of a watch event
if !limited && !(VtoR(v).Meta().Limit == rate.Inf) { // skip over the playback event...
if !limited && !(obj.Meta().Limit == rate.Inf) { // skip over the playback event...
now := time.Now()
r := limiter.ReserveN(now, 1) // one event
// r.OK() seems to always be true here!
@@ -401,12 +395,12 @@ Loop:
if d > 0 { // delay
limited = true
playback = true
log.Printf("%s: Limited (rate: %v/sec, burst: %d, next: %v)", VtoR(v).String(), VtoR(v).Meta().Limit, VtoR(v).Meta().Burst, d)
log.Printf("%s: Limited (rate: %v/sec, burst: %d, next: %v)", obj, obj.Meta().Limit, obj.Meta().Burst, d)
// start the timer...
timer.Reset(d)
waiting = true // waiting for retry timer
ev.ACK()
VtoR(v).QuiesceGroup().Done()
obj.quiesceGroup.Done()
continue
} // otherwise, we run directly!
}
@@ -415,60 +409,60 @@ Loop:
wg.Add(1)
running = true
go func(ev *event.Event) {
pcuid.SetConverged(false) // "block" Process
obj.pcuid.SetConverged(false) // "block" Process
defer wg.Done()
if e := Process(g, v); e != nil {
if e := obj.Process(); e != nil {
playback = true
log.Printf("%s: CheckApply errored: %v", VtoR(v).String(), e)
log.Printf("%s: CheckApply errored: %v", obj, e)
if retry == 0 {
if err := obj.Prometheus().UpdateState(VtoR(v).String(), VtoR(v).GetKind(), prometheus.ResStateHardFail); err != nil {
if err := obj.Data().Prometheus.UpdateState(obj.String(), obj.GetKind(), prometheus.ResStateHardFail); err != nil {
// TODO: how to error this?
log.Printf("%s: Prometheus.UpdateState() errored: %v", VtoR(v).String(), err)
log.Printf("%s: Prometheus.UpdateState() errored: %v", obj, err)
}

// wrap the error in the sentinel
VtoR(v).QuiesceGroup().Done() // before the Wait that happens in SendEvent!
VtoR(v).SendEvent(event.EventExit, &SentinelErr{e})
obj.quiesceGroup.Done() // before the Wait that happens in SendEvent!
obj.SendEvent(event.EventExit, &SentinelErr{e})
return
}
if retry > 0 { // don't decrement the -1
retry--
}
if err := obj.Prometheus().UpdateState(VtoR(v).String(), VtoR(v).GetKind(), prometheus.ResStateSoftFail); err != nil {
if err := obj.Data().Prometheus.UpdateState(obj.String(), obj.GetKind(), prometheus.ResStateSoftFail); err != nil {
// TODO: how to error this?
log.Printf("%s: Prometheus.UpdateState() errored: %v", VtoR(v).String(), err)
log.Printf("%s: Prometheus.UpdateState() errored: %v", obj, err)
}
log.Printf("%s: CheckApply: Retrying after %.4f seconds (%d left)", VtoR(v).String(), delay.Seconds(), retry)
log.Printf("%s: CheckApply: Retrying after %.4f seconds (%d left)", obj, delay.Seconds(), retry)
// start the timer...
timer.Reset(delay)
waiting = true // waiting for retry timer
// don't VtoR(v).QuiesceGroup().Done() b/c
// don't obj.quiesceGroup.Done() b/c
// the timer is running and it can exit!
return
}
retry = VtoR(v).Meta().Retry // reset on success
close(done) // trigger
retry = obj.Meta().Retry // reset on success
close(done) // trigger
}(ev)
ev.ACK() // sync (now mostly useless)

case <-timer.C:
if VtoR(v).Meta().Poll == 0 { // skip for polling
wcuid.SetConverged(false)
if obj.Meta().Poll == 0 { // skip for polling
obj.wcuid.SetConverged(false)
}
waiting = false
if !timer.Stop() {
//<-timer.C // blocks, docs are wrong!
}
log.Printf("%s: CheckApply delay expired!", VtoR(v).String())
log.Printf("%s: CheckApply delay expired!", obj)
close(done)

// a CheckApply run (with possibly retry pause) finished
case <-done:
if VtoR(v).Meta().Poll == 0 { // skip for polling
wcuid.SetConverged(false)
if obj.Meta().Poll == 0 { // skip for polling
obj.wcuid.SetConverged(false)
}
if b, ok := g.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: CheckApply finished!", VtoR(v).String())
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: CheckApply finished!", obj)
}
done = make(chan struct{}) // reset
// re-send this event, to trigger a CheckApply()
@@ -478,21 +472,21 @@ Loop:
// TODO: can this experience indefinite postponement ?
// see: https://github.com/golang/go/issues/11506
// pause or exit is in process if not quiescing!
if !VtoR(v).IsQuiescing() {
if !obj.quiescing {
playback = false
VtoR(v).QuiesceGroup().Add(1) // lock around it, b/c still running...
obj.quiesceGroup.Add(1) // lock around it, b/c still running...
go func() {
obj.Event() // replay a new event
VtoR(v).QuiesceGroup().Done()
obj.quiesceGroup.Done()
}()
}
}
running = false
pcuid.SetConverged(true) // "unblock" Process
VtoR(v).QuiesceGroup().Done()
obj.pcuid.SetConverged(true) // "unblock" Process
obj.quiesceGroup.Done()

case <-wcuid.ConvergedTimer():
wcuid.SetConverged(true) // converged!
case <-obj.wcuid.ConvergedTimer():
obj.wcuid.SetConverged(true) // converged!
continue
}
}
@@ -503,22 +497,21 @@ Loop:
// Worker is the common run frontend of the vertex. It handles all of the retry
// and retry delay common code, and ultimately returns the final status of this
// vertex execution.
func Worker(g *pgraph.Graph, v pgraph.Vertex) error {
func (obj *BaseRes) Worker() error {
// listen for chan events from Watch() and run
// the Process() function when they're received
// this avoids us having to pass the data into
// the Watch() function about which graph it is
// running on, which isolates things nicely...
obj := VtoR(v)
if b, ok := g.Value("debug"); ok && util.Bool(b) {
log.Printf("%s: Worker: Running", VtoR(v).String())
defer log.Printf("%s: Worker: Stopped", VtoR(v).String())
if obj.debug {
log.Printf("%s: Worker: Running", obj)
defer log.Printf("%s: Worker: Stopped", obj)
}
// run the init (should match 1-1 with Close function)
if err := obj.Init(); err != nil {
if err := obj.Res.Init(); err != nil {
obj.ProcessExit()
// always exit the worker function by finishing with Close()
if e := obj.Close(); e != nil {
if e := obj.Res.Close(); e != nil {
err = multierr.Append(err, e) // list of errors
}
return errwrap.Wrapf(err, "could not Init() resource")
@@ -528,16 +521,15 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error {
// timeout, we could inappropriately converge mid-apply!
// avoid this by blocking convergence with a fake report
// we also add a similar blocker around the worker loop!
_, wcuid, pcuid := obj.ConvergerUIDs() // get extra cuids (worker, process)
// XXX: put these in Init() ?
wcuid.SetConverged(true) // starts off false, and waits for loop timeout
pcuid.SetConverged(true) // starts off true, because it's not running...
// get extra cuids (worker, process)
obj.wcuid.SetConverged(true) // starts off false, and waits for loop timeout
obj.pcuid.SetConverged(true) // starts off true, because it's not running...

wg := obj.ProcessSync()
wg.Add(1)
obj.processSync.Add(1)
go func() {
defer wg.Done()
innerWorker(g, v)
defer obj.processSync.Done()
obj.innerWorker()
}()

var err error // propagate the error up (this is a permanent BAD error!)
@@ -547,7 +539,7 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error {
// NOTE: we're using the same retry and delay metaparams that CheckApply
// uses. This is for practicality. We can separate them later if needed!
var watchDelay time.Duration
var watchRetry = VtoR(v).Meta().Retry // number of tries left, -1 for infinite
var watchRetry = obj.Meta().Retry // number of tries left, -1 for infinite
// watch blocks until it ends, & errors to retry
for {
// TODO: do we have to stop the converged-timeout when in this block (perhaps we're in the delay block!)
@@ -570,7 +562,7 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error {
if exit, send := obj.ReadEvent(event); exit != nil {
obj.ProcessExit()
err := *exit // exit err
if e := obj.Close(); err == nil {
if e := obj.Res.Close(); err == nil {
err = e
} else if e != nil {
err = multierr.Append(err, e) // list of errors
@@ -600,7 +592,7 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error {
}
}
timer.Stop() // it's nice to cleanup
log.Printf("%s: Watch delay expired!", VtoR(v).String())
log.Printf("%s: Watch delay expired!", obj)
// NOTE: we can avoid the send if running Watch guarantees
// one CheckApply event on startup!
//if pendingSendEvent { // TODO: should this become a list in the future?
@@ -612,13 +604,12 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error {

// TODO: reset the watch retry count after some amount of success
var e error
if VtoR(v).Meta().Poll > 0 { // poll instead of watching :(
cuid, _, _ := VtoR(v).ConvergerUIDs() // get the converger uid used to report status
cuid.StartTimer()
e = VtoR(v).Poll()
cuid.StopTimer() // clean up nicely
if obj.Meta().Poll > 0 { // poll instead of watching :(
obj.cuid.StartTimer()
e = obj.Poll()
obj.cuid.StopTimer() // clean up nicely
} else {
e = VtoR(v).Watch() // run the watch normally
e = obj.Res.Watch() // run the watch normally
}
if e == nil { // exit signal
err = nil // clean exit
@@ -628,7 +619,7 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error {
err = sentinelErr.err
break // sentinel means, perma-exit
}
log.Printf("%s: Watch errored: %v", VtoR(v).String(), e)
log.Printf("%s: Watch errored: %v", obj, e)
if watchRetry == 0 {
err = fmt.Errorf("Permanent watch error: %v", e)
break
@@ -636,8 +627,8 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error {
if watchRetry > 0 { // don't decrement the -1
watchRetry--
}
watchDelay = time.Duration(VtoR(v).Meta().Delay) * time.Millisecond
log.Printf("%s: Watch: Retrying after %.4f seconds (%d left)", VtoR(v).String(), watchDelay.Seconds(), watchRetry)
watchDelay = time.Duration(obj.Meta().Delay) * time.Millisecond
log.Printf("%s: Watch: Retrying after %.4f seconds (%d left)", obj, watchDelay.Seconds(), watchRetry)
// We need to trigger a CheckApply after Watch restarts, so that
// we catch any lost events that happened while down. We do this
// by getting the Watch resource to send one event once it's up!
@@ -646,148 +637,10 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error {

obj.ProcessExit()
// close resource and return possible errors if any
if e := obj.Close(); err == nil {
if e := obj.Res.Close(); err == nil {
err = e
} else if e != nil {
err = multierr.Append(err, e) // list of errors
}
return err
}

// Start is a main kick to start the graph. It goes through in reverse topological
// sort order so that events can't hit un-started vertices.
func Start(g *pgraph.Graph, first bool) { // start or continue
log.Printf("State: %v -> %v", setState(g, graphStateStarting), getState(g))
defer log.Printf("State: %v -> %v", setState(g, graphStateStarted), getState(g))
t, _ := g.TopologicalSort()
indegree := g.InDegree() // compute all of the indegree's
reversed := pgraph.Reverse(t)
wg := &sync.WaitGroup{}
for _, v := range reversed { // run the Setup() for everyone first
// run these in parallel, as long as we wait before continuing
wg.Add(1)
go func(vv pgraph.Vertex) {
defer wg.Done()
if !VtoR(vv).IsWorking() { // if Worker() is not running...
VtoR(vv).Setup() // initialize some vars in the resource
}
}(v)
}
wg.Wait()

// ptr b/c: Mutex/WaitGroup must not be copied after first use
gwg := WgFromGraph(g)

// run through the topological reverse, and start or unpause each vertex
for _, v := range reversed {
// selective poke: here we reduce the number of initial pokes
// to the minimum required to activate every vertex in the
// graph, either by direct action, or by getting poked by a
// vertex that was previously activated. if we poke each vertex
// that has no incoming edges, then we can be sure to reach the
// whole graph. Please note: this may mask certain optimization
// failures, such as any poke limiting code in Poke() or
// BackPoke(). You might want to disable this selective start
// when experimenting with and testing those elements.
// if we are unpausing (since it's not the first run of this
// function) we need to poke to *unpause* every graph vertex,
// and not just selectively the subset with no indegree.

// let the startup code know to poke or not
// this triggers a CheckApply AFTER Watch is Running()
// We *don't* need to also do this to new nodes or nodes that
// are about to get unpaused, because they'll get poked by one
// of the indegree == 0 vertices, and an important aspect of the
// Process() function is that even if the state is correct, it
// will pass through the Poke so that it flows through the DAG.
VtoR(v).Starter(indegree[v] == 0)

var unpause = true
if !VtoR(v).IsWorking() { // if Worker() is not running...
unpause = false // doesn't need unpausing on first start
gwg.Add(1)
// must pass in value to avoid races...
// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
go func(vv pgraph.Vertex) {
defer gwg.Done()
defer VtoR(vv).Reset()
// TODO: if a sufficient number of workers error,
// should something be done? Should these restart
// after perma-failure if we have a graph change?
log.Printf("%s: Started", VtoR(vv).String())
if err := Worker(g, vv); err != nil { // contains the Watch and CheckApply loops
log.Printf("%s: Exited with failure: %v", VtoR(vv).String(), err)
return
}
log.Printf("%s: Exited", VtoR(vv).String())
}(v)
}

select {
case <-VtoR(v).Started(): // block until started
case <-VtoR(v).Stopped(): // we failed on init
// if the resource Init() fails, we don't hang!
}

if unpause { // unpause (if needed)
VtoR(v).SendEvent(event.EventStart, nil) // sync!
}
}
// we wait for everyone to start before exiting!
}

// Pause sends pause events to the graph in a topological sort order. If you set
// the fastPause argument to true, then it will ask future propagation waves to
// not run through the graph before exiting, and instead will exit much quicker.
func Pause(g *pgraph.Graph, fastPause bool) {
log.Printf("State: %v -> %v", setState(g, graphStatePausing), getState(g))
defer log.Printf("State: %v -> %v", setState(g, graphStatePaused), getState(g))
if fastPause {
g.SetValue("fastpause", true) // set flag
}
t, _ := g.TopologicalSort()
for _, v := range t { // squeeze out the events...
VtoR(v).SendEvent(event.EventPause, nil) // sync
}
g.SetValue("fastpause", false) // reset flag
}

// Exit sends exit events to the graph in a topological sort order.
func Exit(g *pgraph.Graph) {
if g == nil { // empty graph that wasn't populated yet
return
}

// FIXME: a second ^C could put this into fast pause, but do it for now!
Pause(g, true) // implement this with pause to avoid duplicating the code

t, _ := g.TopologicalSort()
for _, v := range t { // squeeze out the events...
// turn off the taps...
// XXX: consider instead doing this by closing the Res.events channel instead?
// XXX: do this by sending an exit signal, and then returning
// when we hit the 'default' in the select statement!
// XXX: we can do this to quiesce, but it's not necessary now

VtoR(v).SendEvent(event.EventExit, nil)
VtoR(v).WaitGroup().Wait()
}
gwg := WgFromGraph(g)
gwg.Wait() // for now, this doesn't need to be a separate Wait() method
}

// WgFromGraph returns a pointer to the waitgroup stored with the graph,
// otherwise it panics. If one does not exist, it will create it.
func WgFromGraph(g *pgraph.Graph) *sync.WaitGroup {
x, exists := g.Value("waitgroup")
if !exists {
g.SetValue("waitgroup", &sync.WaitGroup{})
x, _ = g.Value("waitgroup")
}

wg, ok := x.(*sync.WaitGroup)
if !ok {
panic("not a *sync.WaitGroup")
}
return wg
}