Improved push and pull with upload manager and download manager

This commit adds a transfer manager which deduplicates and schedules
transfers, and also an upload manager and download manager that build on
top of the transfer manager to provide high-level interfaces for uploads
and downloads. The push and pull code is modified to use these building
blocks.

Some benefits of the changes:

- Simplification of push/pull code
- Pushes can upload layers concurrently
- Failed downloads and uploads are retried after backoff delays
- Cancellation is supported, but individual transfers will only be
  cancelled if all pushes or pulls using them are cancelled.
- The distribution code is decoupled from Docker Engine packages and API
  conventions (i.e. streamformatter), which will make it easier to split
  out.

This commit also includes unit tests for the new distribution/xfer
package. The tests cover 87.8% of the statements in the package.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
Aaron Lehmann 2015-11-13 16:59:01 -08:00
parent 17aa7f8b20
commit 41193db82e
9 changed files with 334 additions and 329 deletions

View file

@ -1,167 +0,0 @@
package broadcaster
import (
"errors"
"io"
"sync"
)
// Buffered keeps track of one or more observers watching the progress
// of an operation. For example, if multiple clients are trying to pull an
// image, they share a Buffered struct for the download operation.
type Buffered struct {
sync.Mutex
// c is a channel that observers block on, waiting for the operation
// to finish.
c chan struct{}
// cond is a condition variable used to wake up observers when there's
// new data available.
cond *sync.Cond
// history is a buffer of the progress output so far, so a new observer
// can catch up. The history is stored as a slice of separate byte
// slices, so that if the writer is a WriteFlusher, the flushes will
// happen in the right places.
history [][]byte
// wg is a WaitGroup used to wait for all writes to finish on Close
wg sync.WaitGroup
// result is the argument passed to the first call of Close, and
// returned to callers of Wait
result error
}
// NewBuffered returns an initialized Buffered structure.
func NewBuffered() *Buffered {
b := &Buffered{
c: make(chan struct{}),
}
b.cond = sync.NewCond(b)
return b
}
// closed returns true if and only if the broadcaster has been closed
func (broadcaster *Buffered) closed() bool {
select {
case <-broadcaster.c:
return true
default:
return false
}
}
// receiveWrites runs as a goroutine so that writes don't block the Write
// function. It writes the new data in broadcaster.history each time there's
// activity on the broadcaster.cond condition variable.
func (broadcaster *Buffered) receiveWrites(observer io.Writer) {
n := 0
broadcaster.Lock()
// The condition variable wait is at the end of this loop, so that the
// first iteration will write the history so far.
for {
newData := broadcaster.history[n:]
// Make a copy of newData so we can release the lock
sendData := make([][]byte, len(newData), len(newData))
copy(sendData, newData)
broadcaster.Unlock()
for len(sendData) > 0 {
_, err := observer.Write(sendData[0])
if err != nil {
broadcaster.wg.Done()
return
}
n++
sendData = sendData[1:]
}
broadcaster.Lock()
// If we are behind, we need to catch up instead of waiting
// or handling a closure.
if len(broadcaster.history) != n {
continue
}
// detect closure of the broadcast writer
if broadcaster.closed() {
broadcaster.Unlock()
broadcaster.wg.Done()
return
}
broadcaster.cond.Wait()
// Mutex is still locked as the loop continues
}
}
// Write adds data to the history buffer, and also writes it to all current
// observers.
func (broadcaster *Buffered) Write(p []byte) (n int, err error) {
broadcaster.Lock()
defer broadcaster.Unlock()
// Is the broadcaster closed? If so, the write should fail.
if broadcaster.closed() {
return 0, errors.New("attempted write to a closed broadcaster.Buffered")
}
// Add message in p to the history slice
newEntry := make([]byte, len(p), len(p))
copy(newEntry, p)
broadcaster.history = append(broadcaster.history, newEntry)
broadcaster.cond.Broadcast()
return len(p), nil
}
// Add adds an observer to the broadcaster. The new observer receives the
// data from the history buffer, and also all subsequent data.
func (broadcaster *Buffered) Add(w io.Writer) error {
// The lock is acquired here so that Add can't race with Close
broadcaster.Lock()
defer broadcaster.Unlock()
if broadcaster.closed() {
return errors.New("attempted to add observer to a closed broadcaster.Buffered")
}
broadcaster.wg.Add(1)
go broadcaster.receiveWrites(w)
return nil
}
// CloseWithError signals to all observers that the operation has finished. Its
// argument is a result that should be returned to waiters blocking on Wait.
func (broadcaster *Buffered) CloseWithError(result error) {
broadcaster.Lock()
if broadcaster.closed() {
broadcaster.Unlock()
return
}
broadcaster.result = result
close(broadcaster.c)
broadcaster.cond.Broadcast()
broadcaster.Unlock()
// Don't return until all writers have caught up.
broadcaster.wg.Wait()
}
// Close signals to all observers that the operation has finished. It causes
// all calls to Wait to return nil.
func (broadcaster *Buffered) Close() {
broadcaster.CloseWithError(nil)
}
// Wait blocks until the operation is marked as completed by the Close method,
// and all writer goroutines have completed. It returns the argument that was
// passed to Close.
func (broadcaster *Buffered) Wait() error {
<-broadcaster.c
broadcaster.wg.Wait()
return broadcaster.result
}

