engine: resources: http: Recurse on Init and Watch
Make this resource handle sub-resources more powerfully.
This commit is contained in:
@@ -32,6 +32,7 @@ import (
|
||||
|
||||
"github.com/purpleidea/mgmt/engine"
|
||||
"github.com/purpleidea/mgmt/engine/traits"
|
||||
"github.com/purpleidea/mgmt/pgraph"
|
||||
"github.com/purpleidea/mgmt/util/errwrap"
|
||||
|
||||
securefilepath "github.com/cyphar/filepath-securejoin"
|
||||
@@ -121,6 +122,7 @@ type HTTPServerRes struct {
|
||||
|
||||
// TODO: should we allow adding a list of one-of files directly here?
|
||||
|
||||
eventsChanMap map[engine.Res]chan error
|
||||
interruptChan chan struct{}
|
||||
|
||||
conn net.Listener
|
||||
@@ -216,9 +218,62 @@ func (obj *HTTPServerRes) Init(init *engine.Init) error {
|
||||
|
||||
// NOTE: If we don't Init anything that's autogrouped, then it won't
|
||||
// even get an Init call on it.
|
||||
|
||||
obj.eventsChanMap = make(map[engine.Res]chan error)
|
||||
|
||||
// TODO: should we do this in the engine? Do we want to decide it here?
|
||||
for _, res := range obj.GetGroup() { // grouped elements
|
||||
if err := res.Init(init); err != nil {
|
||||
// NOTE: We build a new init, but it's not complete. We only add
|
||||
// what we're planning to use, and we ignore the rest for now...
|
||||
r := res // bind the variable!
|
||||
|
||||
obj.eventsChanMap[r] = make(chan error)
|
||||
event := func() {
|
||||
select {
|
||||
case obj.eventsChanMap[r] <- nil:
|
||||
// send!
|
||||
}
|
||||
// We don't do this here (why?) we instead read from the
|
||||
// above channel and then send on multiplexedChan to the
|
||||
// main loop, where it runs the obj.init.Event function.
|
||||
//obj.init.Event() // notify engine of an event (this can block)
|
||||
}
|
||||
|
||||
newInit := &engine.Init{
|
||||
Program: obj.init.Program,
|
||||
Version: obj.init.Version,
|
||||
Hostname: obj.init.Hostname,
|
||||
|
||||
// Watch:
|
||||
Running: event,
|
||||
Event: event,
|
||||
|
||||
// CheckApply:
|
||||
Refresh: func() bool {
|
||||
innerRes, ok := r.(engine.RefreshableRes)
|
||||
if !ok {
|
||||
panic("res does not support the Refreshable trait")
|
||||
}
|
||||
return innerRes.Refresh()
|
||||
},
|
||||
Send: engine.GenerateSendFunc(r),
|
||||
Recv: engine.GenerateRecvFunc(r), // unused
|
||||
|
||||
FilteredGraph: func() (*pgraph.Graph, error) {
|
||||
panic("FilteredGraph for HTTP not implemented")
|
||||
},
|
||||
|
||||
Local: obj.init.Local,
|
||||
World: obj.init.World,
|
||||
//VarDir: obj.init.VarDir, // TODO: wrap this
|
||||
|
||||
Debug: obj.init.Debug,
|
||||
Logf: func(format string, v ...interface{}) {
|
||||
obj.init.Logf(r.String()+": "+format, v...)
|
||||
},
|
||||
}
|
||||
|
||||
if err := res.Init(newInit); err != nil {
|
||||
return errwrap.Wrapf(err, "autogrouped Init failed")
|
||||
}
|
||||
}
|
||||
@@ -268,14 +323,60 @@ func (obj *HTTPServerRes) Watch(ctx context.Context) error {
|
||||
//MaxHeaderBytes: 1 << 20, XXX: should we add a param for this?
|
||||
}
|
||||
|
||||
multiplexedChan := make(chan error)
|
||||
defer close(multiplexedChan) // closes after everyone below us is finished
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
defer wg.Wait()
|
||||
|
||||
for _, r := range obj.GetGroup() { // grouped elements
|
||||
res := r // optional in newer golang
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer close(obj.eventsChanMap[res]) // where Watch sends events
|
||||
if err := res.Watch(ctx); err != nil {
|
||||
select {
|
||||
case multiplexedChan <- err:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
}()
|
||||
// wait for Watch first Running() call or immediate error...
|
||||
select {
|
||||
case <-obj.eventsChanMap[res]: // triggers on start or on err...
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
var ok bool
|
||||
var err error
|
||||
select {
|
||||
// receive
|
||||
case err, ok = <-obj.eventsChanMap[res]:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// send (multiplex)
|
||||
select {
|
||||
case multiplexedChan <- err:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
// we block until all the children are started first...
|
||||
|
||||
obj.init.Running() // when started, notify engine that we're running
|
||||
|
||||
var closeError error
|
||||
closeSignal := make(chan struct{})
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
defer wg.Wait()
|
||||
|
||||
shutdownChan := make(chan struct{}) // server shutdown finished signal
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
@@ -336,6 +437,16 @@ func (obj *HTTPServerRes) Watch(ctx context.Context) error {
|
||||
startupChan = nil
|
||||
send = true
|
||||
|
||||
case err, ok := <-multiplexedChan:
|
||||
if !ok { // shouldn't happen
|
||||
multiplexedChan = nil
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
send = true
|
||||
|
||||
case <-closeSignal: // something shut us down early
|
||||
return closeError
|
||||
|
||||
|
||||
Reference in New Issue
Block a user