Merge pull request #2698 from cquon/swift_vendor

Update ncw/swift Vendor Package
This commit is contained in:
Olivier Gambier 2018-09-05 15:26:25 -07:00 committed by GitHub
commit 6d66d0367e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 1427 additions and 184 deletions

View file

@ -23,7 +23,7 @@ github.com/satori/go.uuid f58768cc1a7a7e77a3bd49e98cdd21419399b6a3
github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c
github.com/miekg/dns 271c58e0c14f552178ea321a545ff9af38930f39
github.com/mitchellh/mapstructure 482a9fd5fa83e8c4e7817413b80f3eb8feec03ef
github.com/ncw/swift b964f2ca856aac39885e258ad25aec08d5f64ee6
github.com/ncw/swift a0320860b16212c2b59b4912bb6508cda1d7cee6
github.com/prometheus/client_golang c332b6f63c0658a65eca15c0e5247ded801cf564
github.com/prometheus/client_model 99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c
github.com/prometheus/common 89604d197083d4781071d3c65855d24ecfb0a563

View file

@ -26,26 +26,27 @@ See here for full package docs
- http://godoc.org/github.com/ncw/swift
Here is a short example from the docs
```go
import "github.com/ncw/swift"
import "github.com/ncw/swift"
// Create a connection
c := swift.Connection{
UserName: "user",
ApiKey: "key",
AuthUrl: "auth_url",
Domain: "domain", // Name of the domain (v3 auth only)
Tenant: "tenant", // Name of the tenant (v2 auth only)
}
// Authenticate
err := c.Authenticate()
if err != nil {
panic(err)
}
// List all the containers
containers, err := c.ContainerNames(nil)
fmt.Println(containers)
// etc...
// Create a connection
c := swift.Connection{
UserName: "user",
ApiKey: "key",
AuthUrl: "auth_url",
Domain: "domain", // Name of the domain (v3 auth only)
Tenant: "tenant", // Name of the tenant (v2 auth only)
}
// Authenticate
err := c.Authenticate()
if err != nil {
panic(err)
}
// List all the containers
containers, err := c.ContainerNames(nil)
fmt.Println(containers)
// etc...
```
Additions
---------
@ -138,3 +139,5 @@ Contributors
- Cezar Sa Espinola <cezarsa@gmail.com>
- Sam Gunaratne <samgzeit@gmail.com>
- Richard Scothern <richard.scothern@gmail.com>
- Michel Couillard <couillard.michel@voxlog.ca>
- Christopher Waldon <ckwaldon@us.ibm.com>

View file

