diff --git a/resources/aws_ec2.go b/resources/aws_ec2.go index d8a08877..828f165b 100644 --- a/resources/aws_ec2.go +++ b/resources/aws_ec2.go @@ -33,6 +33,7 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" + cwe "github.com/aws/aws-sdk-go/service/cloudwatchevents" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/sns" multierr "github.com/hashicorp/go-multierror" @@ -58,6 +59,18 @@ const ( SnsSubscriptionProto = "http" // SnsServerShutdownTimeout is the maximum number of seconds to wait for the http server to shutdown gracefully. SnsServerShutdownTimeout = 30 + // CwePrefix gets prepended onto the cloudwatch rule name. + CwePrefix = Ec2Prefix + "cw-" + // CweRuleName is the name of the rule created by makeCloudWatchRule. + CweRuleName = CwePrefix + "state" + // CweRuleSource describes the resource type to monitor for cloudwatch events. + CweRuleSource = "aws.ec2" + // CweRuleDetailType describes the specific type of events to trigger cloudwatch. + CweRuleDetailType = "EC2 Instance State-change Notification" + // CweTargetID is used to tell cloudwatch events to target the sns service. + CweTargetID = "sns" + // CweTargetJSON is the json field that cloudwatch will send to our endpoint so we don't get more than we need. + CweTargetJSON = "$.detail" // waitTimeout is the duration in seconds of the timeout context in CheckApply. waitTimeout = 400 ) @@ -120,6 +133,8 @@ type AwsEc2Res struct { // so we save it here when we create the topic instead. snsTopicArn string + cweClient *cwe.CloudWatchEvents // client for AWS CloudWatchEvents API calls + awsChan chan *chanStruct // channel used to send events and errors to Watch() closeChan chan struct{} // channel used to cancel context when it's time to shut down wg *sync.WaitGroup // waitgroup for goroutines in Watch() @@ -131,6 +146,18 @@ type chanStruct struct { err error } +// cloudWatchRule denotes the structure of cloudwatch rules. +type cloudWatchRule struct { + Source []string `json:"source"` + DetailType []string `json:"detail-type"` + Detail ruleDetail `json:"detail"` +} + +// ruleDetail is the structure of the detail field in cloudWatchRule. +type ruleDetail struct { + State []string `json:"state"` +} + // postData is the format of the messages received and decoded by snsPostHandler(). type postData struct { Type string `json:"Type"` @@ -238,6 +265,30 @@ func (obj *AwsEc2Res) Init() error { } // save the topicArn for later use obj.snsTopicArn = snsTopicArn + + // make cloudwatch client + cweSess, err := session.NewSession(&aws.Config{ + Region: aws.String(obj.Region), + }) + if err != nil { + return errwrap.Wrapf(err, "error creating cwe session") + } + obj.cweClient = cwe.New(cweSess) + // make the cloudwatch rule event pattern + // CweRuleDetail describes the instance states that will trigger events. + CweRuleDetail := []string{"running", "stopped", "terminated"} + eventPattern, err := obj.cweMakeEventPattern(CweRuleSource, CweRuleDetailType, CweRuleDetail) + if err != nil { + return err + } + // make the cloudwatch rule + if err := obj.cweMakeRule(CweRuleName, eventPattern); err != nil { + return errwrap.Wrapf(err, "error making cloudwatch rule") + } + // target cloudwatch rule to sns topic + if err := obj.cweTargetRule(obj.snsTopicArn, CweTargetID, CweTargetJSON, CweRuleName); err != nil { + return errwrap.Wrapf(err, "error targeting cloudwatch rule") + } } return obj.BaseRes.Init() // call base init, b/c we're overriding @@ -522,8 +573,11 @@ func (obj *AwsEc2Res) longpollWatch() error { } } -// snsWatch uses amazon cloudwatch events and simple notification service to -// detect ec2 instance state changes. +// snsWatch uses amazon's SNS and CloudWatchEvents APIs to get instance state- +// change notifications pushed to the http endpoint (snsServer) set up below. +// In Init() a CloudWatch rule is created along with a corresponding SNS topic +// that it can publish to. snsWatch creates an http server which listens for +// messages published to the topic and processes them accordingly. func (obj *AwsEc2Res) snsWatch() error { send := false var exit *error @@ -974,6 +1028,83 @@ func (obj *AwsEc2Res) snsConfirmSubscription(topicArn string, token string) erro return nil } +// cweMakeEventPattern makes and encodes event patterns for cloudwatch rules. +func (obj *AwsEc2Res) cweMakeEventPattern(source, detailType string, detail []string) (string, error) { + pattern := cloudWatchRule{ + Source: []string{source}, + DetailType: []string{detailType}, + Detail: ruleDetail{ + State: detail, + }, + } + eventPattern, err := json.Marshal(pattern) + if err != nil { + return "", err + } + return string(eventPattern), nil +} + +// cweMakeRule makes a cloud watch rule. +func (obj *AwsEc2Res) cweMakeRule(name, eventPattern string) error { + // make cloudwatch rule + putRuleInput := &cwe.PutRuleInput{ + Name: aws.String(name), + EventPattern: aws.String(eventPattern), + } + if _, err := obj.cweClient.PutRule(putRuleInput); err != nil { + return err + } + log.Printf("%s: Created CloudWatch Rule", obj) + return nil +} + +// cweDeleteRule deletes the cloudwatch rule. +func (obj *AwsEc2Res) cweDeleteRule(name string) error { + // delete the rule + drInput := &cwe.DeleteRuleInput{ + Name: aws.String(name), + } + log.Printf("%s: Deleting CloudWatch Rule", obj) + if _, err := obj.cweClient.DeleteRule(drInput); err != nil { + return errwrap.Wrapf(err, "error deleting cloudwatch rule") + } + return nil +} + +// cweTargetRule configures cloudwatch to send events to sns topic. +func (obj *AwsEc2Res) cweTargetRule(topicArn, targetID, inputPath, ruleName string) error { + // target the rule to sns topic + target := &cwe.Target{ + Arn: aws.String(topicArn), + Id: aws.String(targetID), + InputPath: aws.String(inputPath), + } + putTargetInput := &cwe.PutTargetsInput{ + Rule: aws.String(ruleName), + Targets: []*cwe.Target{target}, + } + _, err := obj.cweClient.PutTargets(putTargetInput) + if err != nil { + return errwrap.Wrapf(err, "error putting cloudwatch target") + } + log.Printf("%s: Targeted SNS Topic", obj) + return nil +} + +// cweRemoveTarget removes the sns target from the cloudwatch rule. +func (obj *AwsEc2Res) cweRemoveTarget(targetID, ruleName string) error { + // remove the target + rtInput := &cwe.RemoveTargetsInput{ + Ids: []*string{aws.String(targetID)}, + Rule: aws.String(ruleName), + } + log.Printf("%s: Removing Target", obj) + if _, err := obj.cweClient.RemoveTargets(rtInput); err != nil { + return errwrap.Wrapf(err, "error removing cloudwatch target") + } + return nil +} + // Close cleans up when we're done. This is needed to delete some of the AWS // objects created for the SNS endpoint. func (obj *AwsEc2Res) Close() error { @@ -984,6 +1115,14 @@ func (obj *AwsEc2Res) Close() error { if err := obj.snsDeleteTopic(obj.snsTopicArn); err != nil { errList = multierr.Append(errList, err) } + // remove the target + if err := obj.cweRemoveTarget(CweTargetID, CweRuleName); err != nil { + errList = multierr.Append(errList, err) + } + // delete the cloudwatch rule + if err := obj.cweDeleteRule(CweRuleName); err != nil { + errList = multierr.Append(errList, err) + } } if err := obj.BaseRes.Close(); err != nil { errList = multierr.Append(errList, err) // list of errors