resources, pgraph: Refactor Worker and simplify API

I'm still working on reducing the size of the monster patches that I
land, but I'm exercising the priviledge as the initial author. In any
case, this refactors worker into two, and cleans up the passing around
of the processChan. This puts common code into Init and Close.
This commit is contained in:
James Shubin
2017-02-10 09:37:12 -05:00
parent 2da21f90f4
commit fccf508dde
16 changed files with 276 additions and 274 deletions

View File

@@ -273,7 +273,7 @@ sending out erroneous `Event` messages to keep things alive until it finishes.
#### Example #### Example
```golang ```golang
// Watch is the listener and main loop for this resource. // Watch is the listener and main loop for this resource.
func (obj *FooRes) Watch(processChan chan *event.Event) error { func (obj *FooRes) Watch() error {
// setup the Foo resource // setup the Foo resource
var err error var err error
if err, obj.foo = OpenFoo(); err != nil { if err, obj.foo = OpenFoo(); err != nil {
@@ -282,7 +282,7 @@ func (obj *FooRes) Watch(processChan chan *event.Event) error {
defer obj.whatever.CloseFoo() // shutdown our defer obj.whatever.CloseFoo() // shutdown our
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
@@ -311,7 +311,7 @@ func (obj *FooRes) Watch(processChan chan *event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
obj.Event(processChan) obj.Event() // send the event!
} }
} }
} }

View File

