From 23b6435e6bb902fe67a20272fead5d73269373ab Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Mon, 30 Nov 2015 09:57:07 -0800 Subject: [PATCH] Optimize tar stream generation - New writeTo method allows to avoid creating extra pipe. - Copy with a pooled buffer instead of allocating new buffer for each file. - Avoid extra object allocations inside the loop. Signed-off-by: Tonis Tiigi --- tar/asm/assemble.go | 139 ++++++++++++++++++++++++++++++++------------ 1 file changed, 101 insertions(+), 38 deletions(-) diff --git a/tar/asm/assemble.go b/tar/asm/assemble.go index 83d6426..d624450 100644 --- a/tar/asm/assemble.go +++ b/tar/asm/assemble.go @@ -3,8 +3,10 @@ package asm import ( "bytes" "fmt" + "hash" "hash/crc64" "io" + "sync" "github.com/vbatts/tar-split/tar/storage" ) @@ -23,45 +25,106 @@ func NewOutputTarStream(fg storage.FileGetter, up storage.Unpacker) io.ReadClose } pr, pw := io.Pipe() go func() { - for { - entry, err := up.Next() - if err != nil { - pw.CloseWithError(err) - return - } - switch entry.Type { - case storage.SegmentType: - if _, err := pw.Write(entry.Payload); err != nil { - pw.CloseWithError(err) - return - } - case storage.FileType: - if entry.Size == 0 { - continue - } - fh, err := fg.Get(entry.GetName()) - if err != nil { - pw.CloseWithError(err) - return - } - c := crc64.New(storage.CRCTable) - tRdr := io.TeeReader(fh, c) - if _, err := io.Copy(pw, tRdr); err != nil { - fh.Close() - pw.CloseWithError(err) - return - } - if !bytes.Equal(c.Sum(nil), entry.Payload) { - // I would rather this be a comparable ErrInvalidChecksum or such, - // but since it's coming through the PipeReader, the context of - // _which_ file would be lost... - fh.Close() - pw.CloseWithError(fmt.Errorf("file integrity checksum failed for %q", entry.GetName())) - return - } - fh.Close() - } + err := WriteOutputTarStream(fg, up, pw) + if err != nil { + pw.CloseWithError(err) + } else { + pw.Close() } }() return pr } + +// WriteOutputTarStream writes assembled tar archive to a writer. +func WriteOutputTarStream(fg storage.FileGetter, up storage.Unpacker, w io.Writer) error { + // ... Since these are interfaces, this is possible, so let's not have a nil pointer + if fg == nil || up == nil { + return nil + } + var copyBuffer []byte + var crcHash hash.Hash + var crcSum []byte + var multiWriter io.Writer + for { + entry, err := up.Next() + if err != nil { + if err == io.EOF { + return nil + } + return err + } + switch entry.Type { + case storage.SegmentType: + if _, err := w.Write(entry.Payload); err != nil { + return err + } + case storage.FileType: + if entry.Size == 0 { + continue + } + fh, err := fg.Get(entry.GetName()) + if err != nil { + return err + } + if crcHash == nil { + crcHash = crc64.New(storage.CRCTable) + crcSum = make([]byte, 8) + multiWriter = io.MultiWriter(w, crcHash) + copyBuffer = byteBufferPool.Get().([]byte) + defer byteBufferPool.Put(copyBuffer) + } else { + crcHash.Reset() + } + + if _, err := copyWithBuffer(multiWriter, fh, copyBuffer); err != nil { + fh.Close() + return err + } + + if !bytes.Equal(crcHash.Sum(crcSum[:0]), entry.Payload) { + // I would rather this be a comparable ErrInvalidChecksum or such, + // but since it's coming through the PipeReader, the context of + // _which_ file would be lost... + fh.Close() + return fmt.Errorf("file integrity checksum failed for %q", entry.GetName()) + } + fh.Close() + } + } +} + +var byteBufferPool = &sync.Pool{ + New: func() interface{} { + return make([]byte, 32*1024) + }, +} + +// copyWithBuffer is taken from stdlib io.Copy implementation +// https://github.com/golang/go/blob/go1.5.1/src/io/io.go#L367 +func copyWithBuffer(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) { + for { + nr, er := src.Read(buf) + if nr > 0 { + nw, ew := dst.Write(buf[0:nr]) + if nw > 0 { + written += int64(nw) + } + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } + if er == io.EOF { + break + } + if er != nil { + err = er + break + } + } + return written, err +}