Merge pull request #18353 from aaronlehmann/transfer-manager

Improved push and pull with upload manager and download manager
This commit is contained in:
Alexander Morozov 2015-12-10 14:52:48 -08:00
commit ffa3493afc
9 changed files with 334 additions and 329 deletions

View file

@ -1,167 +0,0 @@
package broadcaster
import (
"errors"
"io"
"sync"
)
// Buffered keeps track of one or more observers watching the progress
// of an operation. For example, if multiple clients are trying to pull an
// image, they share a Buffered struct for the download operation.
type Buffered struct {
sync.Mutex
// c is a channel that observers block on, waiting for the operation
// to finish.
c chan struct{}
// cond is a condition variable used to wake up observers when there's
// new data available.
cond *sync.Cond
// history is a buffer of the progress output so far, so a new observer
// can catch up. The history is stored as a slice of separate byte
// slices, so that if the writer is a WriteFlusher, the flushes will
// happen in the right places.
history [][]byte
// wg is a WaitGroup used to wait for all writes to finish on Close
wg sync.WaitGroup
// result is the argument passed to the first call of Close, and
// returned to callers of Wait
result error
}
// NewBuffered returns an initialized Buffered structure.
func NewBuffered() *Buffered {
b := &Buffered{
c: make(chan struct{}),
}
b.cond = sync.NewCond(b)
return b
}
// closed returns true if and only if the broadcaster has been closed
func (broadcaster *Buffered) closed() bool {
select {
case <-broadcaster.c:
return true
default:
return false
}
}
// receiveWrites runs as a goroutine so that writes don't block the Write
// function. It writes the new data in broadcaster.history each time there's
// activity on the broadcaster.cond condition variable.
func (broadcaster *Buffered) receiveWrites(observer io.Writer) {
n := 0
broadcaster.Lock()
// The condition variable wait is at the end of this loop, so that the
// first iteration will write the history so far.
for {
newData := broadcaster.history[n:]
// Make a copy of newData so we can release the lock
sendData := make([][]byte, len(newData), len(newData))
copy(sendData, newData)
broadcaster.Unlock()
for len(sendData) > 0 {
_, err := observer.Write(sendData[0])
if err != nil {
broadcaster.wg.Done()
return
}
n++
sendData = sendData[1:]
}
broadcaster.Lock()
// If we are behind, we need to catch up instead of waiting
// or handling a closure.
if len(broadcaster.history) != n {
continue
}
// detect closure of the broadcast writer
if broadcaster.closed() {
broadcaster.Unlock()
broadcaster.wg.Done()
return
}
broadcaster.cond.Wait()
// Mutex is still locked as the loop continues
}
}
// Write adds data to the history buffer, and also writes it to all current
// observers.
func (broadcaster *Buffered) Write(p []byte) (n int, err error) {
broadcaster.Lock()
defer broadcaster.Unlock()
// Is the broadcaster closed? If so, the write should fail.
if broadcaster.closed() {
return 0, errors.New("attempted write to a closed broadcaster.Buffered")
}
// Add message in p to the history slice
newEntry := make([]byte, len(p), len(p))
copy(newEntry, p)
broadcaster.history = append(broadcaster.history, newEntry)
broadcaster.cond.Broadcast()
return len(p), nil
}
// Add adds an observer to the broadcaster. The new observer receives the
// data from the history buffer, and also all subsequent data.
func (broadcaster *Buffered) Add(w io.Writer) error {
// The lock is acquired here so that Add can't race with Close
broadcaster.Lock()
defer broadcaster.Unlock()
if broadcaster.closed() {
return errors.New("attempted to add observer to a closed broadcaster.Buffered")
}
broadcaster.wg.Add(1)
go broadcaster.receiveWrites(w)
return nil
}
// CloseWithError signals to all observers that the operation has finished. Its
// argument is a result that should be returned to waiters blocking on Wait.
func (broadcaster *Buffered) CloseWithError(result error) {
broadcaster.Lock()
if broadcaster.closed() {
broadcaster.Unlock()
return
}
broadcaster.result = result
close(broadcaster.c)
broadcaster.cond.Broadcast()
broadcaster.Unlock()
// Don't return until all writers have caught up.
broadcaster.wg.Wait()
}
// Close signals to all observers that the operation has finished. It causes
// all calls to Wait to return nil.
func (broadcaster *Buffered) Close() {
broadcaster.CloseWithError(nil)
}
// Wait blocks until the operation is marked as completed by the Close method,
// and all writer goroutines have completed. It returns the argument that was
// passed to Close.
func (broadcaster *Buffered) Wait() error {
<-broadcaster.c
broadcaster.wg.Wait()
return broadcaster.result
}