@@ -235,8 +235,8 @@ func (g *Graph) Process(v *Vertex) error {
// would simplify the resources by removing boilerplate // would simplify the resources by removing boilerplate
if v.Meta().Poll > 0 { if v.Meta().Poll > 0 {
if !checkOK { // something changed, restart timer if !checkOK { // something changed, restart timer
cuid := v.Res.ConvergerUID() // get the converger uid used to report status cuid, _, _ := v.Res.ConvergerUIDs() // get the converger uid used to report status
cuid.ResetTimer() // activity! cuid.ResetTimer() // activity!
if g.Flags.Debug { if g.Flags.Debug {
log.Printf("%s[%s]: Converger: ResetTimer", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: Converger: ResetTimer", obj.Kind(), obj.GetName())
} }
@@ -306,6 +306,149 @@ func (obj *SentinelErr) Error() string {
return obj.err.Error() return obj.err.Error()
} }
// innerWorker is the CheckApply runner that reads from processChan.
func (g *Graph) innerWorker(v *Vertex) {
obj := v.Res
running := false
done := make(chan struct{})
playback := false // do we need to run another one?
_, wcuid, pcuid := obj.ConvergerUIDs() // get extra cuids (worker, process)
waiting := false
var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration
if !timer.Stop() {
<-timer.C // unnecessary, shouldn't happen
}
var delay = time.Duration(v.Meta().Delay) * time.Millisecond
var retry = v.Meta().Retry // number of tries left, -1 for infinite
var limiter = rate.NewLimiter(v.Meta().Limit, v.Meta().Burst)
limited := false
Loop:
for {
select {
case ev, ok := <-obj.ProcessChan(): // must use like this
if !ok { // processChan closed, let's exit
break Loop // no event, so no ack!
}
if v.Res.Meta().Poll == 0 { // skip for polling
wcuid.SetConverged(false)
}
// if process started, but no action yet, skip!
if v.Res.GetState() == resources.ResStateProcess {
if g.Flags.Debug {
log.Printf("%s[%s]: Skipped event!", v.Kind(), v.GetName())
}
ev.ACK() // ready for next message
continue
}
// if running, we skip running a new execution!
// if waiting, we skip running a new execution!
if running || waiting {
if g.Flags.Debug {
log.Printf("%s[%s]: Playback added!", v.Kind(), v.GetName())
}
playback = true
ev.ACK() // ready for next message
continue
}
// catch invalid rates
if v.Meta().Burst == 0 && !(v.Meta().Limit == rate.Inf) { // blocked
e := fmt.Errorf("%s[%s]: Permanently limited (rate != Inf, burst: 0)", v.Kind(), v.GetName())
v.SendEvent(event.EventExit, &SentinelErr{e})
ev.ACK() // ready for next message
continue
}
// rate limit
// FIXME: consider skipping rate limit check if
// the event is a poke instead of a watch event
if !limited && !(v.Meta().Limit == rate.Inf) { // skip over the playback event...
now := time.Now()
r := limiter.ReserveN(now, 1) // one event
// r.OK() seems to always be true here!
d := r.DelayFrom(now)
if d > 0 { // delay
limited = true
playback = true
log.Printf("%s[%s]: Limited (rate: %v/sec, burst: %d, next: %v)", v.Kind(), v.GetName(), v.Meta().Limit, v.Meta().Burst, d)
// start the timer...
timer.Reset(d)
waiting = true // waiting for retry timer
ev.ACK()
continue
} // otherwise, we run directly!
}
limited = false // let one through
running = true
go func(ev *event.Event) {
pcuid.SetConverged(false) // "block" Process
if e := g.Process(v); e != nil {
playback = true
log.Printf("%s[%s]: CheckApply errored: %v", v.Kind(), v.GetName(), e)
if retry == 0 {
// wrap the error in the sentinel
v.SendEvent(event.EventExit, &SentinelErr{e})
return
}
if retry > 0 { // don't decrement the -1
retry--
}
log.Printf("%s[%s]: CheckApply: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), delay.Seconds(), retry)
// start the timer...
timer.Reset(delay)
waiting = true // waiting for retry timer
return
}
retry = v.Meta().Retry // reset on success
close(done) // trigger
}(ev)
ev.ACK() // sync (now mostly useless)
case <-timer.C:
if v.Res.Meta().Poll == 0 { // skip for polling
wcuid.SetConverged(false)
}
waiting = false
if !timer.Stop() {
//<-timer.C // blocks, docs are wrong!
}
log.Printf("%s[%s]: CheckApply delay expired!", v.Kind(), v.GetName())
close(done)
// a CheckApply run (with possibly retry pause) finished
case <-done:
if v.Res.Meta().Poll == 0 { // skip for polling
wcuid.SetConverged(false)
}
if g.Flags.Debug {
log.Printf("%s[%s]: CheckApply finished!", v.Kind(), v.GetName())
}
done = make(chan struct{}) // reset
// re-send this event, to trigger a CheckApply()
if playback {
playback = false
// this lock avoids us sending to
// channel after we've closed it!
// TODO: can this experience indefinite postponement ?
// see: https://github.com/golang/go/issues/11506
go obj.Event() // replay a new event
}
running = false
pcuid.SetConverged(true) // "unblock" Process
case <-wcuid.ConvergedTimer():
wcuid.SetConverged(true) // converged!
continue
}
}
}
// Worker is the common run frontend of the vertex. It handles all of the retry // Worker is the common run frontend of the vertex. It handles all of the retry
// and retry delay common code, and ultimately returns the final status of this // and retry delay common code, and ultimately returns the final status of this
// vertex execution. // vertex execution.
@@ -316,172 +459,30 @@ func (g *Graph) Worker(v *Vertex) error {
// the Watch() function about which graph it is // the Watch() function about which graph it is
// running on, which isolates things nicely... // running on, which isolates things nicely...
obj := v.Res obj := v.Res
if g.Flags.Debug {
// run the init (should match 1-1 with Close function if this succeeds) log.Printf("%s[%s]: Worker: Running", v.Kind(), v.GetName())
defer log.Printf("%s[%s]: Worker: Stopped", v.Kind(), v.GetName())
}
// run the init (should match 1-1 with Close function)
if err := obj.Init(); err != nil { if err := obj.Init(); err != nil {
// always exit the worker function by finishing with Close()
if e := obj.Close(); e != nil {
err = multierr.Append(err, e) // list of errors
}
return errwrap.Wrapf(err, "could not Init() resource") return errwrap.Wrapf(err, "could not Init() resource")
} }
lock := &sync.Mutex{} // lock around processChan closing and sending
finished := false // did we close processChan ?
processChan := make(chan *event.Event)
// if the CheckApply run takes longer than the converged // if the CheckApply run takes longer than the converged
// timeout, we could inappropriately converge mid-apply! // timeout, we could inappropriately converge mid-apply!
// avoid this by blocking convergence with a fake report // avoid this by blocking convergence with a fake report
// we also add a similar blocker around the worker loop! // we also add a similar blocker around the worker loop!
wcuid := obj.Converger().Register() // get an extra cuid for the worker! _, wcuid, pcuid := obj.ConvergerUIDs() // get extra cuids (worker, process)
defer wcuid.Unregister() // XXX: put these in Init() ?
wcuid.SetConverged(true) // starts off false, and waits for loop timeout wcuid.SetConverged(true) // starts off false, and waits for loop timeout
pcuid := obj.Converger().Register() // get an extra cuid for the process
defer pcuid.Unregister()
pcuid.SetConverged(true) // starts off true, because it's not running... pcuid.SetConverged(true) // starts off true, because it's not running...
go func() { go g.innerWorker(v)
running := false
done := make(chan struct{})
playback := false // do we need to run another one?
waiting := false
var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration
if !timer.Stop() {
<-timer.C // unnecessary, shouldn't happen
}
var delay = time.Duration(v.Meta().Delay) * time.Millisecond
var retry = v.Meta().Retry // number of tries left, -1 for infinite
var limiter = rate.NewLimiter(v.Meta().Limit, v.Meta().Burst)
limited := false
Loop:
for {
select {
case ev, ok := <-processChan: // must use like this
if !ok { // processChan closed, let's exit
break Loop // no event, so no ack!
}
if v.Res.Meta().Poll == 0 { // skip for polling
wcuid.SetConverged(false)
}
// if process started, but no action yet, skip!
if v.Res.GetState() == resources.ResStateProcess {
if g.Flags.Debug {
log.Printf("%s[%s]: Skipped event!", v.Kind(), v.GetName())
}
ev.ACK() // ready for next message
continue
}
// if running, we skip running a new execution!
// if waiting, we skip running a new execution!
if running || waiting {
if g.Flags.Debug {
log.Printf("%s[%s]: Playback added!", v.Kind(), v.GetName())
}
playback = true
ev.ACK() // ready for next message
continue
}
// catch invalid rates
if v.Meta().Burst == 0 && !(v.Meta().Limit == rate.Inf) { // blocked
e := fmt.Errorf("%s[%s]: Permanently limited (rate != Inf, burst: 0)", v.Kind(), v.GetName())
v.SendEvent(event.EventExit, &SentinelErr{e})
ev.ACK() // ready for next message
continue
}
// rate limit
// FIXME: consider skipping rate limit check if
// the event is a poke instead of a watch event
if !limited && !(v.Meta().Limit == rate.Inf) { // skip over the playback event...
now := time.Now()
r := limiter.ReserveN(now, 1) // one event
// r.OK() seems to always be true here!
d := r.DelayFrom(now)
if d > 0 { // delay
limited = true
playback = true
log.Printf("%s[%s]: Limited (rate: %v/sec, burst: %d, next: %v)", v.Kind(), v.GetName(), v.Meta().Limit, v.Meta().Burst, d)
// start the timer...
timer.Reset(d)
waiting = true // waiting for retry timer
ev.ACK()
continue
} // otherwise, we run directly!
}
limited = false // let one through
running = true
go func(ev *event.Event) {
pcuid.SetConverged(false) // "block" Process
if e := g.Process(v); e != nil {
playback = true
log.Printf("%s[%s]: CheckApply errored: %v", v.Kind(), v.GetName(), e)
if retry == 0 {
// wrap the error in the sentinel
v.SendEvent(event.EventExit, &SentinelErr{e})
return
}
if retry > 0 { // don't decrement the -1
retry--
}
log.Printf("%s[%s]: CheckApply: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), delay.Seconds(), retry)
// start the timer...
timer.Reset(delay)
waiting = true // waiting for retry timer
return
}
retry = v.Meta().Retry // reset on success
close(done) // trigger
}(ev)
ev.ACK() // sync (now mostly useless)
case <-timer.C:
if v.Res.Meta().Poll == 0 { // skip for polling
wcuid.SetConverged(false)
}
waiting = false
if !timer.Stop() {
//<-timer.C // blocks, docs are wrong!
}
log.Printf("%s[%s]: CheckApply delay expired!", v.Kind(), v.GetName())
close(done)
// a CheckApply run (with possibly retry pause) finished
case <-done:
if v.Res.Meta().Poll == 0 { // skip for polling
wcuid.SetConverged(false)
}
if g.Flags.Debug {
log.Printf("%s[%s]: CheckApply finished!", v.Kind(), v.GetName())
}
done = make(chan struct{}) // reset
// re-send this event, to trigger a CheckApply()
if playback {
playback = false
// this lock avoids us sending to
// channel after we've closed it!
lock.Lock()
go func() {
if !finished {
// TODO: can this experience indefinite postponement ?
// see: https://github.com/golang/go/issues/11506
obj.Event(processChan) // replay a new event
}
lock.Unlock()
}()
}
running = false
pcuid.SetConverged(true) // "unblock" Process
case <-wcuid.ConvergedTimer():
wcuid.SetConverged(true) // converged!
continue
}
}
}()
var err error // propagate the error up (this is a permanent BAD error!) var err error // propagate the error up (this is a permanent BAD error!)
// the watch delay runs inside of the Watch resource loop, so that it // the watch delay runs inside of the Watch resource loop, so that it
// can still process signals and exit if needed. It shouldn't run any // can still process signals and exit if needed. It shouldn't run any
@@ -545,24 +546,22 @@ func (g *Graph) Worker(v *Vertex) error {
// NOTE: we can avoid the send if running Watch guarantees // NOTE: we can avoid the send if running Watch guarantees
// one CheckApply event on startup! // one CheckApply event on startup!
//if pendingSendEvent { // TODO: should this become a list in the future? //if pendingSendEvent { // TODO: should this become a list in the future?
// if exit, err := obj.DoSend(processChan, ""); exit || err != nil { // if err := obj.Event() err != nil {
// return err // we exit or bubble up a NACK... // return err // we exit or bubble up a NACK...
// } // }
//} //}
} }
// TODO: reset the watch retry count after some amount of success // TODO: reset the watch retry count after some amount of success
v.Res.RegisterConverger()
var e error var e error
if v.Res.Meta().Poll > 0 { // poll instead of watching :( if v.Res.Meta().Poll > 0 { // poll instead of watching :(
cuid := v.Res.ConvergerUID() // get the converger uid used to report status cuid, _, _ := v.Res.ConvergerUIDs() // get the converger uid used to report status
cuid.StartTimer() cuid.StartTimer()
e = v.Res.Poll(processChan) e = v.Res.Poll()
cuid.StopTimer() // clean up nicely cuid.StopTimer() // clean up nicely
} else { } else {
e = v.Res.Watch(processChan) // run the watch normally e = v.Res.Watch() // run the watch normally
} }
v.Res.UnregisterConverger()
if e == nil { // exit signal if e == nil { // exit signal
err = nil // clean exit err = nil // clean exit
break break
@@ -586,10 +585,6 @@ func (g *Graph) Worker(v *Vertex) error {
// by getting the Watch resource to send one event once it's up! // by getting the Watch resource to send one event once it's up!
//v.SendEvent(eventPoke, false, false) //v.SendEvent(eventPoke, false, false)
} }
lock.Lock() // lock to avoid a send when closed!
finished = true
close(processChan)
lock.Unlock()
// close resource and return possible errors if any // close resource and return possible errors if any
if e := obj.Close(); err == nil { if e := obj.Close(); err == nil {
@@ -635,6 +630,7 @@ func (g *Graph) Start(first bool) { // start or continue
// TODO: if a sufficient number of workers error, // TODO: if a sufficient number of workers error,
// should something be done? Should these restart // should something be done? Should these restart
// after perma-failure if we have a graph change? // after perma-failure if we have a graph change?
log.Printf("%s[%s]: Started", vv.Kind(), vv.GetName())
if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops
log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err) log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err)
return return

View File

@@ -25,7 +25,6 @@ import (
"os" "os"
"strings" "strings"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/recwatch" "github.com/purpleidea/mgmt/recwatch"
errwrap "github.com/pkg/errors" errwrap "github.com/pkg/errors"
@@ -101,7 +100,7 @@ func (obj *AugeasRes) Init() error {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
// Taken from the File resource. // Taken from the File resource.
// FIXME: DRY - This is taken from the file resource // FIXME: DRY - This is taken from the file resource
func (obj *AugeasRes) Watch(processChan chan *event.Event) error { func (obj *AugeasRes) Watch() error {
var err error var err error
obj.recWatcher, err = recwatch.NewRecWatcher(obj.File, false) obj.recWatcher, err = recwatch.NewRecWatcher(obj.File, false)
if err != nil { if err != nil {
@@ -110,7 +109,7 @@ func (obj *AugeasRes) Watch(processChan chan *event.Event) error {
defer obj.recWatcher.Close() defer obj.recWatcher.Close()
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
@@ -146,7 +145,7 @@ func (obj *AugeasRes) Watch(processChan chan *event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
obj.Event(processChan) obj.Event()
} }
} }
} }

View File

@@ -26,7 +26,6 @@ import (
"os/exec" "os/exec"
"strings" "strings"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
errwrap "github.com/pkg/errors" errwrap "github.com/pkg/errors"
@@ -98,7 +97,7 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *ExecRes) Watch(processChan chan *event.Event) error { func (obj *ExecRes) Watch() error {
var send = false // send event? var send = false // send event?
var exit *error var exit *error
bufioch, errch := make(chan string), make(chan error) bufioch, errch := make(chan string), make(chan error)
@@ -141,7 +140,7 @@ func (obj *ExecRes) Watch(processChan chan *event.Event) error {
} }
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
@@ -174,7 +173,7 @@ func (obj *ExecRes) Watch(processChan chan *event.Event) error {
send = false send = false
// it is okay to invalidate the clean state on poke too // it is okay to invalidate the clean state on poke too
obj.StateOK(false) // something made state dirty obj.StateOK(false) // something made state dirty
obj.Event(processChan) obj.Event()
} }
} }
} }

View File

@@ -34,7 +34,6 @@ import (
"strings" "strings"
"syscall" "syscall"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/recwatch" "github.com/purpleidea/mgmt/recwatch"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
@@ -181,7 +180,7 @@ func (obj *FileRes) GetPath() string {
// If the Watch returns an error, it means that something has gone wrong, and it // If the Watch returns an error, it means that something has gone wrong, and it
// must be restarted. On a clean exit it returns nil. // must be restarted. On a clean exit it returns nil.
// FIXME: Also watch the source directory when using obj.Source !!! // FIXME: Also watch the source directory when using obj.Source !!!
func (obj *FileRes) Watch(processChan chan *event.Event) error { func (obj *FileRes) Watch() error {
var err error var err error
obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, obj.Recurse) obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, obj.Recurse)
if err != nil { if err != nil {
@@ -190,7 +189,7 @@ func (obj *FileRes) Watch(processChan chan *event.Event) error {
defer obj.recWatcher.Close() defer obj.recWatcher.Close()
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
@@ -226,7 +225,7 @@ func (obj *FileRes) Watch(processChan chan *event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
obj.Event(processChan) obj.Event()
} }
} }
} }

View File

@@ -23,7 +23,6 @@ import (
"fmt" "fmt"
"log" "log"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
"github.com/godbus/dbus" "github.com/godbus/dbus"
@@ -102,7 +101,7 @@ func (obj *HostnameRes) Init() error {
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *HostnameRes) Watch(processChan chan *event.Event) error { func (obj *HostnameRes) Watch() error {
// if we share the bus with others, we will get each others messages!! // if we share the bus with others, we will get each others messages!!
bus, err := util.SystemBusPrivateUsable() // don't share the bus connection! bus, err := util.SystemBusPrivateUsable() // don't share the bus connection!
if err != nil { if err != nil {
@@ -120,7 +119,7 @@ func (obj *HostnameRes) Watch(processChan chan *event.Event) error {
bus.Signal(signals) bus.Signal(signals)
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
@@ -144,7 +143,7 @@ func (obj *HostnameRes) Watch(processChan chan *event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
obj.Event(processChan) obj.Event()
} }
} }
} }

View File

@@ -24,8 +24,6 @@ import (
"regexp" "regexp"
"strings" "strings"
"github.com/purpleidea/mgmt/event"
"github.com/coreos/go-systemd/journal" "github.com/coreos/go-systemd/journal"
) )
@@ -122,9 +120,9 @@ func (obj *MsgRes) journalPriority() journal.Priority {
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *MsgRes) Watch(processChan chan *event.Event) error { func (obj *MsgRes) Watch() error {
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
@@ -142,7 +140,7 @@ func (obj *MsgRes) Watch(processChan chan *event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
obj.Event(processChan) obj.Event()
} }
} }
} }

View File

@@ -21,8 +21,6 @@ import (
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"log" "log"
"github.com/purpleidea/mgmt/event"
) )
func init() { func init() {
@@ -56,9 +54,9 @@ func (obj *NoopRes) Init() error {
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *NoopRes) Watch(processChan chan *event.Event) error { func (obj *NoopRes) Watch() error {
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
@@ -76,7 +74,7 @@ func (obj *NoopRes) Watch(processChan chan *event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
obj.Event(processChan) obj.Event()
} }
} }
} }

