Rename type to resource (res) and service to svc
Naming the resources "type" was a stupid mistake, and is a huge source of confusion when also talking about real types. Fix this before it gets out of hand.
This commit is contained in:
407
resources.go
Normal file
407
resources.go
Normal file
@@ -0,0 +1,407 @@
|
||||
// Mgmt
|
||||
// Copyright (C) 2013-2016+ James Shubin and the project contributors
|
||||
// Written by James Shubin <james@shubin.ca> and the project contributors
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
//go:generate stringer -type=resState -output=resstate_stringer.go
|
||||
type resState int
|
||||
|
||||
const (
|
||||
resStateNil resState = iota
|
||||
resStateWatching
|
||||
resStateEvent // an event has happened, but we haven't poked yet
|
||||
resStateCheckApply
|
||||
resStatePoking
|
||||
)
|
||||
|
||||
//go:generate stringer -type=resConvergedState -output=resconvergedstate_stringer.go
|
||||
type resConvergedState int
|
||||
|
||||
const (
|
||||
resConvergedNil resConvergedState = iota
|
||||
//resConverged
|
||||
resConvergedTimeout
|
||||
)
|
||||
|
||||
type Res interface {
|
||||
Init()
|
||||
GetName() string // can't be named "Name()" because of struct field
|
||||
GetRes() string
|
||||
Watch()
|
||||
StateOK() bool // TODO: can we rename this to something better?
|
||||
Apply() bool
|
||||
SetVertex(*Vertex)
|
||||
SetConvergedCallback(ctimeout int, converged chan bool)
|
||||
Compare(Res) bool
|
||||
SendEvent(eventName, bool, bool) bool
|
||||
IsWatching() bool
|
||||
SetWatching(bool)
|
||||
GetConvergedState() resConvergedState
|
||||
SetConvergedState(resConvergedState)
|
||||
GetState() resState
|
||||
SetState(resState)
|
||||
GetTimestamp() int64
|
||||
UpdateTimestamp() int64
|
||||
OKTimestamp() bool
|
||||
Poke(bool)
|
||||
BackPoke()
|
||||
}
|
||||
|
||||
type BaseRes struct {
|
||||
Name string `yaml:"name"`
|
||||
timestamp int64 // last updated timestamp ?
|
||||
events chan Event
|
||||
vertex *Vertex
|
||||
state resState
|
||||
convergedState resConvergedState
|
||||
watching bool // is Watch() loop running ?
|
||||
ctimeout int // converged timeout
|
||||
converged chan bool
|
||||
isStateOK bool // whether the state is okay based on events or not
|
||||
}
|
||||
|
||||
type NoopRes struct {
|
||||
BaseRes `yaml:",inline"`
|
||||
Comment string `yaml:"comment"` // extra field for example purposes
|
||||
}
|
||||
|
||||
func NewNoopRes(name string) *NoopRes {
|
||||
// FIXME: we could get rid of this New constructor and use raw object creation with a required Init()
|
||||
return &NoopRes{
|
||||
BaseRes: BaseRes{
|
||||
Name: name,
|
||||
events: make(chan Event), // unbuffered chan size to avoid stale events
|
||||
vertex: nil,
|
||||
},
|
||||
Comment: "",
|
||||
}
|
||||
}
|
||||
|
||||
// initialize structures like channels if created without New constructor
|
||||
func (obj *BaseRes) Init() {
|
||||
obj.events = make(chan Event)
|
||||
}
|
||||
|
||||
// this method gets used by all the types, if we have one of (obj NoopRes) it would get overridden in that case!
|
||||
func (obj *BaseRes) GetName() string {
|
||||
return obj.Name
|
||||
}
|
||||
|
||||
func (obj *BaseRes) GetRes() string {
|
||||
return "Base"
|
||||
}
|
||||
|
||||
func (obj *BaseRes) GetVertex() *Vertex {
|
||||
return obj.vertex
|
||||
}
|
||||
|
||||
func (obj *BaseRes) SetVertex(v *Vertex) {
|
||||
obj.vertex = v
|
||||
}
|
||||
|
||||
func (obj *BaseRes) SetConvergedCallback(ctimeout int, converged chan bool) {
|
||||
obj.ctimeout = ctimeout
|
||||
obj.converged = converged
|
||||
}
|
||||
|
||||
// is the Watch() function running?
|
||||
func (obj *BaseRes) IsWatching() bool {
|
||||
return obj.watching
|
||||
}
|
||||
|
||||
// store status of if the Watch() function is running
|
||||
func (obj *BaseRes) SetWatching(b bool) {
|
||||
obj.watching = b
|
||||
}
|
||||
|
||||
func (obj *BaseRes) GetConvergedState() resConvergedState {
|
||||
return obj.convergedState
|
||||
}
|
||||
|
||||
func (obj *BaseRes) SetConvergedState(state resConvergedState) {
|
||||
obj.convergedState = state
|
||||
}
|
||||
|
||||
func (obj *BaseRes) GetState() resState {
|
||||
return obj.state
|
||||
}
|
||||
|
||||
func (obj *BaseRes) SetState(state resState) {
|
||||
if DEBUG {
|
||||
log.Printf("%v[%v]: State: %v -> %v", obj.GetRes(), obj.GetName(), obj.GetState(), state)
|
||||
}
|
||||
obj.state = state
|
||||
}
|
||||
|
||||
// GetTimestamp returns the timestamp of a vertex
|
||||
func (obj *BaseRes) GetTimestamp() int64 {
|
||||
return obj.timestamp
|
||||
}
|
||||
|
||||
// UpdateTimestamp updates the timestamp on a vertex and returns the new value
|
||||
func (obj *BaseRes) UpdateTimestamp() int64 {
|
||||
obj.timestamp = time.Now().UnixNano() // update
|
||||
return obj.timestamp
|
||||
}
|
||||
|
||||
// can this element run right now?
|
||||
func (obj *BaseRes) OKTimestamp() bool {
|
||||
v := obj.GetVertex()
|
||||
g := v.GetGraph()
|
||||
// these are all the vertices pointing TO v, eg: ??? -> v
|
||||
for _, n := range g.IncomingGraphEdges(v) {
|
||||
// if the vertex has a greater timestamp than any pre-req (n)
|
||||
// then we can't run right now...
|
||||
// if they're equal (eg: on init of 0) then we also can't run
|
||||
// b/c we should let our pre-req's go first...
|
||||
x, y := obj.GetTimestamp(), n.Res.GetTimestamp()
|
||||
if DEBUG {
|
||||
log.Printf("%v[%v]: OKTimestamp: (%v) >= %v[%v](%v): !%v", obj.GetRes(), obj.GetName(), x, n.GetRes(), n.GetName(), y, x >= y)
|
||||
}
|
||||
if x >= y {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// notify nodes after me in the dependency graph that they need refreshing...
|
||||
// NOTE: this assumes that this can never fail or need to be rescheduled
|
||||
func (obj *BaseRes) Poke(activity bool) {
|
||||
v := obj.GetVertex()
|
||||
g := v.GetGraph()
|
||||
// these are all the vertices pointing AWAY FROM v, eg: v -> ???
|
||||
for _, n := range g.OutgoingGraphEdges(v) {
|
||||
// XXX: if we're in state event and haven't been cancelled by
|
||||
// apply, then we can cancel a poke to a child, right? XXX
|
||||
// XXX: if n.Res.GetState() != resStateEvent { // is this correct?
|
||||
if true { // XXX
|
||||
if DEBUG {
|
||||
log.Printf("%v[%v]: Poke: %v[%v]", v.GetRes(), v.GetName(), n.GetRes(), n.GetName())
|
||||
}
|
||||
n.SendEvent(eventPoke, false, activity) // XXX: can this be switched to sync?
|
||||
} else {
|
||||
if DEBUG {
|
||||
log.Printf("%v[%v]: Poke: %v[%v]: Skipped!", v.GetRes(), v.GetName(), n.GetRes(), n.GetName())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// poke the pre-requisites that are stale and need to run before I can run...
|
||||
func (obj *BaseRes) BackPoke() {
|
||||
v := obj.GetVertex()
|
||||
g := v.GetGraph()
|
||||
// these are all the vertices pointing TO v, eg: ??? -> v
|
||||
for _, n := range g.IncomingGraphEdges(v) {
|
||||
x, y, s := obj.GetTimestamp(), n.Res.GetTimestamp(), n.Res.GetState()
|
||||
// if the parent timestamp needs poking AND it's not in state
|
||||
// resStateEvent, then poke it. If the parent is in resStateEvent it
|
||||
// means that an event is pending, so we'll be expecting a poke
|
||||
// back soon, so we can safely discard the extra parent poke...
|
||||
// TODO: implement a stateLT (less than) to tell if something
|
||||
// happens earlier in the state cycle and that doesn't wrap nil
|
||||
if x >= y && (s != resStateEvent && s != resStateCheckApply) {
|
||||
if DEBUG {
|
||||
log.Printf("%v[%v]: BackPoke: %v[%v]", v.GetRes(), v.GetName(), n.GetRes(), n.GetName())
|
||||
}
|
||||
n.SendEvent(eventBackPoke, false, false) // XXX: can this be switched to sync?
|
||||
} else {
|
||||
if DEBUG {
|
||||
log.Printf("%v[%v]: BackPoke: %v[%v]: Skipped!", v.GetRes(), v.GetName(), n.GetRes(), n.GetName())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// push an event into the message queue for a particular vertex
|
||||
func (obj *BaseRes) SendEvent(event eventName, sync bool, activity bool) bool {
|
||||
// TODO: isn't this race-y ?
|
||||
if !obj.IsWatching() { // element has already exited
|
||||
return false // if we don't return, we'll block on the send
|
||||
}
|
||||
if !sync {
|
||||
obj.events <- Event{event, nil, "", activity}
|
||||
return true
|
||||
}
|
||||
|
||||
resp := make(chan bool)
|
||||
obj.events <- Event{event, resp, "", activity}
|
||||
for {
|
||||
value := <-resp
|
||||
// wait until true value
|
||||
if value {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// process events when a select gets one, this handles the pause code too!
|
||||
// the return values specify if we should exit and poke respectively
|
||||
func (obj *BaseRes) ReadEvent(event *Event) (exit, poke bool) {
|
||||
event.ACK()
|
||||
switch event.Name {
|
||||
case eventStart:
|
||||
return false, true
|
||||
|
||||
case eventPoke:
|
||||
return false, true
|
||||
|
||||
case eventBackPoke:
|
||||
return false, true // forward poking in response to a back poke!
|
||||
|
||||
case eventExit:
|
||||
return true, false
|
||||
|
||||
case eventPause:
|
||||
// wait for next event to continue
|
||||
select {
|
||||
case e := <-obj.events:
|
||||
e.ACK()
|
||||
if e.Name == eventExit {
|
||||
return true, false
|
||||
} else if e.Name == eventStart { // eventContinue
|
||||
return false, false // don't poke on unpause!
|
||||
} else {
|
||||
// if we get a poke event here, it's a bug!
|
||||
log.Fatalf("%v[%v]: Unknown event: %v, while paused!", obj.GetRes(), obj.GetName(), e)
|
||||
}
|
||||
}
|
||||
|
||||
default:
|
||||
log.Fatal("Unknown event: ", event)
|
||||
}
|
||||
return true, false // required to keep the stupid go compiler happy
|
||||
}
|
||||
|
||||
// useful for using as: return CleanState() in the StateOK functions when there
|
||||
// are multiple `true` return exits
|
||||
func (obj *BaseRes) CleanState() bool {
|
||||
obj.isStateOK = true
|
||||
return true
|
||||
}
|
||||
|
||||
// XXX: rename this function
|
||||
func Process(obj Res) {
|
||||
if DEBUG {
|
||||
log.Printf("%v[%v]: Process()", obj.GetRes(), obj.GetName())
|
||||
}
|
||||
obj.SetState(resStateEvent)
|
||||
var ok = true
|
||||
var apply = false // did we run an apply?
|
||||
// is it okay to run dependency wise right now?
|
||||
// if not, that's okay because when the dependency runs, it will poke
|
||||
// us back and we will run if needed then!
|
||||
if obj.OKTimestamp() {
|
||||
if DEBUG {
|
||||
log.Printf("%v[%v]: OKTimestamp(%v)", obj.GetRes(), obj.GetName(), obj.GetTimestamp())
|
||||
}
|
||||
if !obj.StateOK() { // TODO: can we rename this to something better?
|
||||
if DEBUG {
|
||||
log.Printf("%v[%v]: !StateOK()", obj.GetRes(), obj.GetName())
|
||||
}
|
||||
// throw an error if apply fails...
|
||||
// if this fails, don't UpdateTimestamp()
|
||||
obj.SetState(resStateCheckApply)
|
||||
if !obj.Apply() { // check for error
|
||||
ok = false
|
||||
} else {
|
||||
apply = true
|
||||
}
|
||||
}
|
||||
|
||||
if ok {
|
||||
// update this timestamp *before* we poke or the poked
|
||||
// nodes might fail due to having a too old timestamp!
|
||||
obj.UpdateTimestamp() // this was touched...
|
||||
obj.SetState(resStatePoking) // can't cancel parent poke
|
||||
obj.Poke(apply)
|
||||
}
|
||||
// poke at our pre-req's instead since they need to refresh/run...
|
||||
} else {
|
||||
// only poke at the pre-req's that need to run
|
||||
go obj.BackPoke()
|
||||
}
|
||||
}
|
||||
|
||||
func (obj *NoopRes) GetRes() string {
|
||||
return "Noop"
|
||||
}
|
||||
|
||||
func (obj *NoopRes) Watch() {
|
||||
if obj.IsWatching() {
|
||||
return
|
||||
}
|
||||
obj.SetWatching(true)
|
||||
defer obj.SetWatching(false)
|
||||
|
||||
//vertex := obj.vertex // stored with SetVertex
|
||||
var send = false // send event?
|
||||
var exit = false
|
||||
for {
|
||||
obj.SetState(resStateWatching) // reset
|
||||
select {
|
||||
case event := <-obj.events:
|
||||
obj.SetConvergedState(resConvergedNil)
|
||||
// we avoid sending events on unpause
|
||||
if exit, send = obj.ReadEvent(&event); exit {
|
||||
return // exit
|
||||
}
|
||||
|
||||
case _ = <-TimeAfterOrBlock(obj.ctimeout):
|
||||
obj.SetConvergedState(resConvergedTimeout)
|
||||
obj.converged <- true
|
||||
continue
|
||||
}
|
||||
|
||||
// do all our event sending all together to avoid duplicate msgs
|
||||
if send {
|
||||
send = false
|
||||
// only do this on certain types of events
|
||||
//obj.isStateOK = false // something made state dirty
|
||||
Process(obj) // XXX: rename this function
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (obj *NoopRes) StateOK() bool {
|
||||
return true // never needs updating
|
||||
}
|
||||
|
||||
func (obj *NoopRes) Apply() bool {
|
||||
log.Printf("%v[%v]: Apply", obj.GetRes(), obj.GetName())
|
||||
return true
|
||||
}
|
||||
|
||||
func (obj *NoopRes) Compare(res Res) bool {
|
||||
switch res.(type) {
|
||||
// we can only compare NoopRes to others of the same resource
|
||||
case *NoopRes:
|
||||
res := res.(*NoopRes)
|
||||
if obj.Name != res.Name {
|
||||
return false
|
||||
}
|
||||
default:
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
Reference in New Issue
Block a user