resources: aws: ec2: Update postHandler to process messages

This commit is contained in:
Jonathan Gold
2017-11-04 12:38:08 -04:00
committed by James Shubin
parent 8cd3f28734
commit fac004b774

View File

@@ -91,9 +91,11 @@ const (
type awsEc2Event uint8 type awsEc2Event uint8
const ( const (
awsEc2EventWatchReady awsEc2Event = iota awsEc2EventNone awsEc2Event = iota
awsEc2EventWatchReady
awsEc2EventInstanceStopped awsEc2EventInstanceStopped
awsEc2EventInstanceRunning awsEc2EventInstanceRunning
awsEc2EventInstanceTerminated
awsEc2EventInstanceExists awsEc2EventInstanceExists
) )
@@ -202,6 +204,12 @@ type postData struct {
Message string `json:"Message"` Message string `json:"Message"`
} }
// postMsg is used to unmarshal the postData message if it's an event notification.
type postMsg struct {
InstanceID string `json:"instance-id"`
State string `json:"state"`
}
// Default returns some sensible defaults for this resource. // Default returns some sensible defaults for this resource.
func (obj *AwsEc2Res) Default() Res { func (obj *AwsEc2Res) Default() Res {
return &AwsEc2Res{ return &AwsEc2Res{
@@ -963,6 +971,7 @@ func (obj *AwsEc2Res) snsListener(listenAddr string) (net.Listener, error) {
} }
// snsPostHandler listens for posts on the SNS Endpoint. // snsPostHandler listens for posts on the SNS Endpoint.
// TODO: download pem and check message against signature
func (obj *AwsEc2Res) snsPostHandler(w http.ResponseWriter, req *http.Request) { func (obj *AwsEc2Res) snsPostHandler(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" { if req.Method != "POST" {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
@@ -1005,6 +1014,28 @@ func (obj *AwsEc2Res) snsPostHandler(w http.ResponseWriter, req *http.Request) {
case <-obj.closeChan: case <-obj.closeChan:
} }
} }
// process cloudwatch event notifications.
if post.Type == "Notification" {
event, err := obj.snsProcessEvent(post.Message, obj.prependName())
if err != nil {
select {
case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "error confirming subscription"),
}:
case <-obj.closeChan:
}
return
}
if event == awsEc2EventNone {
return
}
select {
case obj.awsChan <- &chanStruct{
event: event,
}:
case <-obj.closeChan:
}
}
} }
// snsMakeTopic creates a topic on aws sns. // snsMakeTopic creates a topic on aws sns.
@@ -1070,6 +1101,43 @@ func (obj *AwsEc2Res) snsConfirmSubscription(topicArn string, token string) erro
return nil return nil
} }
// snsProcessEvents unmarshals instance state-change notifications and, if the
// event matches the instance we are watching, returns an awsEc2Event.
func (obj *AwsEc2Res) snsProcessEvent(message, instanceName string) (awsEc2Event, error) {
// unmarshal the message
var msg postMsg
if err := json.Unmarshal([]byte(message), &msg); err != nil {
return awsEc2EventNone, err
}
// check if the instance id in the message matches the name of the
// instance we're watching
diInput := &ec2.DescribeInstancesInput{
InstanceIds: []*string{aws.String(msg.InstanceID)},
Filters: []*ec2.Filter{
{
Name: aws.String("tag:Name"),
Values: []*string{aws.String(instanceName)},
},
},
}
diOutput, err := obj.client.DescribeInstances(diInput)
if err != nil {
return awsEc2EventNone, err
}
// return the appropriate awsEc2Event
if len(diOutput.Reservations) != 0 {
switch msg.State {
case "running":
return awsEc2EventInstanceRunning, nil
case "stopped":
return awsEc2EventInstanceStopped, nil
case "terminated":
return awsEc2EventInstanceTerminated, nil
}
}
return awsEc2EventNone, nil
}
// snsAuthorize adds the necessary permission for cloudwatch to publish to the SNS topic. // snsAuthorize adds the necessary permission for cloudwatch to publish to the SNS topic.
func (obj *AwsEc2Res) snsAuthorizeCloudWatch(topicArn string) error { func (obj *AwsEc2Res) snsAuthorizeCloudWatch(topicArn string) error {
// get the topic attributes, including the security policy // get the topic attributes, including the security policy