resources: aws: ec2: Start and stop SNS endpoint in snsWatch

This patch adds snsWatch which launches the HTTP server and listens
for messages on awsChan to forward as events to the mgmt engine.
This commit is contained in:
Jonathan Gold
2017-11-09 22:33:13 -05:00
parent 91eff75288
commit d698b82a83

View File

@@ -52,6 +52,8 @@ const (
SnsPrefix = Ec2Prefix + "sns-"
// SnsTopicName is the name of the sns topic created by snsMakeTopic.
SnsTopicName = SnsPrefix + "events"
// SnsServerShutdownTimeout is the maximum number of seconds to wait for the http server to shutdown gracefully.
SnsServerShutdownTimeout = 30
// waitTimeout is the duration in seconds of the timeout context in CheckApply.
waitTimeout = 400
)
@@ -212,6 +214,9 @@ func (obj *AwsEc2Res) Init() error {
// Watch is the primary listener for this resource and it outputs events.
func (obj *AwsEc2Res) Watch() error {
if obj.WatchListenAddr != "" {
return obj.snsWatch()
}
return obj.longpollWatch()
}
@@ -486,6 +491,73 @@ func (obj *AwsEc2Res) longpollWatch() error {
}
}
// snsWatch uses amazon cloudwatch events and simple notification service to
// detect ec2 instance state changes.
func (obj *AwsEc2Res) snsWatch() error {
send := false
var exit *error
defer obj.wg.Wait()
defer close(obj.closeChan)
// set up the sns endpoint
snsServer := obj.snsServer()
// shutdown the sns endpoint when we're done
defer func() {
ctx, cancel := context.WithTimeout(context.TODO(), SnsServerShutdownTimeout*time.Second)
defer cancel()
if err := snsServer.Shutdown(ctx); err != nil {
if err != context.Canceled {
log.Printf("%s: error stopping sns endpoint: %s", obj, err)
return
}
log.Printf("%s: sns server shutdown cancelled", obj)
}
}()
obj.wg.Add(1)
// start the endpoint
go func() {
defer obj.wg.Done()
defer close(obj.awsChan)
if err := snsServer.ListenAndServe(); err != nil {
// when we shut down
if err == http.ErrServerClosed {
log.Printf("%s: Stopped SNS Endpoint", obj)
return
}
// any other error
select {
case obj.awsChan <- &chanStruct{
err: errwrap.Wrapf(err, "sns server error"),
}:
case <-obj.closeChan:
}
}
}()
log.Printf("%s: Started SNS Endpoint", obj)
// process events
for {
select {
case event := <-obj.Events():
if exit, send = obj.ReadEvent(event); exit != nil {
return *exit
}
case msg, ok := <-obj.awsChan:
if !ok {
return *exit
}
if err := msg.err; err != nil {
return err
}
log.Printf("%s: State: %s", obj, msg.str)
obj.StateOK(false)
send = true
}
if send {
send = false
obj.Event()
}
}
}
// CheckApply method for AwsEc2 resource.
func (obj *AwsEc2Res) CheckApply(apply bool) (checkOK bool, err error) {
log.Printf("%s: CheckApply(%t)", obj, apply)