Add state caching and invalidation to service type

This required a change in the event system to add an "activity" field.
This is meant to be generic in the case that there is more than one need
for it, but at the moment, allows a poke to tell that it is a poke in
response to an apply that just finished, instead of a regular poke or
backpoke in which all that matters is timestamp updates, because there
wasn't any actual work done (since that state was okay).
This commit is contained in:
James Shubin
2016-01-14 23:22:31 -05:00
parent 935805aeda
commit f7858b8e9b
5 changed files with 46 additions and 18 deletions

View File

@@ -205,7 +205,7 @@ func UpdateGraphFromConfig(config *graphConfig, hostname string, g *Graph, etcdO
for _, v := range g.GetVertices() {
if !HasVertex(v, keep) {
// wait for exit before starting new graph!
v.Type.SendEvent(eventExit, true)
v.Type.SendEvent(eventExit, true, false)
g.DeleteVertex(v)
}
}

View File

@@ -25,14 +25,15 @@ const (
eventStart
eventPause
eventPoke
eventChanged
eventBackPoke
)
type Event struct {
Name eventName
Resp chan bool // channel to send an ack response on, nil to skip
//Wg *sync.WaitGroup // receiver barrier to Wait() for everyone else on
Msg string // some words for fun
Msg string // some words for fun
Activity bool // did something interesting happen?
}
// send a single acknowledgement on the channel if one was requested
@@ -47,3 +48,8 @@ func (event *Event) NACK() {
event.Resp <- false // send NACK
}
}
// get the activity value
func (event *Event) GetActivity() bool {
return event.Activity
}

View File

@@ -556,7 +556,7 @@ func (g *Graph) Start(wg *sync.WaitGroup) { // start or continue
}
// ensure state is started before continuing on to next vertex
v.Type.SendEvent(eventStart, true)
v.Type.SendEvent(eventStart, true, false)
}
}
@@ -564,7 +564,7 @@ func (g *Graph) Start(wg *sync.WaitGroup) { // start or continue
func (g *Graph) Pause() {
t, _ := g.TopologicalSort()
for _, v := range t { // squeeze out the events...
v.Type.SendEvent(eventPause, true)
v.Type.SendEvent(eventPause, true, false)
}
}
@@ -576,7 +576,7 @@ func (g *Graph) Exit() {
// when we hit the 'default' in the select statement!
// XXX: we can do this to quiesce, but it's not necessary now
v.Type.SendEvent(eventExit, true)
v.Type.SendEvent(eventExit, true, false)
}
}

View File

