engine: DBus cleanup

This commit is contained in:
Jonathan Gold
2018-05-05 10:26:29 -04:00
committed by James Shubin
parent a407771eaf
commit 299080f590
3 changed files with 78 additions and 32 deletions

View File

@@ -35,8 +35,9 @@ func init() {
} }
const ( const (
hostname1Path = "/org/freedesktop/hostname1" hostname1Path = "/org/freedesktop/hostname1"
hostname1Iface = "org.freedesktop.hostname1" hostname1Iface = "org.freedesktop.hostname1"
dbusPropertiesIface = "org.freedesktop.DBus.Properties"
) )
// ErrResourceInsufficientParameters is returned when the configuration of the // ErrResourceInsufficientParameters is returned when the configuration of the
@@ -112,12 +113,16 @@ func (obj *HostnameRes) Watch() error {
return errwrap.Wrap(err, "Failed to connect to bus") return errwrap.Wrap(err, "Failed to connect to bus")
} }
defer bus.Close() defer bus.Close()
callResult := bus.BusObject().Call( // watch the PropertiesChanged signal on the hostname1 dbus path
engineUtil.DBusAddMatch, 0, args := fmt.Sprintf(
fmt.Sprintf("type='signal',path='%s',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'", hostname1Path)) "type='signal', path='%s', interface='%s', member='PropertiesChanged'",
if callResult.Err != nil { hostname1Path,
return errwrap.Wrap(callResult.Err, "Failed to subscribe to DBus events for hostname1") dbusPropertiesIface,
)
if call := bus.BusObject().Call(engineUtil.DBusAddMatch, 0, args); call.Err != nil {
return errwrap.Wrap(call.Err, "Failed to subscribe to DBus events for hostname1")
} }
defer bus.BusObject().Call(engineUtil.DBusRemoveMatch, 0, args) // ignore the error
signals := make(chan *dbus.Signal, 10) // closed by dbus package signals := make(chan *dbus.Signal, 10) // closed by dbus package
bus.Signal(signals) bus.Signal(signals)

View File