View file

@ -4,6 +4,8 @@ import (
"crypto/sha256"
"encoding/hex"
"io"
"golang.org/x/net/context"
)
type readCloserWrapper struct {
@ -81,3 +83,72 @@ func (r *OnEOFReader) runFunc() {
r.Fn = nil
}
}
// cancelReadCloser wraps an io.ReadCloser with a context for cancelling read
// operations.
type cancelReadCloser struct {
cancel func()
pR *io.PipeReader // Stream to read from
pW *io.PipeWriter
}
// NewCancelReadCloser creates a wrapper that closes the ReadCloser when the
// context is cancelled. The returned io.ReadCloser must be closed when it is
// no longer needed.
func NewCancelReadCloser(ctx context.Context, in io.ReadCloser) io.ReadCloser {
pR, pW := io.Pipe()
// Create a context used to signal when the pipe is closed
doneCtx, cancel := context.WithCancel(context.Background())
p := &cancelReadCloser{
cancel: cancel,
pR: pR,
pW: pW,
}
go func() {
_, err := io.Copy(pW, in)
select {
case <-ctx.Done():
// If the context was closed, p.closeWithError
// was already called. Calling it again would
// change the error that Read returns.
default:
p.closeWithError(err)
}
in.Close()
}()
go func() {
for {
select {
case <-ctx.Done():
p.closeWithError(ctx.Err())
case <-doneCtx.Done():
return
}
}
}()
return p
}
// Read wraps the Read method of the pipe that provides data from the wrapped
// ReadCloser.
func (p *cancelReadCloser) Read(buf []byte) (n int, err error) {
return p.pR.Read(buf)
}
// closeWithError closes the wrapper and its underlying reader. It will
// cause future calls to Read to return err.
func (p *cancelReadCloser) closeWithError(err error) {
p.pW.CloseWithError(err)
p.cancel()
}
// Close closes the wrapper its underlying reader. It will cause
// future calls to Read to return io.EOF.
func (p *cancelReadCloser) Close() error {
p.closeWithError(io.EOF)
return nil
}

View file

@ -2,8 +2,12 @@ package ioutils
import (
"fmt"
"io/ioutil"
"strings"
"testing"
"time"
"golang.org/x/net/context"
)
// Implement io.Reader
@ -65,3 +69,26 @@ func TestHashData(t *testing.T) {
t.Fatalf("Expecting %s, got %s", expected, actual)
}
}
type perpetualReader struct{}
func (p *perpetualReader) Read(buf []byte) (n int, err error) {
for i := 0; i != len(buf); i++ {
buf[i] = 'a'
}
return len(buf), nil
}
func TestCancelReadCloser(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond)
cancelReadCloser := NewCancelReadCloser(ctx, ioutil.NopCloser(&perpetualReader{}))
for {
var buf [128]byte
_, err := cancelReadCloser.Read(buf[:])
if err == context.DeadlineExceeded {
break
} else if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
}

63
progress/progress.go Normal file
View file

@ -0,0 +1,63 @@
package progress
import (
"fmt"
)
// Progress represents the progress of a transfer.
type Progress struct {
ID string
// Progress contains a Message or...
Message string
// ...progress of an action
Action string
Current int64
Total int64
LastUpdate bool
}
// Output is an interface for writing progress information. It's
// like a writer for progress, but we don't call it Writer because
// that would be confusing next to ProgressReader (also, because it
// doesn't implement the io.Writer interface).
type Output interface {
WriteProgress(Progress) error
}
type chanOutput chan<- Progress
func (out chanOutput) WriteProgress(p Progress) error {
out <- p
return nil
}
// ChanOutput returns a Output that writes progress updates to the
// supplied channel.
func ChanOutput(progressChan chan<- Progress) Output {
return chanOutput(progressChan)
}
// Update is a convenience function to write a progress update to the channel.
func Update(out Output, id, action string) {
out.WriteProgress(Progress{ID: id, Action: action})
}
// Updatef is a convenience function to write a printf-formatted progress update
// to the channel.
func Updatef(out Output, id, format string, a ...interface{}) {
Update(out, id, fmt.Sprintf(format, a...))
}
// Message is a convenience function to write a progress message to the channel.
func Message(out Output, id, message string) {
out.WriteProgress(Progress{ID: id, Message: message})
}
// Messagef is a convenience function to write a printf-formatted progress
// message to the channel.
func Messagef(out Output, id, format string, a ...interface{}) {
Message(out, id, fmt.Sprintf(format, a...))
}

