engine: graph: Add a lock around metas access
We forgot to guard against concurrent writes to the metas map. This should fix that.
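
For context: Go maps are not safe for concurrent use, so two Worker goroutines initializing their meta state at the same time can crash the runtime with "fatal error: concurrent map writes", or trip the race detector. The diff below applies the standard fix of guarding every access to the shared map with one mutex. Here is a minimal, self-contained sketch of that pattern; the names (engineSketch, metaState, stateFor) are made up for illustration and are not the engine's real API.

    // Sketch only: a shared map guarded by one mutex, mirroring the
    // metas/mlock change in this commit. All names here are hypothetical.
    package main

    import (
        "fmt"
        "sync"
    )

    type metaState struct{ retries int } // stand-in for engine.MetaState

    type engineSketch struct {
        mlock sync.Mutex            // guards metas
        metas map[string]*metaState // shared between worker goroutines
    }

    // stateFor initializes the entry on first use and returns a handle to it.
    func (e *engineSketch) stateFor(key string) *metaState {
        e.mlock.Lock()
        defer e.mlock.Unlock()
        if _, exists := e.metas[key]; !exists {
            e.metas[key] = &metaState{retries: 3}
        }
        return e.metas[key]
    }

    func main() {
        e := &engineSketch{metas: make(map[string]*metaState)}
        wg := &sync.WaitGroup{}
        for i := 0; i < 10; i++ {
            i := i
            wg.Add(1)
            go func() {
                defer wg.Done()
                // without mlock, these concurrent first writes could crash
                e.stateFor(fmt.Sprintf("res-%d", i))
            }()
        }
        wg.Wait()
        fmt.Println(len(e.metas), "entries")
    }
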
@@ -260,11 +260,14 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 		}

 		// initialize or reinitialize the meta state for this resource uid
+		obj.mlock.Lock()
 		if _, exists := obj.metas[engine.PtrUID(res)]; !exists || res.MetaParams().Reset {
 			obj.metas[engine.PtrUID(res)] = &engine.MetaState{
 				CheckApplyRetry: res.MetaParams().Retry, // lookup the retry value
 			}
 		}
+		metas := obj.metas[engine.PtrUID(res)] // handle
+		obj.mlock.Unlock()

 		//defer close(obj.state[vertex].stopped) // done signal

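One note on the hunk above: the metas handle is taken while mlock is held, and the pointer is then used after Unlock (see the later hunks). The lock only has to protect the map itself; using the *MetaState afterwards is fine under the assumption that each resource's Worker is the only goroutine mutating its own entry. If several goroutines could update the same entry, the field updates themselves would also need synchronization. A tiny sketch of that shape, with hypothetical names:

    // Sketch only (hypothetical names): fetch or create the entry under the
    // lock, then mutate it afterwards without touching the map again.
    package main

    import "sync"

    type metaState struct{ retries int }

    var (
        mlock sync.Mutex
        metas = map[string]*metaState{}
    )

    func decrementRetry(key string) int {
        mlock.Lock()
        st, exists := metas[key]
        if !exists {
            st = &metaState{retries: 3}
            metas[key] = st
        }
        mlock.Unlock() // the map is not touched below this point

        st.retries-- // entry is owned by this goroutine, so no lock needed
        return st.retries
    }

    func main() { _ = decrementRetry("res1") }
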
@@ -579,7 +582,7 @@ Loop:
 			obj.Logf("Process(%s): Return(%s)", vertex, engineUtil.CleanError(err))
 		}
 		if err == nil && !backPoke && res.MetaParams().RetryReset { // reset it on success!
-			obj.metas[engine.PtrUID(res)].CheckApplyRetry = res.MetaParams().Retry // lookup the retry value
+			metas.CheckApplyRetry = res.MetaParams().Retry // lookup the retry value
 		}
 		if err == nil || backPoke {
 			break RetryLoop
@@ -587,19 +590,19 @@ Loop:
 		// we've got an error...
 		delay = res.MetaParams().Delay

-		if obj.metas[engine.PtrUID(res)].CheckApplyRetry < 0 { // infinite retries
+		if metas.CheckApplyRetry < 0 { // infinite retries
 			continue
 		}
-		if obj.metas[engine.PtrUID(res)].CheckApplyRetry > 0 { // don't decrement past 0
-			obj.metas[engine.PtrUID(res)].CheckApplyRetry--
+		if metas.CheckApplyRetry > 0 { // don't decrement past 0
+			metas.CheckApplyRetry--
 			obj.state[vertex].init.Logf(
 				"retrying CheckApply after %.4f seconds (%d left)",
 				float64(delay)/1000,
-				obj.metas[engine.PtrUID(res)].CheckApplyRetry,
+				metas.CheckApplyRetry,
 			)
 			continue
 		}
-		//if obj.metas[engine.PtrUID(res)].CheckApplyRetry == 0 { // optional
+		//if metas.CheckApplyRetry == 0 { // optional
 		//	err = errwrap.Wrapf(err, "permanent process error")
 		//}

@@ -61,6 +61,7 @@ type Engine struct {
 	waits map[pgraph.Vertex]*sync.WaitGroup // wg for the Worker func
 	wlock *sync.Mutex                       // lock around waits map

+	mlock *sync.Mutex                             // metas lock
 	metas map[engine.ResPtrUID]*engine.MetaState // meta state

 	slock *sync.Mutex // semaphore lock
@@ -99,6 +100,7 @@ func (obj *Engine) Init() error {
 	obj.waits = make(map[pgraph.Vertex]*sync.WaitGroup)
 	obj.wlock = &sync.Mutex{}

+	obj.mlock = &sync.Mutex{}
 	obj.metas = make(map[engine.ResPtrUID]*engine.MetaState)

 	obj.slock = &sync.Mutex{}
@@ -345,6 +347,7 @@ func (obj *Engine) Commit() error {
 	// that vertexRemoveFn runs before vertexAddFn, but GraphSync guarantees
 	// that, and it would be kind of illogical to not run things that way.
 	metaGC := make(map[engine.ResPtrUID]struct{}) // which metas should we garbage collect?
+	obj.mlock.Lock()
 	for ptrUID := range obj.metas {
 		if _, exists := activeMetas[ptrUID]; !exists {
 			metaGC[ptrUID] = struct{}{}
@@ -353,6 +356,7 @@ func (obj *Engine) Commit() error {
 	for ptrUID := range metaGC {
 		delete(obj.metas, ptrUID) // otherwise, this could grow forever
 	}
+	obj.mlock.Unlock()

 	// We run these afterwards, so that we don't unnecessarily start anyone
 	// if GraphSync failed in some way. Otherwise we'd have to do clean up!
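
A closing note on verification: races like this are normally confirmed with Go's race detector, e.g. by running the affected packages with go test -race. A regression-style test in that spirit (hypothetical, not from this repository) that the detector would flag if the locking were removed might look like:

    // Sketch of a regression-style test; the package and test names are made
    // up. Run with: go test -race
    package sketch

    import (
        "sync"
        "testing"
    )

    func TestConcurrentMetaInit(t *testing.T) {
        type metaState struct{ retries int }
        var mlock sync.Mutex
        metas := map[string]*metaState{}

        wg := &sync.WaitGroup{}
        for i := 0; i < 100; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                mlock.Lock() // drop this Lock/Unlock pair and -race reports the data race
                if _, exists := metas["res1"]; !exists {
                    metas["res1"] = &metaState{retries: 3}
                }
                mlock.Unlock()
            }()
        }
        wg.Wait()
        if len(metas) != 1 {
            t.Fatalf("expected 1 entry, got %d", len(metas))
        }
    }
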