diff --git a/config.go b/config.go index de0a6f4a..b402c83d 100644 --- a/config.go +++ b/config.go @@ -205,7 +205,7 @@ func UpdateGraphFromConfig(config *graphConfig, hostname string, g *Graph, etcdO for _, v := range g.GetVertices() { if !HasVertex(v, keep) { // wait for exit before starting new graph! - v.Type.SendEvent(eventExit, true) + v.Type.SendEvent(eventExit, true, false) g.DeleteVertex(v) } } diff --git a/event.go b/event.go index 87fb81c6..60d70dcf 100644 --- a/event.go +++ b/event.go @@ -25,14 +25,15 @@ const ( eventStart eventPause eventPoke - eventChanged + eventBackPoke ) type Event struct { Name eventName Resp chan bool // channel to send an ack response on, nil to skip //Wg *sync.WaitGroup // receiver barrier to Wait() for everyone else on - Msg string // some words for fun + Msg string // some words for fun + Activity bool // did something interesting happen? } // send a single acknowledgement on the channel if one was requested @@ -47,3 +48,8 @@ func (event *Event) NACK() { event.Resp <- false // send NACK } } + +// get the activity value +func (event *Event) GetActivity() bool { + return event.Activity +} diff --git a/pgraph.go b/pgraph.go index 228316d3..40c52531 100644 --- a/pgraph.go +++ b/pgraph.go @@ -556,7 +556,7 @@ func (g *Graph) Start(wg *sync.WaitGroup) { // start or continue } // ensure state is started before continuing on to next vertex - v.Type.SendEvent(eventStart, true) + v.Type.SendEvent(eventStart, true, false) } } @@ -564,7 +564,7 @@ func (g *Graph) Start(wg *sync.WaitGroup) { // start or continue func (g *Graph) Pause() { t, _ := g.TopologicalSort() for _, v := range t { // squeeze out the events... - v.Type.SendEvent(eventPause, true) + v.Type.SendEvent(eventPause, true, false) } } @@ -576,7 +576,7 @@ func (g *Graph) Exit() { // when we hit the 'default' in the select statement! // XXX: we can do this to quiesce, but it's not necessary now - v.Type.SendEvent(eventExit, true) + v.Type.SendEvent(eventExit, true, false) } } diff --git a/service.go b/service.go index a45a1d60..0eff14a7 100644 --- a/service.go +++ b/service.go @@ -82,9 +82,10 @@ func (obj *ServiceType) Watch() { var service = fmt.Sprintf("%v.service", obj.Name) // systemd name var send = false // send event? - var invalid = false // does the service exist or not? - var previous bool // previous invalid value - set := conn.NewSubscriptionSet() // no error should be returned + var dirty = false + var invalid = false // does the service exist or not? + var previous bool // previous invalid value + set := conn.NewSubscriptionSet() // no error should be returned subChannel, subErrors := set.Subscribe() var activeSet = false @@ -112,6 +113,7 @@ func (obj *ServiceType) Watch() { if previous != invalid { // if invalid changed, send signal send = true + dirty = true } if invalid { @@ -133,6 +135,9 @@ func (obj *ServiceType) Watch() { if ok := obj.ReadEvent(&event); !ok { return // exit } + if event.GetActivity() { + dirty = true + } send = true case _ = <-TimeAfterOrBlock(obj.ctimeout): obj.SetConvergedState(typeConvergedTimeout) @@ -166,6 +171,7 @@ func (obj *ServiceType) Watch() { log.Printf("Service[%v]->Stopped", service) } send = true + dirty = true case err := <-subErrors: obj.SetConvergedState(typeConvergedNil) // XXX ? @@ -178,12 +184,19 @@ func (obj *ServiceType) Watch() { if ok := obj.ReadEvent(&event); !ok { return // exit } + if event.GetActivity() { + dirty = true + } send = true } } if send { send = false + if dirty { + dirty = false + obj.isStateOK = false // something made state dirty + } Process(obj) // XXX: rename this function } @@ -191,6 +204,9 @@ func (obj *ServiceType) Watch() { } func (obj *ServiceType) StateOK() bool { + if obj.isStateOK { // cache the state + return true + } if !util.IsRunningSystemd() { log.Fatal("Systemd is not running.") diff --git a/types.go b/types.go index 1e8cb4db..25174f09 100644 --- a/types.go +++ b/types.go @@ -52,7 +52,7 @@ type Type interface { SetVertex(*Vertex) SetConvegedCallback(ctimeout int, converged chan bool) Compare(Type) bool - SendEvent(eventName, bool) + SendEvent(eventName, bool, bool) IsWatching() bool SetWatching(bool) GetConvergedState() typeConvergedState @@ -62,7 +62,7 @@ type Type interface { GetTimestamp() int64 UpdateTimestamp() int64 OKTimestamp() bool - Poke() + Poke(bool) BackPoke() } @@ -186,7 +186,7 @@ func (obj *BaseType) OKTimestamp() bool { // notify nodes after me in the dependency graph that they need refreshing... // NOTE: this assumes that this can never fail or need to be rescheduled -func (obj *BaseType) Poke() { +func (obj *BaseType) Poke(activity bool) { v := obj.GetVertex() g := v.GetGraph() // these are all the vertices pointing AWAY FROM v, eg: v -> ??? @@ -198,7 +198,7 @@ func (obj *BaseType) Poke() { if DEBUG { log.Printf("%v[%v]: Poke: %v[%v]", v.GetType(), v.GetName(), n.GetType(), n.GetName()) } - n.SendEvent(eventPoke, false) // XXX: should this be sync or not? XXX: try it as async for now, but switch to sync and see if we deadlock -- maybe it's possible, i don't know for sure yet + n.SendEvent(eventPoke, false, activity) // XXX: can this be switched to sync? } else { if DEBUG { log.Printf("%v[%v]: Poke: %v[%v]: Skipped!", v.GetType(), v.GetName(), n.GetType(), n.GetName()) @@ -224,7 +224,7 @@ func (obj *BaseType) BackPoke() { if DEBUG { log.Printf("%v[%v]: BackPoke: %v[%v]", v.GetType(), v.GetName(), n.GetType(), n.GetName()) } - n.SendEvent(eventPoke, false) // XXX: should this be sync or not? XXX: try it as async for now, but switch to sync and see if we deadlock -- maybe it's possible, i don't know for sure yet + n.SendEvent(eventBackPoke, false, false) // XXX: can this be switched to sync? } else { if DEBUG { log.Printf("%v[%v]: BackPoke: %v[%v]: Skipped!", v.GetType(), v.GetName(), n.GetType(), n.GetName()) @@ -234,14 +234,14 @@ func (obj *BaseType) BackPoke() { } // push an event into the message queue for a particular type vertex -func (obj *BaseType) SendEvent(event eventName, sync bool) { +func (obj *BaseType) SendEvent(event eventName, sync bool, activity bool) { if !sync { - obj.events <- Event{event, nil, ""} + obj.events <- Event{event, nil, "", activity} return } resp := make(chan bool) - obj.events <- Event{event, resp, ""} + obj.events <- Event{event, resp, "", activity} for { value := <-resp // wait until true value @@ -262,6 +262,9 @@ func (obj *BaseType) ReadEvent(event *Event) bool { case eventPoke: return true + case eventBackPoke: + return true + case eventExit: return false @@ -299,6 +302,7 @@ func Process(obj Type) { } obj.SetState(typeEvent) var ok bool = true + var apply bool = false // did we run an apply? // is it okay to run dependency wise right now? // if not, that's okay because when the dependency runs, it will poke // us back and we will run if needed then! @@ -315,6 +319,8 @@ func Process(obj Type) { obj.SetState(typeApplying) if !obj.Apply() { // check for error ok = false + } else { + apply = true } } @@ -323,7 +329,7 @@ func Process(obj Type) { // nodes might fail due to having a too old timestamp! obj.UpdateTimestamp() // this was touched... obj.SetState(typePoking) // can't cancel parent poke - obj.Poke() + obj.Poke(apply) } // poke at our pre-req's instead since they need to refresh/run... } else {