diff --git a/engine/graph/actions.go b/engine/graph/actions.go index ce80c4fb..9ddf646d 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -260,11 +260,14 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { } // initialize or reinitialize the meta state for this resource uid + obj.mlock.Lock() if _, exists := obj.metas[engine.PtrUID(res)]; !exists || res.MetaParams().Reset { obj.metas[engine.PtrUID(res)] = &engine.MetaState{ CheckApplyRetry: res.MetaParams().Retry, // lookup the retry value } } + metas := obj.metas[engine.PtrUID(res)] // handle + obj.mlock.Unlock() //defer close(obj.state[vertex].stopped) // done signal @@ -579,7 +582,7 @@ Loop: obj.Logf("Process(%s): Return(%s)", vertex, engineUtil.CleanError(err)) } if err == nil && !backPoke && res.MetaParams().RetryReset { // reset it on success! - obj.metas[engine.PtrUID(res)].CheckApplyRetry = res.MetaParams().Retry // lookup the retry value + metas.CheckApplyRetry = res.MetaParams().Retry // lookup the retry value } if err == nil || backPoke { break RetryLoop @@ -587,19 +590,19 @@ Loop: // we've got an error... delay = res.MetaParams().Delay - if obj.metas[engine.PtrUID(res)].CheckApplyRetry < 0 { // infinite retries + if metas.CheckApplyRetry < 0 { // infinite retries continue } - if obj.metas[engine.PtrUID(res)].CheckApplyRetry > 0 { // don't decrement past 0 - obj.metas[engine.PtrUID(res)].CheckApplyRetry-- + if metas.CheckApplyRetry > 0 { // don't decrement past 0 + metas.CheckApplyRetry-- obj.state[vertex].init.Logf( "retrying CheckApply after %.4f seconds (%d left)", float64(delay)/1000, - obj.metas[engine.PtrUID(res)].CheckApplyRetry, + metas.CheckApplyRetry, ) continue } - //if obj.metas[engine.PtrUID(res)].CheckApplyRetry == 0 { // optional + //if metas.CheckApplyRetry == 0 { // optional // err = errwrap.Wrapf(err, "permanent process error") //} diff --git a/engine/graph/engine.go b/engine/graph/engine.go index b2e307f5..a3247a2b 100644 --- a/engine/graph/engine.go +++ b/engine/graph/engine.go @@ -61,6 +61,7 @@ type Engine struct { waits map[pgraph.Vertex]*sync.WaitGroup // wg for the Worker func wlock *sync.Mutex // lock around waits map + mlock *sync.Mutex // metas lock metas map[engine.ResPtrUID]*engine.MetaState // meta state slock *sync.Mutex // semaphore lock @@ -99,6 +100,7 @@ func (obj *Engine) Init() error { obj.waits = make(map[pgraph.Vertex]*sync.WaitGroup) obj.wlock = &sync.Mutex{} + obj.mlock = &sync.Mutex{} obj.metas = make(map[engine.ResPtrUID]*engine.MetaState) obj.slock = &sync.Mutex{} @@ -345,6 +347,7 @@ func (obj *Engine) Commit() error { // that vertexRemoveFn runs before vertexAddFn, but GraphSync guarantees // that, and it would be kind of illogical to not run things that way. metaGC := make(map[engine.ResPtrUID]struct{}) // which metas should we garbage collect? + obj.mlock.Lock() for ptrUID := range obj.metas { if _, exists := activeMetas[ptrUID]; !exists { metaGC[ptrUID] = struct{}{} @@ -353,6 +356,7 @@ func (obj *Engine) Commit() error { for ptrUID := range metaGC { delete(obj.metas, ptrUID) // otherwise, this could grow forever } + obj.mlock.Unlock() // We run these afterwards, so that we don't unnecessarily start anyone // if GraphSync failed in some way. Otherwise we'd have to do clean up!