diff --git a/resources/aws_ec2.go b/resources/aws_ec2.go index 6dd9adf2..3990f229 100644 --- a/resources/aws_ec2.go +++ b/resources/aws_ec2.go @@ -52,6 +52,8 @@ const ( SnsPrefix = Ec2Prefix + "sns-" // SnsTopicName is the name of the sns topic created by snsMakeTopic. SnsTopicName = SnsPrefix + "events" + // SnsServerShutdownTimeout is the maximum number of seconds to wait for the http server to shutdown gracefully. + SnsServerShutdownTimeout = 30 // waitTimeout is the duration in seconds of the timeout context in CheckApply. waitTimeout = 400 ) @@ -212,6 +214,9 @@ func (obj *AwsEc2Res) Init() error { // Watch is the primary listener for this resource and it outputs events. func (obj *AwsEc2Res) Watch() error { + if obj.WatchListenAddr != "" { + return obj.snsWatch() + } return obj.longpollWatch() } @@ -486,6 +491,73 @@ func (obj *AwsEc2Res) longpollWatch() error { } } +// snsWatch uses amazon cloudwatch events and simple notification service to +// detect ec2 instance state changes. +func (obj *AwsEc2Res) snsWatch() error { + send := false + var exit *error + defer obj.wg.Wait() + defer close(obj.closeChan) + // set up the sns endpoint + snsServer := obj.snsServer() + // shutdown the sns endpoint when we're done + defer func() { + ctx, cancel := context.WithTimeout(context.TODO(), SnsServerShutdownTimeout*time.Second) + defer cancel() + if err := snsServer.Shutdown(ctx); err != nil { + if err != context.Canceled { + log.Printf("%s: error stopping sns endpoint: %s", obj, err) + return + } + log.Printf("%s: sns server shutdown cancelled", obj) + } + }() + obj.wg.Add(1) + // start the endpoint + go func() { + defer obj.wg.Done() + defer close(obj.awsChan) + if err := snsServer.ListenAndServe(); err != nil { + // when we shut down + if err == http.ErrServerClosed { + log.Printf("%s: Stopped SNS Endpoint", obj) + return + } + // any other error + select { + case obj.awsChan <- &chanStruct{ + err: errwrap.Wrapf(err, "sns server error"), + }: + case <-obj.closeChan: + } + } + }() + log.Printf("%s: Started SNS Endpoint", obj) + // process events + for { + select { + case event := <-obj.Events(): + if exit, send = obj.ReadEvent(event); exit != nil { + return *exit + } + case msg, ok := <-obj.awsChan: + if !ok { + return *exit + } + if err := msg.err; err != nil { + return err + } + log.Printf("%s: State: %s", obj, msg.str) + obj.StateOK(false) + send = true + } + if send { + send = false + obj.Event() + } + } +} + // CheckApply method for AwsEc2 resource. func (obj *AwsEc2Res) CheckApply(apply bool) (checkOK bool, err error) { log.Printf("%s: CheckApply(%t)", obj, apply)