Storagedriver: GCS: implement resumable uploads
Signed-off-by: Arthur Baars <arthur@semmle.com>
This commit is contained in:
parent
666273d9f6
commit
7162cb19c6
3 changed files with 468 additions and 170 deletions
|
@ -10,7 +10,7 @@ import (
|
||||||
_ "github.com/docker/distribution/registry/proxy"
|
_ "github.com/docker/distribution/registry/proxy"
|
||||||
_ "github.com/docker/distribution/registry/storage/driver/azure"
|
_ "github.com/docker/distribution/registry/storage/driver/azure"
|
||||||
_ "github.com/docker/distribution/registry/storage/driver/filesystem"
|
_ "github.com/docker/distribution/registry/storage/driver/filesystem"
|
||||||
// _ "github.com/docker/distribution/registry/storage/driver/gcs"
|
_ "github.com/docker/distribution/registry/storage/driver/gcs"
|
||||||
_ "github.com/docker/distribution/registry/storage/driver/inmemory"
|
_ "github.com/docker/distribution/registry/storage/driver/inmemory"
|
||||||
_ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront"
|
_ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront"
|
||||||
// _ "github.com/docker/distribution/registry/storage/driver/oss"
|
// _ "github.com/docker/distribution/registry/storage/driver/oss"
|
||||||
|
|
|
@ -7,11 +7,8 @@
|
||||||
// Because gcs is a key, value store the Stat call does not support last modification
|
// Because gcs is a key, value store the Stat call does not support last modification
|
||||||
// time for directories (directories are an abstraction for key, value stores)
|
// time for directories (directories are an abstraction for key, value stores)
|
||||||
//
|
//
|
||||||
// Keep in mind that gcs guarantees only eventual consistency, so do not assume
|
// Note that the contents of incomplete uploads are not accessible even though
|
||||||
// that a successful write will mean immediate access to the data written (although
|
// Stat returns their length
|
||||||
// in most regions a new object put has guaranteed read after write). The only true
|
|
||||||
// guarantee is that once you call Stat and receive a certain file size, that much of
|
|
||||||
// the file is already accessible.
|
|
||||||
//
|
//
|
||||||
// +build include_gcs
|
// +build include_gcs
|
||||||
|
|
||||||
|
@ -25,7 +22,9 @@ import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -34,7 +33,6 @@ import (
|
||||||
"golang.org/x/oauth2/google"
|
"golang.org/x/oauth2/google"
|
||||||
"golang.org/x/oauth2/jwt"
|
"golang.org/x/oauth2/jwt"
|
||||||
"google.golang.org/api/googleapi"
|
"google.golang.org/api/googleapi"
|
||||||
storageapi "google.golang.org/api/storage/v1"
|
|
||||||
"google.golang.org/cloud"
|
"google.golang.org/cloud"
|
||||||
"google.golang.org/cloud/storage"
|
"google.golang.org/cloud/storage"
|
||||||
|
|
||||||
|
@ -46,8 +44,18 @@ import (
|
||||||
"github.com/docker/distribution/registry/storage/driver/factory"
|
"github.com/docker/distribution/registry/storage/driver/factory"
|
||||||
)
|
)
|
||||||
|
|
||||||
const driverName = "gcs"
|
const (
|
||||||
const dummyProjectID = "<unknown>"
|
driverName = "gcs"
|
||||||
|
dummyProjectID = "<unknown>"
|
||||||
|
|
||||||
|
uploadSessionContentType = "application/x-docker-upload-session"
|
||||||
|
minChunkSize = 256 * 1024
|
||||||
|
maxChunkSize = 20 * minChunkSize
|
||||||
|
|
||||||
|
maxTries = 5
|
||||||
|
)
|
||||||
|
|
||||||
|
var rangeHeader = regexp.MustCompile(`^bytes=([0-9])+-([0-9]+)$`)
|
||||||
|
|
||||||
// driverParameters is a struct that encapsulates all of the driver parameters after all values have been set
|
// driverParameters is a struct that encapsulates all of the driver parameters after all values have been set
|
||||||
type driverParameters struct {
|
type driverParameters struct {
|
||||||
|
@ -155,7 +163,17 @@ func (d *driver) Name() string {
|
||||||
// GetContent retrieves the content stored at "path" as a []byte.
|
// GetContent retrieves the content stored at "path" as a []byte.
|
||||||
// This should primarily be used for small objects.
|
// This should primarily be used for small objects.
|
||||||
func (d *driver) GetContent(context ctx.Context, path string) ([]byte, error) {
|
func (d *driver) GetContent(context ctx.Context, path string) ([]byte, error) {
|
||||||
rc, err := d.ReadStream(context, path, 0)
|
gcsContext := d.context(context)
|
||||||
|
name := d.pathToKey(path)
|
||||||
|
var rc io.ReadCloser
|
||||||
|
err := retry(func() error {
|
||||||
|
var err error
|
||||||
|
rc, err = storage.NewReader(gcsContext, d.bucket, name)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err == storage.ErrObjectNotExist {
|
||||||
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -171,25 +189,53 @@ func (d *driver) GetContent(context ctx.Context, path string) ([]byte, error) {
|
||||||
// PutContent stores the []byte content at a location designated by "path".
|
// PutContent stores the []byte content at a location designated by "path".
|
||||||
// This should primarily be used for small objects.
|
// This should primarily be used for small objects.
|
||||||
func (d *driver) PutContent(context ctx.Context, path string, contents []byte) error {
|
func (d *driver) PutContent(context ctx.Context, path string, contents []byte) error {
|
||||||
wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path))
|
return retry(func() error {
|
||||||
wc.ContentType = "application/octet-stream"
|
wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path))
|
||||||
defer wc.Close()
|
wc.ContentType = "application/octet-stream"
|
||||||
_, err := wc.Write(contents)
|
return putContentsClose(wc, contents)
|
||||||
return err
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadStream retrieves an io.ReadCloser for the content stored at "path"
|
// Reader retrieves an io.ReadCloser for the content stored at "path"
|
||||||
// with a given byte offset.
|
// with a given byte offset.
|
||||||
// May be used to resume reading a stream by providing a nonzero offset.
|
// May be used to resume reading a stream by providing a nonzero offset.
|
||||||
func (d *driver) ReadStream(context ctx.Context, path string, offset int64) (io.ReadCloser, error) {
|
func (d *driver) Reader(context ctx.Context, path string, offset int64) (io.ReadCloser, error) {
|
||||||
name := d.pathToKey(path)
|
res, err := getObject(d.client, d.bucket, d.pathToKey(path), offset)
|
||||||
|
if err != nil {
|
||||||
|
if res != nil {
|
||||||
|
if res.StatusCode == http.StatusNotFound {
|
||||||
|
res.Body.Close()
|
||||||
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.StatusCode == http.StatusRequestedRangeNotSatisfiable {
|
||||||
|
res.Body.Close()
|
||||||
|
obj, err := storageStatObject(d.context(context), d.bucket, d.pathToKey(path))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if offset == int64(obj.Size) {
|
||||||
|
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
|
||||||
|
}
|
||||||
|
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if res.Header.Get("Content-Type") == uploadSessionContentType {
|
||||||
|
defer res.Body.Close()
|
||||||
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||||
|
}
|
||||||
|
return res.Body, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getObject(client *http.Client, bucket string, name string, offset int64) (*http.Response, error) {
|
||||||
// copied from google.golang.org/cloud/storage#NewReader :
|
// copied from google.golang.org/cloud/storage#NewReader :
|
||||||
// to set the additional "Range" header
|
// to set the additional "Range" header
|
||||||
u := &url.URL{
|
u := &url.URL{
|
||||||
Scheme: "https",
|
Scheme: "https",
|
||||||
Host: "storage.googleapis.com",
|
Host: "storage.googleapis.com",
|
||||||
Path: fmt.Sprintf("/%s/%s", d.bucket, name),
|
Path: fmt.Sprintf("/%s/%s", bucket, name),
|
||||||
}
|
}
|
||||||
req, err := http.NewRequest("GET", u.String(), nil)
|
req, err := http.NewRequest("GET", u.String(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -198,122 +244,253 @@ func (d *driver) ReadStream(context ctx.Context, path string, offset int64) (io.
|
||||||
if offset > 0 {
|
if offset > 0 {
|
||||||
req.Header.Set("Range", fmt.Sprintf("bytes=%v-", offset))
|
req.Header.Set("Range", fmt.Sprintf("bytes=%v-", offset))
|
||||||
}
|
}
|
||||||
res, err := d.client.Do(req)
|
var res *http.Response
|
||||||
|
err = retry(func() error {
|
||||||
|
var err error
|
||||||
|
res, err = client.Do(req)
|
||||||
|
return err
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if res.StatusCode == http.StatusNotFound {
|
return res, googleapi.CheckMediaResponse(res)
|
||||||
res.Body.Close()
|
}
|
||||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
||||||
|
// Writer returns a FileWriter which will store the content written to it
|
||||||
|
// at the location designated by "path" after the call to Commit.
|
||||||
|
func (d *driver) Writer(context ctx.Context, path string, append bool) (storagedriver.FileWriter, error) {
|
||||||
|
writer := &writer{
|
||||||
|
client: d.client,
|
||||||
|
bucket: d.bucket,
|
||||||
|
name: d.pathToKey(path),
|
||||||
|
buffer: make([]byte, maxChunkSize),
|
||||||
}
|
}
|
||||||
if res.StatusCode == http.StatusRequestedRangeNotSatisfiable {
|
|
||||||
res.Body.Close()
|
if append {
|
||||||
obj, err := storageStatObject(d.context(context), d.bucket, name)
|
err := writer.init(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if offset == int64(obj.Size) {
|
|
||||||
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
|
|
||||||
}
|
|
||||||
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
|
|
||||||
}
|
}
|
||||||
if res.StatusCode < 200 || res.StatusCode > 299 {
|
return writer, nil
|
||||||
res.Body.Close()
|
|
||||||
return nil, fmt.Errorf("storage: can't read object %v/%v, status code: %v", d.bucket, name, res.Status)
|
|
||||||
}
|
|
||||||
return res.Body, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteStream stores the contents of the provided io.ReadCloser at a
|
type writer struct {
|
||||||
// location designated by the given path.
|
client *http.Client
|
||||||
// May be used to resume writing a stream by providing a nonzero offset.
|
bucket string
|
||||||
// The offset must be no larger than the CurrentSize for this path.
|
name string
|
||||||
func (d *driver) WriteStream(context ctx.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) {
|
size int64
|
||||||
if offset < 0 {
|
offset int64
|
||||||
return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
|
closed bool
|
||||||
}
|
sessionURI string
|
||||||
|
buffer []byte
|
||||||
|
buffSize int
|
||||||
|
}
|
||||||
|
|
||||||
if offset == 0 {
|
// Cancel removes any written content from this FileWriter.
|
||||||
return d.writeCompletely(context, path, 0, reader)
|
func (w *writer) Cancel() error {
|
||||||
}
|
err := w.checkClosed()
|
||||||
|
|
||||||
service, err := storageapi.New(d.client)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
objService := storageapi.NewObjectsService(service)
|
|
||||||
var obj *storageapi.Object
|
|
||||||
err = retry(5, func() error {
|
|
||||||
o, err := objService.Get(d.bucket, d.pathToKey(path)).Do()
|
|
||||||
obj = o
|
|
||||||
return err
|
return err
|
||||||
})
|
}
|
||||||
// obj, err := retry(5, objService.Get(d.bucket, d.pathToKey(path)).Do)
|
w.closed = true
|
||||||
|
err = storageDeleteObject(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
if status, ok := err.(*googleapi.Error); ok {
|
||||||
}
|
if status.Code == http.StatusNotFound {
|
||||||
|
err = nil
|
||||||
// cannot append more chunks, so redo from scratch
|
}
|
||||||
if obj.ComponentCount >= 1023 {
|
}
|
||||||
return d.writeCompletely(context, path, offset, reader)
|
}
|
||||||
}
|
return err
|
||||||
|
}
|
||||||
// skip from reader
|
|
||||||
objSize := int64(obj.Size)
|
func (w *writer) Close() error {
|
||||||
nn, err := skip(reader, objSize-offset)
|
if w.closed {
|
||||||
if err != nil {
|
return nil
|
||||||
return nn, err
|
}
|
||||||
}
|
w.closed = true
|
||||||
|
|
||||||
// Size <= offset
|
err := w.writeChunk()
|
||||||
partName := fmt.Sprintf("%v#part-%d#", d.pathToKey(path), obj.ComponentCount)
|
if err != nil {
|
||||||
gcsContext := d.context(context)
|
return err
|
||||||
wc := storage.NewWriter(gcsContext, d.bucket, partName)
|
}
|
||||||
wc.ContentType = "application/octet-stream"
|
|
||||||
|
// Copy the remaining bytes from the buffer to the upload session
|
||||||
if objSize < offset {
|
// Normally buffSize will be smaller than minChunkSize. However, in the
|
||||||
err = writeZeros(wc, offset-objSize)
|
// unlikely event that the upload session failed to start, this number could be higher.
|
||||||
if err != nil {
|
// In this case we can safely clip the remaining bytes to the minChunkSize
|
||||||
wc.CloseWithError(err)
|
if w.buffSize > minChunkSize {
|
||||||
return nn, err
|
w.buffSize = minChunkSize
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit the writes by updating the upload session
|
||||||
|
err = retry(func() error {
|
||||||
|
wc := storage.NewWriter(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
|
||||||
|
wc.ContentType = uploadSessionContentType
|
||||||
|
wc.Metadata = map[string]string{
|
||||||
|
"Session-URI": w.sessionURI,
|
||||||
|
"Offset": strconv.FormatInt(w.offset, 10),
|
||||||
|
}
|
||||||
|
return putContentsClose(wc, w.buffer[0:w.buffSize])
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.size = w.offset + int64(w.buffSize)
|
||||||
|
w.buffSize = 0
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func putContentsClose(wc *storage.Writer, contents []byte) error {
|
||||||
|
size := len(contents)
|
||||||
|
var nn int
|
||||||
|
var err error
|
||||||
|
for nn < size {
|
||||||
|
n, err := wc.Write(contents[nn:size])
|
||||||
|
nn += n
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
n, err := io.Copy(wc, reader)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wc.CloseWithError(err)
|
wc.CloseWithError(err)
|
||||||
return nn, err
|
return err
|
||||||
|
}
|
||||||
|
return wc.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit flushes all content written to this FileWriter and makes it
|
||||||
|
// available for future calls to StorageDriver.GetContent and
|
||||||
|
// StorageDriver.Reader.
|
||||||
|
func (w *writer) Commit() error {
|
||||||
|
|
||||||
|
if err := w.checkClosed(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.closed = true
|
||||||
|
|
||||||
|
// no session started yet just perform a simple upload
|
||||||
|
if w.sessionURI == "" {
|
||||||
|
err := retry(func() error {
|
||||||
|
wc := storage.NewWriter(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
|
||||||
|
wc.ContentType = "application/octet-stream"
|
||||||
|
return putContentsClose(wc, w.buffer[0:w.buffSize])
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.size = w.offset + int64(w.buffSize)
|
||||||
|
w.buffSize = 0
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
size := w.offset + int64(w.buffSize)
|
||||||
|
var nn int
|
||||||
|
// loop must be performed at least once to ensure the file is committed even when
|
||||||
|
// the buffer is empty
|
||||||
|
for {
|
||||||
|
n, err := putChunk(w.client, w.sessionURI, w.buffer[nn:w.buffSize], w.offset, size)
|
||||||
|
nn += int(n)
|
||||||
|
w.offset += n
|
||||||
|
w.size = w.offset
|
||||||
|
if err != nil {
|
||||||
|
w.buffSize = copy(w.buffer, w.buffer[nn:w.buffSize])
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if nn == w.buffSize {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.buffSize = 0
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *writer) checkClosed() error {
|
||||||
|
if w.closed {
|
||||||
|
return fmt.Errorf("Writer already closed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *writer) writeChunk() error {
|
||||||
|
var err error
|
||||||
|
// chunks can be uploaded only in multiples of minChunkSize
|
||||||
|
// chunkSize is a multiple of minChunkSize less than or equal to buffSize
|
||||||
|
chunkSize := w.buffSize - (w.buffSize % minChunkSize)
|
||||||
|
if chunkSize == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// if their is no sessionURI yet, obtain one by starting the session
|
||||||
|
if w.sessionURI == "" {
|
||||||
|
w.sessionURI, err = startSession(w.client, w.bucket, w.name)
|
||||||
}
|
}
|
||||||
err = wc.Close()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nn, err
|
return err
|
||||||
}
|
}
|
||||||
// wc was closed successfully, so the temporary part exists, schedule it for deletion at the end
|
nn, err := putChunk(w.client, w.sessionURI, w.buffer[0:chunkSize], w.offset, -1)
|
||||||
// of the function
|
w.offset += nn
|
||||||
defer storageDeleteObject(gcsContext, d.bucket, partName)
|
if w.offset > w.size {
|
||||||
|
w.size = w.offset
|
||||||
|
}
|
||||||
|
// shift the remaining bytes to the start of the buffer
|
||||||
|
w.buffSize = copy(w.buffer, w.buffer[int(nn):w.buffSize])
|
||||||
|
|
||||||
req := &storageapi.ComposeRequest{
|
return err
|
||||||
Destination: &storageapi.Object{Bucket: obj.Bucket, Name: obj.Name, ContentType: obj.ContentType},
|
}
|
||||||
SourceObjects: []*storageapi.ComposeRequestSourceObjects{
|
|
||||||
{
|
func (w *writer) Write(p []byte) (int, error) {
|
||||||
Name: obj.Name,
|
err := w.checkClosed()
|
||||||
Generation: obj.Generation,
|
if err != nil {
|
||||||
}, {
|
return 0, err
|
||||||
Name: partName,
|
|
||||||
Generation: wc.Object().Generation,
|
|
||||||
}},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = retry(5, func() error { _, err := objService.Compose(d.bucket, obj.Name, req).Do(); return err })
|
var nn int
|
||||||
if err == nil {
|
for nn < len(p) {
|
||||||
nn = nn + n
|
n := copy(w.buffer[w.buffSize:], p[nn:])
|
||||||
|
w.buffSize += n
|
||||||
|
if w.buffSize == cap(w.buffer) {
|
||||||
|
err = w.writeChunk()
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nn += n
|
||||||
}
|
}
|
||||||
|
|
||||||
return nn, err
|
return nn, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Size returns the number of bytes written to this FileWriter.
|
||||||
|
func (w *writer) Size() int64 {
|
||||||
|
return w.size
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *writer) init(path string) error {
|
||||||
|
res, err := getObject(w.client, w.bucket, w.name, 0)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
if res.Header.Get("Content-Type") != uploadSessionContentType {
|
||||||
|
return storagedriver.PathNotFoundError{Path: path}
|
||||||
|
}
|
||||||
|
offset, err := strconv.ParseInt(res.Header.Get("X-Goog-Meta-Offset"), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
buffer, err := ioutil.ReadAll(res.Body)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.sessionURI = res.Header.Get("X-Goog-Meta-Session-URI")
|
||||||
|
w.buffSize = copy(w.buffer, buffer)
|
||||||
|
w.offset = offset
|
||||||
|
w.size = offset + int64(w.buffSize)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type request func() error
|
type request func() error
|
||||||
|
|
||||||
func retry(maxTries int, req request) error {
|
func retry(req request) error {
|
||||||
backoff := time.Second
|
backoff := time.Second
|
||||||
var err error
|
var err error
|
||||||
for i := 0; i < maxTries; i++ {
|
for i := 0; i < maxTries; i++ {
|
||||||
|
@ -335,53 +512,6 @@ func retry(maxTries int, req request) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) writeCompletely(context ctx.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) {
|
|
||||||
wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path))
|
|
||||||
wc.ContentType = "application/octet-stream"
|
|
||||||
defer wc.Close()
|
|
||||||
|
|
||||||
// Copy the first offset bytes of the existing contents
|
|
||||||
// (padded with zeros if needed) into the writer
|
|
||||||
if offset > 0 {
|
|
||||||
existing, err := d.ReadStream(context, path, 0)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
defer existing.Close()
|
|
||||||
n, err := io.CopyN(wc, existing, offset)
|
|
||||||
if err == io.EOF {
|
|
||||||
err = writeZeros(wc, offset-n)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return io.Copy(wc, reader)
|
|
||||||
}
|
|
||||||
|
|
||||||
func skip(reader io.Reader, count int64) (int64, error) {
|
|
||||||
if count <= 0 {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
return io.CopyN(ioutil.Discard, reader, count)
|
|
||||||
}
|
|
||||||
|
|
||||||
func writeZeros(wc io.Writer, count int64) error {
|
|
||||||
buf := make([]byte, 32*1024)
|
|
||||||
for count > 0 {
|
|
||||||
size := cap(buf)
|
|
||||||
if int64(size) > count {
|
|
||||||
size = int(count)
|
|
||||||
}
|
|
||||||
n, err := wc.Write(buf[0:size])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
count = count - int64(n)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stat retrieves the FileInfo for the given path, including the current
|
// Stat retrieves the FileInfo for the given path, including the current
|
||||||
// size in bytes and the creation time.
|
// size in bytes and the creation time.
|
||||||
func (d *driver) Stat(context ctx.Context, path string) (storagedriver.FileInfo, error) {
|
func (d *driver) Stat(context ctx.Context, path string) (storagedriver.FileInfo, error) {
|
||||||
|
@ -390,6 +520,9 @@ func (d *driver) Stat(context ctx.Context, path string) (storagedriver.FileInfo,
|
||||||
gcsContext := d.context(context)
|
gcsContext := d.context(context)
|
||||||
obj, err := storageStatObject(gcsContext, d.bucket, d.pathToKey(path))
|
obj, err := storageStatObject(gcsContext, d.bucket, d.pathToKey(path))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
if obj.ContentType == uploadSessionContentType {
|
||||||
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||||
|
}
|
||||||
fi = storagedriver.FileInfoFields{
|
fi = storagedriver.FileInfoFields{
|
||||||
Path: path,
|
Path: path,
|
||||||
Size: obj.Size,
|
Size: obj.Size,
|
||||||
|
@ -440,15 +573,10 @@ func (d *driver) List(context ctx.Context, path string) ([]string, error) {
|
||||||
}
|
}
|
||||||
for _, object := range objects.Results {
|
for _, object := range objects.Results {
|
||||||
// GCS does not guarantee strong consistency between
|
// GCS does not guarantee strong consistency between
|
||||||
// DELETE and LIST operationsCheck that the object is not deleted,
|
// DELETE and LIST operations. Check that the object is not deleted,
|
||||||
// so filter out any objects with a non-zero time-deleted
|
// and filter out any objects with a non-zero time-deleted
|
||||||
if object.Deleted.IsZero() {
|
if object.Deleted.IsZero() && object.ContentType != uploadSessionContentType {
|
||||||
name := object.Name
|
list = append(list, d.keyToPath(object.Name))
|
||||||
// Ignore objects with names that end with '#' (these are uploaded parts)
|
|
||||||
if name[len(name)-1] != '#' {
|
|
||||||
name = d.keyToPath(name)
|
|
||||||
list = append(list, name)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, subpath := range objects.Prefixes {
|
for _, subpath := range objects.Prefixes {
|
||||||
|
@ -474,7 +602,7 @@ func (d *driver) Move(context ctx.Context, sourcePath string, destPath string) e
|
||||||
gcsContext := d.context(context)
|
gcsContext := d.context(context)
|
||||||
_, err := storageCopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil)
|
_, err := storageCopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if status := err.(*googleapi.Error); status != nil {
|
if status, ok := err.(*googleapi.Error); ok {
|
||||||
if status.Code == http.StatusNotFound {
|
if status.Code == http.StatusNotFound {
|
||||||
return storagedriver.PathNotFoundError{Path: sourcePath}
|
return storagedriver.PathNotFoundError{Path: sourcePath}
|
||||||
}
|
}
|
||||||
|
@ -545,7 +673,7 @@ func (d *driver) Delete(context ctx.Context, path string) error {
|
||||||
}
|
}
|
||||||
err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(path))
|
err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(path))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if status := err.(*googleapi.Error); status != nil {
|
if status, ok := err.(*googleapi.Error); ok {
|
||||||
if status.Code == http.StatusNotFound {
|
if status.Code == http.StatusNotFound {
|
||||||
return storagedriver.PathNotFoundError{Path: path}
|
return storagedriver.PathNotFoundError{Path: path}
|
||||||
}
|
}
|
||||||
|
@ -555,14 +683,14 @@ func (d *driver) Delete(context ctx.Context, path string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func storageDeleteObject(context context.Context, bucket string, name string) error {
|
func storageDeleteObject(context context.Context, bucket string, name string) error {
|
||||||
return retry(5, func() error {
|
return retry(func() error {
|
||||||
return storage.DeleteObject(context, bucket, name)
|
return storage.DeleteObject(context, bucket, name)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func storageStatObject(context context.Context, bucket string, name string) (*storage.Object, error) {
|
func storageStatObject(context context.Context, bucket string, name string) (*storage.Object, error) {
|
||||||
var obj *storage.Object
|
var obj *storage.Object
|
||||||
err := retry(5, func() error {
|
err := retry(func() error {
|
||||||
var err error
|
var err error
|
||||||
obj, err = storage.StatObject(context, bucket, name)
|
obj, err = storage.StatObject(context, bucket, name)
|
||||||
return err
|
return err
|
||||||
|
@ -572,7 +700,7 @@ func storageStatObject(context context.Context, bucket string, name string) (*st
|
||||||
|
|
||||||
func storageListObjects(context context.Context, bucket string, q *storage.Query) (*storage.Objects, error) {
|
func storageListObjects(context context.Context, bucket string, q *storage.Query) (*storage.Objects, error) {
|
||||||
var objs *storage.Objects
|
var objs *storage.Objects
|
||||||
err := retry(5, func() error {
|
err := retry(func() error {
|
||||||
var err error
|
var err error
|
||||||
objs, err = storage.ListObjects(context, bucket, q)
|
objs, err = storage.ListObjects(context, bucket, q)
|
||||||
return err
|
return err
|
||||||
|
@ -582,7 +710,7 @@ func storageListObjects(context context.Context, bucket string, q *storage.Query
|
||||||
|
|
||||||
func storageCopyObject(context context.Context, srcBucket, srcName string, destBucket, destName string, attrs *storage.ObjectAttrs) (*storage.Object, error) {
|
func storageCopyObject(context context.Context, srcBucket, srcName string, destBucket, destName string, attrs *storage.ObjectAttrs) (*storage.Object, error) {
|
||||||
var obj *storage.Object
|
var obj *storage.Object
|
||||||
err := retry(5, func() error {
|
err := retry(func() error {
|
||||||
var err error
|
var err error
|
||||||
obj, err = storage.CopyObject(context, srcBucket, srcName, destBucket, destName, attrs)
|
obj, err = storage.CopyObject(context, srcBucket, srcName, destBucket, destName, attrs)
|
||||||
return err
|
return err
|
||||||
|
@ -626,6 +754,80 @@ func (d *driver) URLFor(context ctx.Context, path string, options map[string]int
|
||||||
return storage.SignedURL(d.bucket, name, opts)
|
return storage.SignedURL(d.bucket, name, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func startSession(client *http.Client, bucket string, name string) (uri string, err error) {
|
||||||
|
u := &url.URL{
|
||||||
|
Scheme: "https",
|
||||||
|
Host: "www.googleapis.com",
|
||||||
|
Path: fmt.Sprintf("/upload/storage/v1/b/%v/o", bucket),
|
||||||
|
RawQuery: fmt.Sprintf("uploadType=resumable&name=%v", name),
|
||||||
|
}
|
||||||
|
err = retry(func() error {
|
||||||
|
req, err := http.NewRequest("POST", u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Set("X-Upload-Content-Type", "application/octet-stream")
|
||||||
|
req.Header.Set("Content-Length", "0")
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
err = googleapi.CheckMediaResponse(resp)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
uri = resp.Header.Get("Location")
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return uri, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func putChunk(client *http.Client, sessionURI string, chunk []byte, from int64, totalSize int64) (int64, error) {
|
||||||
|
bytesPut := int64(0)
|
||||||
|
err := retry(func() error {
|
||||||
|
req, err := http.NewRequest("PUT", sessionURI, bytes.NewReader(chunk))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
length := int64(len(chunk))
|
||||||
|
to := from + length - 1
|
||||||
|
size := "*"
|
||||||
|
if totalSize >= 0 {
|
||||||
|
size = strconv.FormatInt(totalSize, 10)
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/octet-stream")
|
||||||
|
if from == to+1 {
|
||||||
|
req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", size))
|
||||||
|
} else {
|
||||||
|
req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", from, to, size))
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Length", strconv.FormatInt(length, 10))
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if totalSize < 0 && resp.StatusCode == 308 {
|
||||||
|
groups := rangeHeader.FindStringSubmatch(resp.Header.Get("Range"))
|
||||||
|
end, err := strconv.ParseInt(groups[2], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bytesPut = end - from + 1
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
err = googleapi.CheckMediaResponse(resp)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bytesPut = to - from + 1
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return bytesPut, err
|
||||||
|
}
|
||||||
|
|
||||||
func (d *driver) context(context ctx.Context) context.Context {
|
func (d *driver) context(context ctx.Context) context.Context {
|
||||||
return cloud.WithContext(context, dummyProjectID, d.client)
|
return cloud.WithContext(context, dummyProjectID, d.client)
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,6 +85,102 @@ func init() {
|
||||||
}, skipGCS)
|
}, skipGCS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test Committing a FileWriter without having called Write
|
||||||
|
func TestCommitEmpty(t *testing.T) {
|
||||||
|
if skipGCS() != "" {
|
||||||
|
t.Skip(skipGCS())
|
||||||
|
}
|
||||||
|
|
||||||
|
validRoot, err := ioutil.TempDir("", "driver-")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating temporary directory: %v", err)
|
||||||
|
}
|
||||||
|
defer os.Remove(validRoot)
|
||||||
|
|
||||||
|
driver, err := gcsDriverConstructor(validRoot)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating rooted driver: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
filename := "/test"
|
||||||
|
ctx := ctx.Background()
|
||||||
|
|
||||||
|
writer, err := driver.Writer(ctx, filename, false)
|
||||||
|
defer driver.Delete(ctx, filename)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("driver.Writer: unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
err = writer.Commit()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("writer.Commit: unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
err = writer.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("writer.Close: unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if writer.Size() != 0 {
|
||||||
|
t.Fatalf("writer.Size: %d != 0", writer.Size())
|
||||||
|
}
|
||||||
|
readContents, err := driver.GetContent(ctx, filename)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("driver.GetContent: unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(readContents) != 0 {
|
||||||
|
t.Fatalf("len(driver.GetContent(..)): %d != 0", len(readContents))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test Committing a FileWriter after having written exactly
|
||||||
|
// defaultChunksize bytes.
|
||||||
|
func TestCommit(t *testing.T) {
|
||||||
|
if skipGCS() != "" {
|
||||||
|
t.Skip(skipGCS())
|
||||||
|
}
|
||||||
|
|
||||||
|
validRoot, err := ioutil.TempDir("", "driver-")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating temporary directory: %v", err)
|
||||||
|
}
|
||||||
|
defer os.Remove(validRoot)
|
||||||
|
|
||||||
|
driver, err := gcsDriverConstructor(validRoot)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating rooted driver: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
filename := "/test"
|
||||||
|
ctx := ctx.Background()
|
||||||
|
|
||||||
|
contents := make([]byte, defaultChunkSize)
|
||||||
|
writer, err := driver.Writer(ctx, filename, false)
|
||||||
|
defer driver.Delete(ctx, filename)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("driver.Writer: unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
_, err = writer.Write(contents)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("writer.Write: unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
err = writer.Commit()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("writer.Commit: unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
err = writer.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("writer.Close: unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if writer.Size() != int64(len(contents)) {
|
||||||
|
t.Fatalf("writer.Size: %d != %d", writer.Size(), len(contents))
|
||||||
|
}
|
||||||
|
readContents, err := driver.GetContent(ctx, filename)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("driver.GetContent: unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(readContents) != len(contents) {
|
||||||
|
t.Fatalf("len(driver.GetContent(..)): %d != %d", len(readContents), len(contents))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRetry(t *testing.T) {
|
func TestRetry(t *testing.T) {
|
||||||
if skipGCS() != "" {
|
if skipGCS() != "" {
|
||||||
t.Skip(skipGCS())
|
t.Skip(skipGCS())
|
||||||
|
@ -100,7 +196,7 @@ func TestRetry(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := retry(2, func() error {
|
err := retry(func() error {
|
||||||
return &googleapi.Error{
|
return &googleapi.Error{
|
||||||
Code: 503,
|
Code: 503,
|
||||||
Message: "google api error",
|
Message: "google api error",
|
||||||
|
@ -108,7 +204,7 @@ func TestRetry(t *testing.T) {
|
||||||
})
|
})
|
||||||
assertError("googleapi: Error 503: google api error", err)
|
assertError("googleapi: Error 503: google api error", err)
|
||||||
|
|
||||||
err = retry(2, func() error {
|
err = retry(func() error {
|
||||||
return &googleapi.Error{
|
return &googleapi.Error{
|
||||||
Code: 404,
|
Code: 404,
|
||||||
Message: "google api error",
|
Message: "google api error",
|
||||||
|
@ -116,7 +212,7 @@ func TestRetry(t *testing.T) {
|
||||||
})
|
})
|
||||||
assertError("googleapi: Error 404: google api error", err)
|
assertError("googleapi: Error 404: google api error", err)
|
||||||
|
|
||||||
err = retry(2, func() error {
|
err = retry(func() error {
|
||||||
return fmt.Errorf("error")
|
return fmt.Errorf("error")
|
||||||
})
|
})
|
||||||
assertError("error", err)
|
assertError("error", err)
|
||||||
|
|
Loading…
Reference in a new issue