From 67837a47acc988de664ac92d088ab90a06c5606c Mon Sep 17 00:00:00 2001 From: jonathangold Date: Tue, 12 Dec 2017 19:17:47 -0500 Subject: [PATCH] resources: aws: ec2: Refactor longpollWatch Complete rewrite of longpollWatch() for correctness and maintanability. --- resources/aws_ec2.go | 439 ++++++++++++++++++++----------------------- 1 file changed, 199 insertions(+), 240 deletions(-) diff --git a/resources/aws_ec2.go b/resources/aws_ec2.go index c68470e9..56ce955d 100644 --- a/resources/aws_ec2.go +++ b/resources/aws_ec2.go @@ -90,11 +90,19 @@ const ( CweTargetID = "sns" // CweTargetJSON is the json field that cloudwatch will send to our endpoint so we don't get more than we need. CweTargetJSON = "$.detail" - // waitTimeout is the duration in seconds of the timeout context in CheckApply. - waitTimeout = 400 + // AwsErrExceededWaitAttempts is the awserr.Message() that gets sent with + // the ResourceStateNotReady awserr.Code() when the waiters time out. + AwsErrExceededWaitAttempts = "exceeded wait attempts" // AwsErrIncorrectInstanceState is the error returned when an action // cannot be completed due to the current instance state. AwsErrIncorrectInstanceState = "IncorrectInstanceState" + // waitTimeout is the duration in seconds of the timeout context in CheckApply. + waitTimeout = 400 + // nameKey is the name of the tag key that stores the instance name in ec2.Instance. + // in ec2.Instance + nameKey = "Name" + // nameTag is used to define the name tag. + nameTag = "tag:" + nameKey ) //go:generate stringer -type=awsEc2Event -output=awsec2event_stringer.go @@ -173,6 +181,7 @@ type AwsEc2Res struct { // chanStruct defines the type for a channel used to pass events and errors to watch. type chanStruct struct { event awsEc2Event + state string err error } @@ -381,12 +390,21 @@ func (obj *AwsEc2Res) Watch() error { func (obj *AwsEc2Res) longpollWatch() error { send := false var exit *error + + // We tell the engine that we're running right away. This is not correct, + // but the api doesn't have a way to signal when the waiters are ready. if err := obj.Running(); err != nil { return err } + + // cancellable context used for exiting cleanly + ctx, cancel := context.WithCancel(context.TODO()) + + // clean up when we're done defer obj.wg.Wait() defer close(obj.closeChan) - ctx, cancel := context.WithCancel(context.TODO()) + + // cancel our context if obj.closeChan closes obj.wg.Add(1) go func() { defer obj.wg.Done() @@ -395,139 +413,49 @@ func (obj *AwsEc2Res) longpollWatch() error { cancel() } }() + + // monitor the resource and send the state to the channel obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(obj.awsChan) for { - // get the instance that matches the name specified in - // the definition. Ignore terminated instances. - diInput := &ec2.DescribeInstancesInput{ - Filters: []*ec2.Filter{ - { - Name: aws.String("tag:Name"), - Values: []*string{aws.String(obj.prependName())}, - }, - { - Name: aws.String("instance-state-name"), - Values: []*string{ - aws.String("pending"), - aws.String("running"), - aws.String("stopping"), - aws.String("stopped"), - }, - }, - }, - } - diOutput, err := obj.client.DescribeInstances(diInput) + // get the instance with the name specified in the definition + instance, err := describeInstanceByName(obj.client, obj.prependName()) if err != nil { select { case obj.awsChan <- &chanStruct{ - err: errwrap.Wrapf(err, "error describing instances"), + err: errwrap.Wrapf(err, "error describing instance"), }: case <-obj.closeChan: } return } - if obj.State == "running" { - // Watch the instance if it isn't in a stopped - // state. If we watch instances that are already - // stopped, we will send events continuously. - if len(diOutput.Reservations) == 1 && *diOutput.Reservations[0].Instances[0].State.Name != "stopped" { - waitInput := &ec2.DescribeInstancesInput{ - InstanceIds: []*string{diOutput.Reservations[0].Instances[0].InstanceId}, - Filters: []*ec2.Filter{ - { - Name: aws.String("instance-state-name"), - Values: []*string{ - aws.String("stopped"), - aws.String("terminated"), - }, - }, - }, - } - log.Printf("%s: Watching: %s", obj, *diOutput.Reservations[0].Instances[0].InstanceId) - // Wait for instance to stop or - // terminate concurrently. - event, err := longpollRunningWaiter(ctx, waitInput, obj.client) - if err != nil { - select { - case obj.awsChan <- &chanStruct{ - err: errwrap.Wrapf(err, "internal waiter error"), - }: - case <-obj.closeChan: - } - return - } - select { - case obj.awsChan <- &chanStruct{ - event: event, - }: - case <-obj.closeChan: - return - } - } - } - if obj.State == "stopped" { - // Watch the instance if it isn't in a running - // state. If we watch instances that are already - // running, we will send events continuously. - if len(diOutput.Reservations) == 1 && *diOutput.Reservations[0].Instances[0].State.Name != "running" { - waitInput := &ec2.DescribeInstancesInput{ - InstanceIds: []*string{diOutput.Reservations[0].Instances[0].InstanceId}, - Filters: []*ec2.Filter{ - { - Name: aws.String("instance-state-name"), - Values: []*string{aws.String("running")}, - }, - }, - } - log.Printf("%s: watching: %s", obj, *diOutput.Reservations[0].Instances[0].InstanceId) - event, err := longpollStoppedWaiter(ctx, waitInput, obj.client) - if err != nil { - select { - case obj.awsChan <- &chanStruct{ - err: errwrap.Wrapf(err, "unknown waiter error"), - }: - case <-obj.closeChan: - } - return - } - select { - case obj.awsChan <- &chanStruct{ - event: event, - }: - case <-obj.closeChan: - return - } - } - } - if obj.State == "terminated" { - event, err := longpollTerminatedWaiter(ctx, diInput, obj.client) - if err != nil { - select { - case obj.awsChan <- &chanStruct{ - err: errwrap.Wrapf(err, "unknown waiter error"), - }: - case <-obj.closeChan: - } - return - } + + // wait for the instance state to change + state, err := stateWaiter(ctx, instance, obj.client) + if err != nil { select { case obj.awsChan <- &chanStruct{ - event: event, + err: errwrap.Wrapf(err, "waiter error"), }: case <-obj.closeChan: - return } + return } + + // send the state to the event processing loop select { + case obj.awsChan <- &chanStruct{ + state: state, + }: case <-obj.closeChan: return - default: } } }() + + // process events from the goroutine for { select { case event := <-obj.Events(): @@ -541,12 +469,15 @@ func (obj *AwsEc2Res) longpollWatch() error { if err := msg.err; err != nil { return err } - if msg.event == awsEc2EventNone { + switch msg.state { + // send events to the engine, except empty and transitional states + case "", ec2.InstanceStateNamePending, ec2.InstanceStateNameStopping: continue + default: + log.Printf("%s: State: %v", obj, msg.state) + obj.StateOK(false) + send = true } - log.Printf("%s: State: %v", obj, msg.event) - obj.StateOK(false) - send = true } if send { send = false @@ -555,6 +486,93 @@ func (obj *AwsEc2Res) longpollWatch() error { } } +// stateWaiter waits for an instance to change state and returns the new state. +func stateWaiter(ctx context.Context, instance *ec2.Instance, c *ec2.EC2) (string, error) { + var err error + var name string + + // these cases are not permitted + if instance == nil { + return "", fmt.Errorf("nil instance") + } + if aws.StringValue(instance.State.Name) == "" { + return "", fmt.Errorf("nil or empty state") + } + + // get the instance name + for _, tag := range instance.Tags { + if aws.StringValue(tag.Key) == nameKey { + name = aws.StringValue(tag.Value) + } + } + // error if we didn't find one + if name == "" { + return "", fmt.Errorf("name not found") + } + + // build the input for the waiters + waitInput := &ec2.DescribeInstancesInput{ + InstanceIds: []*string{instance.InstanceId}, + Filters: []*ec2.Filter{ + { + Name: aws.String(nameTag), + Values: []*string{aws.String(name)}, + }, + }, + } + // When we are watching terminated instances and waiting for them to exist, + // we must exclude terminated instances from the waiter input. If we don't, + // the waiter will return even if it finds a terminated instance, which is + // not what we want. + existWaiterFilter := &ec2.Filter{ + Name: aws.String("instance-state-name"), + Values: []*string{ + aws.String(ec2.InstanceStateNameRunning), + aws.String(ec2.InstanceStateNameStopped), + }, + } + // Select the appropriate waiter based on the instance state. There are + // five possible states and we will catch every pertinent state change + // (excluding transitional states) by waiting for the next state in the + // instance's lifecycle. For more information about the lifecycle, see: + // http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html + switch aws.StringValue(instance.State.Name) { + case ec2.InstanceStateNameRunning, ec2.InstanceStateNameStopping: + err = c.WaitUntilInstanceStoppedWithContext(ctx, waitInput) + case ec2.InstanceStateNameStopped, ec2.InstanceStateNamePending: + err = c.WaitUntilInstanceRunningWithContext(ctx, waitInput) + case ec2.InstanceStateNameTerminated: + waitInput.Filters = append(waitInput.Filters, existWaiterFilter) + err = c.WaitUntilInstanceExistsWithContext(ctx, waitInput) + default: + return "", fmt.Errorf("unrecognized instance state: %s", aws.StringValue(instance.State.Name)) + } + if err != nil { + aerr, ok := err.(awserr.Error) + if !ok { + return "", errwrap.Wrapf(err, "error casting awserr") + } + // ignore these errors + if aerr.Code() != request.CanceledErrorCode && aerr.Code() != request.WaiterResourceNotReadyErrorCode { + return "", errwrap.Wrapf(err, "internal waiter error") + } + // If the waiter returns, because it has exceeded the maximum number of + // attempts we return an empty state, which the event processing loop + // ignores, and the longpollWatch goroutine will loop and restart + // the waiter. + if aerr.Message() == AwsErrExceededWaitAttempts { + return "", nil + } + } + + // return the instance state + instance, err = describeInstanceByName(c, name) + if err != nil { + return "", errwrap.Wrapf(err, "error describing instances") + } + return aws.StringValue(instance.State.Name), nil +} + // snsWatch uses amazon's SNS and CloudWatchEvents APIs to get instance state- // change notifications pushed to the http endpoint (snsServer) set up below. // In Init() a CloudWatch rule is created along with a corresponding SNS topic @@ -912,143 +930,84 @@ func (obj *AwsEc2Res) prependName() string { return AwsPrefix + obj.GetName() } -// longpollRunningWaiter waits for the instance to stop and waits for it to -// terminate. If either waiter returns, the instance state is checked, and an -// awsEc2Event is returned. -func longpollRunningWaiter(ctx context.Context, waitInput *ec2.DescribeInstancesInput, c *ec2.EC2) (awsEc2Event, error) { - if err := waitUntilInstanceStoppedOrTerminatedWithContext(ctx, waitInput, c); err != nil { - return awsEc2EventNone, errwrap.Wrapf(err, "error waiting for instance to stop or terminate") +// describeInstanceByName takes an ec2 client session and an instance name, and +// returns a *ec2.Instance or an error. +func describeInstanceByName(c *ec2.EC2, name string) (*ec2.Instance, error) { + // get any instance with the specified name, that isn't terminated. + diInput := &ec2.DescribeInstancesInput{ + Filters: []*ec2.Filter{ + { + Name: aws.String(nameTag), + Values: []*string{aws.String(name)}, + }, + { + Name: aws.String("instance-state-name"), + Values: []*string{ + aws.String(ec2.InstanceStateNameRunning), + aws.String(ec2.InstanceStateNamePending), + aws.String(ec2.InstanceStateNameStopped), + aws.String(ec2.InstanceStateNameStopping), + }, + }, + }, } - // Check the instance state, and return the appropriate event. - stateOutput, err := c.DescribeInstances(waitInput) + diOutput, err := c.DescribeInstances(diInput) if err != nil { - return awsEc2EventNone, errwrap.Wrapf(err, "error describing instances") + return nil, errwrap.Wrapf(err, "error describing instances") } - if len(stateOutput.Reservations) == 1 { - switch *stateOutput.Reservations[0].Instances[0].State.Name { - case "stopped": - return awsEc2EventInstanceStopped, nil - case "terminated": - return awsEc2EventInstanceTerminated, nil - } + + // error if we get more than one reservation. + if len(diOutput.Reservations) > 1 { + return nil, fmt.Errorf("too many reservations") } - return awsEc2EventNone, nil + // error if we got a reservation without exactly one instance. + if len(diOutput.Reservations) != 0 && len(diOutput.Reservations[0].Instances) != 1 { + return nil, fmt.Errorf("wrong number of instances") + } + + // if we didn't find an instance, we consider it 'terminated'. + if len(diOutput.Reservations) == 0 { + return &ec2.Instance{ + State: &ec2.InstanceState{ + Name: aws.String(ec2.InstanceStateNameTerminated), + }, + Tags: []*ec2.Tag{ + { + Key: aws.String(nameKey), + Value: aws.String(name), + }, + }, + }, nil + } + + return diOutput.Reservations[0].Instances[0], nil } -// longpollStoppedWaiter waits for the instance to be in a running state. When -// it returns, the instance state is checked, and an awsEc2Event is returned. -func longpollStoppedWaiter(ctx context.Context, waitInput *ec2.DescribeInstancesInput, c *ec2.EC2) (awsEc2Event, error) { - if err := c.WaitUntilInstanceRunningWithContext(ctx, waitInput); err != nil { - if aerr, ok := err.(awserr.Error); ok { - if aerr.Code() == request.CanceledErrorCode || aerr.Code() == request.WaiterResourceNotReadyErrorCode { - err = nil // we want to ignore these kinds of errors - } - } - return awsEc2EventNone, errwrap.Wrapf(err, "error waiting for instance to run") +// describeInstanceByID takes an ec2 client session and a pointer to an +// instanceID, and returns an *ec2.Instance or an error. +func describeInstanceByID(c *ec2.EC2, instanceID *string) (*ec2.Instance, error) { + if instanceID == nil { + return nil, fmt.Errorf("instanceID is nil") } - // Check the instance state, and return the appropriate event. - stateOutput, err := c.DescribeInstances(waitInput) + + // get any instance with the specified instanceID. + diInput := &ec2.DescribeInstancesInput{ + InstanceIds: []*string{instanceID}, + } + diOutput, err := c.DescribeInstances(diInput) if err != nil { - return awsEc2EventNone, errwrap.Wrapf(err, "error describing instances") + return nil, errwrap.Wrapf(err, "error describing instances") } - if len(stateOutput.Reservations) == 1 && *stateOutput.Reservations[0].Instances[0].State.Name == "running" { - return awsEc2EventInstanceRunning, nil + + // error if we didn't find exactly one reservation with one instance. + if len(diOutput.Reservations) != 1 { + return nil, fmt.Errorf("wrong number of reservations") } - return awsEc2EventNone, nil -} - -// longpollTerminatedWaiter waits for an instance matching the definition to -// exist. When it returns, if the instance exists, an awsEc2Event is returned. -func longpollTerminatedWaiter(ctx context.Context, waitInput *ec2.DescribeInstancesInput, c *ec2.EC2) (awsEc2Event, error) { - if err := c.WaitUntilInstanceExistsWithContext(ctx, waitInput); err != nil { - if aerr, ok := err.(awserr.Error); ok { - if aerr.Code() == request.CanceledErrorCode || aerr.Code() == request.WaiterResourceNotReadyErrorCode { - err = nil // we want to ignore these kinds of errors - } - } - return awsEc2EventNone, errwrap.Wrapf(err, "error waiting for instance to exist") + if len(diOutput.Reservations[0].Instances) != 1 { + return nil, fmt.Errorf("wrong number of instances") } - // Check the instance state, and return the appropriate event. - stateOutput, err := c.DescribeInstances(waitInput) - if err != nil { - return awsEc2EventNone, errwrap.Wrapf(err, "error describing instances") - } - if len(stateOutput.Reservations) != 0 { - return awsEc2EventInstanceExists, nil - } - return awsEc2EventNone, nil -} -// waitUntilInstanceStoppedOrTerminatedWithContext combines the two waiters -// required to trigger events if a running instance is stopped or terminated. -// This function is needed, because the AWS api only provides waiters for one -// or the other. -func waitUntilInstanceStoppedOrTerminatedWithContext(ctx context.Context, waitInput *ec2.DescribeInstancesInput, c *ec2.EC2) error { - errChan := make(chan error) - defer close(errChan) // unnecessary, but nice to have - wg := sync.WaitGroup{} - defer wg.Wait() - - closeChan := make(chan struct{}) - innerCtx, cancel := context.WithCancel(context.TODO()) // uncoupled!! - - once := &sync.Once{} - closer := func() { - cancel() - close(closeChan) - } - defer once.Do(closer) // needed if we exit below due to ctx being cancelled - - wg.Add(1) - go func() { - defer wg.Done() - defer once.Do(closer) - - err := c.WaitUntilInstanceStoppedWithContext(innerCtx, waitInput) - if err != nil { - if aerr, ok := err.(awserr.Error); ok { - if aerr.Code() == request.CanceledErrorCode || aerr.Code() == request.WaiterResourceNotReadyErrorCode { - err = nil // we want to ignore these kinds of errors - } - } - } - - select { - case errChan <- errwrap.Wrapf(err, "unknown error waiting for instance to stop"): - case <-closeChan: - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - defer once.Do(closer) - - err := c.WaitUntilInstanceTerminatedWithContext(innerCtx, waitInput) - if err != nil { - if aerr, ok := err.(awserr.Error); ok { - if aerr.Code() == request.CanceledErrorCode || aerr.Code() == request.WaiterResourceNotReadyErrorCode { - err = nil - } - } - } - - select { - case errChan <- errwrap.Wrapf(err, "unknown error waiting for instance to terminate"): - case <-closeChan: - } - }() - - select { - case err, ok := <-errChan: - if !ok { - return fmt.Errorf("channel closed unexpectedly") - } - return err // return either nil or an error - case <-ctx.Done(): // if ctx is canceled, we need to transmit that error - // TODO: should we instead use the aws context copy and request.CanceledErrorCode ? - return ctx.Err() - } + return diOutput.Reservations[0].Instances[0], nil } // snsListener returns a listener bound to listenAddr.