diff --git a/engine/graph/engine.go b/engine/graph/engine.go index c37a1b31..bb374b35 100644 --- a/engine/graph/engine.go +++ b/engine/graph/engine.go @@ -48,6 +48,7 @@ type Engine struct { nextGraph *pgraph.Graph state map[pgraph.Vertex]*State waits map[pgraph.Vertex]*sync.WaitGroup // wg for the Worker func + wlock *sync.Mutex // lock around waits map slock *sync.Mutex // semaphore lock semas map[string]*semaphore.Semaphore @@ -83,6 +84,7 @@ func (obj *Engine) Init() error { obj.state = make(map[pgraph.Vertex]*State) obj.waits = make(map[pgraph.Vertex]*sync.WaitGroup) + obj.wlock = &sync.Mutex{} obj.slock = &sync.Mutex{} obj.semas = make(map[string]*semaphore.Semaphore) @@ -204,10 +206,19 @@ func (obj *Engine) Commit() error { fn := func() error { // start the Worker obj.wg.Add(1) + obj.wlock.Lock() obj.waits[vertex].Add(1) + obj.wlock.Unlock() go func(v pgraph.Vertex) { defer obj.wg.Done() - defer obj.waits[v].Done() + defer func() { + // we need this lock, because this go + // routine could run when the next fn + // function above here is running... + obj.wlock.Lock() + obj.waits[v].Done() + obj.wlock.Unlock() + }() obj.Logf("Worker(%s)", v) // contains the Watch and CheckApply loops