registry/vendor/github.com/ncw/swift/largeobjects.go
2018-09-05 00:28:40 -07:00

448 lines
13 KiB
Go

package swift
import (
"bufio"
"bytes"
"crypto/rand"
"crypto/sha1"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
gopath "path"
"strconv"
"strings"
"time"
)
// NotLargeObject is returned if an operation is performed on an object which isn't large.
var NotLargeObject = errors.New("Not a large object")
// readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded
var readAfterWriteTimeout = 15 * time.Second
// readAfterWriteWait defines the time to sleep between two retries
var readAfterWriteWait = 200 * time.Millisecond
// largeObjectCreateFile represents an open static or dynamic large object
type largeObjectCreateFile struct {
conn *Connection
container string
objectName string
currentLength int64
filePos int64
chunkSize int64
segmentContainer string
prefix string
contentType string
checkHash bool
segments []Object
headers Headers
minChunkSize int64
}
func swiftSegmentPath(path string) (string, error) {
checksum := sha1.New()
random := make([]byte, 32)
if _, err := rand.Read(random); err != nil {
return "", err
}
path = hex.EncodeToString(checksum.Sum(append([]byte(path), random...)))
return strings.TrimLeft(strings.TrimRight("segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil
}
func getSegment(segmentPath string, partNumber int) string {
return fmt.Sprintf("%s/%016d", segmentPath, partNumber)
}
func parseFullPath(manifest string) (container string, prefix string) {
components := strings.SplitN(manifest, "/", 2)
container = components[0]
if len(components) > 1 {
prefix = components[1]
}
return container, prefix
}
func (headers Headers) IsLargeObjectDLO() bool {
_, isDLO := headers["X-Object-Manifest"]
return isDLO
}
func (headers Headers) IsLargeObjectSLO() bool {
_, isSLO := headers["X-Static-Large-Object"]
return isSLO
}
func (headers Headers) IsLargeObject() bool {
return headers.IsLargeObjectSLO() || headers.IsLargeObjectDLO()
}
func (c *Connection) getAllSegments(container string, path string, headers Headers) (string, []Object, error) {
if manifest, isDLO := headers["X-Object-Manifest"]; isDLO {
segmentContainer, segmentPath := parseFullPath(manifest)
segments, err := c.getAllDLOSegments(segmentContainer, segmentPath)
return segmentContainer, segments, err
}
if headers.IsLargeObjectSLO() {
return c.getAllSLOSegments(container, path)
}
return "", nil, NotLargeObject
}
// LargeObjectOpts describes how a large object should be created
type LargeObjectOpts struct {
Container string // Name of container to place object
ObjectName string // Name of object
Flags int // Creation flags
CheckHash bool // If set Check the hash
Hash string // If set use this hash to check
ContentType string // Content-Type of the object
Headers Headers // Additional headers to upload the object with
ChunkSize int64 // Size of chunks of the object, defaults to 10MB if not set
MinChunkSize int64 // Minimum chunk size, automatically set for SLO's based on info
SegmentContainer string // Name of the container to place segments
SegmentPrefix string // Prefix to use for the segments
NoBuffer bool // Prevents using a bufio.Writer to write segments
}
type LargeObjectFile interface {
io.Writer
io.Seeker
io.Closer
Size() int64
Flush() error
}
// largeObjectCreate creates a large object at opts.Container, opts.ObjectName.
//
// opts.Flags can have the following bits set
// os.TRUNC - remove the contents of the large object if it exists
// os.APPEND - write at the end of the large object
func (c *Connection) largeObjectCreate(opts *LargeObjectOpts) (*largeObjectCreateFile, error) {
var (
segmentPath string
segmentContainer string
segments []Object
currentLength int64
err error
)
if opts.SegmentPrefix != "" {
segmentPath = opts.SegmentPrefix
} else if segmentPath, err = swiftSegmentPath(opts.ObjectName); err != nil {
return nil, err
}
if info, headers, err := c.Object(opts.Container, opts.ObjectName); err == nil {
if opts.Flags&os.O_TRUNC != 0 {
c.LargeObjectDelete(opts.Container, opts.ObjectName)
} else {
currentLength = info.Bytes
if headers.IsLargeObject() {
segmentContainer, segments, err = c.getAllSegments(opts.Container, opts.ObjectName, headers)
if err != nil {
return nil, err
}
if len(segments) > 0 {
segmentPath = gopath.Dir(segments[0].Name)
}
} else {
if err = c.ObjectMove(opts.Container, opts.ObjectName, opts.Container, getSegment(segmentPath, 1)); err != nil {
return nil, err
}
segments = append(segments, info)
}
}
} else if err != ObjectNotFound {
return nil, err
}
// segmentContainer is not empty when the manifest already existed
if segmentContainer == "" {
if opts.SegmentContainer != "" {
segmentContainer = opts.SegmentContainer
} else {
segmentContainer = opts.Container + "_segments"
}
}
file := &largeObjectCreateFile{
conn: c,
checkHash: opts.CheckHash,
container: opts.Container,
objectName: opts.ObjectName,
chunkSize: opts.ChunkSize,
minChunkSize: opts.MinChunkSize,
headers: opts.Headers,
segmentContainer: segmentContainer,
prefix: segmentPath,
segments: segments,
currentLength: currentLength,
}
if file.chunkSize == 0 {
file.chunkSize = 10 * 1024 * 1024
}
if file.minChunkSize > file.chunkSize {
file.chunkSize = file.minChunkSize
}
if opts.Flags&os.O_APPEND != 0 {
file.filePos = currentLength
}
return file, nil
}
// LargeObjectDelete deletes the large object named by container, path
func (c *Connection) LargeObjectDelete(container string, objectName string) error {
_, headers, err := c.Object(container, objectName)
if err != nil {
return err
}
var objects [][]string
if headers.IsLargeObject() {
segmentContainer, segments, err := c.getAllSegments(container, objectName, headers)
if err != nil {
return err
}
for _, obj := range segments {
objects = append(objects, []string{segmentContainer, obj.Name})
}
}
objects = append(objects, []string{container, objectName})
info, err := c.cachedQueryInfo()
if err == nil && info.SupportsBulkDelete() && len(objects) > 0 {
filenames := make([]string, len(objects))
for i, obj := range objects {
filenames[i] = obj[0] + "/" + obj[1]
}
_, err = c.doBulkDelete(filenames)
// Don't fail on ObjectNotFound because eventual consistency
// makes this situation normal.
if err != nil && err != Forbidden && err != ObjectNotFound {
return err
}
} else {
for _, obj := range objects {
if err := c.ObjectDelete(obj[0], obj[1]); err != nil {
return err
}
}
}
return nil
}
// LargeObjectGetSegments returns all the segments that compose an object
// If the object is a Dynamic Large Object (DLO), it just returns the objects
// that have the prefix as indicated by the manifest.
// If the object is a Static Large Object (SLO), it retrieves the JSON content
// of the manifest and return all the segments of it.
func (c *Connection) LargeObjectGetSegments(container string, path string) (string, []Object, error) {
_, headers, err := c.Object(container, path)
if err != nil {
return "", nil, err
}
return c.getAllSegments(container, path, headers)
}
// Seek sets the offset for the next write operation
func (file *largeObjectCreateFile) Seek(offset int64, whence int) (int64, error) {
switch whence {
case 0:
file.filePos = offset
case 1:
file.filePos += offset
case 2:
file.filePos = file.currentLength + offset
default:
return -1, fmt.Errorf("invalid value for whence")
}
if file.filePos < 0 {
return -1, fmt.Errorf("negative offset")
}
return file.filePos, nil
}
func (file *largeObjectCreateFile) Size() int64 {
return file.currentLength
}
func withLORetry(expectedSize int64, fn func() (Headers, int64, error)) (err error) {
endTimer := time.NewTimer(readAfterWriteTimeout)
defer endTimer.Stop()
waitingTime := readAfterWriteWait
for {
var headers Headers
var sz int64
if headers, sz, err = fn(); err == nil {
if !headers.IsLargeObjectDLO() || (expectedSize == 0 && sz > 0) || expectedSize == sz {
return
}
} else {
return
}
waitTimer := time.NewTimer(waitingTime)
select {
case <-endTimer.C:
waitTimer.Stop()
err = fmt.Errorf("Timeout expired while waiting for object to have size == %d, got: %d", expectedSize, sz)
return
case <-waitTimer.C:
waitingTime *= 2
}
}
}
func (c *Connection) waitForSegmentsToShowUp(container, objectName string, expectedSize int64) (err error) {
err = withLORetry(expectedSize, func() (Headers, int64, error) {
var info Object
var headers Headers
info, headers, err = c.objectBase(container, objectName)
if err != nil {
return headers, 0, err
}
return headers, info.Bytes, nil
})
return
}
// Write satisfies the io.Writer interface
func (file *largeObjectCreateFile) Write(buf []byte) (int, error) {
var sz int64
var relativeFilePos int
writeSegmentIdx := 0
for i, obj := range file.segments {
if file.filePos < sz+obj.Bytes || (i == len(file.segments)-1 && file.filePos < sz+file.minChunkSize) {
relativeFilePos = int(file.filePos - sz)
break
}
writeSegmentIdx++
sz += obj.Bytes
}
sizeToWrite := len(buf)
for offset := 0; offset < sizeToWrite; {
newSegment, n, err := file.writeSegment(buf[offset:], writeSegmentIdx, relativeFilePos)
if err != nil {
return 0, err
}
if writeSegmentIdx < len(file.segments) {
file.segments[writeSegmentIdx] = *newSegment
} else {
file.segments = append(file.segments, *newSegment)
}
offset += n
writeSegmentIdx++
relativeFilePos = 0
}
file.filePos += int64(sizeToWrite)
file.currentLength = 0
for _, obj := range file.segments {
file.currentLength += obj.Bytes
}
return sizeToWrite, nil
}
func (file *largeObjectCreateFile) writeSegment(buf []byte, writeSegmentIdx int, relativeFilePos int) (*Object, int, error) {
var (
readers []io.Reader
existingSegment *Object
segmentSize int
)
segmentName := getSegment(file.prefix, writeSegmentIdx+1)
sizeToRead := int(file.chunkSize)
if writeSegmentIdx < len(file.segments) {
existingSegment = &file.segments[writeSegmentIdx]
if writeSegmentIdx != len(file.segments)-1 {
sizeToRead = int(existingSegment.Bytes)
}
if relativeFilePos > 0 {
headers := make(Headers)
headers["Range"] = "bytes=0-" + strconv.FormatInt(int64(relativeFilePos-1), 10)
existingSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers)
if err != nil {
return nil, 0, err
}
defer existingSegmentReader.Close()
sizeToRead -= relativeFilePos
segmentSize += relativeFilePos
readers = []io.Reader{existingSegmentReader}
}
}
if sizeToRead > len(buf) {
sizeToRead = len(buf)
}
segmentSize += sizeToRead
readers = append(readers, bytes.NewReader(buf[:sizeToRead]))
if existingSegment != nil && segmentSize < int(existingSegment.Bytes) {
headers := make(Headers)
headers["Range"] = "bytes=" + strconv.FormatInt(int64(segmentSize), 10) + "-"
tailSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers)
if err != nil {
return nil, 0, err
}
defer tailSegmentReader.Close()
segmentSize = int(existingSegment.Bytes)
readers = append(readers, tailSegmentReader)
}
segmentReader := io.MultiReader(readers...)
headers, err := file.conn.ObjectPut(file.segmentContainer, segmentName, segmentReader, true, "", file.contentType, nil)
if err != nil {
return nil, 0, err
}
return &Object{Name: segmentName, Bytes: int64(segmentSize), Hash: headers["Etag"]}, sizeToRead, nil
}
func withBuffer(opts *LargeObjectOpts, lo LargeObjectFile) LargeObjectFile {
if !opts.NoBuffer {
return &bufferedLargeObjectFile{
LargeObjectFile: lo,
bw: bufio.NewWriterSize(lo, int(opts.ChunkSize)),
}
}
return lo
}
type bufferedLargeObjectFile struct {
LargeObjectFile
bw *bufio.Writer
}
func (blo *bufferedLargeObjectFile) Close() error {
err := blo.bw.Flush()
if err != nil {
return err
}
return blo.LargeObjectFile.Close()
}
func (blo *bufferedLargeObjectFile) Write(p []byte) (n int, err error) {
return blo.bw.Write(p)
}
func (blo *bufferedLargeObjectFile) Seek(offset int64, whence int) (int64, error) {
err := blo.bw.Flush()
if err != nil {
return 0, err
}
return blo.LargeObjectFile.Seek(offset, whence)
}
func (blo *bufferedLargeObjectFile) Size() int64 {
return blo.LargeObjectFile.Size() + int64(blo.bw.Buffered())
}
func (blo *bufferedLargeObjectFile) Flush() error {
err := blo.bw.Flush()
if err != nil {
return err
}
return blo.LargeObjectFile.Flush()
}