Refactor inmemory driver for Stat and WriteStream methods

This change started out as simply updating the existing inmemory driver to
implement the new Stat call. After struggling with the map based
implementation, it has been refactored to be a tree-based implementation.

This process has exposed a few missing error cases in the StorageDriver API
that should be addressed in the coming weeks.
This commit is contained in:
Stephen J Day 2014-12-04 20:14:41 -08:00
parent ab9570f872
commit 2ebc373d91
3 changed files with 453 additions and 80 deletions

View file

@ -5,9 +5,9 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"regexp"
"strings" "strings"
"sync" "sync"
"time"
"github.com/docker/docker-registry/storagedriver" "github.com/docker/docker-registry/storagedriver"
"github.com/docker/docker-registry/storagedriver/factory" "github.com/docker/docker-registry/storagedriver/factory"
@ -29,13 +29,18 @@ func (factory *inMemoryDriverFactory) Create(parameters map[string]string) (stor
// Driver is a storagedriver.StorageDriver implementation backed by a local map. // Driver is a storagedriver.StorageDriver implementation backed by a local map.
// Intended solely for example and testing purposes. // Intended solely for example and testing purposes.
type Driver struct { type Driver struct {
storage map[string][]byte root *dir
mutex sync.RWMutex mutex sync.RWMutex
} }
// New constructs a new Driver. // New constructs a new Driver.
func New() *Driver { func New() *Driver {
return &Driver{storage: make(map[string][]byte)} return &Driver{root: &dir{
common: common{
p: "/",
mod: time.Now(),
},
}}
} }
// Implement the storagedriver.StorageDriver interface. // Implement the storagedriver.StorageDriver interface.
@ -44,18 +49,31 @@ func New() *Driver {
func (d *Driver) GetContent(path string) ([]byte, error) { func (d *Driver) GetContent(path string) ([]byte, error) {
d.mutex.RLock() d.mutex.RLock()
defer d.mutex.RUnlock() defer d.mutex.RUnlock()
contents, ok := d.storage[path]
if !ok { rc, err := d.ReadStream(path, 0)
return nil, storagedriver.PathNotFoundError{Path: path} if err != nil {
return nil, err
} }
return contents, nil defer rc.Close()
return ioutil.ReadAll(rc)
} }
// PutContent stores the []byte content at a location designated by "path". // PutContent stores the []byte content at a location designated by "path".
func (d *Driver) PutContent(path string, contents []byte) error { func (d *Driver) PutContent(p string, contents []byte) error {
d.mutex.Lock() d.mutex.Lock()
defer d.mutex.Unlock() defer d.mutex.Unlock()
d.storage[path] = contents
f, err := d.root.mkfile(p)
if err != nil {
// TODO(stevvooe): Again, we need to clarify when this is not a
// directory in StorageDriver API.
return fmt.Errorf("not a file")
}
f.truncate()
f.WriteAt(contents, 0)
return nil return nil
} }
@ -64,86 +82,104 @@ func (d *Driver) PutContent(path string, contents []byte) error {
func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
d.mutex.RLock() d.mutex.RLock()
defer d.mutex.RUnlock() defer d.mutex.RUnlock()
contents, err := d.GetContent(path)
if err != nil { path = d.normalize(path)
return nil, err found := d.root.find(path)
} else if len(contents) <= int(offset) {
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} if found.path() != path {
return nil, storagedriver.PathNotFoundError{Path: path}
} }
src := contents[offset:] if found.isdir() {
buf := make([]byte, len(src)) return nil, fmt.Errorf("%q is a directory", path)
copy(buf, src) }
return ioutil.NopCloser(bytes.NewReader(buf)), nil
return ioutil.NopCloser(found.(*file).sectionReader(offset)), nil
} }
// WriteStream stores the contents of the provided io.ReadCloser at a location // WriteStream stores the contents of the provided io.ReadCloser at a location
// designated by the given path. // designated by the given path.
func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error) {
defer reader.Close() d.mutex.Lock()
d.mutex.RLock() defer d.mutex.Unlock()
defer d.mutex.RUnlock()
resumableOffset, err := d.CurrentSize(path) if offset < 0 {
return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
}
normalized := d.normalize(path)
f, err := d.root.mkfile(normalized)
if err != nil { if err != nil {
return err return 0, fmt.Errorf("not a file")
} }
if offset > int64(resumableOffset) { var buf bytes.Buffer
return storagedriver.InvalidOffsetError{Path: path, Offset: offset}
}
contents, err := ioutil.ReadAll(reader) nn, err = buf.ReadFrom(reader)
if err != nil { if err != nil {
return err // TODO(stevvooe): This condition is odd and we may need to clarify:
// we've read nn bytes from reader but have written nothing to the
// backend. What is the correct return value? Really, the caller needs
// to know that the reader has been advanced and reattempting the
// operation is incorrect.
return nn, err
} }
if offset > 0 { f.WriteAt(buf.Bytes(), offset)
contents = append(d.storage[path][0:offset], contents...) return nn, err
}
d.storage[path] = contents
return nil
} }
// CurrentSize retrieves the curernt size in bytes of the object at the given // Stat returns info about the provided path.
// path. func (d *Driver) Stat(path string) (storagedriver.FileInfo, error) {
func (d *Driver) CurrentSize(path string) (uint64, error) {
d.mutex.RLock() d.mutex.RLock()
defer d.mutex.RUnlock() defer d.mutex.RUnlock()
contents, ok := d.storage[path]
if !ok { normalized := d.normalize(path)
return 0, nil found := d.root.find(path)
if found.path() != normalized {
return nil, storagedriver.PathNotFoundError{Path: path}
} }
return uint64(len(contents)), nil
fi := storagedriver.FileInfoFields{
Path: path,
IsDir: found.isdir(),
ModTime: found.modtime(),
}
if !fi.IsDir {
fi.Size = int64(len(found.(*file).data))
}
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
} }
// List returns a list of the objects that are direct descendants of the given // List returns a list of the objects that are direct descendants of the given
// path. // path.
func (d *Driver) List(path string) ([]string, error) { func (d *Driver) List(path string) ([]string, error) {
if path[len(path)-1] != '/' { normalized := d.normalize(path)
path += "/"
} found := d.root.find(normalized)
subPathMatcher, err := regexp.Compile(fmt.Sprintf("^%s[^/]+", path))
if err != nil { if !found.isdir() {
return nil, err return nil, fmt.Errorf("not a directory") // TODO(stevvooe): Need error type for this...
} }
d.mutex.RLock() entries, err := found.(*dir).list(normalized)
defer d.mutex.RUnlock()
// we use map to collect unique keys if err != nil {
keySet := make(map[string]struct{}) switch err {
for k := range d.storage { case errNotExists:
if key := subPathMatcher.FindString(k); key != "" { return nil, storagedriver.PathNotFoundError{Path: path}
keySet[key] = struct{}{} case errIsNotDir:
return nil, fmt.Errorf("not a directory")
default:
return nil, err
} }
} }
keys := make([]string, 0, len(keySet)) return entries, nil
for k := range keySet {
keys = append(keys, k)
}
return keys, nil
} }
// Move moves an object stored at sourcePath to destPath, removing the original // Move moves an object stored at sourcePath to destPath, removing the original
@ -151,32 +187,37 @@ func (d *Driver) List(path string) ([]string, error) {
func (d *Driver) Move(sourcePath string, destPath string) error { func (d *Driver) Move(sourcePath string, destPath string) error {
d.mutex.Lock() d.mutex.Lock()
defer d.mutex.Unlock() defer d.mutex.Unlock()
contents, ok := d.storage[sourcePath]
if !ok { normalizedSrc, normalizedDst := d.normalize(sourcePath), d.normalize(destPath)
return storagedriver.PathNotFoundError{Path: sourcePath}
err := d.root.move(normalizedSrc, normalizedDst)
switch err {
case errNotExists:
return storagedriver.PathNotFoundError{Path: destPath}
default:
return err
} }
d.storage[destPath] = contents
delete(d.storage, sourcePath)
return nil
} }
// Delete recursively deletes all objects stored at "path" and its subpaths. // Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *Driver) Delete(path string) error { func (d *Driver) Delete(path string) error {
d.mutex.Lock() d.mutex.Lock()
defer d.mutex.Unlock() defer d.mutex.Unlock()
var subPaths []string
for k := range d.storage {
if strings.HasPrefix(k, path) {
subPaths = append(subPaths, k)
}
}
if len(subPaths) == 0 { normalized := d.normalize(path)
err := d.root.delete(normalized)
switch err {
case errNotExists:
return storagedriver.PathNotFoundError{Path: path} return storagedriver.PathNotFoundError{Path: path}
default:
return err
} }
}
for _, subPath := range subPaths {
delete(d.storage, subPath) func (d *Driver) normalize(p string) string {
} if !strings.HasPrefix(p, "/") {
return nil p = "/" + p // Ghetto path absolution.
}
return p
} }

