From 72525d30b138b1995ced8ee4b858605dcc61b81e Mon Sep 17 00:00:00 2001 From: James Shubin Date: Wed, 6 Jan 2016 19:35:29 -0500 Subject: [PATCH] Refactor etcd into object and add exit timers This refactors my etcd use into a struct (object) wrapper, which makes it easier to add an exit on converged timer. --- DOCUMENTATION.md | 7 +++++ config.go | 11 +++---- etcd.go | 80 +++++++++++++++++++++++++++++++++++++++++++----- file.go | 8 +++++ main.go | 67 ++++++++++++++++++++++++++++++++-------- misc.go | 10 ++++++ pgraph.go | 8 ++++- service.go | 8 +++++ types.go | 36 ++++++++++++++++++++-- 9 files changed, 205 insertions(+), 30 deletions(-) diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index da6cc52b..19c03e75 100644 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -99,6 +99,13 @@ documentation, please run `mgmt --help`. ####`--file ` Point to a graph file to run. +####`--converged-timeout <seconds>` +Exit if the machine has converged for approximately this many seconds. + +####`--max-runtime <seconds>` +Exit when the agent has run for approximately this many seconds. This is not +generally recommended, but may be useful for users who know what they're doing. + ##Examples For example configurations, please consult the [examples/](https://github.com/purpleidea/mgmt/tree/master/examples) directory in the git source repository. 
It is available from: diff --git a/config.go b/config.go index 37f97633..f54b6ac3 100644 --- a/config.go +++ b/config.go @@ -18,9 +18,6 @@ package main import ( - //etcd_context "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - etcd "github.com/coreos/etcd/client" - "errors" "gopkg.in/yaml.v2" "io/ioutil" @@ -82,7 +79,7 @@ func ParseConfigFromFile(filename string) *graphConfig { return &config } -func UpdateGraphFromConfig(config *graphConfig, hostname string, g *Graph, kapi etcd.KeysAPI) { +func UpdateGraphFromConfig(config *graphConfig, hostname string, g *Graph, etcdO *EtcdWObject) { var NoopMap map[string]*Vertex = make(map[string]*Vertex) var FileMap map[string]*Vertex = make(map[string]*Vertex) @@ -116,7 +113,7 @@ func UpdateGraphFromConfig(config *graphConfig, hostname string, g *Graph, kapi if strings.HasPrefix(t.Name, "@@") { // exported resource // add to etcd storage... t.Name = t.Name[2:] //slice off @@ - if !EtcdPut(kapi, hostname, t.Name, "file", t) { + if !etcdO.EtcdPut(hostname, t.Name, "file", t) { log.Printf("Problem exporting file resource %v.", t.Name) continue } @@ -146,13 +143,13 @@ func UpdateGraphFromConfig(config *graphConfig, hostname string, g *Graph, kapi // lookup from etcd graph // do all the graph look ups in one single step, so that if the etcd // database changes, we don't have a partial state of affairs... 
- nodes, ok := EtcdGet(kapi) + nodes, ok := etcdO.EtcdGet() if ok { for _, t := range config.Collector { // XXX: use t.Type and optionally t.Pattern to collect from etcd storage log.Printf("Collect: %v; Pattern: %v", t.Type, t.Pattern) - for _, x := range EtcdGetProcess(nodes, "file") { + for _, x := range etcdO.EtcdGetProcess(nodes, "file") { var obj *FileType if B64ToObj(x, &obj) != true { log.Printf("Collect: File: %v not collected!", x) diff --git a/etcd.go b/etcd.go index e47f9030..25329b90 100644 --- a/etcd.go +++ b/etcd.go @@ -37,9 +37,38 @@ const ( etcdBar ) -func EtcdGetKAPI(seed string) etcd.KeysAPI { +//go:generate stringer -type=etcdState -output=etcdstate_stringer.go +type etcdState int + +const ( + etcdNil etcdState = iota + //etcdConverged + etcdConvergedTimeout +) + +type EtcdWObject struct { // etcd wrapper object + seed string + ctimeout int + converged chan bool + kapi etcd.KeysAPI + state etcdState +} + +func (obj *EtcdWObject) GetState() etcdState { + return obj.state +} + +func (obj *EtcdWObject) SetState(state etcdState) { + obj.state = state +} + +func (etcdO *EtcdWObject) GetKAPI() etcd.KeysAPI { + if etcdO.kapi != nil { // memoize + return etcdO.kapi + } + cfg := etcd.Config{ - Endpoints: []string{seed}, + Endpoints: []string{etcdO.seed}, Transport: etcd.DefaultTransport, // set timeout per request to fail fast when the target endpoint is unavailable HeaderTimeoutPerRequest: time.Second, @@ -62,10 +91,31 @@ func EtcdGetKAPI(seed string) etcd.KeysAPI { } log.Fatal(err) // some unhandled error } - return etcd.NewKeysAPI(c) + etcdO.kapi = etcd.NewKeysAPI(c) + return etcdO.kapi } -func EtcdWatch(kapi etcd.KeysAPI) chan etcdMsg { +type EtcdChannelWatchResponse struct { + resp *etcd.Response + err error +} + +// wrap the etcd watcher.Next blocking function inside of a channel +func (etcdO *EtcdWObject) EtcdChannelWatch(watcher etcd.Watcher, context etcd_context.Context) chan *EtcdChannelWatchResponse { + ch := make(chan 
*EtcdChannelWatchResponse) + go func() { + for { + resp, err := watcher.Next(context) // blocks here + ch <- &EtcdChannelWatchResponse{resp, err} + } + }() + return ch +} + +func (etcdO *EtcdWObject) EtcdWatch() chan etcdMsg { + kapi := etcdO.GetKAPI() + ctimeout := etcdO.ctimeout + converged := etcdO.converged // XXX: i think we need this buffered so that when we're hanging on the // channel, which is inside the EtcdWatch main loop, we still want the // calls to Get/Set on etcd to succeed, so blocking them here would @@ -79,7 +129,19 @@ func EtcdWatch(kapi etcd.KeysAPI) chan etcdMsg { watcher := kapi.Watcher("/exported/", &etcd.WatcherOptions{Recursive: true}) for { log.Printf("Etcd: Watching...") - resp, err := watcher.Next(etcd_context.Background()) // blocks here + var resp *etcd.Response = nil + var err error = nil + select { + case out := <-etcdO.EtcdChannelWatch(watcher, etcd_context.Background()): + etcdO.SetState(etcdNil) + resp, err = out.resp, out.err + + case _ = <-TimeAfterOrBlock(ctimeout): + etcdO.SetState(etcdConvergedTimeout) + converged <- true + continue + } + if err != nil { if err == etcd_context.Canceled { // ctx is canceled by another routine @@ -141,7 +203,8 @@ func EtcdWatch(kapi etcd.KeysAPI) chan etcdMsg { } // helper function to store our data in etcd -func EtcdPut(kapi etcd.KeysAPI, hostname, key, typ string, obj interface{}) bool { +func (etcdO *EtcdWObject) EtcdPut(hostname, key, typ string, obj interface{}) bool { + kapi := etcdO.GetKAPI() output, ok := ObjToB64(obj) if !ok { log.Printf("Etcd: Could not encode %v key.", key) @@ -171,7 +234,8 @@ func EtcdPut(kapi etcd.KeysAPI, hostname, key, typ string, obj interface{}) bool } // lookup /exported/ node hierarchy -func EtcdGet(kapi etcd.KeysAPI) (etcd.Nodes, bool) { +func (etcdO *EtcdWObject) EtcdGet() (etcd.Nodes, bool) { + kapi := etcdO.GetKAPI() // key structure is /exported//types/... 
resp, err := kapi.Get(etcd_context.Background(), "/exported/", &etcd.GetOptions{Recursive: true}) if err != nil { @@ -180,7 +244,7 @@ func EtcdGet(kapi etcd.KeysAPI) (etcd.Nodes, bool) { return resp.Node.Nodes, true } -func EtcdGetProcess(nodes etcd.Nodes, typ string) []string { +func (etcdO *EtcdWObject) EtcdGetProcess(nodes etcd.Nodes, typ string) []string { //path := fmt.Sprintf("/exported/%s/types/", h) top := "/exported/" log.Printf("Etcd: Get: %+v", nodes) // Get().Nodes.Nodes diff --git a/file.go b/file.go index 674bf362..7304ae94 100644 --- a/file.go +++ b/file.go @@ -149,6 +149,7 @@ func (obj *FileType) Watch() { select { case event := <-watcher.Events: + obj.SetState(typeNil) // XXX: technically i can detect is the event is erroneous or not first // the deeper you go, the bigger the delta_depth is... // this is the difference between what we're watching, // and the event... doesn't mean we can't watch deeper @@ -214,15 +215,22 @@ func (obj *FileType) Watch() { } case err := <-watcher.Errors: + obj.SetState(typeNil) // XXX ? log.Println("error:", err) log.Fatal(err) //obj.events <- fmt.Sprintf("file: %v", "error") // XXX: how should we handle errors? 
case event := <-obj.events: + obj.SetState(typeNil) if ok := obj.ReadEvent(&event); !ok { return // exit } send = true + + case _ = <-TimeAfterOrBlock(obj.ctimeout): + obj.SetState(typeConvergedTimeout) + obj.converged <- true + continue } // do all our event sending all together to avoid duplicate msgs diff --git a/main.go b/main.go index 063740c3..76e56744 100644 --- a/main.go +++ b/main.go @@ -26,6 +26,7 @@ import ( "sync" "syscall" "time" + //etcd_context "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" ) // set at compile time @@ -61,13 +62,14 @@ func waitForSignal(exit chan bool) { func run(c *cli.Context) { var start int64 = time.Now().UnixNano() var wg sync.WaitGroup - exit := make(chan bool) // exit signal + exit := make(chan bool) // exit signal + converged := make(chan bool) // converged signal log.Printf("This is: %v, version: %v\n", program, version) log.Printf("Start: %v\n", start) G := NewGraph("Graph") // give graph a default name - // exit after `exittime` seconds for no reason at all... - if i := c.Int("exittime"); i > 0 { + // exit after `max-runtime` seconds for no reason at all... + if i := c.Int("max-runtime"); i > 0 { go func() { time.Sleep(time.Duration(i) * time.Second) exit <- true @@ -85,6 +87,12 @@ func run(c *cli.Context) { // FIXME: validate seed, or wait for it to fail in etcd init? // etcd + etcdO := &EtcdWObject{ + seed: seed, + ctimeout: c.Int("converged-timeout"), + converged: converged, + } + hostname := c.String("hostname") if hostname == "" { hostname, _ = os.Hostname() // etcd watch key // XXX: this is not the correct key name this is the set key name... 
WOOPS @@ -95,8 +103,7 @@ func run(c *cli.Context) { file := c.String("file") configchan := ConfigWatch(file) log.Printf("Starting etcd...\n") - kapi := EtcdGetKAPI(seed) - etcdchan := EtcdWatch(kapi) + etcdchan := etcdO.EtcdWatch() first := true // first loop or not for { select { @@ -117,7 +124,6 @@ func run(c *cli.Context) { if c.Bool("no-watch") || !msg { continue // not ready to read config } - //case compile_event: XXX } @@ -130,15 +136,15 @@ func run(c *cli.Context) { // run graph vertex LOCK... if !first { // XXX: we can flatten this check out I think G.SetState(graphPausing) - log.Printf("State: %v", G.State()) + log.Printf("State: %v", G.GetState()) G.Pause() // sync G.SetState(graphPaused) - log.Printf("State: %v", G.State()) + log.Printf("State: %v", G.GetState()) } // build the graph from a config file // build the graph on events (eg: from etcd) - UpdateGraphFromConfig(config, hostname, G, kapi) + UpdateGraphFromConfig(config, hostname, G, etcdO) log.Printf("Graph: %v\n", G) // show graph err := G.ExecGraphviz(c.String("graphviz-filter"), c.String("graphviz")) if err != nil { @@ -147,21 +153,53 @@ func run(c *cli.Context) { log.Printf("Graphviz: Successfully generated graph!") } G.SetVertex() + G.SetConvergedCallback(c.Int("converged-timeout"), converged) // G.Start(...) needs to be synchronous or wait, // because if half of the nodes are started and // some are not ready yet and the EtcdWatch // loops, we'll cause G.Pause(...) 
before we // even got going, thus causing nil pointer errors G.SetState(graphStarting) - log.Printf("State: %v", G.State()) + log.Printf("State: %v", G.GetState()) G.Start(&wg) // sync G.SetState(graphStarted) - log.Printf("State: %v", G.State()) + log.Printf("State: %v", G.GetState()) first = false } }() + if i := c.Int("converged-timeout"); i >= 0 { + go func() { + for { + isConverged := true + <-converged // when anyone says they have converged + + if etcdO.GetState() != etcdConvergedTimeout { + isConverged = false + goto ConvergedCheck // efficiency boost + } + for v := range G.GetVerticesChan() { + if v.Type.GetState() != typeConvergedTimeout { + isConverged = false + break + } + } + + ConvergedCheck: + // if all have converged, exit + if isConverged { + log.Printf("Converged for %d seconds, exiting!", i) + exit <- true + for { + <-converged + } // unblock/drain + return + } + } + }() + } + log.Println("Running...") waitForSignal(exit) // pass in exit channel to watch @@ -236,7 +274,12 @@ func main() { Usage: "default etc peer endpoint", }, cli.IntFlag{ - Name: "exittime", + Name: "converged-timeout", + Value: -1, + Usage: "exit after approximately this many seconds in a converged state", + }, + cli.IntFlag{ + Name: "max-runtime", Value: 0, Usage: "exit after a maximum of approximately this many seconds", }, diff --git a/misc.go b/misc.go index 6229ccd4..db3e8519 100644 --- a/misc.go +++ b/misc.go @@ -23,6 +23,7 @@ import ( "encoding/gob" "path" "strings" + "time" ) // Similar to the GNU dirname command @@ -110,3 +111,12 @@ func B64ToObj(str string, obj interface{}) bool { } return true } + +// special version of time.After that blocks when given a negative integer +// when used in a case statement, the timer restarts on each select call to it +func TimeAfterOrBlock(t int) <-chan time.Time { + if t < 0 { + return make(chan time.Time) // blocks forever + } + return time.After(time.Duration(t) * time.Second) +} diff --git a/pgraph.go b/pgraph.go index 
a2ffccde..98e0253b 100644 --- a/pgraph.go +++ b/pgraph.go @@ -95,7 +95,7 @@ func (g *Graph) SetName(name string) { g.Name = name } -func (g *Graph) State() graphState { +func (g *Graph) GetState() graphState { g.mutex.Lock() defer g.mutex.Unlock() return g.state @@ -577,6 +577,12 @@ func (g *Graph) Exit() { } } +func (g *Graph) SetConvergedCallback(ctimeout int, converged chan bool) { + for v := range g.GetVerticesChan() { + v.Type.SetConvegedCallback(ctimeout, converged) + } +} + // in array function to test *vertices in a slice of *vertices func HasVertex(v *Vertex, haystack []*Vertex) bool { for _, r := range haystack { diff --git a/service.go b/service.go index 47c5cfa2..ea8de9f8 100644 --- a/service.go +++ b/service.go @@ -123,14 +123,20 @@ func (obj *ServiceType) Watch() { select { case _ = <-buschan: // XXX wait for new units event to unstick + obj.SetState(typeNil) // loop so that we can see the changed invalid signal log.Printf("Service[%v]->DaemonReload()\n", service) case event := <-obj.events: + obj.SetState(typeNil) if ok := obj.ReadEvent(&event); !ok { return // exit } send = true + case _ = <-TimeAfterOrBlock(obj.ctimeout): + obj.SetState(typeConvergedTimeout) + obj.converged <- true + continue } } else { if !activeSet { @@ -160,11 +166,13 @@ func (obj *ServiceType) Watch() { send = true case err := <-subErrors: + obj.SetState(typeNil) // XXX ? log.Println("error:", err) log.Fatal(err) //vertex.events <- fmt.Sprintf("service: %v", "error") // XXX: how should we handle errors? 
case event := <-obj.events: + obj.SetState(typeNil) if ok := obj.ReadEvent(&event); !ok { return // exit } diff --git a/types.go b/types.go index 05ef1f48..eec418fb 100644 --- a/types.go +++ b/types.go @@ -23,6 +23,15 @@ import ( "time" ) +//go:generate stringer -type=typeState -output=typestate_stringer.go +type typeState int + +const ( + typeNil typeState = iota + //typeConverged + typeConvergedTimeout +) + type Type interface { Init() GetName() string // can't be named "Name()" because of struct field @@ -31,10 +40,13 @@ type Type interface { StateOK() bool // TODO: can we rename this to something better? Apply() bool SetVertex(*Vertex) + SetConvegedCallback(ctimeout int, converged chan bool) Compare(Type) bool SendEvent(eventName, bool) IsWatching() bool SetWatching(bool) + GetState() typeState + SetState(typeState) GetTimestamp() int64 UpdateTimestamp() int64 //Process() @@ -45,7 +57,10 @@ type BaseType struct { timestamp int64 // last updated timestamp ? events chan Event vertex *Vertex + state typeState watching bool // is Watch() loop running ? + ctimeout int // converged timeout + converged chan bool } type NoopType struct { @@ -87,6 +102,11 @@ func (obj *BaseType) SetVertex(v *Vertex) { obj.vertex = v } +func (obj *BaseType) SetConvegedCallback(ctimeout int, converged chan bool) { + obj.ctimeout = ctimeout + obj.converged = converged +} + // is the Watch() function running? func (obj *BaseType) IsWatching() bool { return obj.watching @@ -97,6 +117,14 @@ func (obj *BaseType) SetWatching(b bool) { obj.watching = b } +func (obj *BaseType) GetState() typeState { + return obj.state +} + +func (obj *BaseType) SetState(state typeState) { + obj.state = state +} + // get timestamp of a vertex func (obj *BaseType) GetTimestamp() int64 { return obj.timestamp @@ -230,14 +258,18 @@ func (obj *NoopType) Watch() { //vertex := obj.vertex // stored with SetVertex var send = false // send event? 
for { - select { case event := <-obj.events: - + obj.SetState(typeNil) if ok := obj.ReadEvent(&event); !ok { return // exit } send = true + + case _ = <-TimeAfterOrBlock(obj.ctimeout): + obj.SetState(typeConvergedTimeout) + obj.converged <- true + continue } // do all our event sending all together to avoid duplicate msgs