resources: aws: ec2: Subscribe SNS endpoint to topic

This patch adds methods to subscribe and confirm the subscription
to the sns topic.
This commit is contained in:
Jonathan Gold
2017-11-09 16:56:05 -05:00
committed by James Shubin
parent 966172eac6
commit 1162485c2c

View File

@@ -20,8 +20,8 @@ package resources
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
@@ -53,6 +53,9 @@ const (
SnsPrefix = Ec2Prefix + "sns-"
// SnsTopicName is the name of the sns topic created by snsMakeTopic.
SnsTopicName = SnsPrefix + "events"
// SnsSubscriptionProto is used to tell sns that the subscriber uses the http protocol.
// TODO: add https support
SnsSubscriptionProto = "http"
// SnsServerShutdownTimeout is the maximum number of seconds to wait for the http server to shutdown gracefully.
SnsServerShutdownTimeout = 30
// waitTimeout is the duration in seconds of the timeout context in CheckApply.
@@ -65,7 +68,7 @@ const (
type awsEc2Event uint8
const (
awsEc2EventServerReady awsEc2Event = iota
awsEc2EventWatchReady awsEc2Event = iota
awsEc2EventInstanceStopped
awsEc2EventInstanceRunning
awsEc2EventInstanceExists
@@ -103,6 +106,7 @@ type AwsEc2Res struct {
Type string `yaml:"type"` // type of ec2 instance, eg: t2.micro
ImageID string `yaml:"imageid"` // imageid must be available on the chosen region
WatchEndpoint string `yaml:"watchendpoint"` // the public url of the sns endpoint, eg: http://server:12345/
WatchListenAddr string `yaml:"watchlistenaddr"` // the local address or port that the sns listens on, eg: 10.0.0.0:23456 or 23456
// UserData is used to run bash and cloud-init commands on first launch.
// See http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/user-data.html
@@ -127,6 +131,13 @@ type chanStruct struct {
err error
}
// postData is the format of the messages received and decoded by snsPostHandler().
type postData struct {
Type string `json:"Type"`
Token string `json:"Token"`
Message string `json:"Message"`
}
// Default returns some sensible defaults for this resource.
func (obj *AwsEc2Res) Default() Res {
return &AwsEc2Res{
@@ -185,6 +196,13 @@ func (obj *AwsEc2Res) Validate() error {
return fmt.Errorf("imageid must be a valid ami available in the specified region")
}
if obj.WatchEndpoint == "" && obj.WatchListenAddr != "" {
return fmt.Errorf("you must set watchendpoint with watchlistenaddr to use http watch")
}
if obj.WatchEndpoint != "" && obj.WatchListenAddr == "" {
return fmt.Errorf("you must set watchendpoint with watchlistenaddr to use http watch")
}
return obj.BaseRes.Validate()
}
@@ -554,6 +572,16 @@ func (obj *AwsEc2Res) snsWatch() error {
}
}()
log.Printf("%s: Started SNS Endpoint", obj)
// Subscribing the endpoint to the topic needs to happen after starting
// the http server, so that the server can process the subscription
// confirmation. We won't drop incoming connections from aws by this
// point, because we've already opened the server listener. In the
// worst case scenario the incoming aws connections will be accepted
// but will block until our http server finishes getting ready in
// its goroutine.
if err := obj.snsSubscribe(obj.WatchEndpoint, obj.snsTopicArn); err != nil {
return errwrap.Wrapf(err, "error subscribing to sns topic")
}
// process events
for {
select {
@@ -568,6 +596,16 @@ func (obj *AwsEc2Res) snsWatch() error {
if err := msg.err; err != nil {
return err
}
// snsPostHandler sends the ready message after the
// subscription is confirmed. Once the subscription
// is confirmed, we are ready to receive events, so we
// can notify the engine that we're running.
if msg.event == awsEc2EventWatchReady {
if err := obj.Running(); err != nil {
return err
}
continue
}
log.Printf("%s: State: %v", obj, msg.event)
obj.StateOK(false)
send = true
@@ -781,6 +819,9 @@ func (obj *AwsEc2Res) Compare(r Res) bool {
if obj.UserData != res.UserData {
return false
}
if obj.WatchEndpoint != res.WatchEndpoint {
return false
}
if obj.WatchListenAddr != res.WatchListenAddr {
return false
}
@@ -831,9 +872,42 @@ func (obj *AwsEc2Res) snsPostHandler(w http.ResponseWriter, req *http.Request) {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
return
}
post, _ := ioutil.ReadAll(req.Body)
if obj.debug {
log.Printf("%s: Post: %s", obj, string(post))
// decode json
decoder := json.NewDecoder(req.Body)
var post postData
if err := decoder.Decode(&post); err != nil {
http.Error(w, "Bad request", http.StatusBadRequest)
select {
case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "error decoding incoming POST, check struct formatting"),
}:
case <-obj.closeChan:
}
return
}
if post.Type == "SubscriptionConfirmation" {
if err := obj.snsConfirmSubscription(obj.snsTopicArn, post.Token); err != nil {
select {
case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "error confirming subscription"),
}:
case <-obj.closeChan:
}
return
}
// Now that the subscription is confirmed, we can tell the
// engine we're running. If there is a delay between making the
// request and the subscription actually being confirmed,
// amazon will retry sending any new messages every 20 seconds
// for one minute. So, we won't miss any events. See the
// following for more details:
// http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html#SendMessageToHttp.retry
select {
case obj.awsChan <- &chanStruct{
event: awsEc2EventWatchReady,
}:
case <-obj.closeChan:
}
}
}
@@ -867,13 +941,46 @@ func (obj *AwsEc2Res) snsDeleteTopic(topicArn string) error {
return nil
}
// snsSubscribe subscribes the endpoint to the sns topic.
// Returning SubscriptionArn here is useless as it is still pending confirmation.
func (obj *AwsEc2Res) snsSubscribe(endpoint string, topicArn string) error {
// subscribe to the topic
subInput := &sns.SubscribeInput{
Endpoint: aws.String(endpoint),
Protocol: aws.String(SnsSubscriptionProto),
TopicArn: aws.String(topicArn),
}
_, err := obj.snsClient.Subscribe(subInput)
if err != nil {
return err
}
log.Printf("%s: Created Subscription", obj)
return nil
}
// snsConfirmSubscription confirms the sns subscription.
// Returning SubscriptionArn here is useless as it is still pending confirmation.
func (obj *AwsEc2Res) snsConfirmSubscription(topicArn string, token string) error {
// confirm the subscription
csInput := &sns.ConfirmSubscriptionInput{
Token: aws.String(token),
TopicArn: aws.String(topicArn),
}
_, err := obj.snsClient.ConfirmSubscription(csInput)
if err != nil {
return err
}
log.Printf("%s: Subscription Confirmed", obj)
return nil
}
// Close cleans up when we're done. This is needed to delete some of the AWS
// objects created for the SNS endpoint.
func (obj *AwsEc2Res) Close() error {
var errList error
// clean up sns objects created by Init/snsWatch
if obj.snsClient != nil {
// delete the topic
// delete the topic and associated subscriptions
if err := obj.snsDeleteTopic(obj.snsTopicArn); err != nil {
errList = multierr.Append(errList, err)
}