golang: Split things into packages

This makes this logically more separate! :) As an aside...

I really hate the way golang does dependencies and packages. Yes, some
people insist on nesting their code deep into a $GOPATH, which is fine
if you're a google dev and are forced to work this way, but annoying for
the rest of the world. Your code shouldn't need a git commit to switch
to a a different vcs host! Gah I hate this so much.
This commit is contained in:
James Shubin
2016-09-20 06:33:13 -04:00
parent 361d643ce7
commit 63f21952f4
26 changed files with 1213 additions and 1041 deletions

566
config.go
View File

@@ -1,566 +0,0 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
import (
"errors"
"fmt"
"gopkg.in/yaml.v2"
"io/ioutil"
"log"
"reflect"
"strings"
)
type collectorResConfig struct {
Kind string `yaml:"kind"`
Pattern string `yaml:"pattern"` // XXX: Not Implemented
}
type vertexConfig struct {
Kind string `yaml:"kind"`
Name string `yaml:"name"`
}
type edgeConfig struct {
Name string `yaml:"name"`
From vertexConfig `yaml:"from"`
To vertexConfig `yaml:"to"`
}
// GraphConfig is the data structure that describes a single graph to run.
type GraphConfig struct {
Graph string `yaml:"graph"`
Resources struct {
Noop []*NoopRes `yaml:"noop"`
Pkg []*PkgRes `yaml:"pkg"`
File []*FileRes `yaml:"file"`
Svc []*SvcRes `yaml:"svc"`
Exec []*ExecRes `yaml:"exec"`
Timer []*TimerRes `yaml:"timer"`
} `yaml:"resources"`
Collector []collectorResConfig `yaml:"collect"`
Edges []edgeConfig `yaml:"edges"`
Comment string `yaml:"comment"`
Hostname string `yaml:"hostname"` // uuid for the host
Remote string `yaml:"remote"`
}
// Parse parses a data stream into the graph structure.
func (c *GraphConfig) Parse(data []byte) error {
if err := yaml.Unmarshal(data, c); err != nil {
return err
}
if c.Graph == "" {
return errors.New("Graph config: invalid `graph`")
}
return nil
}
// ParseConfigFromFile takes a filename and returns the graph config structure.
func ParseConfigFromFile(filename string) *GraphConfig {
data, err := ioutil.ReadFile(filename)
if err != nil {
log.Printf("Config: Error: ParseConfigFromFile: File: %v", err)
return nil
}
var config GraphConfig
if err := config.Parse(data); err != nil {
log.Printf("Config: Error: ParseConfigFromFile: Parse: %v", err)
return nil
}
return &config
}
// NewGraphFromConfig returns a new graph from existing input, such as from the
// existing graph, and a GraphConfig struct.
func (g *Graph) NewGraphFromConfig(config *GraphConfig, embdEtcd *EmbdEtcd, noop bool) (*Graph, error) {
if config.Hostname == "" {
return nil, fmt.Errorf("Config: Error: Hostname can't be empty!")
}
var graph *Graph // new graph to return
if g == nil { // FIXME: how can we check for an empty graph?
graph = NewGraph("Graph") // give graph a default name
} else {
graph = g.Copy() // same vertices, since they're pointers!
}
var lookup = make(map[string]map[string]*Vertex)
//log.Printf("%+v", config) // debug
// TODO: if defined (somehow)...
graph.SetName(config.Graph) // set graph name
var keep []*Vertex // list of vertex which are the same in new graph
var resources []Res // list of resources to export
// use reflection to avoid duplicating code... better options welcome!
value := reflect.Indirect(reflect.ValueOf(config.Resources))
vtype := value.Type()
for i := 0; i < vtype.NumField(); i++ { // number of fields in struct
name := vtype.Field(i).Name // string of field name
field := value.FieldByName(name)
iface := field.Interface() // interface type of value
slice := reflect.ValueOf(iface)
// XXX: should we just drop these everywhere and have the kind strings be all lowercase?
kind := FirstToUpper(name)
if DEBUG {
log.Printf("Config: Processing: %v...", kind)
}
for j := 0; j < slice.Len(); j++ { // loop through resources of same kind
x := slice.Index(j).Interface()
res, ok := x.(Res) // convert to Res type
if !ok {
return nil, fmt.Errorf("Config: Error: Can't convert: %v of type: %T to Res.", x, x)
}
if noop {
res.Meta().Noop = noop
}
if _, exists := lookup[kind]; !exists {
lookup[kind] = make(map[string]*Vertex)
}
// XXX: should we export based on a @@ prefix, or a metaparam
// like exported => true || exported => (host pattern)||(other pattern?)
if !strings.HasPrefix(res.GetName(), "@@") { // not exported resource
// XXX: we don't have a way of knowing if any of the
// metaparams are undefined, and as a result to set the
// defaults that we want! I hate the go yaml parser!!!
v := graph.GetVertexMatch(res)
if v == nil { // no match found
res.Init()
v = NewVertex(res)
graph.AddVertex(v) // call standalone in case not part of an edge
}
lookup[kind][res.GetName()] = v // used for constructing edges
keep = append(keep, v) // append
} else if !noop { // do not export any resources if noop
// store for addition to etcd storage...
res.SetName(res.GetName()[2:]) //slice off @@
res.setKind(kind) // cheap init
resources = append(resources, res)
}
}
}
// store in etcd
if err := EtcdSetResources(embdEtcd, config.Hostname, resources); err != nil {
return nil, fmt.Errorf("Config: Could not export resources: %v", err)
}
// lookup from etcd
var hostnameFilter []string // empty to get from everyone
kindFilter := []string{}
for _, t := range config.Collector {
// XXX: should we just drop these everywhere and have the kind strings be all lowercase?
kind := FirstToUpper(t.Kind)
kindFilter = append(kindFilter, kind)
}
// do all the graph look ups in one single step, so that if the etcd
// database changes, we don't have a partial state of affairs...
if len(kindFilter) > 0 { // if kindFilter is empty, don't need to do lookups!
var err error
resources, err = EtcdGetResources(embdEtcd, hostnameFilter, kindFilter)
if err != nil {
return nil, fmt.Errorf("Config: Could not collect resources: %v", err)
}
}
for _, res := range resources {
matched := false
// see if we find a collect pattern that matches
for _, t := range config.Collector {
// XXX: should we just drop these everywhere and have the kind strings be all lowercase?
kind := FirstToUpper(t.Kind)
// use t.Kind and optionally t.Pattern to collect from etcd storage
log.Printf("Collect: %v; Pattern: %v", kind, t.Pattern)
// XXX: expand to more complex pattern matching here...
if res.Kind() != kind {
continue
}
if matched {
// we've already matched this resource, should we match again?
log.Printf("Config: Warning: Matching %v[%v] again!", kind, res.GetName())
}
matched = true
// collect resources but add the noop metaparam
if noop {
res.Meta().Noop = noop
}
if t.Pattern != "" { // XXX: simplistic for now
res.CollectPattern(t.Pattern) // res.Dirname = t.Pattern
}
log.Printf("Collect: %v[%v]: collected!", kind, res.GetName())
// XXX: similar to other resource add code:
if _, exists := lookup[kind]; !exists {
lookup[kind] = make(map[string]*Vertex)
}
v := graph.GetVertexMatch(res)
if v == nil { // no match found
res.Init() // initialize go channels or things won't work!!!
v = NewVertex(res)
graph.AddVertex(v) // call standalone in case not part of an edge
}
lookup[kind][res.GetName()] = v // used for constructing edges
keep = append(keep, v) // append
//break // let's see if another resource even matches
}
}
// get rid of any vertices we shouldn't "keep" (that aren't in new graph)
for _, v := range graph.GetVertices() {
if !VertexContains(v, keep) {
// wait for exit before starting new graph!
v.SendEvent(eventExit, true, false)
graph.DeleteVertex(v)
}
}
for _, e := range config.Edges {
if _, ok := lookup[FirstToUpper(e.From.Kind)]; !ok {
return nil, fmt.Errorf("Can't find 'from' resource!")
}
if _, ok := lookup[FirstToUpper(e.To.Kind)]; !ok {
return nil, fmt.Errorf("Can't find 'to' resource!")
}
if _, ok := lookup[FirstToUpper(e.From.Kind)][e.From.Name]; !ok {
return nil, fmt.Errorf("Can't find 'from' name!")
}
if _, ok := lookup[FirstToUpper(e.To.Kind)][e.To.Name]; !ok {
return nil, fmt.Errorf("Can't find 'to' name!")
}
graph.AddEdge(lookup[FirstToUpper(e.From.Kind)][e.From.Name], lookup[FirstToUpper(e.To.Kind)][e.To.Name], NewEdge(e.Name))
}
return graph, nil
}
// add edges to the vertex in a graph based on if it matches a uuid list
func (g *Graph) addEdgesByMatchingUUIDS(v *Vertex, uuids []ResUUID) []bool {
// search for edges and see what matches!
var result []bool
// loop through each uuid, and see if it matches any vertex
for _, uuid := range uuids {
var found = false
// uuid is a ResUUID object
for _, vv := range g.GetVertices() { // search
if v == vv { // skip self
continue
}
if DEBUG {
log.Printf("Compile: AutoEdge: Match: %v[%v] with UUID: %v[%v]", vv.Kind(), vv.GetName(), uuid.Kind(), uuid.GetName())
}
// we must match to an effective UUID for the resource,
// that is to say, the name value of a res is a helpful
// handle, but it is not necessarily a unique identity!
// remember, resources can return multiple UUID's each!
if UUIDExistsInUUIDs(uuid, vv.GetUUIDs()) {
// add edge from: vv -> v
if uuid.Reversed() {
txt := fmt.Sprintf("AutoEdge: %v[%v] -> %v[%v]", vv.Kind(), vv.GetName(), v.Kind(), v.GetName())
log.Printf("Compile: Adding %v", txt)
g.AddEdge(vv, v, NewEdge(txt))
} else { // edges go the "normal" way, eg: pkg resource
txt := fmt.Sprintf("AutoEdge: %v[%v] -> %v[%v]", v.Kind(), v.GetName(), vv.Kind(), vv.GetName())
log.Printf("Compile: Adding %v", txt)
g.AddEdge(v, vv, NewEdge(txt))
}
found = true
break
}
}
result = append(result, found)
}
return result
}
// AutoEdges adds the automatic edges to the graph.
func (g *Graph) AutoEdges() {
log.Println("Compile: Adding AutoEdges...")
for _, v := range g.GetVertices() { // for each vertexes autoedges
if !v.Meta().AutoEdge { // is the metaparam true?
continue
}
autoEdgeObj := v.AutoEdges()
if autoEdgeObj == nil {
log.Printf("%v[%v]: Config: No auto edges were found!", v.Kind(), v.GetName())
continue // next vertex
}
for { // while the autoEdgeObj has more uuids to add...
uuids := autoEdgeObj.Next() // get some!
if uuids == nil {
log.Printf("%v[%v]: Config: The auto edge list is empty!", v.Kind(), v.GetName())
break // inner loop
}
if DEBUG {
log.Println("Compile: AutoEdge: UUIDS:")
for i, u := range uuids {
log.Printf("Compile: AutoEdge: UUID%d: %v", i, u)
}
}
// match and add edges
result := g.addEdgesByMatchingUUIDS(v, uuids)
// report back, and find out if we should continue
if !autoEdgeObj.Test(result) {
break
}
}
}
}
// AutoGrouper is the required interface to implement for an autogroup algorithm
type AutoGrouper interface {
// listed in the order these are typically called in...
name() string // friendly identifier
init(*Graph) error // only call once
vertexNext() (*Vertex, *Vertex, error) // mostly algorithmic
vertexCmp(*Vertex, *Vertex) error // can we merge these ?
vertexMerge(*Vertex, *Vertex) (*Vertex, error) // vertex merge fn to use
edgeMerge(*Edge, *Edge) *Edge // edge merge fn to use
vertexTest(bool) (bool, error) // call until false
}
// baseGrouper is the base type for implementing the AutoGrouper interface
type baseGrouper struct {
graph *Graph // store a pointer to the graph
vertices []*Vertex // cached list of vertices
i int
j int
done bool
}
// name provides a friendly name for the logs to see
func (ag *baseGrouper) name() string {
return "baseGrouper"
}
// init is called only once and before using other AutoGrouper interface methods
// the name method is the only exception: call it any time without side effects!
func (ag *baseGrouper) init(g *Graph) error {
if ag.graph != nil {
return fmt.Errorf("The init method has already been called!")
}
ag.graph = g // pointer
ag.vertices = ag.graph.GetVerticesSorted() // cache in deterministic order!
ag.i = 0
ag.j = 0
if len(ag.vertices) == 0 { // empty graph
ag.done = true
return nil
}
return nil
}
// vertexNext is a simple iterator that loops through vertex (pair) combinations
// an intelligent algorithm would selectively offer only valid pairs of vertices
// these should satisfy logical grouping requirements for the autogroup designs!
// the desired algorithms can override, but keep this method as a base iterator!
func (ag *baseGrouper) vertexNext() (v1, v2 *Vertex, err error) {
// this does a for v... { for w... { return v, w }} but stepwise!
l := len(ag.vertices)
if ag.i < l {
v1 = ag.vertices[ag.i]
}
if ag.j < l {
v2 = ag.vertices[ag.j]
}
// in case the vertex was deleted
if !ag.graph.HasVertex(v1) {
v1 = nil
}
if !ag.graph.HasVertex(v2) {
v2 = nil
}
// two nested loops...
if ag.j < l {
ag.j++
}
if ag.j == l {
ag.j = 0
if ag.i < l {
ag.i++
}
if ag.i == l {
ag.done = true
}
}
return
}
func (ag *baseGrouper) vertexCmp(v1, v2 *Vertex) error {
if v1 == nil || v2 == nil {
return fmt.Errorf("Vertex is nil!")
}
if v1 == v2 { // skip yourself
return fmt.Errorf("Vertices are the same!")
}
if v1.Kind() != v2.Kind() { // we must group similar kinds
// TODO: maybe future resources won't need this limitation?
return fmt.Errorf("The two resources aren't the same kind!")
}
// someone doesn't want to group!
if !v1.Meta().AutoGroup || !v2.Meta().AutoGroup {
return fmt.Errorf("One of the autogroup flags is false!")
}
if v1.Res.IsGrouped() { // already grouped!
return fmt.Errorf("Already grouped!")
}
if len(v2.Res.GetGroup()) > 0 { // already has children grouped!
return fmt.Errorf("Already has groups!")
}
if !v1.Res.GroupCmp(v2.Res) { // resource groupcmp failed!
return fmt.Errorf("The GroupCmp failed!")
}
return nil // success
}
func (ag *baseGrouper) vertexMerge(v1, v2 *Vertex) (v *Vertex, err error) {
// NOTE: it's important to use w.Res instead of w, b/c
// the w by itself is the *Vertex obj, not the *Res obj
// which is contained within it! They both satisfy the
// Res interface, which is why both will compile! :(
err = v1.Res.GroupRes(v2.Res) // GroupRes skips stupid groupings
return // success or fail, and no need to merge the actual vertices!
}
func (ag *baseGrouper) edgeMerge(e1, e2 *Edge) *Edge {
return e1 // noop
}
// vertexTest processes the results of the grouping for the algorithm to know
// return an error if something went horribly wrong, and bool false to stop
func (ag *baseGrouper) vertexTest(b bool) (bool, error) {
// NOTE: this particular baseGrouper version doesn't track what happens
// because since we iterate over every pair, we don't care which merge!
if ag.done {
return false, nil
}
return true, nil
}
// TODO: this algorithm may not be correct in all cases. replace if needed!
type nonReachabilityGrouper struct {
baseGrouper // "inherit" what we want, and reimplement the rest
}
func (ag *nonReachabilityGrouper) name() string {
return "nonReachabilityGrouper"
}
// this algorithm relies on the observation that if there's a path from a to b,
// then they *can't* be merged (b/c of the existing dependency) so therefore we
// merge anything that *doesn't* satisfy this condition or that of the reverse!
func (ag *nonReachabilityGrouper) vertexNext() (v1, v2 *Vertex, err error) {
for {
v1, v2, err = ag.baseGrouper.vertexNext() // get all iterable pairs
if err != nil {
log.Fatalf("Error running autoGroup(vertexNext): %v", err)
}
if v1 != v2 { // ignore self cmp early (perf optimization)
// if NOT reachable, they're viable...
out1 := ag.graph.Reachability(v1, v2)
out2 := ag.graph.Reachability(v2, v1)
if len(out1) == 0 && len(out2) == 0 {
return // return v1 and v2, they're viable
}
}
// if we got here, it means we're skipping over this candidate!
if ok, err := ag.baseGrouper.vertexTest(false); err != nil {
log.Fatalf("Error running autoGroup(vertexTest): %v", err)
} else if !ok {
return nil, nil, nil // done!
}
// the vertexTest passed, so loop and try with a new pair...
}
}
// autoGroup is the mechanical auto group "runner" that runs the interface spec
func (g *Graph) autoGroup(ag AutoGrouper) chan string {
strch := make(chan string) // output log messages here
go func(strch chan string) {
strch <- fmt.Sprintf("Compile: Grouping: Algorithm: %v...", ag.name())
if err := ag.init(g); err != nil {
log.Fatalf("Error running autoGroup(init): %v", err)
}
for {
var v, w *Vertex
v, w, err := ag.vertexNext() // get pair to compare
if err != nil {
log.Fatalf("Error running autoGroup(vertexNext): %v", err)
}
merged := false
// save names since they change during the runs
vStr := fmt.Sprintf("%s", v) // valid even if it is nil
wStr := fmt.Sprintf("%s", w)
if err := ag.vertexCmp(v, w); err != nil { // cmp ?
if DEBUG {
strch <- fmt.Sprintf("Compile: Grouping: !GroupCmp for: %s into %s", wStr, vStr)
}
// remove grouped vertex and merge edges (res is safe)
} else if err := g.VertexMerge(v, w, ag.vertexMerge, ag.edgeMerge); err != nil { // merge...
strch <- fmt.Sprintf("Compile: Grouping: !VertexMerge for: %s into %s", wStr, vStr)
} else { // success!
strch <- fmt.Sprintf("Compile: Grouping: Success for: %s into %s", wStr, vStr)
merged = true // woo
}
// did these get used?
if ok, err := ag.vertexTest(merged); err != nil {
log.Fatalf("Error running autoGroup(vertexTest): %v", err)
} else if !ok {
break // done!
}
}
close(strch)
return
}(strch) // call function
return strch
}
// AutoGroup runs the auto grouping on the graph and prints out log messages
func (g *Graph) AutoGroup() {
// receive log messages from channel...
// this allows test cases to avoid printing them when they're unwanted!
// TODO: this algorithm may not be correct in all cases. replace if needed!
for str := range g.autoGroup(&nonReachabilityGrouper{}) {
log.Println(str)
}
}

