diff --git a/oci/container.go b/oci/container.go
index 2e760762..42885db4 100644
--- a/oci/container.go
+++ b/oci/container.go
@@ -50,6 +50,7 @@ type ContainerState struct {
 	Finished  time.Time `json:"finished,omitempty"`
 	ExitCode  int32     `json:"exitCode,omitempty"`
 	OOMKilled bool      `json:"oomKilled,omitempty"`
+	Error     string    `json:"error,omitempty"`
 }
 
 // NewContainer creates a container object.
diff --git a/oci/oci.go b/oci/oci.go
index bc64b34c..b1091601 100644
--- a/oci/oci.go
+++ b/oci/oci.go
@@ -545,6 +545,15 @@ func (r *Runtime) DeleteContainer(c *Container) error {
 	return err
 }
 
+// SetStartFailed sets the container state appropriately after a start failure
+func (r *Runtime) SetStartFailed(c *Container, err error) {
+	c.opLock.Lock()
+	defer c.opLock.Unlock()
+	// adjust finished and started times
+	c.state.Finished, c.state.Started = c.state.Created, c.state.Created
+	c.state.Error = err.Error()
+}
+
 // UpdateStatus refreshes the status of the container.
 func (r *Runtime) UpdateStatus(c *Container) error {
 	c.opLock.Lock()
diff --git a/server/container_attach.go b/server/container_attach.go
index 9d96a56a..81c6cc3c 100644
--- a/server/container_attach.go
+++ b/server/container_attach.go
@@ -10,6 +10,7 @@ import (
 	"syscall"
 
 	"github.com/Sirupsen/logrus"
+	"github.com/docker/docker/pkg/pools"
 	"github.com/kubernetes-incubator/cri-o/oci"
 	"github.com/kubernetes-incubator/cri-o/utils"
 	"golang.org/x/net/context"
@@ -108,7 +109,7 @@ func redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Write
 	}
 	var err error
 	if tty {
-		_, err = io.Copy(outputStream, conn)
+		_, err = pools.Copy(outputStream, conn)
 	} else {
 		// TODO
 	}
diff --git a/server/container_exec.go b/server/container_exec.go
index e6811d72..2dbab839 100644
--- a/server/container_exec.go
+++ b/server/container_exec.go
@@ -7,6 +7,7 @@ import (
 	"os/exec"
 
 	"github.com/Sirupsen/logrus"
+	"github.com/docker/docker/pkg/pools"
 	"github.com/kubernetes-incubator/cri-o/oci"
 	"golang.org/x/net/context"
 	pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
@@ -67,11 +68,11 @@ func (ss streamService) Exec(containerID string, cmd []string, stdin io.Reader,
 	})
 
 	if stdin != nil {
-		go io.Copy(p, stdin)
+		go pools.Copy(p, stdin)
 	}
 
 	if stdout != nil {
-		go io.Copy(stdout, p)
+		go pools.Copy(stdout, p)
 	}
 
 	cmdErr = execCmd.Wait()
@@ -85,7 +86,7 @@ func (ss streamService) Exec(containerID string, cmd []string, stdin io.Reader,
 		if err != nil {
 			return err
 		}
-		go io.Copy(w, stdin)
+		go pools.Copy(w, stdin)
 
 		execCmd.Stdin = r
 	}
diff --git a/server/container_portforward.go b/server/container_portforward.go
index 97cf8258..bc222d7d 100644
--- a/server/container_portforward.go
+++ b/server/container_portforward.go
@@ -8,6 +8,7 @@ import (
 	"strings"
 
 	"github.com/Sirupsen/logrus"
+	"github.com/docker/docker/pkg/pools"
 	"github.com/kubernetes-incubator/cri-o/oci"
 	"golang.org/x/net/context"
 	pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
@@ -78,7 +79,7 @@ func (ss streamService) PortForward(podSandboxID string, port int32, stream io.R
 		return fmt.Errorf("unable to do port forwarding: error creating stdin pipe: %v", err)
 	}
 	go func() {
-		io.Copy(inPipe, stream)
+		pools.Copy(inPipe, stream)
 		inPipe.Close()
 	}()
 
diff --git a/server/container_start.go b/server/container_start.go
index a426def9..6ef7d415 100644
--- a/server/container_start.go
+++ b/server/container_start.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 
 	"github.com/Sirupsen/logrus"
+	"github.com/kubernetes-incubator/cri-o/oci"
 	"golang.org/x/net/context"
 	pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
 )
@@ -15,12 +16,26 @@ func (s *Server) StartContainer(ctx context.Context, req *pb.StartContainerReque
 	if err != nil {
 		return nil, err
 	}
-
-	if err = s.runtime.StartContainer(c); err != nil {
-		return nil, fmt.Errorf("failed to start container %s: %v", c.ID(), err)
+	state := s.runtime.ContainerStatus(c)
+	if state.Status != oci.ContainerStateCreated {
+		return nil, fmt.Errorf("container %s is not in created state: %s", c.ID(), state.Status)
 	}
-	s.containerStateToDisk(c)
+	defer func() {
+		// if the call to StartContainer fails below we still want to fill
+		// some fields of a container status. In particular, we're going to
+		// adjust container started/finished time and set an error to be
+		// returned in the Reason field for container status call.
+		if err != nil {
+			s.runtime.SetStartFailed(c, err)
+		}
+		s.containerStateToDisk(c)
+	}()
+
+	err = s.runtime.StartContainer(c)
+	if err != nil {
+		return nil, fmt.Errorf("failed to start container %s: %v", c.ID(), err)
+	}
 
 	resp := &pb.StartContainerResponse{}
 	logrus.Debugf("StartContainerResponse %+v", resp)
diff --git a/server/container_status.go b/server/container_status.go
index 3320365f..64c8c024 100644
--- a/server/container_status.go
+++ b/server/container_status.go
@@ -12,6 +12,12 @@ import (
 	pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
 )
 
+const (
+	oomKilledReason = "OOMKilled"
+	completedReason = "Completed"
+	errorReason     = "Error"
+)
+
 // ContainerStatus returns status of the container.
 func (s *Server) ContainerStatus(ctx context.Context, req *pb.ContainerStatusRequest) (*pb.ContainerStatusResponse, error) {
 	logrus.Debugf("ContainerStatusRequest %+v", req)
@@ -100,11 +106,12 @@ func (s *Server) ContainerStatus(ctx context.Context, req *pb.ContainerStatusReq
 		resp.Status.ExitCode = cState.ExitCode
 		switch {
 		case cState.OOMKilled:
-			resp.Status.Reason = "OOMKilled"
+			resp.Status.Reason = oomKilledReason
 		case cState.ExitCode == 0:
-			resp.Status.Reason = "Completed"
+			resp.Status.Reason = completedReason
 		default:
-			resp.Status.Reason = "Error"
+			resp.Status.Reason = errorReason
+			resp.Status.Message = cState.Error
 		}
 	}
 
diff --git a/vendor/github.com/docker/docker/pkg/pools/pools.go b/vendor/github.com/docker/docker/pkg/pools/pools.go
new file mode 100644
index 00000000..5c5aead6
--- /dev/null
+++ b/vendor/github.com/docker/docker/pkg/pools/pools.go
@@ -0,0 +1,116 @@
+// Package pools provides a collection of pools which provide various
+// data types with buffers. These can be used to lower the number of
+// memory allocations and reuse buffers.
+//
+// New pools should be added to this package to allow them to be
+// shared across packages.
+//
+// Utility functions which operate on pools should be added to this
+// package to allow them to be reused.
+package pools
+
+import (
+	"bufio"
+	"io"
+	"sync"
+
+	"github.com/docker/docker/pkg/ioutils"
+)
+
+var (
+	// BufioReader32KPool is a pool which returns bufio.Reader with a 32K buffer.
+	BufioReader32KPool = newBufioReaderPoolWithSize(buffer32K)
+	// BufioWriter32KPool is a pool which returns bufio.Writer with a 32K buffer.
+	BufioWriter32KPool = newBufioWriterPoolWithSize(buffer32K)
+)
+
+const buffer32K = 32 * 1024
+
+// BufioReaderPool is a bufio reader that uses sync.Pool.
+type BufioReaderPool struct {
+	pool sync.Pool
+}
+
+// newBufioReaderPoolWithSize is unexported because new pools should be
+// added here to be shared where required.
+func newBufioReaderPoolWithSize(size int) *BufioReaderPool {
+	return &BufioReaderPool{
+		pool: sync.Pool{
+			New: func() interface{} { return bufio.NewReaderSize(nil, size) },
+		},
+	}
+}
+
+// Get returns a bufio.Reader which reads from r. The buffer size is that of the pool.
+func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader {
+	buf := bufPool.pool.Get().(*bufio.Reader)
+	buf.Reset(r)
+	return buf
+}
+
+// Put puts the bufio.Reader back into the pool.
+func (bufPool *BufioReaderPool) Put(b *bufio.Reader) {
+	b.Reset(nil)
+	bufPool.pool.Put(b)
+}
+
+// Copy is a convenience wrapper which uses a buffer to avoid allocation in io.Copy.
+func Copy(dst io.Writer, src io.Reader) (written int64, err error) {
+	buf := BufioReader32KPool.Get(src)
+	written, err = io.Copy(dst, buf)
+	BufioReader32KPool.Put(buf)
+	return
+}
+
+// NewReadCloserWrapper returns a wrapper which puts the bufio.Reader back
+// into the pool and closes the reader if it's an io.ReadCloser.
+func (bufPool *BufioReaderPool) NewReadCloserWrapper(buf *bufio.Reader, r io.Reader) io.ReadCloser {
+	return ioutils.NewReadCloserWrapper(r, func() error {
+		if readCloser, ok := r.(io.ReadCloser); ok {
+			readCloser.Close()
+		}
+		bufPool.Put(buf)
+		return nil
+	})
+}
+
+// BufioWriterPool is a bufio writer that uses sync.Pool.
+type BufioWriterPool struct {
+	pool sync.Pool
+}
+
+// newBufioWriterPoolWithSize is unexported because new pools should be
+// added here to be shared where required.
+func newBufioWriterPoolWithSize(size int) *BufioWriterPool {
+	return &BufioWriterPool{
+		pool: sync.Pool{
+			New: func() interface{} { return bufio.NewWriterSize(nil, size) },
+		},
+	}
+}
+
+// Get returns a bufio.Writer which writes to w. The buffer size is that of the pool.
+func (bufPool *BufioWriterPool) Get(w io.Writer) *bufio.Writer {
+	buf := bufPool.pool.Get().(*bufio.Writer)
+	buf.Reset(w)
+	return buf
+}
+
+// Put puts the bufio.Writer back into the pool.
+func (bufPool *BufioWriterPool) Put(b *bufio.Writer) {
+	b.Reset(nil)
+	bufPool.pool.Put(b)
+}
+
+// NewWriteCloserWrapper returns a wrapper which puts the bufio.Writer back
+// into the pool and closes the writer if it's an io.Writecloser.
+func (bufPool *BufioWriterPool) NewWriteCloserWrapper(buf *bufio.Writer, w io.Writer) io.WriteCloser {
+	return ioutils.NewWriteCloserWrapper(w, func() error {
+		buf.Flush()
+		if writeCloser, ok := w.(io.WriteCloser); ok {
+			writeCloser.Close()
+		}
+		bufPool.Put(buf)
+		return nil
+	})
+}
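
Usage note (not part of the patch): the vendored `pools.Copy` is a drop-in replacement for `io.Copy` that borrows a 32K `bufio.Reader` from a `sync.Pool` instead of allocating a fresh buffer on every streaming call, which is why the attach/exec/port-forward paths above can switch to it without other changes. A minimal, illustrative sketch of the call pattern, assuming only the vendored import path shown in the diff (the reader/writer here are stand-ins, not the server's real streams):

```go
package main

import (
	"os"
	"strings"

	"github.com/docker/docker/pkg/pools"
)

func main() {
	// src stands in for the connection or pipe the server code reads from.
	// pools.Copy has the same signature and semantics as io.Copy, but reuses
	// a pooled 32K buffer instead of allocating one per call.
	src := strings.NewReader("streamed container output\n")
	if _, err := pools.Copy(os.Stdout, src); err != nil {
		panic(err)
	}
}
```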