View file

@ -0,0 +1,59 @@
package progress
import (
"io"
)
// Reader is a Reader with progress bar.
type Reader struct {
in io.ReadCloser // Stream to read from
out Output // Where to send progress bar to
size int64
current int64
lastUpdate int64
id string
action string
}
// NewProgressReader creates a new ProgressReader.
func NewProgressReader(in io.ReadCloser, out Output, size int64, id, action string) *Reader {
return &Reader{
in: in,
out: out,
size: size,
id: id,
action: action,
}
}
func (p *Reader) Read(buf []byte) (n int, err error) {
read, err := p.in.Read(buf)
p.current += int64(read)
updateEvery := int64(1024 * 512) //512kB
if p.size > 0 {
// Update progress for every 1% read if 1% < 512kB
if increment := int64(0.01 * float64(p.size)); increment < updateEvery {
updateEvery = increment
}
}
if p.current-p.lastUpdate > updateEvery || err != nil {
p.updateProgress(err != nil && read == 0)
p.lastUpdate = p.current
}
return read, err
}
// Close closes the progress reader and its underlying reader.
func (p *Reader) Close() error {
if p.current < p.size {
// print a full progress bar when closing prematurely
p.current = p.size
p.updateProgress(false)
}
return p.in.Close()
}
func (p *Reader) updateProgress(last bool) {
p.out.WriteProgress(Progress{ID: p.id, Action: p.action, Current: p.current, Total: p.size, LastUpdate: last})
}

View file

@ -0,0 +1,75 @@
package progress
import (
"bytes"
"io"
"io/ioutil"
"testing"
)
func TestOutputOnPrematureClose(t *testing.T) {
content := []byte("TESTING")
reader := ioutil.NopCloser(bytes.NewReader(content))
progressChan := make(chan Progress, 10)
pr := NewProgressReader(reader, ChanOutput(progressChan), int64(len(content)), "Test", "Read")
part := make([]byte, 4, 4)
_, err := io.ReadFull(pr, part)
if err != nil {
pr.Close()
t.Fatal(err)
}
drainLoop:
for {
select {
case <-progressChan:
default:
break drainLoop
}
}
pr.Close()
select {
case <-progressChan:
default:
t.Fatalf("Expected some output when closing prematurely")
}
}
func TestCompleteSilently(t *testing.T) {
content := []byte("TESTING")
reader := ioutil.NopCloser(bytes.NewReader(content))
progressChan := make(chan Progress, 10)
pr := NewProgressReader(reader, ChanOutput(progressChan), int64(len(content)), "Test", "Read")
out, err := ioutil.ReadAll(pr)
if err != nil {
pr.Close()
t.Fatal(err)
}
if string(out) != "TESTING" {
pr.Close()
t.Fatalf("Unexpected output %q from reader", string(out))
}
drainLoop:
for {
select {
case <-progressChan:
default:
break drainLoop
}
}
pr.Close()
select {
case <-progressChan:
t.Fatalf("Should have closed silently when read is complete")
default:
}
}

View file

@ -1,68 +0,0 @@
// Package progressreader provides a Reader with a progress bar that can be
// printed out using the streamformatter package.
package progressreader
import (
"io"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/streamformatter"
)
// Config contains the configuration for a Reader with progress bar.
type Config struct {
In io.ReadCloser // Stream to read from
Out io.Writer // Where to send progress bar to
Formatter *streamformatter.StreamFormatter
Size int64
Current int64
LastUpdate int64
NewLines bool
ID string
Action string
}
// New creates a new Config.
func New(newReader Config) *Config {
return &newReader
}
func (config *Config) Read(p []byte) (n int, err error) {
read, err := config.In.Read(p)
config.Current += int64(read)
updateEvery := int64(1024 * 512) //512kB
if config.Size > 0 {
// Update progress for every 1% read if 1% < 512kB
if increment := int64(0.01 * float64(config.Size)); increment < updateEvery {
updateEvery = increment
}
}
if config.Current-config.LastUpdate > updateEvery || err != nil {
updateProgress(config)
config.LastUpdate = config.Current
}
if err != nil && read == 0 {
updateProgress(config)
if config.NewLines {
config.Out.Write(config.Formatter.FormatStatus("", ""))
}
}
return read, err
}
// Close closes the reader (Config).
func (config *Config) Close() error {
if config.Current < config.Size {
//print a full progress bar when closing prematurely
config.Current = config.Size
updateProgress(config)
}
return config.In.Close()
}
func updateProgress(config *Config) {
progress := jsonmessage.JSONProgress{Current: config.Current, Total: config.Size}
fmtMessage := config.Formatter.FormatProgress(config.ID, config.Action, &progress)
config.Out.Write(fmtMessage)
}