View File

@@ -23,7 +23,6 @@ import (
"fmt" "fmt"
"log" "log"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
systemdUtil "github.com/coreos/go-systemd/util" systemdUtil "github.com/coreos/go-systemd/util"
@@ -98,7 +97,7 @@ func (obj *NspawnRes) Init() error {
} }
// Watch for state changes and sends a message to the bus if there is a change // Watch for state changes and sends a message to the bus if there is a change
func (obj *NspawnRes) Watch(processChan chan *event.Event) error { func (obj *NspawnRes) Watch() error {
// this resource depends on systemd ensure that it's running // this resource depends on systemd ensure that it's running
if !systemdUtil.IsRunningSystemd() { if !systemdUtil.IsRunningSystemd() {
return fmt.Errorf("Systemd is not running.") return fmt.Errorf("Systemd is not running.")
@@ -122,7 +121,7 @@ func (obj *NspawnRes) Watch(processChan chan *event.Event) error {
bus.Signal(buschan) bus.Signal(buschan)
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
@@ -155,7 +154,7 @@ func (obj *NspawnRes) Watch(processChan chan *event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
obj.Event(processChan) obj.Event()
} }
} }
} }

View File

@@ -28,7 +28,6 @@ import (
"path" "path"
"strings" "strings"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/recwatch" "github.com/purpleidea/mgmt/recwatch"
errwrap "github.com/pkg/errors" errwrap "github.com/pkg/errors"
@@ -165,7 +164,7 @@ Loop:
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *PasswordRes) Watch(processChan chan *event.Event) error { func (obj *PasswordRes) Watch() error {
var err error var err error
obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false) obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false)
if err != nil { if err != nil {
@@ -174,7 +173,7 @@ func (obj *PasswordRes) Watch(processChan chan *event.Event) error {
defer obj.recWatcher.Close() defer obj.recWatcher.Close()
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
@@ -203,7 +202,7 @@ func (obj *PasswordRes) Watch(processChan chan *event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
obj.Event(processChan) obj.Event()
} }
} }
} }

View File

@@ -24,7 +24,6 @@ import (
"path" "path"
"strings" "strings"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/resources/packagekit" "github.com/purpleidea/mgmt/resources/packagekit"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
@@ -104,7 +103,7 @@ func (obj *PkgRes) Init() error {
// It uses the PackageKit UpdatesChanged signal to watch for changes. // It uses the PackageKit UpdatesChanged signal to watch for changes.
// TODO: https://github.com/hughsie/PackageKit/issues/109 // TODO: https://github.com/hughsie/PackageKit/issues/109
// TODO: https://github.com/hughsie/PackageKit/issues/110 // TODO: https://github.com/hughsie/PackageKit/issues/110
func (obj *PkgRes) Watch(processChan chan *event.Event) error { func (obj *PkgRes) Watch() error {
bus := packagekit.NewBus() bus := packagekit.NewBus()
if bus == nil { if bus == nil {
return fmt.Errorf("Can't connect to PackageKit bus.") return fmt.Errorf("Can't connect to PackageKit bus.")
@@ -117,7 +116,7 @@ func (obj *PkgRes) Watch(processChan chan *event.Event) error {
} }
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
@@ -156,7 +155,7 @@ func (obj *PkgRes) Watch(processChan chan *event.Event) error {
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs
if send { if send {
send = false send = false
obj.Event(processChan) obj.Event()
} }
} }
} }

View File

@@ -141,12 +141,10 @@ type Base interface {
AssociateData(*Data) AssociateData(*Data)
IsWorking() bool IsWorking() bool
Converger() converger.Converger Converger() converger.Converger
RegisterConverger() ConvergerUIDs() (converger.ConvergerUID, converger.ConvergerUID, converger.ConvergerUID)
UnregisterConverger()
ConvergerUID() converger.ConvergerUID
GetState() ResState GetState() ResState
SetState(ResState) SetState(ResState)
Event(chan *event.Event) error Event() error
SendEvent(event.EventName, error) error SendEvent(event.EventName, error) error
ReadEvent(*event.Event) (*error, bool) ReadEvent(*event.Event) (*error, bool)
Refresh() bool // is there a pending refresh to run? Refresh() bool // is there a pending refresh to run?
@@ -161,10 +159,11 @@ type Base interface {
GetGroup() []Res // return everyone grouped inside me GetGroup() []Res // return everyone grouped inside me
SetGroup([]Res) SetGroup([]Res)
VarDir(string) (string, error) VarDir(string) (string, error)
Running(chan *event.Event) error // notify the engine that Watch started Running() error // notify the engine that Watch started
Started() <-chan struct{} // returns when the resource has started Started() <-chan struct{} // returns when the resource has started
Starter(bool) Starter(bool)
Poll(chan *event.Event) error // poll alternative to watching :( Poll() error // poll alternative to watching :(
ProcessChan() chan *event.Event
Prometheus() *prometheus.Prometheus Prometheus() *prometheus.Prometheus
} }
@@ -175,8 +174,8 @@ type Res interface {
Validate() error Validate() error
Init() error Init() error
Close() error Close() error
UIDs() []ResUID // most resources only return one UIDs() []ResUID // most resources only return one
Watch(chan *event.Event) error // send on channel to signal process() events Watch() error // send on channel to signal process() events
CheckApply(apply bool) (checkOK bool, err error) CheckApply(apply bool) (checkOK bool, err error)
AutoEdges() AutoEdge AutoEdges() AutoEdge
Compare(Res) bool Compare(Res) bool
@@ -190,23 +189,28 @@ type BaseRes struct {
MetaParams MetaParams `yaml:"meta"` // struct of all the metaparams MetaParams MetaParams `yaml:"meta"` // struct of all the metaparams
Recv map[string]*Send // mapping of key to receive on from value Recv map[string]*Send // mapping of key to receive on from value
kind string kind string
mutex *sync.Mutex // locks around sending and closing of events channel mutex *sync.Mutex // locks around sending and closing of events channel
events chan *event.Event events chan *event.Event
converger converger.Converger // converged tracking converger converger.Converger // converged tracking
cuid converger.ConvergerUID cuid converger.ConvergerUID
prometheus *prometheus.Prometheus wcuid converger.ConvergerUID
prefix string // base prefix for this resource pcuid converger.ConvergerUID
debug bool prometheus *prometheus.Prometheus
state ResState prefix string // base prefix for this resource
working bool // is the Worker() loop running ? debug bool
started chan struct{} // closed when worker is started/running state ResState
isStarted bool // did the started chan already close? working bool // is the Worker() loop running ?
starter bool // does this have indegree == 0 ? XXX: usually? started chan struct{} // closed when worker is started/running
isStateOK bool // whether the state is okay based on events or not isStarted bool // did the started chan already close?
isGrouped bool // am i contained within a group? starter bool // does this have indegree == 0 ? XXX: usually?
grouped []Res // list of any grouped resources isStateOK bool // whether the state is okay based on events or not
refresh bool // does this resource have a refresh to run? isGrouped bool // am i contained within a group?
grouped []Res // list of any grouped resources
processLock *sync.Mutex
processDone bool
processChan chan *event.Event
refresh bool // does this resource have a refresh to run?
//refreshState StatefulBool // TODO: future stateful bool //refreshState StatefulBool // TODO: future stateful bool
} }
@@ -291,7 +295,13 @@ func (obj *BaseRes) Init() error {
if obj.kind == "" { if obj.kind == "" {
return fmt.Errorf("Resource did not set kind!") return fmt.Errorf("Resource did not set kind!")
} }
obj.cuid = obj.converger.Register()
obj.wcuid = obj.converger.Register() // get a cuid for the worker!
obj.pcuid = obj.converger.Register() // get a cuid for the process
obj.mutex = &sync.Mutex{} obj.mutex = &sync.Mutex{}
obj.working = true // Worker method should now be running...
obj.events = make(chan *event.Event) // unbuffered chan to avoid stale events obj.events = make(chan *event.Event) // unbuffered chan to avoid stale events
obj.started = make(chan struct{}) // closes when started obj.started = make(chan struct{}) // closes when started
@@ -303,6 +313,10 @@ func (obj *BaseRes) Init() error {
obj.Meta().Limit = rate.Inf obj.Meta().Limit = rate.Inf
} }
obj.processLock = &sync.Mutex{} // lock around processChan closing and sending
obj.processDone = false // did we close processChan ?
obj.processChan = make(chan *event.Event)
//dir, err := obj.VarDir("") //dir, err := obj.VarDir("")
//if err != nil { //if err != nil {
// return errwrap.Wrapf(err, "VarDir failed in Init()") // return errwrap.Wrapf(err, "VarDir failed in Init()")
@@ -310,7 +324,6 @@ func (obj *BaseRes) Init() error {
// TODO: this StatefulBool implementation could be eventually swappable // TODO: this StatefulBool implementation could be eventually swappable
//obj.refreshState = &DiskBool{Path: path.Join(dir, refreshPathToken)} //obj.refreshState = &DiskBool{Path: path.Join(dir, refreshPathToken)}
obj.working = true // Worker method should now be running...
return nil return nil
} }
@@ -319,10 +332,21 @@ func (obj *BaseRes) Close() error {
if obj.debug { if obj.debug {
log.Printf("%s[%s]: Close()", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: Close()", obj.Kind(), obj.GetName())
} }
obj.processLock.Lock() // lock to avoid a send when closed!
obj.processDone = true
close(obj.processChan)
obj.processLock.Unlock()
obj.mutex.Lock() obj.mutex.Lock()
obj.working = false // Worker method should now be closing... obj.working = false // Worker method should now be closing...
close(obj.events) // this is where we properly close this channel! close(obj.events) // this is where we properly close this channel!
obj.mutex.Unlock() obj.mutex.Unlock()
obj.pcuid.Unregister()
obj.wcuid.Unregister()
obj.cuid.Unregister()
return nil return nil
} }
@@ -375,22 +399,11 @@ func (obj *BaseRes) Converger() converger.Converger {
return obj.converger return obj.converger
} }
// RegisterConverger sets up the cuid for the resource. This is a helper // ConvergerUIDs returns the ConvergerUIDs for the resource. This is called by
// function for the engine, and shouldn't be called by the resources directly. // the various methods that need one of these ConvergerUIDs. They are registered
func (obj *BaseRes) RegisterConverger() { // by the Init method and unregistered on the resource Close.
obj.cuid = obj.converger.Register() func (obj *BaseRes) ConvergerUIDs() (cuid converger.ConvergerUID, wcuid converger.ConvergerUID, pcuid converger.ConvergerUID) {
} return obj.cuid, obj.wcuid, obj.pcuid
// UnregisterConverger tears down the cuid for the resource. This is a helper
// function for the engine, and shouldn't be called by the resources directly.
func (obj *BaseRes) UnregisterConverger() {
obj.cuid.Unregister()
}
// ConvergerUID returns the ConvergerUID for the resource. This should be called
// by the Watch method of the resource to set the converged state.
func (obj *BaseRes) ConvergerUID() converger.ConvergerUID {
return obj.cuid
} }
// GetState returns the state of the resource. // GetState returns the state of the resource.
@@ -416,6 +429,11 @@ func (obj *BaseRes) StateOK(b bool) {
obj.isStateOK = b obj.isStateOK = b
} }
// ProcessChan returns the chan that resources send events to. Internal API!
func (obj *BaseRes) ProcessChan() chan *event.Event {
return obj.processChan
}
// GroupCmp compares two resources and decides if they're suitable for grouping // GroupCmp compares two resources and decides if they're suitable for grouping
// You'll probably want to override this method when implementing a resource... // You'll probably want to override this method when implementing a resource...
func (obj *BaseRes) GroupCmp(res Res) bool { func (obj *BaseRes) GroupCmp(res Res) bool {
@@ -528,15 +546,15 @@ func (obj *BaseRes) Started() <-chan struct{} { return obj.started }
func (obj *BaseRes) Starter(b bool) { obj.starter = b } func (obj *BaseRes) Starter(b bool) { obj.starter = b }
// Poll is the watch replacement for when we want to poll, which outputs events. // Poll is the watch replacement for when we want to poll, which outputs events.
func (obj *BaseRes) Poll(processChan chan *event.Event) error { func (obj *BaseRes) Poll() error {
cuid := obj.ConvergerUID() // get the converger uid used to report status cuid, _, _ := obj.ConvergerUIDs() // get the converger uid used to report status
// create a time.Ticker for the given interval // create a time.Ticker for the given interval
ticker := time.NewTicker(time.Duration(obj.Meta().Poll) * time.Second) ticker := time.NewTicker(time.Duration(obj.Meta().Poll) * time.Second)
defer ticker.Stop() defer ticker.Stop()
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
cuid.SetConverged(false) // quickly stop any converge due to Running() cuid.SetConverged(false) // quickly stop any converge due to Running()
@@ -559,7 +577,7 @@ func (obj *BaseRes) Poll(processChan chan *event.Event) error {
if send { if send {
send = false send = false
obj.Event(processChan) obj.Event()
} }
} }
} }

View File

@@ -29,9 +29,13 @@ import (
) )
// Event sends off an event, but doesn't block the incoming event queue. // Event sends off an event, but doesn't block the incoming event queue.
func (obj *BaseRes) Event(processChan chan *event.Event) error { func (obj *BaseRes) Event() error {
resp := event.NewResp() resp := event.NewResp()
processChan <- &event.Event{Name: event.EventNil, Resp: resp} // trigger process obj.processLock.Lock()
if !obj.processDone {
obj.processChan <- &event.Event{Name: event.EventNil, Resp: resp} // trigger process
}
obj.processLock.Unlock()
return resp.Wait() return resp.Wait()
} }
@@ -97,14 +101,14 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit *error, send bool) {
// Running is called by the Watch method of the resource once it has started up. // Running is called by the Watch method of the resource once it has started up.
// This signals to the engine to kick off the initial CheckApply resource check. // This signals to the engine to kick off the initial CheckApply resource check.
func (obj *BaseRes) Running(processChan chan *event.Event) error { func (obj *BaseRes) Running() error {
// TODO: If a non-polling resource wants to use the converger, then it // TODO: If a non-polling resource wants to use the converger, then it
// should probably tell Running (via an arg) to not do this. Currently // should probably tell Running (via an arg) to not do this. Currently
// it is a very unlikey race that could cause an early converge if the // it is a very unlikey race that could cause an early converge if the
// converge timeout is very short ( ~ 1s) and the Watch method doesn't // converge timeout is very short ( ~ 1s) and the Watch method doesn't
// immediately SetConverged(false) to stop possible early termination. // immediately SetConverged(false) to stop possible early termination.
if obj.Meta().Poll == 0 { // if not polling, unblock this... if obj.Meta().Poll == 0 { // if not polling, unblock this...
cuid := obj.ConvergerUID() cuid, _, _ := obj.ConvergerUIDs()
cuid.SetConverged(true) // a reasonable initial assumption cuid.SetConverged(true) // a reasonable initial assumption
} }
@@ -116,7 +120,7 @@ func (obj *BaseRes) Running(processChan chan *event.Event) error {
var err error var err error
if obj.starter { // vertices of indegree == 0 should send initial pokes if obj.starter { // vertices of indegree == 0 should send initial pokes
err = obj.Event(processChan) // trigger a CheckApply err = obj.Event() // trigger a CheckApply
} }
return err // bubble up any possible error (or nil) return err // bubble up any possible error (or nil)
} }

View File

@@ -24,7 +24,6 @@ import (
"fmt" "fmt"
"log" "log"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
systemd "github.com/coreos/go-systemd/dbus" // change namespace systemd "github.com/coreos/go-systemd/dbus" // change namespace
@@ -71,7 +70,7 @@ func (obj *SvcRes) Init() error {
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *SvcRes) Watch(processChan chan *event.Event) error { func (obj *SvcRes) Watch() error {
// obj.Name: svc name // obj.Name: svc name
if !systemdUtil.IsRunningSystemd() { if !systemdUtil.IsRunningSystemd() {
return fmt.Errorf("Systemd is not running.") return fmt.Errorf("Systemd is not running.")
@@ -96,7 +95,7 @@ func (obj *SvcRes) Watch(processChan chan *event.Event) error {
bus.Signal(buschan) bus.Signal(buschan)
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
@@ -197,7 +196,7 @@ func (obj *SvcRes) Watch(processChan chan *event.Event) error {
if send { if send {
send = false send = false
obj.Event(processChan) obj.Event()
} }
} }
} }

View File

@@ -22,8 +22,6 @@ import (
"fmt" "fmt"
"log" "log"
"time" "time"
"github.com/purpleidea/mgmt/event"
) )
func init() { func init() {
@@ -70,13 +68,13 @@ func (obj *TimerRes) newTicker() *time.Ticker {
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *TimerRes) Watch(processChan chan *event.Event) error { func (obj *TimerRes) Watch() error {
// create a time.Ticker for the given interval // create a time.Ticker for the given interval
obj.ticker = obj.newTicker() obj.ticker = obj.newTicker()
defer obj.ticker.Stop() defer obj.ticker.Stop()
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
@@ -96,7 +94,7 @@ func (obj *TimerRes) Watch(processChan chan *event.Event) error {
if send { if send {
send = false send = false
obj.Event(processChan) obj.Event()
} }
} }
} }

View File

@@ -29,8 +29,6 @@ import (
"strings" "strings"
"time" "time"
"github.com/purpleidea/mgmt/event"
multierr "github.com/hashicorp/go-multierror" multierr "github.com/hashicorp/go-multierror"
"github.com/libvirt/libvirt-go" "github.com/libvirt/libvirt-go"
libvirtxml "github.com/libvirt/libvirt-go-xml" libvirtxml "github.com/libvirt/libvirt-go-xml"
@@ -250,7 +248,7 @@ func (obj *VirtRes) connect() (conn *libvirt.Connect, err error) {
} }
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *VirtRes) Watch(processChan chan *event.Event) error { func (obj *VirtRes) Watch() error {
domChan := make(chan libvirt.DomainEventType) // TODO: do we need to buffer this? domChan := make(chan libvirt.DomainEventType) // TODO: do we need to buffer this?
gaChan := make(chan *libvirt.DomainEventAgentLifecycle) gaChan := make(chan *libvirt.DomainEventAgentLifecycle)
errorChan := make(chan error) errorChan := make(chan error)
@@ -308,7 +306,7 @@ func (obj *VirtRes) Watch(processChan chan *event.Event) error {
defer obj.conn.DomainEventDeregister(gaCallbackID) defer obj.conn.DomainEventDeregister(gaCallbackID)
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
@@ -400,7 +398,7 @@ func (obj *VirtRes) Watch(processChan chan *event.Event) error {
if send { if send {
send = false send = false
obj.Event(processChan) obj.Event()
} }
} }
} }