engine: Make Retry stateful and add RetryReset

Make the retry meta param a bit more sane now that we can persist it
between graph switches. This also makes it possible to pause during
retry loops.
This commit is contained in:
James Shubin
2023-09-01 21:11:27 -04:00
parent 9545e409d4
commit f9bc50e262
4 changed files with 39 additions and 9 deletions

View File

@@ -258,7 +258,9 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
// initialize or reinitialize the meta state for this resource uid
if _, exists := obj.metas[engine.PtrUID(res)]; !exists || res.MetaParams().Reset {
obj.metas[engine.PtrUID(res)] = &engine.MetaState{}
obj.metas[engine.PtrUID(res)] = &engine.MetaState{
CheckApplyRetry: res.MetaParams().Retry, // lookup the retry value
}
}
//defer close(obj.state[vertex].stopped) // done signal
@@ -492,7 +494,7 @@ Loop:
// retry...
var err error
var retry = res.MetaParams().Retry // lookup the retry value
//var retry = res.MetaParams().Retry // lookup the retry value
var delay uint64
RetryLoop:
for { // retry loop
@@ -542,21 +544,28 @@ Loop:
if obj.Debug {
obj.Logf("Process(%s): Return(%s)", vertex, engineUtil.CleanError(err))
}
if err == nil && res.MetaParams().RetryReset { // reset it on success!
obj.metas[engine.PtrUID(res)].CheckApplyRetry = res.MetaParams().Retry // lookup the retry value
}
if err == nil {
break RetryLoop
}
// we've got an error...
delay = res.MetaParams().Delay
if retry < 0 { // infinite retries
if obj.metas[engine.PtrUID(res)].CheckApplyRetry < 0 { // infinite retries
continue
}
if retry > 0 { // don't decrement past 0
retry--
obj.state[vertex].init.Logf("retrying CheckApply after %.4f seconds (%d left)", float64(delay)/1000, retry)
if obj.metas[engine.PtrUID(res)].CheckApplyRetry > 0 { // don't decrement past 0
obj.metas[engine.PtrUID(res)].CheckApplyRetry--
obj.state[vertex].init.Logf(
"retrying CheckApply after %.4f seconds (%d left)",
float64(delay)/1000,
obj.metas[engine.PtrUID(res)].CheckApplyRetry,
)
continue
}
//if retry == 0 { // optional
//if obj.metas[engine.PtrUID(res)].CheckApplyRetry == 0 { // optional
// err = errwrap.Wrapf(err, "permanent process error")
//}

View File

@@ -65,9 +65,16 @@ type MetaParams struct {
// reason to want to do something differently for the Watch errors.
// Retry is the number of times to retry on error. Use -1 for infinite.
// This value is used for both Watch and CheckApply.
Retry int16 `yaml:"retry"`
// Delay is the number of milliseconds to wait between retries.
// RetryReset resets the retry count for CheckApply whenever CheckApply
// succeeds. This count is currently tracked separately from the count
// used for Watch.
// TODO: Consider resetting retry count for watch if it sends an event?
RetryReset bool `yaml:"retryreset"`
// Delay is the number of milliseconds to wait between retries. This
// value is used for both Watch and CheckApply.
Delay uint64 `yaml:"delay"`
// Poll is the number of seconds between poll intervals. Use 0 to Watch.
@@ -227,4 +234,7 @@ func (obj *MetaParams) UnmarshalYAML(unmarshal func(interface{}) error) error {
// changed a parameter (field) of the resource. This doesn't mean we never
// want to reset these counts; to do that, flip on the reset meta param.
type MetaState struct {
// CheckApplyRetry is the current retry count for CheckApply.
CheckApplyRetry int16
}

View File

@@ -920,6 +920,9 @@ func (obj *StmtRes) metaparams(res engine.Res) error {
// TODO: check that it doesn't overflow
meta.Retry = int16(x)
case "retryreset":
meta.RetryReset = v.Bool() // must not panic
case "delay":
x := v.Int() // must not panic
// TODO: check that it isn't signed
@@ -984,6 +987,9 @@ func (obj *StmtRes) metaparams(res engine.Res) error {
// TODO: check that it doesn't overflow
meta.Retry = int16(x)
}
if val, exists := v.Struct()["retryreset"]; exists {
meta.RetryReset = val.Bool() // must not panic
}
if val, exists := v.Struct()["delay"]; exists {
x := val.Int() // must not panic
// TODO: check that it isn't signed
@@ -1596,6 +1602,7 @@ func (obj *StmtResMeta) Init(data *interfaces.Data) error {
// TODO: we could add these fields dynamically if we were fancy!
case "noop":
case "retry":
case "retryreset":
case "delay":
case "poll":
case "limit":
@@ -1787,6 +1794,9 @@ func (obj *StmtResMeta) Unify(kind string) ([]interfaces.Invariant, error) {
case "retry":
invar = static(types.TypeInt)
case "retryreset":
invar = static(types.TypeBool)
case "delay":
invar = static(types.TypeInt)
@@ -1837,7 +1847,7 @@ func (obj *StmtResMeta) Unify(kind string) ([]interfaces.Invariant, error) {
// FIXME: allow partial subsets of this struct, and in any order
// FIXME: we might need an updated unification engine to do this
wrap := func(reverse *types.Type) *types.Type {
return types.NewType(fmt.Sprintf("struct{noop bool; retry int; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; reverse %s; autoedge bool; autogroup bool}", reverse.String()))
return types.NewType(fmt.Sprintf("struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; reverse %s; autoedge bool; autogroup bool}", reverse.String()))
}
ors := []interfaces.Invariant{}
invarBool := static(wrap(types.TypeBool))

View File

@@ -5,6 +5,7 @@ test "test" {
Meta => true ?: struct{
noop => false,
retry => -1,
retryreset => false,
delay => 0,
poll => 5,
limit => 4.2,