View file

@ -4,6 +4,8 @@ import (
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"io" "io"
"golang.org/x/net/context"
) )
type readCloserWrapper struct { type readCloserWrapper struct {
@ -81,3 +83,72 @@ func (r *OnEOFReader) runFunc() {
r.Fn = nil r.Fn = nil
} }
} }
// cancelReadCloser wraps an io.ReadCloser with a context for cancelling read
// operations.
type cancelReadCloser struct {
cancel func()
pR *io.PipeReader // Stream to read from
pW *io.PipeWriter
}
// NewCancelReadCloser creates a wrapper that closes the ReadCloser when the
// context is cancelled. The returned io.ReadCloser must be closed when it is
// no longer needed.
func NewCancelReadCloser(ctx context.Context, in io.ReadCloser) io.ReadCloser {
pR, pW := io.Pipe()
// Create a context used to signal when the pipe is closed
doneCtx, cancel := context.WithCancel(context.Background())
p := &cancelReadCloser{
cancel: cancel,
pR: pR,
pW: pW,
}
go func() {
_, err := io.Copy(pW, in)
select {
case <-ctx.Done():
// If the context was closed, p.closeWithError
// was already called. Calling it again would
// change the error that Read returns.
default:
p.closeWithError(err)
}
in.Close()
}()
go func() {
for {
select {
case <-ctx.Done():
p.closeWithError(ctx.Err())
case <-doneCtx.Done():
return
}
}
}()
return p
}
// Read wraps the Read method of the pipe that provides data from the wrapped
// ReadCloser.
func (p *cancelReadCloser) Read(buf []byte) (n int, err error) {
return p.pR.Read(buf)
}
// closeWithError closes the wrapper and its underlying reader. It will
// cause future calls to Read to return err.
func (p *cancelReadCloser) closeWithError(err error) {
p.pW.CloseWithError(err)
p.cancel()
}
// Close closes the wrapper its underlying reader. It will cause
// future calls to Read to return io.EOF.
func (p *cancelReadCloser) Close() error {
p.closeWithError(io.EOF)
return nil
}

View file

@ -2,8 +2,12 @@ package ioutils
import ( import (
"fmt" "fmt"
"io/ioutil"
"strings" "strings"
"testing" "testing"
"time"
"golang.org/x/net/context"
) )
// Implement io.Reader // Implement io.Reader
@ -65,3 +69,26 @@ func TestHashData(t *testing.T) {
t.Fatalf("Expecting %s, got %s", expected, actual) t.Fatalf("Expecting %s, got %s", expected, actual)
} }
} }
type perpetualReader struct{}
func (p *perpetualReader) Read(buf []byte) (n int, err error) {
for i := 0; i != len(buf); i++ {
buf[i] = 'a'
}
return len(buf), nil
}
func TestCancelReadCloser(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond)
cancelReadCloser := NewCancelReadCloser(ctx, ioutil.NopCloser(&perpetualReader{}))
for {
var buf [128]byte
_, err := cancelReadCloser.Read(buf[:])
if err == context.DeadlineExceeded {
break
} else if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
}

63
progress/progress.go Normal file
View file

@ -0,0 +1,63 @@
package progress
import (
"fmt"
)
// Progress represents the progress of a transfer.
type Progress struct {
ID string
// Progress contains a Message or...
Message string
// ...progress of an action
Action string
Current int64
Total int64
LastUpdate bool
}
// Output is an interface for writing progress information. It's
// like a writer for progress, but we don't call it Writer because
// that would be confusing next to ProgressReader (also, because it
// doesn't implement the io.Writer interface).
type Output interface {
WriteProgress(Progress) error
}
type chanOutput chan<- Progress
func (out chanOutput) WriteProgress(p Progress) error {
out <- p
return nil
}
// ChanOutput returns a Output that writes progress updates to the
// supplied channel.
func ChanOutput(progressChan chan<- Progress) Output {
return chanOutput(progressChan)
}
// Update is a convenience function to write a progress update to the channel.
func Update(out Output, id, action string) {
out.WriteProgress(Progress{ID: id, Action: action})
}
// Updatef is a convenience function to write a printf-formatted progress update
// to the channel.
func Updatef(out Output, id, format string, a ...interface{}) {
Update(out, id, fmt.Sprintf(format, a...))
}
// Message is a convenience function to write a progress message to the channel.
func Message(out Output, id, message string) {
out.WriteProgress(Progress{ID: id, Message: message})
}
// Messagef is a convenience function to write a printf-formatted progress
// message to the channel.
func Messagef(out Output, id, format string, a ...interface{}) {
Message(out, id, fmt.Sprintf(format, a...))
}

