Refactor etcd into object and add exit timers

This refactors the etcd usage into a struct (object) wrapper, which makes
it easier to add an exit-on-converged timer.
This commit is contained in:
James Shubin
2016-01-06 19:35:29 -05:00
parent 95489b9c07
commit 72525d30b1
9 changed files with 205 additions and 30 deletions

View File

@@ -99,6 +99,13 @@ documentation, please run `mgmt --help`.
####`--file <graph.yaml>` ####`--file <graph.yaml>`
Point to a graph file to run. Point to a graph file to run.
####`--converged-timeout <seconds>`
Exit if the machine has converged for approximately this many seconds.
####`--max-runtime <seconds>`
Exit when the agent has run for approximately this many seconds. This is not
generally recommended, but may be useful for users who know what they're doing.
##Examples ##Examples
For example configurations, please consult the [examples/](https://github.com/purpleidea/mgmt/tree/master/examples) directory in the git For example configurations, please consult the [examples/](https://github.com/purpleidea/mgmt/tree/master/examples) directory in the git
source repository. It is available from: source repository. It is available from:

View File

@@ -18,9 +18,6 @@
package main package main
import ( import (
//etcd_context "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
etcd "github.com/coreos/etcd/client"
"errors" "errors"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"io/ioutil" "io/ioutil"
@@ -82,7 +79,7 @@ func ParseConfigFromFile(filename string) *graphConfig {
return &config return &config
} }
func UpdateGraphFromConfig(config *graphConfig, hostname string, g *Graph, kapi etcd.KeysAPI) { func UpdateGraphFromConfig(config *graphConfig, hostname string, g *Graph, etcdO *EtcdWObject) {
var NoopMap map[string]*Vertex = make(map[string]*Vertex) var NoopMap map[string]*Vertex = make(map[string]*Vertex)
var FileMap map[string]*Vertex = make(map[string]*Vertex) var FileMap map[string]*Vertex = make(map[string]*Vertex)
@@ -116,7 +113,7 @@ func UpdateGraphFromConfig(config *graphConfig, hostname string, g *Graph, kapi
if strings.HasPrefix(t.Name, "@@") { // exported resource if strings.HasPrefix(t.Name, "@@") { // exported resource
// add to etcd storage... // add to etcd storage...
t.Name = t.Name[2:] //slice off @@ t.Name = t.Name[2:] //slice off @@
if !EtcdPut(kapi, hostname, t.Name, "file", t) { if !etcdO.EtcdPut(hostname, t.Name, "file", t) {
log.Printf("Problem exporting file resource %v.", t.Name) log.Printf("Problem exporting file resource %v.", t.Name)
continue continue
} }
@@ -146,13 +143,13 @@ func UpdateGraphFromConfig(config *graphConfig, hostname string, g *Graph, kapi
// lookup from etcd graph // lookup from etcd graph
// do all the graph look ups in one single step, so that if the etcd // do all the graph look ups in one single step, so that if the etcd
// database changes, we don't have a partial state of affairs... // database changes, we don't have a partial state of affairs...
nodes, ok := EtcdGet(kapi) nodes, ok := etcdO.EtcdGet()
if ok { if ok {
for _, t := range config.Collector { for _, t := range config.Collector {
// XXX: use t.Type and optionally t.Pattern to collect from etcd storage // XXX: use t.Type and optionally t.Pattern to collect from etcd storage
log.Printf("Collect: %v; Pattern: %v", t.Type, t.Pattern) log.Printf("Collect: %v; Pattern: %v", t.Type, t.Pattern)
for _, x := range EtcdGetProcess(nodes, "file") { for _, x := range etcdO.EtcdGetProcess(nodes, "file") {
var obj *FileType var obj *FileType
if B64ToObj(x, &obj) != true { if B64ToObj(x, &obj) != true {
log.Printf("Collect: File: %v not collected!", x) log.Printf("Collect: File: %v not collected!", x)

80
etcd.go
View File

@@ -37,9 +37,38 @@ const (
etcdBar etcdBar
) )
func EtcdGetKAPI(seed string) etcd.KeysAPI { //go:generate stringer -type=etcdState -output=etcdstate_stringer.go
// etcdState is the state value stored on EtcdWObject and exposed through its
// GetState/SetState methods.
type etcdState int
const (
// etcdNil is the zero value; the etcd watch loop also sets it whenever the
// watcher channel delivers a response or an error.
etcdNil etcdState = iota
//etcdConverged
// etcdConvergedTimeout is set by the etcd watch loop when the converged
// timeout fires before the watcher channel delivers anything.
etcdConvergedTimeout
)
// EtcdWObject bundles the etcd connection settings, the lazily created keys
// API handle and the converged-timeout bookkeeping used by the etcd helpers.
type EtcdWObject struct { // etcd wrapper object
// seed is used as the single entry of the etcd client's Endpoints list.
seed string
// ctimeout is the converged timeout handed to TimeAfterOrBlock by the watch
// loop (seconds; a negative value means the timeout never fires).
ctimeout int
// converged receives `true` from the watch loop each time the converged
// timeout fires.
converged chan bool
// kapi is the memoized keys API; it stays nil until GetKAPI builds it.
kapi etcd.KeysAPI
// state is the last value stored with SetState.
state etcdState
}
// GetState returns the state most recently stored with SetState (etcdNil if
// it was never set).
// NOTE(review): the field is read without any locking, while the watch loop
// writes it through SetState; this looks like it happens on a different
// goroutine than the reader in main -- confirm whether a mutex is needed.
func (obj *EtcdWObject) GetState() etcdState {
return obj.state
}
// SetState overwrites the stored state with the given value.
// NOTE(review): the write is not guarded by a lock; confirm that concurrent
// GetState callers are acceptable.
func (obj *EtcdWObject) SetState(state etcdState) {
obj.state = state
}
func (etcdO *EtcdWObject) GetKAPI() etcd.KeysAPI {
if etcdO.kapi != nil { // memoize
return etcdO.kapi
}
cfg := etcd.Config{ cfg := etcd.Config{
Endpoints: []string{seed}, Endpoints: []string{etcdO.seed},
Transport: etcd.DefaultTransport, Transport: etcd.DefaultTransport,
// set timeout per request to fail fast when the target endpoint is unavailable // set timeout per request to fail fast when the target endpoint is unavailable
HeaderTimeoutPerRequest: time.Second, HeaderTimeoutPerRequest: time.Second,
@@ -62,10 +91,31 @@ func EtcdGetKAPI(seed string) etcd.KeysAPI {
} }
log.Fatal(err) // some unhandled error log.Fatal(err) // some unhandled error
} }
return etcd.NewKeysAPI(c) etcdO.kapi = etcd.NewKeysAPI(c)
return etcdO.kapi
} }
func EtcdWatch(kapi etcd.KeysAPI) chan etcdMsg { type EtcdChannelWatchResponse struct {
resp *etcd.Response
err error
}
// EtcdChannelWatch wraps the blocking watcher.Next call inside of a channel so
// that it can be used as a case in a select statement.
//
// It returns a new unbuffered channel and starts a goroutine which loops
// forever: each iteration calls watcher.Next(context) and then sends the
// resulting response/error pair on the channel.
//
// NOTE(review): the goroutine has no exit path, not even after an error, and
// the send blocks until somebody receives. EtcdWatch calls this function anew
// on every pass of its loop and receives at most one value from each returned
// channel, so earlier goroutines appear to stay parked in Next or on the send
// (possibly holding a response nobody will read) -- confirm, and consider
// creating the channel once outside of that loop.
func (etcdO *EtcdWObject) EtcdChannelWatch(watcher etcd.Watcher, context etcd_context.Context) chan *EtcdChannelWatchResponse {
ch := make(chan *EtcdChannelWatchResponse)
go func() {
for {
resp, err := watcher.Next(context) // blocks here
// unbuffered: this blocks too, until the value is received
ch <- &EtcdChannelWatchResponse{resp, err}
}
}()
return ch
}
func (etcdO *EtcdWObject) EtcdWatch() chan etcdMsg {
kapi := etcdO.GetKAPI()
ctimeout := etcdO.ctimeout
converged := etcdO.converged
// XXX: i think we need this buffered so that when we're hanging on the // XXX: i think we need this buffered so that when we're hanging on the
// channel, which is inside the EtcdWatch main loop, we still want the // channel, which is inside the EtcdWatch main loop, we still want the
// calls to Get/Set on etcd to succeed, so blocking them here would // calls to Get/Set on etcd to succeed, so blocking them here would
@@ -79,7 +129,19 @@ func EtcdWatch(kapi etcd.KeysAPI) chan etcdMsg {
watcher := kapi.Watcher("/exported/", &etcd.WatcherOptions{Recursive: true}) watcher := kapi.Watcher("/exported/", &etcd.WatcherOptions{Recursive: true})
for { for {
log.Printf("Etcd: Watching...") log.Printf("Etcd: Watching...")
resp, err := watcher.Next(etcd_context.Background()) // blocks here var resp *etcd.Response = nil
var err error = nil
select {
case out := <-etcdO.EtcdChannelWatch(watcher, etcd_context.Background()):
etcdO.SetState(etcdNil)
resp, err = out.resp, out.err
case _ = <-TimeAfterOrBlock(ctimeout):
etcdO.SetState(etcdConvergedTimeout)
converged <- true
continue
}
if err != nil { if err != nil {
if err == etcd_context.Canceled { if err == etcd_context.Canceled {
// ctx is canceled by another routine // ctx is canceled by another routine
@@ -141,7 +203,8 @@ func EtcdWatch(kapi etcd.KeysAPI) chan etcdMsg {
} }
// helper function to store our data in etcd // helper function to store our data in etcd
func EtcdPut(kapi etcd.KeysAPI, hostname, key, typ string, obj interface{}) bool { func (etcdO *EtcdWObject) EtcdPut(hostname, key, typ string, obj interface{}) bool {
kapi := etcdO.GetKAPI()
output, ok := ObjToB64(obj) output, ok := ObjToB64(obj)
if !ok { if !ok {
log.Printf("Etcd: Could not encode %v key.", key) log.Printf("Etcd: Could not encode %v key.", key)
@@ -171,7 +234,8 @@ func EtcdPut(kapi etcd.KeysAPI, hostname, key, typ string, obj interface{}) bool
} }
// lookup /exported/ node hierarchy // lookup /exported/ node hierarchy
func EtcdGet(kapi etcd.KeysAPI) (etcd.Nodes, bool) { func (etcdO *EtcdWObject) EtcdGet() (etcd.Nodes, bool) {
kapi := etcdO.GetKAPI()
// key structure is /exported/<hostname>/types/... // key structure is /exported/<hostname>/types/...
resp, err := kapi.Get(etcd_context.Background(), "/exported/", &etcd.GetOptions{Recursive: true}) resp, err := kapi.Get(etcd_context.Background(), "/exported/", &etcd.GetOptions{Recursive: true})
if err != nil { if err != nil {
@@ -180,7 +244,7 @@ func EtcdGet(kapi etcd.KeysAPI) (etcd.Nodes, bool) {
return resp.Node.Nodes, true return resp.Node.Nodes, true
} }
func EtcdGetProcess(nodes etcd.Nodes, typ string) []string { func (etcdO *EtcdWObject) EtcdGetProcess(nodes etcd.Nodes, typ string) []string {
//path := fmt.Sprintf("/exported/%s/types/", h) //path := fmt.Sprintf("/exported/%s/types/", h)
top := "/exported/" top := "/exported/"
log.Printf("Etcd: Get: %+v", nodes) // Get().Nodes.Nodes log.Printf("Etcd: Get: %+v", nodes) // Get().Nodes.Nodes

View File

@@ -149,6 +149,7 @@ func (obj *FileType) Watch() {
select { select {
case event := <-watcher.Events: case event := <-watcher.Events:
obj.SetState(typeNil) // XXX: technically i can detect is the event is erroneous or not first
// the deeper you go, the bigger the delta_depth is... // the deeper you go, the bigger the delta_depth is...
// this is the difference between what we're watching, // this is the difference between what we're watching,
// and the event... doesn't mean we can't watch deeper // and the event... doesn't mean we can't watch deeper
@@ -214,15 +215,22 @@ func (obj *FileType) Watch() {
} }
case err := <-watcher.Errors: case err := <-watcher.Errors:
obj.SetState(typeNil) // XXX ?
log.Println("error:", err) log.Println("error:", err)
log.Fatal(err) log.Fatal(err)
//obj.events <- fmt.Sprintf("file: %v", "error") // XXX: how should we handle errors? //obj.events <- fmt.Sprintf("file: %v", "error") // XXX: how should we handle errors?
case event := <-obj.events: case event := <-obj.events:
obj.SetState(typeNil)
if ok := obj.ReadEvent(&event); !ok { if ok := obj.ReadEvent(&event); !ok {
return // exit return // exit
} }
send = true send = true
case _ = <-TimeAfterOrBlock(obj.ctimeout):
obj.SetState(typeConvergedTimeout)
obj.converged <- true
continue
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs

67
main.go
View File

@@ -26,6 +26,7 @@ import (
"sync" "sync"
"syscall" "syscall"
"time" "time"
//etcd_context "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
) )
// set at compile time // set at compile time
@@ -61,13 +62,14 @@ func waitForSignal(exit chan bool) {
func run(c *cli.Context) { func run(c *cli.Context) {
var start int64 = time.Now().UnixNano() var start int64 = time.Now().UnixNano()
var wg sync.WaitGroup var wg sync.WaitGroup
exit := make(chan bool) // exit signal exit := make(chan bool) // exit signal
converged := make(chan bool) // converged signal
log.Printf("This is: %v, version: %v\n", program, version) log.Printf("This is: %v, version: %v\n", program, version)
log.Printf("Start: %v\n", start) log.Printf("Start: %v\n", start)
G := NewGraph("Graph") // give graph a default name G := NewGraph("Graph") // give graph a default name
// exit after `exittime` seconds for no reason at all... // exit after `max-runtime` seconds for no reason at all...
if i := c.Int("exittime"); i > 0 { if i := c.Int("max-runtime"); i > 0 {
go func() { go func() {
time.Sleep(time.Duration(i) * time.Second) time.Sleep(time.Duration(i) * time.Second)
exit <- true exit <- true
@@ -85,6 +87,12 @@ func run(c *cli.Context) {
// FIXME: validate seed, or wait for it to fail in etcd init? // FIXME: validate seed, or wait for it to fail in etcd init?
// etcd // etcd
etcdO := &EtcdWObject{
seed: seed,
ctimeout: c.Int("converged-timeout"),
converged: converged,
}
hostname := c.String("hostname") hostname := c.String("hostname")
if hostname == "" { if hostname == "" {
hostname, _ = os.Hostname() // etcd watch key // XXX: this is not the correct key name this is the set key name... WOOPS hostname, _ = os.Hostname() // etcd watch key // XXX: this is not the correct key name this is the set key name... WOOPS
@@ -95,8 +103,7 @@ func run(c *cli.Context) {
file := c.String("file") file := c.String("file")
configchan := ConfigWatch(file) configchan := ConfigWatch(file)
log.Printf("Starting etcd...\n") log.Printf("Starting etcd...\n")
kapi := EtcdGetKAPI(seed) etcdchan := etcdO.EtcdWatch()
etcdchan := EtcdWatch(kapi)
first := true // first loop or not first := true // first loop or not
for { for {
select { select {
@@ -117,7 +124,6 @@ func run(c *cli.Context) {
if c.Bool("no-watch") || !msg { if c.Bool("no-watch") || !msg {
continue // not ready to read config continue // not ready to read config
} }
//case compile_event: XXX //case compile_event: XXX
} }
@@ -130,15 +136,15 @@ func run(c *cli.Context) {
// run graph vertex LOCK... // run graph vertex LOCK...
if !first { // XXX: we can flatten this check out I think if !first { // XXX: we can flatten this check out I think
G.SetState(graphPausing) G.SetState(graphPausing)
log.Printf("State: %v", G.State()) log.Printf("State: %v", G.GetState())
G.Pause() // sync G.Pause() // sync
G.SetState(graphPaused) G.SetState(graphPaused)
log.Printf("State: %v", G.State()) log.Printf("State: %v", G.GetState())
} }
// build the graph from a config file // build the graph from a config file
// build the graph on events (eg: from etcd) // build the graph on events (eg: from etcd)
UpdateGraphFromConfig(config, hostname, G, kapi) UpdateGraphFromConfig(config, hostname, G, etcdO)
log.Printf("Graph: %v\n", G) // show graph log.Printf("Graph: %v\n", G) // show graph
err := G.ExecGraphviz(c.String("graphviz-filter"), c.String("graphviz")) err := G.ExecGraphviz(c.String("graphviz-filter"), c.String("graphviz"))
if err != nil { if err != nil {
@@ -147,21 +153,53 @@ func run(c *cli.Context) {
log.Printf("Graphviz: Successfully generated graph!") log.Printf("Graphviz: Successfully generated graph!")
} }
G.SetVertex() G.SetVertex()
G.SetConvergedCallback(c.Int("converged-timeout"), converged)
// G.Start(...) needs to be synchronous or wait, // G.Start(...) needs to be synchronous or wait,
// because if half of the nodes are started and // because if half of the nodes are started and
// some are not ready yet and the EtcdWatch // some are not ready yet and the EtcdWatch
// loops, we'll cause G.Pause(...) before we // loops, we'll cause G.Pause(...) before we
// even got going, thus causing nil pointer errors // even got going, thus causing nil pointer errors
G.SetState(graphStarting) G.SetState(graphStarting)
log.Printf("State: %v", G.State()) log.Printf("State: %v", G.GetState())
G.Start(&wg) // sync G.Start(&wg) // sync
G.SetState(graphStarted) G.SetState(graphStarted)
log.Printf("State: %v", G.State()) log.Printf("State: %v", G.GetState())
first = false first = false
} }
}() }()
if i := c.Int("converged-timeout"); i >= 0 {
go func() {
for {
isConverged := true
<-converged // when anyone says they have converged
if etcdO.GetState() != etcdConvergedTimeout {
isConverged = false
goto ConvergedCheck // efficiency boost
}
for v := range G.GetVerticesChan() {
if v.Type.GetState() != typeConvergedTimeout {
isConverged = false
break
}
}
ConvergedCheck:
// if all have converged, exit
if isConverged {
log.Printf("Converged for %d seconds, exiting!", i)
exit <- true
for {
<-converged
} // unblock/drain
return
}
}
}()
}
log.Println("Running...") log.Println("Running...")
waitForSignal(exit) // pass in exit channel to watch waitForSignal(exit) // pass in exit channel to watch
@@ -236,7 +274,12 @@ func main() {
Usage: "default etc peer endpoint", Usage: "default etc peer endpoint",
}, },
cli.IntFlag{ cli.IntFlag{
Name: "exittime", Name: "converged-timeout",
Value: -1,
Usage: "exit after approximately this many seconds in a converged state",
},
cli.IntFlag{
Name: "max-runtime",
Value: 0, Value: 0,
Usage: "exit after a maximum of approximately this many seconds", Usage: "exit after a maximum of approximately this many seconds",
}, },

10
misc.go
View File

@@ -23,6 +23,7 @@ import (
"encoding/gob" "encoding/gob"
"path" "path"
"strings" "strings"
"time"
) )
// Similar to the GNU dirname command // Similar to the GNU dirname command
@@ -110,3 +111,12 @@ func B64ToObj(str string, obj interface{}) bool {
} }
return true return true
} }
// TimeAfterOrBlock is a variant of time.After that takes a whole number of
// seconds. For a non-negative t it returns the channel produced by time.After
// for that many seconds. For a negative t it returns a channel that nothing
// ever sends on, so receiving from it blocks forever.
// Every call produces a fresh channel, so when the call itself is used as a
// select case, the timer restarts each time the select statement is entered.
func TimeAfterOrBlock(t int) <-chan time.Time {
	if t >= 0 {
		count := time.Duration(t)
		return time.After(count * time.Second)
	}
	// negative: hand back a channel without any sender
	never := make(chan time.Time)
	return never
}

View File

@@ -95,7 +95,7 @@ func (g *Graph) SetName(name string) {
g.Name = name g.Name = name
} }
func (g *Graph) State() graphState { func (g *Graph) GetState() graphState {
g.mutex.Lock() g.mutex.Lock()
defer g.mutex.Unlock() defer g.mutex.Unlock()
return g.state return g.state
@@ -577,6 +577,12 @@ func (g *Graph) Exit() {
} }
} }
// SetConvergedCallback hands the converged timeout and the shared converged
// channel to the Type of every vertex yielded by g.GetVerticesChan().
// The per-type method is spelled SetConvegedCallback (sic), matching its
// declaration in the Type interface.
func (g *Graph) SetConvergedCallback(ctimeout int, converged chan bool) {
for v := range g.GetVerticesChan() {
v.Type.SetConvegedCallback(ctimeout, converged)
}
}
// in array function to test *vertices in a slice of *vertices // in array function to test *vertices in a slice of *vertices
func HasVertex(v *Vertex, haystack []*Vertex) bool { func HasVertex(v *Vertex, haystack []*Vertex) bool {
for _, r := range haystack { for _, r := range haystack {

View File

@@ -123,14 +123,20 @@ func (obj *ServiceType) Watch() {
select { select {
case _ = <-buschan: // XXX wait for new units event to unstick case _ = <-buschan: // XXX wait for new units event to unstick
obj.SetState(typeNil)
// loop so that we can see the changed invalid signal // loop so that we can see the changed invalid signal
log.Printf("Service[%v]->DaemonReload()\n", service) log.Printf("Service[%v]->DaemonReload()\n", service)
case event := <-obj.events: case event := <-obj.events:
obj.SetState(typeNil)
if ok := obj.ReadEvent(&event); !ok { if ok := obj.ReadEvent(&event); !ok {
return // exit return // exit
} }
send = true send = true
case _ = <-TimeAfterOrBlock(obj.ctimeout):
obj.SetState(typeConvergedTimeout)
obj.converged <- true
continue
} }
} else { } else {
if !activeSet { if !activeSet {
@@ -160,11 +166,13 @@ func (obj *ServiceType) Watch() {
send = true send = true
case err := <-subErrors: case err := <-subErrors:
obj.SetState(typeNil) // XXX ?
log.Println("error:", err) log.Println("error:", err)
log.Fatal(err) log.Fatal(err)
//vertex.events <- fmt.Sprintf("service: %v", "error") // XXX: how should we handle errors? //vertex.events <- fmt.Sprintf("service: %v", "error") // XXX: how should we handle errors?
case event := <-obj.events: case event := <-obj.events:
obj.SetState(typeNil)
if ok := obj.ReadEvent(&event); !ok { if ok := obj.ReadEvent(&event); !ok {
return // exit return // exit
} }

View File

@@ -23,6 +23,15 @@ import (
"time" "time"
) )
//go:generate stringer -type=typeState -output=typestate_stringer.go
// typeState is the state value kept on BaseType and exposed through the
// GetState/SetState methods of the Type interface.
type typeState int
const (
// typeNil is the zero value; the Watch loops also set it whenever they
// receive an event or an error.
typeNil typeState = iota
//typeConverged
// typeConvergedTimeout is set by the Watch loops when the converged timeout
// fires before anything else arrives.
typeConvergedTimeout
)
type Type interface { type Type interface {
Init() Init()
GetName() string // can't be named "Name()" because of struct field GetName() string // can't be named "Name()" because of struct field
@@ -31,10 +40,13 @@ type Type interface {
StateOK() bool // TODO: can we rename this to something better? StateOK() bool // TODO: can we rename this to something better?
Apply() bool Apply() bool
SetVertex(*Vertex) SetVertex(*Vertex)
SetConvegedCallback(ctimeout int, converged chan bool)
Compare(Type) bool Compare(Type) bool
SendEvent(eventName, bool) SendEvent(eventName, bool)
IsWatching() bool IsWatching() bool
SetWatching(bool) SetWatching(bool)
GetState() typeState
SetState(typeState)
GetTimestamp() int64 GetTimestamp() int64
UpdateTimestamp() int64 UpdateTimestamp() int64
//Process() //Process()
@@ -45,7 +57,10 @@ type BaseType struct {
timestamp int64 // last updated timestamp ? timestamp int64 // last updated timestamp ?
events chan Event events chan Event
vertex *Vertex vertex *Vertex
state typeState
watching bool // is Watch() loop running ? watching bool // is Watch() loop running ?
ctimeout int // converged timeout
converged chan bool
} }
type NoopType struct { type NoopType struct {
@@ -87,6 +102,11 @@ func (obj *BaseType) SetVertex(v *Vertex) {
obj.vertex = v obj.vertex = v
} }
// SetConvegedCallback stores the converged timeout and the converged channel
// on the type. The Watch loops pass obj.ctimeout to TimeAfterOrBlock and send
// `true` on obj.converged when that timeout fires.
func (obj *BaseType) SetConvegedCallback(ctimeout int, converged chan bool) {
obj.ctimeout = ctimeout
obj.converged = converged
}
// is the Watch() function running? // is the Watch() function running?
func (obj *BaseType) IsWatching() bool { func (obj *BaseType) IsWatching() bool {
return obj.watching return obj.watching
@@ -97,6 +117,14 @@ func (obj *BaseType) SetWatching(b bool) {
obj.watching = b obj.watching = b
} }
// GetState returns the state most recently stored with SetState (typeNil if
// it was never set).
// NOTE(review): no locking is done here, while the Watch loops call SetState;
// confirm whether readers on other goroutines need synchronization.
func (obj *BaseType) GetState() typeState {
return obj.state
}
// SetState overwrites the stored state with the given value.
// NOTE(review): the write is not guarded by a lock; confirm that concurrent
// GetState callers are acceptable.
func (obj *BaseType) SetState(state typeState) {
obj.state = state
}
// get timestamp of a vertex // get timestamp of a vertex
func (obj *BaseType) GetTimestamp() int64 { func (obj *BaseType) GetTimestamp() int64 {
return obj.timestamp return obj.timestamp
@@ -230,14 +258,18 @@ func (obj *NoopType) Watch() {
//vertex := obj.vertex // stored with SetVertex //vertex := obj.vertex // stored with SetVertex
var send = false // send event? var send = false // send event?
for { for {
select { select {
case event := <-obj.events: case event := <-obj.events:
obj.SetState(typeNil)
if ok := obj.ReadEvent(&event); !ok { if ok := obj.ReadEvent(&event); !ok {
return // exit return // exit
} }
send = true send = true
case _ = <-TimeAfterOrBlock(obj.ctimeout):
obj.SetState(typeConvergedTimeout)
obj.converged <- true
continue
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs