resources: Improve notification system and notify refreshes

Resources can send "refresh" notifications along edges. These messages
are sent whenever the upstream (initiating vertex) changes state. When
the changed state propagates downstream, it will be paired with a
refresh flag which can be queried in the CheckApply method of that
resource.

Future work will include a stateful refresh tracking mechanism so that
if a refresh event is generated and not consumed, it will be saved
across an interrupt (shutdown) or a crash so that it can be re-applied
on the subsequent run. This is important because the unapplied refresh
is a form of hysteresis which needs to be tracked and remembered or we
won't be able to determine that the state is wrong!

Still to do:
* Update the autogrouping code to handle the edge notify properties!
* Actually finish the stateful bool code
This commit is contained in:
James Shubin
2016-11-29 22:11:42 -05:00
parent b0a8fc165c
commit 2e718c0e9d
10 changed files with 443 additions and 167 deletions

View File

@@ -83,7 +83,20 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
v2 := pgraph.NewVertex(f1) v2 := pgraph.NewVertex(f1)
g.AddVertex(v2) g.AddVertex(v2)
s1 := &resources.SvcRes{
BaseRes: resources.BaseRes{
Name: "purpleidea",
},
State: "stopped",
}
v3 := pgraph.NewVertex(s1)
g.AddVertex(v3)
g.AddEdge(v1, v2, pgraph.NewEdge("e1")) g.AddEdge(v1, v2, pgraph.NewEdge("e1"))
e2 := pgraph.NewEdge("e2")
e2.Notify = true // send a notification from v2 to v3
g.AddEdge(v2, v3, e2)
//g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.EmbdEtcd, obj.data.Noop) //g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.EmbdEtcd, obj.data.Noop)
return g, nil return g, nil

View File

