util: Move socketset from net resource to util
Prepare the socketset api to be used outside of the scope the net resource.
This commit is contained in:
@@ -34,6 +34,7 @@ import (
|
||||
"github.com/purpleidea/mgmt/engine/traits"
|
||||
"github.com/purpleidea/mgmt/recwatch"
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
"github.com/purpleidea/mgmt/util/socketset"
|
||||
|
||||
multierr "github.com/hashicorp/go-multierror"
|
||||
errwrap "github.com/pkg/errors"
|
||||
@@ -41,7 +42,6 @@ import (
|
||||
// do not clean up spawned goroutines. Should be replaced when a suitable
|
||||
// alternative is available.
|
||||
"github.com/vishvananda/netlink"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -195,11 +195,11 @@ func (obj *NetRes) Watch() error {
|
||||
defer wg.Wait()
|
||||
|
||||
// create a netlink socket for receiving network interface events
|
||||
conn, err := newSocketSet(rtmGrps, obj.socketFile)
|
||||
conn, err := socketset.NewSocketSet(rtmGrps, obj.socketFile)
|
||||
if err != nil {
|
||||
return errwrap.Wrapf(err, "error creating socket set")
|
||||
}
|
||||
defer conn.shutdown() // close the netlink socket and unblock conn.receive()
|
||||
defer conn.Shutdown() // close the netlink socket and unblock conn.receive()
|
||||
|
||||
// watch the systemd-networkd configuration file
|
||||
recWatcher, err := recwatch.NewRecWatcher(obj.unitFilePath, false)
|
||||
@@ -219,11 +219,11 @@ func (obj *NetRes) Watch() error {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer conn.close() // close the pipe when we're done with it
|
||||
defer conn.Close() // close the pipe when we're done with it
|
||||
defer close(nlChan)
|
||||
for {
|
||||
// receive messages from the socket set
|
||||
msgs, err := conn.receive()
|
||||
msgs, err := conn.Receive()
|
||||
if err != nil {
|
||||
select {
|
||||
case nlChan <- &nlChanStruct{
|
||||
@@ -768,122 +768,3 @@ func (obj *iface) addrApplyAdd(objAddrs []string) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// socketSet is used to receive events from a socket and shut it down cleanly
|
||||
// when asked. It contains a socket for events and a pipe socket to unblock
|
||||
// receive on shutdown.
|
||||
type socketSet struct {
|
||||
fdEvents int
|
||||
fdPipe int
|
||||
pipeFile string
|
||||
}
|
||||
|
||||
// newSocketSet returns a socketSet, initialized with the given parameters.
|
||||
func newSocketSet(groups uint32, file string) (*socketSet, error) {
|
||||
// make a netlink socket file descriptor
|
||||
fdEvents, err := unix.Socket(unix.AF_NETLINK, unix.SOCK_RAW, unix.NETLINK_ROUTE)
|
||||
if err != nil {
|
||||
return nil, errwrap.Wrapf(err, "error creating netlink socket")
|
||||
}
|
||||
// bind to the socket and add add the netlink groups we need to get events
|
||||
if err := unix.Bind(fdEvents, &unix.SockaddrNetlink{
|
||||
Family: unix.AF_NETLINK,
|
||||
Groups: groups,
|
||||
}); err != nil {
|
||||
return nil, errwrap.Wrapf(err, "error binding netlink socket")
|
||||
}
|
||||
|
||||
// create a pipe socket to unblock unix.Select when we close
|
||||
fdPipe, err := unix.Socket(unix.AF_UNIX, unix.SOCK_RAW, unix.PROT_NONE)
|
||||
if err != nil {
|
||||
return nil, errwrap.Wrapf(err, "error creating pipe socket")
|
||||
}
|
||||
// bind the pipe to a file
|
||||
if err = unix.Bind(fdPipe, &unix.SockaddrUnix{
|
||||
Name: file,
|
||||
}); err != nil {
|
||||
return nil, errwrap.Wrapf(err, "error binding pipe socket")
|
||||
}
|
||||
return &socketSet{
|
||||
fdEvents: fdEvents,
|
||||
fdPipe: fdPipe,
|
||||
pipeFile: file,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// shutdown closes the event file descriptor and unblocks receive by sending
|
||||
// a message to the pipe file descriptor. It must be called before close, and
|
||||
// should only be called once.
|
||||
func (obj *socketSet) shutdown() error {
|
||||
// close the event socket so no more events are produced
|
||||
if err := unix.Close(obj.fdEvents); err != nil {
|
||||
return err
|
||||
}
|
||||
// send a message to the pipe to unblock select
|
||||
return unix.Sendto(obj.fdPipe, nil, 0, &unix.SockaddrUnix{
|
||||
Name: path.Join(obj.pipeFile),
|
||||
})
|
||||
}
|
||||
|
||||
// close closes the pipe file descriptor. It must only be called after
|
||||
// shutdown has closed fdEvents, and unblocked receive. It should only be
|
||||
// called once.
|
||||
func (obj *socketSet) close() error {
|
||||
return unix.Close(obj.fdPipe)
|
||||
}
|
||||
|
||||
// receive waits for bytes from fdEvents and parses them into a slice of
|
||||
// netlink messages. It will block until an event is produced, or shutdown
|
||||
// is called.
|
||||
func (obj *socketSet) receive() ([]syscall.NetlinkMessage, error) {
|
||||
// Select will return when any fd in fdSet (fdEvents and fdPipe) is ready
|
||||
// to read.
|
||||
_, err := unix.Select(obj.nfd(), obj.fdSet(), nil, nil, nil)
|
||||
if err != nil {
|
||||
// if a system interrupt is caught
|
||||
if err == unix.EINTR { // signal interrupt
|
||||
return nil, nil
|
||||
}
|
||||
return nil, errwrap.Wrapf(err, "error selecting on fd")
|
||||
}
|
||||
// receive the message from the netlink socket into b
|
||||
b := make([]byte, os.Getpagesize())
|
||||
n, _, err := unix.Recvfrom(obj.fdEvents, b, unix.MSG_DONTWAIT) // non-blocking receive
|
||||
if err != nil {
|
||||
// if fdEvents is closed
|
||||
if err == unix.EBADF { // bad file descriptor
|
||||
return nil, nil
|
||||
}
|
||||
return nil, errwrap.Wrapf(err, "error receiving messages")
|
||||
}
|
||||
// if we didn't get enough bytes for a header, something went wrong
|
||||
if n < unix.NLMSG_HDRLEN {
|
||||
return nil, fmt.Errorf("received short header")
|
||||
}
|
||||
b = b[:n] // truncate b to message length
|
||||
// use syscall to parse, as func does not exist in x/sys/unix
|
||||
return syscall.ParseNetlinkMessage(b)
|
||||
}
|
||||
|
||||
// nfd returns one more than the highest fd value in the struct, for use as as
|
||||
// the nfds parameter in select. It represents the file descriptor set maximum
|
||||
// size. See man select for more info.
|
||||
func (obj *socketSet) nfd() int {
|
||||
if obj.fdEvents > obj.fdPipe {
|
||||
return obj.fdEvents + 1
|
||||
}
|
||||
return obj.fdPipe + 1
|
||||
}
|
||||
|
||||
// fdSet returns a bitmask representation of the integer values of fdEvents
|
||||
// and fdPipe. See man select for more info.
|
||||
func (obj *socketSet) fdSet() *unix.FdSet {
|
||||
fdSet := &unix.FdSet{}
|
||||
// Generate the bitmask representing the file descriptors in the socketSet.
|
||||
// The rightmost bit corresponds to file descriptor zero, and each bit to
|
||||
// the left represents the next file descriptor number in the sequence of
|
||||
// all real numbers. E.g. the FdSet containing containing 0 and 4 is 10001.
|
||||
fdSet.Bits[obj.fdEvents/64] |= 1 << uint(obj.fdEvents)
|
||||
fdSet.Bits[obj.fdPipe/64] |= 1 << uint(obj.fdPipe)
|
||||
return fdSet
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user