diff --git a/engine/resources/svc.go b/engine/resources/svc.go index 0c28bb18..583d4a8f 100644 --- a/engine/resources/svc.go +++ b/engine/resources/svc.go @@ -36,6 +36,7 @@ import ( "fmt" "os/user" "path" + "time" "github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/engine/traits" @@ -139,21 +140,27 @@ func (obj *SvcRes) Cleanup() error { return nil } +// svc is a helper that returns the systemd name. +func (obj *SvcRes) svc() string { + return fmt.Sprintf("%s.service", obj.Name()) +} + // Watch is the primary listener for this resource and it outputs events. func (obj *SvcRes) Watch(ctx context.Context) error { - // obj.Name: svc name if !systemdUtil.IsRunningSystemd() { return fmt.Errorf("systemd is not running") } + ctx, cancel := context.WithCancel(ctx) + defer cancel() // make sure we always close any below ctx just in case! + var conn *systemd.Conn - var bus *dbus.Conn var err error if obj.Session { - conn, err = systemd.NewUserConnection() // user session + conn, err = systemd.NewUserConnectionContext(ctx) // user session } else { - // we want NewSystemConnection but New falls back to this - conn, err = systemd.New() // needs root access + // we want NewSystemConnectionContext but New... falls back to this + conn, err = systemd.NewWithContext(ctx) // needs root access } if err != nil { return errwrap.Wrapf(err, "failed to connect to systemd") @@ -161,6 +168,7 @@ func (obj *SvcRes) Watch(ctx context.Context) error { defer conn.Close() // if we share the bus with others, we will get each others messages!! + var bus *dbus.Conn if obj.Session { bus, err = util.SessionBusPrivateUsable() } else { @@ -171,122 +179,177 @@ func (obj *SvcRes) Watch(ctx context.Context) error { } defer bus.Close() - // XXX: will this detect new units? - bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, - "type='signal',interface='org.freedesktop.systemd1.Manager',member='Reloading'") - buschan := make(chan *dbus.Signal, 10) - defer close(buschan) // NOTE: closing a chan that contains a value is ok - bus.Signal(buschan) - defer bus.RemoveSignal(buschan) // not needed here, but nice for symmetry + // NOTE: I guess it's not the worst-case scenario if we drop signal or + // if it fills up and we block. Whichever way the upstream implements it + // we'll have a back log of signals to loop through which is just fine. + chBus := make(chan *dbus.Signal, 10) // TODO: what size if any? + defer close(chBus) // NOTE: closing a chan that contains a value is ok + bus.Signal(chBus) + defer bus.RemoveSignal(chBus) // not needed here, but nice for symmetry + + // Legacy way to do this matching... + //method := "org.freedesktop.DBus.AddMatch" + //flags := dbus.Flags(0) + //args := []interface{}{"type='signal',interface='org.freedesktop.systemd1.Manager',member='Reloading'"} + //call := bus.BusObject().CallWithContext(ctx, method, flags, args...) // *dbus.Call + //if err := call.Err; err != nil { + // return errwrap.Wrapf(err, "failed to connect signal on bus") + //} + matchOptions := []dbus.MatchOption{ + dbus.WithMatchInterface("org.freedesktop.systemd1.Manager"), + dbus.WithMatchMember("Reloading"), + } + if err := bus.AddMatchSignalContext(ctx, matchOptions...); err != nil { + return errwrap.Wrapf(err, "failed to add match signal on bus") + } + defer func() { + // On shutdown, we prefer to give this a chance to run. If we + // use the main ctx, then it will error because ctx cancelled. + ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond) + defer cancel() + if err := bus.RemoveMatchSignalContext(ctx, matchOptions...); err != nil { + obj.init.Logf("failed to remove match signal on bus: %+v", err) + } + }() obj.init.Running() // when started, notify engine that we're running - var svc = fmt.Sprintf("%s.service", obj.Name()) // systemd name - var send = false // send event? - var invalid = false // does the svc exist or not? - var previous bool // previous invalid value + svc := obj.svc() // systemd name - // TODO: do we first need to call conn.Subscribe() ? set := conn.NewSubscriptionSet() // no error should be returned - subChannel, subErrors := set.Subscribe() - //defer close(subChannel) // cannot close receive-only channel - //defer close(subErrors) // cannot close receive-only channel - var activeSet = false + // XXX: dynamic bugs: https://github.com/coreos/go-systemd/issues/474 + set.Add(svc) // it's okay if the svc doesn't exist yet + chSub, chSubErr := set.Subscribe() + //defer close(chSub) // cannot close receive-only channel + //defer close(chSubErr) // cannot close receive-only channel + //chSubClosed := false + //chSubErrClosed := false for { - // XXX: watch for an event for new units... - // XXX: detect if startup enabled/disabled value changes... + //if chSubClosed && chSubErrClosed { + // + //} - previous = invalid - invalid = false - - // firstly, does svc even exist or not? - loadstate, err := conn.GetUnitPropertyContext(ctx, svc, "LoadState") - if err != nil { - obj.init.Logf("failed to get property: %+v", err) - invalid = true + if obj.init.Debug { + obj.init.Logf("watching...") } + select { - if !invalid { - var notFound = (loadstate.Value == dbus.MakeVariant("not-found")) - if notFound { // XXX: in the loop we'll handle changes better... - obj.init.Logf("failed to find svc") - invalid = true // XXX: ? + case sig, ok := <-chBus: + if !ok { + chBus = nil + return fmt.Errorf("unexpected close") // we close this one! } - } - - if previous != invalid { // if invalid changed, send signal - send = true - } - - if invalid { if obj.init.Debug { - obj.init.Logf("waiting for service") // waiting for svc to appear... - } - if activeSet { - activeSet = false - set.Remove(svc) // no return value should ever occur + obj.init.Logf("sig: %+v", sig) } - select { - case <-buschan: // XXX: wait for new units event to unstick - // loop so that we can see the changed invalid signal - obj.init.Logf("daemon reload") + // This event happens if we `systemctl daemon-reload` or + // if `systemctl enable/disable ` is run. For both + // of these situations we seem to always get two events. + // The first seems to have `Body:[true]`, and the second + // has `Body:[false]`. - case <-ctx.Done(): // closed by the engine to signal shutdown - return nil - } - } else { - if !activeSet { - activeSet = true - set.Add(svc) // no return value should ever occur + // https://pkg.go.dev/github.com/godbus/dbus/v5#Signal + //eg: &{Sender::1.287 Path:/org/freedesktop/systemd1 Name:org.freedesktop.systemd1.Manager.Reloading Body:[false] Sequence:7} + if sig.Name != "org.freedesktop.systemd1.Manager.Reloading" { + // not for us + continue } - //obj.init.Logf("watching...") // attempting to watch... - select { - case event := <-subChannel: + if len(sig.Body) == 0 { + // does this ever happen? send a signal for now + obj.init.Logf("daemon reload with empty body") + break // break out of select and send event now + } + if len(sig.Body) > 1 { + // does this ever happen? send a signal for now + obj.init.Logf("daemon reload with big body") + break // break out of select and send event now + } + + b, ok := sig.Body[0].(bool) + if !ok { + // does this ever happen? send a signal for now + obj.init.Logf("daemon reload with badly typed body") + break // break out of select and send event now + } + + // We do all of this annoying parsing to cut our event + // count by half, since these signals seem to come in + // pairs. We skip the "true" one that comes first. + if b { if obj.init.Debug { - obj.init.Logf("event: %+v", event) + obj.init.Logf("skipping daemon-reload start") } - // NOTE: the value returned is a map for some reason... - if event[svc] != nil { - // event[svc].ActiveState is not nil - - switch event[svc].ActiveState { - case "active": - obj.init.Logf("event: started") - case "inactive": - obj.init.Logf("event: stopped") - case "reloading": - obj.init.Logf("event: reloading") - case "failed": - obj.init.Logf("event: failed") - case "activating": - obj.init.Logf("event: activating") - case "deactivating": - obj.init.Logf("event: deactivating") - default: - return fmt.Errorf("unknown svc state: %s", event[svc].ActiveState) - } - } else { - // svc stopped (and ActiveState is nil...) - obj.init.Logf("event: stopped") - } - send = true - - case err := <-subErrors: - return errwrap.Wrapf(err, "unknown %s error", obj) - - case <-ctx.Done(): // closed by the engine to signal shutdown - return nil + continue } + if obj.init.Debug { + obj.init.Logf("daemon reload") // success! + } + + case event, ok := <-chSub: + if !ok { + chSub = nil + //chSubClosed = true + continue + } + if obj.init.Debug { + obj.init.Logf("event: %+v", event) + } + + // The value returned is a map in case we monitor many. + unitStatus, ok := event[svc] + if !ok { // not me + continue + } + + if unitStatus == nil { + if obj.init.Debug { + obj.init.Logf("service stopped") + } + break // break out of select and send event now + } + + msg := "" + switch event[svc].ActiveState { // string + case "active": + msg = "service started" + case "inactive": + msg = "service stopped" + case "reloading": + msg = "service reloading" + case "failed": + msg = "service failed" + case "activating": + msg = "service activating" + case "deactivating": + msg = "service deactivating" + default: + return fmt.Errorf("unknown service state: %s", event[svc].ActiveState) + } + if obj.init.Debug { + obj.init.Logf("%s", msg) + } + + case err, ok := <-chSubErr: + if !ok { + chSubErr = nil + //chSubErrClosed = true + continue + } + if err == nil { + obj.init.Logf("unexpected nil error") + continue + } + return errwrap.Wrapf(err, "unknown error") + + case <-ctx.Done(): // closed by the engine to signal shutdown + return ctx.Err() } - if send { - send = false - obj.init.Event() // notify engine of an event (this can block) - } + obj.init.Event() // notify engine of an event (this can block) } } @@ -297,50 +360,63 @@ func (obj *SvcRes) CheckApply(ctx context.Context, apply bool) (bool, error) { return false, fmt.Errorf("systemd is not running") } + ctx, cancel := context.WithCancel(ctx) + defer cancel() // make sure we always close any below ctx just in case! + var conn *systemd.Conn var err error if obj.Session { - conn, err = systemd.NewUserConnection() // user session + conn, err = systemd.NewUserConnectionContext(ctx) // user session } else { - // we want NewSystemConnection but New falls back to this - conn, err = systemd.New() // needs root access + // we want NewSystemConnectionContext but New... falls back to this + conn, err = systemd.NewWithContext(ctx) // needs root access } if err != nil { return false, errwrap.Wrapf(err, "failed to connect to systemd") } defer conn.Close() - var svc = fmt.Sprintf("%s.service", obj.Name()) // systemd name + // if we share the bus with others, we will get each others messages!! + //var bus *dbus.Conn + //if obj.Session { + // bus, err = util.SessionBusPrivateUsable() + //} else { + // bus, err = util.SystemBusPrivateUsable() + //} + //if err != nil { + // return errwrap.Wrapf(err, "failed to connect to bus") + //} + //defer bus.Close() - loadstate, err := conn.GetUnitPropertyContext(ctx, svc, "LoadState") + svc := obj.svc() // systemd name + + loadState, err := conn.GetUnitPropertyContext(ctx, svc, "LoadState") if err != nil { return false, errwrap.Wrapf(err, "failed to get load state") } // NOTE: we have to compare variants with other variants, they are really strings... - var notFound = (loadstate.Value == dbus.MakeVariant("not-found")) + notFound := (loadState.Value == dbus.MakeVariant("not-found")) if notFound { return false, errwrap.Wrapf(err, "failed to find svc: %s", svc) } - // XXX: check svc "enabled at boot" or not status... - //conn.GetUnitPropertiesContexts(svc) - activestate, err := conn.GetUnitPropertyContext(ctx, svc, "ActiveState") + activeState, err := conn.GetUnitPropertyContext(ctx, svc, "ActiveState") if err != nil { return false, errwrap.Wrapf(err, "failed to get active state") } - var running = (activestate.Value == dbus.MakeVariant("active")) - var stateOK = ((obj.State == "") || (obj.State == "running" && running) || (obj.State == "stopped" && !running)) + running := (activeState.Value == dbus.MakeVariant("active")) + stateOK := ((obj.State == "") || (obj.State == "running" && running) || (obj.State == "stopped" && !running)) - startupstate, err := conn.GetUnitPropertyContext(ctx, svc, "UnitFileState") + startupState, err := conn.GetUnitPropertyContext(ctx, svc, "UnitFileState") if err != nil { return false, errwrap.Wrapf(err, "failed to get unit file state") } - enabled := (startupstate.Value == dbus.MakeVariant("enabled")) - disabled := (startupstate.Value == dbus.MakeVariant("disabled")) + enabled := (startupState.Value == dbus.MakeVariant("enabled")) + disabled := (startupState.Value == dbus.MakeVariant("disabled")) startupOK := ((obj.Startup == "") || (obj.Startup == "enabled" && enabled) || (obj.Startup == "disabled" && disabled)) // NOTE: if this svc resource is embedded as a composite resource inside @@ -352,7 +428,7 @@ func (obj *SvcRes) CheckApply(ctx context.Context, apply bool) (bool, error) { // trait to the parent resource, or we'll panic when we call this line.) // It might not be recommended to use the Watch method without a thought // to what actually happens when we would run Send(), and other methods. - var refresh = obj.init.Refresh() // do we have a pending reload to apply? + refresh := obj.init.Refresh() // do we have a pending reload to apply? if stateOK && startupOK && !refresh { return true, nil // we are in the correct state @@ -364,25 +440,40 @@ func (obj *SvcRes) CheckApply(ctx context.Context, apply bool) (bool, error) { } // apply portion - files := []string{svc} // the svc represented in a list - if obj.Startup == "enabled" { - _, _, err = conn.EnableUnitFilesContext(ctx, files, false, true) - } else if obj.Startup == "disabled" { - _, err = conn.DisableUnitFilesContext(ctx, files, false) - } - if err != nil { - return false, errwrap.Wrapf(err, "unable to change startup status") + + if !startupOK && obj.Startup != "" { + files := []string{svc} // the svc represented in a list + if obj.Startup == "enabled" { + _, _, err = conn.EnableUnitFilesContext(ctx, files, false, true) + } else if obj.Startup == "disabled" { + _, err = conn.DisableUnitFilesContext(ctx, files, false) + } else { + // pass + } + if err != nil { + return false, errwrap.Wrapf(err, "unable to change startup status") + } + if obj.Startup == "enabled" { + obj.init.Logf("service enabled") + } else if obj.Startup == "disabled" { + obj.init.Logf("service disabled") + } } // XXX: do we need to use a buffered channel here? result := make(chan string, 1) // catch result information + defer close(result) var status string + var ok bool - if !stateOK { + if !stateOK && obj.State != "" { if obj.State == "running" { _, err = conn.StartUnitContext(ctx, svc, SystemdUnitModeFail, result) } else if obj.State == "stopped" { _, err = conn.StopUnitContext(ctx, svc, SystemdUnitModeFail, result) + } else { // skip through this section + // TODO: should we do anything here instead? + result <- "" // chan is buffered, so won't block } if err != nil { return false, errwrap.Wrapf(err, "unable to change running status") @@ -394,30 +485,48 @@ func (obj *SvcRes) CheckApply(ctx context.Context, apply bool) (bool, error) { // TODO: Do we need a timeout here? select { - case status = <-result: + case status, ok = <-result: + if !ok { + return false, fmt.Errorf("unexpected closed channel during start/stop") + } + case <-ctx.Done(): return false, ctx.Err() } - if &status == nil { - return false, fmt.Errorf("systemd service action result is nil") - } + switch status { - case SystemdUnitResultDone: + case "": // pass + + case SystemdUnitResultDone: + if obj.State == "running" { + obj.init.Logf("service started") + } else if obj.State == "stopped" { + obj.init.Logf("service stopped") + } + + case SystemdUnitResultCanceled: + // TODO: should this be context.Canceled? + return false, fmt.Errorf("operation cancelled") + + case SystemdUnitResultTimeout: + return false, fmt.Errorf("operation timed out") + case SystemdUnitResultFailed: return false, fmt.Errorf("svc failed (selinux?)") + default: - return false, fmt.Errorf("unknown systemd return string: %v", status) + return false, fmt.Errorf("unknown systemd return string: %s", status) } } - // XXX: also set enabled on boot - if !refresh { // Do we need to reload the service? return false, nil // success } - obj.init.Logf("reloading...") + if obj.init.Debug { + obj.init.Logf("reloading...") + } // From: https://www.freedesktop.org/software/systemd/man/latest/org.freedesktop.systemd1.html // If a service is restarted that isn't running, it will be started @@ -430,15 +539,32 @@ func (obj *SvcRes) CheckApply(ctx context.Context, apply bool) (bool, error) { // TODO: Do we need a timeout here? select { - case status = <-result: + case status, ok = <-result: + if !ok { + return false, fmt.Errorf("unexpected closed channel during reload") + } + case <-ctx.Done(): return false, ctx.Err() } + switch status { - case SystemdUnitResultDone: + case "": // pass + + case SystemdUnitResultDone: + obj.init.Logf("service reloaded") + + case SystemdUnitResultCanceled: + // TODO: should this be context.Canceled? + return false, fmt.Errorf("operation cancelled") + + case SystemdUnitResultTimeout: + return false, fmt.Errorf("operation timed out") + case SystemdUnitResultFailed: return false, fmt.Errorf("svc reload failed (selinux?)") + default: return false, fmt.Errorf("unknown systemd return string: %v", status) } @@ -565,10 +691,13 @@ func (obj *SvcResAutoEdgesCron) Test([]bool) bool { func (obj *SvcRes) AutoEdges() (engine.AutoEdge, error) { var data []engine.ResUID var svcFiles []string + + svc := obj.svc() // systemd name + svcFiles = []string{ // root svc - fmt.Sprintf("/etc/systemd/system/%s.service", obj.Name()), // takes precedence - fmt.Sprintf("/usr/lib/systemd/system/%s.service", obj.Name()), // pkg default + fmt.Sprintf("/etc/systemd/system/%s", svc), // takes precedence + fmt.Sprintf("/usr/lib/systemd/system/%s", svc), // pkg default } if obj.Session { // user svc @@ -580,7 +709,7 @@ func (obj *SvcRes) AutoEdges() (engine.AutoEdge, error) { return nil, fmt.Errorf("user has no home directory") } svcFiles = []string{ - path.Join(u.HomeDir, "/.config/systemd/user/", fmt.Sprintf("%s.service", obj.Name())), + path.Join(u.HomeDir, "/.config/systemd/user/", svc), } } for _, x := range svcFiles { @@ -602,7 +731,7 @@ func (obj *SvcRes) AutoEdges() (engine.AutoEdge, error) { } cronEdge := &SvcResAutoEdgesCron{ session: obj.Session, - unit: fmt.Sprintf("%s.service", obj.Name()), + unit: svc, } return engineUtil.AutoEdgeCombiner(fileEdge, cronEdge)