View file

@ -0,0 +1,59 @@
package progress
import (
"io"
)
// Reader is a Reader with progress bar.
type Reader struct {
in io.ReadCloser // Stream to read from
out Output // Where to send progress bar to
size int64
current int64
lastUpdate int64
id string
action string
}
// NewProgressReader creates a new ProgressReader.
func NewProgressReader(in io.ReadCloser, out Output, size int64, id, action string) *Reader {
return &Reader{
in: in,
out: out,
size: size,
id: id,
action: action,
}
}
func (p *Reader) Read(buf []byte) (n int, err error) {
read, err := p.in.Read(buf)
p.current += int64(read)
updateEvery := int64(1024 * 512) //512kB
if p.size > 0 {
// Update progress for every 1% read if 1% < 512kB
if increment := int64(0.01 * float64(p.size)); increment < updateEvery {
updateEvery = increment
}
}
if p.current-p.lastUpdate > updateEvery || err != nil {
p.updateProgress(err != nil && read == 0)
p.lastUpdate = p.current
}
return read, err
}
// Close closes the progress reader and its underlying reader.
func (p *Reader) Close() error {
if p.current < p.size {
// print a full progress bar when closing prematurely
p.current = p.size
p.updateProgress(false)
}
return p.in.Close()
}
func (p *Reader) updateProgress(last bool) {
p.out.WriteProgress(Progress{ID: p.id, Action: p.action, Current: p.current, Total: p.size, LastUpdate: last})
}

View file

@ -0,0 +1,75 @@
package progress
import (
"bytes"
"io"
"io/ioutil"
"testing"
)
func TestOutputOnPrematureClose(t *testing.T) {
content := []byte("TESTING")
reader := ioutil.NopCloser(bytes.NewReader(content))
progressChan := make(chan Progress, 10)
pr := NewProgressReader(reader, ChanOutput(progressChan), int64(len(content)), "Test", "Read")
part := make([]byte, 4, 4)
_, err := io.ReadFull(pr, part)
if err != nil {
pr.Close()
t.Fatal(err)
}
drainLoop:
for {
select {
case <-progressChan:
default:
break drainLoop
}
}
pr.Close()
select {
case <-progressChan:
default:
t.Fatalf("Expected some output when closing prematurely")
}
}
func TestCompleteSilently(t *testing.T) {
content := []byte("TESTING")
reader := ioutil.NopCloser(bytes.NewReader(content))
progressChan := make(chan Progress, 10)
pr := NewProgressReader(reader, ChanOutput(progressChan), int64(len(content)), "Test", "Read")
out, err := ioutil.ReadAll(pr)
if err != nil {
pr.Close()
t.Fatal(err)
}
if string(out) != "TESTING" {
pr.Close()
t.Fatalf("Unexpected output %q from reader", string(out))
}
drainLoop:
for {
select {
case <-progressChan:
default:
break drainLoop
}
}
pr.Close()
select {
case <-progressChan:
t.Fatalf("Should have closed silently when read is complete")
default:
}
}

View file

@ -1,68 +0,0 @@
// Package progressreader provides a Reader with a progress bar that can be
// printed out using the streamformatter package.
package progressreader
import (
"io"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/streamformatter"
)
// Config contains the configuration for a Reader with progress bar.
type Config struct {
In io.ReadCloser // Stream to read from
Out io.Writer // Where to send progress bar to
Formatter *streamformatter.StreamFormatter
Size int64
Current int64
LastUpdate int64
NewLines bool
ID string
Action string
}
// New creates a new Config.
func New(newReader Config) *Config {
return &newReader
}
func (config *Config) Read(p []byte) (n int, err error) {
read, err := config.In.Read(p)
config.Current += int64(read)
updateEvery := int64(1024 * 512) //512kB
if config.Size > 0 {
// Update progress for every 1% read if 1% < 512kB
if increment := int64(0.01 * float64(config.Size)); increment < updateEvery {
updateEvery = increment
}
}
if config.Current-config.LastUpdate > updateEvery || err != nil {
updateProgress(config)
config.LastUpdate = config.Current
}
if err != nil && read == 0 {
updateProgress(config)
if config.NewLines {
config.Out.Write(config.Formatter.FormatStatus("", ""))
}
}
return read, err
}
// Close closes the reader (Config).
func (config *Config) Close() error {
if config.Current < config.Size {
//print a full progress bar when closing prematurely
config.Current = config.Size
updateProgress(config)
}
return config.In.Close()
}
func updateProgress(config *Config) {
progress := jsonmessage.JSONProgress{Current: config.Current, Total: config.Size}
fmtMessage := config.Formatter.FormatProgress(config.ID, config.Action, &progress)
config.Out.Write(fmtMessage)
}

