diff --git a/broadcaster/buffered.go b/broadcaster/buffered.go deleted file mode 100644 index 57f5f97..0000000 --- a/broadcaster/buffered.go +++ /dev/null @@ -1,167 +0,0 @@ -package broadcaster - -import ( - "errors" - "io" - "sync" -) - -// Buffered keeps track of one or more observers watching the progress -// of an operation. For example, if multiple clients are trying to pull an -// image, they share a Buffered struct for the download operation. -type Buffered struct { - sync.Mutex - // c is a channel that observers block on, waiting for the operation - // to finish. - c chan struct{} - // cond is a condition variable used to wake up observers when there's - // new data available. - cond *sync.Cond - // history is a buffer of the progress output so far, so a new observer - // can catch up. The history is stored as a slice of separate byte - // slices, so that if the writer is a WriteFlusher, the flushes will - // happen in the right places. - history [][]byte - // wg is a WaitGroup used to wait for all writes to finish on Close - wg sync.WaitGroup - // result is the argument passed to the first call of Close, and - // returned to callers of Wait - result error -} - -// NewBuffered returns an initialized Buffered structure. -func NewBuffered() *Buffered { - b := &Buffered{ - c: make(chan struct{}), - } - b.cond = sync.NewCond(b) - return b -} - -// closed returns true if and only if the broadcaster has been closed -func (broadcaster *Buffered) closed() bool { - select { - case <-broadcaster.c: - return true - default: - return false - } -} - -// receiveWrites runs as a goroutine so that writes don't block the Write -// function. It writes the new data in broadcaster.history each time there's -// activity on the broadcaster.cond condition variable. -func (broadcaster *Buffered) receiveWrites(observer io.Writer) { - n := 0 - - broadcaster.Lock() - - // The condition variable wait is at the end of this loop, so that the - // first iteration will write the history so far. - for { - newData := broadcaster.history[n:] - // Make a copy of newData so we can release the lock - sendData := make([][]byte, len(newData), len(newData)) - copy(sendData, newData) - broadcaster.Unlock() - - for len(sendData) > 0 { - _, err := observer.Write(sendData[0]) - if err != nil { - broadcaster.wg.Done() - return - } - n++ - sendData = sendData[1:] - } - - broadcaster.Lock() - - // If we are behind, we need to catch up instead of waiting - // or handling a closure. - if len(broadcaster.history) != n { - continue - } - - // detect closure of the broadcast writer - if broadcaster.closed() { - broadcaster.Unlock() - broadcaster.wg.Done() - return - } - - broadcaster.cond.Wait() - - // Mutex is still locked as the loop continues - } -} - -// Write adds data to the history buffer, and also writes it to all current -// observers. -func (broadcaster *Buffered) Write(p []byte) (n int, err error) { - broadcaster.Lock() - defer broadcaster.Unlock() - - // Is the broadcaster closed? If so, the write should fail. - if broadcaster.closed() { - return 0, errors.New("attempted write to a closed broadcaster.Buffered") - } - - // Add message in p to the history slice - newEntry := make([]byte, len(p), len(p)) - copy(newEntry, p) - broadcaster.history = append(broadcaster.history, newEntry) - - broadcaster.cond.Broadcast() - - return len(p), nil -} - -// Add adds an observer to the broadcaster. The new observer receives the -// data from the history buffer, and also all subsequent data. -func (broadcaster *Buffered) Add(w io.Writer) error { - // The lock is acquired here so that Add can't race with Close - broadcaster.Lock() - defer broadcaster.Unlock() - - if broadcaster.closed() { - return errors.New("attempted to add observer to a closed broadcaster.Buffered") - } - - broadcaster.wg.Add(1) - go broadcaster.receiveWrites(w) - - return nil -} - -// CloseWithError signals to all observers that the operation has finished. Its -// argument is a result that should be returned to waiters blocking on Wait. -func (broadcaster *Buffered) CloseWithError(result error) { - broadcaster.Lock() - if broadcaster.closed() { - broadcaster.Unlock() - return - } - broadcaster.result = result - close(broadcaster.c) - broadcaster.cond.Broadcast() - broadcaster.Unlock() - - // Don't return until all writers have caught up. - broadcaster.wg.Wait() -} - -// Close signals to all observers that the operation has finished. It causes -// all calls to Wait to return nil. -func (broadcaster *Buffered) Close() { - broadcaster.CloseWithError(nil) -} - -// Wait blocks until the operation is marked as completed by the Close method, -// and all writer goroutines have completed. It returns the argument that was -// passed to Close. -func (broadcaster *Buffered) Wait() error { - <-broadcaster.c - broadcaster.wg.Wait() - return broadcaster.result -} diff --git a/ioutils/readers.go b/ioutils/readers.go index b4544de..e73b02b 100644 --- a/ioutils/readers.go +++ b/ioutils/readers.go @@ -4,6 +4,8 @@ import ( "crypto/sha256" "encoding/hex" "io" + + "golang.org/x/net/context" ) type readCloserWrapper struct { @@ -81,3 +83,72 @@ func (r *OnEOFReader) runFunc() { r.Fn = nil } } + +// cancelReadCloser wraps an io.ReadCloser with a context for cancelling read +// operations. +type cancelReadCloser struct { + cancel func() + pR *io.PipeReader // Stream to read from + pW *io.PipeWriter +} + +// NewCancelReadCloser creates a wrapper that closes the ReadCloser when the +// context is cancelled. The returned io.ReadCloser must be closed when it is +// no longer needed. +func NewCancelReadCloser(ctx context.Context, in io.ReadCloser) io.ReadCloser { + pR, pW := io.Pipe() + + // Create a context used to signal when the pipe is closed + doneCtx, cancel := context.WithCancel(context.Background()) + + p := &cancelReadCloser{ + cancel: cancel, + pR: pR, + pW: pW, + } + + go func() { + _, err := io.Copy(pW, in) + select { + case <-ctx.Done(): + // If the context was closed, p.closeWithError + // was already called. Calling it again would + // change the error that Read returns. + default: + p.closeWithError(err) + } + in.Close() + }() + go func() { + for { + select { + case <-ctx.Done(): + p.closeWithError(ctx.Err()) + case <-doneCtx.Done(): + return + } + } + }() + + return p +} + +// Read wraps the Read method of the pipe that provides data from the wrapped +// ReadCloser. +func (p *cancelReadCloser) Read(buf []byte) (n int, err error) { + return p.pR.Read(buf) +} + +// closeWithError closes the wrapper and its underlying reader. It will +// cause future calls to Read to return err. +func (p *cancelReadCloser) closeWithError(err error) { + p.pW.CloseWithError(err) + p.cancel() +} + +// Close closes the wrapper its underlying reader. It will cause +// future calls to Read to return io.EOF. +func (p *cancelReadCloser) Close() error { + p.closeWithError(io.EOF) + return nil +} diff --git a/ioutils/readers_test.go b/ioutils/readers_test.go index 5be68cb..9abc105 100644 --- a/ioutils/readers_test.go +++ b/ioutils/readers_test.go @@ -2,8 +2,12 @@ package ioutils import ( "fmt" + "io/ioutil" "strings" "testing" + "time" + + "golang.org/x/net/context" ) // Implement io.Reader @@ -65,3 +69,26 @@ func TestHashData(t *testing.T) { t.Fatalf("Expecting %s, got %s", expected, actual) } } + +type perpetualReader struct{} + +func (p *perpetualReader) Read(buf []byte) (n int, err error) { + for i := 0; i != len(buf); i++ { + buf[i] = 'a' + } + return len(buf), nil +} + +func TestCancelReadCloser(t *testing.T) { + ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond) + cancelReadCloser := NewCancelReadCloser(ctx, ioutil.NopCloser(&perpetualReader{})) + for { + var buf [128]byte + _, err := cancelReadCloser.Read(buf[:]) + if err == context.DeadlineExceeded { + break + } else if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + } +} diff --git a/progress/progress.go b/progress/progress.go new file mode 100644 index 0000000..1f3b34a --- /dev/null +++ b/progress/progress.go @@ -0,0 +1,63 @@ +package progress + +import ( + "fmt" +) + +// Progress represents the progress of a transfer. +type Progress struct { + ID string + + // Progress contains a Message or... + Message string + + // ...progress of an action + Action string + Current int64 + Total int64 + + LastUpdate bool +} + +// Output is an interface for writing progress information. It's +// like a writer for progress, but we don't call it Writer because +// that would be confusing next to ProgressReader (also, because it +// doesn't implement the io.Writer interface). +type Output interface { + WriteProgress(Progress) error +} + +type chanOutput chan<- Progress + +func (out chanOutput) WriteProgress(p Progress) error { + out <- p + return nil +} + +// ChanOutput returns a Output that writes progress updates to the +// supplied channel. +func ChanOutput(progressChan chan<- Progress) Output { + return chanOutput(progressChan) +} + +// Update is a convenience function to write a progress update to the channel. +func Update(out Output, id, action string) { + out.WriteProgress(Progress{ID: id, Action: action}) +} + +// Updatef is a convenience function to write a printf-formatted progress update +// to the channel. +func Updatef(out Output, id, format string, a ...interface{}) { + Update(out, id, fmt.Sprintf(format, a...)) +} + +// Message is a convenience function to write a progress message to the channel. +func Message(out Output, id, message string) { + out.WriteProgress(Progress{ID: id, Message: message}) +} + +// Messagef is a convenience function to write a printf-formatted progress +// message to the channel. +func Messagef(out Output, id, format string, a ...interface{}) { + Message(out, id, fmt.Sprintf(format, a...)) +} diff --git a/progress/progressreader.go b/progress/progressreader.go new file mode 100644 index 0000000..c39e2b6 --- /dev/null +++ b/progress/progressreader.go @@ -0,0 +1,59 @@ +package progress + +import ( + "io" +) + +// Reader is a Reader with progress bar. +type Reader struct { + in io.ReadCloser // Stream to read from + out Output // Where to send progress bar to + size int64 + current int64 + lastUpdate int64 + id string + action string +} + +// NewProgressReader creates a new ProgressReader. +func NewProgressReader(in io.ReadCloser, out Output, size int64, id, action string) *Reader { + return &Reader{ + in: in, + out: out, + size: size, + id: id, + action: action, + } +} + +func (p *Reader) Read(buf []byte) (n int, err error) { + read, err := p.in.Read(buf) + p.current += int64(read) + updateEvery := int64(1024 * 512) //512kB + if p.size > 0 { + // Update progress for every 1% read if 1% < 512kB + if increment := int64(0.01 * float64(p.size)); increment < updateEvery { + updateEvery = increment + } + } + if p.current-p.lastUpdate > updateEvery || err != nil { + p.updateProgress(err != nil && read == 0) + p.lastUpdate = p.current + } + + return read, err +} + +// Close closes the progress reader and its underlying reader. +func (p *Reader) Close() error { + if p.current < p.size { + // print a full progress bar when closing prematurely + p.current = p.size + p.updateProgress(false) + } + return p.in.Close() +} + +func (p *Reader) updateProgress(last bool) { + p.out.WriteProgress(Progress{ID: p.id, Action: p.action, Current: p.current, Total: p.size, LastUpdate: last}) +} diff --git a/progress/progressreader_test.go b/progress/progressreader_test.go new file mode 100644 index 0000000..b14d401 --- /dev/null +++ b/progress/progressreader_test.go @@ -0,0 +1,75 @@ +package progress + +import ( + "bytes" + "io" + "io/ioutil" + "testing" +) + +func TestOutputOnPrematureClose(t *testing.T) { + content := []byte("TESTING") + reader := ioutil.NopCloser(bytes.NewReader(content)) + progressChan := make(chan Progress, 10) + + pr := NewProgressReader(reader, ChanOutput(progressChan), int64(len(content)), "Test", "Read") + + part := make([]byte, 4, 4) + _, err := io.ReadFull(pr, part) + if err != nil { + pr.Close() + t.Fatal(err) + } + +drainLoop: + for { + select { + case <-progressChan: + default: + break drainLoop + } + } + + pr.Close() + + select { + case <-progressChan: + default: + t.Fatalf("Expected some output when closing prematurely") + } +} + +func TestCompleteSilently(t *testing.T) { + content := []byte("TESTING") + reader := ioutil.NopCloser(bytes.NewReader(content)) + progressChan := make(chan Progress, 10) + + pr := NewProgressReader(reader, ChanOutput(progressChan), int64(len(content)), "Test", "Read") + + out, err := ioutil.ReadAll(pr) + if err != nil { + pr.Close() + t.Fatal(err) + } + if string(out) != "TESTING" { + pr.Close() + t.Fatalf("Unexpected output %q from reader", string(out)) + } + +drainLoop: + for { + select { + case <-progressChan: + default: + break drainLoop + } + } + + pr.Close() + + select { + case <-progressChan: + t.Fatalf("Should have closed silently when read is complete") + default: + } +} diff --git a/progressreader/progressreader.go b/progressreader/progressreader.go deleted file mode 100644 index f48442b..0000000 --- a/progressreader/progressreader.go +++ /dev/null @@ -1,68 +0,0 @@ -// Package progressreader provides a Reader with a progress bar that can be -// printed out using the streamformatter package. -package progressreader - -import ( - "io" - - "github.com/docker/docker/pkg/jsonmessage" - "github.com/docker/docker/pkg/streamformatter" -) - -// Config contains the configuration for a Reader with progress bar. -type Config struct { - In io.ReadCloser // Stream to read from - Out io.Writer // Where to send progress bar to - Formatter *streamformatter.StreamFormatter - Size int64 - Current int64 - LastUpdate int64 - NewLines bool - ID string - Action string -} - -// New creates a new Config. -func New(newReader Config) *Config { - return &newReader -} - -func (config *Config) Read(p []byte) (n int, err error) { - read, err := config.In.Read(p) - config.Current += int64(read) - updateEvery := int64(1024 * 512) //512kB - if config.Size > 0 { - // Update progress for every 1% read if 1% < 512kB - if increment := int64(0.01 * float64(config.Size)); increment < updateEvery { - updateEvery = increment - } - } - if config.Current-config.LastUpdate > updateEvery || err != nil { - updateProgress(config) - config.LastUpdate = config.Current - } - - if err != nil && read == 0 { - updateProgress(config) - if config.NewLines { - config.Out.Write(config.Formatter.FormatStatus("", "")) - } - } - return read, err -} - -// Close closes the reader (Config). -func (config *Config) Close() error { - if config.Current < config.Size { - //print a full progress bar when closing prematurely - config.Current = config.Size - updateProgress(config) - } - return config.In.Close() -} - -func updateProgress(config *Config) { - progress := jsonmessage.JSONProgress{Current: config.Current, Total: config.Size} - fmtMessage := config.Formatter.FormatProgress(config.ID, config.Action, &progress) - config.Out.Write(fmtMessage) -} diff --git a/progressreader/progressreader_test.go b/progressreader/progressreader_test.go deleted file mode 100644 index 21d9b0f..0000000 --- a/progressreader/progressreader_test.go +++ /dev/null @@ -1,94 +0,0 @@ -package progressreader - -import ( - "bufio" - "bytes" - "io" - "io/ioutil" - "testing" - - "github.com/docker/docker/pkg/streamformatter" -) - -func TestOutputOnPrematureClose(t *testing.T) { - var outBuf bytes.Buffer - content := []byte("TESTING") - reader := ioutil.NopCloser(bytes.NewReader(content)) - writer := bufio.NewWriter(&outBuf) - - prCfg := Config{ - In: reader, - Out: writer, - Formatter: streamformatter.NewStreamFormatter(), - Size: int64(len(content)), - NewLines: true, - ID: "Test", - Action: "Read", - } - pr := New(prCfg) - - part := make([]byte, 4, 4) - _, err := io.ReadFull(pr, part) - if err != nil { - pr.Close() - t.Fatal(err) - } - - if err := writer.Flush(); err != nil { - pr.Close() - t.Fatal(err) - } - - tlen := outBuf.Len() - pr.Close() - if err := writer.Flush(); err != nil { - t.Fatal(err) - } - - if outBuf.Len() == tlen { - t.Fatalf("Expected some output when closing prematurely") - } -} - -func TestCompleteSilently(t *testing.T) { - var outBuf bytes.Buffer - content := []byte("TESTING") - reader := ioutil.NopCloser(bytes.NewReader(content)) - writer := bufio.NewWriter(&outBuf) - - prCfg := Config{ - In: reader, - Out: writer, - Formatter: streamformatter.NewStreamFormatter(), - Size: int64(len(content)), - NewLines: true, - ID: "Test", - Action: "Read", - } - pr := New(prCfg) - - out, err := ioutil.ReadAll(pr) - if err != nil { - pr.Close() - t.Fatal(err) - } - if string(out) != "TESTING" { - pr.Close() - t.Fatalf("Unexpected output %q from reader", string(out)) - } - - if err := writer.Flush(); err != nil { - pr.Close() - t.Fatal(err) - } - - tlen := outBuf.Len() - pr.Close() - if err := writer.Flush(); err != nil { - t.Fatal(err) - } - - if outBuf.Len() > tlen { - t.Fatalf("Should have closed silently when read is complete") - } -} diff --git a/streamformatter/streamformatter.go b/streamformatter/streamformatter.go index d3ac39e..b67a53d 100644 --- a/streamformatter/streamformatter.go +++ b/streamformatter/streamformatter.go @@ -7,6 +7,7 @@ import ( "io" "github.com/docker/docker/pkg/jsonmessage" + "github.com/docker/docker/pkg/progress" ) // StreamFormatter formats a stream, optionally using JSON. @@ -92,6 +93,44 @@ func (sf *StreamFormatter) FormatProgress(id, action string, progress *jsonmessa return []byte(action + " " + progress.String() + endl) } +// NewProgressOutput returns a progress.Output object that can be passed to +// progress.NewProgressReader. +func (sf *StreamFormatter) NewProgressOutput(out io.Writer, newLines bool) progress.Output { + return &progressOutput{ + sf: sf, + out: out, + newLines: newLines, + } +} + +type progressOutput struct { + sf *StreamFormatter + out io.Writer + newLines bool +} + +// WriteProgress formats progress information from a ProgressReader. +func (out *progressOutput) WriteProgress(prog progress.Progress) error { + var formatted []byte + if prog.Message != "" { + formatted = out.sf.FormatStatus(prog.ID, prog.Message) + } else { + jsonProgress := jsonmessage.JSONProgress{Current: prog.Current, Total: prog.Total} + formatted = out.sf.FormatProgress(prog.ID, prog.Action, &jsonProgress) + } + _, err := out.out.Write(formatted) + if err != nil { + return err + } + + if out.newLines && prog.LastUpdate { + _, err = out.out.Write(out.sf.FormatStatus("", "")) + return err + } + + return nil +} + // StdoutFormatter is a streamFormatter that writes to the standard output. type StdoutFormatter struct { io.Writer