resources: aws: ec2: Refactor longpollWatch

Complete rewrite of longpollWatch() for correctness and maintainability.
This commit is contained in:
jonathangold
2017-12-12 19:17:47 -05:00
committed by James Shubin
parent 32e3c4e029
commit 67837a47ac

View File

@@ -90,11 +90,19 @@ const (
CweTargetID = "sns"
// CweTargetJSON is the json field that cloudwatch will send to our endpoint so we don't get more than we need.
CweTargetJSON = "$.detail"
// waitTimeout is the duration in seconds of the timeout context in CheckApply.
waitTimeout = 400
// AwsErrExceededWaitAttempts is the awserr.Message() that gets sent with
// the ResourceStateNotReady awserr.Code() when the waiters time out.
AwsErrExceededWaitAttempts = "exceeded wait attempts"
// AwsErrIncorrectInstanceState is the error returned when an action
// cannot be completed due to the current instance state.
AwsErrIncorrectInstanceState = "IncorrectInstanceState"
// waitTimeout is the duration in seconds of the timeout context in CheckApply.
waitTimeout = 400
// nameKey is the tag key under which the instance name is stored in
// ec2.Instance.
nameKey = "Name"
// nameTag is used to define the name tag.
nameTag = "tag:" + nameKey
)
//go:generate stringer -type=awsEc2Event -output=awsec2event_stringer.go
@@ -173,6 +181,7 @@ type AwsEc2Res struct {
// chanStruct is the message type sent over obj.awsChan from the monitoring
// goroutine to the event processing loop in watch. A message carries either
// an error or the result of a waiter.
type chanStruct struct {
// event is the awsEc2Event reported by a waiter.
event awsEc2Event
// state is the instance state name returned by stateWaiter; the event
// processing loop ignores empty and transitional (pending, stopping) values.
state string
// err, when non-nil, makes the event processing loop return it.
err error
}
@@ -381,12 +390,21 @@ func (obj *AwsEc2Res) Watch() error {
func (obj *AwsEc2Res) longpollWatch() error {
send := false
var exit *error
// We tell the engine that we're running right away. This is not correct,
// but the api doesn't have a way to signal when the waiters are ready.
if err := obj.Running(); err != nil {
return err
}
// cancellable context used for exiting cleanly
ctx, cancel := context.WithCancel(context.TODO())
// clean up when we're done
defer obj.wg.Wait()
defer close(obj.closeChan)
ctx, cancel := context.WithCancel(context.TODO())
// cancel our context if obj.closeChan closes
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
@@ -395,139 +413,49 @@ func (obj *AwsEc2Res) longpollWatch() error {
cancel()
}
}()
// monitor the resource and send the state to the channel
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
defer close(obj.awsChan)
for {
// get the instance that matches the name specified in
// the definition. Ignore terminated instances.
diInput := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
{
Name: aws.String("tag:Name"),
Values: []*string{aws.String(obj.prependName())},
},
{
Name: aws.String("instance-state-name"),
Values: []*string{
aws.String("pending"),
aws.String("running"),
aws.String("stopping"),
aws.String("stopped"),
},
},
},
}
diOutput, err := obj.client.DescribeInstances(diInput)
// get the instance with the name specified in the definition
instance, err := describeInstanceByName(obj.client, obj.prependName())
if err != nil {
select {
case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "error describing instances"),
err: errwrap.Wrapf(err, "error describing instance"),
}:
case <-obj.closeChan:
}
return
}
if obj.State == "running" {
// Watch the instance if it isn't in a stopped
// state. If we watch instances that are already
// stopped, we will send events continuously.
if len(diOutput.Reservations) == 1 && *diOutput.Reservations[0].Instances[0].State.Name != "stopped" {
waitInput := &ec2.DescribeInstancesInput{
InstanceIds: []*string{diOutput.Reservations[0].Instances[0].InstanceId},
Filters: []*ec2.Filter{
{
Name: aws.String("instance-state-name"),
Values: []*string{
aws.String("stopped"),
aws.String("terminated"),
},
},
},
}
log.Printf("%s: Watching: %s", obj, *diOutput.Reservations[0].Instances[0].InstanceId)
// Wait for instance to stop or
// terminate concurrently.
event, err := longpollRunningWaiter(ctx, waitInput, obj.client)
// wait for the instance state to change
state, err := stateWaiter(ctx, instance, obj.client)
if err != nil {
select {
case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "internal waiter error"),
err: errwrap.Wrapf(err, "waiter error"),
}:
case <-obj.closeChan:
}
return
}
// send the state to the event processing loop
select {
case obj.awsChan <- &chanStruct{
event: event,
state: state,
}:
case <-obj.closeChan:
return
}
}
}
if obj.State == "stopped" {
// Watch the instance if it isn't in a running
// state. If we watch instances that are already
// running, we will send events continuously.
if len(diOutput.Reservations) == 1 && *diOutput.Reservations[0].Instances[0].State.Name != "running" {
waitInput := &ec2.DescribeInstancesInput{
InstanceIds: []*string{diOutput.Reservations[0].Instances[0].InstanceId},
Filters: []*ec2.Filter{
{
Name: aws.String("instance-state-name"),
Values: []*string{aws.String("running")},
},
},
}
log.Printf("%s: watching: %s", obj, *diOutput.Reservations[0].Instances[0].InstanceId)
event, err := longpollStoppedWaiter(ctx, waitInput, obj.client)
if err != nil {
select {
case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "unknown waiter error"),
}:
case <-obj.closeChan:
}
return
}
select {
case obj.awsChan <- &chanStruct{
event: event,
}:
case <-obj.closeChan:
return
}
}
}
if obj.State == "terminated" {
event, err := longpollTerminatedWaiter(ctx, diInput, obj.client)
if err != nil {
select {
case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "unknown waiter error"),
}:
case <-obj.closeChan:
}
return
}
select {
case obj.awsChan <- &chanStruct{
event: event,
}:
case <-obj.closeChan:
return
}
}
select {
case <-obj.closeChan:
return
default:
}
}
}()
// process events from the goroutine
for {
select {
case event := <-obj.Events():
@@ -541,13 +469,16 @@ func (obj *AwsEc2Res) longpollWatch() error {
if err := msg.err; err != nil {
return err
}
if msg.event == awsEc2EventNone {
switch msg.state {
// send events to the engine, except empty and transitional states
case "", ec2.InstanceStateNamePending, ec2.InstanceStateNameStopping:
continue
}
log.Printf("%s: State: %v", obj, msg.event)
default:
log.Printf("%s: State: %v", obj, msg.state)
obj.StateOK(false)
send = true
}
}
if send {
send = false
obj.Event()
@@ -555,6 +486,93 @@ func (obj *AwsEc2Res) longpollWatch() error {
}
}
// stateWaiter waits for an instance to change state and returns the new state.
//
// The waiter is selected from the instance's current state: running/stopping
// instances are waited on until stopped, stopped/pending instances until
// running, and terminated instances until a non-terminated instance with the
// same name exists. Once the waiter returns, the instance is looked up again by
// name and the state name found there is returned.
//
// Parameters:
//   - ctx: cancels the underlying AWS waiter.
//   - instance: the instance to watch; it must be non-nil, carry a non-empty
//     state name and a tag with key nameKey.
//   - c: the ec2 client used for the waiter and the follow-up describe call.
//
// An empty state with a nil error is returned when the waiter gave up after
// its maximum number of attempts; callers are expected to ignore it and call
// again. An error is returned for a nil instance, a nil or empty state, a
// missing name tag, an unrecognized state, or any AWS error other than
// cancellation / resource-not-ready.
func stateWaiter(ctx context.Context, instance *ec2.Instance, c *ec2.EC2) (string, error) {
	var err error
	var name string

	// these cases are not permitted
	if instance == nil {
		return "", fmt.Errorf("nil instance")
	}
	// State is a pointer, so it has to be checked before Name is read;
	// otherwise an instance without a state would panic instead of erroring.
	if instance.State == nil || aws.StringValue(instance.State.Name) == "" {
		return "", fmt.Errorf("nil or empty state")
	}
	state := aws.StringValue(instance.State.Name)

	// get the instance name
	for _, tag := range instance.Tags {
		if tag == nil {
			continue // tags are pointers; skip holes rather than panic
		}
		if aws.StringValue(tag.Key) == nameKey {
			name = aws.StringValue(tag.Value)
		}
	}
	// error if we didn't find one
	if name == "" {
		return "", fmt.Errorf("name not found")
	}

	// build the input for the waiters
	// NOTE(review): instance.InstanceId may be nil here (e.g. for a
	// placeholder instance that only carries a state and a name tag) —
	// confirm the SDK tolerates a nil entry in InstanceIds.
	waitInput := &ec2.DescribeInstancesInput{
		InstanceIds: []*string{instance.InstanceId},
		Filters: []*ec2.Filter{
			{
				Name:   aws.String(nameTag),
				Values: []*string{aws.String(name)},
			},
		},
	}

	// When we are watching terminated instances and waiting for them to exist,
	// we must exclude terminated instances from the waiter input. If we don't,
	// the waiter will return even if it finds a terminated instance, which is
	// not what we want.
	existWaiterFilter := &ec2.Filter{
		Name: aws.String("instance-state-name"),
		Values: []*string{
			aws.String(ec2.InstanceStateNameRunning),
			aws.String(ec2.InstanceStateNameStopped),
		},
	}

	// Select the appropriate waiter based on the instance state. There are
	// five possible states and we will catch every pertinent state change
	// (excluding transitional states) by waiting for the next state in the
	// instance's lifecycle. For more information about the lifecycle, see:
	// http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html
	switch state {
	case ec2.InstanceStateNameRunning, ec2.InstanceStateNameStopping:
		err = c.WaitUntilInstanceStoppedWithContext(ctx, waitInput)
	case ec2.InstanceStateNameStopped, ec2.InstanceStateNamePending:
		err = c.WaitUntilInstanceRunningWithContext(ctx, waitInput)
	case ec2.InstanceStateNameTerminated:
		waitInput.Filters = append(waitInput.Filters, existWaiterFilter)
		err = c.WaitUntilInstanceExistsWithContext(ctx, waitInput)
	default:
		return "", fmt.Errorf("unrecognized instance state: %s", state)
	}
	if err != nil {
		aerr, ok := err.(awserr.Error)
		if !ok {
			return "", errwrap.Wrapf(err, "error casting awserr")
		}
		// Cancellation and resource-not-ready are expected outcomes of a
		// waiter and are ignored; anything else is a real failure.
		if aerr.Code() != request.CanceledErrorCode && aerr.Code() != request.WaiterResourceNotReadyErrorCode {
			return "", errwrap.Wrapf(err, "internal waiter error")
		}
		// If the waiter returns, because it has exceeded the maximum number of
		// attempts we return an empty state, which the event processing loop
		// ignores, and the longpollWatch goroutine will loop and restart
		// the waiter.
		if aerr.Message() == AwsErrExceededWaitAttempts {
			return "", nil
		}
	}

	// return the instance state
	instance, err = describeInstanceByName(c, name)
	if err != nil {
		return "", errwrap.Wrapf(err, "error describing instances")
	}
	return aws.StringValue(instance.State.Name), nil
}
// snsWatch uses amazon's SNS and CloudWatchEvents APIs to get instance state-
// change notifications pushed to the http endpoint (snsServer) set up below.
// In Init() a CloudWatch rule is created along with a corresponding SNS topic
@@ -912,143 +930,84 @@ func (obj *AwsEc2Res) prependName() string {
return AwsPrefix + obj.GetName()
}
// longpollRunningWaiter waits for the instance to stop and waits for it to
// terminate. If either waiter returns, the instance state is checked, and an
// awsEc2Event is returned.
func longpollRunningWaiter(ctx context.Context, waitInput *ec2.DescribeInstancesInput, c *ec2.EC2) (awsEc2Event, error) {
if err := waitUntilInstanceStoppedOrTerminatedWithContext(ctx, waitInput, c); err != nil {
return awsEc2EventNone, errwrap.Wrapf(err, "error waiting for instance to stop or terminate")
// describeInstanceByName takes an ec2 client session and an instance name, and
// returns a *ec2.Instance or an error.
func describeInstanceByName(c *ec2.EC2, name string) (*ec2.Instance, error) {
// get any instance with the specified name, that isn't terminated.
diInput := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
{
Name: aws.String(nameTag),
Values: []*string{aws.String(name)},
},
{
Name: aws.String("instance-state-name"),
Values: []*string{
aws.String(ec2.InstanceStateNameRunning),
aws.String(ec2.InstanceStateNamePending),
aws.String(ec2.InstanceStateNameStopped),
aws.String(ec2.InstanceStateNameStopping),
},
},
},
}
// Check the instance state, and return the appropriate event.
stateOutput, err := c.DescribeInstances(waitInput)
diOutput, err := c.DescribeInstances(diInput)
if err != nil {
return awsEc2EventNone, errwrap.Wrapf(err, "error describing instances")
}
if len(stateOutput.Reservations) == 1 {
switch *stateOutput.Reservations[0].Instances[0].State.Name {
case "stopped":
return awsEc2EventInstanceStopped, nil
case "terminated":
return awsEc2EventInstanceTerminated, nil
}
}
return awsEc2EventNone, nil
return nil, errwrap.Wrapf(err, "error describing instances")
}
// longpollStoppedWaiter waits for the instance to be in a running state. When
// it returns, the instance state is checked, and an awsEc2Event is returned.
func longpollStoppedWaiter(ctx context.Context, waitInput *ec2.DescribeInstancesInput, c *ec2.EC2) (awsEc2Event, error) {
if err := c.WaitUntilInstanceRunningWithContext(ctx, waitInput); err != nil {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == request.CanceledErrorCode || aerr.Code() == request.WaiterResourceNotReadyErrorCode {
err = nil // we want to ignore these kinds of errors
// error if we get more than one reservation.
if len(diOutput.Reservations) > 1 {
return nil, fmt.Errorf("too many reservations")
}
// error if we got a reservation without exactly one instance.
if len(diOutput.Reservations) != 0 && len(diOutput.Reservations[0].Instances) != 1 {
return nil, fmt.Errorf("wrong number of instances")
}
return awsEc2EventNone, errwrap.Wrapf(err, "error waiting for instance to run")
// if we didn't find an instance, we consider it 'terminated'.
if len(diOutput.Reservations) == 0 {
return &ec2.Instance{
State: &ec2.InstanceState{
Name: aws.String(ec2.InstanceStateNameTerminated),
},
Tags: []*ec2.Tag{
{
Key: aws.String(nameKey),
Value: aws.String(name),
},
},
}, nil
}
// Check the instance state, and return the appropriate event.
stateOutput, err := c.DescribeInstances(waitInput)
return diOutput.Reservations[0].Instances[0], nil
}
// describeInstanceByID takes an ec2 client session and a pointer to an
// instanceID, and returns an *ec2.Instance or an error.
func describeInstanceByID(c *ec2.EC2, instanceID *string) (*ec2.Instance, error) {
if instanceID == nil {
return nil, fmt.Errorf("instanceID is nil")
}
// get any instance with the specified instanceID.
diInput := &ec2.DescribeInstancesInput{
InstanceIds: []*string{instanceID},
}
diOutput, err := c.DescribeInstances(diInput)
if err != nil {
return awsEc2EventNone, errwrap.Wrapf(err, "error describing instances")
}
if len(stateOutput.Reservations) == 1 && *stateOutput.Reservations[0].Instances[0].State.Name == "running" {
return awsEc2EventInstanceRunning, nil
}
return awsEc2EventNone, nil
return nil, errwrap.Wrapf(err, "error describing instances")
}
// longpollTerminatedWaiter waits for an instance matching the definition to
// exist. When it returns, if the instance exists, an awsEc2Event is returned.
func longpollTerminatedWaiter(ctx context.Context, waitInput *ec2.DescribeInstancesInput, c *ec2.EC2) (awsEc2Event, error) {
if err := c.WaitUntilInstanceExistsWithContext(ctx, waitInput); err != nil {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == request.CanceledErrorCode || aerr.Code() == request.WaiterResourceNotReadyErrorCode {
err = nil // we want to ignore these kinds of errors
// error if we didn't find exactly one reservation with one instance.
if len(diOutput.Reservations) != 1 {
return nil, fmt.Errorf("wrong number of reservations")
}
}
return awsEc2EventNone, errwrap.Wrapf(err, "error waiting for instance to exist")
}
// Check the instance state, and return the appropriate event.
stateOutput, err := c.DescribeInstances(waitInput)
if err != nil {
return awsEc2EventNone, errwrap.Wrapf(err, "error describing instances")
}
if len(stateOutput.Reservations) != 0 {
return awsEc2EventInstanceExists, nil
}
return awsEc2EventNone, nil
if len(diOutput.Reservations[0].Instances) != 1 {
return nil, fmt.Errorf("wrong number of instances")
}
// waitUntilInstanceStoppedOrTerminatedWithContext combines the two waiters
// required to trigger events if a running instance is stopped or terminated.
// This function is needed, because the AWS api only provides waiters for one
// or the other.
func waitUntilInstanceStoppedOrTerminatedWithContext(ctx context.Context, waitInput *ec2.DescribeInstancesInput, c *ec2.EC2) error {
errChan := make(chan error)
defer close(errChan) // unnecessary, but nice to have
wg := sync.WaitGroup{}
defer wg.Wait()
closeChan := make(chan struct{})
innerCtx, cancel := context.WithCancel(context.TODO()) // uncoupled!!
once := &sync.Once{}
closer := func() {
cancel()
close(closeChan)
}
defer once.Do(closer) // needed if we exit below due to ctx being cancelled
wg.Add(1)
go func() {
defer wg.Done()
defer once.Do(closer)
err := c.WaitUntilInstanceStoppedWithContext(innerCtx, waitInput)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == request.CanceledErrorCode || aerr.Code() == request.WaiterResourceNotReadyErrorCode {
err = nil // we want to ignore these kinds of errors
}
}
}
select {
case errChan <- errwrap.Wrapf(err, "unknown error waiting for instance to stop"):
case <-closeChan:
}
}()
wg.Add(1)
go func() {
defer wg.Done()
defer once.Do(closer)
err := c.WaitUntilInstanceTerminatedWithContext(innerCtx, waitInput)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == request.CanceledErrorCode || aerr.Code() == request.WaiterResourceNotReadyErrorCode {
err = nil
}
}
}
select {
case errChan <- errwrap.Wrapf(err, "unknown error waiting for instance to terminate"):
case <-closeChan:
}
}()
select {
case err, ok := <-errChan:
if !ok {
return fmt.Errorf("channel closed unexpectedly")
}
return err // return either nil or an error
case <-ctx.Done(): // if ctx is canceled, we need to transmit that error
// TODO: should we instead use the aws context copy and request.CanceledErrorCode ?
return ctx.Err()
}
return diOutput.Reservations[0].Instances[0], nil
}
// snsListener returns a listener bound to listenAddr.