misc: Rename mgmtmain to lib and remove global package
This refactor should make it cleaner to use mgmt.
This commit is contained in:
123
etcd/etcd.go
123
etcd/etcd.go
@@ -64,7 +64,6 @@ import (
|
||||
|
||||
"github.com/purpleidea/mgmt/converger"
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
"github.com/purpleidea/mgmt/global"
|
||||
"github.com/purpleidea/mgmt/resources"
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
|
||||
@@ -154,6 +153,13 @@ type TN struct {
|
||||
data *etcd.TxnResponse
|
||||
}
|
||||
|
||||
// Flags are some constant flags which are used throughout the program.
|
||||
type Flags struct {
|
||||
Debug bool // add additional log messages
|
||||
Trace bool // add execution flow log messages
|
||||
Verbose bool // add extra log message output
|
||||
}
|
||||
|
||||
// EmbdEtcd provides the embedded server and client etcd functionality
|
||||
type EmbdEtcd struct { // EMBeddeD etcd
|
||||
// etcd client connection related
|
||||
@@ -190,6 +196,7 @@ type EmbdEtcd struct { // EMBeddeD etcd
|
||||
delq chan *DL // delete queue
|
||||
txnq chan *TN // txn queue
|
||||
|
||||
flags Flags
|
||||
prefix string // folder prefix to use for misc storage
|
||||
converger converger.Converger // converged tracking
|
||||
|
||||
@@ -200,7 +207,7 @@ type EmbdEtcd struct { // EMBeddeD etcd
|
||||
}
|
||||
|
||||
// NewEmbdEtcd creates the top level embedded etcd struct client and server obj
|
||||
func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, prefix string, converger converger.Converger) *EmbdEtcd {
|
||||
func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, flags Flags, prefix string, converger converger.Converger) *EmbdEtcd {
|
||||
endpoints := make(etcdtypes.URLsMap)
|
||||
if hostname == seedSentinel { // safety
|
||||
return nil
|
||||
@@ -229,6 +236,7 @@ func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs,
|
||||
|
||||
idealClusterSize: idealClusterSize,
|
||||
converger: converger,
|
||||
flags: flags,
|
||||
prefix: prefix,
|
||||
dataDir: path.Join(prefix, "etcd"),
|
||||
}
|
||||
@@ -273,7 +281,7 @@ func (obj *EmbdEtcd) GetConfig() etcd.Config {
|
||||
// Connect connects the client to a server, and then builds the *API structs.
|
||||
// If reconnect is true, it will force a reconnect with new config endpoints.
|
||||
func (obj *EmbdEtcd) Connect(reconnect bool) error {
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Println("Etcd: Connect...")
|
||||
}
|
||||
obj.cLock.Lock()
|
||||
@@ -529,29 +537,29 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
|
||||
var isTimeout = false
|
||||
var iter int // = 0
|
||||
if ctxerr, ok := ctx.Value(ctxErr).(error); ok {
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Etcd: CtxError: err(%v), ctxerr(%v)", err, ctxerr)
|
||||
}
|
||||
if i, ok := ctx.Value(ctxIter).(int); ok {
|
||||
iter = i + 1 // load and increment
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Etcd: CtxError: Iter: %v", iter)
|
||||
}
|
||||
}
|
||||
isTimeout = err == context.DeadlineExceeded
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Etcd: CtxError: isTimeout: %v", isTimeout)
|
||||
}
|
||||
if !isTimeout {
|
||||
iter = 0 // reset timer
|
||||
}
|
||||
err = ctxerr // restore error
|
||||
} else if global.DEBUG {
|
||||
} else if obj.flags.Debug {
|
||||
log.Printf("Etcd: CtxError: No value found")
|
||||
}
|
||||
ctxHelper := func(tmin, texp, tmax int) context.Context {
|
||||
t := expBackoff(tmin, texp, iter, tmax)
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Etcd: CtxError: Timeout: %v", t)
|
||||
}
|
||||
|
||||
@@ -638,13 +646,13 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
|
||||
fallthrough
|
||||
case isGrpc(grpc.ErrClientConnClosing):
|
||||
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Etcd: CtxError: Error(%T): %+v", err, err)
|
||||
log.Printf("Etcd: Endpoints are: %v", obj.client.Endpoints())
|
||||
log.Printf("Etcd: Client endpoints are: %v", obj.endpoints)
|
||||
}
|
||||
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Etcd: CtxError: Locking...")
|
||||
}
|
||||
obj.rLock.Lock()
|
||||
@@ -665,7 +673,7 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
|
||||
obj.ctxErr = fmt.Errorf("Etcd: Permanent connect error: %v", err)
|
||||
return ctx, obj.ctxErr
|
||||
}
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Etcd: CtxError: Unlocking...")
|
||||
}
|
||||
obj.rLock.Unlock()
|
||||
@@ -709,7 +717,7 @@ func (obj *EmbdEtcd) CbLoop() {
|
||||
if !re.skipConv { // if we want to count it...
|
||||
cuid.ResetTimer() // activity!
|
||||
}
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: CbLoop: Event: StartLoop")
|
||||
}
|
||||
for {
|
||||
@@ -717,11 +725,11 @@ func (obj *EmbdEtcd) CbLoop() {
|
||||
//re.resp.NACK() // nope!
|
||||
break
|
||||
}
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: CbLoop: rawCallback()")
|
||||
}
|
||||
err := rawCallback(ctx, re)
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: CbLoop: rawCallback(): %v", err)
|
||||
}
|
||||
if err == nil {
|
||||
@@ -733,7 +741,7 @@ func (obj *EmbdEtcd) CbLoop() {
|
||||
break // TODO: it's bad, break or return?
|
||||
}
|
||||
}
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: CbLoop: Event: FinishLoop")
|
||||
}
|
||||
|
||||
@@ -761,11 +769,11 @@ func (obj *EmbdEtcd) Loop() {
|
||||
select {
|
||||
case aw := <-obj.awq:
|
||||
cuid.ResetTimer() // activity!
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: Loop: PriorityAW: StartLoop")
|
||||
}
|
||||
obj.loopProcessAW(ctx, aw)
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: Loop: PriorityAW: FinishLoop")
|
||||
}
|
||||
continue // loop to drain the priority channel first!
|
||||
@@ -777,18 +785,18 @@ func (obj *EmbdEtcd) Loop() {
|
||||
// add watcher
|
||||
case aw := <-obj.awq:
|
||||
cuid.ResetTimer() // activity!
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: Loop: AW: StartLoop")
|
||||
}
|
||||
obj.loopProcessAW(ctx, aw)
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: Loop: AW: FinishLoop")
|
||||
}
|
||||
|
||||
// set kv pair
|
||||
case kv := <-obj.setq:
|
||||
cuid.ResetTimer() // activity!
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: Loop: Set: StartLoop")
|
||||
}
|
||||
for {
|
||||
@@ -805,7 +813,7 @@ func (obj *EmbdEtcd) Loop() {
|
||||
break // TODO: it's bad, break or return?
|
||||
}
|
||||
}
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: Loop: Set: FinishLoop")
|
||||
}
|
||||
|
||||
@@ -814,7 +822,7 @@ func (obj *EmbdEtcd) Loop() {
|
||||
if !gq.skipConv {
|
||||
cuid.ResetTimer() // activity!
|
||||
}
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: Loop: Get: StartLoop")
|
||||
}
|
||||
for {
|
||||
@@ -832,14 +840,14 @@ func (obj *EmbdEtcd) Loop() {
|
||||
break // TODO: it's bad, break or return?
|
||||
}
|
||||
}
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: Loop: Get: FinishLoop")
|
||||
}
|
||||
|
||||
// delete value
|
||||
case dl := <-obj.delq:
|
||||
cuid.ResetTimer() // activity!
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: Loop: Delete: StartLoop")
|
||||
}
|
||||
for {
|
||||
@@ -857,14 +865,14 @@ func (obj *EmbdEtcd) Loop() {
|
||||
break // TODO: it's bad, break or return?
|
||||
}
|
||||
}
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: Loop: Delete: FinishLoop")
|
||||
}
|
||||
|
||||
// run txn
|
||||
case tn := <-obj.txnq:
|
||||
cuid.ResetTimer() // activity!
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: Loop: Txn: StartLoop")
|
||||
}
|
||||
for {
|
||||
@@ -882,7 +890,7 @@ func (obj *EmbdEtcd) Loop() {
|
||||
break // TODO: it's bad, break or return?
|
||||
}
|
||||
}
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: Loop: Txn: FinishLoop")
|
||||
}
|
||||
|
||||
@@ -936,7 +944,7 @@ func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error {
|
||||
|
||||
// rawSet actually implements the key set operation
|
||||
func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: rawSet()")
|
||||
}
|
||||
// key is the full key path
|
||||
@@ -945,7 +953,7 @@ func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error {
|
||||
response, err := obj.client.KV.Put(ctx, kv.key, kv.value, kv.opts...)
|
||||
obj.rLock.RUnlock()
|
||||
log.Printf("Etcd: Set(%s): %v", kv.key, response) // w00t... bonus
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: rawSet(): %v", err)
|
||||
}
|
||||
return err
|
||||
@@ -970,7 +978,7 @@ func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOptio
|
||||
}
|
||||
|
||||
func (obj *EmbdEtcd) rawGet(ctx context.Context, gq *GQ) (result map[string]string, err error) {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: rawGet()")
|
||||
}
|
||||
obj.rLock.RLock()
|
||||
@@ -986,7 +994,7 @@ func (obj *EmbdEtcd) rawGet(ctx context.Context, gq *GQ) (result map[string]stri
|
||||
result[bytes.NewBuffer(x.Key).String()] = bytes.NewBuffer(x.Value).String()
|
||||
}
|
||||
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: rawGet(): %v", result)
|
||||
}
|
||||
return
|
||||
@@ -1004,7 +1012,7 @@ func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error) {
|
||||
}
|
||||
|
||||
func (obj *EmbdEtcd) rawDelete(ctx context.Context, dl *DL) (count int64, err error) {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: rawDelete()")
|
||||
}
|
||||
count = -1
|
||||
@@ -1014,7 +1022,7 @@ func (obj *EmbdEtcd) rawDelete(ctx context.Context, dl *DL) (count int64, err er
|
||||
if err == nil {
|
||||
count = response.Deleted
|
||||
}
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: rawDelete(): %v", err)
|
||||
}
|
||||
return
|
||||
@@ -1032,13 +1040,13 @@ func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.T
|
||||
}
|
||||
|
||||
func (obj *EmbdEtcd) rawTxn(ctx context.Context, tn *TN) (*etcd.TxnResponse, error) {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: rawTxn()")
|
||||
}
|
||||
obj.rLock.RLock()
|
||||
response, err := obj.client.KV.Txn(ctx).If(tn.ifcmps...).Then(tn.thenops...).Else(tn.elseops...).Commit()
|
||||
obj.rLock.RUnlock()
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: rawTxn(): %v, %v", response, err)
|
||||
}
|
||||
return response, err
|
||||
@@ -1072,7 +1080,7 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error)
|
||||
err := response.Err()
|
||||
isCanceled := response.Canceled || err == context.Canceled
|
||||
if response.Header.Revision == 0 { // by inspection
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Etcd: Watch: Received empty message!") // switched client connection
|
||||
}
|
||||
isCanceled = true
|
||||
@@ -1093,7 +1101,7 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error)
|
||||
}
|
||||
locked = false
|
||||
} else {
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Etcd: Watch: Error: %v", err) // probably fixable
|
||||
}
|
||||
// this new context is the fix for a tricky set
|
||||
@@ -1142,9 +1150,6 @@ func rawCallback(ctx context.Context, re *RE) error {
|
||||
// NOTE: the callback must *not* block!
|
||||
// FIXME: do we need to pass ctx in via *RE, or in the callback signature ?
|
||||
err = callback(re) // run the callback
|
||||
if global.TRACE {
|
||||
log.Printf("Trace: Etcd: rawCallback(): %v", err)
|
||||
}
|
||||
if !re.errCheck || err == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -1160,7 +1165,7 @@ func rawCallback(ctx context.Context, re *RE) error {
|
||||
// FIXME: we might need to respond to member change/disconnect/shutdown events,
|
||||
// see: https://github.com/coreos/etcd/issues/5277
|
||||
func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: volunteerCallback()")
|
||||
defer log.Printf("Trace: Etcd: volunteerCallback(): Finished!")
|
||||
}
|
||||
@@ -1348,7 +1353,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
|
||||
// nominateCallback runs to respond to the nomination list change events
|
||||
// functionally, it controls the starting and stopping of the server process
|
||||
func (obj *EmbdEtcd) nominateCallback(re *RE) error {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: nominateCallback()")
|
||||
defer log.Printf("Trace: Etcd: nominateCallback(): Finished!")
|
||||
}
|
||||
@@ -1397,7 +1402,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error {
|
||||
_, exists := obj.nominated[obj.hostname]
|
||||
// FIXME: can we get rid of the len(obj.nominated) == 0 ?
|
||||
newCluster := len(obj.nominated) == 0 || (len(obj.nominated) == 1 && exists)
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Etcd: nominateCallback(): newCluster: %v; exists: %v; obj.server == nil: %t", newCluster, exists, obj.server == nil)
|
||||
}
|
||||
// XXX: check if i have actually volunteered first of all...
|
||||
@@ -1500,7 +1505,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error {
|
||||
|
||||
// endpointCallback runs to respond to the endpoint list change events
|
||||
func (obj *EmbdEtcd) endpointCallback(re *RE) error {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: endpointCallback()")
|
||||
defer log.Printf("Trace: Etcd: endpointCallback(): Finished!")
|
||||
}
|
||||
@@ -1566,7 +1571,7 @@ func (obj *EmbdEtcd) endpointCallback(re *RE) error {
|
||||
|
||||
// idealClusterSizeCallback runs to respond to the ideal cluster size changes
|
||||
func (obj *EmbdEtcd) idealClusterSizeCallback(re *RE) error {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: idealClusterSizeCallback()")
|
||||
defer log.Printf("Trace: Etcd: idealClusterSizeCallback(): Finished!")
|
||||
}
|
||||
@@ -1713,7 +1718,7 @@ func (obj *EmbdEtcd) DestroyServer() error {
|
||||
|
||||
// EtcdNominate nominates a particular client to be a server (peer)
|
||||
func EtcdNominate(obj *EmbdEtcd, hostname string, urls etcdtypes.URLs) error {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: EtcdNominate(%v): %v", hostname, urls.String())
|
||||
defer log.Printf("Trace: Etcd: EtcdNominate(%v): Finished!", hostname)
|
||||
}
|
||||
@@ -1755,7 +1760,7 @@ func EtcdNominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
|
||||
return nil, fmt.Errorf("Etcd: Nominated: Data format error!: %v", err)
|
||||
}
|
||||
nominated[name] = urls // add to map
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Etcd: Nominated(%v): %v", name, val)
|
||||
}
|
||||
}
|
||||
@@ -1764,7 +1769,7 @@ func EtcdNominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
|
||||
|
||||
// EtcdVolunteer offers yourself up to be a server if needed
|
||||
func EtcdVolunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: EtcdVolunteer(%v): %v", obj.hostname, urls.String())
|
||||
defer log.Printf("Trace: Etcd: EtcdVolunteer(%v): Finished!", obj.hostname)
|
||||
}
|
||||
@@ -1787,7 +1792,7 @@ func EtcdVolunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error {
|
||||
|
||||
// EtcdVolunteers returns a urls map of available etcd server volunteers
|
||||
func EtcdVolunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: EtcdVolunteers()")
|
||||
defer log.Printf("Trace: Etcd: EtcdVolunteers(): Finished!")
|
||||
}
|
||||
@@ -1810,7 +1815,7 @@ func EtcdVolunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
|
||||
return nil, fmt.Errorf("Etcd: Volunteers: Data format error!: %v", err)
|
||||
}
|
||||
volunteers[name] = urls // add to map
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Etcd: Volunteer(%v): %v", name, val)
|
||||
}
|
||||
}
|
||||
@@ -1819,7 +1824,7 @@ func EtcdVolunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
|
||||
|
||||
// EtcdAdvertiseEndpoints advertises the list of available client endpoints
|
||||
func EtcdAdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: EtcdAdvertiseEndpoints(%v): %v", obj.hostname, urls.String())
|
||||
defer log.Printf("Trace: Etcd: EtcdAdvertiseEndpoints(%v): Finished!", obj.hostname)
|
||||
}
|
||||
@@ -1842,7 +1847,7 @@ func EtcdAdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error {
|
||||
|
||||
// EtcdEndpoints returns a urls map of available etcd server endpoints
|
||||
func EtcdEndpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: EtcdEndpoints()")
|
||||
defer log.Printf("Trace: Etcd: EtcdEndpoints(): Finished!")
|
||||
}
|
||||
@@ -1865,7 +1870,7 @@ func EtcdEndpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
|
||||
return nil, fmt.Errorf("Etcd: Endpoints: Data format error!: %v", err)
|
||||
}
|
||||
endpoints[name] = urls // add to map
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Etcd: Endpoint(%v): %v", name, val)
|
||||
}
|
||||
}
|
||||
@@ -1874,7 +1879,7 @@ func EtcdEndpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
|
||||
|
||||
// EtcdSetHostnameConverged sets whether a specific hostname is converged.
|
||||
func EtcdSetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: EtcdSetHostnameConverged(%s): %v", hostname, isConverged)
|
||||
defer log.Printf("Trace: Etcd: EtcdSetHostnameConverged(%v): Finished!", hostname)
|
||||
}
|
||||
@@ -1888,7 +1893,7 @@ func EtcdSetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool)
|
||||
|
||||
// EtcdHostnameConverged returns a map of every hostname's converged state.
|
||||
func EtcdHostnameConverged(obj *EmbdEtcd) (map[string]bool, error) {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: EtcdHostnameConverged()")
|
||||
defer log.Printf("Trace: Etcd: EtcdHostnameConverged(): Finished!")
|
||||
}
|
||||
@@ -1933,7 +1938,7 @@ func EtcdAddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]b
|
||||
|
||||
// EtcdSetClusterSize sets the ideal target cluster size of etcd peers
|
||||
func EtcdSetClusterSize(obj *EmbdEtcd, value uint16) error {
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: EtcdSetClusterSize(): %v", value)
|
||||
defer log.Printf("Trace: Etcd: EtcdSetClusterSize(): Finished!")
|
||||
}
|
||||
@@ -2027,7 +2032,7 @@ func EtcdMembers(obj *EmbdEtcd) (map[uint64]string, error) {
|
||||
return nil, fmt.Errorf("Exiting...")
|
||||
}
|
||||
obj.rLock.RLock()
|
||||
if global.TRACE {
|
||||
if obj.flags.Trace {
|
||||
log.Printf("Trace: Etcd: EtcdMembers(): Endpoints are: %v", obj.client.Endpoints())
|
||||
}
|
||||
response, err = obj.client.MemberList(ctx)
|
||||
@@ -2279,9 +2284,7 @@ func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, err
|
||||
if _, exists := urlsmap[key]; !exists {
|
||||
// this can happen if we retry an operation b/w
|
||||
// a reconnect so ignore if we are reconnecting
|
||||
if global.DEBUG {
|
||||
log.Printf("Etcd: ApplyDeltaEvents: Inconsistent key: %v", key)
|
||||
}
|
||||
log.Printf("Etcd: ApplyDeltaEvents: Inconsistent key: %v", key)
|
||||
return nil, errApplyDeltaEventsInconsistent
|
||||
}
|
||||
delete(urlsmap, key)
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/gapi"
|
||||
mgmt "github.com/purpleidea/mgmt/mgmtmain"
|
||||
mgmt "github.com/purpleidea/mgmt/lib"
|
||||
"github.com/purpleidea/mgmt/pgraph"
|
||||
"github.com/purpleidea/mgmt/resources"
|
||||
"github.com/purpleidea/mgmt/yamlgraph"
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/gapi"
|
||||
mgmt "github.com/purpleidea/mgmt/mgmtmain"
|
||||
mgmt "github.com/purpleidea/mgmt/lib"
|
||||
"github.com/purpleidea/mgmt/pgraph"
|
||||
"github.com/purpleidea/mgmt/resources"
|
||||
)
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/gapi"
|
||||
mgmt "github.com/purpleidea/mgmt/mgmtmain"
|
||||
mgmt "github.com/purpleidea/mgmt/lib"
|
||||
"github.com/purpleidea/mgmt/pgraph"
|
||||
"github.com/purpleidea/mgmt/resources"
|
||||
)
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package mgmtmain
|
||||
package lib
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -37,6 +37,11 @@ func run(c *cli.Context) error {
|
||||
|
||||
obj.Program = c.App.Name
|
||||
obj.Version = c.App.Version
|
||||
if val, exists := c.App.Metadata["flags"]; exists {
|
||||
if flags, ok := val.(Flags); ok {
|
||||
obj.Flags = flags
|
||||
}
|
||||
}
|
||||
|
||||
if h := c.String("hostname"); c.IsSet("hostname") && h != "" {
|
||||
obj.Hostname = &h
|
||||
@@ -143,7 +148,7 @@ func run(c *cli.Context) error {
|
||||
}
|
||||
|
||||
// CLI is the entry point for using mgmt normally from the CLI.
|
||||
func CLI(program, version string) error {
|
||||
func CLI(program, version string, flags Flags) error {
|
||||
|
||||
// test for sanity
|
||||
if program == "" || version == "" {
|
||||
@@ -153,6 +158,9 @@ func CLI(program, version string) error {
|
||||
app.Name = program // App.name and App.version pass these values through
|
||||
app.Version = version
|
||||
app.Usage = "next generation config management"
|
||||
app.Metadata = map[string]interface{}{ // additional flags
|
||||
"flags": flags,
|
||||
}
|
||||
//app.Action = ... // without a default action, help runs
|
||||
|
||||
app.Commands = []cli.Command{
|
||||
@@ -15,7 +15,7 @@
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package mgmtmain
|
||||
package lib
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -42,11 +42,20 @@ import (
|
||||
errwrap "github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// Flags are some constant flags which are used throughout the program.
|
||||
type Flags struct {
|
||||
Debug bool // add additional log messages
|
||||
Trace bool // add execution flow log messages
|
||||
Verbose bool // add extra log message output
|
||||
}
|
||||
|
||||
// Main is the main struct for running the mgmt logic.
|
||||
type Main struct {
|
||||
Program string // the name of this program, usually set at compile time
|
||||
Version string // the version of this program, usually set at compile time
|
||||
|
||||
Flags Flags // static global flags that are set at compile time
|
||||
|
||||
Hostname *string // hostname to use; nil if undefined
|
||||
|
||||
Prefix *string // prefix passed in; nil if undefined
|
||||
@@ -75,9 +84,6 @@ type Main struct {
|
||||
NoCaching bool // don't allow remote caching of remote execution binary
|
||||
Depth uint16 // depth in remote hierarchy; for internal use only
|
||||
|
||||
DEBUG bool
|
||||
VERBOSE bool
|
||||
|
||||
seeds etcdtypes.URLs // processed seeds value
|
||||
clientURLs etcdtypes.URLs // processed client urls value
|
||||
serverURLs etcdtypes.URLs // processed server urls value
|
||||
@@ -167,7 +173,7 @@ func (obj *Main) Run() error {
|
||||
var start = time.Now().UnixNano()
|
||||
|
||||
var flags int
|
||||
if obj.DEBUG || true { // TODO: remove || true
|
||||
if obj.Flags.Debug || true { // TODO: remove || true
|
||||
flags = log.LstdFlags | log.Lshortfile
|
||||
}
|
||||
flags = (flags - log.Ldate) // remove the date for now
|
||||
@@ -175,7 +181,7 @@ func (obj *Main) Run() error {
|
||||
|
||||
// un-hijack from capnslog...
|
||||
log.SetOutput(os.Stderr)
|
||||
if obj.VERBOSE {
|
||||
if obj.Flags.Verbose {
|
||||
capnslog.SetFormatter(capnslog.NewLogFormatter(os.Stderr, "(etcd) ", flags))
|
||||
} else {
|
||||
capnslog.SetFormatter(capnslog.NewNilFormatter())
|
||||
@@ -292,6 +298,11 @@ func (obj *Main) Run() error {
|
||||
obj.serverURLs,
|
||||
obj.NoServer,
|
||||
obj.idealClusterSize,
|
||||
etcd.Flags{
|
||||
Debug: obj.Flags.Debug,
|
||||
Trace: obj.Flags.Trace,
|
||||
Verbose: obj.Flags.Verbose,
|
||||
},
|
||||
prefix,
|
||||
converger,
|
||||
)
|
||||
@@ -361,7 +372,7 @@ func (obj *Main) Run() error {
|
||||
|
||||
case err, ok := <-gapiChan:
|
||||
if !ok { // channel closed
|
||||
if obj.DEBUG {
|
||||
if obj.Flags.Debug {
|
||||
log.Printf("Main: GAPI exited")
|
||||
}
|
||||
gapiChan = nil // disable it
|
||||
@@ -406,11 +417,12 @@ func (obj *Main) Run() error {
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
newGraph.Flags = pgraph.Flags{Debug: obj.Flags.Debug}
|
||||
// pass in the information we need
|
||||
newGraph.AssociateData(&resources.Data{
|
||||
Converger: converger,
|
||||
Prefix: pgraphPrefix,
|
||||
Debug: obj.Flags.Debug,
|
||||
})
|
||||
|
||||
// apply the global noop parameter if requested
|
||||
@@ -461,6 +473,7 @@ func (obj *Main) Run() error {
|
||||
}()
|
||||
|
||||
configWatcher := recwatch.NewConfigWatcher()
|
||||
configWatcher.Flags = recwatch.Flags{Debug: obj.Flags.Debug}
|
||||
events := configWatcher.Events()
|
||||
if !obj.NoWatch {
|
||||
configWatcher.Add(obj.Remotes...) // add all the files...
|
||||
@@ -497,7 +510,10 @@ func (obj *Main) Run() error {
|
||||
prefix,
|
||||
converger,
|
||||
convergerCb,
|
||||
obj.Program,
|
||||
remote.Flags{
|
||||
Program: obj.Program,
|
||||
Debug: obj.Flags.Debug,
|
||||
},
|
||||
)
|
||||
|
||||
// TODO: is there any benefit to running the remotes above in the loop?
|
||||
@@ -537,7 +553,7 @@ func (obj *Main) Run() error {
|
||||
reterr = multierr.Append(reterr, err) // list of errors
|
||||
}
|
||||
|
||||
if obj.DEBUG {
|
||||
if obj.Flags.Debug {
|
||||
log.Printf("Main: Graph: %v", G)
|
||||
}
|
||||
|
||||
16
main.go
16
main.go
@@ -21,7 +21,14 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/purpleidea/mgmt/mgmtmain"
|
||||
mgmt "github.com/purpleidea/mgmt/lib"
|
||||
)
|
||||
|
||||
// These constants are some global variables that are used throughout the code.
|
||||
const (
|
||||
DEBUG = false // add additional log messages
|
||||
TRACE = false // add execution flow log messages
|
||||
VERBOSE = false // add extra log message output
|
||||
)
|
||||
|
||||
// set at compile time
|
||||
@@ -31,7 +38,12 @@ var (
|
||||
)
|
||||
|
||||
func main() {
|
||||
if err := mgmtmain.CLI(program, version); err != nil {
|
||||
flags := mgmt.Flags{
|
||||
Debug: DEBUG,
|
||||
Trace: TRACE,
|
||||
Verbose: VERBOSE,
|
||||
}
|
||||
if err := mgmt.CLI(program, version, flags); err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
return
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
"github.com/purpleidea/mgmt/global"
|
||||
"github.com/purpleidea/mgmt/resources"
|
||||
|
||||
errwrap "github.com/pkg/errors"
|
||||
@@ -51,7 +50,7 @@ func (g *Graph) OKTimestamp(v *Vertex) bool {
|
||||
// if they're equal (eg: on init of 0) then we also can't run
|
||||
// b/c we should let our pre-req's go first...
|
||||
x, y := v.GetTimestamp(), n.GetTimestamp()
|
||||
if global.DEBUG {
|
||||
if g.Flags.Debug {
|
||||
log.Printf("%s[%s]: OKTimestamp: (%v) >= %s[%s](%v): !%v", v.Kind(), v.GetName(), x, n.Kind(), n.GetName(), y, x >= y)
|
||||
}
|
||||
if x >= y {
|
||||
@@ -71,7 +70,7 @@ func (g *Graph) Poke(v *Vertex, activity bool) error {
|
||||
// apply, then we can cancel a poke to a child, right? XXX
|
||||
// XXX: if n.Res.getState() != resources.ResStateEvent || activity { // is this correct?
|
||||
if true || activity { // XXX: ???
|
||||
if global.DEBUG {
|
||||
if g.Flags.Debug {
|
||||
log.Printf("%s[%s]: Poke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
|
||||
}
|
||||
wg.Add(1)
|
||||
@@ -87,7 +86,7 @@ func (g *Graph) Poke(v *Vertex, activity bool) error {
|
||||
}(n)
|
||||
|
||||
} else {
|
||||
if global.DEBUG {
|
||||
if g.Flags.Debug {
|
||||
log.Printf("%s[%s]: Poke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName())
|
||||
}
|
||||
}
|
||||
@@ -108,13 +107,13 @@ func (g *Graph) BackPoke(v *Vertex) {
|
||||
// TODO: implement a stateLT (less than) to tell if something
|
||||
// happens earlier in the state cycle and that doesn't wrap nil
|
||||
if x >= y && (s != resources.ResStateEvent && s != resources.ResStateCheckApply) {
|
||||
if global.DEBUG {
|
||||
if g.Flags.Debug {
|
||||
log.Printf("%s[%s]: BackPoke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
|
||||
}
|
||||
// FIXME: is it okay that this is sync?
|
||||
n.SendEvent(event.EventBackPoke, true, false)
|
||||
} else {
|
||||
if global.DEBUG {
|
||||
if g.Flags.Debug {
|
||||
log.Printf("%s[%s]: BackPoke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName())
|
||||
}
|
||||
}
|
||||
@@ -157,7 +156,7 @@ func (g *Graph) SetDownstreamRefresh(v *Vertex, b bool) {
|
||||
// Process is the primary function to execute for a particular vertex in the graph.
|
||||
func (g *Graph) Process(v *Vertex) error {
|
||||
obj := v.Res
|
||||
if global.DEBUG {
|
||||
if g.Flags.Debug {
|
||||
log.Printf("%s[%s]: Process()", obj.Kind(), obj.GetName())
|
||||
}
|
||||
obj.SetState(resources.ResStateEvent)
|
||||
@@ -167,7 +166,7 @@ func (g *Graph) Process(v *Vertex) error {
|
||||
// if not, that's okay because when the dependency runs, it will poke
|
||||
// us back and we will run if needed then!
|
||||
if g.OKTimestamp(v) {
|
||||
if global.DEBUG {
|
||||
if g.Flags.Debug {
|
||||
log.Printf("%s[%s]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp())
|
||||
}
|
||||
|
||||
@@ -190,7 +189,7 @@ func (g *Graph) Process(v *Vertex) error {
|
||||
var checkOK bool
|
||||
var err error
|
||||
|
||||
if global.DEBUG {
|
||||
if g.Flags.Debug {
|
||||
log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), !noop)
|
||||
}
|
||||
|
||||
@@ -217,7 +216,7 @@ func (g *Graph) Process(v *Vertex) error {
|
||||
if checkOK && err != nil { // should never return this way
|
||||
log.Fatalf("%s[%s]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), checkOK, err)
|
||||
}
|
||||
if global.DEBUG {
|
||||
if g.Flags.Debug {
|
||||
log.Printf("%s[%s]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), checkOK, err)
|
||||
}
|
||||
|
||||
@@ -486,7 +485,7 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
|
||||
if (!first) || indegree[v] == 0 {
|
||||
// ensure state is started before continuing on to next vertex
|
||||
for !v.SendEvent(event.EventStart, true, false) {
|
||||
if global.DEBUG {
|
||||
if g.Flags.Debug {
|
||||
// if SendEvent fails, we aren't up yet
|
||||
log.Printf("%s[%s]: Retrying SendEvent(Start)", v.Kind(), v.GetName())
|
||||
// sleep here briefly or otherwise cause
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/purpleidea/mgmt/global"
|
||||
"github.com/purpleidea/mgmt/resources"
|
||||
)
|
||||
|
||||
@@ -39,7 +38,7 @@ func (g *Graph) addEdgesByMatchingUIDS(v *Vertex, uids []resources.ResUID) []boo
|
||||
if v == vv { // skip self
|
||||
continue
|
||||
}
|
||||
if global.DEBUG {
|
||||
if g.Flags.Debug {
|
||||
log.Printf("Compile: AutoEdge: Match: %v[%v] with UID: %v[%v]", vv.Kind(), vv.GetName(), uid.Kind(), uid.GetName())
|
||||
}
|
||||
// we must match to an effective UID for the resource,
|
||||
@@ -85,7 +84,7 @@ func (g *Graph) AutoEdges() {
|
||||
log.Printf("%v[%v]: Config: The auto edge list is empty!", v.Kind(), v.GetName())
|
||||
break // inner loop
|
||||
}
|
||||
if global.DEBUG {
|
||||
if g.Flags.Debug {
|
||||
log.Println("Compile: AutoEdge: UIDS:")
|
||||
for i, u := range uids {
|
||||
log.Printf("Compile: AutoEdge: UID%d: %v", i, u)
|
||||
|
||||
@@ -21,8 +21,6 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/purpleidea/mgmt/global"
|
||||
|
||||
errwrap "github.com/pkg/errors"
|
||||
)
|
||||
|
||||
@@ -312,7 +310,7 @@ func (g *Graph) autoGroup(ag AutoGrouper) chan string {
|
||||
wStr := fmt.Sprintf("%s", w)
|
||||
|
||||
if err := ag.vertexCmp(v, w); err != nil { // cmp ?
|
||||
if global.DEBUG {
|
||||
if g.Flags.Debug {
|
||||
strch <- fmt.Sprintf("Compile: Grouping: !GroupCmp for: %s into %s", wStr, vStr)
|
||||
}
|
||||
|
||||
|
||||
@@ -40,6 +40,10 @@ const (
|
||||
graphStatePaused
|
||||
)
|
||||
|
||||
type Flags struct {
|
||||
Debug bool
|
||||
}
|
||||
|
||||
// Graph is the graph structure in this library.
|
||||
// The graph abstract data type (ADT) is defined as follows:
|
||||
// * the directed graph arrows point from left to right ( -> )
|
||||
@@ -49,6 +53,7 @@ const (
|
||||
type Graph struct {
|
||||
Name string
|
||||
Adjacency map[*Vertex]map[*Vertex]*Edge // *Vertex -> *Vertex (edge)
|
||||
Flags Flags
|
||||
state graphState
|
||||
mutex sync.Mutex // used when modifying graph State variable
|
||||
}
|
||||
@@ -105,6 +110,7 @@ func (g *Graph) Copy() *Graph {
|
||||
newGraph := &Graph{
|
||||
Name: g.Name,
|
||||
Adjacency: make(map[*Vertex]map[*Vertex]*Edge, len(g.Adjacency)),
|
||||
Flags: g.Flags,
|
||||
state: g.state,
|
||||
}
|
||||
for k, v := range g.Adjacency {
|
||||
|
||||
@@ -26,17 +26,17 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/purpleidea/mgmt/global"
|
||||
"github.com/purpleidea/mgmt/yamlgraph"
|
||||
)
|
||||
|
||||
const (
|
||||
// PuppetYAMLBufferSize is the maximum buffer size for the yaml input data
|
||||
PuppetYAMLBufferSize = 65535
|
||||
Debug = false // FIXME: integrate with global debug flag
|
||||
)
|
||||
|
||||
func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) {
|
||||
if global.DEBUG {
|
||||
if Debug {
|
||||
log.Printf("Puppet: running command: %v", cmd)
|
||||
}
|
||||
|
||||
@@ -71,7 +71,7 @@ func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) {
|
||||
// will choke on an oversized slice. http://stackoverflow.com/a/33726617/3356612
|
||||
result = append(result, data[0:count]...)
|
||||
}
|
||||
if global.DEBUG {
|
||||
if Debug {
|
||||
log.Printf("Puppet: read %v bytes of data from puppet", len(result))
|
||||
}
|
||||
for scanner := bufio.NewScanner(stderr); scanner.Scan(); {
|
||||
@@ -117,7 +117,7 @@ func ParseConfigFromPuppet(puppetParam, puppetConf string) *yamlgraph.GraphConfi
|
||||
|
||||
// PuppetInterval returns the graph refresh interval from the puppet configuration.
|
||||
func PuppetInterval(puppetConf string) int {
|
||||
if global.DEBUG {
|
||||
if Debug {
|
||||
log.Printf("Puppet: determining graph refresh interval")
|
||||
}
|
||||
var cmd *exec.Cmd
|
||||
|
||||
@@ -20,12 +20,12 @@ package recwatch
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/purpleidea/mgmt/global"
|
||||
)
|
||||
|
||||
// ConfigWatcher returns events on a channel anytime one of its files events.
|
||||
type ConfigWatcher struct {
|
||||
Flags Flags
|
||||
|
||||
ch chan string
|
||||
wg sync.WaitGroup
|
||||
closechan chan struct{}
|
||||
@@ -56,7 +56,7 @@ func (obj *ConfigWatcher) Add(file ...string) {
|
||||
obj.wg.Add(1)
|
||||
go func() {
|
||||
defer obj.wg.Done()
|
||||
ch := ConfigWatch(file[0])
|
||||
ch := obj.ConfigWatch(file[0])
|
||||
for {
|
||||
select {
|
||||
case e := <-ch:
|
||||
@@ -100,7 +100,7 @@ func (obj *ConfigWatcher) Close() {
|
||||
}
|
||||
|
||||
// ConfigWatch writes on the channel every time an event is seen for the path.
|
||||
func ConfigWatch(file string) chan error {
|
||||
func (obj *ConfigWatcher) ConfigWatch(file string) chan error {
|
||||
ch := make(chan error)
|
||||
go func() {
|
||||
recWatcher, err := NewRecWatcher(file, false)
|
||||
@@ -109,9 +109,10 @@ func ConfigWatch(file string) chan error {
|
||||
close(ch)
|
||||
return
|
||||
}
|
||||
recWatcher.Flags = obj.Flags
|
||||
defer recWatcher.Close()
|
||||
for {
|
||||
if global.DEBUG {
|
||||
if obj.Flags.Debug {
|
||||
log.Printf("Watching: %v", file)
|
||||
}
|
||||
select {
|
||||
|
||||
@@ -15,12 +15,9 @@
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// Package global holds some global variables that are used throughout the code.
|
||||
package global
|
||||
package recwatch
|
||||
|
||||
// These constants are used throughout the program.
|
||||
const (
|
||||
DEBUG = false // add additional log messages
|
||||
TRACE = false // add execution flow log messages
|
||||
VERBOSE = false // add extra log message output
|
||||
)
|
||||
// Flags contains all the constant flags that recwatch needs.
|
||||
type Flags struct {
|
||||
Debug bool
|
||||
}
|
||||
@@ -29,7 +29,6 @@ import (
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead?
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
|
||||
"gopkg.in/fsnotify.v1"
|
||||
@@ -46,6 +45,7 @@ type Event struct {
|
||||
type RecWatcher struct {
|
||||
Path string // computed path
|
||||
Recurse bool // should we watch recursively?
|
||||
Flags Flags
|
||||
isDir bool // computed isDir
|
||||
safename string // safe path
|
||||
watcher *fsnotify.Watcher
|
||||
@@ -150,12 +150,12 @@ func (obj *RecWatcher) Watch() error {
|
||||
if current == "" { // the empty string top is the root dir ("/")
|
||||
current = "/"
|
||||
}
|
||||
if global.DEBUG {
|
||||
if obj.Flags.Debug {
|
||||
log.Printf("Watching: %s", current) // attempting to watch...
|
||||
}
|
||||
// initialize in the loop so that we can reset on rm-ed handles
|
||||
if err := obj.watcher.Add(current); err != nil {
|
||||
if global.DEBUG {
|
||||
if obj.Flags.Debug {
|
||||
log.Printf("watcher.Add(%s): Error: %v", current, err)
|
||||
}
|
||||
|
||||
@@ -178,7 +178,7 @@ func (obj *RecWatcher) Watch() error {
|
||||
|
||||
select {
|
||||
case event := <-obj.watcher.Events:
|
||||
if global.DEBUG {
|
||||
if obj.Flags.Debug {
|
||||
log.Printf("Watch(%s), Event(%s): %v", current, event.Name, event.Op)
|
||||
}
|
||||
// the deeper you go, the bigger the deltaDepth is...
|
||||
@@ -291,7 +291,7 @@ func (obj *RecWatcher) addSubFolders(p string) error {
|
||||
}
|
||||
// look at all subfolders...
|
||||
walkFn := func(path string, info os.FileInfo, err error) error {
|
||||
if global.DEBUG {
|
||||
if obj.Flags.Debug {
|
||||
log.Printf("Walk: %s (%v): %v", path, info, err)
|
||||
}
|
||||
if err != nil {
|
||||
|
||||
@@ -62,7 +62,6 @@ import (
|
||||
"time"
|
||||
|
||||
cv "github.com/purpleidea/mgmt/converger"
|
||||
"github.com/purpleidea/mgmt/global"
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
"github.com/purpleidea/mgmt/yamlgraph"
|
||||
|
||||
@@ -85,6 +84,12 @@ const (
|
||||
nonInteractivePasswordTimeout = 5 * 2 // five minutes
|
||||
)
|
||||
|
||||
// Flags are constants required by the remote lib.
|
||||
type Flags struct {
|
||||
Program string
|
||||
Debug bool
|
||||
}
|
||||
|
||||
// The SSH struct is the unit building block for a single remote SSH connection.
|
||||
type SSH struct {
|
||||
hostname string // uuid of the host, as used by the --hostname argument
|
||||
@@ -116,7 +121,7 @@ type SSH struct {
|
||||
lock sync.Mutex // mutex to avoid exit races
|
||||
exiting bool // flag to let us know if we're exiting
|
||||
|
||||
program string // name of the binary
|
||||
flags Flags // constant runtime values
|
||||
remotewd string // path to remote working directory
|
||||
execpath string // path to remote mgmt binary
|
||||
filepath string // path to remote file config
|
||||
@@ -224,7 +229,7 @@ func (obj *SSH) Sftp() error {
|
||||
break
|
||||
}
|
||||
|
||||
obj.execpath = path.Join(obj.remotewd, obj.program) // program is a compile time string
|
||||
obj.execpath = path.Join(obj.remotewd, obj.flags.Program) // program is a compile time string
|
||||
log.Printf("Remote: Remote path is: %s", obj.execpath)
|
||||
|
||||
var same bool
|
||||
@@ -448,7 +453,7 @@ func (obj *SSH) forward(remoteConn net.Conn) net.Conn {
|
||||
log.Printf("Remote: io.Copy error: %s", err)
|
||||
// FIXME: what should we do here???
|
||||
}
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Remote: io.Copy finished: %d", n)
|
||||
}
|
||||
}
|
||||
@@ -563,7 +568,7 @@ func (obj *SSH) ExecExit() error {
|
||||
}
|
||||
|
||||
// FIXME: workaround: force a signal!
|
||||
if _, err := obj.simpleRun(fmt.Sprintf("killall -SIGINT %s", obj.program)); err != nil { // FIXME: low specificity
|
||||
if _, err := obj.simpleRun(fmt.Sprintf("killall -SIGINT %s", obj.flags.Program)); err != nil { // FIXME: low specificity
|
||||
log.Printf("Remote: Failed to send SIGINT: %s", err.Error())
|
||||
}
|
||||
|
||||
@@ -572,12 +577,12 @@ func (obj *SSH) ExecExit() error {
|
||||
// try killing the process more violently
|
||||
time.Sleep(10 * time.Second)
|
||||
//obj.session.Signal(ssh.SIGKILL)
|
||||
cmd := fmt.Sprintf("killall -SIGKILL %s", obj.program) // FIXME: low specificity
|
||||
cmd := fmt.Sprintf("killall -SIGKILL %s", obj.flags.Program) // FIXME: low specificity
|
||||
obj.simpleRun(cmd)
|
||||
}()
|
||||
|
||||
// FIXME: workaround: wait (spin lock) until process quits cleanly...
|
||||
cmd := fmt.Sprintf("while killall -0 %s 2> /dev/null; do sleep 1s; done", obj.program) // FIXME: low specificity
|
||||
cmd := fmt.Sprintf("while killall -0 %s 2> /dev/null; do sleep 1s; done", obj.flags.Program) // FIXME: low specificity
|
||||
if _, err := obj.simpleRun(cmd); err != nil {
|
||||
return fmt.Errorf("Error waiting: %s", err)
|
||||
}
|
||||
@@ -704,11 +709,11 @@ type Remotes struct {
|
||||
cuids map[string]cv.ConvergerUID // map to each SSH struct with the remote as the key
|
||||
callbackCancelFunc func() // stored callback function cancel function
|
||||
|
||||
program string // name of the program
|
||||
flags Flags // constant runtime values
|
||||
}
|
||||
|
||||
// NewRemotes builds a Remotes struct.
|
||||
func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fileWatch chan string, cConns uint16, interactive bool, sshPrivIdRsa string, caching bool, depth uint16, prefix string, converger cv.Converger, convergerCb func(func(map[string]bool) error) (func(), error), program string) *Remotes {
|
||||
func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fileWatch chan string, cConns uint16, interactive bool, sshPrivIdRsa string, caching bool, depth uint16, prefix string, converger cv.Converger, convergerCb func(func(map[string]bool) error) (func(), error), flags Flags) *Remotes {
|
||||
return &Remotes{
|
||||
clientURLs: clientURLs,
|
||||
remoteURLs: remoteURLs,
|
||||
@@ -728,7 +733,7 @@ func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fi
|
||||
semaphore: NewSemaphore(int(cConns)),
|
||||
hostnames: make([]string, len(remotes)),
|
||||
cuids: make(map[string]cv.ConvergerUID),
|
||||
program: program,
|
||||
flags: flags,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -818,7 +823,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
|
||||
caching: obj.caching,
|
||||
converger: obj.converger,
|
||||
prefix: obj.prefix,
|
||||
program: obj.program,
|
||||
flags: obj.flags,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -924,7 +929,7 @@ func (obj *Remotes) Run() {
|
||||
if !ok { // no status on hostname means unconverged!
|
||||
continue
|
||||
}
|
||||
if global.DEBUG {
|
||||
if obj.flags.Debug {
|
||||
log.Printf("Remote: Converged: Status: %+v", obj.converger.Status())
|
||||
}
|
||||
// if exiting, don't update, it will be unregistered...
|
||||
|
||||
@@ -33,7 +33,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
"github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead?
|
||||
"github.com/purpleidea/mgmt/recwatch"
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
|
||||
@@ -171,7 +170,7 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
|
||||
var exit = false
|
||||
|
||||
for {
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("%s[%s]: Watching: %s", obj.Kind(), obj.GetName(), obj.Path) // attempting to watch...
|
||||
}
|
||||
|
||||
@@ -185,7 +184,7 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
|
||||
if err := event.Error; err != nil {
|
||||
return errwrap.Wrapf(err, "Unknown %s[%s] watcher error", obj.Kind(), obj.GetName())
|
||||
}
|
||||
if global.DEBUG { // don't access event.Body if event.Error isn't nil
|
||||
if obj.debug { // don't access event.Body if event.Error isn't nil
|
||||
log.Printf("%s[%s]: Event(%s): %v", obj.Kind(), obj.GetName(), event.Body.Name, event.Body.Op)
|
||||
}
|
||||
send = true
|
||||
@@ -289,7 +288,7 @@ func mapPaths(fileInfos []FileInfo) map[string]FileInfo {
|
||||
func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sha256sum string) (string, bool, error) {
|
||||
// TODO: does it make sense to switch dst to an io.Writer ?
|
||||
// TODO: use obj.Force when dealing with symlinks and other file types!
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("fileCheckApply: %s -> %s", src, dst)
|
||||
}
|
||||
|
||||
@@ -386,7 +385,7 @@ func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sh
|
||||
if !apply {
|
||||
return sha256sum, false, nil
|
||||
}
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("fileCheckApply: Apply: %s -> %s", src, dst)
|
||||
}
|
||||
|
||||
@@ -407,12 +406,12 @@ func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sh
|
||||
// syscall.Splice(rfd int, roff *int64, wfd int, woff *int64, len int, flags int) (n int64, err error)
|
||||
|
||||
// TODO: should we offer a way to cancel the copy on ^C ?
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("fileCheckApply: Copy: %s -> %s", src, dst)
|
||||
}
|
||||
if n, err := io.Copy(dstFile, src); err != nil {
|
||||
return sha256sum, false, err
|
||||
} else if global.DEBUG {
|
||||
} else if obj.debug {
|
||||
log.Printf("fileCheckApply: Copied: %v", n)
|
||||
}
|
||||
return sha256sum, false, dstFile.Sync()
|
||||
@@ -422,7 +421,7 @@ func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sh
|
||||
// It is recursive and can create directories directly, and files via the usual
|
||||
// fileCheckApply method. It returns checkOK and error as is normally expected.
|
||||
func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) {
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("syncCheckApply: %s -> %s", src, dst)
|
||||
}
|
||||
if src == "" || dst == "" {
|
||||
@@ -440,12 +439,12 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) {
|
||||
}
|
||||
|
||||
if !srcIsDir && !dstIsDir {
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("syncCheckApply: %s -> %s", src, dst)
|
||||
}
|
||||
fin, err := os.Open(src)
|
||||
if err != nil {
|
||||
if global.DEBUG && os.IsNotExist(err) { // if we get passed an empty src
|
||||
if obj.debug && os.IsNotExist(err) { // if we get passed an empty src
|
||||
log.Printf("syncCheckApply: Missing src: %s", src)
|
||||
}
|
||||
return false, err
|
||||
@@ -501,7 +500,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) {
|
||||
delete(smartDst, relPathFile) // rm from purge list
|
||||
}
|
||||
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("syncCheckApply: mkdir -m %s '%s'", fileInfo.Mode(), absDst)
|
||||
}
|
||||
if err := os.Mkdir(absDst, fileInfo.Mode()); err != nil {
|
||||
@@ -512,7 +511,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) {
|
||||
// if we're a regular file, the recurse will create it
|
||||
}
|
||||
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("syncCheckApply: Recurse: %s -> %s", absSrc, absDst)
|
||||
}
|
||||
if obj.Recurse {
|
||||
|
||||
@@ -31,7 +31,6 @@ import (
|
||||
"github.com/godbus/dbus"
|
||||
errwrap "github.com/pkg/errors"
|
||||
machined "github.com/purpleidea/go-systemd/machine1"
|
||||
"github.com/purpleidea/mgmt/global"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -218,13 +217,13 @@ func (obj *NspawnRes) CheckApply(apply bool) (checkOK bool, err error) {
|
||||
obj.GetName())
|
||||
}
|
||||
}
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("%s[%s]: properties: %v", obj.Kind(), obj.GetName(), properties)
|
||||
}
|
||||
// if the machine doesn't exist and is supposed to
|
||||
// be stopped or the state matches we're done
|
||||
if !exists && obj.State == stopped || properties["State"] == obj.State {
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("%s[%s]: CheckApply() in valid state", obj.Kind(), obj.GetName())
|
||||
}
|
||||
return true, nil
|
||||
@@ -235,7 +234,7 @@ func (obj *NspawnRes) CheckApply(apply bool) (checkOK bool, err error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("%s[%s]: CheckApply() applying '%s' state", obj.Kind(), obj.GetName(), obj.State)
|
||||
}
|
||||
|
||||
|
||||
@@ -26,7 +26,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
"github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead?
|
||||
"github.com/purpleidea/mgmt/resources/packagekit"
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
|
||||
@@ -143,7 +142,7 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
|
||||
var exit = false
|
||||
|
||||
for {
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("%s: Watching...", obj.fmtNames(obj.getNames()))
|
||||
}
|
||||
|
||||
@@ -153,7 +152,7 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
|
||||
cuid.SetConverged(false)
|
||||
|
||||
// FIXME: ask packagekit for info on what packages changed
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("%s: Event: %v", obj.fmtNames(obj.getNames()), event.Name)
|
||||
}
|
||||
|
||||
|
||||
@@ -30,7 +30,6 @@ import (
|
||||
// TODO: should each resource be a sub-package?
|
||||
"github.com/purpleidea/mgmt/converger"
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
"github.com/purpleidea/mgmt/global"
|
||||
|
||||
errwrap "github.com/pkg/errors"
|
||||
)
|
||||
@@ -57,6 +56,7 @@ type Data struct {
|
||||
//Noop bool
|
||||
Converger converger.Converger
|
||||
Prefix string // the prefix to be used for the pgraph namespace
|
||||
Debug bool
|
||||
// NOTE: we can add more fields here if needed for the resources.
|
||||
}
|
||||
|
||||
@@ -172,6 +172,7 @@ type BaseRes struct {
|
||||
events chan event.Event
|
||||
converger converger.Converger // converged tracking
|
||||
prefix string // base prefix for this resource
|
||||
debug bool
|
||||
state ResState
|
||||
watching bool // is Watch() loop running ?
|
||||
isStateOK bool // whether the state is okay based on events or not
|
||||
@@ -271,6 +272,7 @@ func (obj *BaseRes) Events() chan event.Event {
|
||||
func (obj *BaseRes) AssociateData(data *Data) {
|
||||
obj.converger = data.Converger
|
||||
obj.prefix = data.Prefix
|
||||
obj.debug = data.Debug
|
||||
}
|
||||
|
||||
// IsWatching tells us if the Watch() function is running.
|
||||
@@ -290,7 +292,7 @@ func (obj *BaseRes) GetState() ResState {
|
||||
|
||||
// SetState sets the state of the resource.
|
||||
func (obj *BaseRes) SetState(state ResState) {
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("%s[%s]: State: %v -> %v", obj.Kind(), obj.GetName(), obj.GetState(), state)
|
||||
}
|
||||
obj.state = state
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"reflect"
|
||||
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
"github.com/purpleidea/mgmt/global"
|
||||
|
||||
multierr "github.com/hashicorp/go-multierror"
|
||||
errwrap "github.com/pkg/errors"
|
||||
@@ -120,7 +119,7 @@ type Send struct {
|
||||
// SendRecv pulls in the sent values into the receive slots. It is called by the
|
||||
// receiver and must be given as input the full resource struct to receive on.
|
||||
func (obj *BaseRes) SendRecv(res Res) (map[string]bool, error) {
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
// NOTE: this could expose private resource data like passwords
|
||||
log.Printf("%s[%s]: SendRecv: %+v", obj.Kind(), obj.GetName(), obj.Recv)
|
||||
}
|
||||
@@ -141,7 +140,7 @@ func (obj *BaseRes) SendRecv(res Res) (map[string]bool, error) {
|
||||
value2 := obj2.FieldByName(k)
|
||||
kind2 := value2.Kind()
|
||||
|
||||
if global.DEBUG {
|
||||
if obj.debug {
|
||||
log.Printf("Send(%s) has %v: %v", type1, kind1, value1)
|
||||
log.Printf("Recv(%s) has %v: %v", type2, kind2, value2)
|
||||
}
|
||||
|
||||
@@ -26,7 +26,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
"github.com/purpleidea/mgmt/global"
|
||||
|
||||
errwrap "github.com/pkg/errors"
|
||||
"github.com/rgbkrk/libvirt-go"
|
||||
@@ -190,7 +189,7 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error {
|
||||
if domName == obj.GetName() {
|
||||
eventChan <- lifecycleEvent.Event
|
||||
}
|
||||
} else if global.DEBUG {
|
||||
} else if obj.debug {
|
||||
log.Printf("%s[%s]: Event details isn't DomainLifecycleEvent", obj.Kind(), obj.GetName())
|
||||
}
|
||||
return 0
|
||||
|
||||
@@ -88,7 +88,8 @@ func (obj *GAPI) SwitchStream() chan error {
|
||||
ch <- fmt.Errorf("yamlgraph: GAPI is not initialized")
|
||||
return
|
||||
}
|
||||
configChan := recwatch.ConfigWatch(*obj.File)
|
||||
configWatcher := recwatch.NewConfigWatcher()
|
||||
configChan := configWatcher.ConfigWatch(*obj.File) // simple
|
||||
for {
|
||||
select {
|
||||
case err, ok := <-configChan: // returns nil events on ok!
|
||||
|
||||
@@ -27,7 +27,6 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/purpleidea/mgmt/gapi"
|
||||
"github.com/purpleidea/mgmt/global"
|
||||
"github.com/purpleidea/mgmt/pgraph"
|
||||
"github.com/purpleidea/mgmt/resources"
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
@@ -35,6 +34,10 @@ import (
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
const (
|
||||
Debug = false // FIXME: integrate with global debug flag
|
||||
)
|
||||
|
||||
type collectorResConfig struct {
|
||||
Kind string `yaml:"kind"`
|
||||
Pattern string `yaml:"pattern"` // XXX: Not Implemented
|
||||
@@ -117,7 +120,7 @@ func (c *GraphConfig) NewGraphFromConfig(hostname string, world gapi.World, noop
|
||||
slice := reflect.ValueOf(iface)
|
||||
// XXX: should we just drop these everywhere and have the kind strings be all lowercase?
|
||||
kind := util.FirstToUpper(name)
|
||||
if global.DEBUG {
|
||||
if Debug {
|
||||
log.Printf("Config: Processing: %v...", kind)
|
||||
}
|
||||
for j := 0; j < slice.Len(); j++ { // loop through resources of same kind
|
||||
|
||||
Reference in New Issue
Block a user