@@ -29,6 +29,7 @@ import (
"github.com/purpleidea/mgmt/resources" "github.com/purpleidea/mgmt/resources"
errwrap "github.com/pkg/errors" errwrap "github.com/pkg/errors"
"golang.org/x/sync/errgroup"
) )
// GetTimestamp returns the timestamp of a vertex // GetTimestamp returns the timestamp of a vertex
@@ -45,7 +46,7 @@ func (v *Vertex) UpdateTimestamp() int64 {
// OKTimestamp returns true if this element can run right now? // OKTimestamp returns true if this element can run right now?
func (g *Graph) OKTimestamp(v *Vertex) bool { func (g *Graph) OKTimestamp(v *Vertex) bool {
// these are all the vertices pointing TO v, eg: ??? -> v // these are all the vertices pointing TO v, eg: ??? -> v
for _, n := range g.IncomingGraphEdges(v) { for _, n := range g.IncomingGraphVertices(v) {
// if the vertex has a greater timestamp than any pre-req (n) // if the vertex has a greater timestamp than any pre-req (n)
// then we can't run right now... // then we can't run right now...
// if they're equal (eg: on init of 0) then we also can't run // if they're equal (eg: on init of 0) then we also can't run
@@ -63,29 +64,42 @@ func (g *Graph) OKTimestamp(v *Vertex) bool {
// Poke notifies nodes after me in the dependency graph that they need refreshing... // Poke notifies nodes after me in the dependency graph that they need refreshing...
// NOTE: this assumes that this can never fail or need to be rescheduled // NOTE: this assumes that this can never fail or need to be rescheduled
func (g *Graph) Poke(v *Vertex, activity bool) { func (g *Graph) Poke(v *Vertex, activity bool) error {
var eg errgroup.Group
// these are all the vertices pointing AWAY FROM v, eg: v -> ??? // these are all the vertices pointing AWAY FROM v, eg: v -> ???
for _, n := range g.OutgoingGraphEdges(v) { for _, n := range g.OutgoingGraphVertices(v) {
// XXX: if we're in state event and haven't been cancelled by // XXX: if we're in state event and haven't been cancelled by
// apply, then we can cancel a poke to a child, right? XXX // apply, then we can cancel a poke to a child, right? XXX
// XXX: if n.Res.getState() != resources.ResStateEvent { // is this correct? // XXX: if n.Res.getState() != resources.ResStateEvent || activity { // is this correct?
if true { // XXX if true || activity { // XXX: ???
if global.DEBUG { if global.DEBUG {
log.Printf("%s[%s]: Poke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) log.Printf("%s[%s]: Poke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
} }
n.SendEvent(event.EventPoke, false, activity) // XXX: can this be switched to sync? //wg.Add(1)
eg.Go(func() error {
//defer wg.Done()
edge := g.Adjacency[v][n] // lookup
notify := edge.Notify && edge.Refresh()
// FIXME: is it okay that this is sync?
n.SendEvent(event.EventPoke, true, notify)
// TODO: check return value?
return nil // never error for now...
})
} else { } else {
if global.DEBUG { if global.DEBUG {
log.Printf("%s[%s]: Poke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) log.Printf("%s[%s]: Poke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName())
} }
} }
} }
return eg.Wait() // wait for all the pokes to complete
} }
// BackPoke pokes the pre-requisites that are stale and need to run before I can run. // BackPoke pokes the pre-requisites that are stale and need to run before I can run.
func (g *Graph) BackPoke(v *Vertex) { func (g *Graph) BackPoke(v *Vertex) {
// these are all the vertices pointing TO v, eg: ??? -> v // these are all the vertices pointing TO v, eg: ??? -> v
for _, n := range g.IncomingGraphEdges(v) { for _, n := range g.IncomingGraphVertices(v) {
x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState() x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState()
// if the parent timestamp needs poking AND it's not in state // if the parent timestamp needs poking AND it's not in state
// ResStateEvent, then poke it. If the parent is in ResStateEvent it // ResStateEvent, then poke it. If the parent is in ResStateEvent it
@@ -97,7 +111,8 @@ func (g *Graph) BackPoke(v *Vertex) {
if global.DEBUG { if global.DEBUG {
log.Printf("%s[%s]: BackPoke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) log.Printf("%s[%s]: BackPoke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
} }
n.SendEvent(event.EventBackPoke, false, false) // XXX: can this be switched to sync? // FIXME: is it okay that this is sync?
n.SendEvent(event.EventBackPoke, true, false)
} else { } else {
if global.DEBUG { if global.DEBUG {
log.Printf("%s[%s]: BackPoke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) log.Printf("%s[%s]: BackPoke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName())
@@ -106,6 +121,39 @@ func (g *Graph) BackPoke(v *Vertex) {
} }
} }
// RefreshPending determines if any previous nodes have a refresh pending here.
// If this is true, it means I am expected to apply a refresh when I next run.
func (g *Graph) RefreshPending(v *Vertex) bool {
var refresh bool
for _, edge := range g.IncomingGraphEdges(v) {
// if we asked for a notify *and* if one is pending!
if edge.Notify && edge.Refresh() {
refresh = true
break
}
}
return refresh
}
// SetUpstreamRefresh sets the refresh value to any upstream vertices.
func (g *Graph) SetUpstreamRefresh(v *Vertex, b bool) {
for _, edge := range g.IncomingGraphEdges(v) {
if edge.Notify {
edge.SetRefresh(b)
}
}
}
// SetDownstreamRefresh sets the refresh value to any downstream vertices.
func (g *Graph) SetDownstreamRefresh(v *Vertex, b bool) {
for _, edge := range g.OutgoingGraphEdges(v) {
// if we asked for a notify *and* if one is pending!
if edge.Notify {
edge.SetRefresh(b)
}
}
}
// Process is the primary function to execute for a particular vertex in the graph. // Process is the primary function to execute for a particular vertex in the graph.
func (g *Graph) Process(v *Vertex) error { func (g *Graph) Process(v *Vertex) error {
obj := v.Res obj := v.Res
@@ -114,7 +162,7 @@ func (g *Graph) Process(v *Vertex) error {
} }
obj.SetState(resources.ResStateEvent) obj.SetState(resources.ResStateEvent)
var ok = true var ok = true
var apply = false // did we run an apply? var applied = false // did we run an apply?
// is it okay to run dependency wise right now? // is it okay to run dependency wise right now?
// if not, that's okay because when the dependency runs, it will poke // if not, that's okay because when the dependency runs, it will poke
// us back and we will run if needed then! // us back and we will run if needed then!
@@ -132,17 +180,33 @@ func (g *Graph) Process(v *Vertex) error {
obj.StateOK(false) // invalidate cache, mark as dirty obj.StateOK(false) // invalidate cache, mark as dirty
} }
if global.DEBUG { var noop = obj.Meta().Noop // lookup the noop value
log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), !obj.Meta().Noop) var refresh bool
}
var checkOK bool var checkOK bool
var err error var err error
if obj.IsStateOK() { // check cached state, to skip CheckApply
if global.DEBUG {
log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), !noop)
}
// lookup the refresh (notification) variable
refresh = g.RefreshPending(v) // do i need to perform a refresh?
obj.SetRefresh(refresh) // tell the resource
// check cached state, to skip CheckApply; can't skip if refreshing
if !refresh && obj.IsStateOK() {
checkOK, err = true, nil checkOK, err = true, nil
// NOTE: technically this block is wrong because we don't know
// if the resource implements refresh! If it doesn't, we could
// skip this, but it doesn't make a big difference under noop!
} else if noop && refresh { // had a refresh to do w/ noop!
checkOK, err = false, nil // therefore the state is wrong
// run the CheckApply!
} else { } else {
// if this fails, don't UpdateTimestamp() // if this fails, don't UpdateTimestamp()
checkOK, err = obj.CheckApply(!obj.Meta().Noop) checkOK, err = obj.CheckApply(!noop)
} }
if checkOK && err != nil { // should never return this way if checkOK && err != nil { // should never return this way
@@ -153,32 +217,45 @@ func (g *Graph) Process(v *Vertex) error {
} }
// if CheckApply ran without noop and without error, state should be good // if CheckApply ran without noop and without error, state should be good
if !obj.Meta().Noop && err == nil { // aka !obj.Meta().Noop || checkOK if !noop && err == nil { // aka !noop || checkOK
obj.StateOK(true) // reset obj.StateOK(true) // reset
g.SetUpstreamRefresh(v, false) // refresh happened, clear the request
} }
if !checkOK { // if state *was* not ok, we had to have apply'ed if !checkOK { // if state *was* not ok, we had to have apply'ed
if err != nil { // error during check or apply if err != nil { // error during check or apply
ok = false ok = false
} else { } else {
apply = true applied = true
} }
} }
// when noop is true we always want to update timestamp // when noop is true we always want to update timestamp
if obj.Meta().Noop && err == nil { if noop && err == nil {
ok = true ok = true
} }
if ok { if ok {
// did we actually do work?
activity := applied
if noop {
activity = false // no we didn't do work...
}
if activity { // add refresh flag to downstream edges...
g.SetDownstreamRefresh(v, true)
}
// update this timestamp *before* we poke or the poked // update this timestamp *before* we poke or the poked
// nodes might fail due to having a too old timestamp! // nodes might fail due to having a too old timestamp!
v.UpdateTimestamp() // this was touched... v.UpdateTimestamp() // this was touched...
obj.SetState(resources.ResStatePoking) // can't cancel parent poke obj.SetState(resources.ResStatePoking) // can't cancel parent poke
g.Poke(v, apply) if err := g.Poke(v, activity); err != nil {
return errwrap.Wrapf(err, "the Poke() failed")
}
} }
// poke at our pre-req's instead since they need to refresh/run... // poke at our pre-req's instead since they need to refresh/run...
return err return errwrap.Wrapf(err, "could not Process() successfully")
} }
// else... only poke at the pre-req's that need to run // else... only poke at the pre-req's that need to run
go g.BackPoke(v) go g.BackPoke(v)

View File

@@ -221,7 +221,7 @@ func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex)
} }
// 2) edges that point towards v2 from X now point to v1 from X (no dupes) // 2) edges that point towards v2 from X now point to v1 from X (no dupes)
for _, x := range g.IncomingGraphEdges(v2) { // all to vertex v (??? -> v) for _, x := range g.IncomingGraphVertices(v2) { // all to vertex v (??? -> v)
e := g.Adjacency[x][v2] // previous edge e := g.Adjacency[x][v2] // previous edge
r := g.Reachability(x, v1) r := g.Reachability(x, v1)
// merge e with ex := g.Adjacency[x][v1] if it exists! // merge e with ex := g.Adjacency[x][v1] if it exists!
@@ -248,7 +248,7 @@ func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex)
} }
// 3) edges that point from v2 to X now point from v1 to X (no dupes) // 3) edges that point from v2 to X now point from v1 to X (no dupes)
for _, x := range g.OutgoingGraphEdges(v2) { // all from vertex v (v -> ???) for _, x := range g.OutgoingGraphVertices(v2) { // all from vertex v (v -> ???)
e := g.Adjacency[v2][x] // previous edge e := g.Adjacency[v2][x] // previous edge
r := g.Reachability(v1, x) r := g.Reachability(v1, x)
// merge e with ex := g.Adjacency[v1][x] if it exists! // merge e with ex := g.Adjacency[v1][x] if it exists!

View File

@@ -61,7 +61,10 @@ type Vertex struct {
// Edge is the primary edge struct in this library. // Edge is the primary edge struct in this library.
type Edge struct { type Edge struct {
Name string Name string
Notify bool // should we send a refresh notification along this edge?
refresh bool // is there a notify pending for the dest vertex ?
} }
// NewGraph builds a new graph. // NewGraph builds a new graph.
@@ -87,6 +90,16 @@ func NewEdge(name string) *Edge {
} }
} }
// Refresh returns the pending refresh status of this edge.
func (obj *Edge) Refresh() bool {
return obj.refresh
}
// SetRefresh sets the pending refresh status of this edge.
func (obj *Edge) SetRefresh(b bool) {
obj.refresh = b
}
// Copy makes a copy of the graph struct // Copy makes a copy of the graph struct
func (g *Graph) Copy() *Graph { func (g *Graph) Copy() *Graph {
newGraph := &Graph{ newGraph := &Graph{
@@ -249,9 +262,9 @@ func (v *Vertex) String() string {
return fmt.Sprintf("%s[%s]", v.Res.Kind(), v.Res.GetName()) return fmt.Sprintf("%s[%s]", v.Res.Kind(), v.Res.GetName())
} }
// IncomingGraphEdges returns an array (slice) of all directed vertices to // IncomingGraphVertices returns an array (slice) of all directed vertices to
// vertex v (??? -> v). OKTimestamp should probably use this. // vertex v (??? -> v). OKTimestamp should probably use this.
func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex { func (g *Graph) IncomingGraphVertices(v *Vertex) []*Vertex {
// TODO: we might be able to implement this differently by reversing // TODO: we might be able to implement this differently by reversing
// the Adjacency graph and then looping through it again... // the Adjacency graph and then looping through it again...
var s []*Vertex var s []*Vertex
@@ -265,9 +278,9 @@ func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex {
return s return s
} }
// OutgoingGraphEdges returns an array (slice) of all vertices that vertex v // OutgoingGraphVertices returns an array (slice) of all vertices that vertex v
// points to (v -> ???). Poke should probably use this. // points to (v -> ???). Poke should probably use this.
func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Vertex { func (g *Graph) OutgoingGraphVertices(v *Vertex) []*Vertex {
var s []*Vertex var s []*Vertex
for k := range g.Adjacency[v] { // forward paths for k := range g.Adjacency[v] { // forward paths
s = append(s, k) s = append(s, k)
@@ -275,15 +288,46 @@ func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Vertex {
return s return s
} }
// GraphEdges returns an array (slice) of all vertices that connect to vertex v. // GraphVertices returns an array (slice) of all vertices that connect to vertex v.
// This is the union of IncomingGraphEdges and OutgoingGraphEdges. // This is the union of IncomingGraphVertices and OutgoingGraphVertices.
func (g *Graph) GraphEdges(v *Vertex) []*Vertex { func (g *Graph) GraphVertices(v *Vertex) []*Vertex {
var s []*Vertex var s []*Vertex
s = append(s, g.IncomingGraphEdges(v)...) s = append(s, g.IncomingGraphVertices(v)...)
s = append(s, g.OutgoingGraphEdges(v)...) s = append(s, g.OutgoingGraphVertices(v)...)
return s return s
} }
// IncomingGraphEdges returns all of the edges that point to vertex v (??? -> v).
func (g *Graph) IncomingGraphEdges(v *Vertex) []*Edge {
var edges []*Edge
for v1 := range g.Adjacency { // reverse paths
for v2, e := range g.Adjacency[v1] {
if v2 == v {
edges = append(edges, e)
}
}
}
return edges
}
// OutgoingGraphEdges returns all of the edges that point from vertex v (v -> ???).
func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Edge {
var edges []*Edge
for _, e := range g.Adjacency[v] { // forward paths
edges = append(edges, e)
}
return edges
}
// GraphEdges returns an array (slice) of all edges that connect to vertex v.
// This is the union of IncomingGraphEdges and OutgoingGraphEdges.
func (g *Graph) GraphEdges(v *Vertex) []*Edge {
var edges []*Edge
edges = append(edges, g.IncomingGraphEdges(v)...)
edges = append(edges, g.OutgoingGraphEdges(v)...)
return edges
}
// DFS returns a depth first search for the graph, starting at the input vertex. // DFS returns a depth first search for the graph, starting at the input vertex.
func (g *Graph) DFS(start *Vertex) []*Vertex { func (g *Graph) DFS(start *Vertex) []*Vertex {
var d []*Vertex // discovered var d []*Vertex // discovered
@@ -299,7 +343,7 @@ func (g *Graph) DFS(start *Vertex) []*Vertex {
if !VertexContains(v, d) { // if not discovered if !VertexContains(v, d) { // if not discovered
d = append(d, v) // label as discovered d = append(d, v) // label as discovered
for _, w := range g.GraphEdges(v) { for _, w := range g.GraphVertices(v) {
s = append(s, w) s = append(s, w)
} }
} }
@@ -446,7 +490,7 @@ func (g *Graph) Reachability(a, b *Vertex) []*Vertex {
if a == nil || b == nil { if a == nil || b == nil {
return nil return nil
} }
vertices := g.OutgoingGraphEdges(a) // what points away from a ? vertices := g.OutgoingGraphVertices(a) // what points away from a ?
if len(vertices) == 0 { if len(vertices) == 0 {
return []*Vertex{} // nope return []*Vertex{} // nope
} }

View File

@@ -94,6 +94,46 @@ func (obj *MsgRes) Validate() error {
return nil return nil
} }
// isAllStateOK derives a compound state from all internal cache flags that apply to this resource.
func (obj *MsgRes) isAllStateOK() bool {
if obj.Journal && !obj.journalStateOK {
return false
}
if obj.Syslog && !obj.syslogStateOK {
return false
}
return obj.logStateOK
}
// updateStateOK sets the global state so it can be read by the engine.
func (obj *MsgRes) updateStateOK() {
obj.StateOK(obj.isAllStateOK())
}
// JournalPriority converts a string description to a numeric priority.
// XXX: Have Validate() make sure it actually is one of these.
func (obj *MsgRes) journalPriority() journal.Priority {
switch obj.Priority {
case "Emerg":
return journal.PriEmerg
case "Alert":
return journal.PriAlert
case "Crit":
return journal.PriCrit
case "Err":
return journal.PriErr
case "Warning":
return journal.PriWarning
case "Notice":
return journal.PriNotice
case "Info":
return journal.PriInfo
case "Debug":
return journal.PriDebug
}
return journal.PriNotice
}
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *MsgRes) Watch(processChan chan event.Event) error { func (obj *MsgRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() { if obj.IsWatching() {
@@ -125,17 +165,6 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error {
return nil // exit return nil // exit
} }
// TODO: invalidate cached state on poke events
//obj.logStateOK = false
//if obj.Journal {
// obj.journalStateOK = false
//}
//if obj.Syslog {
// obj.syslogStateOK = false
//}
//obj.updateStateOK()
send = true
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged! cuid.SetConverged(true) // converged!
continue continue
@@ -158,6 +187,51 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error {
} }
} }
// CheckApply method for Msg resource.
// Every check leads to an apply, meaning that the message is flushed to the journal.
func (obj *MsgRes) CheckApply(apply bool) (bool, error) {
// isStateOK() done by engine, so we updateStateOK() to pass in value
//if obj.isAllStateOK() {
// return true, nil
//}
if obj.Refresh() { // if we were notified...
// invalidate cached state...
obj.logStateOK = false
if obj.Journal {
obj.journalStateOK = false
}
if obj.Syslog {
obj.syslogStateOK = false
}
obj.updateStateOK()
}
if !obj.logStateOK {
log.Printf("%s[%s]: Body: %s", obj.Kind(), obj.GetName(), obj.Body)
obj.logStateOK = true
obj.updateStateOK()
}
if !apply {
return false, nil
}
if obj.Journal && !obj.journalStateOK {
if err := journal.Send(obj.Body, obj.journalPriority(), obj.Fields); err != nil {
return false, err
}
obj.journalStateOK = true
obj.updateStateOK()
}
if obj.Syslog && !obj.syslogStateOK {
// TODO: implement syslog client
obj.syslogStateOK = true
obj.updateStateOK()
}
return false, nil
}
// GetUIDs includes all params to make a unique identification of this object. // GetUIDs includes all params to make a unique identification of this object.
// Most resources only return one, although some resources can return multiple. // Most resources only return one, although some resources can return multiple.
func (obj *MsgRes) GetUIDs() []ResUID { func (obj *MsgRes) GetUIDs() []ResUID {
@@ -203,76 +277,3 @@ func (obj *MsgRes) Compare(res Res) bool {
} }
return true return true
} }
// isAllStateOK derives a compound state from all internal cache flags that apply to this resource.
func (obj *MsgRes) isAllStateOK() bool {
if obj.Journal && !obj.journalStateOK {
return false
}
if obj.Syslog && !obj.syslogStateOK {
return false
}
return obj.logStateOK
}
// updateStateOK sets the global state so it can be read by the engine.
func (obj *MsgRes) updateStateOK() {
obj.StateOK(obj.isAllStateOK())
}
// JournalPriority converts a string description to a numeric priority.
// XXX: Have Validate() make sure it actually is one of these.
func (obj *MsgRes) journalPriority() journal.Priority {
switch obj.Priority {
case "Emerg":
return journal.PriEmerg
case "Alert":
return journal.PriAlert
case "Crit":
return journal.PriCrit
case "Err":
return journal.PriErr
case "Warning":
return journal.PriWarning
case "Notice":
return journal.PriNotice
case "Info":
return journal.PriInfo
case "Debug":
return journal.PriDebug
}
return journal.PriNotice
}
// CheckApply method for Msg resource.
// Every check leads to an apply, meaning that the message is flushed to the journal.
func (obj *MsgRes) CheckApply(apply bool) (bool, error) {
// isStateOK() done by engine, so we updateStateOK() to pass in value
//if obj.isAllStateOK() {
// return true, nil
//}
if !obj.logStateOK {
log.Printf("%s[%s]: Body: %s", obj.Kind(), obj.GetName(), obj.Body)
obj.logStateOK = true
obj.updateStateOK()
}
if !apply {
return false, nil
}
if obj.Journal && !obj.journalStateOK {
if err := journal.Send(obj.Body, obj.journalPriority(), obj.Fields); err != nil {
return false, err
}
obj.journalStateOK = true
obj.updateStateOK()
}
if obj.Syslog && !obj.syslogStateOK {
// TODO: implement syslog client
obj.syslogStateOK = true
obj.updateStateOK()
}
return false, nil
}

104
resources/refresh.go Normal file
View File

@@ -0,0 +1,104 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package resources
import (
"fmt"
"os"
"strings"
errwrap "github.com/pkg/errors"
)
// Refresh returns the pending state of a notification. It should only be called
// in the CheckApply portion of a resource where a refresh should be acted upon.
func (obj *BaseRes) Refresh() bool {
return obj.refresh
}
// SetRefresh sets the pending state of a notification. It should only be called
// by the mgmt engine.
func (obj *BaseRes) SetRefresh(b bool) {
obj.refresh = b
}
// StatefulBool is an interface for storing a boolean flag in a permanent spot.
type StatefulBool interface {
Get() (bool, error) // get value of token
Set() error // set token to true
Del() error // rm token if it exists
}
// DiskBool stores a boolean variable on disk for stateful access across runs.
// The absence of the path is treated as false. If the path contains a special
// value, then it is treated as true. All the other non-error cases are false.
type DiskBool struct {
Path string // path to token
}
// str returns the string data which represents true (aka set).
func (obj *DiskBool) str() string {
const TrueToken = "true"
const newline = "\n"
return TrueToken + newline
}
// Get returns if the boolean setting, if no error reading the value occurs.
func (obj *DiskBool) Get() (bool, error) {
file, err := os.Open(obj.Path) // open a handle to read the file
if err != nil {
if os.IsNotExist(err) {
return false, nil // no token means value is false
}
return false, errwrap.Wrapf(err, "could not read token")
}
defer file.Close()
str := obj.str()
data := make([]byte, len(str)) // data + newline
if _, err := file.Read(data); err != nil {
return false, errwrap.Wrapf(err, "could not read from file")
}
return strings.TrimSpace(string(data)) == strings.TrimSpace(str), nil
}
// Set stores the true boolean value, if no error setting the value occurs.
func (obj *DiskBool) Set() error {
file, err := os.Create(obj.Path) // open a handle to create the file
if err != nil {
return errwrap.Wrapf(err, "can't create file")
}
defer file.Close()
str := obj.str()
if c, err := file.Write([]byte(str)); err != nil {
return errwrap.Wrapf(err, "error writing to file")
} else if l := len(str); c != l {
return fmt.Errorf("wrote %d bytes instead of %d", c, l)
}
return file.Sync() // guarantee it!
}
// Del stores the false boolean value, if no error clearing the value occurs.
func (obj *DiskBool) Del() error {
if err := os.Remove(obj.Path); err != nil { // remove the file
if os.IsNotExist(err) {
return nil // no file means this is already fine
}
return errwrap.Wrapf(err, "could not delete token")
}
return nil
}

View File

@@ -49,6 +49,8 @@ const (
ResStatePoking ResStatePoking
) )
const refreshPathToken = "refresh"
// Data is the set of input values passed into the pgraph for the resources. // Data is the set of input values passed into the pgraph for the resources.
type Data struct { type Data struct {
//Hostname string // uuid for the host //Hostname string // uuid for the host
@@ -133,6 +135,8 @@ type Base interface {
DoSend(chan event.Event, string) (bool, error) DoSend(chan event.Event, string) (bool, error)
SendEvent(event.EventName, bool, bool) bool SendEvent(event.EventName, bool, bool) bool
ReadEvent(*event.Event) (bool, bool) // TODO: optional here? ReadEvent(*event.Event) (bool, bool) // TODO: optional here?
Refresh() bool // is there a pending refresh to run?
SetRefresh(bool) // set the refresh state of this resource
SendRecv(Res) (bool, error) // send->recv data passing function SendRecv(Res) (bool, error) // send->recv data passing function
IsStateOK() bool IsStateOK() bool
StateOK(b bool) StateOK(b bool)
@@ -173,6 +177,8 @@ type BaseRes struct {
isStateOK bool // whether the state is okay based on events or not isStateOK bool // whether the state is okay based on events or not
isGrouped bool // am i contained within a group? isGrouped bool // am i contained within a group?
grouped []Res // list of any grouped resources grouped []Res // list of any grouped resources
refresh bool // does this resource have a refresh to run?
//refreshState StatefulBool // TODO: future stateful bool
} }
// UIDExistsInUIDs wraps the IFF method when used with a list of UID's. // UIDExistsInUIDs wraps the IFF method when used with a list of UID's.
@@ -222,6 +228,12 @@ func (obj *BaseRes) Init() error {
return fmt.Errorf("Resource did not set kind!") return fmt.Errorf("Resource did not set kind!")
} }
obj.events = make(chan event.Event) // unbuffered chan to avoid stale events obj.events = make(chan event.Event) // unbuffered chan to avoid stale events
//dir, err := obj.VarDir("")
//if err != nil {
// return errwrap.Wrapf(err, "VarDir failed in Init()")
//}
// TODO: this StatefulBool implementation could be eventually swappable
//obj.refreshState = &DiskBool{Path: path.Join(dir, refreshPathToken)}
return nil return nil
} }

View File

@@ -29,32 +29,6 @@ import (
errwrap "github.com/pkg/errors" errwrap "github.com/pkg/errors"
) )
// DoSend sends off an event, but doesn't block the incoming event queue. It can
// also recursively call itself when events need processing during the wait.
// I'm not completely comfortable with this fn, but it will have to do for now.
func (obj *BaseRes) DoSend(processChan chan event.Event, comment string) (bool, error) {
resp := event.NewResp()
processChan <- event.Event{Name: event.EventNil, Resp: resp, Msg: comment, Activity: true} // trigger process
e := resp.Wait()
return false, e // XXX: at the moment, we don't use the exit bool.
// XXX: this can cause a deadlock. do we need to recursively send? fix event stuff!
//select {
//case e := <-resp: // wait for the ACK()
// if e != nil { // we got a NACK
// return true, e // exit with error
// }
//case event := <-obj.events:
// // NOTE: this code should match the similar code below!
// //cuid.SetConverged(false) // TODO: ?
// if exit, send := obj.ReadEvent(&event); exit {
// return true, nil // exit, without error
// } else if send {
// return obj.DoSend(processChan, comment) // recurse
// }
//}
//return false, nil // return, no error or exit signal
}
// SendEvent pushes an event into the message queue for a particular vertex // SendEvent pushes an event into the message queue for a particular vertex
func (obj *BaseRes) SendEvent(ev event.EventName, sync bool, activity bool) bool { func (obj *BaseRes) SendEvent(ev event.EventName, sync bool, activity bool) bool {
// TODO: isn't this race-y ? // TODO: isn't this race-y ?
@@ -72,27 +46,52 @@ func (obj *BaseRes) SendEvent(ev event.EventName, sync bool, activity bool) bool
return true return true
} }
// DoSend sends off an event, but doesn't block the incoming event queue.
func (obj *BaseRes) DoSend(processChan chan event.Event, comment string) (exit bool, err error) {
resp := event.NewResp()
processChan <- event.Event{Name: event.EventNil, Resp: resp, Activity: false, Msg: comment} // trigger process
e := resp.Wait()
return false, e // XXX: at the moment, we don't use the exit bool.
}
// ReadEvent processes events when a select gets one, and handles the pause // ReadEvent processes events when a select gets one, and handles the pause
// code too! The return values specify if we should exit and poke respectively. // code too! The return values specify if we should exit and poke respectively.
func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, poke bool) { func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) {
ev.ACK() ev.ACK()
var poke bool
// ensure that a CheckApply runs by sending with a dirty state...
if ev.GetActivity() { // if previous node did work, and we were notified...
obj.StateOK(false) // dirty
poke = true // poke!
// XXX: this should be elsewhere in case Watch isn't used (eg: Polling instead...)
// XXX: unless this is used in our "fallback" polling implementation???
obj.SetRefresh(true)
}
switch ev.Name { switch ev.Name {
case event.EventStart: case event.EventStart:
return false, true send = true || poke
return
case event.EventPoke: case event.EventPoke:
return false, true send = true || poke
return
case event.EventBackPoke: case event.EventBackPoke:
return false, true // forward poking in response to a back poke! send = true || poke
return // forward poking in response to a back poke!
case event.EventExit: case event.EventExit:
// FIXME: what do we do if we have a pending refresh (poke) and an exit?
return true, false return true, false
case event.EventPause: case event.EventPause:
// wait for next event to continue // wait for next event to continue
select { select {
case e := <-obj.Events(): case e, ok := <-obj.Events():
if !ok { // shutdown
return true, false
}
e.ACK() e.ACK()
if e.Name == event.EventExit { if e.Name == event.EventExit {
return true, false return true, false

View File

@@ -171,9 +171,6 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
if exit, send = obj.ReadEvent(&event); exit { if exit, send = obj.ReadEvent(&event); exit {
return nil // exit return nil // exit
} }
if event.GetActivity() {
obj.StateOK(false) // dirty
}
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged! cuid.SetConverged(true) // converged!
@@ -226,9 +223,6 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
if exit, send = obj.ReadEvent(&event); exit { if exit, send = obj.ReadEvent(&event); exit {
return nil // exit return nil // exit
} }
if event.GetActivity() {
obj.StateOK(false) // dirty
}
case <-cuid.ConvergedTimer(): case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged! cuid.SetConverged(true) // converged!
@@ -287,9 +281,10 @@ func (obj *SvcRes) CheckApply(apply bool) (checkOK bool, err error) {
var running = (activestate.Value == dbus.MakeVariant("active")) var running = (activestate.Value == dbus.MakeVariant("active"))
var stateOK = ((obj.State == "") || (obj.State == "running" && running) || (obj.State == "stopped" && !running)) var stateOK = ((obj.State == "") || (obj.State == "running" && running) || (obj.State == "stopped" && !running))
var startupOK = true // XXX: DETECT AND SET var startupOK = true // XXX: DETECT AND SET
var refresh = obj.Refresh() // do we have a pending reload to apply?
if stateOK && startupOK { if stateOK && startupOK && !refresh {
return true, nil // we are in the correct state return true, nil // we are in the correct state
} }
@@ -320,11 +315,19 @@ func (obj *SvcRes) CheckApply(apply bool) (checkOK bool, err error) {
if err != nil { if err != nil {
return false, errwrap.Wrapf(err, "Failed to start unit") return false, errwrap.Wrapf(err, "Failed to start unit")
} }
if refresh {
log.Printf("%s[%s]: Skipping reload, due to pending start", obj.Kind(), obj.GetName())
}
refresh = false // we did a start, so a reload is not needed
} else if obj.State == "stopped" { } else if obj.State == "stopped" {
_, err = conn.StopUnit(svc, "fail", result) _, err = conn.StopUnit(svc, "fail", result)
if err != nil { if err != nil {
return false, errwrap.Wrapf(err, "Failed to stop unit") return false, errwrap.Wrapf(err, "Failed to stop unit")
} }
if refresh {
log.Printf("%s[%s]: Skipping reload, due to pending stop", obj.Kind(), obj.GetName())
}
refresh = false // we did a stop, so a reload is not needed
} }
status := <-result status := <-result
@@ -335,6 +338,11 @@ func (obj *SvcRes) CheckApply(apply bool) (checkOK bool, err error) {
return false, fmt.Errorf("Unknown systemd return string: %v", status) return false, fmt.Errorf("Unknown systemd return string: %v", status)
} }
if refresh { // we need to reload the service
// XXX: run a svc reload here!
log.Printf("%s[%s]: Reloading...", obj.Kind(), obj.GetName())
}
// XXX: also set enabled on boot // XXX: also set enabled on boot
return false, nil // success return false, nil // success

View File

@@ -33,6 +33,8 @@ func init() {
type TimerRes struct { type TimerRes struct {
BaseRes `yaml:",inline"` BaseRes `yaml:",inline"`
Interval int `yaml:"interval"` // Interval : Interval between runs Interval int `yaml:"interval"` // Interval : Interval between runs
ticker *time.Ticker
} }
// TimerUID is the UID struct for TimerRes. // TimerUID is the UID struct for TimerRes.
@@ -65,6 +67,11 @@ func (obj *TimerRes) Validate() error {
return nil return nil
} }
// newTicker creates a new ticker
func (obj *TimerRes) newTicker() *time.Ticker {
return time.NewTicker(time.Duration(obj.Interval) * time.Second)
}
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *TimerRes) Watch(processChan chan event.Event) error { func (obj *TimerRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() { if obj.IsWatching() {
@@ -84,16 +91,16 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error {
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
} }
// Create a time.Ticker for the given interval // create a time.Ticker for the given interval
ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second) obj.ticker = obj.newTicker()
defer ticker.Stop() defer obj.ticker.Stop()
var send = false var send = false
for { for {
obj.SetState(ResStateWatching) obj.SetState(ResStateWatching)
select { select {
case <-ticker.C: // received the timer event case <-obj.ticker.C: // received the timer event
send = true send = true
log.Printf("%s[%s]: received tick", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: received tick", obj.Kind(), obj.GetName())
@@ -121,6 +128,22 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error {
} }
} }
// CheckApply method for Timer resource. Triggers a timer reset on notify.
func (obj *TimerRes) CheckApply(apply bool) (bool, error) {
// because there are no checks to run, this resource has a less
// traditional pattern than what is seen in most resources...
if !obj.Refresh() { // this works for apply || !apply
return true, nil // state is always okay if no refresh to do
} else if !apply { // we had a refresh to do
return false, nil // therefore state is wrong
}
// reset the timer since apply && refresh
obj.ticker.Stop()
obj.ticker = obj.newTicker()
return false, nil
}
// GetUIDs includes all params to make a unique identification of this object. // GetUIDs includes all params to make a unique identification of this object.
// Most resources only return one, although some resources can return multiple. // Most resources only return one, although some resources can return multiple.
func (obj *TimerRes) GetUIDs() []ResUID { func (obj *TimerRes) GetUIDs() []ResUID {
@@ -158,8 +181,3 @@ func (obj *TimerRes) Compare(res Res) bool {
} }
return true return true
} }
// CheckApply method for Timer resource. Does nothing, returns happy!
func (obj *TimerRes) CheckApply(apply bool) (bool, error) {
return true, nil // state is always okay
}