forked from mirrors/tar-split
storage: working on packing and unpacking
This commit is contained in:
parent a4fa9207cd
commit b1284905d3
6 changed files with 308 additions and 0 deletions

36 DESIGN.md Normal file

@@ -0,0 +1,36 @@
Flow of TAR stream
==================

The underlying use of `github.com/vbatts/tar-split/archive/tar` is most similar
to the stdlib.


Packer interface
----------------

For ease of storage and use of the raw bytes, there will be a storage
interface that accepts an io.Writer (this way you could pass it an in-memory
buffer or a file handle).

Having a Packer interface allows configuring the hash.Hash used for file
payloads and providing your own io.Writer.
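
For example, a minimal sketch of driving the json Packer added below (in
tar/storage/packer.go) over either kind of io.Writer. The file names and the
checksum payload are illustrative only, and the import path is assumed from
the repository layout:

```go
package main

import (
	"bytes"
	"os"

	"github.com/vbatts/tar-split/tar/storage"
)

func main() {
	// pack into an in-memory buffer ...
	buf := bytes.NewBuffer(nil)
	jp := storage.NewJsonPacker(buf)
	if _, err := jp.AddEntry(storage.Entry{
		Type:    storage.FileType,
		Name:    "./hello.txt",
		Size:    100,
		Payload: []byte("deadbeef"), // stand-in for a file checksum
	}); err != nil {
		panic(err)
	}

	// ... or straight to a file handle; any io.Writer works
	fh, err := os.Create("stream.jsonl")
	if err != nil {
		panic(err)
	}
	defer fh.Close()
	jp = storage.NewJsonPacker(fh)
	if _, err := jp.AddEntry(storage.Entry{
		Type:    storage.SegmentType,
		Payload: []byte("raw tar header bytes"),
	}); err != nil {
		panic(err)
	}
}
```
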
Instead of having a state directory to store all the header information for
all Readers, we will leave that up to the user of the Reader, since we cannot
assume an ID for each Reader or keep that information differentiated on its
behalf.


State Directory
---------------

Perhaps we could deduplicate the header info by hashing the raw bytes and
storing them in a directory tree like:

    ./ac/dc/beef

Then the positional records for the tar stream would reference the hash of the
header info. This could be a future feature, though, and is not required for an
initial implementation. It would also imply an owned state directory, rather
than just writing storage info to an io.Writer.
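
To make the idea concrete, a hypothetical helper for deriving such a path;
the hash choice and fan-out depth here are assumptions, not settled design:

```go
package main

import (
	"crypto/sha1"
	"fmt"
	"path/filepath"
)

// headerPath fans the hex digest of the raw header bytes out into a
// ./ac/dc/beef style path. SHA-1 and the two-level fan-out are illustrative
// choices only.
func headerPath(rawBytes []byte) string {
	sum := fmt.Sprintf("%x", sha1.Sum(rawBytes))
	return filepath.Join(".", sum[0:2], sum[2:4], sum[4:])
}

func main() {
	fmt.Println(headerPath([]byte("raw tar header bytes")))
}
```
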

31 tar/storage/entry.go Normal file

@@ -0,0 +1,31 @@
package storage

// Entries is for sorting by Position
type Entries []Entry

func (e Entries) Len() int           { return len(e) }
func (e Entries) Swap(i, j int)      { e[i], e[j] = e[j], e[i] }
func (e Entries) Less(i, j int) bool { return e[i].Position < e[j].Position }

// Type is the kind of Entry stored for a position in the tar stream.
type Type int

const (
	// FileType represents a file payload from the tar stream.
	//
	// This will be used to map to relative paths on disk. Only Size > 0 will
	// get read into a resulting output stream (due to hardlinks).
	FileType Type = 1 + iota
	// SegmentType represents a raw bytes segment from the archive stream.
	// These raw byte segments consist of the raw headers and various padding.
	//
	// Its payload is to be marshalled base64 encoded.
	SegmentType
)

// Entry is a record for a position in the tar stream: either a raw byte
// segment (SegmentType) or a file payload (FileType).
type Entry struct {
	Type     Type   `json:"type"`
	Name     string `json:"name,omitempty"`
	Size     int64  `json:"size,omitempty"`
	Payload  []byte `json:"payload"` // SegmentType stores the raw bytes here; FileType stores the checksum here
	Position int    `json:"position"`
}
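
For reference, marshalling a FileType Entry with encoding/json (as the json
packer below does, one document per line) produces roughly the following; the
values are illustrative and the import path is assumed from the repository
layout:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/vbatts/tar-split/tar/storage"
)

func main() {
	e := storage.Entry{
		Type:     storage.FileType,
		Name:     "./hello.txt",
		Size:     100,
		Payload:  []byte("deadbeef"), // stand-in for a checksum
		Position: 2,
	}
	buf, err := json.Marshal(e)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(buf))
	// prints roughly:
	// {"type":1,"name":"./hello.txt","size":100,"payload":"ZGVhZGJlZWY=","position":2}
}
```
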

66 tar/storage/entry_test.go Normal file

@@ -0,0 +1,66 @@
package storage

import (
	"encoding/json"
	"sort"
	"testing"
)

func TestEntries(t *testing.T) {
	e := Entries{
		Entry{
			Type:     SegmentType,
			Payload:  []byte("y'all"),
			Position: 1,
		},
		Entry{
			Type:     SegmentType,
			Payload:  []byte("doin"),
			Position: 3,
		},
		Entry{
			Type:     FileType,
			Name:     "./hurr.txt",
			Payload:  []byte("deadbeef"),
			Position: 2,
		},
		Entry{
			Type:     SegmentType,
			Payload:  []byte("how"),
			Position: 0,
		},
	}
	sort.Sort(e)
	if e[0].Position != 0 {
		t.Errorf("expected Position 0, but got %d", e[0].Position)
	}
}

func TestFile(t *testing.T) {
	f := Entry{
		Type:     FileType,
		Name:     "./hello.txt",
		Size:     100,
		Position: 2,
	}

	buf, err := json.Marshal(f)
	if err != nil {
		t.Fatal(err)
	}

	f1 := Entry{}
	if err = json.Unmarshal(buf, &f1); err != nil {
		t.Fatal(err)
	}

	if f.Name != f1.Name {
		t.Errorf("expected Name %q, got %q", f.Name, f1.Name)
	}
	if f.Size != f1.Size {
		t.Errorf("expected Size %d, got %d", f.Size, f1.Size)
	}
	if f.Position != f1.Position {
		t.Errorf("expected Position %d, got %d", f.Position, f1.Position)
	}
}

87 tar/storage/packer.go Normal file

@@ -0,0 +1,87 @@
package storage

import (
	"bufio"
	"encoding/json"
	"io"
)

// Packer describes the methods for storing Entries.
type Packer interface {
	// AddSegment packs the segment bytes provided and returns the position of
	// the entry
	//AddSegment([]byte) (int, error)

	// AddFile packs the File provided and returns the position of the entry.
	// The Position is set in the stored File.
	//AddFile(File) (int, error)

	// AddEntry packs the Entry provided and returns the position of the entry.
	AddEntry(e Entry) (int, error)
}

// Unpacker describes the methods for reading Entries back out of storage.
type Unpacker interface {
	Next() (*Entry, error)
}

// PackUnpacker is storage that can both pack and unpack Entries.
type PackUnpacker interface {
	Packer
	Unpacker
}

type jsonUnpacker struct {
	r     io.Reader
	b     *bufio.Reader
	isEOF bool
}

func (jup *jsonUnpacker) Next() (*Entry, error) {
	var e Entry
	if jup.isEOF {
		// since ReadBytes() will return read bytes AND an EOF, we handle it this
		// roundabout way so we can Unmarshal the tail with relevant errors, but
		// still get an io.EOF when the stream is ended.
		return nil, io.EOF
	}
	line, err := jup.b.ReadBytes('\n')
	if err != nil && err != io.EOF {
		return nil, err
	} else if err == io.EOF {
		jup.isEOF = true
	}

	err = json.Unmarshal(line, &e)
	if err != nil && jup.isEOF {
		// if the remainder actually _wasn't_ a remaining json structure, then just EOF
		return nil, io.EOF
	}
	return &e, err
}

// NewJsonUnpacker returns an Unpacker that reads each entry (SegmentType and
// FileType) as a json document, one entry per line.
func NewJsonUnpacker(r io.Reader) Unpacker {
	return &jsonUnpacker{
		r: r,
		b: bufio.NewReader(r),
	}
}

type jsonPacker struct {
	w   io.Writer
	e   *json.Encoder
	pos int
}

func (jp *jsonPacker) AddEntry(e Entry) (int, error) {
	e.Position = jp.pos
	err := jp.e.Encode(e)
	if err == nil {
		jp.pos++
	}
	return e.Position, err
}

// NewJsonPacker returns a Packer that writes each entry (SegmentType and
// FileType) as a json document, one entry per line, to the provided io.Writer.
func NewJsonPacker(w io.Writer) Packer {
	return &jsonPacker{
		w: w,
		e: json.NewEncoder(w),
	}
}

58 tar/storage/packer_test.go Normal file

@@ -0,0 +1,58 @@
package storage

import (
	"bytes"
	"io"
	"testing"
)

func TestJsonPackerUnpacker(t *testing.T) {
	e := []Entry{
		Entry{
			Type:    SegmentType,
			Payload: []byte("how"),
		},
		Entry{
			Type:    SegmentType,
			Payload: []byte("y'all"),
		},
		Entry{
			Type:    FileType,
			Name:    "./hurr.txt",
			Payload: []byte("deadbeef"),
		},
		Entry{
			Type:    SegmentType,
			Payload: []byte("doin"),
		},
	}

	buf := []byte{}
	b := bytes.NewBuffer(buf)

	func() {
		jp := NewJsonPacker(b)
		for i := range e {
			if _, err := jp.AddEntry(e[i]); err != nil {
				t.Error(err)
			}
		}
	}()

	t.Logf("%#v", b.String())

	b = bytes.NewBuffer(b.Bytes())
	unpacked := 0
	func() {
		jup := NewJsonUnpacker(b)
		for {
			entry, err := jup.Next()
			if err != nil {
				if err == io.EOF {
					break
				}
				t.Error(err)
			}
			unpacked++
			t.Logf("%#v", entry)
		}
	}()

	if unpacked != len(e) {
		t.Errorf("expected %d entries unpacked, got %d", len(e), unpacked)
	}
}

30 tar/storage/reader.go Normal file

@@ -0,0 +1,30 @@
package storage

import (
	"io"

	"github.com/vbatts/tar-split/archive/tar"
)

// NewReader returns a Reader that wraps the tar stream from r and records its
// entries to the provided Packer.
func NewReader(r io.Reader, p Packer) *Reader {
	return &Reader{
		tr: tar.NewReader(r),
		p:  p,
	}
}

// Reader resembles the tar.Reader struct and is handled the same way, though
// it takes a Packer, which writes the stored records and file info.
type Reader struct {
	tr *tar.Reader
	p  Packer
}

func (r *Reader) Next() (*tar.Header, error) {
	// TODO read RawBytes
	return r.tr.Next()
}

func (r *Reader) Read(b []byte) (i int, e error) {
	return r.tr.Read(b)
}
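
A rough sketch of how the Reader and a Packer are meant to fit together once
the RawBytes TODO above is filled in; the file names are placeholders, the
checksumming step is elided, and the import path is assumed from the
repository layout:

```go
package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"os"

	"github.com/vbatts/tar-split/tar/storage"
)

func main() {
	fh, err := os.Open("archive.tar")
	if err != nil {
		panic(err)
	}
	defer fh.Close()

	packed, err := os.Create("archive.tar.jsonl")
	if err != nil {
		panic(err)
	}
	defer packed.Close()

	// the Packer receives the stored records as the tar stream is walked
	r := storage.NewReader(fh, storage.NewJsonPacker(packed))
	for {
		hdr, err := r.Next()
		if err == io.EOF {
			break
		} else if err != nil {
			panic(err)
		}
		fmt.Println(hdr.Name)
		// drain the file payload; a real flow would checksum it here
		if _, err := io.Copy(ioutil.Discard, r); err != nil {
			panic(err)
		}
	}
}
```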