diff --git a/resources/aws_ec2.go b/resources/aws_ec2.go index 275e1bc7..d8a08877 100644 --- a/resources/aws_ec2.go +++ b/resources/aws_ec2.go @@ -20,8 +20,8 @@ package resources import ( "context" "encoding/base64" + "encoding/json" "fmt" - "io/ioutil" "log" "net" "net/http" @@ -53,6 +53,9 @@ const ( SnsPrefix = Ec2Prefix + "sns-" // SnsTopicName is the name of the sns topic created by snsMakeTopic. SnsTopicName = SnsPrefix + "events" + // SnsSubscriptionProto is used to tell sns that the subscriber uses the http protocol. + // TODO: add https support + SnsSubscriptionProto = "http" // SnsServerShutdownTimeout is the maximum number of seconds to wait for the http server to shutdown gracefully. SnsServerShutdownTimeout = 30 // waitTimeout is the duration in seconds of the timeout context in CheckApply. @@ -65,7 +68,7 @@ const ( type awsEc2Event uint8 const ( - awsEc2EventServerReady awsEc2Event = iota + awsEc2EventWatchReady awsEc2Event = iota awsEc2EventInstanceStopped awsEc2EventInstanceRunning awsEc2EventInstanceExists @@ -103,6 +106,7 @@ type AwsEc2Res struct { Type string `yaml:"type"` // type of ec2 instance, eg: t2.micro ImageID string `yaml:"imageid"` // imageid must be available on the chosen region + WatchEndpoint string `yaml:"watchendpoint"` // the public url of the sns endpoint, eg: http://server:12345/ WatchListenAddr string `yaml:"watchlistenaddr"` // the local address or port that the sns listens on, eg: 10.0.0.0:23456 or 23456 // UserData is used to run bash and cloud-init commands on first launch. // See http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/user-data.html @@ -127,6 +131,13 @@ type chanStruct struct { err error } +// postData is the format of the messages received and decoded by snsPostHandler(). +type postData struct { + Type string `json:"Type"` + Token string `json:"Token"` + Message string `json:"Message"` +} + // Default returns some sensible defaults for this resource. func (obj *AwsEc2Res) Default() Res { return &AwsEc2Res{ @@ -185,6 +196,13 @@ func (obj *AwsEc2Res) Validate() error { return fmt.Errorf("imageid must be a valid ami available in the specified region") } + if obj.WatchEndpoint == "" && obj.WatchListenAddr != "" { + return fmt.Errorf("you must set watchendpoint with watchlistenaddr to use http watch") + } + if obj.WatchEndpoint != "" && obj.WatchListenAddr == "" { + return fmt.Errorf("you must set watchendpoint with watchlistenaddr to use http watch") + } + return obj.BaseRes.Validate() } @@ -554,6 +572,16 @@ func (obj *AwsEc2Res) snsWatch() error { } }() log.Printf("%s: Started SNS Endpoint", obj) + // Subscribing the endpoint to the topic needs to happen after starting + // the http server, so that the server can process the subscription + // confirmation. We won't drop incoming connections from aws by this + // point, because we've already opened the server listener. In the + // worst case scenario the incoming aws connections will be accepted + // but will block until our http server finishes getting ready in + // its goroutine. + if err := obj.snsSubscribe(obj.WatchEndpoint, obj.snsTopicArn); err != nil { + return errwrap.Wrapf(err, "error subscribing to sns topic") + } // process events for { select { @@ -568,6 +596,16 @@ func (obj *AwsEc2Res) snsWatch() error { if err := msg.err; err != nil { return err } + // snsPostHandler sends the ready message after the + // subscription is confirmed. Once the subscription + // is confirmed, we are ready to receive events, so we + // can notify the engine that we're running. + if msg.event == awsEc2EventWatchReady { + if err := obj.Running(); err != nil { + return err + } + continue + } log.Printf("%s: State: %v", obj, msg.event) obj.StateOK(false) send = true @@ -781,6 +819,9 @@ func (obj *AwsEc2Res) Compare(r Res) bool { if obj.UserData != res.UserData { return false } + if obj.WatchEndpoint != res.WatchEndpoint { + return false + } if obj.WatchListenAddr != res.WatchListenAddr { return false } @@ -831,9 +872,42 @@ func (obj *AwsEc2Res) snsPostHandler(w http.ResponseWriter, req *http.Request) { http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) return } - post, _ := ioutil.ReadAll(req.Body) - if obj.debug { - log.Printf("%s: Post: %s", obj, string(post)) + // decode json + decoder := json.NewDecoder(req.Body) + var post postData + if err := decoder.Decode(&post); err != nil { + http.Error(w, "Bad request", http.StatusBadRequest) + select { + case obj.awsChan <- &chanStruct{ + err: errwrap.Wrapf(err, "error decoding incoming POST, check struct formatting"), + }: + case <-obj.closeChan: + } + return + } + if post.Type == "SubscriptionConfirmation" { + if err := obj.snsConfirmSubscription(obj.snsTopicArn, post.Token); err != nil { + select { + case obj.awsChan <- &chanStruct{ + err: errwrap.Wrapf(err, "error confirming subscription"), + }: + case <-obj.closeChan: + } + return + } + // Now that the subscription is confirmed, we can tell the + // engine we're running. If there is a delay between making the + // request and the subscription actually being confirmed, + // amazon will retry sending any new messages every 20 seconds + // for one minute. So, we won't miss any events. See the + // following for more details: + // http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html#SendMessageToHttp.retry + select { + case obj.awsChan <- &chanStruct{ + event: awsEc2EventWatchReady, + }: + case <-obj.closeChan: + } } } @@ -867,13 +941,46 @@ func (obj *AwsEc2Res) snsDeleteTopic(topicArn string) error { return nil } +// snsSubscribe subscribes the endpoint to the sns topic. +// Returning SubscriptionArn here is useless as it is still pending confirmation. +func (obj *AwsEc2Res) snsSubscribe(endpoint string, topicArn string) error { + // subscribe to the topic + subInput := &sns.SubscribeInput{ + Endpoint: aws.String(endpoint), + Protocol: aws.String(SnsSubscriptionProto), + TopicArn: aws.String(topicArn), + } + _, err := obj.snsClient.Subscribe(subInput) + if err != nil { + return err + } + log.Printf("%s: Created Subscription", obj) + return nil +} + +// snsConfirmSubscription confirms the sns subscription. +// Returning SubscriptionArn here is useless as it is still pending confirmation. +func (obj *AwsEc2Res) snsConfirmSubscription(topicArn string, token string) error { + // confirm the subscription + csInput := &sns.ConfirmSubscriptionInput{ + Token: aws.String(token), + TopicArn: aws.String(topicArn), + } + _, err := obj.snsClient.ConfirmSubscription(csInput) + if err != nil { + return err + } + log.Printf("%s: Subscription Confirmed", obj) + return nil +} + // Close cleans up when we're done. This is needed to delete some of the AWS // objects created for the SNS endpoint. func (obj *AwsEc2Res) Close() error { var errList error // clean up sns objects created by Init/snsWatch if obj.snsClient != nil { - // delete the topic + // delete the topic and associated subscriptions if err := obj.snsDeleteTopic(obj.snsTopicArn); err != nil { errList = multierr.Append(errList, err) }