diff --git a/engine/graph/actions.go b/engine/graph/actions.go index a9e17d23..a8c016ef 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -256,6 +256,11 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { return fmt.Errorf("permanently limited (rate != Inf, burst = 0)") } + // initialize or reinitialize the meta state for this resource uid + if _, exists := obj.metas[engine.PtrUID(res)]; !exists || res.MetaParams().Reset { + obj.metas[engine.PtrUID(res)] = &engine.MetaState{} + } + //defer close(obj.state[vertex].stopped) // done signal obj.state[vertex].cuid = obj.Converger.Register() diff --git a/engine/graph/engine.go b/engine/graph/engine.go index a84c47f2..b2e307f5 100644 --- a/engine/graph/engine.go +++ b/engine/graph/engine.go @@ -61,6 +61,8 @@ type Engine struct { waits map[pgraph.Vertex]*sync.WaitGroup // wg for the Worker func wlock *sync.Mutex // lock around waits map + metas map[engine.ResPtrUID]*engine.MetaState // meta state + slock *sync.Mutex // semaphore lock semas map[string]*semaphore.Semaphore @@ -97,6 +99,8 @@ func (obj *Engine) Init() error { obj.waits = make(map[pgraph.Vertex]*sync.WaitGroup) obj.wlock = &sync.Mutex{} + obj.metas = make(map[engine.ResPtrUID]*engine.MetaState) + obj.slock = &sync.Mutex{} obj.semas = make(map[string]*semaphore.Semaphore) @@ -161,6 +165,15 @@ func (obj *Engine) Commit() error { // TODO: Does this hurt performance or graph changes ? + activeMetas := make(map[engine.ResPtrUID]struct{}) + for vertex := range obj.state { + res, ok := vertex.(engine.Res) + if !ok { // should not happen, previously validated + return fmt.Errorf("not a Res") + } + activeMetas[engine.PtrUID(res)] = struct{}{} // add + } + start := []func() error{} // functions to run after graphsync to start... vertexAddFn := func(vertex pgraph.Vertex) error { // some of these validation steps happen before this Commit step @@ -178,6 +191,8 @@ func (obj *Engine) Commit() error { return fmt.Errorf("the Res state already exists") } + activeMetas[engine.PtrUID(res)] = struct{}{} // add + if obj.Debug { obj.Logf("Validate(%s)", res) } @@ -254,6 +269,12 @@ func (obj *Engine) Commit() error { free := []func() error{} // functions to run after graphsync to reset... vertexRemoveFn := func(vertex pgraph.Vertex) error { + res, ok := vertex.(engine.Res) + if !ok { // should not happen, previously validated + return fmt.Errorf("not a Res") + } + delete(activeMetas, engine.PtrUID(res)) + // wait for exit before starting new graph! close(obj.state[vertex].removeDone) // causes doneCtx to cancel close(obj.state[vertex].resumeSignal) // unblock (it only closes here) @@ -318,6 +339,21 @@ func (obj *Engine) Commit() error { if err := obj.graph.GraphSync(obj.nextGraph, vertexCmpFn, vertexAddFn, vertexRemoveFn, engine.EdgeCmpFn); err != nil { return errwrap.Wrapf(err, "error running graph sync") } + + // This happens after GraphSync when vertexRemoveFn and vertexAddFn are + // done running. Those two modified the activeMetas map. It's important + // that vertexRemoveFn runs before vertexAddFn, but GraphSync guarantees + // that, and it would be kind of illogical to not run things that way. + metaGC := make(map[engine.ResPtrUID]struct{}) // which metas should we garbage collect? + for ptrUID := range obj.metas { + if _, exists := activeMetas[ptrUID]; !exists { + metaGC[ptrUID] = struct{}{} + } + } + for ptrUID := range metaGC { + delete(obj.metas, ptrUID) // otherwise, this could grow forever + } + // We run these afterwards, so that we don't unnecessarily start anyone // if GraphSync failed in some way. Otherwise we'd have to do clean up! for _, fn := range start { diff --git a/engine/metaparams.go b/engine/metaparams.go index cd5ccf19..79d951ee 100644 --- a/engine/metaparams.go +++ b/engine/metaparams.go @@ -36,6 +36,7 @@ var DefaultMetaParams = &MetaParams{ Poll: 0, // defaults to watching for events Limit: rate.Inf, // defaults to no limit Burst: 0, // no burst needed on an infinite rate + Reset: false, //Sema: []string{}, Rewatch: false, Realize: false, // true would be more awesome, but unexpected for users @@ -78,6 +79,15 @@ type MetaParams struct { // Burst is the number of events to allow in a burst. Burst int `yaml:"burst"` + // Reset causes the meta param state to reset when the resource changes. + // What this means is if you have a resource of a specific kind and name + // and in the subsequent graph it changes because one of its params + // changed, normally the Retry, and other params will remember their + // state, and you'll not reset the retry counter, however if this is + // true, then it will get reset. Note that any normal reset mechanisms + // built into retry are not affected by this. + Reset bool `yaml:"reset"` + // Sema is a list of semaphore ids in the form `id` or `id:count`. If // you don't specify a count, then 1 is assumed. The sema of `foo` which // has a count equal to 1, is different from a sema named `foo:1` which @@ -135,6 +145,9 @@ func (obj *MetaParams) Cmp(meta *MetaParams) error { if obj.Burst != meta.Burst { return fmt.Errorf("values for Burst are different") } + if obj.Reset != meta.Reset { + return fmt.Errorf("values for Reset are different") + } if err := util.SortedStrSliceCompare(obj.Sema, meta.Sema); err != nil { return errwrap.Wrapf(err, "values for Sema are different") @@ -182,6 +195,7 @@ func (obj *MetaParams) Copy() *MetaParams { Poll: obj.Poll, Limit: obj.Limit, // FIXME: can we copy this type like this? test me! Burst: obj.Burst, + Reset: obj.Reset, Sema: sema, Rewatch: obj.Rewatch, Realize: obj.Realize, @@ -202,3 +216,15 @@ func (obj *MetaParams) UnmarshalYAML(unmarshal func(interface{}) error) error { *obj = MetaParams(raw) // restore from indirection with type conversion! return nil } + +// MetaState is some local meta param state that is saved between resources of +// the same kind+name unique ID. Even though the vertex/resource pointer might +// change during a graph switch (because the params changed) we might be +// logically referring to the same resource, and we might want to preserve some +// data across that switch. The common example is the resource retry count. If a +// resource failed three times, and we had a limit of five before it would be a +// permanent failure, then we don't want to reset this counter just because we +// changed a parameter (field) of the resource. This doesn't mean we don't want +// to ever reset these counts. For that, flip on the reset meta param. +type MetaState struct { +} diff --git a/engine/resources.go b/engine/resources.go index 0d56b7c5..de818969 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -220,6 +220,16 @@ func Stringer(res Res) string { return Repr(res.Kind(), res.Name()) } +// ResPtrUID is a unique identifier that is consistent for the kind and name of +// the resource only. +type ResPtrUID string + +// PtrUID generates a ResPtrUID from a resource. +func PtrUID(res Res) ResPtrUID { + // the use of "repr" is kind of arbitrary as long as it's unique + return ResPtrUID(Repr(res.Kind(), res.Name())) +} + // Validate validates a resource by checking multiple aspects. This is the main // entry point for running all the validation steps on a resource. func Validate(res Res) error { diff --git a/engine/resources/resources_test.go b/engine/resources/resources_test.go index 70b5062a..036e2f5a 100644 --- a/engine/resources/resources_test.go +++ b/engine/resources/resources_test.go @@ -1749,3 +1749,20 @@ func TestResources2(t *testing.T) { }) } } + +func TestResPtrUID1(t *testing.T) { + t1, err := engine.NewNamedResource("test", "test1") + if err != nil { + t.Errorf("could not build resource: %+v", err) + return + } + t2, err := engine.NewNamedResource("test", "test1") + if err != nil { + t.Errorf("could not build resource: %+v", err) + return + } + + if uid1, uid2 := engine.PtrUID(t1), engine.PtrUID(t2); uid1 != uid2 { + t.Errorf("uid's don't match") + } +} diff --git a/lang/ast/structs.go b/lang/ast/structs.go index ea4f89df..a367b369 100644 --- a/lang/ast/structs.go +++ b/lang/ast/structs.go @@ -939,6 +939,9 @@ func (obj *StmtRes) metaparams(res engine.Res) error { // TODO: check that it doesn't overflow meta.Burst = int(x) + case "reset": + meta.Reset = v.Bool() // must not panic + case "sema": // []string values := []string{} for _, x := range v.List() { // must not panic @@ -1000,6 +1003,9 @@ func (obj *StmtRes) metaparams(res engine.Res) error { // TODO: check that it doesn't overflow meta.Burst = int(x) } + if val, exists := v.Struct()["reset"]; exists { + meta.Reset = val.Bool() // must not panic + } if val, exists := v.Struct()["sema"]; exists { values := []string{} for _, x := range val.List() { // must not panic @@ -1594,6 +1600,7 @@ func (obj *StmtResMeta) Init(data *interfaces.Data) error { case "poll": case "limit": case "burst": + case "reset": case "sema": case "rewatch": case "realize": @@ -1792,6 +1799,9 @@ func (obj *StmtResMeta) Unify(kind string) ([]interfaces.Invariant, error) { case "burst": invar = static(types.TypeInt) + case "reset": + invar = static(types.TypeBool) + case "sema": invar = static(types.NewType("[]str")) @@ -1827,7 +1837,7 @@ func (obj *StmtResMeta) Unify(kind string) ([]interfaces.Invariant, error) { // FIXME: allow partial subsets of this struct, and in any order // FIXME: we might need an updated unification engine to do this wrap := func(reverse *types.Type) *types.Type { - return types.NewType(fmt.Sprintf("struct{noop bool; retry int; delay int; poll int; limit float; burst int; sema []str; rewatch bool; realize bool; reverse %s; autoedge bool; autogroup bool}", reverse.String())) + return types.NewType(fmt.Sprintf("struct{noop bool; retry int; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; reverse %s; autoedge bool; autogroup bool}", reverse.String())) } ors := []interfaces.Invariant{} invarBool := static(wrap(types.TypeBool)) diff --git a/lang/interpret_test/TestAstFunc1/resdupefields0.txtar b/lang/interpret_test/TestAstFunc1/resdupefields0.txtar index 14388ddb..9baf447c 100644 --- a/lang/interpret_test/TestAstFunc1/resdupefields0.txtar +++ b/lang/interpret_test/TestAstFunc1/resdupefields0.txtar @@ -9,6 +9,7 @@ test "test" { poll => 5, limit => 4.2, burst => 3, + reset => false, sema => ["foo:1", "bar:3",], rewatch => false, realize => true,