Files
mgmt/etcd.go
James Shubin 6f3ac4bf2a Rework the converged detection and provide a clean interface
The old converged detection was hacked in code, instead of something
with a nice interface. This cleans it up, splits it into a separate
file, and removes a race condition that happened with the old code.

We also take the time to get rid of the ugly Set* methods and replace
them all with a single AssociateData method. This might be unnecessary
if we can pass in the Converger method at Resource construction.

Lastly, and most interesting, we suspend the individual timeout callers
when they've already converged, thus reducing unnecessary traffic, and
avoiding fast (eg: < 5 second) timers triggering more than once if they
stay converged!

A quick note on theory for any future readers... What happens if we have
--converged-timeout=0 ? Well, for this and any other positive value,
it's important to realize that deciding if something is converged is
actually a race between if the converged timer will fire and if some
random new event will get triggered. This is because there is nothing
that can actually predict if or when a new event will happen (eg the
user modifying a file). As a result, a race is always inherent, and
actually not a negative or "incorrect" algorithm.

A future improvement could be to add a global lock to each resource, and
to lock all resources when computing if we are converged or not. In
practice, this hasn't been necessary. The worst case scenario would be
(in theory, because this hasn't been tested) if an event happens
*during* the converged calculation, and starts running, the exit command
then runs, and the event finishes, but it doesn't get a chance to notify
some service to restart. A lock could probably fix this theoretical
case.
2016-03-29 20:27:38 -04:00

275 lines
8.3 KiB
Go

// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
import (
"fmt"
etcd "github.com/coreos/etcd/client"
etcd_context "golang.org/x/net/context"
"log"
"math"
"strings"
"time"
)
//go:generate stringer -type=etcdMsg -output=etcdmsg_stringer.go
type etcdMsg int
const (
etcdStart etcdMsg = iota
etcdEvent
etcdFoo
etcdBar
)
type EtcdWObject struct { // etcd wrapper object
seed string
converger Converger // converged tracking
kapi etcd.KeysAPI
}
func (etcdO *EtcdWObject) GetKAPI() etcd.KeysAPI {
if etcdO.kapi != nil { // memoize
return etcdO.kapi
}
cfg := etcd.Config{
Endpoints: []string{etcdO.seed},
Transport: etcd.DefaultTransport,
// set timeout per request to fail fast when the target endpoint is unavailable
HeaderTimeoutPerRequest: time.Second,
}
var c etcd.Client
var err error
c, err = etcd.New(cfg)
if err != nil {
// XXX: not sure if this ever errors
if cerr, ok := err.(*etcd.ClusterError); ok {
// XXX: not sure if this part ever matches
// not running or disconnected
if cerr == etcd.ErrClusterUnavailable {
log.Fatal("XXX: etcd: ErrClusterUnavailable")
} else {
log.Fatal("XXX: etcd: Unknown")
}
}
log.Fatal(err) // some unhandled error
}
etcdO.kapi = etcd.NewKeysAPI(c)
return etcdO.kapi
}
type EtcdChannelWatchResponse struct {
resp *etcd.Response
err error
}
// wrap the etcd watcher.Next blocking function inside of a channel
func (etcdO *EtcdWObject) EtcdChannelWatch(watcher etcd.Watcher, context etcd_context.Context) chan *EtcdChannelWatchResponse {
ch := make(chan *EtcdChannelWatchResponse)
go func() {
for {
resp, err := watcher.Next(context) // blocks here
ch <- &EtcdChannelWatchResponse{resp, err}
}
}()
return ch
}
func (etcdO *EtcdWObject) EtcdWatch() chan etcdMsg {
kapi := etcdO.GetKAPI()
// XXX: i think we need this buffered so that when we're hanging on the
// channel, which is inside the EtcdWatch main loop, we still want the
// calls to Get/Set on etcd to succeed, so blocking them here would
// kill the whole thing
ch := make(chan etcdMsg, 1) // XXX: buffer of at least 1 is required
go func(ch chan etcdMsg) {
tmin := 500 // initial (min) delay in ms
t := tmin // current time
tmult := 2 // multiplier for exponential delay
tmax := 16000 // max delay
cuuid := etcdO.converger.Register()
defer cuuid.Unregister()
watcher := kapi.Watcher("/exported/", &etcd.WatcherOptions{Recursive: true})
etcdch := etcdO.EtcdChannelWatch(watcher, etcd_context.Background())
for {
log.Printf("Etcd: Watching...")
var resp *etcd.Response // = nil by default
var err error
select {
case out := <-etcdch:
cuuid.SetConverged(false)
resp, err = out.resp, out.err
case _ = <-cuuid.ConvergedTimer():
cuuid.SetConverged(true) // converged!
continue
}
if err != nil {
if err == etcd_context.Canceled {
// ctx is canceled by another routine
log.Fatal("Canceled")
} else if err == etcd_context.DeadlineExceeded {
// ctx is attached with a deadline and it exceeded
log.Fatal("Deadline")
} else if cerr, ok := err.(*etcd.ClusterError); ok {
// not running or disconnected
// TODO: is there a better way to parse errors?
for _, e := range cerr.Errors {
if strings.HasSuffix(e.Error(), "getsockopt: connection refused") {
t = int(math.Min(float64(t*tmult), float64(tmax)))
log.Printf("Etcd: Waiting %d ms for connection...", t)
time.Sleep(time.Duration(t) * time.Millisecond) // sleep for t ms
} else if e.Error() == "unexpected EOF" {
log.Printf("Etcd: Unexpected disconnect...")
} else if e.Error() == "EOF" {
log.Printf("Etcd: Disconnected...")
} else if strings.HasPrefix(e.Error(), "unsupported protocol scheme") {
// usually a bad peer endpoint value
log.Fatal("Bad peer endpoint value?")
} else {
log.Fatal("Woops: ", e.Error())
}
}
} else {
// bad cluster endpoints, which are not etcd servers
log.Fatal("Woops: ", err)
}
} else {
//log.Print(resp)
//log.Printf("Watcher().Node.Value(%v): %+v", key, resp.Node.Value)
// FIXME: we should actually reset when the server comes back, not here on msg!
//XXX: can we fix this with one of these patterns?: https://blog.golang.org/go-concurrency-patterns-timing-out-and
t = tmin // reset timer
// don't trigger event if nothing changed
if n, p := resp.Node, resp.PrevNode; resp.Action == "set" && p != nil {
if n.Key == p.Key && n.Value == p.Value {
continue
}
}
// FIXME: we get events on key/res/value changes for
// each res directory... ignore the non final ones...
// IOW, ignore everything except for the value or some
// field which gets set last... this could be the max count field thing...
log.Printf("Etcd: Value: %v", resp.Node.Value) // event
ch <- etcdEvent // event
}
} // end for loop
//close(ch)
}(ch) // call go routine
return ch
}
// helper function to store our data in etcd
func (etcdO *EtcdWObject) EtcdPut(hostname, key, res string, data string) bool {
kapi := etcdO.GetKAPI()
path := fmt.Sprintf("/exported/%s/resources/%s/res", hostname, key)
_, err := kapi.Set(etcd_context.Background(), path, res, nil)
// XXX validate...
path = fmt.Sprintf("/exported/%s/resources/%s/value", hostname, key)
resp, err := kapi.Set(etcd_context.Background(), path, data, nil)
if err != nil {
if cerr, ok := err.(*etcd.ClusterError); ok {
// not running or disconnected
for _, e := range cerr.Errors {
if strings.HasSuffix(e.Error(), "getsockopt: connection refused") {
}
//if e == etcd.ErrClusterUnavailable
}
}
log.Printf("Etcd: Could not store %v key.", key)
return false
}
log.Print("Etcd: ", resp) // w00t... bonus
return true
}
// lookup /exported/ node hierarchy
func (etcdO *EtcdWObject) EtcdGet() (etcd.Nodes, bool) {
kapi := etcdO.GetKAPI()
// key structure is /exported/<hostname>/resources/...
resp, err := kapi.Get(etcd_context.Background(), "/exported/", &etcd.GetOptions{Recursive: true})
if err != nil {
return nil, false // not found
}
return resp.Node.Nodes, true
}
func (etcdO *EtcdWObject) EtcdGetProcess(nodes etcd.Nodes, res string) []string {
//path := fmt.Sprintf("/exported/%s/resources/", h)
top := "/exported/"
log.Printf("Etcd: Get: %+v", nodes) // Get().Nodes.Nodes
var output []string
for _, x := range nodes { // loop through hosts
if !strings.HasPrefix(x.Key, top) {
log.Fatal("Error!")
}
host := x.Key[len(top):]
//log.Printf("Get().Nodes[%v]: %+v ==> %+v", -1, host, x.Nodes)
//log.Printf("Get().Nodes[%v]: %+v ==> %+v", i, x.Key, x.Nodes)
resources, ok := EtcdGetChildNodeByKey(x, "resources")
if !ok {
continue
}
for _, y := range resources.Nodes { // loop through resources
//key := y.Key # UUID?
//log.Printf("Get(%v): RES[%v]", host, y.Key)
t, ok := EtcdGetChildNodeByKey(y, "res")
if !ok {
continue
}
if res != "" && res != t.Value {
continue
} // filter based on res
v, ok := EtcdGetChildNodeByKey(y, "value") // B64ToObj this
if !ok {
continue
}
log.Printf("Etcd: Hostname: %v; Get: %v", host, t.Value)
output = append(output, v.Value)
}
}
return output
}
// TODO: wrap this somehow so it's a method of *etcd.Node
// helper function that returns the node for a particular key under a node
func EtcdGetChildNodeByKey(node *etcd.Node, key string) (*etcd.Node, bool) {
for _, x := range node.Nodes {
if x.Key == fmt.Sprintf("%s/%s", node.Key, key) {
return x, true
}
}
return nil, false // not found
}