engine: resources: Don't prematurely error docker watches

A subtlety about the engine is that while it guarantees CheckApply
happens in the listed edge-based dependency order, it doesn't stop
Watch from starting up in whatever order it wants to. As a result, we
can prematurely error since the docker service isn't running yet. It may
in fact be in the process of getting installed and started by mgmt
before we then try and use this resource! As a result, let it error once
for free and wait for CheckApply to get going before we start again.

Keep in mind, Watch has to use the .Running() method once to tell
CheckApply to do its initial event. So this concurrency is complex!

It's unclear if this is a bug in mgmt or not, but I'm leaning towards
not, particularly since there isn't an obvious way to fix it.
This commit is contained in:
James Shubin
2025-03-12 06:14:38 -04:00
parent 3ca1aa9cb1
commit f8a4751290
2 changed files with 116 additions and 10 deletions

View File

@@ -37,6 +37,7 @@ import (
"io" "io"
"regexp" "regexp"
"strings" "strings"
"sync"
"github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/engine/traits" "github.com/purpleidea/mgmt/engine/traits"
@@ -97,6 +98,11 @@ type DockerContainerRes struct {
init *engine.Init init *engine.Init
client *dockerClient.Client // docker api client client *dockerClient.Client // docker api client
once *sync.Once
start chan struct{} // closes by once
sflag bool // first time happened?
ready chan struct{} // closes by once
} }
// Default returns some sensible defaults for this resource. // Default returns some sensible defaults for this resource.
@@ -155,6 +161,10 @@ func (obj *DockerContainerRes) Validate() error {
func (obj *DockerContainerRes) Init(init *engine.Init) error { func (obj *DockerContainerRes) Init(init *engine.Init) error {
obj.init = init // save for later obj.init = init // save for later
obj.once = &sync.Once{}
obj.start = make(chan struct{})
obj.ready = make(chan struct{})
return nil return nil
} }
@@ -165,17 +175,52 @@ func (obj *DockerContainerRes) Cleanup() error {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *DockerContainerRes) Watch(ctx context.Context) error { func (obj *DockerContainerRes) Watch(ctx context.Context) error {
var client *dockerClient.Client
var err error
// Initialize the docker client. for {
client, err := dockerClient.NewClientWithOpts(dockerClient.WithVersion(obj.APIVersion)) client, err = dockerClient.NewClientWithOpts(dockerClient.WithVersion(obj.APIVersion))
if err != nil { if err == nil {
// the above won't check the connection, force that here
_, err = client.Ping(ctx)
}
if err == nil {
break
}
// If we didn't connect right away, it might be because we're
// waiting for someone to install the docker package, and start
// the service. We might even have an edge between this resource
// and those dependencies, but that doesn't stop this Watch from
// starting up. As a result, we will wait *once* for CheckApply
// to unlock us, since that runs in dependency order.
// This error looks like: Cannot connect to the Docker daemon at
// unix:///var/run/docker.sock. Is the docker daemon running?
if dockerClient.IsErrConnectionFailed(err) && !obj.sflag {
// notify engine that we're running so that CheckApply
// can start...
obj.init.Running()
select {
case <-obj.start:
obj.sflag = true
continue
case <-ctx.Done(): // don't block
close(obj.ready) // tell CheckApply to unblock!
return nil
}
}
close(obj.ready) // tell CheckApply to unblock!
return errwrap.Wrapf(err, "error creating docker client") return errwrap.Wrapf(err, "error creating docker client")
} }
defer client.Close() defer client.Close() // success, so close it later
eventChan, errChan := client.Events(ctx, types.EventsOptions{}) eventChan, errChan := client.Events(ctx, types.EventsOptions{})
close(obj.ready) // tell CheckApply to start now that events are running
obj.init.Running() // when started, notify engine that we're running // notify engine that we're running
if !obj.sflag {
obj.init.Running()
}
var send = false // send event? var send = false // send event?
for { for {
@@ -209,6 +254,15 @@ func (obj *DockerContainerRes) Watch(ctx context.Context) error {
// CheckApply method for Docker resource. // CheckApply method for Docker resource.
func (obj *DockerContainerRes) CheckApply(ctx context.Context, apply bool) (bool, error) { func (obj *DockerContainerRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
obj.once.Do(func() { close(obj.start) }) // Tell Watch() it's safe to start again.
// Now wait to make sure events are started before we make changes!
select {
case <-obj.ready:
case <-ctx.Done(): // don't block
return false, ctx.Err()
}
var id string var id string
var destroy bool var destroy bool
var err error var err error

View File

@@ -37,6 +37,7 @@ import (
"io" "io"
"regexp" "regexp"
"strings" "strings"
"sync"
"github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/engine/traits" "github.com/purpleidea/mgmt/engine/traits"
@@ -66,6 +67,11 @@ type DockerImageRes struct {
APIVersion string `lang:"apiversion" yaml:"apiversion"` APIVersion string `lang:"apiversion" yaml:"apiversion"`
init *engine.Init init *engine.Init
once *sync.Once
start chan struct{} // closes by once
sflag bool // first time happened?
ready chan struct{} // closes by once
} }
// Default returns some sensible defaults for this resource. // Default returns some sensible defaults for this resource.
@@ -102,6 +108,10 @@ func (obj *DockerImageRes) Validate() error {
func (obj *DockerImageRes) Init(init *engine.Init) error { func (obj *DockerImageRes) Init(init *engine.Init) error {
obj.init = init // save for later obj.init = init // save for later
obj.once = &sync.Once{}
obj.start = make(chan struct{})
obj.ready = make(chan struct{})
return nil return nil
} }
@@ -112,18 +122,52 @@ func (obj *DockerImageRes) Cleanup() error {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *DockerImageRes) Watch(ctx context.Context) error { func (obj *DockerImageRes) Watch(ctx context.Context) error {
var client *dockerClient.Client
var err error
// Initialize the docker client. for {
client, err := dockerClient.NewClientWithOpts(dockerClient.WithVersion(obj.APIVersion)) client, err = dockerClient.NewClientWithOpts(dockerClient.WithVersion(obj.APIVersion))
if err != nil { if err == nil {
// the above won't check the connection, force that here
_, err = client.Ping(ctx)
}
if err == nil {
break
}
// If we didn't connect right away, it might be because we're
// waiting for someone to install the docker package, and start
// the service. We might even have an edge between this resource
// and those dependencies, but that doesn't stop this Watch from
// starting up. As a result, we will wait *once* for CheckApply
// to unlock us, since that runs in dependency order.
// This error looks like: Cannot connect to the Docker daemon at
// unix:///var/run/docker.sock. Is the docker daemon running?
if dockerClient.IsErrConnectionFailed(err) && !obj.sflag {
// notify engine that we're running so that CheckApply
// can start...
obj.init.Running()
select {
case <-obj.start:
obj.sflag = true
continue
case <-ctx.Done(): // don't block
close(obj.ready) // tell CheckApply to unblock!
return nil
}
}
close(obj.ready) // tell CheckApply to unblock!
return errwrap.Wrapf(err, "error creating docker client") return errwrap.Wrapf(err, "error creating docker client")
} }
defer client.Close() defer client.Close() // success, so close it later
eventChan, errChan := client.Events(ctx, types.EventsOptions{}) eventChan, errChan := client.Events(ctx, types.EventsOptions{})
close(obj.ready) // tell CheckApply to start now that events are running
// notify engine that we're running // notify engine that we're running
obj.init.Running() if !obj.sflag {
obj.init.Running()
}
var send = false // send event? var send = false // send event?
for { for {
@@ -158,6 +202,14 @@ func (obj *DockerImageRes) Watch(ctx context.Context) error {
// CheckApply method for Docker resource. // CheckApply method for Docker resource.
func (obj *DockerImageRes) CheckApply(ctx context.Context, apply bool) (checkOK bool, err error) { func (obj *DockerImageRes) CheckApply(ctx context.Context, apply bool) (checkOK bool, err error) {
obj.once.Do(func() { close(obj.start) }) // Tell Watch() it's safe to start again.
// Now wait to make sure events are started before we make changes!
select {
case <-obj.ready:
case <-ctx.Done(): // don't block
return false, ctx.Err()
}
// Save the full image name and tag. // Save the full image name and tag.
image := dockerImageNameTag(obj.Name()) image := dockerImageNameTag(obj.Name())