From 09522d8535d6a3b0f2d7c1bd446ceec919e71da0 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 8 Jan 2015 11:40:41 -0800 Subject: [PATCH] Implement a remote file writer for use with StorageDriver This changeset implements a fileWriter type that can be used to managed writes to remote files in a StorageDriver. Basically, it manages a local seek position for a remote path. An efficient use of this implementation will write data in large blocks. Signed-off-by: Stephen J Day --- storage/filewriter.go | 153 +++++++++++++++++++++++++++++++++++++ storage/filewriter_test.go | 148 +++++++++++++++++++++++++++++++++++ 2 files changed, 301 insertions(+) create mode 100644 storage/filewriter.go create mode 100644 storage/filewriter_test.go diff --git a/storage/filewriter.go b/storage/filewriter.go new file mode 100644 index 00000000..cfa7c93d --- /dev/null +++ b/storage/filewriter.go @@ -0,0 +1,153 @@ +package storage + +import ( + "bytes" + "fmt" + "io" + "os" + + "github.com/docker/distribution/storagedriver" +) + +// fileWriter implements a remote file writer backed by a storage driver. +type fileWriter struct { + driver storagedriver.StorageDriver + + // identifying fields + path string + + // mutable fields + size int64 // size of the file, aka the current end + offset int64 // offset is the current write offset + err error // terminal error, if set, reader is closed +} + +// fileWriterInterface makes the desired io compliant interface that the +// filewriter should implement. +type fileWriterInterface interface { + io.WriteSeeker + io.WriterAt + io.ReaderFrom + io.Closer +} + +var _ fileWriterInterface = &fileWriter{} + +// newFileWriter returns a prepared fileWriter for the driver and path. This +// could be considered similar to an "open" call on a regular filesystem. +func newFileWriter(driver storagedriver.StorageDriver, path string) (*fileWriter, error) { + fw := fileWriter{ + driver: driver, + path: path, + } + + if fi, err := driver.Stat(path); err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + // ignore, offset is zero + default: + return nil, err + } + } else { + if fi.IsDir() { + return nil, fmt.Errorf("cannot write to a directory") + } + + fw.size = fi.Size() + } + + return &fw, nil +} + +// Write writes the buffer p at the current write offset. +func (fw *fileWriter) Write(p []byte) (n int, err error) { + nn, err := fw.readFromAt(bytes.NewReader(p), -1) + return int(nn), err +} + +// WriteAt writes p at the specified offset. The underlying offset does not +// change. +func (fw *fileWriter) WriteAt(p []byte, offset int64) (n int, err error) { + nn, err := fw.readFromAt(bytes.NewReader(p), offset) + return int(nn), err +} + +// ReadFrom reads reader r until io.EOF writing the contents at the current +// offset. +func (fw *fileWriter) ReadFrom(r io.Reader) (n int64, err error) { + return fw.readFromAt(r, -1) +} + +// Seek moves the write position do the requested offest based on the whence +// argument, which can be os.SEEK_CUR, os.SEEK_END, or os.SEEK_SET. +func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) { + if fw.err != nil { + return 0, fw.err + } + + var err error + newOffset := fw.offset + + switch whence { + case os.SEEK_CUR: + newOffset += int64(offset) + case os.SEEK_END: + newOffset = fw.size + int64(offset) + case os.SEEK_SET: + newOffset = int64(offset) + } + + if newOffset < 0 { + err = fmt.Errorf("cannot seek to negative position") + } else if newOffset > fw.size { + fw.offset = newOffset + fw.size = newOffset + } else { + // No problems, set the offset. + fw.offset = newOffset + } + + return fw.offset, err +} + +// Close closes the fileWriter for writing. +func (fw *fileWriter) Close() error { + if fw.err != nil { + return fw.err + } + + fw.err = fmt.Errorf("filewriter@%v: closed", fw.path) + + return fw.err +} + +// readFromAt writes to fw from r at the specified offset. If offset is less +// than zero, the value of fw.offset is used and updated after the operation. +func (fw *fileWriter) readFromAt(r io.Reader, offset int64) (n int64, err error) { + if fw.err != nil { + return 0, fw.err + } + + var updateOffset bool + if offset < 0 { + offset = fw.offset + updateOffset = true + } + + nn, err := fw.driver.WriteStream(fw.path, offset, r) + + if updateOffset { + // We should forward the offset, whether or not there was an error. + // Basically, we keep the filewriter in sync with the reader's head. If an + // error is encountered, the whole thing should be retried but we proceed + // from an expected offset, even if the data didn't make it to the + // backend. + fw.offset += nn + + if fw.offset > fw.size { + fw.size = fw.offset + } + } + + return nn, err +} diff --git a/storage/filewriter_test.go b/storage/filewriter_test.go new file mode 100644 index 00000000..2235462f --- /dev/null +++ b/storage/filewriter_test.go @@ -0,0 +1,148 @@ +package storage + +import ( + "bytes" + "crypto/rand" + "io" + "os" + "testing" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/storagedriver/inmemory" +) + +// TestSimpleWrite takes the fileWriter through common write operations +// ensuring data integrity. +func TestSimpleWrite(t *testing.T) { + content := make([]byte, 1<<20) + n, err := rand.Read(content) + if err != nil { + t.Fatalf("unexpected error building random data: %v", err) + } + + if n != len(content) { + t.Fatalf("random read did't fill buffer") + } + + dgst, err := digest.FromReader(bytes.NewReader(content)) + if err != nil { + t.Fatalf("unexpected error digesting random content: %v", err) + } + + driver := inmemory.New() + path := "/random" + + fw, err := newFileWriter(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileWriter: %v", err) + } + defer fw.Close() + + n, err = fw.Write(content) + if err != nil { + t.Fatalf("unexpected error writing content: %v", err) + } + + if n != len(content) { + t.Fatalf("unexpected write length: %d != %d", n, len(content)) + } + + fr, err := newFileReader(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + verifier := digest.NewDigestVerifier(dgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify write data") + } + + // Check the seek position is equal to the content length + end, err := fw.Seek(0, os.SEEK_END) + if err != nil { + t.Fatalf("unexpected error seeking: %v", err) + } + + if end != int64(len(content)) { + t.Fatalf("write did not advance offset: %d != %d", end, len(content)) + } + + // Double the content, but use the WriteAt method + doubled := append(content, content...) + doubledgst, err := digest.FromReader(bytes.NewReader(doubled)) + if err != nil { + t.Fatalf("unexpected error digesting doubled content: %v", err) + } + + n, err = fw.WriteAt(content, end) + if err != nil { + t.Fatalf("unexpected error writing content at %d: %v", end, err) + } + + if n != len(content) { + t.Fatalf("writeat was short: %d != %d", n, len(content)) + } + + fr, err = newFileReader(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + verifier = digest.NewDigestVerifier(doubledgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify write data") + } + + // Check that WriteAt didn't update the offset. + end, err = fw.Seek(0, os.SEEK_END) + if err != nil { + t.Fatalf("unexpected error seeking: %v", err) + } + + if end != int64(len(content)) { + t.Fatalf("write did not advance offset: %d != %d", end, len(content)) + } + + // Now, we copy from one path to another, running the data through the + // fileReader to fileWriter, rather than the driver.Move command to ensure + // everything is working correctly. + fr, err = newFileReader(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + fw, err = newFileWriter(driver, "/copied") + if err != nil { + t.Fatalf("unexpected error creating fileWriter: %v", err) + } + defer fw.Close() + + nn, err := io.Copy(fw, fr) + if err != nil { + t.Fatalf("unexpected error copying data: %v", err) + } + + if nn != int64(len(doubled)) { + t.Fatalf("unexpected copy length: %d != %d", nn, len(doubled)) + } + + fr, err = newFileReader(driver, "/copied") + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + verifier = digest.NewDigestVerifier(doubledgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify write data") + } +}