Use only one Swift container for both files and manifests
Signed-off-by: Sylvain Baubeau <sbaubeau@redhat.com>
This commit is contained in:
parent
a40502ec02
commit
57cef57e1b
1 changed files with 81 additions and 69 deletions
|
@ -156,10 +156,6 @@ func New(params DriverParameters) (*Driver, error) {
|
||||||
return nil, fmt.Errorf("Failed to create container %s (%s)", params.Container, err)
|
return nil, fmt.Errorf("Failed to create container %s (%s)", params.Container, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := ct.ContainerCreate(params.Container+"_segments", nil); err != nil {
|
|
||||||
return nil, fmt.Errorf("Failed to create container %s (%s)", params.Container+"_segments", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
d := &driver{
|
d := &driver{
|
||||||
Conn: ct,
|
Conn: ct,
|
||||||
Container: params.Container,
|
Container: params.Container,
|
||||||
|
@ -231,8 +227,8 @@ func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.
|
||||||
func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (int64, error) {
|
func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (int64, error) {
|
||||||
var (
|
var (
|
||||||
segments []swift.Object
|
segments []swift.Object
|
||||||
|
multi io.Reader
|
||||||
paddingReader io.Reader
|
paddingReader io.Reader
|
||||||
bytesRead int64
|
|
||||||
currentLength int64
|
currentLength int64
|
||||||
cursor int64
|
cursor int64
|
||||||
)
|
)
|
||||||
|
@ -240,10 +236,9 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
||||||
partNumber := 1
|
partNumber := 1
|
||||||
chunkSize := int64(d.ChunkSize)
|
chunkSize := int64(d.ChunkSize)
|
||||||
zeroBuf := make([]byte, d.ChunkSize)
|
zeroBuf := make([]byte, d.ChunkSize)
|
||||||
segmentsContainer := d.getSegmentsContainer()
|
|
||||||
|
|
||||||
getSegment := func() string {
|
getSegment := func() string {
|
||||||
return d.swiftPath(path) + "/" + fmt.Sprintf("%016d", partNumber)
|
return d.swiftSegmentPath(path) + "/" + fmt.Sprintf("%016d", partNumber)
|
||||||
}
|
}
|
||||||
|
|
||||||
max := func(a int64, b int64) int64 {
|
max := func(a int64, b int64) int64 {
|
||||||
|
@ -258,22 +253,22 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
||||||
if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == 404 {
|
if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == 404 {
|
||||||
// Create a object manifest
|
// Create a object manifest
|
||||||
if err := d.createParentFolders(path); err != nil {
|
if err := d.createParentFolders(path); err != nil {
|
||||||
return bytesRead, err
|
return 0, err
|
||||||
}
|
}
|
||||||
manifest, err := d.createManifest(path)
|
manifest, err := d.createManifest(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return bytesRead, parseError(path, err)
|
return 0, parseError(path, err)
|
||||||
}
|
}
|
||||||
manifest.Close()
|
manifest.Close()
|
||||||
} else {
|
} else {
|
||||||
return bytesRead, parseError(path, err)
|
return 0, parseError(path, err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// The manifest already exists. Get all the segments
|
// The manifest already exists. Get all the segments
|
||||||
currentLength = info.Bytes
|
currentLength = info.Bytes
|
||||||
segments, err = d.getAllSegments(segmentsContainer, path)
|
segments, err = d.getAllSegments(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return bytesRead, parseError(path, err)
|
return 0, parseError(path, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,7 +286,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
||||||
if offset >= currentLength {
|
if offset >= currentLength {
|
||||||
for offset-currentLength >= chunkSize {
|
for offset-currentLength >= chunkSize {
|
||||||
// Insert a block a zero
|
// Insert a block a zero
|
||||||
d.Conn.ObjectPut(segmentsContainer, getSegment(),
|
d.Conn.ObjectPut(d.Container, getSegment(),
|
||||||
bytes.NewReader(zeroBuf), false, "",
|
bytes.NewReader(zeroBuf), false, "",
|
||||||
d.getContentType(), nil)
|
d.getContentType(), nil)
|
||||||
currentLength += chunkSize
|
currentLength += chunkSize
|
||||||
|
@ -303,26 +298,34 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
||||||
} else {
|
} else {
|
||||||
// Offset is inside the current segment : we need to read the
|
// Offset is inside the current segment : we need to read the
|
||||||
// data from the beginning of the segment to offset
|
// data from the beginning of the segment to offset
|
||||||
paddingReader, _, err = d.Conn.ObjectOpen(segmentsContainer, getSegment(), false, nil)
|
file, _, err := d.Conn.ObjectOpen(d.Container, getSegment(), false, nil)
|
||||||
|
defer file.Close()
|
||||||
|
paddingReader = file
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return bytesRead, parseError(getSegment(), err)
|
return 0, parseError(getSegment(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
multi := io.MultiReader(
|
multi = io.MultiReader(
|
||||||
io.LimitReader(paddingReader, offset-cursor),
|
io.LimitReader(paddingReader, offset-cursor),
|
||||||
io.LimitReader(reader, chunkSize-(offset-cursor)),
|
io.LimitReader(reader, chunkSize-(offset-cursor)),
|
||||||
)
|
)
|
||||||
|
|
||||||
for {
|
writeSegment := func(segment string) (finished bool, bytesRead int64, err error) {
|
||||||
currentSegment, err := d.Conn.ObjectCreate(segmentsContainer, getSegment(), false, "", d.getContentType(), nil)
|
currentSegment, err := d.Conn.ObjectCreate(d.Container, segment, false, "", d.getContentType(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return bytesRead, parseError(path, err)
|
return false, bytesRead, parseError(path, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := io.Copy(currentSegment, multi)
|
n, err := io.Copy(currentSegment, multi)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return bytesRead, parseError(path, err)
|
return false, bytesRead, parseError(path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if n > 0 {
|
||||||
|
defer currentSegment.Close()
|
||||||
|
bytesRead += n - max(0, offset-cursor)
|
||||||
}
|
}
|
||||||
|
|
||||||
if n < chunkSize {
|
if n < chunkSize {
|
||||||
|
@ -333,25 +336,39 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
||||||
headers["Range"] = "bytes=" + strconv.FormatInt(cursor+n, 10) + "-" + strconv.FormatInt(cursor+chunkSize, 10)
|
headers["Range"] = "bytes=" + strconv.FormatInt(cursor+n, 10) + "-" + strconv.FormatInt(cursor+chunkSize, 10)
|
||||||
file, _, err := d.Conn.ObjectOpen(d.Container, d.swiftPath(path), false, headers)
|
file, _, err := d.Conn.ObjectOpen(d.Container, d.swiftPath(path), false, headers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return bytesRead, parseError(path, err)
|
return false, bytesRead, parseError(path, err)
|
||||||
}
|
}
|
||||||
if _, err := io.Copy(currentSegment, file); err != nil {
|
|
||||||
return bytesRead, parseError(path, err)
|
_, copyErr := io.Copy(currentSegment, file)
|
||||||
|
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
return false, bytesRead, parseError(path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if copyErr != nil {
|
||||||
|
return false, bytesRead, parseError(path, copyErr)
|
||||||
}
|
}
|
||||||
file.Close()
|
|
||||||
}
|
}
|
||||||
if n > 0 {
|
|
||||||
currentSegment.Close()
|
return true, bytesRead, nil
|
||||||
bytesRead += n - max(0, offset-cursor)
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
|
||||||
currentSegment.Close()
|
multi = io.LimitReader(reader, chunkSize)
|
||||||
bytesRead += n - max(0, offset-cursor)
|
|
||||||
multi = io.MultiReader(io.LimitReader(reader, chunkSize))
|
|
||||||
cursor += chunkSize
|
cursor += chunkSize
|
||||||
partNumber++
|
partNumber++
|
||||||
|
|
||||||
|
return false, bytesRead, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
finished := false
|
||||||
|
read := int64(0)
|
||||||
|
bytesRead := int64(0)
|
||||||
|
for finished == false {
|
||||||
|
finished, read, err = writeSegment(getSegment())
|
||||||
|
bytesRead += read
|
||||||
|
if err != nil {
|
||||||
|
return bytesRead, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return bytesRead, nil
|
return bytesRead, nil
|
||||||
|
@ -392,7 +409,7 @@ func (d *driver) List(ctx context.Context, path string) ([]string, error) {
|
||||||
objects, err := d.Conn.Objects(d.Container, opts)
|
objects, err := d.Conn.Objects(d.Container, opts)
|
||||||
for _, obj := range objects {
|
for _, obj := range objects {
|
||||||
if !obj.PseudoDirectory {
|
if !obj.PseudoDirectory {
|
||||||
files = append(files, "/"+strings.TrimSuffix(obj.Name, "/"))
|
files = append(files, strings.TrimPrefix(strings.TrimSuffix(obj.Name, "/"), d.swiftPath("/")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -425,40 +442,35 @@ func (d *driver) Delete(ctx context.Context, path string) error {
|
||||||
return storagedriver.PathNotFoundError{Path: path}
|
return storagedriver.PathNotFoundError{Path: path}
|
||||||
}
|
}
|
||||||
|
|
||||||
for index, name := range objects {
|
|
||||||
objects[index] = name[len(d.Prefix):]
|
|
||||||
}
|
|
||||||
|
|
||||||
var multiDelete = true
|
|
||||||
if d.BulkDeleteSupport {
|
if d.BulkDeleteSupport {
|
||||||
_, err := d.Conn.BulkDelete(d.Container, objects)
|
if _, err := d.Conn.BulkDelete(d.Container, objects); err != swift.Forbidden {
|
||||||
multiDelete = err != nil
|
return parseError(path, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if multiDelete {
|
|
||||||
for _, name := range objects {
|
|
||||||
if _, headers, err := d.Conn.Object(d.Container, name); err == nil {
|
|
||||||
manifest, ok := headers["X-Object-Manifest"]
|
|
||||||
if ok {
|
|
||||||
components := strings.SplitN(manifest, "/", 2)
|
|
||||||
segContainer := components[0]
|
|
||||||
segments, err := d.getAllSegments(segContainer, components[1])
|
|
||||||
if err != nil {
|
|
||||||
return parseError(name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, s := range segments {
|
for _, name := range objects {
|
||||||
if err := d.Conn.ObjectDelete(segContainer, s.Name); err != nil {
|
if _, headers, err := d.Conn.Object(d.Container, name); err == nil {
|
||||||
return parseError(s.Name, err)
|
manifest, ok := headers["X-Object-Manifest"]
|
||||||
}
|
if ok {
|
||||||
|
components := strings.SplitN(manifest, "/", 2)
|
||||||
|
segContainer := components[0]
|
||||||
|
segments, err := d.getAllSegments(components[1])
|
||||||
|
if err != nil {
|
||||||
|
return parseError(name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, s := range segments {
|
||||||
|
if err := d.Conn.ObjectDelete(segContainer, s.Name); err != nil {
|
||||||
|
return parseError(s.Name, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
return parseError(name, err)
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
return parseError(name, err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := d.Conn.ObjectDelete(d.Container, name); err != nil {
|
if err := d.Conn.ObjectDelete(d.Container, name); err != nil {
|
||||||
return parseError(name, err)
|
return parseError(name, err)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -472,14 +484,18 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) swiftPath(path string) string {
|
func (d *driver) swiftPath(path string) string {
|
||||||
return strings.TrimLeft(strings.TrimRight(d.Prefix, "/")+path, "/")
|
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *driver) swiftSegmentPath(path string) string {
|
||||||
|
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/segments"+path, "/"), "/")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) createParentFolders(path string) error {
|
func (d *driver) createParentFolders(path string) error {
|
||||||
dir := gopath.Dir(path)
|
dir := gopath.Dir(path)
|
||||||
for dir != "/" {
|
for dir != "/" {
|
||||||
_, _, err := d.Conn.Object(d.Container, d.swiftPath(dir))
|
_, _, err := d.Conn.Object(d.Container, d.swiftPath(dir))
|
||||||
if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == 404 {
|
if err == swift.ContainerNotFound || err == swift.ObjectNotFound {
|
||||||
_, err := d.Conn.ObjectPut(d.Container, d.swiftPath(dir), bytes.NewReader(make([]byte, 0)),
|
_, err := d.Conn.ObjectPut(d.Container, d.swiftPath(dir), bytes.NewReader(make([]byte, 0)),
|
||||||
false, "", directoryMimeType, nil)
|
false, "", directoryMimeType, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -496,17 +512,13 @@ func (d *driver) getContentType() string {
|
||||||
return "application/octet-stream"
|
return "application/octet-stream"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) getSegmentsContainer() string {
|
func (d *driver) getAllSegments(path string) ([]swift.Object, error) {
|
||||||
return d.Container + "_segments"
|
return d.Conn.Objects(d.Container, &swift.ObjectsOpts{Prefix: d.swiftSegmentPath(path)})
|
||||||
}
|
|
||||||
|
|
||||||
func (d *driver) getAllSegments(container string, path string) ([]swift.Object, error) {
|
|
||||||
return d.Conn.Objects(container, &swift.ObjectsOpts{Prefix: d.swiftPath(path)})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) createManifest(path string) (*swift.ObjectCreateFile, error) {
|
func (d *driver) createManifest(path string) (*swift.ObjectCreateFile, error) {
|
||||||
headers := make(swift.Headers)
|
headers := make(swift.Headers)
|
||||||
headers["X-Object-Manifest"] = d.getSegmentsContainer() + "/" + d.swiftPath(path)
|
headers["X-Object-Manifest"] = d.Container + "/" + d.swiftSegmentPath(path)
|
||||||
return d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "",
|
return d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "",
|
||||||
d.getContentType(), headers)
|
d.getContentType(), headers)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue