From e2296a631b5bcfc6c2287f2d60c929bd64d7f9ab Mon Sep 17 00:00:00 2001 From: James Shubin Date: Sat, 12 Jan 2019 14:27:36 -0500 Subject: [PATCH] engine: event: Switch events system to use simpler structs Pass around pointers of things now. Also, naming is vastly improved and clearer. --- engine/event/event.go | 60 +++++++++++++++++++++++++++++++++---- engine/graph/actions.go | 6 ++-- engine/graph/engine.go | 8 ++--- engine/graph/state.go | 66 ++++++++++++++++++++--------------------- engine/resources.go | 4 +-- 5 files changed, 97 insertions(+), 47 deletions(-) diff --git a/engine/event/event.go b/engine/event/event.go index 7e32353d..54ec5ff6 100644 --- a/engine/event/event.go +++ b/engine/event/event.go @@ -25,9 +25,59 @@ type Kind int // The different event kinds are used in different contexts. const ( - EventNil Kind = iota - EventStart - EventPause - EventPoke - EventExit + KindNil Kind = iota + KindStart + KindPause + KindPoke + KindExit ) + +// Pre-built messages so they can be used directly without having to use NewMsg. +// These are useful when we don't want a response via ACK(). +var ( + Start = &Msg{Kind: KindStart} + Pause = &Msg{Kind: KindPause} // probably unused b/c we want a resp + Poke = &Msg{Kind: KindPoke} + Exit = &Msg{Kind: KindExit} +) + +// Msg is an event primitive that represents a kind of event, and optionally a +// request for an ACK. +type Msg struct { + Kind Kind + + resp chan struct{} +} + +// NewMsg builds a new message struct. It will want an ACK. If you don't want an +// ACK then use the pre-built messages in the package variable globals. +func NewMsg(kind Kind) *Msg { + return &Msg{ + Kind: kind, + resp: make(chan struct{}), + } +} + +// CanACK determines if an ACK is possible for this message. It does not say +// whether one has already been sent or not. +func (obj *Msg) CanACK() bool { + return obj.resp != nil +} + +// ACK acknowledges the event. It must not be called more than once for the same +// event. It unblocks the past and future calls of Wait for this event. +func (obj *Msg) ACK() { + close(obj.resp) +} + +// Wait on ACK for this event. It doesn't matter if this runs before or after +// the ACK. It will unblock either way. +// TODO: consider adding a context if it's ever useful. +func (obj *Msg) Wait() error { + select { + //case <-ctx.Done(): + // return ctx.Err() + case <-obj.resp: + return nil + } +} diff --git a/engine/graph/actions.go b/engine/graph/actions.go index 3eaad4b2..253c02d6 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -347,7 +347,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { var limiter = rate.NewLimiter(res.MetaParams().Limit, res.MetaParams().Burst) // It is important that we shutdown the Watch loop if this exits. // Example, if Process errors permanently, we should ask Watch to exit. - defer obj.state[vertex].Event(event.EventExit) // signal an exit + defer obj.state[vertex].Event(event.Exit) // signal an exit for { select { case err, ok := <-obj.state[vertex].outputChan: // read from watch channel @@ -464,11 +464,11 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { // err = errwrap.Wrapf(err, "permanent process error") //} - // If this exits, defer calls Event(event.EventExit), + // If this exits, defer calls: obj.Event(event.Exit), // which will cause the Watch loop to shutdown. Also, // if the Watch loop shuts down, that will cause this // Process loop to shut down. Also the graph sync can - // run an Event(event.EventExit) which causes this to + // run an: obj.Event(event.Exit) which causes this to // shutdown as well. Lastly, it is possible that more // that one of these scenarios happens simultaneously. return err diff --git a/engine/graph/engine.go b/engine/graph/engine.go index 47636bf0..063c8479 100644 --- a/engine/graph/engine.go +++ b/engine/graph/engine.go @@ -196,8 +196,8 @@ func (obj *Engine) Commit() error { } vertexRemoveFn := func(vertex pgraph.Vertex) error { // wait for exit before starting new graph! - obj.state[vertex].Event(event.EventExit) // signal an exit - obj.waits[vertex].Wait() // sync + obj.state[vertex].Event(event.Exit) // signal an exit + obj.waits[vertex].Wait() // sync // close the state and resource // FIXME: will this mess up the sync and block the engine? @@ -276,7 +276,7 @@ func (obj *Engine) Start() error { } if unpause { // unpause (if needed) - obj.state[vertex].Event(event.EventStart) + obj.state[vertex].Event(event.Start) } } // we wait for everyone to start before exiting! @@ -301,7 +301,7 @@ func (obj *Engine) Pause(fastPause bool) { for _, vertex := range topoSort { // squeeze out the events... // The Event is sent to an unbuffered channel, so this event is // synchronous, and as a result it blocks until it is received. - obj.state[vertex].Event(event.EventPause) + obj.state[vertex].Event(event.Pause) } // we are now completely paused... diff --git a/engine/graph/state.go b/engine/graph/state.go index 3c2a90ef..e86f2d55 100644 --- a/engine/graph/state.go +++ b/engine/graph/state.go @@ -65,7 +65,7 @@ type State struct { // events is a channel of incoming events which is read by the Watch // loop for that resource. It receives events like pause, start, and // poke. The channel shuts down to signal for Watch to exit. - eventsChan chan event.Kind // incoming to resource + eventsChan chan *event.Msg // incoming to resource eventsLock *sync.Mutex // lock around sending and closing of events channel eventsDone bool // is channel closed? @@ -93,7 +93,7 @@ type State struct { // Init initializes structures like channels. func (obj *State) Init() error { - obj.eventsChan = make(chan event.Kind) + obj.eventsChan = make(chan *event.Msg) obj.eventsLock = &sync.Mutex{} obj.outputChan = make(chan error) @@ -256,7 +256,7 @@ func (obj *State) Poke() { // Event sends a Pause or Start event to the resource. It can also be used to // send Poke events, but it's much more efficient to send them directly instead // of passing them through the resource. -func (obj *State) Event(kind event.Kind) { +func (obj *State) Event(msg *event.Msg) { // TODO: should these happen after the lock? obj.wg.Add(1) defer obj.wg.Done() @@ -268,7 +268,7 @@ func (obj *State) Event(kind event.Kind) { return } - if kind == event.EventExit { // set this so future events don't deadlock + if msg.Kind == event.KindExit { // set this so future events don't deadlock obj.Logf("exit event...") obj.eventsDone = true close(obj.eventsChan) // causes resource Watch loop to close @@ -277,7 +277,7 @@ func (obj *State) Event(kind event.Kind) { } select { - case obj.eventsChan <- kind: + case obj.eventsChan <- msg: case <-obj.exit.Signal(): } @@ -285,40 +285,40 @@ func (obj *State) Event(kind event.Kind) { // read is a helper function used inside the main select statement of resources. // If it returns an error, then this is a signal for the resource to exit. -func (obj *State) read(kind event.Kind) error { - switch kind { - case event.EventPoke: +func (obj *State) read(msg *event.Msg) error { + switch msg.Kind { + case event.KindPoke: return obj.event() // a poke needs to cause an event... - case event.EventStart: + case event.KindStart: return fmt.Errorf("unexpected start") - case event.EventPause: + case event.KindPause: // pass - case event.EventExit: + case event.KindExit: return engine.ErrSignalExit default: - return fmt.Errorf("unhandled event: %+v", kind) + return fmt.Errorf("unhandled event: %+v", msg.Kind) } // we're paused now select { - case kind, ok := <-obj.eventsChan: + case msg, ok := <-obj.eventsChan: if !ok { return engine.ErrWatchExit } - switch kind { - case event.EventPoke: + switch msg.Kind { + case event.KindPoke: return fmt.Errorf("unexpected poke") - case event.EventPause: + case event.KindPause: return fmt.Errorf("unexpected pause") - case event.EventStart: + case event.KindStart: // resumed return nil - case event.EventExit: + case event.KindExit: return engine.ErrSignalExit default: - return fmt.Errorf("unhandled event: %+v", kind) + return fmt.Errorf("unhandled event: %+v", msg.Kind) } } } @@ -335,45 +335,45 @@ func (obj *State) event() error { return nil // sent event! // make sure to keep handling incoming - case kind, ok := <-obj.eventsChan: + case msg, ok := <-obj.eventsChan: if !ok { return engine.ErrWatchExit } - switch kind { - case event.EventPoke: + switch msg.Kind { + case event.KindPoke: // we're trying to send an event, so swallow the // poke: it's what we wanted to have happen here continue - case event.EventStart: + case event.KindStart: return fmt.Errorf("unexpected start") - case event.EventPause: + case event.KindPause: // pass - case event.EventExit: + case event.KindExit: return engine.ErrSignalExit default: - return fmt.Errorf("unhandled event: %+v", kind) + return fmt.Errorf("unhandled event: %+v", msg.Kind) } } // we're paused now select { - case kind, ok := <-obj.eventsChan: + case msg, ok := <-obj.eventsChan: if !ok { return engine.ErrWatchExit } - switch kind { - case event.EventPoke: + switch msg.Kind { + case event.KindPoke: return fmt.Errorf("unexpected poke") - case event.EventPause: + case event.KindPause: return fmt.Errorf("unexpected pause") - case event.EventStart: + case event.KindStart: // resumed - case event.EventExit: + case event.KindExit: return engine.ErrSignalExit default: - return fmt.Errorf("unhandled event: %+v", kind) + return fmt.Errorf("unhandled event: %+v", msg.Kind) } } } diff --git a/engine/resources.go b/engine/resources.go index 3d0fdd90..fec342ed 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -100,11 +100,11 @@ type Init struct { // Events returns a channel that we must watch for messages from the // engine. When it closes, this is a signal to shutdown. - Events chan event.Kind + Events chan *event.Msg // Read processes messages that come in from the Events channel. It is a // helper method that knows how to handle the pause mechanism correctly. - Read func(event.Kind) error + Read func(*event.Msg) error // Dirty marks the resource state as dirty. This signals to the engine // that CheckApply will have some work to do in order to converge it.