StorageDriver: GCS: retry all api calls
Signed-off-by: Arthur Baars <arthur@semmle.com>
This commit is contained in:
parent
ffc9527782
commit
59a9607783
1 changed files with 50 additions and 14 deletions
|
@ -206,7 +206,7 @@ func (d *driver) ReadStream(context ctx.Context, path string, offset int64) (io.
|
||||||
}
|
}
|
||||||
if res.StatusCode == http.StatusRequestedRangeNotSatisfiable {
|
if res.StatusCode == http.StatusRequestedRangeNotSatisfiable {
|
||||||
res.Body.Close()
|
res.Body.Close()
|
||||||
obj, err := storage.StatObject(d.context(context), d.bucket, name)
|
obj, err := storageStatObject(d.context(context), d.bucket, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -287,7 +287,7 @@ func (d *driver) WriteStream(context ctx.Context, path string, offset int64, rea
|
||||||
}
|
}
|
||||||
// wc was closed succesfully, so the temporary part exists, schedule it for deletion at the end
|
// wc was closed succesfully, so the temporary part exists, schedule it for deletion at the end
|
||||||
// of the function
|
// of the function
|
||||||
defer storage.DeleteObject(gcsContext, d.bucket, partName)
|
defer storageDeleteObject(gcsContext, d.bucket, partName)
|
||||||
|
|
||||||
req := &storageapi.ComposeRequest{
|
req := &storageapi.ComposeRequest{
|
||||||
Destination: &storageapi.Object{Bucket: obj.Bucket, Name: obj.Name, ContentType: obj.ContentType},
|
Destination: &storageapi.Object{Bucket: obj.Bucket, Name: obj.Name, ContentType: obj.ContentType},
|
||||||
|
@ -386,7 +386,7 @@ func (d *driver) Stat(context ctx.Context, path string) (storagedriver.FileInfo,
|
||||||
var fi storagedriver.FileInfoFields
|
var fi storagedriver.FileInfoFields
|
||||||
//try to get as file
|
//try to get as file
|
||||||
gcsContext := d.context(context)
|
gcsContext := d.context(context)
|
||||||
obj, err := storage.StatObject(gcsContext, d.bucket, d.pathToKey(path))
|
obj, err := storageStatObject(gcsContext, d.bucket, d.pathToKey(path))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
fi = storagedriver.FileInfoFields{
|
fi = storagedriver.FileInfoFields{
|
||||||
Path: path,
|
Path: path,
|
||||||
|
@ -404,7 +404,7 @@ func (d *driver) Stat(context ctx.Context, path string) (storagedriver.FileInfo,
|
||||||
query.Prefix = dirpath
|
query.Prefix = dirpath
|
||||||
query.MaxResults = 1
|
query.MaxResults = 1
|
||||||
|
|
||||||
objects, err := storage.ListObjects(gcsContext, d.bucket, query)
|
objects, err := storageListObjects(gcsContext, d.bucket, query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -432,7 +432,7 @@ func (d *driver) List(context ctx.Context, path string) ([]string, error) {
|
||||||
query.Prefix = d.pathToDirKey(path)
|
query.Prefix = d.pathToDirKey(path)
|
||||||
list := make([]string, 0, 64)
|
list := make([]string, 0, 64)
|
||||||
for {
|
for {
|
||||||
objects, err := storage.ListObjects(d.context(context), d.bucket, query)
|
objects, err := storageListObjects(d.context(context), d.bucket, query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -482,7 +482,7 @@ func (d *driver) Move(context ctx.Context, sourcePath string, destPath string) e
|
||||||
var err error
|
var err error
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
dest := destPrefix + key[len(prefix):]
|
dest := destPrefix + key[len(prefix):]
|
||||||
_, err = storage.CopyObject(gcsContext, d.bucket, key, d.bucket, dest, nil)
|
_, err = storageCopyObject(gcsContext, d.bucket, key, d.bucket, dest, nil)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
copies = append(copies, dest)
|
copies = append(copies, dest)
|
||||||
} else {
|
} else {
|
||||||
|
@ -492,20 +492,20 @@ func (d *driver) Move(context ctx.Context, sourcePath string, destPath string) e
|
||||||
// if an error occurred, attempt to cleanup the copies made
|
// if an error occurred, attempt to cleanup the copies made
|
||||||
if err != nil {
|
if err != nil {
|
||||||
for i := len(copies) - 1; i >= 0; i-- {
|
for i := len(copies) - 1; i >= 0; i-- {
|
||||||
_ = storage.DeleteObject(gcsContext, d.bucket, copies[i])
|
_ = storageDeleteObject(gcsContext, d.bucket, copies[i])
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// delete originals
|
// delete originals
|
||||||
for i := len(keys) - 1; i >= 0; i-- {
|
for i := len(keys) - 1; i >= 0; i-- {
|
||||||
err2 := storage.DeleteObject(gcsContext, d.bucket, keys[i])
|
err2 := storageDeleteObject(gcsContext, d.bucket, keys[i])
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
err = err2
|
err = err2
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = storage.CopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil)
|
_, err = storageCopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if status := err.(*googleapi.Error); status != nil {
|
if status := err.(*googleapi.Error); status != nil {
|
||||||
if status.Code == http.StatusNotFound {
|
if status.Code == http.StatusNotFound {
|
||||||
|
@ -514,7 +514,7 @@ func (d *driver) Move(context ctx.Context, sourcePath string, destPath string) e
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return storage.DeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath))
|
return storageDeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath))
|
||||||
}
|
}
|
||||||
|
|
||||||
// listAll recursively lists all names of objects stored at "prefix" and its subpaths.
|
// listAll recursively lists all names of objects stored at "prefix" and its subpaths.
|
||||||
|
@ -524,7 +524,7 @@ func (d *driver) listAll(context context.Context, prefix string) ([]string, erro
|
||||||
query.Prefix = prefix
|
query.Prefix = prefix
|
||||||
query.Versions = false
|
query.Versions = false
|
||||||
for {
|
for {
|
||||||
objects, err := storage.ListObjects(d.context(context), d.bucket, query)
|
objects, err := storageListObjects(d.context(context), d.bucket, query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -555,8 +555,8 @@ func (d *driver) Delete(context ctx.Context, path string) error {
|
||||||
if len(keys) > 0 {
|
if len(keys) > 0 {
|
||||||
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
|
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
err := storage.DeleteObject(gcsContext, d.bucket, key)
|
err := storageDeleteObject(gcsContext, d.bucket, key)
|
||||||
// GCS only guarantees eventual consistency, solistAll might return
|
// GCS only guarantees eventual consistency, so listAll might return
|
||||||
// paths that no longer exist. If this happens, just ignore any not
|
// paths that no longer exist. If this happens, just ignore any not
|
||||||
// found error
|
// found error
|
||||||
if status, ok := err.(*googleapi.Error); ok {
|
if status, ok := err.(*googleapi.Error); ok {
|
||||||
|
@ -570,7 +570,7 @@ func (d *driver) Delete(context ctx.Context, path string) error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
err = storage.DeleteObject(gcsContext, d.bucket, d.pathToKey(path))
|
err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(path))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if status := err.(*googleapi.Error); status != nil {
|
if status := err.(*googleapi.Error); status != nil {
|
||||||
if status.Code == http.StatusNotFound {
|
if status.Code == http.StatusNotFound {
|
||||||
|
@ -581,6 +581,42 @@ func (d *driver) Delete(context ctx.Context, path string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func storageDeleteObject(context context.Context, bucket string, name string) error {
|
||||||
|
return retry(5, func() error {
|
||||||
|
return storage.DeleteObject(context, bucket, name)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func storageStatObject(context context.Context, bucket string, name string) (*storage.Object, error) {
|
||||||
|
var obj *storage.Object
|
||||||
|
err := retry(5, func() error {
|
||||||
|
var err error
|
||||||
|
obj, err = storage.StatObject(context, bucket, name)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return obj, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func storageListObjects(context context.Context, bucket string, q *storage.Query) (*storage.Objects, error) {
|
||||||
|
var objs *storage.Objects
|
||||||
|
err := retry(5, func() error {
|
||||||
|
var err error
|
||||||
|
objs, err = storage.ListObjects(context, bucket, q)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return objs, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func storageCopyObject(context context.Context, srcBucket, srcName string, destBucket, destName string, attrs *storage.ObjectAttrs) (*storage.Object, error) {
|
||||||
|
var obj *storage.Object
|
||||||
|
err := retry(5, func() error {
|
||||||
|
var err error
|
||||||
|
obj, err = storage.CopyObject(context, srcBucket, srcName, destBucket, destName, attrs)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return obj, err
|
||||||
|
}
|
||||||
|
|
||||||
// URLFor returns a URL which may be used to retrieve the content stored at
|
// URLFor returns a URL which may be used to retrieve the content stored at
|
||||||
// the given path, possibly using the given options.
|
// the given path, possibly using the given options.
|
||||||
// Returns ErrUnsupportedMethod if this driver has no privateKey
|
// Returns ErrUnsupportedMethod if this driver has no privateKey
|
||||||
|
|
Loading…
Reference in a new issue