From 8b20f9161d2cd89438fd90a228464d545647a237 Mon Sep 17 00:00:00 2001
From: Tonis Tiigi
Date: Mon, 30 Nov 2015 09:52:44 -0800
Subject: [PATCH 1/2] Optimize JSON decoding

This avoids the extra allocations from `ReadBytes` and per-line
decoding buffers.

Signed-off-by: Tonis Tiigi
---
 tar/storage/packer.go | 28 +++++-----------------------
 1 file changed, 5 insertions(+), 23 deletions(-)

diff --git a/tar/storage/packer.go b/tar/storage/packer.go
index 0c9d99b..aba6948 100644
--- a/tar/storage/packer.go
+++ b/tar/storage/packer.go
@@ -1,7 +1,6 @@
 package storage
 
 import (
-	"bufio"
 	"encoding/json"
 	"errors"
 	"io"
@@ -33,31 +32,15 @@ type PackUnpacker interface {
 */
 
 type jsonUnpacker struct {
-	r     io.Reader
-	b     *bufio.Reader
-	isEOF bool
-	seen  seenNames
+	seen seenNames
+	dec  *json.Decoder
 }
 
 func (jup *jsonUnpacker) Next() (*Entry, error) {
 	var e Entry
-	if jup.isEOF {
-		// since ReadBytes() will return read bytes AND an EOF, we handle it this
-		// round-a-bout way so we can Unmarshal the tail with relevant errors, but
-		// still get an io.EOF when the stream is ended.
-		return nil, io.EOF
-	}
-	line, err := jup.b.ReadBytes('\n')
-	if err != nil && err != io.EOF {
+	err := jup.dec.Decode(&e)
+	if err != nil {
 		return nil, err
-	} else if err == io.EOF {
-		jup.isEOF = true
-	}
-
-	err = json.Unmarshal(line, &e)
-	if err != nil && jup.isEOF {
-		// if the remainder actually _wasn't_ a remaining json structure, then just EOF
-		return nil, io.EOF
 	}
 
 	// check for dup name
@@ -78,8 +61,7 @@ func (jup *jsonUnpacker) Next() (*Entry, error) {
 // Each Entry read are expected to be delimited by new line.
 func NewJSONUnpacker(r io.Reader) Unpacker {
 	return &jsonUnpacker{
-		r:    r,
-		b:    bufio.NewReader(r),
+		dec:  json.NewDecoder(r),
 		seen: seenNames{},
 	}
 }
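
For context on this first patch: the new jsonUnpacker leans on json.Decoder's
streaming behavior, where a single Decoder reads newline-delimited entries
straight off the underlying reader and reuses its internal buffer across
Decode calls, while the old ReadBytes('\n') + Unmarshal pair allocated a
fresh line slice for every entry. A minimal self-contained sketch of that
pattern (illustrative only, not part of the patch; the entry type and its
fields are hypothetical stand-ins for storage.Entry):

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// entry is a hypothetical stand-in for storage.Entry.
type entry struct {
	Type int    `json:"type"`
	Name string `json:"name"`
}

func main() {
	// Newline-delimited JSON, the stream format the unpacker consumes.
	input := `{"type":1,"name":"./a"}` + "\n" + `{"type":1,"name":"./b"}` + "\n"

	// One Decoder is created up front and reused for every entry; its
	// internal buffer persists across Decode calls, unlike the old
	// ReadBytes('\n') + json.Unmarshal pair, which built a new line
	// slice per entry.
	dec := json.NewDecoder(strings.NewReader(input))
	for {
		var e entry
		if err := dec.Decode(&e); err != nil {
			if err == io.EOF {
				break // clean end of stream, the same condition Next() now returns
			}
			panic(err)
		}
		fmt.Printf("%+v\n", e)
	}
}
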
From 23b6435e6bb902fe67a20272fead5d73269373ab Mon Sep 17 00:00:00 2001
From: Tonis Tiigi
Date: Mon, 30 Nov 2015 09:57:07 -0800
Subject: [PATCH 2/2] Optimize tar stream generation

- New WriteOutputTarStream function allows writing to an existing
  writer directly, avoiding an extra pipe.
- Copy with a pooled buffer instead of allocating a new buffer for
  each file.
- Avoid extra object allocations inside the loop.

Signed-off-by: Tonis Tiigi
---
 tar/asm/assemble.go | 139 ++++++++++++++++++++++++++++++++------------
 1 file changed, 101 insertions(+), 38 deletions(-)

diff --git a/tar/asm/assemble.go b/tar/asm/assemble.go
index 83d6426..d624450 100644
--- a/tar/asm/assemble.go
+++ b/tar/asm/assemble.go
@@ -3,8 +3,10 @@ package asm
 import (
 	"bytes"
 	"fmt"
+	"hash"
 	"hash/crc64"
 	"io"
+	"sync"
 
 	"github.com/vbatts/tar-split/tar/storage"
 )
@@ -23,45 +25,106 @@ func NewOutputTarStream(fg storage.FileGetter, up storage.Unpacker) io.ReadClose
 	}
 	pr, pw := io.Pipe()
 	go func() {
-		for {
-			entry, err := up.Next()
-			if err != nil {
-				pw.CloseWithError(err)
-				return
-			}
-			switch entry.Type {
-			case storage.SegmentType:
-				if _, err := pw.Write(entry.Payload); err != nil {
-					pw.CloseWithError(err)
-					return
-				}
-			case storage.FileType:
-				if entry.Size == 0 {
-					continue
-				}
-				fh, err := fg.Get(entry.GetName())
-				if err != nil {
-					pw.CloseWithError(err)
-					return
-				}
-				c := crc64.New(storage.CRCTable)
-				tRdr := io.TeeReader(fh, c)
-				if _, err := io.Copy(pw, tRdr); err != nil {
-					fh.Close()
-					pw.CloseWithError(err)
-					return
-				}
-				if !bytes.Equal(c.Sum(nil), entry.Payload) {
-					// I would rather this be a comparable ErrInvalidChecksum or such,
-					// but since it's coming through the PipeReader, the context of
-					// _which_ file would be lost...
-					fh.Close()
-					pw.CloseWithError(fmt.Errorf("file integrity checksum failed for %q", entry.GetName()))
-					return
-				}
-				fh.Close()
-			}
+		err := WriteOutputTarStream(fg, up, pw)
+		if err != nil {
+			pw.CloseWithError(err)
+		} else {
+			pw.Close()
 		}
 	}()
 	return pr
 }
+
+// WriteOutputTarStream writes assembled tar archive to a writer.
+func WriteOutputTarStream(fg storage.FileGetter, up storage.Unpacker, w io.Writer) error {
+	// ... Since these are interfaces, this is possible, so let's not have a nil pointer
+	if fg == nil || up == nil {
+		return nil
+	}
+	var copyBuffer []byte
+	var crcHash hash.Hash
+	var crcSum []byte
+	var multiWriter io.Writer
+	for {
+		entry, err := up.Next()
+		if err != nil {
+			if err == io.EOF {
+				return nil
+			}
+			return err
+		}
+		switch entry.Type {
+		case storage.SegmentType:
+			if _, err := w.Write(entry.Payload); err != nil {
+				return err
+			}
+		case storage.FileType:
+			if entry.Size == 0 {
+				continue
+			}
+			fh, err := fg.Get(entry.GetName())
+			if err != nil {
+				return err
+			}
+			if crcHash == nil {
+				crcHash = crc64.New(storage.CRCTable)
+				crcSum = make([]byte, 8)
+				multiWriter = io.MultiWriter(w, crcHash)
+				copyBuffer = byteBufferPool.Get().([]byte)
+				defer byteBufferPool.Put(copyBuffer)
+			} else {
+				crcHash.Reset()
+			}
+
+			if _, err := copyWithBuffer(multiWriter, fh, copyBuffer); err != nil {
+				fh.Close()
+				return err
+			}
+
+			if !bytes.Equal(crcHash.Sum(crcSum[:0]), entry.Payload) {
+				// I would rather this be a comparable ErrInvalidChecksum or such,
+				// but since it's coming through the PipeReader, the context of
+				// _which_ file would be lost...
+				fh.Close()
+				return fmt.Errorf("file integrity checksum failed for %q", entry.GetName())
+			}
+			fh.Close()
+		}
+	}
+}
+
+var byteBufferPool = &sync.Pool{
+	New: func() interface{} {
+		return make([]byte, 32*1024)
+	},
+}
+
+// copyWithBuffer is taken from stdlib io.Copy implementation
+// https://github.com/golang/go/blob/go1.5.1/src/io/io.go#L367
+func copyWithBuffer(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
+	for {
+		nr, er := src.Read(buf)
+		if nr > 0 {
+			nw, ew := dst.Write(buf[0:nr])
+			if nw > 0 {
+				written += int64(nw)
+			}
+			if ew != nil {
+				err = ew
+				break
+			}
+			if nr != nw {
+				err = io.ErrShortWrite
+				break
+			}
+		}
+		if er == io.EOF {
+			break
+		}
+		if er != nil {
+			err = er
+			break
+		}
+	}
+	return written, err
+}
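
For context on the pooled-buffer copy in patch 2: a sync.Pool hands out 32 KiB
buffers, and one buffer is borrowed per output run and reused for every file.
A minimal sketch of the pattern follows (illustrative, not from the patch;
copyBuf condenses the patch's copyWithBuffer). A private copy loop is used
rather than the stdlib io.CopyBuffer because io.CopyBuffer is documented not
to use the supplied buffer when src implements io.WriterTo or dst implements
io.ReaderFrom, which would defeat the pooling; whether that was the author's
motivation here is an assumption.

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

// A pool of 32 KiB copy buffers, mirroring byteBufferPool in the patch.
// Get returns a free buffer (or allocates one via New); Put recycles it.
var bufPool = &sync.Pool{
	New: func() interface{} {
		return make([]byte, 32*1024)
	},
}

// copyBuf is a condensed version of the patch's copyWithBuffer: a plain
// read/write loop that always uses the caller's buffer.
func copyBuf(dst io.Writer, src io.Reader, buf []byte) (int64, error) {
	var written int64
	for {
		nr, er := src.Read(buf)
		if nr > 0 {
			nw, ew := dst.Write(buf[:nr])
			written += int64(nw)
			if ew != nil {
				return written, ew
			}
			if nr != nw {
				return written, io.ErrShortWrite
			}
		}
		if er == io.EOF {
			return written, nil
		}
		if er != nil {
			return written, er
		}
	}
}

func main() {
	// One buffer is borrowed for the whole run and reused for every
	// "file", instead of a fresh 32 KiB buffer per file, which is what
	// the old per-file io.Copy(pw, tRdr) call allocated.
	buf := bufPool.Get().([]byte)
	defer bufPool.Put(buf)

	var out bytes.Buffer
	for _, payload := range []string{"file one\n", "file two\n"} {
		if _, err := copyBuf(&out, strings.NewReader(payload), buf); err != nil {
			panic(err)
		}
	}
	fmt.Print(out.String())
}
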
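The "avoid extra object allocations inside the loop" bullet covers the
checksum machinery: instead of a new crc64 hash and TeeReader per file, one
hash, one 8-byte sum buffer, and one MultiWriter are created lazily on the
first file and then reused, with Reset() between files and Sum(crcSum[:0])
appending into the existing backing array. A minimal sketch of that reuse
(illustrative, not from the patch; the locally built ISO table stands in for
storage.CRCTable, and ioutil.Discard stands in for the output stream):

package main

import (
	"fmt"
	"hash/crc64"
	"io"
	"io/ioutil"
	"strings"
)

func main() {
	// One hash, one sum buffer, one MultiWriter, shared across files,
	// mirroring how WriteOutputTarStream hoists them out of its loop.
	crcHash := crc64.New(crc64.MakeTable(crc64.ISO)) // stand-in for storage.CRCTable
	crcSum := make([]byte, 8)
	multiWriter := io.MultiWriter(ioutil.Discard, crcHash)

	for _, payload := range []string{"file one", "file two"} {
		crcHash.Reset() // reuse the hash instead of calling crc64.New per file
		if _, err := io.Copy(multiWriter, strings.NewReader(payload)); err != nil {
			panic(err)
		}
		// Sum appends into crcSum's existing backing array, so no new
		// slice is allocated per file; in the patch the result is
		// compared against entry.Payload with bytes.Equal.
		fmt.Printf("%q -> %x\n", payload, crcHash.Sum(crcSum[:0]))
	}
}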