diff --git a/docs/storage/driver/rados/rados.go b/docs/storage/driver/rados/rados.go new file mode 100644 index 00000000..999b06b0 --- /dev/null +++ b/docs/storage/driver/rados/rados.go @@ -0,0 +1,628 @@ +package rados + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "io/ioutil" + "path" + "strconv" + + "code.google.com/p/go-uuid/uuid" + log "github.com/Sirupsen/logrus" + "github.com/docker/distribution/context" + storagedriver "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/base" + "github.com/docker/distribution/registry/storage/driver/factory" + "github.com/noahdesu/go-ceph/rados" +) + +const driverName = "rados" + +// Prefix all the stored blob +const objectBlobPrefix = "blob:" + +// Stripes objects size to 4M +const defaultChunkSize = 4 << 20 +const defaultXattrTotalSizeName = "total-size" + +// Max number of keys fetched from omap at each read operation +const defaultKeysFetched = 1 + +//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set +type DriverParameters struct { + poolname string + username string + chunksize uint64 +} + +func init() { + factory.Register(driverName, &radosDriverFactory{}) +} + +// radosDriverFactory implements the factory.StorageDriverFactory interface +type radosDriverFactory struct{} + +func (factory *radosDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { + return FromParameters(parameters) +} + +type driver struct { + Conn *rados.Conn + Ioctx *rados.IOContext + chunksize uint64 +} + +type baseEmbed struct { + base.Base +} + +// Driver is a storagedriver.StorageDriver implementation backed by Ceph RADOS +// Objects are stored at absolute keys in the provided bucket. +type Driver struct { + baseEmbed +} + +// FromParameters constructs a new Driver with a given parameters map +// Required parameters: +// - poolname: the ceph pool name +func FromParameters(parameters map[string]interface{}) (*Driver, error) { + + pool, ok := parameters["poolname"] + if !ok { + return nil, fmt.Errorf("No poolname parameter provided") + } + + username, ok := parameters["username"] + if !ok { + username = "" + } + + chunksize := uint64(defaultChunkSize) + chunksizeParam, ok := parameters["chunksize"] + if ok { + chunksize, ok = chunksizeParam.(uint64) + if !ok { + return nil, fmt.Errorf("The chunksize parameter should be a number") + } + } + + params := DriverParameters{ + fmt.Sprint(pool), + fmt.Sprint(username), + chunksize, + } + + return New(params) +} + +// New constructs a new Driver +func New(params DriverParameters) (*Driver, error) { + var conn *rados.Conn + var err error + + if params.username != "" { + log.Infof("Opening connection to pool %s using user %s", params.poolname, params.username) + conn, err = rados.NewConnWithUser(params.username) + } else { + log.Infof("Opening connection to pool %s", params.poolname) + conn, err = rados.NewConn() + } + + if err != nil { + return nil, err + } + + err = conn.ReadDefaultConfigFile() + if err != nil { + return nil, err + } + + err = conn.Connect() + if err != nil { + return nil, err + } + + log.Infof("Connected") + + ioctx, err := conn.OpenIOContext(params.poolname) + + log.Infof("Connected to pool %s", params.poolname) + + if err != nil { + return nil, err + } + + d := &driver{ + Ioctx: ioctx, + Conn: conn, + chunksize: params.chunksize, + } + + return &Driver{ + baseEmbed: baseEmbed{ + Base: base.Base{ + StorageDriver: d, + }, + }, + }, nil +} + +// Implement the storagedriver.StorageDriver interface + +func (d *driver) Name() string { + return driverName +} + +// GetContent retrieves the content stored at "path" as a []byte. +func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { + rc, err := d.ReadStream(ctx, path, 0) + if err != nil { + return nil, err + } + defer rc.Close() + + p, err := ioutil.ReadAll(rc) + if err != nil { + return nil, err + } + + return p, nil +} + +// PutContent stores the []byte content at a location designated by "path". +func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { + if _, err := d.WriteStream(ctx, path, 0, bytes.NewReader(contents)); err != nil { + return err + } + + return nil +} + +// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a +// given byte offset. +type readStreamReader struct { + driver *driver + oid string + size uint64 + offset uint64 +} + +func (r *readStreamReader) Read(b []byte) (n int, err error) { + // Determine the part available to read + bufferOffset := uint64(0) + bufferSize := uint64(len(b)) + + // End of the object, read less than the buffer size + if bufferSize > r.size-r.offset { + bufferSize = r.size - r.offset + } + + // Fill `b` + for bufferOffset < bufferSize { + // Get the offset in the object chunk + chunkedOid, chunkedOffset := r.driver.getChunkNameFromOffset(r.oid, r.offset) + + // Determine the best size to read + bufferEndOffset := bufferSize + if bufferEndOffset-bufferOffset > r.driver.chunksize-chunkedOffset { + bufferEndOffset = bufferOffset + (r.driver.chunksize - chunkedOffset) + } + + // Read the chunk + n, err = r.driver.Ioctx.Read(chunkedOid, b[bufferOffset:bufferEndOffset], chunkedOffset) + + if err != nil { + return int(bufferOffset), err + } + + bufferOffset += uint64(n) + r.offset += uint64(n) + } + + // EOF if the offset is at the end of the object + if r.offset == r.size { + return int(bufferOffset), io.EOF + } + + return int(bufferOffset), nil +} + +func (r *readStreamReader) Close() error { + return nil +} + +func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { + // get oid from filename + oid, err := d.getOid(path) + + if err != nil { + return nil, err + } + + // get object stat + stat, err := d.Stat(ctx, path) + + if err != nil { + return nil, err + } + + if offset > stat.Size() { + return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} + } + + return &readStreamReader{ + driver: d, + oid: oid, + size: uint64(stat.Size()), + offset: uint64(offset), + }, nil +} + +func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) { + buf := make([]byte, d.chunksize) + totalRead = 0 + + oid, err := d.getOid(path) + if err != nil { + switch err.(type) { + // Trying to write new object, generate new blob identifier for it + case storagedriver.PathNotFoundError: + oid = d.generateOid() + err = d.putOid(path, oid) + if err != nil { + return 0, err + } + default: + return 0, err + } + } else { + // Check total object size only for existing ones + totalSize, err := d.getXattrTotalSize(ctx, oid) + if err != nil { + return 0, err + } + + // If offset if after the current object size, fill the gap with zeros + for totalSize < uint64(offset) { + sizeToWrite := d.chunksize + if totalSize-uint64(offset) < sizeToWrite { + sizeToWrite = totalSize - uint64(offset) + } + + chunkName, chunkOffset := d.getChunkNameFromOffset(oid, uint64(totalSize)) + err = d.Ioctx.Write(chunkName, buf[:sizeToWrite], uint64(chunkOffset)) + if err != nil { + return totalRead, err + } + + totalSize += sizeToWrite + } + } + + // Writer + for { + // Align to chunk size + sizeRead := uint64(0) + sizeToRead := uint64(offset+totalRead) % d.chunksize + if sizeToRead == 0 { + sizeToRead = d.chunksize + } + + // Read from `reader` + for sizeRead < sizeToRead { + nn, err := reader.Read(buf[sizeRead:sizeToRead]) + sizeRead += uint64(nn) + + if err != nil { + if err != io.EOF { + return totalRead, err + } + + break + } + } + + // End of file and nothing was read + if sizeRead == 0 { + break + } + + // Write chunk object + chunkName, chunkOffset := d.getChunkNameFromOffset(oid, uint64(offset+totalRead)) + err = d.Ioctx.Write(chunkName, buf[:sizeRead], uint64(chunkOffset)) + + if err != nil { + return totalRead, err + } + + // Update total object size as xattr in the first chunk of the object + err = d.setXattrTotalSize(oid, uint64(offset+totalRead)+sizeRead) + if err != nil { + return totalRead, err + } + + totalRead += int64(sizeRead) + + // End of file + if sizeRead < sizeToRead { + break + } + } + + return totalRead, nil +} + +// Stat retrieves the FileInfo for the given path, including the current size +func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { + // get oid from filename + oid, err := d.getOid(path) + + if err != nil { + return nil, err + } + + // the path is a virtual directory? + if oid == "" { + return storagedriver.FileInfoInternal{ + FileInfoFields: storagedriver.FileInfoFields{ + Path: path, + Size: 0, + IsDir: true, + }, + }, nil + } + + // stat first chunk + stat, err := d.Ioctx.Stat(oid + "-0") + + if err != nil { + return nil, err + } + + // get total size of chunked object + totalSize, err := d.getXattrTotalSize(ctx, oid) + + if err != nil { + return nil, err + } + + return storagedriver.FileInfoInternal{ + FileInfoFields: storagedriver.FileInfoFields{ + Path: path, + Size: int64(totalSize), + ModTime: stat.ModTime, + }, + }, nil +} + +// List returns a list of the objects that are direct descendants of the given path. +func (d *driver) List(ctx context.Context, dirPath string) ([]string, error) { + files, err := d.listDirectoryOid(dirPath) + + if err != nil { + return nil, err + } + + keys := make([]string, 0, len(files)) + for k := range files { + keys = append(keys, path.Join(dirPath, k)) + } + + return keys, nil +} + +// Move moves an object stored at sourcePath to destPath, removing the original +// object. +func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { + // Get oid + oid, err := d.getOid(sourcePath) + + if err != nil { + return err + } + + // Move reference + err = d.putOid(destPath, oid) + + if err != nil { + return err + } + + // Delete old reference + err = d.deleteOid(sourcePath) + + if err != nil { + return err + } + + return nil +} + +// Delete recursively deletes all objects stored at "path" and its subpaths. +func (d *driver) Delete(ctx context.Context, objectPath string) error { + // Get oid + oid, err := d.getOid(objectPath) + + if err != nil { + return err + } + + // Deleting virtual directory + if oid == "" { + objects, err := d.listDirectoryOid(objectPath) + if err != nil { + return err + } + + for object := range objects { + err = d.Delete(ctx, path.Join(objectPath, object)) + if err != nil { + return err + } + } + } else { + // Delete object chunks + totalSize, err := d.getXattrTotalSize(ctx, oid) + + if err != nil { + return err + } + + for offset := uint64(0); offset < totalSize; offset += d.chunksize { + chunkName, _ := d.getChunkNameFromOffset(oid, offset) + + err = d.Ioctx.Delete(chunkName) + if err != nil { + return err + } + } + + // Delete reference + err = d.deleteOid(objectPath) + if err != nil { + return err + } + } + + return nil +} + +// URLFor returns a URL which may be used to retrieve the content stored at the given path. +// May return an UnsupportedMethodErr in certain StorageDriver implementations. +func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { + return "", storagedriver.ErrUnsupportedMethod +} + +// Generate a blob identifier +func (d *driver) generateOid() string { + return objectBlobPrefix + uuid.New() +} + +// Reference a object and its hierarchy +func (d *driver) putOid(objectPath string, oid string) error { + directory := path.Dir(objectPath) + base := path.Base(objectPath) + createParentReference := true + + // After creating this reference, skip the parents referencing since the + // hierarchy already exists + if oid == "" { + firstReference, err := d.Ioctx.GetOmapValues(directory, "", "", 1) + if (err == nil) && (len(firstReference) > 0) { + createParentReference = false + } + } + + oids := map[string][]byte{ + base: []byte(oid), + } + + // Reference object + err := d.Ioctx.SetOmap(directory, oids) + if err != nil { + return err + } + + // Esure parent virtual directories + if createParentReference && directory != "/" { + return d.putOid(directory, "") + } + + return nil +} + +// Get the object identifier from an object name +func (d *driver) getOid(objectPath string) (string, error) { + directory := path.Dir(objectPath) + base := path.Base(objectPath) + + files, err := d.Ioctx.GetOmapValues(directory, "", base, 1) + + if (err != nil) || (files[base] == nil) { + return "", storagedriver.PathNotFoundError{Path: objectPath} + } + + return string(files[base]), nil +} + +// List the objects of a virtual directory +func (d *driver) listDirectoryOid(path string) (list map[string][]byte, err error) { + return d.Ioctx.GetAllOmapValues(path, "", "", defaultKeysFetched) +} + +// Remove a file from the files hierarchy +func (d *driver) deleteOid(objectPath string) error { + // Remove object reference + directory := path.Dir(objectPath) + base := path.Base(objectPath) + err := d.Ioctx.RmOmapKeys(directory, []string{base}) + + if err != nil { + return err + } + + // Remove virtual directory if empty (no more references) + firstReference, err := d.Ioctx.GetOmapValues(directory, "", "", 1) + + if err != nil { + return err + } + + if len(firstReference) == 0 { + // Delete omap + err := d.Ioctx.Delete(directory) + + if err != nil { + return err + } + + // Remove reference on parent omaps + if directory != "/" { + return d.deleteOid(directory) + } + } + + return nil +} + +// Takes an offset in an chunked object and return the chunk name and a new +// offset in this chunk object +func (d *driver) getChunkNameFromOffset(oid string, offset uint64) (string, uint64) { + chunkID := offset / d.chunksize + chunkedOid := oid + "-" + strconv.FormatInt(int64(chunkID), 10) + chunkedOffset := offset % d.chunksize + return chunkedOid, chunkedOffset +} + +// Set the total size of a chunked object `oid` +func (d *driver) setXattrTotalSize(oid string, size uint64) error { + // Convert uint64 `size` to []byte + xattr := make([]byte, binary.MaxVarintLen64) + binary.LittleEndian.PutUint64(xattr, size) + + // Save the total size as a xattr in the first chunk + return d.Ioctx.SetXattr(oid+"-0", defaultXattrTotalSizeName, xattr) +} + +// Get the total size of the chunked object `oid` stored as xattr +func (d *driver) getXattrTotalSize(ctx context.Context, oid string) (uint64, error) { + // Fetch xattr as []byte + xattr := make([]byte, binary.MaxVarintLen64) + xattrLength, err := d.Ioctx.GetXattr(oid+"-0", defaultXattrTotalSizeName, xattr) + + if err != nil { + return 0, err + } + + if xattrLength != len(xattr) { + context.GetLogger(ctx).Errorf("object %s xattr length mismatch: %d != %d", oid, xattrLength, len(xattr)) + return 0, storagedriver.PathNotFoundError{Path: oid} + } + + // Convert []byte as uint64 + totalSize := binary.LittleEndian.Uint64(xattr) + + return totalSize, nil +} diff --git a/docs/storage/driver/rados/rados_test.go b/docs/storage/driver/rados/rados_test.go new file mode 100644 index 00000000..29486e89 --- /dev/null +++ b/docs/storage/driver/rados/rados_test.go @@ -0,0 +1,38 @@ +package rados + +import ( + "os" + "testing" + + storagedriver "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/testsuites" + + "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { check.TestingT(t) } + +func init() { + poolname := os.Getenv("RADOS_POOL") + username := os.Getenv("RADOS_USER") + + driverConstructor := func() (storagedriver.StorageDriver, error) { + parameters := DriverParameters{ + poolname, + username, + defaultChunkSize, + } + + return New(parameters) + } + + skipCheck := func() string { + if poolname == "" { + return "RADOS_POOL must be set to run Rado tests" + } + return "" + } + + testsuites.RegisterInProcessSuite(driverConstructor, skipCheck) +}