diff --git a/engine/resources/hostname.go b/engine/resources/hostname.go index 7a2b42b3..15e8c902 100644 --- a/engine/resources/hostname.go +++ b/engine/resources/hostname.go @@ -35,8 +35,9 @@ func init() { } const ( - hostname1Path = "/org/freedesktop/hostname1" - hostname1Iface = "org.freedesktop.hostname1" + hostname1Path = "/org/freedesktop/hostname1" + hostname1Iface = "org.freedesktop.hostname1" + dbusPropertiesIface = "org.freedesktop.DBus.Properties" ) // ErrResourceInsufficientParameters is returned when the configuration of the @@ -112,12 +113,16 @@ func (obj *HostnameRes) Watch() error { return errwrap.Wrap(err, "Failed to connect to bus") } defer bus.Close() - callResult := bus.BusObject().Call( - engineUtil.DBusAddMatch, 0, - fmt.Sprintf("type='signal',path='%s',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'", hostname1Path)) - if callResult.Err != nil { - return errwrap.Wrap(callResult.Err, "Failed to subscribe to DBus events for hostname1") + // watch the PropertiesChanged signal on the hostname1 dbus path + args := fmt.Sprintf( + "type='signal', path='%s', interface='%s', member='PropertiesChanged'", + hostname1Path, + dbusPropertiesIface, + ) + if call := bus.BusObject().Call(engineUtil.DBusAddMatch, 0, args); call.Err != nil { + return errwrap.Wrap(call.Err, "Failed to subscribe to DBus events for hostname1") } + defer bus.BusObject().Call(engineUtil.DBusRemoveMatch, 0, args) // ignore the error signals := make(chan *dbus.Signal, 10) // closed by dbus package bus.Signal(signals) diff --git a/engine/resources/nspawn.go b/engine/resources/nspawn.go index df8c4cff..3171c9f6 100644 --- a/engine/resources/nspawn.go +++ b/engine/resources/nspawn.go @@ -38,9 +38,9 @@ import ( const ( running = "running" stopped = "stopped" - dbusInterface = "org.freedesktop.machine1.Manager" - machineNew = "org.freedesktop.machine1.Manager.MachineNew" - machineRemoved = "org.freedesktop.machine1.Manager.MachineRemoved" + dbusMachine1Iface = "org.freedesktop.machine1.Manager" + machineNew = dbusMachine1Iface + ".MachineNew" + machineRemoved = dbusMachine1Iface + ".MachineRemoved" nspawnServiceTmpl = "systemd-nspawn@%s" ) @@ -155,13 +155,12 @@ func (obj *NspawnRes) Watch() error { defer bus.Close() // add a match rule to match messages going through the message bus - call := bus.BusObject().Call(engineUtil.DBusAddMatch, 0, - fmt.Sprintf("type='signal',interface='%s',eavesdrop='true'", - dbusInterface)) - // <-call.Done - if err := call.Err; err != nil { + args := fmt.Sprintf("type='signal',interface='%s',eavesdrop='true'", dbusMachine1Iface) + if call := bus.BusObject().Call(engineUtil.DBusAddMatch, 0, args); call.Err != nil { return err } + defer bus.BusObject().Call(engineUtil.DBusRemoveMatch, 0, args) // ignore the error + busChan := make(chan *dbus.Signal) defer close(busChan) bus.Signal(busChan) diff --git a/engine/resources/packagekit/packagekit.go b/engine/resources/packagekit/packagekit.go index cc76212a..d837a4bc 100644 --- a/engine/resources/packagekit/packagekit.go +++ b/engine/resources/packagekit/packagekit.go @@ -29,6 +29,7 @@ import ( "github.com/purpleidea/mgmt/util" "github.com/godbus/dbus" + multierr "github.com/hashicorp/go-multierror" errwrap "github.com/pkg/errors" ) @@ -186,34 +187,50 @@ func (obj *Conn) Close() error { } // internal helper to add signal matches to the bus, should only be called once -func (obj *Conn) matchSignal(ch chan *dbus.Signal, path dbus.ObjectPath, iface string, signals []string) error { +func (obj *Conn) matchSignal(ch chan *dbus.Signal, path dbus.ObjectPath, iface string, signals []string) (func() error, error) { if obj.Debug { obj.Logf("matchSignal(%v, %v, %s, %v)", ch, path, iface, signals) } // eg: gdbus monitor --system --dest org.freedesktop.PackageKit --object-path /org/freedesktop/PackageKit | grep - var call *dbus.Call + bus := obj.GetBus().BusObject() + var argsList []string + // cleanup function should be called when done or when AddMatch errors + removeSignals := func() error { + var errList error + for i := len(argsList) - 1; i >= 0; i-- { // last in first out + if call := bus.Call(engineUtil.DBusRemoveMatch, 0, argsList[i]); call.Err != nil { + errList = multierr.Append(errList, call.Err) + } + } + return errList + } // TODO: if we make this call many times, we seem to receive signals // that many times... Maybe this should be an object singleton? - bus := obj.GetBus().BusObject() + var call *dbus.Call pathStr := fmt.Sprintf("%s", path) if len(signals) == 0 { - call = bus.Call(engineUtil.DBusAddMatch, 0, "type='signal',path='"+pathStr+"',interface='"+iface+"'") + args := fmt.Sprintf("type='signal', path='%s', interface='%s'", pathStr, iface) + argsList = append(argsList, args) + call = bus.Call(engineUtil.DBusAddMatch, 0, args) } else { for _, signal := range signals { - call = bus.Call(engineUtil.DBusAddMatch, 0, "type='signal',path='"+pathStr+"',interface='"+iface+"',member='"+signal+"'") - if call.Err != nil { - break + args := fmt.Sprintf("type='signal', path='%s', interface='%s', member'%s'", pathStr, iface, signal) + argsList = append(argsList, args) + if call = bus.Call(engineUtil.DBusAddMatch, 0, args); call.Err != nil { + break // fail if any one fails } } } if call.Err != nil { - return call.Err + defer removeSignals() // ignore the error + return nil, call.Err } + // The caller has to make sure that ch is sufficiently buffered; if a // message arrives when a write to c is not possible, it is discarded! // This can be disastrous if we're waiting for a "Finished" signal! obj.GetBus().Signal(ch) - return nil + return removeSignals, nil } // WatchChanges gets a signal anytime an event happens. @@ -223,11 +240,12 @@ func (obj *Conn) WatchChanges() (chan *dbus.Signal, error) { // but with much less specificity. If we're missing events, report the // issue upstream! The UpdatesChanged signal is what hughsie suggested var signal = "UpdatesChanged" - err := obj.matchSignal(ch, PkPath, PkIface, []string{signal}) + removeSignals, err := obj.matchSignal(ch, PkPath, PkIface, []string{signal}) if err != nil { return nil, err } - if Paranoid { // TODO: this filtering might not be necessary anymore... + defer removeSignals() // ignore the error + if Paranoid { // TODO: this filtering might not be necessary anymore... // try to handle the filtering inside this function! rch := make(chan *dbus.Signal) go func() { @@ -286,7 +304,11 @@ func (obj *Conn) ResolvePackages(packages []string, filter uint64) ([]string, er // add signal matches for Package and Finished which will always be last var signals = []string{"Package", "Finished", "Error", "Destroy"} - obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) + removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) + if err != nil { + return nil, err + } + defer removeSignals() if obj.Debug { obj.Logf("ResolvePackages(): Object(%s, %v)", PkIface, interfacePath) } @@ -396,7 +418,11 @@ func (obj *Conn) InstallPackages(packageIDs []string, transactionFlags uint64) e } var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ? - obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) + removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) + if err != nil { + return err + } + defer removeSignals() bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path call := bus.Call(FmtTransactionMethod("RefreshCache"), 0, false) @@ -454,7 +480,11 @@ func (obj *Conn) RemovePackages(packageIDs []string, transactionFlags uint64) er } var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ? - obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) + removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) + if err != nil { + return err + } + defer removeSignals() bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path call := bus.Call(FmtTransactionMethod("RemovePackages"), 0, transactionFlags, packageIDs, allowDeps, autoremove) @@ -499,7 +529,11 @@ func (obj *Conn) UpdatePackages(packageIDs []string, transactionFlags uint64) er } var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ? - obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) + removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) + if err != nil { + return err + } + defer removeSignals() bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path call := bus.Call(FmtTransactionMethod("UpdatePackages"), 0, transactionFlags, packageIDs) @@ -544,7 +578,11 @@ func (obj *Conn) GetFilesByPackageID(packageIDs []string) (files map[string][]st } var signals = []string{"Files", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ? - obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) + removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) + if err != nil { + return + } + defer removeSignals() bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path call := bus.Call(FmtTransactionMethod("GetFiles"), 0, packageIDs) @@ -611,7 +649,11 @@ func (obj *Conn) GetUpdates(filter uint64) ([]string, error) { } var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress" ? - obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) + removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals) + if err != nil { + return nil, err + } + defer removeSignals() bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path call := bus.Call(FmtTransactionMethod("GetUpdates"), 0, filter)