resources: exec: Avoid possible deadlock race
Some of the early code I wrote probably wouldn't pass my own reviews today. Here's one example: a rare deadlock that could occur when a Process/CheckApply caused a shutdown while the bufio scanner goroutine was still trying to send on a channel that nobody was going to read from anymore. Now we properly unblock that send with a context.
This commit is contained in:
@@ -20,6 +20,7 @@ package resources
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"os/user"
|
"os/user"
|
||||||
@@ -154,7 +155,9 @@ func (obj *ExecRes) Watch() error {
|
|||||||
return errwrap.Wrapf(err, "error starting Cmd")
|
return errwrap.Wrapf(err, "error starting Cmd")
|
||||||
}
|
}
|
||||||
|
|
||||||
ioChan = obj.bufioChanScanner(scanner)
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel() // unblock and cleanup
|
||||||
|
ioChan = obj.bufioChanScanner(ctx, scanner)
|
||||||
}
|
}
|
||||||
|
|
||||||
obj.init.Running() // when started, notify engine that we're running
|
obj.init.Running() // when started, notify engine that we're running
|
||||||
@@ -536,18 +539,26 @@ type bufioOutput struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// bufioChanScanner wraps the scanner output in a channel.
|
// bufioChanScanner wraps the scanner output in a channel.
|
||||||
func (obj *ExecRes) bufioChanScanner(scanner *bufio.Scanner) chan *bufioOutput {
|
func (obj *ExecRes) bufioChanScanner(ctx context.Context, scanner *bufio.Scanner) chan *bufioOutput {
|
||||||
ch := make(chan *bufioOutput)
|
ch := make(chan *bufioOutput)
|
||||||
obj.wg.Add(1)
|
obj.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer obj.wg.Done()
|
defer obj.wg.Done()
|
||||||
defer close(ch)
|
defer close(ch)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
ch <- &bufioOutput{text: scanner.Text()} // blocks here ?
|
select {
|
||||||
|
case ch <- &bufioOutput{text: scanner.Text()}: // blocks here ?
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// on EOF, scanner.Err() will be nil
|
// on EOF, scanner.Err() will be nil
|
||||||
if err := scanner.Err(); err != nil {
|
if err := scanner.Err(); err != nil {
|
||||||
ch <- &bufioOutput{err: err} // send any misc errors we encounter
|
select {
|
||||||
|
case ch <- &bufioOutput{err: err}: // send any misc errors we encounter
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return ch
|
return ch
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
# TODO: should we return a different exit code if the resources fail?
|
# TODO: should we return a different exit code if the resources fail?
|
||||||
# TODO: should we be converged if one of the resources has permanently failed?
|
# TODO: should we be converged if one of the resources has permanently failed?
|
||||||
$TIMEOUT "$MGMT" run --converged-timeout=15 --no-watch --no-pgp --tmp-prefix yaml --yaml exec-fail.yaml &
|
$TIMEOUT "$MGMT" run --converged-timeout=15 --no-watch --no-pgp --tmp-prefix yaml --yaml exec-fail.yaml &
|
||||||
|
# there's no ^C sent... it should shut down on its own
|
||||||
pid=$!
|
pid=$!
|
||||||
wait $pid # get exit status
|
wait $pid # get exit status
|
||||||
exit $?
|
exit $?
|
||||||
|
|||||||
Reference in New Issue
Block a user