@@ -38,9 +38,9 @@ import (
const ( const (
running = "running" running = "running"
stopped = "stopped" stopped = "stopped"
dbusInterface = "org.freedesktop.machine1.Manager" dbusMachine1Iface = "org.freedesktop.machine1.Manager"
machineNew = "org.freedesktop.machine1.Manager.MachineNew" machineNew = dbusMachine1Iface + ".MachineNew"
machineRemoved = "org.freedesktop.machine1.Manager.MachineRemoved" machineRemoved = dbusMachine1Iface + ".MachineRemoved"
nspawnServiceTmpl = "systemd-nspawn@%s" nspawnServiceTmpl = "systemd-nspawn@%s"
) )
@@ -155,13 +155,12 @@ func (obj *NspawnRes) Watch() error {
defer bus.Close() defer bus.Close()
// add a match rule to match messages going through the message bus // add a match rule to match messages going through the message bus
call := bus.BusObject().Call(engineUtil.DBusAddMatch, 0, args := fmt.Sprintf("type='signal',interface='%s',eavesdrop='true'", dbusMachine1Iface)
fmt.Sprintf("type='signal',interface='%s',eavesdrop='true'", if call := bus.BusObject().Call(engineUtil.DBusAddMatch, 0, args); call.Err != nil {
dbusInterface))
// <-call.Done
if err := call.Err; err != nil {
return err return err
} }
defer bus.BusObject().Call(engineUtil.DBusRemoveMatch, 0, args) // ignore the error
busChan := make(chan *dbus.Signal) busChan := make(chan *dbus.Signal)
defer close(busChan) defer close(busChan)
bus.Signal(busChan) bus.Signal(busChan)

View File

@@ -29,6 +29,7 @@ import (
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
"github.com/godbus/dbus" "github.com/godbus/dbus"
multierr "github.com/hashicorp/go-multierror"
errwrap "github.com/pkg/errors" errwrap "github.com/pkg/errors"
) )
@@ -186,34 +187,50 @@ func (obj *Conn) Close() error {
} }
// internal helper to add signal matches to the bus, should only be called once // internal helper to add signal matches to the bus, should only be called once
func (obj *Conn) matchSignal(ch chan *dbus.Signal, path dbus.ObjectPath, iface string, signals []string) error { func (obj *Conn) matchSignal(ch chan *dbus.Signal, path dbus.ObjectPath, iface string, signals []string) (func() error, error) {
if obj.Debug { if obj.Debug {
obj.Logf("matchSignal(%v, %v, %s, %v)", ch, path, iface, signals) obj.Logf("matchSignal(%v, %v, %s, %v)", ch, path, iface, signals)
} }
// eg: gdbus monitor --system --dest org.freedesktop.PackageKit --object-path /org/freedesktop/PackageKit | grep <signal> // eg: gdbus monitor --system --dest org.freedesktop.PackageKit --object-path /org/freedesktop/PackageKit | grep <signal>
var call *dbus.Call bus := obj.GetBus().BusObject()
var argsList []string
// cleanup function should be called when done or when AddMatch errors
removeSignals := func() error {
var errList error
for i := len(argsList) - 1; i >= 0; i-- { // last in first out
if call := bus.Call(engineUtil.DBusRemoveMatch, 0, argsList[i]); call.Err != nil {
errList = multierr.Append(errList, call.Err)
}
}
return errList
}
// TODO: if we make this call many times, we seem to receive signals // TODO: if we make this call many times, we seem to receive signals
// that many times... Maybe this should be an object singleton? // that many times... Maybe this should be an object singleton?
bus := obj.GetBus().BusObject() var call *dbus.Call
pathStr := fmt.Sprintf("%s", path) pathStr := fmt.Sprintf("%s", path)
if len(signals) == 0 { if len(signals) == 0 {
call = bus.Call(engineUtil.DBusAddMatch, 0, "type='signal',path='"+pathStr+"',interface='"+iface+"'") args := fmt.Sprintf("type='signal', path='%s', interface='%s'", pathStr, iface)
argsList = append(argsList, args)
call = bus.Call(engineUtil.DBusAddMatch, 0, args)
} else { } else {
for _, signal := range signals { for _, signal := range signals {
call = bus.Call(engineUtil.DBusAddMatch, 0, "type='signal',path='"+pathStr+"',interface='"+iface+"',member='"+signal+"'") args := fmt.Sprintf("type='signal', path='%s', interface='%s', member'%s'", pathStr, iface, signal)
if call.Err != nil { argsList = append(argsList, args)
break if call = bus.Call(engineUtil.DBusAddMatch, 0, args); call.Err != nil {
break // fail if any one fails
} }
} }
} }
if call.Err != nil { if call.Err != nil {
return call.Err defer removeSignals() // ignore the error
return nil, call.Err
} }
// The caller has to make sure that ch is sufficiently buffered; if a // The caller has to make sure that ch is sufficiently buffered; if a
// message arrives when a write to c is not possible, it is discarded! // message arrives when a write to c is not possible, it is discarded!
// This can be disastrous if we're waiting for a "Finished" signal! // This can be disastrous if we're waiting for a "Finished" signal!
obj.GetBus().Signal(ch) obj.GetBus().Signal(ch)
return nil return removeSignals, nil
} }
// WatchChanges gets a signal anytime an event happens. // WatchChanges gets a signal anytime an event happens.
@@ -223,11 +240,12 @@ func (obj *Conn) WatchChanges() (chan *dbus.Signal, error) {
// but with much less specificity. If we're missing events, report the // but with much less specificity. If we're missing events, report the
// issue upstream! The UpdatesChanged signal is what hughsie suggested // issue upstream! The UpdatesChanged signal is what hughsie suggested
var signal = "UpdatesChanged" var signal = "UpdatesChanged"
err := obj.matchSignal(ch, PkPath, PkIface, []string{signal}) removeSignals, err := obj.matchSignal(ch, PkPath, PkIface, []string{signal})
if err != nil { if err != nil {
return nil, err return nil, err
} }
if Paranoid { // TODO: this filtering might not be necessary anymore... defer removeSignals() // ignore the error
if Paranoid { // TODO: this filtering might not be necessary anymore...
// try to handle the filtering inside this function! // try to handle the filtering inside this function!
rch := make(chan *dbus.Signal) rch := make(chan *dbus.Signal)
go func() { go func() {
@@ -286,7 +304,11 @@ func (obj *Conn) ResolvePackages(packages []string, filter uint64) ([]string, er
// add signal matches for Package and Finished which will always be last // add signal matches for Package and Finished which will always be last
var signals = []string{"Package", "Finished", "Error", "Destroy"} var signals = []string{"Package", "Finished", "Error", "Destroy"}
obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
if err != nil {
return nil, err
}
defer removeSignals()
if obj.Debug { if obj.Debug {
obj.Logf("ResolvePackages(): Object(%s, %v)", PkIface, interfacePath) obj.Logf("ResolvePackages(): Object(%s, %v)", PkIface, interfacePath)
} }
@@ -396,7 +418,11 @@ func (obj *Conn) InstallPackages(packageIDs []string, transactionFlags uint64) e
} }
var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ? var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ?
obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
if err != nil {
return err
}
defer removeSignals()
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
call := bus.Call(FmtTransactionMethod("RefreshCache"), 0, false) call := bus.Call(FmtTransactionMethod("RefreshCache"), 0, false)
@@ -454,7 +480,11 @@ func (obj *Conn) RemovePackages(packageIDs []string, transactionFlags uint64) er
} }
var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ? var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ?
obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
if err != nil {
return err
}
defer removeSignals()
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
call := bus.Call(FmtTransactionMethod("RemovePackages"), 0, transactionFlags, packageIDs, allowDeps, autoremove) call := bus.Call(FmtTransactionMethod("RemovePackages"), 0, transactionFlags, packageIDs, allowDeps, autoremove)
@@ -499,7 +529,11 @@ func (obj *Conn) UpdatePackages(packageIDs []string, transactionFlags uint64) er
} }
var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ? var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ?
obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
if err != nil {
return err
}
defer removeSignals()
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
call := bus.Call(FmtTransactionMethod("UpdatePackages"), 0, transactionFlags, packageIDs) call := bus.Call(FmtTransactionMethod("UpdatePackages"), 0, transactionFlags, packageIDs)
@@ -544,7 +578,11 @@ func (obj *Conn) GetFilesByPackageID(packageIDs []string) (files map[string][]st
} }
var signals = []string{"Files", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ? var signals = []string{"Files", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ?
obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
if err != nil {
return
}
defer removeSignals()
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
call := bus.Call(FmtTransactionMethod("GetFiles"), 0, packageIDs) call := bus.Call(FmtTransactionMethod("GetFiles"), 0, packageIDs)
@@ -611,7 +649,11 @@ func (obj *Conn) GetUpdates(filter uint64) ([]string, error) {
} }
var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress" ? var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress" ?
obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
if err != nil {
return nil, err
}
defer removeSignals()
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
call := bus.Call(FmtTransactionMethod("GetUpdates"), 0, filter) call := bus.Call(FmtTransactionMethod("GetUpdates"), 0, filter)