diff --git a/broadcastwriter/broadcastwriter.go b/broadcastwriter/broadcastwriter.go index bd9b675..5d53658 100644 --- a/broadcastwriter/broadcastwriter.go +++ b/broadcastwriter/broadcastwriter.go @@ -1,33 +1,20 @@ package broadcastwriter import ( - "bytes" "io" "sync" - "time" - - "github.com/Sirupsen/logrus" - "github.com/docker/docker/pkg/jsonlog" - "github.com/docker/docker/pkg/timeutils" ) // BroadcastWriter accumulate multiple io.WriteCloser by stream. type BroadcastWriter struct { sync.Mutex - buf *bytes.Buffer - jsLogBuf *bytes.Buffer - streams map[string](map[io.WriteCloser]struct{}) + writers map[io.WriteCloser]struct{} } -// AddWriter adds new io.WriteCloser for stream. -// If stream is "", then all writes proceed as is. Otherwise every line from -// input will be packed to serialized jsonlog.JSONLog. -func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) { +// AddWriter adds new io.WriteCloser. +func (w *BroadcastWriter) AddWriter(writer io.WriteCloser) { w.Lock() - if _, ok := w.streams[stream]; !ok { - w.streams[stream] = make(map[io.WriteCloser]struct{}) - } - w.streams[stream][writer] = struct{}{} + w.writers[writer] = struct{}{} w.Unlock() } @@ -35,67 +22,12 @@ func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) { // this call. func (w *BroadcastWriter) Write(p []byte) (n int, err error) { w.Lock() - if writers, ok := w.streams[""]; ok { - for sw := range writers { - if n, err := sw.Write(p); err != nil || n != len(p) { - // On error, evict the writer - delete(writers, sw) - } - } - if len(w.streams) == 1 { - if w.buf.Len() >= 4096 { - w.buf.Reset() - } else { - w.buf.Write(p) - } - w.Unlock() - return len(p), nil + for sw := range w.writers { + if n, err := sw.Write(p); err != nil || n != len(p) { + // On error, evict the writer + delete(w.writers, sw) } } - if w.jsLogBuf == nil { - w.jsLogBuf = new(bytes.Buffer) - w.jsLogBuf.Grow(1024) - } - var timestamp string - created := time.Now().UTC() - w.buf.Write(p) - for { - if n := w.buf.Len(); n == 0 { - break - } - i := bytes.IndexByte(w.buf.Bytes(), '\n') - if i < 0 { - break - } - lineBytes := w.buf.Next(i + 1) - if timestamp == "" { - timestamp, err = timeutils.FastMarshalJSON(created) - if err != nil { - continue - } - } - - for stream, writers := range w.streams { - if stream == "" { - continue - } - jsonLog := jsonlog.JSONLogBytes{Log: lineBytes, Stream: stream, Created: timestamp} - err = jsonLog.MarshalJSONBuf(w.jsLogBuf) - if err != nil { - logrus.Errorf("Error making JSON log line: %s", err) - continue - } - w.jsLogBuf.WriteByte('\n') - b := w.jsLogBuf.Bytes() - for sw := range writers { - if _, err := sw.Write(b); err != nil { - delete(writers, sw) - } - } - } - w.jsLogBuf.Reset() - } - w.jsLogBuf.Reset() w.Unlock() return len(p), nil } @@ -104,19 +36,16 @@ func (w *BroadcastWriter) Write(p []byte) (n int, err error) { // will be saved. func (w *BroadcastWriter) Clean() error { w.Lock() - for _, writers := range w.streams { - for w := range writers { - w.Close() - } + for w := range w.writers { + w.Close() } - w.streams = make(map[string](map[io.WriteCloser]struct{})) + w.writers = make(map[io.WriteCloser]struct{}) w.Unlock() return nil } func New() *BroadcastWriter { return &BroadcastWriter{ - streams: make(map[string](map[io.WriteCloser]struct{})), - buf: bytes.NewBuffer(nil), + writers: make(map[io.WriteCloser]struct{}), } } diff --git a/broadcastwriter/broadcastwriter_test.go b/broadcastwriter/broadcastwriter_test.go index 7122782..bc24320 100644 --- a/broadcastwriter/broadcastwriter_test.go +++ b/broadcastwriter/broadcastwriter_test.go @@ -32,9 +32,9 @@ func TestBroadcastWriter(t *testing.T) { // Test 1: Both bufferA and bufferB should contain "foo" bufferA := &dummyWriter{} - writer.AddWriter(bufferA, "") + writer.AddWriter(bufferA) bufferB := &dummyWriter{} - writer.AddWriter(bufferB, "") + writer.AddWriter(bufferB) writer.Write([]byte("foo")) if bufferA.String() != "foo" { @@ -48,7 +48,7 @@ func TestBroadcastWriter(t *testing.T) { // Test2: bufferA and bufferB should contain "foobar", // while bufferC should only contain "bar" bufferC := &dummyWriter{} - writer.AddWriter(bufferC, "") + writer.AddWriter(bufferC) writer.Write([]byte("bar")) if bufferA.String() != "foobar" { @@ -100,7 +100,7 @@ func TestRaceBroadcastWriter(t *testing.T) { writer := New() c := make(chan bool) go func() { - writer.AddWriter(devNullCloser(0), "") + writer.AddWriter(devNullCloser(0)) c <- true }() writer.Write([]byte("hello")) @@ -111,9 +111,9 @@ func BenchmarkBroadcastWriter(b *testing.B) { writer := New() setUpWriter := func() { for i := 0; i < 100; i++ { - writer.AddWriter(devNullCloser(0), "stdout") - writer.AddWriter(devNullCloser(0), "stderr") - writer.AddWriter(devNullCloser(0), "") + writer.AddWriter(devNullCloser(0)) + writer.AddWriter(devNullCloser(0)) + writer.AddWriter(devNullCloser(0)) } } testLine := "Line that thinks that it is log line from docker" @@ -142,33 +142,3 @@ func BenchmarkBroadcastWriter(b *testing.B) { b.StartTimer() } } - -func BenchmarkBroadcastWriterWithoutStdoutStderr(b *testing.B) { - writer := New() - setUpWriter := func() { - for i := 0; i < 100; i++ { - writer.AddWriter(devNullCloser(0), "") - } - } - testLine := "Line that thinks that it is log line from docker" - var buf bytes.Buffer - for i := 0; i < 100; i++ { - buf.Write([]byte(testLine + "\n")) - } - // line without eol - buf.Write([]byte(testLine)) - testText := buf.Bytes() - b.SetBytes(int64(5 * len(testText))) - b.ResetTimer() - for i := 0; i < b.N; i++ { - setUpWriter() - - for j := 0; j < 5; j++ { - if _, err := writer.Write(testText); err != nil { - b.Fatal(err) - } - } - - writer.Clean() - } -} diff --git a/jsonlog/jsonlog.go b/jsonlog/jsonlog.go index 85afb3b..edcf764 100644 --- a/jsonlog/jsonlog.go +++ b/jsonlog/jsonlog.go @@ -3,7 +3,6 @@ package jsonlog import ( "encoding/json" "fmt" - "io" "time" ) @@ -29,28 +28,3 @@ func (jl *JSONLog) Reset() { jl.Stream = "" jl.Created = time.Time{} } - -func WriteLog(src io.Reader, dst io.Writer, format string, since time.Time) error { - dec := json.NewDecoder(src) - l := &JSONLog{} - for { - l.Reset() - if err := dec.Decode(l); err != nil { - if err == io.EOF { - return nil - } - return err - } - if !since.IsZero() && l.Created.Before(since) { - continue - } - - line, err := l.Format(format) - if err != nil { - return err - } - if _, err := io.WriteString(dst, line); err != nil { - return err - } - } -} diff --git a/jsonlog/jsonlog_test.go b/jsonlog/jsonlog_test.go deleted file mode 100644 index 2b787ef..0000000 --- a/jsonlog/jsonlog_test.go +++ /dev/null @@ -1,157 +0,0 @@ -package jsonlog - -import ( - "bytes" - "encoding/json" - "io/ioutil" - "regexp" - "strconv" - "strings" - "testing" - "time" - - "github.com/docker/docker/pkg/timeutils" -) - -// Invalid json should return an error -func TestWriteLogWithInvalidJSON(t *testing.T) { - json := strings.NewReader("Invalid json") - w := bytes.NewBuffer(nil) - if err := WriteLog(json, w, "json", time.Time{}); err == nil { - t.Fatalf("Expected an error, got [%v]", w.String()) - } -} - -// Any format is valid, it will just print it -func TestWriteLogWithInvalidFormat(t *testing.T) { - testLine := "Line that thinks that it is log line from docker\n" - var buf bytes.Buffer - e := json.NewEncoder(&buf) - for i := 0; i < 35; i++ { - e.Encode(JSONLog{Log: testLine, Stream: "stdout", Created: time.Now()}) - } - w := bytes.NewBuffer(nil) - if err := WriteLog(&buf, w, "invalid format", time.Time{}); err != nil { - t.Fatal(err) - } - res := w.String() - t.Logf("Result of WriteLog: %q", res) - lines := strings.Split(strings.TrimSpace(res), "\n") - expression := "^invalid format Line that thinks that it is log line from docker$" - logRe := regexp.MustCompile(expression) - expectedLines := 35 - if len(lines) != expectedLines { - t.Fatalf("Must be %v lines but got %d", expectedLines, len(lines)) - } - for _, l := range lines { - if !logRe.MatchString(l) { - t.Fatalf("Log line not in expected format [%v]: %q", expression, l) - } - } -} - -// Having multiple Log/Stream element -func TestWriteLogWithMultipleStreamLog(t *testing.T) { - testLine := "Line that thinks that it is log line from docker\n" - var buf bytes.Buffer - e := json.NewEncoder(&buf) - for i := 0; i < 35; i++ { - e.Encode(JSONLog{Log: testLine, Stream: "stdout", Created: time.Now()}) - } - w := bytes.NewBuffer(nil) - if err := WriteLog(&buf, w, "invalid format", time.Time{}); err != nil { - t.Fatal(err) - } - res := w.String() - t.Logf("Result of WriteLog: %q", res) - lines := strings.Split(strings.TrimSpace(res), "\n") - expression := "^invalid format Line that thinks that it is log line from docker$" - logRe := regexp.MustCompile(expression) - expectedLines := 35 - if len(lines) != expectedLines { - t.Fatalf("Must be %v lines but got %d", expectedLines, len(lines)) - } - for _, l := range lines { - if !logRe.MatchString(l) { - t.Fatalf("Log line not in expected format [%v]: %q", expression, l) - } - } -} - -// Write log with since after created, it won't print anything -func TestWriteLogWithDate(t *testing.T) { - created, _ := time.Parse("YYYY-MM-dd", "2015-01-01") - var buf bytes.Buffer - testLine := "Line that thinks that it is log line from docker\n" - jsonLog := JSONLog{Log: testLine, Stream: "stdout", Created: created} - if err := json.NewEncoder(&buf).Encode(jsonLog); err != nil { - t.Fatal(err) - } - w := bytes.NewBuffer(nil) - if err := WriteLog(&buf, w, "json", time.Now()); err != nil { - t.Fatal(err) - } - res := w.String() - if res != "" { - t.Fatalf("Expected empty log, got [%v]", res) - } -} - -// Happy path :) -func TestWriteLog(t *testing.T) { - testLine := "Line that thinks that it is log line from docker\n" - format := timeutils.RFC3339NanoFixed - logs := map[string][]string{ - "": {"35", "^Line that thinks that it is log line from docker$"}, - "json": {"1", `^{\"log\":\"Line that thinks that it is log line from docker\\n\",\"stream\":\"stdout\",\"time\":.{30,}\"}$`}, - // 30+ symbols, five more can come from system timezone - format: {"35", `.{30,} Line that thinks that it is log line from docker`}, - } - for givenFormat, expressionAndLines := range logs { - expectedLines, _ := strconv.Atoi(expressionAndLines[0]) - expression := expressionAndLines[1] - var buf bytes.Buffer - e := json.NewEncoder(&buf) - for i := 0; i < 35; i++ { - e.Encode(JSONLog{Log: testLine, Stream: "stdout", Created: time.Now()}) - } - w := bytes.NewBuffer(nil) - if err := WriteLog(&buf, w, givenFormat, time.Time{}); err != nil { - t.Fatal(err) - } - res := w.String() - t.Logf("Result of WriteLog: %q", res) - lines := strings.Split(strings.TrimSpace(res), "\n") - if len(lines) != expectedLines { - t.Fatalf("Must be %v lines but got %d", expectedLines, len(lines)) - } - logRe := regexp.MustCompile(expression) - for _, l := range lines { - if !logRe.MatchString(l) { - t.Fatalf("Log line not in expected format [%v]: %q", expression, l) - } - } - } -} - -func BenchmarkWriteLog(b *testing.B) { - var buf bytes.Buffer - e := json.NewEncoder(&buf) - testLine := "Line that thinks that it is log line from docker\n" - for i := 0; i < 30; i++ { - e.Encode(JSONLog{Log: testLine, Stream: "stdout", Created: time.Now()}) - } - r := bytes.NewReader(buf.Bytes()) - w := ioutil.Discard - format := timeutils.RFC3339NanoFixed - b.SetBytes(int64(r.Len())) - b.ResetTimer() - for i := 0; i < b.N; i++ { - if err := WriteLog(r, w, format, time.Time{}); err != nil { - b.Fatal(err) - } - b.StopTimer() - r.Seek(0, 0) - b.StartTimer() - } -}