copy using bytes pools

Vendor and use docker/pkg/pools.
pools are used to lower the number of memory allocations and reuse buffers when
processing large streams operations..

The use of pools.Copy avoids io.Copy's internal buffer allocation.
This commit replaces io.Copy with pools.Copy to avoid the allocation of
buffers in io.Copy.

Signed-off-by: Antonio Murdaca <runcom@redhat.com>
This commit is contained in:
Antonio Murdaca 2017-06-10 18:33:35 +02:00
parent 0b2f6b5354
commit b211061016
No known key found for this signature in database
GPG key ID: B2BEAD150DE936B9
4 changed files with 124 additions and 5 deletions

View file

@ -10,6 +10,7 @@ import (
"syscall" "syscall"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/pools"
"github.com/kubernetes-incubator/cri-o/oci" "github.com/kubernetes-incubator/cri-o/oci"
"github.com/kubernetes-incubator/cri-o/utils" "github.com/kubernetes-incubator/cri-o/utils"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -108,7 +109,7 @@ func redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Write
} }
var err error var err error
if tty { if tty {
_, err = io.Copy(outputStream, conn) _, err = pools.Copy(outputStream, conn)
} else { } else {
// TODO // TODO
} }

View file

@ -7,6 +7,7 @@ import (
"os/exec" "os/exec"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/pools"
"github.com/kubernetes-incubator/cri-o/oci" "github.com/kubernetes-incubator/cri-o/oci"
"golang.org/x/net/context" "golang.org/x/net/context"
pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
@ -67,11 +68,11 @@ func (ss streamService) Exec(containerID string, cmd []string, stdin io.Reader,
}) })
if stdin != nil { if stdin != nil {
go io.Copy(p, stdin) go pools.Copy(p, stdin)
} }
if stdout != nil { if stdout != nil {
go io.Copy(stdout, p) go pools.Copy(stdout, p)
} }
cmdErr = execCmd.Wait() cmdErr = execCmd.Wait()
@ -85,7 +86,7 @@ func (ss streamService) Exec(containerID string, cmd []string, stdin io.Reader,
if err != nil { if err != nil {
return err return err
} }
go io.Copy(w, stdin) go pools.Copy(w, stdin)
execCmd.Stdin = r execCmd.Stdin = r
} }

View file

@ -8,6 +8,7 @@ import (
"strings" "strings"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/pools"
"github.com/kubernetes-incubator/cri-o/oci" "github.com/kubernetes-incubator/cri-o/oci"
"golang.org/x/net/context" "golang.org/x/net/context"
pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
@ -78,7 +79,7 @@ func (ss streamService) PortForward(podSandboxID string, port int32, stream io.R
return fmt.Errorf("unable to do port forwarding: error creating stdin pipe: %v", err) return fmt.Errorf("unable to do port forwarding: error creating stdin pipe: %v", err)
} }
go func() { go func() {
io.Copy(inPipe, stream) pools.Copy(inPipe, stream)
inPipe.Close() inPipe.Close()
}() }()

116
vendor/github.com/docker/docker/pkg/pools/pools.go generated vendored Normal file
View file

@ -0,0 +1,116 @@
// Package pools provides a collection of pools which provide various
// data types with buffers. These can be used to lower the number of
// memory allocations and reuse buffers.
//
// New pools should be added to this package to allow them to be
// shared across packages.
//
// Utility functions which operate on pools should be added to this
// package to allow them to be reused.
package pools
import (
"bufio"
"io"
"sync"
"github.com/docker/docker/pkg/ioutils"
)
var (
// BufioReader32KPool is a pool which returns bufio.Reader with a 32K buffer.
BufioReader32KPool = newBufioReaderPoolWithSize(buffer32K)
// BufioWriter32KPool is a pool which returns bufio.Writer with a 32K buffer.
BufioWriter32KPool = newBufioWriterPoolWithSize(buffer32K)
)
const buffer32K = 32 * 1024
// BufioReaderPool is a bufio reader that uses sync.Pool.
type BufioReaderPool struct {
pool sync.Pool
}
// newBufioReaderPoolWithSize is unexported because new pools should be
// added here to be shared where required.
func newBufioReaderPoolWithSize(size int) *BufioReaderPool {
return &BufioReaderPool{
pool: sync.Pool{
New: func() interface{} { return bufio.NewReaderSize(nil, size) },
},
}
}
// Get returns a bufio.Reader which reads from r. The buffer size is that of the pool.
func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader {
buf := bufPool.pool.Get().(*bufio.Reader)
buf.Reset(r)
return buf
}
// Put puts the bufio.Reader back into the pool.
func (bufPool *BufioReaderPool) Put(b *bufio.Reader) {
b.Reset(nil)
bufPool.pool.Put(b)
}
// Copy is a convenience wrapper which uses a buffer to avoid allocation in io.Copy.
func Copy(dst io.Writer, src io.Reader) (written int64, err error) {
buf := BufioReader32KPool.Get(src)
written, err = io.Copy(dst, buf)
BufioReader32KPool.Put(buf)
return
}
// NewReadCloserWrapper returns a wrapper which puts the bufio.Reader back
// into the pool and closes the reader if it's an io.ReadCloser.
func (bufPool *BufioReaderPool) NewReadCloserWrapper(buf *bufio.Reader, r io.Reader) io.ReadCloser {
return ioutils.NewReadCloserWrapper(r, func() error {
if readCloser, ok := r.(io.ReadCloser); ok {
readCloser.Close()
}
bufPool.Put(buf)
return nil
})
}
// BufioWriterPool is a bufio writer that uses sync.Pool.
type BufioWriterPool struct {
pool sync.Pool
}
// newBufioWriterPoolWithSize is unexported because new pools should be
// added here to be shared where required.
func newBufioWriterPoolWithSize(size int) *BufioWriterPool {
return &BufioWriterPool{
pool: sync.Pool{
New: func() interface{} { return bufio.NewWriterSize(nil, size) },
},
}
}
// Get returns a bufio.Writer which writes to w. The buffer size is that of the pool.
func (bufPool *BufioWriterPool) Get(w io.Writer) *bufio.Writer {
buf := bufPool.pool.Get().(*bufio.Writer)
buf.Reset(w)
return buf
}
// Put puts the bufio.Writer back into the pool.
func (bufPool *BufioWriterPool) Put(b *bufio.Writer) {
b.Reset(nil)
bufPool.pool.Put(b)
}
// NewWriteCloserWrapper returns a wrapper which puts the bufio.Writer back
// into the pool and closes the writer if it's an io.Writecloser.
func (bufPool *BufioWriterPool) NewWriteCloserWrapper(buf *bufio.Writer, w io.Writer) io.WriteCloser {
return ioutils.NewWriteCloserWrapper(w, func() error {
buf.Flush()
if writeCloser, ok := w.(io.WriteCloser); ok {
writeCloser.Close()
}
bufPool.Put(buf)
return nil
})
}