diff --git a/engine/resources/docker_container.go b/engine/resources/docker_container.go index 385a6cb6..6742ba32 100644 --- a/engine/resources/docker_container.go +++ b/engine/resources/docker_container.go @@ -37,6 +37,7 @@ import ( "io" "regexp" "strings" + "sync" "github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/engine/traits" @@ -97,6 +98,11 @@ type DockerContainerRes struct { init *engine.Init client *dockerClient.Client // docker api client + + once *sync.Once + start chan struct{} // closes by once + sflag bool // first time happened? + ready chan struct{} // closes by once } // Default returns some sensible defaults for this resource. @@ -155,6 +161,10 @@ func (obj *DockerContainerRes) Validate() error { func (obj *DockerContainerRes) Init(init *engine.Init) error { obj.init = init // save for later + obj.once = &sync.Once{} + obj.start = make(chan struct{}) + obj.ready = make(chan struct{}) + return nil } @@ -165,17 +175,52 @@ func (obj *DockerContainerRes) Cleanup() error { // Watch is the primary listener for this resource and it outputs events. func (obj *DockerContainerRes) Watch(ctx context.Context) error { + var client *dockerClient.Client + var err error - // Initialize the docker client. - client, err := dockerClient.NewClientWithOpts(dockerClient.WithVersion(obj.APIVersion)) - if err != nil { + for { + client, err = dockerClient.NewClientWithOpts(dockerClient.WithVersion(obj.APIVersion)) + if err == nil { + // the above won't check the connection, force that here + _, err = client.Ping(ctx) + } + if err == nil { + break + } + // If we didn't connect right away, it might be because we're + // waiting for someone to install the docker package, and start + // the service. We might even have an edge between this resource + // and those dependencies, but that doesn't stop this Watch from + // starting up. As a result, we will wait *once* for CheckApply + // to unlock us, since that runs in dependency order. + // This error looks like: Cannot connect to the Docker daemon at + // unix:///var/run/docker.sock. Is the docker daemon running? + if dockerClient.IsErrConnectionFailed(err) && !obj.sflag { + // notify engine that we're running so that CheckApply + // can start... + obj.init.Running() + select { + case <-obj.start: + obj.sflag = true + continue + + case <-ctx.Done(): // don't block + close(obj.ready) // tell CheckApply to unblock! + return nil + } + } + close(obj.ready) // tell CheckApply to unblock! return errwrap.Wrapf(err, "error creating docker client") } - defer client.Close() + defer client.Close() // success, so close it later eventChan, errChan := client.Events(ctx, types.EventsOptions{}) + close(obj.ready) // tell CheckApply to start now that events are running - obj.init.Running() // when started, notify engine that we're running + // notify engine that we're running + if !obj.sflag { + obj.init.Running() + } var send = false // send event? for { @@ -209,6 +254,15 @@ func (obj *DockerContainerRes) Watch(ctx context.Context) error { // CheckApply method for Docker resource. func (obj *DockerContainerRes) CheckApply(ctx context.Context, apply bool) (bool, error) { + + obj.once.Do(func() { close(obj.start) }) // Tell Watch() it's safe to start again. + // Now wait to make sure events are started before we make changes! + select { + case <-obj.ready: + case <-ctx.Done(): // don't block + return false, ctx.Err() + } + var id string var destroy bool var err error diff --git a/engine/resources/docker_image.go b/engine/resources/docker_image.go index 615af1b9..d8edfb94 100644 --- a/engine/resources/docker_image.go +++ b/engine/resources/docker_image.go @@ -37,6 +37,7 @@ import ( "io" "regexp" "strings" + "sync" "github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/engine/traits" @@ -66,6 +67,11 @@ type DockerImageRes struct { APIVersion string `lang:"apiversion" yaml:"apiversion"` init *engine.Init + + once *sync.Once + start chan struct{} // closes by once + sflag bool // first time happened? + ready chan struct{} // closes by once } // Default returns some sensible defaults for this resource. @@ -102,6 +108,10 @@ func (obj *DockerImageRes) Validate() error { func (obj *DockerImageRes) Init(init *engine.Init) error { obj.init = init // save for later + obj.once = &sync.Once{} + obj.start = make(chan struct{}) + obj.ready = make(chan struct{}) + return nil } @@ -112,18 +122,52 @@ func (obj *DockerImageRes) Cleanup() error { // Watch is the primary listener for this resource and it outputs events. func (obj *DockerImageRes) Watch(ctx context.Context) error { + var client *dockerClient.Client + var err error - // Initialize the docker client. - client, err := dockerClient.NewClientWithOpts(dockerClient.WithVersion(obj.APIVersion)) - if err != nil { + for { + client, err = dockerClient.NewClientWithOpts(dockerClient.WithVersion(obj.APIVersion)) + if err == nil { + // the above won't check the connection, force that here + _, err = client.Ping(ctx) + } + if err == nil { + break + } + // If we didn't connect right away, it might be because we're + // waiting for someone to install the docker package, and start + // the service. We might even have an edge between this resource + // and those dependencies, but that doesn't stop this Watch from + // starting up. As a result, we will wait *once* for CheckApply + // to unlock us, since that runs in dependency order. + // This error looks like: Cannot connect to the Docker daemon at + // unix:///var/run/docker.sock. Is the docker daemon running? + if dockerClient.IsErrConnectionFailed(err) && !obj.sflag { + // notify engine that we're running so that CheckApply + // can start... + obj.init.Running() + select { + case <-obj.start: + obj.sflag = true + continue + + case <-ctx.Done(): // don't block + close(obj.ready) // tell CheckApply to unblock! + return nil + } + } + close(obj.ready) // tell CheckApply to unblock! return errwrap.Wrapf(err, "error creating docker client") } - defer client.Close() + defer client.Close() // success, so close it later eventChan, errChan := client.Events(ctx, types.EventsOptions{}) + close(obj.ready) // tell CheckApply to start now that events are running // notify engine that we're running - obj.init.Running() + if !obj.sflag { + obj.init.Running() + } var send = false // send event? for { @@ -158,6 +202,14 @@ func (obj *DockerImageRes) Watch(ctx context.Context) error { // CheckApply method for Docker resource. func (obj *DockerImageRes) CheckApply(ctx context.Context, apply bool) (checkOK bool, err error) { + obj.once.Do(func() { close(obj.start) }) // Tell Watch() it's safe to start again. + // Now wait to make sure events are started before we make changes! + select { + case <-obj.ready: + case <-ctx.Done(): // don't block + return false, ctx.Err() + } + // Save the full image name and tag. image := dockerImageNameTag(obj.Name())