From 32e3c4e029e6ace755f4d71f97c693a42778d640 Mon Sep 17 00:00:00 2001 From: Jonathan Gold Date: Wed, 6 Dec 2017 07:15:19 -0500 Subject: [PATCH] resources: aws: ec2: Refactor longpollWatch This patch simplifies longpollwatch by getting rid of some unnecessary api calls and breaking the waiters out into their own functions. --- resources/aws_ec2.go | 182 +++++++++++++++++-------------------------- 1 file changed, 71 insertions(+), 111 deletions(-) diff --git a/resources/aws_ec2.go b/resources/aws_ec2.go index dc8c50ac..c68470e9 100644 --- a/resources/aws_ec2.go +++ b/resources/aws_ec2.go @@ -400,6 +400,8 @@ func (obj *AwsEc2Res) longpollWatch() error { defer obj.wg.Done() defer close(obj.awsChan) for { + // get the instance that matches the name specified in + // the definition. Ignore terminated instances. diInput := &ec2.DescribeInstancesInput{ Filters: []*ec2.Filter{ { @@ -428,31 +430,10 @@ func (obj *AwsEc2Res) longpollWatch() error { return } if obj.State == "running" { - stoppedInput := &ec2.DescribeInstancesInput{ - Filters: []*ec2.Filter{ - { - Name: aws.String("tag:Name"), - Values: []*string{aws.String(obj.prependName())}, - }, - { - Name: aws.String("instance-state-name"), - Values: []*string{ - aws.String("stopped"), - }, - }, - }, - } - stoppedOutput, err := obj.client.DescribeInstances(stoppedInput) - if err != nil { - select { - case obj.awsChan <- &chanStruct{ - err: errwrap.Wrapf(err, "error describing instances"), - }: - case <-obj.closeChan: - } - return - } - if len(diOutput.Reservations) == 1 && len(stoppedOutput.Reservations) == 0 { + // Watch the instance if it isn't in a stopped + // state. If we watch instances that are already + // stopped, we will send events continuously. + if len(diOutput.Reservations) == 1 && *diOutput.Reservations[0].Instances[0].State.Name != "stopped" { waitInput := &ec2.DescribeInstancesInput{ InstanceIds: []*string{diOutput.Reservations[0].Instances[0].InstanceId}, Filters: []*ec2.Filter{ @@ -483,36 +464,15 @@ func (obj *AwsEc2Res) longpollWatch() error { event: event, }: case <-obj.closeChan: + return } - return } } if obj.State == "stopped" { - runningInput := &ec2.DescribeInstancesInput{ - Filters: []*ec2.Filter{ - { - Name: aws.String("tag:Name"), - Values: []*string{aws.String(obj.prependName())}, - }, - { - Name: aws.String("instance-state-name"), - Values: []*string{ - aws.String("running"), - }, - }, - }, - } - runningOutput, err := obj.client.DescribeInstances(runningInput) - if err != nil { - select { - case obj.awsChan <- &chanStruct{ - err: errwrap.Wrapf(err, "error describing instances"), - }: - case <-obj.closeChan: - } - return - } - if len(diOutput.Reservations) == 1 && len(runningOutput.Reservations) == 0 { + // Watch the instance if it isn't in a running + // state. If we watch instances that are already + // running, we will send events continuously. + if len(diOutput.Reservations) == 1 && *diOutput.Reservations[0].Instances[0].State.Name != "running" { waitInput := &ec2.DescribeInstancesInput{ InstanceIds: []*string{diOutput.Reservations[0].Instances[0].InstanceId}, Filters: []*ec2.Filter{ @@ -523,86 +483,42 @@ func (obj *AwsEc2Res) longpollWatch() error { }, } log.Printf("%s: watching: %s", obj, *diOutput.Reservations[0].Instances[0].InstanceId) - if err := obj.client.WaitUntilInstanceRunningWithContext(ctx, waitInput); err != nil { - if aerr, ok := err.(awserr.Error); ok { - if aerr.Code() == request.CanceledErrorCode { - log.Printf("%s: Request cancelled", obj) - } - if aerr.Code() == request.WaiterResourceNotReadyErrorCode { - continue - } - } - select { - case obj.awsChan <- &chanStruct{ - err: errwrap.Wrapf(err, "unknown error waiting for instance to start"), - }: - case <-obj.closeChan: - } - return - } - stateOutput, err := obj.client.DescribeInstances(diInput) + event, err := longpollStoppedWaiter(ctx, waitInput, obj.client) if err != nil { select { case obj.awsChan <- &chanStruct{ - err: errwrap.Wrapf(err, "error describing instances"), + err: errwrap.Wrapf(err, "unknown waiter error"), }: case <-obj.closeChan: } return } - var stateName string - if len(stateOutput.Reservations) == 1 { - stateName = *stateOutput.Reservations[0].Instances[0].State.Name - } - if len(stateOutput.Reservations) == 0 || (len(stateOutput.Reservations) == 1 && stateName != "stopped") { - select { - case obj.awsChan <- &chanStruct{ - event: awsEc2EventInstanceRunning, - }: - case <-obj.closeChan: - return - } + select { + case obj.awsChan <- &chanStruct{ + event: event, + }: + case <-obj.closeChan: + return } } } if obj.State == "terminated" { - if err := obj.client.WaitUntilInstanceExistsWithContext(ctx, diInput); err != nil { - if aerr, ok := err.(awserr.Error); ok { - if aerr.Code() == request.CanceledErrorCode { - log.Printf("%s: Request cancelled", obj) - } - if aerr.Code() == request.WaiterResourceNotReadyErrorCode { - continue - } - } - select { - case obj.awsChan <- &chanStruct{ - err: errwrap.Wrapf(err, "unknown error waiting for instance to exist"), - }: - case <-obj.closeChan: - } - return - } - stateOutput, err := obj.client.DescribeInstances(diInput) + event, err := longpollTerminatedWaiter(ctx, diInput, obj.client) if err != nil { select { case obj.awsChan <- &chanStruct{ - err: errwrap.Wrapf(err, "error describing instances"), + err: errwrap.Wrapf(err, "unknown waiter error"), }: case <-obj.closeChan: } return } - if len(stateOutput.Reservations) == 1 { - { - select { - case obj.awsChan <- &chanStruct{ - event: awsEc2EventInstanceExists, - }: - case <-obj.closeChan: - return - } - } + select { + case obj.awsChan <- &chanStruct{ + event: event, + }: + case <-obj.closeChan: + return } } select { @@ -1019,6 +935,50 @@ func longpollRunningWaiter(ctx context.Context, waitInput *ec2.DescribeInstances return awsEc2EventNone, nil } +// longpollStoppedWaiter waits for the instance to be in a running state. When +// it returns, the instance state is checked, and an awsEc2Event is returned. +func longpollStoppedWaiter(ctx context.Context, waitInput *ec2.DescribeInstancesInput, c *ec2.EC2) (awsEc2Event, error) { + if err := c.WaitUntilInstanceRunningWithContext(ctx, waitInput); err != nil { + if aerr, ok := err.(awserr.Error); ok { + if aerr.Code() == request.CanceledErrorCode || aerr.Code() == request.WaiterResourceNotReadyErrorCode { + err = nil // we want to ignore these kinds of errors + } + } + return awsEc2EventNone, errwrap.Wrapf(err, "error waiting for instance to run") + } + // Check the instance state, and return the appropriate event. + stateOutput, err := c.DescribeInstances(waitInput) + if err != nil { + return awsEc2EventNone, errwrap.Wrapf(err, "error describing instances") + } + if len(stateOutput.Reservations) == 1 && *stateOutput.Reservations[0].Instances[0].State.Name == "running" { + return awsEc2EventInstanceRunning, nil + } + return awsEc2EventNone, nil +} + +// longpollTerminatedWaiter waits for an instance matching the definition to +// exist. When it returns, if the instance exists, an awsEc2Event is returned. +func longpollTerminatedWaiter(ctx context.Context, waitInput *ec2.DescribeInstancesInput, c *ec2.EC2) (awsEc2Event, error) { + if err := c.WaitUntilInstanceExistsWithContext(ctx, waitInput); err != nil { + if aerr, ok := err.(awserr.Error); ok { + if aerr.Code() == request.CanceledErrorCode || aerr.Code() == request.WaiterResourceNotReadyErrorCode { + err = nil // we want to ignore these kinds of errors + } + } + return awsEc2EventNone, errwrap.Wrapf(err, "error waiting for instance to exist") + } + // Check the instance state, and return the appropriate event. + stateOutput, err := c.DescribeInstances(waitInput) + if err != nil { + return awsEc2EventNone, errwrap.Wrapf(err, "error describing instances") + } + if len(stateOutput.Reservations) != 0 { + return awsEc2EventInstanceExists, nil + } + return awsEc2EventNone, nil +} + // waitUntilInstanceStoppedOrTerminatedWithContext combines the two waiters // required to trigger events if a running instance is stopped or terminated. // This function is needed, because the AWS api only provides waiters for one