From 98e8ec854e3bd10dd369ed586a46c87e9bb92aa3 Mon Sep 17 00:00:00 2001 From: bobby abbott Date: Tue, 17 Mar 2015 19:18:41 -0700 Subject: [PATCH] Fixes hacks from progressreader refactor related to #10959 Signed-off-by: bobby abbott --- jsonmessage/jsonmessage.go | 172 ++++++++++++++++++++++++ jsonmessage/jsonmessage_test.go | 38 ++++++ progressreader/progressreader.go | 31 +---- streamformatter/streamformatter.go | 113 ++++++++++++++++ streamformatter/streamformatter_test.go | 68 ++++++++++ 5 files changed, 396 insertions(+), 26 deletions(-) create mode 100644 jsonmessage/jsonmessage.go create mode 100644 jsonmessage/jsonmessage_test.go create mode 100644 streamformatter/streamformatter.go create mode 100644 streamformatter/streamformatter_test.go diff --git a/jsonmessage/jsonmessage.go b/jsonmessage/jsonmessage.go new file mode 100644 index 0000000..7db1626 --- /dev/null +++ b/jsonmessage/jsonmessage.go @@ -0,0 +1,172 @@ +package jsonmessage + +import ( + "encoding/json" + "fmt" + "io" + "strings" + "time" + + "github.com/docker/docker/pkg/term" + "github.com/docker/docker/pkg/timeutils" + "github.com/docker/docker/pkg/units" +) + +type JSONError struct { + Code int `json:"code,omitempty"` + Message string `json:"message,omitempty"` +} + +func (e *JSONError) Error() string { + return e.Message +} + +type JSONProgress struct { + terminalFd uintptr + Current int `json:"current,omitempty"` + Total int `json:"total,omitempty"` + Start int64 `json:"start,omitempty"` +} + +func (p *JSONProgress) String() string { + var ( + width = 200 + pbBox string + numbersBox string + timeLeftBox string + ) + + ws, err := term.GetWinsize(p.terminalFd) + if err == nil { + width = int(ws.Width) + } + + if p.Current <= 0 && p.Total <= 0 { + return "" + } + current := units.HumanSize(float64(p.Current)) + if p.Total <= 0 { + return fmt.Sprintf("%8v", current) + } + total := units.HumanSize(float64(p.Total)) + percentage := int(float64(p.Current)/float64(p.Total)*100) / 2 + if percentage > 50 { + percentage = 50 + } + if width > 110 { + // this number can't be negetive gh#7136 + numSpaces := 0 + if 50-percentage > 0 { + numSpaces = 50 - percentage + } + pbBox = fmt.Sprintf("[%s>%s] ", strings.Repeat("=", percentage), strings.Repeat(" ", numSpaces)) + } + numbersBox = fmt.Sprintf("%8v/%v", current, total) + + if p.Current > 0 && p.Start > 0 && percentage < 50 { + fromStart := time.Now().UTC().Sub(time.Unix(int64(p.Start), 0)) + perEntry := fromStart / time.Duration(p.Current) + left := time.Duration(p.Total-p.Current) * perEntry + left = (left / time.Second) * time.Second + + if width > 50 { + timeLeftBox = " " + left.String() + } + } + return pbBox + numbersBox + timeLeftBox +} + +type JSONMessage struct { + Stream string `json:"stream,omitempty"` + Status string `json:"status,omitempty"` + Progress *JSONProgress `json:"progressDetail,omitempty"` + ProgressMessage string `json:"progress,omitempty"` //deprecated + ID string `json:"id,omitempty"` + From string `json:"from,omitempty"` + Time int64 `json:"time,omitempty"` + Error *JSONError `json:"errorDetail,omitempty"` + ErrorMessage string `json:"error,omitempty"` //deprecated +} + +func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error { + if jm.Error != nil { + if jm.Error.Code == 401 { + return fmt.Errorf("Authentication is required.") + } + return jm.Error + } + var endl string + if isTerminal && jm.Stream == "" && jm.Progress != nil { + // [2K = erase entire current line + fmt.Fprintf(out, "%c[2K\r", 27) + endl = "\r" + } else if jm.Progress != nil && jm.Progress.String() != "" { //disable progressbar in non-terminal + return nil + } + if jm.Time != 0 { + fmt.Fprintf(out, "%s ", time.Unix(jm.Time, 0).Format(timeutils.RFC3339NanoFixed)) + } + if jm.ID != "" { + fmt.Fprintf(out, "%s: ", jm.ID) + } + if jm.From != "" { + fmt.Fprintf(out, "(from %s) ", jm.From) + } + if jm.Progress != nil && isTerminal { + fmt.Fprintf(out, "%s %s%s", jm.Status, jm.Progress.String(), endl) + } else if jm.ProgressMessage != "" { //deprecated + fmt.Fprintf(out, "%s %s%s", jm.Status, jm.ProgressMessage, endl) + } else if jm.Stream != "" { + fmt.Fprintf(out, "%s%s", jm.Stream, endl) + } else { + fmt.Fprintf(out, "%s%s\n", jm.Status, endl) + } + return nil +} + +func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool) error { + var ( + dec = json.NewDecoder(in) + ids = make(map[string]int) + diff = 0 + ) + for { + var jm JSONMessage + if err := dec.Decode(&jm); err != nil { + if err == io.EOF { + break + } + return err + } + + if jm.Progress != nil { + jm.Progress.terminalFd = terminalFd + } + if jm.ID != "" && (jm.Progress != nil || jm.ProgressMessage != "") { + line, ok := ids[jm.ID] + if !ok { + line = len(ids) + ids[jm.ID] = line + if isTerminal { + fmt.Fprintf(out, "\n") + } + diff = 0 + } else { + diff = len(ids) - line + } + if jm.ID != "" && isTerminal { + // [{diff}A = move cursor up diff rows + fmt.Fprintf(out, "%c[%dA", 27, diff) + } + } + err := jm.Display(out, isTerminal) + if jm.ID != "" && isTerminal { + // [{diff}B = move cursor down diff rows + fmt.Fprintf(out, "%c[%dB", 27, diff) + } + if err != nil { + return err + } + } + return nil +} diff --git a/jsonmessage/jsonmessage_test.go b/jsonmessage/jsonmessage_test.go new file mode 100644 index 0000000..4c3f566 --- /dev/null +++ b/jsonmessage/jsonmessage_test.go @@ -0,0 +1,38 @@ +package jsonmessage + +import ( + "testing" +) + +func TestError(t *testing.T) { + je := JSONError{404, "Not found"} + if je.Error() != "Not found" { + t.Fatalf("Expected 'Not found' got '%s'", je.Error()) + } +} + +func TestProgress(t *testing.T) { + jp := JSONProgress{} + if jp.String() != "" { + t.Fatalf("Expected empty string, got '%s'", jp.String()) + } + + expected := " 1 B" + jp2 := JSONProgress{Current: 1} + if jp2.String() != expected { + t.Fatalf("Expected %q, got %q", expected, jp2.String()) + } + + expected = "[=========================> ] 50 B/100 B" + jp3 := JSONProgress{Current: 50, Total: 100} + if jp3.String() != expected { + t.Fatalf("Expected %q, got %q", expected, jp3.String()) + } + + // this number can't be negetive gh#7136 + expected = "[==================================================>] 50 B/40 B" + jp4 := JSONProgress{Current: 50, Total: 40} + if jp4.String() != expected { + t.Fatalf("Expected %q, got %q", expected, jp4.String()) + } +} diff --git a/progressreader/progressreader.go b/progressreader/progressreader.go index 730559e..e548b07 100644 --- a/progressreader/progressreader.go +++ b/progressreader/progressreader.go @@ -1,37 +1,16 @@ package progressreader import ( + "github.com/docker/docker/pkg/jsonmessage" + "github.com/docker/docker/pkg/streamformatter" "io" ) -type StreamFormatter interface { - FormatProg(string, string, interface{}) []byte - FormatStatus(string, string, ...interface{}) []byte - FormatError(error) []byte -} - -type PR_JSONProgress interface { - GetCurrent() int - GetTotal() int -} - -type JSONProg struct { - Current int - Total int -} - -func (j *JSONProg) GetCurrent() int { - return j.Current -} -func (j *JSONProg) GetTotal() int { - return j.Total -} - // Reader with progress bar type Config struct { In io.ReadCloser // Stream to read from Out io.Writer // Where to send progress bar to - Formatter StreamFormatter + Formatter *streamformatter.StreamFormatter Size int Current int LastUpdate int @@ -54,7 +33,7 @@ func (config *Config) Read(p []byte) (n int, err error) { } } if config.Current-config.LastUpdate > updateEvery || err != nil { - config.Out.Write(config.Formatter.FormatProg(config.ID, config.Action, &JSONProg{Current: config.Current, Total: config.Size})) + config.Out.Write(config.Formatter.FormatProgress(config.ID, config.Action, &jsonmessage.JSONProgress{Current: config.Current, Total: config.Size})) config.LastUpdate = config.Current } // Send newline when complete @@ -64,6 +43,6 @@ func (config *Config) Read(p []byte) (n int, err error) { return read, err } func (config *Config) Close() error { - config.Out.Write(config.Formatter.FormatProg(config.ID, config.Action, &JSONProg{Current: config.Current, Total: config.Size})) + config.Out.Write(config.Formatter.FormatProgress(config.ID, config.Action, &jsonmessage.JSONProgress{Current: config.Current, Total: config.Size})) return config.In.Close() } diff --git a/streamformatter/streamformatter.go b/streamformatter/streamformatter.go new file mode 100644 index 0000000..383e7ad --- /dev/null +++ b/streamformatter/streamformatter.go @@ -0,0 +1,113 @@ +package streamformatter + +import ( + "encoding/json" + "fmt" + "github.com/docker/docker/pkg/jsonmessage" + "io" +) + +type StreamFormatter struct { + json bool +} + +func NewStreamFormatter(json bool) *StreamFormatter { + return &StreamFormatter{json} +} + +const streamNewline = "\r\n" + +var streamNewlineBytes = []byte(streamNewline) + +func (sf *StreamFormatter) FormatStream(str string) []byte { + if sf.json { + b, err := json.Marshal(&jsonmessage.JSONMessage{Stream: str}) + if err != nil { + return sf.FormatError(err) + } + return append(b, streamNewlineBytes...) + } + return []byte(str + "\r") +} + +func (sf *StreamFormatter) FormatStatus(id, format string, a ...interface{}) []byte { + str := fmt.Sprintf(format, a...) + if sf.json { + b, err := json.Marshal(&jsonmessage.JSONMessage{ID: id, Status: str}) + if err != nil { + return sf.FormatError(err) + } + return append(b, streamNewlineBytes...) + } + return []byte(str + streamNewline) +} + +func (sf *StreamFormatter) FormatError(err error) []byte { + if sf.json { + jsonError, ok := err.(*jsonmessage.JSONError) + if !ok { + jsonError = &jsonmessage.JSONError{Message: err.Error()} + } + if b, err := json.Marshal(&jsonmessage.JSONMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil { + return append(b, streamNewlineBytes...) + } + return []byte("{\"error\":\"format error\"}" + streamNewline) + } + return []byte("Error: " + err.Error() + streamNewline) +} + +func (sf *StreamFormatter) FormatProgress(id, action string, progress *jsonmessage.JSONProgress) []byte { + if progress == nil { + progress = &jsonmessage.JSONProgress{} + } + if sf.json { + + b, err := json.Marshal(&jsonmessage.JSONMessage{ + Status: action, + ProgressMessage: progress.String(), + Progress: progress, + ID: id, + }) + if err != nil { + return nil + } + return b + } + endl := "\r" + if progress.String() == "" { + endl += "\n" + } + return []byte(action + " " + progress.String() + endl) +} + +func (sf *StreamFormatter) Json() bool { + return sf.json +} + +type StdoutFormater struct { + io.Writer + *StreamFormatter +} + +func (sf *StdoutFormater) Write(buf []byte) (int, error) { + formattedBuf := sf.StreamFormatter.FormatStream(string(buf)) + n, err := sf.Writer.Write(formattedBuf) + if n != len(formattedBuf) { + return n, io.ErrShortWrite + } + return len(buf), err +} + +type StderrFormater struct { + io.Writer + *StreamFormatter +} + +func (sf *StderrFormater) Write(buf []byte) (int, error) { + formattedBuf := sf.StreamFormatter.FormatStream("\033[91m" + string(buf) + "\033[0m") + n, err := sf.Writer.Write(formattedBuf) + if n != len(formattedBuf) { + return n, io.ErrShortWrite + } + return len(buf), err +} diff --git a/streamformatter/streamformatter_test.go b/streamformatter/streamformatter_test.go new file mode 100644 index 0000000..edc432e --- /dev/null +++ b/streamformatter/streamformatter_test.go @@ -0,0 +1,68 @@ +package streamformatter + +import ( + "encoding/json" + "errors" + "github.com/docker/docker/pkg/jsonmessage" + "reflect" + "testing" +) + +func TestFormatStream(t *testing.T) { + sf := NewStreamFormatter(true) + res := sf.FormatStream("stream") + if string(res) != `{"stream":"stream"}`+"\r\n" { + t.Fatalf("%q", res) + } +} + +func TestFormatStatus(t *testing.T) { + sf := NewStreamFormatter(true) + res := sf.FormatStatus("ID", "%s%d", "a", 1) + if string(res) != `{"status":"a1","id":"ID"}`+"\r\n" { + t.Fatalf("%q", res) + } +} + +func TestFormatSimpleError(t *testing.T) { + sf := NewStreamFormatter(true) + res := sf.FormatError(errors.New("Error for formatter")) + if string(res) != `{"errorDetail":{"message":"Error for formatter"},"error":"Error for formatter"}`+"\r\n" { + t.Fatalf("%q", res) + } +} + +func TestFormatJSONError(t *testing.T) { + sf := NewStreamFormatter(true) + err := &jsonmessage.JSONError{Code: 50, Message: "Json error"} + res := sf.FormatError(err) + if string(res) != `{"errorDetail":{"code":50,"message":"Json error"},"error":"Json error"}`+"\r\n" { + t.Fatalf("%q", res) + } +} + +func TestFormatProgress(t *testing.T) { + sf := NewStreamFormatter(true) + progress := &jsonmessage.JSONProgress{ + Current: 15, + Total: 30, + Start: 1, + } + res := sf.FormatProgress("id", "action", progress) + msg := &jsonmessage.JSONMessage{} + if err := json.Unmarshal(res, msg); err != nil { + t.Fatal(err) + } + if msg.ID != "id" { + t.Fatalf("ID must be 'id', got: %s", msg.ID) + } + if msg.Status != "action" { + t.Fatalf("Status must be 'action', got: %s", msg.Status) + } + if msg.ProgressMessage != progress.String() { + t.Fatalf("ProgressMessage must be %s, got: %s", progress.String(), msg.ProgressMessage) + } + if !reflect.DeepEqual(msg.Progress, progress) { + t.Fatal("Original progress not equals progress from FormatProgress") + } +}