From c2a5e3f5d89bdb6e09092d0e5a539eedb6507921 Mon Sep 17 00:00:00 2001 From: Jonathan Gold Date: Wed, 8 Nov 2017 16:15:39 -0500 Subject: [PATCH] resources: aws: ec2: Move watch channels into struct --- resources/aws_ec2.go | 66 +++++++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/resources/aws_ec2.go b/resources/aws_ec2.go index a0423cd1..3a700ca2 100644 --- a/resources/aws_ec2.go +++ b/resources/aws_ec2.go @@ -82,6 +82,9 @@ type AwsEc2Res struct { UserData string `yaml:"userdata"` client *ec2.EC2 // client session for AWS API calls + + awsChan chan *chanStruct // channel used to send events and errors to Watch() + closeChan chan struct{} // channel used to cancel context when it's time to shut down } // chanStruct defines the type for a channel used to pass events and errors to watch. @@ -162,6 +165,9 @@ func (obj *AwsEc2Res) Init() error { } obj.client = ec2.New(sess) + obj.awsChan = make(chan *chanStruct) + obj.closeChan = make(chan struct{}) + return obj.BaseRes.Init() // call base init, b/c we're overriding } @@ -177,15 +183,13 @@ func (obj *AwsEc2Res) longpollWatch() error { if err := obj.Running(); err != nil { return err } - awsChan := make(chan *chanStruct) - closeChan := make(chan struct{}) ctx, cancel := context.WithCancel(context.TODO()) wg := &sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() select { - case <-closeChan: + case <-obj.closeChan: cancel() } }() @@ -193,7 +197,7 @@ func (obj *AwsEc2Res) longpollWatch() error { defer wg.Wait() go func() { defer wg.Done() - defer close(awsChan) + defer close(obj.awsChan) for { diInput := &ec2.DescribeInstancesInput{ Filters: []*ec2.Filter{ @@ -215,10 +219,10 @@ func (obj *AwsEc2Res) longpollWatch() error { diOutput, err := obj.client.DescribeInstances(diInput) if err != nil { select { - case awsChan <- &chanStruct{ + case obj.awsChan <- &chanStruct{ err: errwrap.Wrapf(err, "error describing instances"), }: - case <-closeChan: + case <-obj.closeChan: } return } @@ -240,10 +244,10 @@ func (obj *AwsEc2Res) longpollWatch() error { stoppedOutput, err := obj.client.DescribeInstances(stoppedInput) if err != nil { select { - case awsChan <- &chanStruct{ + case obj.awsChan <- &chanStruct{ err: errwrap.Wrapf(err, "error describing instances"), }: - case <-closeChan: + case <-obj.closeChan: } return } @@ -268,20 +272,20 @@ func (obj *AwsEc2Res) longpollWatch() error { } } select { - case awsChan <- &chanStruct{ + case obj.awsChan <- &chanStruct{ err: errwrap.Wrapf(err, "unknown error waiting for instance to stop"), }: - case <-closeChan: + case <-obj.closeChan: } return } stateOutput, err := obj.client.DescribeInstances(diInput) if err != nil { select { - case awsChan <- &chanStruct{ + case obj.awsChan <- &chanStruct{ err: errwrap.Wrapf(err, "error describing instances"), }: - case <-closeChan: + case <-obj.closeChan: } return } @@ -291,10 +295,10 @@ func (obj *AwsEc2Res) longpollWatch() error { } if len(stateOutput.Reservations) == 0 || (len(stateOutput.Reservations) == 1 && stateName != "running") { select { - case awsChan <- &chanStruct{ + case obj.awsChan <- &chanStruct{ str: "stopped", }: - case <-closeChan: + case <-obj.closeChan: return } } @@ -318,10 +322,10 @@ func (obj *AwsEc2Res) longpollWatch() error { runningOutput, err := obj.client.DescribeInstances(runningInput) if err != nil { select { - case awsChan <- &chanStruct{ + case obj.awsChan <- &chanStruct{ err: errwrap.Wrapf(err, "error describing instances"), }: - case <-closeChan: + case <-obj.closeChan: } return } @@ -343,20 +347,20 @@ func (obj *AwsEc2Res) longpollWatch() error { } } select { - case awsChan <- &chanStruct{ + case obj.awsChan <- &chanStruct{ err: errwrap.Wrapf(err, "unknown error waiting for instance to start"), }: - case <-closeChan: + case <-obj.closeChan: } return } stateOutput, err := obj.client.DescribeInstances(diInput) if err != nil { select { - case awsChan <- &chanStruct{ + case obj.awsChan <- &chanStruct{ err: errwrap.Wrapf(err, "error describing instances"), }: - case <-closeChan: + case <-obj.closeChan: } return } @@ -366,10 +370,10 @@ func (obj *AwsEc2Res) longpollWatch() error { } if len(stateOutput.Reservations) == 0 || (len(stateOutput.Reservations) == 1 && stateName != "stopped") { select { - case awsChan <- &chanStruct{ + case obj.awsChan <- &chanStruct{ str: "running", }: - case <-closeChan: + case <-obj.closeChan: return } } @@ -383,37 +387,37 @@ func (obj *AwsEc2Res) longpollWatch() error { } } select { - case awsChan <- &chanStruct{ + case obj.awsChan <- &chanStruct{ err: errwrap.Wrapf(err, "unknown error waiting for instance to exist"), }: - case <-closeChan: + case <-obj.closeChan: } return } stateOutput, err := obj.client.DescribeInstances(diInput) if err != nil { select { - case awsChan <- &chanStruct{ + case obj.awsChan <- &chanStruct{ err: errwrap.Wrapf(err, "error describing instances"), }: - case <-closeChan: + case <-obj.closeChan: } return } if len(stateOutput.Reservations) == 1 { { select { - case awsChan <- &chanStruct{ + case obj.awsChan <- &chanStruct{ str: "exists", }: - case <-closeChan: + case <-obj.closeChan: return } } } } select { - case <-closeChan: + case <-obj.closeChan: return default: } @@ -423,10 +427,10 @@ func (obj *AwsEc2Res) longpollWatch() error { select { case event := <-obj.Events(): if exit, send = obj.ReadEvent(event); exit != nil { - close(closeChan) + close(obj.closeChan) return *exit } - case msg, ok := <-awsChan: + case msg, ok := <-obj.awsChan: if !ok { return *exit }