engine: resources: Massively refactor the svc resource

This was a long time coming, but now it looks to be done. It was kind of
meant as low-hanging fruit for some interested student, but in the end I
got to it first.
This commit is contained in:
James Shubin
2025-09-15 04:01:15 -04:00
parent ec48a6944c
commit ff1581be87

View File

@@ -36,6 +36,7 @@ import (
"fmt" "fmt"
"os/user" "os/user"
"path" "path"
"time"
"github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/engine/traits" "github.com/purpleidea/mgmt/engine/traits"
@@ -139,21 +140,27 @@ func (obj *SvcRes) Cleanup() error {
return nil return nil
} }
// svc returns the systemd unit name for this resource. It is built from the
// resource name with the ".service" suffix appended.
func (obj *SvcRes) svc() string {
	name := obj.Name() // the bare resource name, without a unit suffix
	unit := fmt.Sprintf("%s.service", name)
	return unit
}
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *SvcRes) Watch(ctx context.Context) error { func (obj *SvcRes) Watch(ctx context.Context) error {
// obj.Name: svc name
if !systemdUtil.IsRunningSystemd() { if !systemdUtil.IsRunningSystemd() {
return fmt.Errorf("systemd is not running") return fmt.Errorf("systemd is not running")
} }
ctx, cancel := context.WithCancel(ctx)
defer cancel() // make sure we always close any below ctx just in case!
var conn *systemd.Conn var conn *systemd.Conn
var bus *dbus.Conn
var err error var err error
if obj.Session { if obj.Session {
conn, err = systemd.NewUserConnection() // user session conn, err = systemd.NewUserConnectionContext(ctx) // user session
} else { } else {
// we want NewSystemConnection but New falls back to this // we want NewSystemConnectionContext but New... falls back to this
conn, err = systemd.New() // needs root access conn, err = systemd.NewWithContext(ctx) // needs root access
} }
if err != nil { if err != nil {
return errwrap.Wrapf(err, "failed to connect to systemd") return errwrap.Wrapf(err, "failed to connect to systemd")
@@ -161,6 +168,7 @@ func (obj *SvcRes) Watch(ctx context.Context) error {
defer conn.Close() defer conn.Close()
// if we share the bus with others, we will get each others messages!! // if we share the bus with others, we will get each others messages!!
var bus *dbus.Conn
if obj.Session { if obj.Session {
bus, err = util.SessionBusPrivateUsable() bus, err = util.SessionBusPrivateUsable()
} else { } else {
@@ -171,123 +179,178 @@ func (obj *SvcRes) Watch(ctx context.Context) error {
} }
defer bus.Close() defer bus.Close()
// XXX: will this detect new units? // NOTE: I guess it's not the worst-case scenario if we drop a signal or
bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, // if it fills up and we block. Whichever way the upstream implements it
"type='signal',interface='org.freedesktop.systemd1.Manager',member='Reloading'") // we'll have a backlog of signals to loop through which is just fine.
buschan := make(chan *dbus.Signal, 10) chBus := make(chan *dbus.Signal, 10) // TODO: what size if any?
defer close(buschan) // NOTE: closing a chan that contains a value is ok defer close(chBus) // NOTE: closing a chan that contains a value is ok
bus.Signal(buschan) bus.Signal(chBus)
defer bus.RemoveSignal(buschan) // not needed here, but nice for symmetry defer bus.RemoveSignal(chBus) // not needed here, but nice for symmetry
// Legacy way to do this matching...
//method := "org.freedesktop.DBus.AddMatch"
//flags := dbus.Flags(0)
//args := []interface{}{"type='signal',interface='org.freedesktop.systemd1.Manager',member='Reloading'"}
//call := bus.BusObject().CallWithContext(ctx, method, flags, args...) // *dbus.Call
//if err := call.Err; err != nil {
// return errwrap.Wrapf(err, "failed to connect signal on bus")
//}
matchOptions := []dbus.MatchOption{
dbus.WithMatchInterface("org.freedesktop.systemd1.Manager"),
dbus.WithMatchMember("Reloading"),
}
if err := bus.AddMatchSignalContext(ctx, matchOptions...); err != nil {
return errwrap.Wrapf(err, "failed to add match signal on bus")
}
defer func() {
// On shutdown, we prefer to give this a chance to run. If we
// use the main ctx, then it will error because ctx cancelled.
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
defer cancel()
if err := bus.RemoveMatchSignalContext(ctx, matchOptions...); err != nil {
obj.init.Logf("failed to remove match signal on bus: %+v", err)
}
}()
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var svc = fmt.Sprintf("%s.service", obj.Name()) // systemd name svc := obj.svc() // systemd name
var send = false // send event?
var invalid = false // does the svc exist or not?
var previous bool // previous invalid value
// TODO: do we first need to call conn.Subscribe() ?
set := conn.NewSubscriptionSet() // no error should be returned set := conn.NewSubscriptionSet() // no error should be returned
subChannel, subErrors := set.Subscribe() // XXX: dynamic bugs: https://github.com/coreos/go-systemd/issues/474
//defer close(subChannel) // cannot close receive-only channel set.Add(svc) // it's okay if the svc doesn't exist yet
//defer close(subErrors) // cannot close receive-only channel chSub, chSubErr := set.Subscribe()
var activeSet = false //defer close(chSub) // cannot close receive-only channel
//defer close(chSubErr) // cannot close receive-only channel
//chSubClosed := false
//chSubErrClosed := false
for { for {
// XXX: watch for an event for new units... //if chSubClosed && chSubErrClosed {
// XXX: detect if startup enabled/disabled value changes... //
//}
previous = invalid
invalid = false
// firstly, does svc even exist or not?
loadstate, err := conn.GetUnitPropertyContext(ctx, svc, "LoadState")
if err != nil {
obj.init.Logf("failed to get property: %+v", err)
invalid = true
}
if !invalid {
var notFound = (loadstate.Value == dbus.MakeVariant("not-found"))
if notFound { // XXX: in the loop we'll handle changes better...
obj.init.Logf("failed to find svc")
invalid = true // XXX: ?
}
}
if previous != invalid { // if invalid changed, send signal
send = true
}
if invalid {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("waiting for service") // waiting for svc to appear... obj.init.Logf("watching...")
} }
if activeSet {
activeSet = false
set.Remove(svc) // no return value should ever occur
}
select { select {
case <-buschan: // XXX: wait for new units event to unstick
// loop so that we can see the changed invalid signal
obj.init.Logf("daemon reload")
case <-ctx.Done(): // closed by the engine to signal shutdown case sig, ok := <-chBus:
return nil if !ok {
chBus = nil
return fmt.Errorf("unexpected close") // we close this one!
} }
} else { if obj.init.Debug {
if !activeSet { obj.init.Logf("sig: %+v", sig)
activeSet = true
set.Add(svc) // no return value should ever occur
} }
//obj.init.Logf("watching...") // attempting to watch... // This event happens if we `systemctl daemon-reload` or
select { // if `systemctl enable/disable <svc>` is run. For both
case event := <-subChannel: // of these situations we seem to always get two events.
// The first seems to have `Body:[true]`, and the second
// has `Body:[false]`.
// https://pkg.go.dev/github.com/godbus/dbus/v5#Signal
//eg: &{Sender::1.287 Path:/org/freedesktop/systemd1 Name:org.freedesktop.systemd1.Manager.Reloading Body:[false] Sequence:7}
if sig.Name != "org.freedesktop.systemd1.Manager.Reloading" {
// not for us
continue
}
if len(sig.Body) == 0 {
// does this ever happen? send a signal for now
obj.init.Logf("daemon reload with empty body")
break // break out of select and send event now
}
if len(sig.Body) > 1 {
// does this ever happen? send a signal for now
obj.init.Logf("daemon reload with big body")
break // break out of select and send event now
}
b, ok := sig.Body[0].(bool)
if !ok {
// does this ever happen? send a signal for now
obj.init.Logf("daemon reload with badly typed body")
break // break out of select and send event now
}
// We do all of this annoying parsing to cut our event
// count by half, since these signals seem to come in
// pairs. We skip the "true" one that comes first.
if b {
if obj.init.Debug {
obj.init.Logf("skipping daemon-reload start")
}
continue
}
if obj.init.Debug {
obj.init.Logf("daemon reload") // success!
}
case event, ok := <-chSub:
if !ok {
chSub = nil
//chSubClosed = true
continue
}
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("event: %+v", event) obj.init.Logf("event: %+v", event)
} }
// NOTE: the value returned is a map for some reason...
if event[svc] != nil {
// event[svc].ActiveState is not nil
switch event[svc].ActiveState { // The value returned is a map in case we monitor many.
unitStatus, ok := event[svc]
if !ok { // not me
continue
}
if unitStatus == nil {
if obj.init.Debug {
obj.init.Logf("service stopped")
}
break // break out of select and send event now
}
msg := ""
switch event[svc].ActiveState { // string
case "active": case "active":
obj.init.Logf("event: started") msg = "service started"
case "inactive": case "inactive":
obj.init.Logf("event: stopped") msg = "service stopped"
case "reloading": case "reloading":
obj.init.Logf("event: reloading") msg = "service reloading"
case "failed": case "failed":
obj.init.Logf("event: failed") msg = "service failed"
case "activating": case "activating":
obj.init.Logf("event: activating") msg = "service activating"
case "deactivating": case "deactivating":
obj.init.Logf("event: deactivating") msg = "service deactivating"
default: default:
return fmt.Errorf("unknown svc state: %s", event[svc].ActiveState) return fmt.Errorf("unknown service state: %s", event[svc].ActiveState)
} }
} else { if obj.init.Debug {
// svc stopped (and ActiveState is nil...) obj.init.Logf("%s", msg)
obj.init.Logf("event: stopped")
} }
send = true
case err := <-subErrors: case err, ok := <-chSubErr:
return errwrap.Wrapf(err, "unknown %s error", obj) if !ok {
chSubErr = nil
//chSubErrClosed = true
continue
}
if err == nil {
obj.init.Logf("unexpected nil error")
continue
}
return errwrap.Wrapf(err, "unknown error")
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return ctx.Err()
}
} }
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
}
} }
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
@@ -297,50 +360,63 @@ func (obj *SvcRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
return false, fmt.Errorf("systemd is not running") return false, fmt.Errorf("systemd is not running")
} }
ctx, cancel := context.WithCancel(ctx)
defer cancel() // make sure we always close any below ctx just in case!
var conn *systemd.Conn var conn *systemd.Conn
var err error var err error
if obj.Session { if obj.Session {
conn, err = systemd.NewUserConnection() // user session conn, err = systemd.NewUserConnectionContext(ctx) // user session
} else { } else {
// we want NewSystemConnection but New falls back to this // we want NewSystemConnectionContext but New... falls back to this
conn, err = systemd.New() // needs root access conn, err = systemd.NewWithContext(ctx) // needs root access
} }
if err != nil { if err != nil {
return false, errwrap.Wrapf(err, "failed to connect to systemd") return false, errwrap.Wrapf(err, "failed to connect to systemd")
} }
defer conn.Close() defer conn.Close()
var svc = fmt.Sprintf("%s.service", obj.Name()) // systemd name // if we share the bus with others, we will get each others messages!!
//var bus *dbus.Conn
//if obj.Session {
// bus, err = util.SessionBusPrivateUsable()
//} else {
// bus, err = util.SystemBusPrivateUsable()
//}
//if err != nil {
// return errwrap.Wrapf(err, "failed to connect to bus")
//}
//defer bus.Close()
loadstate, err := conn.GetUnitPropertyContext(ctx, svc, "LoadState") svc := obj.svc() // systemd name
loadState, err := conn.GetUnitPropertyContext(ctx, svc, "LoadState")
if err != nil { if err != nil {
return false, errwrap.Wrapf(err, "failed to get load state") return false, errwrap.Wrapf(err, "failed to get load state")
} }
// NOTE: we have to compare variants with other variants, they are really strings... // NOTE: we have to compare variants with other variants, they are really strings...
var notFound = (loadstate.Value == dbus.MakeVariant("not-found")) notFound := (loadState.Value == dbus.MakeVariant("not-found"))
if notFound { if notFound {
return false, errwrap.Wrapf(err, "failed to find svc: %s", svc) return false, errwrap.Wrapf(err, "failed to find svc: %s", svc)
} }
// XXX: check svc "enabled at boot" or not status...
//conn.GetUnitPropertiesContexts(svc) //conn.GetUnitPropertiesContexts(svc)
activestate, err := conn.GetUnitPropertyContext(ctx, svc, "ActiveState") activeState, err := conn.GetUnitPropertyContext(ctx, svc, "ActiveState")
if err != nil { if err != nil {
return false, errwrap.Wrapf(err, "failed to get active state") return false, errwrap.Wrapf(err, "failed to get active state")
} }
var running = (activestate.Value == dbus.MakeVariant("active")) running := (activeState.Value == dbus.MakeVariant("active"))
var stateOK = ((obj.State == "") || (obj.State == "running" && running) || (obj.State == "stopped" && !running)) stateOK := ((obj.State == "") || (obj.State == "running" && running) || (obj.State == "stopped" && !running))
startupstate, err := conn.GetUnitPropertyContext(ctx, svc, "UnitFileState") startupState, err := conn.GetUnitPropertyContext(ctx, svc, "UnitFileState")
if err != nil { if err != nil {
return false, errwrap.Wrapf(err, "failed to get unit file state") return false, errwrap.Wrapf(err, "failed to get unit file state")
} }
enabled := (startupstate.Value == dbus.MakeVariant("enabled")) enabled := (startupState.Value == dbus.MakeVariant("enabled"))
disabled := (startupstate.Value == dbus.MakeVariant("disabled")) disabled := (startupState.Value == dbus.MakeVariant("disabled"))
startupOK := ((obj.Startup == "") || (obj.Startup == "enabled" && enabled) || (obj.Startup == "disabled" && disabled)) startupOK := ((obj.Startup == "") || (obj.Startup == "enabled" && enabled) || (obj.Startup == "disabled" && disabled))
// NOTE: if this svc resource is embedded as a composite resource inside // NOTE: if this svc resource is embedded as a composite resource inside
@@ -352,7 +428,7 @@ func (obj *SvcRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
// trait to the parent resource, or we'll panic when we call this line.) // trait to the parent resource, or we'll panic when we call this line.)
// It might not be recommended to use the Watch method without a thought // It might not be recommended to use the Watch method without a thought
// to what actually happens when we would run Send(), and other methods. // to what actually happens when we would run Send(), and other methods.
var refresh = obj.init.Refresh() // do we have a pending reload to apply? refresh := obj.init.Refresh() // do we have a pending reload to apply?
if stateOK && startupOK && !refresh { if stateOK && startupOK && !refresh {
return true, nil // we are in the correct state return true, nil // we are in the correct state
@@ -364,25 +440,40 @@ func (obj *SvcRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
} }
// apply portion // apply portion
if !startupOK && obj.Startup != "" {
files := []string{svc} // the svc represented in a list files := []string{svc} // the svc represented in a list
if obj.Startup == "enabled" { if obj.Startup == "enabled" {
_, _, err = conn.EnableUnitFilesContext(ctx, files, false, true) _, _, err = conn.EnableUnitFilesContext(ctx, files, false, true)
} else if obj.Startup == "disabled" { } else if obj.Startup == "disabled" {
_, err = conn.DisableUnitFilesContext(ctx, files, false) _, err = conn.DisableUnitFilesContext(ctx, files, false)
} else {
// pass
} }
if err != nil { if err != nil {
return false, errwrap.Wrapf(err, "unable to change startup status") return false, errwrap.Wrapf(err, "unable to change startup status")
} }
if obj.Startup == "enabled" {
obj.init.Logf("service enabled")
} else if obj.Startup == "disabled" {
obj.init.Logf("service disabled")
}
}
// XXX: do we need to use a buffered channel here? // XXX: do we need to use a buffered channel here?
result := make(chan string, 1) // catch result information result := make(chan string, 1) // catch result information
defer close(result)
var status string var status string
var ok bool
if !stateOK { if !stateOK && obj.State != "" {
if obj.State == "running" { if obj.State == "running" {
_, err = conn.StartUnitContext(ctx, svc, SystemdUnitModeFail, result) _, err = conn.StartUnitContext(ctx, svc, SystemdUnitModeFail, result)
} else if obj.State == "stopped" { } else if obj.State == "stopped" {
_, err = conn.StopUnitContext(ctx, svc, SystemdUnitModeFail, result) _, err = conn.StopUnitContext(ctx, svc, SystemdUnitModeFail, result)
} else { // skip through this section
// TODO: should we do anything here instead?
result <- "" // chan is buffered, so won't block
} }
if err != nil { if err != nil {
return false, errwrap.Wrapf(err, "unable to change running status") return false, errwrap.Wrapf(err, "unable to change running status")
@@ -394,30 +485,48 @@ func (obj *SvcRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
// TODO: Do we need a timeout here? // TODO: Do we need a timeout here?
select { select {
case status = <-result: case status, ok = <-result:
if !ok {
return false, fmt.Errorf("unexpected closed channel during start/stop")
}
case <-ctx.Done(): case <-ctx.Done():
return false, ctx.Err() return false, ctx.Err()
} }
if &status == nil {
return false, fmt.Errorf("systemd service action result is nil")
}
switch status { switch status {
case SystemdUnitResultDone: case "":
// pass // pass
case SystemdUnitResultFailed:
return false, fmt.Errorf("svc failed (selinux?)") case SystemdUnitResultDone:
default: if obj.State == "running" {
return false, fmt.Errorf("unknown systemd return string: %v", status) obj.init.Logf("service started")
} } else if obj.State == "stopped" {
obj.init.Logf("service stopped")
} }
// XXX: also set enabled on boot case SystemdUnitResultCanceled:
// TODO: should this be context.Canceled?
return false, fmt.Errorf("operation cancelled")
case SystemdUnitResultTimeout:
return false, fmt.Errorf("operation timed out")
case SystemdUnitResultFailed:
return false, fmt.Errorf("svc failed (selinux?)")
default:
return false, fmt.Errorf("unknown systemd return string: %s", status)
}
}
if !refresh { // Do we need to reload the service? if !refresh { // Do we need to reload the service?
return false, nil // success return false, nil // success
} }
if obj.init.Debug {
obj.init.Logf("reloading...") obj.init.Logf("reloading...")
}
// From: https://www.freedesktop.org/software/systemd/man/latest/org.freedesktop.systemd1.html // From: https://www.freedesktop.org/software/systemd/man/latest/org.freedesktop.systemd1.html
// If a service is restarted that isn't running, it will be started // If a service is restarted that isn't running, it will be started
@@ -430,15 +539,32 @@ func (obj *SvcRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
// TODO: Do we need a timeout here? // TODO: Do we need a timeout here?
select { select {
case status = <-result: case status, ok = <-result:
if !ok {
return false, fmt.Errorf("unexpected closed channel during reload")
}
case <-ctx.Done(): case <-ctx.Done():
return false, ctx.Err() return false, ctx.Err()
} }
switch status { switch status {
case SystemdUnitResultDone: case "":
// pass // pass
case SystemdUnitResultDone:
obj.init.Logf("service reloaded")
case SystemdUnitResultCanceled:
// TODO: should this be context.Canceled?
return false, fmt.Errorf("operation cancelled")
case SystemdUnitResultTimeout:
return false, fmt.Errorf("operation timed out")
case SystemdUnitResultFailed: case SystemdUnitResultFailed:
return false, fmt.Errorf("svc reload failed (selinux?)") return false, fmt.Errorf("svc reload failed (selinux?)")
default: default:
return false, fmt.Errorf("unknown systemd return string: %v", status) return false, fmt.Errorf("unknown systemd return string: %v", status)
} }
@@ -565,10 +691,13 @@ func (obj *SvcResAutoEdgesCron) Test([]bool) bool {
func (obj *SvcRes) AutoEdges() (engine.AutoEdge, error) { func (obj *SvcRes) AutoEdges() (engine.AutoEdge, error) {
var data []engine.ResUID var data []engine.ResUID
var svcFiles []string var svcFiles []string
svc := obj.svc() // systemd name
svcFiles = []string{ svcFiles = []string{
// root svc // root svc
fmt.Sprintf("/etc/systemd/system/%s.service", obj.Name()), // takes precedence fmt.Sprintf("/etc/systemd/system/%s", svc), // takes precedence
fmt.Sprintf("/usr/lib/systemd/system/%s.service", obj.Name()), // pkg default fmt.Sprintf("/usr/lib/systemd/system/%s", svc), // pkg default
} }
if obj.Session { if obj.Session {
// user svc // user svc
@@ -580,7 +709,7 @@ func (obj *SvcRes) AutoEdges() (engine.AutoEdge, error) {
return nil, fmt.Errorf("user has no home directory") return nil, fmt.Errorf("user has no home directory")
} }
svcFiles = []string{ svcFiles = []string{
path.Join(u.HomeDir, "/.config/systemd/user/", fmt.Sprintf("%s.service", obj.Name())), path.Join(u.HomeDir, "/.config/systemd/user/", svc),
} }
} }
for _, x := range svcFiles { for _, x := range svcFiles {
@@ -602,7 +731,7 @@ func (obj *SvcRes) AutoEdges() (engine.AutoEdge, error) {
} }
cronEdge := &SvcResAutoEdgesCron{ cronEdge := &SvcResAutoEdgesCron{
session: obj.Session, session: obj.Session,
unit: fmt.Sprintf("%s.service", obj.Name()), unit: svc,
} }
return engineUtil.AutoEdgeCombiner(fileEdge, cronEdge) return engineUtil.AutoEdgeCombiner(fileEdge, cronEdge)