From b670bb8d2ce3f18b2d049cae73327511483e32b0 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Tue, 16 Jan 2024 19:12:07 -0500 Subject: [PATCH] engine: resources: http: Recurse on Init and Watch Make this resource handle sub-resources more powerfully. --- engine/resources/http.go | 119 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 115 insertions(+), 4 deletions(-) diff --git a/engine/resources/http.go b/engine/resources/http.go index 62c0e42b..9cdabdfb 100644 --- a/engine/resources/http.go +++ b/engine/resources/http.go @@ -32,6 +32,7 @@ import ( "github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/engine/traits" + "github.com/purpleidea/mgmt/pgraph" "github.com/purpleidea/mgmt/util/errwrap" securefilepath "github.com/cyphar/filepath-securejoin" @@ -121,6 +122,7 @@ type HTTPServerRes struct { // TODO: should we allow adding a list of one-of files directly here? + eventsChanMap map[engine.Res]chan error interruptChan chan struct{} conn net.Listener @@ -216,9 +218,62 @@ func (obj *HTTPServerRes) Init(init *engine.Init) error { // NOTE: If we don't Init anything that's autogrouped, then it won't // even get an Init call on it. + + obj.eventsChanMap = make(map[engine.Res]chan error) + // TODO: should we do this in the engine? Do we want to decide it here? for _, res := range obj.GetGroup() { // grouped elements - if err := res.Init(init); err != nil { + // NOTE: We build a new init, but it's not complete. We only add + // what we're planning to use, and we ignore the rest for now... + r := res // bind the variable! + + obj.eventsChanMap[r] = make(chan error) + event := func() { + select { + case obj.eventsChanMap[r] <- nil: + // send! + } + // We don't do this here (why?) we instead read from the + // above channel and then send on multiplexedChan to the + // main loop, where it runs the obj.init.Event function. + //obj.init.Event() // notify engine of an event (this can block) + } + + newInit := &engine.Init{ + Program: obj.init.Program, + Version: obj.init.Version, + Hostname: obj.init.Hostname, + + // Watch: + Running: event, + Event: event, + + // CheckApply: + Refresh: func() bool { + innerRes, ok := r.(engine.RefreshableRes) + if !ok { + panic("res does not support the Refreshable trait") + } + return innerRes.Refresh() + }, + Send: engine.GenerateSendFunc(r), + Recv: engine.GenerateRecvFunc(r), // unused + + FilteredGraph: func() (*pgraph.Graph, error) { + panic("FilteredGraph for HTTP not implemented") + }, + + Local: obj.init.Local, + World: obj.init.World, + //VarDir: obj.init.VarDir, // TODO: wrap this + + Debug: obj.init.Debug, + Logf: func(format string, v ...interface{}) { + obj.init.Logf(r.String()+": "+format, v...) + }, + } + + if err := res.Init(newInit); err != nil { return errwrap.Wrapf(err, "autogrouped Init failed") } } @@ -268,14 +323,60 @@ func (obj *HTTPServerRes) Watch(ctx context.Context) error { //MaxHeaderBytes: 1 << 20, XXX: should we add a param for this? } + multiplexedChan := make(chan error) + defer close(multiplexedChan) // closes after everyone below us is finished + + wg := &sync.WaitGroup{} + defer wg.Wait() + + for _, r := range obj.GetGroup() { // grouped elements + res := r // optional in newer golang + wg.Add(1) + go func() { + defer wg.Done() + defer close(obj.eventsChanMap[res]) // where Watch sends events + if err := res.Watch(ctx); err != nil { + select { + case multiplexedChan <- err: + case <-ctx.Done(): + } + } + }() + // wait for Watch first Running() call or immediate error... + select { + case <-obj.eventsChanMap[res]: // triggers on start or on err... + } + + wg.Add(1) + go func() { + defer wg.Done() + for { + var ok bool + var err error + select { + // receive + case err, ok = <-obj.eventsChanMap[res]: + if !ok { + return + } + } + + // send (multiplex) + select { + case multiplexedChan <- err: + case <-ctx.Done(): + return + } + } + }() + } + // we block until all the children are started first... + obj.init.Running() // when started, notify engine that we're running var closeError error closeSignal := make(chan struct{}) - wg := &sync.WaitGroup{} - defer wg.Wait() - shutdownChan := make(chan struct{}) // server shutdown finished signal wg.Add(1) go func() { @@ -336,6 +437,16 @@ func (obj *HTTPServerRes) Watch(ctx context.Context) error { startupChan = nil send = true + case err, ok := <-multiplexedChan: + if !ok { // shouldn't happen + multiplexedChan = nil + continue + } + if err != nil { + return err + } + send = true + case <-closeSignal: // something shut us down early return closeError