resources: Clean up errors and string printing

This commit is contained in:
James Shubin
2016-11-08 03:49:27 -05:00
parent eef9abf0bf
commit 3bffccc48e
7 changed files with 86 additions and 79 deletions

View File

@@ -282,19 +282,19 @@ func (obj *Main) Run() error {
exitchan := make(chan struct{}) // exit on close exitchan := make(chan struct{}) // exit on close
go func() { go func() {
startchan := make(chan struct{}) // start signal startChan := make(chan struct{}) // start signal
go func() { startchan <- struct{}{} }() go func() { startChan <- struct{}{} }()
log.Println("Etcd: Starting...") log.Println("Etcd: Starting...")
etcdchan := etcd.EtcdWatch(EmbdEtcd) etcdChan := etcd.EtcdWatch(EmbdEtcd)
first := true // first loop or not first := true // first loop or not
for { for {
log.Println("Main: Waiting...") log.Println("Main: Waiting...")
select { select {
case <-startchan: // kick the loop once at start case <-startChan: // kick the loop once at start
// pass // pass
case b := <-etcdchan: case b := <-etcdChan:
if !b { // ignore the message if !b { // ignore the message
continue continue
} }
@@ -302,6 +302,10 @@ func (obj *Main) Run() error {
case err, ok := <-gapiChan: case err, ok := <-gapiChan:
if !ok { // channel closed if !ok { // channel closed
if obj.DEBUG {
log.Printf("Main: GAPI exited")
}
gapiChan = nil // disable it
continue continue
} }
if err != nil { if err != nil {
@@ -469,7 +473,7 @@ func (obj *Main) Run() error {
} }
if obj.DEBUG { if obj.DEBUG {
log.Printf("Graph: %v", G) log.Printf("Main: Graph: %v", G)
} }
wg.Wait() // wait for primary go routines to exit wg.Wait() // wait for primary go routines to exit

View File

@@ -21,7 +21,6 @@ import (
"bufio" "bufio"
"bytes" "bytes"
"encoding/gob" "encoding/gob"
"errors"
"fmt" "fmt"
"log" "log"
"os/exec" "os/exec"
@@ -30,6 +29,8 @@ import (
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
errwrap "github.com/pkg/errors"
) )
func init() { func init() {
@@ -151,7 +152,7 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
cmdReader, err := cmd.StdoutPipe() cmdReader, err := cmd.StdoutPipe()
if err != nil { if err != nil {
return fmt.Errorf("%s[%s]: Error creating StdoutPipe for Cmd: %v", obj.Kind(), obj.GetName(), err) return errwrap.Wrapf(err, "Error creating StdoutPipe for Cmd")
} }
scanner := bufio.NewScanner(cmdReader) scanner := bufio.NewScanner(cmdReader)
@@ -162,7 +163,7 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
cmd.Process.Kill() // TODO: is this necessary? cmd.Process.Kill() // TODO: is this necessary?
}() }()
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
return fmt.Errorf("%s[%s]: Error starting Cmd: %v", obj.Kind(), obj.GetName(), err) return errwrap.Wrapf(err, "Error starting Cmd")
} }
bufioch, errch = obj.BufioChanScanner(scanner) bufioch, errch = obj.BufioChanScanner(scanner)
@@ -174,7 +175,7 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
case text := <-bufioch: case text := <-bufioch:
cuid.SetConverged(false) cuid.SetConverged(false)
// each time we get a line of output, we loop! // each time we get a line of output, we loop!
log.Printf("%v[%v]: Watch output: %s", obj.Kind(), obj.GetName(), text) log.Printf("%s[%s]: Watch output: %s", obj.Kind(), obj.GetName(), text)
if text != "" { if text != "" {
send = true send = true
} }
@@ -184,10 +185,10 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
if err == nil { // EOF if err == nil { // EOF
// FIXME: add an "if watch command ends/crashes" // FIXME: add an "if watch command ends/crashes"
// restart or generate error option // restart or generate error option
return fmt.Errorf("%s[%s]: Reached EOF", obj.Kind(), obj.GetName()) return fmt.Errorf("Reached EOF")
} }
// error reading input? // error reading input?
return fmt.Errorf("Unknown %s[%s] error: %v", obj.Kind(), obj.GetName(), err) return errwrap.Wrapf(err, "Unknown error")
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
@@ -221,7 +222,7 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.
// TODO: expand the IfCmd to be a list of commands // TODO: expand the IfCmd to be a list of commands
func (obj *ExecRes) CheckApply(apply bool) (checkok bool, err error) { func (obj *ExecRes) CheckApply(apply bool) (checkok bool, err error) {
log.Printf("%v[%v]: CheckApply(%t)", obj.Kind(), obj.GetName(), apply) log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), apply)
// if there is a watch command, but no if command, run based on state // if there is a watch command, but no if command, run based on state
if obj.WatchCmd != "" && obj.IfCmd == "" { if obj.WatchCmd != "" && obj.IfCmd == "" {
@@ -274,7 +275,7 @@ func (obj *ExecRes) CheckApply(apply bool) (checkok bool, err error) {
} }
// apply portion // apply portion
log.Printf("%v[%v]: Apply", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: Apply", obj.Kind(), obj.GetName())
var cmdName string var cmdName string
var cmdArgs []string var cmdArgs []string
if obj.Shell == "" { if obj.Shell == "" {
@@ -295,9 +296,8 @@ func (obj *ExecRes) CheckApply(apply bool) (checkok bool, err error) {
var out bytes.Buffer var out bytes.Buffer
cmd.Stdout = &out cmd.Stdout = &out
if err = cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
log.Printf("%v[%v]: Error starting Cmd: %v", obj.Kind(), obj.GetName(), err) return false, errwrap.Wrapf(err, "Error starting Cmd")
return false, err
} }
timeout := obj.Timeout timeout := obj.Timeout
@@ -308,25 +308,25 @@ func (obj *ExecRes) CheckApply(apply bool) (checkok bool, err error) {
go func() { done <- cmd.Wait() }() go func() { done <- cmd.Wait() }()
select { select {
case err = <-done: case err := <-done:
if err != nil { if err != nil {
log.Printf("%v[%v]: Error waiting for Cmd: %v", obj.Kind(), obj.GetName(), err) e := errwrap.Wrapf(err, "Error waiting for Cmd")
return false, err return false, e
} }
case <-util.TimeAfterOrBlock(timeout): case <-util.TimeAfterOrBlock(timeout):
log.Printf("%v[%v]: Timeout waiting for Cmd", obj.Kind(), obj.GetName())
//cmd.Process.Kill() // TODO: is this necessary? //cmd.Process.Kill() // TODO: is this necessary?
return false, errors.New("Timeout waiting for Cmd!") return false, fmt.Errorf("Timeout waiting for Cmd!")
} }
// TODO: if we printed the stdout while the command is running, this // TODO: if we printed the stdout while the command is running, this
// would be nice, but it would require terminal log output that doesn't // would be nice, but it would require terminal log output that doesn't
// interleave all the parallel parts which would mix it all up... // interleave all the parallel parts which would mix it all up...
if s := out.String(); s == "" { if s := out.String(); s == "" {
log.Printf("Exec[%v]: Command output is empty!", obj.Name) log.Printf("%s[%s]: Command output is empty!", obj.Kind(), obj.GetName())
} else { } else {
log.Printf("Exec[%v]: Command output is:", obj.Name) log.Printf("%s[%s]: Command output is:", obj.Kind(), obj.GetName())
log.Printf(out.String()) log.Printf(out.String())
} }
// XXX: return based on exit value!! // XXX: return based on exit value!!

View File

@@ -36,6 +36,8 @@ import (
"github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead? "github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead?
"github.com/purpleidea/mgmt/recwatch" "github.com/purpleidea/mgmt/recwatch"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
errwrap "github.com/pkg/errors"
) )
func init() { func init() {
@@ -182,7 +184,7 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
} }
cuid.SetConverged(false) cuid.SetConverged(false)
if err := event.Error; err != nil { if err := event.Error; err != nil {
return fmt.Errorf("Unknown %s[%s] watcher error: %v", obj.Kind(), obj.GetName(), err) return errwrap.Wrapf(err, "Unknown %s[%s] watcher error", obj.Kind(), obj.GetName())
} }
if global.DEBUG { // don't access event.Body if event.Error isn't nil if global.DEBUG { // don't access event.Body if event.Error isn't nil
log.Printf("%s[%s]: Event(%s): %v", obj.Kind(), obj.GetName(), event.Body.Name, event.Body.Op) log.Printf("%s[%s]: Event(%s): %v", obj.Kind(), obj.GetName(), event.Body.Name, event.Body.Op)
@@ -258,7 +260,7 @@ func ReadDir(path string) ([]FileInfo, error) {
abs := path + smartPath(fi) abs := path + smartPath(fi)
rel, err := filepath.Rel(path, abs) // NOTE: calls Clean() rel, err := filepath.Rel(path, abs) // NOTE: calls Clean()
if err != nil { // shouldn't happen if err != nil { // shouldn't happen
return nil, fmt.Errorf("ReadDir: Unhandled error: %v", err) return nil, errwrap.Wrapf(err, "ReadDir: Unhandled error")
} }
if fi.IsDir() { if fi.IsDir() {
rel += "/" // add a trailing slash for dirs rel += "/" // add a trailing slash for dirs
@@ -521,7 +523,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) {
} }
if obj.Recurse { if obj.Recurse {
if c, err := obj.syncCheckApply(apply, absSrc, absDst); err != nil { // recurse if c, err := obj.syncCheckApply(apply, absSrc, absDst); err != nil { // recurse
return false, fmt.Errorf("syncCheckApply: Recurse failed: %v", err) return false, errwrap.Wrapf(err, "syncCheckApply: Recurse failed")
} else if !c { // don't let subsequent passes make this true } else if !c { // don't let subsequent passes make this true
checkOK = false checkOK = false
} }
@@ -562,7 +564,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) {
_ = absSrc _ = absSrc
//log.Printf("syncCheckApply: Recurse rm: %s -> %s", absSrc, absDst) //log.Printf("syncCheckApply: Recurse rm: %s -> %s", absSrc, absDst)
//if c, err := obj.syncCheckApply(apply, absSrc, absDst); err != nil { //if c, err := obj.syncCheckApply(apply, absSrc, absDst); err != nil {
// return false, fmt.Errorf("syncCheckApply: Recurse rm failed: %v", err) // return false, errwrap.Wrapf(err, "syncCheckApply: Recurse rm failed")
//} else if !c { // don't let subsequent passes make this true //} else if !c { // don't let subsequent passes make this true
// checkOK = false // checkOK = false
//} //}
@@ -580,7 +582,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) {
// contentCheckApply performs a CheckApply for the file existence and content. // contentCheckApply performs a CheckApply for the file existence and content.
func (obj *FileRes) contentCheckApply(apply bool) (checkOK bool, _ error) { func (obj *FileRes) contentCheckApply(apply bool) (checkOK bool, _ error) {
log.Printf("%v[%v]: contentCheckApply(%t)", obj.Kind(), obj.GetName(), apply) log.Printf("%s[%s]: contentCheckApply(%t)", obj.Kind(), obj.GetName(), apply)
if obj.State == "absent" { if obj.State == "absent" {
if _, err := os.Stat(obj.path); os.IsNotExist(err) { if _, err := os.Stat(obj.path); os.IsNotExist(err) {
@@ -638,7 +640,7 @@ func (obj *FileRes) contentCheckApply(apply bool) (checkOK bool, _ error) {
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.
func (obj *FileRes) CheckApply(apply bool) (checkOK bool, _ error) { func (obj *FileRes) CheckApply(apply bool) (checkOK bool, _ error) {
log.Printf("%v[%v]: CheckApply(%t)", obj.Kind(), obj.GetName(), apply) log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), apply)
if obj.isStateOK { // cache the state if obj.isStateOK { // cache the state
return true, nil return true, nil

View File

@@ -113,7 +113,7 @@ func (obj *NoopRes) Watch(processChan chan event.Event) error {
// CheckApply method for Noop resource. Does nothing, returns happy! // CheckApply method for Noop resource. Does nothing, returns happy!
func (obj *NoopRes) CheckApply(apply bool) (checkok bool, err error) { func (obj *NoopRes) CheckApply(apply bool) (checkok bool, err error) {
log.Printf("%v[%v]: CheckApply(%t)", obj.Kind(), obj.GetName(), apply) log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), apply)
return true, nil // state is always okay return true, nil // state is always okay
} }

View File

@@ -19,7 +19,6 @@ package resources
import ( import (
"encoding/gob" "encoding/gob"
"errors"
"fmt" "fmt"
"log" "log"
"path" "path"
@@ -30,6 +29,8 @@ import (
"github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead? "github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead?
"github.com/purpleidea/mgmt/resources/packagekit" "github.com/purpleidea/mgmt/resources/packagekit"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
errwrap "github.com/pkg/errors"
) )
func init() { func init() {
@@ -76,7 +77,7 @@ func (obj *PkgRes) Init() error {
result, err := obj.pkgMappingHelper(bus) result, err := obj.pkgMappingHelper(bus)
if err != nil { if err != nil {
return fmt.Errorf("The pkgMappingHelper failed with: %v.", err) return errwrap.Wrapf(err, "The pkgMappingHelper failed")
} }
data, ok := result[obj.Name] // lookup single package (init does just one) data, ok := result[obj.Name] // lookup single package (init does just one)
@@ -88,7 +89,7 @@ func (obj *PkgRes) Init() error {
packageIDs := []string{data.PackageID} // just one for now packageIDs := []string{data.PackageID} // just one for now
filesMap, err := bus.GetFilesByPackageID(packageIDs) filesMap, err := bus.GetFilesByPackageID(packageIDs)
if err != nil { if err != nil {
return fmt.Errorf("Can't run GetFilesByPackageID: %v", err) return errwrap.Wrapf(err, "Can't run GetFilesByPackageID")
} }
if files, ok := filesMap[data.PackageID]; ok { if files, ok := filesMap[data.PackageID]; ok {
obj.fileList = util.DirifyFileList(files, false) obj.fileList = util.DirifyFileList(files, false)
@@ -129,13 +130,13 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
bus := packagekit.NewBus() bus := packagekit.NewBus()
if bus == nil { if bus == nil {
log.Fatal("Can't connect to PackageKit bus.") return fmt.Errorf("Can't connect to PackageKit bus.")
} }
defer bus.Close() defer bus.Close()
ch, err := bus.WatchChanges() ch, err := bus.WatchChanges()
if err != nil { if err != nil {
log.Fatalf("Error adding signal match: %v", err) return errwrap.Wrapf(err, "Error adding signal match")
} }
var send = false // send event? var send = false // send event?
@@ -144,7 +145,7 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
for { for {
if global.DEBUG { if global.DEBUG {
log.Printf("%v: Watching...", obj.fmtNames(obj.getNames())) log.Printf("%s: Watching...", obj.fmtNames(obj.getNames()))
} }
obj.SetState(ResStateWatching) // reset obj.SetState(ResStateWatching) // reset
@@ -154,7 +155,7 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
// FIXME: ask packagekit for info on what packages changed // FIXME: ask packagekit for info on what packages changed
if global.DEBUG { if global.DEBUG {
log.Printf("%v: Event: %v", obj.fmtNames(obj.getNames()), event.Name) log.Printf("%s: Event: %v", obj.fmtNames(obj.getNames()), event.Name)
} }
// since the chan is buffered, remove any supplemental // since the chan is buffered, remove any supplemental
@@ -217,9 +218,9 @@ func (obj *PkgRes) getNames() []string {
// pretty print for header values // pretty print for header values
func (obj *PkgRes) fmtNames(names []string) string { func (obj *PkgRes) fmtNames(names []string) string {
if len(obj.GetGroup()) > 0 { // grouped elements if len(obj.GetGroup()) > 0 { // grouped elements
return fmt.Sprintf("%v[autogroup:(%v)]", obj.Kind(), strings.Join(names, ",")) return fmt.Sprintf("%s[autogroup:(%v)]", obj.Kind(), strings.Join(names, ","))
} }
return fmt.Sprintf("%v[%v]", obj.Kind(), obj.GetName()) return fmt.Sprintf("%s[%s]", obj.Kind(), obj.GetName())
} }
func (obj *PkgRes) groupMappingHelper() map[string]string { func (obj *PkgRes) groupMappingHelper() map[string]string {
@@ -228,7 +229,7 @@ func (obj *PkgRes) groupMappingHelper() map[string]string {
for _, x := range g { for _, x := range g {
pkg, ok := x.(*PkgRes) // convert from Res pkg, ok := x.(*PkgRes) // convert from Res
if !ok { if !ok {
log.Fatalf("Grouped member %v is not a %v", x, obj.Kind()) log.Fatalf("Grouped member %v is not a %s", x, obj.Kind())
} }
result[pkg.Name] = pkg.State result[pkg.Name] = pkg.State
} }
@@ -254,9 +255,9 @@ func (obj *PkgRes) pkgMappingHelper(bus *packagekit.Conn) (map[string]*packageki
if !obj.AllowUnsupported { if !obj.AllowUnsupported {
filter += packagekit.PK_FILTER_ENUM_SUPPORTED filter += packagekit.PK_FILTER_ENUM_SUPPORTED
} }
result, e := bus.PackagesToPackageIDs(packageMap, filter) result, err := bus.PackagesToPackageIDs(packageMap, filter)
if e != nil { if err != nil {
return nil, fmt.Errorf("Can't run PackagesToPackageIDs: %v", e) return nil, errwrap.Wrapf(err, "Can't run PackagesToPackageIDs")
} }
return result, nil return result, nil
} }
@@ -264,10 +265,10 @@ func (obj *PkgRes) pkgMappingHelper(bus *packagekit.Conn) (map[string]*packageki
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.
func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) { func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) {
log.Printf("%v: CheckApply(%t)", obj.fmtNames(obj.getNames()), apply) log.Printf("%s: CheckApply(%t)", obj.fmtNames(obj.getNames()), apply)
if obj.State == "" { // TODO: Validate() should replace this check! if obj.State == "" { // TODO: Validate() should replace this check!
log.Fatalf("%v: Package state is undefined!", obj.fmtNames(obj.getNames())) log.Fatalf("%s: Package state is undefined!", obj.fmtNames(obj.getNames()))
} }
if obj.isStateOK { // cache the state if obj.isStateOK { // cache the state
@@ -276,13 +277,13 @@ func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) {
bus := packagekit.NewBus() bus := packagekit.NewBus()
if bus == nil { if bus == nil {
return false, errors.New("Can't connect to PackageKit bus.") return false, fmt.Errorf("Can't connect to PackageKit bus.")
} }
defer bus.Close() defer bus.Close()
result, err := obj.pkgMappingHelper(bus) result, err := obj.pkgMappingHelper(bus)
if err != nil { if err != nil {
return false, fmt.Errorf("The pkgMappingHelper failed with: %v.", err) return false, errwrap.Wrapf(err, "The pkgMappingHelper failed")
} }
packageMap := obj.groupMappingHelper() // map[string]string packageMap := obj.groupMappingHelper() // map[string]string
@@ -295,7 +296,7 @@ func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) {
// eventually we might be able to drop this constraint! // eventually we might be able to drop this constraint!
states, err := packagekit.FilterState(result, packageList, obj.State) states, err := packagekit.FilterState(result, packageList, obj.State)
if err != nil { if err != nil {
return false, fmt.Errorf("The FilterState method failed with: %v.", err) return false, errwrap.Wrapf(err, "The FilterState method failed")
} }
data, _ := result[obj.Name] // if above didn't error, we won't either! data, _ := result[obj.Name] // if above didn't error, we won't either!
validState := util.BoolMapTrue(util.BoolMapValues(states)) validState := util.BoolMapTrue(util.BoolMapValues(states))
@@ -324,7 +325,7 @@ func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) {
} }
// apply portion // apply portion
log.Printf("%v: Apply", obj.fmtNames(obj.getNames())) log.Printf("%s: Apply", obj.fmtNames(obj.getNames()))
readyPackages, err := packagekit.FilterPackageState(result, packageList, obj.State) readyPackages, err := packagekit.FilterPackageState(result, packageList, obj.State)
if err != nil { if err != nil {
return false, err // fail return false, err // fail
@@ -338,7 +339,7 @@ func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) {
transactionFlags += packagekit.PK_TRANSACTION_FLAG_ENUM_ONLY_TRUSTED transactionFlags += packagekit.PK_TRANSACTION_FLAG_ENUM_ONLY_TRUSTED
} }
// apply correct state! // apply correct state!
log.Printf("%v: Set: %v...", obj.fmtNames(util.StrListIntersection(applyPackages, obj.getNames())), obj.State) log.Printf("%s: Set: %v...", obj.fmtNames(util.StrListIntersection(applyPackages, obj.getNames())), obj.State)
switch obj.State { switch obj.State {
case "uninstalled": // run remove case "uninstalled": // run remove
// NOTE: packageID is different than when installed, because now // NOTE: packageID is different than when installed, because now
@@ -356,7 +357,7 @@ func (obj *PkgRes) CheckApply(apply bool) (checkok bool, err error) {
if err != nil { if err != nil {
return false, err // fail return false, err // fail
} }
log.Printf("%v: Set: %v success!", obj.fmtNames(util.StrListIntersection(applyPackages, obj.getNames())), obj.State) log.Printf("%s: Set: %v success!", obj.fmtNames(util.StrListIntersection(applyPackages, obj.getNames())), obj.State)
obj.isStateOK = true // reset obj.isStateOK = true // reset
return false, nil // success return false, nil // success
} }

View File

@@ -21,7 +21,6 @@ package resources
import ( import (
"encoding/gob" "encoding/gob"
"errors"
"fmt" "fmt"
"log" "log"
"time" "time"
@@ -32,6 +31,7 @@ import (
systemd "github.com/coreos/go-systemd/dbus" // change namespace systemd "github.com/coreos/go-systemd/dbus" // change namespace
systemdUtil "github.com/coreos/go-systemd/util" systemdUtil "github.com/coreos/go-systemd/util"
"github.com/godbus/dbus" // namespace collides with systemd wrapper "github.com/godbus/dbus" // namespace collides with systemd wrapper
errwrap "github.com/pkg/errors"
) )
func init() { func init() {
@@ -100,14 +100,14 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
conn, err := systemd.NewSystemdConnection() // needs root access conn, err := systemd.NewSystemdConnection() // needs root access
if err != nil { if err != nil {
return fmt.Errorf("Failed to connect to systemd: %s", err) return errwrap.Wrapf(err, "Failed to connect to systemd")
} }
defer conn.Close() defer conn.Close()
// if we share the bus with others, we will get each others messages!! // if we share the bus with others, we will get each others messages!!
bus, err := util.SystemBusPrivateUsable() // don't share the bus connection! bus, err := util.SystemBusPrivateUsable() // don't share the bus connection!
if err != nil { if err != nil {
return fmt.Errorf("Failed to connect to bus: %s", err) return errwrap.Wrapf(err, "Failed to connect to bus")
} }
// XXX: will this detect new units? // XXX: will this detect new units?
@@ -116,7 +116,7 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
buschan := make(chan *dbus.Signal, 10) buschan := make(chan *dbus.Signal, 10)
bus.Signal(buschan) bus.Signal(buschan)
var svc = fmt.Sprintf("%v.service", obj.Name) // systemd name var svc = fmt.Sprintf("%s.service", obj.Name) // systemd name
var send = false // send event? var send = false // send event?
var exit = false var exit = false
var dirty = false var dirty = false
@@ -143,7 +143,7 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
if !invalid { if !invalid {
var notFound = (loadstate.Value == dbus.MakeVariant("not-found")) var notFound = (loadstate.Value == dbus.MakeVariant("not-found"))
if notFound { // XXX: in the loop we'll handle changes better... if notFound { // XXX: in the loop we'll handle changes better...
log.Printf("Failed to find svc: %v", svc) log.Printf("Failed to find svc: %s", svc)
invalid = true // XXX: ? invalid = true // XXX: ?
} }
} }
@@ -154,7 +154,7 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
} }
if invalid { if invalid {
log.Printf("Waiting for: %v", svc) // waiting for svc to appear... log.Printf("Waiting for: %s", svc) // waiting for svc to appear...
if activeSet { if activeSet {
activeSet = false activeSet = false
set.Remove(svc) // no return value should ever occur set.Remove(svc) // no return value should ever occur
@@ -165,7 +165,7 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
case <-buschan: // XXX: wait for new units event to unstick case <-buschan: // XXX: wait for new units event to unstick
cuid.SetConverged(false) cuid.SetConverged(false)
// loop so that we can see the changed invalid signal // loop so that we can see the changed invalid signal
log.Printf("Svc[%v]->DaemonReload()", svc) log.Printf("Svc[%s]->DaemonReload()", svc)
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
@@ -191,7 +191,7 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
set.Add(svc) // no return value should ever occur set.Add(svc) // no return value should ever occur
} }
log.Printf("Watching: %v", svc) // attempting to watch... log.Printf("Watching: %s", svc) // attempting to watch...
obj.SetState(ResStateWatching) // reset obj.SetState(ResStateWatching) // reset
select { select {
case event := <-subChannel: case event := <-subChannel:
@@ -203,24 +203,24 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
switch event[svc].ActiveState { switch event[svc].ActiveState {
case "active": case "active":
log.Printf("Svc[%v]->Started", svc) log.Printf("Svc[%s]->Started", svc)
case "inactive": case "inactive":
log.Printf("Svc[%v]->Stopped", svc) log.Printf("Svc[%s]->Stopped", svc)
case "reloading": case "reloading":
log.Printf("Svc[%v]->Reloading", svc) log.Printf("Svc[%s]->Reloading", svc)
default: default:
log.Fatalf("Unknown svc state: %s", event[svc].ActiveState) log.Fatalf("Unknown svc state: %s", event[svc].ActiveState)
} }
} else { } else {
// svc stopped (and ActiveState is nil...) // svc stopped (and ActiveState is nil...)
log.Printf("Svc[%v]->Stopped", svc) log.Printf("Svc[%s]->Stopped", svc)
} }
send = true send = true
dirty = true dirty = true
case err := <-subErrors: case err := <-subErrors:
cuid.SetConverged(false) cuid.SetConverged(false)
return fmt.Errorf("Unknown %s[%s] error: %v", obj.Kind(), obj.GetName(), err) return errwrap.Wrapf(err, "Unknown %s[%s] error", obj.Kind(), obj.GetName())
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
@@ -259,33 +259,33 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.
func (obj *SvcRes) CheckApply(apply bool) (checkok bool, err error) { func (obj *SvcRes) CheckApply(apply bool) (checkok bool, err error) {
log.Printf("%v[%v]: CheckApply(%t)", obj.Kind(), obj.GetName(), apply) log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), apply)
if obj.isStateOK { // cache the state if obj.isStateOK { // cache the state
return true, nil return true, nil
} }
if !systemdUtil.IsRunningSystemd() { if !systemdUtil.IsRunningSystemd() {
return false, errors.New("Systemd is not running.") return false, fmt.Errorf("Systemd is not running.")
} }
conn, err := systemd.NewSystemdConnection() // needs root access conn, err := systemd.NewSystemdConnection() // needs root access
if err != nil { if err != nil {
return false, fmt.Errorf("Failed to connect to systemd: %v", err) return false, errwrap.Wrapf(err, "Failed to connect to systemd")
} }
defer conn.Close() defer conn.Close()
var svc = fmt.Sprintf("%v.service", obj.Name) // systemd name var svc = fmt.Sprintf("%s.service", obj.Name) // systemd name
loadstate, err := conn.GetUnitProperty(svc, "LoadState") loadstate, err := conn.GetUnitProperty(svc, "LoadState")
if err != nil { if err != nil {
return false, fmt.Errorf("Failed to get load state: %v", err) return false, errwrap.Wrapf(err, "Failed to get load state")
} }
// NOTE: we have to compare variants with other variants, they are really strings... // NOTE: we have to compare variants with other variants, they are really strings...
var notFound = (loadstate.Value == dbus.MakeVariant("not-found")) var notFound = (loadstate.Value == dbus.MakeVariant("not-found"))
if notFound { if notFound {
return false, fmt.Errorf("Failed to find svc: %v", svc) return false, errwrap.Wrapf(err, "Failed to find svc: %s", svc)
} }
// XXX: check svc "enabled at boot" or not status... // XXX: check svc "enabled at boot" or not status...
@@ -293,7 +293,7 @@ func (obj *SvcRes) CheckApply(apply bool) (checkok bool, err error) {
//conn.GetUnitProperties(svc) //conn.GetUnitProperties(svc)
activestate, err := conn.GetUnitProperty(svc, "ActiveState") activestate, err := conn.GetUnitProperty(svc, "ActiveState")
if err != nil { if err != nil {
return false, fmt.Errorf("Failed to get active state: %v", err) return false, errwrap.Wrapf(err, "Failed to get active state")
} }
var running = (activestate.Value == dbus.MakeVariant("active")) var running = (activestate.Value == dbus.MakeVariant("active"))
@@ -310,7 +310,7 @@ func (obj *SvcRes) CheckApply(apply bool) (checkok bool, err error) {
} }
// apply portion // apply portion
log.Printf("%v[%v]: Apply", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: Apply", obj.Kind(), obj.GetName())
var files = []string{svc} // the svc represented in a list var files = []string{svc} // the svc represented in a list
if obj.Startup == "enabled" { if obj.Startup == "enabled" {
_, _, err = conn.EnableUnitFiles(files, false, true) _, _, err = conn.EnableUnitFiles(files, false, true)
@@ -320,7 +320,7 @@ func (obj *SvcRes) CheckApply(apply bool) (checkok bool, err error) {
} }
if err != nil { if err != nil {
return false, fmt.Errorf("Unable to change startup status: %v", err) return false, errwrap.Wrapf(err, "Unable to change startup status")
} }
// XXX: do we need to use a buffered channel here? // XXX: do we need to use a buffered channel here?
@@ -329,18 +329,18 @@ func (obj *SvcRes) CheckApply(apply bool) (checkok bool, err error) {
if obj.State == "running" { if obj.State == "running" {
_, err = conn.StartUnit(svc, "fail", result) _, err = conn.StartUnit(svc, "fail", result)
if err != nil { if err != nil {
return false, fmt.Errorf("Failed to start unit: %v", err) return false, errwrap.Wrapf(err, "Failed to start unit")
} }
} else if obj.State == "stopped" { } else if obj.State == "stopped" {
_, err = conn.StopUnit(svc, "fail", result) _, err = conn.StopUnit(svc, "fail", result)
if err != nil { if err != nil {
return false, fmt.Errorf("Failed to stop unit: %v", err) return false, errwrap.Wrapf(err, "Failed to stop unit")
} }
} }
status := <-result status := <-result
if &status == nil { if &status == nil {
return false, errors.New("Systemd service action result is nil") return false, fmt.Errorf("Systemd service action result is nil")
} }
if status != "done" { if status != "done" {
return false, fmt.Errorf("Unknown systemd return string: %v", status) return false, fmt.Errorf("Unknown systemd return string: %v", status)

View File

@@ -95,7 +95,7 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error {
select { select {
case <-ticker.C: // received the timer event case <-ticker.C: // received the timer event
send = true send = true
log.Printf("%v[%v]: received tick", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: received tick", obj.Kind(), obj.GetName())
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false) cuid.SetConverged(false)
@@ -161,6 +161,6 @@ func (obj *TimerRes) Compare(res Res) bool {
// CheckApply method for Timer resource. Does nothing, returns happy! // CheckApply method for Timer resource. Does nothing, returns happy!
func (obj *TimerRes) CheckApply(apply bool) (bool, error) { func (obj *TimerRes) CheckApply(apply bool) (bool, error) {
log.Printf("%v[%v]: CheckApply(%t)", obj.Kind(), obj.GetName(), apply) log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), apply)
return true, nil // state is always okay return true, nil // state is always okay
} }