From c59f45a37b7b3181326af8bbb602d1c1810a237f Mon Sep 17 00:00:00 2001 From: James Shubin Date: Mon, 28 Mar 2016 20:22:03 -0400 Subject: [PATCH] Force process events to be synchronous This avoids messing up the converged-timeout state! --- event.go | 24 ++++++++++++++++++++++-- exec.go | 8 +++++--- file.go | 6 ++++-- noop.go | 6 ++++-- pgraph.go | 11 ++++++++--- pkg.go | 6 ++++-- resources.go | 2 +- svc.go | 6 ++++-- 8 files changed, 52 insertions(+), 17 deletions(-) diff --git a/event.go b/event.go index 60d70dcf..8fbbf0a4 100644 --- a/event.go +++ b/event.go @@ -21,16 +21,19 @@ package main type eventName int const ( - eventExit eventName = iota + eventNil eventName = iota + eventExit eventStart eventPause eventPoke eventBackPoke ) +type Resp chan bool + type Event struct { Name eventName - Resp chan bool // channel to send an ack response on, nil to skip + Resp Resp // channel to send an ack response on, nil to skip //Wg *sync.WaitGroup // receiver barrier to Wait() for everyone else on Msg string // some words for fun Activity bool // did something interesting happen? @@ -49,6 +52,23 @@ func (event *Event) NACK() { } } +// Resp is just a helper to return the right type of response channel +func NewResp() Resp { + resp := make(chan bool) + return resp +} + +// ACKWait waits for a +ive Ack from a Resp channel +func (resp Resp) ACKWait() { + for { + value := <-resp + // wait until true value + if value { + return + } + } +} + // get the activity value func (event *Event) GetActivity() bool { return event.Activity diff --git a/exec.go b/exec.go index 2f7c1528..139b47d7 100644 --- a/exec.go +++ b/exec.go @@ -102,7 +102,7 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan } // Exec watcher -func (obj *ExecRes) Watch(processChan chan struct{}) { +func (obj *ExecRes) Watch(processChan chan Event) { if obj.IsWatching() { return } @@ -192,8 +192,10 @@ func (obj *ExecRes) Watch(processChan chan struct{}) { if send { send = false // it is okay to invalidate the clean state on poke too - obj.isStateOK = false // something made state dirty - processChan <- struct{}{} // trigger process + obj.isStateOK = false // something made state dirty + resp := NewResp() + processChan <- Event{eventNil, resp, "", true} // trigger process + resp.ACKWait() // wait for the ACK() } } } diff --git a/file.go b/file.go index bb8c015b..77413dca 100644 --- a/file.go +++ b/file.go @@ -102,7 +102,7 @@ func (obj *FileRes) Validate() bool { // File watcher for files and directories // Modify with caution, probably important to write some test cases first! // obj.GetPath(): file or directory -func (obj *FileRes) Watch(processChan chan struct{}) { +func (obj *FileRes) Watch(processChan chan Event) { if obj.IsWatching() { return } @@ -260,7 +260,9 @@ func (obj *FileRes) Watch(processChan chan struct{}) { dirty = false obj.isStateOK = false // something made state dirty } - processChan <- struct{}{} // trigger process + resp := NewResp() + processChan <- Event{eventNil, resp, "", true} // trigger process + resp.ACKWait() // wait for the ACK() } } } diff --git a/noop.go b/noop.go index 591b36f2..763d4628 100644 --- a/noop.go +++ b/noop.go @@ -53,7 +53,7 @@ func (obj *NoopRes) Validate() bool { return true } -func (obj *NoopRes) Watch(processChan chan struct{}) { +func (obj *NoopRes) Watch(processChan chan Event) { if obj.IsWatching() { return } @@ -84,7 +84,9 @@ func (obj *NoopRes) Watch(processChan chan struct{}) { send = false // only do this on certain types of events //obj.isStateOK = false // something made state dirty - processChan <- struct{}{} // trigger process + resp := NewResp() + processChan <- Event{eventNil, resp, "", true} // trigger process + resp.ACKWait() // wait for the ACK() } } } diff --git a/pgraph.go b/pgraph.go index bb23ecb9..a19850bb 100644 --- a/pgraph.go +++ b/pgraph.go @@ -735,11 +735,16 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue // this avoids us having to pass the data into // the Watch() function about which graph it is // running on, which isolates things nicely... - chanProcess := make(chan struct{}) + chanProcess := make(chan Event) go func() { - for _ = range chanProcess { - // XXX: do we need to ACK so that it's synchronous? + for event := range chanProcess { + // this has to be synchronous, + // because otherwise the Res + // event loop will keep running + // and change state, causing the + // converged timeout to fire! g.Process(vv) + event.ACK() // sync } }() vv.Res.Watch(chanProcess) // i block until i end diff --git a/pkg.go b/pkg.go index 0ff7eb20..cbe7f2d8 100644 --- a/pkg.go +++ b/pkg.go @@ -107,7 +107,7 @@ func (obj *PkgRes) Validate() bool { // use UpdatesChanged signal to watch for changes // TODO: https://github.com/hughsie/PackageKit/issues/109 // TODO: https://github.com/hughsie/PackageKit/issues/110 -func (obj *PkgRes) Watch(processChan chan struct{}) { +func (obj *PkgRes) Watch(processChan chan Event) { if obj.IsWatching() { return } @@ -173,7 +173,9 @@ func (obj *PkgRes) Watch(processChan chan struct{}) { dirty = false obj.isStateOK = false // something made state dirty } - processChan <- struct{}{} // trigger process + resp := NewResp() + processChan <- Event{eventNil, resp, "", true} // trigger process + resp.ACKWait() // wait for the ACK() } } } diff --git a/resources.go b/resources.go index 2390f9fa..5c87d26d 100644 --- a/resources.go +++ b/resources.go @@ -102,7 +102,7 @@ type Res interface { Init() //Validate() bool // TODO: this might one day be added GetUUIDs() []ResUUID // most resources only return one - Watch(chan struct{}) // send on channel to signal process() events + Watch(chan Event) // send on channel to signal process() events CheckApply(bool) (bool, error) AutoEdges() AutoEdge Compare(Res) bool diff --git a/svc.go b/svc.go index 69f0a309..1b308f4b 100644 --- a/svc.go +++ b/svc.go @@ -67,7 +67,7 @@ func (obj *SvcRes) Validate() bool { } // Service watcher -func (obj *SvcRes) Watch(processChan chan struct{}) { +func (obj *SvcRes) Watch(processChan chan Event) { if obj.IsWatching() { return } @@ -215,7 +215,9 @@ func (obj *SvcRes) Watch(processChan chan struct{}) { dirty = false obj.isStateOK = false // something made state dirty } - processChan <- struct{}{} // trigger process + resp := NewResp() + processChan <- Event{eventNil, resp, "", true} // trigger process + resp.ACKWait() // wait for the ACK() } }