View File

@@ -18,14 +18,18 @@
package main
import (
"gopkg.in/fsnotify.v1"
//"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1"
"log"
"math"
"path"
"strings"
"sync"
"syscall"
"github.com/purpleidea/mgmt/global"
"github.com/purpleidea/mgmt/util"
"gopkg.in/fsnotify.v1"
//"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1"
)
// ConfigWatcher returns events on a channel anytime one of its files events.
@@ -105,7 +109,7 @@ func ConfigWatch(file string) chan bool {
}
defer watcher.Close()
patharray := PathSplit(safename) // tokenize the path
patharray := util.PathSplit(safename) // tokenize the path
var index = len(patharray) // starting index
var current string // current "watcher" location
var deltaDepth int // depth delta between watcher and event
@@ -116,7 +120,7 @@ func ConfigWatch(file string) chan bool {
if current == "" { // the empty string top is the root dir ("/")
current = "/"
}
if DEBUG {
if global.DEBUG {
log.Printf("Watching: %v", current) // attempting to watch...
}
// initialize in the loop so that we can reset on rm-ed handles
@@ -145,11 +149,11 @@ func ConfigWatch(file string) chan bool {
if current == event.Name {
deltaDepth = 0 // i was watching what i was looking for
} else if HasPathPrefix(event.Name, current) {
deltaDepth = len(PathSplit(current)) - len(PathSplit(event.Name)) // -1 or less
} else if util.HasPathPrefix(event.Name, current) {
deltaDepth = len(util.PathSplit(current)) - len(util.PathSplit(event.Name)) // -1 or less
} else if HasPathPrefix(current, event.Name) {
deltaDepth = len(PathSplit(event.Name)) - len(PathSplit(current)) // +1 or more
} else if util.HasPathPrefix(current, event.Name) {
deltaDepth = len(util.PathSplit(event.Name)) - len(util.PathSplit(current)) // +1 or more
} else {
// TODO different watchers get each others events!
@@ -182,7 +186,7 @@ func ConfigWatch(file string) chan bool {
}
// if safename starts with event.Name, we're above, and no event should be sent
} else if HasPathPrefix(safename, event.Name) {
} else if util.HasPathPrefix(safename, event.Name) {
//log.Println("Above!")
if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) {
@@ -193,7 +197,7 @@ func ConfigWatch(file string) chan bool {
if deltaDepth < 0 {
log.Println("Parent!")
if PathPrefixDelta(safename, event.Name) == 1 { // we're the parent dir
if util.PathPrefixDelta(safename, event.Name) == 1 { // we're the parent dir
//send = true
}
watcher.Remove(current)
@@ -201,7 +205,7 @@ func ConfigWatch(file string) chan bool {
}
// if event.Name startswith safename, send event, we're already deeper
} else if HasPathPrefix(event.Name, safename) {
} else if util.HasPathPrefix(event.Name, safename) {
//log.Println("Event2!")
//send = true
}

View File

@@ -15,12 +15,15 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
// Package converger is a facility for reporting the converged state.
package converger
import (
"fmt"
"sync"
"time"
"github.com/purpleidea/mgmt/util"
)
// TODO: we could make a new function that masks out the state of certain
@@ -248,9 +251,9 @@ func (obj *converger) ConvergedTimer(uuid ConvergerUUID) <-chan time.Time {
// we have a low timeout, or in particular a timeout == 0
if uuid.IsConverged() {
// blocks the case statement in select forever!
return TimeAfterOrBlock(-1)
return util.TimeAfterOrBlock(-1)
}
return TimeAfterOrBlock(obj.timeout)
return util.TimeAfterOrBlock(obj.timeout)
}
// Status returns a map of the converged status of each UUID.

19
doc.go Normal file
View File

@@ -0,0 +1,19 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// Package main provides the main entrypoint for using the `mgmt` software.
package main

View File

@@ -24,6 +24,17 @@
// TODO: Auto assign ports/ip's for peers (if possible)
// TODO: Fix godoc
// Package etcd implements the distributed key value store integration.
// This also takes care of managing and clustering the embedded etcd server.
// The elastic etcd algorithm works in the following way:
// * When you start up mgmt, you can pass it a list of seeds.
// * If no seeds are given, then assume you are the first server and startup.
// * If a seed is given, connect as a client, and optionally volunteer to be a server.
// * All volunteering clients should listen for a message from the master for nomination.
// * If a client has been nominated, it should startup a server.
// * All servers should list for their nomination to be removed and shutdown if so.
// * The elected leader should decide who to nominate/unnominate to keep the right number of servers.
//
// Smoke testing:
// ./mgmt run --file examples/etcd1a.yaml --hostname h1
// ./mgmt run --file examples/etcd1b.yaml --hostname h2 --tmp-prefix --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2381 --server-urls http://127.0.0.1:2382
@@ -33,16 +44,7 @@
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/idealClusterSize 5
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list
// The elastic etcd algorithm works in the following way:
// * When you start up mgmt, you can pass it a list of seeds.
// * If no seeds are given, then assume you are the first server and startup.
// * If a seed is given, connect as a client, and optionally volunteer to be a server.
// * All volunteering clients should listen for a message from the master for nomination.
// * If a client has been nominated, it should startup a server.
// * All servers should list for their nomination to be removed and shutdown if so.
// * The elected leader should decide who to nominate/unnominate to keep the right number of servers.
package main
package etcd
import (
"bytes"
@@ -59,6 +61,12 @@ import (
"sync"
"time"
"github.com/purpleidea/mgmt/converger"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/global"
"github.com/purpleidea/mgmt/resources"
"github.com/purpleidea/mgmt/util"
etcd "github.com/coreos/etcd/clientv3" // "clientv3"
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/etcdserver"
@@ -78,7 +86,7 @@ const (
maxClientConnectRetries = 5 // number of times to retry consecutive connect failures
selfRemoveTimeout = 3 // give unnominated members a chance to self exit
exitDelay = 3 // number of sec of inactivity after exit to clean up
defaultIdealClusterSize = 5 // default ideal cluster size target for initial seed
DefaultIdealClusterSize = 5 // default ideal cluster size target for initial seed
DefaultClientURL = "127.0.0.1:2379"
DefaultServerURL = "127.0.0.1:2380"
)
@@ -94,7 +102,7 @@ type AW struct {
callback func(*RE) error
errCheck bool
skipConv bool // ask event to skip converger updates
resp Resp
resp event.Resp
cancelFunc func() // data
}
@@ -116,7 +124,7 @@ type KV struct {
key string
value string
opts []etcd.OpOption
resp Resp
resp event.Resp
}
// GQ is a struct for the get queue
@@ -124,7 +132,7 @@ type GQ struct {
path string
skipConv bool
opts []etcd.OpOption
resp Resp
resp event.Resp
data map[string]string
}
@@ -132,7 +140,7 @@ type GQ struct {
type DL struct {
path string
opts []etcd.OpOption
resp Resp
resp event.Resp
data int64
}
@@ -141,7 +149,7 @@ type TN struct {
ifcmps []etcd.Cmp
thenops []etcd.Op
elseops []etcd.Op
resp Resp
resp event.Resp
data *etcd.TxnResponse
}
@@ -182,7 +190,7 @@ type EmbdEtcd struct { // EMBeddeD etcd
txnq chan *TN // txn queue
prefix string // folder prefix to use for misc storage
converger Converger // converged tracking
converger converger.Converger // converged tracking
// etcd server related
serverwg sync.WaitGroup // wait for server to shutdown
@@ -191,7 +199,7 @@ type EmbdEtcd struct { // EMBeddeD etcd
}
// NewEmbdEtcd creates the top level embedded etcd struct client and server obj
func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, prefix string, converger Converger) *EmbdEtcd {
func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, prefix string, converger converger.Converger) *EmbdEtcd {
endpoints := make(etcdtypes.URLsMap)
if hostname == seedSentinel { // safety
return nil
@@ -264,7 +272,7 @@ func (obj *EmbdEtcd) GetConfig() etcd.Config {
// Connect connects the client to a server, and then builds the *API structs.
// If reconnect is true, it will force a reconnect with new config endpoints.
func (obj *EmbdEtcd) Connect(reconnect bool) error {
if DEBUG {
if global.DEBUG {
log.Println("Etcd: Connect...")
}
obj.cLock.Lock()
@@ -520,29 +528,29 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
var isTimeout = false
var iter int // = 0
if ctxerr, ok := ctx.Value(ctxErr).(error); ok {
if DEBUG {
if global.DEBUG {
log.Printf("Etcd: CtxError: err(%v), ctxerr(%v)", err, ctxerr)
}
if i, ok := ctx.Value(ctxIter).(int); ok {
iter = i + 1 // load and increment
if DEBUG {
if global.DEBUG {
log.Printf("Etcd: CtxError: Iter: %v", iter)
}
}
isTimeout = err == context.DeadlineExceeded
if DEBUG {
if global.DEBUG {
log.Printf("Etcd: CtxError: isTimeout: %v", isTimeout)
}
if !isTimeout {
iter = 0 // reset timer
}
err = ctxerr // restore error
} else if DEBUG {
} else if global.DEBUG {
log.Printf("Etcd: CtxError: No value found")
}
ctxHelper := func(tmin, texp, tmax int) context.Context {
t := expBackoff(tmin, texp, iter, tmax)
if DEBUG {
if global.DEBUG {
log.Printf("Etcd: CtxError: Timeout: %v", t)
}
@@ -629,13 +637,13 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
fallthrough
case isGrpc(grpc.ErrClientConnClosing):
if DEBUG {
if global.DEBUG {
log.Printf("Etcd: CtxError: Error(%T): %+v", err, err)
log.Printf("Etcd: Endpoints are: %v", obj.client.Endpoints())
log.Printf("Etcd: Client endpoints are: %v", obj.endpoints)
}
if DEBUG {
if global.DEBUG {
log.Printf("Etcd: CtxError: Locking...")
}
obj.rLock.Lock()
@@ -656,7 +664,7 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
obj.ctxErr = fmt.Errorf("Etcd: Permanent connect error: %v", err)
return ctx, obj.ctxErr
}
if DEBUG {
if global.DEBUG {
log.Printf("Etcd: CtxError: Unlocking...")
}
obj.rLock.Unlock()
@@ -700,7 +708,7 @@ func (obj *EmbdEtcd) CbLoop() {
if !re.skipConv { // if we want to count it...
cuuid.ResetTimer() // activity!
}
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: CbLoop: Event: StartLoop")
}
for {
@@ -708,11 +716,11 @@ func (obj *EmbdEtcd) CbLoop() {
//re.resp.NACK() // nope!
break
}
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: CbLoop: rawCallback()")
}
err := rawCallback(ctx, re)
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: CbLoop: rawCallback(): %v", err)
}
if err == nil {
@@ -724,7 +732,7 @@ func (obj *EmbdEtcd) CbLoop() {
break // TODO: it's bad, break or return?
}
}
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: CbLoop: Event: FinishLoop")
}
@@ -752,11 +760,11 @@ func (obj *EmbdEtcd) Loop() {
select {
case aw := <-obj.awq:
cuuid.ResetTimer() // activity!
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: Loop: PriorityAW: StartLoop")
}
obj.loopProcessAW(ctx, aw)
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: Loop: PriorityAW: FinishLoop")
}
continue // loop to drain the priority channel first!
@@ -768,18 +776,18 @@ func (obj *EmbdEtcd) Loop() {
// add watcher
case aw := <-obj.awq:
cuuid.ResetTimer() // activity!
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: Loop: AW: StartLoop")
}
obj.loopProcessAW(ctx, aw)
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: Loop: AW: FinishLoop")
}
// set kv pair
case kv := <-obj.setq:
cuuid.ResetTimer() // activity!
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: Loop: Set: StartLoop")
}
for {
@@ -796,7 +804,7 @@ func (obj *EmbdEtcd) Loop() {
break // TODO: it's bad, break or return?
}
}
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: Loop: Set: FinishLoop")
}
@@ -805,7 +813,7 @@ func (obj *EmbdEtcd) Loop() {
if !gq.skipConv {
cuuid.ResetTimer() // activity!
}
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: Loop: Get: StartLoop")
}
for {
@@ -823,14 +831,14 @@ func (obj *EmbdEtcd) Loop() {
break // TODO: it's bad, break or return?
}
}
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: Loop: Get: FinishLoop")
}
// delete value
case dl := <-obj.delq:
cuuid.ResetTimer() // activity!
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: Loop: Delete: StartLoop")
}
for {
@@ -848,14 +856,14 @@ func (obj *EmbdEtcd) Loop() {
break // TODO: it's bad, break or return?
}
}
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: Loop: Delete: FinishLoop")
}
// run txn
case tn := <-obj.txnq:
cuuid.ResetTimer() // activity!
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: Loop: Txn: StartLoop")
}
for {
@@ -873,7 +881,7 @@ func (obj *EmbdEtcd) Loop() {
break // TODO: it's bad, break or return?
}
}
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: Loop: Txn: FinishLoop")
}
@@ -884,7 +892,7 @@ func (obj *EmbdEtcd) Loop() {
// seconds of inactivity in this select switch, which
// lets everything get bled dry to avoid blocking calls
// which would otherwise block us from exiting cleanly!
obj.exitTimeout = TimeAfterOrBlock(exitDelay)
obj.exitTimeout = util.TimeAfterOrBlock(exitDelay)
// exit loop commit
case <-obj.exitTimeout:
@@ -917,7 +925,7 @@ func (obj *EmbdEtcd) loopProcessAW(ctx context.Context, aw *AW) {
// Set queues up a set operation to occur using our mainloop
func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error {
resp := NewResp()
resp := event.NewResp()
obj.setq <- &KV{key: key, value: value, opts: opts, resp: resp}
if err := resp.Wait(); err != nil { // wait for ack/nack
return fmt.Errorf("Etcd: Set: Probably received an exit: %v", err)
@@ -927,7 +935,7 @@ func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error {
// rawSet actually implements the key set operation
func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: rawSet()")
}
// key is the full key path
@@ -936,7 +944,7 @@ func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error {
response, err := obj.client.KV.Put(ctx, kv.key, kv.value, kv.opts...)
obj.rLock.RUnlock()
log.Printf("Etcd: Set(%s): %v", kv.key, response) // w00t... bonus
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: rawSet(): %v", err)
}
return err
@@ -951,7 +959,7 @@ func (obj *EmbdEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string,
// accept more arguments that are useful for the less common operations.
// TODO: perhaps a get should never cause an un-converge ?
func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOption) (map[string]string, error) {
resp := NewResp()
resp := event.NewResp()
gq := &GQ{path: path, skipConv: skipConv, opts: opts, resp: resp, data: nil}
obj.getq <- gq // send
if err := resp.Wait(); err != nil { // wait for ack/nack
@@ -961,7 +969,7 @@ func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOptio
}
func (obj *EmbdEtcd) rawGet(ctx context.Context, gq *GQ) (result map[string]string, err error) {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: rawGet()")
}
obj.rLock.RLock()
@@ -977,7 +985,7 @@ func (obj *EmbdEtcd) rawGet(ctx context.Context, gq *GQ) (result map[string]stri
result[bytes.NewBuffer(x.Key).String()] = bytes.NewBuffer(x.Value).String()
}
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: rawGet(): %v", result)
}
return
@@ -985,7 +993,7 @@ func (obj *EmbdEtcd) rawGet(ctx context.Context, gq *GQ) (result map[string]stri
// Delete performs a delete operation and waits for an ACK to continue
func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error) {
resp := NewResp()
resp := event.NewResp()
dl := &DL{path: path, opts: opts, resp: resp, data: -1}
obj.delq <- dl // send
if err := resp.Wait(); err != nil { // wait for ack/nack
@@ -995,7 +1003,7 @@ func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error) {
}
func (obj *EmbdEtcd) rawDelete(ctx context.Context, dl *DL) (count int64, err error) {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: rawDelete()")
}
count = -1
@@ -1005,7 +1013,7 @@ func (obj *EmbdEtcd) rawDelete(ctx context.Context, dl *DL) (count int64, err er
if err == nil {
count = response.Deleted
}
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: rawDelete(): %v", err)
}
return
@@ -1013,7 +1021,7 @@ func (obj *EmbdEtcd) rawDelete(ctx context.Context, dl *DL) (count int64, err er
// Txn performs a transaction and waits for an ACK to continue
func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.TxnResponse, error) {
resp := NewResp()
resp := event.NewResp()
tn := &TN{ifcmps: ifcmps, thenops: thenops, elseops: elseops, resp: resp, data: nil}
obj.txnq <- tn // send
if err := resp.Wait(); err != nil { // wait for ack/nack
@@ -1023,13 +1031,13 @@ func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.T
}
func (obj *EmbdEtcd) rawTxn(ctx context.Context, tn *TN) (*etcd.TxnResponse, error) {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: rawTxn()")
}
obj.rLock.RLock()
response, err := obj.client.KV.Txn(ctx).If(tn.ifcmps...).Then(tn.thenops...).Else(tn.elseops...).Commit()
obj.rLock.RUnlock()
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: rawTxn(): %v, %v", response, err)
}
return response, err
@@ -1038,7 +1046,7 @@ func (obj *EmbdEtcd) rawTxn(ctx context.Context, tn *TN) (*etcd.TxnResponse, err
// AddWatcher queues up an add watcher request and returns a cancel function
// Remember to add the etcd.WithPrefix() option if you want to watch recursively
func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, opts ...etcd.OpOption) (func(), error) {
resp := NewResp()
resp := event.NewResp()
awq := &AW{path: path, opts: opts, callback: callback, errCheck: errCheck, skipConv: skipConv, cancelFunc: nil, resp: resp}
obj.awq <- awq // send
if err := resp.Wait(); err != nil { // wait for ack/nack
@@ -1063,7 +1071,7 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error)
err := response.Err()
isCanceled := response.Canceled || err == context.Canceled
if response.Header.Revision == 0 { // by inspection
if DEBUG {
if global.DEBUG {
log.Printf("Etcd: Watch: Received empty message!") // switched client connection
}
isCanceled = true
@@ -1084,7 +1092,7 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error)
}
locked = false
} else {
if DEBUG {
if global.DEBUG {
log.Printf("Etcd: Watch: Error: %v", err) // probably fixable
}
// this new context is the fix for a tricky set
@@ -1133,7 +1141,7 @@ func rawCallback(ctx context.Context, re *RE) error {
// NOTE: the callback must *not* block!
// FIXME: do we need to pass ctx in via *RE, or in the callback signature ?
err = callback(re) // run the callback
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: rawCallback(): %v", err)
}
if !re.errCheck || err == nil {
@@ -1151,7 +1159,7 @@ func rawCallback(ctx context.Context, re *RE) error {
// FIXME: we might need to respond to member change/disconnect/shutdown events,
// see: https://github.com/coreos/etcd/issues/5277
func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: volunteerCallback()")
defer log.Printf("Trace: Etcd: volunteerCallback(): Finished!")
}
@@ -1178,7 +1186,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
if err != nil {
return fmt.Errorf("Etcd: Members: Error: %+v", err)
}
members := StrMapValuesUint64(membersMap) // get values
members := util.StrMapValuesUint64(membersMap) // get values
log.Printf("Etcd: Members: List: %+v", members)
// we only do *one* change operation at a time so that the cluster can
@@ -1224,7 +1232,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
log.Printf("Etcd: Volunteers: %v", volunteers)
// unnominate anyone that unvolunteers, so that they can shutdown cleanly
quitters := StrFilterElementsInList(volunteers, members)
quitters := util.StrFilterElementsInList(volunteers, members)
log.Printf("Etcd: Quitters: %v", quitters)
// if we're the only member left, just shutdown...
@@ -1236,7 +1244,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
return nil
}
candidates := StrFilterElementsInList(members, volunteers)
candidates := util.StrFilterElementsInList(members, volunteers)
log.Printf("Etcd: Candidates: %v", candidates)
// TODO: switch to < 0 so that we can shut the whole cluster down with 0
@@ -1291,7 +1299,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
log.Printf("Etcd: Quitters: Shutting down %d members...", lq)
}
for _, quitter := range quitters {
mID, ok := Uint64KeyFromStrInMap(quitter, membersMap)
mID, ok := util.Uint64KeyFromStrInMap(quitter, membersMap)
if !ok {
// programming error
log.Fatalf("Etcd: Member Remove: Error: %v(%v) not in members list!", quitter, mID)
@@ -1339,7 +1347,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
// nominateCallback runs to respond to the nomination list change events
// functionally, it controls the starting and stopping of the server process
func (obj *EmbdEtcd) nominateCallback(re *RE) error {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: nominateCallback()")
defer log.Printf("Trace: Etcd: nominateCallback(): Finished!")
}
@@ -1388,7 +1396,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error {
_, exists := obj.nominated[obj.hostname]
// FIXME: can we get rid of the len(obj.nominated) == 0 ?
newCluster := len(obj.nominated) == 0 || (len(obj.nominated) == 1 && exists)
if DEBUG {
if global.DEBUG {
log.Printf("Etcd: nominateCallback(): newCluster: %v; exists: %v; obj.server == nil: %t", newCluster, exists, obj.server == nil)
}
// XXX check if i have actually volunteered first of all...
@@ -1487,7 +1495,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error {
// endpointCallback runs to respond to the endpoint list change events
func (obj *EmbdEtcd) endpointCallback(re *RE) error {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: endpointCallback()")
defer log.Printf("Trace: Etcd: endpointCallback(): Finished!")
}
@@ -1553,7 +1561,7 @@ func (obj *EmbdEtcd) endpointCallback(re *RE) error {
// idealClusterSizeCallback runs to respond to the ideal cluster size changes
func (obj *EmbdEtcd) idealClusterSizeCallback(re *RE) error {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: idealClusterSizeCallback()")
defer log.Printf("Trace: Etcd: idealClusterSizeCallback(): Finished!")
}
@@ -1692,7 +1700,7 @@ func (obj *EmbdEtcd) DestroyServer() error {
// EtcdNominate nominates a particular client to be a server (peer)
func EtcdNominate(obj *EmbdEtcd, hostname string, urls etcdtypes.URLs) error {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: EtcdNominate(%v): %v", hostname, urls.String())
defer log.Printf("Trace: Etcd: EtcdNominate(%v): Finished!", hostname)
}
@@ -1734,7 +1742,7 @@ func EtcdNominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
return nil, fmt.Errorf("Etcd: Nominated: Data format error!: %v", err)
}
nominated[name] = urls // add to map
if DEBUG {
if global.DEBUG {
log.Printf("Etcd: Nominated(%v): %v", name, val)
}
}
@@ -1743,7 +1751,7 @@ func EtcdNominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
// EtcdVolunteer offers yourself up to be a server if needed
func EtcdVolunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: EtcdVolunteer(%v): %v", obj.hostname, urls.String())
defer log.Printf("Trace: Etcd: EtcdVolunteer(%v): Finished!", obj.hostname)
}
@@ -1766,7 +1774,7 @@ func EtcdVolunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error {
// EtcdVolunteers returns a urls map of available etcd server volunteers
func EtcdVolunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: EtcdVolunteers()")
defer log.Printf("Trace: Etcd: EtcdVolunteers(): Finished!")
}
@@ -1789,7 +1797,7 @@ func EtcdVolunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
return nil, fmt.Errorf("Etcd: Volunteers: Data format error!: %v", err)
}
volunteers[name] = urls // add to map
if DEBUG {
if global.DEBUG {
log.Printf("Etcd: Volunteer(%v): %v", name, val)
}
}
@@ -1798,7 +1806,7 @@ func EtcdVolunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
// EtcdAdvertiseEndpoints advertises the list of available client endpoints
func EtcdAdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: EtcdAdvertiseEndpoints(%v): %v", obj.hostname, urls.String())
defer log.Printf("Trace: Etcd: EtcdAdvertiseEndpoints(%v): Finished!", obj.hostname)
}
@@ -1821,7 +1829,7 @@ func EtcdAdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error {
// EtcdEndpoints returns a urls map of available etcd server endpoints
func EtcdEndpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: EtcdEndpoints()")
defer log.Printf("Trace: Etcd: EtcdEndpoints(): Finished!")
}
@@ -1844,7 +1852,7 @@ func EtcdEndpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
return nil, fmt.Errorf("Etcd: Endpoints: Data format error!: %v", err)
}
endpoints[name] = urls // add to map
if DEBUG {
if global.DEBUG {
log.Printf("Etcd: Endpoint(%v): %v", name, val)
}
}
@@ -1853,7 +1861,7 @@ func EtcdEndpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
// EtcdSetHostnameConverged sets whether a specific hostname is converged.
func EtcdSetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: EtcdSetHostnameConverged(%s): %v", hostname, isConverged)
defer log.Printf("Trace: Etcd: EtcdSetHostnameConverged(%v): Finished!", hostname)
}
@@ -1867,7 +1875,7 @@ func EtcdSetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool)
// EtcdHostnameConverged returns a map of every hostname's converged state.
func EtcdHostnameConverged(obj *EmbdEtcd) (map[string]bool, error) {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: EtcdHostnameConverged()")
defer log.Printf("Trace: Etcd: EtcdHostnameConverged(): Finished!")
}
@@ -1912,7 +1920,7 @@ func EtcdAddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]b
// EtcdSetClusterSize sets the ideal target cluster size of etcd peers
func EtcdSetClusterSize(obj *EmbdEtcd, value uint16) error {
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: EtcdSetClusterSize(): %v", value)
defer log.Printf("Trace: Etcd: EtcdSetClusterSize(): Finished!")
}
@@ -2006,7 +2014,7 @@ func EtcdMembers(obj *EmbdEtcd) (map[uint64]string, error) {
return nil, fmt.Errorf("Exiting...")
}
obj.rLock.RLock()
if TRACE {
if global.TRACE {
log.Printf("Trace: Etcd: EtcdMembers(): Endpoints are: %v", obj.client.Endpoints())
}
response, err = obj.client.MemberList(ctx)
@@ -2100,7 +2108,7 @@ func EtcdWatch(obj *EmbdEtcd) chan bool {
}
// EtcdSetResources exports all of the resources which we pass in to etcd
func EtcdSetResources(obj *EmbdEtcd, hostname string, resources []Res) error {
func EtcdSetResources(obj *EmbdEtcd, hostname string, resourceList []resources.Res) error {
// key structure is /$NS/exported/$hostname/resources/$uuid = $data
var kindFilter []string // empty to get from everyone
@@ -2112,19 +2120,19 @@ func EtcdSetResources(obj *EmbdEtcd, hostname string, resources []Res) error {
return err
}
if len(originals) == 0 && len(resources) == 0 { // special case of no add or del
if len(originals) == 0 && len(resourceList) == 0 { // special case of no add or del
return nil
}
ifs := []etcd.Cmp{} // list matching the desired state
ops := []etcd.Op{} // list of ops in this transaction
for _, res := range resources {
for _, res := range resourceList {
if res.Kind() == "" {
log.Fatalf("Etcd: SetResources: Error: Empty kind: %v", res.GetName())
}
uuid := fmt.Sprintf("%s/%s", res.Kind(), res.GetName())
path := fmt.Sprintf("/%s/exported/%s/resources/%s", NS, hostname, uuid)
if data, err := ResToB64(res); err == nil {
if data, err := resources.ResToB64(res); err == nil {
ifs = append(ifs, etcd.Compare(etcd.Value(path), "=", data)) // desired state
ops = append(ops, etcd.OpPut(path, data))
} else {
@@ -2132,8 +2140,8 @@ func EtcdSetResources(obj *EmbdEtcd, hostname string, resources []Res) error {
}
}
match := func(res Res, resources []Res) bool { // helper lambda
for _, x := range resources {
match := func(res resources.Res, resourceList []resources.Res) bool { // helper lambda
for _, x := range resourceList {
if res.Kind() == x.Kind() && res.GetName() == x.GetName() {
return true
}
@@ -2150,7 +2158,7 @@ func EtcdSetResources(obj *EmbdEtcd, hostname string, resources []Res) error {
uuid := fmt.Sprintf("%s/%s", res.Kind(), res.GetName())
path := fmt.Sprintf("/%s/exported/%s/resources/%s", NS, hostname, uuid)
if match(res, resources) { // if we match, no need to delete!
if match(res, resourceList) { // if we match, no need to delete!
continue
}
@@ -2175,10 +2183,10 @@ func EtcdSetResources(obj *EmbdEtcd, hostname string, resources []Res) error {
// TODO: Expand this with a more powerful filter based on what we eventually
// support in our collect DSL. Ideally a server side filter like WithFilter()
// We could do this if the pattern was /$NS/exported/$kind/$hostname/$uuid = $data
func EtcdGetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]Res, error) {
func EtcdGetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]resources.Res, error) {
// key structure is /$NS/exported/$hostname/resources/$uuid = $data
path := fmt.Sprintf("/%s/exported/", NS)
resources := []Res{}
resourceList := []resources.Res{}
keyMap, err := obj.Get(path, etcd.WithPrefix(), etcd.WithSort(etcd.SortByKey, etcd.SortAscend))
if err != nil {
return nil, fmt.Errorf("Etcd: GetResources: Error: Could not get resources: %v", err)
@@ -2201,24 +2209,24 @@ func EtcdGetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]Res
}
// FIXME: ideally this would be a server side filter instead!
if len(hostnameFilter) > 0 && !StrInList(hostname, hostnameFilter) {
if len(hostnameFilter) > 0 && !util.StrInList(hostname, hostnameFilter) {
continue
}
// FIXME: ideally this would be a server side filter instead!
if len(kindFilter) > 0 && !StrInList(kind, kindFilter) {
if len(kindFilter) > 0 && !util.StrInList(kind, kindFilter) {
continue
}
if obj, err := B64ToRes(val); err == nil {
obj.setKind(kind) // cheap init
if obj, err := resources.B64ToRes(val); err == nil {
obj.SetKind(kind) // cheap init
log.Printf("Etcd: Get: (Hostname, Kind, Name): (%s, %s, %s)", hostname, kind, name)
resources = append(resources, obj)
resourceList = append(resourceList, obj)
} else {
return nil, fmt.Errorf("Etcd: GetResources: Error: Can't convert from B64: %v", err)
}
}
return resources, nil
return resourceList, nil
}
//func UrlRemoveScheme(urls etcdtypes.URLs) []string {
@@ -2258,7 +2266,7 @@ func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, err
if _, exists := urlsmap[key]; !exists {
// this can happen if we retry an operation b/w
// a reconnect so ignore if we are reconnecting
if DEBUG {
if global.DEBUG {
log.Printf("Etcd: ApplyDeltaEvents: Inconsistent key: %v", key)
}
return nil, errApplyDeltaEventsInconsistent

View File

@@ -15,22 +15,23 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
// Package event provides some primitives that are used for message passing.
package event
import (
"fmt"
)
//go:generate stringer -type=eventName -output=eventname_stringer.go
type eventName int
//go:generate stringer -type=EventName -output=eventname_stringer.go
type EventName int
const (
eventNil eventName = iota
eventExit
eventStart
eventPause
eventPoke
eventBackPoke
EventNil EventName = iota
EventExit
EventStart
EventPause
EventPoke
EventBackPoke
)
// Resp is a channel to be used for boolean responses. A nil represents an ACK,
@@ -39,7 +40,7 @@ type Resp chan error
// Event is the main struct that stores event information and responses.
type Event struct {
Name eventName
Name EventName
Resp Resp // channel to send an ack response on, nil to skip
//Wg *sync.WaitGroup // receiver barrier to Wait() for everyone else on
Msg string // some words for fun

268
gconfig/gconfig.go Normal file
View File

@@ -0,0 +1,268 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// Package gconfig provides the facilities for loading a graph from a yaml file.
package gconfig
import (
"errors"
"fmt"
"io/ioutil"
"log"
"reflect"
"strings"
"github.com/purpleidea/mgmt/etcd"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/global"
"github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/resources"
"github.com/purpleidea/mgmt/util"
"gopkg.in/yaml.v2"
)
type collectorResConfig struct {
Kind string `yaml:"kind"`
Pattern string `yaml:"pattern"` // XXX: Not Implemented
}
type vertexConfig struct {
Kind string `yaml:"kind"`
Name string `yaml:"name"`
}
type edgeConfig struct {
Name string `yaml:"name"`
From vertexConfig `yaml:"from"`
To vertexConfig `yaml:"to"`
}
// GraphConfig is the data structure that describes a single graph to run.
type GraphConfig struct {
Graph string `yaml:"graph"`
Resources struct {
Noop []*resources.NoopRes `yaml:"noop"`
Pkg []*resources.PkgRes `yaml:"pkg"`
File []*resources.FileRes `yaml:"file"`
Svc []*resources.SvcRes `yaml:"svc"`
Exec []*resources.ExecRes `yaml:"exec"`
Timer []*resources.TimerRes `yaml:"timer"`
} `yaml:"resources"`
Collector []collectorResConfig `yaml:"collect"`
Edges []edgeConfig `yaml:"edges"`
Comment string `yaml:"comment"`
Hostname string `yaml:"hostname"` // uuid for the host
Remote string `yaml:"remote"`
}
// Parse parses a data stream into the graph structure.
func (c *GraphConfig) Parse(data []byte) error {
if err := yaml.Unmarshal(data, c); err != nil {
return err
}
if c.Graph == "" {
return errors.New("Graph config: invalid `graph`")
}
return nil
}
// ParseConfigFromFile takes a filename and returns the graph config structure.
func ParseConfigFromFile(filename string) *GraphConfig {
data, err := ioutil.ReadFile(filename)
if err != nil {
log.Printf("Config: Error: ParseConfigFromFile: File: %v", err)
return nil
}
var config GraphConfig
if err := config.Parse(data); err != nil {
log.Printf("Config: Error: ParseConfigFromFile: Parse: %v", err)
return nil
}
return &config
}
// NewGraphFromConfig returns a new graph from existing input, such as from the
// existing graph, and a GraphConfig struct.
func (c *GraphConfig) NewGraphFromConfig(g *pgraph.Graph, embdEtcd *etcd.EmbdEtcd, noop bool) (*pgraph.Graph, error) {
if c.Hostname == "" {
return nil, fmt.Errorf("Config: Error: Hostname can't be empty!")
}
var graph *pgraph.Graph // new graph to return
if g == nil { // FIXME: how can we check for an empty graph?
graph = pgraph.NewGraph("Graph") // give graph a default name
} else {
graph = g.Copy() // same vertices, since they're pointers!
}
var lookup = make(map[string]map[string]*pgraph.Vertex)
//log.Printf("%+v", config) // debug
// TODO: if defined (somehow)...
graph.SetName(c.Graph) // set graph name
var keep []*pgraph.Vertex // list of vertex which are the same in new graph
var resourceList []resources.Res // list of resources to export
// use reflection to avoid duplicating code... better options welcome!
value := reflect.Indirect(reflect.ValueOf(c.Resources))
vtype := value.Type()
for i := 0; i < vtype.NumField(); i++ { // number of fields in struct
name := vtype.Field(i).Name // string of field name
field := value.FieldByName(name)
iface := field.Interface() // interface type of value
slice := reflect.ValueOf(iface)
// XXX: should we just drop these everywhere and have the kind strings be all lowercase?
kind := util.FirstToUpper(name)
if global.DEBUG {
log.Printf("Config: Processing: %v...", kind)
}
for j := 0; j < slice.Len(); j++ { // loop through resources of same kind
x := slice.Index(j).Interface()
res, ok := x.(resources.Res) // convert to Res type
if !ok {
return nil, fmt.Errorf("Config: Error: Can't convert: %v of type: %T to Res.", x, x)
}
if noop {
res.Meta().Noop = noop
}
if _, exists := lookup[kind]; !exists {
lookup[kind] = make(map[string]*pgraph.Vertex)
}
// XXX: should we export based on a @@ prefix, or a metaparam
// like exported => true || exported => (host pattern)||(other pattern?)
if !strings.HasPrefix(res.GetName(), "@@") { // not exported resource
// XXX: we don't have a way of knowing if any of the
// metaparams are undefined, and as a result to set the
// defaults that we want! I hate the go yaml parser!!!
v := graph.GetVertexMatch(res)
if v == nil { // no match found
res.Init()
v = pgraph.NewVertex(res)
graph.AddVertex(v) // call standalone in case not part of an edge
}
lookup[kind][res.GetName()] = v // used for constructing edges
keep = append(keep, v) // append
} else if !noop { // do not export any resources if noop
// store for addition to etcd storage...
res.SetName(res.GetName()[2:]) //slice off @@
res.SetKind(kind) // cheap init
resourceList = append(resourceList, res)
}
}
}
// store in etcd
if err := etcd.EtcdSetResources(embdEtcd, c.Hostname, resourceList); err != nil {
return nil, fmt.Errorf("Config: Could not export resources: %v", err)
}
// lookup from etcd
var hostnameFilter []string // empty to get from everyone
kindFilter := []string{}
for _, t := range c.Collector {
// XXX: should we just drop these everywhere and have the kind strings be all lowercase?
kind := util.FirstToUpper(t.Kind)
kindFilter = append(kindFilter, kind)
}
// do all the graph look ups in one single step, so that if the etcd
// database changes, we don't have a partial state of affairs...
if len(kindFilter) > 0 { // if kindFilter is empty, don't need to do lookups!
var err error
resourceList, err = etcd.EtcdGetResources(embdEtcd, hostnameFilter, kindFilter)
if err != nil {
return nil, fmt.Errorf("Config: Could not collect resources: %v", err)
}
}
for _, res := range resourceList {
matched := false
// see if we find a collect pattern that matches
for _, t := range c.Collector {
// XXX: should we just drop these everywhere and have the kind strings be all lowercase?
kind := util.FirstToUpper(t.Kind)
// use t.Kind and optionally t.Pattern to collect from etcd storage
log.Printf("Collect: %v; Pattern: %v", kind, t.Pattern)
// XXX: expand to more complex pattern matching here...
if res.Kind() != kind {
continue
}
if matched {
// we've already matched this resource, should we match again?
log.Printf("Config: Warning: Matching %v[%v] again!", kind, res.GetName())
}
matched = true
// collect resources but add the noop metaparam
if noop {
res.Meta().Noop = noop
}
if t.Pattern != "" { // XXX: simplistic for now
res.CollectPattern(t.Pattern) // res.Dirname = t.Pattern
}
log.Printf("Collect: %v[%v]: collected!", kind, res.GetName())
// XXX: similar to other resource add code:
if _, exists := lookup[kind]; !exists {
lookup[kind] = make(map[string]*pgraph.Vertex)
}
v := graph.GetVertexMatch(res)
if v == nil { // no match found
res.Init() // initialize go channels or things won't work!!!
v = pgraph.NewVertex(res)
graph.AddVertex(v) // call standalone in case not part of an edge
}
lookup[kind][res.GetName()] = v // used for constructing edges
keep = append(keep, v) // append
//break // let's see if another resource even matches
}
}
// get rid of any vertices we shouldn't "keep" (that aren't in new graph)
for _, v := range graph.GetVertices() {
if !pgraph.VertexContains(v, keep) {
// wait for exit before starting new graph!
v.SendEvent(event.EventExit, true, false)
graph.DeleteVertex(v)
}
}
for _, e := range c.Edges {
if _, ok := lookup[util.FirstToUpper(e.From.Kind)]; !ok {
return nil, fmt.Errorf("Can't find 'from' resource!")
}
if _, ok := lookup[util.FirstToUpper(e.To.Kind)]; !ok {
return nil, fmt.Errorf("Can't find 'to' resource!")
}
if _, ok := lookup[util.FirstToUpper(e.From.Kind)][e.From.Name]; !ok {
return nil, fmt.Errorf("Can't find 'from' name!")
}
if _, ok := lookup[util.FirstToUpper(e.To.Kind)][e.To.Name]; !ok {
return nil, fmt.Errorf("Can't find 'to' name!")
}
graph.AddEdge(lookup[util.FirstToUpper(e.From.Kind)][e.From.Name], lookup[util.FirstToUpper(e.To.Kind)][e.To.Name], pgraph.NewEdge(e.Name))
}
return graph, nil
}

25
global/global.go Normal file
View File

@@ -0,0 +1,25 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// Package global holds some global variables that are used throughout the code.
package global
const (
DEBUG = false // add additional log messages
TRACE = false // add execution flow log messages
VERBOSE = false // add extra log message output
)

66
main.go
View File

@@ -19,9 +19,6 @@ package main
import (
"fmt"
etcdtypes "github.com/coreos/etcd/pkg/types"
"github.com/coreos/pkg/capnslog"
"github.com/urfave/cli"
"io/ioutil"
"log"
"os"
@@ -29,6 +26,19 @@ import (
"sync"
"syscall"
"time"
"github.com/purpleidea/mgmt/converger"
"github.com/purpleidea/mgmt/etcd"
"github.com/purpleidea/mgmt/gconfig"
"github.com/purpleidea/mgmt/global"
"github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/puppet"
"github.com/purpleidea/mgmt/remote"
"github.com/purpleidea/mgmt/util"
etcdtypes "github.com/coreos/etcd/pkg/types"
"github.com/coreos/pkg/capnslog"
"github.com/urfave/cli"
)
// set at compile time
@@ -38,13 +48,6 @@ var (
prefix = fmt.Sprintf("/var/lib/%s/", program)
)
// variables controlling verbosity
const (
DEBUG = false // add additional log messages
TRACE = false // add execution flow log messages
VERBOSE = false // add extra log message output
)
// signal handler
func waitForSignal(exit chan bool) {
signals := make(chan os.Signal, 1)
@@ -73,7 +76,7 @@ func run(c *cli.Context) error {
hostname, _ := os.Hostname()
// allow passing in the hostname, instead of using --hostname
if c.IsSet("file") {
if config := ParseConfigFromFile(c.String("file")); config != nil {
if config := gconfig.ParseConfigFromFile(c.String("file")); config != nil {
if h := config.Hostname; h != "" {
hostname = h
}
@@ -87,21 +90,21 @@ func run(c *cli.Context) error {
noop := c.Bool("noop")
seeds, err := etcdtypes.NewURLs(
FlattenListWithSplit(c.StringSlice("seeds"), []string{",", ";", " "}),
util.FlattenListWithSplit(c.StringSlice("seeds"), []string{",", ";", " "}),
)
if err != nil && len(c.StringSlice("seeds")) > 0 {
log.Printf("Main: Error: seeds didn't parse correctly!")
return cli.NewExitError("", 1)
}
clientURLs, err := etcdtypes.NewURLs(
FlattenListWithSplit(c.StringSlice("client-urls"), []string{",", ";", " "}),
util.FlattenListWithSplit(c.StringSlice("client-urls"), []string{",", ";", " "}),
)
if err != nil && len(c.StringSlice("client-urls")) > 0 {
log.Printf("Main: Error: clientURLs didn't parse correctly!")
return cli.NewExitError("", 1)
}
serverURLs, err := etcdtypes.NewURLs(
FlattenListWithSplit(c.StringSlice("server-urls"), []string{",", ";", " "}),
util.FlattenListWithSplit(c.StringSlice("server-urls"), []string{",", ";", " "}),
)
if err != nil && len(c.StringSlice("server-urls")) > 0 {
log.Printf("Main: Error: serverURLs didn't parse correctly!")
@@ -171,7 +174,7 @@ func run(c *cli.Context) error {
var wg sync.WaitGroup
exit := make(chan bool) // exit signal
var G, fullGraph *Graph
var G, fullGraph *pgraph.Graph
// exit after `max-runtime` seconds for no reason at all...
if i := c.Int("max-runtime"); i > 0 {
@@ -182,7 +185,7 @@ func run(c *cli.Context) error {
}
// setup converger
converger := NewConverger(
converger := converger.NewConverger(
c.Int("converged-timeout"),
nil, // stateFn gets added in by EmbdEtcd
)
@@ -194,7 +197,7 @@ func run(c *cli.Context) error {
} else {
log.Printf("Main: Seeds(%v): %v", len(seeds), seeds)
}
EmbdEtcd := NewEmbdEtcd(
EmbdEtcd := etcd.NewEmbdEtcd(
hostname,
seeds,
clientURLs,
@@ -225,7 +228,7 @@ func run(c *cli.Context) error {
return nil
}
// send our individual state into etcd for others to see
return EtcdSetHostnameConverged(EmbdEtcd, hostname, b) // TODO: what should happen on error?
return etcd.EtcdSetHostnameConverged(EmbdEtcd, hostname, b) // TODO: what should happen on error?
}
if EmbdEtcd != nil {
converger.SetStateFn(convergerStateFn)
@@ -241,11 +244,11 @@ func run(c *cli.Context) error {
if !c.Bool("no-watch") && c.IsSet("file") {
configchan = ConfigWatch(file)
} else if c.IsSet("puppet") {
interval := PuppetInterval(c.String("puppet-conf"))
interval := puppet.PuppetInterval(c.String("puppet-conf"))
puppetchan = time.Tick(time.Duration(interval) * time.Second)
}
log.Println("Etcd: Starting...")
etcdchan := EtcdWatch(EmbdEtcd)
etcdchan := etcd.EtcdWatch(EmbdEtcd)
first := true // first loop or not
for {
log.Println("Main: Waiting...")
@@ -272,11 +275,11 @@ func run(c *cli.Context) error {
return
}
var config *GraphConfig
var config *gconfig.GraphConfig
if c.IsSet("file") {
config = ParseConfigFromFile(file)
config = gconfig.ParseConfigFromFile(file)
} else if c.IsSet("puppet") {
config = ParseConfigFromPuppet(c.String("puppet"), c.String("puppet-conf"))
config = puppet.ParseConfigFromPuppet(c.String("puppet"), c.String("puppet-conf"))
}
if config == nil {
log.Printf("Config: Parse failure")
@@ -297,7 +300,7 @@ func run(c *cli.Context) error {
// build graph from yaml file on events (eg: from etcd)
// we need the vertices to be paused to work on them
if newFullgraph, err := fullGraph.NewGraphFromConfig(config, EmbdEtcd, noop); err == nil { // keep references to all original elements
if newFullgraph, err := config.NewGraphFromConfig(fullGraph, EmbdEtcd, noop); err == nil { // keep references to all original elements
fullGraph = newFullgraph
} else {
log.Printf("Config: Error making new graph from config: %v", err)
@@ -344,13 +347,13 @@ func run(c *cli.Context) error {
// initialize the add watcher, which calls the f callback on map changes
convergerCb := func(f func(map[string]bool) error) (func(), error) {
return EtcdAddHostnameConvergedWatcher(EmbdEtcd, f)
return etcd.EtcdAddHostnameConvergedWatcher(EmbdEtcd, f)
}
// build remotes struct for remote ssh
remotes := NewRemotes(
remotes := remote.NewRemotes(
EmbdEtcd.LocalhostClientURLs().StringSlice(),
[]string{DefaultClientURL},
[]string{etcd.DefaultClientURL},
noop,
c.StringSlice("remote"), // list of files
events, // watch for file changes
@@ -362,6 +365,7 @@ func run(c *cli.Context) error {
prefix,
converger,
convergerCb,
program,
)
// TODO: is there any benefit to running the remotes above in the loop?
@@ -390,7 +394,7 @@ func run(c *cli.Context) error {
log.Printf("Etcd exited poorly with: %v", err)
}
if DEBUG {
if global.DEBUG {
log.Printf("Graph: %v", G)
}
@@ -403,7 +407,7 @@ func run(c *cli.Context) error {
func main() {
var flags int
if DEBUG || true { // TODO: remove || true
if global.DEBUG || true { // TODO: remove || true
flags = log.LstdFlags | log.Lshortfile
}
flags = (flags - log.Ldate) // remove the date for now
@@ -411,7 +415,7 @@ func main() {
// un-hijack from capnslog...
log.SetOutput(os.Stderr)
if VERBOSE {
if global.VERBOSE {
capnslog.SetFormatter(capnslog.NewLogFormatter(os.Stderr, "(etcd) ", flags))
} else {
capnslog.SetFormatter(capnslog.NewNilFormatter())
@@ -492,7 +496,7 @@ func main() {
},
cli.IntFlag{
Name: "ideal-cluster-size",
Value: defaultIdealClusterSize,
Value: etcd.DefaultIdealClusterSize,
Usage: "ideal number of server peers in cluster, only read by initial server",
EnvVar: "MGMT_IDEAL_CLUSTER_SIZE",
},

104
pgraph/autoedge.go Normal file
View File

@@ -0,0 +1,104 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// Package pgraph represents the internal "pointer graph" that we use.
package pgraph
import (
"fmt"
"log"
"github.com/purpleidea/mgmt/global"
"github.com/purpleidea/mgmt/resources"
)
// add edges to the vertex in a graph based on if it matches a uuid list
func (g *Graph) addEdgesByMatchingUUIDS(v *Vertex, uuids []resources.ResUUID) []bool {
// search for edges and see what matches!
var result []bool
// loop through each uuid, and see if it matches any vertex
for _, uuid := range uuids {
var found = false
// uuid is a ResUUID object
for _, vv := range g.GetVertices() { // search
if v == vv { // skip self
continue
}
if global.DEBUG {
log.Printf("Compile: AutoEdge: Match: %v[%v] with UUID: %v[%v]", vv.Kind(), vv.GetName(), uuid.Kind(), uuid.GetName())
}
// we must match to an effective UUID for the resource,
// that is to say, the name value of a res is a helpful
// handle, but it is not necessarily a unique identity!
// remember, resources can return multiple UUID's each!
if resources.UUIDExistsInUUIDs(uuid, vv.GetUUIDs()) {
// add edge from: vv -> v
if uuid.Reversed() {
txt := fmt.Sprintf("AutoEdge: %v[%v] -> %v[%v]", vv.Kind(), vv.GetName(), v.Kind(), v.GetName())
log.Printf("Compile: Adding %v", txt)
g.AddEdge(vv, v, NewEdge(txt))
} else { // edges go the "normal" way, eg: pkg resource
txt := fmt.Sprintf("AutoEdge: %v[%v] -> %v[%v]", v.Kind(), v.GetName(), vv.Kind(), vv.GetName())
log.Printf("Compile: Adding %v", txt)
g.AddEdge(v, vv, NewEdge(txt))
}
found = true
break
}
}
result = append(result, found)
}
return result
}
// AutoEdges adds the automatic edges to the graph.
func (g *Graph) AutoEdges() {
log.Println("Compile: Adding AutoEdges...")
for _, v := range g.GetVertices() { // for each vertexes autoedges
if !v.Meta().AutoEdge { // is the metaparam true?
continue
}
autoEdgeObj := v.AutoEdges()
if autoEdgeObj == nil {
log.Printf("%v[%v]: Config: No auto edges were found!", v.Kind(), v.GetName())
continue // next vertex
}
for { // while the autoEdgeObj has more uuids to add...
uuids := autoEdgeObj.Next() // get some!
if uuids == nil {
log.Printf("%v[%v]: Config: The auto edge list is empty!", v.Kind(), v.GetName())
break // inner loop
}
if global.DEBUG {
log.Println("Compile: AutoEdge: UUIDS:")
for i, u := range uuids {
log.Printf("Compile: AutoEdge: UUID%d: %v", i, u)
}
}
// match and add edges
result := g.addEdgesByMatchingUUIDS(v, uuids)
// report back, and find out if we should continue
if !autoEdgeObj.Test(result) {
break
}
}
}
}

348
pgraph/autogroup.go Normal file
View File

@@ -0,0 +1,348 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgraph
import (
"fmt"
"log"
"github.com/purpleidea/mgmt/global"
)
// AutoGrouper is the required interface to implement for an autogroup algorithm
type AutoGrouper interface {
// listed in the order these are typically called in...
name() string // friendly identifier
init(*Graph) error // only call once
vertexNext() (*Vertex, *Vertex, error) // mostly algorithmic
vertexCmp(*Vertex, *Vertex) error // can we merge these ?
vertexMerge(*Vertex, *Vertex) (*Vertex, error) // vertex merge fn to use
edgeMerge(*Edge, *Edge) *Edge // edge merge fn to use
vertexTest(bool) (bool, error) // call until false
}
// baseGrouper is the base type for implementing the AutoGrouper interface
type baseGrouper struct {
graph *Graph // store a pointer to the graph
vertices []*Vertex // cached list of vertices
i int
j int
done bool
}
// name provides a friendly name for the logs to see
func (ag *baseGrouper) name() string {
return "baseGrouper"
}
// init is called only once and before using other AutoGrouper interface methods
// the name method is the only exception: call it any time without side effects!
func (ag *baseGrouper) init(g *Graph) error {
if ag.graph != nil {
return fmt.Errorf("The init method has already been called!")
}
ag.graph = g // pointer
ag.vertices = ag.graph.GetVerticesSorted() // cache in deterministic order!
ag.i = 0
ag.j = 0
if len(ag.vertices) == 0 { // empty graph
ag.done = true
return nil
}
return nil
}
// vertexNext is a simple iterator that loops through vertex (pair) combinations
// an intelligent algorithm would selectively offer only valid pairs of vertices
// these should satisfy logical grouping requirements for the autogroup designs!
// the desired algorithms can override, but keep this method as a base iterator!
func (ag *baseGrouper) vertexNext() (v1, v2 *Vertex, err error) {
// this does a for v... { for w... { return v, w }} but stepwise!
l := len(ag.vertices)
if ag.i < l {
v1 = ag.vertices[ag.i]
}
if ag.j < l {
v2 = ag.vertices[ag.j]
}
// in case the vertex was deleted
if !ag.graph.HasVertex(v1) {
v1 = nil
}
if !ag.graph.HasVertex(v2) {
v2 = nil
}
// two nested loops...
if ag.j < l {
ag.j++
}
if ag.j == l {
ag.j = 0
if ag.i < l {
ag.i++
}
if ag.i == l {
ag.done = true
}
}
return
}
func (ag *baseGrouper) vertexCmp(v1, v2 *Vertex) error {
if v1 == nil || v2 == nil {
return fmt.Errorf("Vertex is nil!")
}
if v1 == v2 { // skip yourself
return fmt.Errorf("Vertices are the same!")
}
if v1.Kind() != v2.Kind() { // we must group similar kinds
// TODO: maybe future resources won't need this limitation?
return fmt.Errorf("The two resources aren't the same kind!")
}
// someone doesn't want to group!
if !v1.Meta().AutoGroup || !v2.Meta().AutoGroup {
return fmt.Errorf("One of the autogroup flags is false!")
}
if v1.Res.IsGrouped() { // already grouped!
return fmt.Errorf("Already grouped!")
}
if len(v2.Res.GetGroup()) > 0 { // already has children grouped!
return fmt.Errorf("Already has groups!")
}
if !v1.Res.GroupCmp(v2.Res) { // resource groupcmp failed!
return fmt.Errorf("The GroupCmp failed!")
}
return nil // success
}
func (ag *baseGrouper) vertexMerge(v1, v2 *Vertex) (v *Vertex, err error) {
// NOTE: it's important to use w.Res instead of w, b/c
// the w by itself is the *Vertex obj, not the *Res obj
// which is contained within it! They both satisfy the
// Res interface, which is why both will compile! :(
err = v1.Res.GroupRes(v2.Res) // GroupRes skips stupid groupings
return // success or fail, and no need to merge the actual vertices!
}
func (ag *baseGrouper) edgeMerge(e1, e2 *Edge) *Edge {
return e1 // noop
}
// vertexTest processes the results of the grouping for the algorithm to know
// return an error if something went horribly wrong, and bool false to stop
func (ag *baseGrouper) vertexTest(b bool) (bool, error) {
// NOTE: this particular baseGrouper version doesn't track what happens
// because since we iterate over every pair, we don't care which merge!
if ag.done {
return false, nil
}
return true, nil
}
// TODO: this algorithm may not be correct in all cases. replace if needed!
type nonReachabilityGrouper struct {
baseGrouper // "inherit" what we want, and reimplement the rest
}
func (ag *nonReachabilityGrouper) name() string {
return "nonReachabilityGrouper"
}
// this algorithm relies on the observation that if there's a path from a to b,
// then they *can't* be merged (b/c of the existing dependency) so therefore we
// merge anything that *doesn't* satisfy this condition or that of the reverse!
func (ag *nonReachabilityGrouper) vertexNext() (v1, v2 *Vertex, err error) {
for {
v1, v2, err = ag.baseGrouper.vertexNext() // get all iterable pairs
if err != nil {
log.Fatalf("Error running autoGroup(vertexNext): %v", err)
}
if v1 != v2 { // ignore self cmp early (perf optimization)
// if NOT reachable, they're viable...
out1 := ag.graph.Reachability(v1, v2)
out2 := ag.graph.Reachability(v2, v1)
if len(out1) == 0 && len(out2) == 0 {
return // return v1 and v2, they're viable
}
}
// if we got here, it means we're skipping over this candidate!
if ok, err := ag.baseGrouper.vertexTest(false); err != nil {
log.Fatalf("Error running autoGroup(vertexTest): %v", err)
} else if !ok {
return nil, nil, nil // done!
}
// the vertexTest passed, so loop and try with a new pair...
}
}
// VertexMerge merges v2 into v1 by reattaching the edges where appropriate,
// and then by deleting v2 from the graph. Since more than one edge between two
// vertices is not allowed, duplicate edges are merged as well. an edge merge
// function can be provided if you'd like to control how you merge the edges!
func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex) (*Vertex, error), edgeMergeFn func(*Edge, *Edge) *Edge) error {
// methodology
// 1) edges between v1 and v2 are removed
//Loop:
for k1 := range g.Adjacency {
for k2 := range g.Adjacency[k1] {
// v1 -> v2 || v2 -> v1
if (k1 == v1 && k2 == v2) || (k1 == v2 && k2 == v1) {
delete(g.Adjacency[k1], k2) // delete map & edge
// NOTE: if we assume this is a DAG, then we can
// assume only v1 -> v2 OR v2 -> v1 exists, and
// we can break out of these loops immediately!
//break Loop
break
}
}
}
// 2) edges that point towards v2 from X now point to v1 from X (no dupes)
for _, x := range g.IncomingGraphEdges(v2) { // all to vertex v (??? -> v)
e := g.Adjacency[x][v2] // previous edge
r := g.Reachability(x, v1)
// merge e with ex := g.Adjacency[x][v1] if it exists!
if ex, exists := g.Adjacency[x][v1]; exists && edgeMergeFn != nil && len(r) == 0 {
e = edgeMergeFn(e, ex)
}
if len(r) == 0 { // if not reachable, add it
g.AddEdge(x, v1, e) // overwrite edge
} else if edgeMergeFn != nil { // reachable, merge e through...
prev := x // initial condition
for i, next := range r {
if i == 0 {
// next == prev, therefore skip
continue
}
// this edge is from: prev, to: next
ex, _ := g.Adjacency[prev][next] // get
ex = edgeMergeFn(ex, e)
g.Adjacency[prev][next] = ex // set
prev = next
}
}
delete(g.Adjacency[x], v2) // delete old edge
}
// 3) edges that point from v2 to X now point from v1 to X (no dupes)
for _, x := range g.OutgoingGraphEdges(v2) { // all from vertex v (v -> ???)
e := g.Adjacency[v2][x] // previous edge
r := g.Reachability(v1, x)
// merge e with ex := g.Adjacency[v1][x] if it exists!
if ex, exists := g.Adjacency[v1][x]; exists && edgeMergeFn != nil && len(r) == 0 {
e = edgeMergeFn(e, ex)
}
if len(r) == 0 {
g.AddEdge(v1, x, e) // overwrite edge
} else if edgeMergeFn != nil { // reachable, merge e through...
prev := v1 // initial condition
for i, next := range r {
if i == 0 {
// next == prev, therefore skip
continue
}
// this edge is from: prev, to: next
ex, _ := g.Adjacency[prev][next]
ex = edgeMergeFn(ex, e)
g.Adjacency[prev][next] = ex
prev = next
}
}
delete(g.Adjacency[v2], x)
}
// 4) merge and then remove the (now merged/grouped) vertex
if vertexMergeFn != nil { // run vertex merge function
if v, err := vertexMergeFn(v1, v2); err != nil {
return err
} else if v != nil { // replace v1 with the "merged" version...
v1 = v // XXX: will this replace v1 the way we want?
}
}
g.DeleteVertex(v2) // remove grouped vertex
// 5) creation of a cyclic graph should throw an error
if _, dag := g.TopologicalSort(); !dag { // am i a dag or not?
return fmt.Errorf("Graph is not a dag!")
}
return nil // success
}
// autoGroup is the mechanical auto group "runner" that runs the interface spec
func (g *Graph) autoGroup(ag AutoGrouper) chan string {
strch := make(chan string) // output log messages here
go func(strch chan string) {
strch <- fmt.Sprintf("Compile: Grouping: Algorithm: %v...", ag.name())
if err := ag.init(g); err != nil {
log.Fatalf("Error running autoGroup(init): %v", err)
}
for {
var v, w *Vertex
v, w, err := ag.vertexNext() // get pair to compare
if err != nil {
log.Fatalf("Error running autoGroup(vertexNext): %v", err)
}
merged := false
// save names since they change during the runs
vStr := fmt.Sprintf("%s", v) // valid even if it is nil
wStr := fmt.Sprintf("%s", w)
if err := ag.vertexCmp(v, w); err != nil { // cmp ?
if global.DEBUG {
strch <- fmt.Sprintf("Compile: Grouping: !GroupCmp for: %s into %s", wStr, vStr)
}
// remove grouped vertex and merge edges (res is safe)
} else if err := g.VertexMerge(v, w, ag.vertexMerge, ag.edgeMerge); err != nil { // merge...
strch <- fmt.Sprintf("Compile: Grouping: !VertexMerge for: %s into %s", wStr, vStr)
} else { // success!
strch <- fmt.Sprintf("Compile: Grouping: Success for: %s into %s", wStr, vStr)
merged = true // woo
}
// did these get used?
if ok, err := ag.vertexTest(merged); err != nil {
log.Fatalf("Error running autoGroup(vertexTest): %v", err)
} else if !ok {
break // done!
}
}
close(strch)
return
}(strch) // call function
return strch
}
// AutoGroup runs the auto grouping on the graph and prints out log messages
func (g *Graph) AutoGroup() {
// receive log messages from channel...
// this allows test cases to avoid printing them when they're unwanted!
// TODO: this algorithm may not be correct in all cases. replace if needed!
for str := range g.autoGroup(&nonReachabilityGrouper{}) {
log.Println(str)
}
}

View File

@@ -15,8 +15,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// Pgraph (Pointer Graph)
package main
// Package pgraph represents the internal "pointer graph" that we use.
package pgraph
import (
"errors"
@@ -31,6 +31,11 @@ import (
"sync"
"syscall"
"time"
"github.com/purpleidea/mgmt/converger"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/global"
"github.com/purpleidea/mgmt/resources"
)
//go:generate stringer -type=graphState -output=graphstate_stringer.go
@@ -59,7 +64,7 @@ type Graph struct {
// Vertex is the primary vertex struct in this library.
type Vertex struct {
Res // anonymous field
resources.Res // anonymous field
timestamp int64 // last updated timestamp ?
}
@@ -78,7 +83,7 @@ func NewGraph(name string) *Graph {
}
// NewVertex returns a new graph vertex struct with a contained resource.
func NewVertex(r Res) *Vertex {
func NewVertex(r resources.Res) *Vertex {
return &Vertex{
Res: r,
}
@@ -160,7 +165,7 @@ func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) {
// GetVertexMatch searches for an equivalent resource in the graph and returns
// the vertex it is found in, or nil if not found.
func (g *Graph) GetVertexMatch(obj Res) *Vertex {
func (g *Graph) GetVertexMatch(obj resources.Res) *Vertex {
for k := range g.Adjacency {
if k.Res.Compare(obj) {
return k
@@ -549,99 +554,6 @@ func (g *Graph) Reachability(a, b *Vertex) []*Vertex {
return result
}
// VertexMerge merges v2 into v1 by reattaching the edges where appropriate,
// and then by deleting v2 from the graph. Since more than one edge between two
// vertices is not allowed, duplicate edges are merged as well. an edge merge
// function can be provided if you'd like to control how you merge the edges!
func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex) (*Vertex, error), edgeMergeFn func(*Edge, *Edge) *Edge) error {
// methodology
// 1) edges between v1 and v2 are removed
//Loop:
for k1 := range g.Adjacency {
for k2 := range g.Adjacency[k1] {
// v1 -> v2 || v2 -> v1
if (k1 == v1 && k2 == v2) || (k1 == v2 && k2 == v1) {
delete(g.Adjacency[k1], k2) // delete map & edge
// NOTE: if we assume this is a DAG, then we can
// assume only v1 -> v2 OR v2 -> v1 exists, and
// we can break out of these loops immediately!
//break Loop
break
}
}
}
// 2) edges that point towards v2 from X now point to v1 from X (no dupes)
for _, x := range g.IncomingGraphEdges(v2) { // all to vertex v (??? -> v)
e := g.Adjacency[x][v2] // previous edge
r := g.Reachability(x, v1)
// merge e with ex := g.Adjacency[x][v1] if it exists!
if ex, exists := g.Adjacency[x][v1]; exists && edgeMergeFn != nil && len(r) == 0 {
e = edgeMergeFn(e, ex)
}
if len(r) == 0 { // if not reachable, add it
g.AddEdge(x, v1, e) // overwrite edge
} else if edgeMergeFn != nil { // reachable, merge e through...
prev := x // initial condition
for i, next := range r {
if i == 0 {
// next == prev, therefore skip
continue
}
// this edge is from: prev, to: next
ex, _ := g.Adjacency[prev][next] // get
ex = edgeMergeFn(ex, e)
g.Adjacency[prev][next] = ex // set
prev = next
}
}
delete(g.Adjacency[x], v2) // delete old edge
}
// 3) edges that point from v2 to X now point from v1 to X (no dupes)
for _, x := range g.OutgoingGraphEdges(v2) { // all from vertex v (v -> ???)
e := g.Adjacency[v2][x] // previous edge
r := g.Reachability(v1, x)
// merge e with ex := g.Adjacency[v1][x] if it exists!
if ex, exists := g.Adjacency[v1][x]; exists && edgeMergeFn != nil && len(r) == 0 {
e = edgeMergeFn(e, ex)
}
if len(r) == 0 {
g.AddEdge(v1, x, e) // overwrite edge
} else if edgeMergeFn != nil { // reachable, merge e through...
prev := v1 // initial condition
for i, next := range r {
if i == 0 {
// next == prev, therefore skip
continue
}
// this edge is from: prev, to: next
ex, _ := g.Adjacency[prev][next]
ex = edgeMergeFn(ex, e)
g.Adjacency[prev][next] = ex
prev = next
}
}
delete(g.Adjacency[v2], x)
}
// 4) merge and then remove the (now merged/grouped) vertex
if vertexMergeFn != nil { // run vertex merge function
if v, err := vertexMergeFn(v1, v2); err != nil {
return err
} else if v != nil { // replace v1 with the "merged" version...
v1 = v // XXX: will this replace v1 the way we want?
}
}
g.DeleteVertex(v2) // remove grouped vertex
// 5) creation of a cyclic graph should throw an error
if _, dag := g.TopologicalSort(); !dag { // am i a dag or not?
return fmt.Errorf("Graph is not a dag!")
}
return nil // success
}
// GetTimestamp returns the timestamp of a vertex
func (v *Vertex) GetTimestamp() int64 {
return v.timestamp
@@ -662,7 +574,7 @@ func (g *Graph) OKTimestamp(v *Vertex) bool {
// if they're equal (eg: on init of 0) then we also can't run
// b/c we should let our pre-req's go first...
x, y := v.GetTimestamp(), n.GetTimestamp()
if DEBUG {
if global.DEBUG {
log.Printf("%v[%v]: OKTimestamp: (%v) >= %v[%v](%v): !%v", v.Kind(), v.GetName(), x, n.Kind(), n.GetName(), y, x >= y)
}
if x >= y {
@@ -679,14 +591,14 @@ func (g *Graph) Poke(v *Vertex, activity bool) {
for _, n := range g.OutgoingGraphEdges(v) {
// XXX: if we're in state event and haven't been cancelled by
// apply, then we can cancel a poke to a child, right? XXX
// XXX: if n.Res.getState() != resStateEvent { // is this correct?
// XXX: if n.Res.getState() != resources.ResStateEvent { // is this correct?
if true { // XXX
if DEBUG {
if global.DEBUG {
log.Printf("%v[%v]: Poke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
}
n.SendEvent(eventPoke, false, activity) // XXX: can this be switched to sync?
n.SendEvent(event.EventPoke, false, activity) // XXX: can this be switched to sync?
} else {
if DEBUG {
if global.DEBUG {
log.Printf("%v[%v]: Poke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName())
}
}
@@ -699,18 +611,18 @@ func (g *Graph) BackPoke(v *Vertex) {
for _, n := range g.IncomingGraphEdges(v) {
x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState()
// if the parent timestamp needs poking AND it's not in state
// resStateEvent, then poke it. If the parent is in resStateEvent it
// ResStateEvent, then poke it. If the parent is in ResStateEvent it
// means that an event is pending, so we'll be expecting a poke
// back soon, so we can safely discard the extra parent poke...
// TODO: implement a stateLT (less than) to tell if something
// happens earlier in the state cycle and that doesn't wrap nil
if x >= y && (s != resStateEvent && s != resStateCheckApply) {
if DEBUG {
if x >= y && (s != resources.ResStateEvent && s != resources.ResStateCheckApply) {
if global.DEBUG {
log.Printf("%v[%v]: BackPoke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
}
n.SendEvent(eventBackPoke, false, false) // XXX: can this be switched to sync?
n.SendEvent(event.EventBackPoke, false, false) // XXX: can this be switched to sync?
} else {
if DEBUG {
if global.DEBUG {
log.Printf("%v[%v]: BackPoke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName())
}
}
@@ -721,27 +633,27 @@ func (g *Graph) BackPoke(v *Vertex) {
// XXX: rename this function
func (g *Graph) Process(v *Vertex) error {
obj := v.Res
if DEBUG {
if global.DEBUG {
log.Printf("%v[%v]: Process()", obj.Kind(), obj.GetName())
}
obj.SetState(resStateEvent)
obj.SetState(resources.ResStateEvent)
var ok = true
var apply = false // did we run an apply?
// is it okay to run dependency wise right now?
// if not, that's okay because when the dependency runs, it will poke
// us back and we will run if needed then!
if g.OKTimestamp(v) {
if DEBUG {
if global.DEBUG {
log.Printf("%v[%v]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp())
}
obj.SetState(resStateCheckApply)
obj.SetState(resources.ResStateCheckApply)
// if this fails, don't UpdateTimestamp()
checkok, err := obj.CheckApply(!obj.Meta().Noop)
if checkok && err != nil { // should never return this way
log.Fatalf("%v[%v]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), checkok, err)
}
if DEBUG {
if global.DEBUG {
log.Printf("%v[%v]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), checkok, err)
}
@@ -762,7 +674,7 @@ func (g *Graph) Process(v *Vertex) error {
// update this timestamp *before* we poke or the poked
// nodes might fail due to having a too old timestamp!
v.UpdateTimestamp() // this was touched...
obj.SetState(resStatePoking) // can't cancel parent poke
obj.SetState(resources.ResStatePoking) // can't cancel parent poke
g.Poke(v, apply)
}
// poke at our pre-req's instead since they need to refresh/run...
@@ -794,7 +706,7 @@ func (g *Graph) Worker(v *Vertex) error {
// the Watch() function about which graph it is
// running on, which isolates things nicely...
obj := v.Res
chanProcess := make(chan Event)
chanProcess := make(chan event.Event)
go func() {
running := false
var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration
@@ -803,7 +715,7 @@ func (g *Graph) Worker(v *Vertex) error {
}
var delay = time.Duration(v.Meta().Delay) * time.Millisecond
var retry int16 = v.Meta().Retry // number of tries left, -1 for infinite
var saved Event
var saved event.Event
Loop:
for {
// this has to be synchronous, because otherwise the Res
@@ -989,8 +901,8 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
// and not just selectively the subset with no indegree.
if (!first) || indegree[v] == 0 {
// ensure state is started before continuing on to next vertex
for !v.SendEvent(eventStart, true, false) {
if DEBUG {
for !v.SendEvent(event.EventStart, true, false) {
if global.DEBUG {
// if SendEvent fails, we aren't up yet
log.Printf("%v[%v]: Retrying SendEvent(Start)", v.Kind(), v.GetName())
// sleep here briefly or otherwise cause
@@ -1008,7 +920,7 @@ func (g *Graph) Pause() {
defer log.Printf("State: %v -> %v", g.setState(graphStatePaused), g.getState())
t, _ := g.TopologicalSort()
for _, v := range t { // squeeze out the events...
v.SendEvent(eventPause, true, false)
v.SendEvent(event.EventPause, true, false)
}
}
@@ -1025,12 +937,12 @@ func (g *Graph) Exit() {
// when we hit the 'default' in the select statement!
// XXX: we can do this to quiesce, but it's not necessary now
v.SendEvent(eventExit, true, false)
v.SendEvent(event.EventExit, true, false)
}
}
// AssociateData associates some data with the object in the graph in question
func (g *Graph) AssociateData(converger Converger) {
func (g *Graph) AssociateData(converger converger.Converger) {
for v := range g.GetVerticesChan() {
v.Res.AssociateData(converger)
}

View File

@@ -15,9 +15,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// NOTE: this is pgraph, a pointer graph
package main
package pgraph
import (
"fmt"

View File

@@ -15,7 +15,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
// Package puppet provides the integration entrypoint for the puppet language.
package puppet
import (
"bufio"
@@ -24,6 +25,9 @@ import (
"os/exec"
"strconv"
"strings"
"github.com/purpleidea/mgmt/gconfig"
"github.com/purpleidea/mgmt/global"
)
const (
@@ -32,7 +36,7 @@ const (
)
func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) {
if DEBUG {
if global.DEBUG {
log.Printf("Puppet: running command: %v", cmd)
}
@@ -67,7 +71,7 @@ func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) {
// will choke on an oversized slice. http://stackoverflow.com/a/33726617/3356612
result = append(result, data[0:count]...)
}
if DEBUG {
if global.DEBUG {
log.Printf("Puppet: read %v bytes of data from puppet", len(result))
}
for scanner := bufio.NewScanner(stderr); scanner.Scan(); {
@@ -83,7 +87,7 @@ func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) {
// ParseConfigFromPuppet takes a special puppet param string and config and
// returns the graph configuration structure.
func ParseConfigFromPuppet(puppetParam, puppetConf string) *GraphConfig {
func ParseConfigFromPuppet(puppetParam, puppetConf string) *gconfig.GraphConfig {
var puppetConfArg string
if puppetConf != "" {
puppetConfArg = "--config=" + puppetConf
@@ -100,7 +104,7 @@ func ParseConfigFromPuppet(puppetParam, puppetConf string) *GraphConfig {
log.Println("Puppet: launching translator")
var config GraphConfig
var config gconfig.GraphConfig
if data, err := runPuppetCommand(cmd); err != nil {
return nil
} else if err := config.Parse(data); err != nil {
@@ -113,7 +117,7 @@ func ParseConfigFromPuppet(puppetParam, puppetConf string) *GraphConfig {
// PuppetInterval returns the graph refresh interval from the puppet configuration.
func PuppetInterval(puppetConf string) int {
if DEBUG {
if global.DEBUG {
log.Printf("Puppet: determining graph refresh interval")
}
var cmd *exec.Cmd

View File

@@ -15,6 +15,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// Package remote provides the remoting facilities for agentless execution.
// This set of structs and methods are for running mgmt remotely over SSH. This
// gives us the architectural robustness of our current design, combined with
// the ability to run it with an "agent-less" approach for bootstrapping, and
@@ -35,7 +36,7 @@
// remote mgmt transient agents are running, they can still exchange data and
// converge together without directly connecting, since they all tunnel through
// the etcd server running on the initiator.
package main // TODO: make this a separate "remote" package
package remote
// TODO: running with two identical remote endpoints over a slow connection, eg:
// --remote file1.yaml --remote file1.yaml
@@ -46,10 +47,6 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"github.com/howeyc/gopass"
"github.com/kardianos/osext"
"github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
"io"
"io/ioutil"
"log"
@@ -63,9 +60,19 @@ import (
"strings"
"sync"
"time"
cv "github.com/purpleidea/mgmt/converger"
"github.com/purpleidea/mgmt/gconfig"
"github.com/purpleidea/mgmt/util"
"github.com/howeyc/gopass"
"github.com/kardianos/osext"
"github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
)
const (
DEBUG = false
// FIXME: should this dir be in /var/ instead?
formatPattern = "/tmp/mgmt.%s/" // remote format, to match `mktemp`
formatChars = "abcdefghijklmnopqrstuvwxyz0123456789" // chars for fmt string // TODO: what does mktemp use?
@@ -94,7 +101,7 @@ type SSH struct {
depth uint16 // depth of this node in the remote execution hierarchy
caching bool // whether to try and cache the copy of the binary
prefix string // location we're allowed to put data on the remote server
converger Converger
converger cv.Converger
client *ssh.Client // client object
sftp *sftp.Client // sftp object
@@ -107,6 +114,7 @@ type SSH struct {
lock sync.Mutex // mutex to avoid exit races
exiting bool // flag to let us know if we're exiting
program string // name of the binary
remotewd string // path to remote working directory
execpath string // path to remote mgmt binary
filepath string // path to remote file config
@@ -214,7 +222,7 @@ func (obj *SSH) Sftp() error {
break
}
obj.execpath = path.Join(obj.remotewd, program) // program is a compile time string from main.go
obj.execpath = path.Join(obj.remotewd, obj.program) // program is a compile time string
log.Printf("Remote: Remote path is: %s", obj.execpath)
var same bool
@@ -553,7 +561,7 @@ func (obj *SSH) ExecExit() error {
}
// FIXME: workaround: force a signal!
if _, err := obj.simpleRun(fmt.Sprintf("killall -SIGINT %s", program)); err != nil { // FIXME: low specificity
if _, err := obj.simpleRun(fmt.Sprintf("killall -SIGINT %s", obj.program)); err != nil { // FIXME: low specificity
log.Printf("Remote: Failed to send SIGINT: %s", err.Error())
}
@@ -562,12 +570,12 @@ func (obj *SSH) ExecExit() error {
// try killing the process more violently
time.Sleep(10 * time.Second)
//obj.session.Signal(ssh.SIGKILL)
cmd := fmt.Sprintf("killall -SIGKILL %s", program) // FIXME: low specificity
cmd := fmt.Sprintf("killall -SIGKILL %s", obj.program) // FIXME: low specificity
obj.simpleRun(cmd)
}()
// FIXME: workaround: wait (spin lock) until process quits cleanly...
cmd := fmt.Sprintf("while killall -0 %s 2> /dev/null; do sleep 1s; done", program) // FIXME: low specificity
cmd := fmt.Sprintf("while killall -0 %s 2> /dev/null; do sleep 1s; done", obj.program) // FIXME: low specificity
if _, err := obj.simpleRun(cmd); err != nil {
return fmt.Errorf("Error waiting: %s", err)
}
@@ -680,7 +688,7 @@ type Remotes struct {
caching bool // whether to try and cache the copy of the binary
depth uint16 // depth of this node in the remote execution hierarchy
prefix string // folder prefix to use for misc storage
converger Converger
converger cv.Converger
convergerCb func(func(map[string]bool) error) (func(), error)
wg sync.WaitGroup // keep track of each running SSH connection
@@ -690,19 +698,20 @@ type Remotes struct {
exitChan chan struct{} // closes when we should exit
semaphore Semaphore // counting semaphore to limit concurrent connections
hostnames []string // list of hostnames we've seen so far
cuuid ConvergerUUID // convergerUUID for the remote itself
cuuids map[string]ConvergerUUID // map to each SSH struct with the remote as the key
cuuid cv.ConvergerUUID // convergerUUID for the remote itself
cuuids map[string]cv.ConvergerUUID // map to each SSH struct with the remote as the key
callbackCancelFunc func() // stored callback function cancel function
program string // name of the program
}
// The NewRemotes function builds a Remotes struct.
func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fileWatch chan string, cConns uint16, interactive bool, sshPrivIdRsa string, caching bool, depth uint16, prefix string, converger Converger, convergerCb func(func(map[string]bool) error) (func(), error)) *Remotes {
func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fileWatch chan string, cConns uint16, interactive bool, sshPrivIdRsa string, caching bool, depth uint16, prefix string, converger cv.Converger, convergerCb func(func(map[string]bool) error) (func(), error), program string) *Remotes {
return &Remotes{
clientURLs: clientURLs,
remoteURLs: remoteURLs,
noop: noop,
remotes: StrRemoveDuplicatesInList(remotes),
remotes: util.StrRemoveDuplicatesInList(remotes),
fileWatch: fileWatch,
cConns: cConns,
interactive: interactive,
@@ -716,7 +725,8 @@ func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fi
exitChan: make(chan struct{}),
semaphore: NewSemaphore(int(cConns)),
hostnames: make([]string, len(remotes)),
cuuids: make(map[string]ConvergerUUID),
cuuids: make(map[string]cv.ConvergerUUID),
program: program,
}
}
@@ -724,7 +734,7 @@ func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fi
// It takes as input the path to a graph definition file.
func (obj *Remotes) NewSSH(file string) (*SSH, error) {
// first do the parsing...
config := ParseConfigFromFile(file)
config := gconfig.ParseConfigFromFile(file)
if config == nil {
return nil, fmt.Errorf("Remote: Error parsing remote graph: %s", file)
}
@@ -785,7 +795,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
if hostname == "" {
hostname = host // default to above
}
if StrInList(hostname, obj.hostnames) {
if util.StrInList(hostname, obj.hostnames) {
return nil, fmt.Errorf("Remote: Hostname `%s` already exists!", hostname)
}
obj.hostnames = append(obj.hostnames, hostname)
@@ -805,6 +815,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
caching: obj.caching,
converger: obj.converger,
prefix: obj.prefix,
program: obj.program,
}, nil
}
@@ -872,7 +883,7 @@ func (obj *Remotes) passwordCallback(user, host string) func() (string, error) {
return p, nil
case e := <-failchan:
return "", e
case <-TimeAfterOrBlock(timeout):
case <-util.TimeAfterOrBlock(timeout):
return "", fmt.Errorf("Interactive timeout reached!")
}
}

View File

@@ -15,7 +15,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
package resources
import (
"bufio"
@@ -27,6 +27,9 @@ import (
"os/exec"
"strings"
"time"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util"
)
func init() {
@@ -107,7 +110,7 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan
}
// Watch is the primary listener for this resource and it outputs events.
func (obj *ExecRes) Watch(processChan chan Event) error {
func (obj *ExecRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() {
return nil
}
@@ -167,7 +170,7 @@ func (obj *ExecRes) Watch(processChan chan Event) error {
}
for {
obj.SetState(resStateWatching) // reset
obj.SetState(ResStateWatching) // reset
select {
case text := <-bufioch:
cuuid.SetConverged(false)
@@ -312,7 +315,7 @@ func (obj *ExecRes) CheckApply(apply bool) (checkok bool, err error) {
return false, err
}
case <-TimeAfterOrBlock(timeout):
case <-util.TimeAfterOrBlock(timeout):
log.Printf("%v[%v]: Timeout waiting for Cmd", obj.Kind(), obj.GetName())
//cmd.Process.Kill() // TODO: is this necessary?
return false, errors.New("Timeout waiting for Cmd!")

View File

@@ -15,15 +15,13 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
package resources
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"gopkg.in/fsnotify.v1"
//"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1"
"encoding/gob"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
@@ -35,6 +33,13 @@ import (
"strings"
"syscall"
"time"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead?
"github.com/purpleidea/mgmt/util"
"gopkg.in/fsnotify.v1"
//"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1"
)
func init() {
@@ -95,8 +100,8 @@ func (obj *FileRes) Init() {
// GetPath returns the actual path to use for this resource. It computes this
// after analysis of the Path, Dirname and Basename values. Dirs end with slash.
func (obj *FileRes) GetPath() string {
d := Dirname(obj.Path)
b := Basename(obj.Path)
d := util.Dirname(obj.Path)
b := util.Basename(obj.Path)
if obj.Dirname == "" && obj.Basename == "" {
return obj.Path
}
@@ -143,7 +148,7 @@ func (obj *FileRes) addSubFolders(p string) error {
}
// look at all subfolders...
walkFn := func(path string, info os.FileInfo, err error) error {
if DEBUG {
if global.DEBUG {
log.Printf("%s[%s]: Walk: %s (%v): %v", obj.Kind(), obj.GetName(), path, info, err)
}
if err != nil {
@@ -168,7 +173,7 @@ func (obj *FileRes) addSubFolders(p string) error {
// If the Watch returns an error, it means that something has gone wrong, and it
// must be restarted. On a clean exit it returns nil.
// FIXME: Also watch the source directory when using obj.Source !!!
func (obj *FileRes) Watch(processChan chan Event) error {
func (obj *FileRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() {
return nil // TODO: should this be an error?
}
@@ -195,7 +200,7 @@ func (obj *FileRes) Watch(processChan chan Event) error {
}
defer obj.watcher.Close()
patharray := PathSplit(safename) // tokenize the path
patharray := util.PathSplit(safename) // tokenize the path
var index = len(patharray) // starting index
var current string // current "watcher" location
var deltaDepth int // depth delta between watcher and event
@@ -221,13 +226,13 @@ func (obj *FileRes) Watch(processChan chan Event) error {
if current == "" { // the empty string top is the root dir ("/")
current = "/"
}
if DEBUG {
if global.DEBUG {
log.Printf("%s[%s]: Watching: %v", obj.Kind(), obj.GetName(), current) // attempting to watch...
}
// initialize in the loop so that we can reset on rm-ed handles
err = obj.watcher.Add(current)
if err != nil {
if DEBUG {
if global.DEBUG {
log.Printf("%s[%s]: watcher.Add(%v): Error: %v", obj.Kind(), obj.GetName(), current, err)
}
if err == syscall.ENOENT {
@@ -246,10 +251,10 @@ func (obj *FileRes) Watch(processChan chan Event) error {
continue
}
obj.SetState(resStateWatching) // reset
obj.SetState(ResStateWatching) // reset
select {
case event := <-obj.watcher.Events:
if DEBUG {
if global.DEBUG {
log.Printf("%s[%s]: Watch(%s), Event(%s): %v", obj.Kind(), obj.GetName(), current, event.Name, event.Op)
}
cuuid.SetConverged(false) // XXX: technically i can detect if the event is erroneous or not first
@@ -259,11 +264,11 @@ func (obj *FileRes) Watch(processChan chan Event) error {
if current == event.Name {
deltaDepth = 0 // i was watching what i was looking for
} else if HasPathPrefix(event.Name, current) {
deltaDepth = len(PathSplit(current)) - len(PathSplit(event.Name)) // -1 or less
} else if util.HasPathPrefix(event.Name, current) {
deltaDepth = len(util.PathSplit(current)) - len(util.PathSplit(event.Name)) // -1 or less
} else if HasPathPrefix(current, event.Name) {
deltaDepth = len(PathSplit(event.Name)) - len(PathSplit(current)) // +1 or more
} else if util.HasPathPrefix(current, event.Name) {
deltaDepth = len(util.PathSplit(event.Name)) - len(util.PathSplit(current)) // +1 or more
// if below me...
if _, exists := obj.watches[event.Name]; exists {
send = true
@@ -317,7 +322,7 @@ func (obj *FileRes) Watch(processChan chan Event) error {
}
// if safename starts with event.Name, we're above, and no event should be sent
} else if HasPathPrefix(safename, event.Name) {
} else if util.HasPathPrefix(safename, event.Name) {
//log.Println("Above!")
if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) {
@@ -328,7 +333,7 @@ func (obj *FileRes) Watch(processChan chan Event) error {
if deltaDepth < 0 {
log.Println("Parent!")
if PathPrefixDelta(safename, event.Name) == 1 { // we're the parent dir
if util.PathPrefixDelta(safename, event.Name) == 1 { // we're the parent dir
send = true
dirty = true
}
@@ -337,7 +342,7 @@ func (obj *FileRes) Watch(processChan chan Event) error {
}
// if event.Name startswith safename, send event, we're already deeper
} else if HasPathPrefix(event.Name, safename) {
} else if util.HasPathPrefix(event.Name, safename) {
//log.Println("Event2!")
send = true
dirty = true
@@ -450,7 +455,7 @@ func mapPaths(fileInfos []FileInfo) map[string]FileInfo {
func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sha256sum string) (string, bool, error) {
// TODO: does it make sense to switch dst to an io.Writer ?
// TODO: use obj.Force when dealing with symlinks and other file types!
if DEBUG {
if global.DEBUG {
log.Printf("fileCheckApply: %s -> %s", src, dst)
}
@@ -547,7 +552,7 @@ func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sh
if !apply {
return sha256sum, false, nil
}
if DEBUG {
if global.DEBUG {
log.Printf("fileCheckApply: Apply: %s -> %s", src, dst)
}
@@ -568,12 +573,12 @@ func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sh
// syscall.Splice(rfd int, roff *int64, wfd int, woff *int64, len int, flags int) (n int64, err error)
// TODO: should we offer a way to cancel the copy on ^C ?
if DEBUG {
if global.DEBUG {
log.Printf("fileCheckApply: Copy: %s -> %s", src, dst)
}
if n, err := io.Copy(dstFile, src); err != nil {
return sha256sum, false, err
} else if DEBUG {
} else if global.DEBUG {
log.Printf("fileCheckApply: Copied: %v", n)
}
return sha256sum, false, dstFile.Sync()
@@ -583,7 +588,7 @@ func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sh
// It is recursive and can create directories directly, and files via the usual
// fileCheckApply method. It returns checkOK and error as is normally expected.
func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) {
if DEBUG {
if global.DEBUG {
log.Printf("syncCheckApply: %s -> %s", src, dst)
}
if src == "" || dst == "" {
@@ -601,12 +606,12 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) {
}
if !srcIsDir && !dstIsDir {
if DEBUG {
if global.DEBUG {
log.Printf("syncCheckApply: %s -> %s", src, dst)
}
fin, err := os.Open(src)
if err != nil {
if DEBUG && os.IsNotExist(err) { // if we get passed an empty src
if global.DEBUG && os.IsNotExist(err) { // if we get passed an empty src
log.Printf("syncCheckApply: Missing src: %s", src)
}
return false, err
@@ -662,7 +667,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) {
delete(smartDst, relPathFile) // rm from purge list
}
if DEBUG {
if global.DEBUG {
log.Printf("syncCheckApply: mkdir -m %s '%s'", fileInfo.Mode(), absDst)
}
if err := os.Mkdir(absDst, fileInfo.Mode()); err != nil {
@@ -673,7 +678,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) {
// if we're a regular file, the recurse will create it
}
if DEBUG {
if global.DEBUG {
log.Printf("syncCheckApply: Recurse: %s -> %s", absSrc, absDst)
}
if obj.Recurse {
@@ -888,7 +893,7 @@ func (obj *FileResAutoEdges) Test(input []bool) bool {
// the bottom up!
func (obj *FileRes) AutoEdges() AutoEdge {
var data []ResUUID // store linear result chain here...
values := PathSplitFullReversed(obj.path) // build it
values := util.PathSplitFullReversed(obj.path) // build it
_, values = values[0], values[1:] // get rid of first value which is me!
for _, x := range values {
var reversed = true // cheat by passing a pointer

View File

@@ -15,12 +15,14 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
package resources
import (
"encoding/gob"
"log"
"time"
"github.com/purpleidea/mgmt/event"
)
func init() {
@@ -58,7 +60,7 @@ func (obj *NoopRes) Validate() bool {
}
// Watch is the primary listener for this resource and it outputs events.
func (obj *NoopRes) Watch(processChan chan Event) error {
func (obj *NoopRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() {
return nil // TODO: should this be an error?
}
@@ -79,7 +81,7 @@ func (obj *NoopRes) Watch(processChan chan Event) error {
var send = false // send event?
var exit = false
for {
obj.SetState(resStateWatching) // reset
obj.SetState(ResStateWatching) // reset
select {
case event := <-obj.events:
cuuid.SetConverged(false)

View File

@@ -17,15 +17,17 @@
// DOCS: https://www.freedesktop.org/software/PackageKit/gtk-doc/index.html
//package packagekit // TODO
package main
package packagekit
import (
"fmt"
"github.com/godbus/dbus"
"log"
"runtime"
"strings"
"github.com/purpleidea/mgmt/util"
"github.com/godbus/dbus"
)
// global tweaks of verbosity and code path
@@ -160,7 +162,7 @@ type PkPackageIDActionData struct {
// NewBus returns a new bus connection.
func NewBus() *Conn {
// if we share the bus with others, we will get each others messages!!
bus, err := SystemBusPrivateUsable() // don't share the bus connection!
bus, err := util.SystemBusPrivateUsable() // don't share the bus connection!
if err != nil {
return nil
}
@@ -422,7 +424,7 @@ loop:
} else {
return fmt.Errorf("PackageKit: Error: %v", signal.Body)
}
case <-TimeAfterOrBlock(timeout):
case <-util.TimeAfterOrBlock(timeout):
if finished {
log.Println("PackageKit: Timeout: InstallPackages: Waiting for 'Destroy'")
return nil // got tired of waiting for Destroy

View File

@@ -15,10 +15,9 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
package resources
import (
//"packagekit" // TODO
"encoding/gob"
"errors"
"fmt"
@@ -26,6 +25,11 @@ import (
"path"
"strings"
"time"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead?
"github.com/purpleidea/mgmt/resources/packagekit"
"github.com/purpleidea/mgmt/util"
)
func init() {
@@ -39,7 +43,7 @@ type PkgRes struct {
AllowUntrusted bool `yaml:"allowuntrusted"` // allow untrusted packages to be installed?
AllowNonFree bool `yaml:"allownonfree"` // allow nonfree packages to be found?
AllowUnsupported bool `yaml:"allowunsupported"` // allow unsupported packages to be found?
//bus *Conn // pk bus connection
//bus *packagekit.Conn // pk bus connection
fileList []string // FIXME: update if pkg changes
}
@@ -63,7 +67,7 @@ func (obj *PkgRes) Init() {
obj.BaseRes.kind = "Pkg"
obj.BaseRes.Init() // call base init, b/c we're overriding
bus := NewBus()
bus := packagekit.NewBus()
if bus == nil {
log.Fatal("Can't connect to PackageKit bus.")
}
@@ -92,7 +96,7 @@ func (obj *PkgRes) Init() {
return
}
if files, ok := filesMap[data.PackageID]; ok {
obj.fileList = DirifyFileList(files, false)
obj.fileList = util.DirifyFileList(files, false)
}
}
@@ -109,7 +113,7 @@ func (obj *PkgRes) Validate() bool {
// It uses the PackageKit UpdatesChanged signal to watch for changes.
// TODO: https://github.com/hughsie/PackageKit/issues/109
// TODO: https://github.com/hughsie/PackageKit/issues/110
func (obj *PkgRes) Watch(processChan chan Event) error {
func (obj *PkgRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() {
return nil
}
@@ -127,7 +131,7 @@ func (obj *PkgRes) Watch(processChan chan Event) error {
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
bus := NewBus()
bus := packagekit.NewBus()
if bus == nil {
log.Fatal("Can't connect to PackageKit bus.")
}
@@ -143,17 +147,17 @@ func (obj *PkgRes) Watch(processChan chan Event) error {
var dirty = false
for {
if DEBUG {
if global.DEBUG {
log.Printf("%v: Watching...", obj.fmtNames(obj.getNames()))
}
obj.SetState(resStateWatching) // reset
obj.SetState(ResStateWatching) // reset
select {
case event := <-ch:
cuuid.SetConverged(false)
// FIXME: ask packagekit for info on what packages changed
if DEBUG {
if global.DEBUG {
log.Printf("%v: Event: %v", obj.fmtNames(obj.getNames()), event.Name)
}
@@ -236,23 +240,23 @@ func (obj *PkgRes) groupMappingHelper() map[string]string {
return result
}
func (obj *PkgRes) pkgMappingHelper(bus *Conn) (map[string]*PkPackageIDActionData, error) {
func (obj *PkgRes) pkgMappingHelper(bus *packagekit.Conn) (map[string]*packagekit.PkPackageIDActionData, error) {
packageMap := obj.groupMappingHelper() // get the grouped values
packageMap[obj.Name] = obj.State // key is pkg name, value is pkg state
var filter uint64 // initializes at the "zero" value of 0
filter += PK_FILTER_ENUM_ARCH // always search in our arch (optional!)
filter += packagekit.PK_FILTER_ENUM_ARCH // always search in our arch (optional!)
// we're requesting latest version, or to narrow down install choices!
if obj.State == "newest" || obj.State == "installed" {
// if we add this, we'll still see older packages if installed
// this is an optimization, and is *optional*, this logic is
// handled inside of PackagesToPackageIDs now automatically!
filter += PK_FILTER_ENUM_NEWEST // only search for newest packages
filter += packagekit.PK_FILTER_ENUM_NEWEST // only search for newest packages
}
if !obj.AllowNonFree {
filter += PK_FILTER_ENUM_FREE
filter += packagekit.PK_FILTER_ENUM_FREE
}
if !obj.AllowUnsupported {
filter += PK_FILTER_ENUM_SUPPORTED
filter += packagekit.PK_FILTER_ENUM_SUPPORTED
}
result, e := bus.PackagesToPackageIDs(packageMap, filter)
if e != nil {
@@ -274,7 +278,7 @@ func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) {
return true, nil
}
bus := NewBus()
bus := packagekit.NewBus()
if bus == nil {
return false, errors.New("Can't connect to PackageKit bus.")
}
@@ -287,18 +291,18 @@ func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) {
packageMap := obj.groupMappingHelper() // map[string]string
packageList := []string{obj.Name}
packageList = append(packageList, StrMapKeys(packageMap)...)
packageList = append(packageList, util.StrMapKeys(packageMap)...)
//stateList := []string{obj.State}
//stateList = append(stateList, StrMapValues(packageMap)...)
//stateList = append(stateList, util.StrMapValues(packageMap)...)
// TODO: at the moment, all the states are the same, but
// eventually we might be able to drop this constraint!
states, err := FilterState(result, packageList, obj.State)
states, err := packagekit.FilterState(result, packageList, obj.State)
if err != nil {
return false, fmt.Errorf("The FilterState method failed with: %v.", err)
}
data, _ := result[obj.Name] // if above didn't error, we won't either!
validState := BoolMapTrue(BoolMapValues(states))
validState := util.BoolMapTrue(util.BoolMapValues(states))
// obj.State == "installed" || "uninstalled" || "newest" || "4.2-1.fc23"
switch obj.State {
@@ -325,20 +329,20 @@ func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) {
// apply portion
log.Printf("%v: Apply", obj.fmtNames(obj.getNames()))
readyPackages, err := FilterPackageState(result, packageList, obj.State)
readyPackages, err := packagekit.FilterPackageState(result, packageList, obj.State)
if err != nil {
return false, err // fail
}
// these are the packages that actually need their states applied!
applyPackages := StrFilterElementsInList(readyPackages, packageList)
packageIDs, _ := FilterPackageIDs(result, applyPackages) // would be same err as above
applyPackages := util.StrFilterElementsInList(readyPackages, packageList)
packageIDs, _ := packagekit.FilterPackageIDs(result, applyPackages) // would be same err as above
var transactionFlags uint64 // initializes at the "zero" value of 0
if !obj.AllowUntrusted { // allow
transactionFlags += PK_TRANSACTION_FLAG_ENUM_ONLY_TRUSTED
transactionFlags += packagekit.PK_TRANSACTION_FLAG_ENUM_ONLY_TRUSTED
}
// apply correct state!
log.Printf("%v: Set: %v...", obj.fmtNames(StrListIntersection(applyPackages, obj.getNames())), obj.State)
log.Printf("%v: Set: %v...", obj.fmtNames(util.StrListIntersection(applyPackages, obj.getNames())), obj.State)
switch obj.State {
case "uninstalled": // run remove
// NOTE: packageID is different than when installed, because now
@@ -356,7 +360,7 @@ func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) {
if err != nil {
return false, err // fail
}
log.Printf("%v: Set: %v success!", obj.fmtNames(StrListIntersection(applyPackages, obj.getNames())), obj.State)
log.Printf("%v: Set: %v success!", obj.fmtNames(util.StrListIntersection(applyPackages, obj.getNames())), obj.State)
obj.isStateOK = true // reset
return false, nil // success
}
@@ -450,16 +454,16 @@ func (obj *PkgResAutoEdges) Test(input []bool) bool {
var dirs = make([]string, count)
done := []string{}
for i := 0; i < count; i++ {
dir := Dirname(obj.fileList[i]) // dirname of /foo/ should be /
dir := util.Dirname(obj.fileList[i]) // dirname of /foo/ should be /
dirs[i] = dir
if input[i] {
done = append(done, dir)
}
}
nodupes := StrRemoveDuplicatesInList(dirs) // remove duplicates
nodones := StrFilterElementsInList(done, nodupes) // filter out done
noempty := StrFilterElementsInList([]string{""}, nodones) // remove the "" from /
obj.fileList = RemoveCommonFilePrefixes(noempty) // magic
nodupes := util.StrRemoveDuplicatesInList(dirs) // remove duplicates
nodones := util.StrFilterElementsInList(done, nodupes) // filter out done
noempty := util.StrFilterElementsInList([]string{""}, nodones) // remove the "" from /
obj.fileList = util.RemoveCommonFilePrefixes(noempty) // magic
if len(obj.fileList) == 0 { // nothing more, don't continue
return false
@@ -489,7 +493,7 @@ func (obj *PkgRes) AutoEdges() AutoEdge {
}
return &PkgResAutoEdges{
fileList: RemoveCommonFilePrefixes(obj.fileList), // clean start!
fileList: util.RemoveCommonFilePrefixes(obj.fileList), // clean start!
svcUUIDs: svcUUIDs,
testIsNext: false, // start with Next() call
name: obj.GetName(), // save data for PkgResAutoEdges obj
@@ -573,7 +577,7 @@ func ReturnSvcInFileList(fileList []string) []string {
if !strings.HasSuffix(basename, ".service") {
continue
}
if s := strings.TrimSuffix(basename, ".service"); !StrInList(s, result) {
if s := strings.TrimSuffix(basename, ".service"); !util.StrInList(s, result) {
result = append(result, s)
}
}

View File

@@ -15,7 +15,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
package resources
import (
"bytes"
@@ -23,17 +23,22 @@ import (
"encoding/gob"
"fmt"
"log"
// TODO: should each resource be a sub-package?
"github.com/purpleidea/mgmt/converger"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/global"
)
//go:generate stringer -type=resState -output=resstate_stringer.go
type resState int
//go:generate stringer -type=ResState -output=resstate_stringer.go
type ResState int
const (
resStateNil resState = iota
resStateWatching
resStateEvent // an event has happened, but we haven't poked yet
resStateCheckApply
resStatePoking
ResStateNil ResState = iota
ResStateWatching
ResStateEvent // an event has happened, but we haven't poked yet
ResStateCheckApply
ResStatePoking
)
// ResUUID is a unique identifier for a resource, namely it's name, and the kind ("type").
@@ -76,18 +81,18 @@ type MetaParams struct {
type Base interface {
GetName() string // can't be named "Name()" because of struct field
SetName(string)
setKind(string)
SetKind(string)
Kind() string
Meta() *MetaParams
Events() chan Event
AssociateData(Converger)
Events() chan event.Event
AssociateData(converger.Converger)
IsWatching() bool
SetWatching(bool)
GetState() resState
SetState(resState)
DoSend(chan Event, string) (bool, error)
SendEvent(eventName, bool, bool) bool
ReadEvent(*Event) (bool, bool) // TODO: optional here?
GetState() ResState
SetState(ResState)
DoSend(chan event.Event, string) (bool, error)
SendEvent(event.EventName, bool, bool) bool
ReadEvent(*event.Event) (bool, bool) // TODO: optional here?
GroupCmp(Res) bool // TODO: is there a better name for this?
GroupRes(Res) error // group resource (arg) into self
IsGrouped() bool // am I grouped?
@@ -102,7 +107,7 @@ type Res interface {
Init()
//Validate() bool // TODO: this might one day be added
GetUUIDs() []ResUUID // most resources only return one
Watch(chan Event) error // send on channel to signal process() events
Watch(chan event.Event) error // send on channel to signal process() events
CheckApply(bool) (bool, error)
AutoEdges() AutoEdge
Compare(Res) bool
@@ -114,9 +119,9 @@ type BaseRes struct {
Name string `yaml:"name"`
MetaParams MetaParams `yaml:"meta"` // struct of all the metaparams
kind string
events chan Event
converger Converger // converged tracking
state resState
events chan event.Event
converger converger.Converger // converged tracking
state ResState
watching bool // is Watch() loop running ?
isStateOK bool // whether the state is okay based on events or not
isGrouped bool // am i contained within a group?
@@ -166,7 +171,7 @@ func (obj *BaseUUID) Reversed() bool {
// Init initializes structures like channels if created without New constructor.
func (obj *BaseRes) Init() {
obj.events = make(chan Event) // unbuffered chan size to avoid stale events
obj.events = make(chan event.Event) // unbuffered chan size to avoid stale events
}
// GetName is used by all the resources to Get the name.
@@ -179,8 +184,8 @@ func (obj *BaseRes) SetName(name string) {
obj.Name = name
}
// setKind sets the kind. This is used internally for exported resources.
func (obj *BaseRes) setKind(kind string) {
// SetKind sets the kind. This is used internally for exported resources.
func (obj *BaseRes) SetKind(kind string) {
obj.kind = kind
}
@@ -195,12 +200,12 @@ func (obj *BaseRes) Meta() *MetaParams {
}
// Events returns the channel of events to listen on.
func (obj *BaseRes) Events() chan Event {
func (obj *BaseRes) Events() chan event.Event {
return obj.events
}
// AssociateData associates some data with the object in question.
func (obj *BaseRes) AssociateData(converger Converger) {
func (obj *BaseRes) AssociateData(converger converger.Converger) {
obj.converger = converger
}
@@ -215,13 +220,13 @@ func (obj *BaseRes) SetWatching(b bool) {
}
// GetState returns the state of the resource.
func (obj *BaseRes) GetState() resState {
func (obj *BaseRes) GetState() ResState {
return obj.state
}
// SetState sets the state of the resource.
func (obj *BaseRes) SetState(state resState) {
if DEBUG {
func (obj *BaseRes) SetState(state ResState) {
if global.DEBUG {
log.Printf("%v[%v]: State: %v -> %v", obj.Kind(), obj.GetName(), obj.GetState(), state)
}
obj.state = state
@@ -230,9 +235,9 @@ func (obj *BaseRes) SetState(state resState) {
// DoSend sends off an event, but doesn't block the incoming event queue. It can
// also recursively call itself when events need processing during the wait.
// I'm not completely comfortable with this fn, but it will have to do for now.
func (obj *BaseRes) DoSend(processChan chan Event, comment string) (bool, error) {
resp := NewResp()
processChan <- Event{eventNil, resp, comment, true} // trigger process
func (obj *BaseRes) DoSend(processChan chan event.Event, comment string) (bool, error) {
resp := event.NewResp()
processChan <- event.Event{event.EventNil, resp, comment, true} // trigger process
select {
case e := <-resp: // wait for the ACK()
if e != nil { // we got a NACK
@@ -252,47 +257,47 @@ func (obj *BaseRes) DoSend(processChan chan Event, comment string) (bool, error)
}
// SendEvent pushes an event into the message queue for a particular vertex
func (obj *BaseRes) SendEvent(event eventName, sync bool, activity bool) bool {
func (obj *BaseRes) SendEvent(ev event.EventName, sync bool, activity bool) bool {
// TODO: isn't this race-y ?
if !obj.IsWatching() { // element has already exited
return false // if we don't return, we'll block on the send
}
if !sync {
obj.events <- Event{event, nil, "", activity}
obj.events <- event.Event{ev, nil, "", activity}
return true
}
resp := NewResp()
obj.events <- Event{event, resp, "", activity}
resp := event.NewResp()
obj.events <- event.Event{ev, resp, "", activity}
resp.ACKWait() // waits until true (nil) value
return true
}
// ReadEvent processes events when a select gets one, and handles the pause
// code too! The return values specify if we should exit and poke respectively.
func (obj *BaseRes) ReadEvent(event *Event) (exit, poke bool) {
event.ACK()
switch event.Name {
case eventStart:
func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, poke bool) {
ev.ACK()
switch ev.Name {
case event.EventStart:
return false, true
case eventPoke:
case event.EventPoke:
return false, true
case eventBackPoke:
case event.EventBackPoke:
return false, true // forward poking in response to a back poke!
case eventExit:
case event.EventExit:
return true, false
case eventPause:
case event.EventPause:
// wait for next event to continue
select {
case e := <-obj.events:
e.ACK()
if e.Name == eventExit {
if e.Name == event.EventExit {
return true, false
} else if e.Name == eventStart { // eventContinue
} else if e.Name == event.EventStart { // eventContinue
return false, false // don't poke on unpause!
} else {
// if we get a poke event here, it's a bug!
@@ -301,7 +306,7 @@ func (obj *BaseRes) ReadEvent(event *Event) (exit, poke bool) {
}
default:
log.Fatal("Unknown event: ", event)
log.Fatal("Unknown event: ", ev)
}
return true, false // required to keep the stupid go compiler happy
}

View File

@@ -15,7 +15,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
package resources
import (
"bytes"

View File

@@ -17,17 +17,21 @@
// DOCS: https://godoc.org/github.com/coreos/go-systemd/dbus
package main
package resources
import (
"encoding/gob"
"errors"
"fmt"
"log"
"time"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util"
systemd "github.com/coreos/go-systemd/dbus" // change namespace
systemdUtil "github.com/coreos/go-systemd/util"
"github.com/godbus/dbus" // namespace collides with systemd wrapper
"log"
"time"
)
func init() {
@@ -72,7 +76,7 @@ func (obj *SvcRes) Validate() bool {
}
// Watch is the primary listener for this resource and it outputs events.
func (obj *SvcRes) Watch(processChan chan Event) error {
func (obj *SvcRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() {
return nil
}
@@ -102,7 +106,7 @@ func (obj *SvcRes) Watch(processChan chan Event) error {
defer conn.Close()
// if we share the bus with others, we will get each others messages!!
bus, err := SystemBusPrivateUsable() // don't share the bus connection!
bus, err := util.SystemBusPrivateUsable() // don't share the bus connection!
if err != nil {
return fmt.Errorf("Failed to connect to bus: %s", err)
}
@@ -157,7 +161,7 @@ func (obj *SvcRes) Watch(processChan chan Event) error {
set.Remove(svc) // no return value should ever occur
}
obj.SetState(resStateWatching) // reset
obj.SetState(ResStateWatching) // reset
select {
case <-buschan: // XXX wait for new units event to unstick
cuuid.SetConverged(false)
@@ -189,7 +193,7 @@ func (obj *SvcRes) Watch(processChan chan Event) error {
}
log.Printf("Watching: %v", svc) // attempting to watch...
obj.SetState(resStateWatching) // reset
obj.SetState(ResStateWatching) // reset
select {
case event := <-subChannel:

View File

@@ -15,12 +15,14 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
package resources
import (
"encoding/gob"
"log"
"time"
"github.com/purpleidea/mgmt/event"
)
func init() {
@@ -65,7 +67,7 @@ func (obj *TimerRes) Validate() bool {
}
// Watch is the primary listener for this resource and it outputs events.
func (obj *TimerRes) Watch(processChan chan Event) error {
func (obj *TimerRes) Watch(processChan chan event.Event) error {
if obj.IsWatching() {
return nil
}
@@ -90,7 +92,7 @@ func (obj *TimerRes) Watch(processChan chan Event) error {
var send = false
for {
obj.SetState(resStateWatching)
obj.SetState(ResStateWatching)
select {
case <-ticker.C: // received the timer event
send = true

View File

@@ -15,14 +15,16 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
// Package util contains a collection of miscellaneous utility functions.
package util
import (
"github.com/godbus/dbus"
"path"
"sort"
"strings"
"time"
"github.com/godbus/dbus"
)
// FirstToUpper returns the string with the first character capitalized.

View File

@@ -15,7 +15,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
package util
import (
"reflect"
@@ -23,7 +23,7 @@ import (
"testing"
)
func TestMiscT1(t *testing.T) {
func TestUtilT1(t *testing.T) {
if Dirname("/foo/bar/baz") != "/foo/bar/" {
t.Errorf("Result is incorrect.")
@@ -62,7 +62,7 @@ func TestMiscT1(t *testing.T) {
}
}
func TestMiscT2(t *testing.T) {
func TestUtilT2(t *testing.T) {
// TODO: compare the output with the actual list
p0 := "/"
@@ -86,7 +86,7 @@ func TestMiscT2(t *testing.T) {
}
}
func TestMiscT3(t *testing.T) {
func TestUtilT3(t *testing.T) {
if HasPathPrefix("/foo/bar/baz", "/foo/ba") != false {
t.Errorf("Result should be false.")
@@ -117,7 +117,7 @@ func TestMiscT3(t *testing.T) {
}
}
func TestMiscT4(t *testing.T) {
func TestUtilT4(t *testing.T) {
if PathPrefixDelta("/foo/bar/baz", "/foo/ba") != -1 {
t.Errorf("Result should be -1.")
@@ -152,7 +152,7 @@ func TestMiscT4(t *testing.T) {
}
}
func TestMiscT8(t *testing.T) {
func TestUtilT8(t *testing.T) {
r0 := []string{"/"}
if fullList0 := PathSplitFullReversed("/"); !reflect.DeepEqual(r0, fullList0) {
@@ -171,7 +171,7 @@ func TestMiscT8(t *testing.T) {
}
func TestMiscT9(t *testing.T) {
func TestUtilT9(t *testing.T) {
fileListIn := []string{ // list taken from drbd-utils package
"/etc/drbd.conf",
"/etc/drbd.d/global_common.conf",
@@ -315,7 +315,7 @@ func TestMiscT9(t *testing.T) {
}
}
func TestMiscT10(t *testing.T) {
func TestUtilT10(t *testing.T) {
fileListIn := []string{ // fake package list
"/etc/drbd.conf",
"/usr/share/man/man8/drbdsetup.8.gz",
@@ -351,7 +351,7 @@ func TestMiscT10(t *testing.T) {
}
}
func TestMiscT11(t *testing.T) {
func TestUtilT11(t *testing.T) {
in1 := []string{"/", "/usr/", "/usr/lib/", "/usr/share/"} // input
ex1 := []string{"/usr/lib/", "/usr/share/"} // expected
sort.Strings(ex1)
@@ -724,7 +724,7 @@ func TestMiscT11(t *testing.T) {
}
}
func TestMiscFlattenListWithSplit1(t *testing.T) {
func TestUtilFlattenListWithSplit1(t *testing.T) {
{
in := []string{} // input
ex := []string{} // expected