Merge pull request #8041 from unclejack/lower_allocations_broadcastwriter

lower the number of allocations in broadcastwriter
This commit is contained in:
Michael Crosby 2014-09-17 15:26:44 -07:00
commit ff8c7b20ac
4 changed files with 213 additions and 5 deletions

View file

@ -2,7 +2,6 @@ package broadcastwriter
import (
"bytes"
"encoding/json"
"io"
"sync"
"time"
@ -15,6 +14,7 @@ import (
type BroadcastWriter struct {
sync.Mutex
buf *bytes.Buffer
jsLogBuf *bytes.Buffer
streams map[string](map[io.WriteCloser]struct{})
}
@ -43,6 +43,10 @@ func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
}
}
}
if w.jsLogBuf == nil {
w.jsLogBuf = new(bytes.Buffer)
w.jsLogBuf.Grow(1024)
}
w.buf.Write(p)
for {
line, err := w.buf.ReadString('\n')
@ -54,19 +58,23 @@ func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
if stream == "" {
continue
}
b, err := json.Marshal(jsonlog.JSONLog{Log: line, Stream: stream, Created: created})
jsonLog := jsonlog.JSONLog{Log: line, Stream: stream, Created: created}
err = jsonLog.MarshalJSONBuf(w.jsLogBuf)
if err != nil {
log.Errorf("Error making JSON log line: %s", err)
continue
}
b = append(b, '\n')
w.jsLogBuf.WriteByte('\n')
b := w.jsLogBuf.Bytes()
for sw := range writers {
if _, err := sw.Write(b); err != nil {
delete(writers, sw)
}
}
}
w.jsLogBuf.Reset()
}
w.jsLogBuf.Reset()
w.Unlock()
return len(p), nil
}

View file

@ -0,0 +1,176 @@
// This code was initially generated by ffjson <https://github.com/pquerna/ffjson>
// This code was generated via the following steps:
// $ go get -u github.com/pquerna/ffjson
// $ make shell BINDDIR=.
// $ ffjson pkg/jsonlog/jsonlog.go
// $ mv pkg/jsonglog/jsonlog_ffjson.go pkg/jsonlog/jsonlog_marshalling.go
//
// It has been modified to improve the performance of time marshalling to JSON
// and to clean it up.
// Should this code need to be regenerated when the JSONLog struct is changed,
// the relevant changes which have been made are:
// import (
// "bytes"
//-
// "unicode/utf8"
//+
//+ "github.com/docker/docker/pkg/timeutils"
// )
//
// func (mj *JSONLog) MarshalJSON() ([]byte, error) {
//@@ -20,13 +16,13 @@ func (mj *JSONLog) MarshalJSON() ([]byte, error) {
// }
// return buf.Bytes(), nil
// }
//+
// func (mj *JSONLog) MarshalJSONBuf(buf *bytes.Buffer) error {
//- var err error
//- var obj []byte
//- var first bool = true
//- _ = obj
//- _ = err
//- _ = first
//+ var (
//+ err error
//+ timestamp string
//+ first bool = true
//+ )
// buf.WriteString(`{`)
// if len(mj.Log) != 0 {
// if first == true {
//@@ -52,11 +48,11 @@ func (mj *JSONLog) MarshalJSONBuf(buf *bytes.Buffer) error {
// buf.WriteString(`,`)
// }
// buf.WriteString(`"time":`)
//- obj, err = mj.Created.MarshalJSON()
//+ timestamp, err = timeutils.FastMarshalJSON(mj.Created)
// if err != nil {
// return err
// }
//- buf.Write(obj)
//+ buf.WriteString(timestamp)
// buf.WriteString(`}`)
// return nil
// }
package jsonlog
import (
"bytes"
"unicode/utf8"
"github.com/docker/docker/pkg/timeutils"
)
func (mj *JSONLog) MarshalJSON() ([]byte, error) {
var buf bytes.Buffer
buf.Grow(1024)
err := mj.MarshalJSONBuf(&buf)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (mj *JSONLog) MarshalJSONBuf(buf *bytes.Buffer) error {
var (
err error
timestamp string
first bool = true
)
buf.WriteString(`{`)
if len(mj.Log) != 0 {
if first == true {
first = false
} else {
buf.WriteString(`,`)
}
buf.WriteString(`"log":`)
ffjson_WriteJsonString(buf, mj.Log)
}
if len(mj.Stream) != 0 {
if first == true {
first = false
} else {
buf.WriteString(`,`)
}
buf.WriteString(`"stream":`)
ffjson_WriteJsonString(buf, mj.Stream)
}
if first == true {
first = false
} else {
buf.WriteString(`,`)
}
buf.WriteString(`"time":`)
timestamp, err = timeutils.FastMarshalJSON(mj.Created)
if err != nil {
return err
}
buf.WriteString(timestamp)
buf.WriteString(`}`)
return nil
}
func ffjson_WriteJsonString(buf *bytes.Buffer, s string) {
const hex = "0123456789abcdef"
buf.WriteByte('"')
start := 0
for i := 0; i < len(s); {
if b := s[i]; b < utf8.RuneSelf {
if 0x20 <= b && b != '\\' && b != '"' && b != '<' && b != '>' && b != '&' {
i++
continue
}
if start < i {
buf.WriteString(s[start:i])
}
switch b {
case '\\', '"':
buf.WriteByte('\\')
buf.WriteByte(b)
case '\n':
buf.WriteByte('\\')
buf.WriteByte('n')
case '\r':
buf.WriteByte('\\')
buf.WriteByte('r')
default:
buf.WriteString(`\u00`)
buf.WriteByte(hex[b>>4])
buf.WriteByte(hex[b&0xF])
}
i++
start = i
continue
}
c, size := utf8.DecodeRuneInString(s[i:])
if c == utf8.RuneError && size == 1 {
if start < i {
buf.WriteString(s[start:i])
}
buf.WriteString(`\ufffd`)
i += size
start = i
continue
}
if c == '\u2028' || c == '\u2029' {
if start < i {
buf.WriteString(s[start:i])
}
buf.WriteString(`\u202`)
buf.WriteByte(hex[c&0xF])
i += size
start = i
continue
}
i += size
}
if start < len(s) {
buf.WriteString(s[start:])
}
buf.WriteByte('"')
}

1
timeutils/MAINTAINERS Normal file
View file

@ -0,0 +1 @@
Cristian Staretu <cristian.staretu@gmail.com> (@unclejack)

23
timeutils/json.go Normal file
View file

@ -0,0 +1,23 @@
package timeutils
import (
"errors"
"time"
)
const (
// Define our own version of RFC339Nano because we want one
// that pads the nano seconds part with zeros to ensure
// the timestamps are aligned in the logs.
RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
JSONFormat = `"` + time.RFC3339Nano + `"`
)
func FastMarshalJSON(t time.Time) (string, error) {
if y := t.Year(); y < 0 || y >= 10000 {
// RFC 3339 is clear that years are 4 digits exactly.
// See golang.org/issue/4556#c15 for more discussion.
return "", errors.New("Time.MarshalJSON: year outside of range [0,9999]")
}
return t.Format(JSONFormat), nil
}