diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 6674574d..7f36c94b 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -49,7 +49,7 @@ Then immediately submit this new file as a pull-request, in order to get early f Eventually, you will have to update your proposal to accommodate the feedback you received. -Usually, it's not advisable to start working too much on the implementation itself before the proposal receives sufficient feedback, since it can significantly altered (or rejected). +Usually, it's not advisable to start working too much on the implementation itself before the proposal receives sufficient feedback, since it can be significantly altered (or rejected). Your implementation should then be submitted as a separate PR, that will be reviewed as well. diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index dc643683..ab255849 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -12,15 +12,15 @@ }, { "ImportPath": "github.com/AdRoll/goamz/aws", - "Rev": "d3664b76d90508cdda5a6c92042f26eab5db3103" + "Rev": "cc210f45dcb9889c2769a274522be2bf70edfb99" }, { "ImportPath": "github.com/AdRoll/goamz/cloudfront", - "Rev": "d3664b76d90508cdda5a6c92042f26eab5db3103" + "Rev": "cc210f45dcb9889c2769a274522be2bf70edfb99" }, { "ImportPath": "github.com/AdRoll/goamz/s3", - "Rev": "d3664b76d90508cdda5a6c92042f26eab5db3103" + "Rev": "cc210f45dcb9889c2769a274522be2bf70edfb99" }, { "ImportPath": "github.com/MSOpenTech/azure-sdk-for-go/storage", diff --git a/Godeps/_workspace/src/github.com/AdRoll/goamz/aws/aws.go b/Godeps/_workspace/src/github.com/AdRoll/goamz/aws/aws.go index 38c9e656..87c2d6da 100644 --- a/Godeps/_workspace/src/github.com/AdRoll/goamz/aws/aws.go +++ b/Godeps/_workspace/src/github.com/AdRoll/goamz/aws/aws.go @@ -62,6 +62,7 @@ type Region struct { SESEndpoint string IAMEndpoint string ELBEndpoint string + KMSEndpoint string DynamoDBEndpoint string CloudWatchServicepoint ServiceInfo AutoScalingEndpoint string @@ -83,6 +84,7 @@ var Regions = map[string]Region{ USWest2.Name: USWest2, USGovWest.Name: USGovWest, SAEast.Name: SAEast, + CNNorth1.Name: CNNorth1, } // Designates a signer interface suitable for signing AWS requests, params @@ -208,7 +210,10 @@ func (a *Auth) Token() string { return "" } if time.Since(a.expiration) >= -30*time.Second { //in an ideal world this should be zero assuming the instance is synching it's clock - *a, _ = GetAuth("", "", "", time.Time{}) + auth, err := GetAuth("", "", "", time.Time{}) + if err == nil { + *a = auth + } } return a.token } diff --git a/Godeps/_workspace/src/github.com/AdRoll/goamz/aws/regions.go b/Godeps/_workspace/src/github.com/AdRoll/goamz/aws/regions.go index 4e39069e..fdc2626b 100644 --- a/Godeps/_workspace/src/github.com/AdRoll/goamz/aws/regions.go +++ b/Godeps/_workspace/src/github.com/AdRoll/goamz/aws/regions.go @@ -13,6 +13,7 @@ var USGovWest = Region{ "", "https://iam.us-gov.amazonaws.com", "https://elasticloadbalancing.us-gov-west-1.amazonaws.com", + "", "https://dynamodb.us-gov-west-1.amazonaws.com", ServiceInfo{"https://monitoring.us-gov-west-1.amazonaws.com", V2Signature}, "https://autoscaling.us-gov-west-1.amazonaws.com", @@ -36,6 +37,7 @@ var USEast = Region{ "https://email.us-east-1.amazonaws.com", "https://iam.amazonaws.com", "https://elasticloadbalancing.us-east-1.amazonaws.com", + "https://kms.us-east-1.amazonaws.com", "https://dynamodb.us-east-1.amazonaws.com", ServiceInfo{"https://monitoring.us-east-1.amazonaws.com", V2Signature}, "https://autoscaling.us-east-1.amazonaws.com", @@ -59,6 +61,7 @@ var USWest = Region{ "", "https://iam.amazonaws.com", "https://elasticloadbalancing.us-west-1.amazonaws.com", + "https://kms.us-west-1.amazonaws.com", "https://dynamodb.us-west-1.amazonaws.com", ServiceInfo{"https://monitoring.us-west-1.amazonaws.com", V2Signature}, "https://autoscaling.us-west-1.amazonaws.com", @@ -82,6 +85,7 @@ var USWest2 = Region{ "https://email.us-west-2.amazonaws.com", "https://iam.amazonaws.com", "https://elasticloadbalancing.us-west-2.amazonaws.com", + "https://kms.us-west-2.amazonaws.com", "https://dynamodb.us-west-2.amazonaws.com", ServiceInfo{"https://monitoring.us-west-2.amazonaws.com", V2Signature}, "https://autoscaling.us-west-2.amazonaws.com", @@ -105,6 +109,7 @@ var EUWest = Region{ "https://email.eu-west-1.amazonaws.com", "https://iam.amazonaws.com", "https://elasticloadbalancing.eu-west-1.amazonaws.com", + "https://kms.eu-west-1.amazonaws.com", "https://dynamodb.eu-west-1.amazonaws.com", ServiceInfo{"https://monitoring.eu-west-1.amazonaws.com", V2Signature}, "https://autoscaling.eu-west-1.amazonaws.com", @@ -128,6 +133,7 @@ var EUCentral = Region{ "", "https://iam.amazonaws.com", "https://elasticloadbalancing.eu-central-1.amazonaws.com", + "https://kms.eu-central-1.amazonaws.com", "https://dynamodb.eu-central-1.amazonaws.com", ServiceInfo{"https://monitoring.eu-central-1.amazonaws.com", V2Signature}, "https://autoscaling.eu-central-1.amazonaws.com", @@ -151,6 +157,7 @@ var APSoutheast = Region{ "", "https://iam.amazonaws.com", "https://elasticloadbalancing.ap-southeast-1.amazonaws.com", + "https://kms.ap-southeast-1.amazonaws.com", "https://dynamodb.ap-southeast-1.amazonaws.com", ServiceInfo{"https://monitoring.ap-southeast-1.amazonaws.com", V2Signature}, "https://autoscaling.ap-southeast-1.amazonaws.com", @@ -174,6 +181,7 @@ var APSoutheast2 = Region{ "", "https://iam.amazonaws.com", "https://elasticloadbalancing.ap-southeast-2.amazonaws.com", + "https://kms.ap-southeast-2.amazonaws.com", "https://dynamodb.ap-southeast-2.amazonaws.com", ServiceInfo{"https://monitoring.ap-southeast-2.amazonaws.com", V2Signature}, "https://autoscaling.ap-southeast-2.amazonaws.com", @@ -197,6 +205,7 @@ var APNortheast = Region{ "", "https://iam.amazonaws.com", "https://elasticloadbalancing.ap-northeast-1.amazonaws.com", + "https://kms.ap-northeast-1.amazonaws.com", "https://dynamodb.ap-northeast-1.amazonaws.com", ServiceInfo{"https://monitoring.ap-northeast-1.amazonaws.com", V2Signature}, "https://autoscaling.ap-northeast-1.amazonaws.com", @@ -220,6 +229,7 @@ var SAEast = Region{ "", "https://iam.amazonaws.com", "https://elasticloadbalancing.sa-east-1.amazonaws.com", + "https://kms.sa-east-1.amazonaws.com", "https://dynamodb.sa-east-1.amazonaws.com", ServiceInfo{"https://monitoring.sa-east-1.amazonaws.com", V2Signature}, "https://autoscaling.sa-east-1.amazonaws.com", @@ -229,3 +239,27 @@ var SAEast = Region{ "https://cloudformation.sa-east-1.amazonaws.com", "https://elasticache.sa-east-1.amazonaws.com", } + +var CNNorth1 = Region{ + "cn-north-1", + "https://ec2.cn-north-1.amazonaws.com.cn", + "https://s3.cn-north-1.amazonaws.com.cn", + "", + true, + true, + "", + "https://sns.cn-north-1.amazonaws.com.cn", + "https://sqs.cn-north-1.amazonaws.com.cn", + "", + "https://iam.cn-north-1.amazonaws.com.cn", + "https://elasticloadbalancing.cn-north-1.amazonaws.com.cn", + "", + "https://dynamodb.cn-north-1.amazonaws.com.cn", + ServiceInfo{"https://monitoring.cn-north-1.amazonaws.com.cn", V4Signature}, + "https://autoscaling.cn-north-1.amazonaws.com.cn", + ServiceInfo{"https://rds.cn-north-1.amazonaws.com.cn", V4Signature}, + "", + "https://sts.cn-north-1.amazonaws.com.cn", + "", + "", +} diff --git a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3.go b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3.go index 18313c28..69b2e071 100644 --- a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3.go +++ b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3.go @@ -25,6 +25,7 @@ import ( "net/http" "net/http/httputil" "net/url" + "path" "strconv" "strings" "time" @@ -70,9 +71,8 @@ type Options struct { ContentMD5 string ContentDisposition string Range string + StorageClass StorageClass // What else? - //// The following become headers so they are []strings rather than strings... I think - // x-amz-storage-class []string } type CopyOptions struct { @@ -96,7 +96,7 @@ var attempts = aws.AttemptStrategy{ // New creates a new S3. func New(auth aws.Auth, region aws.Region) *S3 { - return &S3{auth, region, 0, 0, 0, aws.V2Signature} + return &S3{auth, region, 0, 0, aws.V2Signature, 0} } // Bucket returns a Bucket with the given name. @@ -164,6 +164,13 @@ const ( BucketOwnerFull = ACL("bucket-owner-full-control") ) +type StorageClass string + +const ( + ReducedRedundancy = StorageClass("REDUCED_REDUNDANCY") + StandardStorage = StorageClass("STANDARD") +) + // PutBucket creates a new bucket. // // See http://goo.gl/ndjnR for details. @@ -401,6 +408,10 @@ func (o Options) addHeaders(headers map[string][]string) { if len(o.ContentDisposition) != 0 { headers["Content-Disposition"] = []string{o.ContentDisposition} } + if len(o.StorageClass) != 0 { + headers["x-amz-storage-class"] = []string{string(o.StorageClass)} + + } for k, v := range o.Meta { headers["x-amz-meta-"+k] = v } @@ -816,8 +827,8 @@ func (b *Bucket) SignedURLWithMethod(method, path string, expires time.Time, par // UploadSignedURL returns a signed URL that allows anyone holding the URL // to upload the object at path. The signature is valid until expires. // contenttype is a string like image/png -// path is the resource name in s3 terminalogy like images/ali.png [obviously exclusing the bucket name itself] -func (b *Bucket) UploadSignedURL(path, method, content_type string, expires time.Time) string { +// name is the resource name in s3 terminology like images/ali.png [obviously excluding the bucket name itself] +func (b *Bucket) UploadSignedURL(name, method, content_type string, expires time.Time) string { expire_date := expires.Unix() if method != "POST" { method = "PUT" @@ -830,7 +841,7 @@ func (b *Bucket) UploadSignedURL(path, method, content_type string, expires time tokenData = "x-amz-security-token:" + a.Token() + "\n" } - stringToSign := method + "\n\n" + content_type + "\n" + strconv.FormatInt(expire_date, 10) + "\n" + tokenData + "/" + b.Name + "/" + path + stringToSign := method + "\n\n" + content_type + "\n" + strconv.FormatInt(expire_date, 10) + "\n" + tokenData + "/" + path.Join(b.Name, name) secretKey := a.SecretKey accessId := a.AccessKey mac := hmac.New(sha1.New, []byte(secretKey)) @@ -844,7 +855,7 @@ func (b *Bucket) UploadSignedURL(path, method, content_type string, expires time log.Println("ERROR sining url for S3 upload", err) return "" } - signedurl.Path += path + signedurl.Path = name params := url.Values{} params.Add("AWSAccessKeyId", accessId) params.Add("Expires", strconv.FormatInt(expire_date, 10)) diff --git a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3_test.go b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3_test.go index 87b23ad0..161bb3af 100644 --- a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3_test.go +++ b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3_test.go @@ -230,6 +230,22 @@ func (s *S) TestPutObject(c *check.C) { c.Assert(req.Header["X-Amz-Acl"], check.DeepEquals, []string{"private"}) } +func (s *S) TestPutObjectReducedRedundancy(c *check.C) { + testServer.Response(200, nil, "") + + b := s.s3.Bucket("bucket") + err := b.Put("name", []byte("content"), "content-type", s3.Private, s3.Options{StorageClass: s3.ReducedRedundancy}) + c.Assert(err, check.IsNil) + + req := testServer.WaitRequest() + c.Assert(req.Method, check.Equals, "PUT") + c.Assert(req.URL.Path, check.Equals, "/bucket/name") + c.Assert(req.Header["Date"], check.Not(check.DeepEquals), []string{""}) + c.Assert(req.Header["Content-Type"], check.DeepEquals, []string{"content-type"}) + c.Assert(req.Header["Content-Length"], check.DeepEquals, []string{"7"}) + c.Assert(req.Header["X-Amz-Storage-Class"], check.DeepEquals, []string{"REDUCED_REDUNDANCY"}) +} + // PutCopy docs: http://goo.gl/mhEHtA func (s *S) TestPutCopy(c *check.C) { testServer.Response(200, nil, PutCopyResultDump) diff --git a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3test/server.go b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3test/server.go index 4dc95eae..d54a638c 100644 --- a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3test/server.go +++ b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3test/server.go @@ -11,6 +11,7 @@ import ( "io" "io/ioutil" "log" + "math/rand" "net" "net/http" "net/url" @@ -51,6 +52,10 @@ type Config struct { // all other regions. // http://docs.amazonwebservices.com/AmazonS3/latest/API/ErrorResponses.html Send409Conflict bool + + // Address on which to listen. By default, a random port is assigned by the + // operating system and the server listens on localhost. + ListenAddress string } func (c *Config) send409Conflict() bool { @@ -72,10 +77,11 @@ type Server struct { } type bucket struct { - name string - acl s3.ACL - ctime time.Time - objects map[string]*object + name string + acl s3.ACL + ctime time.Time + objects map[string]*object + multipartUploads map[string][]*multipartUploadPart } type object struct { @@ -86,6 +92,12 @@ type object struct { data []byte } +type multipartUploadPart struct { + data []byte + etag string + lastModified time.Time +} + // A resource encapsulates the subject of an HTTP request. // The resource referred to may or may not exist // when the request is made. @@ -97,7 +109,13 @@ type resource interface { } func NewServer(config *Config) (*Server, error) { - l, err := net.Listen("tcp", "localhost:0") + listenAddress := "localhost:0" + + if config != nil && config.ListenAddress != "" { + listenAddress = config.ListenAddress + } + + l, err := net.Listen("tcp", listenAddress) if err != nil { return nil, fmt.Errorf("cannot listen on localhost: %v", err) } @@ -217,10 +235,8 @@ var unimplementedBucketResourceNames = map[string]bool{ } var unimplementedObjectResourceNames = map[string]bool{ - "uploadId": true, - "acl": true, - "torrent": true, - "uploads": true, + "acl": true, + "torrent": true, } var pathRegexp = regexp.MustCompile("/(([^/]+)(/(.*))?)?") @@ -420,7 +436,8 @@ func (r bucketResource) put(a *action) interface{} { r.bucket = &bucket{ name: r.name, // TODO default acl - objects: make(map[string]*object), + objects: make(map[string]*object), + multipartUploads: make(map[string][]*multipartUploadPart), } a.srv.buckets[r.name] = r.bucket created = true @@ -615,12 +632,29 @@ func (objr objectResource) put(a *action) interface{} { // TODO x-amz-server-side-encryption // TODO x-amz-storage-class - // TODO is this correct, or should we erase all previous metadata? - obj := objr.object - if obj == nil { - obj = &object{ - name: objr.name, - meta: make(http.Header), + uploadId := a.req.URL.Query().Get("uploadId") + + // Check that the upload ID is valid if this is a multipart upload + if uploadId != "" { + if _, ok := objr.bucket.multipartUploads[uploadId]; !ok { + fatalf(404, "NoSuchUpload", "The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.") + } + + partNumberStr := a.req.URL.Query().Get("partNumber") + + if partNumberStr == "" { + fatalf(400, "InvalidRequest", "Missing partNumber parameter") + } + + partNumber, err := strconv.ParseUint(partNumberStr, 10, 32) + + if err != nil { + fatalf(400, "InvalidRequest", "partNumber is not a number") + } + + // Parts are 1-indexed for multipart uploads + if uint(partNumber)-1 != uint(len(objr.bucket.multipartUploads[uploadId])) { + fatalf(400, "InvalidRequest", "Invalid part number") } } @@ -646,26 +680,170 @@ func (objr objectResource) put(a *action) interface{} { fatalf(400, "IncompleteBody", "You did not provide the number of bytes specified by the Content-Length HTTP header") } - // PUT request has been successful - save data and metadata - for key, values := range a.req.Header { - key = http.CanonicalHeaderKey(key) - if metaHeaders[key] || strings.HasPrefix(key, "X-Amz-Meta-") { - obj.meta[key] = values + etag := fmt.Sprintf("\"%x\"", gotHash) + + a.w.Header().Add("ETag", etag) + + if uploadId == "" { + // For traditional uploads + + // TODO is this correct, or should we erase all previous metadata? + obj := objr.object + if obj == nil { + obj = &object{ + name: objr.name, + meta: make(http.Header), + } } + + // PUT request has been successful - save data and metadata + for key, values := range a.req.Header { + key = http.CanonicalHeaderKey(key) + if metaHeaders[key] || strings.HasPrefix(key, "X-Amz-Meta-") { + obj.meta[key] = values + } + } + obj.data = data + obj.checksum = gotHash + obj.mtime = time.Now() + objr.bucket.objects[objr.name] = obj + } else { + // For multipart commit + + parts := objr.bucket.multipartUploads[uploadId] + part := &multipartUploadPart{ + data, + etag, + time.Now(), + } + + objr.bucket.multipartUploads[uploadId] = append(parts, part) } - obj.data = data - obj.checksum = gotHash - obj.mtime = time.Now() - objr.bucket.objects[objr.name] = obj + return nil } func (objr objectResource) delete(a *action) interface{} { - delete(objr.bucket.objects, objr.name) + uploadId := a.req.URL.Query().Get("uploadId") + + if uploadId == "" { + // Traditional object delete + delete(objr.bucket.objects, objr.name) + } else { + // Multipart commit abort + _, ok := objr.bucket.multipartUploads[uploadId] + + if !ok { + fatalf(404, "NoSuchUpload", "The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.") + } + + delete(objr.bucket.multipartUploads, uploadId) + } return nil } func (objr objectResource) post(a *action) interface{} { + // Check if we're initializing a multipart upload + if _, ok := a.req.URL.Query()["uploads"]; ok { + type multipartInitResponse struct { + XMLName struct{} `xml:"InitiateMultipartUploadResult"` + Bucket string + Key string + UploadId string + } + + uploadId := strconv.FormatInt(rand.Int63(), 16) + + objr.bucket.multipartUploads[uploadId] = []*multipartUploadPart{} + + return &multipartInitResponse{ + Bucket: objr.bucket.name, + Key: objr.name, + UploadId: uploadId, + } + } + + // Check if we're completing a multipart upload + if uploadId := a.req.URL.Query().Get("uploadId"); uploadId != "" { + type multipartCompleteRequestPart struct { + XMLName struct{} `xml:"Part"` + PartNumber uint + ETag string + } + + type multipartCompleteRequest struct { + XMLName struct{} `xml:"CompleteMultipartUpload"` + Part []multipartCompleteRequestPart + } + + type multipartCompleteResponse struct { + XMLName struct{} `xml:"CompleteMultipartUploadResult"` + Location string + Bucket string + Key string + ETag string + } + + parts, ok := objr.bucket.multipartUploads[uploadId] + + if !ok { + fatalf(404, "NoSuchUpload", "The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.") + } + + req := &multipartCompleteRequest{} + + if err := xml.NewDecoder(a.req.Body).Decode(req); err != nil { + fatalf(400, "InvalidRequest", err.Error()) + } + + if len(req.Part) != len(parts) { + fatalf(400, "InvalidRequest", fmt.Sprintf("Number of parts does not match: expected %d, received %d", len(parts), len(req.Part))) + } + + sum := md5.New() + data := &bytes.Buffer{} + w := io.MultiWriter(sum, data) + + for i, p := range parts { + reqPart := req.Part[i] + + if reqPart.PartNumber != uint(1+i) { + fatalf(400, "InvalidRequest", "Bad part number") + } + + if reqPart.ETag != p.etag { + fatalf(400, "InvalidRequest", fmt.Sprintf("Invalid etag for part %d", reqPart.PartNumber)) + } + + w.Write(p.data) + } + + delete(objr.bucket.multipartUploads, uploadId) + + obj := objr.object + + if obj == nil { + obj = &object{ + name: objr.name, + meta: make(http.Header), + } + } + + obj.data = data.Bytes() + obj.checksum = sum.Sum(nil) + obj.mtime = time.Now() + objr.bucket.objects[objr.name] = obj + + objectLocation := fmt.Sprintf("http://%s/%s/%s", a.srv.listener.Addr().String(), objr.bucket.name, objr.name) + + return &multipartCompleteResponse{ + Location: objectLocation, + Bucket: objr.bucket.name, + Key: objr.name, + ETag: uploadId, + } + } + fatalf(400, "MethodNotAllowed", "The specified method is not allowed against this resource") return nil } diff --git a/circle.yml b/circle.yml index 963b6027..b841cdec 100644 --- a/circle.yml +++ b/circle.yml @@ -6,8 +6,8 @@ machine: post: # Install many go versions - - gvm install go1.3.3 -B --name=old - - gvm install go1.4 -B --name=stable + # - gvm install go1.3.3 -B --name=old + - gvm install go1.4.2 -B --name=stable # - gvm install tip --name=bleed environment: @@ -28,10 +28,10 @@ machine: dependencies: pre: # Copy the code to the gopath of all go versions - - > - gvm use old && - mkdir -p "$(dirname $BASE_OLD)" && - cp -R "$CHECKOUT" "$BASE_OLD" + # - > + # gvm use old && + # mkdir -p "$(dirname $BASE_OLD)" && + # cp -R "$CHECKOUT" "$BASE_OLD" - > gvm use stable && @@ -45,8 +45,8 @@ dependencies: override: # Install dependencies for every copied clone/go version - - gvm use old && go get github.com/tools/godep: - pwd: $BASE_OLD + # - gvm use old && go get github.com/tools/godep: + # pwd: $BASE_OLD - gvm use stable && go get github.com/tools/godep: pwd: $BASE_STABLE @@ -63,7 +63,7 @@ dependencies: test: pre: # Output the go versions we are going to test - - gvm use old && go version + # - gvm use old && go version - gvm use stable && go version # - gvm use bleed && go version @@ -81,9 +81,9 @@ test: override: # Test every version we have (but stable) - - gvm use old; godep go test -test.v -test.short ./...: - timeout: 600 - pwd: $BASE_OLD + # - gvm use old; godep go test -test.v -test.short ./...: + # timeout: 600 + # pwd: $BASE_OLD # - gvm use bleed; go test -test.v -test.short ./...: # timeout: 600 @@ -103,10 +103,11 @@ test: # Aggregate and report to coveralls - gvm use stable; go list ./... | xargs -L 1 -I{} cat "$GOPATH/src/{}/coverage.out" | grep -v "$CIRCLE_PAIN" >> ~/goverage.report: pwd: $BASE_STABLE - - gvm use stable; goveralls -service circleci -coverprofile=/home/ubuntu/goverage.report -repotoken $COVERALLS_TOKEN: - pwd: $BASE_STABLE +# - gvm use stable; goveralls -service circleci -coverprofile=/home/ubuntu/goverage.report -repotoken $COVERALLS_TOKEN: +# pwd: $BASE_STABLE ## Notes + # Disabled coveralls reporting: build breaking sending coverage data to coveralls # Disabled the -race detector due to massive memory usage. # Do we want these as well? # - go get code.google.com/p/go.tools/cmd/goimports diff --git a/cmd/registry/config.yml b/cmd/registry/config.yml index 5dd39cb3..b1a8f48d 100644 --- a/cmd/registry/config.yml +++ b/cmd/registry/config.yml @@ -9,6 +9,9 @@ storage: layerinfo: inmemory filesystem: rootdirectory: /tmp/registry-dev + maintenance: + uploadpurging: + enabled: false http: addr: :5000 secret: asecretforlocaldevelopment @@ -39,3 +42,4 @@ notifications: threshold: 10 backoff: 1s disabled: true + \ No newline at end of file diff --git a/configuration/configuration.go b/configuration/configuration.go index 3d302e1c..074471b4 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -188,6 +188,8 @@ func (storage Storage) Type() string { // Return only key in this map for k := range storage { switch k { + case "maintenance": + // allow configuration of maintenance case "cache": // allow configuration of caching default: @@ -217,6 +219,8 @@ func (storage *Storage) UnmarshalYAML(unmarshal func(interface{}) error) error { types := make([]string, 0, len(storageMap)) for k := range storageMap { switch k { + case "maintenance": + // allow for configuration of maintenance case "cache": // allow configuration of caching default: diff --git a/context/http.go b/context/http.go index 98ab436d..91bcda95 100644 --- a/context/http.go +++ b/context/http.go @@ -302,7 +302,7 @@ func (irw *instrumentedResponseWriter) Flush() { func (irw *instrumentedResponseWriter) Value(key interface{}) interface{} { if keyStr, ok := key.(string); ok { if keyStr == "http.response" { - return irw.ResponseWriter + return irw } if !strings.HasPrefix(keyStr, "http.response.") { @@ -322,9 +322,7 @@ func (irw *instrumentedResponseWriter) Value(key interface{}) interface{} { case "written": return irw.written case "status": - if irw.status != 0 { - return irw.status - } + return irw.status case "contenttype": contentType := irw.Header().Get("Content-Type") if contentType != "" { diff --git a/context/http_test.go b/context/http_test.go index 42c78b75..3d4b3c8e 100644 --- a/context/http_test.go +++ b/context/http_test.go @@ -132,8 +132,21 @@ func TestWithResponseWriter(t *testing.T) { trw := testResponseWriter{} ctx, rw := WithResponseWriter(Background(), &trw) - if ctx.Value("http.response") != &trw { - t.Fatalf("response not available in context: %v != %v", ctx.Value("http.response"), &trw) + if ctx.Value("http.response") != rw { + t.Fatalf("response not available in context: %v != %v", ctx.Value("http.response"), rw) + } + + grw, err := GetResponseWriter(ctx) + if err != nil { + t.Fatalf("error getting response writer: %v", err) + } + + if grw != rw { + t.Fatalf("unexpected response writer returned: %#v != %#v", grw, rw) + } + + if ctx.Value("http.response.status") != 0 { + t.Fatalf("response status should always be a number and should be zero here: %v != 0", ctx.Value("http.response.status")) } if n, err := rw.Write(make([]byte, 1024)); err != nil { diff --git a/contrib/apache/README.MD b/contrib/apache/README.MD new file mode 100644 index 00000000..f7e14b5b --- /dev/null +++ b/contrib/apache/README.MD @@ -0,0 +1,36 @@ +# Apache HTTPd sample for Registry v1, v2 and mirror + +3 containers involved + +* Docker Registry v1 (registry 0.9.1) +* Docker Registry v2 (registry 2.0.0) +* Docker Registry v1 in mirror mode + +HTTP for mirror and HTTPS for v1 & v2 + +* http://registry.example.com proxify Docker Registry 1.0 in Mirror mode +* https://registry.example.com proxify Docker Registry 1.0 or 2.0 in Hosting mode + +## 3 Docker containers should be started + +* Docker Registry 1.0 in Mirror mode : port 5001 +* Docker Registry 1.0 in Hosting mode : port 5000 +* Docker Registry 2.0 in Hosting mode : port 5002 + +### Registry v1 + + docker run -d -e SETTINGS_FLAVOR=dev -v /var/lib/docker-registry/storage/hosting-v1:/tmp -p 5000:5000 registry:0.9.1" + +### Mirror + + docker run -d -e SETTINGS_FLAVOR=dev -e STANDALONE=false -e MIRROR_SOURCE=https://registry-1.docker.io -e MIRROR_SOURCE_INDEX=https://index.docker.io \ + -e MIRROR_TAGS_CACHE_TTL=172800 -v /var/lib/docker-registry/storage/mirror:/tmp -p 5001:5000 registry:0.9.1" + +### Registry v2 + + docker run -d -e SETTINGS_FLAVOR=dev -v /var/lib/axway/docker-registry/storage/hosting2-v2:/tmp -p 5002:5000 registry:2.0" + +# For Hosting mode access + +* users should have account (valid-user) to be able to fetch images +* only users using account docker-deployer will be allowed to push images diff --git a/contrib/apache/apache.conf b/contrib/apache/apache.conf new file mode 100644 index 00000000..b7581fd1 --- /dev/null +++ b/contrib/apache/apache.conf @@ -0,0 +1,127 @@ +# +# Sample Apache 2.x configuration where : +# + + + + ServerName registry.example.com + ServerAlias www.registry.example.com + + ProxyRequests off + ProxyPreserveHost on + + # no proxy for /error/ (Apache HTTPd errors messages) + ProxyPass /error/ ! + + ProxyPass /_ping http://localhost:5001/_ping + ProxyPassReverse /_ping http://localhost:5001/_ping + + ProxyPass /v1 http://localhost:5001/v1 + ProxyPassReverse /v1 http://localhost:5001/v1 + + # Logs + ErrorLog ${APACHE_LOG_DIR}/mirror_error_log + CustomLog ${APACHE_LOG_DIR}/mirror_access_log combined env=!dontlog + + + + + + + ServerName registry.example.com + ServerAlias www.registry.example.com + + SSLEngine on + SSLCertificateFile /etc/apache2/ssl/registry.example.com.crt + SSLCertificateKeyFile /etc/apache2/ssl/registry.example.com.key + + # Higher Strength SSL Ciphers + SSLProtocol all -SSLv2 -SSLv3 -TLSv1 + SSLCipherSuite RC4-SHA:HIGH + SSLHonorCipherOrder on + + # Logs + ErrorLog ${APACHE_LOG_DIR}/registry_error_ssl_log + CustomLog ${APACHE_LOG_DIR}/registry_access_ssl_log combined env=!dontlog + + Header set Host "registry.example.com" + Header set "Docker-Distribution-Api-Version" "registry/2.0" + RequestHeader set X-Forwarded-Proto "https" + + ProxyRequests off + ProxyPreserveHost on + + # no proxy for /error/ (Apache HTTPd errors messages) + ProxyPass /error/ ! + + # + # Registry v1 + # + + ProxyPass /v1 http://localhost:5000/v1 + ProxyPassReverse /v1 http://localhost:5000/v1 + + ProxyPass /_ping http://localhost:5000/_ping + ProxyPassReverse /_ping http://localhost:5000/_ping + + # Authentication require for push + + Order deny,allow + Allow from all + AuthName "Registry Authentication" + AuthType basic + AuthUserFile "/etc/apache2/htpasswd/registry-htpasswd" + + # Read access to authentified users + + Require valid-user + + + # Write access to docker-deployer account only + + Require user docker-deployer + + + + + # Allow ping to run unauthenticated. + + Satisfy any + Allow from all + + + # Allow ping to run unauthenticated. + + Satisfy any + Allow from all + + + # + # Registry v2 + # + + ProxyPass /v2 http://localhost:5002/v2 + ProxyPassReverse /v2 http://localhost:5002/v2 + + + Order deny,allow + Allow from all + AuthName "Registry Authentication" + AuthType basic + AuthUserFile "/etc/apache2/htpasswd/registry-htpasswd" + + # Read access to authentified users + + Require valid-user + + + # Write access to docker-deployer only + + Require user docker-deployer + + + + + + + diff --git a/digest/tarsum.go b/digest/tarsum.go index acf878b6..702d7dc3 100644 --- a/digest/tarsum.go +++ b/digest/tarsum.go @@ -6,10 +6,10 @@ import ( "regexp" ) -// TarSumRegexp defines a reguler expression to match tarsum identifiers. +// TarSumRegexp defines a regular expression to match tarsum identifiers. var TarsumRegexp = regexp.MustCompile("tarsum(?:.[a-z0-9]+)?\\+[a-zA-Z0-9]+:[A-Fa-f0-9]+") -// TarsumRegexpCapturing defines a reguler expression to match tarsum identifiers with +// TarsumRegexpCapturing defines a regular expression to match tarsum identifiers with // capture groups corresponding to each component. var TarsumRegexpCapturing = regexp.MustCompile("(tarsum)(.([a-z0-9]+))?\\+([a-zA-Z0-9]+):([A-Fa-f0-9]+)") diff --git a/doc.go b/doc.go index 9b981f7d..bdd8cb70 100644 --- a/doc.go +++ b/doc.go @@ -2,6 +2,6 @@ // docker distribution. The goal is to allow users to reliably package, ship // and store content related to docker images. // -// This is currently a work in progress. More details are availalbe in the +// This is currently a work in progress. More details are available in the // README.md. package distribution diff --git a/docs/configuration.md b/docs/configuration.md index b3105b3f..114eefc8 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -57,6 +57,12 @@ storage: rootdirectory: /s3/object/name/prefix cache: layerinfo: inmemory + maintenance: + uploadpurging: + enabled: true + age: 168h + interval: 24h + dryrun: false auth: silly: realm: silly-realm @@ -130,6 +136,34 @@ options marked as **required**. This indicates that you can omit the parent with all its children. However, if the parent is included, you must also include all the children marked **required**. +## Override configuration options + +You can use environment variables to override most configuration parameters. The +exception is the `version` variable which cannot be overridden. You can set +environment variables on the command line using the `-e` flag on `docker run` or +from within a Dockerfile using the `ENV` instruction. + +To override a configuration option, create an environment variable named +`REGISTRY\variable_` where *`variable`* is the name of the configuration option +and the `_` (underscore) represents indention levels. For example, you can +configure the `rootdirectory` of the `filesystem` storage backend: + +``` +storage: + filesystem: + rootdirectory: /tmp/registry +``` + +To override this value, set an environment variable like this: + +``` +REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY=/tmp/registry/test +``` + +This variable overrides the `/tmp/registry` value to the `/tmp/registry/test` +directory. + + ## version ```yaml @@ -235,6 +269,12 @@ storage: rootdirectory: /s3/object/name/prefix cache: layerinfo: inmemory + maintenance: + uploadpurging: + enabled: true + age: 168h + interval: 24h + dryrun: false ``` The storage option is **required** and defines which storage backend is in use. @@ -424,6 +464,27 @@ This storage backend uses Amazon's Simple Storage Service (S3). +### Maintenance + +Currently the registry can perform one maintenance function: upload purging. This and future +maintenance functions which are related to storage can be configured under the maintenance section. + +### Upload Purging + +Upload purging is a background process that periodically removes orphaned files from the upload +directories of the registry. Upload purging is enabled by default. To + configure upload directory purging, the following parameters +must be set. + + +| Parameter | Required | Description + --------- | -------- | ----------- +`enabled` | yes | Set to true to enable upload purging. Default=true. | +`age` | yes | Upload directories which are older than this age will be deleted. Default=168h (1 week) +`interval` | yes | The interval between upload directory purging. Default=24h. +`dryrun` | yes | dryrun can be set to true to obtain a summary of what directories will be deleted. Default=false. + +Note: `age` and `interval` are strings containing a number with optional fraction and a unit suffix: e.g. 45m, 2h10m, 168h (1 week). ## auth @@ -1153,7 +1214,8 @@ Configure the behavior of the Redis connection pool. - + + ## Example: Development configuration The following is a simple example you can use for local development: diff --git a/docs/deploying.md b/docs/deploying.md index 7aa5b905..10bf6b81 100644 --- a/docs/deploying.md +++ b/docs/deploying.md @@ -208,6 +208,9 @@ storage: layerinfo: inmemory filesystem: rootdirectory: /tmp/registry-dev + maintenance: + uploadpurging: + enabled: false http: addr: :5000 secret: asecretforlocaldevelopment diff --git a/docs/osx-setup-guide.md b/docs/osx-setup-guide.md new file mode 100644 index 00000000..cd9a2429 --- /dev/null +++ b/docs/osx-setup-guide.md @@ -0,0 +1,56 @@ +# OS X Setup Guide + +This guide will walk you through running the new Go based [Docker registry](https://github.com/docker/distribution) on your local OS X machine. + +## Checkout the Docker Distribution source tree + +``` +mkdir -p $GOPATH/src/github.com/docker +git clone https://github.com/docker/distribution.git $GOPATH/src/github.com/docker/distribution +cd $GOPATH/src/github.com/docker/distribution +``` + +## Build the registry binary + +``` +GOPATH=$(PWD)/Godeps/_workspace:$GOPATH make binaries +sudo cp bin/registry /usr/local/libexec/registry +``` + +## Setup + +Copy the registry configuration file in place: + +``` +mkdir /Users/Shared/Registry +cp docs/osx/config.yml /Users/Shared/Registry/config.yml +``` + +## Running the Docker Registry under launchd + +Copy the Docker registry plist into place: + +``` +plutil -lint docs/osx/com.docker.registry.plist +cp docs/osx/com.docker.registry.plist ~/Library/LaunchAgents/ +chmod 644 ~/Library/LaunchAgents/com.docker.registry.plist +``` + +Start the Docker registry: + +``` +launchctl load ~/Library/LaunchAgents/com.docker.registry.plist +``` + +### Restarting the docker registry service + +``` +launchctl stop com.docker.registry +launchctl start com.docker.registry +``` + +### Unloading the docker registry service + +``` +launchctl unload ~/Library/LaunchAgents/com.docker.registry.plist +``` diff --git a/docs/osx/com.docker.registry.plist b/docs/osx/com.docker.registry.plist new file mode 100644 index 00000000..0982349f --- /dev/null +++ b/docs/osx/com.docker.registry.plist @@ -0,0 +1,42 @@ + + + + + Label + com.docker.registry + KeepAlive + + StandardErrorPath + /Users/Shared/Registry/registry.log + StandardOutPath + /Users/Shared/Registry/registry.log + Program + /usr/local/libexec/registry + ProgramArguments + + /usr/local/libexec/registry + /Users/Shared/Registry/config.yml + + Sockets + + http-listen-address + + SockServiceName + 5000 + SockType + dgram + SockFamily + IPv4 + + http-debug-address + + SockServiceName + 5001 + SockType + dgram + SockFamily + IPv4 + + + + diff --git a/docs/osx/config.yml b/docs/osx/config.yml new file mode 100644 index 00000000..7c19e5f0 --- /dev/null +++ b/docs/osx/config.yml @@ -0,0 +1,16 @@ +version: 0.1 +log: + level: info + fields: + service: registry + environment: macbook-air +storage: + cache: + layerinfo: inmemory + filesystem: + rootdirectory: /Users/Shared/Registry +http: + addr: 0.0.0.0:5000 + secret: mytokensecret + debug: + addr: localhost:5001 diff --git a/docs/spec/api.md b/docs/spec/api.md index 2c3acabf..1d7540ed 100644 --- a/docs/spec/api.md +++ b/docs/spec/api.md @@ -995,7 +995,7 @@ Content-Type: application/json; charset=utf-8 "tag": , "fsLayers": [ { - "blobSum": + "blobSum": "" }, ... ] @@ -1126,7 +1126,7 @@ Content-Type: application/json; charset=utf-8 "tag": , "fsLayers": [ { - "blobSum": + "blobSum": "" }, ... ] @@ -1248,7 +1248,7 @@ Content-Type: application/json; charset=utf-8 "code": "BLOB_UNKNOWN", "message": "blob unknown to registry", "detail": { - "digest": + "digest": "" } }, ... @@ -1452,7 +1452,7 @@ The error codes that may be included in the response body are enumerated below: ### Blob -Fetch the blob identified by `name` and `digest`. Used to fetch layers by tarsum digest. +Fetch the blob identified by `name` and `digest`. Used to fetch layers by digest. @@ -1800,7 +1800,7 @@ Initiate a resumable blob upload. If successful, an upload location will be prov ##### Initiate Monolithic Blob Upload ``` -POST /v2//blobs/uploads/?digest= +POST /v2//blobs/uploads/?digest= Host: Authorization: Content-Length: @@ -2347,7 +2347,7 @@ Complete the upload specified by `uuid`, optionally appending the body as the fi ``` -PUT /v2//blobs/uploads/?digest= +PUT /v2//blobs/uploads/?digest= Host: Authorization: Content-Range: - diff --git a/notifications/http.go b/notifications/http.go index 15b3574c..465434f1 100644 --- a/notifications/http.go +++ b/notifications/http.go @@ -53,6 +53,7 @@ type httpStatusListener interface { func (hs *httpSink) Write(events ...Event) error { hs.mu.Lock() defer hs.mu.Unlock() + defer hs.client.Transport.(*headerRoundTripper).CloseIdleConnections() if hs.closed { return ErrSinkClosed @@ -83,6 +84,7 @@ func (hs *httpSink) Write(events ...Event) error { return fmt.Errorf("%v: error posting: %v", hs, err) } + defer resp.Body.Close() // The notifier will treat any 2xx or 3xx response as accepted by the // endpoint. diff --git a/notifications/sinks.go b/notifications/sinks.go index 2bf63e2d..dda4a565 100644 --- a/notifications/sinks.go +++ b/notifications/sinks.go @@ -220,7 +220,7 @@ type retryingSink struct { sink Sink closed bool - // circuit breaker hueristics + // circuit breaker heuristics failures struct { threshold int recent int @@ -317,7 +317,7 @@ func (rs *retryingSink) wait(backoff time.Duration) { time.Sleep(backoff) } -// reset marks a succesful call. +// reset marks a successful call. func (rs *retryingSink) reset() { rs.failures.recent = 0 rs.failures.last = time.Time{} @@ -330,7 +330,7 @@ func (rs *retryingSink) failure() { } // proceed returns true if the call should proceed based on circuit breaker -// hueristics. +// heuristics. func (rs *retryingSink) proceed() bool { return rs.failures.recent < rs.failures.threshold || time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff)) diff --git a/registry/api/v2/descriptors.go b/registry/api/v2/descriptors.go index 833bff8b..0baa5ee7 100644 --- a/registry/api/v2/descriptors.go +++ b/registry/api/v2/descriptors.go @@ -135,7 +135,7 @@ const ( "tag": , "fsLayers": [ { - "blobSum": + "blobSum": "" }, ... ] @@ -606,7 +606,7 @@ var routeDescriptors = []RouteDescriptor{ "code": "BLOB_UNKNOWN", "message": "blob unknown to registry", "detail": { - "digest": + "digest": "" } }, ... @@ -712,7 +712,7 @@ var routeDescriptors = []RouteDescriptor{ Name: RouteNameBlob, Path: "/v2/{name:" + RepositoryNameRegexp.String() + "}/blobs/{digest:" + digest.DigestRegexp.String() + "}", Entity: "Blob", - Description: "Fetch the blob identified by `name` and `digest`. Used to fetch layers by tarsum digest.", + Description: "Fetch the blob identified by `name` and `digest`. Used to fetch layers by digest.", Methods: []MethodDescriptor{ { @@ -898,7 +898,7 @@ var routeDescriptors = []RouteDescriptor{ { Name: "digest", Type: "query", - Format: "", + Format: "", Regexp: digest.DigestRegexp, Description: `Digest of uploaded blob. If present, the upload will be completed, in a single request, with contents of the request body as the resulting blob.`, }, @@ -1173,7 +1173,7 @@ var routeDescriptors = []RouteDescriptor{ { Name: "digest", Type: "string", - Format: "", + Format: "", Regexp: digest.DigestRegexp, Required: true, Description: `Digest of uploaded blob.`, diff --git a/registry/api/v2/urls.go b/registry/api/v2/urls.go index 4b42dd16..60aad565 100644 --- a/registry/api/v2/urls.go +++ b/registry/api/v2/urls.go @@ -62,7 +62,12 @@ func NewURLBuilderFromRequest(r *http.Request) *URLBuilder { host := r.Host forwardedHost := r.Header.Get("X-Forwarded-Host") if len(forwardedHost) > 0 { - host = forwardedHost + // According to the Apache mod_proxy docs, X-Forwarded-Host can be a + // comma-separated list of hosts, to which each proxy appends the + // requested host. We want to grab the first from this comma-separated + // list. + hosts := strings.SplitN(forwardedHost, ",", 2) + host = strings.TrimSpace(hosts[0]) } basePath := routeDescriptorsMap[RouteNameBase].Path diff --git a/registry/api/v2/urls_test.go b/registry/api/v2/urls_test.go index 237d0f61..1113a7dd 100644 --- a/registry/api/v2/urls_test.go +++ b/registry/api/v2/urls_test.go @@ -151,6 +151,12 @@ func TestBuilderFromRequest(t *testing.T) { forwardedProtoHeader := make(http.Header, 1) forwardedProtoHeader.Set("X-Forwarded-Proto", "https") + forwardedHostHeader1 := make(http.Header, 1) + forwardedHostHeader1.Set("X-Forwarded-Host", "first.example.com") + + forwardedHostHeader2 := make(http.Header, 1) + forwardedHostHeader2.Set("X-Forwarded-Host", "first.example.com, proxy1.example.com") + testRequests := []struct { request *http.Request base string @@ -163,6 +169,14 @@ func TestBuilderFromRequest(t *testing.T) { request: &http.Request{URL: u, Host: u.Host, Header: forwardedProtoHeader}, base: "https://example.com", }, + { + request: &http.Request{URL: u, Host: u.Host, Header: forwardedHostHeader1}, + base: "http://first.example.com", + }, + { + request: &http.Request{URL: u, Host: u.Host, Header: forwardedHostHeader2}, + base: "http://first.example.com", + }, } for _, tr := range testRequests { diff --git a/registry/auth/auth.go b/registry/auth/auth.go index a8499342..ec82b469 100644 --- a/registry/auth/auth.go +++ b/registry/auth/auth.go @@ -3,7 +3,7 @@ // An access controller has a simple interface with a single `Authorized` // method which checks that a given request is authorized to perform one or // more actions on one or more resources. This method should return a non-nil -// error if the requset is not authorized. +// error if the request is not authorized. // // An implementation registers its access controller by name with a constructor // which accepts an options map for configuring the access controller. @@ -50,7 +50,7 @@ type Resource struct { } // Access describes a specific action that is -// requested or allowed for a given recource. +// requested or allowed for a given resource. type Access struct { Resource Action string diff --git a/registry/auth/token/util.go b/registry/auth/token/util.go index bf3e01e8..d7f95be4 100644 --- a/registry/auth/token/util.go +++ b/registry/auth/token/util.go @@ -7,7 +7,7 @@ import ( ) // joseBase64UrlEncode encodes the given data using the standard base64 url -// encoding format but with all trailing '=' characters ommitted in accordance +// encoding format but with all trailing '=' characters omitted in accordance // with the jose specification. // http://tools.ietf.org/html/draft-ietf-jose-json-web-signature-31#section-2 func joseBase64UrlEncode(b []byte) string { diff --git a/registry/handlers/api_test.go b/registry/handlers/api_test.go index ab8187c1..3dd7e6ec 100644 --- a/registry/handlers/api_test.go +++ b/registry/handlers/api_test.go @@ -93,7 +93,7 @@ func TestURLPrefix(t *testing.T) { } -// TestLayerAPI conducts a full of the of the layer api. +// TestLayerAPI conducts a full test of the of the layer api. func TestLayerAPI(t *testing.T) { // TODO(stevvooe): This test code is complete junk but it should cover the // complete flow. This must be broken down and checked against the @@ -246,6 +246,16 @@ func TestLayerAPI(t *testing.T) { t.Fatalf("response body did not pass verification") } + // ---------------- + // Fetch the layer with an invalid digest + badURL := strings.Replace(layerURL, "tarsum", "trsum", 1) + resp, err = http.Get(badURL) + if err != nil { + t.Fatalf("unexpected error fetching layer: %v", err) + } + + checkResponse(t, "fetching layer bad digest", resp, http.StatusBadRequest) + // Missing tests: // - Upload the same tarsum file under and different repository and // ensure the content remains uncorrupted. diff --git a/registry/handlers/app.go b/registry/handlers/app.go index 28940c8e..3cc360c6 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -81,7 +81,18 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App panic(err) } - startUploadPurger(app.driver, ctxu.GetLogger(app)) + purgeConfig := uploadPurgeDefaultConfig() + if mc, ok := configuration.Storage["maintenance"]; ok { + for k, v := range mc { + switch k { + case "uploadpurging": + purgeConfig = v.(map[interface{}]interface{}) + } + } + + } + + startUploadPurger(app.driver, ctxu.GetLogger(app), purgeConfig) app.driver, err = applyStorageMiddleware(app.driver, configuration.Middleware["storage"]) if err != nil { @@ -365,11 +376,25 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { // future refactoring. w.WriteHeader(http.StatusBadRequest) } + app.logError(context, context.Errors) serveJSON(w, context.Errors) } }) } +func (app *App) logError(context context.Context, errors v2.Errors) { + for _, e := range errors.Errors { + c := ctxu.WithValue(context, "err.code", e.Code) + c = ctxu.WithValue(c, "err.message", e.Message) + c = ctxu.WithValue(c, "err.detail", e.Detail) + c = ctxu.WithLogger(c, ctxu.GetLogger(c, + "err.code", + "err.message", + "err.detail")) + ctxu.GetLogger(c).Errorf("An error occured") + } +} + // context constructs the context object for the application. This only be // called once per request. func (app *App) context(w http.ResponseWriter, r *http.Request) *Context { @@ -554,26 +579,82 @@ func applyStorageMiddleware(driver storagedriver.StorageDriver, middlewares []co return driver, nil } +// uploadPurgeDefaultConfig provides a default configuration for upload +// purging to be used in the absence of configuration in the +// confifuration file +func uploadPurgeDefaultConfig() map[interface{}]interface{} { + config := map[interface{}]interface{}{} + config["enabled"] = true + config["age"] = "168h" + config["interval"] = "24h" + config["dryrun"] = false + return config +} + +func badPurgeUploadConfig(reason string) { + panic(fmt.Sprintf("Unable to parse upload purge configuration: %s", reason)) +} + // startUploadPurger schedules a goroutine which will periodically // check upload directories for old files and delete them -func startUploadPurger(storageDriver storagedriver.StorageDriver, log ctxu.Logger) { - rand.Seed(time.Now().Unix()) - jitter := time.Duration(rand.Int()%60) * time.Minute +func startUploadPurger(storageDriver storagedriver.StorageDriver, log ctxu.Logger, config map[interface{}]interface{}) { + if config["enabled"] == false { + return + } - // Start with reasonable defaults - // TODO:(richardscothern) make configurable - purgeAge := time.Duration(7 * 24 * time.Hour) - timeBetweenPurges := time.Duration(1 * 24 * time.Hour) + var purgeAgeDuration time.Duration + var err error + purgeAge, ok := config["age"] + if ok { + ageStr, ok := purgeAge.(string) + if !ok { + badPurgeUploadConfig("age is not a string") + } + purgeAgeDuration, err = time.ParseDuration(ageStr) + if err != nil { + badPurgeUploadConfig(fmt.Sprintf("Cannot parse duration: %s", err.Error())) + } + } else { + badPurgeUploadConfig("age missing") + } + + var intervalDuration time.Duration + interval, ok := config["interval"] + if ok { + intervalStr, ok := interval.(string) + if !ok { + badPurgeUploadConfig("interval is not a string") + } + + intervalDuration, err = time.ParseDuration(intervalStr) + if err != nil { + badPurgeUploadConfig(fmt.Sprintf("Cannot parse interval: %s", err.Error())) + } + } else { + badPurgeUploadConfig("interval missing") + } + + var dryRunBool bool + dryRun, ok := config["dryrun"] + if ok { + dryRunBool, ok = dryRun.(bool) + if !ok { + badPurgeUploadConfig("cannot parse dryrun") + } + } else { + badPurgeUploadConfig("dryrun missing") + } go func() { + rand.Seed(time.Now().Unix()) + jitter := time.Duration(rand.Int()%60) * time.Minute log.Infof("Starting upload purge in %s", jitter) time.Sleep(jitter) for { - storage.PurgeUploads(storageDriver, time.Now().Add(-purgeAge), true) - log.Infof("Starting upload purge in %s", timeBetweenPurges) - time.Sleep(timeBetweenPurges) + storage.PurgeUploads(storageDriver, time.Now().Add(-purgeAgeDuration), !dryRunBool) + log.Infof("Starting upload purge in %s", intervalDuration) + time.Sleep(intervalDuration) } }() - } diff --git a/registry/handlers/layerupload.go b/registry/handlers/layerupload.go index b728d0e1..5cfa4554 100644 --- a/registry/handlers/layerupload.go +++ b/registry/handlers/layerupload.go @@ -198,7 +198,12 @@ func (luh *layerUploadHandler) PutLayerUploadComplete(w http.ResponseWriter, r * // may miss a root cause. // Read in the final chunk, if any. - io.Copy(luh.Upload, r.Body) + if _, err := io.Copy(luh.Upload, r.Body); err != nil { + ctxu.GetLogger(luh).Errorf("unknown error copying into upload: %v", err) + w.WriteHeader(http.StatusInternalServerError) + luh.Errors.Push(v2.ErrorCodeUnknown, err) + return + } layer, err := luh.Upload.Finish(dgst) if err != nil { diff --git a/registry/storage/driver/azure/azure.go b/registry/storage/driver/azure/azure.go index 1473f523..b985b7a9 100644 --- a/registry/storage/driver/azure/azure.go +++ b/registry/storage/driver/azure/azure.go @@ -94,6 +94,9 @@ func New(accountName, accountKey, container, realm string) (*Driver, error) { } // Implement the storagedriver.StorageDriver interface. +func (d *driver) Name() string { + return driverName +} // GetContent retrieves the content stored at "path" as a []byte. func (d *driver) GetContent(path string) ([]byte, error) { diff --git a/registry/storage/driver/base/base.go b/registry/storage/driver/base/base.go index ba7a859d..8fa747dd 100644 --- a/registry/storage/driver/base/base.go +++ b/registry/storage/driver/base/base.go @@ -34,7 +34,7 @@ // } // // The type now implements StorageDriver, proxying through Base, without -// exporting an unnessecary field. +// exporting an unnecessary field. package base import ( @@ -53,7 +53,7 @@ type Base struct { // GetContent wraps GetContent of underlying storage driver. func (base *Base) GetContent(path string) ([]byte, error) { _, done := context.WithTrace(context.Background()) - defer done("Base.GetContent") + defer done("%s.GetContent(%q)", base.Name(), path) if !storagedriver.PathRegexp.MatchString(path) { return nil, storagedriver.InvalidPathError{Path: path} @@ -65,7 +65,7 @@ func (base *Base) GetContent(path string) ([]byte, error) { // PutContent wraps PutContent of underlying storage driver. func (base *Base) PutContent(path string, content []byte) error { _, done := context.WithTrace(context.Background()) - defer done("Base.PutContent") + defer done("%s.PutContent(%q)", base.Name(), path) if !storagedriver.PathRegexp.MatchString(path) { return storagedriver.InvalidPathError{Path: path} @@ -77,7 +77,7 @@ func (base *Base) PutContent(path string, content []byte) error { // ReadStream wraps ReadStream of underlying storage driver. func (base *Base) ReadStream(path string, offset int64) (io.ReadCloser, error) { _, done := context.WithTrace(context.Background()) - defer done("Base.ReadStream") + defer done("%s.ReadStream(%q, %d)", base.Name(), path, offset) if offset < 0 { return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} @@ -93,7 +93,7 @@ func (base *Base) ReadStream(path string, offset int64) (io.ReadCloser, error) { // WriteStream wraps WriteStream of underlying storage driver. func (base *Base) WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error) { _, done := context.WithTrace(context.Background()) - defer done("Base.WriteStream") + defer done("%s.WriteStream(%q, %d)", base.Name(), path, offset) if offset < 0 { return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset} @@ -109,7 +109,7 @@ func (base *Base) WriteStream(path string, offset int64, reader io.Reader) (nn i // Stat wraps Stat of underlying storage driver. func (base *Base) Stat(path string) (storagedriver.FileInfo, error) { _, done := context.WithTrace(context.Background()) - defer done("Base.Stat") + defer done("%s.Stat(%q)", base.Name(), path) if !storagedriver.PathRegexp.MatchString(path) { return nil, storagedriver.InvalidPathError{Path: path} @@ -121,7 +121,7 @@ func (base *Base) Stat(path string) (storagedriver.FileInfo, error) { // List wraps List of underlying storage driver. func (base *Base) List(path string) ([]string, error) { _, done := context.WithTrace(context.Background()) - defer done("Base.List") + defer done("%s.List(%q)", base.Name(), path) if !storagedriver.PathRegexp.MatchString(path) && path != "/" { return nil, storagedriver.InvalidPathError{Path: path} @@ -133,7 +133,7 @@ func (base *Base) List(path string) ([]string, error) { // Move wraps Move of underlying storage driver. func (base *Base) Move(sourcePath string, destPath string) error { _, done := context.WithTrace(context.Background()) - defer done("Base.Move") + defer done("%s.Move(%q, %q", base.Name(), sourcePath, destPath) if !storagedriver.PathRegexp.MatchString(sourcePath) { return storagedriver.InvalidPathError{Path: sourcePath} @@ -147,7 +147,7 @@ func (base *Base) Move(sourcePath string, destPath string) error { // Delete wraps Delete of underlying storage driver. func (base *Base) Delete(path string) error { _, done := context.WithTrace(context.Background()) - defer done("Base.Move") + defer done("%s.Delete(%q)", base.Name(), path) if !storagedriver.PathRegexp.MatchString(path) { return storagedriver.InvalidPathError{Path: path} @@ -159,7 +159,7 @@ func (base *Base) Delete(path string) error { // URLFor wraps URLFor of underlying storage driver. func (base *Base) URLFor(path string, options map[string]interface{}) (string, error) { _, done := context.WithTrace(context.Background()) - defer done("Base.URLFor") + defer done("%s.URLFor(%q)", base.Name(), path) if !storagedriver.PathRegexp.MatchString(path) { return "", storagedriver.InvalidPathError{Path: path} diff --git a/registry/storage/driver/filesystem/driver.go b/registry/storage/driver/filesystem/driver.go index 0e5aea75..9ffe0888 100644 --- a/registry/storage/driver/filesystem/driver.go +++ b/registry/storage/driver/filesystem/driver.go @@ -71,6 +71,10 @@ func New(rootDirectory string) *Driver { // Implement the storagedriver.StorageDriver interface +func (d *driver) Name() string { + return driverName +} + // GetContent retrieves the content stored at "path" as a []byte. func (d *driver) GetContent(path string) ([]byte, error) { rc, err := d.ReadStream(path, 0) diff --git a/registry/storage/driver/inmemory/driver.go b/registry/storage/driver/inmemory/driver.go index f2c9c3ff..e0694de2 100644 --- a/registry/storage/driver/inmemory/driver.go +++ b/registry/storage/driver/inmemory/driver.go @@ -64,6 +64,10 @@ func New() *Driver { // Implement the storagedriver.StorageDriver interface. +func (d *driver) Name() string { + return driverName +} + // GetContent retrieves the content stored at "path" as a []byte. func (d *driver) GetContent(path string) ([]byte, error) { d.mutex.RLock() diff --git a/registry/storage/driver/ipc/server.go b/registry/storage/driver/ipc/server.go index 4c6f1d4d..1752f12b 100644 --- a/registry/storage/driver/ipc/server.go +++ b/registry/storage/driver/ipc/server.go @@ -101,7 +101,7 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { } case "ReadStream": path, _ := request.Parameters["Path"].(string) - // Depending on serialization method, Offset may be convereted to any int/uint type + // Depending on serialization method, Offset may be converted to any int/uint type offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int() reader, err := driver.ReadStream(path, offset) var response ReadStreamResponse @@ -116,9 +116,9 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { } case "WriteStream": path, _ := request.Parameters["Path"].(string) - // Depending on serialization method, Offset may be convereted to any int/uint type + // Depending on serialization method, Offset may be converted to any int/uint type offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int() - // Depending on serialization method, Size may be convereted to any int/uint type + // Depending on serialization method, Size may be converted to any int/uint type size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(int64(0))).Int() reader, _ := request.Parameters["Reader"].(io.ReadCloser) err := driver.WriteStream(path, offset, size, reader) diff --git a/registry/storage/driver/s3/s3.go b/registry/storage/driver/s3/s3.go index 402f2eaa..57871b5d 100644 --- a/registry/storage/driver/s3/s3.go +++ b/registry/storage/driver/s3/s3.go @@ -20,12 +20,15 @@ import ( "io" "io/ioutil" "net/http" + "reflect" "strconv" "strings" + "sync" "time" "github.com/AdRoll/goamz/aws" "github.com/AdRoll/goamz/s3" + "github.com/Sirupsen/logrus" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/base" "github.com/docker/distribution/registry/storage/driver/factory" @@ -72,6 +75,9 @@ type driver struct { ChunkSize int64 Encrypt bool RootDirectory string + + pool sync.Pool // pool []byte buffers used for WriteStream + zeros []byte // shared, zero-valued buffer used for WriteStream } type baseEmbed struct { @@ -148,9 +154,23 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) { chunkSize := int64(defaultChunkSize) chunkSizeParam, ok := parameters["chunksize"] if ok { - chunkSize, ok = chunkSizeParam.(int64) - if !ok || chunkSize < minChunkSize { - return nil, fmt.Errorf("The chunksize parameter should be a number that is larger than 5*1024*1024") + switch v := chunkSizeParam.(type) { + case string: + vv, err := strconv.ParseInt(v, 0, 64) + if err != nil { + return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam) + } + chunkSize = vv + case int64: + chunkSize = v + case int, uint, int32, uint32, uint64: + chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int() + default: + return nil, fmt.Errorf("invalid valud for chunksize: %#v", chunkSizeParam) + } + + if chunkSize < minChunkSize { + return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize) } } @@ -224,6 +244,11 @@ func New(params DriverParameters) (*Driver, error) { ChunkSize: params.ChunkSize, Encrypt: params.Encrypt, RootDirectory: params.RootDirectory, + zeros: make([]byte, params.ChunkSize), + } + + d.pool.New = func() interface{} { + return make([]byte, d.ChunkSize) } return &Driver{ @@ -237,6 +262,10 @@ func New(params DriverParameters) (*Driver, error) { // Implement the storagedriver.StorageDriver interface +func (d *driver) Name() string { + return driverName +} + // GetContent retrieves the content stored at "path" as a []byte. func (d *driver) GetContent(path string) ([]byte, error) { content, err := d.Bucket.Get(d.s3Path(path)) @@ -281,14 +310,14 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total var putErrChan chan error parts := []s3.Part{} var part s3.Part + done := make(chan struct{}) // stopgap to free up waiting goroutines multi, err := d.Bucket.InitMulti(d.s3Path(path), d.getContentType(), getPermissions(), d.getOptions()) if err != nil { return 0, err } - buf := make([]byte, d.ChunkSize) - zeroBuf := make([]byte, d.ChunkSize) + buf := d.getbuf() // We never want to leave a dangling multipart upload, our only consistent state is // when there is a whole object at path. This is in order to remain consistent with @@ -314,6 +343,9 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total } } } + + d.putbuf(buf) // needs to be here to pick up new buf value + close(done) // free up any waiting goroutines }() // Fills from 0 to total from current @@ -367,21 +399,77 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total } go func(bytesRead int, from int64, buf []byte) { - // parts and partNumber are safe, because this function is the only one modifying them and we - // force it to be executed serially. - if bytesRead > 0 { - part, putErr := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from])) - if putErr != nil { - putErrChan <- putErr + defer d.putbuf(buf) // this buffer gets dropped after this call + + // DRAGONS(stevvooe): There are few things one might want to know + // about this section. First, the putErrChan is expecting an error + // and a nil or just a nil to come through the channel. This is + // covered by the silly defer below. The other aspect is the s3 + // retry backoff to deal with RequestTimeout errors. Even though + // the underlying s3 library should handle it, it doesn't seem to + // be part of the shouldRetry function (see AdRoll/goamz/s3). + defer func() { + select { + case putErrChan <- nil: // for some reason, we do this no matter what. + case <-done: + return // ensure we don't leak the goroutine + } + }() + + if bytesRead <= 0 { + return + } + + var err error + var part s3.Part + + loop: + for retries := 0; retries < 5; retries++ { + part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from])) + if err == nil { + break // success! } - parts = append(parts, part) - partNumber++ + // NOTE(stevvooe): This retry code tries to only retry under + // conditions where the s3 package does not. We may add s3 + // error codes to the below if we see others bubble up in the + // application. Right now, the most troubling is + // RequestTimeout, which seems to only triggered when a tcp + // connection to s3 slows to a crawl. If the RequestTimeout + // ends up getting added to the s3 library and we don't see + // other errors, this retry loop can be removed. + switch err := err.(type) { + case *s3.Error: + switch err.Code { + case "RequestTimeout": + // allow retries on only this error. + default: + break loop + } + } + + backoff := 100 * time.Millisecond * time.Duration(retries+1) + logrus.Errorf("error putting part, retrying after %v: %v", err, backoff.String()) + time.Sleep(backoff) } - putErrChan <- nil + + if err != nil { + logrus.Errorf("error putting part, aborting: %v", err) + select { + case putErrChan <- err: + case <-done: + return // don't leak the goroutine + } + } + + // parts and partNumber are safe, because this function is the + // only one modifying them and we force it to be executed + // serially. + parts = append(parts, part) + partNumber++ }(bytesRead, from, buf) - buf = make([]byte, d.ChunkSize) + buf = d.getbuf() // use a new buffer for the next call return nil } @@ -429,7 +517,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total fromZeroFillSmall := func(from, to int64) error { bytesRead = 0 for from+int64(bytesRead) < to { - nn, err := bytes.NewReader(zeroBuf).Read(buf[from+int64(bytesRead) : to]) + nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to]) bytesRead += nn if err != nil { return err @@ -443,7 +531,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total fromZeroFillLarge := func(from, to int64) error { bytesRead64 := int64(0) for to-(from+bytesRead64) >= d.ChunkSize { - part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf)) + part, err := multi.PutPart(int(partNumber), bytes.NewReader(d.zeros)) if err != nil { return err } @@ -724,3 +812,13 @@ func getPermissions() s3.ACL { func (d *driver) getContentType() string { return "application/octet-stream" } + +// getbuf returns a buffer from the driver's pool with length d.ChunkSize. +func (d *driver) getbuf() []byte { + return d.pool.Get().([]byte) +} + +func (d *driver) putbuf(p []byte) { + copy(p, d.zeros) + d.pool.Put(p) +} diff --git a/registry/storage/driver/storagedriver.go b/registry/storage/driver/storagedriver.go index 442dc257..cda1c37d 100644 --- a/registry/storage/driver/storagedriver.go +++ b/registry/storage/driver/storagedriver.go @@ -35,6 +35,11 @@ const CurrentVersion Version = "0.1" // StorageDriver defines methods that a Storage Driver must implement for a // filesystem-like key/value object storage. type StorageDriver interface { + // Name returns the human-readable "name" of the driver, useful in error + // messages and logging. By convention, this will just be the registration + // name, but drivers may provide other information here. + Name() string + // GetContent retrieves the content stored at "path" as a []byte. // This should primarily be used for small objects. GetContent(path string) ([]byte, error) diff --git a/registry/storage/driver/testsuites/testsuites.go b/registry/storage/driver/testsuites/testsuites.go index 74ddab6f..9f387a62 100644 --- a/registry/storage/driver/testsuites/testsuites.go +++ b/registry/storage/driver/testsuites/testsuites.go @@ -435,7 +435,7 @@ func (suite *DriverSuite) testContinueStreamAppend(c *check.C, chunkSize int64) c.Assert(err, check.IsNil) c.Assert(received, check.DeepEquals, fullContents) - // Writing past size of file extends file (no offest error). We would like + // Writing past size of file extends file (no offset error). We would like // to write chunk 4 one chunk length past chunk 3. It should be successful // and the resulting file will be 5 chunks long, with a chunk of all // zeros. diff --git a/registry/storage/layer_test.go b/registry/storage/layer_test.go index e225d068..f25018da 100644 --- a/registry/storage/layer_test.go +++ b/registry/storage/layer_test.go @@ -336,7 +336,7 @@ func seekerSize(seeker io.ReadSeeker) (int64, error) { // createTestLayer creates a simple test layer in the provided driver under // tarsum dgst, returning the sha256 digest location. This is implemented -// peicemeal and should probably be replaced by the uploader when it's ready. +// piecemeal and should probably be replaced by the uploader when it's ready. func writeTestLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper, name string, dgst digest.Digest, content io.Reader) (digest.Digest, error) { h := sha256.New() rd := io.TeeReader(content, h) diff --git a/registry/storage/layerstore.go b/registry/storage/layerstore.go index 1c7428a9..a86b668f 100644 --- a/registry/storage/layerstore.go +++ b/registry/storage/layerstore.go @@ -65,7 +65,7 @@ func (ls *layerStore) Upload() (distribution.LayerUpload, error) { uuid := uuid.New() startedAt := time.Now().UTC() - path, err := ls.repository.registry.pm.path(uploadDataPathSpec{ + path, err := ls.repository.pm.path(uploadDataPathSpec{ name: ls.repository.Name(), uuid: uuid, }) @@ -74,7 +74,7 @@ func (ls *layerStore) Upload() (distribution.LayerUpload, error) { return nil, err } - startedAtPath, err := ls.repository.registry.pm.path(uploadStartedAtPathSpec{ + startedAtPath, err := ls.repository.pm.path(uploadStartedAtPathSpec{ name: ls.repository.Name(), uuid: uuid, }) @@ -95,7 +95,7 @@ func (ls *layerStore) Upload() (distribution.LayerUpload, error) { // state of the upload. func (ls *layerStore) Resume(uuid string) (distribution.LayerUpload, error) { ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Resume") - startedAtPath, err := ls.repository.registry.pm.path(uploadStartedAtPathSpec{ + startedAtPath, err := ls.repository.pm.path(uploadStartedAtPathSpec{ name: ls.repository.Name(), uuid: uuid, }) @@ -152,7 +152,7 @@ func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (di func (ls *layerStore) path(dgst digest.Digest) (string, error) { // We must traverse this path through the link to enforce ownership. - layerLinkPath, err := ls.repository.registry.pm.path(layerLinkPathSpec{name: ls.repository.Name(), digest: dgst}) + layerLinkPath, err := ls.repository.pm.path(layerLinkPathSpec{name: ls.repository.Name(), digest: dgst}) if err != nil { return "", err } diff --git a/registry/storage/layerwriter.go b/registry/storage/layerwriter.go index 1e5ea918..adf68ca9 100644 --- a/registry/storage/layerwriter.go +++ b/registry/storage/layerwriter.go @@ -46,16 +46,37 @@ func (lw *layerWriter) StartedAt() time.Time { // uploaded layer. The final size and checksum are validated against the // contents of the uploaded layer. The checksum should be provided in the // format :. -func (lw *layerWriter) Finish(digest digest.Digest) (distribution.Layer, error) { +func (lw *layerWriter) Finish(dgst digest.Digest) (distribution.Layer, error) { ctxu.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Finish") if err := lw.bufferedFileWriter.Close(); err != nil { return nil, err } - canonical, err := lw.validateLayer(digest) - if err != nil { + var ( + canonical digest.Digest + err error + ) + + // HACK(stevvooe): To deal with s3's lack of consistency, attempt to retry + // validation on failure. Three attempts are made, backing off + // retries*100ms each time. + for retries := 0; ; retries++ { + canonical, err = lw.validateLayer(dgst) + if err == nil { + break + } + + ctxu.GetLoggerWithField(lw.layerStore.repository.ctx, "retries", retries). + Errorf("error validating layer: %v", err) + + if retries < 3 { + time.Sleep(100 * time.Millisecond * time.Duration(retries+1)) + continue + } + return nil, err + } if err := lw.moveLayer(canonical); err != nil { @@ -64,7 +85,7 @@ func (lw *layerWriter) Finish(digest digest.Digest) (distribution.Layer, error) } // Link the layer blob into the repository. - if err := lw.linkLayer(canonical, digest); err != nil { + if err := lw.linkLayer(canonical, dgst); err != nil { return nil, err } @@ -137,7 +158,7 @@ type hashStateEntry struct { // getStoredHashStates returns a slice of hashStateEntries for this upload. func (lw *layerWriter) getStoredHashStates() ([]hashStateEntry, error) { - uploadHashStatePathPrefix, err := lw.layerStore.repository.registry.pm.path(uploadHashStatePathSpec{ + uploadHashStatePathPrefix, err := lw.layerStore.repository.pm.path(uploadHashStatePathSpec{ name: lw.layerStore.repository.Name(), uuid: lw.uuid, alg: lw.resumableDigester.Digest().Algorithm(), @@ -182,7 +203,7 @@ func (lw *layerWriter) resumeHashAt(offset int64) error { } if offset == int64(lw.resumableDigester.Len()) { - // State of digester is already at the requseted offset. + // State of digester is already at the requested offset. return nil } @@ -250,7 +271,7 @@ func (lw *layerWriter) resumeHashAt(offset int64) error { } func (lw *layerWriter) storeHashState() error { - uploadHashStatePath, err := lw.layerStore.repository.registry.pm.path(uploadHashStatePathSpec{ + uploadHashStatePath, err := lw.layerStore.repository.pm.path(uploadHashStatePathSpec{ name: lw.layerStore.repository.Name(), uuid: lw.uuid, alg: lw.resumableDigester.Digest().Algorithm(), @@ -324,6 +345,8 @@ func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error) } if !verified { + ctxu.GetLoggerWithField(lw.layerStore.repository.ctx, "canonical", dgst). + Errorf("canonical digest does match provided digest") return "", distribution.ErrLayerInvalidDigest{ Digest: dgst, Reason: fmt.Errorf("content does not match digest"), @@ -337,7 +360,7 @@ func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error) // identified by dgst. The layer should be validated before commencing the // move. func (lw *layerWriter) moveLayer(dgst digest.Digest) error { - blobPath, err := lw.layerStore.repository.registry.pm.path(blobDataPathSpec{ + blobPath, err := lw.layerStore.repository.pm.path(blobDataPathSpec{ digest: dgst, }) @@ -403,7 +426,7 @@ func (lw *layerWriter) linkLayer(canonical digest.Digest, aliases ...digest.Dige } seenDigests[dgst] = struct{}{} - layerLinkPath, err := lw.layerStore.repository.registry.pm.path(layerLinkPathSpec{ + layerLinkPath, err := lw.layerStore.repository.pm.path(layerLinkPathSpec{ name: lw.layerStore.repository.Name(), digest: dgst, }) @@ -412,7 +435,7 @@ func (lw *layerWriter) linkLayer(canonical digest.Digest, aliases ...digest.Dige return err } - if err := lw.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(canonical)); err != nil { + if err := lw.layerStore.repository.driver.PutContent(layerLinkPath, []byte(canonical)); err != nil { return err } } @@ -424,7 +447,7 @@ func (lw *layerWriter) linkLayer(canonical digest.Digest, aliases ...digest.Dige // instance. An error will be returned if the clean up cannot proceed. If the // resources are already not present, no error will be returned. func (lw *layerWriter) removeResources() error { - dataPath, err := lw.layerStore.repository.registry.pm.path(uploadDataPathSpec{ + dataPath, err := lw.layerStore.repository.pm.path(uploadDataPathSpec{ name: lw.layerStore.repository.Name(), uuid: lw.uuid, }) diff --git a/registry/storage/paths.go b/registry/storage/paths.go index 7aeff6e4..fe648f51 100644 --- a/registry/storage/paths.go +++ b/registry/storage/paths.go @@ -387,7 +387,7 @@ type layerLinkPathSpec struct { func (layerLinkPathSpec) pathSpec() {} // blobAlgorithmReplacer does some very simple path sanitization for user -// input. Mostly, this is to provide some heirachry for tarsum digests. Paths +// input. Mostly, this is to provide some hierarchy for tarsum digests. Paths // should be "safe" before getting this far due to strict digest requirements // but we can add further path conversion here, if needed. var blobAlgorithmReplacer = strings.NewReplacer(