engine: resources: Simplify the Watch loop

I had some legacy unnecessary boolean for sending everywhere. Not sure
why I never re-read it, it's so easy to just copy and paste and carry
on.
This commit is contained in:
James Shubin
2025-05-25 02:12:14 -04:00
parent f73127ec23
commit b868a60f69
29 changed files with 48 additions and 244 deletions

View File

@@ -361,14 +361,14 @@ func (obj *FooRes) Watch(ctx context.Context) error {
// notify engine that we're running // notify engine that we're running
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
select { select {
// the actual events! // the actual events!
case event := <-obj.foo.Events: case event := <-obj.foo.Events:
if is_an_event { if !is_an_event {
send = true continue // skip event
} }
// send below...
// event errors // event errors
case err := <-obj.foo.Errors: case err := <-obj.foo.Errors:
@@ -378,11 +378,7 @@ func (obj *FooRes) Watch(ctx context.Context) error {
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs obj.init.Event() // notify engine of an event (this can block)
if send {
send = false
obj.init.Event()
}
} }
} }
``` ```

View File

@@ -148,7 +148,6 @@ func (obj *AugeasRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("Watching: %s", obj.File) // attempting to watch... obj.init.Logf("Watching: %s", obj.File) // attempting to watch...
@@ -165,19 +164,14 @@ func (obj *AugeasRes) Watch(ctx context.Context) error {
if obj.init.Debug { // don't access event.Body if event.Error isn't nil if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("Event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("Event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// checkApplySet runs CheckApply for one element of the AugeasRes.Set // checkApplySet runs CheckApply for one element of the AugeasRes.Set
func (obj *AugeasRes) checkApplySet(ctx context.Context, apply bool, ag *augeas.Augeas, set *AugeasSet) (bool, error) { func (obj *AugeasRes) checkApplySet(ctx context.Context, apply bool, ag *augeas.Augeas, set *AugeasSet) (bool, error) {

View File

@@ -447,8 +447,6 @@ func (obj *AwsEc2Res) Watch(ctx context.Context) error {
// longpollWatch uses the ec2 api's built in methods to watch ec2 resource // longpollWatch uses the ec2 api's built in methods to watch ec2 resource
// state. // state.
func (obj *AwsEc2Res) longpollWatch(ctx context.Context) error { func (obj *AwsEc2Res) longpollWatch(ctx context.Context) error {
send := false
// We tell the engine that we're running right away. This is not correct, // We tell the engine that we're running right away. This is not correct,
// but the api doesn't have a way to signal when the waiters are ready. // but the api doesn't have a way to signal when the waiters are ready.
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
@@ -527,19 +525,15 @@ func (obj *AwsEc2Res) longpollWatch(ctx context.Context) error {
continue continue
default: default:
obj.init.Logf("State: %v", msg.state) obj.init.Logf("State: %v", msg.state)
send = true
} }
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// snsWatch uses amazon's SNS and CloudWatchEvents APIs to get instance state- // snsWatch uses amazon's SNS and CloudWatchEvents APIs to get instance state-
// change notifications pushed to the http endpoint (snsServer) set up below. In // change notifications pushed to the http endpoint (snsServer) set up below. In
@@ -547,7 +541,6 @@ func (obj *AwsEc2Res) longpollWatch(ctx context.Context) error {
// it can publish to. snsWatch creates an http server which listens for messages // it can publish to. snsWatch creates an http server which listens for messages
// published to the topic and processes them accordingly. // published to the topic and processes them accordingly.
func (obj *AwsEc2Res) snsWatch(ctx context.Context) error { func (obj *AwsEc2Res) snsWatch(ctx context.Context) error {
send := false
defer obj.wg.Wait() defer obj.wg.Wait()
// create the sns listener // create the sns listener
// closing is handled by http.Server.Shutdown in the defer func below // closing is handled by http.Server.Shutdown in the defer func below
@@ -622,18 +615,14 @@ func (obj *AwsEc2Res) snsWatch(ctx context.Context) error {
continue continue
} }
obj.init.Logf("State: %v", msg.event) obj.init.Logf("State: %v", msg.event)
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply method for AwsEc2 resource. // CheckApply method for AwsEc2 resource.
func (obj *AwsEc2Res) CheckApply(ctx context.Context, apply bool) (bool, error) { func (obj *AwsEc2Res) CheckApply(ctx context.Context, apply bool) (bool, error) {

View File

@@ -296,7 +296,6 @@ func (obj *CronRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
select { select {
case event := <-dbusChan: case event := <-dbusChan:
@@ -304,7 +303,6 @@ func (obj *CronRes) Watch(ctx context.Context) error {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("%+v", event) obj.init.Logf("%+v", event)
} }
send = true
case event, ok := <-obj.recWatcher.Events(): case event, ok := <-obj.recWatcher.Events():
// process unit file recwatch events // process unit file recwatch events
@@ -317,18 +315,14 @@ func (obj *CronRes) Watch(ctx context.Context) error {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("Event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("Event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply is run to check the state and, if apply is true, to apply the // CheckApply is run to check the state and, if apply is true, to apply the
// necessary changes to reach the desired state. This is run before Watch and // necessary changes to reach the desired state. This is run before Watch and

View File

@@ -158,7 +158,6 @@ func (obj *DeployTar) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
select { select {
case event, ok := <-recWatcher.Events(): case event, ok := <-recWatcher.Events():
@@ -174,19 +173,14 @@ func (obj *DeployTar) Watch(ctx context.Context) error {
if obj.init.Debug { // don't access event.Body if event.Error isn't nil if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.

View File

@@ -514,7 +514,6 @@ func (obj *DHCPServerRes) Watch(ctx context.Context) error {
startupChan := make(chan struct{}) startupChan := make(chan struct{})
close(startupChan) // send one initial signal close(startupChan) // send one initial signal
var send = false // send event?
for { for {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("Looping...") obj.init.Logf("Looping...")
@@ -523,7 +522,6 @@ func (obj *DHCPServerRes) Watch(ctx context.Context) error {
select { select {
case <-startupChan: case <-startupChan:
startupChan = nil startupChan = nil
send = true
case <-closeSignal: // something shut us down early case <-closeSignal: // something shut us down early
return closeError return closeError
@@ -532,13 +530,9 @@ func (obj *DHCPServerRes) Watch(ctx context.Context) error {
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// sidCheckApply runs the server ID cache operation in CheckApply, which can // sidCheckApply runs the server ID cache operation in CheckApply, which can
// help CheckApply fail before the handler runs, so at least we see an error. // help CheckApply fail before the handler runs, so at least we see an error.

View File

@@ -222,7 +222,6 @@ func (obj *DockerContainerRes) Watch(ctx context.Context) error {
obj.init.Running() obj.init.Running()
} }
var send = false // send event?
for { for {
select { select {
case event, ok := <-eventChan: case event, ok := <-eventChan:
@@ -232,7 +231,6 @@ func (obj *DockerContainerRes) Watch(ctx context.Context) error {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("%+v", event) obj.init.Logf("%+v", event)
} }
send = true
case err, ok := <-errChan: case err, ok := <-errChan:
if !ok { if !ok {
@@ -244,13 +242,9 @@ func (obj *DockerContainerRes) Watch(ctx context.Context) error {
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply method for Docker resource. // CheckApply method for Docker resource.
func (obj *DockerContainerRes) CheckApply(ctx context.Context, apply bool) (bool, error) { func (obj *DockerContainerRes) CheckApply(ctx context.Context, apply bool) (bool, error) {

View File

@@ -169,7 +169,6 @@ func (obj *DockerImageRes) Watch(ctx context.Context) error {
obj.init.Running() obj.init.Running()
} }
var send = false // send event?
for { for {
select { select {
case event, ok := <-eventChan: case event, ok := <-eventChan:
@@ -179,7 +178,6 @@ func (obj *DockerImageRes) Watch(ctx context.Context) error {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("%+v", event) obj.init.Logf("%+v", event)
} }
send = true
case err, ok := <-errChan: case err, ok := <-errChan:
if !ok { if !ok {
@@ -191,13 +189,9 @@ func (obj *DockerImageRes) Watch(ctx context.Context) error {
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply method for Docker resource. // CheckApply method for Docker resource.
func (obj *DockerImageRes) CheckApply(ctx context.Context, apply bool) (checkOK bool, err error) { func (obj *DockerImageRes) CheckApply(ctx context.Context, apply bool) (checkOK bool, err error) {

View File

@@ -369,7 +369,6 @@ func (obj *ExecRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
select { select {
case data, ok := <-ioChan: case data, ok := <-ioChan:
@@ -408,8 +407,8 @@ func (obj *ExecRes) Watch(ctx context.Context) error {
obj.init.Logf("watch out:") obj.init.Logf("watch out:")
obj.init.Logf("%s", s) obj.init.Logf("%s", s)
} }
if data.text != "" { if data.text == "" { // TODO: do we want to skip event?
send = true continue
} }
case event, ok := <-rwChan: case event, ok := <-rwChan:
@@ -419,7 +418,6 @@ func (obj *ExecRes) Watch(ctx context.Context) error {
if err := event.Error; err != nil { if err := event.Error; err != nil {
return errwrap.Wrapf(err, "unknown %s watcher error", obj) return errwrap.Wrapf(err, "unknown %s watcher error", obj)
} }
send = true
case files, ok := <-filesChan: case files, ok := <-filesChan:
if !ok { // channel shutdown if !ok { // channel shutdown
@@ -428,19 +426,14 @@ func (obj *ExecRes) Watch(ctx context.Context) error {
if err := files.Error; err != nil { if err := files.Error; err != nil {
return errwrap.Wrapf(err, "unknown %s watcher error", obj) return errwrap.Wrapf(err, "unknown %s watcher error", obj)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.

View File

@@ -518,7 +518,6 @@ func (obj *FileRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("watching: %s", obj.getPath()) // attempting to watch... obj.init.Logf("watching: %s", obj.getPath()) // attempting to watch...
@@ -538,7 +537,6 @@ func (obj *FileRes) Watch(ctx context.Context) error {
if obj.init.Debug { // don't access event.Body if event.Error isn't nil if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case event, ok := <-inputEvents: case event, ok := <-inputEvents:
if !ok { if !ok {
@@ -550,19 +548,14 @@ func (obj *FileRes) Watch(ctx context.Context) error {
if obj.init.Debug { // don't access event.Body if event.Error isn't nil if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("input event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("input event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// fileCheckApply is the CheckApply operation for a source and destination file. // fileCheckApply is the CheckApply operation for a source and destination file.
// It can accept an io.Reader as the source, which can be a regular file, or it // It can accept an io.Reader as the source, which can be a regular file, or it

View File

@@ -262,7 +262,6 @@ func (obj *FirewalldRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
select { select {
case event, ok := <-events: // &nftables.MonitorEvent case event, ok := <-events: // &nftables.MonitorEvent
@@ -278,19 +277,13 @@ func (obj *FirewalldRes) Watch(ctx context.Context) error {
//obj.init.Logf("event data: %+v", event.Data) //obj.init.Logf("event data: %+v", event.Data)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.

View File

@@ -102,7 +102,6 @@ func (obj *GroupRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("Watching: %s", groupFile) // attempting to watch... obj.init.Logf("Watching: %s", groupFile) // attempting to watch...
@@ -119,19 +118,14 @@ func (obj *GroupRes) Watch(ctx context.Context) error {
if obj.init.Debug { // don't access event.Body if event.Error isn't nil if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("Event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("Event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply method for Group resource. // CheckApply method for Group resource.
func (obj *GroupRes) CheckApply(ctx context.Context, apply bool) (bool, error) { func (obj *GroupRes) CheckApply(ctx context.Context, apply bool) (bool, error) {

View File

@@ -243,7 +243,6 @@ func (obj *GzipRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
select { select {
case event, ok := <-recWatcher.Events(): case event, ok := <-recWatcher.Events():
@@ -259,7 +258,6 @@ func (obj *GzipRes) Watch(ctx context.Context) error {
if obj.init.Debug { // don't access event.Body if event.Error isn't nil if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case event, ok := <-events: case event, ok := <-events:
if !ok { // channel shutdown if !ok { // channel shutdown
@@ -271,19 +269,14 @@ func (obj *GzipRes) Watch(ctx context.Context) error {
if obj.init.Debug { // don't access event.Body if event.Error isn't nil if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.

View File

@@ -183,7 +183,6 @@ func (obj *HostnameRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
select { select {
case _, ok := <-signals: case _, ok := <-signals:
@@ -191,7 +190,6 @@ func (obj *HostnameRes) Watch(ctx context.Context) error {
return fmt.Errorf("unexpected close") return fmt.Errorf("unexpected close")
} }
//signals = nil //signals = nil
send = true
case event, ok := <-recWatcher.Events(): case event, ok := <-recWatcher.Events():
if !ok { // channel shutdown if !ok { // channel shutdown
@@ -203,19 +201,14 @@ func (obj *HostnameRes) Watch(ctx context.Context) error {
if obj.init.Debug { // don't access event.Body if event.Error isn't nil if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
func (obj *HostnameRes) updateHostnameProperty(object dbus.BusObject, expectedValue, property, setterName string, apply bool) (bool, error) { func (obj *HostnameRes) updateHostnameProperty(object dbus.BusObject, expectedValue, property, setterName string, apply bool) (bool, error) {
propertyObject, err := object.GetProperty("org.freedesktop.hostname1." + property) propertyObject, err := object.GetProperty("org.freedesktop.hostname1." + property)

View File

@@ -526,7 +526,6 @@ func (obj *HTTPServerRes) Watch(ctx context.Context) error {
startupChan := make(chan struct{}) startupChan := make(chan struct{})
close(startupChan) // send one initial signal close(startupChan) // send one initial signal
var send = false // send event?
for { for {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("Looping...") obj.init.Logf("Looping...")
@@ -535,7 +534,6 @@ func (obj *HTTPServerRes) Watch(ctx context.Context) error {
select { select {
case <-startupChan: case <-startupChan:
startupChan = nil startupChan = nil
send = true
case err, ok := <-multiplexedChan: case err, ok := <-multiplexedChan:
if !ok { // shouldn't happen if !ok { // shouldn't happen
@@ -545,7 +543,6 @@ func (obj *HTTPServerRes) Watch(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
send = true
case <-closeSignal: // something shut us down early case <-closeSignal: // something shut us down early
return closeError return closeError
@@ -554,13 +551,9 @@ func (obj *HTTPServerRes) Watch(ctx context.Context) error {
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply never has anything to do for this resource, so it always succeeds. // CheckApply never has anything to do for this resource, so it always succeeds.
// It does however check that certain runtime requirements (such as the Root dir // It does however check that certain runtime requirements (such as the Root dir

View File

@@ -203,7 +203,6 @@ func (obj *HTTPServerFlagRes) Watch(ctx context.Context) error {
startupChan := make(chan struct{}) startupChan := make(chan struct{})
close(startupChan) // send one initial signal close(startupChan) // send one initial signal
var send = false // send event?
for { for {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("Looping...") obj.init.Logf("Looping...")
@@ -212,7 +211,6 @@ func (obj *HTTPServerFlagRes) Watch(ctx context.Context) error {
select { select {
case <-startupChan: case <-startupChan:
startupChan = nil startupChan = nil
send = true
case err, ok := <-obj.eventStream: case err, ok := <-obj.eventStream:
if !ok { // shouldn't happen if !ok { // shouldn't happen
@@ -222,19 +220,14 @@ func (obj *HTTPServerFlagRes) Watch(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply never has anything to do for this resource, so it always succeeds. // CheckApply never has anything to do for this resource, so it always succeeds.
func (obj *HTTPServerFlagRes) CheckApply(ctx context.Context, apply bool) (bool, error) { func (obj *HTTPServerFlagRes) CheckApply(ctx context.Context, apply bool) (bool, error) {

View File

@@ -664,7 +664,6 @@ func (obj *HTTPServerUIRes) Watch(ctx context.Context) error {
startupChan := make(chan struct{}) startupChan := make(chan struct{})
close(startupChan) // send one initial signal close(startupChan) // send one initial signal
var send = false // send event?
for { for {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("Looping...") obj.init.Logf("Looping...")
@@ -673,7 +672,6 @@ func (obj *HTTPServerUIRes) Watch(ctx context.Context) error {
select { select {
case <-startupChan: case <-startupChan:
startupChan = nil startupChan = nil
send = true
//case err, ok := <-obj.eventStream: //case err, ok := <-obj.eventStream:
// if !ok { // shouldn't happen // if !ok { // shouldn't happen
@@ -683,7 +681,6 @@ func (obj *HTTPServerUIRes) Watch(ctx context.Context) error {
// if err != nil { // if err != nil {
// return err // return err
// } // }
// send = true
case err, ok := <-multiplexedChan: case err, ok := <-multiplexedChan:
if !ok { // shouldn't happen if !ok { // shouldn't happen
@@ -693,18 +690,13 @@ func (obj *HTTPServerUIRes) Watch(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
}
//return nil // unreachable //return nil // unreachable
} }

View File

@@ -253,7 +253,6 @@ func (obj *MountRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send bool
var done bool var done bool
for { for {
select { select {
@@ -272,8 +271,6 @@ func (obj *MountRes) Watch(ctx context.Context) error {
obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case event, ok := <-ch: case event, ok := <-ch:
if !ok { if !ok {
if done { if done {
@@ -286,19 +283,13 @@ func (obj *MountRes) Watch(ctx context.Context) error {
obj.init.Logf("event: %+v", event) obj.init.Logf("event: %+v", event)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// fstabCheckApply checks /etc/fstab for entries corresponding to the resource // fstabCheckApply checks /etc/fstab for entries corresponding to the resource
// definition, and adds or deletes the entry as needed. // definition, and adds or deletes the entry as needed.

View File

@@ -121,19 +121,10 @@ func (obj *MsgRes) Cleanup() error {
func (obj *MsgRes) Watch(ctx context.Context) error { func (obj *MsgRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
//var send = false // send event?
for {
select { select {
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
//if send {
// send = false
// obj.init.Event() // notify engine of an event (this can block)
//}
}
} }
// isAllStateOK derives a compound state from all internal cache flags that // isAllStateOK derives a compound state from all internal cache flags that

View File

@@ -320,7 +320,6 @@ func (obj *NetRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
var done bool var done bool
for { for {
select { select {
@@ -339,8 +338,6 @@ func (obj *NetRes) Watch(ctx context.Context) error {
obj.init.Logf("Event: %+v", s.msg) obj.init.Logf("Event: %+v", s.msg)
} }
send = true
case event, ok := <-recWatcher.Events(): case event, ok := <-recWatcher.Events():
if !ok { if !ok {
if done { if done {
@@ -356,19 +353,13 @@ func (obj *NetRes) Watch(ctx context.Context) error {
obj.init.Logf("Event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("Event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// ifaceCheckApply checks the state of the network device and brings it up or // ifaceCheckApply checks the state of the network device and brings it up or
// down as necessary. // down as necessary.

View File

@@ -183,12 +183,13 @@ func (obj *NspawnRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
select { select {
case event := <-busChan: case event := <-busChan:
// process org.freedesktop.machine1 events for this resource's name // process org.freedesktop.machine1 events for this resource's name
if event.Body[0] == obj.Name() { if event.Body[0] != obj.Name() {
continue
}
obj.init.Logf("Event received: %v", event.Name) obj.init.Logf("Event received: %v", event.Name)
if event.Name == machineNew { if event.Name == machineNew {
obj.init.Logf("Machine started") obj.init.Logf("Machine started")
@@ -197,20 +198,14 @@ func (obj *NspawnRes) Watch(ctx context.Context) error {
} else { } else {
return fmt.Errorf("unknown event: %s", event.Name) return fmt.Errorf("unknown event: %s", event.Name)
} }
send = true
}
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply is run to check the state and, if apply is true, to apply the // CheckApply is run to check the state and, if apply is true, to apply the
// necessary changes to reach the desired state. This is run before Watch and // necessary changes to reach the desired state. This is run before Watch and

View File

@@ -222,7 +222,6 @@ func (obj *PasswordRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
select { select {
// NOTE: this part is very similar to the file resource code // NOTE: this part is very similar to the file resource code
@@ -233,19 +232,14 @@ func (obj *PasswordRes) Watch(ctx context.Context) error {
if err := event.Error; err != nil { if err := event.Error; err != nil {
return errwrap.Wrapf(err, "unknown %s watcher error", obj) return errwrap.Wrapf(err, "unknown %s watcher error", obj)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply method for Password resource. Does nothing, returns happy! // CheckApply method for Password resource. Does nothing, returns happy!
func (obj *PasswordRes) CheckApply(ctx context.Context, apply bool) (bool, error) { func (obj *PasswordRes) CheckApply(ctx context.Context, apply bool) (bool, error) {

View File

@@ -150,7 +150,6 @@ func (obj *PkgRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("%s: Watching...", obj.fmtNames(obj.getNames())) obj.init.Logf("%s: Watching...", obj.fmtNames(obj.getNames()))
@@ -169,19 +168,13 @@ func (obj *PkgRes) Watch(ctx context.Context) error {
<-ch // discard <-ch // discard
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// get list of names when grouped or not // get list of names when grouped or not
func (obj *PkgRes) getNames() []string { func (obj *PkgRes) getNames() []string {

View File

@@ -217,7 +217,6 @@ func (obj *SysctlRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
select { select {
case event, ok := <-events1: case event, ok := <-events1:
@@ -230,7 +229,6 @@ func (obj *SysctlRes) Watch(ctx context.Context) error {
if obj.init.Debug { // don't access event.Body if event.Error isn't nil if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case event, ok := <-events2: case event, ok := <-events2:
if !ok { // channel shutdown if !ok { // channel shutdown
@@ -242,19 +240,14 @@ func (obj *SysctlRes) Watch(ctx context.Context) error {
if obj.init.Debug { // don't access event.Body if event.Error isn't nil if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.

View File

@@ -218,7 +218,6 @@ func (obj *TarRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
select { select {
case event, ok := <-recWatcher.Events(): case event, ok := <-recWatcher.Events():
@@ -234,7 +233,6 @@ func (obj *TarRes) Watch(ctx context.Context) error {
if obj.init.Debug { // don't access event.Body if event.Error isn't nil if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case event, ok := <-events: case event, ok := <-events:
if !ok { // channel shutdown if !ok { // channel shutdown
@@ -249,19 +247,14 @@ func (obj *TarRes) Watch(ctx context.Context) error {
if obj.init.Debug { // don't access event.Body if event.Error isn't nil if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.

View File

@@ -199,7 +199,6 @@ func (obj *TFTPServerRes) Watch(ctx context.Context) error {
startupChan := make(chan struct{}) startupChan := make(chan struct{})
close(startupChan) // send one initial signal close(startupChan) // send one initial signal
var send = false // send event?
for { for {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("Looping...") obj.init.Logf("Looping...")
@@ -208,7 +207,6 @@ func (obj *TFTPServerRes) Watch(ctx context.Context) error {
select { select {
case <-startupChan: case <-startupChan:
startupChan = nil startupChan = nil
send = true
case <-closeSignal: // something shut us down early case <-closeSignal: // something shut us down early
return closeError return closeError
@@ -217,13 +215,9 @@ func (obj *TFTPServerRes) Watch(ctx context.Context) error {
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply never has anything to do for this resource, so it always succeeds. // CheckApply never has anything to do for this resource, so it always succeeds.
// It does however check that certain runtime requirements (such as the Root dir // It does however check that certain runtime requirements (such as the Root dir

View File

@@ -91,23 +91,18 @@ func (obj *TimerRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
select { select {
case <-obj.ticker.C: // received the timer event case <-obj.ticker.C: // received the timer event
send = true
obj.init.Logf("received tick") obj.init.Logf("received tick")
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply method for Timer resource. Triggers a timer reset on notify. // CheckApply method for Timer resource. Triggers a timer reset on notify.
func (obj *TimerRes) CheckApply(ctx context.Context, apply bool) (bool, error) { func (obj *TimerRes) CheckApply(ctx context.Context, apply bool) (bool, error) {

View File

@@ -153,7 +153,6 @@ func (obj *UserRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("watching: %s", util.EtcPasswdFile) // attempting to watch... obj.init.Logf("watching: %s", util.EtcPasswdFile) // attempting to watch...
@@ -170,19 +169,14 @@ func (obj *UserRes) Watch(ctx context.Context) error {
if obj.init.Debug { // don't access event.Body if event.Error isn't nil if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply method for User resource. // CheckApply method for User resource.
func (obj *UserRes) CheckApply(ctx context.Context, apply bool) (bool, error) { func (obj *UserRes) CheckApply(ctx context.Context, apply bool) (bool, error) {

View File

@@ -437,7 +437,6 @@ func (obj *VirtBuilderRes) Watch(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running obj.init.Running() // when started, notify engine that we're running
var send = false // send event?
for { for {
select { select {
case event, ok := <-recWatcher.Events(): case event, ok := <-recWatcher.Events():
@@ -450,19 +449,14 @@ func (obj *VirtBuilderRes) Watch(ctx context.Context) error {
if obj.init.Debug { // don't access event.Body if event.Error isn't nil if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op) obj.init.Logf("event(%s): %v", event.Body.Name, event.Body.Op)
} }
send = true
case <-ctx.Done(): // closed by the engine to signal shutdown case <-ctx.Done(): // closed by the engine to signal shutdown
return nil return nil
} }
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
obj.init.Event() // notify engine of an event (this can block) obj.init.Event() // notify engine of an event (this can block)
} }
} }
}
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.