
Merge pull request #22 from tonistiigi/stream-opt

Optimize tar stream generation
Vincent Batts 2015-12-02 14:09:08 -06:00
commit 1501fe6002
2 changed files with 106 additions and 61 deletions

tar/asm/assemble.go

@@ -3,8 +3,10 @@ package asm
 import (
 	"bytes"
 	"fmt"
+	"hash"
 	"hash/crc64"
 	"io"
+	"sync"
 
 	"github.com/vbatts/tar-split/tar/storage"
 )
@ -23,17 +25,38 @@ func NewOutputTarStream(fg storage.FileGetter, up storage.Unpacker) io.ReadClose
} }
pr, pw := io.Pipe() pr, pw := io.Pipe()
go func() { go func() {
err := WriteOutputTarStream(fg, up, pw)
if err != nil {
pw.CloseWithError(err)
} else {
pw.Close()
}
}()
return pr
}
// WriteOutputTarStream writes assembled tar archive to a writer.
func WriteOutputTarStream(fg storage.FileGetter, up storage.Unpacker, w io.Writer) error {
// ... Since these are interfaces, this is possible, so let's not have a nil pointer
if fg == nil || up == nil {
return nil
}
var copyBuffer []byte
var crcHash hash.Hash
var crcSum []byte
var multiWriter io.Writer
for { for {
entry, err := up.Next() entry, err := up.Next()
if err != nil { if err != nil {
pw.CloseWithError(err) if err == io.EOF {
return return nil
}
return err
} }
switch entry.Type { switch entry.Type {
case storage.SegmentType: case storage.SegmentType:
if _, err := pw.Write(entry.Payload); err != nil { if _, err := w.Write(entry.Payload); err != nil {
pw.CloseWithError(err) return err
return
} }
case storage.FileType: case storage.FileType:
if entry.Size == 0 { if entry.Size == 0 {
@@ -41,27 +64,67 @@ func NewOutputTarStream(fg storage.FileGetter, up storage.Unpacker) io.ReadCloser
 			}
 			fh, err := fg.Get(entry.GetName())
 			if err != nil {
-				pw.CloseWithError(err)
-				return
+				return err
 			}
-			c := crc64.New(storage.CRCTable)
-			tRdr := io.TeeReader(fh, c)
-			if _, err := io.Copy(pw, tRdr); err != nil {
+			if crcHash == nil {
+				crcHash = crc64.New(storage.CRCTable)
+				crcSum = make([]byte, 8)
+				multiWriter = io.MultiWriter(w, crcHash)
+				copyBuffer = byteBufferPool.Get().([]byte)
+				defer byteBufferPool.Put(copyBuffer)
+			} else {
+				crcHash.Reset()
+			}
+
+			if _, err := copyWithBuffer(multiWriter, fh, copyBuffer); err != nil {
 				fh.Close()
-				pw.CloseWithError(err)
-				return
+				return err
 			}
-			if !bytes.Equal(c.Sum(nil), entry.Payload) {
+
+			if !bytes.Equal(crcHash.Sum(crcSum[:0]), entry.Payload) {
 				// I would rather this be a comparable ErrInvalidChecksum or such,
 				// but since it's coming through the PipeReader, the context of
 				// _which_ file would be lost...
 				fh.Close()
-				pw.CloseWithError(fmt.Errorf("file integrity checksum failed for %q", entry.GetName()))
-				return
+				return fmt.Errorf("file integrity checksum failed for %q", entry.GetName())
 			}
 			fh.Close()
 		}
 	}
-	}()
-	return pr
+}
+
+var byteBufferPool = &sync.Pool{
+	New: func() interface{} {
+		return make([]byte, 32*1024)
+	},
+}
+
+// copyWithBuffer is taken from stdlib io.Copy implementation
+// https://github.com/golang/go/blob/go1.5.1/src/io/io.go#L367
+func copyWithBuffer(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
+	for {
+		nr, er := src.Read(buf)
+		if nr > 0 {
+			nw, ew := dst.Write(buf[0:nr])
+			if nw > 0 {
+				written += int64(nw)
+			}
+			if ew != nil {
+				err = ew
+				break
+			}
+			if nr != nw {
+				err = io.ErrShortWrite
+				break
+			}
+		}
+		if er == io.EOF {
+			break
+		}
+		if er != nil {
+			err = er
+			break
+		}
+	}
+	return written, err
+}
 }
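The change above splits the assembly logic out of the pipe goroutine: WriteOutputTarStream streams into any io.Writer, reusing one pooled copy buffer and one crc64 hash across file entries, while NewOutputTarStream becomes a thin io.Pipe wrapper around it. A minimal sketch of driving the new entry point directly; storage.NewPathFileGetter is an assumed helper from tar-split's getter code, and both input paths are hypothetical:

package main

import (
	"os"

	"github.com/vbatts/tar-split/tar/asm"
	"github.com/vbatts/tar-split/tar/storage"
)

func main() {
	// Hypothetical inputs: tar-data.json holds the packed metadata
	// entries, and ./unpacked/ holds the file payloads they reference.
	mf, err := os.Open("tar-data.json")
	if err != nil {
		panic(err)
	}
	defer mf.Close()

	up := storage.NewJSONUnpacker(mf)           // json.Decoder-backed, per this commit
	fg := storage.NewPathFileGetter("unpacked") // assumed FileGetter helper

	// Writing straight to a writer skips the io.Pipe goroutine that
	// NewOutputTarStream still provides for io.ReadCloser consumers.
	if err := asm.WriteOutputTarStream(fg, up, os.Stdout); err != nil {
		panic(err)
	}
}

Note the design choice in the pool: byteBufferPool hands out one 32 KiB buffer per stream, reused for every file entry and returned on completion, instead of letting io.Copy allocate a fresh buffer for each file.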

tar/storage/packer.go

@@ -1,7 +1,6 @@
 package storage
 
 import (
-	"bufio"
 	"encoding/json"
 	"errors"
 	"io"
@@ -33,31 +32,15 @@ type PackUnpacker interface {
 */
 
 type jsonUnpacker struct {
-	r     io.Reader
-	b     *bufio.Reader
-	isEOF bool
 	seen seenNames
+	dec  *json.Decoder
 }
 
 func (jup *jsonUnpacker) Next() (*Entry, error) {
 	var e Entry
-	if jup.isEOF {
-		// since ReadBytes() will return read bytes AND an EOF, we handle it this
-		// round-a-bout way so we can Unmarshal the tail with relevant errors, but
-		// still get an io.EOF when the stream is ended.
-		return nil, io.EOF
-	}
-	line, err := jup.b.ReadBytes('\n')
-	if err != nil && err != io.EOF {
+	err := jup.dec.Decode(&e)
+	if err != nil {
 		return nil, err
-	} else if err == io.EOF {
-		jup.isEOF = true
-	}
-
-	err = json.Unmarshal(line, &e)
-	if err != nil && jup.isEOF {
-		// if the remainder actually _wasn't_ a remaining json structure, then just EOF
-		return nil, io.EOF
 	}
 
 	// check for dup name
@ -78,8 +61,7 @@ func (jup *jsonUnpacker) Next() (*Entry, error) {
// Each Entry read are expected to be delimited by new line. // Each Entry read are expected to be delimited by new line.
func NewJSONUnpacker(r io.Reader) Unpacker { func NewJSONUnpacker(r io.Reader) Unpacker {
return &jsonUnpacker{ return &jsonUnpacker{
r: r, dec: json.NewDecoder(r),
b: bufio.NewReader(r),
seen: seenNames{}, seen: seenNames{},
} }
} }
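This second file drops the hand-rolled ReadBytes('\n') / json.Unmarshal / isEOF bookkeeping: a json.Decoder reads objects straight off the stream and returns a clean io.EOF when the input ends, so the newline-delimited entries no longer need explicit line splitting or tail handling. A standalone sketch of the same pattern; the entry type here is a simplified stand-in, not the real storage.Entry:

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// entry is a simplified stand-in for storage.Entry.
type entry struct {
	Type int    `json:"type"`
	Name string `json:"name"`
}

func main() {
	// Newline-delimited JSON, as the packer writes it.
	in := strings.NewReader(`{"type":2,"name":"./hello.txt"}
{"type":1,"name":""}
`)

	dec := json.NewDecoder(in)
	for {
		var e entry
		if err := dec.Decode(&e); err != nil {
			if err == io.EOF {
				break // clean end of stream, no isEOF flag needed
			}
			panic(err)
		}
		fmt.Printf("entry: %+v\n", e)
	}
}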