@ -117,7 +117,7 @@ func (auth *v3Auth) Request(c *Connection) (*http.Request, error) {
v3 := v3AuthRequest{}
if c.UserName == "" {
if c.UserName == "" && c.UserId == "" {
v3.Auth.Identity.Methods = []string{v3AuthMethodToken}
v3.Auth.Identity.Token = &v3AuthToken{Id: c.ApiKey}
} else {
@ -125,6 +125,7 @@ func (auth *v3Auth) Request(c *Connection) (*http.Request, error) {
v3.Auth.Identity.Password = &v3AuthPassword{
User: v3User{
Name: c.UserName,
Id: c.UserId,
Password: c.ApiKey,
},
}

136
vendor/github.com/ncw/swift/dlo.go generated vendored Normal file
View file

@ -0,0 +1,136 @@
package swift
import (
"os"
)
// DynamicLargeObjectCreateFile represents an open static large object
type DynamicLargeObjectCreateFile struct {
largeObjectCreateFile
}
// DynamicLargeObjectCreateFile creates a dynamic large object
// returning an object which satisfies io.Writer, io.Seeker, io.Closer
// and io.ReaderFrom. The flags are as passes to the
// largeObjectCreate method.
func (c *Connection) DynamicLargeObjectCreateFile(opts *LargeObjectOpts) (LargeObjectFile, error) {
lo, err := c.largeObjectCreate(opts)
if err != nil {
return nil, err
}
return withBuffer(opts, &DynamicLargeObjectCreateFile{
largeObjectCreateFile: *lo,
}), nil
}
// DynamicLargeObjectCreate creates or truncates an existing dynamic
// large object returning a writeable object. This sets opts.Flags to
// an appropriate value before calling DynamicLargeObjectCreateFile
func (c *Connection) DynamicLargeObjectCreate(opts *LargeObjectOpts) (LargeObjectFile, error) {
opts.Flags = os.O_TRUNC | os.O_CREATE
return c.DynamicLargeObjectCreateFile(opts)
}
// DynamicLargeObjectDelete deletes a dynamic large object and all of its segments.
func (c *Connection) DynamicLargeObjectDelete(container string, path string) error {
return c.LargeObjectDelete(container, path)
}
// DynamicLargeObjectMove moves a dynamic large object from srcContainer, srcObjectName to dstContainer, dstObjectName
func (c *Connection) DynamicLargeObjectMove(srcContainer string, srcObjectName string, dstContainer string, dstObjectName string) error {
info, headers, err := c.Object(dstContainer, srcObjectName)
if err != nil {
return err
}
segmentContainer, segmentPath := parseFullPath(headers["X-Object-Manifest"])
if err := c.createDLOManifest(dstContainer, dstObjectName, segmentContainer+"/"+segmentPath, info.ContentType); err != nil {
return err
}
if err := c.ObjectDelete(srcContainer, srcObjectName); err != nil {
return err
}
return nil
}
// createDLOManifest creates a dynamic large object manifest
func (c *Connection) createDLOManifest(container string, objectName string, prefix string, contentType string) error {
headers := make(Headers)
headers["X-Object-Manifest"] = prefix
manifest, err := c.ObjectCreate(container, objectName, false, "", contentType, headers)
if err != nil {
return err
}
if err := manifest.Close(); err != nil {
return err
}
return nil
}
// Close satisfies the io.Closer interface
func (file *DynamicLargeObjectCreateFile) Close() error {
return file.Flush()
}
func (file *DynamicLargeObjectCreateFile) Flush() error {
err := file.conn.createDLOManifest(file.container, file.objectName, file.segmentContainer+"/"+file.prefix, file.contentType)
if err != nil {
return err
}
return file.conn.waitForSegmentsToShowUp(file.container, file.objectName, file.Size())
}
func (c *Connection) getAllDLOSegments(segmentContainer, segmentPath string) ([]Object, error) {
//a simple container listing works 99.9% of the time
segments, err := c.ObjectsAll(segmentContainer, &ObjectsOpts{Prefix: segmentPath})
if err != nil {
return nil, err
}
hasObjectName := make(map[string]struct{})
for _, segment := range segments {
hasObjectName[segment.Name] = struct{}{}
}
//The container listing might be outdated (i.e. not contain all existing
//segment objects yet) because of temporary inconsistency (Swift is only
//eventually consistent!). Check its completeness.
segmentNumber := 0
for {
segmentNumber++
segmentName := getSegment(segmentPath, segmentNumber)
if _, seen := hasObjectName[segmentName]; seen {
continue
}
//This segment is missing in the container listing. Use a more reliable
//request to check its existence. (HEAD requests on segments are
//guaranteed to return the correct metadata, except for the pathological
//case of an outage of large parts of the Swift cluster or its network,
//since every segment is only written once.)
segment, _, err := c.Object(segmentContainer, segmentName)
switch err {
case nil:
//found new segment -> add it in the correct position and keep
//going, more might be missing
if segmentNumber <= len(segments) {
segments = append(segments[:segmentNumber], segments[segmentNumber-1:]...)
segments[segmentNumber-1] = segment
} else {
segments = append(segments, segment)
}
continue
case ObjectNotFound:
//This segment is missing. Since we upload segments sequentially,
//there won't be any more segments after it.
return segments, nil
default:
return nil, err //unexpected error
}
}
}

448
vendor/github.com/ncw/swift/largeobjects.go generated vendored Normal file
View file

@ -0,0 +1,448 @@
package swift
import (
"bufio"
"bytes"
"crypto/rand"
"crypto/sha1"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
gopath "path"
"strconv"
"strings"
"time"
)
// NotLargeObject is returned if an operation is performed on an object which isn't large.
var NotLargeObject = errors.New("Not a large object")
// readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded
var readAfterWriteTimeout = 15 * time.Second
// readAfterWriteWait defines the time to sleep between two retries
var readAfterWriteWait = 200 * time.Millisecond
// largeObjectCreateFile represents an open static or dynamic large object
type largeObjectCreateFile struct {
conn *Connection
container string
objectName string
currentLength int64
filePos int64
chunkSize int64
segmentContainer string
prefix string
contentType string
checkHash bool
segments []Object
headers Headers
minChunkSize int64
}
func swiftSegmentPath(path string) (string, error) {
checksum := sha1.New()
random := make([]byte, 32)
if _, err := rand.Read(random); err != nil {
return "", err
}
path = hex.EncodeToString(checksum.Sum(append([]byte(path), random...)))
return strings.TrimLeft(strings.TrimRight("segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil
}
func getSegment(segmentPath string, partNumber int) string {
return fmt.Sprintf("%s/%016d", segmentPath, partNumber)
}
func parseFullPath(manifest string) (container string, prefix string) {
components := strings.SplitN(manifest, "/", 2)
container = components[0]
if len(components) > 1 {
prefix = components[1]
}
return container, prefix
}
func (headers Headers) IsLargeObjectDLO() bool {
_, isDLO := headers["X-Object-Manifest"]
return isDLO
}
func (headers Headers) IsLargeObjectSLO() bool {
_, isSLO := headers["X-Static-Large-Object"]
return isSLO
}
func (headers Headers) IsLargeObject() bool {
return headers.IsLargeObjectSLO() || headers.IsLargeObjectDLO()
}
func (c *Connection) getAllSegments(container string, path string, headers Headers) (string, []Object, error) {
if manifest, isDLO := headers["X-Object-Manifest"]; isDLO {
segmentContainer, segmentPath := parseFullPath(manifest)
segments, err := c.getAllDLOSegments(segmentContainer, segmentPath)
return segmentContainer, segments, err
}
if headers.IsLargeObjectSLO() {
return c.getAllSLOSegments(container, path)
}
return "", nil, NotLargeObject
}
// LargeObjectOpts describes how a large object should be created
type LargeObjectOpts struct {
Container string // Name of container to place object
ObjectName string // Name of object
Flags int // Creation flags
CheckHash bool // If set Check the hash
Hash string // If set use this hash to check
ContentType string // Content-Type of the object
Headers Headers // Additional headers to upload the object with
ChunkSize int64 // Size of chunks of the object, defaults to 10MB if not set
MinChunkSize int64 // Minimum chunk size, automatically set for SLO's based on info
SegmentContainer string // Name of the container to place segments
SegmentPrefix string // Prefix to use for the segments
NoBuffer bool // Prevents using a bufio.Writer to write segments
}
type LargeObjectFile interface {
io.Writer
io.Seeker
io.Closer
Size() int64
Flush() error
}
// largeObjectCreate creates a large object at opts.Container, opts.ObjectName.
//
// opts.Flags can have the following bits set
// os.TRUNC - remove the contents of the large object if it exists
// os.APPEND - write at the end of the large object
func (c *Connection) largeObjectCreate(opts *LargeObjectOpts) (*largeObjectCreateFile, error) {
var (
segmentPath string
segmentContainer string
segments []Object
currentLength int64
err error
)
if opts.SegmentPrefix != "" {
segmentPath = opts.SegmentPrefix
} else if segmentPath, err = swiftSegmentPath(opts.ObjectName); err != nil {
return nil, err
}
if info, headers, err := c.Object(opts.Container, opts.ObjectName); err == nil {
if opts.Flags&os.O_TRUNC != 0 {
c.LargeObjectDelete(opts.Container, opts.ObjectName)
} else {
currentLength = info.Bytes
if headers.IsLargeObject() {
segmentContainer, segments, err = c.getAllSegments(opts.Container, opts.ObjectName, headers)
if err != nil {
return nil, err
}
if len(segments) > 0 {
segmentPath = gopath.Dir(segments[0].Name)
}
} else {
if err = c.ObjectMove(opts.Container, opts.ObjectName, opts.Container, getSegment(segmentPath, 1)); err != nil {
return nil, err
}
segments = append(segments, info)
}
}
} else if err != ObjectNotFound {
return nil, err
}
// segmentContainer is not empty when the manifest already existed
if segmentContainer == "" {
if opts.SegmentContainer != "" {
segmentContainer = opts.SegmentContainer
} else {
segmentContainer = opts.Container + "_segments"
}
}
file := &largeObjectCreateFile{
conn: c,
checkHash: opts.CheckHash,
container: opts.Container,
objectName: opts.ObjectName,
chunkSize: opts.ChunkSize,
minChunkSize: opts.MinChunkSize,
headers: opts.Headers,
segmentContainer: segmentContainer,
prefix: segmentPath,
segments: segments,
currentLength: currentLength,
}
if file.chunkSize == 0 {
file.chunkSize = 10 * 1024 * 1024
}
if file.minChunkSize > file.chunkSize {
file.chunkSize = file.minChunkSize
}
if opts.Flags&os.O_APPEND != 0 {
file.filePos = currentLength
}
return file, nil
}
// LargeObjectDelete deletes the large object named by container, path
func (c *Connection) LargeObjectDelete(container string, objectName string) error {
_, headers, err := c.Object(container, objectName)
if err != nil {
return err
}
var objects [][]string
if headers.IsLargeObject() {
segmentContainer, segments, err := c.getAllSegments(container, objectName, headers)
if err != nil {
return err
}
for _, obj := range segments {
objects = append(objects, []string{segmentContainer, obj.Name})
}
}
objects = append(objects, []string{container, objectName})
info, err := c.cachedQueryInfo()
if err == nil && info.SupportsBulkDelete() && len(objects) > 0 {
filenames := make([]string, len(objects))
for i, obj := range objects {
filenames[i] = obj[0] + "/" + obj[1]
}
_, err = c.doBulkDelete(filenames)
// Don't fail on ObjectNotFound because eventual consistency
// makes this situation normal.
if err != nil && err != Forbidden && err != ObjectNotFound {
return err
}
} else {
for _, obj := range objects {
if err := c.ObjectDelete(obj[0], obj[1]); err != nil {
return err
}
}
}
return nil
}
// LargeObjectGetSegments returns all the segments that compose an object
// If the object is a Dynamic Large Object (DLO), it just returns the objects
// that have the prefix as indicated by the manifest.
// If the object is a Static Large Object (SLO), it retrieves the JSON content
// of the manifest and return all the segments of it.
func (c *Connection) LargeObjectGetSegments(container string, path string) (string, []Object, error) {
_, headers, err := c.Object(container, path)
if err != nil {
return "", nil, err
}
return c.getAllSegments(container, path, headers)
}
// Seek sets the offset for the next write operation
func (file *largeObjectCreateFile) Seek(offset int64, whence int) (int64, error) {
switch whence {
case 0:
file.filePos = offset
case 1:
file.filePos += offset
case 2:
file.filePos = file.currentLength + offset
default:
return -1, fmt.Errorf("invalid value for whence")
}
if file.filePos < 0 {
return -1, fmt.Errorf("negative offset")
}
return file.filePos, nil
}
func (file *largeObjectCreateFile) Size() int64 {
return file.currentLength
}
func withLORetry(expectedSize int64, fn func() (Headers, int64, error)) (err error) {
endTimer := time.NewTimer(readAfterWriteTimeout)
defer endTimer.Stop()
waitingTime := readAfterWriteWait
for {
var headers Headers
var sz int64
if headers, sz, err = fn(); err == nil {
if !headers.IsLargeObjectDLO() || (expectedSize == 0 && sz > 0) || expectedSize == sz {
return
}
} else {
return
}
waitTimer := time.NewTimer(waitingTime)
select {
case <-endTimer.C:
waitTimer.Stop()
err = fmt.Errorf("Timeout expired while waiting for object to have size == %d, got: %d", expectedSize, sz)
return
case <-waitTimer.C:
waitingTime *= 2
}
}
}
func (c *Connection) waitForSegmentsToShowUp(container, objectName string, expectedSize int64) (err error) {
err = withLORetry(expectedSize, func() (Headers, int64, error) {
var info Object
var headers Headers
info, headers, err = c.objectBase(container, objectName)
if err != nil {
return headers, 0, err
}
return headers, info.Bytes, nil
})
return
}
// Write satisfies the io.Writer interface
func (file *largeObjectCreateFile) Write(buf []byte) (int, error) {
var sz int64
var relativeFilePos int
writeSegmentIdx := 0
for i, obj := range file.segments {
if file.filePos < sz+obj.Bytes || (i == len(file.segments)-1 && file.filePos < sz+file.minChunkSize) {
relativeFilePos = int(file.filePos - sz)
break
}
writeSegmentIdx++
sz += obj.Bytes
}
sizeToWrite := len(buf)
for offset := 0; offset < sizeToWrite; {
newSegment, n, err := file.writeSegment(buf[offset:], writeSegmentIdx, relativeFilePos)
if err != nil {
return 0, err
}
if writeSegmentIdx < len(file.segments) {
file.segments[writeSegmentIdx] = *newSegment
} else {
file.segments = append(file.segments, *newSegment)
}
offset += n
writeSegmentIdx++
relativeFilePos = 0
}
file.filePos += int64(sizeToWrite)
file.currentLength = 0
for _, obj := range file.segments {
file.currentLength += obj.Bytes
}
return sizeToWrite, nil
}
func (file *largeObjectCreateFile) writeSegment(buf []byte, writeSegmentIdx int, relativeFilePos int) (*Object, int, error) {
var (
readers []io.Reader
existingSegment *Object
segmentSize int
)
segmentName := getSegment(file.prefix, writeSegmentIdx+1)
sizeToRead := int(file.chunkSize)
if writeSegmentIdx < len(file.segments) {
existingSegment = &file.segments[writeSegmentIdx]
if writeSegmentIdx != len(file.segments)-1 {
sizeToRead = int(existingSegment.Bytes)
}
if relativeFilePos > 0 {
headers := make(Headers)
headers["Range"] = "bytes=0-" + strconv.FormatInt(int64(relativeFilePos-1), 10)
existingSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers)
if err != nil {
return nil, 0, err
}
defer existingSegmentReader.Close()
sizeToRead -= relativeFilePos
segmentSize += relativeFilePos
readers = []io.Reader{existingSegmentReader}
}
}
if sizeToRead > len(buf) {
sizeToRead = len(buf)
}
segmentSize += sizeToRead
readers = append(readers, bytes.NewReader(buf[:sizeToRead]))
if existingSegment != nil && segmentSize < int(existingSegment.Bytes) {
headers := make(Headers)
headers["Range"] = "bytes=" + strconv.FormatInt(int64(segmentSize), 10) + "-"
tailSegmentReader, _, err := file.conn.ObjectOpen(file.segmentContainer, segmentName, true, headers)
if err != nil {
return nil, 0, err
}
defer tailSegmentReader.Close()
segmentSize = int(existingSegment.Bytes)
readers = append(readers, tailSegmentReader)
}
segmentReader := io.MultiReader(readers...)
headers, err := file.conn.ObjectPut(file.segmentContainer, segmentName, segmentReader, true, "", file.contentType, nil)
if err != nil {
return nil, 0, err
}
return &Object{Name: segmentName, Bytes: int64(segmentSize), Hash: headers["Etag"]}, sizeToRead, nil
}
func withBuffer(opts *LargeObjectOpts, lo LargeObjectFile) LargeObjectFile {
if !opts.NoBuffer {
return &bufferedLargeObjectFile{
LargeObjectFile: lo,
bw: bufio.NewWriterSize(lo, int(opts.ChunkSize)),
}
}
return lo
}
type bufferedLargeObjectFile struct {
LargeObjectFile
bw *bufio.Writer
}
func (blo *bufferedLargeObjectFile) Close() error {
err := blo.bw.Flush()
if err != nil {
return err
}
return blo.LargeObjectFile.Close()
}
func (blo *bufferedLargeObjectFile) Write(p []byte) (n int, err error) {
return blo.bw.Write(p)
}
func (blo *bufferedLargeObjectFile) Seek(offset int64, whence int) (int64, error) {
err := blo.bw.Flush()
if err != nil {
return 0, err
}
return blo.LargeObjectFile.Seek(offset, whence)
}
func (blo *bufferedLargeObjectFile) Size() int64 {
return blo.LargeObjectFile.Size() + int64(blo.bw.Buffered())
}
func (blo *bufferedLargeObjectFile) Flush() error {
err := blo.bw.Flush()
if err != nil {
return err
}
return blo.LargeObjectFile.Flush()
}

171
vendor/github.com/ncw/swift/slo.go generated vendored Normal file
View file

@ -0,0 +1,171 @@
package swift
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/url"
"os"
)
// StaticLargeObjectCreateFile represents an open static large object
type StaticLargeObjectCreateFile struct {
largeObjectCreateFile
}
var SLONotSupported = errors.New("SLO not supported")
type swiftSegment struct {
Path string `json:"path,omitempty"`
Etag string `json:"etag,omitempty"`
Size int64 `json:"size_bytes,omitempty"`
// When uploading a manifest, the attributes must be named `path`, `etag` and `size_bytes`
// but when querying the JSON content of a manifest with the `multipart-manifest=get`
// parameter, Swift names those attributes `name`, `hash` and `bytes`.
// We use all the different attributes names in this structure to be able to use
// the same structure for both uploading and retrieving.
Name string `json:"name,omitempty"`
Hash string `json:"hash,omitempty"`
Bytes int64 `json:"bytes,omitempty"`
ContentType string `json:"content_type,omitempty"`
LastModified string `json:"last_modified,omitempty"`
}
// StaticLargeObjectCreateFile creates a static large object returning
// an object which satisfies io.Writer, io.Seeker, io.Closer and
// io.ReaderFrom. The flags are as passed to the largeObjectCreate
// method.
func (c *Connection) StaticLargeObjectCreateFile(opts *LargeObjectOpts) (LargeObjectFile, error) {
info, err := c.cachedQueryInfo()
if err != nil || !info.SupportsSLO() {
return nil, SLONotSupported
}
realMinChunkSize := info.SLOMinSegmentSize()
if realMinChunkSize > opts.MinChunkSize {
opts.MinChunkSize = realMinChunkSize
}
lo, err := c.largeObjectCreate(opts)
if err != nil {
return nil, err
}
return withBuffer(opts, &StaticLargeObjectCreateFile{
largeObjectCreateFile: *lo,
}), nil
}
// StaticLargeObjectCreate creates or truncates an existing static
// large object returning a writeable object. This sets opts.Flags to
// an appropriate value before calling StaticLargeObjectCreateFile
func (c *Connection) StaticLargeObjectCreate(opts *LargeObjectOpts) (LargeObjectFile, error) {
opts.Flags = os.O_TRUNC | os.O_CREATE
return c.StaticLargeObjectCreateFile(opts)
}
// StaticLargeObjectDelete deletes a static large object and all of its segments.
func (c *Connection) StaticLargeObjectDelete(container string, path string) error {
info, err := c.cachedQueryInfo()
if err != nil || !info.SupportsSLO() {
return SLONotSupported
}
return c.LargeObjectDelete(container, path)
}
// StaticLargeObjectMove moves a static large object from srcContainer, srcObjectName to dstContainer, dstObjectName
func (c *Connection) StaticLargeObjectMove(srcContainer string, srcObjectName string, dstContainer string, dstObjectName string) error {
swiftInfo, err := c.cachedQueryInfo()
if err != nil || !swiftInfo.SupportsSLO() {
return SLONotSupported
}
info, headers, err := c.Object(srcContainer, srcObjectName)
if err != nil {
return err
}
container, segments, err := c.getAllSegments(srcContainer, srcObjectName, headers)
if err != nil {
return err
}
//copy only metadata during move (other headers might not be safe for copying)
headers = headers.ObjectMetadata().ObjectHeaders()
if err := c.createSLOManifest(dstContainer, dstObjectName, info.ContentType, container, segments, headers); err != nil {
return err
}
if err := c.ObjectDelete(srcContainer, srcObjectName); err != nil {
return err
}
return nil
}
// createSLOManifest creates a static large object manifest
func (c *Connection) createSLOManifest(container string, path string, contentType string, segmentContainer string, segments []Object, h Headers) error {
sloSegments := make([]swiftSegment, len(segments))
for i, segment := range segments {
sloSegments[i].Path = fmt.Sprintf("%s/%s", segmentContainer, segment.Name)
sloSegments[i].Etag = segment.Hash
sloSegments[i].Size = segment.Bytes
}
content, err := json.Marshal(sloSegments)
if err != nil {
return err
}
values := url.Values{}
values.Set("multipart-manifest", "put")
if _, err := c.objectPut(container, path, bytes.NewBuffer(content), false, "", contentType, h, values); err != nil {
return err
}
return nil
}
func (file *StaticLargeObjectCreateFile) Close() error {
return file.Flush()
}
func (file *StaticLargeObjectCreateFile) Flush() error {
if err := file.conn.createSLOManifest(file.container, file.objectName, file.contentType, file.segmentContainer, file.segments, file.headers); err != nil {
return err
}
return file.conn.waitForSegmentsToShowUp(file.container, file.objectName, file.Size())
}
func (c *Connection) getAllSLOSegments(container, path string) (string, []Object, error) {
var (
segmentList []swiftSegment
segments []Object
segPath string
segmentContainer string
)
values := url.Values{}
values.Set("multipart-manifest", "get")
file, _, err := c.objectOpen(container, path, true, nil, values)
if err != nil {
return "", nil, err
}
content, err := ioutil.ReadAll(file)
if err != nil {
return "", nil, err
}
json.Unmarshal(content, &segmentList)
for _, segment := range segmentList {
segmentContainer, segPath = parseFullPath(segment.Name[1:])
segments = append(segments, Object{
Name: segPath,
Bytes: segment.Bytes,
Hash: segment.Hash,
})
}
return segmentContainer, segments, nil
}

489
vendor/github.com/ncw/swift/swift.go generated vendored
View file

@ -11,9 +11,11 @@ import (
"fmt"
"hash"
"io"
"io/ioutil"
"mime"
"net/http"
"net/url"
"os"
"path"
"strconv"
"strings"
@ -33,6 +35,17 @@ const (
allObjectsChanLimit = 1000 // ...when fetching to a channel
)
// ObjectType is the type of the swift object, regular, static large,
// or dynamic large.
type ObjectType int
// Values that ObjectType can take
const (
RegularObjectType ObjectType = iota
StaticLargeObjectType
DynamicLargeObjectType
)
// Connection holds the details of the connection to the swift server.
//
// You need to provide UserName, ApiKey and AuthUrl when you create a
@ -86,6 +99,7 @@ type Connection struct {
Domain string // User's domain name
DomainId string // User's domain Id
UserName string // UserName for api
UserId string // User Id
ApiKey string // Key for api access
AuthUrl string // Auth URL
Retries int // Retries on error (default is 3)
@ -108,6 +122,139 @@ type Connection struct {
client *http.Client
Auth Authenticator `json:"-" xml:"-"` // the current authenticator
authLock sync.Mutex // lock when R/W StorageUrl, AuthToken, Auth
// swiftInfo is filled after QueryInfo is called
swiftInfo SwiftInfo
}
// setFromEnv reads the value that param points to (it must be a
// pointer), if it isn't the zero value then it reads the environment
// variable name passed in, parses it according to the type and writes
// it to the pointer.
func setFromEnv(param interface{}, name string) (err error) {
val := os.Getenv(name)
if val == "" {
return
}
switch result := param.(type) {
case *string:
if *result == "" {
*result = val
}
case *int:
if *result == 0 {
*result, err = strconv.Atoi(val)
}
case *bool:
if *result == false {
*result, err = strconv.ParseBool(val)
}
case *time.Duration:
if *result == 0 {
*result, err = time.ParseDuration(val)
}
case *EndpointType:
if *result == EndpointType("") {
*result = EndpointType(val)
}
default:
return newErrorf(0, "can't set var of type %T", param)
}
return err
}
// ApplyEnvironment reads environment variables and applies them to
// the Connection structure. It won't overwrite any parameters which
// are already set in the Connection struct.
//
// To make a new Connection object entirely from the environment you
// would do:
//
// c := new(Connection)
// err := c.ApplyEnvironment()
// if err != nil { log.Fatal(err) }
//
// The naming of these variables follows the official Openstack naming
// scheme so it should be compatible with OpenStack rc files.
//
// For v1 authentication (obsolete)
// ST_AUTH - Auth URL
// ST_USER - UserName for api
// ST_KEY - Key for api access
//
// For v2 authentication
// OS_AUTH_URL - Auth URL
// OS_USERNAME - UserName for api
// OS_PASSWORD - Key for api access
// OS_TENANT_NAME - Name of the tenant
// OS_TENANT_ID - Id of the tenant
// OS_REGION_NAME - Region to use - default is use first region
//
// For v3 authentication
// OS_AUTH_URL - Auth URL
// OS_USERNAME - UserName for api
// OS_USER_ID - User Id
// OS_PASSWORD - Key for api access
// OS_USER_DOMAIN_NAME - User's domain name
// OS_USER_DOMAIN_ID - User's domain Id
// OS_PROJECT_NAME - Name of the project
// OS_PROJECT_DOMAIN_NAME - Name of the tenant's domain, only needed if it differs from the user domain
// OS_PROJECT_DOMAIN_ID - Id of the tenant's domain, only needed if it differs the from user domain
// OS_TRUST_ID - If of the trust
// OS_REGION_NAME - Region to use - default is use first region
//
// Other
// OS_ENDPOINT_TYPE - Endpoint type public, internal or admin
// ST_AUTH_VERSION - Choose auth version - 1, 2 or 3 or leave at 0 for autodetect
//
// For manual authentication
// OS_STORAGE_URL - storage URL from alternate authentication
// OS_AUTH_TOKEN - Auth Token from alternate authentication
//
// Library specific
// GOSWIFT_RETRIES - Retries on error (default is 3)
// GOSWIFT_USER_AGENT - HTTP User agent (default goswift/1.0)
// GOSWIFT_CONNECT_TIMEOUT - Connect channel timeout with unit, eg "10s", "100ms" (default "10s")
// GOSWIFT_TIMEOUT - Data channel timeout with unit, eg "10s", "100ms" (default "60s")
// GOSWIFT_INTERNAL - Set this to "true" to use the the internal network (obsolete - use OS_ENDPOINT_TYPE)
func (c *Connection) ApplyEnvironment() (err error) {
for _, item := range []struct {
result interface{}
name string
}{
// Environment variables - keep in same order as Connection
{&c.Domain, "OS_USER_DOMAIN_NAME"},
{&c.DomainId, "OS_USER_DOMAIN_ID"},
{&c.UserName, "OS_USERNAME"},
{&c.UserId, "OS_USER_ID"},
{&c.ApiKey, "OS_PASSWORD"},
{&c.AuthUrl, "OS_AUTH_URL"},
{&c.Retries, "GOSWIFT_RETRIES"},
{&c.UserAgent, "GOSWIFT_USER_AGENT"},
{&c.ConnectTimeout, "GOSWIFT_CONNECT_TIMEOUT"},
{&c.Timeout, "GOSWIFT_TIMEOUT"},
{&c.Region, "OS_REGION_NAME"},
{&c.AuthVersion, "ST_AUTH_VERSION"},
{&c.Internal, "GOSWIFT_INTERNAL"},
{&c.Tenant, "OS_TENANT_NAME"}, //v2
{&c.Tenant, "OS_PROJECT_NAME"}, // v3
{&c.TenantId, "OS_TENANT_ID"},
{&c.EndpointType, "OS_ENDPOINT_TYPE"},
{&c.TenantDomain, "OS_PROJECT_DOMAIN_NAME"},
{&c.TenantDomainId, "OS_PROJECT_DOMAIN_ID"},
{&c.TrustId, "OS_TRUST_ID"},
{&c.StorageUrl, "OS_STORAGE_URL"},
{&c.AuthToken, "OS_AUTH_TOKEN"},
// v1 auth alternatives
{&c.ApiKey, "ST_KEY"},
{&c.UserName, "ST_USER"},
{&c.AuthUrl, "ST_AUTH"},
} {
err = setFromEnv(item.result, item.name)
if err != nil {
return newErrorf(0, "failed to read env var %q: %v", item.name, err)
}
}
return nil
}
// Error - all errors generated by this package are of this type. Other error
@ -140,6 +287,7 @@ type errorMap map[int]error
var (
// Specific Errors you might want to check for equality
NotModified = newError(304, "Not Modified")
BadRequest = newError(400, "Bad Request")
AuthorizationFailed = newError(401, "Authorization Failed")
ContainerNotFound = newError(404, "Container Not Found")
@ -149,6 +297,7 @@ var (
TimeoutError = newError(408, "Timeout when reading or writing data")
Forbidden = newError(403, "Operation forbidden")
TooLargeObject = newError(413, "Too Large Object")
RateLimit = newError(498, "Rate Limit")
// Mappings for authentication errors
authErrorMap = errorMap{
@ -163,15 +312,18 @@ var (
403: Forbidden,
404: ContainerNotFound,
409: ContainerNotEmpty,
498: RateLimit,
}
// Mappings for object errors
objectErrorMap = errorMap{
304: NotModified,
400: BadRequest,
403: Forbidden,
404: ObjectNotFound,
413: TooLargeObject,
422: ObjectCorrupted,
498: RateLimit,
}
)
@ -184,15 +336,32 @@ func checkClose(c io.Closer, err *error) {
}
}
// drainAndClose discards all data from rd and closes it.
// If an error occurs during Read, it is discarded.
func drainAndClose(rd io.ReadCloser, err *error) {
if rd == nil {
return
}
_, _ = io.Copy(ioutil.Discard, rd)
cerr := rd.Close()
if err != nil && *err == nil {
*err = cerr
}
}
// parseHeaders checks a response for errors and translates into
// standard errors if necessary.
// standard errors if necessary. If an error is returned, resp.Body
// has been drained and closed.
func (c *Connection) parseHeaders(resp *http.Response, errorMap errorMap) error {
if errorMap != nil {
if err, ok := errorMap[resp.StatusCode]; ok {
drainAndClose(resp.Body, nil)
return err
}
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
drainAndClose(resp.Body, nil)
return newErrorf(resp.StatusCode, "HTTP Error: %d: %s", resp.StatusCode, resp.Status)
}
return nil
@ -305,13 +474,14 @@ again:
}
if req != nil {
timer := time.NewTimer(c.ConnectTimeout)
defer timer.Stop()
var resp *http.Response
resp, err = c.doTimeoutRequest(timer, req)
if err != nil {
return
}
defer func() {
checkClose(resp.Body, &err)
drainAndClose(resp.Body, &err)
// Flush the auth connection - we don't want to keep
// it open if keepalives were enabled
flushKeepaliveConnections(c.Transport)
@ -406,6 +576,24 @@ func (c *Connection) authenticated() bool {
// the enabled middlewares and their configuration
type SwiftInfo map[string]interface{}
func (i SwiftInfo) SupportsBulkDelete() bool {
_, val := i["bulk_delete"]
return val
}
func (i SwiftInfo) SupportsSLO() bool {
_, val := i["slo"]
return val
}
func (i SwiftInfo) SLOMinSegmentSize() int64 {
if slo, ok := i["slo"].(map[string]interface{}); ok {
val, _ := slo["min_segment_size"].(float64)
return int64(val)
}
return 1
}
// Discover Swift configuration by doing a request against /info
func (c *Connection) QueryInfo() (infos SwiftInfo, err error) {
infoUrl, err := url.Parse(c.StorageUrl)
@ -413,14 +601,36 @@ func (c *Connection) QueryInfo() (infos SwiftInfo, err error) {
return nil, err
}
infoUrl.Path = path.Join(infoUrl.Path, "..", "..", "info")
resp, err := http.Get(infoUrl.String())
resp, err := c.client.Get(infoUrl.String())
if err == nil {
if resp.StatusCode != http.StatusOK {
drainAndClose(resp.Body, nil)
return nil, fmt.Errorf("Invalid status code for info request: %d", resp.StatusCode)
}
err = readJson(resp, &infos)
if err == nil {
c.authLock.Lock()
c.swiftInfo = infos
c.authLock.Unlock()
}
return infos, err
}
return nil, err
}
func (c *Connection) cachedQueryInfo() (infos SwiftInfo, err error) {
c.authLock.Lock()
infos = c.swiftInfo
c.authLock.Unlock()
if infos == nil {
infos, err = c.QueryInfo()
if err != nil {
return
}
}
return infos, nil
}
// RequestOpts contains parameters for Connection.storage.
type RequestOpts struct {
Container string
@ -444,6 +654,7 @@ type RequestOpts struct {
// Any other parameters (if not None) are added to the targetUrl
//
// Returns a response or an error. If response is returned then
// the resp.Body must be read completely and
// resp.Body.Close() must be called on it, unless noResponse is set in
// which case the body will be closed in this function
//
@ -484,6 +695,7 @@ func (c *Connection) Call(targetUrl string, p RequestOpts) (resp *http.Response,
URL.RawQuery = p.Parameters.Encode()
}
timer := time.NewTimer(c.ConnectTimeout)
defer timer.Stop()
reader := p.Body
if reader != nil {
reader = newWatchdogReader(reader, c.Timeout, timer)
@ -518,7 +730,7 @@ func (c *Connection) Call(targetUrl string, p RequestOpts) (resp *http.Response,
}
// Check to see if token has expired
if resp.StatusCode == 401 && retries > 0 {
_ = resp.Body.Close()
drainAndClose(resp.Body, nil)
c.UnAuthenticate()
retries--
} else {
@ -527,12 +739,12 @@ func (c *Connection) Call(targetUrl string, p RequestOpts) (resp *http.Response,
}
if err = c.parseHeaders(resp, p.ErrorMap); err != nil {
_ = resp.Body.Close()
return nil, nil, err
}
headers = readHeaders(resp)
if p.NoResponse {
err = resp.Body.Close()
var err error
drainAndClose(resp.Body, &err)
if err != nil {
return nil, nil, err
}
@ -574,7 +786,7 @@ func (c *Connection) storage(p RequestOpts) (resp *http.Response, headers Header
//
// Closes the response when done
func readLines(resp *http.Response) (lines []string, err error) {
defer checkClose(resp.Body, &err)
defer drainAndClose(resp.Body, &err)
reader := bufio.NewReader(resp.Body)
buffer := bytes.NewBuffer(make([]byte, 0, 128))
var part []byte
@ -599,7 +811,7 @@ func readLines(resp *http.Response) (lines []string, err error) {
//
// Closes the response when done
func readJson(resp *http.Response, result interface{}) (err error) {
defer checkClose(resp.Body, &err)
defer drainAndClose(resp.Body, &err)
decoder := json.NewDecoder(resp.Body)
return decoder.Decode(result)
}
@ -796,14 +1008,15 @@ func (c *Connection) ObjectNames(container string, opts *ObjectsOpts) ([]string,
// Object contains information about an object
type Object struct {
Name string `json:"name"` // object name
ContentType string `json:"content_type"` // eg application/directory
Bytes int64 `json:"bytes"` // size in bytes
ServerLastModified string `json:"last_modified"` // Last modified time, eg '2011-06-30T08:20:47.736680' as a string supplied by the server
LastModified time.Time // Last modified time converted to a time.Time
Hash string `json:"hash"` // MD5 hash, eg "d41d8cd98f00b204e9800998ecf8427e"
PseudoDirectory bool // Set when using delimiter to show that this directory object does not really exist
SubDir string `json:"subdir"` // returned only when using delimiter to mark "pseudo directories"
Name string `json:"name"` // object name
ContentType string `json:"content_type"` // eg application/directory
Bytes int64 `json:"bytes"` // size in bytes
ServerLastModified string `json:"last_modified"` // Last modified time, eg '2011-06-30T08:20:47.736680' as a string supplied by the server
LastModified time.Time // Last modified time converted to a time.Time
Hash string `json:"hash"` // MD5 hash, eg "d41d8cd98f00b204e9800998ecf8427e"
PseudoDirectory bool // Set when using delimiter to show that this directory object does not really exist
SubDir string `json:"subdir"` // returned only when using delimiter to mark "pseudo directories"
ObjectType ObjectType // type of this object
}
// Objects returns a slice of Object with information about each
@ -1141,6 +1354,19 @@ func (file *ObjectCreateFile) Close() error {
return nil
}
// Headers returns the response headers from the created object if the upload
// has been completed. The Close() method must be called on an ObjectCreateFile
// before this method.
func (file *ObjectCreateFile) Headers() (Headers, error) {
// error out if upload is not complete.
select {
case <-file.done:
default:
return nil, fmt.Errorf("Cannot get metadata, object upload failed or has not yet completed.")
}
return file.headers, nil
}
// Check it satisfies the interface
var _ io.WriteCloser = &ObjectCreateFile{}
@ -1202,7 +1428,7 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash
}
// Run the PUT in the background piping it data
go func() {
file.resp, file.headers, file.err = c.storage(RequestOpts{
opts := RequestOpts{
Container: container,
ObjectName: objectName,
Operation: "PUT",
@ -1210,7 +1436,8 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash
Body: pipeReader,
NoResponse: true,
ErrorMap: objectErrorMap,
})
}
file.resp, file.headers, file.err = c.storage(opts)
// Signal finished
pipeReader.Close()
close(file.done)
@ -1218,6 +1445,37 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash
return
}
func (c *Connection) objectPut(container string, objectName string, contents io.Reader, checkHash bool, Hash string, contentType string, h Headers, parameters url.Values) (headers Headers, err error) {
extraHeaders := objectPutHeaders(objectName, &checkHash, Hash, contentType, h)
hash := md5.New()
var body io.Reader = contents
if checkHash {
body = io.TeeReader(contents, hash)
}
_, headers, err = c.storage(RequestOpts{
Container: container,
ObjectName: objectName,
Operation: "PUT",
Headers: extraHeaders,
Body: body,
NoResponse: true,
ErrorMap: objectErrorMap,
Parameters: parameters,
})
if err != nil {
return
}
if checkHash {
receivedMd5 := strings.ToLower(headers["Etag"])
calculatedMd5 := fmt.Sprintf("%x", hash.Sum(nil))
if receivedMd5 != calculatedMd5 {
err = ObjectCorrupted
return
}
}
return
}
// ObjectPut creates or updates the path in the container from
// contents. contents should be an open io.Reader which will have all
// its contents read.
@ -1240,33 +1498,7 @@ func (c *Connection) ObjectCreate(container string, objectName string, checkHash
// If contentType is set it will be used, otherwise one will be
// guessed from objectName using mime.TypeByExtension
func (c *Connection) ObjectPut(container string, objectName string, contents io.Reader, checkHash bool, Hash string, contentType string, h Headers) (headers Headers, err error) {
extraHeaders := objectPutHeaders(objectName, &checkHash, Hash, contentType, h)
hash := md5.New()
var body io.Reader = contents
if checkHash {
body = io.TeeReader(contents, hash)
}
_, headers, err = c.storage(RequestOpts{
Container: container,
ObjectName: objectName,
Operation: "PUT",
Headers: extraHeaders,
Body: body,
NoResponse: true,
ErrorMap: objectErrorMap,
})
if err != nil {
return
}
if checkHash {
receivedMd5 := strings.ToLower(headers["Etag"])
calculatedMd5 := fmt.Sprintf("%x", hash.Sum(nil))
if receivedMd5 != calculatedMd5 {
err = ObjectCorrupted
return
}
}
return
return c.objectPut(container, objectName, contents, checkHash, Hash, contentType, h, nil)
}
// ObjectPutBytes creates an object from a []byte in a container.
@ -1274,7 +1506,8 @@ func (c *Connection) ObjectPut(container string, objectName string, contents io.
// This is a simplified interface which checks the MD5.
func (c *Connection) ObjectPutBytes(container string, objectName string, contents []byte, contentType string) (err error) {
buf := bytes.NewBuffer(contents)
_, err = c.ObjectPut(container, objectName, buf, true, "", contentType, nil)
h := Headers{"Content-Length": strconv.Itoa(len(contents))}
_, err = c.ObjectPut(container, objectName, buf, true, "", contentType, h)
return
}
@ -1283,7 +1516,8 @@ func (c *Connection) ObjectPutBytes(container string, objectName string, content
// This is a simplified interface which checks the MD5
func (c *Connection) ObjectPutString(container string, objectName string, contents string, contentType string) (err error) {
buf := strings.NewReader(contents)
_, err = c.ObjectPut(container, objectName, buf, true, "", contentType, nil)
h := Headers{"Content-Length": strconv.Itoa(len(contents))}
_, err = c.ObjectPut(container, objectName, buf, true, "", contentType, h)
return
}
@ -1303,10 +1537,14 @@ type ObjectOpenFile struct {
lengthOk bool // whether length is valid
length int64 // length of the object if read
seeked bool // whether we have seeked this file or not
overSeeked bool // set if we have seeked to the end or beyond
}
// Read bytes from the object - see io.Reader
func (file *ObjectOpenFile) Read(p []byte) (n int, err error) {
if file.overSeeked {
return 0, io.EOF
}
n, err = file.body.Read(p)
file.bytes += int64(n)
file.pos += int64(n)
@ -1330,6 +1568,7 @@ func (file *ObjectOpenFile) Read(p []byte) (n int, err error) {
//
// Seek(0, 1) will return the current file pointer.
func (file *ObjectOpenFile) Seek(offset int64, whence int) (newPos int64, err error) {
file.overSeeked = false
switch whence {
case 0: // relative to start
newPos = offset
@ -1340,6 +1579,10 @@ func (file *ObjectOpenFile) Seek(offset int64, whence int) (newPos int64, err er
return file.pos, newError(0, "Length of file unknown so can't seek from end")
}
newPos = file.length + offset
if offset >= 0 {
file.overSeeked = true
return
}
default:
panic("Unknown whence in ObjectOpenFile.Seek")
}
@ -1419,6 +1662,57 @@ func (file *ObjectOpenFile) Close() (err error) {
var _ io.ReadCloser = &ObjectOpenFile{}
var _ io.Seeker = &ObjectOpenFile{}
func (c *Connection) objectOpenBase(container string, objectName string, checkHash bool, h Headers, parameters url.Values) (file *ObjectOpenFile, headers Headers, err error) {
var resp *http.Response
opts := RequestOpts{
Container: container,
ObjectName: objectName,
Operation: "GET",
ErrorMap: objectErrorMap,
Headers: h,
Parameters: parameters,
}
resp, headers, err = c.storage(opts)
if err != nil {
return
}
// Can't check MD5 on an object with X-Object-Manifest or X-Static-Large-Object set
if checkHash && headers.IsLargeObject() {
// log.Printf("swift: turning off md5 checking on object with manifest %v", objectName)
checkHash = false
}
file = &ObjectOpenFile{
connection: c,
container: container,
objectName: objectName,
headers: h,
resp: resp,
checkHash: checkHash,
body: resp.Body,
}
if checkHash {
file.hash = md5.New()
file.body = io.TeeReader(resp.Body, file.hash)
}
// Read Content-Length
if resp.Header.Get("Content-Length") != "" {
file.length, err = getInt64FromHeader(resp, "Content-Length")
file.lengthOk = (err == nil)
}
return
}
func (c *Connection) objectOpen(container string, objectName string, checkHash bool, h Headers, parameters url.Values) (file *ObjectOpenFile, headers Headers, err error) {
err = withLORetry(0, func() (Headers, int64, error) {
file, headers, err = c.objectOpenBase(container, objectName, checkHash, h, parameters)
if err != nil {
return headers, 0, err
}
return headers, file.length, nil
})
return
}
// ObjectOpen returns an ObjectOpenFile for reading the contents of
// the object. This satisfies the io.ReadCloser and the io.Seeker
// interfaces.
@ -1443,41 +1737,7 @@ var _ io.Seeker = &ObjectOpenFile{}
//
// headers["Content-Type"] will give the content type if desired.
func (c *Connection) ObjectOpen(container string, objectName string, checkHash bool, h Headers) (file *ObjectOpenFile, headers Headers, err error) {
var resp *http.Response
resp, headers, err = c.storage(RequestOpts{
Container: container,
ObjectName: objectName,
Operation: "GET",
ErrorMap: objectErrorMap,
Headers: h,
})
if err != nil {
return
}
// Can't check MD5 on an object with X-Object-Manifest or X-Static-Large-Object set
if checkHash && (headers["X-Object-Manifest"] != "" || headers["X-Static-Large-Object"] != "") {
// log.Printf("swift: turning off md5 checking on object with manifest %v", objectName)
checkHash = false
}
file = &ObjectOpenFile{
connection: c,
container: container,
objectName: objectName,
headers: h,
resp: resp,
checkHash: checkHash,
body: resp.Body,
}
if checkHash {
file.hash = md5.New()
file.body = io.TeeReader(resp.Body, file.hash)
}
// Read Content-Length
if resp.Header.Get("Content-Length") != "" {
file.length, err = getInt64FromHeader(resp, "Content-Length")
file.lengthOk = (err == nil)
}
return
return c.objectOpen(container, objectName, checkHash, h, nil)
}
// ObjectGet gets the object into the io.Writer contents.
@ -1580,26 +1840,19 @@ type BulkDeleteResult struct {
Headers Headers // Response HTTP headers.
}
// BulkDelete deletes multiple objectNames from container in one operation.
//
// Some servers may not accept bulk-delete requests since bulk-delete is
// an optional feature of swift - these will return the Forbidden error.
//
// See also:
// * http://docs.openstack.org/trunk/openstack-object-storage/admin/content/object-storage-bulk-delete.html
// * http://docs.rackspace.com/files/api/v1/cf-devguide/content/Bulk_Delete-d1e2338.html
func (c *Connection) BulkDelete(container string, objectNames []string) (result BulkDeleteResult, err error) {
func (c *Connection) doBulkDelete(objects []string) (result BulkDeleteResult, err error) {
var buffer bytes.Buffer
for _, s := range objectNames {
buffer.WriteString(fmt.Sprintf("/%s/%s\n", container,
url.QueryEscape(s)))
for _, s := range objects {
u := url.URL{Path: s}
buffer.WriteString(u.String() + "\n")
}
resp, headers, err := c.storage(RequestOpts{
Operation: "DELETE",
Parameters: url.Values{"bulk-delete": []string{"1"}},
Headers: Headers{
"Accept": "application/json",
"Content-Type": "text/plain",
"Accept": "application/json",
"Content-Type": "text/plain",
"Content-Length": strconv.Itoa(buffer.Len()),
},
ErrorMap: ContainerErrorMap,
Body: &buffer,
@ -1633,6 +1886,22 @@ func (c *Connection) BulkDelete(container string, objectNames []string) (result
return
}
// BulkDelete deletes multiple objectNames from container in one operation.
//
// Some servers may not accept bulk-delete requests since bulk-delete is
// an optional feature of swift - these will return the Forbidden error.
//
// See also:
// * http://docs.openstack.org/trunk/openstack-object-storage/admin/content/object-storage-bulk-delete.html
// * http://docs.rackspace.com/files/api/v1/cf-devguide/content/Bulk_Delete-d1e2338.html
func (c *Connection) BulkDelete(container string, objectNames []string) (result BulkDeleteResult, err error) {
fullPaths := make([]string, len(objectNames))
for i, name := range objectNames {
fullPaths[i] = fmt.Sprintf("/%s/%s", container, name)
}
return c.doBulkDelete(fullPaths)
}
// BulkUploadResult stores results of BulkUpload().
//
// Individual errors may (or may not) be returned by Errors.
@ -1716,6 +1985,17 @@ func (c *Connection) BulkUpload(uploadPath string, dataStream io.Reader, format
//
// Use headers.ObjectMetadata() to read the metadata in the Headers.
func (c *Connection) Object(container string, objectName string) (info Object, headers Headers, err error) {
err = withLORetry(0, func() (Headers, int64, error) {
info, headers, err = c.objectBase(container, objectName)
if err != nil {
return headers, 0, err
}
return headers, info.Bytes, nil
})
return
}
func (c *Connection) objectBase(container string, objectName string) (info Object, headers Headers, err error) {
var resp *http.Response
resp, headers, err = c.storage(RequestOpts{
Container: container,
@ -1756,6 +2036,12 @@ func (c *Connection) Object(container string, objectName string) (info Object, h
}
info.Hash = resp.Header.Get("Etag")
if resp.Header.Get("X-Object-Manifest") != "" {
info.ObjectType = DynamicLargeObjectType
} else if resp.Header.Get("X-Static-Large-Object") != "" {
info.ObjectType = StaticLargeObjectType
}
return
}
@ -1793,6 +2079,15 @@ func (c *Connection) ObjectUpdate(container string, objectName string, h Headers
return err
}
// urlPathEscape escapes URL path the in string using URL escaping rules
//
// This mimics url.PathEscape which only available from go 1.8
func urlPathEscape(in string) string {
var u url.URL
u.Path = in
return u.String()
}
// ObjectCopy does a server side copy of an object to a new position
//
// All metadata is preserved. If metadata is set in the headers then
@ -1805,7 +2100,7 @@ func (c *Connection) ObjectUpdate(container string, objectName string, h Headers
func (c *Connection) ObjectCopy(srcContainer string, srcObjectName string, dstContainer string, dstObjectName string, h Headers) (headers Headers, err error) {
// Meta stuff
extraHeaders := map[string]string{
"Destination": dstContainer + "/" + dstObjectName,
"Destination": urlPathEscape(dstContainer + "/" + dstObjectName),
}
for key, value := range h {
extraHeaders[key] = value

View file

@ -21,6 +21,7 @@ import (
"mime"
"net"
"net/http"
"net/http/httptest"
"net/url"
"path"
"regexp"
@ -28,6 +29,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@ -39,21 +41,28 @@ const (
TEST_ACCOUNT = "swifttest"
)
type HandlerOverrideFunc func(w http.ResponseWriter, r *http.Request, recorder *httptest.ResponseRecorder)
type SwiftServer struct {
// `sync/atomic` expects the first word in an allocated struct to be 64-bit
// aligned on both ARM and x86-32.
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more details.
reqId int64
sync.RWMutex
t *testing.T
reqId int
mu sync.Mutex
Listener net.Listener
AuthURL string
URL string
Accounts map[string]*account
Sessions map[string]*session
override map[string]HandlerOverrideFunc
}
// The Folder type represents a container stored in an account
type Folder struct {
Count int `json:"count"`
Bytes int `json:"bytes"`
Count int64 `json:"count"`
Bytes int64 `json:"bytes"`
Name string `json:"name"`
}
@ -96,13 +105,16 @@ type metadata struct {
}
type account struct {
sync.RWMutex
swift.Account
metadata
password string
Containers map[string]*container
password string
ContainersLock sync.RWMutex
Containers map[string]*container
}
type object struct {
sync.RWMutex
metadata
name string
mtime time.Time
@ -112,11 +124,31 @@ type object struct {
}
type container struct {
// `sync/atomic` expects the first word in an allocated struct to be 64-bit
// aligned on both ARM and x86-32.
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more details.
bytes int64
sync.RWMutex
metadata
name string
ctime time.Time
objects map[string]*object
bytes int
}
type segment struct {
Path string `json:"path,omitempty"`
Hash string `json:"hash,omitempty"`
Size int64 `json:"size_bytes,omitempty"`
// When uploading a manifest, the attributes must be named `path`, `hash` and `size`
// but when querying the JSON content of a manifest with the `multipart-manifest=get`
// parameter, Swift names those attributes `name`, `etag` and `bytes`.
// We use all the different attributes names in this structure to be able to use
// the same structure for both uploading and retrieving.
Name string `json:"name,omitempty"`
Etag string `json:"etag,omitempty"`
Bytes int64 `json:"bytes,omitempty"`
ContentType string `json:"content_type,omitempty"`
LastModified string `json:"last_modified,omitempty"`
}
// A resource encapsulates the subject of an HTTP request.
@ -179,9 +211,12 @@ func (m metadata) getMetadata(a *action) {
}
}
func (c container) list(delimiter string, marker string, prefix string, parent string) (resp []interface{}) {
func (c *container) list(delimiter string, marker string, prefix string, parent string) (resp []interface{}) {
var tmp orderedObjects
c.RLock()
defer c.RUnlock()
// first get all matching objects and arrange them in alphabetical order.
for _, obj := range c.objects {
if strings.HasPrefix(obj.name, prefix) {
@ -236,19 +271,23 @@ func (r containerResource) get(a *action) interface{} {
fatalf(404, "NoSuchContainer", "The specified container does not exist")
}
r.container.RLock()
delimiter := a.req.Form.Get("delimiter")
marker := a.req.Form.Get("marker")
prefix := a.req.Form.Get("prefix")
format := a.req.URL.Query().Get("format")
parent := a.req.Form.Get("path")
a.w.Header().Set("X-Container-Bytes-Used", strconv.Itoa(r.container.bytes))
a.w.Header().Set("X-Container-Bytes-Used", strconv.Itoa(int(r.container.bytes)))
a.w.Header().Set("X-Container-Object-Count", strconv.Itoa(len(r.container.objects)))
r.container.getMetadata(a)
if a.req.Method == "HEAD" {
r.container.RUnlock()
return nil
}
r.container.RUnlock()
objects := r.container.list(delimiter, marker, prefix, parent)
@ -297,8 +336,10 @@ func (r containerResource) delete(a *action) interface{} {
if len(b.objects) > 0 {
fatalf(409, "Conflict", "The container you tried to delete is not empty")
}
a.user.Lock()
delete(a.user.Containers, b.name)
a.user.Account.Containers--
a.user.Unlock()
return nil
}
@ -319,8 +360,11 @@ func (r containerResource) put(a *action) interface{} {
},
}
r.container.setMetadata(a, "container")
a.user.Lock()
a.user.Containers[r.name] = r.container
a.user.Account.Containers++
a.user.Unlock()
}
return nil
@ -330,10 +374,13 @@ func (r containerResource) post(a *action) interface{} {
if r.container == nil {
fatalf(400, "Method", "The resource could not be found.")
} else {
r.container.RLock()
defer r.container.RUnlock()
r.container.setMetadata(a, "container")
a.w.WriteHeader(201)
jsonMarshal(a.w, Folder{
Count: len(r.container.objects),
Count: int64(len(r.container.objects)),
Bytes: r.container.bytes,
Name: r.container.name,
})
@ -388,10 +435,11 @@ func (obj *object) Key() Key {
}
var metaHeaders = map[string]bool{
"Content-Type": true,
"Content-Encoding": true,
"Content-Disposition": true,
"X-Object-Manifest": true,
"Content-Type": true,
"Content-Encoding": true,
"Content-Disposition": true,
"X-Object-Manifest": true,
"X-Static-Large-Object": true,
}
var rangeRegexp = regexp.MustCompile("(bytes=)?([0-9]*)-([0-9]*)")
@ -409,6 +457,9 @@ func (objr objectResource) get(a *action) interface{} {
fatalf(404, "Not Found", "The resource could not be found.")
}
obj.RLock()
defer obj.RUnlock()
h := a.w.Header()
// add metadata
obj.getMetadata(a)
@ -433,7 +484,9 @@ func (objr objectResource) get(a *action) interface{} {
if manifest, ok := obj.meta["X-Object-Manifest"]; ok {
var segments []io.Reader
components := strings.SplitN(manifest[0], "/", 2)
a.user.RLock()
segContainer := a.user.Containers[components[0]]
a.user.RUnlock()
prefix := components[1]
resp := segContainer.list("", "", prefix, "")
sum := md5.New()
@ -453,19 +506,54 @@ func (objr objectResource) get(a *action) interface{} {
}
etag = sum.Sum(nil)
if end == -1 {
end = size
end = size - 1
}
reader = io.LimitReader(io.MultiReader(segments...), int64(end-start))
reader = io.LimitReader(io.MultiReader(segments...), int64(end-start+1))
} else if value, ok := obj.meta["X-Static-Large-Object"]; ok && value[0] == "True" && a.req.URL.Query().Get("multipart-manifest") != "get" {
var segments []io.Reader
var segmentList []segment
json.Unmarshal(obj.data, &segmentList)
cursor := 0
size := 0
sum := md5.New()
for _, segment := range segmentList {
components := strings.SplitN(segment.Name[1:], "/", 2)
a.user.RLock()
segContainer := a.user.Containers[components[0]]
a.user.RUnlock()
objectName := components[1]
segObject := segContainer.objects[objectName]
length := len(segObject.data)
size += length
sum.Write([]byte(hex.EncodeToString(segObject.checksum)))
if start >= cursor+length {
continue
}
segments = append(segments, bytes.NewReader(segObject.data[max(0, start-cursor):]))
cursor += length
}
etag = sum.Sum(nil)
if end == -1 {
end = size - 1
}
reader = io.LimitReader(io.MultiReader(segments...), int64(end-start+1))
} else {
if end == -1 {
end = len(obj.data)
end = len(obj.data) - 1
}
etag = obj.checksum
reader = bytes.NewReader(obj.data[start:end])
reader = bytes.NewReader(obj.data[start : end+1])
}
h.Set("Content-Length", fmt.Sprint(end-start))
h.Set("ETag", hex.EncodeToString(etag))
etagHex := hex.EncodeToString(etag)
if a.req.Header.Get("If-None-Match") == etagHex {
a.w.WriteHeader(http.StatusNotModified)
return nil
}
h.Set("Content-Length", fmt.Sprint(end-start+1))
h.Set("ETag", etagHex)
h.Set("Last-Modified", obj.mtime.Format(http.TimeFormat))
if a.req.Method == "HEAD" {
@ -514,10 +602,10 @@ func (objr objectResource) put(a *action) interface{} {
meta: make(http.Header),
},
}
a.user.Objects++
atomic.AddInt64(&a.user.Objects, 1)
} else {
objr.container.bytes -= len(obj.data)
a.user.BytesUsed -= int64(len(obj.data))
atomic.AddInt64(&objr.container.bytes, -int64(len(obj.data)))
atomic.AddInt64(&a.user.BytesUsed, -int64(len(obj.data)))
}
var content_type string
@ -528,15 +616,39 @@ func (objr objectResource) put(a *action) interface{} {
}
}
if a.req.URL.Query().Get("multipart-manifest") == "put" {
// TODO: check the content of the SLO
a.req.Header.Set("X-Static-Large-Object", "True")
var segments []segment
json.Unmarshal(data, &segments)
for i := range segments {
segments[i].Name = "/" + segments[i].Path
segments[i].Path = ""
segments[i].Hash = segments[i].Etag
segments[i].Etag = ""
segments[i].Bytes = segments[i].Size
segments[i].Size = 0
}
data, _ = json.Marshal(segments)
sum = md5.New()
sum.Write(data)
gotHash = sum.Sum(nil)
}
// PUT request has been successful - save data and metadata
obj.setMetadata(a, "object")
obj.content_type = content_type
obj.data = data
obj.checksum = gotHash
obj.mtime = time.Now().UTC()
objr.container.Lock()
objr.container.objects[objr.name] = obj
objr.container.bytes += len(data)
a.user.BytesUsed += int64(len(data))
objr.container.bytes += int64(len(data))
objr.container.Unlock()
atomic.AddInt64(&a.user.BytesUsed, int64(len(data)))
h := a.w.Header()
h.Set("ETag", hex.EncodeToString(obj.checksum))
@ -549,14 +661,25 @@ func (objr objectResource) delete(a *action) interface{} {
fatalf(404, "NoSuchKey", "The specified key does not exist.")
}
objr.container.bytes -= len(objr.object.data)
a.user.BytesUsed -= int64(len(objr.object.data))
objr.container.Lock()
defer objr.container.Unlock()
objr.object.Lock()
defer objr.object.Unlock()
objr.container.bytes -= int64(len(objr.object.data))
delete(objr.container.objects, objr.name)
a.user.Objects--
atomic.AddInt64(&a.user.BytesUsed, -int64(len(objr.object.data)))
atomic.AddInt64(&a.user.Objects, -1)
return nil
}
func (objr objectResource) post(a *action) interface{} {
objr.object.Lock()
defer objr.object.Unlock()
obj := objr.object
obj.setMetadata(a, "object")
return nil
@ -568,6 +691,9 @@ func (objr objectResource) copy(a *action) interface{} {
}
obj := objr.object
obj.RLock()
defer obj.RUnlock()
destination := a.req.Header.Get("Destination")
if destination == "" {
fatalf(400, "Bad Request", "You must provide a Destination header")
@ -590,29 +716,38 @@ func (objr objectResource) copy(a *action) interface{} {
meta: make(http.Header),
},
}
a.user.Objects++
atomic.AddInt64(&a.user.Objects, 1)
} else {
obj2 = objr2.object
objr2.container.bytes -= len(obj2.data)
a.user.BytesUsed -= int64(len(obj2.data))
atomic.AddInt64(&objr2.container.bytes, -int64(len(obj2.data)))
atomic.AddInt64(&a.user.BytesUsed, -int64(len(obj2.data)))
}
default:
fatalf(400, "Bad Request", "Destination must point to a valid object path")
}
if objr2.container.name != objr2.container.name && obj2.name != obj.name {
obj2.Lock()
defer obj2.Unlock()
}
obj2.content_type = obj.content_type
obj2.data = obj.data
obj2.checksum = obj.checksum
obj2.mtime = time.Now()
objr2.container.objects[objr2.name] = obj2
objr2.container.bytes += len(obj.data)
a.user.BytesUsed += int64(len(obj.data))
for key, values := range obj.metadata.meta {
obj2.metadata.meta[key] = values
}
obj2.setMetadata(a, "object")
objr2.container.Lock()
objr2.container.objects[objr2.name] = obj2
objr2.container.bytes += int64(len(obj.data))
objr2.container.Unlock()
atomic.AddInt64(&a.user.BytesUsed, int64(len(obj.data)))
return nil
}
@ -620,8 +755,14 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
// ignore error from ParseForm as it's usually spurious.
req.ParseForm()
s.mu.Lock()
defer s.mu.Unlock()
if fn := s.override[req.URL.Path]; fn != nil {
originalRW := w
recorder := httptest.NewRecorder()
w = recorder
defer func() {
fn(originalRW, req, recorder)
}()
}
if DEBUG {
log.Printf("swifttest %q %q", req.Method, req.URL)
@ -630,9 +771,9 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
srv: s,
w: w,
req: req,
reqId: fmt.Sprintf("%09X", s.reqId),
reqId: fmt.Sprintf("%09X", atomic.LoadInt64(&s.reqId)),
}
s.reqId++
atomic.AddInt64(&s.reqId, 1)
var r resource
defer func() {
@ -651,6 +792,8 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
if req.URL.String() == "/v1.0" {
username := req.Header.Get("x-auth-user")
key := req.Header.Get("x-auth-key")
s.Lock()
defer s.Unlock()
if acct, ok := s.Accounts[username]; ok {
if acct.password == key {
r := make([]byte, 16)
@ -676,6 +819,11 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
"tempurl": map[string]interface{}{
"methods": []string{"GET", "HEAD", "PUT"},
},
"slo": map[string]interface{}{
"max_manifest_segments": 1000,
"max_manifest_size": 2097152,
"min_segment_size": 1,
},
})
return
}
@ -688,9 +836,11 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
if key == "" && signature != "" && expires != "" {
accountName, _, _, _ := s.parseURL(req.URL)
secretKey := ""
s.RLock()
if account, ok := s.Accounts[accountName]; ok {
secretKey = account.meta.Get("X-Account-Meta-Temp-Url-Key")
}
s.RUnlock()
get_hmac := func(method string) string {
mac := hmac.New(sha1.New, []byte(secretKey))
@ -707,12 +857,16 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
panic(notAuthorized())
}
} else {
s.RLock()
session, ok := s.Sessions[key[7:]]
if !ok {
s.RUnlock()
panic(notAuthorized())
return
}
a.user = s.Accounts[session.username]
s.RUnlock()
}
switch req.Method {
@ -746,6 +900,14 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
}
}
func (s *SwiftServer) SetOverride(path string, fn HandlerOverrideFunc) {
s.override[path] = fn
}
func (s *SwiftServer) UnsetOverride(path string) {
delete(s.override, path)
}
func jsonMarshal(w io.Writer, x interface{}) {
if err := json.NewEncoder(w).Encode(x); err != nil {
panic(fmt.Errorf("error marshalling %#v: %v", x, err))
@ -773,14 +935,21 @@ func (srv *SwiftServer) resourceForURL(u *url.URL) (r resource) {
fatalf(404, "InvalidURI", err.Error())
}
srv.RLock()
account, ok := srv.Accounts[accountName]
if !ok {
//srv.RUnlock()
fatalf(404, "NoSuchAccount", "The specified account does not exist")
}
srv.RUnlock()
account.RLock()
if containerName == "" {
account.RUnlock()
return rootResource{}
}
account.RUnlock()
b := containerResource{
name: containerName,
container: account.Containers[containerName],
@ -800,6 +969,8 @@ func (srv *SwiftServer) resourceForURL(u *url.URL) (r resource) {
container: b.container,
}
objr.container.RLock()
defer objr.container.RUnlock()
if obj := objr.container.objects[objr.name]; obj != nil {
objr.object = obj
}
@ -835,9 +1006,12 @@ func (rootResource) get(a *action) interface{} {
h := a.w.Header()
h.Set("X-Account-Bytes-Used", strconv.Itoa(int(a.user.BytesUsed)))
h.Set("X-Account-Container-Count", strconv.Itoa(int(a.user.Account.Containers)))
h.Set("X-Account-Object-Count", strconv.Itoa(int(a.user.Objects)))
h.Set("X-Account-Bytes-Used", strconv.Itoa(int(atomic.LoadInt64(&a.user.BytesUsed))))
h.Set("X-Account-Container-Count", strconv.Itoa(int(atomic.LoadInt64(&a.user.Account.Containers))))
h.Set("X-Account-Object-Count", strconv.Itoa(int(atomic.LoadInt64(&a.user.Objects))))
a.user.RLock()
defer a.user.RUnlock()
// add metadata
a.user.metadata.getMetadata(a)
@ -862,7 +1036,7 @@ func (rootResource) get(a *action) interface{} {
}
if format == "json" {
resp = append(resp, Folder{
Count: len(container.objects),
Count: int64(len(container.objects)),
Bytes: container.bytes,
Name: container.name,
})
@ -879,7 +1053,9 @@ func (rootResource) get(a *action) interface{} {
}
func (r rootResource) post(a *action) interface{} {
a.user.Lock()
a.user.metadata.setMetadata(a, "account")
a.user.Unlock()
return nil
}
@ -894,21 +1070,10 @@ func (rootResource) delete(a *action) interface{} {
func (rootResource) copy(a *action) interface{} { return notAllowed() }
func NewSwiftServer(address string) (*SwiftServer, error) {
var (
l net.Listener
err error
)
if strings.Index(address, ":") == -1 {
for port := 1024; port < 65535; port++ {
addr := fmt.Sprintf("%s:%d", address, port)
if l, err = net.Listen("tcp", addr); err == nil {
address = addr
break
}
}
} else {
l, err = net.Listen("tcp", address)
address += ":0"
}
l, err := net.Listen("tcp", address)
if err != nil {
return nil, fmt.Errorf("cannot listen on %s: %v", address, err)
}
@ -919,6 +1084,7 @@ func NewSwiftServer(address string) (*SwiftServer, error) {
URL: "http://" + l.Addr().String() + "/v1",
Accounts: make(map[string]*account),
Sessions: make(map[string]*session),
override: make(map[string]HandlerOverrideFunc),
}
server.Accounts[TEST_ACCOUNT] = &account{

View file

@ -38,10 +38,12 @@ func (t *timeoutReader) Read(p []byte) (int, error) {
done <- result{n, err}
}()
// Wait for the read or the timeout
timer := time.NewTimer(t.timeout)
defer timer.Stop()
select {
case r := <-done:
return r.n, r.err
case <-time.After(t.timeout):
case <-timer.C:
t.cancel()
return 0, TimeoutError
}

View file

@ -5,29 +5,50 @@ import (
"time"
)
var watchdogChunkSize = 1 << 20 // 1 MiB
// An io.Reader which resets a watchdog timer whenever data is read
type watchdogReader struct {
timeout time.Duration
reader io.Reader
timer *time.Timer
timeout time.Duration
reader io.Reader
timer *time.Timer
chunkSize int
}
// Returns a new reader which will kick the watchdog timer whenever data is read
func newWatchdogReader(reader io.Reader, timeout time.Duration, timer *time.Timer) *watchdogReader {
return &watchdogReader{
timeout: timeout,
reader: reader,
timer: timer,
timeout: timeout,
reader: reader,
timer: timer,
chunkSize: watchdogChunkSize,
}
}
// Read reads up to len(p) bytes into p
func (t *watchdogReader) Read(p []byte) (n int, err error) {
// FIXME limit the amount of data read in one chunk so as to not exceed the timeout?
func (t *watchdogReader) Read(p []byte) (int, error) {
//read from underlying reader in chunks not larger than t.chunkSize
//while resetting the watchdog timer before every read; the small chunk
//size ensures that the timer does not fire when reading a large amount of
//data from a slow connection
start := 0
end := len(p)
for start < end {
length := end - start
if length > t.chunkSize {
length = t.chunkSize
}
resetTimer(t.timer, t.timeout)
n, err := t.reader.Read(p[start : start+length])
start += n
if n == 0 || err != nil {
return start, err
}
}
resetTimer(t.timer, t.timeout)
n, err = t.reader.Read(p)
resetTimer(t.timer, t.timeout)
return
return start, nil
}
// Check it satisfies the interface