resources: aws: ec2: Move watch channels into struct

This commit is contained in:
Jonathan Gold
2017-11-08 16:15:39 -05:00
parent db49fe85e4
commit c2a5e3f5d8

View File

@@ -82,6 +82,9 @@ type AwsEc2Res struct {
UserData string `yaml:"userdata"` UserData string `yaml:"userdata"`
client *ec2.EC2 // client session for AWS API calls client *ec2.EC2 // client session for AWS API calls
awsChan chan *chanStruct // channel used to send events and errors to Watch()
closeChan chan struct{} // channel used to cancel context when it's time to shut down
} }
// chanStruct defines the type for a channel used to pass events and errors to watch. // chanStruct defines the type for a channel used to pass events and errors to watch.
@@ -162,6 +165,9 @@ func (obj *AwsEc2Res) Init() error {
} }
obj.client = ec2.New(sess) obj.client = ec2.New(sess)
obj.awsChan = make(chan *chanStruct)
obj.closeChan = make(chan struct{})
return obj.BaseRes.Init() // call base init, b/c we're overriding return obj.BaseRes.Init() // call base init, b/c we're overriding
} }
@@ -177,15 +183,13 @@ func (obj *AwsEc2Res) longpollWatch() error {
if err := obj.Running(); err != nil { if err := obj.Running(); err != nil {
return err return err
} }
awsChan := make(chan *chanStruct)
closeChan := make(chan struct{})
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(context.TODO())
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
select { select {
case <-closeChan: case <-obj.closeChan:
cancel() cancel()
} }
}() }()
@@ -193,7 +197,7 @@ func (obj *AwsEc2Res) longpollWatch() error {
defer wg.Wait() defer wg.Wait()
go func() { go func() {
defer wg.Done() defer wg.Done()
defer close(awsChan) defer close(obj.awsChan)
for { for {
diInput := &ec2.DescribeInstancesInput{ diInput := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{ Filters: []*ec2.Filter{
@@ -215,10 +219,10 @@ func (obj *AwsEc2Res) longpollWatch() error {
diOutput, err := obj.client.DescribeInstances(diInput) diOutput, err := obj.client.DescribeInstances(diInput)
if err != nil { if err != nil {
select { select {
case awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "error describing instances"), err: errwrap.Wrapf(err, "error describing instances"),
}: }:
case <-closeChan: case <-obj.closeChan:
} }
return return
} }
@@ -240,10 +244,10 @@ func (obj *AwsEc2Res) longpollWatch() error {
stoppedOutput, err := obj.client.DescribeInstances(stoppedInput) stoppedOutput, err := obj.client.DescribeInstances(stoppedInput)
if err != nil { if err != nil {
select { select {
case awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "error describing instances"), err: errwrap.Wrapf(err, "error describing instances"),
}: }:
case <-closeChan: case <-obj.closeChan:
} }
return return
} }
@@ -268,20 +272,20 @@ func (obj *AwsEc2Res) longpollWatch() error {
} }
} }
select { select {
case awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "unknown error waiting for instance to stop"), err: errwrap.Wrapf(err, "unknown error waiting for instance to stop"),
}: }:
case <-closeChan: case <-obj.closeChan:
} }
return return
} }
stateOutput, err := obj.client.DescribeInstances(diInput) stateOutput, err := obj.client.DescribeInstances(diInput)
if err != nil { if err != nil {
select { select {
case awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "error describing instances"), err: errwrap.Wrapf(err, "error describing instances"),
}: }:
case <-closeChan: case <-obj.closeChan:
} }
return return
} }
@@ -291,10 +295,10 @@ func (obj *AwsEc2Res) longpollWatch() error {
} }
if len(stateOutput.Reservations) == 0 || (len(stateOutput.Reservations) == 1 && stateName != "running") { if len(stateOutput.Reservations) == 0 || (len(stateOutput.Reservations) == 1 && stateName != "running") {
select { select {
case awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
str: "stopped", str: "stopped",
}: }:
case <-closeChan: case <-obj.closeChan:
return return
} }
} }
@@ -318,10 +322,10 @@ func (obj *AwsEc2Res) longpollWatch() error {
runningOutput, err := obj.client.DescribeInstances(runningInput) runningOutput, err := obj.client.DescribeInstances(runningInput)
if err != nil { if err != nil {
select { select {
case awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "error describing instances"), err: errwrap.Wrapf(err, "error describing instances"),
}: }:
case <-closeChan: case <-obj.closeChan:
} }
return return
} }
@@ -343,20 +347,20 @@ func (obj *AwsEc2Res) longpollWatch() error {
} }
} }
select { select {
case awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "unknown error waiting for instance to start"), err: errwrap.Wrapf(err, "unknown error waiting for instance to start"),
}: }:
case <-closeChan: case <-obj.closeChan:
} }
return return
} }
stateOutput, err := obj.client.DescribeInstances(diInput) stateOutput, err := obj.client.DescribeInstances(diInput)
if err != nil { if err != nil {
select { select {
case awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "error describing instances"), err: errwrap.Wrapf(err, "error describing instances"),
}: }:
case <-closeChan: case <-obj.closeChan:
} }
return return
} }
@@ -366,10 +370,10 @@ func (obj *AwsEc2Res) longpollWatch() error {
} }
if len(stateOutput.Reservations) == 0 || (len(stateOutput.Reservations) == 1 && stateName != "stopped") { if len(stateOutput.Reservations) == 0 || (len(stateOutput.Reservations) == 1 && stateName != "stopped") {
select { select {
case awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
str: "running", str: "running",
}: }:
case <-closeChan: case <-obj.closeChan:
return return
} }
} }
@@ -383,37 +387,37 @@ func (obj *AwsEc2Res) longpollWatch() error {
} }
} }
select { select {
case awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "unknown error waiting for instance to exist"), err: errwrap.Wrapf(err, "unknown error waiting for instance to exist"),
}: }:
case <-closeChan: case <-obj.closeChan:
} }
return return
} }
stateOutput, err := obj.client.DescribeInstances(diInput) stateOutput, err := obj.client.DescribeInstances(diInput)
if err != nil { if err != nil {
select { select {
case awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "error describing instances"), err: errwrap.Wrapf(err, "error describing instances"),
}: }:
case <-closeChan: case <-obj.closeChan:
} }
return return
} }
if len(stateOutput.Reservations) == 1 { if len(stateOutput.Reservations) == 1 {
{ {
select { select {
case awsChan <- &chanStruct{ case obj.awsChan <- &chanStruct{
str: "exists", str: "exists",
}: }:
case <-closeChan: case <-obj.closeChan:
return return
} }
} }
} }
} }
select { select {
case <-closeChan: case <-obj.closeChan:
return return
default: default:
} }
@@ -423,10 +427,10 @@ func (obj *AwsEc2Res) longpollWatch() error {
select { select {
case event := <-obj.Events(): case event := <-obj.Events():
if exit, send = obj.ReadEvent(event); exit != nil { if exit, send = obj.ReadEvent(event); exit != nil {
close(closeChan) close(obj.closeChan)
return *exit return *exit
} }
case msg, ok := <-awsChan: case msg, ok := <-obj.awsChan:
if !ok { if !ok {
return *exit return *exit
} }