@@ -82,9 +82,10 @@ func (obj *ServiceType) Watch() {
var service = fmt.Sprintf("%v.service", obj.Name) // systemd name
var send = false // send event?
var invalid = false // does the service exist or not?
var previous bool // previous invalid value
set := conn.NewSubscriptionSet() // no error should be returned
var dirty = false
var invalid = false // does the service exist or not?
var previous bool // previous invalid value
set := conn.NewSubscriptionSet() // no error should be returned
subChannel, subErrors := set.Subscribe()
var activeSet = false
@@ -112,6 +113,7 @@ func (obj *ServiceType) Watch() {
if previous != invalid { // if invalid changed, send signal
send = true
dirty = true
}
if invalid {
@@ -133,6 +135,9 @@ func (obj *ServiceType) Watch() {
if ok := obj.ReadEvent(&event); !ok {
return // exit
}
if event.GetActivity() {
dirty = true
}
send = true
case _ = <-TimeAfterOrBlock(obj.ctimeout):
obj.SetConvergedState(typeConvergedTimeout)
@@ -166,6 +171,7 @@ func (obj *ServiceType) Watch() {
log.Printf("Service[%v]->Stopped", service)
}
send = true
dirty = true
case err := <-subErrors:
obj.SetConvergedState(typeConvergedNil) // XXX ?
@@ -178,12 +184,19 @@ func (obj *ServiceType) Watch() {
if ok := obj.ReadEvent(&event); !ok {
return // exit
}
if event.GetActivity() {
dirty = true
}
send = true
}
}
if send {
send = false
if dirty {
dirty = false
obj.isStateOK = false // something made state dirty
}
Process(obj) // XXX: rename this function
}
@@ -191,6 +204,9 @@ func (obj *ServiceType) Watch() {
}
func (obj *ServiceType) StateOK() bool {
if obj.isStateOK { // cache the state
return true
}
if !util.IsRunningSystemd() {
log.Fatal("Systemd is not running.")

View File

@@ -52,7 +52,7 @@ type Type interface {
SetVertex(*Vertex)
SetConvegedCallback(ctimeout int, converged chan bool)
Compare(Type) bool
SendEvent(eventName, bool)
SendEvent(eventName, bool, bool)
IsWatching() bool
SetWatching(bool)
GetConvergedState() typeConvergedState
@@ -62,7 +62,7 @@ type Type interface {
GetTimestamp() int64
UpdateTimestamp() int64
OKTimestamp() bool
Poke()
Poke(bool)
BackPoke()
}
@@ -186,7 +186,7 @@ func (obj *BaseType) OKTimestamp() bool {
// notify nodes after me in the dependency graph that they need refreshing...
// NOTE: this assumes that this can never fail or need to be rescheduled
func (obj *BaseType) Poke() {
func (obj *BaseType) Poke(activity bool) {
v := obj.GetVertex()
g := v.GetGraph()
// these are all the vertices pointing AWAY FROM v, eg: v -> ???
@@ -198,7 +198,7 @@ func (obj *BaseType) Poke() {
if DEBUG {
log.Printf("%v[%v]: Poke: %v[%v]", v.GetType(), v.GetName(), n.GetType(), n.GetName())
}
n.SendEvent(eventPoke, false) // XXX: should this be sync or not? XXX: try it as async for now, but switch to sync and see if we deadlock -- maybe it's possible, i don't know for sure yet
n.SendEvent(eventPoke, false, activity) // XXX: can this be switched to sync?
} else {
if DEBUG {
log.Printf("%v[%v]: Poke: %v[%v]: Skipped!", v.GetType(), v.GetName(), n.GetType(), n.GetName())
@@ -224,7 +224,7 @@ func (obj *BaseType) BackPoke() {
if DEBUG {
log.Printf("%v[%v]: BackPoke: %v[%v]", v.GetType(), v.GetName(), n.GetType(), n.GetName())
}
n.SendEvent(eventPoke, false) // XXX: should this be sync or not? XXX: try it as async for now, but switch to sync and see if we deadlock -- maybe it's possible, i don't know for sure yet
n.SendEvent(eventBackPoke, false, false) // XXX: can this be switched to sync?
} else {
if DEBUG {
log.Printf("%v[%v]: BackPoke: %v[%v]: Skipped!", v.GetType(), v.GetName(), n.GetType(), n.GetName())
@@ -234,14 +234,14 @@ func (obj *BaseType) BackPoke() {
}
// push an event into the message queue for a particular type vertex
func (obj *BaseType) SendEvent(event eventName, sync bool) {
func (obj *BaseType) SendEvent(event eventName, sync bool, activity bool) {
if !sync {
obj.events <- Event{event, nil, ""}
obj.events <- Event{event, nil, "", activity}
return
}
resp := make(chan bool)
obj.events <- Event{event, resp, ""}
obj.events <- Event{event, resp, "", activity}
for {
value := <-resp
// wait until true value
@@ -262,6 +262,9 @@ func (obj *BaseType) ReadEvent(event *Event) bool {
case eventPoke:
return true
case eventBackPoke:
return true
case eventExit:
return false
@@ -299,6 +302,7 @@ func Process(obj Type) {
}
obj.SetState(typeEvent)
var ok bool = true
var apply bool = false // did we run an apply?
// is it okay to run dependency wise right now?
// if not, that's okay because when the dependency runs, it will poke
// us back and we will run if needed then!
@@ -315,6 +319,8 @@ func Process(obj Type) {
obj.SetState(typeApplying)
if !obj.Apply() { // check for error
ok = false
} else {
apply = true
}
}
@@ -323,7 +329,7 @@ func Process(obj Type) {
// nodes might fail due to having a too old timestamp!
obj.UpdateTimestamp() // this was touched...
obj.SetState(typePoking) // can't cancel parent poke
obj.Poke()
obj.Poke(apply)
}
// poke at our pre-req's instead since they need to refresh/run...
} else {