resources: aws: ec2: Use custom listener for snsServer
This patch replaces the call to Server.ListenAndServe() with Server.Serve(listener) in order to make sure the listener is up and running before we subscribe to the topic in a future patch.
This commit is contained in:
committed by
James Shubin
parent
12fce52cd7
commit
966172eac6
@@ -23,6 +23,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -510,9 +511,17 @@ func (obj *AwsEc2Res) snsWatch() error {
|
|||||||
var exit *error
|
var exit *error
|
||||||
defer obj.wg.Wait()
|
defer obj.wg.Wait()
|
||||||
defer close(obj.closeChan)
|
defer close(obj.closeChan)
|
||||||
// set up the sns endpoint
|
// create the sns listener
|
||||||
snsServer := obj.snsServer()
|
// closing is handled by http.Server.Shutdown in the defer func below
|
||||||
// shutdown the sns endpoint when we're done
|
listener, err := obj.snsListener(obj.WatchListenAddr)
|
||||||
|
if err != nil {
|
||||||
|
return errwrap.Wrapf(err, "error creating listener")
|
||||||
|
}
|
||||||
|
// set up the sns server
|
||||||
|
snsServer := &http.Server{
|
||||||
|
Handler: http.HandlerFunc(obj.snsPostHandler),
|
||||||
|
}
|
||||||
|
// close the listener and shutdown the sns server when we're done
|
||||||
defer func() {
|
defer func() {
|
||||||
ctx, cancel := context.WithTimeout(context.TODO(), SnsServerShutdownTimeout*time.Second)
|
ctx, cancel := context.WithTimeout(context.TODO(), SnsServerShutdownTimeout*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@@ -525,11 +534,11 @@ func (obj *AwsEc2Res) snsWatch() error {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
obj.wg.Add(1)
|
obj.wg.Add(1)
|
||||||
// start the endpoint
|
// start the sns server
|
||||||
go func() {
|
go func() {
|
||||||
defer obj.wg.Done()
|
defer obj.wg.Done()
|
||||||
defer close(obj.awsChan)
|
defer close(obj.awsChan)
|
||||||
if err := snsServer.ListenAndServe(); err != nil {
|
if err := snsServer.Serve(listener); err != nil {
|
||||||
// when we shut down
|
// when we shut down
|
||||||
if err == http.ErrServerClosed {
|
if err == http.ErrServerClosed {
|
||||||
log.Printf("%s: Stopped SNS Endpoint", obj)
|
log.Printf("%s: Stopped SNS Endpoint", obj)
|
||||||
@@ -802,18 +811,18 @@ func (obj *AwsEc2Res) prependName() string {
|
|||||||
return AwsPrefix + obj.GetName()
|
return AwsPrefix + obj.GetName()
|
||||||
}
|
}
|
||||||
|
|
||||||
// snsServer returns an http server used to listen for sns messages.
|
// snsListener returns a listener bound to listenAddr.
|
||||||
func (obj *AwsEc2Res) snsServer() *http.Server {
|
func (obj *AwsEc2Res) snsListener(listenAddr string) (net.Listener, error) {
|
||||||
addr := obj.WatchListenAddr
|
addr := listenAddr
|
||||||
// if addr is a port
|
// if listenAddr is a port
|
||||||
if _, err := strconv.Atoi(obj.WatchListenAddr); err == nil {
|
if _, err := strconv.Atoi(listenAddr); err == nil {
|
||||||
addr = fmt.Sprintf(":%s", obj.WatchListenAddr)
|
addr = fmt.Sprintf(":%s", listenAddr)
|
||||||
}
|
}
|
||||||
handler := http.HandlerFunc(obj.snsPostHandler)
|
listener, err := net.Listen("tcp", addr)
|
||||||
return &http.Server{
|
if err != nil {
|
||||||
Addr: addr,
|
return nil, err
|
||||||
Handler: handler,
|
|
||||||
}
|
}
|
||||||
|
return listener, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// snsPostHandler listens for posts on the SNS Endpoint.
|
// snsPostHandler listens for posts on the SNS Endpoint.
|
||||||
|
|||||||
Reference in New Issue
Block a user