From 2e718c0e9d8739d414275aa07289cb26dc17bd04 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Tue, 29 Nov 2016 22:11:42 -0500 Subject: [PATCH] resources: Improve notification system and notify refreshes Resources can send "refresh" notifications along edges. These messages are sent whenever the upstream (initiating vertex) changes state. When the changed state propagates downstream, it will be paired with a refresh flag which can be queried in the CheckApply method of that resource. Future work will include a stateful refresh tracking mechanism so that if a refresh event is generated and not consumed, it will be saved across an interrupt (shutdown) or a crash so that it can be re-applied on the subsequent run. This is important because the unapplied refresh is a form of hysteresis which needs to be tracked and remembered or we won't be able to determine that the state is wrong! Still to do: * Update the autogrouping code to handle the edge notify properties! * Actually finish the stateful bool code --- examples/lib/libmgmt3.go | 13 +++ pgraph/actions.go | 119 ++++++++++++++++++++++----- pgraph/autogroup.go | 4 +- pgraph/pgraph.go | 68 +++++++++++++--- resources/msg.go | 169 ++++++++++++++++++++------------------- resources/refresh.go | 104 ++++++++++++++++++++++++ resources/resources.go | 12 +++ resources/sendrecv.go | 61 +++++++------- resources/svc.go | 24 ++++-- resources/timer.go | 36 ++++++--- 10 files changed, 443 insertions(+), 167 deletions(-) create mode 100644 resources/refresh.go diff --git a/examples/lib/libmgmt3.go b/examples/lib/libmgmt3.go index 4a960a38..b0eb190f 100644 --- a/examples/lib/libmgmt3.go +++ b/examples/lib/libmgmt3.go @@ -83,7 +83,20 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { v2 := pgraph.NewVertex(f1) g.AddVertex(v2) + s1 := &resources.SvcRes{ + BaseRes: resources.BaseRes{ + Name: "purpleidea", + }, + State: "stopped", + } + + v3 := pgraph.NewVertex(s1) + g.AddVertex(v3) + g.AddEdge(v1, v2, pgraph.NewEdge("e1")) + e2 := 
pgraph.NewEdge("e2") + e2.Notify = true // send a notification from v2 to v3 + g.AddEdge(v2, v3, e2) //g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.EmbdEtcd, obj.data.Noop) return g, nil diff --git a/pgraph/actions.go b/pgraph/actions.go index 2bfcef90..d38e829e 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -29,6 +29,7 @@ import ( "github.com/purpleidea/mgmt/resources" errwrap "github.com/pkg/errors" + "golang.org/x/sync/errgroup" ) // GetTimestamp returns the timestamp of a vertex @@ -45,7 +46,7 @@ func (v *Vertex) UpdateTimestamp() int64 { // OKTimestamp returns true if this element can run right now? func (g *Graph) OKTimestamp(v *Vertex) bool { // these are all the vertices pointing TO v, eg: ??? -> v - for _, n := range g.IncomingGraphEdges(v) { + for _, n := range g.IncomingGraphVertices(v) { // if the vertex has a greater timestamp than any pre-req (n) // then we can't run right now... // if they're equal (eg: on init of 0) then we also can't run @@ -63,29 +64,42 @@ func (g *Graph) OKTimestamp(v *Vertex) bool { // Poke notifies nodes after me in the dependency graph that they need refreshing... // NOTE: this assumes that this can never fail or need to be rescheduled -func (g *Graph) Poke(v *Vertex, activity bool) { +func (g *Graph) Poke(v *Vertex, activity bool) error { + var eg errgroup.Group // these are all the vertices pointing AWAY FROM v, eg: v -> ??? - for _, n := range g.OutgoingGraphEdges(v) { + for _, n := range g.OutgoingGraphVertices(v) { // XXX: if we're in state event and haven't been cancelled by // apply, then we can cancel a poke to a child, right? XXX - // XXX: if n.Res.getState() != resources.ResStateEvent { // is this correct? - if true { // XXX + // XXX: if n.Res.getState() != resources.ResStateEvent || activity { // is this correct? + if true || activity { // XXX: ??? 
if global.DEBUG { log.Printf("%s[%s]: Poke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) } - n.SendEvent(event.EventPoke, false, activity) // XXX: can this be switched to sync? + //wg.Add(1) + eg.Go(func() error { + //defer wg.Done() + edge := g.Adjacency[v][n] // lookup + notify := edge.Notify && edge.Refresh() + + // FIXME: is it okay that this is sync? + n.SendEvent(event.EventPoke, true, notify) + // TODO: check return value? + return nil // never error for now... + }) + } else { if global.DEBUG { log.Printf("%s[%s]: Poke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) } } } + return eg.Wait() // wait for all the pokes to complete } // BackPoke pokes the pre-requisites that are stale and need to run before I can run. func (g *Graph) BackPoke(v *Vertex) { // these are all the vertices pointing TO v, eg: ??? -> v - for _, n := range g.IncomingGraphEdges(v) { + for _, n := range g.IncomingGraphVertices(v) { x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState() // if the parent timestamp needs poking AND it's not in state // ResStateEvent, then poke it. If the parent is in ResStateEvent it @@ -97,7 +111,8 @@ func (g *Graph) BackPoke(v *Vertex) { if global.DEBUG { log.Printf("%s[%s]: BackPoke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) } - n.SendEvent(event.EventBackPoke, false, false) // XXX: can this be switched to sync? + // FIXME: is it okay that this is sync? + n.SendEvent(event.EventBackPoke, true, false) } else { if global.DEBUG { log.Printf("%s[%s]: BackPoke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) @@ -106,6 +121,39 @@ func (g *Graph) BackPoke(v *Vertex) { } } +// RefreshPending determines if any previous nodes have a refresh pending here. +// If this is true, it means I am expected to apply a refresh when I next run. 
+func (g *Graph) RefreshPending(v *Vertex) bool { + var refresh bool + for _, edge := range g.IncomingGraphEdges(v) { + // if we asked for a notify *and* if one is pending! + if edge.Notify && edge.Refresh() { + refresh = true + break + } + } + return refresh +} + +// SetUpstreamRefresh sets the refresh value on every notify edge arriving from an upstream vertex. +func (g *Graph) SetUpstreamRefresh(v *Vertex, b bool) { + for _, edge := range g.IncomingGraphEdges(v) { + if edge.Notify { + edge.SetRefresh(b) + } + } +} + +// SetDownstreamRefresh sets the refresh value on every notify edge leading to a downstream vertex. +func (g *Graph) SetDownstreamRefresh(v *Vertex, b bool) { + for _, edge := range g.OutgoingGraphEdges(v) { + // only edges that asked for a notify carry a refresh flag + if edge.Notify { + edge.SetRefresh(b) + } + } +} + // Process is the primary function to execute for a particular vertex in the graph. func (g *Graph) Process(v *Vertex) error { obj := v.Res @@ -114,7 +162,7 @@ func (g *Graph) Process(v *Vertex) error { } obj.SetState(resources.ResStateEvent) var ok = true - var apply = false // did we run an apply? + var applied = false // did we run an apply? // is it okay to run dependency wise right now? // if not, that's okay because when the dependency runs, it will poke // us back and we will run if needed then! @@ -132,17 +180,33 @@ func (g *Graph) Process(v *Vertex) error { obj.StateOK(false) // invalidate cache, mark as dirty } - if global.DEBUG { - log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), !obj.Meta().Noop) - } - + var noop = obj.Meta().Noop // lookup the noop value + var refresh bool var checkOK bool var err error - if obj.IsStateOK() { // check cached state, to skip CheckApply + + if global.DEBUG { + log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), !noop) + } + + // lookup the refresh (notification) variable + refresh = g.RefreshPending(v) // do i need to perform a refresh? 
+ obj.SetRefresh(refresh) // tell the resource + + // check cached state, to skip CheckApply; can't skip if refreshing + if !refresh && obj.IsStateOK() { checkOK, err = true, nil + + // NOTE: technically this block is wrong because we don't know + // if the resource implements refresh! If it doesn't, we could + // skip this, but it doesn't make a big difference under noop! + } else if noop && refresh { // had a refresh to do w/ noop! + checkOK, err = false, nil // therefore the state is wrong + + // run the CheckApply! } else { // if this fails, don't UpdateTimestamp() - checkOK, err = obj.CheckApply(!obj.Meta().Noop) + checkOK, err = obj.CheckApply(!noop) } if checkOK && err != nil { // should never return this way @@ -153,32 +217,45 @@ func (g *Graph) Process(v *Vertex) error { } // if CheckApply ran without noop and without error, state should be good - if !obj.Meta().Noop && err == nil { // aka !obj.Meta().Noop || checkOK - obj.StateOK(true) // reset + if !noop && err == nil { // aka !noop || checkOK + obj.StateOK(true) // reset + g.SetUpstreamRefresh(v, false) // refresh happened, clear the request } if !checkOK { // if state *was* not ok, we had to have apply'ed if err != nil { // error during check or apply ok = false } else { - apply = true + applied = true } } // when noop is true we always want to update timestamp - if obj.Meta().Noop && err == nil { + if noop && err == nil { ok = true } if ok { + // did we actually do work? + activity := applied + if noop { + activity = false // no we didn't do work... + } + + if activity { // add refresh flag to downstream edges... + g.SetDownstreamRefresh(v, true) + } + // update this timestamp *before* we poke or the poked // nodes might fail due to having a too old timestamp! v.UpdateTimestamp() // this was touched... 
obj.SetState(resources.ResStatePoking) // can't cancel parent poke - g.Poke(v, apply) + if err := g.Poke(v, activity); err != nil { + return errwrap.Wrapf(err, "the Poke() failed") + } } // poke at our pre-req's instead since they need to refresh/run... - return err + return errwrap.Wrapf(err, "could not Process() successfully") } // else... only poke at the pre-req's that need to run go g.BackPoke(v) diff --git a/pgraph/autogroup.go b/pgraph/autogroup.go index e9effdbc..a53a2b73 100644 --- a/pgraph/autogroup.go +++ b/pgraph/autogroup.go @@ -221,7 +221,7 @@ func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex) } // 2) edges that point towards v2 from X now point to v1 from X (no dupes) - for _, x := range g.IncomingGraphEdges(v2) { // all to vertex v (??? -> v) + for _, x := range g.IncomingGraphVertices(v2) { // all to vertex v (??? -> v) e := g.Adjacency[x][v2] // previous edge r := g.Reachability(x, v1) // merge e with ex := g.Adjacency[x][v1] if it exists! @@ -248,7 +248,7 @@ func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex) } // 3) edges that point from v2 to X now point from v1 to X (no dupes) - for _, x := range g.OutgoingGraphEdges(v2) { // all from vertex v (v -> ???) + for _, x := range g.OutgoingGraphVertices(v2) { // all from vertex v (v -> ???) e := g.Adjacency[v2][x] // previous edge r := g.Reachability(v1, x) // merge e with ex := g.Adjacency[v1][x] if it exists! diff --git a/pgraph/pgraph.go b/pgraph/pgraph.go index 922ce025..9529744c 100644 --- a/pgraph/pgraph.go +++ b/pgraph/pgraph.go @@ -61,7 +61,10 @@ type Vertex struct { // Edge is the primary edge struct in this library. type Edge struct { - Name string + Name string + Notify bool // should we send a refresh notification along this edge? + + refresh bool // is there a notify pending for the dest vertex ? } // NewGraph builds a new graph. 
@@ -87,6 +90,16 @@ func NewEdge(name string) *Edge { } } +// Refresh returns the pending refresh status of this edge. +func (obj *Edge) Refresh() bool { + return obj.refresh +} + +// SetRefresh sets the pending refresh status of this edge. +func (obj *Edge) SetRefresh(b bool) { + obj.refresh = b +} + // Copy makes a copy of the graph struct func (g *Graph) Copy() *Graph { newGraph := &Graph{ @@ -249,9 +262,9 @@ func (v *Vertex) String() string { return fmt.Sprintf("%s[%s]", v.Res.Kind(), v.Res.GetName()) } -// IncomingGraphEdges returns an array (slice) of all directed vertices to +// IncomingGraphVertices returns an array (slice) of all directed vertices to // vertex v (??? -> v). OKTimestamp should probably use this. -func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex { +func (g *Graph) IncomingGraphVertices(v *Vertex) []*Vertex { // TODO: we might be able to implement this differently by reversing // the Adjacency graph and then looping through it again... var s []*Vertex @@ -265,9 +278,9 @@ func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex { return s } -// OutgoingGraphEdges returns an array (slice) of all vertices that vertex v +// OutgoingGraphVertices returns an array (slice) of all vertices that vertex v // points to (v -> ???). Poke should probably use this. -func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Vertex { +func (g *Graph) OutgoingGraphVertices(v *Vertex) []*Vertex { var s []*Vertex for k := range g.Adjacency[v] { // forward paths s = append(s, k) @@ -275,15 +288,46 @@ func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Vertex { return s } -// GraphEdges returns an array (slice) of all vertices that connect to vertex v. -// This is the union of IncomingGraphEdges and OutgoingGraphEdges. -func (g *Graph) GraphEdges(v *Vertex) []*Vertex { +// GraphVertices returns an array (slice) of all vertices that connect to vertex v. +// This is the union of IncomingGraphVertices and OutgoingGraphVertices. 
+func (g *Graph) GraphVertices(v *Vertex) []*Vertex { var s []*Vertex - s = append(s, g.IncomingGraphEdges(v)...) - s = append(s, g.OutgoingGraphEdges(v)...) + s = append(s, g.IncomingGraphVertices(v)...) + s = append(s, g.OutgoingGraphVertices(v)...) return s } +// IncomingGraphEdges returns all of the edges that point to vertex v (??? -> v). +func (g *Graph) IncomingGraphEdges(v *Vertex) []*Edge { + var edges []*Edge + for v1 := range g.Adjacency { // reverse paths + for v2, e := range g.Adjacency[v1] { + if v2 == v { + edges = append(edges, e) + } + } + } + return edges +} + +// OutgoingGraphEdges returns all of the edges that point from vertex v (v -> ???). +func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Edge { + var edges []*Edge + for _, e := range g.Adjacency[v] { // forward paths + edges = append(edges, e) + } + return edges +} + +// GraphEdges returns an array (slice) of all edges that connect to vertex v. +// This is the union of IncomingGraphEdges and OutgoingGraphEdges. +func (g *Graph) GraphEdges(v *Vertex) []*Edge { + var edges []*Edge + edges = append(edges, g.IncomingGraphEdges(v)...) + edges = append(edges, g.OutgoingGraphEdges(v)...) + return edges +} + // DFS returns a depth first search for the graph, starting at the input vertex. func (g *Graph) DFS(start *Vertex) []*Vertex { var d []*Vertex // discovered @@ -299,7 +343,7 @@ func (g *Graph) DFS(start *Vertex) []*Vertex { if !VertexContains(v, d) { // if not discovered d = append(d, v) // label as discovered - for _, w := range g.GraphEdges(v) { + for _, w := range g.GraphVertices(v) { s = append(s, w) } } @@ -446,7 +490,7 @@ func (g *Graph) Reachability(a, b *Vertex) []*Vertex { if a == nil || b == nil { return nil } - vertices := g.OutgoingGraphEdges(a) // what points away from a ? + vertices := g.OutgoingGraphVertices(a) // what points away from a ? 
if len(vertices) == 0 { return []*Vertex{} // nope } diff --git a/resources/msg.go b/resources/msg.go index 88a72297..2bf956fd 100644 --- a/resources/msg.go +++ b/resources/msg.go @@ -94,6 +94,46 @@ func (obj *MsgRes) Validate() error { return nil } +// isAllStateOK derives a compound state from all internal cache flags that apply to this resource. +func (obj *MsgRes) isAllStateOK() bool { + if obj.Journal && !obj.journalStateOK { + return false + } + if obj.Syslog && !obj.syslogStateOK { + return false + } + return obj.logStateOK +} + +// updateStateOK sets the global state so it can be read by the engine. +func (obj *MsgRes) updateStateOK() { + obj.StateOK(obj.isAllStateOK()) +} + +// JournalPriority converts a string description to a numeric priority. +// XXX: Have Validate() make sure it actually is one of these. +func (obj *MsgRes) journalPriority() journal.Priority { + switch obj.Priority { + case "Emerg": + return journal.PriEmerg + case "Alert": + return journal.PriAlert + case "Crit": + return journal.PriCrit + case "Err": + return journal.PriErr + case "Warning": + return journal.PriWarning + case "Notice": + return journal.PriNotice + case "Info": + return journal.PriInfo + case "Debug": + return journal.PriDebug + } + return journal.PriNotice +} + // Watch is the primary listener for this resource and it outputs events. func (obj *MsgRes) Watch(processChan chan event.Event) error { if obj.IsWatching() { @@ -125,17 +165,6 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error { return nil // exit } - // TODO: invalidate cached state on poke events - //obj.logStateOK = false - //if obj.Journal { - // obj.journalStateOK = false - //} - //if obj.Syslog { - // obj.syslogStateOK = false - //} - //obj.updateStateOK() - send = true - case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue @@ -158,6 +187,51 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error { } } +// CheckApply method for Msg resource. 
+// Every check leads to an apply, meaning that the message is flushed to the journal. +func (obj *MsgRes) CheckApply(apply bool) (bool, error) { + + // isStateOK() done by engine, so we updateStateOK() to pass in value + //if obj.isAllStateOK() { + // return true, nil + //} + + if obj.Refresh() { // if we were notified... + // invalidate cached state... + obj.logStateOK = false + if obj.Journal { + obj.journalStateOK = false + } + if obj.Syslog { + obj.syslogStateOK = false + } + obj.updateStateOK() + } + + if !obj.logStateOK { + log.Printf("%s[%s]: Body: %s", obj.Kind(), obj.GetName(), obj.Body) + obj.logStateOK = true + obj.updateStateOK() + } + + if !apply { + return false, nil + } + if obj.Journal && !obj.journalStateOK { + if err := journal.Send(obj.Body, obj.journalPriority(), obj.Fields); err != nil { + return false, err + } + obj.journalStateOK = true + obj.updateStateOK() + } + if obj.Syslog && !obj.syslogStateOK { + // TODO: implement syslog client + obj.syslogStateOK = true + obj.updateStateOK() + } + return false, nil +} + // GetUIDs includes all params to make a unique identification of this object. // Most resources only return one, although some resources can return multiple. func (obj *MsgRes) GetUIDs() []ResUID { @@ -203,76 +277,3 @@ func (obj *MsgRes) Compare(res Res) bool { } return true } - -// isAllStateOK derives a compound state from all internal cache flags that apply to this resource. -func (obj *MsgRes) isAllStateOK() bool { - if obj.Journal && !obj.journalStateOK { - return false - } - if obj.Syslog && !obj.syslogStateOK { - return false - } - return obj.logStateOK -} - -// updateStateOK sets the global state so it can be read by the engine. -func (obj *MsgRes) updateStateOK() { - obj.StateOK(obj.isAllStateOK()) -} - -// JournalPriority converts a string description to a numeric priority. -// XXX: Have Validate() make sure it actually is one of these. 
-func (obj *MsgRes) journalPriority() journal.Priority { - switch obj.Priority { - case "Emerg": - return journal.PriEmerg - case "Alert": - return journal.PriAlert - case "Crit": - return journal.PriCrit - case "Err": - return journal.PriErr - case "Warning": - return journal.PriWarning - case "Notice": - return journal.PriNotice - case "Info": - return journal.PriInfo - case "Debug": - return journal.PriDebug - } - return journal.PriNotice -} - -// CheckApply method for Msg resource. -// Every check leads to an apply, meaning that the message is flushed to the journal. -func (obj *MsgRes) CheckApply(apply bool) (bool, error) { - - // isStateOK() done by engine, so we updateStateOK() to pass in value - //if obj.isAllStateOK() { - // return true, nil - //} - - if !obj.logStateOK { - log.Printf("%s[%s]: Body: %s", obj.Kind(), obj.GetName(), obj.Body) - obj.logStateOK = true - obj.updateStateOK() - } - - if !apply { - return false, nil - } - if obj.Journal && !obj.journalStateOK { - if err := journal.Send(obj.Body, obj.journalPriority(), obj.Fields); err != nil { - return false, err - } - obj.journalStateOK = true - obj.updateStateOK() - } - if obj.Syslog && !obj.syslogStateOK { - // TODO: implement syslog client - obj.syslogStateOK = true - obj.updateStateOK() - } - return false, nil -} diff --git a/resources/refresh.go b/resources/refresh.go new file mode 100644 index 00000000..640de553 --- /dev/null +++ b/resources/refresh.go @@ -0,0 +1,104 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package resources + +import ( + "fmt" + "os" + "strings" + + errwrap "github.com/pkg/errors" +) + +// Refresh returns the pending state of a notification. It should only be called +// in the CheckApply portion of a resource where a refresh should be acted upon. +func (obj *BaseRes) Refresh() bool { + return obj.refresh +} + +// SetRefresh sets the pending state of a notification. It should only be called +// by the mgmt engine. +func (obj *BaseRes) SetRefresh(b bool) { + obj.refresh = b +} + +// StatefulBool is an interface for storing a boolean flag in a permanent spot. +type StatefulBool interface { + Get() (bool, error) // get value of token + Set() error // set token to true + Del() error // rm token if it exists +} + +// DiskBool stores a boolean variable on disk for stateful access across runs. +// The absence of the path is treated as false. If the path contains a special +// value, then it is treated as true. All the other non-error cases are false. +type DiskBool struct { + Path string // path to token +} + +// str returns the string data which represents true (aka set). +func (obj *DiskBool) str() string { + const TrueToken = "true" + const newline = "\n" + return TrueToken + newline +} + +// Get returns the boolean setting, if no error occurs while reading the value. 
+func (obj *DiskBool) Get() (bool, error) { + file, err := os.Open(obj.Path) // open a handle to read the file + if err != nil { + if os.IsNotExist(err) { + return false, nil // no token means value is false + } + return false, errwrap.Wrapf(err, "could not read token") + } + defer file.Close() + str := obj.str() + data := make([]byte, len(str)) // data + newline + if _, err := file.Read(data); err != nil { + return false, errwrap.Wrapf(err, "could not read from file") + } + return strings.TrimSpace(string(data)) == strings.TrimSpace(str), nil +} + +// Set stores the true boolean value, if no error setting the value occurs. +func (obj *DiskBool) Set() error { + file, err := os.Create(obj.Path) // open a handle to create the file + if err != nil { + return errwrap.Wrapf(err, "can't create file") + } + defer file.Close() + str := obj.str() + if c, err := file.Write([]byte(str)); err != nil { + return errwrap.Wrapf(err, "error writing to file") + } else if l := len(str); c != l { + return fmt.Errorf("wrote %d bytes instead of %d", c, l) + } + return file.Sync() // guarantee it! +} + +// Del stores the false boolean value, if no error clearing the value occurs. +func (obj *DiskBool) Del() error { + if err := os.Remove(obj.Path); err != nil { // remove the file + if os.IsNotExist(err) { + return nil // no file means this is already fine + } + return errwrap.Wrapf(err, "could not delete token") + } + return nil +} diff --git a/resources/resources.go b/resources/resources.go index 07825e9a..0ceb9d2e 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -49,6 +49,8 @@ const ( ResStatePoking ) +const refreshPathToken = "refresh" + // Data is the set of input values passed into the pgraph for the resources. type Data struct { //Hostname string // uuid for the host @@ -133,6 +135,8 @@ type Base interface { DoSend(chan event.Event, string) (bool, error) SendEvent(event.EventName, bool, bool) bool ReadEvent(*event.Event) (bool, bool) // TODO: optional here? 
+ Refresh() bool // is there a pending refresh to run? + SetRefresh(bool) // set the refresh state of this resource SendRecv(Res) (bool, error) // send->recv data passing function IsStateOK() bool StateOK(b bool) @@ -173,6 +177,8 @@ type BaseRes struct { isStateOK bool // whether the state is okay based on events or not isGrouped bool // am i contained within a group? grouped []Res // list of any grouped resources + refresh bool // does this resource have a refresh to run? + //refreshState StatefulBool // TODO: future stateful bool } // UIDExistsInUIDs wraps the IFF method when used with a list of UID's. @@ -222,6 +228,12 @@ func (obj *BaseRes) Init() error { return fmt.Errorf("Resource did not set kind!") } obj.events = make(chan event.Event) // unbuffered chan to avoid stale events + //dir, err := obj.VarDir("") + //if err != nil { + // return errwrap.Wrapf(err, "VarDir failed in Init()") + //} + // TODO: this StatefulBool implementation could be eventually swappable + //obj.refreshState = &DiskBool{Path: path.Join(dir, refreshPathToken)} return nil } diff --git a/resources/sendrecv.go b/resources/sendrecv.go index f964ea42..4af0529d 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -29,32 +29,6 @@ import ( errwrap "github.com/pkg/errors" ) -// DoSend sends off an event, but doesn't block the incoming event queue. It can -// also recursively call itself when events need processing during the wait. -// I'm not completely comfortable with this fn, but it will have to do for now. -func (obj *BaseRes) DoSend(processChan chan event.Event, comment string) (bool, error) { - resp := event.NewResp() - processChan <- event.Event{Name: event.EventNil, Resp: resp, Msg: comment, Activity: true} // trigger process - e := resp.Wait() - return false, e // XXX: at the moment, we don't use the exit bool. - // XXX: this can cause a deadlock. do we need to recursively send? fix event stuff! 
- //select { - //case e := <-resp: // wait for the ACK() - // if e != nil { // we got a NACK - // return true, e // exit with error - // } - //case event := <-obj.events: - // // NOTE: this code should match the similar code below! - // //cuid.SetConverged(false) // TODO: ? - // if exit, send := obj.ReadEvent(&event); exit { - // return true, nil // exit, without error - // } else if send { - // return obj.DoSend(processChan, comment) // recurse - // } - //} - //return false, nil // return, no error or exit signal -} - // SendEvent pushes an event into the message queue for a particular vertex func (obj *BaseRes) SendEvent(ev event.EventName, sync bool, activity bool) bool { // TODO: isn't this race-y ? @@ -72,27 +46,52 @@ func (obj *BaseRes) SendEvent(ev event.EventName, sync bool, activity bool) bool return true } +// DoSend sends off an event, but doesn't block the incoming event queue. +func (obj *BaseRes) DoSend(processChan chan event.Event, comment string) (exit bool, err error) { + resp := event.NewResp() + processChan <- event.Event{Name: event.EventNil, Resp: resp, Activity: false, Msg: comment} // trigger process + e := resp.Wait() + return false, e // XXX: at the moment, we don't use the exit bool. +} + // ReadEvent processes events when a select gets one, and handles the pause // code too! The return values specify if we should exit and poke respectively. -func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, poke bool) { +func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) { ev.ACK() + var poke bool + // ensure that a CheckApply runs by sending with a dirty state... + if ev.GetActivity() { // if previous node did work, and we were notified... + obj.StateOK(false) // dirty + poke = true // poke! + // XXX: this should be elsewhere in case Watch isn't used (eg: Polling instead...) + // XXX: unless this is used in our "fallback" polling implementation??? 
+ obj.SetRefresh(true) + } + switch ev.Name { case event.EventStart: - return false, true + send = true || poke + return case event.EventPoke: - return false, true + send = true || poke + return case event.EventBackPoke: - return false, true // forward poking in response to a back poke! + send = true || poke + return // forward poking in response to a back poke! case event.EventExit: + // FIXME: what do we do if we have a pending refresh (poke) and an exit? return true, false case event.EventPause: // wait for next event to continue select { - case e := <-obj.Events(): + case e, ok := <-obj.Events(): + if !ok { // shutdown + return true, false + } e.ACK() if e.Name == event.EventExit { return true, false diff --git a/resources/svc.go b/resources/svc.go index 4d6aedbc..ee16b9d7 100644 --- a/resources/svc.go +++ b/resources/svc.go @@ -171,9 +171,6 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { if exit, send = obj.ReadEvent(&event); exit { return nil // exit } - if event.GetActivity() { - obj.StateOK(false) // dirty - } case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! @@ -226,9 +223,6 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { if exit, send = obj.ReadEvent(&event); exit { return nil // exit } - if event.GetActivity() { - obj.StateOK(false) // dirty - } case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! @@ -287,9 +281,10 @@ func (obj *SvcRes) CheckApply(apply bool) (checkOK bool, err error) { var running = (activestate.Value == dbus.MakeVariant("active")) var stateOK = ((obj.State == "") || (obj.State == "running" && running) || (obj.State == "stopped" && !running)) - var startupOK = true // XXX: DETECT AND SET + var startupOK = true // XXX: DETECT AND SET + var refresh = obj.Refresh() // do we have a pending reload to apply? 
- if stateOK && startupOK { + if stateOK && startupOK && !refresh { return true, nil // we are in the correct state } @@ -320,11 +315,19 @@ func (obj *SvcRes) CheckApply(apply bool) (checkOK bool, err error) { if err != nil { return false, errwrap.Wrapf(err, "Failed to start unit") } + if refresh { + log.Printf("%s[%s]: Skipping reload, due to pending start", obj.Kind(), obj.GetName()) + } + refresh = false // we did a start, so a reload is not needed } else if obj.State == "stopped" { _, err = conn.StopUnit(svc, "fail", result) if err != nil { return false, errwrap.Wrapf(err, "Failed to stop unit") } + if refresh { + log.Printf("%s[%s]: Skipping reload, due to pending stop", obj.Kind(), obj.GetName()) + } + refresh = false // we did a stop, so a reload is not needed } status := <-result @@ -335,6 +338,11 @@ func (obj *SvcRes) CheckApply(apply bool) (checkOK bool, err error) { return false, fmt.Errorf("Unknown systemd return string: %v", status) } + if refresh { // we need to reload the service + // XXX: run a svc reload here! + log.Printf("%s[%s]: Reloading...", obj.Kind(), obj.GetName()) + } + // XXX: also set enabled on boot return false, nil // success diff --git a/resources/timer.go b/resources/timer.go index 5b392f96..7c35f7b8 100644 --- a/resources/timer.go +++ b/resources/timer.go @@ -33,6 +33,8 @@ func init() { type TimerRes struct { BaseRes `yaml:",inline"` Interval int `yaml:"interval"` // Interval : Interval between runs + + ticker *time.Ticker } // TimerUID is the UID struct for TimerRes. @@ -65,6 +67,11 @@ func (obj *TimerRes) Validate() error { return nil } +// newTicker creates a new ticker +func (obj *TimerRes) newTicker() *time.Ticker { + return time.NewTicker(time.Duration(obj.Interval) * time.Second) +} + // Watch is the primary listener for this resource and it outputs events. 
func (obj *TimerRes) Watch(processChan chan event.Event) error { if obj.IsWatching() { @@ -84,16 +91,16 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error { return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout } - // Create a time.Ticker for the given interval - ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second) - defer ticker.Stop() + // create a time.Ticker for the given interval + obj.ticker = obj.newTicker() + defer obj.ticker.Stop() var send = false for { obj.SetState(ResStateWatching) select { - case <-ticker.C: // received the timer event + case <-obj.ticker.C: // received the timer event send = true log.Printf("%s[%s]: received tick", obj.Kind(), obj.GetName()) @@ -121,6 +128,22 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error { } } +// CheckApply method for Timer resource. Triggers a timer reset on notify. +func (obj *TimerRes) CheckApply(apply bool) (bool, error) { + // because there are no checks to run, this resource has a less + // traditional pattern than what is seen in most resources... + if !obj.Refresh() { // this works for apply || !apply + return true, nil // state is always okay if no refresh to do + } else if !apply { // we had a refresh to do + return false, nil // therefore state is wrong + } + + // reset the timer since apply && refresh + obj.ticker.Stop() + obj.ticker = obj.newTicker() + return false, nil +} + // GetUIDs includes all params to make a unique identification of this object. // Most resources only return one, although some resources can return multiple. func (obj *TimerRes) GetUIDs() []ResUID { @@ -158,8 +181,3 @@ func (obj *TimerRes) Compare(res Res) bool { } return true } - -// CheckApply method for Timer resource. Does nothing, returns happy! -func (obj *TimerRes) CheckApply(apply bool) (bool, error) { - return true, nil // state is always okay -}