engine: DBus cleanup
This commit is contained in:
committed by
James Shubin
parent
a407771eaf
commit
299080f590
@@ -35,8 +35,9 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
hostname1Path = "/org/freedesktop/hostname1"
|
hostname1Path = "/org/freedesktop/hostname1"
|
||||||
hostname1Iface = "org.freedesktop.hostname1"
|
hostname1Iface = "org.freedesktop.hostname1"
|
||||||
|
dbusPropertiesIface = "org.freedesktop.DBus.Properties"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrResourceInsufficientParameters is returned when the configuration of the
|
// ErrResourceInsufficientParameters is returned when the configuration of the
|
||||||
@@ -112,12 +113,16 @@ func (obj *HostnameRes) Watch() error {
|
|||||||
return errwrap.Wrap(err, "Failed to connect to bus")
|
return errwrap.Wrap(err, "Failed to connect to bus")
|
||||||
}
|
}
|
||||||
defer bus.Close()
|
defer bus.Close()
|
||||||
callResult := bus.BusObject().Call(
|
// watch the PropertiesChanged signal on the hostname1 dbus path
|
||||||
engineUtil.DBusAddMatch, 0,
|
args := fmt.Sprintf(
|
||||||
fmt.Sprintf("type='signal',path='%s',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'", hostname1Path))
|
"type='signal', path='%s', interface='%s', member='PropertiesChanged'",
|
||||||
if callResult.Err != nil {
|
hostname1Path,
|
||||||
return errwrap.Wrap(callResult.Err, "Failed to subscribe to DBus events for hostname1")
|
dbusPropertiesIface,
|
||||||
|
)
|
||||||
|
if call := bus.BusObject().Call(engineUtil.DBusAddMatch, 0, args); call.Err != nil {
|
||||||
|
return errwrap.Wrap(call.Err, "Failed to subscribe to DBus events for hostname1")
|
||||||
}
|
}
|
||||||
|
defer bus.BusObject().Call(engineUtil.DBusRemoveMatch, 0, args) // ignore the error
|
||||||
|
|
||||||
signals := make(chan *dbus.Signal, 10) // closed by dbus package
|
signals := make(chan *dbus.Signal, 10) // closed by dbus package
|
||||||
bus.Signal(signals)
|
bus.Signal(signals)
|
||||||
|
|||||||
@@ -38,9 +38,9 @@ import (
|
|||||||
const (
|
const (
|
||||||
running = "running"
|
running = "running"
|
||||||
stopped = "stopped"
|
stopped = "stopped"
|
||||||
dbusInterface = "org.freedesktop.machine1.Manager"
|
dbusMachine1Iface = "org.freedesktop.machine1.Manager"
|
||||||
machineNew = "org.freedesktop.machine1.Manager.MachineNew"
|
machineNew = dbusMachine1Iface + ".MachineNew"
|
||||||
machineRemoved = "org.freedesktop.machine1.Manager.MachineRemoved"
|
machineRemoved = dbusMachine1Iface + ".MachineRemoved"
|
||||||
nspawnServiceTmpl = "systemd-nspawn@%s"
|
nspawnServiceTmpl = "systemd-nspawn@%s"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -155,13 +155,12 @@ func (obj *NspawnRes) Watch() error {
|
|||||||
defer bus.Close()
|
defer bus.Close()
|
||||||
|
|
||||||
// add a match rule to match messages going through the message bus
|
// add a match rule to match messages going through the message bus
|
||||||
call := bus.BusObject().Call(engineUtil.DBusAddMatch, 0,
|
args := fmt.Sprintf("type='signal',interface='%s',eavesdrop='true'", dbusMachine1Iface)
|
||||||
fmt.Sprintf("type='signal',interface='%s',eavesdrop='true'",
|
if call := bus.BusObject().Call(engineUtil.DBusAddMatch, 0, args); call.Err != nil {
|
||||||
dbusInterface))
|
|
||||||
// <-call.Done
|
|
||||||
if err := call.Err; err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer bus.BusObject().Call(engineUtil.DBusRemoveMatch, 0, args) // ignore the error
|
||||||
|
|
||||||
busChan := make(chan *dbus.Signal)
|
busChan := make(chan *dbus.Signal)
|
||||||
defer close(busChan)
|
defer close(busChan)
|
||||||
bus.Signal(busChan)
|
bus.Signal(busChan)
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ import (
|
|||||||
"github.com/purpleidea/mgmt/util"
|
"github.com/purpleidea/mgmt/util"
|
||||||
|
|
||||||
"github.com/godbus/dbus"
|
"github.com/godbus/dbus"
|
||||||
|
multierr "github.com/hashicorp/go-multierror"
|
||||||
errwrap "github.com/pkg/errors"
|
errwrap "github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -186,34 +187,50 @@ func (obj *Conn) Close() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// internal helper to add signal matches to the bus, should only be called once
|
// internal helper to add signal matches to the bus, should only be called once
|
||||||
func (obj *Conn) matchSignal(ch chan *dbus.Signal, path dbus.ObjectPath, iface string, signals []string) error {
|
func (obj *Conn) matchSignal(ch chan *dbus.Signal, path dbus.ObjectPath, iface string, signals []string) (func() error, error) {
|
||||||
if obj.Debug {
|
if obj.Debug {
|
||||||
obj.Logf("matchSignal(%v, %v, %s, %v)", ch, path, iface, signals)
|
obj.Logf("matchSignal(%v, %v, %s, %v)", ch, path, iface, signals)
|
||||||
}
|
}
|
||||||
// eg: gdbus monitor --system --dest org.freedesktop.PackageKit --object-path /org/freedesktop/PackageKit | grep <signal>
|
// eg: gdbus monitor --system --dest org.freedesktop.PackageKit --object-path /org/freedesktop/PackageKit | grep <signal>
|
||||||
var call *dbus.Call
|
bus := obj.GetBus().BusObject()
|
||||||
|
var argsList []string
|
||||||
|
// cleanup function should be called when done or when AddMatch errors
|
||||||
|
removeSignals := func() error {
|
||||||
|
var errList error
|
||||||
|
for i := len(argsList) - 1; i >= 0; i-- { // last in first out
|
||||||
|
if call := bus.Call(engineUtil.DBusRemoveMatch, 0, argsList[i]); call.Err != nil {
|
||||||
|
errList = multierr.Append(errList, call.Err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return errList
|
||||||
|
}
|
||||||
// TODO: if we make this call many times, we seem to receive signals
|
// TODO: if we make this call many times, we seem to receive signals
|
||||||
// that many times... Maybe this should be an object singleton?
|
// that many times... Maybe this should be an object singleton?
|
||||||
bus := obj.GetBus().BusObject()
|
var call *dbus.Call
|
||||||
pathStr := fmt.Sprintf("%s", path)
|
pathStr := fmt.Sprintf("%s", path)
|
||||||
if len(signals) == 0 {
|
if len(signals) == 0 {
|
||||||
call = bus.Call(engineUtil.DBusAddMatch, 0, "type='signal',path='"+pathStr+"',interface='"+iface+"'")
|
args := fmt.Sprintf("type='signal', path='%s', interface='%s'", pathStr, iface)
|
||||||
|
argsList = append(argsList, args)
|
||||||
|
call = bus.Call(engineUtil.DBusAddMatch, 0, args)
|
||||||
} else {
|
} else {
|
||||||
for _, signal := range signals {
|
for _, signal := range signals {
|
||||||
call = bus.Call(engineUtil.DBusAddMatch, 0, "type='signal',path='"+pathStr+"',interface='"+iface+"',member='"+signal+"'")
|
args := fmt.Sprintf("type='signal', path='%s', interface='%s', member'%s'", pathStr, iface, signal)
|
||||||
if call.Err != nil {
|
argsList = append(argsList, args)
|
||||||
break
|
if call = bus.Call(engineUtil.DBusAddMatch, 0, args); call.Err != nil {
|
||||||
|
break // fail if any one fails
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if call.Err != nil {
|
if call.Err != nil {
|
||||||
return call.Err
|
defer removeSignals() // ignore the error
|
||||||
|
return nil, call.Err
|
||||||
}
|
}
|
||||||
|
|
||||||
// The caller has to make sure that ch is sufficiently buffered; if a
|
// The caller has to make sure that ch is sufficiently buffered; if a
|
||||||
// message arrives when a write to c is not possible, it is discarded!
|
// message arrives when a write to c is not possible, it is discarded!
|
||||||
// This can be disastrous if we're waiting for a "Finished" signal!
|
// This can be disastrous if we're waiting for a "Finished" signal!
|
||||||
obj.GetBus().Signal(ch)
|
obj.GetBus().Signal(ch)
|
||||||
return nil
|
return removeSignals, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchChanges gets a signal anytime an event happens.
|
// WatchChanges gets a signal anytime an event happens.
|
||||||
@@ -223,11 +240,12 @@ func (obj *Conn) WatchChanges() (chan *dbus.Signal, error) {
|
|||||||
// but with much less specificity. If we're missing events, report the
|
// but with much less specificity. If we're missing events, report the
|
||||||
// issue upstream! The UpdatesChanged signal is what hughsie suggested
|
// issue upstream! The UpdatesChanged signal is what hughsie suggested
|
||||||
var signal = "UpdatesChanged"
|
var signal = "UpdatesChanged"
|
||||||
err := obj.matchSignal(ch, PkPath, PkIface, []string{signal})
|
removeSignals, err := obj.matchSignal(ch, PkPath, PkIface, []string{signal})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if Paranoid { // TODO: this filtering might not be necessary anymore...
|
defer removeSignals() // ignore the error
|
||||||
|
if Paranoid { // TODO: this filtering might not be necessary anymore...
|
||||||
// try to handle the filtering inside this function!
|
// try to handle the filtering inside this function!
|
||||||
rch := make(chan *dbus.Signal)
|
rch := make(chan *dbus.Signal)
|
||||||
go func() {
|
go func() {
|
||||||
@@ -286,7 +304,11 @@ func (obj *Conn) ResolvePackages(packages []string, filter uint64) ([]string, er
|
|||||||
|
|
||||||
// add signal matches for Package and Finished which will always be last
|
// add signal matches for Package and Finished which will always be last
|
||||||
var signals = []string{"Package", "Finished", "Error", "Destroy"}
|
var signals = []string{"Package", "Finished", "Error", "Destroy"}
|
||||||
obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
|
removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer removeSignals()
|
||||||
if obj.Debug {
|
if obj.Debug {
|
||||||
obj.Logf("ResolvePackages(): Object(%s, %v)", PkIface, interfacePath)
|
obj.Logf("ResolvePackages(): Object(%s, %v)", PkIface, interfacePath)
|
||||||
}
|
}
|
||||||
@@ -396,7 +418,11 @@ func (obj *Conn) InstallPackages(packageIDs []string, transactionFlags uint64) e
|
|||||||
}
|
}
|
||||||
|
|
||||||
var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ?
|
var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ?
|
||||||
obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
|
removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer removeSignals()
|
||||||
|
|
||||||
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
|
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
|
||||||
call := bus.Call(FmtTransactionMethod("RefreshCache"), 0, false)
|
call := bus.Call(FmtTransactionMethod("RefreshCache"), 0, false)
|
||||||
@@ -454,7 +480,11 @@ func (obj *Conn) RemovePackages(packageIDs []string, transactionFlags uint64) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ?
|
var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ?
|
||||||
obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
|
removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer removeSignals()
|
||||||
|
|
||||||
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
|
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
|
||||||
call := bus.Call(FmtTransactionMethod("RemovePackages"), 0, transactionFlags, packageIDs, allowDeps, autoremove)
|
call := bus.Call(FmtTransactionMethod("RemovePackages"), 0, transactionFlags, packageIDs, allowDeps, autoremove)
|
||||||
@@ -499,7 +529,11 @@ func (obj *Conn) UpdatePackages(packageIDs []string, transactionFlags uint64) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ?
|
var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ?
|
||||||
obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
|
removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer removeSignals()
|
||||||
|
|
||||||
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
|
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
|
||||||
call := bus.Call(FmtTransactionMethod("UpdatePackages"), 0, transactionFlags, packageIDs)
|
call := bus.Call(FmtTransactionMethod("UpdatePackages"), 0, transactionFlags, packageIDs)
|
||||||
@@ -544,7 +578,11 @@ func (obj *Conn) GetFilesByPackageID(packageIDs []string) (files map[string][]st
|
|||||||
}
|
}
|
||||||
|
|
||||||
var signals = []string{"Files", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ?
|
var signals = []string{"Files", "ErrorCode", "Finished", "Destroy"} // "ItemProgress", "Status" ?
|
||||||
obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
|
removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer removeSignals()
|
||||||
|
|
||||||
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
|
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
|
||||||
call := bus.Call(FmtTransactionMethod("GetFiles"), 0, packageIDs)
|
call := bus.Call(FmtTransactionMethod("GetFiles"), 0, packageIDs)
|
||||||
@@ -611,7 +649,11 @@ func (obj *Conn) GetUpdates(filter uint64) ([]string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress" ?
|
var signals = []string{"Package", "ErrorCode", "Finished", "Destroy"} // "ItemProgress" ?
|
||||||
obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
|
removeSignals, err := obj.matchSignal(ch, interfacePath, PkIfaceTransaction, signals)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer removeSignals()
|
||||||
|
|
||||||
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
|
bus := obj.GetBus().Object(PkIface, interfacePath) // pass in found transaction path
|
||||||
call := bus.Call(FmtTransactionMethod("GetUpdates"), 0, filter)
|
call := bus.Call(FmtTransactionMethod("GetUpdates"), 0, filter)
|
||||||
|
|||||||
Reference in New Issue
Block a user