Force process events to be synchronous

This avoids messing up the converged-timeout state!
This commit is contained in:
James Shubin
2016-03-28 20:22:03 -04:00
parent 1b01f908e3
commit c59f45a37b
8 changed files with 52 additions and 17 deletions

View File

@@ -21,16 +21,19 @@ package main
type eventName int
const (
eventExit eventName = iota
eventNil eventName = iota
eventExit
eventStart
eventPause
eventPoke
eventBackPoke
)
type Resp chan bool
type Event struct {
Name eventName
Resp chan bool // channel to send an ack response on, nil to skip
Resp Resp // channel to send an ack response on, nil to skip
//Wg *sync.WaitGroup // receiver barrier to Wait() for everyone else on
Msg string // some words for fun
Activity bool // did something interesting happen?
@@ -49,6 +52,23 @@ func (event *Event) NACK() {
}
}
// Resp is just a helper to return the right type of response channel
func NewResp() Resp {
resp := make(chan bool)
return resp
}
// ACKWait waits for a +ive Ack from a Resp channel
func (resp Resp) ACKWait() {
for {
value := <-resp
// wait until true value
if value {
return
}
}
}
// get the activity value
func (event *Event) GetActivity() bool {
return event.Activity

View File

@@ -102,7 +102,7 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan
}
// Exec watcher
func (obj *ExecRes) Watch(processChan chan struct{}) {
func (obj *ExecRes) Watch(processChan chan Event) {
if obj.IsWatching() {
return
}
@@ -192,8 +192,10 @@ func (obj *ExecRes) Watch(processChan chan struct{}) {
if send {
send = false
// it is okay to invalidate the clean state on poke too
obj.isStateOK = false // something made state dirty
processChan <- struct{}{} // trigger process
obj.isStateOK = false // something made state dirty
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
resp.ACKWait() // wait for the ACK()
}
}
}

View File

@@ -102,7 +102,7 @@ func (obj *FileRes) Validate() bool {
// File watcher for files and directories
// Modify with caution, probably important to write some test cases first!
// obj.GetPath(): file or directory
func (obj *FileRes) Watch(processChan chan struct{}) {
func (obj *FileRes) Watch(processChan chan Event) {
if obj.IsWatching() {
return
}
@@ -260,7 +260,9 @@ func (obj *FileRes) Watch(processChan chan struct{}) {
dirty = false
obj.isStateOK = false // something made state dirty
}
processChan <- struct{}{} // trigger process
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
resp.ACKWait() // wait for the ACK()
}
}
}

View File

@@ -53,7 +53,7 @@ func (obj *NoopRes) Validate() bool {
return true
}
func (obj *NoopRes) Watch(processChan chan struct{}) {
func (obj *NoopRes) Watch(processChan chan Event) {
if obj.IsWatching() {
return
}
@@ -84,7 +84,9 @@ func (obj *NoopRes) Watch(processChan chan struct{}) {
send = false
// only do this on certain types of events
//obj.isStateOK = false // something made state dirty
processChan <- struct{}{} // trigger process
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
resp.ACKWait() // wait for the ACK()
}
}
}

View File

@@ -735,11 +735,16 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
// this avoids us having to pass the data into
// the Watch() function about which graph it is
// running on, which isolates things nicely...
chanProcess := make(chan struct{})
chanProcess := make(chan Event)
go func() {
for _ = range chanProcess {
// XXX: do we need to ACK so that it's synchronous?
for event := range chanProcess {
// this has to be synchronous,
// because otherwise the Res
// event loop will keep running
// and change state, causing the
// converged timeout to fire!
g.Process(vv)
event.ACK() // sync
}
}()
vv.Res.Watch(chanProcess) // i block until i end

6
pkg.go
View File

@@ -107,7 +107,7 @@ func (obj *PkgRes) Validate() bool {
// use UpdatesChanged signal to watch for changes
// TODO: https://github.com/hughsie/PackageKit/issues/109
// TODO: https://github.com/hughsie/PackageKit/issues/110
func (obj *PkgRes) Watch(processChan chan struct{}) {
func (obj *PkgRes) Watch(processChan chan Event) {
if obj.IsWatching() {
return
}
@@ -173,7 +173,9 @@ func (obj *PkgRes) Watch(processChan chan struct{}) {
dirty = false
obj.isStateOK = false // something made state dirty
}
processChan <- struct{}{} // trigger process
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
resp.ACKWait() // wait for the ACK()
}
}
}

View File

@@ -102,7 +102,7 @@ type Res interface {
Init()
//Validate() bool // TODO: this might one day be added
GetUUIDs() []ResUUID // most resources only return one
Watch(chan struct{}) // send on channel to signal process() events
Watch(chan Event) // send on channel to signal process() events
CheckApply(bool) (bool, error)
AutoEdges() AutoEdge
Compare(Res) bool

6
svc.go
View File

@@ -67,7 +67,7 @@ func (obj *SvcRes) Validate() bool {
}
// Service watcher
func (obj *SvcRes) Watch(processChan chan struct{}) {
func (obj *SvcRes) Watch(processChan chan Event) {
if obj.IsWatching() {
return
}
@@ -215,7 +215,9 @@ func (obj *SvcRes) Watch(processChan chan struct{}) {
dirty = false
obj.isStateOK = false // something made state dirty
}
processChan <- struct{}{} // trigger process
resp := NewResp()
processChan <- Event{eventNil, resp, "", true} // trigger process
resp.ACKWait() // wait for the ACK()
}
}