engine: Create a resource kind+name specific stateful store

This adds a meta state store that is preserved between graph switches if
the kind and name match. This is useful so that rapid graph changes
don't necessarily reset their retry count if they've only changed one
resource field.
This commit is contained in:
James Shubin
2023-09-01 20:45:12 -04:00
parent 07bd8afc4a
commit 9545e409d4
7 changed files with 106 additions and 1 deletions

View File

@@ -256,6 +256,11 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
return fmt.Errorf("permanently limited (rate != Inf, burst = 0)")
}
// initialize or reinitialize the meta state for this resource uid
if _, exists := obj.metas[engine.PtrUID(res)]; !exists || res.MetaParams().Reset {
obj.metas[engine.PtrUID(res)] = &engine.MetaState{}
}
//defer close(obj.state[vertex].stopped) // done signal
obj.state[vertex].cuid = obj.Converger.Register()

View File

@@ -61,6 +61,8 @@ type Engine struct {
waits map[pgraph.Vertex]*sync.WaitGroup // wg for the Worker func
wlock *sync.Mutex // lock around waits map
metas map[engine.ResPtrUID]*engine.MetaState // meta state
slock *sync.Mutex // semaphore lock
semas map[string]*semaphore.Semaphore
@@ -97,6 +99,8 @@ func (obj *Engine) Init() error {
obj.waits = make(map[pgraph.Vertex]*sync.WaitGroup)
obj.wlock = &sync.Mutex{}
obj.metas = make(map[engine.ResPtrUID]*engine.MetaState)
obj.slock = &sync.Mutex{}
obj.semas = make(map[string]*semaphore.Semaphore)
@@ -161,6 +165,15 @@ func (obj *Engine) Commit() error {
// TODO: Does this hurt performance or graph changes ?
activeMetas := make(map[engine.ResPtrUID]struct{})
for vertex := range obj.state {
res, ok := vertex.(engine.Res)
if !ok { // should not happen, previously validated
return fmt.Errorf("not a Res")
}
activeMetas[engine.PtrUID(res)] = struct{}{} // add
}
start := []func() error{} // functions to run after graphsync to start...
vertexAddFn := func(vertex pgraph.Vertex) error {
// some of these validation steps happen before this Commit step
@@ -178,6 +191,8 @@ func (obj *Engine) Commit() error {
return fmt.Errorf("the Res state already exists")
}
activeMetas[engine.PtrUID(res)] = struct{}{} // add
if obj.Debug {
obj.Logf("Validate(%s)", res)
}
@@ -254,6 +269,12 @@ func (obj *Engine) Commit() error {
free := []func() error{} // functions to run after graphsync to reset...
vertexRemoveFn := func(vertex pgraph.Vertex) error {
res, ok := vertex.(engine.Res)
if !ok { // should not happen, previously validated
return fmt.Errorf("not a Res")
}
delete(activeMetas, engine.PtrUID(res))
// wait for exit before starting new graph!
close(obj.state[vertex].removeDone) // causes doneCtx to cancel
close(obj.state[vertex].resumeSignal) // unblock (it only closes here)
@@ -318,6 +339,21 @@ func (obj *Engine) Commit() error {
if err := obj.graph.GraphSync(obj.nextGraph, vertexCmpFn, vertexAddFn, vertexRemoveFn, engine.EdgeCmpFn); err != nil {
return errwrap.Wrapf(err, "error running graph sync")
}
// This happens after GraphSync when vertexRemoveFn and vertexAddFn are
// done running. Those two modified the activeMetas map. It's important
// that vertexRemoveFn runs before vertexAddFn, but GraphSync guarantees
// that, and it would be kind of illogical to not run things that way.
metaGC := make(map[engine.ResPtrUID]struct{}) // which metas should we garbage collect?
for ptrUID := range obj.metas {
if _, exists := activeMetas[ptrUID]; !exists {
metaGC[ptrUID] = struct{}{}
}
}
for ptrUID := range metaGC {
delete(obj.metas, ptrUID) // otherwise, this could grow forever
}
// We run these afterwards, so that we don't unnecessarily start anyone
// if GraphSync failed in some way. Otherwise we'd have to do clean up!
for _, fn := range start {

View File

@@ -36,6 +36,7 @@ var DefaultMetaParams = &MetaParams{
Poll: 0, // defaults to watching for events
Limit: rate.Inf, // defaults to no limit
Burst: 0, // no burst needed on an infinite rate
Reset: false,
//Sema: []string{},
Rewatch: false,
Realize: false, // true would be more awesome, but unexpected for users
@@ -78,6 +79,15 @@ type MetaParams struct {
// Burst is the number of events to allow in a burst.
Burst int `yaml:"burst"`
// Reset causes the meta param state to reset when the resource changes.
// What this means is if you have a resource of a specific kind and name
// and in the subsequent graph it changes because one of its params
// changed, normally the Retry, and other params will remember their
// state, and you'll not reset the retry counter, however if this is
// true, then it will get reset. Note that any normal reset mechanisms
// built into retry are not affected by this.
Reset bool `yaml:"reset"`
// Sema is a list of semaphore ids in the form `id` or `id:count`. If
// you don't specify a count, then 1 is assumed. The sema of `foo` which
// has a count equal to 1, is different from a sema named `foo:1` which
@@ -135,6 +145,9 @@ func (obj *MetaParams) Cmp(meta *MetaParams) error {
if obj.Burst != meta.Burst {
return fmt.Errorf("values for Burst are different")
}
if obj.Reset != meta.Reset {
return fmt.Errorf("values for Reset are different")
}
if err := util.SortedStrSliceCompare(obj.Sema, meta.Sema); err != nil {
return errwrap.Wrapf(err, "values for Sema are different")
@@ -182,6 +195,7 @@ func (obj *MetaParams) Copy() *MetaParams {
Poll: obj.Poll,
Limit: obj.Limit, // FIXME: can we copy this type like this? test me!
Burst: obj.Burst,
Reset: obj.Reset,
Sema: sema,
Rewatch: obj.Rewatch,
Realize: obj.Realize,
@@ -202,3 +216,15 @@ func (obj *MetaParams) UnmarshalYAML(unmarshal func(interface{}) error) error {
*obj = MetaParams(raw) // restore from indirection with type conversion!
return nil
}
// MetaState is some local meta param state that is saved between resources of
// the same kind+name unique ID. Even though the vertex/resource pointer might
// change during a graph switch (because the params changed) we might be
// logically referring to the same resource, and we might want to preserve some
// data across that switch. The common example is the resource retry count. If a
// resource failed three times, and we had a limit of five before it would be a
// permanent failure, then we don't want to reset this counter just because we
// changed a parameter (field) of the resource. This doesn't mean we don't want
// to ever reset these counts. For that, flip on the reset meta param.
type MetaState struct {
}

View File

@@ -220,6 +220,16 @@ func Stringer(res Res) string {
return Repr(res.Kind(), res.Name())
}
// ResPtrUID is a unique identifier that is consistent for the kind and name of
// the resource only.
type ResPtrUID string
// PtrUID generates a ResPtrUID from a resource.
func PtrUID(res Res) ResPtrUID {
// the use of "repr" is kind of arbitrary as long as it's unique
return ResPtrUID(Repr(res.Kind(), res.Name()))
}
// Validate validates a resource by checking multiple aspects. This is the main
// entry point for running all the validation steps on a resource.
func Validate(res Res) error {

View File

@@ -1749,3 +1749,20 @@ func TestResources2(t *testing.T) {
})
}
}
func TestResPtrUID1(t *testing.T) {
t1, err := engine.NewNamedResource("test", "test1")
if err != nil {
t.Errorf("could not build resource: %+v", err)
return
}
t2, err := engine.NewNamedResource("test", "test1")
if err != nil {
t.Errorf("could not build resource: %+v", err)
return
}
if uid1, uid2 := engine.PtrUID(t1), engine.PtrUID(t2); uid1 != uid2 {
t.Errorf("uid's don't match")
}
}

View File

@@ -939,6 +939,9 @@ func (obj *StmtRes) metaparams(res engine.Res) error {
// TODO: check that it doesn't overflow
meta.Burst = int(x)
case "reset":
meta.Reset = v.Bool() // must not panic
case "sema": // []string
values := []string{}
for _, x := range v.List() { // must not panic
@@ -1000,6 +1003,9 @@ func (obj *StmtRes) metaparams(res engine.Res) error {
// TODO: check that it doesn't overflow
meta.Burst = int(x)
}
if val, exists := v.Struct()["reset"]; exists {
meta.Reset = val.Bool() // must not panic
}
if val, exists := v.Struct()["sema"]; exists {
values := []string{}
for _, x := range val.List() { // must not panic
@@ -1594,6 +1600,7 @@ func (obj *StmtResMeta) Init(data *interfaces.Data) error {
case "poll":
case "limit":
case "burst":
case "reset":
case "sema":
case "rewatch":
case "realize":
@@ -1792,6 +1799,9 @@ func (obj *StmtResMeta) Unify(kind string) ([]interfaces.Invariant, error) {
case "burst":
invar = static(types.TypeInt)
case "reset":
invar = static(types.TypeBool)
case "sema":
invar = static(types.NewType("[]str"))
@@ -1827,7 +1837,7 @@ func (obj *StmtResMeta) Unify(kind string) ([]interfaces.Invariant, error) {
// FIXME: allow partial subsets of this struct, and in any order
// FIXME: we might need an updated unification engine to do this
wrap := func(reverse *types.Type) *types.Type {
return types.NewType(fmt.Sprintf("struct{noop bool; retry int; delay int; poll int; limit float; burst int; sema []str; rewatch bool; realize bool; reverse %s; autoedge bool; autogroup bool}", reverse.String()))
return types.NewType(fmt.Sprintf("struct{noop bool; retry int; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; reverse %s; autoedge bool; autogroup bool}", reverse.String()))
}
ors := []interfaces.Invariant{}
invarBool := static(wrap(types.TypeBool))

View File

@@ -9,6 +9,7 @@ test "test" {
poll => 5,
limit => 4.2,
burst => 3,
reset => false,
sema => ["foo:1", "bar:3",],
rewatch => false,
realize => true,