View file

@ -1,94 +0,0 @@
package progressreader
import (
"bufio"
"bytes"
"io"
"io/ioutil"
"testing"
"github.com/docker/docker/pkg/streamformatter"
)
func TestOutputOnPrematureClose(t *testing.T) {
var outBuf bytes.Buffer
content := []byte("TESTING")
reader := ioutil.NopCloser(bytes.NewReader(content))
writer := bufio.NewWriter(&outBuf)
prCfg := Config{
In: reader,
Out: writer,
Formatter: streamformatter.NewStreamFormatter(),
Size: int64(len(content)),
NewLines: true,
ID: "Test",
Action: "Read",
}
pr := New(prCfg)
part := make([]byte, 4, 4)
_, err := io.ReadFull(pr, part)
if err != nil {
pr.Close()
t.Fatal(err)
}
if err := writer.Flush(); err != nil {
pr.Close()
t.Fatal(err)
}
tlen := outBuf.Len()
pr.Close()
if err := writer.Flush(); err != nil {
t.Fatal(err)
}
if outBuf.Len() == tlen {
t.Fatalf("Expected some output when closing prematurely")
}
}
func TestCompleteSilently(t *testing.T) {
var outBuf bytes.Buffer
content := []byte("TESTING")
reader := ioutil.NopCloser(bytes.NewReader(content))
writer := bufio.NewWriter(&outBuf)
prCfg := Config{
In: reader,
Out: writer,
Formatter: streamformatter.NewStreamFormatter(),
Size: int64(len(content)),
NewLines: true,
ID: "Test",
Action: "Read",
}
pr := New(prCfg)
out, err := ioutil.ReadAll(pr)
if err != nil {
pr.Close()
t.Fatal(err)
}
if string(out) != "TESTING" {
pr.Close()
t.Fatalf("Unexpected output %q from reader", string(out))
}
if err := writer.Flush(); err != nil {
pr.Close()
t.Fatal(err)
}
tlen := outBuf.Len()
pr.Close()
if err := writer.Flush(); err != nil {
t.Fatal(err)
}
if outBuf.Len() > tlen {
t.Fatalf("Should have closed silently when read is complete")
}
}

View file

@ -7,6 +7,7 @@ import (
"io" "io"
"github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/progress"
) )
// StreamFormatter formats a stream, optionally using JSON. // StreamFormatter formats a stream, optionally using JSON.
@ -92,6 +93,44 @@ func (sf *StreamFormatter) FormatProgress(id, action string, progress *jsonmessa
return []byte(action + " " + progress.String() + endl) return []byte(action + " " + progress.String() + endl)
} }
// NewProgressOutput returns a progress.Output object that can be passed to
// progress.NewProgressReader.
func (sf *StreamFormatter) NewProgressOutput(out io.Writer, newLines bool) progress.Output {
return &progressOutput{
sf: sf,
out: out,
newLines: newLines,
}
}
type progressOutput struct {
sf *StreamFormatter
out io.Writer
newLines bool
}
// WriteProgress formats progress information from a ProgressReader.
func (out *progressOutput) WriteProgress(prog progress.Progress) error {
var formatted []byte
if prog.Message != "" {
formatted = out.sf.FormatStatus(prog.ID, prog.Message)
} else {
jsonProgress := jsonmessage.JSONProgress{Current: prog.Current, Total: prog.Total}
formatted = out.sf.FormatProgress(prog.ID, prog.Action, &jsonProgress)
}
_, err := out.out.Write(formatted)
if err != nil {
return err
}
if out.newLines && prog.LastUpdate {
_, err = out.out.Write(out.sf.FormatStatus("", ""))
return err
}
return nil
}
// StdoutFormatter is a streamFormatter that writes to the standard output. // StdoutFormatter is a streamFormatter that writes to the standard output.
type StdoutFormatter struct { type StdoutFormatter struct {
io.Writer io.Writer