diff --git a/circle.yml b/circle.yml index 96fa911a..16a6c817 100644 --- a/circle.yml +++ b/circle.yml @@ -21,8 +21,11 @@ test: - test -z $(gofmt -s -l . | tee /dev/stderr) - go vet ./... - test -z $(golint ./... | tee /dev/stderr) - - go test -race -test.v ./...: - timeout: 600 + - go test -test.v ./... + + # Disabling the race detector due to massive memory usage. + # - go test -race -test.v ./...: + # timeout: 600 # TODO(stevvooe): The following is an attempt at using goveralls but it # just doesn't work. goveralls requires a single profile file to be diff --git a/client/client_test.go b/client/client_test.go index 57578c81..d4a335ec 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -91,7 +91,7 @@ func TestPush(t *testing.T) { } handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMap{ - testutil.RequestResponseMapping{ + { Request: testutil.Request{ Method: "PUT", Route: "/v2/" + name + "/manifest/" + tag, @@ -184,7 +184,7 @@ func TestPull(t *testing.T) { } handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMap{ - testutil.RequestResponseMapping{ + { Request: testutil.Request{ Method: "GET", Route: "/v2/" + name + "/manifest/" + tag, @@ -307,7 +307,7 @@ func TestPullResume(t *testing.T) { for i := 0; i < 3; i++ { layerRequestResponseMappings = append(layerRequestResponseMappings, testutil.RequestResponseMap{ - testutil.RequestResponseMapping{ + { Request: testutil.Request{ Method: "GET", Route: "/v2/" + name + "/manifest/" + tag, diff --git a/cmd/registry-storagedriver-azure/main.go b/cmd/registry-storagedriver-azure/main.go index b9944342..584699bf 100644 --- a/cmd/registry-storagedriver-azure/main.go +++ b/cmd/registry-storagedriver-azure/main.go @@ -1,3 +1,5 @@ +// +build ignore + package main import ( diff --git a/cmd/registry-storagedriver-filesystem/main.go b/cmd/registry-storagedriver-filesystem/main.go index 5ea1eb70..0e555b61 100644 --- a/cmd/registry-storagedriver-filesystem/main.go +++ b/cmd/registry-storagedriver-filesystem/main.go @@ -1,3 +1,5 @@ +// +build ignore + package main import ( diff --git a/cmd/registry-storagedriver-inmemory/main.go b/cmd/registry-storagedriver-inmemory/main.go index 77b1c530..b75d3694 100644 --- a/cmd/registry-storagedriver-inmemory/main.go +++ b/cmd/registry-storagedriver-inmemory/main.go @@ -1,3 +1,5 @@ +// +build ignore + package main import ( diff --git a/cmd/registry-storagedriver-s3/main.go b/cmd/registry-storagedriver-s3/main.go index 21192a0f..e2234b7b 100644 --- a/cmd/registry-storagedriver-s3/main.go +++ b/cmd/registry-storagedriver-s3/main.go @@ -1,3 +1,5 @@ +// +build ignore + package main import ( diff --git a/cmd/registry/main.go b/cmd/registry/main.go index 150c7d6b..29fa24c1 100644 --- a/cmd/registry/main.go +++ b/cmd/registry/main.go @@ -15,7 +15,6 @@ import ( "github.com/docker/docker-registry/configuration" _ "github.com/docker/docker-registry/storagedriver/filesystem" _ "github.com/docker/docker-registry/storagedriver/inmemory" - _ "github.com/docker/docker-registry/storagedriver/s3" ) func main() { diff --git a/storage/filereader.go b/storage/filereader.go index 8f1f5205..bcc2614e 100644 --- a/storage/filereader.go +++ b/storage/filereader.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "time" "github.com/docker/docker-registry/storagedriver" ) @@ -16,8 +17,9 @@ type fileReader struct { driver storagedriver.StorageDriver // identifying fields - path string - size int64 // size is the total layer size, must be set. + path string + size int64 // size is the total layer size, must be set. + modtime time.Time // mutable fields rc io.ReadCloser // remote read closer @@ -28,16 +30,21 @@ type fileReader struct { func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) { // Grab the size of the layer file, ensuring existence. - size, err := driver.CurrentSize(path) + fi, err := driver.Stat(path) if err != nil { return nil, err } + if fi.IsDir() { + return nil, fmt.Errorf("cannot read a directory") + } + return &fileReader{ - driver: driver, - path: path, - size: int64(size), + driver: driver, + path: path, + size: fi.Size(), + modtime: fi.ModTime(), }, nil } @@ -126,7 +133,7 @@ func (fr *fileReader) reader() (io.Reader, error) { } // If we don't have a reader, open one up. - rc, err := fr.driver.ReadStream(fr.path, uint64(fr.offset)) + rc, err := fr.driver.ReadStream(fr.path, fr.offset) if err != nil { return nil, err diff --git a/storage/layerreader.go b/storage/layerreader.go index 2cc184fd..fa2275d9 100644 --- a/storage/layerreader.go +++ b/storage/layerreader.go @@ -11,9 +11,8 @@ import ( type layerReader struct { fileReader - name string // repo name of this layer - digest digest.Digest - createdAt time.Time + name string // repo name of this layer + digest digest.Digest } var _ Layer = &layerReader{} @@ -27,5 +26,5 @@ func (lrs *layerReader) Digest() digest.Digest { } func (lrs *layerReader) CreatedAt() time.Time { - return lrs.createdAt + return lrs.modtime } diff --git a/storage/layerstore.go b/storage/layerstore.go index d731a5b8..ddebdbcc 100644 --- a/storage/layerstore.go +++ b/storage/layerstore.go @@ -1,8 +1,6 @@ package storage import ( - "time" - "github.com/docker/docker-registry/digest" "github.com/docker/docker-registry/storagedriver" ) @@ -55,11 +53,6 @@ func (ls *layerStore) Fetch(name string, digest digest.Digest) (Layer, error) { fileReader: *fr, name: name, digest: digest, - - // TODO(stevvooe): Storage backend does not support modification time - // queries yet. Layers "never" change, so just return the zero value - // plus a nano-second. - createdAt: (time.Time{}).Add(time.Nanosecond), }, nil } diff --git a/storage/layerupload.go b/storage/layerupload.go index de1a894b..3ee593b9 100644 --- a/storage/layerupload.go +++ b/storage/layerupload.go @@ -107,9 +107,13 @@ func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Laye return nil, err } - if err := luc.writeLayer(fp, size, digest); err != nil { + if nn, err := luc.writeLayer(fp, digest); err != nil { // Cleanup? return nil, err + } else if nn != size { + // TODO(stevvooe): Short write. Will have to delete the location and + // report an error. This error needs to be reported to the client. + return nil, fmt.Errorf("short write writing layer") } // Yes! We have written some layer data. Let's make it visible. Link the @@ -281,19 +285,20 @@ func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst d return dgst, nil } -// writeLayer actually writes the the layer file into its final destination. -// The layer should be validated before commencing the write. -func (luc *layerUploadController) writeLayer(fp layerFile, size int64, dgst digest.Digest) error { +// writeLayer actually writes the the layer file into its final destination, +// identified by dgst. The layer should be validated before commencing the +// write. +func (luc *layerUploadController) writeLayer(fp layerFile, dgst digest.Digest) (nn int64, err error) { blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{ digest: dgst, }) if err != nil { - return err + return 0, err } // Check for existence - if _, err := luc.layerStore.driver.CurrentSize(blobPath); err != nil { + if _, err := luc.layerStore.driver.Stat(blobPath); err != nil { // TODO(stevvooe): This check is kind of problematic and very racy. switch err := err.(type) { case storagedriver.PathNotFoundError: @@ -303,22 +308,18 @@ func (luc *layerUploadController) writeLayer(fp layerFile, size int64, dgst dige // content addressable and we should just use this to ensure we // have it written. Although, we do need to verify that the // content that is there is the correct length. - return err + return 0, err } } // Seek our local layer file back now. if _, err := fp.Seek(0, os.SEEK_SET); err != nil { // Cleanup? - return err + return 0, err } // Okay: we can write the file to the blob store. - if err := luc.layerStore.driver.WriteStream(blobPath, 0, uint64(size), fp); err != nil { - return err - } - - return nil + return luc.layerStore.driver.WriteStream(blobPath, 0, fp) } // linkLayer links a valid, written layer blob into the registry under the diff --git a/storage/manifeststore.go b/storage/manifeststore.go index e1760dd8..ebbc6b3c 100644 --- a/storage/manifeststore.go +++ b/storage/manifeststore.go @@ -22,12 +22,21 @@ func (ms *manifestStore) Exists(name, tag string) (bool, error) { return false, err } - size, err := ms.driver.CurrentSize(p) + fi, err := ms.driver.Stat(p) if err != nil { - return false, err + switch err.(type) { + case storagedriver.PathNotFoundError: + return false, nil + default: + return false, err + } } - if size == 0 { + if fi.IsDir() { + return false, fmt.Errorf("unexpected directory at path: %v, name=%s tag=%s", p, name, tag) + } + + if fi.Size() == 0 { return false, nil } diff --git a/storagedriver/azure/azure.go b/storagedriver/azure/azure.go index ba716841..64402f3b 100644 --- a/storagedriver/azure/azure.go +++ b/storagedriver/azure/azure.go @@ -1,3 +1,5 @@ +// +build ignore + // Package azure provides a storagedriver.StorageDriver implementation to // store blobs in Microsoft Azure Blob Storage Service. package azure @@ -103,7 +105,7 @@ func (d *Driver) PutContent(path string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { if ok, err := d.client.BlobExists(d.container, path); err != nil { return nil, err } else if !ok { @@ -115,7 +117,7 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { return nil, err } - if offset >= size { + if offset >= int64(size) { return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} } @@ -129,10 +131,10 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { var ( lastBlockNum int - resumableOffset uint64 + resumableOffset int64 blocks []azure.Block ) @@ -153,12 +155,12 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo return fmt.Errorf("Cannot parse block name as number '%s': %s", lastBlock.Name, err.Error()) } - var totalSize uint64 + var totalSize int64 for _, v := range parts.CommittedBlocks { blocks = append(blocks, azure.Block{ Id: v.Name, Status: azure.BlockStatusCommitted}) - totalSize += uint64(v.Size) + totalSize += int64(v.Size) } // NOTE: Azure driver currently supports only append mode (resumable diff --git a/storagedriver/azure/azure_test.go b/storagedriver/azure/azure_test.go index 888d1165..1edcc1ea 100644 --- a/storagedriver/azure/azure_test.go +++ b/storagedriver/azure/azure_test.go @@ -1,3 +1,5 @@ +// +build ignore + package azure import ( diff --git a/storagedriver/factory/factory.go b/storagedriver/factory/factory.go index 0b85f372..0f8ca001 100644 --- a/storagedriver/factory/factory.go +++ b/storagedriver/factory/factory.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/docker/docker-registry/storagedriver" - "github.com/docker/docker-registry/storagedriver/ipc" ) // driverFactories stores an internal mapping between storage driver names and their respective @@ -41,16 +40,23 @@ func Register(name string, factory StorageDriverFactory) { func Create(name string, parameters map[string]string) (storagedriver.StorageDriver, error) { driverFactory, ok := driverFactories[name] if !ok { + return nil, InvalidStorageDriverError{name} + + // NOTE(stevvooe): We are disabling storagedriver ipc for now, as the + // server and client need to be updated for the changed API calls and + // there were some problems libchan hanging. We'll phase this + // functionality back in over the next few weeks. + // No registered StorageDriverFactory found, try ipc - driverClient, err := ipc.NewDriverClient(name, parameters) - if err != nil { - return nil, InvalidStorageDriverError{name} - } - err = driverClient.Start() - if err != nil { - return nil, err - } - return driverClient, nil + // driverClient, err := ipc.NewDriverClient(name, parameters) + // if err != nil { + // return nil, InvalidStorageDriverError{name} + // } + // err = driverClient.Start() + // if err != nil { + // return nil, err + // } + // return driverClient, nil } return driverFactory.Create(parameters) } diff --git a/storagedriver/fileinfo.go b/storagedriver/fileinfo.go new file mode 100644 index 00000000..82e3d546 --- /dev/null +++ b/storagedriver/fileinfo.go @@ -0,0 +1,79 @@ +package storagedriver + +import "time" + +// FileInfo returns information about a given path. Inspired by os.FileInfo, +// it elides the base name method for a full path instead. +type FileInfo interface { + // Path provides the full path of the target of this file info. + Path() string + + // Size returns current length in bytes of the file. The return value can + // be used to write to the end of the file at path. The value is + // meaningless if IsDir returns true. + Size() int64 + + // ModTime returns the modification time for the file. For backends that + // don't have a modification time, the creation time should be returned. + ModTime() time.Time + + // IsDir returns true if the path is a directory. + IsDir() bool +} + +// NOTE(stevvooe): The next two types, FileInfoFields and FileInfoInternal +// should only be used by storagedriver implementations. They should moved to +// a "driver" package, similar to database/sql. + +// FileInfoFields provides the exported fields for implementing FileInfo +// interface in storagedriver implementations. It should be used with +// InternalFileInfo. +type FileInfoFields struct { + // Path provides the full path of the target of this file info. + Path string + + // Size is current length in bytes of the file. The value of this field + // can be used to write to the end of the file at path. The value is + // meaningless if IsDir is set to true. + Size int64 + + // ModTime returns the modification time for the file. For backends that + // don't have a modification time, the creation time should be returned. + ModTime time.Time + + // IsDir returns true if the path is a directory. + IsDir bool +} + +// FileInfoInternal implements the FileInfo interface. This should only be +// used by storagedriver implementations that don't have a specialized +// FileInfo type. +type FileInfoInternal struct { + FileInfoFields +} + +var _ FileInfo = FileInfoInternal{} +var _ FileInfo = &FileInfoInternal{} + +// Path provides the full path of the target of this file info. +func (fi FileInfoInternal) Path() string { + return fi.FileInfoFields.Path +} + +// Size returns current length in bytes of the file. The return value can +// be used to write to the end of the file at path. The value is +// meaningless if IsDir returns true. +func (fi FileInfoInternal) Size() int64 { + return fi.FileInfoFields.Size +} + +// ModTime returns the modification time for the file. For backends that +// don't have a modification time, the creation time should be returned. +func (fi FileInfoInternal) ModTime() time.Time { + return fi.FileInfoFields.ModTime +} + +// IsDir returns true if the path is a directory. +func (fi FileInfoInternal) IsDir() bool { + return fi.FileInfoFields.IsDir +} diff --git a/storagedriver/filesystem/driver.go b/storagedriver/filesystem/driver.go index a4b2e688..05ec6175 100644 --- a/storagedriver/filesystem/driver.go +++ b/storagedriver/filesystem/driver.go @@ -1,10 +1,13 @@ package filesystem import ( + "bytes" + "fmt" "io" "io/ioutil" "os" "path" + "time" "github.com/docker/docker-registry/storagedriver" "github.com/docker/docker-registry/storagedriver/factory" @@ -49,41 +52,47 @@ func New(rootDirectory string) *Driver { return &Driver{rootDirectory} } -// subPath returns the absolute path of a key within the Driver's storage -func (d *Driver) subPath(subPath string) string { - return path.Join(d.rootDirectory, subPath) -} - // Implement the storagedriver.StorageDriver interface // GetContent retrieves the content stored at "path" as a []byte. func (d *Driver) GetContent(path string) ([]byte, error) { - contents, err := ioutil.ReadFile(d.subPath(path)) + rc, err := d.ReadStream(path, 0) if err != nil { - return nil, storagedriver.PathNotFoundError{Path: path} + return nil, err } - return contents, nil + defer rc.Close() + + p, err := ioutil.ReadAll(rc) + if err != nil { + return nil, err + } + + return p, nil } // PutContent stores the []byte content at a location designated by "path". func (d *Driver) PutContent(subPath string, contents []byte) error { - fullPath := d.subPath(subPath) - parentDir := path.Dir(fullPath) - err := os.MkdirAll(parentDir, 0755) - if err != nil { + if _, err := d.WriteStream(subPath, 0, bytes.NewReader(contents)); err != nil { return err } - err = ioutil.WriteFile(fullPath, contents, 0644) - return err + return os.Truncate(d.fullPath(subPath), int64(len(contents))) } // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { - file, err := os.OpenFile(d.subPath(path), os.O_RDONLY, 0644) +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { + if offset < 0 { + return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} + } + + file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644) if err != nil { - return nil, storagedriver.PathNotFoundError{Path: path} + if os.IsNotExist(err) { + return nil, storagedriver.PathNotFoundError{Path: path} + } + + return nil, err } seekPos, err := file.Seek(int64(offset), os.SEEK_SET) @@ -98,79 +107,64 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { return file, nil } -// WriteStream stores the contents of the provided io.ReadCloser at a location +// WriteStream stores the contents of the provided io.Reader at a location // designated by the given path. -func (d *Driver) WriteStream(subPath string, offset, size uint64, reader io.ReadCloser) error { - defer reader.Close() - - resumableOffset, err := d.CurrentSize(subPath) - if _, pathNotFound := err.(storagedriver.PathNotFoundError); err != nil && !pathNotFound { - return err +func (d *Driver) WriteStream(subPath string, offset int64, reader io.Reader) (nn int64, err error) { + if offset < 0 { + return 0, storagedriver.InvalidOffsetError{Path: subPath, Offset: offset} } - if offset > resumableOffset { - return storagedriver.InvalidOffsetError{Path: subPath, Offset: offset} - } + // TODO(stevvooe): This needs to be a requirement. + // if !path.IsAbs(subPath) { + // return fmt.Errorf("absolute path required: %q", subPath) + // } - fullPath := d.subPath(subPath) + fullPath := d.fullPath(subPath) parentDir := path.Dir(fullPath) - err = os.MkdirAll(parentDir, 0755) + if err := os.MkdirAll(parentDir, 0755); err != nil { + return 0, err + } + + fp, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE, 0644) if err != nil { - return err - } - - var file *os.File - if offset == 0 { - file, err = os.Create(fullPath) - } else { - file, err = os.OpenFile(fullPath, os.O_WRONLY|os.O_APPEND, 0) + // TODO(stevvooe): A few missing conditions in storage driver: + // 1. What if the path is already a directory? + // 2. Should number 1 be exposed explicitly in storagedriver? + // 2. Can this path not exist, even if we create above? + return 0, err } + defer fp.Close() + nn, err = fp.Seek(offset, os.SEEK_SET) if err != nil { - return err + return 0, err } - defer file.Close() - buf := make([]byte, 32*1024) - for { - bytesRead, er := reader.Read(buf) - if bytesRead > 0 { - bytesWritten, ew := file.WriteAt(buf[0:bytesRead], int64(offset)) - if bytesWritten > 0 { - offset += uint64(bytesWritten) - } - if ew != nil { - err = ew - break - } - if bytesRead != bytesWritten { - err = io.ErrShortWrite - break - } - } - if er == io.EOF { - break - } - if er != nil { - err = er - break - } + if nn != offset { + return 0, fmt.Errorf("bad seek to %v, expected %v in fp=%v", offset, nn, fp) } - return err + + return io.Copy(fp, reader) } -// CurrentSize retrieves the curernt size in bytes of the object at the given -// path. -func (d *Driver) CurrentSize(subPath string) (uint64, error) { - fullPath := d.subPath(subPath) +// Stat retrieves the FileInfo for the given path, including the current size +// in bytes and the creation time. +func (d *Driver) Stat(subPath string) (storagedriver.FileInfo, error) { + fullPath := d.fullPath(subPath) - fileInfo, err := os.Stat(fullPath) - if err != nil && !os.IsNotExist(err) { - return 0, err - } else if err != nil { - return 0, storagedriver.PathNotFoundError{Path: subPath} + fi, err := os.Stat(fullPath) + if err != nil { + if os.IsNotExist(err) { + return nil, storagedriver.PathNotFoundError{Path: subPath} + } + + return nil, err } - return uint64(fileInfo.Size()), nil + + return fileInfo{ + path: subPath, + FileInfo: fi, + }, nil } // List returns a list of the objects that are direct descendants of the given @@ -179,7 +173,7 @@ func (d *Driver) List(subPath string) ([]string, error) { if subPath[len(subPath)-1] != '/' { subPath += "/" } - fullPath := d.subPath(subPath) + fullPath := d.fullPath(subPath) dir, err := os.Open(fullPath) if err != nil { @@ -202,8 +196,8 @@ func (d *Driver) List(subPath string) ([]string, error) { // Move moves an object stored at sourcePath to destPath, removing the original // object. func (d *Driver) Move(sourcePath string, destPath string) error { - source := d.subPath(sourcePath) - dest := d.subPath(destPath) + source := d.fullPath(sourcePath) + dest := d.fullPath(destPath) if _, err := os.Stat(source); os.IsNotExist(err) { return storagedriver.PathNotFoundError{Path: sourcePath} @@ -215,7 +209,7 @@ func (d *Driver) Move(sourcePath string, destPath string) error { // Delete recursively deletes all objects stored at "path" and its subpaths. func (d *Driver) Delete(subPath string) error { - fullPath := d.subPath(subPath) + fullPath := d.fullPath(subPath) _, err := os.Stat(fullPath) if err != nil && !os.IsNotExist(err) { @@ -227,3 +221,42 @@ func (d *Driver) Delete(subPath string) error { err = os.RemoveAll(fullPath) return err } + +// fullPath returns the absolute path of a key within the Driver's storage. +func (d *Driver) fullPath(subPath string) string { + return path.Join(d.rootDirectory, subPath) +} + +type fileInfo struct { + os.FileInfo + path string +} + +var _ storagedriver.FileInfo = fileInfo{} + +// Path provides the full path of the target of this file info. +func (fi fileInfo) Path() string { + return fi.path +} + +// Size returns current length in bytes of the file. The return value can +// be used to write to the end of the file at path. The value is +// meaningless if IsDir returns true. +func (fi fileInfo) Size() int64 { + if fi.IsDir() { + return 0 + } + + return fi.FileInfo.Size() +} + +// ModTime returns the modification time for the file. For backends that +// don't have a modification time, the creation time should be returned. +func (fi fileInfo) ModTime() time.Time { + return fi.FileInfo.ModTime() +} + +// IsDir returns true if the path is a directory. +func (fi fileInfo) IsDir() bool { + return fi.FileInfo.IsDir() +} diff --git a/storagedriver/filesystem/driver_test.go b/storagedriver/filesystem/driver_test.go index 1d9bac54..0965daa4 100644 --- a/storagedriver/filesystem/driver_test.go +++ b/storagedriver/filesystem/driver_test.go @@ -1,6 +1,7 @@ package filesystem import ( + "io/ioutil" "os" "testing" @@ -13,12 +14,16 @@ import ( func Test(t *testing.T) { TestingT(t) } func init() { - rootDirectory := "/tmp/driver" - os.RemoveAll(rootDirectory) - - filesystemDriverConstructor := func() (storagedriver.StorageDriver, error) { - return New(rootDirectory), nil + root, err := ioutil.TempDir("", "driver-") + if err != nil { + panic(err) } - testsuites.RegisterInProcessSuite(filesystemDriverConstructor, testsuites.NeverSkip) - testsuites.RegisterIPCSuite(driverName, map[string]string{"rootdirectory": rootDirectory}, testsuites.NeverSkip) + defer os.Remove(root) + + testsuites.RegisterInProcessSuite(func() (storagedriver.StorageDriver, error) { + return New(root), nil + }, testsuites.NeverSkip) + + // BUG(stevvooe): IPC is broken so we're disabling for now. Will revisit later. + // testsuites.RegisterIPCSuite(driverName, map[string]string{"rootdirectory": root}, testsuites.NeverSkip) } diff --git a/storagedriver/inmemory/driver.go b/storagedriver/inmemory/driver.go index 98b068e9..0b68e021 100644 --- a/storagedriver/inmemory/driver.go +++ b/storagedriver/inmemory/driver.go @@ -5,9 +5,9 @@ import ( "fmt" "io" "io/ioutil" - "regexp" "strings" "sync" + "time" "github.com/docker/docker-registry/storagedriver" "github.com/docker/docker-registry/storagedriver/factory" @@ -29,13 +29,18 @@ func (factory *inMemoryDriverFactory) Create(parameters map[string]string) (stor // Driver is a storagedriver.StorageDriver implementation backed by a local map. // Intended solely for example and testing purposes. type Driver struct { - storage map[string][]byte - mutex sync.RWMutex + root *dir + mutex sync.RWMutex } // New constructs a new Driver. func New() *Driver { - return &Driver{storage: make(map[string][]byte)} + return &Driver{root: &dir{ + common: common{ + p: "/", + mod: time.Now(), + }, + }} } // Implement the storagedriver.StorageDriver interface. @@ -44,106 +49,141 @@ func New() *Driver { func (d *Driver) GetContent(path string) ([]byte, error) { d.mutex.RLock() defer d.mutex.RUnlock() - contents, ok := d.storage[path] - if !ok { - return nil, storagedriver.PathNotFoundError{Path: path} + + rc, err := d.ReadStream(path, 0) + if err != nil { + return nil, err } - return contents, nil + defer rc.Close() + + return ioutil.ReadAll(rc) } // PutContent stores the []byte content at a location designated by "path". -func (d *Driver) PutContent(path string, contents []byte) error { +func (d *Driver) PutContent(p string, contents []byte) error { d.mutex.Lock() defer d.mutex.Unlock() - d.storage[path] = contents + + f, err := d.root.mkfile(p) + if err != nil { + // TODO(stevvooe): Again, we need to clarify when this is not a + // directory in StorageDriver API. + return fmt.Errorf("not a file") + } + + f.truncate() + f.WriteAt(contents, 0) + return nil } // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { d.mutex.RLock() defer d.mutex.RUnlock() - contents, err := d.GetContent(path) - if err != nil { - return nil, err - } else if len(contents) <= int(offset) { + + if offset < 0 { return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} } - src := contents[offset:] - buf := make([]byte, len(src)) - copy(buf, src) - return ioutil.NopCloser(bytes.NewReader(buf)), nil + path = d.normalize(path) + found := d.root.find(path) + + if found.path() != path { + return nil, storagedriver.PathNotFoundError{Path: path} + } + + if found.isdir() { + return nil, fmt.Errorf("%q is a directory", path) + } + + return ioutil.NopCloser(found.(*file).sectionReader(offset)), nil } // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { - defer reader.Close() - d.mutex.RLock() - defer d.mutex.RUnlock() +func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error) { + d.mutex.Lock() + defer d.mutex.Unlock() - resumableOffset, err := d.CurrentSize(path) + if offset < 0 { + return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset} + } + + normalized := d.normalize(path) + + f, err := d.root.mkfile(normalized) if err != nil { - return err + return 0, fmt.Errorf("not a file") } - if offset > resumableOffset { - return storagedriver.InvalidOffsetError{Path: path, Offset: offset} - } + var buf bytes.Buffer - contents, err := ioutil.ReadAll(reader) + nn, err = buf.ReadFrom(reader) if err != nil { - return err + // TODO(stevvooe): This condition is odd and we may need to clarify: + // we've read nn bytes from reader but have written nothing to the + // backend. What is the correct return value? Really, the caller needs + // to know that the reader has been advanced and reattempting the + // operation is incorrect. + return nn, err } - if offset > 0 { - contents = append(d.storage[path][0:offset], contents...) - } - - d.storage[path] = contents - return nil + f.WriteAt(buf.Bytes(), offset) + return nn, err } -// CurrentSize retrieves the curernt size in bytes of the object at the given -// path. -func (d *Driver) CurrentSize(path string) (uint64, error) { +// Stat returns info about the provided path. +func (d *Driver) Stat(path string) (storagedriver.FileInfo, error) { d.mutex.RLock() defer d.mutex.RUnlock() - contents, ok := d.storage[path] - if !ok { - return 0, nil + + normalized := d.normalize(path) + found := d.root.find(path) + + if found.path() != normalized { + return nil, storagedriver.PathNotFoundError{Path: path} } - return uint64(len(contents)), nil + + fi := storagedriver.FileInfoFields{ + Path: path, + IsDir: found.isdir(), + ModTime: found.modtime(), + } + + if !fi.IsDir { + fi.Size = int64(len(found.(*file).data)) + } + + return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil } // List returns a list of the objects that are direct descendants of the given // path. func (d *Driver) List(path string) ([]string, error) { - if path[len(path)-1] != '/' { - path += "/" - } - subPathMatcher, err := regexp.Compile(fmt.Sprintf("^%s[^/]+", path)) - if err != nil { - return nil, err + normalized := d.normalize(path) + + found := d.root.find(normalized) + + if !found.isdir() { + return nil, fmt.Errorf("not a directory") // TODO(stevvooe): Need error type for this... } - d.mutex.RLock() - defer d.mutex.RUnlock() - // we use map to collect unique keys - keySet := make(map[string]struct{}) - for k := range d.storage { - if key := subPathMatcher.FindString(k); key != "" { - keySet[key] = struct{}{} + entries, err := found.(*dir).list(normalized) + + if err != nil { + switch err { + case errNotExists: + return nil, storagedriver.PathNotFoundError{Path: path} + case errIsNotDir: + return nil, fmt.Errorf("not a directory") + default: + return nil, err } } - keys := make([]string, 0, len(keySet)) - for k := range keySet { - keys = append(keys, k) - } - return keys, nil + return entries, nil } // Move moves an object stored at sourcePath to destPath, removing the original @@ -151,32 +191,37 @@ func (d *Driver) List(path string) ([]string, error) { func (d *Driver) Move(sourcePath string, destPath string) error { d.mutex.Lock() defer d.mutex.Unlock() - contents, ok := d.storage[sourcePath] - if !ok { - return storagedriver.PathNotFoundError{Path: sourcePath} + + normalizedSrc, normalizedDst := d.normalize(sourcePath), d.normalize(destPath) + + err := d.root.move(normalizedSrc, normalizedDst) + switch err { + case errNotExists: + return storagedriver.PathNotFoundError{Path: destPath} + default: + return err } - d.storage[destPath] = contents - delete(d.storage, sourcePath) - return nil } // Delete recursively deletes all objects stored at "path" and its subpaths. func (d *Driver) Delete(path string) error { d.mutex.Lock() defer d.mutex.Unlock() - var subPaths []string - for k := range d.storage { - if strings.HasPrefix(k, path) { - subPaths = append(subPaths, k) - } - } - if len(subPaths) == 0 { + normalized := d.normalize(path) + + err := d.root.delete(normalized) + switch err { + case errNotExists: return storagedriver.PathNotFoundError{Path: path} + default: + return err } - - for _, subPath := range subPaths { - delete(d.storage, subPath) - } - return nil +} + +func (d *Driver) normalize(p string) string { + if !strings.HasPrefix(p, "/") { + p = "/" + p // Ghetto path absolution. + } + return p } diff --git a/storagedriver/inmemory/driver_test.go b/storagedriver/inmemory/driver_test.go index 87549542..6a4b3697 100644 --- a/storagedriver/inmemory/driver_test.go +++ b/storagedriver/inmemory/driver_test.go @@ -17,5 +17,8 @@ func init() { return New(), nil } testsuites.RegisterInProcessSuite(inmemoryDriverConstructor, testsuites.NeverSkip) - testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip) + + // BUG(stevvooe): Disable flaky IPC tests for now when we can troubleshoot + // the problems with libchan. + // testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip) } diff --git a/storagedriver/inmemory/mfs.go b/storagedriver/inmemory/mfs.go new file mode 100644 index 00000000..5248bbc6 --- /dev/null +++ b/storagedriver/inmemory/mfs.go @@ -0,0 +1,329 @@ +package inmemory + +import ( + "fmt" + "io" + "path" + "sort" + "strings" + "time" +) + +var ( + errExists = fmt.Errorf("exists") + errNotExists = fmt.Errorf("exists") + errIsNotDir = fmt.Errorf("notdir") + errIsDir = fmt.Errorf("isdir") +) + +type node interface { + name() string + path() string + isdir() bool + modtime() time.Time +} + +// dir is the central type for the memory-based storagedriver. All operations +// are dispatched from a root dir. +type dir struct { + common + + // TODO(stevvooe): Use sorted slice + search. + children map[string]node +} + +var _ node = &dir{} + +func (d *dir) isdir() bool { + return true +} + +// add places the node n into dir d. +func (d *dir) add(n node) { + if d.children == nil { + d.children = make(map[string]node) + } + + d.children[n.name()] = n + d.mod = time.Now() +} + +// find searches for the node, given path q in dir. If the node is found, it +// will be returned. If the node is not found, the closet existing parent. If +// the node is found, the returned (node).path() will match q. +func (d *dir) find(q string) node { + q = strings.Trim(q, "/") + i := strings.Index(q, "/") + + if q == "" { + return d + } + + if i == 0 { + panic("shouldn't happen, no root paths") + } + + var component string + if i < 0 { + // No more path components + component = q + } else { + component = q[:i] + } + + child, ok := d.children[component] + if !ok { + // Node was not found. Return p and the current node. + return d + } + + if child.isdir() { + // traverse down! + q = q[i+1:] + return child.(*dir).find(q) + } + + return child +} + +func (d *dir) list(p string) ([]string, error) { + n := d.find(p) + + if n.path() != p { + return nil, errNotExists + } + + if !n.isdir() { + return nil, errIsNotDir + } + + var children []string + for _, child := range n.(*dir).children { + children = append(children, child.path()) + } + + sort.Strings(children) + return children, nil +} + +// mkfile or return the existing one. returns an error if it exists and is a +// directory. Essentially, this is open or create. +func (d *dir) mkfile(p string) (*file, error) { + n := d.find(p) + if n.path() == p { + if n.isdir() { + return nil, errIsDir + } + + return n.(*file), nil + } + + dirpath, filename := path.Split(p) + // Make any non-existent directories + n, err := d.mkdirs(dirpath) + if err != nil { + return nil, err + } + + dd := n.(*dir) + n = &file{ + common: common{ + p: path.Join(dd.path(), filename), + mod: time.Now(), + }, + } + + dd.add(n) + return n.(*file), nil +} + +// mkdirs creates any missing directory entries in p and returns the result. +func (d *dir) mkdirs(p string) (*dir, error) { + if p == "" { + p = "/" + } + + n := d.find(p) + + if !n.isdir() { + // Found something there + return nil, errIsNotDir + } + + if n.path() == p { + return n.(*dir), nil + } + + dd := n.(*dir) + + relative := strings.Trim(strings.TrimPrefix(p, n.path()), "/") + + if relative == "" { + return dd, nil + } + + components := strings.Split(relative, "/") + for _, component := range components { + d, err := dd.mkdir(component) + + if err != nil { + // This should actually never happen, since there are no children. + return nil, err + } + dd = d + } + + return dd, nil +} + +// mkdir creates a child directory under d with the given name. +func (d *dir) mkdir(name string) (*dir, error) { + if name == "" { + return nil, fmt.Errorf("invalid dirname") + } + + _, ok := d.children[name] + if ok { + return nil, errExists + } + + child := &dir{ + common: common{ + p: path.Join(d.path(), name), + mod: time.Now(), + }, + } + d.add(child) + d.mod = time.Now() + + return child, nil +} + +func (d *dir) move(src, dst string) error { + dstDirname, _ := path.Split(dst) + + dp, err := d.mkdirs(dstDirname) + if err != nil { + return err + } + + srcDirname, srcFilename := path.Split(src) + sp := d.find(srcDirname) + + if sp.path() != srcDirname { + return errNotExists + } + + s, ok := sp.(*dir).children[srcFilename] + if !ok { + return errNotExists + } + + delete(sp.(*dir).children, srcFilename) + + switch n := s.(type) { + case *dir: + n.p = dst + case *file: + n.p = dst + } + + dp.add(s) + + return nil +} + +func (d *dir) delete(p string) error { + dirname, filename := path.Split(p) + parent := d.find(dirname) + + if dirname != parent.path() { + return errNotExists + } + + if _, ok := parent.(*dir).children[filename]; !ok { + return errNotExists + } + + delete(parent.(*dir).children, filename) + return nil +} + +// dump outputs a primitive directory structure to stdout. +func (d *dir) dump(indent string) { + fmt.Println(indent, d.name()+"/") + + for _, child := range d.children { + if child.isdir() { + child.(*dir).dump(indent + "\t") + } else { + fmt.Println(indent, child.name()) + } + + } +} + +func (d *dir) String() string { + return fmt.Sprintf("&dir{path: %v, children: %v}", d.p, d.children) +} + +// file stores actual data in the fs tree. It acts like an open, seekable file +// where operations are conducted through ReadAt and WriteAt. Use it with +// SectionReader for the best effect. +type file struct { + common + data []byte +} + +var _ node = &file{} + +func (f *file) isdir() bool { + return false +} + +func (f *file) truncate() { + f.data = f.data[:0] +} + +func (f *file) sectionReader(offset int64) io.Reader { + return io.NewSectionReader(f, offset, int64(len(f.data))-offset) +} + +func (f *file) ReadAt(p []byte, offset int64) (n int, err error) { + return copy(p, f.data[offset:]), nil +} + +func (f *file) WriteAt(p []byte, offset int64) (n int, err error) { + if len(f.data) > 0 && offset >= int64(len(f.data)) { + // Extend missing region with a zero pad, while also preallocating out to size of p. + pad := offset - int64(len(f.data)) + size := len(p) + int(pad) + f.data = append(f.data, make([]byte, pad, size)...) + } + + f.data = append(f.data, p...) + return len(p), nil +} + +func (f *file) String() string { + return fmt.Sprintf("&file{path: %q}", f.p) +} + +// common provides shared fields and methods for node implementations. +type common struct { + p string + mod time.Time +} + +func (c *common) name() string { + _, name := path.Split(c.p) + return name +} + +func (c *common) path() string { + return c.p +} + +func (c *common) modtime() time.Time { + return c.mod +} diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go index c77797eb..2dc5c44e 100644 --- a/storagedriver/ipc/client.go +++ b/storagedriver/ipc/client.go @@ -1,3 +1,5 @@ +// +build ignore + package ipc import ( @@ -234,7 +236,7 @@ func (driver *StorageDriverClient) PutContent(path string, contents []byte) erro // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (driver *StorageDriverClient) ReadStream(path string, offset int64) (io.ReadCloser, error) { if err := driver.exited(); err != nil { return nil, err } @@ -261,7 +263,7 @@ func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.Re // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (driver *StorageDriverClient) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { if err := driver.exited(); err != nil { return err } diff --git a/storagedriver/ipc/ipc.go b/storagedriver/ipc/ipc.go index 82bdcbd7..45c54659 100644 --- a/storagedriver/ipc/ipc.go +++ b/storagedriver/ipc/ipc.go @@ -1,3 +1,5 @@ +// +build ignore + package ipc import ( diff --git a/storagedriver/ipc/server.go b/storagedriver/ipc/server.go index 7d1876ca..fa0077a8 100644 --- a/storagedriver/ipc/server.go +++ b/storagedriver/ipc/server.go @@ -1,3 +1,5 @@ +// +build ignore + package ipc import ( @@ -100,7 +102,7 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { case "ReadStream": path, _ := request.Parameters["Path"].(string) // Depending on serialization method, Offset may be convereted to any int/uint type - offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(uint64(0))).Uint() + offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int() reader, err := driver.ReadStream(path, offset) var response ReadStreamResponse if err != nil { @@ -115,9 +117,9 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { case "WriteStream": path, _ := request.Parameters["Path"].(string) // Depending on serialization method, Offset may be convereted to any int/uint type - offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(uint64(0))).Uint() + offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int() // Depending on serialization method, Size may be convereted to any int/uint type - size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(uint64(0))).Uint() + size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(int64(0))).Int() reader, _ := request.Parameters["Reader"].(io.ReadCloser) err := driver.WriteStream(path, offset, size, reader) response := WriteStreamResponse{ diff --git a/storagedriver/s3/s3.go b/storagedriver/s3/s3.go index def03e3e..e26d3be2 100644 --- a/storagedriver/s3/s3.go +++ b/storagedriver/s3/s3.go @@ -1,3 +1,5 @@ +// +build ignore + package s3 import ( @@ -17,7 +19,7 @@ const driverName = "s3" // minChunkSize defines the minimum multipart upload chunk size // S3 API requires multipart upload chunks to be at least 5MB -const minChunkSize = uint64(5 * 1024 * 1024) +const minChunkSize = 5 * 1024 * 1024 // listPartsMax is the largest amount of parts you can request from S3 const listPartsMax = 1000 @@ -120,9 +122,9 @@ func (d *Driver) PutContent(path string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { +func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { headers := make(http.Header) - headers.Add("Range", "bytes="+strconv.FormatUint(offset, 10)+"-") + headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-") resp, err := d.Bucket.GetResponseWithHeaders(path, headers) if err != nil { @@ -133,22 +135,22 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { +func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { defer reader.Close() - chunkSize := minChunkSize + chunkSize := int64(minChunkSize) for size/chunkSize >= listPartsMax { chunkSize *= 2 } partNumber := 1 - totalRead := uint64(0) + var totalRead int64 multi, parts, err := d.getAllParts(path) if err != nil { return err } - if (offset) > uint64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) { + if (offset) > int64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) { return storagedriver.InvalidOffsetError{Path: path, Offset: offset} } @@ -161,11 +163,11 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo buf := make([]byte, chunkSize) for { bytesRead, err := io.ReadFull(reader, buf) - totalRead += uint64(bytesRead) + totalRead += int64(bytesRead) if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { return err - } else if (uint64(bytesRead) < chunkSize) && totalRead != size { + } else if (int64(bytesRead) < chunkSize) && totalRead != size { break } else { part, err := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:bytesRead])) diff --git a/storagedriver/s3/s3_test.go b/storagedriver/s3/s3_test.go index 6d7b3ff7..f7b4f80e 100644 --- a/storagedriver/s3/s3_test.go +++ b/storagedriver/s3/s3_test.go @@ -1,3 +1,5 @@ +// +build ignore + package s3 import ( diff --git a/storagedriver/storagedriver.go b/storagedriver/storagedriver.go index 1b6c5c00..339b465a 100644 --- a/storagedriver/storagedriver.go +++ b/storagedriver/storagedriver.go @@ -44,7 +44,7 @@ type StorageDriver interface { // ReadStream retrieves an io.ReadCloser for the content stored at "path" // with a given byte offset. // May be used to resume reading a stream by providing a nonzero offset. - ReadStream(path string, offset uint64) (io.ReadCloser, error) + ReadStream(path string, offset int64) (io.ReadCloser, error) // WriteStream stores the contents of the provided io.ReadCloser at a // location designated by the given path. @@ -52,12 +52,11 @@ type StorageDriver interface { // "size" bytes. // May be used to resume writing a stream by providing a nonzero offset. // The offset must be no larger than the CurrentSize for this path. - WriteStream(path string, offset, size uint64, readCloser io.ReadCloser) error + WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error) - // CurrentSize retrieves the curernt size in bytes of the object at the - // given path. - // It should be safe to read or write anywhere up to this point. - CurrentSize(path string) (uint64, error) + // Stat retrieves the FileInfo for the given path, including the current + // size in bytes and the creation time. + Stat(path string) (FileInfo, error) // List returns a list of the objects that are direct descendants of the //given path. @@ -86,7 +85,7 @@ func (err PathNotFoundError) Error() string { // invalid offset. type InvalidOffsetError struct { Path string - Offset uint64 + Offset int64 } func (err InvalidOffsetError) Error() string { diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go index 61756667..f745781e 100644 --- a/storagedriver/testsuites/testsuites.go +++ b/storagedriver/testsuites/testsuites.go @@ -2,15 +2,17 @@ package testsuites import ( "bytes" + "io" "io/ioutil" "math/rand" "os" "path" "sort" + "sync" "testing" + "time" "github.com/docker/docker-registry/storagedriver" - "github.com/docker/docker-registry/storagedriver/ipc" "gopkg.in/check.v1" ) @@ -30,29 +32,34 @@ func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipC // RegisterIPCSuite registers a storage driver test suite which runs the named // driver as a child process with the given parameters. func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck SkipCheck) { - suite := &DriverSuite{ - Constructor: func() (storagedriver.StorageDriver, error) { - d, err := ipc.NewDriverClient(driverName, ipcParams) - if err != nil { - return nil, err - } - err = d.Start() - if err != nil { - return nil, err - } - return d, nil - }, - SkipCheck: skipCheck, - } - suite.Teardown = func() error { - if suite.StorageDriver == nil { - return nil - } + panic("ipc testing is disabled for now") - driverClient := suite.StorageDriver.(*ipc.StorageDriverClient) - return driverClient.Stop() - } - check.Suite(suite) + // NOTE(stevvooe): IPC testing is disabled for now. Uncomment the code + // block before and remove the panic when we phase it back in. + + // suite := &DriverSuite{ + // Constructor: func() (storagedriver.StorageDriver, error) { + // d, err := ipc.NewDriverClient(driverName, ipcParams) + // if err != nil { + // return nil, err + // } + // err = d.Start() + // if err != nil { + // return nil, err + // } + // return d, nil + // }, + // SkipCheck: skipCheck, + // } + // suite.Teardown = func() error { + // if suite.StorageDriver == nil { + // return nil + // } + + // driverClient := suite.StorageDriver.(*ipc.StorageDriverClient) + // return driverClient.Stop() + // } + // check.Suite(suite) } // SkipCheck is a function used to determine if a test suite should be skipped. @@ -167,52 +174,13 @@ func (suite *DriverSuite) TestWriteReadStreams4(c *check.C) { suite.writeReadCompareStreams(c, filename, contents) } -// TestContinueStreamAppend tests that a stream write can be appended to without -// corrupting the data. -func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) { - filename := randomString(32) - defer suite.StorageDriver.Delete(filename) - - chunkSize := uint64(10 * 1024 * 1024) - - contentsChunk1 := []byte(randomString(chunkSize)) - contentsChunk2 := []byte(randomString(chunkSize)) - contentsChunk3 := []byte(randomString(chunkSize)) - - fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...) - - err := suite.StorageDriver.WriteStream(filename, 0, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(contentsChunk1))) - c.Assert(err, check.IsNil) - - offset, err := suite.StorageDriver.CurrentSize(filename) - c.Assert(err, check.IsNil) - if offset > chunkSize { - c.Fatalf("Offset too large, %d > %d", offset, chunkSize) - } - err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:2*chunkSize]))) - c.Assert(err, check.IsNil) - - offset, err = suite.StorageDriver.CurrentSize(filename) - c.Assert(err, check.IsNil) - if offset > 2*chunkSize { - c.Fatalf("Offset too large, %d > %d", offset, 2*chunkSize) - } - - err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:]))) - c.Assert(err, check.IsNil) - - received, err := suite.StorageDriver.GetContent(filename) - c.Assert(err, check.IsNil) - c.Assert(received, check.DeepEquals, fullContents) -} - // TestReadStreamWithOffset tests that the appropriate data is streamed when // reading with a given offset. func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { filename := randomString(32) defer suite.StorageDriver.Delete(filename) - chunkSize := uint64(32) + chunkSize := int64(32) contentsChunk1 := []byte(randomString(chunkSize)) contentsChunk2 := []byte(randomString(chunkSize)) @@ -245,8 +213,125 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { readContents, err = ioutil.ReadAll(reader) c.Assert(err, check.IsNil) - c.Assert(readContents, check.DeepEquals, contentsChunk3) + + // Ensure we get invalid offest for negative offsets. + reader, err = suite.StorageDriver.ReadStream(filename, -1) + c.Assert(err, check.FitsTypeOf, storagedriver.InvalidOffsetError{}) + c.Assert(err.(storagedriver.InvalidOffsetError).Offset, check.Equals, int64(-1)) + c.Assert(err.(storagedriver.InvalidOffsetError).Path, check.Equals, filename) + c.Assert(reader, check.IsNil) + + // Read past the end of the content and make sure we get a reader that + // returns 0 bytes and io.EOF + reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*3) + c.Assert(err, check.IsNil) + defer reader.Close() + + buf := make([]byte, chunkSize) + n, err := reader.Read(buf) + c.Assert(err, check.Equals, io.EOF) + c.Assert(n, check.Equals, 0) + + // Check the N-1 boundary condition, ensuring we get 1 byte then io.EOF. + reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*3-1) + c.Assert(err, check.IsNil) + defer reader.Close() + + n, err = reader.Read(buf) + c.Assert(n, check.Equals, 1) + + // We don't care whether the io.EOF comes on the this read or the first + // zero read, but the only error acceptable here is io.EOF. + if err != nil { + c.Assert(err, check.Equals, io.EOF) + } + + // Any more reads should result in zero bytes and io.EOF + n, err = reader.Read(buf) + c.Assert(n, check.Equals, 0) + c.Assert(err, check.Equals, io.EOF) +} + +// TestContinueStreamAppend tests that a stream write can be appended to without +// corrupting the data. +func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) { + filename := randomString(32) + defer suite.StorageDriver.Delete(filename) + + chunkSize := int64(10 * 1024 * 1024) + + contentsChunk1 := []byte(randomString(chunkSize)) + contentsChunk2 := []byte(randomString(chunkSize)) + contentsChunk3 := []byte(randomString(chunkSize)) + contentsChunk4 := []byte(randomString(chunkSize)) + zeroChunk := make([]byte, int64(chunkSize)) + + fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...) + + nn, err := suite.StorageDriver.WriteStream(filename, 0, bytes.NewReader(contentsChunk1)) + c.Assert(err, check.IsNil) + c.Assert(nn, check.Equals, int64(len(contentsChunk1))) + + fi, err := suite.StorageDriver.Stat(filename) + c.Assert(err, check.IsNil) + c.Assert(fi, check.NotNil) + c.Assert(fi.Size(), check.Equals, int64(len(contentsChunk1))) + + if fi.Size() > chunkSize { + c.Fatalf("Offset too large, %d > %d", fi.Size(), chunkSize) + } + nn, err = suite.StorageDriver.WriteStream(filename, fi.Size(), bytes.NewReader(contentsChunk2)) + c.Assert(err, check.IsNil) + c.Assert(nn, check.Equals, int64(len(contentsChunk2))) + + fi, err = suite.StorageDriver.Stat(filename) + c.Assert(err, check.IsNil) + c.Assert(fi, check.NotNil) + c.Assert(fi.Size(), check.Equals, 2*chunkSize) + + if fi.Size() > 2*chunkSize { + c.Fatalf("Offset too large, %d > %d", fi.Size(), 2*chunkSize) + } + + nn, err = suite.StorageDriver.WriteStream(filename, fi.Size(), bytes.NewReader(fullContents[fi.Size():])) + c.Assert(err, check.IsNil) + c.Assert(nn, check.Equals, int64(len(fullContents[fi.Size():]))) + + received, err := suite.StorageDriver.GetContent(filename) + c.Assert(err, check.IsNil) + c.Assert(received, check.DeepEquals, fullContents) + + // Writing past size of file extends file (no offest error). We would like + // to write chunk 4 one chunk length past chunk 3. It should be successful + // and the resulting file will be 5 chunks long, with a chunk of all + // zeros. + + fullContents = append(fullContents, zeroChunk...) + fullContents = append(fullContents, contentsChunk4...) + + nn, err = suite.StorageDriver.WriteStream(filename, int64(len(fullContents))-chunkSize, bytes.NewReader(contentsChunk4)) + c.Assert(err, check.IsNil) + c.Assert(nn, check.Equals, chunkSize) + + fi, err = suite.StorageDriver.Stat(filename) + c.Assert(err, check.IsNil) + c.Assert(fi, check.NotNil) + c.Assert(fi.Size(), check.Equals, int64(len(fullContents))) + + received, err = suite.StorageDriver.GetContent(filename) + c.Assert(err, check.IsNil) + c.Assert(len(received), check.Equals, len(fullContents)) + c.Assert(received[chunkSize*3:chunkSize*4], check.DeepEquals, zeroChunk) + c.Assert(received[chunkSize*4:chunkSize*5], check.DeepEquals, contentsChunk4) + c.Assert(received, check.DeepEquals, fullContents) + + // Ensure that negative offsets return correct error. + nn, err = suite.StorageDriver.WriteStream(filename, -1, bytes.NewReader(zeroChunk)) + c.Assert(err, check.NotNil) + c.Assert(err, check.FitsTypeOf, storagedriver.InvalidOffsetError{}) + c.Assert(err.(storagedriver.InvalidOffsetError).Path, check.Equals, filename) + c.Assert(err.(storagedriver.InvalidOffsetError).Offset, check.Equals, int64(-1)) } // TestReadNonexistentStream tests that reading a stream for a nonexistent path @@ -260,13 +345,13 @@ func (suite *DriverSuite) TestReadNonexistentStream(c *check.C) { // TestList checks the returned list of keys after populating a directory tree. func (suite *DriverSuite) TestList(c *check.C) { - rootDirectory := "/" + randomString(uint64(8+rand.Intn(8))) + rootDirectory := "/" + randomString(int64(8+rand.Intn(8))) defer suite.StorageDriver.Delete(rootDirectory) - parentDirectory := rootDirectory + "/" + randomString(uint64(8+rand.Intn(8))) + parentDirectory := rootDirectory + "/" + randomString(int64(8+rand.Intn(8))) childFiles := make([]string, 50) for i := 0; i < len(childFiles); i++ { - childFile := parentDirectory + "/" + randomString(uint64(8+rand.Intn(8))) + childFile := parentDirectory + "/" + randomString(int64(8+rand.Intn(8))) childFiles[i] = childFile err := suite.StorageDriver.PutContent(childFile, []byte(randomString(32))) c.Assert(err, check.IsNil) @@ -286,6 +371,11 @@ func (suite *DriverSuite) TestList(c *check.C) { sort.Strings(keys) c.Assert(keys, check.DeepEquals, childFiles) + + // A few checks to add here (check out #819 for more discussion on this): + // 1. Ensure that all paths are absolute. + // 2. Ensure that listings only include direct children. + // 3. Ensure that we only respond to directory listings that end with a slash (maybe?). } // TestMove checks that a moved object no longer exists at the source path and @@ -378,21 +468,75 @@ func (suite *DriverSuite) TestDeleteFolder(c *check.C) { c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) } +// TestStatCall runs verifies the implementation of the storagedriver's Stat call. +func (suite *DriverSuite) TestStatCall(c *check.C) { + content := randomString(4096) + dirPath := randomString(32) + fileName := randomString(32) + filePath := path.Join(dirPath, fileName) + + // Call on non-existent file/dir, check error. + fi, err := suite.StorageDriver.Stat(filePath) + c.Assert(err, check.NotNil) + c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) + c.Assert(fi, check.IsNil) + + err = suite.StorageDriver.PutContent(filePath, []byte(content)) + c.Assert(err, check.IsNil) + + // Call on regular file, check results + start := time.Now().Truncate(time.Second) // truncated for filesystem + fi, err = suite.StorageDriver.Stat(filePath) + c.Assert(err, check.IsNil) + expectedModTime := time.Now() + c.Assert(fi, check.NotNil) + c.Assert(fi.Path(), check.Equals, filePath) + c.Assert(fi.Size(), check.Equals, int64(len(content))) + c.Assert(fi.IsDir(), check.Equals, false) + + if start.After(fi.ModTime()) { + c.Fatalf("modtime %s before file created (%v)", fi.ModTime(), start) + } + + if fi.ModTime().After(expectedModTime) { + c.Fatalf("modtime %s after file created (%v)", fi.ModTime(), expectedModTime) + } + + // Call on directory + start = time.Now().Truncate(time.Second) + fi, err = suite.StorageDriver.Stat(dirPath) + c.Assert(err, check.IsNil) + expectedModTime = time.Now() + c.Assert(fi, check.NotNil) + c.Assert(fi.Path(), check.Equals, dirPath) + c.Assert(fi.Size(), check.Equals, int64(0)) + c.Assert(fi.IsDir(), check.Equals, true) + + if start.After(fi.ModTime()) { + c.Fatalf("modtime %s before file created (%v)", fi.ModTime(), start) + } + + if fi.ModTime().After(expectedModTime) { + c.Fatalf("modtime %s after file created (%v)", fi.ModTime(), expectedModTime) + } +} + // TestConcurrentFileStreams checks that multiple *os.File objects can be passed // in to WriteStream concurrently without hanging. // TODO(bbland): fix this test... func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { - if _, isIPC := suite.StorageDriver.(*ipc.StorageDriverClient); isIPC { - c.Skip("Need to fix out-of-process concurrency") - } + // if _, isIPC := suite.StorageDriver.(*ipc.StorageDriverClient); isIPC { + // c.Skip("Need to fix out-of-process concurrency") + // } - doneChan := make(chan struct{}) + var wg sync.WaitGroup - testStream := func(size int) { + testStream := func(size int64) { + defer wg.Done() suite.testFileStreams(c, size) - doneChan <- struct{}{} } + wg.Add(6) go testStream(8 * 1024 * 1024) go testStream(4 * 1024 * 1024) go testStream(2 * 1024 * 1024) @@ -400,13 +544,10 @@ func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { go testStream(1024) go testStream(64) - for i := 0; i < 6; i++ { - <-doneChan - } - + wg.Wait() } -func (suite *DriverSuite) testFileStreams(c *check.C, size int) { +func (suite *DriverSuite) testFileStreams(c *check.C, size int64) { tf, err := ioutil.TempFile("", "tf") c.Assert(err, check.IsNil) defer os.Remove(tf.Name()) @@ -414,7 +555,7 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int) { tfName := path.Base(tf.Name()) defer suite.StorageDriver.Delete(tfName) - contents := []byte(randomString(uint64(size))) + contents := []byte(randomString(size)) _, err = tf.Write(contents) c.Assert(err, check.IsNil) @@ -422,8 +563,9 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int) { tf.Sync() tf.Seek(0, os.SEEK_SET) - err = suite.StorageDriver.WriteStream(tfName, 0, uint64(size), tf) + nn, err := suite.StorageDriver.WriteStream(tfName, 0, tf) c.Assert(err, check.IsNil) + c.Assert(nn, check.Equals, size) reader, err := suite.StorageDriver.ReadStream(tfName, 0) c.Assert(err, check.IsNil) @@ -450,8 +592,9 @@ func (suite *DriverSuite) writeReadCompare(c *check.C, filename string, contents func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, contents []byte) { defer suite.StorageDriver.Delete(filename) - err := suite.StorageDriver.WriteStream(filename, 0, uint64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents))) + nn, err := suite.StorageDriver.WriteStream(filename, 0, bytes.NewReader(contents)) c.Assert(err, check.IsNil) + c.Assert(nn, check.Equals, int64(len(contents))) reader, err := suite.StorageDriver.ReadStream(filename, 0) c.Assert(err, check.IsNil) @@ -465,7 +608,7 @@ func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, c var pathChars = []byte("abcdefghijklmnopqrstuvwxyz") -func randomString(length uint64) string { +func randomString(length int64) string { b := make([]byte, length) for i := range b { b[i] = pathChars[rand.Intn(len(pathChars))]