From 0b2f6b53546463d4b30b55d0967ccb0544ce4098 Mon Sep 17 00:00:00 2001 From: Antonio Murdaca Date: Sat, 10 Jun 2017 18:18:59 +0200 Subject: [PATCH 1/2] adjust status on container start failure Signed-off-by: Antonio Murdaca --- oci/container.go | 1 + oci/oci.go | 9 +++++++++ server/container_start.go | 23 +++++++++++++++++++---- server/container_status.go | 13 ++++++++++--- 4 files changed, 39 insertions(+), 7 deletions(-) diff --git a/oci/container.go b/oci/container.go index 2e760762..42885db4 100644 --- a/oci/container.go +++ b/oci/container.go @@ -50,6 +50,7 @@ type ContainerState struct { Finished time.Time `json:"finished,omitempty"` ExitCode int32 `json:"exitCode,omitempty"` OOMKilled bool `json:"oomKilled,omitempty"` + Error string `json:"error,omitempty"` } // NewContainer creates a container object. diff --git a/oci/oci.go b/oci/oci.go index bc64b34c..b1091601 100644 --- a/oci/oci.go +++ b/oci/oci.go @@ -545,6 +545,15 @@ func (r *Runtime) DeleteContainer(c *Container) error { return err } +// SetStartFailed sets the container state appropriately after a start failure +func (r *Runtime) SetStartFailed(c *Container, err error) { + c.opLock.Lock() + defer c.opLock.Unlock() + // adjust finished and started times + c.state.Finished, c.state.Started = c.state.Created, c.state.Created + c.state.Error = err.Error() +} + // UpdateStatus refreshes the status of the container. func (r *Runtime) UpdateStatus(c *Container) error { c.opLock.Lock() diff --git a/server/container_start.go b/server/container_start.go index a426def9..6ef7d415 100644 --- a/server/container_start.go +++ b/server/container_start.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/Sirupsen/logrus" + "github.com/kubernetes-incubator/cri-o/oci" "golang.org/x/net/context" pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" ) @@ -15,12 +16,26 @@ func (s *Server) StartContainer(ctx context.Context, req *pb.StartContainerReque if err != nil { return nil, err } - - if err = s.runtime.StartContainer(c); err != nil { - return nil, fmt.Errorf("failed to start container %s: %v", c.ID(), err) + state := s.runtime.ContainerStatus(c) + if state.Status != oci.ContainerStateCreated { + return nil, fmt.Errorf("container %s is not in created state: %s", c.ID(), state.Status) } - s.containerStateToDisk(c) + defer func() { + // if the call to StartContainer fails below we still want to fill + // some fields of a container status. In particular, we're going to + // adjust container started/finished time and set an error to be + // returned in the Reason field for container status call. + if err != nil { + s.runtime.SetStartFailed(c, err) + } + s.containerStateToDisk(c) + }() + + err = s.runtime.StartContainer(c) + if err != nil { + return nil, fmt.Errorf("failed to start container %s: %v", c.ID(), err) + } resp := &pb.StartContainerResponse{} logrus.Debugf("StartContainerResponse %+v", resp) diff --git a/server/container_status.go b/server/container_status.go index 3320365f..64c8c024 100644 --- a/server/container_status.go +++ b/server/container_status.go @@ -12,6 +12,12 @@ import ( pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" ) +const ( + oomKilledReason = "OOMKilled" + completedReason = "Completed" + errorReason = "Error" +) + // ContainerStatus returns status of the container. func (s *Server) ContainerStatus(ctx context.Context, req *pb.ContainerStatusRequest) (*pb.ContainerStatusResponse, error) { logrus.Debugf("ContainerStatusRequest %+v", req) @@ -100,11 +106,12 @@ func (s *Server) ContainerStatus(ctx context.Context, req *pb.ContainerStatusReq resp.Status.ExitCode = cState.ExitCode switch { case cState.OOMKilled: - resp.Status.Reason = "OOMKilled" + resp.Status.Reason = oomKilledReason case cState.ExitCode == 0: - resp.Status.Reason = "Completed" + resp.Status.Reason = completedReason default: - resp.Status.Reason = "Error" + resp.Status.Reason = errorReason + resp.Status.Message = cState.Error } } From b2110610166d65f55c5c29c1562104be250924e9 Mon Sep 17 00:00:00 2001 From: Antonio Murdaca Date: Sat, 10 Jun 2017 18:33:35 +0200 Subject: [PATCH 2/2] copy using bytes pools Vendor and use docker/pkg/pools. pools are used to lower the number of memory allocations and reuse buffers when processing large streams operations.. The use of pools.Copy avoids io.Copy's internal buffer allocation. This commit replaces io.Copy with pools.Copy to avoid the allocation of buffers in io.Copy. Signed-off-by: Antonio Murdaca --- server/container_attach.go | 3 +- server/container_exec.go | 7 +- server/container_portforward.go | 3 +- .../docker/docker/pkg/pools/pools.go | 116 ++++++++++++++++++ 4 files changed, 124 insertions(+), 5 deletions(-) create mode 100644 vendor/github.com/docker/docker/pkg/pools/pools.go diff --git a/server/container_attach.go b/server/container_attach.go index 9d96a56a..81c6cc3c 100644 --- a/server/container_attach.go +++ b/server/container_attach.go @@ -10,6 +10,7 @@ import ( "syscall" "github.com/Sirupsen/logrus" + "github.com/docker/docker/pkg/pools" "github.com/kubernetes-incubator/cri-o/oci" "github.com/kubernetes-incubator/cri-o/utils" "golang.org/x/net/context" @@ -108,7 +109,7 @@ func redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Write } var err error if tty { - _, err = io.Copy(outputStream, conn) + _, err = pools.Copy(outputStream, conn) } else { // TODO } diff --git a/server/container_exec.go b/server/container_exec.go index e6811d72..2dbab839 100644 --- a/server/container_exec.go +++ b/server/container_exec.go @@ -7,6 +7,7 @@ import ( "os/exec" "github.com/Sirupsen/logrus" + "github.com/docker/docker/pkg/pools" "github.com/kubernetes-incubator/cri-o/oci" "golang.org/x/net/context" pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" @@ -67,11 +68,11 @@ func (ss streamService) Exec(containerID string, cmd []string, stdin io.Reader, }) if stdin != nil { - go io.Copy(p, stdin) + go pools.Copy(p, stdin) } if stdout != nil { - go io.Copy(stdout, p) + go pools.Copy(stdout, p) } cmdErr = execCmd.Wait() @@ -85,7 +86,7 @@ func (ss streamService) Exec(containerID string, cmd []string, stdin io.Reader, if err != nil { return err } - go io.Copy(w, stdin) + go pools.Copy(w, stdin) execCmd.Stdin = r } diff --git a/server/container_portforward.go b/server/container_portforward.go index 97cf8258..bc222d7d 100644 --- a/server/container_portforward.go +++ b/server/container_portforward.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/Sirupsen/logrus" + "github.com/docker/docker/pkg/pools" "github.com/kubernetes-incubator/cri-o/oci" "golang.org/x/net/context" pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" @@ -78,7 +79,7 @@ func (ss streamService) PortForward(podSandboxID string, port int32, stream io.R return fmt.Errorf("unable to do port forwarding: error creating stdin pipe: %v", err) } go func() { - io.Copy(inPipe, stream) + pools.Copy(inPipe, stream) inPipe.Close() }() diff --git a/vendor/github.com/docker/docker/pkg/pools/pools.go b/vendor/github.com/docker/docker/pkg/pools/pools.go new file mode 100644 index 00000000..5c5aead6 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/pools/pools.go @@ -0,0 +1,116 @@ +// Package pools provides a collection of pools which provide various +// data types with buffers. These can be used to lower the number of +// memory allocations and reuse buffers. +// +// New pools should be added to this package to allow them to be +// shared across packages. +// +// Utility functions which operate on pools should be added to this +// package to allow them to be reused. +package pools + +import ( + "bufio" + "io" + "sync" + + "github.com/docker/docker/pkg/ioutils" +) + +var ( + // BufioReader32KPool is a pool which returns bufio.Reader with a 32K buffer. + BufioReader32KPool = newBufioReaderPoolWithSize(buffer32K) + // BufioWriter32KPool is a pool which returns bufio.Writer with a 32K buffer. + BufioWriter32KPool = newBufioWriterPoolWithSize(buffer32K) +) + +const buffer32K = 32 * 1024 + +// BufioReaderPool is a bufio reader that uses sync.Pool. +type BufioReaderPool struct { + pool sync.Pool +} + +// newBufioReaderPoolWithSize is unexported because new pools should be +// added here to be shared where required. +func newBufioReaderPoolWithSize(size int) *BufioReaderPool { + return &BufioReaderPool{ + pool: sync.Pool{ + New: func() interface{} { return bufio.NewReaderSize(nil, size) }, + }, + } +} + +// Get returns a bufio.Reader which reads from r. The buffer size is that of the pool. +func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader { + buf := bufPool.pool.Get().(*bufio.Reader) + buf.Reset(r) + return buf +} + +// Put puts the bufio.Reader back into the pool. +func (bufPool *BufioReaderPool) Put(b *bufio.Reader) { + b.Reset(nil) + bufPool.pool.Put(b) +} + +// Copy is a convenience wrapper which uses a buffer to avoid allocation in io.Copy. +func Copy(dst io.Writer, src io.Reader) (written int64, err error) { + buf := BufioReader32KPool.Get(src) + written, err = io.Copy(dst, buf) + BufioReader32KPool.Put(buf) + return +} + +// NewReadCloserWrapper returns a wrapper which puts the bufio.Reader back +// into the pool and closes the reader if it's an io.ReadCloser. +func (bufPool *BufioReaderPool) NewReadCloserWrapper(buf *bufio.Reader, r io.Reader) io.ReadCloser { + return ioutils.NewReadCloserWrapper(r, func() error { + if readCloser, ok := r.(io.ReadCloser); ok { + readCloser.Close() + } + bufPool.Put(buf) + return nil + }) +} + +// BufioWriterPool is a bufio writer that uses sync.Pool. +type BufioWriterPool struct { + pool sync.Pool +} + +// newBufioWriterPoolWithSize is unexported because new pools should be +// added here to be shared where required. +func newBufioWriterPoolWithSize(size int) *BufioWriterPool { + return &BufioWriterPool{ + pool: sync.Pool{ + New: func() interface{} { return bufio.NewWriterSize(nil, size) }, + }, + } +} + +// Get returns a bufio.Writer which writes to w. The buffer size is that of the pool. +func (bufPool *BufioWriterPool) Get(w io.Writer) *bufio.Writer { + buf := bufPool.pool.Get().(*bufio.Writer) + buf.Reset(w) + return buf +} + +// Put puts the bufio.Writer back into the pool. +func (bufPool *BufioWriterPool) Put(b *bufio.Writer) { + b.Reset(nil) + bufPool.pool.Put(b) +} + +// NewWriteCloserWrapper returns a wrapper which puts the bufio.Writer back +// into the pool and closes the writer if it's an io.Writecloser. +func (bufPool *BufioWriterPool) NewWriteCloserWrapper(buf *bufio.Writer, w io.Writer) io.WriteCloser { + return ioutils.NewWriteCloserWrapper(w, func() error { + buf.Flush() + if writeCloser, ok := w.(io.WriteCloser); ok { + writeCloser.Close() + } + bufPool.Put(buf) + return nil + }) +}