View file

@ -1,94 +0,0 @@
package progressreader
import (
"bufio"
"bytes"
"io"
"io/ioutil"
"testing"
"github.com/docker/docker/pkg/streamformatter"
)
func TestOutputOnPrematureClose(t *testing.T) {
var outBuf bytes.Buffer
content := []byte("TESTING")
reader := ioutil.NopCloser(bytes.NewReader(content))
writer := bufio.NewWriter(&outBuf)
prCfg := Config{
In: reader,
Out: writer,
Formatter: streamformatter.NewStreamFormatter(),
Size: int64(len(content)),
NewLines: true,
ID: "Test",
Action: "Read",
}
pr := New(prCfg)
part := make([]byte, 4, 4)
_, err := io.ReadFull(pr, part)
if err != nil {
pr.Close()
t.Fatal(err)
}
if err := writer.Flush(); err != nil {
pr.Close()
t.Fatal(err)
}
tlen := outBuf.Len()
pr.Close()
if err := writer.Flush(); err != nil {
t.Fatal(err)
}
if outBuf.Len() == tlen {
t.Fatalf("Expected some output when closing prematurely")
}
}
func TestCompleteSilently(t *testing.T) {
var outBuf bytes.Buffer
content := []byte("TESTING")
reader := ioutil.NopCloser(bytes.NewReader(content))
writer := bufio.NewWriter(&outBuf)
prCfg := Config{
In: reader,
Out: writer,
Formatter: streamformatter.NewStreamFormatter(),
Size: int64(len(content)),
NewLines: true,
ID: "Test",
Action: "Read",
}
pr := New(prCfg)
out, err := ioutil.ReadAll(pr)
if err != nil {
pr.Close()
t.Fatal(err)
}
if string(out) != "TESTING" {
pr.Close()
t.Fatalf("Unexpected output %q from reader", string(out))
}
if err := writer.Flush(); err != nil {
pr.Close()
t.Fatal(err)
}
tlen := outBuf.Len()
pr.Close()
if err := writer.Flush(); err != nil {
t.Fatal(err)
}
if outBuf.Len() > tlen {
t.Fatalf("Should have closed silently when read is complete")
}
}

View file

@ -7,6 +7,7 @@ import (
"io"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/progress"
)
// StreamFormatter formats a stream, optionally using JSON.
@ -92,6 +93,44 @@ func (sf *StreamFormatter) FormatProgress(id, action string, progress *jsonmessa
return []byte(action + " " + progress.String() + endl)
}
// NewProgressOutput returns a progress.Output object that can be passed to
// progress.NewProgressReader.
func (sf *StreamFormatter) NewProgressOutput(out io.Writer, newLines bool) progress.Output {
return &progressOutput{
sf: sf,
out: out,
newLines: newLines,
}
}
type progressOutput struct {
sf *StreamFormatter
out io.Writer
newLines bool
}
// WriteProgress formats progress information from a ProgressReader.
func (out *progressOutput) WriteProgress(prog progress.Progress) error {
var formatted []byte
if prog.Message != "" {
formatted = out.sf.FormatStatus(prog.ID, prog.Message)
} else {
jsonProgress := jsonmessage.JSONProgress{Current: prog.Current, Total: prog.Total}
formatted = out.sf.FormatProgress(prog.ID, prog.Action, &jsonProgress)
}
_, err := out.out.Write(formatted)
if err != nil {
return err
}
if out.newLines && prog.LastUpdate {
_, err = out.out.Write(out.sf.FormatStatus("", ""))
return err
}
return nil
}
// StdoutFormatter is a streamFormatter that writes to the standard output.
type StdoutFormatter struct {
io.Writer