fix a race crash when building with "ADD some-broken.tar.xz ..."

The race is between pools.Put which calls buf.Reset and exec.Cmd
doing io.Copy from the buffer; it caused a runtime crash, as
described in #16924:

``` docker-daemon cat the-tarball.xz | xz -d -c -q | docker-untar /path/to/... (aufs ) ```

When docker-untar side fails (like try to set xattr on aufs, or a broken
tar), invokeUnpack will be responsible to exhaust all input, otherwise
`xz` will be write pending for ever.

this change add a receive only channel to cmdStream, and will close it
to notify it's now safe to close the input stream;

in CmdStream the change to use Stdin / Stdout / Stderr keeps the
code simple, os/exec.Cmd will spawn goroutines and call io.Copy automatically.

the CmdStream is actually called in the same file only, change it
lowercase to mark as private.

[...]
INFO[0000] Docker daemon                                 commit=0a8c2e3 execdriver=native-0.2 graphdriver=aufs version=1.8.2

DEBU[0006] Calling POST /build
INFO[0006] POST /v1.20/build?cgroupparent=&cpuperiod=0&cpuquota=0&cpusetcpus=&cpusetmems=&cpushares=0&dockerfile=Dockerfile&memory=0&memswap=0&rm=1&t=gentoo-x32&ulimits=null
DEBU[0008] [BUILDER] Cache miss
DEBU[0009] Couldn't untar /home/lib-docker-v1.8.2-tmp/tmp/docker-build316710953/stage3-x32-20151004.tar.xz to /home/lib-docker-v1.8.2-tmp/aufs/mnt/d909abb87150463939c13e8a349b889a72d9b14f0cfcab42a8711979be285537: Untar re-exec error: exit status 1: output: operation not supported
DEBU[0009] CopyFileWithTar(/home/lib-docker-v1.8.2-tmp/tmp/docker-build316710953/stage3-x32-20151004.tar.xz, /home/lib-docker-v1.8.2-tmp/aufs/mnt/d909abb87150463939c13e8a349b889a72d9b14f0cfcab42a8711979be285537/)
panic: runtime error: slice bounds out of range

goroutine 42 [running]:
bufio.(*Reader).fill(0xc208187800)
    /usr/local/go/src/bufio/bufio.go:86 +0x2db
bufio.(*Reader).WriteTo(0xc208187800, 0x7ff39602d150, 0xc2083f11a0, 0x508000, 0x0, 0x0)
    /usr/local/go/src/bufio/bufio.go:449 +0x27e
io.Copy(0x7ff39602d150, 0xc2083f11a0, 0x7ff3960261f8, 0xc208187800, 0x0, 0x0, 0x0)
    /usr/local/go/src/io/io.go:354 +0xb2
github.com/docker/docker/pkg/archive.func·006()
    /go/src/github.com/docker/docker/pkg/archive/archive.go:817 +0x71
created by github.com/docker/docker/pkg/archive.CmdStream
    /go/src/github.com/docker/docker/pkg/archive/archive.go:819 +0x1ec

goroutine 1 [chan receive]:
main.(*DaemonCli).CmdDaemon(0xc20809da30, 0xc20800a020, 0xd, 0xd, 0x0, 0x0)
    /go/src/github.com/docker/docker/docker/daemon.go:289 +0x1781
reflect.callMethod(0xc208140090, 0xc20828fce0)
    /usr/local/go/src/reflect/value.go:605 +0x179
reflect.methodValueCall(0xc20800a020, 0xd, 0xd, 0x1, 0xc208140090, 0x0, 0x0, 0xc208140090, 0x0, 0x45343f, ...)
    /usr/local/go/src/reflect/asm_amd64.s:29 +0x36
github.com/docker/docker/cli.(*Cli).Run(0xc208129fb0, 0xc20800a010, 0xe, 0xe, 0x0, 0x0)
    /go/src/github.com/docker/docker/cli/cli.go:89 +0x38e
main.main()
    /go/src/github.com/docker/docker/docker/docker.go:69 +0x428

goroutine 5 [syscall]:
os/signal.loop()
    /usr/local/go/src/os/signal/signal_unix.go:21 +0x1f
created by os/signal.init·1
    /usr/local/go/src/os/signal/signal_unix.go:27 +0x35

Signed-off-by: Derek Ch <denc716@gmail.com>
This commit is contained in:
Derek Ch 2015-10-10 03:24:21 +07:30 committed by Derek
parent 831c2e7a7b
commit c45a95b9c0
3 changed files with 34 additions and 48 deletions

View file

