resources: aws: ec2: Refactor longpollWatch

This patch simplifies longpollwatch by getting rid of some unnecessary
api calls and breaking the waiters out into their own functions.
This commit is contained in:
Jonathan Gold
2017-12-06 07:15:19 -05:00
committed by James Shubin
parent 76fcb7a06e
commit 32e3c4e029

View File

@@ -400,6 +400,8 @@ func (obj *AwsEc2Res) longpollWatch() error {
defer obj.wg.Done() defer obj.wg.Done()
defer close(obj.awsChan) defer close(obj.awsChan)
for { for {
// get the instance that matches the name specified in
// the definition. Ignore terminated instances.
diInput := &ec2.DescribeInstancesInput{ diInput := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{ Filters: []*ec2.Filter{
{ {
@@ -428,31 +430,10 @@ func (obj *AwsEc2Res) longpollWatch() error {
return return
} }
if obj.State == "running" { if obj.State == "running" {
stoppedInput := &ec2.DescribeInstancesInput{ // Watch the instance if it isn't in a stopped
Filters: []*ec2.Filter{ // state. If we watch instances that are already
{ // stopped, we will send events continuously.
Name: aws.String("tag:Name"), if len(diOutput.Reservations) == 1 && *diOutput.Reservations[0].Instances[0].State.Name != "stopped" {
Values: []*string{aws.String(obj.prependName())},
},
{
Name: aws.String("instance-state-name"),
Values: []*string{
aws.String("stopped"),
},
},
},
}
stoppedOutput, err := obj.client.DescribeInstances(stoppedInput)
if err != nil {
select {
case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "error describing instances"),
}:
case <-obj.closeChan:
}
return
}
if len(diOutput.Reservations) == 1 && len(stoppedOutput.Reservations) == 0 {
waitInput := &ec2.DescribeInstancesInput{ waitInput := &ec2.DescribeInstancesInput{
InstanceIds: []*string{diOutput.Reservations[0].Instances[0].InstanceId}, InstanceIds: []*string{diOutput.Reservations[0].Instances[0].InstanceId},
Filters: []*ec2.Filter{ Filters: []*ec2.Filter{
@@ -483,36 +464,15 @@ func (obj *AwsEc2Res) longpollWatch() error {
event: event, event: event,
}: }:
case <-obj.closeChan: case <-obj.closeChan:
}
return return
} }
} }
}
if obj.State == "stopped" { if obj.State == "stopped" {
runningInput := &ec2.DescribeInstancesInput{ // Watch the instance if it isn't in a running
Filters: []*ec2.Filter{ // state. If we watch instances that are already
{ // running, we will send events continuously.
Name: aws.String("tag:Name"), if len(diOutput.Reservations) == 1 && *diOutput.Reservations[0].Instances[0].State.Name != "running" {
Values: []*string{aws.String(obj.prependName())},
},
{
Name: aws.String("instance-state-name"),
Values: []*string{
aws.String("running"),
},
},
},
}
runningOutput, err := obj.client.DescribeInstances(runningInput)
if err != nil {
select {
case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "error describing instances"),
}:
case <-obj.closeChan:
}
return
}
if len(diOutput.Reservations) == 1 && len(runningOutput.Reservations) == 0 {
waitInput := &ec2.DescribeInstancesInput{ waitInput := &ec2.DescribeInstancesInput{
InstanceIds: []*string{diOutput.Reservations[0].Instances[0].InstanceId}, InstanceIds: []*string{diOutput.Reservations[0].Instances[0].InstanceId},
Filters: []*ec2.Filter{ Filters: []*ec2.Filter{
@@ -523,88 +483,44 @@ func (obj *AwsEc2Res) longpollWatch() error {
}, },
} }
log.Printf("%s: watching: %s", obj, *diOutput.Reservations[0].Instances[0].InstanceId) log.Printf("%s: watching: %s", obj, *diOutput.Reservations[0].Instances[0].InstanceId)
if err := obj.client.WaitUntilInstanceRunningWithContext(ctx, waitInput); err != nil { event, err := longpollStoppedWaiter(ctx, waitInput, obj.client)
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == request.CanceledErrorCode {
log.Printf("%s: Request cancelled", obj)
}
if aerr.Code() == request.WaiterResourceNotReadyErrorCode {
continue
}
}
select {
case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "unknown error waiting for instance to start"),
}:
case <-obj.closeChan:
}
return
}
stateOutput, err := obj.client.DescribeInstances(diInput)
if err != nil { if err != nil {
select { select {
case obj.awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "error describing instances"), err: errwrap.Wrapf(err, "unknown waiter error"),
}: }:
case <-obj.closeChan: case <-obj.closeChan:
} }
return return
} }
var stateName string
if len(stateOutput.Reservations) == 1 {
stateName = *stateOutput.Reservations[0].Instances[0].State.Name
}
if len(stateOutput.Reservations) == 0 || (len(stateOutput.Reservations) == 1 && stateName != "stopped") {
select { select {
case obj.awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
event: awsEc2EventInstanceRunning, event: event,
}: }:
case <-obj.closeChan: case <-obj.closeChan:
return return
} }
} }
} }
}
if obj.State == "terminated" { if obj.State == "terminated" {
if err := obj.client.WaitUntilInstanceExistsWithContext(ctx, diInput); err != nil { event, err := longpollTerminatedWaiter(ctx, diInput, obj.client)
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == request.CanceledErrorCode {
log.Printf("%s: Request cancelled", obj)
}
if aerr.Code() == request.WaiterResourceNotReadyErrorCode {
continue
}
}
select {
case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "unknown error waiting for instance to exist"),
}:
case <-obj.closeChan:
}
return
}
stateOutput, err := obj.client.DescribeInstances(diInput)
if err != nil { if err != nil {
select { select {
case obj.awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "error describing instances"), err: errwrap.Wrapf(err, "unknown waiter error"),
}: }:
case <-obj.closeChan: case <-obj.closeChan:
} }
return return
} }
if len(stateOutput.Reservations) == 1 {
{
select { select {
case obj.awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
event: awsEc2EventInstanceExists, event: event,
}: }:
case <-obj.closeChan: case <-obj.closeChan:
return return
} }
} }
}
}
select { select {
case <-obj.closeChan: case <-obj.closeChan:
return return
@@ -1019,6 +935,50 @@ func longpollRunningWaiter(ctx context.Context, waitInput *ec2.DescribeInstances
return awsEc2EventNone, nil return awsEc2EventNone, nil
} }
// longpollStoppedWaiter waits for the instance to be in a running state. When
// it returns, the instance state is checked, and an awsEc2Event is returned.
func longpollStoppedWaiter(ctx context.Context, waitInput *ec2.DescribeInstancesInput, c *ec2.EC2) (awsEc2Event, error) {
if err := c.WaitUntilInstanceRunningWithContext(ctx, waitInput); err != nil {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == request.CanceledErrorCode || aerr.Code() == request.WaiterResourceNotReadyErrorCode {
err = nil // we want to ignore these kinds of errors
}
}
return awsEc2EventNone, errwrap.Wrapf(err, "error waiting for instance to run")
}
// Check the instance state, and return the appropriate event.
stateOutput, err := c.DescribeInstances(waitInput)
if err != nil {
return awsEc2EventNone, errwrap.Wrapf(err, "error describing instances")
}
if len(stateOutput.Reservations) == 1 && *stateOutput.Reservations[0].Instances[0].State.Name == "running" {
return awsEc2EventInstanceRunning, nil
}
return awsEc2EventNone, nil
}
// longpollTerminatedWaiter waits for an instance matching the definition to
// exist. When it returns, if the instance exists, an awsEc2Event is returned.
func longpollTerminatedWaiter(ctx context.Context, waitInput *ec2.DescribeInstancesInput, c *ec2.EC2) (awsEc2Event, error) {
if err := c.WaitUntilInstanceExistsWithContext(ctx, waitInput); err != nil {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == request.CanceledErrorCode || aerr.Code() == request.WaiterResourceNotReadyErrorCode {
err = nil // we want to ignore these kinds of errors
}
}
return awsEc2EventNone, errwrap.Wrapf(err, "error waiting for instance to exist")
}
// Check the instance state, and return the appropriate event.
stateOutput, err := c.DescribeInstances(waitInput)
if err != nil {
return awsEc2EventNone, errwrap.Wrapf(err, "error describing instances")
}
if len(stateOutput.Reservations) != 0 {
return awsEc2EventInstanceExists, nil
}
return awsEc2EventNone, nil
}
// waitUntilInstanceStoppedOrTerminatedWithContext combines the two waiters // waitUntilInstanceStoppedOrTerminatedWithContext combines the two waiters
// required to trigger events if a running instance is stopped or terminated. // required to trigger events if a running instance is stopped or terminated.
// This function is needed, because the AWS api only provides waiters for one // This function is needed, because the AWS api only provides waiters for one