diff --git a/examples/lib/libmgmt3.go b/examples/lib/libmgmt3.go index 4a960a38..b0eb190f 100644 --- a/examples/lib/libmgmt3.go +++ b/examples/lib/libmgmt3.go @@ -83,7 +83,20 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { v2 := pgraph.NewVertex(f1) g.AddVertex(v2) + s1 := &resources.SvcRes{ + BaseRes: resources.BaseRes{ + Name: "purpleidea", + }, + State: "stopped", + } + + v3 := pgraph.NewVertex(s1) + g.AddVertex(v3) + g.AddEdge(v1, v2, pgraph.NewEdge("e1")) + e2 := pgraph.NewEdge("e2") + e2.Notify = true // send a notification from v2 to v3 + g.AddEdge(v2, v3, e2) //g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.EmbdEtcd, obj.data.Noop) return g, nil diff --git a/pgraph/actions.go b/pgraph/actions.go index 2bfcef90..d38e829e 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -29,6 +29,7 @@ import ( "github.com/purpleidea/mgmt/resources" errwrap "github.com/pkg/errors" + "golang.org/x/sync/errgroup" ) // GetTimestamp returns the timestamp of a vertex @@ -45,7 +46,7 @@ func (v *Vertex) UpdateTimestamp() int64 { // OKTimestamp returns true if this element can run right now? func (g *Graph) OKTimestamp(v *Vertex) bool { // these are all the vertices pointing TO v, eg: ??? -> v - for _, n := range g.IncomingGraphEdges(v) { + for _, n := range g.IncomingGraphVertices(v) { // if the vertex has a greater timestamp than any pre-req (n) // then we can't run right now... // if they're equal (eg: on init of 0) then we also can't run @@ -63,29 +64,42 @@ func (g *Graph) OKTimestamp(v *Vertex) bool { // Poke notifies nodes after me in the dependency graph that they need refreshing... // NOTE: this assumes that this can never fail or need to be rescheduled -func (g *Graph) Poke(v *Vertex, activity bool) { +func (g *Graph) Poke(v *Vertex, activity bool) error { + var eg errgroup.Group // these are all the vertices pointing AWAY FROM v, eg: v -> ??? - for _, n := range g.OutgoingGraphEdges(v) { + for _, n := range g.OutgoingGraphVertices(v) { // XXX: if we're in state event and haven't been cancelled by // apply, then we can cancel a poke to a child, right? XXX - // XXX: if n.Res.getState() != resources.ResStateEvent { // is this correct? - if true { // XXX + // XXX: if n.Res.getState() != resources.ResStateEvent || activity { // is this correct? + if true || activity { // XXX: ??? if global.DEBUG { log.Printf("%s[%s]: Poke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) } - n.SendEvent(event.EventPoke, false, activity) // XXX: can this be switched to sync? + //wg.Add(1) + eg.Go(func() error { + //defer wg.Done() + edge := g.Adjacency[v][n] // lookup + notify := edge.Notify && edge.Refresh() + + // FIXME: is it okay that this is sync? + n.SendEvent(event.EventPoke, true, notify) + // TODO: check return value? + return nil // never error for now... + }) + } else { if global.DEBUG { log.Printf("%s[%s]: Poke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) } } } + return eg.Wait() // wait for all the pokes to complete } // BackPoke pokes the pre-requisites that are stale and need to run before I can run. func (g *Graph) BackPoke(v *Vertex) { // these are all the vertices pointing TO v, eg: ??? -> v - for _, n := range g.IncomingGraphEdges(v) { + for _, n := range g.IncomingGraphVertices(v) { x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState() // if the parent timestamp needs poking AND it's not in state // ResStateEvent, then poke it. If the parent is in ResStateEvent it @@ -97,7 +111,8 @@ func (g *Graph) BackPoke(v *Vertex) { if global.DEBUG { log.Printf("%s[%s]: BackPoke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) } - n.SendEvent(event.EventBackPoke, false, false) // XXX: can this be switched to sync? + // FIXME: is it okay that this is sync? + n.SendEvent(event.EventBackPoke, true, false) } else { if global.DEBUG { log.Printf("%s[%s]: BackPoke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) @@ -106,6 +121,39 @@ func (g *Graph) BackPoke(v *Vertex) { } } +// RefreshPending determines if any previous nodes have a refresh pending here. +// If this is true, it means I am expected to apply a refresh when I next run. +func (g *Graph) RefreshPending(v *Vertex) bool { + var refresh bool + for _, edge := range g.IncomingGraphEdges(v) { + // if we asked for a notify *and* if one is pending! + if edge.Notify && edge.Refresh() { + refresh = true + break + } + } + return refresh +} + +// SetUpstreamRefresh sets the refresh value to any upstream vertices. +func (g *Graph) SetUpstreamRefresh(v *Vertex, b bool) { + for _, edge := range g.IncomingGraphEdges(v) { + if edge.Notify { + edge.SetRefresh(b) + } + } +} + +// SetDownstreamRefresh sets the refresh value to any downstream vertices. +func (g *Graph) SetDownstreamRefresh(v *Vertex, b bool) { + for _, edge := range g.OutgoingGraphEdges(v) { + // if we asked for a notify *and* if one is pending! + if edge.Notify { + edge.SetRefresh(b) + } + } +} + // Process is the primary function to execute for a particular vertex in the graph. func (g *Graph) Process(v *Vertex) error { obj := v.Res @@ -114,7 +162,7 @@ func (g *Graph) Process(v *Vertex) error { } obj.SetState(resources.ResStateEvent) var ok = true - var apply = false // did we run an apply? + var applied = false // did we run an apply? // is it okay to run dependency wise right now? // if not, that's okay because when the dependency runs, it will poke // us back and we will run if needed then! @@ -132,17 +180,33 @@ func (g *Graph) Process(v *Vertex) error { obj.StateOK(false) // invalidate cache, mark as dirty } - if global.DEBUG { - log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), !obj.Meta().Noop) - } - + var noop = obj.Meta().Noop // lookup the noop value + var refresh bool var checkOK bool var err error - if obj.IsStateOK() { // check cached state, to skip CheckApply + + if global.DEBUG { + log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), !noop) + } + + // lookup the refresh (notification) variable + refresh = g.RefreshPending(v) // do i need to perform a refresh? + obj.SetRefresh(refresh) // tell the resource + + // check cached state, to skip CheckApply; can't skip if refreshing + if !refresh && obj.IsStateOK() { checkOK, err = true, nil + + // NOTE: technically this block is wrong because we don't know + // if the resource implements refresh! If it doesn't, we could + // skip this, but it doesn't make a big difference under noop! + } else if noop && refresh { // had a refresh to do w/ noop! + checkOK, err = false, nil // therefore the state is wrong + + // run the CheckApply! } else { // if this fails, don't UpdateTimestamp() - checkOK, err = obj.CheckApply(!obj.Meta().Noop) + checkOK, err = obj.CheckApply(!noop) } if checkOK && err != nil { // should never return this way @@ -153,32 +217,45 @@ func (g *Graph) Process(v *Vertex) error { } // if CheckApply ran without noop and without error, state should be good - if !obj.Meta().Noop && err == nil { // aka !obj.Meta().Noop || checkOK - obj.StateOK(true) // reset + if !noop && err == nil { // aka !noop || checkOK + obj.StateOK(true) // reset + g.SetUpstreamRefresh(v, false) // refresh happened, clear the request } if !checkOK { // if state *was* not ok, we had to have apply'ed if err != nil { // error during check or apply ok = false } else { - apply = true + applied = true } } // when noop is true we always want to update timestamp - if obj.Meta().Noop && err == nil { + if noop && err == nil { ok = true } if ok { + // did we actually do work? + activity := applied + if noop { + activity = false // no we didn't do work... + } + + if activity { // add refresh flag to downstream edges... + g.SetDownstreamRefresh(v, true) + } + // update this timestamp *before* we poke or the poked // nodes might fail due to having a too old timestamp! v.UpdateTimestamp() // this was touched... obj.SetState(resources.ResStatePoking) // can't cancel parent poke - g.Poke(v, apply) + if err := g.Poke(v, activity); err != nil { + return errwrap.Wrapf(err, "the Poke() failed") + } } // poke at our pre-req's instead since they need to refresh/run... - return err + return errwrap.Wrapf(err, "could not Process() successfully") } // else... only poke at the pre-req's that need to run go g.BackPoke(v) diff --git a/pgraph/autogroup.go b/pgraph/autogroup.go index e9effdbc..a53a2b73 100644 --- a/pgraph/autogroup.go +++ b/pgraph/autogroup.go @@ -221,7 +221,7 @@ func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex) } // 2) edges that point towards v2 from X now point to v1 from X (no dupes) - for _, x := range g.IncomingGraphEdges(v2) { // all to vertex v (??? -> v) + for _, x := range g.IncomingGraphVertices(v2) { // all to vertex v (??? -> v) e := g.Adjacency[x][v2] // previous edge r := g.Reachability(x, v1) // merge e with ex := g.Adjacency[x][v1] if it exists! @@ -248,7 +248,7 @@ func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex) } // 3) edges that point from v2 to X now point from v1 to X (no dupes) - for _, x := range g.OutgoingGraphEdges(v2) { // all from vertex v (v -> ???) + for _, x := range g.OutgoingGraphVertices(v2) { // all from vertex v (v -> ???) e := g.Adjacency[v2][x] // previous edge r := g.Reachability(v1, x) // merge e with ex := g.Adjacency[v1][x] if it exists! diff --git a/pgraph/pgraph.go b/pgraph/pgraph.go index 922ce025..9529744c 100644 --- a/pgraph/pgraph.go +++ b/pgraph/pgraph.go @@ -61,7 +61,10 @@ type Vertex struct { // Edge is the primary edge struct in this library. type Edge struct { - Name string + Name string + Notify bool // should we send a refresh notification along this edge? + + refresh bool // is there a notify pending for the dest vertex ? } // NewGraph builds a new graph. @@ -87,6 +90,16 @@ func NewEdge(name string) *Edge { } } +// Refresh returns the pending refresh status of this edge. +func (obj *Edge) Refresh() bool { + return obj.refresh +} + +// SetRefresh sets the pending refresh status of this edge. +func (obj *Edge) SetRefresh(b bool) { + obj.refresh = b +} + // Copy makes a copy of the graph struct func (g *Graph) Copy() *Graph { newGraph := &Graph{ @@ -249,9 +262,9 @@ func (v *Vertex) String() string { return fmt.Sprintf("%s[%s]", v.Res.Kind(), v.Res.GetName()) } -// IncomingGraphEdges returns an array (slice) of all directed vertices to +// IncomingGraphVertices returns an array (slice) of all directed vertices to // vertex v (??? -> v). OKTimestamp should probably use this. -func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex { +func (g *Graph) IncomingGraphVertices(v *Vertex) []*Vertex { // TODO: we might be able to implement this differently by reversing // the Adjacency graph and then looping through it again... var s []*Vertex @@ -265,9 +278,9 @@ func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex { return s } -// OutgoingGraphEdges returns an array (slice) of all vertices that vertex v +// OutgoingGraphVertices returns an array (slice) of all vertices that vertex v // points to (v -> ???). Poke should probably use this. -func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Vertex { +func (g *Graph) OutgoingGraphVertices(v *Vertex) []*Vertex { var s []*Vertex for k := range g.Adjacency[v] { // forward paths s = append(s, k) @@ -275,15 +288,46 @@ func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Vertex { return s } -// GraphEdges returns an array (slice) of all vertices that connect to vertex v. -// This is the union of IncomingGraphEdges and OutgoingGraphEdges. -func (g *Graph) GraphEdges(v *Vertex) []*Vertex { +// GraphVertices returns an array (slice) of all vertices that connect to vertex v. +// This is the union of IncomingGraphVertices and OutgoingGraphVertices. +func (g *Graph) GraphVertices(v *Vertex) []*Vertex { var s []*Vertex - s = append(s, g.IncomingGraphEdges(v)...) - s = append(s, g.OutgoingGraphEdges(v)...) + s = append(s, g.IncomingGraphVertices(v)...) + s = append(s, g.OutgoingGraphVertices(v)...) return s } +// IncomingGraphEdges returns all of the edges that point to vertex v (??? -> v). +func (g *Graph) IncomingGraphEdges(v *Vertex) []*Edge { + var edges []*Edge + for v1 := range g.Adjacency { // reverse paths + for v2, e := range g.Adjacency[v1] { + if v2 == v { + edges = append(edges, e) + } + } + } + return edges +} + +// OutgoingGraphEdges returns all of the edges that point from vertex v (v -> ???). +func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Edge { + var edges []*Edge + for _, e := range g.Adjacency[v] { // forward paths + edges = append(edges, e) + } + return edges +} + +// GraphEdges returns an array (slice) of all edges that connect to vertex v. +// This is the union of IncomingGraphEdges and OutgoingGraphEdges. +func (g *Graph) GraphEdges(v *Vertex) []*Edge { + var edges []*Edge + edges = append(edges, g.IncomingGraphEdges(v)...) + edges = append(edges, g.OutgoingGraphEdges(v)...) + return edges +} + // DFS returns a depth first search for the graph, starting at the input vertex. func (g *Graph) DFS(start *Vertex) []*Vertex { var d []*Vertex // discovered @@ -299,7 +343,7 @@ func (g *Graph) DFS(start *Vertex) []*Vertex { if !VertexContains(v, d) { // if not discovered d = append(d, v) // label as discovered - for _, w := range g.GraphEdges(v) { + for _, w := range g.GraphVertices(v) { s = append(s, w) } } @@ -446,7 +490,7 @@ func (g *Graph) Reachability(a, b *Vertex) []*Vertex { if a == nil || b == nil { return nil } - vertices := g.OutgoingGraphEdges(a) // what points away from a ? + vertices := g.OutgoingGraphVertices(a) // what points away from a ? if len(vertices) == 0 { return []*Vertex{} // nope } diff --git a/resources/msg.go b/resources/msg.go index 88a72297..2bf956fd 100644 --- a/resources/msg.go +++ b/resources/msg.go @@ -94,6 +94,46 @@ func (obj *MsgRes) Validate() error { return nil } +// isAllStateOK derives a compound state from all internal cache flags that apply to this resource. +func (obj *MsgRes) isAllStateOK() bool { + if obj.Journal && !obj.journalStateOK { + return false + } + if obj.Syslog && !obj.syslogStateOK { + return false + } + return obj.logStateOK +} + +// updateStateOK sets the global state so it can be read by the engine. +func (obj *MsgRes) updateStateOK() { + obj.StateOK(obj.isAllStateOK()) +} + +// JournalPriority converts a string description to a numeric priority. +// XXX: Have Validate() make sure it actually is one of these. +func (obj *MsgRes) journalPriority() journal.Priority { + switch obj.Priority { + case "Emerg": + return journal.PriEmerg + case "Alert": + return journal.PriAlert + case "Crit": + return journal.PriCrit + case "Err": + return journal.PriErr + case "Warning": + return journal.PriWarning + case "Notice": + return journal.PriNotice + case "Info": + return journal.PriInfo + case "Debug": + return journal.PriDebug + } + return journal.PriNotice +} + // Watch is the primary listener for this resource and it outputs events. func (obj *MsgRes) Watch(processChan chan event.Event) error { if obj.IsWatching() { @@ -125,17 +165,6 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error { return nil // exit } - // TODO: invalidate cached state on poke events - //obj.logStateOK = false - //if obj.Journal { - // obj.journalStateOK = false - //} - //if obj.Syslog { - // obj.syslogStateOK = false - //} - //obj.updateStateOK() - send = true - case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue @@ -158,6 +187,51 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error { } } +// CheckApply method for Msg resource. +// Every check leads to an apply, meaning that the message is flushed to the journal. +func (obj *MsgRes) CheckApply(apply bool) (bool, error) { + + // isStateOK() done by engine, so we updateStateOK() to pass in value + //if obj.isAllStateOK() { + // return true, nil + //} + + if obj.Refresh() { // if we were notified... + // invalidate cached state... + obj.logStateOK = false + if obj.Journal { + obj.journalStateOK = false + } + if obj.Syslog { + obj.syslogStateOK = false + } + obj.updateStateOK() + } + + if !obj.logStateOK { + log.Printf("%s[%s]: Body: %s", obj.Kind(), obj.GetName(), obj.Body) + obj.logStateOK = true + obj.updateStateOK() + } + + if !apply { + return false, nil + } + if obj.Journal && !obj.journalStateOK { + if err := journal.Send(obj.Body, obj.journalPriority(), obj.Fields); err != nil { + return false, err + } + obj.journalStateOK = true + obj.updateStateOK() + } + if obj.Syslog && !obj.syslogStateOK { + // TODO: implement syslog client + obj.syslogStateOK = true + obj.updateStateOK() + } + return false, nil +} + // GetUIDs includes all params to make a unique identification of this object. // Most resources only return one, although some resources can return multiple. func (obj *MsgRes) GetUIDs() []ResUID { @@ -203,76 +277,3 @@ func (obj *MsgRes) Compare(res Res) bool { } return true } - -// isAllStateOK derives a compound state from all internal cache flags that apply to this resource. -func (obj *MsgRes) isAllStateOK() bool { - if obj.Journal && !obj.journalStateOK { - return false - } - if obj.Syslog && !obj.syslogStateOK { - return false - } - return obj.logStateOK -} - -// updateStateOK sets the global state so it can be read by the engine. -func (obj *MsgRes) updateStateOK() { - obj.StateOK(obj.isAllStateOK()) -} - -// JournalPriority converts a string description to a numeric priority. -// XXX: Have Validate() make sure it actually is one of these. -func (obj *MsgRes) journalPriority() journal.Priority { - switch obj.Priority { - case "Emerg": - return journal.PriEmerg - case "Alert": - return journal.PriAlert - case "Crit": - return journal.PriCrit - case "Err": - return journal.PriErr - case "Warning": - return journal.PriWarning - case "Notice": - return journal.PriNotice - case "Info": - return journal.PriInfo - case "Debug": - return journal.PriDebug - } - return journal.PriNotice -} - -// CheckApply method for Msg resource. -// Every check leads to an apply, meaning that the message is flushed to the journal. -func (obj *MsgRes) CheckApply(apply bool) (bool, error) { - - // isStateOK() done by engine, so we updateStateOK() to pass in value - //if obj.isAllStateOK() { - // return true, nil - //} - - if !obj.logStateOK { - log.Printf("%s[%s]: Body: %s", obj.Kind(), obj.GetName(), obj.Body) - obj.logStateOK = true - obj.updateStateOK() - } - - if !apply { - return false, nil - } - if obj.Journal && !obj.journalStateOK { - if err := journal.Send(obj.Body, obj.journalPriority(), obj.Fields); err != nil { - return false, err - } - obj.journalStateOK = true - obj.updateStateOK() - } - if obj.Syslog && !obj.syslogStateOK { - // TODO: implement syslog client - obj.syslogStateOK = true - obj.updateStateOK() - } - return false, nil -} diff --git a/resources/refresh.go b/resources/refresh.go new file mode 100644 index 00000000..640de553 --- /dev/null +++ b/resources/refresh.go @@ -0,0 +1,104 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package resources + +import ( + "fmt" + "os" + "strings" + + errwrap "github.com/pkg/errors" +) + +// Refresh returns the pending state of a notification. It should only be called +// in the CheckApply portion of a resource where a refresh should be acted upon. +func (obj *BaseRes) Refresh() bool { + return obj.refresh +} + +// SetRefresh sets the pending state of a notification. It should only be called +// by the mgmt engine. +func (obj *BaseRes) SetRefresh(b bool) { + obj.refresh = b +} + +// StatefulBool is an interface for storing a boolean flag in a permanent spot. +type StatefulBool interface { + Get() (bool, error) // get value of token + Set() error // set token to true + Del() error // rm token if it exists +} + +// DiskBool stores a boolean variable on disk for stateful access across runs. +// The absence of the path is treated as false. If the path contains a special +// value, then it is treated as true. All the other non-error cases are false. +type DiskBool struct { + Path string // path to token +} + +// str returns the string data which represents true (aka set). +func (obj *DiskBool) str() string { + const TrueToken = "true" + const newline = "\n" + return TrueToken + newline +} + +// Get returns if the boolean setting, if no error reading the value occurs. +func (obj *DiskBool) Get() (bool, error) { + file, err := os.Open(obj.Path) // open a handle to read the file + if err != nil { + if os.IsNotExist(err) { + return false, nil // no token means value is false + } + return false, errwrap.Wrapf(err, "could not read token") + } + defer file.Close() + str := obj.str() + data := make([]byte, len(str)) // data + newline + if _, err := file.Read(data); err != nil { + return false, errwrap.Wrapf(err, "could not read from file") + } + return strings.TrimSpace(string(data)) == strings.TrimSpace(str), nil +} + +// Set stores the true boolean value, if no error setting the value occurs. +func (obj *DiskBool) Set() error { + file, err := os.Create(obj.Path) // open a handle to create the file + if err != nil { + return errwrap.Wrapf(err, "can't create file") + } + defer file.Close() + str := obj.str() + if c, err := file.Write([]byte(str)); err != nil { + return errwrap.Wrapf(err, "error writing to file") + } else if l := len(str); c != l { + return fmt.Errorf("wrote %d bytes instead of %d", c, l) + } + return file.Sync() // guarantee it! +} + +// Del stores the false boolean value, if no error clearing the value occurs. +func (obj *DiskBool) Del() error { + if err := os.Remove(obj.Path); err != nil { // remove the file + if os.IsNotExist(err) { + return nil // no file means this is already fine + } + return errwrap.Wrapf(err, "could not delete token") + } + return nil +} diff --git a/resources/resources.go b/resources/resources.go index 07825e9a..0ceb9d2e 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -49,6 +49,8 @@ const ( ResStatePoking ) +const refreshPathToken = "refresh" + // Data is the set of input values passed into the pgraph for the resources. type Data struct { //Hostname string // uuid for the host @@ -133,6 +135,8 @@ type Base interface { DoSend(chan event.Event, string) (bool, error) SendEvent(event.EventName, bool, bool) bool ReadEvent(*event.Event) (bool, bool) // TODO: optional here? + Refresh() bool // is there a pending refresh to run? + SetRefresh(bool) // set the refresh state of this resource SendRecv(Res) (bool, error) // send->recv data passing function IsStateOK() bool StateOK(b bool) @@ -173,6 +177,8 @@ type BaseRes struct { isStateOK bool // whether the state is okay based on events or not isGrouped bool // am i contained within a group? grouped []Res // list of any grouped resources + refresh bool // does this resource have a refresh to run? + //refreshState StatefulBool // TODO: future stateful bool } // UIDExistsInUIDs wraps the IFF method when used with a list of UID's. @@ -222,6 +228,12 @@ func (obj *BaseRes) Init() error { return fmt.Errorf("Resource did not set kind!") } obj.events = make(chan event.Event) // unbuffered chan to avoid stale events + //dir, err := obj.VarDir("") + //if err != nil { + // return errwrap.Wrapf(err, "VarDir failed in Init()") + //} + // TODO: this StatefulBool implementation could be eventually swappable + //obj.refreshState = &DiskBool{Path: path.Join(dir, refreshPathToken)} return nil } diff --git a/resources/sendrecv.go b/resources/sendrecv.go index f964ea42..4af0529d 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -29,32 +29,6 @@ import ( errwrap "github.com/pkg/errors" ) -// DoSend sends off an event, but doesn't block the incoming event queue. It can -// also recursively call itself when events need processing during the wait. -// I'm not completely comfortable with this fn, but it will have to do for now. -func (obj *BaseRes) DoSend(processChan chan event.Event, comment string) (bool, error) { - resp := event.NewResp() - processChan <- event.Event{Name: event.EventNil, Resp: resp, Msg: comment, Activity: true} // trigger process - e := resp.Wait() - return false, e // XXX: at the moment, we don't use the exit bool. - // XXX: this can cause a deadlock. do we need to recursively send? fix event stuff! - //select { - //case e := <-resp: // wait for the ACK() - // if e != nil { // we got a NACK - // return true, e // exit with error - // } - //case event := <-obj.events: - // // NOTE: this code should match the similar code below! - // //cuid.SetConverged(false) // TODO: ? - // if exit, send := obj.ReadEvent(&event); exit { - // return true, nil // exit, without error - // } else if send { - // return obj.DoSend(processChan, comment) // recurse - // } - //} - //return false, nil // return, no error or exit signal -} - // SendEvent pushes an event into the message queue for a particular vertex func (obj *BaseRes) SendEvent(ev event.EventName, sync bool, activity bool) bool { // TODO: isn't this race-y ? @@ -72,27 +46,52 @@ func (obj *BaseRes) SendEvent(ev event.EventName, sync bool, activity bool) bool return true } +// DoSend sends off an event, but doesn't block the incoming event queue. +func (obj *BaseRes) DoSend(processChan chan event.Event, comment string) (exit bool, err error) { + resp := event.NewResp() + processChan <- event.Event{Name: event.EventNil, Resp: resp, Activity: false, Msg: comment} // trigger process + e := resp.Wait() + return false, e // XXX: at the moment, we don't use the exit bool. +} + // ReadEvent processes events when a select gets one, and handles the pause // code too! The return values specify if we should exit and poke respectively. -func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, poke bool) { +func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) { ev.ACK() + var poke bool + // ensure that a CheckApply runs by sending with a dirty state... + if ev.GetActivity() { // if previous node did work, and we were notified... + obj.StateOK(false) // dirty + poke = true // poke! + // XXX: this should be elsewhere in case Watch isn't used (eg: Polling instead...) + // XXX: unless this is used in our "fallback" polling implementation??? + obj.SetRefresh(true) + } + switch ev.Name { case event.EventStart: - return false, true + send = true || poke + return case event.EventPoke: - return false, true + send = true || poke + return case event.EventBackPoke: - return false, true // forward poking in response to a back poke! + send = true || poke + return // forward poking in response to a back poke! case event.EventExit: + // FIXME: what do we do if we have a pending refresh (poke) and an exit? return true, false case event.EventPause: // wait for next event to continue select { - case e := <-obj.Events(): + case e, ok := <-obj.Events(): + if !ok { // shutdown + return true, false + } e.ACK() if e.Name == event.EventExit { return true, false diff --git a/resources/svc.go b/resources/svc.go index 4d6aedbc..ee16b9d7 100644 --- a/resources/svc.go +++ b/resources/svc.go @@ -171,9 +171,6 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { if exit, send = obj.ReadEvent(&event); exit { return nil // exit } - if event.GetActivity() { - obj.StateOK(false) // dirty - } case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! @@ -226,9 +223,6 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { if exit, send = obj.ReadEvent(&event); exit { return nil // exit } - if event.GetActivity() { - obj.StateOK(false) // dirty - } case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! @@ -287,9 +281,10 @@ func (obj *SvcRes) CheckApply(apply bool) (checkOK bool, err error) { var running = (activestate.Value == dbus.MakeVariant("active")) var stateOK = ((obj.State == "") || (obj.State == "running" && running) || (obj.State == "stopped" && !running)) - var startupOK = true // XXX: DETECT AND SET + var startupOK = true // XXX: DETECT AND SET + var refresh = obj.Refresh() // do we have a pending reload to apply? - if stateOK && startupOK { + if stateOK && startupOK && !refresh { return true, nil // we are in the correct state } @@ -320,11 +315,19 @@ func (obj *SvcRes) CheckApply(apply bool) (checkOK bool, err error) { if err != nil { return false, errwrap.Wrapf(err, "Failed to start unit") } + if refresh { + log.Printf("%s[%s]: Skipping reload, due to pending start", obj.Kind(), obj.GetName()) + } + refresh = false // we did a start, so a reload is not needed } else if obj.State == "stopped" { _, err = conn.StopUnit(svc, "fail", result) if err != nil { return false, errwrap.Wrapf(err, "Failed to stop unit") } + if refresh { + log.Printf("%s[%s]: Skipping reload, due to pending stop", obj.Kind(), obj.GetName()) + } + refresh = false // we did a stop, so a reload is not needed } status := <-result @@ -335,6 +338,11 @@ func (obj *SvcRes) CheckApply(apply bool) (checkOK bool, err error) { return false, fmt.Errorf("Unknown systemd return string: %v", status) } + if refresh { // we need to reload the service + // XXX: run a svc reload here! + log.Printf("%s[%s]: Reloading...", obj.Kind(), obj.GetName()) + } + // XXX: also set enabled on boot return false, nil // success diff --git a/resources/timer.go b/resources/timer.go index 5b392f96..7c35f7b8 100644 --- a/resources/timer.go +++ b/resources/timer.go @@ -33,6 +33,8 @@ func init() { type TimerRes struct { BaseRes `yaml:",inline"` Interval int `yaml:"interval"` // Interval : Interval between runs + + ticker *time.Ticker } // TimerUID is the UID struct for TimerRes. @@ -65,6 +67,11 @@ func (obj *TimerRes) Validate() error { return nil } +// newTicker creates a new ticker +func (obj *TimerRes) newTicker() *time.Ticker { + return time.NewTicker(time.Duration(obj.Interval) * time.Second) +} + // Watch is the primary listener for this resource and it outputs events. func (obj *TimerRes) Watch(processChan chan event.Event) error { if obj.IsWatching() { @@ -84,16 +91,16 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error { return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout } - // Create a time.Ticker for the given interval - ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second) - defer ticker.Stop() + // create a time.Ticker for the given interval + obj.ticker = obj.newTicker() + defer obj.ticker.Stop() var send = false for { obj.SetState(ResStateWatching) select { - case <-ticker.C: // received the timer event + case <-obj.ticker.C: // received the timer event send = true log.Printf("%s[%s]: received tick", obj.Kind(), obj.GetName()) @@ -121,6 +128,22 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error { } } +// CheckApply method for Timer resource. Triggers a timer reset on notify. +func (obj *TimerRes) CheckApply(apply bool) (bool, error) { + // because there are no checks to run, this resource has a less + // traditional pattern than what is seen in most resources... + if !obj.Refresh() { // this works for apply || !apply + return true, nil // state is always okay if no refresh to do + } else if !apply { // we had a refresh to do + return false, nil // therefore state is wrong + } + + // reset the timer since apply && refresh + obj.ticker.Stop() + obj.ticker = obj.newTicker() + return false, nil +} + // GetUIDs includes all params to make a unique identification of this object. // Most resources only return one, although some resources can return multiple. func (obj *TimerRes) GetUIDs() []ResUID { @@ -158,8 +181,3 @@ func (obj *TimerRes) Compare(res Res) bool { } return true } - -// CheckApply method for Timer resource. Does nothing, returns happy! -func (obj *TimerRes) CheckApply(apply bool) (bool, error) { - return true, nil // state is always okay -}