diff --git a/engine/resources/net.go b/engine/resources/net.go index 98f99a23..20279d4d 100644 --- a/engine/resources/net.go +++ b/engine/resources/net.go @@ -191,15 +191,19 @@ func (obj *NetRes) Close() error { // TODO: currently gets events from ALL interfaces, would be nice to reject // events from other interfaces. func (obj *NetRes) Watch() error { - // waitgroup for netlink receive goroutine - wg := &sync.WaitGroup{} - defer wg.Wait() - // create a netlink socket for receiving network interface events conn, err := socketset.NewSocketSet(rtmGrps, obj.socketFile, unix.NETLINK_ROUTE) if err != nil { return errwrap.Wrapf(err, "error creating socket set") } + + // waitgroup for netlink receive goroutine + wg := &sync.WaitGroup{} + defer conn.Close() + // We must wait for the Shutdown() AND the select inside of SocketSet to + // complete before we Close, since the unblocking in SocketSet is not a + // synchronous operation. + defer wg.Wait() defer conn.Shutdown() // close the netlink socket and unblock conn.receive() // watch the systemd-networkd configuration file @@ -220,7 +224,6 @@ func (obj *NetRes) Watch() error { wg.Add(1) go func() { defer wg.Done() - defer conn.Close() // close the pipe when we're done with it defer close(nlChan) for { // receive messages from the socket set diff --git a/lang/funcs/core/coresys/cpucount_fact.go b/lang/funcs/core/coresys/cpucount_fact.go index a1a6699c..630fe571 100644 --- a/lang/funcs/core/coresys/cpucount_fact.go +++ b/lang/funcs/core/coresys/cpucount_fact.go @@ -76,9 +76,12 @@ func (obj CPUCountFact) Stream() error { // waitgroup for netlink receive goroutine wg := &sync.WaitGroup{} - defer wg.Wait() defer ss.Close() - defer ss.Shutdown() + // We must wait for the Shutdown() AND the select inside of SocketSet to + // complete before we Close, since the unblocking in SocketSet is not a + // synchronous operation. + defer wg.Wait() + defer ss.Shutdown() // close the netlink socket and unblock conn.receive() eventChan := make(chan *nlChanEvent) // updated in goroutine when we receive uevent closeChan := make(chan struct{}) // channel to unblock selects in goroutine diff --git a/util/socketset/socketset_test.go b/util/socketset/socketset_test.go index 81747117..07b90101 100644 --- a/util/socketset/socketset_test.go +++ b/util/socketset/socketset_test.go @@ -108,18 +108,22 @@ func TestNfd(t *testing.T) { // test SocketSet.Shutdown() func TestShutdown(t *testing.T) { - wg := &sync.WaitGroup{} - defer wg.Wait() - // pass 0 so we create a socket that doesn't receive any events ss, err := NewSocketSet(0, "pipe.sock", 0) if err != nil { t.Errorf("could not create SocketSet: %+v", err) } + // waitgroup for netlink receive goroutine + wg := &sync.WaitGroup{} + defer ss.Close() + // We must wait for the Shutdown() AND the select inside of SocketSet to + // complete before we Close, since the unblocking in SocketSet is not a + // synchronous operation. + defer wg.Wait() + defer ss.Shutdown() // close the netlink socket and unblock conn.receive() + closeChan := make(chan struct{}) defer close(closeChan) - defer ss.Close() - defer ss.Shutdown() // create a listener that never receives any data wg.Add(1) // add a waitgroup to ensure this will block if we don't properly unblock Select