diff --git a/engine/resources/net.go b/engine/resources/net.go index bde7463f..248e3e51 100644 --- a/engine/resources/net.go +++ b/engine/resources/net.go @@ -34,6 +34,7 @@ import ( "github.com/purpleidea/mgmt/engine/traits" "github.com/purpleidea/mgmt/recwatch" "github.com/purpleidea/mgmt/util" + "github.com/purpleidea/mgmt/util/socketset" multierr "github.com/hashicorp/go-multierror" errwrap "github.com/pkg/errors" @@ -41,7 +42,6 @@ import ( // do not clean up spawned goroutines. Should be replaced when a suitable // alternative is available. "github.com/vishvananda/netlink" - "golang.org/x/sys/unix" ) func init() { @@ -195,11 +195,11 @@ func (obj *NetRes) Watch() error { defer wg.Wait() // create a netlink socket for receiving network interface events - conn, err := newSocketSet(rtmGrps, obj.socketFile) + conn, err := socketset.NewSocketSet(rtmGrps, obj.socketFile) if err != nil { return errwrap.Wrapf(err, "error creating socket set") } - defer conn.shutdown() // close the netlink socket and unblock conn.receive() + defer conn.Shutdown() // close the netlink socket and unblock conn.receive() // watch the systemd-networkd configuration file recWatcher, err := recwatch.NewRecWatcher(obj.unitFilePath, false) @@ -219,11 +219,11 @@ func (obj *NetRes) Watch() error { wg.Add(1) go func() { defer wg.Done() - defer conn.close() // close the pipe when we're done with it + defer conn.Close() // close the pipe when we're done with it defer close(nlChan) for { // receive messages from the socket set - msgs, err := conn.receive() + msgs, err := conn.Receive() if err != nil { select { case nlChan <- &nlChanStruct{ @@ -768,122 +768,3 @@ func (obj *iface) addrApplyAdd(objAddrs []string) error { } return nil } - -// socketSet is used to receive events from a socket and shut it down cleanly -// when asked. It contains a socket for events and a pipe socket to unblock -// receive on shutdown. -type socketSet struct { - fdEvents int - fdPipe int - pipeFile string -} - -// newSocketSet returns a socketSet, initialized with the given parameters. -func newSocketSet(groups uint32, file string) (*socketSet, error) { - // make a netlink socket file descriptor - fdEvents, err := unix.Socket(unix.AF_NETLINK, unix.SOCK_RAW, unix.NETLINK_ROUTE) - if err != nil { - return nil, errwrap.Wrapf(err, "error creating netlink socket") - } - // bind to the socket and add add the netlink groups we need to get events - if err := unix.Bind(fdEvents, &unix.SockaddrNetlink{ - Family: unix.AF_NETLINK, - Groups: groups, - }); err != nil { - return nil, errwrap.Wrapf(err, "error binding netlink socket") - } - - // create a pipe socket to unblock unix.Select when we close - fdPipe, err := unix.Socket(unix.AF_UNIX, unix.SOCK_RAW, unix.PROT_NONE) - if err != nil { - return nil, errwrap.Wrapf(err, "error creating pipe socket") - } - // bind the pipe to a file - if err = unix.Bind(fdPipe, &unix.SockaddrUnix{ - Name: file, - }); err != nil { - return nil, errwrap.Wrapf(err, "error binding pipe socket") - } - return &socketSet{ - fdEvents: fdEvents, - fdPipe: fdPipe, - pipeFile: file, - }, nil -} - -// shutdown closes the event file descriptor and unblocks receive by sending -// a message to the pipe file descriptor. It must be called before close, and -// should only be called once. -func (obj *socketSet) shutdown() error { - // close the event socket so no more events are produced - if err := unix.Close(obj.fdEvents); err != nil { - return err - } - // send a message to the pipe to unblock select - return unix.Sendto(obj.fdPipe, nil, 0, &unix.SockaddrUnix{ - Name: path.Join(obj.pipeFile), - }) -} - -// close closes the pipe file descriptor. It must only be called after -// shutdown has closed fdEvents, and unblocked receive. It should only be -// called once. -func (obj *socketSet) close() error { - return unix.Close(obj.fdPipe) -} - -// receive waits for bytes from fdEvents and parses them into a slice of -// netlink messages. It will block until an event is produced, or shutdown -// is called. -func (obj *socketSet) receive() ([]syscall.NetlinkMessage, error) { - // Select will return when any fd in fdSet (fdEvents and fdPipe) is ready - // to read. - _, err := unix.Select(obj.nfd(), obj.fdSet(), nil, nil, nil) - if err != nil { - // if a system interrupt is caught - if err == unix.EINTR { // signal interrupt - return nil, nil - } - return nil, errwrap.Wrapf(err, "error selecting on fd") - } - // receive the message from the netlink socket into b - b := make([]byte, os.Getpagesize()) - n, _, err := unix.Recvfrom(obj.fdEvents, b, unix.MSG_DONTWAIT) // non-blocking receive - if err != nil { - // if fdEvents is closed - if err == unix.EBADF { // bad file descriptor - return nil, nil - } - return nil, errwrap.Wrapf(err, "error receiving messages") - } - // if we didn't get enough bytes for a header, something went wrong - if n < unix.NLMSG_HDRLEN { - return nil, fmt.Errorf("received short header") - } - b = b[:n] // truncate b to message length - // use syscall to parse, as func does not exist in x/sys/unix - return syscall.ParseNetlinkMessage(b) -} - -// nfd returns one more than the highest fd value in the struct, for use as as -// the nfds parameter in select. It represents the file descriptor set maximum -// size. See man select for more info. -func (obj *socketSet) nfd() int { - if obj.fdEvents > obj.fdPipe { - return obj.fdEvents + 1 - } - return obj.fdPipe + 1 -} - -// fdSet returns a bitmask representation of the integer values of fdEvents -// and fdPipe. See man select for more info. -func (obj *socketSet) fdSet() *unix.FdSet { - fdSet := &unix.FdSet{} - // Generate the bitmask representing the file descriptors in the socketSet. - // The rightmost bit corresponds to file descriptor zero, and each bit to - // the left represents the next file descriptor number in the sequence of - // all real numbers. E.g. the FdSet containing containing 0 and 4 is 10001. - fdSet.Bits[obj.fdEvents/64] |= 1 << uint(obj.fdEvents) - fdSet.Bits[obj.fdPipe/64] |= 1 << uint(obj.fdPipe) - return fdSet -} diff --git a/engine/resources/net_test.go b/engine/resources/net_test.go index ad859192..110214e8 100644 --- a/engine/resources/net_test.go +++ b/engine/resources/net_test.go @@ -21,8 +21,6 @@ import ( "bytes" "strings" "testing" - - "golang.org/x/sys/unix" ) // test cases for NetRes.unitFileContents() @@ -82,85 +80,3 @@ func TestUnitFileContents(t *testing.T) { } } } - -// test cases for socketSet.fdSet() -var fdSetTests = []struct { - in *socketSet - out *unix.FdSet -}{ - { - &socketSet{ - fdEvents: 3, - fdPipe: 4, - }, - &unix.FdSet{ - Bits: [16]int64{0x18}, // 11000 - }, - }, - { - &socketSet{ - fdEvents: 12, - fdPipe: 8, - }, - &unix.FdSet{ - Bits: [16]int64{0x1100}, // 1000100000000 - }, - }, - { - &socketSet{ - fdEvents: 9, - fdPipe: 21, - }, - &unix.FdSet{ - Bits: [16]int64{0x200200}, // 1000000000001000000000 - }, - }, -} - -// test socketSet.fdSet() -func TestFdSet(t *testing.T) { - for _, test := range fdSetTests { - result := test.in.fdSet() - if *result != *test.out { - t.Errorf("fdSet test wanted: %b, got: %b", *test.out, *result) - } - } -} - -// test cases for socketSet.nfd() -var nfdTests = []struct { - in *socketSet - out int -}{ - { - &socketSet{ - fdEvents: 3, - fdPipe: 4, - }, - 5, - }, - { - &socketSet{ - fdEvents: 8, - fdPipe: 4, - }, - 9, - }, - { - &socketSet{ - fdEvents: 90, - fdPipe: 900, - }, - 901, - }, -} - -// test socketSet.nfd() -func TestNfd(t *testing.T) { - for _, test := range nfdTests { - result := test.in.nfd() - if result != test.out { - t.Errorf("nfd test wanted: %d, got: %d", test.out, result) - } - } -} diff --git a/util/socketset/socketset.go b/util/socketset/socketset.go new file mode 100644 index 00000000..e7a9453a --- /dev/null +++ b/util/socketset/socketset.go @@ -0,0 +1,149 @@ +// Mgmt +// Copyright (C) 2013-2018+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +// Package socketset is in API for creating a select style netlink +// socket to receive events from the kernel. +package socketset + +import ( + "fmt" + "os" + "path" + "syscall" + + errwrap "github.com/pkg/errors" + "golang.org/x/sys/unix" +) + +// SocketSet is used to receive events from a socket and shut it down cleanly +// when asked. It contains a socket for events and a pipe socket to unblock +// receive on shutdown. +type SocketSet struct { + fdEvents int + fdPipe int + pipeFile string +} + +// NewSocketSet returns a socketSet, initialized with the given parameters. +func NewSocketSet(groups uint32, file string) (*SocketSet, error) { + // make a netlink socket file descriptor + fdEvents, err := unix.Socket(unix.AF_NETLINK, unix.SOCK_RAW, unix.NETLINK_ROUTE) + if err != nil { + return nil, errwrap.Wrapf(err, "error creating netlink socket") + } + // bind to the socket and add add the netlink groups we need to get events + if err := unix.Bind(fdEvents, &unix.SockaddrNetlink{ + Family: unix.AF_NETLINK, + Groups: groups, + }); err != nil { + return nil, errwrap.Wrapf(err, "error binding netlink socket") + } + + // create a pipe socket to unblock unix.Select when we close + fdPipe, err := unix.Socket(unix.AF_UNIX, unix.SOCK_RAW, unix.PROT_NONE) + if err != nil { + return nil, errwrap.Wrapf(err, "error creating pipe socket") + } + // bind the pipe to a file + if err = unix.Bind(fdPipe, &unix.SockaddrUnix{ + Name: file, + }); err != nil { + return nil, errwrap.Wrapf(err, "error binding pipe socket") + } + return &SocketSet{ + fdEvents: fdEvents, + fdPipe: fdPipe, + pipeFile: file, + }, nil +} + +// Shutdown closes the event file descriptor and unblocks receive by sending +// a message to the pipe file descriptor. It must be called before close, and +// should only be called once. +func (obj *SocketSet) Shutdown() error { + // close the event socket so no more events are produced + if err := unix.Close(obj.fdEvents); err != nil { + return err + } + // send a message to the pipe to unblock select + return unix.Sendto(obj.fdPipe, nil, 0, &unix.SockaddrUnix{ + Name: path.Join(obj.pipeFile), + }) +} + +// Close closes the pipe file descriptor. It must only be called after +// shutdown has closed fdEvents, and unblocked receive. It should only be +// called once. +func (obj *SocketSet) Close() error { + return unix.Close(obj.fdPipe) +} + +// Receive waits for bytes from fdEvents and parses them into a slice of +// netlink messages. It will block until an event is produced, or shutdown +// is called. +func (obj *SocketSet) Receive() ([]syscall.NetlinkMessage, error) { + // Select will return when any fd in fdSet (fdEvents and fdPipe) is ready + // to read. + _, err := unix.Select(obj.nfd(), obj.fdSet(), nil, nil, nil) + if err != nil { + // if a system interrupt is caught + if err == unix.EINTR { // signal interrupt + return nil, nil + } + return nil, errwrap.Wrapf(err, "error selecting on fd") + } + // receive the message from the netlink socket into b + b := make([]byte, os.Getpagesize()) + n, _, err := unix.Recvfrom(obj.fdEvents, b, unix.MSG_DONTWAIT) // non-blocking receive + if err != nil { + // if fdEvents is closed + if err == unix.EBADF { // bad file descriptor + return nil, nil + } + return nil, errwrap.Wrapf(err, "error receiving messages") + } + // if we didn't get enough bytes for a header, something went wrong + if n < unix.NLMSG_HDRLEN { + return nil, fmt.Errorf("received short header") + } + b = b[:n] // truncate b to message length + // use syscall to parse, as func does not exist in x/sys/unix + return syscall.ParseNetlinkMessage(b) +} + +// nfd returns one more than the highest fd value in the struct, for use as as +// the nfds parameter in select. It represents the file descriptor set maximum +// size. See man select for more info. +func (obj *SocketSet) nfd() int { + if obj.fdEvents > obj.fdPipe { + return obj.fdEvents + 1 + } + return obj.fdPipe + 1 +} + +// fdSet returns a bitmask representation of the integer values of fdEvents +// and fdPipe. See man select for more info. +func (obj *SocketSet) fdSet() *unix.FdSet { + fdSet := &unix.FdSet{} + // Generate the bitmask representing the file descriptors in the socketSet. + // The rightmost bit corresponds to file descriptor zero, and each bit to + // the left represents the next file descriptor number in the sequence of + // all real numbers. E.g. the FdSet containing containing 0 and 4 is 10001. + fdSet.Bits[obj.fdEvents/64] |= 1 << uint(obj.fdEvents) + fdSet.Bits[obj.fdPipe/64] |= 1 << uint(obj.fdPipe) + return fdSet +} diff --git a/util/socketset/socketset_test.go b/util/socketset/socketset_test.go new file mode 100644 index 00000000..5a0c38a5 --- /dev/null +++ b/util/socketset/socketset_test.go @@ -0,0 +1,106 @@ +// Mgmt +// Copyright (C) 2013-2018+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package socketset + +import ( + "testing" + + "golang.org/x/sys/unix" +) + +// test cases for socketSet.fdSet() +var fdSetTests = []struct { + in *SocketSet + out *unix.FdSet +}{ + { + &SocketSet{ + fdEvents: 3, + fdPipe: 4, + }, + &unix.FdSet{ + Bits: [16]int64{0x18}, // 11000 + }, + }, + { + &SocketSet{ + fdEvents: 12, + fdPipe: 8, + }, + &unix.FdSet{ + Bits: [16]int64{0x1100}, // 1000100000000 + }, + }, + { + &SocketSet{ + fdEvents: 9, + fdPipe: 21, + }, + &unix.FdSet{ + Bits: [16]int64{0x200200}, // 1000000000001000000000 + }, + }, +} + +// test socketSet.fdSet() +func TestFdSet(t *testing.T) { + for _, test := range fdSetTests { + result := test.in.fdSet() + if *result != *test.out { + t.Errorf("fdSet test wanted: %b, got: %b", *test.out, *result) + } + } +} + +// test cases for socketSet.nfd() +var nfdTests = []struct { + in *SocketSet + out int +}{ + { + &SocketSet{ + fdEvents: 3, + fdPipe: 4, + }, + 5, + }, + { + &SocketSet{ + fdEvents: 8, + fdPipe: 4, + }, + 9, + }, + { + &SocketSet{ + fdEvents: 90, + fdPipe: 900, + }, + 901, + }, +} + +// test socketSet.nfd() +func TestNfd(t *testing.T) { + for _, test := range nfdTests { + result := test.in.nfd() + if result != test.out { + t.Errorf("nfd test wanted: %d, got: %d", test.out, result) + } + } +}