resources: aws: ec2: Add CloudWatch rule and target SNS
This patch creates the cloudwatch rule that detects ec2 instance state changes, and targets the rule to publish on our sns topic which, in turn, pushes those event notifications to our endpoint.
This commit is contained in:
committed by
James Shubin
parent
1162485c2c
commit
dcd23fcf75
@@ -33,6 +33,7 @@ import (
|
|||||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||||
"github.com/aws/aws-sdk-go/aws/request"
|
"github.com/aws/aws-sdk-go/aws/request"
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
|
cwe "github.com/aws/aws-sdk-go/service/cloudwatchevents"
|
||||||
"github.com/aws/aws-sdk-go/service/ec2"
|
"github.com/aws/aws-sdk-go/service/ec2"
|
||||||
"github.com/aws/aws-sdk-go/service/sns"
|
"github.com/aws/aws-sdk-go/service/sns"
|
||||||
multierr "github.com/hashicorp/go-multierror"
|
multierr "github.com/hashicorp/go-multierror"
|
||||||
@@ -58,6 +59,18 @@ const (
|
|||||||
SnsSubscriptionProto = "http"
|
SnsSubscriptionProto = "http"
|
||||||
// SnsServerShutdownTimeout is the maximum number of seconds to wait for the http server to shutdown gracefully.
|
// SnsServerShutdownTimeout is the maximum number of seconds to wait for the http server to shutdown gracefully.
|
||||||
SnsServerShutdownTimeout = 30
|
SnsServerShutdownTimeout = 30
|
||||||
|
// CwePrefix gets prepended onto the cloudwatch rule name.
|
||||||
|
CwePrefix = Ec2Prefix + "cw-"
|
||||||
|
// CweRuleName is the name of the rule created by makeCloudWatchRule.
|
||||||
|
CweRuleName = CwePrefix + "state"
|
||||||
|
// CweRuleSource describes the resource type to monitor for cloudwatch events.
|
||||||
|
CweRuleSource = "aws.ec2"
|
||||||
|
// CweRuleDetailType describes the specific type of events to trigger cloudwatch.
|
||||||
|
CweRuleDetailType = "EC2 Instance State-change Notification"
|
||||||
|
// CweTargetID is used to tell cloudwatch events to target the sns service.
|
||||||
|
CweTargetID = "sns"
|
||||||
|
// CweTargetJSON is the json field that cloudwatch will send to our endpoint so we don't get more than we need.
|
||||||
|
CweTargetJSON = "$.detail"
|
||||||
// waitTimeout is the duration in seconds of the timeout context in CheckApply.
|
// waitTimeout is the duration in seconds of the timeout context in CheckApply.
|
||||||
waitTimeout = 400
|
waitTimeout = 400
|
||||||
)
|
)
|
||||||
@@ -120,6 +133,8 @@ type AwsEc2Res struct {
|
|||||||
// so we save it here when we create the topic instead.
|
// so we save it here when we create the topic instead.
|
||||||
snsTopicArn string
|
snsTopicArn string
|
||||||
|
|
||||||
|
cweClient *cwe.CloudWatchEvents // client for AWS CloudWatchEvents API calls
|
||||||
|
|
||||||
awsChan chan *chanStruct // channel used to send events and errors to Watch()
|
awsChan chan *chanStruct // channel used to send events and errors to Watch()
|
||||||
closeChan chan struct{} // channel used to cancel context when it's time to shut down
|
closeChan chan struct{} // channel used to cancel context when it's time to shut down
|
||||||
wg *sync.WaitGroup // waitgroup for goroutines in Watch()
|
wg *sync.WaitGroup // waitgroup for goroutines in Watch()
|
||||||
@@ -131,6 +146,18 @@ type chanStruct struct {
|
|||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cloudWatchRule denotes the structure of cloudwatch rules.
|
||||||
|
type cloudWatchRule struct {
|
||||||
|
Source []string `json:"source"`
|
||||||
|
DetailType []string `json:"detail-type"`
|
||||||
|
Detail ruleDetail `json:"detail"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ruleDetail is the structure of the detail field in cloudWatchRule.
|
||||||
|
type ruleDetail struct {
|
||||||
|
State []string `json:"state"`
|
||||||
|
}
|
||||||
|
|
||||||
// postData is the format of the messages received and decoded by snsPostHandler().
|
// postData is the format of the messages received and decoded by snsPostHandler().
|
||||||
type postData struct {
|
type postData struct {
|
||||||
Type string `json:"Type"`
|
Type string `json:"Type"`
|
||||||
@@ -238,6 +265,30 @@ func (obj *AwsEc2Res) Init() error {
|
|||||||
}
|
}
|
||||||
// save the topicArn for later use
|
// save the topicArn for later use
|
||||||
obj.snsTopicArn = snsTopicArn
|
obj.snsTopicArn = snsTopicArn
|
||||||
|
|
||||||
|
// make cloudwatch client
|
||||||
|
cweSess, err := session.NewSession(&aws.Config{
|
||||||
|
Region: aws.String(obj.Region),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return errwrap.Wrapf(err, "error creating cwe session")
|
||||||
|
}
|
||||||
|
obj.cweClient = cwe.New(cweSess)
|
||||||
|
// make the cloudwatch rule event pattern
|
||||||
|
// CweRuleDetail describes the instance states that will trigger events.
|
||||||
|
CweRuleDetail := []string{"running", "stopped", "terminated"}
|
||||||
|
eventPattern, err := obj.cweMakeEventPattern(CweRuleSource, CweRuleDetailType, CweRuleDetail)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// make the cloudwatch rule
|
||||||
|
if err := obj.cweMakeRule(CweRuleName, eventPattern); err != nil {
|
||||||
|
return errwrap.Wrapf(err, "error making cloudwatch rule")
|
||||||
|
}
|
||||||
|
// target cloudwatch rule to sns topic
|
||||||
|
if err := obj.cweTargetRule(obj.snsTopicArn, CweTargetID, CweTargetJSON, CweRuleName); err != nil {
|
||||||
|
return errwrap.Wrapf(err, "error targeting cloudwatch rule")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return obj.BaseRes.Init() // call base init, b/c we're overriding
|
return obj.BaseRes.Init() // call base init, b/c we're overriding
|
||||||
@@ -522,8 +573,11 @@ func (obj *AwsEc2Res) longpollWatch() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// snsWatch uses amazon cloudwatch events and simple notification service to
|
// snsWatch uses amazon's SNS and CloudWatchEvents APIs to get instance state-
|
||||||
// detect ec2 instance state changes.
|
// change notifications pushed to the http endpoint (snsServer) set up below.
|
||||||
|
// In Init() a CloudWatch rule is created along with a corresponding SNS topic
|
||||||
|
// that it can publish to. snsWatch creates an http server which listens for
|
||||||
|
// messages published to the topic and processes them accordingly.
|
||||||
func (obj *AwsEc2Res) snsWatch() error {
|
func (obj *AwsEc2Res) snsWatch() error {
|
||||||
send := false
|
send := false
|
||||||
var exit *error
|
var exit *error
|
||||||
@@ -974,6 +1028,83 @@ func (obj *AwsEc2Res) snsConfirmSubscription(topicArn string, token string) erro
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cweMakeEventPattern makes and encodes event patterns for cloudwatch rules.
|
||||||
|
func (obj *AwsEc2Res) cweMakeEventPattern(source, detailType string, detail []string) (string, error) {
|
||||||
|
pattern := cloudWatchRule{
|
||||||
|
Source: []string{source},
|
||||||
|
DetailType: []string{detailType},
|
||||||
|
Detail: ruleDetail{
|
||||||
|
State: detail,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
eventPattern, err := json.Marshal(pattern)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return string(eventPattern), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// cweMakeRule makes a cloud watch rule.
|
||||||
|
func (obj *AwsEc2Res) cweMakeRule(name, eventPattern string) error {
|
||||||
|
// make cloudwatch rule
|
||||||
|
putRuleInput := &cwe.PutRuleInput{
|
||||||
|
Name: aws.String(name),
|
||||||
|
EventPattern: aws.String(eventPattern),
|
||||||
|
}
|
||||||
|
if _, err := obj.cweClient.PutRule(putRuleInput); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Printf("%s: Created CloudWatch Rule", obj)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// cweDeleteRule deletes the cloudwatch rule.
|
||||||
|
func (obj *AwsEc2Res) cweDeleteRule(name string) error {
|
||||||
|
// delete the rule
|
||||||
|
drInput := &cwe.DeleteRuleInput{
|
||||||
|
Name: aws.String(name),
|
||||||
|
}
|
||||||
|
log.Printf("%s: Deleting CloudWatch Rule", obj)
|
||||||
|
if _, err := obj.cweClient.DeleteRule(drInput); err != nil {
|
||||||
|
return errwrap.Wrapf(err, "error deleting cloudwatch rule")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// cweTargetRule configures cloudwatch to send events to sns topic.
|
||||||
|
func (obj *AwsEc2Res) cweTargetRule(topicArn, targetID, inputPath, ruleName string) error {
|
||||||
|
// target the rule to sns topic
|
||||||
|
target := &cwe.Target{
|
||||||
|
Arn: aws.String(topicArn),
|
||||||
|
Id: aws.String(targetID),
|
||||||
|
InputPath: aws.String(inputPath),
|
||||||
|
}
|
||||||
|
putTargetInput := &cwe.PutTargetsInput{
|
||||||
|
Rule: aws.String(ruleName),
|
||||||
|
Targets: []*cwe.Target{target},
|
||||||
|
}
|
||||||
|
_, err := obj.cweClient.PutTargets(putTargetInput)
|
||||||
|
if err != nil {
|
||||||
|
return errwrap.Wrapf(err, "error putting cloudwatch target")
|
||||||
|
}
|
||||||
|
log.Printf("%s: Targeted SNS Topic", obj)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// cweRemoveTarget removes the sns target from the cloudwatch rule.
|
||||||
|
func (obj *AwsEc2Res) cweRemoveTarget(targetID, ruleName string) error {
|
||||||
|
// remove the target
|
||||||
|
rtInput := &cwe.RemoveTargetsInput{
|
||||||
|
Ids: []*string{aws.String(targetID)},
|
||||||
|
Rule: aws.String(ruleName),
|
||||||
|
}
|
||||||
|
log.Printf("%s: Removing Target", obj)
|
||||||
|
if _, err := obj.cweClient.RemoveTargets(rtInput); err != nil {
|
||||||
|
return errwrap.Wrapf(err, "error removing cloudwatch target")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Close cleans up when we're done. This is needed to delete some of the AWS
|
// Close cleans up when we're done. This is needed to delete some of the AWS
|
||||||
// objects created for the SNS endpoint.
|
// objects created for the SNS endpoint.
|
||||||
func (obj *AwsEc2Res) Close() error {
|
func (obj *AwsEc2Res) Close() error {
|
||||||
@@ -984,6 +1115,14 @@ func (obj *AwsEc2Res) Close() error {
|
|||||||
if err := obj.snsDeleteTopic(obj.snsTopicArn); err != nil {
|
if err := obj.snsDeleteTopic(obj.snsTopicArn); err != nil {
|
||||||
errList = multierr.Append(errList, err)
|
errList = multierr.Append(errList, err)
|
||||||
}
|
}
|
||||||
|
// remove the target
|
||||||
|
if err := obj.cweRemoveTarget(CweTargetID, CweRuleName); err != nil {
|
||||||
|
errList = multierr.Append(errList, err)
|
||||||
|
}
|
||||||
|
// delete the cloudwatch rule
|
||||||
|
if err := obj.cweDeleteRule(CweRuleName); err != nil {
|
||||||
|
errList = multierr.Append(errList, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err := obj.BaseRes.Close(); err != nil {
|
if err := obj.BaseRes.Close(); err != nil {
|
||||||
errList = multierr.Append(errList, err) // list of errors
|
errList = multierr.Append(errList, err) // list of errors
|
||||||
|
|||||||
Reference in New Issue
Block a user