From 63f21952f4347f9fb0cd4655110a37d57606e7c6 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Tue, 20 Sep 2016 06:33:13 -0400 Subject: [PATCH] golang: Split things into packages This makes this logically more separate! :) As an aside... I really hate the way golang does dependencies and packages. Yes, some people insist on nesting their code deep into a $GOPATH, which is fine if you're a google dev and are forced to work this way, but annoying for the rest of the world. Your code shouldn't need a git commit to switch to a a different vcs host! Gah I hate this so much. --- config.go | 566 ------------------ configwatch.go | 34 +- converger.go => converger/converger.go | 9 +- doc.go | 19 + etcd.go => etcd/etcd.go | 204 ++++--- event.go => event/event.go | 21 +- gconfig/gconfig.go | 268 +++++++++ global/global.go | 25 + main.go | 66 +- pgraph/autoedge.go | 104 ++++ pgraph/autogroup.go | 348 +++++++++++ pgraph.go => pgraph/pgraph.go | 158 ++--- pgraph_test.go => pgraph/pgraph_test.go | 4 +- puppet.go => puppet/puppet.go | 16 +- remote.go => remote/remote.go | 65 +- exec.go => resources/exec.go | 11 +- file.go => resources/file.go | 77 +-- noop.go => resources/noop.go | 8 +- .../packagekit/packagekit.go | 12 +- pkg.go => resources/pkg.go | 76 +-- resources.go => resources/resources.go | 109 ++-- .../resources_test.go | 2 +- svc.go => resources/svc.go | 18 +- timer.go => resources/timer.go | 8 +- misc.go => util/util.go | 6 +- misc_test.go => util/util_test.go | 20 +- 26 files changed, 1213 insertions(+), 1041 deletions(-) delete mode 100644 config.go rename converger.go => converger/converger.go (98%) create mode 100644 doc.go rename etcd.go => etcd/etcd.go (96%) rename event.go => event/event.go (91%) create mode 100644 gconfig/gconfig.go create mode 100644 global/global.go create mode 100644 pgraph/autoedge.go create mode 100644 pgraph/autogroup.go rename pgraph.go => pgraph/pgraph.go (87%) rename pgraph_test.go => pgraph/pgraph_test.go (99%) rename puppet.go => puppet/puppet.go (92%) rename remote.go => remote/remote.go (94%) rename exec.go => resources/exec.go (98%) rename file.go => resources/file.go (94%) rename noop.go => resources/noop.go (96%) rename packagekit.go => resources/packagekit/packagekit.go (99%) rename pkg.go => resources/pkg.go (85%) rename resources.go => resources/resources.go (81%) rename resources_test.go => resources/resources_test.go (99%) rename svc.go => resources/svc.go (97%) rename timer.go => resources/timer.go (96%) rename misc.go => util/util.go (99%) rename misc_test.go => util/util_test.go (98%) diff --git a/config.go b/config.go deleted file mode 100644 index 54bf1efd..00000000 --- a/config.go +++ /dev/null @@ -1,566 +0,0 @@ -// Mgmt -// Copyright (C) 2013-2016+ James Shubin and the project contributors -// Written by James Shubin and the project contributors -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package main - -import ( - "errors" - "fmt" - "gopkg.in/yaml.v2" - "io/ioutil" - "log" - "reflect" - "strings" -) - -type collectorResConfig struct { - Kind string `yaml:"kind"` - Pattern string `yaml:"pattern"` // XXX: Not Implemented -} - -type vertexConfig struct { - Kind string `yaml:"kind"` - Name string `yaml:"name"` -} - -type edgeConfig struct { - Name string `yaml:"name"` - From vertexConfig `yaml:"from"` - To vertexConfig `yaml:"to"` -} - -// GraphConfig is the data structure that describes a single graph to run. -type GraphConfig struct { - Graph string `yaml:"graph"` - Resources struct { - Noop []*NoopRes `yaml:"noop"` - Pkg []*PkgRes `yaml:"pkg"` - File []*FileRes `yaml:"file"` - Svc []*SvcRes `yaml:"svc"` - Exec []*ExecRes `yaml:"exec"` - Timer []*TimerRes `yaml:"timer"` - } `yaml:"resources"` - Collector []collectorResConfig `yaml:"collect"` - Edges []edgeConfig `yaml:"edges"` - Comment string `yaml:"comment"` - Hostname string `yaml:"hostname"` // uuid for the host - Remote string `yaml:"remote"` -} - -// Parse parses a data stream into the graph structure. -func (c *GraphConfig) Parse(data []byte) error { - if err := yaml.Unmarshal(data, c); err != nil { - return err - } - if c.Graph == "" { - return errors.New("Graph config: invalid `graph`") - } - return nil -} - -// ParseConfigFromFile takes a filename and returns the graph config structure. -func ParseConfigFromFile(filename string) *GraphConfig { - data, err := ioutil.ReadFile(filename) - if err != nil { - log.Printf("Config: Error: ParseConfigFromFile: File: %v", err) - return nil - } - - var config GraphConfig - if err := config.Parse(data); err != nil { - log.Printf("Config: Error: ParseConfigFromFile: Parse: %v", err) - return nil - } - - return &config -} - -// NewGraphFromConfig returns a new graph from existing input, such as from the -// existing graph, and a GraphConfig struct. -func (g *Graph) NewGraphFromConfig(config *GraphConfig, embdEtcd *EmbdEtcd, noop bool) (*Graph, error) { - if config.Hostname == "" { - return nil, fmt.Errorf("Config: Error: Hostname can't be empty!") - } - - var graph *Graph // new graph to return - if g == nil { // FIXME: how can we check for an empty graph? - graph = NewGraph("Graph") // give graph a default name - } else { - graph = g.Copy() // same vertices, since they're pointers! - } - - var lookup = make(map[string]map[string]*Vertex) - - //log.Printf("%+v", config) // debug - - // TODO: if defined (somehow)... - graph.SetName(config.Graph) // set graph name - - var keep []*Vertex // list of vertex which are the same in new graph - var resources []Res // list of resources to export - // use reflection to avoid duplicating code... better options welcome! - value := reflect.Indirect(reflect.ValueOf(config.Resources)) - vtype := value.Type() - for i := 0; i < vtype.NumField(); i++ { // number of fields in struct - name := vtype.Field(i).Name // string of field name - field := value.FieldByName(name) - iface := field.Interface() // interface type of value - slice := reflect.ValueOf(iface) - // XXX: should we just drop these everywhere and have the kind strings be all lowercase? - kind := FirstToUpper(name) - if DEBUG { - log.Printf("Config: Processing: %v...", kind) - } - for j := 0; j < slice.Len(); j++ { // loop through resources of same kind - x := slice.Index(j).Interface() - res, ok := x.(Res) // convert to Res type - if !ok { - return nil, fmt.Errorf("Config: Error: Can't convert: %v of type: %T to Res.", x, x) - } - if noop { - res.Meta().Noop = noop - } - if _, exists := lookup[kind]; !exists { - lookup[kind] = make(map[string]*Vertex) - } - // XXX: should we export based on a @@ prefix, or a metaparam - // like exported => true || exported => (host pattern)||(other pattern?) - if !strings.HasPrefix(res.GetName(), "@@") { // not exported resource - // XXX: we don't have a way of knowing if any of the - // metaparams are undefined, and as a result to set the - // defaults that we want! I hate the go yaml parser!!! - v := graph.GetVertexMatch(res) - if v == nil { // no match found - res.Init() - v = NewVertex(res) - graph.AddVertex(v) // call standalone in case not part of an edge - } - lookup[kind][res.GetName()] = v // used for constructing edges - keep = append(keep, v) // append - - } else if !noop { // do not export any resources if noop - // store for addition to etcd storage... - res.SetName(res.GetName()[2:]) //slice off @@ - res.setKind(kind) // cheap init - resources = append(resources, res) - } - } - } - // store in etcd - if err := EtcdSetResources(embdEtcd, config.Hostname, resources); err != nil { - return nil, fmt.Errorf("Config: Could not export resources: %v", err) - } - - // lookup from etcd - var hostnameFilter []string // empty to get from everyone - kindFilter := []string{} - for _, t := range config.Collector { - // XXX: should we just drop these everywhere and have the kind strings be all lowercase? - kind := FirstToUpper(t.Kind) - kindFilter = append(kindFilter, kind) - } - // do all the graph look ups in one single step, so that if the etcd - // database changes, we don't have a partial state of affairs... - if len(kindFilter) > 0 { // if kindFilter is empty, don't need to do lookups! - var err error - resources, err = EtcdGetResources(embdEtcd, hostnameFilter, kindFilter) - if err != nil { - return nil, fmt.Errorf("Config: Could not collect resources: %v", err) - } - } - for _, res := range resources { - matched := false - // see if we find a collect pattern that matches - for _, t := range config.Collector { - // XXX: should we just drop these everywhere and have the kind strings be all lowercase? - kind := FirstToUpper(t.Kind) - // use t.Kind and optionally t.Pattern to collect from etcd storage - log.Printf("Collect: %v; Pattern: %v", kind, t.Pattern) - - // XXX: expand to more complex pattern matching here... - if res.Kind() != kind { - continue - } - - if matched { - // we've already matched this resource, should we match again? - log.Printf("Config: Warning: Matching %v[%v] again!", kind, res.GetName()) - } - matched = true - - // collect resources but add the noop metaparam - if noop { - res.Meta().Noop = noop - } - - if t.Pattern != "" { // XXX: simplistic for now - res.CollectPattern(t.Pattern) // res.Dirname = t.Pattern - } - - log.Printf("Collect: %v[%v]: collected!", kind, res.GetName()) - - // XXX: similar to other resource add code: - if _, exists := lookup[kind]; !exists { - lookup[kind] = make(map[string]*Vertex) - } - v := graph.GetVertexMatch(res) - if v == nil { // no match found - res.Init() // initialize go channels or things won't work!!! - v = NewVertex(res) - graph.AddVertex(v) // call standalone in case not part of an edge - } - lookup[kind][res.GetName()] = v // used for constructing edges - keep = append(keep, v) // append - - //break // let's see if another resource even matches - } - } - - // get rid of any vertices we shouldn't "keep" (that aren't in new graph) - for _, v := range graph.GetVertices() { - if !VertexContains(v, keep) { - // wait for exit before starting new graph! - v.SendEvent(eventExit, true, false) - graph.DeleteVertex(v) - } - } - - for _, e := range config.Edges { - if _, ok := lookup[FirstToUpper(e.From.Kind)]; !ok { - return nil, fmt.Errorf("Can't find 'from' resource!") - } - if _, ok := lookup[FirstToUpper(e.To.Kind)]; !ok { - return nil, fmt.Errorf("Can't find 'to' resource!") - } - if _, ok := lookup[FirstToUpper(e.From.Kind)][e.From.Name]; !ok { - return nil, fmt.Errorf("Can't find 'from' name!") - } - if _, ok := lookup[FirstToUpper(e.To.Kind)][e.To.Name]; !ok { - return nil, fmt.Errorf("Can't find 'to' name!") - } - graph.AddEdge(lookup[FirstToUpper(e.From.Kind)][e.From.Name], lookup[FirstToUpper(e.To.Kind)][e.To.Name], NewEdge(e.Name)) - } - - return graph, nil -} - -// add edges to the vertex in a graph based on if it matches a uuid list -func (g *Graph) addEdgesByMatchingUUIDS(v *Vertex, uuids []ResUUID) []bool { - // search for edges and see what matches! - var result []bool - - // loop through each uuid, and see if it matches any vertex - for _, uuid := range uuids { - var found = false - // uuid is a ResUUID object - for _, vv := range g.GetVertices() { // search - if v == vv { // skip self - continue - } - if DEBUG { - log.Printf("Compile: AutoEdge: Match: %v[%v] with UUID: %v[%v]", vv.Kind(), vv.GetName(), uuid.Kind(), uuid.GetName()) - } - // we must match to an effective UUID for the resource, - // that is to say, the name value of a res is a helpful - // handle, but it is not necessarily a unique identity! - // remember, resources can return multiple UUID's each! - if UUIDExistsInUUIDs(uuid, vv.GetUUIDs()) { - // add edge from: vv -> v - if uuid.Reversed() { - txt := fmt.Sprintf("AutoEdge: %v[%v] -> %v[%v]", vv.Kind(), vv.GetName(), v.Kind(), v.GetName()) - log.Printf("Compile: Adding %v", txt) - g.AddEdge(vv, v, NewEdge(txt)) - } else { // edges go the "normal" way, eg: pkg resource - txt := fmt.Sprintf("AutoEdge: %v[%v] -> %v[%v]", v.Kind(), v.GetName(), vv.Kind(), vv.GetName()) - log.Printf("Compile: Adding %v", txt) - g.AddEdge(v, vv, NewEdge(txt)) - } - found = true - break - } - } - result = append(result, found) - } - return result -} - -// AutoEdges adds the automatic edges to the graph. -func (g *Graph) AutoEdges() { - log.Println("Compile: Adding AutoEdges...") - for _, v := range g.GetVertices() { // for each vertexes autoedges - if !v.Meta().AutoEdge { // is the metaparam true? - continue - } - autoEdgeObj := v.AutoEdges() - if autoEdgeObj == nil { - log.Printf("%v[%v]: Config: No auto edges were found!", v.Kind(), v.GetName()) - continue // next vertex - } - - for { // while the autoEdgeObj has more uuids to add... - uuids := autoEdgeObj.Next() // get some! - if uuids == nil { - log.Printf("%v[%v]: Config: The auto edge list is empty!", v.Kind(), v.GetName()) - break // inner loop - } - if DEBUG { - log.Println("Compile: AutoEdge: UUIDS:") - for i, u := range uuids { - log.Printf("Compile: AutoEdge: UUID%d: %v", i, u) - } - } - - // match and add edges - result := g.addEdgesByMatchingUUIDS(v, uuids) - - // report back, and find out if we should continue - if !autoEdgeObj.Test(result) { - break - } - } - } -} - -// AutoGrouper is the required interface to implement for an autogroup algorithm -type AutoGrouper interface { - // listed in the order these are typically called in... - name() string // friendly identifier - init(*Graph) error // only call once - vertexNext() (*Vertex, *Vertex, error) // mostly algorithmic - vertexCmp(*Vertex, *Vertex) error // can we merge these ? - vertexMerge(*Vertex, *Vertex) (*Vertex, error) // vertex merge fn to use - edgeMerge(*Edge, *Edge) *Edge // edge merge fn to use - vertexTest(bool) (bool, error) // call until false -} - -// baseGrouper is the base type for implementing the AutoGrouper interface -type baseGrouper struct { - graph *Graph // store a pointer to the graph - vertices []*Vertex // cached list of vertices - i int - j int - done bool -} - -// name provides a friendly name for the logs to see -func (ag *baseGrouper) name() string { - return "baseGrouper" -} - -// init is called only once and before using other AutoGrouper interface methods -// the name method is the only exception: call it any time without side effects! -func (ag *baseGrouper) init(g *Graph) error { - if ag.graph != nil { - return fmt.Errorf("The init method has already been called!") - } - ag.graph = g // pointer - ag.vertices = ag.graph.GetVerticesSorted() // cache in deterministic order! - ag.i = 0 - ag.j = 0 - if len(ag.vertices) == 0 { // empty graph - ag.done = true - return nil - } - return nil -} - -// vertexNext is a simple iterator that loops through vertex (pair) combinations -// an intelligent algorithm would selectively offer only valid pairs of vertices -// these should satisfy logical grouping requirements for the autogroup designs! -// the desired algorithms can override, but keep this method as a base iterator! -func (ag *baseGrouper) vertexNext() (v1, v2 *Vertex, err error) { - // this does a for v... { for w... { return v, w }} but stepwise! - l := len(ag.vertices) - if ag.i < l { - v1 = ag.vertices[ag.i] - } - if ag.j < l { - v2 = ag.vertices[ag.j] - } - - // in case the vertex was deleted - if !ag.graph.HasVertex(v1) { - v1 = nil - } - if !ag.graph.HasVertex(v2) { - v2 = nil - } - - // two nested loops... - if ag.j < l { - ag.j++ - } - if ag.j == l { - ag.j = 0 - if ag.i < l { - ag.i++ - } - if ag.i == l { - ag.done = true - } - } - - return -} - -func (ag *baseGrouper) vertexCmp(v1, v2 *Vertex) error { - if v1 == nil || v2 == nil { - return fmt.Errorf("Vertex is nil!") - } - if v1 == v2 { // skip yourself - return fmt.Errorf("Vertices are the same!") - } - if v1.Kind() != v2.Kind() { // we must group similar kinds - // TODO: maybe future resources won't need this limitation? - return fmt.Errorf("The two resources aren't the same kind!") - } - // someone doesn't want to group! - if !v1.Meta().AutoGroup || !v2.Meta().AutoGroup { - return fmt.Errorf("One of the autogroup flags is false!") - } - if v1.Res.IsGrouped() { // already grouped! - return fmt.Errorf("Already grouped!") - } - if len(v2.Res.GetGroup()) > 0 { // already has children grouped! - return fmt.Errorf("Already has groups!") - } - if !v1.Res.GroupCmp(v2.Res) { // resource groupcmp failed! - return fmt.Errorf("The GroupCmp failed!") - } - return nil // success -} - -func (ag *baseGrouper) vertexMerge(v1, v2 *Vertex) (v *Vertex, err error) { - // NOTE: it's important to use w.Res instead of w, b/c - // the w by itself is the *Vertex obj, not the *Res obj - // which is contained within it! They both satisfy the - // Res interface, which is why both will compile! :( - err = v1.Res.GroupRes(v2.Res) // GroupRes skips stupid groupings - return // success or fail, and no need to merge the actual vertices! -} - -func (ag *baseGrouper) edgeMerge(e1, e2 *Edge) *Edge { - return e1 // noop -} - -// vertexTest processes the results of the grouping for the algorithm to know -// return an error if something went horribly wrong, and bool false to stop -func (ag *baseGrouper) vertexTest(b bool) (bool, error) { - // NOTE: this particular baseGrouper version doesn't track what happens - // because since we iterate over every pair, we don't care which merge! - if ag.done { - return false, nil - } - return true, nil -} - -// TODO: this algorithm may not be correct in all cases. replace if needed! -type nonReachabilityGrouper struct { - baseGrouper // "inherit" what we want, and reimplement the rest -} - -func (ag *nonReachabilityGrouper) name() string { - return "nonReachabilityGrouper" -} - -// this algorithm relies on the observation that if there's a path from a to b, -// then they *can't* be merged (b/c of the existing dependency) so therefore we -// merge anything that *doesn't* satisfy this condition or that of the reverse! -func (ag *nonReachabilityGrouper) vertexNext() (v1, v2 *Vertex, err error) { - for { - v1, v2, err = ag.baseGrouper.vertexNext() // get all iterable pairs - if err != nil { - log.Fatalf("Error running autoGroup(vertexNext): %v", err) - } - - if v1 != v2 { // ignore self cmp early (perf optimization) - // if NOT reachable, they're viable... - out1 := ag.graph.Reachability(v1, v2) - out2 := ag.graph.Reachability(v2, v1) - if len(out1) == 0 && len(out2) == 0 { - return // return v1 and v2, they're viable - } - } - - // if we got here, it means we're skipping over this candidate! - if ok, err := ag.baseGrouper.vertexTest(false); err != nil { - log.Fatalf("Error running autoGroup(vertexTest): %v", err) - } else if !ok { - return nil, nil, nil // done! - } - - // the vertexTest passed, so loop and try with a new pair... - } -} - -// autoGroup is the mechanical auto group "runner" that runs the interface spec -func (g *Graph) autoGroup(ag AutoGrouper) chan string { - strch := make(chan string) // output log messages here - go func(strch chan string) { - strch <- fmt.Sprintf("Compile: Grouping: Algorithm: %v...", ag.name()) - if err := ag.init(g); err != nil { - log.Fatalf("Error running autoGroup(init): %v", err) - } - - for { - var v, w *Vertex - v, w, err := ag.vertexNext() // get pair to compare - if err != nil { - log.Fatalf("Error running autoGroup(vertexNext): %v", err) - } - merged := false - // save names since they change during the runs - vStr := fmt.Sprintf("%s", v) // valid even if it is nil - wStr := fmt.Sprintf("%s", w) - - if err := ag.vertexCmp(v, w); err != nil { // cmp ? - if DEBUG { - strch <- fmt.Sprintf("Compile: Grouping: !GroupCmp for: %s into %s", wStr, vStr) - } - - // remove grouped vertex and merge edges (res is safe) - } else if err := g.VertexMerge(v, w, ag.vertexMerge, ag.edgeMerge); err != nil { // merge... - strch <- fmt.Sprintf("Compile: Grouping: !VertexMerge for: %s into %s", wStr, vStr) - - } else { // success! - strch <- fmt.Sprintf("Compile: Grouping: Success for: %s into %s", wStr, vStr) - merged = true // woo - } - - // did these get used? - if ok, err := ag.vertexTest(merged); err != nil { - log.Fatalf("Error running autoGroup(vertexTest): %v", err) - } else if !ok { - break // done! - } - } - - close(strch) - return - }(strch) // call function - return strch -} - -// AutoGroup runs the auto grouping on the graph and prints out log messages -func (g *Graph) AutoGroup() { - // receive log messages from channel... - // this allows test cases to avoid printing them when they're unwanted! - // TODO: this algorithm may not be correct in all cases. replace if needed! - for str := range g.autoGroup(&nonReachabilityGrouper{}) { - log.Println(str) - } -} diff --git a/configwatch.go b/configwatch.go index d3ce2cb0..6be3b523 100644 --- a/configwatch.go +++ b/configwatch.go @@ -18,14 +18,18 @@ package main import ( - "gopkg.in/fsnotify.v1" - //"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1" "log" "math" "path" "strings" "sync" "syscall" + + "github.com/purpleidea/mgmt/global" + "github.com/purpleidea/mgmt/util" + + "gopkg.in/fsnotify.v1" + //"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1" ) // ConfigWatcher returns events on a channel anytime one of its files events. @@ -105,18 +109,18 @@ func ConfigWatch(file string) chan bool { } defer watcher.Close() - patharray := PathSplit(safename) // tokenize the path - var index = len(patharray) // starting index - var current string // current "watcher" location - var deltaDepth int // depth delta between watcher and event - var send = false // send event? + patharray := util.PathSplit(safename) // tokenize the path + var index = len(patharray) // starting index + var current string // current "watcher" location + var deltaDepth int // depth delta between watcher and event + var send = false // send event? for { current = strings.Join(patharray[0:index], "/") if current == "" { // the empty string top is the root dir ("/") current = "/" } - if DEBUG { + if global.DEBUG { log.Printf("Watching: %v", current) // attempting to watch... } // initialize in the loop so that we can reset on rm-ed handles @@ -145,11 +149,11 @@ func ConfigWatch(file string) chan bool { if current == event.Name { deltaDepth = 0 // i was watching what i was looking for - } else if HasPathPrefix(event.Name, current) { - deltaDepth = len(PathSplit(current)) - len(PathSplit(event.Name)) // -1 or less + } else if util.HasPathPrefix(event.Name, current) { + deltaDepth = len(util.PathSplit(current)) - len(util.PathSplit(event.Name)) // -1 or less - } else if HasPathPrefix(current, event.Name) { - deltaDepth = len(PathSplit(event.Name)) - len(PathSplit(current)) // +1 or more + } else if util.HasPathPrefix(current, event.Name) { + deltaDepth = len(util.PathSplit(event.Name)) - len(util.PathSplit(current)) // +1 or more } else { // TODO different watchers get each others events! @@ -182,7 +186,7 @@ func ConfigWatch(file string) chan bool { } // if safename starts with event.Name, we're above, and no event should be sent - } else if HasPathPrefix(safename, event.Name) { + } else if util.HasPathPrefix(safename, event.Name) { //log.Println("Above!") if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) { @@ -193,7 +197,7 @@ func ConfigWatch(file string) chan bool { if deltaDepth < 0 { log.Println("Parent!") - if PathPrefixDelta(safename, event.Name) == 1 { // we're the parent dir + if util.PathPrefixDelta(safename, event.Name) == 1 { // we're the parent dir //send = true } watcher.Remove(current) @@ -201,7 +205,7 @@ func ConfigWatch(file string) chan bool { } // if event.Name startswith safename, send event, we're already deeper - } else if HasPathPrefix(event.Name, safename) { + } else if util.HasPathPrefix(event.Name, safename) { //log.Println("Event2!") //send = true } diff --git a/converger.go b/converger/converger.go similarity index 98% rename from converger.go rename to converger/converger.go index 513919b4..aac5bde3 100644 --- a/converger.go +++ b/converger/converger.go @@ -15,12 +15,15 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package main +// Package converger is a facility for reporting the converged state. +package converger import ( "fmt" "sync" "time" + + "github.com/purpleidea/mgmt/util" ) // TODO: we could make a new function that masks out the state of certain @@ -248,9 +251,9 @@ func (obj *converger) ConvergedTimer(uuid ConvergerUUID) <-chan time.Time { // we have a low timeout, or in particular a timeout == 0 if uuid.IsConverged() { // blocks the case statement in select forever! - return TimeAfterOrBlock(-1) + return util.TimeAfterOrBlock(-1) } - return TimeAfterOrBlock(obj.timeout) + return util.TimeAfterOrBlock(obj.timeout) } // Status returns a map of the converged status of each UUID. diff --git a/doc.go b/doc.go new file mode 100644 index 00000000..23ac778c --- /dev/null +++ b/doc.go @@ -0,0 +1,19 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// Package main provides the main entrypoint for using the `mgmt` software. +package main diff --git a/etcd.go b/etcd/etcd.go similarity index 96% rename from etcd.go rename to etcd/etcd.go index ffda4bb3..ba1eb913 100644 --- a/etcd.go +++ b/etcd/etcd.go @@ -24,6 +24,17 @@ // TODO: Auto assign ports/ip's for peers (if possible) // TODO: Fix godoc +// Package etcd implements the distributed key value store integration. +// This also takes care of managing and clustering the embedded etcd server. +// The elastic etcd algorithm works in the following way: +// * When you start up mgmt, you can pass it a list of seeds. +// * If no seeds are given, then assume you are the first server and startup. +// * If a seed is given, connect as a client, and optionally volunteer to be a server. +// * All volunteering clients should listen for a message from the master for nomination. +// * If a client has been nominated, it should startup a server. +// * All servers should list for their nomination to be removed and shutdown if so. +// * The elected leader should decide who to nominate/unnominate to keep the right number of servers. +// // Smoke testing: // ./mgmt run --file examples/etcd1a.yaml --hostname h1 // ./mgmt run --file examples/etcd1b.yaml --hostname h2 --tmp-prefix --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2381 --server-urls http://127.0.0.1:2382 @@ -33,16 +44,7 @@ // ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list // ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/idealClusterSize 5 // ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list - -// The elastic etcd algorithm works in the following way: -// * When you start up mgmt, you can pass it a list of seeds. -// * If no seeds are given, then assume you are the first server and startup. -// * If a seed is given, connect as a client, and optionally volunteer to be a server. -// * All volunteering clients should listen for a message from the master for nomination. -// * If a client has been nominated, it should startup a server. -// * All servers should list for their nomination to be removed and shutdown if so. -// * The elected leader should decide who to nominate/unnominate to keep the right number of servers. -package main +package etcd import ( "bytes" @@ -59,6 +61,12 @@ import ( "sync" "time" + "github.com/purpleidea/mgmt/converger" + "github.com/purpleidea/mgmt/event" + "github.com/purpleidea/mgmt/global" + "github.com/purpleidea/mgmt/resources" + "github.com/purpleidea/mgmt/util" + etcd "github.com/coreos/etcd/clientv3" // "clientv3" "github.com/coreos/etcd/embed" "github.com/coreos/etcd/etcdserver" @@ -78,7 +86,7 @@ const ( maxClientConnectRetries = 5 // number of times to retry consecutive connect failures selfRemoveTimeout = 3 // give unnominated members a chance to self exit exitDelay = 3 // number of sec of inactivity after exit to clean up - defaultIdealClusterSize = 5 // default ideal cluster size target for initial seed + DefaultIdealClusterSize = 5 // default ideal cluster size target for initial seed DefaultClientURL = "127.0.0.1:2379" DefaultServerURL = "127.0.0.1:2380" ) @@ -94,7 +102,7 @@ type AW struct { callback func(*RE) error errCheck bool skipConv bool // ask event to skip converger updates - resp Resp + resp event.Resp cancelFunc func() // data } @@ -116,7 +124,7 @@ type KV struct { key string value string opts []etcd.OpOption - resp Resp + resp event.Resp } // GQ is a struct for the get queue @@ -124,7 +132,7 @@ type GQ struct { path string skipConv bool opts []etcd.OpOption - resp Resp + resp event.Resp data map[string]string } @@ -132,7 +140,7 @@ type GQ struct { type DL struct { path string opts []etcd.OpOption - resp Resp + resp event.Resp data int64 } @@ -141,7 +149,7 @@ type TN struct { ifcmps []etcd.Cmp thenops []etcd.Op elseops []etcd.Op - resp Resp + resp event.Resp data *etcd.TxnResponse } @@ -181,8 +189,8 @@ type EmbdEtcd struct { // EMBeddeD etcd delq chan *DL // delete queue txnq chan *TN // txn queue - prefix string // folder prefix to use for misc storage - converger Converger // converged tracking + prefix string // folder prefix to use for misc storage + converger converger.Converger // converged tracking // etcd server related serverwg sync.WaitGroup // wait for server to shutdown @@ -191,7 +199,7 @@ type EmbdEtcd struct { // EMBeddeD etcd } // NewEmbdEtcd creates the top level embedded etcd struct client and server obj -func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, prefix string, converger Converger) *EmbdEtcd { +func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, prefix string, converger converger.Converger) *EmbdEtcd { endpoints := make(etcdtypes.URLsMap) if hostname == seedSentinel { // safety return nil @@ -264,7 +272,7 @@ func (obj *EmbdEtcd) GetConfig() etcd.Config { // Connect connects the client to a server, and then builds the *API structs. // If reconnect is true, it will force a reconnect with new config endpoints. func (obj *EmbdEtcd) Connect(reconnect bool) error { - if DEBUG { + if global.DEBUG { log.Println("Etcd: Connect...") } obj.cLock.Lock() @@ -520,29 +528,29 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, var isTimeout = false var iter int // = 0 if ctxerr, ok := ctx.Value(ctxErr).(error); ok { - if DEBUG { + if global.DEBUG { log.Printf("Etcd: CtxError: err(%v), ctxerr(%v)", err, ctxerr) } if i, ok := ctx.Value(ctxIter).(int); ok { iter = i + 1 // load and increment - if DEBUG { + if global.DEBUG { log.Printf("Etcd: CtxError: Iter: %v", iter) } } isTimeout = err == context.DeadlineExceeded - if DEBUG { + if global.DEBUG { log.Printf("Etcd: CtxError: isTimeout: %v", isTimeout) } if !isTimeout { iter = 0 // reset timer } err = ctxerr // restore error - } else if DEBUG { + } else if global.DEBUG { log.Printf("Etcd: CtxError: No value found") } ctxHelper := func(tmin, texp, tmax int) context.Context { t := expBackoff(tmin, texp, iter, tmax) - if DEBUG { + if global.DEBUG { log.Printf("Etcd: CtxError: Timeout: %v", t) } @@ -629,13 +637,13 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, fallthrough case isGrpc(grpc.ErrClientConnClosing): - if DEBUG { + if global.DEBUG { log.Printf("Etcd: CtxError: Error(%T): %+v", err, err) log.Printf("Etcd: Endpoints are: %v", obj.client.Endpoints()) log.Printf("Etcd: Client endpoints are: %v", obj.endpoints) } - if DEBUG { + if global.DEBUG { log.Printf("Etcd: CtxError: Locking...") } obj.rLock.Lock() @@ -656,7 +664,7 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, obj.ctxErr = fmt.Errorf("Etcd: Permanent connect error: %v", err) return ctx, obj.ctxErr } - if DEBUG { + if global.DEBUG { log.Printf("Etcd: CtxError: Unlocking...") } obj.rLock.Unlock() @@ -700,7 +708,7 @@ func (obj *EmbdEtcd) CbLoop() { if !re.skipConv { // if we want to count it... cuuid.ResetTimer() // activity! } - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: CbLoop: Event: StartLoop") } for { @@ -708,11 +716,11 @@ func (obj *EmbdEtcd) CbLoop() { //re.resp.NACK() // nope! break } - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: CbLoop: rawCallback()") } err := rawCallback(ctx, re) - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: CbLoop: rawCallback(): %v", err) } if err == nil { @@ -724,7 +732,7 @@ func (obj *EmbdEtcd) CbLoop() { break // TODO: it's bad, break or return? } } - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: CbLoop: Event: FinishLoop") } @@ -752,11 +760,11 @@ func (obj *EmbdEtcd) Loop() { select { case aw := <-obj.awq: cuuid.ResetTimer() // activity! - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: Loop: PriorityAW: StartLoop") } obj.loopProcessAW(ctx, aw) - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: Loop: PriorityAW: FinishLoop") } continue // loop to drain the priority channel first! @@ -768,18 +776,18 @@ func (obj *EmbdEtcd) Loop() { // add watcher case aw := <-obj.awq: cuuid.ResetTimer() // activity! - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: Loop: AW: StartLoop") } obj.loopProcessAW(ctx, aw) - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: Loop: AW: FinishLoop") } // set kv pair case kv := <-obj.setq: cuuid.ResetTimer() // activity! - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: Loop: Set: StartLoop") } for { @@ -796,7 +804,7 @@ func (obj *EmbdEtcd) Loop() { break // TODO: it's bad, break or return? } } - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: Loop: Set: FinishLoop") } @@ -805,7 +813,7 @@ func (obj *EmbdEtcd) Loop() { if !gq.skipConv { cuuid.ResetTimer() // activity! } - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: Loop: Get: StartLoop") } for { @@ -823,14 +831,14 @@ func (obj *EmbdEtcd) Loop() { break // TODO: it's bad, break or return? } } - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: Loop: Get: FinishLoop") } // delete value case dl := <-obj.delq: cuuid.ResetTimer() // activity! - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: Loop: Delete: StartLoop") } for { @@ -848,14 +856,14 @@ func (obj *EmbdEtcd) Loop() { break // TODO: it's bad, break or return? } } - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: Loop: Delete: FinishLoop") } // run txn case tn := <-obj.txnq: cuuid.ResetTimer() // activity! - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: Loop: Txn: StartLoop") } for { @@ -873,7 +881,7 @@ func (obj *EmbdEtcd) Loop() { break // TODO: it's bad, break or return? } } - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: Loop: Txn: FinishLoop") } @@ -884,7 +892,7 @@ func (obj *EmbdEtcd) Loop() { // seconds of inactivity in this select switch, which // lets everything get bled dry to avoid blocking calls // which would otherwise block us from exiting cleanly! - obj.exitTimeout = TimeAfterOrBlock(exitDelay) + obj.exitTimeout = util.TimeAfterOrBlock(exitDelay) // exit loop commit case <-obj.exitTimeout: @@ -917,7 +925,7 @@ func (obj *EmbdEtcd) loopProcessAW(ctx context.Context, aw *AW) { // Set queues up a set operation to occur using our mainloop func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error { - resp := NewResp() + resp := event.NewResp() obj.setq <- &KV{key: key, value: value, opts: opts, resp: resp} if err := resp.Wait(); err != nil { // wait for ack/nack return fmt.Errorf("Etcd: Set: Probably received an exit: %v", err) @@ -927,7 +935,7 @@ func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error { // rawSet actually implements the key set operation func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: rawSet()") } // key is the full key path @@ -936,7 +944,7 @@ func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error { response, err := obj.client.KV.Put(ctx, kv.key, kv.value, kv.opts...) obj.rLock.RUnlock() log.Printf("Etcd: Set(%s): %v", kv.key, response) // w00t... bonus - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: rawSet(): %v", err) } return err @@ -951,7 +959,7 @@ func (obj *EmbdEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string, // accept more arguments that are useful for the less common operations. // TODO: perhaps a get should never cause an un-converge ? func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOption) (map[string]string, error) { - resp := NewResp() + resp := event.NewResp() gq := &GQ{path: path, skipConv: skipConv, opts: opts, resp: resp, data: nil} obj.getq <- gq // send if err := resp.Wait(); err != nil { // wait for ack/nack @@ -961,7 +969,7 @@ func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOptio } func (obj *EmbdEtcd) rawGet(ctx context.Context, gq *GQ) (result map[string]string, err error) { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: rawGet()") } obj.rLock.RLock() @@ -977,7 +985,7 @@ func (obj *EmbdEtcd) rawGet(ctx context.Context, gq *GQ) (result map[string]stri result[bytes.NewBuffer(x.Key).String()] = bytes.NewBuffer(x.Value).String() } - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: rawGet(): %v", result) } return @@ -985,7 +993,7 @@ func (obj *EmbdEtcd) rawGet(ctx context.Context, gq *GQ) (result map[string]stri // Delete performs a delete operation and waits for an ACK to continue func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error) { - resp := NewResp() + resp := event.NewResp() dl := &DL{path: path, opts: opts, resp: resp, data: -1} obj.delq <- dl // send if err := resp.Wait(); err != nil { // wait for ack/nack @@ -995,7 +1003,7 @@ func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error) { } func (obj *EmbdEtcd) rawDelete(ctx context.Context, dl *DL) (count int64, err error) { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: rawDelete()") } count = -1 @@ -1005,7 +1013,7 @@ func (obj *EmbdEtcd) rawDelete(ctx context.Context, dl *DL) (count int64, err er if err == nil { count = response.Deleted } - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: rawDelete(): %v", err) } return @@ -1013,7 +1021,7 @@ func (obj *EmbdEtcd) rawDelete(ctx context.Context, dl *DL) (count int64, err er // Txn performs a transaction and waits for an ACK to continue func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.TxnResponse, error) { - resp := NewResp() + resp := event.NewResp() tn := &TN{ifcmps: ifcmps, thenops: thenops, elseops: elseops, resp: resp, data: nil} obj.txnq <- tn // send if err := resp.Wait(); err != nil { // wait for ack/nack @@ -1023,13 +1031,13 @@ func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.T } func (obj *EmbdEtcd) rawTxn(ctx context.Context, tn *TN) (*etcd.TxnResponse, error) { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: rawTxn()") } obj.rLock.RLock() response, err := obj.client.KV.Txn(ctx).If(tn.ifcmps...).Then(tn.thenops...).Else(tn.elseops...).Commit() obj.rLock.RUnlock() - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: rawTxn(): %v, %v", response, err) } return response, err @@ -1038,7 +1046,7 @@ func (obj *EmbdEtcd) rawTxn(ctx context.Context, tn *TN) (*etcd.TxnResponse, err // AddWatcher queues up an add watcher request and returns a cancel function // Remember to add the etcd.WithPrefix() option if you want to watch recursively func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, opts ...etcd.OpOption) (func(), error) { - resp := NewResp() + resp := event.NewResp() awq := &AW{path: path, opts: opts, callback: callback, errCheck: errCheck, skipConv: skipConv, cancelFunc: nil, resp: resp} obj.awq <- awq // send if err := resp.Wait(); err != nil { // wait for ack/nack @@ -1063,7 +1071,7 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error) err := response.Err() isCanceled := response.Canceled || err == context.Canceled if response.Header.Revision == 0 { // by inspection - if DEBUG { + if global.DEBUG { log.Printf("Etcd: Watch: Received empty message!") // switched client connection } isCanceled = true @@ -1084,7 +1092,7 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error) } locked = false } else { - if DEBUG { + if global.DEBUG { log.Printf("Etcd: Watch: Error: %v", err) // probably fixable } // this new context is the fix for a tricky set @@ -1133,7 +1141,7 @@ func rawCallback(ctx context.Context, re *RE) error { // NOTE: the callback must *not* block! // FIXME: do we need to pass ctx in via *RE, or in the callback signature ? err = callback(re) // run the callback - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: rawCallback(): %v", err) } if !re.errCheck || err == nil { @@ -1151,7 +1159,7 @@ func rawCallback(ctx context.Context, re *RE) error { // FIXME: we might need to respond to member change/disconnect/shutdown events, // see: https://github.com/coreos/etcd/issues/5277 func (obj *EmbdEtcd) volunteerCallback(re *RE) error { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: volunteerCallback()") defer log.Printf("Trace: Etcd: volunteerCallback(): Finished!") } @@ -1178,7 +1186,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error { if err != nil { return fmt.Errorf("Etcd: Members: Error: %+v", err) } - members := StrMapValuesUint64(membersMap) // get values + members := util.StrMapValuesUint64(membersMap) // get values log.Printf("Etcd: Members: List: %+v", members) // we only do *one* change operation at a time so that the cluster can @@ -1224,7 +1232,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error { log.Printf("Etcd: Volunteers: %v", volunteers) // unnominate anyone that unvolunteers, so that they can shutdown cleanly - quitters := StrFilterElementsInList(volunteers, members) + quitters := util.StrFilterElementsInList(volunteers, members) log.Printf("Etcd: Quitters: %v", quitters) // if we're the only member left, just shutdown... @@ -1236,7 +1244,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error { return nil } - candidates := StrFilterElementsInList(members, volunteers) + candidates := util.StrFilterElementsInList(members, volunteers) log.Printf("Etcd: Candidates: %v", candidates) // TODO: switch to < 0 so that we can shut the whole cluster down with 0 @@ -1291,7 +1299,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error { log.Printf("Etcd: Quitters: Shutting down %d members...", lq) } for _, quitter := range quitters { - mID, ok := Uint64KeyFromStrInMap(quitter, membersMap) + mID, ok := util.Uint64KeyFromStrInMap(quitter, membersMap) if !ok { // programming error log.Fatalf("Etcd: Member Remove: Error: %v(%v) not in members list!", quitter, mID) @@ -1339,7 +1347,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error { // nominateCallback runs to respond to the nomination list change events // functionally, it controls the starting and stopping of the server process func (obj *EmbdEtcd) nominateCallback(re *RE) error { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: nominateCallback()") defer log.Printf("Trace: Etcd: nominateCallback(): Finished!") } @@ -1388,7 +1396,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error { _, exists := obj.nominated[obj.hostname] // FIXME: can we get rid of the len(obj.nominated) == 0 ? newCluster := len(obj.nominated) == 0 || (len(obj.nominated) == 1 && exists) - if DEBUG { + if global.DEBUG { log.Printf("Etcd: nominateCallback(): newCluster: %v; exists: %v; obj.server == nil: %t", newCluster, exists, obj.server == nil) } // XXX check if i have actually volunteered first of all... @@ -1487,7 +1495,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error { // endpointCallback runs to respond to the endpoint list change events func (obj *EmbdEtcd) endpointCallback(re *RE) error { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: endpointCallback()") defer log.Printf("Trace: Etcd: endpointCallback(): Finished!") } @@ -1553,7 +1561,7 @@ func (obj *EmbdEtcd) endpointCallback(re *RE) error { // idealClusterSizeCallback runs to respond to the ideal cluster size changes func (obj *EmbdEtcd) idealClusterSizeCallback(re *RE) error { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: idealClusterSizeCallback()") defer log.Printf("Trace: Etcd: idealClusterSizeCallback(): Finished!") } @@ -1692,7 +1700,7 @@ func (obj *EmbdEtcd) DestroyServer() error { // EtcdNominate nominates a particular client to be a server (peer) func EtcdNominate(obj *EmbdEtcd, hostname string, urls etcdtypes.URLs) error { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: EtcdNominate(%v): %v", hostname, urls.String()) defer log.Printf("Trace: Etcd: EtcdNominate(%v): Finished!", hostname) } @@ -1734,7 +1742,7 @@ func EtcdNominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { return nil, fmt.Errorf("Etcd: Nominated: Data format error!: %v", err) } nominated[name] = urls // add to map - if DEBUG { + if global.DEBUG { log.Printf("Etcd: Nominated(%v): %v", name, val) } } @@ -1743,7 +1751,7 @@ func EtcdNominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { // EtcdVolunteer offers yourself up to be a server if needed func EtcdVolunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: EtcdVolunteer(%v): %v", obj.hostname, urls.String()) defer log.Printf("Trace: Etcd: EtcdVolunteer(%v): Finished!", obj.hostname) } @@ -1766,7 +1774,7 @@ func EtcdVolunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error { // EtcdVolunteers returns a urls map of available etcd server volunteers func EtcdVolunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: EtcdVolunteers()") defer log.Printf("Trace: Etcd: EtcdVolunteers(): Finished!") } @@ -1789,7 +1797,7 @@ func EtcdVolunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { return nil, fmt.Errorf("Etcd: Volunteers: Data format error!: %v", err) } volunteers[name] = urls // add to map - if DEBUG { + if global.DEBUG { log.Printf("Etcd: Volunteer(%v): %v", name, val) } } @@ -1798,7 +1806,7 @@ func EtcdVolunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { // EtcdAdvertiseEndpoints advertises the list of available client endpoints func EtcdAdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: EtcdAdvertiseEndpoints(%v): %v", obj.hostname, urls.String()) defer log.Printf("Trace: Etcd: EtcdAdvertiseEndpoints(%v): Finished!", obj.hostname) } @@ -1821,7 +1829,7 @@ func EtcdAdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error { // EtcdEndpoints returns a urls map of available etcd server endpoints func EtcdEndpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: EtcdEndpoints()") defer log.Printf("Trace: Etcd: EtcdEndpoints(): Finished!") } @@ -1844,7 +1852,7 @@ func EtcdEndpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { return nil, fmt.Errorf("Etcd: Endpoints: Data format error!: %v", err) } endpoints[name] = urls // add to map - if DEBUG { + if global.DEBUG { log.Printf("Etcd: Endpoint(%v): %v", name, val) } } @@ -1853,7 +1861,7 @@ func EtcdEndpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { // EtcdSetHostnameConverged sets whether a specific hostname is converged. func EtcdSetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: EtcdSetHostnameConverged(%s): %v", hostname, isConverged) defer log.Printf("Trace: Etcd: EtcdSetHostnameConverged(%v): Finished!", hostname) } @@ -1867,7 +1875,7 @@ func EtcdSetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) // EtcdHostnameConverged returns a map of every hostname's converged state. func EtcdHostnameConverged(obj *EmbdEtcd) (map[string]bool, error) { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: EtcdHostnameConverged()") defer log.Printf("Trace: Etcd: EtcdHostnameConverged(): Finished!") } @@ -1912,7 +1920,7 @@ func EtcdAddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]b // EtcdSetClusterSize sets the ideal target cluster size of etcd peers func EtcdSetClusterSize(obj *EmbdEtcd, value uint16) error { - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: EtcdSetClusterSize(): %v", value) defer log.Printf("Trace: Etcd: EtcdSetClusterSize(): Finished!") } @@ -2006,7 +2014,7 @@ func EtcdMembers(obj *EmbdEtcd) (map[uint64]string, error) { return nil, fmt.Errorf("Exiting...") } obj.rLock.RLock() - if TRACE { + if global.TRACE { log.Printf("Trace: Etcd: EtcdMembers(): Endpoints are: %v", obj.client.Endpoints()) } response, err = obj.client.MemberList(ctx) @@ -2100,7 +2108,7 @@ func EtcdWatch(obj *EmbdEtcd) chan bool { } // EtcdSetResources exports all of the resources which we pass in to etcd -func EtcdSetResources(obj *EmbdEtcd, hostname string, resources []Res) error { +func EtcdSetResources(obj *EmbdEtcd, hostname string, resourceList []resources.Res) error { // key structure is /$NS/exported/$hostname/resources/$uuid = $data var kindFilter []string // empty to get from everyone @@ -2112,19 +2120,19 @@ func EtcdSetResources(obj *EmbdEtcd, hostname string, resources []Res) error { return err } - if len(originals) == 0 && len(resources) == 0 { // special case of no add or del + if len(originals) == 0 && len(resourceList) == 0 { // special case of no add or del return nil } ifs := []etcd.Cmp{} // list matching the desired state ops := []etcd.Op{} // list of ops in this transaction - for _, res := range resources { + for _, res := range resourceList { if res.Kind() == "" { log.Fatalf("Etcd: SetResources: Error: Empty kind: %v", res.GetName()) } uuid := fmt.Sprintf("%s/%s", res.Kind(), res.GetName()) path := fmt.Sprintf("/%s/exported/%s/resources/%s", NS, hostname, uuid) - if data, err := ResToB64(res); err == nil { + if data, err := resources.ResToB64(res); err == nil { ifs = append(ifs, etcd.Compare(etcd.Value(path), "=", data)) // desired state ops = append(ops, etcd.OpPut(path, data)) } else { @@ -2132,8 +2140,8 @@ func EtcdSetResources(obj *EmbdEtcd, hostname string, resources []Res) error { } } - match := func(res Res, resources []Res) bool { // helper lambda - for _, x := range resources { + match := func(res resources.Res, resourceList []resources.Res) bool { // helper lambda + for _, x := range resourceList { if res.Kind() == x.Kind() && res.GetName() == x.GetName() { return true } @@ -2150,7 +2158,7 @@ func EtcdSetResources(obj *EmbdEtcd, hostname string, resources []Res) error { uuid := fmt.Sprintf("%s/%s", res.Kind(), res.GetName()) path := fmt.Sprintf("/%s/exported/%s/resources/%s", NS, hostname, uuid) - if match(res, resources) { // if we match, no need to delete! + if match(res, resourceList) { // if we match, no need to delete! continue } @@ -2175,10 +2183,10 @@ func EtcdSetResources(obj *EmbdEtcd, hostname string, resources []Res) error { // TODO: Expand this with a more powerful filter based on what we eventually // support in our collect DSL. Ideally a server side filter like WithFilter() // We could do this if the pattern was /$NS/exported/$kind/$hostname/$uuid = $data -func EtcdGetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]Res, error) { +func EtcdGetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]resources.Res, error) { // key structure is /$NS/exported/$hostname/resources/$uuid = $data path := fmt.Sprintf("/%s/exported/", NS) - resources := []Res{} + resourceList := []resources.Res{} keyMap, err := obj.Get(path, etcd.WithPrefix(), etcd.WithSort(etcd.SortByKey, etcd.SortAscend)) if err != nil { return nil, fmt.Errorf("Etcd: GetResources: Error: Could not get resources: %v", err) @@ -2201,24 +2209,24 @@ func EtcdGetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]Res } // FIXME: ideally this would be a server side filter instead! - if len(hostnameFilter) > 0 && !StrInList(hostname, hostnameFilter) { + if len(hostnameFilter) > 0 && !util.StrInList(hostname, hostnameFilter) { continue } // FIXME: ideally this would be a server side filter instead! - if len(kindFilter) > 0 && !StrInList(kind, kindFilter) { + if len(kindFilter) > 0 && !util.StrInList(kind, kindFilter) { continue } - if obj, err := B64ToRes(val); err == nil { - obj.setKind(kind) // cheap init + if obj, err := resources.B64ToRes(val); err == nil { + obj.SetKind(kind) // cheap init log.Printf("Etcd: Get: (Hostname, Kind, Name): (%s, %s, %s)", hostname, kind, name) - resources = append(resources, obj) + resourceList = append(resourceList, obj) } else { return nil, fmt.Errorf("Etcd: GetResources: Error: Can't convert from B64: %v", err) } } - return resources, nil + return resourceList, nil } //func UrlRemoveScheme(urls etcdtypes.URLs) []string { @@ -2258,7 +2266,7 @@ func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, err if _, exists := urlsmap[key]; !exists { // this can happen if we retry an operation b/w // a reconnect so ignore if we are reconnecting - if DEBUG { + if global.DEBUG { log.Printf("Etcd: ApplyDeltaEvents: Inconsistent key: %v", key) } return nil, errApplyDeltaEventsInconsistent diff --git a/event.go b/event/event.go similarity index 91% rename from event.go rename to event/event.go index bf2868d4..f6a4d542 100644 --- a/event.go +++ b/event/event.go @@ -15,22 +15,23 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package main +// Package event provides some primitives that are used for message passing. +package event import ( "fmt" ) -//go:generate stringer -type=eventName -output=eventname_stringer.go -type eventName int +//go:generate stringer -type=EventName -output=eventname_stringer.go +type EventName int const ( - eventNil eventName = iota - eventExit - eventStart - eventPause - eventPoke - eventBackPoke + EventNil EventName = iota + EventExit + EventStart + EventPause + EventPoke + EventBackPoke ) // Resp is a channel to be used for boolean responses. A nil represents an ACK, @@ -39,7 +40,7 @@ type Resp chan error // Event is the main struct that stores event information and responses. type Event struct { - Name eventName + Name EventName Resp Resp // channel to send an ack response on, nil to skip //Wg *sync.WaitGroup // receiver barrier to Wait() for everyone else on Msg string // some words for fun diff --git a/gconfig/gconfig.go b/gconfig/gconfig.go new file mode 100644 index 00000000..3b2c1b17 --- /dev/null +++ b/gconfig/gconfig.go @@ -0,0 +1,268 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// Package gconfig provides the facilities for loading a graph from a yaml file. +package gconfig + +import ( + "errors" + "fmt" + "io/ioutil" + "log" + "reflect" + "strings" + + "github.com/purpleidea/mgmt/etcd" + "github.com/purpleidea/mgmt/event" + "github.com/purpleidea/mgmt/global" + "github.com/purpleidea/mgmt/pgraph" + "github.com/purpleidea/mgmt/resources" + "github.com/purpleidea/mgmt/util" + + "gopkg.in/yaml.v2" +) + +type collectorResConfig struct { + Kind string `yaml:"kind"` + Pattern string `yaml:"pattern"` // XXX: Not Implemented +} + +type vertexConfig struct { + Kind string `yaml:"kind"` + Name string `yaml:"name"` +} + +type edgeConfig struct { + Name string `yaml:"name"` + From vertexConfig `yaml:"from"` + To vertexConfig `yaml:"to"` +} + +// GraphConfig is the data structure that describes a single graph to run. +type GraphConfig struct { + Graph string `yaml:"graph"` + Resources struct { + Noop []*resources.NoopRes `yaml:"noop"` + Pkg []*resources.PkgRes `yaml:"pkg"` + File []*resources.FileRes `yaml:"file"` + Svc []*resources.SvcRes `yaml:"svc"` + Exec []*resources.ExecRes `yaml:"exec"` + Timer []*resources.TimerRes `yaml:"timer"` + } `yaml:"resources"` + Collector []collectorResConfig `yaml:"collect"` + Edges []edgeConfig `yaml:"edges"` + Comment string `yaml:"comment"` + Hostname string `yaml:"hostname"` // uuid for the host + Remote string `yaml:"remote"` +} + +// Parse parses a data stream into the graph structure. +func (c *GraphConfig) Parse(data []byte) error { + if err := yaml.Unmarshal(data, c); err != nil { + return err + } + if c.Graph == "" { + return errors.New("Graph config: invalid `graph`") + } + return nil +} + +// ParseConfigFromFile takes a filename and returns the graph config structure. +func ParseConfigFromFile(filename string) *GraphConfig { + data, err := ioutil.ReadFile(filename) + if err != nil { + log.Printf("Config: Error: ParseConfigFromFile: File: %v", err) + return nil + } + + var config GraphConfig + if err := config.Parse(data); err != nil { + log.Printf("Config: Error: ParseConfigFromFile: Parse: %v", err) + return nil + } + + return &config +} + +// NewGraphFromConfig returns a new graph from existing input, such as from the +// existing graph, and a GraphConfig struct. +func (c *GraphConfig) NewGraphFromConfig(g *pgraph.Graph, embdEtcd *etcd.EmbdEtcd, noop bool) (*pgraph.Graph, error) { + if c.Hostname == "" { + return nil, fmt.Errorf("Config: Error: Hostname can't be empty!") + } + + var graph *pgraph.Graph // new graph to return + if g == nil { // FIXME: how can we check for an empty graph? + graph = pgraph.NewGraph("Graph") // give graph a default name + } else { + graph = g.Copy() // same vertices, since they're pointers! + } + + var lookup = make(map[string]map[string]*pgraph.Vertex) + + //log.Printf("%+v", config) // debug + + // TODO: if defined (somehow)... + graph.SetName(c.Graph) // set graph name + + var keep []*pgraph.Vertex // list of vertex which are the same in new graph + var resourceList []resources.Res // list of resources to export + // use reflection to avoid duplicating code... better options welcome! + value := reflect.Indirect(reflect.ValueOf(c.Resources)) + vtype := value.Type() + for i := 0; i < vtype.NumField(); i++ { // number of fields in struct + name := vtype.Field(i).Name // string of field name + field := value.FieldByName(name) + iface := field.Interface() // interface type of value + slice := reflect.ValueOf(iface) + // XXX: should we just drop these everywhere and have the kind strings be all lowercase? + kind := util.FirstToUpper(name) + if global.DEBUG { + log.Printf("Config: Processing: %v...", kind) + } + for j := 0; j < slice.Len(); j++ { // loop through resources of same kind + x := slice.Index(j).Interface() + res, ok := x.(resources.Res) // convert to Res type + if !ok { + return nil, fmt.Errorf("Config: Error: Can't convert: %v of type: %T to Res.", x, x) + } + if noop { + res.Meta().Noop = noop + } + if _, exists := lookup[kind]; !exists { + lookup[kind] = make(map[string]*pgraph.Vertex) + } + // XXX: should we export based on a @@ prefix, or a metaparam + // like exported => true || exported => (host pattern)||(other pattern?) + if !strings.HasPrefix(res.GetName(), "@@") { // not exported resource + // XXX: we don't have a way of knowing if any of the + // metaparams are undefined, and as a result to set the + // defaults that we want! I hate the go yaml parser!!! + v := graph.GetVertexMatch(res) + if v == nil { // no match found + res.Init() + v = pgraph.NewVertex(res) + graph.AddVertex(v) // call standalone in case not part of an edge + } + lookup[kind][res.GetName()] = v // used for constructing edges + keep = append(keep, v) // append + + } else if !noop { // do not export any resources if noop + // store for addition to etcd storage... + res.SetName(res.GetName()[2:]) //slice off @@ + res.SetKind(kind) // cheap init + resourceList = append(resourceList, res) + } + } + } + // store in etcd + if err := etcd.EtcdSetResources(embdEtcd, c.Hostname, resourceList); err != nil { + return nil, fmt.Errorf("Config: Could not export resources: %v", err) + } + + // lookup from etcd + var hostnameFilter []string // empty to get from everyone + kindFilter := []string{} + for _, t := range c.Collector { + // XXX: should we just drop these everywhere and have the kind strings be all lowercase? + kind := util.FirstToUpper(t.Kind) + kindFilter = append(kindFilter, kind) + } + // do all the graph look ups in one single step, so that if the etcd + // database changes, we don't have a partial state of affairs... + if len(kindFilter) > 0 { // if kindFilter is empty, don't need to do lookups! + var err error + resourceList, err = etcd.EtcdGetResources(embdEtcd, hostnameFilter, kindFilter) + if err != nil { + return nil, fmt.Errorf("Config: Could not collect resources: %v", err) + } + } + for _, res := range resourceList { + matched := false + // see if we find a collect pattern that matches + for _, t := range c.Collector { + // XXX: should we just drop these everywhere and have the kind strings be all lowercase? + kind := util.FirstToUpper(t.Kind) + // use t.Kind and optionally t.Pattern to collect from etcd storage + log.Printf("Collect: %v; Pattern: %v", kind, t.Pattern) + + // XXX: expand to more complex pattern matching here... + if res.Kind() != kind { + continue + } + + if matched { + // we've already matched this resource, should we match again? + log.Printf("Config: Warning: Matching %v[%v] again!", kind, res.GetName()) + } + matched = true + + // collect resources but add the noop metaparam + if noop { + res.Meta().Noop = noop + } + + if t.Pattern != "" { // XXX: simplistic for now + res.CollectPattern(t.Pattern) // res.Dirname = t.Pattern + } + + log.Printf("Collect: %v[%v]: collected!", kind, res.GetName()) + + // XXX: similar to other resource add code: + if _, exists := lookup[kind]; !exists { + lookup[kind] = make(map[string]*pgraph.Vertex) + } + v := graph.GetVertexMatch(res) + if v == nil { // no match found + res.Init() // initialize go channels or things won't work!!! + v = pgraph.NewVertex(res) + graph.AddVertex(v) // call standalone in case not part of an edge + } + lookup[kind][res.GetName()] = v // used for constructing edges + keep = append(keep, v) // append + + //break // let's see if another resource even matches + } + } + + // get rid of any vertices we shouldn't "keep" (that aren't in new graph) + for _, v := range graph.GetVertices() { + if !pgraph.VertexContains(v, keep) { + // wait for exit before starting new graph! + v.SendEvent(event.EventExit, true, false) + graph.DeleteVertex(v) + } + } + + for _, e := range c.Edges { + if _, ok := lookup[util.FirstToUpper(e.From.Kind)]; !ok { + return nil, fmt.Errorf("Can't find 'from' resource!") + } + if _, ok := lookup[util.FirstToUpper(e.To.Kind)]; !ok { + return nil, fmt.Errorf("Can't find 'to' resource!") + } + if _, ok := lookup[util.FirstToUpper(e.From.Kind)][e.From.Name]; !ok { + return nil, fmt.Errorf("Can't find 'from' name!") + } + if _, ok := lookup[util.FirstToUpper(e.To.Kind)][e.To.Name]; !ok { + return nil, fmt.Errorf("Can't find 'to' name!") + } + graph.AddEdge(lookup[util.FirstToUpper(e.From.Kind)][e.From.Name], lookup[util.FirstToUpper(e.To.Kind)][e.To.Name], pgraph.NewEdge(e.Name)) + } + + return graph, nil +} diff --git a/global/global.go b/global/global.go new file mode 100644 index 00000000..7ec1efa1 --- /dev/null +++ b/global/global.go @@ -0,0 +1,25 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// Package global holds some global variables that are used throughout the code. +package global + +const ( + DEBUG = false // add additional log messages + TRACE = false // add execution flow log messages + VERBOSE = false // add extra log message output +) diff --git a/main.go b/main.go index fe65f7d3..760f60f9 100644 --- a/main.go +++ b/main.go @@ -19,9 +19,6 @@ package main import ( "fmt" - etcdtypes "github.com/coreos/etcd/pkg/types" - "github.com/coreos/pkg/capnslog" - "github.com/urfave/cli" "io/ioutil" "log" "os" @@ -29,6 +26,19 @@ import ( "sync" "syscall" "time" + + "github.com/purpleidea/mgmt/converger" + "github.com/purpleidea/mgmt/etcd" + "github.com/purpleidea/mgmt/gconfig" + "github.com/purpleidea/mgmt/global" + "github.com/purpleidea/mgmt/pgraph" + "github.com/purpleidea/mgmt/puppet" + "github.com/purpleidea/mgmt/remote" + "github.com/purpleidea/mgmt/util" + + etcdtypes "github.com/coreos/etcd/pkg/types" + "github.com/coreos/pkg/capnslog" + "github.com/urfave/cli" ) // set at compile time @@ -38,13 +48,6 @@ var ( prefix = fmt.Sprintf("/var/lib/%s/", program) ) -// variables controlling verbosity -const ( - DEBUG = false // add additional log messages - TRACE = false // add execution flow log messages - VERBOSE = false // add extra log message output -) - // signal handler func waitForSignal(exit chan bool) { signals := make(chan os.Signal, 1) @@ -73,7 +76,7 @@ func run(c *cli.Context) error { hostname, _ := os.Hostname() // allow passing in the hostname, instead of using --hostname if c.IsSet("file") { - if config := ParseConfigFromFile(c.String("file")); config != nil { + if config := gconfig.ParseConfigFromFile(c.String("file")); config != nil { if h := config.Hostname; h != "" { hostname = h } @@ -87,21 +90,21 @@ func run(c *cli.Context) error { noop := c.Bool("noop") seeds, err := etcdtypes.NewURLs( - FlattenListWithSplit(c.StringSlice("seeds"), []string{",", ";", " "}), + util.FlattenListWithSplit(c.StringSlice("seeds"), []string{",", ";", " "}), ) if err != nil && len(c.StringSlice("seeds")) > 0 { log.Printf("Main: Error: seeds didn't parse correctly!") return cli.NewExitError("", 1) } clientURLs, err := etcdtypes.NewURLs( - FlattenListWithSplit(c.StringSlice("client-urls"), []string{",", ";", " "}), + util.FlattenListWithSplit(c.StringSlice("client-urls"), []string{",", ";", " "}), ) if err != nil && len(c.StringSlice("client-urls")) > 0 { log.Printf("Main: Error: clientURLs didn't parse correctly!") return cli.NewExitError("", 1) } serverURLs, err := etcdtypes.NewURLs( - FlattenListWithSplit(c.StringSlice("server-urls"), []string{",", ";", " "}), + util.FlattenListWithSplit(c.StringSlice("server-urls"), []string{",", ";", " "}), ) if err != nil && len(c.StringSlice("server-urls")) > 0 { log.Printf("Main: Error: serverURLs didn't parse correctly!") @@ -171,7 +174,7 @@ func run(c *cli.Context) error { var wg sync.WaitGroup exit := make(chan bool) // exit signal - var G, fullGraph *Graph + var G, fullGraph *pgraph.Graph // exit after `max-runtime` seconds for no reason at all... if i := c.Int("max-runtime"); i > 0 { @@ -182,7 +185,7 @@ func run(c *cli.Context) error { } // setup converger - converger := NewConverger( + converger := converger.NewConverger( c.Int("converged-timeout"), nil, // stateFn gets added in by EmbdEtcd ) @@ -194,7 +197,7 @@ func run(c *cli.Context) error { } else { log.Printf("Main: Seeds(%v): %v", len(seeds), seeds) } - EmbdEtcd := NewEmbdEtcd( + EmbdEtcd := etcd.NewEmbdEtcd( hostname, seeds, clientURLs, @@ -225,7 +228,7 @@ func run(c *cli.Context) error { return nil } // send our individual state into etcd for others to see - return EtcdSetHostnameConverged(EmbdEtcd, hostname, b) // TODO: what should happen on error? + return etcd.EtcdSetHostnameConverged(EmbdEtcd, hostname, b) // TODO: what should happen on error? } if EmbdEtcd != nil { converger.SetStateFn(convergerStateFn) @@ -241,11 +244,11 @@ func run(c *cli.Context) error { if !c.Bool("no-watch") && c.IsSet("file") { configchan = ConfigWatch(file) } else if c.IsSet("puppet") { - interval := PuppetInterval(c.String("puppet-conf")) + interval := puppet.PuppetInterval(c.String("puppet-conf")) puppetchan = time.Tick(time.Duration(interval) * time.Second) } log.Println("Etcd: Starting...") - etcdchan := EtcdWatch(EmbdEtcd) + etcdchan := etcd.EtcdWatch(EmbdEtcd) first := true // first loop or not for { log.Println("Main: Waiting...") @@ -272,11 +275,11 @@ func run(c *cli.Context) error { return } - var config *GraphConfig + var config *gconfig.GraphConfig if c.IsSet("file") { - config = ParseConfigFromFile(file) + config = gconfig.ParseConfigFromFile(file) } else if c.IsSet("puppet") { - config = ParseConfigFromPuppet(c.String("puppet"), c.String("puppet-conf")) + config = puppet.ParseConfigFromPuppet(c.String("puppet"), c.String("puppet-conf")) } if config == nil { log.Printf("Config: Parse failure") @@ -297,7 +300,7 @@ func run(c *cli.Context) error { // build graph from yaml file on events (eg: from etcd) // we need the vertices to be paused to work on them - if newFullgraph, err := fullGraph.NewGraphFromConfig(config, EmbdEtcd, noop); err == nil { // keep references to all original elements + if newFullgraph, err := config.NewGraphFromConfig(fullGraph, EmbdEtcd, noop); err == nil { // keep references to all original elements fullGraph = newFullgraph } else { log.Printf("Config: Error making new graph from config: %v", err) @@ -344,13 +347,13 @@ func run(c *cli.Context) error { // initialize the add watcher, which calls the f callback on map changes convergerCb := func(f func(map[string]bool) error) (func(), error) { - return EtcdAddHostnameConvergedWatcher(EmbdEtcd, f) + return etcd.EtcdAddHostnameConvergedWatcher(EmbdEtcd, f) } // build remotes struct for remote ssh - remotes := NewRemotes( + remotes := remote.NewRemotes( EmbdEtcd.LocalhostClientURLs().StringSlice(), - []string{DefaultClientURL}, + []string{etcd.DefaultClientURL}, noop, c.StringSlice("remote"), // list of files events, // watch for file changes @@ -362,6 +365,7 @@ func run(c *cli.Context) error { prefix, converger, convergerCb, + program, ) // TODO: is there any benefit to running the remotes above in the loop? @@ -390,7 +394,7 @@ func run(c *cli.Context) error { log.Printf("Etcd exited poorly with: %v", err) } - if DEBUG { + if global.DEBUG { log.Printf("Graph: %v", G) } @@ -403,7 +407,7 @@ func run(c *cli.Context) error { func main() { var flags int - if DEBUG || true { // TODO: remove || true + if global.DEBUG || true { // TODO: remove || true flags = log.LstdFlags | log.Lshortfile } flags = (flags - log.Ldate) // remove the date for now @@ -411,7 +415,7 @@ func main() { // un-hijack from capnslog... log.SetOutput(os.Stderr) - if VERBOSE { + if global.VERBOSE { capnslog.SetFormatter(capnslog.NewLogFormatter(os.Stderr, "(etcd) ", flags)) } else { capnslog.SetFormatter(capnslog.NewNilFormatter()) @@ -492,7 +496,7 @@ func main() { }, cli.IntFlag{ Name: "ideal-cluster-size", - Value: defaultIdealClusterSize, + Value: etcd.DefaultIdealClusterSize, Usage: "ideal number of server peers in cluster, only read by initial server", EnvVar: "MGMT_IDEAL_CLUSTER_SIZE", }, diff --git a/pgraph/autoedge.go b/pgraph/autoedge.go new file mode 100644 index 00000000..9dbde87f --- /dev/null +++ b/pgraph/autoedge.go @@ -0,0 +1,104 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// Package pgraph represents the internal "pointer graph" that we use. +package pgraph + +import ( + "fmt" + "log" + + "github.com/purpleidea/mgmt/global" + "github.com/purpleidea/mgmt/resources" +) + +// add edges to the vertex in a graph based on if it matches a uuid list +func (g *Graph) addEdgesByMatchingUUIDS(v *Vertex, uuids []resources.ResUUID) []bool { + // search for edges and see what matches! + var result []bool + + // loop through each uuid, and see if it matches any vertex + for _, uuid := range uuids { + var found = false + // uuid is a ResUUID object + for _, vv := range g.GetVertices() { // search + if v == vv { // skip self + continue + } + if global.DEBUG { + log.Printf("Compile: AutoEdge: Match: %v[%v] with UUID: %v[%v]", vv.Kind(), vv.GetName(), uuid.Kind(), uuid.GetName()) + } + // we must match to an effective UUID for the resource, + // that is to say, the name value of a res is a helpful + // handle, but it is not necessarily a unique identity! + // remember, resources can return multiple UUID's each! + if resources.UUIDExistsInUUIDs(uuid, vv.GetUUIDs()) { + // add edge from: vv -> v + if uuid.Reversed() { + txt := fmt.Sprintf("AutoEdge: %v[%v] -> %v[%v]", vv.Kind(), vv.GetName(), v.Kind(), v.GetName()) + log.Printf("Compile: Adding %v", txt) + g.AddEdge(vv, v, NewEdge(txt)) + } else { // edges go the "normal" way, eg: pkg resource + txt := fmt.Sprintf("AutoEdge: %v[%v] -> %v[%v]", v.Kind(), v.GetName(), vv.Kind(), vv.GetName()) + log.Printf("Compile: Adding %v", txt) + g.AddEdge(v, vv, NewEdge(txt)) + } + found = true + break + } + } + result = append(result, found) + } + return result +} + +// AutoEdges adds the automatic edges to the graph. +func (g *Graph) AutoEdges() { + log.Println("Compile: Adding AutoEdges...") + for _, v := range g.GetVertices() { // for each vertexes autoedges + if !v.Meta().AutoEdge { // is the metaparam true? + continue + } + autoEdgeObj := v.AutoEdges() + if autoEdgeObj == nil { + log.Printf("%v[%v]: Config: No auto edges were found!", v.Kind(), v.GetName()) + continue // next vertex + } + + for { // while the autoEdgeObj has more uuids to add... + uuids := autoEdgeObj.Next() // get some! + if uuids == nil { + log.Printf("%v[%v]: Config: The auto edge list is empty!", v.Kind(), v.GetName()) + break // inner loop + } + if global.DEBUG { + log.Println("Compile: AutoEdge: UUIDS:") + for i, u := range uuids { + log.Printf("Compile: AutoEdge: UUID%d: %v", i, u) + } + } + + // match and add edges + result := g.addEdgesByMatchingUUIDS(v, uuids) + + // report back, and find out if we should continue + if !autoEdgeObj.Test(result) { + break + } + } + } +} diff --git a/pgraph/autogroup.go b/pgraph/autogroup.go new file mode 100644 index 00000000..6ae8f3a7 --- /dev/null +++ b/pgraph/autogroup.go @@ -0,0 +1,348 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgraph + +import ( + "fmt" + "log" + + "github.com/purpleidea/mgmt/global" +) + +// AutoGrouper is the required interface to implement for an autogroup algorithm +type AutoGrouper interface { + // listed in the order these are typically called in... + name() string // friendly identifier + init(*Graph) error // only call once + vertexNext() (*Vertex, *Vertex, error) // mostly algorithmic + vertexCmp(*Vertex, *Vertex) error // can we merge these ? + vertexMerge(*Vertex, *Vertex) (*Vertex, error) // vertex merge fn to use + edgeMerge(*Edge, *Edge) *Edge // edge merge fn to use + vertexTest(bool) (bool, error) // call until false +} + +// baseGrouper is the base type for implementing the AutoGrouper interface +type baseGrouper struct { + graph *Graph // store a pointer to the graph + vertices []*Vertex // cached list of vertices + i int + j int + done bool +} + +// name provides a friendly name for the logs to see +func (ag *baseGrouper) name() string { + return "baseGrouper" +} + +// init is called only once and before using other AutoGrouper interface methods +// the name method is the only exception: call it any time without side effects! +func (ag *baseGrouper) init(g *Graph) error { + if ag.graph != nil { + return fmt.Errorf("The init method has already been called!") + } + ag.graph = g // pointer + ag.vertices = ag.graph.GetVerticesSorted() // cache in deterministic order! + ag.i = 0 + ag.j = 0 + if len(ag.vertices) == 0 { // empty graph + ag.done = true + return nil + } + return nil +} + +// vertexNext is a simple iterator that loops through vertex (pair) combinations +// an intelligent algorithm would selectively offer only valid pairs of vertices +// these should satisfy logical grouping requirements for the autogroup designs! +// the desired algorithms can override, but keep this method as a base iterator! +func (ag *baseGrouper) vertexNext() (v1, v2 *Vertex, err error) { + // this does a for v... { for w... { return v, w }} but stepwise! + l := len(ag.vertices) + if ag.i < l { + v1 = ag.vertices[ag.i] + } + if ag.j < l { + v2 = ag.vertices[ag.j] + } + + // in case the vertex was deleted + if !ag.graph.HasVertex(v1) { + v1 = nil + } + if !ag.graph.HasVertex(v2) { + v2 = nil + } + + // two nested loops... + if ag.j < l { + ag.j++ + } + if ag.j == l { + ag.j = 0 + if ag.i < l { + ag.i++ + } + if ag.i == l { + ag.done = true + } + } + + return +} + +func (ag *baseGrouper) vertexCmp(v1, v2 *Vertex) error { + if v1 == nil || v2 == nil { + return fmt.Errorf("Vertex is nil!") + } + if v1 == v2 { // skip yourself + return fmt.Errorf("Vertices are the same!") + } + if v1.Kind() != v2.Kind() { // we must group similar kinds + // TODO: maybe future resources won't need this limitation? + return fmt.Errorf("The two resources aren't the same kind!") + } + // someone doesn't want to group! + if !v1.Meta().AutoGroup || !v2.Meta().AutoGroup { + return fmt.Errorf("One of the autogroup flags is false!") + } + if v1.Res.IsGrouped() { // already grouped! + return fmt.Errorf("Already grouped!") + } + if len(v2.Res.GetGroup()) > 0 { // already has children grouped! + return fmt.Errorf("Already has groups!") + } + if !v1.Res.GroupCmp(v2.Res) { // resource groupcmp failed! + return fmt.Errorf("The GroupCmp failed!") + } + return nil // success +} + +func (ag *baseGrouper) vertexMerge(v1, v2 *Vertex) (v *Vertex, err error) { + // NOTE: it's important to use w.Res instead of w, b/c + // the w by itself is the *Vertex obj, not the *Res obj + // which is contained within it! They both satisfy the + // Res interface, which is why both will compile! :( + err = v1.Res.GroupRes(v2.Res) // GroupRes skips stupid groupings + return // success or fail, and no need to merge the actual vertices! +} + +func (ag *baseGrouper) edgeMerge(e1, e2 *Edge) *Edge { + return e1 // noop +} + +// vertexTest processes the results of the grouping for the algorithm to know +// return an error if something went horribly wrong, and bool false to stop +func (ag *baseGrouper) vertexTest(b bool) (bool, error) { + // NOTE: this particular baseGrouper version doesn't track what happens + // because since we iterate over every pair, we don't care which merge! + if ag.done { + return false, nil + } + return true, nil +} + +// TODO: this algorithm may not be correct in all cases. replace if needed! +type nonReachabilityGrouper struct { + baseGrouper // "inherit" what we want, and reimplement the rest +} + +func (ag *nonReachabilityGrouper) name() string { + return "nonReachabilityGrouper" +} + +// this algorithm relies on the observation that if there's a path from a to b, +// then they *can't* be merged (b/c of the existing dependency) so therefore we +// merge anything that *doesn't* satisfy this condition or that of the reverse! +func (ag *nonReachabilityGrouper) vertexNext() (v1, v2 *Vertex, err error) { + for { + v1, v2, err = ag.baseGrouper.vertexNext() // get all iterable pairs + if err != nil { + log.Fatalf("Error running autoGroup(vertexNext): %v", err) + } + + if v1 != v2 { // ignore self cmp early (perf optimization) + // if NOT reachable, they're viable... + out1 := ag.graph.Reachability(v1, v2) + out2 := ag.graph.Reachability(v2, v1) + if len(out1) == 0 && len(out2) == 0 { + return // return v1 and v2, they're viable + } + } + + // if we got here, it means we're skipping over this candidate! + if ok, err := ag.baseGrouper.vertexTest(false); err != nil { + log.Fatalf("Error running autoGroup(vertexTest): %v", err) + } else if !ok { + return nil, nil, nil // done! + } + + // the vertexTest passed, so loop and try with a new pair... + } +} + +// VertexMerge merges v2 into v1 by reattaching the edges where appropriate, +// and then by deleting v2 from the graph. Since more than one edge between two +// vertices is not allowed, duplicate edges are merged as well. an edge merge +// function can be provided if you'd like to control how you merge the edges! +func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex) (*Vertex, error), edgeMergeFn func(*Edge, *Edge) *Edge) error { + // methodology + // 1) edges between v1 and v2 are removed + //Loop: + for k1 := range g.Adjacency { + for k2 := range g.Adjacency[k1] { + // v1 -> v2 || v2 -> v1 + if (k1 == v1 && k2 == v2) || (k1 == v2 && k2 == v1) { + delete(g.Adjacency[k1], k2) // delete map & edge + // NOTE: if we assume this is a DAG, then we can + // assume only v1 -> v2 OR v2 -> v1 exists, and + // we can break out of these loops immediately! + //break Loop + break + } + } + } + + // 2) edges that point towards v2 from X now point to v1 from X (no dupes) + for _, x := range g.IncomingGraphEdges(v2) { // all to vertex v (??? -> v) + e := g.Adjacency[x][v2] // previous edge + r := g.Reachability(x, v1) + // merge e with ex := g.Adjacency[x][v1] if it exists! + if ex, exists := g.Adjacency[x][v1]; exists && edgeMergeFn != nil && len(r) == 0 { + e = edgeMergeFn(e, ex) + } + if len(r) == 0 { // if not reachable, add it + g.AddEdge(x, v1, e) // overwrite edge + } else if edgeMergeFn != nil { // reachable, merge e through... + prev := x // initial condition + for i, next := range r { + if i == 0 { + // next == prev, therefore skip + continue + } + // this edge is from: prev, to: next + ex, _ := g.Adjacency[prev][next] // get + ex = edgeMergeFn(ex, e) + g.Adjacency[prev][next] = ex // set + prev = next + } + } + delete(g.Adjacency[x], v2) // delete old edge + } + + // 3) edges that point from v2 to X now point from v1 to X (no dupes) + for _, x := range g.OutgoingGraphEdges(v2) { // all from vertex v (v -> ???) + e := g.Adjacency[v2][x] // previous edge + r := g.Reachability(v1, x) + // merge e with ex := g.Adjacency[v1][x] if it exists! + if ex, exists := g.Adjacency[v1][x]; exists && edgeMergeFn != nil && len(r) == 0 { + e = edgeMergeFn(e, ex) + } + if len(r) == 0 { + g.AddEdge(v1, x, e) // overwrite edge + } else if edgeMergeFn != nil { // reachable, merge e through... + prev := v1 // initial condition + for i, next := range r { + if i == 0 { + // next == prev, therefore skip + continue + } + // this edge is from: prev, to: next + ex, _ := g.Adjacency[prev][next] + ex = edgeMergeFn(ex, e) + g.Adjacency[prev][next] = ex + prev = next + } + } + delete(g.Adjacency[v2], x) + } + + // 4) merge and then remove the (now merged/grouped) vertex + if vertexMergeFn != nil { // run vertex merge function + if v, err := vertexMergeFn(v1, v2); err != nil { + return err + } else if v != nil { // replace v1 with the "merged" version... + v1 = v // XXX: will this replace v1 the way we want? + } + } + g.DeleteVertex(v2) // remove grouped vertex + + // 5) creation of a cyclic graph should throw an error + if _, dag := g.TopologicalSort(); !dag { // am i a dag or not? + return fmt.Errorf("Graph is not a dag!") + } + return nil // success +} + +// autoGroup is the mechanical auto group "runner" that runs the interface spec +func (g *Graph) autoGroup(ag AutoGrouper) chan string { + strch := make(chan string) // output log messages here + go func(strch chan string) { + strch <- fmt.Sprintf("Compile: Grouping: Algorithm: %v...", ag.name()) + if err := ag.init(g); err != nil { + log.Fatalf("Error running autoGroup(init): %v", err) + } + + for { + var v, w *Vertex + v, w, err := ag.vertexNext() // get pair to compare + if err != nil { + log.Fatalf("Error running autoGroup(vertexNext): %v", err) + } + merged := false + // save names since they change during the runs + vStr := fmt.Sprintf("%s", v) // valid even if it is nil + wStr := fmt.Sprintf("%s", w) + + if err := ag.vertexCmp(v, w); err != nil { // cmp ? + if global.DEBUG { + strch <- fmt.Sprintf("Compile: Grouping: !GroupCmp for: %s into %s", wStr, vStr) + } + + // remove grouped vertex and merge edges (res is safe) + } else if err := g.VertexMerge(v, w, ag.vertexMerge, ag.edgeMerge); err != nil { // merge... + strch <- fmt.Sprintf("Compile: Grouping: !VertexMerge for: %s into %s", wStr, vStr) + + } else { // success! + strch <- fmt.Sprintf("Compile: Grouping: Success for: %s into %s", wStr, vStr) + merged = true // woo + } + + // did these get used? + if ok, err := ag.vertexTest(merged); err != nil { + log.Fatalf("Error running autoGroup(vertexTest): %v", err) + } else if !ok { + break // done! + } + } + + close(strch) + return + }(strch) // call function + return strch +} + +// AutoGroup runs the auto grouping on the graph and prints out log messages +func (g *Graph) AutoGroup() { + // receive log messages from channel... + // this allows test cases to avoid printing them when they're unwanted! + // TODO: this algorithm may not be correct in all cases. replace if needed! + for str := range g.autoGroup(&nonReachabilityGrouper{}) { + log.Println(str) + } +} diff --git a/pgraph.go b/pgraph/pgraph.go similarity index 87% rename from pgraph.go rename to pgraph/pgraph.go index b2e30954..b5ec0031 100644 --- a/pgraph.go +++ b/pgraph/pgraph.go @@ -15,8 +15,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -// Pgraph (Pointer Graph) -package main +// Package pgraph represents the internal "pointer graph" that we use. +package pgraph import ( "errors" @@ -31,6 +31,11 @@ import ( "sync" "syscall" "time" + + "github.com/purpleidea/mgmt/converger" + "github.com/purpleidea/mgmt/event" + "github.com/purpleidea/mgmt/global" + "github.com/purpleidea/mgmt/resources" ) //go:generate stringer -type=graphState -output=graphstate_stringer.go @@ -59,8 +64,8 @@ type Graph struct { // Vertex is the primary vertex struct in this library. type Vertex struct { - Res // anonymous field - timestamp int64 // last updated timestamp ? + resources.Res // anonymous field + timestamp int64 // last updated timestamp ? } // Edge is the primary edge struct in this library. @@ -78,7 +83,7 @@ func NewGraph(name string) *Graph { } // NewVertex returns a new graph vertex struct with a contained resource. -func NewVertex(r Res) *Vertex { +func NewVertex(r resources.Res) *Vertex { return &Vertex{ Res: r, } @@ -160,7 +165,7 @@ func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) { // GetVertexMatch searches for an equivalent resource in the graph and returns // the vertex it is found in, or nil if not found. -func (g *Graph) GetVertexMatch(obj Res) *Vertex { +func (g *Graph) GetVertexMatch(obj resources.Res) *Vertex { for k := range g.Adjacency { if k.Res.Compare(obj) { return k @@ -549,99 +554,6 @@ func (g *Graph) Reachability(a, b *Vertex) []*Vertex { return result } -// VertexMerge merges v2 into v1 by reattaching the edges where appropriate, -// and then by deleting v2 from the graph. Since more than one edge between two -// vertices is not allowed, duplicate edges are merged as well. an edge merge -// function can be provided if you'd like to control how you merge the edges! -func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex) (*Vertex, error), edgeMergeFn func(*Edge, *Edge) *Edge) error { - // methodology - // 1) edges between v1 and v2 are removed - //Loop: - for k1 := range g.Adjacency { - for k2 := range g.Adjacency[k1] { - // v1 -> v2 || v2 -> v1 - if (k1 == v1 && k2 == v2) || (k1 == v2 && k2 == v1) { - delete(g.Adjacency[k1], k2) // delete map & edge - // NOTE: if we assume this is a DAG, then we can - // assume only v1 -> v2 OR v2 -> v1 exists, and - // we can break out of these loops immediately! - //break Loop - break - } - } - } - - // 2) edges that point towards v2 from X now point to v1 from X (no dupes) - for _, x := range g.IncomingGraphEdges(v2) { // all to vertex v (??? -> v) - e := g.Adjacency[x][v2] // previous edge - r := g.Reachability(x, v1) - // merge e with ex := g.Adjacency[x][v1] if it exists! - if ex, exists := g.Adjacency[x][v1]; exists && edgeMergeFn != nil && len(r) == 0 { - e = edgeMergeFn(e, ex) - } - if len(r) == 0 { // if not reachable, add it - g.AddEdge(x, v1, e) // overwrite edge - } else if edgeMergeFn != nil { // reachable, merge e through... - prev := x // initial condition - for i, next := range r { - if i == 0 { - // next == prev, therefore skip - continue - } - // this edge is from: prev, to: next - ex, _ := g.Adjacency[prev][next] // get - ex = edgeMergeFn(ex, e) - g.Adjacency[prev][next] = ex // set - prev = next - } - } - delete(g.Adjacency[x], v2) // delete old edge - } - - // 3) edges that point from v2 to X now point from v1 to X (no dupes) - for _, x := range g.OutgoingGraphEdges(v2) { // all from vertex v (v -> ???) - e := g.Adjacency[v2][x] // previous edge - r := g.Reachability(v1, x) - // merge e with ex := g.Adjacency[v1][x] if it exists! - if ex, exists := g.Adjacency[v1][x]; exists && edgeMergeFn != nil && len(r) == 0 { - e = edgeMergeFn(e, ex) - } - if len(r) == 0 { - g.AddEdge(v1, x, e) // overwrite edge - } else if edgeMergeFn != nil { // reachable, merge e through... - prev := v1 // initial condition - for i, next := range r { - if i == 0 { - // next == prev, therefore skip - continue - } - // this edge is from: prev, to: next - ex, _ := g.Adjacency[prev][next] - ex = edgeMergeFn(ex, e) - g.Adjacency[prev][next] = ex - prev = next - } - } - delete(g.Adjacency[v2], x) - } - - // 4) merge and then remove the (now merged/grouped) vertex - if vertexMergeFn != nil { // run vertex merge function - if v, err := vertexMergeFn(v1, v2); err != nil { - return err - } else if v != nil { // replace v1 with the "merged" version... - v1 = v // XXX: will this replace v1 the way we want? - } - } - g.DeleteVertex(v2) // remove grouped vertex - - // 5) creation of a cyclic graph should throw an error - if _, dag := g.TopologicalSort(); !dag { // am i a dag or not? - return fmt.Errorf("Graph is not a dag!") - } - return nil // success -} - // GetTimestamp returns the timestamp of a vertex func (v *Vertex) GetTimestamp() int64 { return v.timestamp @@ -662,7 +574,7 @@ func (g *Graph) OKTimestamp(v *Vertex) bool { // if they're equal (eg: on init of 0) then we also can't run // b/c we should let our pre-req's go first... x, y := v.GetTimestamp(), n.GetTimestamp() - if DEBUG { + if global.DEBUG { log.Printf("%v[%v]: OKTimestamp: (%v) >= %v[%v](%v): !%v", v.Kind(), v.GetName(), x, n.Kind(), n.GetName(), y, x >= y) } if x >= y { @@ -679,14 +591,14 @@ func (g *Graph) Poke(v *Vertex, activity bool) { for _, n := range g.OutgoingGraphEdges(v) { // XXX: if we're in state event and haven't been cancelled by // apply, then we can cancel a poke to a child, right? XXX - // XXX: if n.Res.getState() != resStateEvent { // is this correct? + // XXX: if n.Res.getState() != resources.ResStateEvent { // is this correct? if true { // XXX - if DEBUG { + if global.DEBUG { log.Printf("%v[%v]: Poke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) } - n.SendEvent(eventPoke, false, activity) // XXX: can this be switched to sync? + n.SendEvent(event.EventPoke, false, activity) // XXX: can this be switched to sync? } else { - if DEBUG { + if global.DEBUG { log.Printf("%v[%v]: Poke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) } } @@ -699,18 +611,18 @@ func (g *Graph) BackPoke(v *Vertex) { for _, n := range g.IncomingGraphEdges(v) { x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState() // if the parent timestamp needs poking AND it's not in state - // resStateEvent, then poke it. If the parent is in resStateEvent it + // ResStateEvent, then poke it. If the parent is in ResStateEvent it // means that an event is pending, so we'll be expecting a poke // back soon, so we can safely discard the extra parent poke... // TODO: implement a stateLT (less than) to tell if something // happens earlier in the state cycle and that doesn't wrap nil - if x >= y && (s != resStateEvent && s != resStateCheckApply) { - if DEBUG { + if x >= y && (s != resources.ResStateEvent && s != resources.ResStateCheckApply) { + if global.DEBUG { log.Printf("%v[%v]: BackPoke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) } - n.SendEvent(eventBackPoke, false, false) // XXX: can this be switched to sync? + n.SendEvent(event.EventBackPoke, false, false) // XXX: can this be switched to sync? } else { - if DEBUG { + if global.DEBUG { log.Printf("%v[%v]: BackPoke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) } } @@ -721,27 +633,27 @@ func (g *Graph) BackPoke(v *Vertex) { // XXX: rename this function func (g *Graph) Process(v *Vertex) error { obj := v.Res - if DEBUG { + if global.DEBUG { log.Printf("%v[%v]: Process()", obj.Kind(), obj.GetName()) } - obj.SetState(resStateEvent) + obj.SetState(resources.ResStateEvent) var ok = true var apply = false // did we run an apply? // is it okay to run dependency wise right now? // if not, that's okay because when the dependency runs, it will poke // us back and we will run if needed then! if g.OKTimestamp(v) { - if DEBUG { + if global.DEBUG { log.Printf("%v[%v]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp()) } - obj.SetState(resStateCheckApply) + obj.SetState(resources.ResStateCheckApply) // if this fails, don't UpdateTimestamp() checkok, err := obj.CheckApply(!obj.Meta().Noop) if checkok && err != nil { // should never return this way log.Fatalf("%v[%v]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), checkok, err) } - if DEBUG { + if global.DEBUG { log.Printf("%v[%v]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), checkok, err) } @@ -761,8 +673,8 @@ func (g *Graph) Process(v *Vertex) error { if ok { // update this timestamp *before* we poke or the poked // nodes might fail due to having a too old timestamp! - v.UpdateTimestamp() // this was touched... - obj.SetState(resStatePoking) // can't cancel parent poke + v.UpdateTimestamp() // this was touched... + obj.SetState(resources.ResStatePoking) // can't cancel parent poke g.Poke(v, apply) } // poke at our pre-req's instead since they need to refresh/run... @@ -794,7 +706,7 @@ func (g *Graph) Worker(v *Vertex) error { // the Watch() function about which graph it is // running on, which isolates things nicely... obj := v.Res - chanProcess := make(chan Event) + chanProcess := make(chan event.Event) go func() { running := false var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration @@ -803,7 +715,7 @@ func (g *Graph) Worker(v *Vertex) error { } var delay = time.Duration(v.Meta().Delay) * time.Millisecond var retry int16 = v.Meta().Retry // number of tries left, -1 for infinite - var saved Event + var saved event.Event Loop: for { // this has to be synchronous, because otherwise the Res @@ -989,8 +901,8 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue // and not just selectively the subset with no indegree. if (!first) || indegree[v] == 0 { // ensure state is started before continuing on to next vertex - for !v.SendEvent(eventStart, true, false) { - if DEBUG { + for !v.SendEvent(event.EventStart, true, false) { + if global.DEBUG { // if SendEvent fails, we aren't up yet log.Printf("%v[%v]: Retrying SendEvent(Start)", v.Kind(), v.GetName()) // sleep here briefly or otherwise cause @@ -1008,7 +920,7 @@ func (g *Graph) Pause() { defer log.Printf("State: %v -> %v", g.setState(graphStatePaused), g.getState()) t, _ := g.TopologicalSort() for _, v := range t { // squeeze out the events... - v.SendEvent(eventPause, true, false) + v.SendEvent(event.EventPause, true, false) } } @@ -1025,12 +937,12 @@ func (g *Graph) Exit() { // when we hit the 'default' in the select statement! // XXX: we can do this to quiesce, but it's not necessary now - v.SendEvent(eventExit, true, false) + v.SendEvent(event.EventExit, true, false) } } // AssociateData associates some data with the object in the graph in question -func (g *Graph) AssociateData(converger Converger) { +func (g *Graph) AssociateData(converger converger.Converger) { for v := range g.GetVerticesChan() { v.Res.AssociateData(converger) } diff --git a/pgraph_test.go b/pgraph/pgraph_test.go similarity index 99% rename from pgraph_test.go rename to pgraph/pgraph_test.go index 8e12f88a..583abbf4 100644 --- a/pgraph_test.go +++ b/pgraph/pgraph_test.go @@ -15,9 +15,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -// NOTE: this is pgraph, a pointer graph - -package main +package pgraph import ( "fmt" diff --git a/puppet.go b/puppet/puppet.go similarity index 92% rename from puppet.go rename to puppet/puppet.go index 6e9a05cc..168e4e85 100644 --- a/puppet.go +++ b/puppet/puppet.go @@ -15,7 +15,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package main +// Package puppet provides the integration entrypoint for the puppet language. +package puppet import ( "bufio" @@ -24,6 +25,9 @@ import ( "os/exec" "strconv" "strings" + + "github.com/purpleidea/mgmt/gconfig" + "github.com/purpleidea/mgmt/global" ) const ( @@ -32,7 +36,7 @@ const ( ) func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) { - if DEBUG { + if global.DEBUG { log.Printf("Puppet: running command: %v", cmd) } @@ -67,7 +71,7 @@ func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) { // will choke on an oversized slice. http://stackoverflow.com/a/33726617/3356612 result = append(result, data[0:count]...) } - if DEBUG { + if global.DEBUG { log.Printf("Puppet: read %v bytes of data from puppet", len(result)) } for scanner := bufio.NewScanner(stderr); scanner.Scan(); { @@ -83,7 +87,7 @@ func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) { // ParseConfigFromPuppet takes a special puppet param string and config and // returns the graph configuration structure. -func ParseConfigFromPuppet(puppetParam, puppetConf string) *GraphConfig { +func ParseConfigFromPuppet(puppetParam, puppetConf string) *gconfig.GraphConfig { var puppetConfArg string if puppetConf != "" { puppetConfArg = "--config=" + puppetConf @@ -100,7 +104,7 @@ func ParseConfigFromPuppet(puppetParam, puppetConf string) *GraphConfig { log.Println("Puppet: launching translator") - var config GraphConfig + var config gconfig.GraphConfig if data, err := runPuppetCommand(cmd); err != nil { return nil } else if err := config.Parse(data); err != nil { @@ -113,7 +117,7 @@ func ParseConfigFromPuppet(puppetParam, puppetConf string) *GraphConfig { // PuppetInterval returns the graph refresh interval from the puppet configuration. func PuppetInterval(puppetConf string) int { - if DEBUG { + if global.DEBUG { log.Printf("Puppet: determining graph refresh interval") } var cmd *exec.Cmd diff --git a/remote.go b/remote/remote.go similarity index 94% rename from remote.go rename to remote/remote.go index 0c5452f8..21c8c1ed 100644 --- a/remote.go +++ b/remote/remote.go @@ -15,6 +15,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +// Package remote provides the remoting facilities for agentless execution. // This set of structs and methods are for running mgmt remotely over SSH. This // gives us the architectural robustness of our current design, combined with // the ability to run it with an "agent-less" approach for bootstrapping, and @@ -35,7 +36,7 @@ // remote mgmt transient agents are running, they can still exchange data and // converge together without directly connecting, since they all tunnel through // the etcd server running on the initiator. -package main // TODO: make this a separate "remote" package +package remote // TODO: running with two identical remote endpoints over a slow connection, eg: // --remote file1.yaml --remote file1.yaml @@ -46,10 +47,6 @@ import ( "crypto/sha256" "encoding/hex" "fmt" - "github.com/howeyc/gopass" - "github.com/kardianos/osext" - "github.com/pkg/sftp" - "golang.org/x/crypto/ssh" "io" "io/ioutil" "log" @@ -63,9 +60,19 @@ import ( "strings" "sync" "time" + + cv "github.com/purpleidea/mgmt/converger" + "github.com/purpleidea/mgmt/gconfig" + "github.com/purpleidea/mgmt/util" + + "github.com/howeyc/gopass" + "github.com/kardianos/osext" + "github.com/pkg/sftp" + "golang.org/x/crypto/ssh" ) const ( + DEBUG = false // FIXME: should this dir be in /var/ instead? formatPattern = "/tmp/mgmt.%s/" // remote format, to match `mktemp` formatChars = "abcdefghijklmnopqrstuvwxyz0123456789" // chars for fmt string // TODO: what does mktemp use? @@ -94,7 +101,7 @@ type SSH struct { depth uint16 // depth of this node in the remote execution hierarchy caching bool // whether to try and cache the copy of the binary prefix string // location we're allowed to put data on the remote server - converger Converger + converger cv.Converger client *ssh.Client // client object sftp *sftp.Client // sftp object @@ -107,6 +114,7 @@ type SSH struct { lock sync.Mutex // mutex to avoid exit races exiting bool // flag to let us know if we're exiting + program string // name of the binary remotewd string // path to remote working directory execpath string // path to remote mgmt binary filepath string // path to remote file config @@ -214,7 +222,7 @@ func (obj *SSH) Sftp() error { break } - obj.execpath = path.Join(obj.remotewd, program) // program is a compile time string from main.go + obj.execpath = path.Join(obj.remotewd, obj.program) // program is a compile time string log.Printf("Remote: Remote path is: %s", obj.execpath) var same bool @@ -553,7 +561,7 @@ func (obj *SSH) ExecExit() error { } // FIXME: workaround: force a signal! - if _, err := obj.simpleRun(fmt.Sprintf("killall -SIGINT %s", program)); err != nil { // FIXME: low specificity + if _, err := obj.simpleRun(fmt.Sprintf("killall -SIGINT %s", obj.program)); err != nil { // FIXME: low specificity log.Printf("Remote: Failed to send SIGINT: %s", err.Error()) } @@ -562,12 +570,12 @@ func (obj *SSH) ExecExit() error { // try killing the process more violently time.Sleep(10 * time.Second) //obj.session.Signal(ssh.SIGKILL) - cmd := fmt.Sprintf("killall -SIGKILL %s", program) // FIXME: low specificity + cmd := fmt.Sprintf("killall -SIGKILL %s", obj.program) // FIXME: low specificity obj.simpleRun(cmd) }() // FIXME: workaround: wait (spin lock) until process quits cleanly... - cmd := fmt.Sprintf("while killall -0 %s 2> /dev/null; do sleep 1s; done", program) // FIXME: low specificity + cmd := fmt.Sprintf("while killall -0 %s 2> /dev/null; do sleep 1s; done", obj.program) // FIXME: low specificity if _, err := obj.simpleRun(cmd); err != nil { return fmt.Errorf("Error waiting: %s", err) } @@ -680,29 +688,30 @@ type Remotes struct { caching bool // whether to try and cache the copy of the binary depth uint16 // depth of this node in the remote execution hierarchy prefix string // folder prefix to use for misc storage - converger Converger + converger cv.Converger convergerCb func(func(map[string]bool) error) (func(), error) - wg sync.WaitGroup // keep track of each running SSH connection - lock sync.Mutex // mutex for access to sshmap - sshmap map[string]*SSH // map to each SSH struct with the remote as the key - exiting bool // flag to let us know if we're exiting - exitChan chan struct{} // closes when we should exit - semaphore Semaphore // counting semaphore to limit concurrent connections - hostnames []string // list of hostnames we've seen so far - cuuid ConvergerUUID // convergerUUID for the remote itself - cuuids map[string]ConvergerUUID // map to each SSH struct with the remote as the key - callbackCancelFunc func() // stored callback function cancel function + wg sync.WaitGroup // keep track of each running SSH connection + lock sync.Mutex // mutex for access to sshmap + sshmap map[string]*SSH // map to each SSH struct with the remote as the key + exiting bool // flag to let us know if we're exiting + exitChan chan struct{} // closes when we should exit + semaphore Semaphore // counting semaphore to limit concurrent connections + hostnames []string // list of hostnames we've seen so far + cuuid cv.ConvergerUUID // convergerUUID for the remote itself + cuuids map[string]cv.ConvergerUUID // map to each SSH struct with the remote as the key + callbackCancelFunc func() // stored callback function cancel function + program string // name of the program } // The NewRemotes function builds a Remotes struct. -func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fileWatch chan string, cConns uint16, interactive bool, sshPrivIdRsa string, caching bool, depth uint16, prefix string, converger Converger, convergerCb func(func(map[string]bool) error) (func(), error)) *Remotes { +func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fileWatch chan string, cConns uint16, interactive bool, sshPrivIdRsa string, caching bool, depth uint16, prefix string, converger cv.Converger, convergerCb func(func(map[string]bool) error) (func(), error), program string) *Remotes { return &Remotes{ clientURLs: clientURLs, remoteURLs: remoteURLs, noop: noop, - remotes: StrRemoveDuplicatesInList(remotes), + remotes: util.StrRemoveDuplicatesInList(remotes), fileWatch: fileWatch, cConns: cConns, interactive: interactive, @@ -716,7 +725,8 @@ func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fi exitChan: make(chan struct{}), semaphore: NewSemaphore(int(cConns)), hostnames: make([]string, len(remotes)), - cuuids: make(map[string]ConvergerUUID), + cuuids: make(map[string]cv.ConvergerUUID), + program: program, } } @@ -724,7 +734,7 @@ func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fi // It takes as input the path to a graph definition file. func (obj *Remotes) NewSSH(file string) (*SSH, error) { // first do the parsing... - config := ParseConfigFromFile(file) + config := gconfig.ParseConfigFromFile(file) if config == nil { return nil, fmt.Errorf("Remote: Error parsing remote graph: %s", file) } @@ -785,7 +795,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) { if hostname == "" { hostname = host // default to above } - if StrInList(hostname, obj.hostnames) { + if util.StrInList(hostname, obj.hostnames) { return nil, fmt.Errorf("Remote: Hostname `%s` already exists!", hostname) } obj.hostnames = append(obj.hostnames, hostname) @@ -805,6 +815,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) { caching: obj.caching, converger: obj.converger, prefix: obj.prefix, + program: obj.program, }, nil } @@ -872,7 +883,7 @@ func (obj *Remotes) passwordCallback(user, host string) func() (string, error) { return p, nil case e := <-failchan: return "", e - case <-TimeAfterOrBlock(timeout): + case <-util.TimeAfterOrBlock(timeout): return "", fmt.Errorf("Interactive timeout reached!") } } diff --git a/exec.go b/resources/exec.go similarity index 98% rename from exec.go rename to resources/exec.go index 0dc8ae09..8406d58d 100644 --- a/exec.go +++ b/resources/exec.go @@ -15,7 +15,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package main +package resources import ( "bufio" @@ -27,6 +27,9 @@ import ( "os/exec" "strings" "time" + + "github.com/purpleidea/mgmt/event" + "github.com/purpleidea/mgmt/util" ) func init() { @@ -107,7 +110,7 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan } // Watch is the primary listener for this resource and it outputs events. -func (obj *ExecRes) Watch(processChan chan Event) error { +func (obj *ExecRes) Watch(processChan chan event.Event) error { if obj.IsWatching() { return nil } @@ -167,7 +170,7 @@ func (obj *ExecRes) Watch(processChan chan Event) error { } for { - obj.SetState(resStateWatching) // reset + obj.SetState(ResStateWatching) // reset select { case text := <-bufioch: cuuid.SetConverged(false) @@ -312,7 +315,7 @@ func (obj *ExecRes) CheckApply(apply bool) (checkok bool, err error) { return false, err } - case <-TimeAfterOrBlock(timeout): + case <-util.TimeAfterOrBlock(timeout): log.Printf("%v[%v]: Timeout waiting for Cmd", obj.Kind(), obj.GetName()) //cmd.Process.Kill() // TODO: is this necessary? return false, errors.New("Timeout waiting for Cmd!") diff --git a/file.go b/resources/file.go similarity index 94% rename from file.go rename to resources/file.go index 8c0bd9ab..7346d6c9 100644 --- a/file.go +++ b/resources/file.go @@ -15,15 +15,13 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package main +package resources import ( "bytes" "crypto/sha256" - "encoding/hex" - "gopkg.in/fsnotify.v1" - //"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1" "encoding/gob" + "encoding/hex" "fmt" "io" "io/ioutil" @@ -35,6 +33,13 @@ import ( "strings" "syscall" "time" + + "github.com/purpleidea/mgmt/event" + "github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead? + "github.com/purpleidea/mgmt/util" + + "gopkg.in/fsnotify.v1" + //"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1" ) func init() { @@ -95,8 +100,8 @@ func (obj *FileRes) Init() { // GetPath returns the actual path to use for this resource. It computes this // after analysis of the Path, Dirname and Basename values. Dirs end with slash. func (obj *FileRes) GetPath() string { - d := Dirname(obj.Path) - b := Basename(obj.Path) + d := util.Dirname(obj.Path) + b := util.Basename(obj.Path) if obj.Dirname == "" && obj.Basename == "" { return obj.Path } @@ -143,7 +148,7 @@ func (obj *FileRes) addSubFolders(p string) error { } // look at all subfolders... walkFn := func(path string, info os.FileInfo, err error) error { - if DEBUG { + if global.DEBUG { log.Printf("%s[%s]: Walk: %s (%v): %v", obj.Kind(), obj.GetName(), path, info, err) } if err != nil { @@ -168,7 +173,7 @@ func (obj *FileRes) addSubFolders(p string) error { // If the Watch returns an error, it means that something has gone wrong, and it // must be restarted. On a clean exit it returns nil. // FIXME: Also watch the source directory when using obj.Source !!! -func (obj *FileRes) Watch(processChan chan Event) error { +func (obj *FileRes) Watch(processChan chan event.Event) error { if obj.IsWatching() { return nil // TODO: should this be an error? } @@ -195,11 +200,11 @@ func (obj *FileRes) Watch(processChan chan Event) error { } defer obj.watcher.Close() - patharray := PathSplit(safename) // tokenize the path - var index = len(patharray) // starting index - var current string // current "watcher" location - var deltaDepth int // depth delta between watcher and event - var send = false // send event? + patharray := util.PathSplit(safename) // tokenize the path + var index = len(patharray) // starting index + var current string // current "watcher" location + var deltaDepth int // depth delta between watcher and event + var send = false // send event? var exit = false var dirty = false @@ -221,13 +226,13 @@ func (obj *FileRes) Watch(processChan chan Event) error { if current == "" { // the empty string top is the root dir ("/") current = "/" } - if DEBUG { + if global.DEBUG { log.Printf("%s[%s]: Watching: %v", obj.Kind(), obj.GetName(), current) // attempting to watch... } // initialize in the loop so that we can reset on rm-ed handles err = obj.watcher.Add(current) if err != nil { - if DEBUG { + if global.DEBUG { log.Printf("%s[%s]: watcher.Add(%v): Error: %v", obj.Kind(), obj.GetName(), current, err) } if err == syscall.ENOENT { @@ -246,10 +251,10 @@ func (obj *FileRes) Watch(processChan chan Event) error { continue } - obj.SetState(resStateWatching) // reset + obj.SetState(ResStateWatching) // reset select { case event := <-obj.watcher.Events: - if DEBUG { + if global.DEBUG { log.Printf("%s[%s]: Watch(%s), Event(%s): %v", obj.Kind(), obj.GetName(), current, event.Name, event.Op) } cuuid.SetConverged(false) // XXX: technically i can detect if the event is erroneous or not first @@ -259,11 +264,11 @@ func (obj *FileRes) Watch(processChan chan Event) error { if current == event.Name { deltaDepth = 0 // i was watching what i was looking for - } else if HasPathPrefix(event.Name, current) { - deltaDepth = len(PathSplit(current)) - len(PathSplit(event.Name)) // -1 or less + } else if util.HasPathPrefix(event.Name, current) { + deltaDepth = len(util.PathSplit(current)) - len(util.PathSplit(event.Name)) // -1 or less - } else if HasPathPrefix(current, event.Name) { - deltaDepth = len(PathSplit(event.Name)) - len(PathSplit(current)) // +1 or more + } else if util.HasPathPrefix(current, event.Name) { + deltaDepth = len(util.PathSplit(event.Name)) - len(util.PathSplit(current)) // +1 or more // if below me... if _, exists := obj.watches[event.Name]; exists { send = true @@ -317,7 +322,7 @@ func (obj *FileRes) Watch(processChan chan Event) error { } // if safename starts with event.Name, we're above, and no event should be sent - } else if HasPathPrefix(safename, event.Name) { + } else if util.HasPathPrefix(safename, event.Name) { //log.Println("Above!") if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) { @@ -328,7 +333,7 @@ func (obj *FileRes) Watch(processChan chan Event) error { if deltaDepth < 0 { log.Println("Parent!") - if PathPrefixDelta(safename, event.Name) == 1 { // we're the parent dir + if util.PathPrefixDelta(safename, event.Name) == 1 { // we're the parent dir send = true dirty = true } @@ -337,7 +342,7 @@ func (obj *FileRes) Watch(processChan chan Event) error { } // if event.Name startswith safename, send event, we're already deeper - } else if HasPathPrefix(event.Name, safename) { + } else if util.HasPathPrefix(event.Name, safename) { //log.Println("Event2!") send = true dirty = true @@ -450,7 +455,7 @@ func mapPaths(fileInfos []FileInfo) map[string]FileInfo { func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sha256sum string) (string, bool, error) { // TODO: does it make sense to switch dst to an io.Writer ? // TODO: use obj.Force when dealing with symlinks and other file types! - if DEBUG { + if global.DEBUG { log.Printf("fileCheckApply: %s -> %s", src, dst) } @@ -547,7 +552,7 @@ func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sh if !apply { return sha256sum, false, nil } - if DEBUG { + if global.DEBUG { log.Printf("fileCheckApply: Apply: %s -> %s", src, dst) } @@ -568,12 +573,12 @@ func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sh // syscall.Splice(rfd int, roff *int64, wfd int, woff *int64, len int, flags int) (n int64, err error) // TODO: should we offer a way to cancel the copy on ^C ? - if DEBUG { + if global.DEBUG { log.Printf("fileCheckApply: Copy: %s -> %s", src, dst) } if n, err := io.Copy(dstFile, src); err != nil { return sha256sum, false, err - } else if DEBUG { + } else if global.DEBUG { log.Printf("fileCheckApply: Copied: %v", n) } return sha256sum, false, dstFile.Sync() @@ -583,7 +588,7 @@ func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sh // It is recursive and can create directories directly, and files via the usual // fileCheckApply method. It returns checkOK and error as is normally expected. func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) { - if DEBUG { + if global.DEBUG { log.Printf("syncCheckApply: %s -> %s", src, dst) } if src == "" || dst == "" { @@ -601,12 +606,12 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) { } if !srcIsDir && !dstIsDir { - if DEBUG { + if global.DEBUG { log.Printf("syncCheckApply: %s -> %s", src, dst) } fin, err := os.Open(src) if err != nil { - if DEBUG && os.IsNotExist(err) { // if we get passed an empty src + if global.DEBUG && os.IsNotExist(err) { // if we get passed an empty src log.Printf("syncCheckApply: Missing src: %s", src) } return false, err @@ -662,7 +667,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) { delete(smartDst, relPathFile) // rm from purge list } - if DEBUG { + if global.DEBUG { log.Printf("syncCheckApply: mkdir -m %s '%s'", fileInfo.Mode(), absDst) } if err := os.Mkdir(absDst, fileInfo.Mode()); err != nil { @@ -673,7 +678,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) { // if we're a regular file, the recurse will create it } - if DEBUG { + if global.DEBUG { log.Printf("syncCheckApply: Recurse: %s -> %s", absSrc, absDst) } if obj.Recurse { @@ -887,9 +892,9 @@ func (obj *FileResAutoEdges) Test(input []bool) bool { // AutoEdges generates a simple linear sequence of each parent directory from // the bottom up! func (obj *FileRes) AutoEdges() AutoEdge { - var data []ResUUID // store linear result chain here... - values := PathSplitFullReversed(obj.path) // build it - _, values = values[0], values[1:] // get rid of first value which is me! + var data []ResUUID // store linear result chain here... + values := util.PathSplitFullReversed(obj.path) // build it + _, values = values[0], values[1:] // get rid of first value which is me! for _, x := range values { var reversed = true // cheat by passing a pointer data = append(data, &FileUUID{ diff --git a/noop.go b/resources/noop.go similarity index 96% rename from noop.go rename to resources/noop.go index 2d354304..6cc51df4 100644 --- a/noop.go +++ b/resources/noop.go @@ -15,12 +15,14 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package main +package resources import ( "encoding/gob" "log" "time" + + "github.com/purpleidea/mgmt/event" ) func init() { @@ -58,7 +60,7 @@ func (obj *NoopRes) Validate() bool { } // Watch is the primary listener for this resource and it outputs events. -func (obj *NoopRes) Watch(processChan chan Event) error { +func (obj *NoopRes) Watch(processChan chan event.Event) error { if obj.IsWatching() { return nil // TODO: should this be an error? } @@ -79,7 +81,7 @@ func (obj *NoopRes) Watch(processChan chan Event) error { var send = false // send event? var exit = false for { - obj.SetState(resStateWatching) // reset + obj.SetState(ResStateWatching) // reset select { case event := <-obj.events: cuuid.SetConverged(false) diff --git a/packagekit.go b/resources/packagekit/packagekit.go similarity index 99% rename from packagekit.go rename to resources/packagekit/packagekit.go index f2862eb7..a0a277d0 100644 --- a/packagekit.go +++ b/resources/packagekit/packagekit.go @@ -17,15 +17,17 @@ // DOCS: https://www.freedesktop.org/software/PackageKit/gtk-doc/index.html -//package packagekit // TODO -package main +package packagekit import ( "fmt" - "github.com/godbus/dbus" "log" "runtime" "strings" + + "github.com/purpleidea/mgmt/util" + + "github.com/godbus/dbus" ) // global tweaks of verbosity and code path @@ -160,7 +162,7 @@ type PkPackageIDActionData struct { // NewBus returns a new bus connection. func NewBus() *Conn { // if we share the bus with others, we will get each others messages!! - bus, err := SystemBusPrivateUsable() // don't share the bus connection! + bus, err := util.SystemBusPrivateUsable() // don't share the bus connection! if err != nil { return nil } @@ -422,7 +424,7 @@ loop: } else { return fmt.Errorf("PackageKit: Error: %v", signal.Body) } - case <-TimeAfterOrBlock(timeout): + case <-util.TimeAfterOrBlock(timeout): if finished { log.Println("PackageKit: Timeout: InstallPackages: Waiting for 'Destroy'") return nil // got tired of waiting for Destroy diff --git a/pkg.go b/resources/pkg.go similarity index 85% rename from pkg.go rename to resources/pkg.go index 89e435b2..c112fe77 100644 --- a/pkg.go +++ b/resources/pkg.go @@ -15,10 +15,9 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package main +package resources import ( - //"packagekit" // TODO "encoding/gob" "errors" "fmt" @@ -26,6 +25,11 @@ import ( "path" "strings" "time" + + "github.com/purpleidea/mgmt/event" + "github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead? + "github.com/purpleidea/mgmt/resources/packagekit" + "github.com/purpleidea/mgmt/util" ) func init() { @@ -39,7 +43,7 @@ type PkgRes struct { AllowUntrusted bool `yaml:"allowuntrusted"` // allow untrusted packages to be installed? AllowNonFree bool `yaml:"allownonfree"` // allow nonfree packages to be found? AllowUnsupported bool `yaml:"allowunsupported"` // allow unsupported packages to be found? - //bus *Conn // pk bus connection + //bus *packagekit.Conn // pk bus connection fileList []string // FIXME: update if pkg changes } @@ -63,7 +67,7 @@ func (obj *PkgRes) Init() { obj.BaseRes.kind = "Pkg" obj.BaseRes.Init() // call base init, b/c we're overriding - bus := NewBus() + bus := packagekit.NewBus() if bus == nil { log.Fatal("Can't connect to PackageKit bus.") } @@ -92,7 +96,7 @@ func (obj *PkgRes) Init() { return } if files, ok := filesMap[data.PackageID]; ok { - obj.fileList = DirifyFileList(files, false) + obj.fileList = util.DirifyFileList(files, false) } } @@ -109,7 +113,7 @@ func (obj *PkgRes) Validate() bool { // It uses the PackageKit UpdatesChanged signal to watch for changes. // TODO: https://github.com/hughsie/PackageKit/issues/109 // TODO: https://github.com/hughsie/PackageKit/issues/110 -func (obj *PkgRes) Watch(processChan chan Event) error { +func (obj *PkgRes) Watch(processChan chan event.Event) error { if obj.IsWatching() { return nil } @@ -127,7 +131,7 @@ func (obj *PkgRes) Watch(processChan chan Event) error { return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout } - bus := NewBus() + bus := packagekit.NewBus() if bus == nil { log.Fatal("Can't connect to PackageKit bus.") } @@ -143,17 +147,17 @@ func (obj *PkgRes) Watch(processChan chan Event) error { var dirty = false for { - if DEBUG { + if global.DEBUG { log.Printf("%v: Watching...", obj.fmtNames(obj.getNames())) } - obj.SetState(resStateWatching) // reset + obj.SetState(ResStateWatching) // reset select { case event := <-ch: cuuid.SetConverged(false) // FIXME: ask packagekit for info on what packages changed - if DEBUG { + if global.DEBUG { log.Printf("%v: Event: %v", obj.fmtNames(obj.getNames()), event.Name) } @@ -236,23 +240,23 @@ func (obj *PkgRes) groupMappingHelper() map[string]string { return result } -func (obj *PkgRes) pkgMappingHelper(bus *Conn) (map[string]*PkPackageIDActionData, error) { - packageMap := obj.groupMappingHelper() // get the grouped values - packageMap[obj.Name] = obj.State // key is pkg name, value is pkg state - var filter uint64 // initializes at the "zero" value of 0 - filter += PK_FILTER_ENUM_ARCH // always search in our arch (optional!) +func (obj *PkgRes) pkgMappingHelper(bus *packagekit.Conn) (map[string]*packagekit.PkPackageIDActionData, error) { + packageMap := obj.groupMappingHelper() // get the grouped values + packageMap[obj.Name] = obj.State // key is pkg name, value is pkg state + var filter uint64 // initializes at the "zero" value of 0 + filter += packagekit.PK_FILTER_ENUM_ARCH // always search in our arch (optional!) // we're requesting latest version, or to narrow down install choices! if obj.State == "newest" || obj.State == "installed" { // if we add this, we'll still see older packages if installed // this is an optimization, and is *optional*, this logic is // handled inside of PackagesToPackageIDs now automatically! - filter += PK_FILTER_ENUM_NEWEST // only search for newest packages + filter += packagekit.PK_FILTER_ENUM_NEWEST // only search for newest packages } if !obj.AllowNonFree { - filter += PK_FILTER_ENUM_FREE + filter += packagekit.PK_FILTER_ENUM_FREE } if !obj.AllowUnsupported { - filter += PK_FILTER_ENUM_SUPPORTED + filter += packagekit.PK_FILTER_ENUM_SUPPORTED } result, e := bus.PackagesToPackageIDs(packageMap, filter) if e != nil { @@ -274,7 +278,7 @@ func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) { return true, nil } - bus := NewBus() + bus := packagekit.NewBus() if bus == nil { return false, errors.New("Can't connect to PackageKit bus.") } @@ -287,18 +291,18 @@ func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) { packageMap := obj.groupMappingHelper() // map[string]string packageList := []string{obj.Name} - packageList = append(packageList, StrMapKeys(packageMap)...) + packageList = append(packageList, util.StrMapKeys(packageMap)...) //stateList := []string{obj.State} - //stateList = append(stateList, StrMapValues(packageMap)...) + //stateList = append(stateList, util.StrMapValues(packageMap)...) // TODO: at the moment, all the states are the same, but // eventually we might be able to drop this constraint! - states, err := FilterState(result, packageList, obj.State) + states, err := packagekit.FilterState(result, packageList, obj.State) if err != nil { return false, fmt.Errorf("The FilterState method failed with: %v.", err) } data, _ := result[obj.Name] // if above didn't error, we won't either! - validState := BoolMapTrue(BoolMapValues(states)) + validState := util.BoolMapTrue(util.BoolMapValues(states)) // obj.State == "installed" || "uninstalled" || "newest" || "4.2-1.fc23" switch obj.State { @@ -325,20 +329,20 @@ func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) { // apply portion log.Printf("%v: Apply", obj.fmtNames(obj.getNames())) - readyPackages, err := FilterPackageState(result, packageList, obj.State) + readyPackages, err := packagekit.FilterPackageState(result, packageList, obj.State) if err != nil { return false, err // fail } // these are the packages that actually need their states applied! - applyPackages := StrFilterElementsInList(readyPackages, packageList) - packageIDs, _ := FilterPackageIDs(result, applyPackages) // would be same err as above + applyPackages := util.StrFilterElementsInList(readyPackages, packageList) + packageIDs, _ := packagekit.FilterPackageIDs(result, applyPackages) // would be same err as above var transactionFlags uint64 // initializes at the "zero" value of 0 if !obj.AllowUntrusted { // allow - transactionFlags += PK_TRANSACTION_FLAG_ENUM_ONLY_TRUSTED + transactionFlags += packagekit.PK_TRANSACTION_FLAG_ENUM_ONLY_TRUSTED } // apply correct state! - log.Printf("%v: Set: %v...", obj.fmtNames(StrListIntersection(applyPackages, obj.getNames())), obj.State) + log.Printf("%v: Set: %v...", obj.fmtNames(util.StrListIntersection(applyPackages, obj.getNames())), obj.State) switch obj.State { case "uninstalled": // run remove // NOTE: packageID is different than when installed, because now @@ -356,7 +360,7 @@ func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) { if err != nil { return false, err // fail } - log.Printf("%v: Set: %v success!", obj.fmtNames(StrListIntersection(applyPackages, obj.getNames())), obj.State) + log.Printf("%v: Set: %v success!", obj.fmtNames(util.StrListIntersection(applyPackages, obj.getNames())), obj.State) obj.isStateOK = true // reset return false, nil // success } @@ -450,16 +454,16 @@ func (obj *PkgResAutoEdges) Test(input []bool) bool { var dirs = make([]string, count) done := []string{} for i := 0; i < count; i++ { - dir := Dirname(obj.fileList[i]) // dirname of /foo/ should be / + dir := util.Dirname(obj.fileList[i]) // dirname of /foo/ should be / dirs[i] = dir if input[i] { done = append(done, dir) } } - nodupes := StrRemoveDuplicatesInList(dirs) // remove duplicates - nodones := StrFilterElementsInList(done, nodupes) // filter out done - noempty := StrFilterElementsInList([]string{""}, nodones) // remove the "" from / - obj.fileList = RemoveCommonFilePrefixes(noempty) // magic + nodupes := util.StrRemoveDuplicatesInList(dirs) // remove duplicates + nodones := util.StrFilterElementsInList(done, nodupes) // filter out done + noempty := util.StrFilterElementsInList([]string{""}, nodones) // remove the "" from / + obj.fileList = util.RemoveCommonFilePrefixes(noempty) // magic if len(obj.fileList) == 0 { // nothing more, don't continue return false @@ -489,7 +493,7 @@ func (obj *PkgRes) AutoEdges() AutoEdge { } return &PkgResAutoEdges{ - fileList: RemoveCommonFilePrefixes(obj.fileList), // clean start! + fileList: util.RemoveCommonFilePrefixes(obj.fileList), // clean start! svcUUIDs: svcUUIDs, testIsNext: false, // start with Next() call name: obj.GetName(), // save data for PkgResAutoEdges obj @@ -573,7 +577,7 @@ func ReturnSvcInFileList(fileList []string) []string { if !strings.HasSuffix(basename, ".service") { continue } - if s := strings.TrimSuffix(basename, ".service"); !StrInList(s, result) { + if s := strings.TrimSuffix(basename, ".service"); !util.StrInList(s, result) { result = append(result, s) } } diff --git a/resources.go b/resources/resources.go similarity index 81% rename from resources.go rename to resources/resources.go index 667762f5..ff956d99 100644 --- a/resources.go +++ b/resources/resources.go @@ -15,7 +15,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package main +package resources import ( "bytes" @@ -23,17 +23,22 @@ import ( "encoding/gob" "fmt" "log" + + // TODO: should each resource be a sub-package? + "github.com/purpleidea/mgmt/converger" + "github.com/purpleidea/mgmt/event" + "github.com/purpleidea/mgmt/global" ) -//go:generate stringer -type=resState -output=resstate_stringer.go -type resState int +//go:generate stringer -type=ResState -output=resstate_stringer.go +type ResState int const ( - resStateNil resState = iota - resStateWatching - resStateEvent // an event has happened, but we haven't poked yet - resStateCheckApply - resStatePoking + ResStateNil ResState = iota + ResStateWatching + ResStateEvent // an event has happened, but we haven't poked yet + ResStateCheckApply + ResStatePoking ) // ResUUID is a unique identifier for a resource, namely it's name, and the kind ("type"). @@ -76,23 +81,23 @@ type MetaParams struct { type Base interface { GetName() string // can't be named "Name()" because of struct field SetName(string) - setKind(string) + SetKind(string) Kind() string Meta() *MetaParams - Events() chan Event - AssociateData(Converger) + Events() chan event.Event + AssociateData(converger.Converger) IsWatching() bool SetWatching(bool) - GetState() resState - SetState(resState) - DoSend(chan Event, string) (bool, error) - SendEvent(eventName, bool, bool) bool - ReadEvent(*Event) (bool, bool) // TODO: optional here? - GroupCmp(Res) bool // TODO: is there a better name for this? - GroupRes(Res) error // group resource (arg) into self - IsGrouped() bool // am I grouped? - SetGrouped(bool) // set grouped bool - GetGroup() []Res // return everyone grouped inside me + GetState() ResState + SetState(ResState) + DoSend(chan event.Event, string) (bool, error) + SendEvent(event.EventName, bool, bool) bool + ReadEvent(*event.Event) (bool, bool) // TODO: optional here? + GroupCmp(Res) bool // TODO: is there a better name for this? + GroupRes(Res) error // group resource (arg) into self + IsGrouped() bool // am I grouped? + SetGrouped(bool) // set grouped bool + GetGroup() []Res // return everyone grouped inside me SetGroup([]Res) } @@ -101,8 +106,8 @@ type Res interface { Base // include everything from the Base interface Init() //Validate() bool // TODO: this might one day be added - GetUUIDs() []ResUUID // most resources only return one - Watch(chan Event) error // send on channel to signal process() events + GetUUIDs() []ResUUID // most resources only return one + Watch(chan event.Event) error // send on channel to signal process() events CheckApply(bool) (bool, error) AutoEdges() AutoEdge Compare(Res) bool @@ -114,9 +119,9 @@ type BaseRes struct { Name string `yaml:"name"` MetaParams MetaParams `yaml:"meta"` // struct of all the metaparams kind string - events chan Event - converger Converger // converged tracking - state resState + events chan event.Event + converger converger.Converger // converged tracking + state ResState watching bool // is Watch() loop running ? isStateOK bool // whether the state is okay based on events or not isGrouped bool // am i contained within a group? @@ -166,7 +171,7 @@ func (obj *BaseUUID) Reversed() bool { // Init initializes structures like channels if created without New constructor. func (obj *BaseRes) Init() { - obj.events = make(chan Event) // unbuffered chan size to avoid stale events + obj.events = make(chan event.Event) // unbuffered chan size to avoid stale events } // GetName is used by all the resources to Get the name. @@ -179,8 +184,8 @@ func (obj *BaseRes) SetName(name string) { obj.Name = name } -// setKind sets the kind. This is used internally for exported resources. -func (obj *BaseRes) setKind(kind string) { +// SetKind sets the kind. This is used internally for exported resources. +func (obj *BaseRes) SetKind(kind string) { obj.kind = kind } @@ -195,12 +200,12 @@ func (obj *BaseRes) Meta() *MetaParams { } // Events returns the channel of events to listen on. -func (obj *BaseRes) Events() chan Event { +func (obj *BaseRes) Events() chan event.Event { return obj.events } // AssociateData associates some data with the object in question. -func (obj *BaseRes) AssociateData(converger Converger) { +func (obj *BaseRes) AssociateData(converger converger.Converger) { obj.converger = converger } @@ -215,13 +220,13 @@ func (obj *BaseRes) SetWatching(b bool) { } // GetState returns the state of the resource. -func (obj *BaseRes) GetState() resState { +func (obj *BaseRes) GetState() ResState { return obj.state } // SetState sets the state of the resource. -func (obj *BaseRes) SetState(state resState) { - if DEBUG { +func (obj *BaseRes) SetState(state ResState) { + if global.DEBUG { log.Printf("%v[%v]: State: %v -> %v", obj.Kind(), obj.GetName(), obj.GetState(), state) } obj.state = state @@ -230,9 +235,9 @@ func (obj *BaseRes) SetState(state resState) { // DoSend sends off an event, but doesn't block the incoming event queue. It can // also recursively call itself when events need processing during the wait. // I'm not completely comfortable with this fn, but it will have to do for now. -func (obj *BaseRes) DoSend(processChan chan Event, comment string) (bool, error) { - resp := NewResp() - processChan <- Event{eventNil, resp, comment, true} // trigger process +func (obj *BaseRes) DoSend(processChan chan event.Event, comment string) (bool, error) { + resp := event.NewResp() + processChan <- event.Event{event.EventNil, resp, comment, true} // trigger process select { case e := <-resp: // wait for the ACK() if e != nil { // we got a NACK @@ -252,47 +257,47 @@ func (obj *BaseRes) DoSend(processChan chan Event, comment string) (bool, error) } // SendEvent pushes an event into the message queue for a particular vertex -func (obj *BaseRes) SendEvent(event eventName, sync bool, activity bool) bool { +func (obj *BaseRes) SendEvent(ev event.EventName, sync bool, activity bool) bool { // TODO: isn't this race-y ? if !obj.IsWatching() { // element has already exited return false // if we don't return, we'll block on the send } if !sync { - obj.events <- Event{event, nil, "", activity} + obj.events <- event.Event{ev, nil, "", activity} return true } - resp := NewResp() - obj.events <- Event{event, resp, "", activity} + resp := event.NewResp() + obj.events <- event.Event{ev, resp, "", activity} resp.ACKWait() // waits until true (nil) value return true } // ReadEvent processes events when a select gets one, and handles the pause // code too! The return values specify if we should exit and poke respectively. -func (obj *BaseRes) ReadEvent(event *Event) (exit, poke bool) { - event.ACK() - switch event.Name { - case eventStart: +func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, poke bool) { + ev.ACK() + switch ev.Name { + case event.EventStart: return false, true - case eventPoke: + case event.EventPoke: return false, true - case eventBackPoke: + case event.EventBackPoke: return false, true // forward poking in response to a back poke! - case eventExit: + case event.EventExit: return true, false - case eventPause: + case event.EventPause: // wait for next event to continue select { case e := <-obj.events: e.ACK() - if e.Name == eventExit { + if e.Name == event.EventExit { return true, false - } else if e.Name == eventStart { // eventContinue + } else if e.Name == event.EventStart { // eventContinue return false, false // don't poke on unpause! } else { // if we get a poke event here, it's a bug! @@ -301,7 +306,7 @@ func (obj *BaseRes) ReadEvent(event *Event) (exit, poke bool) { } default: - log.Fatal("Unknown event: ", event) + log.Fatal("Unknown event: ", ev) } return true, false // required to keep the stupid go compiler happy } diff --git a/resources_test.go b/resources/resources_test.go similarity index 99% rename from resources_test.go rename to resources/resources_test.go index 146b4e36..24f4b1e4 100644 --- a/resources_test.go +++ b/resources/resources_test.go @@ -15,7 +15,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package main +package resources import ( "bytes" diff --git a/svc.go b/resources/svc.go similarity index 97% rename from svc.go rename to resources/svc.go index 969a114f..ca28d541 100644 --- a/svc.go +++ b/resources/svc.go @@ -17,17 +17,21 @@ // DOCS: https://godoc.org/github.com/coreos/go-systemd/dbus -package main +package resources import ( "encoding/gob" "errors" "fmt" + "log" + "time" + + "github.com/purpleidea/mgmt/event" + "github.com/purpleidea/mgmt/util" + systemd "github.com/coreos/go-systemd/dbus" // change namespace systemdUtil "github.com/coreos/go-systemd/util" "github.com/godbus/dbus" // namespace collides with systemd wrapper - "log" - "time" ) func init() { @@ -72,7 +76,7 @@ func (obj *SvcRes) Validate() bool { } // Watch is the primary listener for this resource and it outputs events. -func (obj *SvcRes) Watch(processChan chan Event) error { +func (obj *SvcRes) Watch(processChan chan event.Event) error { if obj.IsWatching() { return nil } @@ -102,7 +106,7 @@ func (obj *SvcRes) Watch(processChan chan Event) error { defer conn.Close() // if we share the bus with others, we will get each others messages!! - bus, err := SystemBusPrivateUsable() // don't share the bus connection! + bus, err := util.SystemBusPrivateUsable() // don't share the bus connection! if err != nil { return fmt.Errorf("Failed to connect to bus: %s", err) } @@ -157,7 +161,7 @@ func (obj *SvcRes) Watch(processChan chan Event) error { set.Remove(svc) // no return value should ever occur } - obj.SetState(resStateWatching) // reset + obj.SetState(ResStateWatching) // reset select { case <-buschan: // XXX wait for new units event to unstick cuuid.SetConverged(false) @@ -189,7 +193,7 @@ func (obj *SvcRes) Watch(processChan chan Event) error { } log.Printf("Watching: %v", svc) // attempting to watch... - obj.SetState(resStateWatching) // reset + obj.SetState(ResStateWatching) // reset select { case event := <-subChannel: diff --git a/timer.go b/resources/timer.go similarity index 96% rename from timer.go rename to resources/timer.go index d6bb413d..89c9dbc6 100644 --- a/timer.go +++ b/resources/timer.go @@ -15,12 +15,14 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package main +package resources import ( "encoding/gob" "log" "time" + + "github.com/purpleidea/mgmt/event" ) func init() { @@ -65,7 +67,7 @@ func (obj *TimerRes) Validate() bool { } // Watch is the primary listener for this resource and it outputs events. -func (obj *TimerRes) Watch(processChan chan Event) error { +func (obj *TimerRes) Watch(processChan chan event.Event) error { if obj.IsWatching() { return nil } @@ -90,7 +92,7 @@ func (obj *TimerRes) Watch(processChan chan Event) error { var send = false for { - obj.SetState(resStateWatching) + obj.SetState(ResStateWatching) select { case <-ticker.C: // received the timer event send = true diff --git a/misc.go b/util/util.go similarity index 99% rename from misc.go rename to util/util.go index 92ea3b5c..42162c75 100644 --- a/misc.go +++ b/util/util.go @@ -15,14 +15,16 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package main +// Package util contains a collection of miscellaneous utility functions. +package util import ( - "github.com/godbus/dbus" "path" "sort" "strings" "time" + + "github.com/godbus/dbus" ) // FirstToUpper returns the string with the first character capitalized. diff --git a/misc_test.go b/util/util_test.go similarity index 98% rename from misc_test.go rename to util/util_test.go index bf6b9d5d..0a003c32 100644 --- a/misc_test.go +++ b/util/util_test.go @@ -15,7 +15,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package main +package util import ( "reflect" @@ -23,7 +23,7 @@ import ( "testing" ) -func TestMiscT1(t *testing.T) { +func TestUtilT1(t *testing.T) { if Dirname("/foo/bar/baz") != "/foo/bar/" { t.Errorf("Result is incorrect.") @@ -62,7 +62,7 @@ func TestMiscT1(t *testing.T) { } } -func TestMiscT2(t *testing.T) { +func TestUtilT2(t *testing.T) { // TODO: compare the output with the actual list p0 := "/" @@ -86,7 +86,7 @@ func TestMiscT2(t *testing.T) { } } -func TestMiscT3(t *testing.T) { +func TestUtilT3(t *testing.T) { if HasPathPrefix("/foo/bar/baz", "/foo/ba") != false { t.Errorf("Result should be false.") @@ -117,7 +117,7 @@ func TestMiscT3(t *testing.T) { } } -func TestMiscT4(t *testing.T) { +func TestUtilT4(t *testing.T) { if PathPrefixDelta("/foo/bar/baz", "/foo/ba") != -1 { t.Errorf("Result should be -1.") @@ -152,7 +152,7 @@ func TestMiscT4(t *testing.T) { } } -func TestMiscT8(t *testing.T) { +func TestUtilT8(t *testing.T) { r0 := []string{"/"} if fullList0 := PathSplitFullReversed("/"); !reflect.DeepEqual(r0, fullList0) { @@ -171,7 +171,7 @@ func TestMiscT8(t *testing.T) { } -func TestMiscT9(t *testing.T) { +func TestUtilT9(t *testing.T) { fileListIn := []string{ // list taken from drbd-utils package "/etc/drbd.conf", "/etc/drbd.d/global_common.conf", @@ -315,7 +315,7 @@ func TestMiscT9(t *testing.T) { } } -func TestMiscT10(t *testing.T) { +func TestUtilT10(t *testing.T) { fileListIn := []string{ // fake package list "/etc/drbd.conf", "/usr/share/man/man8/drbdsetup.8.gz", @@ -351,7 +351,7 @@ func TestMiscT10(t *testing.T) { } } -func TestMiscT11(t *testing.T) { +func TestUtilT11(t *testing.T) { in1 := []string{"/", "/usr/", "/usr/lib/", "/usr/share/"} // input ex1 := []string{"/usr/lib/", "/usr/share/"} // expected sort.Strings(ex1) @@ -724,7 +724,7 @@ func TestMiscT11(t *testing.T) { } } -func TestMiscFlattenListWithSplit1(t *testing.T) { +func TestUtilFlattenListWithSplit1(t *testing.T) { { in := []string{} // input ex := []string{} // expected