@ -20,6 +20,7 @@ import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/fileutils" "github.com/docker/docker/pkg/fileutils"
"github.com/docker/docker/pkg/idtools" "github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/pools"
"github.com/docker/docker/pkg/promise" "github.com/docker/docker/pkg/promise"
"github.com/docker/docker/pkg/system" "github.com/docker/docker/pkg/system"
@ -116,10 +117,10 @@ func DetectCompression(source []byte) Compression {
return Uncompressed return Uncompressed
} }
func xzDecompress(archive io.Reader) (io.ReadCloser, error) { func xzDecompress(archive io.Reader) (io.ReadCloser, <-chan struct{}, error) {
args := []string{"xz", "-d", "-c", "-q"} args := []string{"xz", "-d", "-c", "-q"}
return CmdStream(exec.Command(args[0], args[1:]...), archive) return cmdStream(exec.Command(args[0], args[1:]...), archive)
} }
// DecompressStream decompress the archive and returns a ReaderCloser with the decompressed archive. // DecompressStream decompress the archive and returns a ReaderCloser with the decompressed archive.
@ -148,12 +149,15 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader) readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader)
return readBufWrapper, nil return readBufWrapper, nil
case Xz: case Xz:
xzReader, err := xzDecompress(buf) xzReader, chdone, err := xzDecompress(buf)
if err != nil { if err != nil {
return nil, err return nil, err
} }
readBufWrapper := p.NewReadCloserWrapper(buf, xzReader) readBufWrapper := p.NewReadCloserWrapper(buf, xzReader)
return readBufWrapper, nil return ioutils.NewReadCloserWrapper(readBufWrapper, func() error {
<-chdone
return readBufWrapper.Close()
}), nil
default: default:
return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension()) return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension())
} }
@ -925,57 +929,33 @@ func CopyFileWithTar(src, dst string) (err error) {
return defaultArchiver.CopyFileWithTar(src, dst) return defaultArchiver.CopyFileWithTar(src, dst)
} }
// CmdStream executes a command, and returns its stdout as a stream. // cmdStream executes a command, and returns its stdout as a stream.
// If the command fails to run or doesn't complete successfully, an error // If the command fails to run or doesn't complete successfully, an error
// will be returned, including anything written on stderr. // will be returned, including anything written on stderr.
func CmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, error) { func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, error) {
if input != nil { chdone := make(chan struct{})
stdin, err := cmd.StdinPipe() cmd.Stdin = input
if err != nil {
return nil, err
}
// Write stdin if any
go func() {
io.Copy(stdin, input)
stdin.Close()
}()
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
pipeR, pipeW := io.Pipe() pipeR, pipeW := io.Pipe()
errChan := make(chan []byte) cmd.Stdout = pipeW
// Collect stderr, we will use it in case of an error var errBuf bytes.Buffer
go func() { cmd.Stderr = &errBuf
errText, e := ioutil.ReadAll(stderr)
if e != nil { // Run the command and return the pipe
errText = []byte("(...couldn't fetch stderr: " + e.Error() + ")") if err := cmd.Start(); err != nil {
} return nil, nil, err
errChan <- errText }
}()
// Copy stdout to the returned pipe // Copy stdout to the returned pipe
go func() { go func() {
_, err := io.Copy(pipeW, stdout)
if err != nil {
pipeW.CloseWithError(err)
}
errText := <-errChan
if err := cmd.Wait(); err != nil { if err := cmd.Wait(); err != nil {
pipeW.CloseWithError(fmt.Errorf("%s: %s", err, errText)) pipeW.CloseWithError(fmt.Errorf("%s: %s", err, errBuf.String()))
} else { } else {
pipeW.Close() pipeW.Close()
} }
close(chdone)
}() }()
// Run the command and return the pipe
if err := cmd.Start(); err != nil { return pipeR, chdone, nil
return nil, err
}
return pipeR, nil
} }
// NewTempArchive reads the content of src into a temporary file, and returns the contents // NewTempArchive reads the content of src into a temporary file, and returns the contents

View file

@ -160,7 +160,7 @@ func TestExtensionXz(t *testing.T) {
func TestCmdStreamLargeStderr(t *testing.T) { func TestCmdStreamLargeStderr(t *testing.T) {
cmd := exec.Command("/bin/sh", "-c", "dd if=/dev/zero bs=1k count=1000 of=/dev/stderr; echo hello") cmd := exec.Command("/bin/sh", "-c", "dd if=/dev/zero bs=1k count=1000 of=/dev/stderr; echo hello")
out, err := CmdStream(cmd, nil) out, _, err := cmdStream(cmd, nil)
if err != nil { if err != nil {
t.Fatalf("Failed to start command: %s", err) t.Fatalf("Failed to start command: %s", err)
} }
@ -181,7 +181,7 @@ func TestCmdStreamLargeStderr(t *testing.T) {
func TestCmdStreamBad(t *testing.T) { func TestCmdStreamBad(t *testing.T) {
badCmd := exec.Command("/bin/sh", "-c", "echo hello; echo >&2 error couldn\\'t reverse the phase pulser; exit 1") badCmd := exec.Command("/bin/sh", "-c", "echo hello; echo >&2 error couldn\\'t reverse the phase pulser; exit 1")
out, err := CmdStream(badCmd, nil) out, _, err := cmdStream(badCmd, nil)
if err != nil { if err != nil {
t.Fatalf("Failed to start command: %s", err) t.Fatalf("Failed to start command: %s", err)
} }
@ -196,7 +196,7 @@ func TestCmdStreamBad(t *testing.T) {
func TestCmdStreamGood(t *testing.T) { func TestCmdStreamGood(t *testing.T) {
cmd := exec.Command("/bin/sh", "-c", "echo hello; exit 0") cmd := exec.Command("/bin/sh", "-c", "echo hello; exit 0")
out, err := CmdStream(cmd, nil) out, _, err := cmdStream(cmd, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View file

@ -8,6 +8,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os" "os"
"runtime" "runtime"
"syscall" "syscall"
@ -79,6 +80,11 @@ func invokeUnpack(decompressedArchive io.Reader, dest string, options *archive.T
w.Close() w.Close()
if err := cmd.Wait(); err != nil { if err := cmd.Wait(); err != nil {
// when `xz -d -c -q | docker-untar ...` failed on docker-untar side,
// we need to exhaust `xz`'s output, otherwise the `xz` side will be
// pending on write pipe forever
io.Copy(ioutil.Discard, decompressedArchive)
return fmt.Errorf("Untar re-exec error: %v: output: %s", err, output) return fmt.Errorf("Untar re-exec error: %v: output: %s", err, output)
} }
return nil return nil