View file

@ -17,5 +17,8 @@ func init() {
return New(), nil return New(), nil
} }
testsuites.RegisterInProcessSuite(inmemoryDriverConstructor, testsuites.NeverSkip) testsuites.RegisterInProcessSuite(inmemoryDriverConstructor, testsuites.NeverSkip)
testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip)
// BUG(stevvooe): Disable flaky IPC tests for now when we can troubleshoot
// the problems with libchan.
// testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip)
} }

View file

@ -0,0 +1,329 @@
package inmemory
import (
"fmt"
"io"
"path"
"sort"
"strings"
"time"
)
var (
errExists = fmt.Errorf("exists")
errNotExists = fmt.Errorf("exists")
errIsNotDir = fmt.Errorf("notdir")
errIsDir = fmt.Errorf("isdir")
)
type node interface {
name() string
path() string
isdir() bool
modtime() time.Time
}
// dir is the central type for the memory-based storagedriver. All operations
// are dispatched from a root dir.
type dir struct {
common
// TODO(stevvooe): Use sorted slice + search.
children map[string]node
}
var _ node = &dir{}
func (d *dir) isdir() bool {
return true
}
// add places the node n into dir d.
func (d *dir) add(n node) {
if d.children == nil {
d.children = make(map[string]node)
}
d.children[n.name()] = n
d.mod = time.Now()
}
// find searches for the node, given path q in dir. If the node is found, it
// will be returned. If the node is not found, the closet existing parent. If
// the node is found, the returned (node).path() will match q.
func (d *dir) find(q string) node {
q = strings.Trim(q, "/")
i := strings.Index(q, "/")
if q == "" {
return d
}
if i == 0 {
panic("shouldn't happen, no root paths")
}
var component string
if i < 0 {
// No more path components
component = q
} else {
component = q[:i]
}
child, ok := d.children[component]
if !ok {
// Node was not found. Return p and the current node.
return d
}
if child.isdir() {
// traverse down!
q = q[i+1:]
return child.(*dir).find(q)
}
return child
}
func (d *dir) list(p string) ([]string, error) {
n := d.find(p)
if n.path() != p {
return nil, errNotExists
}
if !n.isdir() {
return nil, errIsNotDir
}
var children []string
for _, child := range n.(*dir).children {
children = append(children, child.path())
}
sort.Strings(children)
return children, nil
}
// mkfile or return the existing one. returns an error if it exists and is a
// directory. Essentially, this is open or create.
func (d *dir) mkfile(p string) (*file, error) {
n := d.find(p)
if n.path() == p {
if n.isdir() {
return nil, errIsDir
}
return n.(*file), nil
}
dirpath, filename := path.Split(p)
// Make any non-existent directories
n, err := d.mkdirs(dirpath)
if err != nil {
return nil, err
}
dd := n.(*dir)
n = &file{
common: common{
p: path.Join(dd.path(), filename),
mod: time.Now(),
},
}
dd.add(n)
return n.(*file), nil
}
// mkdirs creates any missing directory entries in p and returns the result.
func (d *dir) mkdirs(p string) (*dir, error) {
if p == "" {
p = "/"
}
n := d.find(p)
if !n.isdir() {
// Found something there
return nil, errIsNotDir
}
if n.path() == p {
return n.(*dir), nil
}
dd := n.(*dir)
relative := strings.Trim(strings.TrimPrefix(p, n.path()), "/")
if relative == "" {
return dd, nil
}
components := strings.Split(relative, "/")
for _, component := range components {
d, err := dd.mkdir(component)
if err != nil {
// This should actually never happen, since there are no children.
return nil, err
}
dd = d
}
return dd, nil
}
// mkdir creates a child directory under d with the given name.
func (d *dir) mkdir(name string) (*dir, error) {
if name == "" {
return nil, fmt.Errorf("invalid dirname")
}
_, ok := d.children[name]
if ok {
return nil, errExists
}
child := &dir{
common: common{
p: path.Join(d.path(), name),
mod: time.Now(),
},
}
d.add(child)
d.mod = time.Now()
return child, nil
}
func (d *dir) move(src, dst string) error {
dstDirname, _ := path.Split(dst)
dp, err := d.mkdirs(dstDirname)
if err != nil {
return err
}
srcDirname, srcFilename := path.Split(src)
sp := d.find(srcDirname)
if sp.path() != srcDirname {
return errNotExists
}
s, ok := sp.(*dir).children[srcFilename]
if !ok {
return errNotExists
}
delete(sp.(*dir).children, srcFilename)
switch n := s.(type) {
case *dir:
n.p = dst
case *file:
n.p = dst
}
dp.add(s)
return nil
}
func (d *dir) delete(p string) error {
dirname, filename := path.Split(p)
parent := d.find(dirname)
if dirname != parent.path() {
return errNotExists
}
if _, ok := parent.(*dir).children[filename]; !ok {
return errNotExists
}
delete(parent.(*dir).children, filename)
return nil
}
// dump outputs a primitive directory structure to stdout.
func (d *dir) dump(indent string) {
fmt.Println(indent, d.name()+"/")
for _, child := range d.children {
if child.isdir() {
child.(*dir).dump(indent + "\t")
} else {
fmt.Println(indent, child.name())
}
}
}
func (d *dir) String() string {
return fmt.Sprintf("&dir{path: %v, children: %v}", d.p, d.children)
}
// file stores actual data in the fs tree. It acts like an open, seekable file
// where operations are conducted through ReadAt and WriteAt. Use it with
// SectionReader for the best effect.
type file struct {
common
data []byte
}
var _ node = &file{}
func (f *file) isdir() bool {
return false
}
func (f *file) truncate() {
f.data = f.data[:0]
}
func (f *file) sectionReader(offset int64) io.Reader {
return io.NewSectionReader(f, offset, int64(len(f.data))-offset)
}
func (f *file) ReadAt(p []byte, offset int64) (n int, err error) {
return copy(p, f.data[offset:]), nil
}
func (f *file) WriteAt(p []byte, offset int64) (n int, err error) {
if len(f.data) > 0 && offset >= int64(len(f.data)) {
// Extend missing region with a zero pad, while also preallocating out to size of p.
pad := offset - int64(len(f.data))
size := len(p) + int(pad)
f.data = append(f.data, make([]byte, pad, size)...)
}
f.data = append(f.data, p...)
return len(p), nil
}
func (f *file) String() string {
return fmt.Sprintf("&file{path: %q}", f.p)
}
// common provides shared fields and methods for node implementations.
type common struct {
p string
mod time.Time
}
func (c *common) name() string {
_, name := path.Split(c.p)
return name
}
func (c *common) path() string {
return c.p
}
func (c *common) modtime() time.Time {
return c.mod
}