Do not use Swift server side copy for manifests to handle >5G files
Signed-off-by: Sylvain Baubeau <sbaubeau@redhat.com>
This commit is contained in:
parent
326c3a9c49
commit
b4cf6c053b
1 changed files with 76 additions and 27 deletions
|
@ -20,7 +20,10 @@ package swift
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"crypto/sha1"
|
||||
"crypto/tls"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -237,6 +240,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
|||
paddingReader io.Reader
|
||||
currentLength int64
|
||||
cursor int64
|
||||
segmentPath string
|
||||
)
|
||||
|
||||
partNumber := 1
|
||||
|
@ -244,7 +248,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
|||
zeroBuf := make([]byte, d.ChunkSize)
|
||||
|
||||
getSegment := func() string {
|
||||
return fmt.Sprintf("%s/%016d", d.swiftSegmentPath(path), partNumber)
|
||||
return fmt.Sprintf("%s/%016d", segmentPath, partNumber)
|
||||
}
|
||||
|
||||
max := func(a int64, b int64) int64 {
|
||||
|
@ -254,24 +258,36 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
|||
return b
|
||||
}
|
||||
|
||||
info, _, err := d.Conn.Object(d.Container, d.swiftPath(path))
|
||||
if err != nil {
|
||||
if err == swift.ObjectNotFound {
|
||||
// Create a object manifest
|
||||
manifest, err := d.createManifest(path)
|
||||
if err != nil {
|
||||
createManifest := true
|
||||
info, headers, err := d.Conn.Object(d.Container, d.swiftPath(path))
|
||||
if err == nil {
|
||||
manifest, ok := headers["X-Object-Manifest"]
|
||||
if !ok {
|
||||
if segmentPath, err = d.swiftSegmentPath(path); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
manifest.Close()
|
||||
} else if err == swift.ContainerNotFound {
|
||||
return 0, storagedriver.PathNotFoundError{Path: path}
|
||||
if err := d.Conn.ObjectMove(d.Container, d.swiftPath(path), d.Container, getSegment()); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
segments = append(segments, info)
|
||||
} else {
|
||||
_, segmentPath = parseManifest(manifest)
|
||||
if segments, err = d.getAllSegments(segmentPath); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
createManifest = false
|
||||
}
|
||||
currentLength = info.Bytes
|
||||
} else if err == swift.ObjectNotFound {
|
||||
if segmentPath, err = d.swiftSegmentPath(path); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
} else {
|
||||
// The manifest already exists. Get all the segments
|
||||
currentLength = info.Bytes
|
||||
if segments, err = d.getAllSegments(path); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if createManifest {
|
||||
if err := d.createManifest(path, d.Container+"/"+segmentPath); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
@ -468,8 +484,18 @@ func (d *driver) List(ctx context.Context, path string) ([]string, error) {
|
|||
// Move moves an object stored at sourcePath to destPath, removing the original
|
||||
// object.
|
||||
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
|
||||
err := d.Conn.ObjectMove(d.Container, d.swiftPath(sourcePath), d.Container, d.swiftPath(destPath))
|
||||
if err == swift.ContainerNotFound || err == swift.ObjectNotFound {
|
||||
_, headers, err := d.Conn.Object(d.Container, d.swiftPath(sourcePath))
|
||||
if err == nil {
|
||||
if manifest, ok := headers["X-Object-Manifest"]; ok {
|
||||
if err = d.createManifest(destPath, manifest); err != nil {
|
||||
return err
|
||||
}
|
||||
err = d.Conn.ObjectDelete(d.Container, d.swiftPath(sourcePath))
|
||||
} else {
|
||||
err = d.Conn.ObjectMove(d.Container, d.swiftPath(sourcePath), d.Container, d.swiftPath(destPath))
|
||||
}
|
||||
}
|
||||
if err == swift.ObjectNotFound {
|
||||
return storagedriver.PathNotFoundError{Path: sourcePath}
|
||||
}
|
||||
return err
|
||||
|
@ -509,9 +535,8 @@ func (d *driver) Delete(ctx context.Context, path string) error {
|
|||
if _, headers, err := d.Conn.Object(d.Container, obj.Name); err == nil {
|
||||
manifest, ok := headers["X-Object-Manifest"]
|
||||
if ok {
|
||||
components := strings.SplitN(manifest, "/", 2)
|
||||
segContainer := components[0]
|
||||
segments, err := d.getAllSegments(components[1])
|
||||
segContainer, prefix := parseManifest(manifest)
|
||||
segments, err := d.getAllSegments(prefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -566,8 +591,14 @@ func (d *driver) swiftPath(path string) string {
|
|||
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
|
||||
}
|
||||
|
||||
func (d *driver) swiftSegmentPath(path string) string {
|
||||
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/segments"+path, "/"), "/")
|
||||
func (d *driver) swiftSegmentPath(path string) (string, error) {
|
||||
checksum := sha1.New()
|
||||
random := make([]byte, 32)
|
||||
if _, err := rand.Read(random); err != nil {
|
||||
return "", err
|
||||
}
|
||||
path = hex.EncodeToString(checksum.Sum(append([]byte(path), random...)))
|
||||
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil
|
||||
}
|
||||
|
||||
func (d *driver) getContentType() string {
|
||||
|
@ -575,21 +606,30 @@ func (d *driver) getContentType() string {
|
|||
}
|
||||
|
||||
func (d *driver) getAllSegments(path string) ([]swift.Object, error) {
|
||||
segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: d.swiftSegmentPath(path)})
|
||||
segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: path})
|
||||
if err == swift.ContainerNotFound {
|
||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
return segments, err
|
||||
}
|
||||
|
||||
func (d *driver) createManifest(path string) (*swift.ObjectCreateFile, error) {
|
||||
func (d *driver) createManifest(path string, segments string) error {
|
||||
headers := make(swift.Headers)
|
||||
headers["X-Object-Manifest"] = d.Container + "/" + d.swiftSegmentPath(path)
|
||||
file, err := d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "", d.getContentType(), headers)
|
||||
if err == swift.ContainerNotFound {
|
||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||
headers["X-Object-Manifest"] = segments
|
||||
manifest, err := d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "", d.getContentType(), headers)
|
||||
if err != nil {
|
||||
if err == swift.ObjectNotFound {
|
||||
return storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
return err
|
||||
}
|
||||
return file, err
|
||||
if err := manifest.Close(); err != nil {
|
||||
if err == swift.ObjectNotFound {
|
||||
return storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func detectBulkDelete(authURL string) (bulkDelete bool) {
|
||||
|
@ -604,3 +644,12 @@ func detectBulkDelete(authURL string) (bulkDelete bool) {
|
|||
}
|
||||
return
|
||||
}
|
||||
|
||||
func parseManifest(manifest string) (container string, prefix string) {
|
||||
components := strings.SplitN(manifest, "/", 2)
|
||||
container = components[0]
|
||||
if len(components) > 1 {
|
||||
prefix = components[1]
|
||||
}
|
||||
return container, prefix
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue