Merge pull request #499 from RichardScothern/release/2.0.1

Release/2.0.1
This commit is contained in:
Stephen Day 2015-05-06 14:02:10 -07:00
commit d18399f0bf
44 changed files with 1002 additions and 142 deletions

View file

@ -49,7 +49,7 @@ Then immediately submit this new file as a pull-request, in order to get early f
Eventually, you will have to update your proposal to accommodate the feedback you received.
Usually, it's not advisable to start working too much on the implementation itself before the proposal receives sufficient feedback, since it can significantly altered (or rejected).
Usually, it's not advisable to start working too much on the implementation itself before the proposal receives sufficient feedback, since it can be significantly altered (or rejected).
Your implementation should then be submitted as a separate PR, that will be reviewed as well.

6
Godeps/Godeps.json generated
View file

@ -12,15 +12,15 @@
},
{
"ImportPath": "github.com/AdRoll/goamz/aws",
"Rev": "d3664b76d90508cdda5a6c92042f26eab5db3103"
"Rev": "cc210f45dcb9889c2769a274522be2bf70edfb99"
},
{
"ImportPath": "github.com/AdRoll/goamz/cloudfront",
"Rev": "d3664b76d90508cdda5a6c92042f26eab5db3103"
"Rev": "cc210f45dcb9889c2769a274522be2bf70edfb99"
},
{
"ImportPath": "github.com/AdRoll/goamz/s3",
"Rev": "d3664b76d90508cdda5a6c92042f26eab5db3103"
"Rev": "cc210f45dcb9889c2769a274522be2bf70edfb99"
},
{
"ImportPath": "github.com/MSOpenTech/azure-sdk-for-go/storage",

View file

@ -62,6 +62,7 @@ type Region struct {
SESEndpoint string
IAMEndpoint string
ELBEndpoint string
KMSEndpoint string
DynamoDBEndpoint string
CloudWatchServicepoint ServiceInfo
AutoScalingEndpoint string
@ -83,6 +84,7 @@ var Regions = map[string]Region{
USWest2.Name: USWest2,
USGovWest.Name: USGovWest,
SAEast.Name: SAEast,
CNNorth1.Name: CNNorth1,
}
// Designates a signer interface suitable for signing AWS requests, params
@ -208,7 +210,10 @@ func (a *Auth) Token() string {
return ""
}
if time.Since(a.expiration) >= -30*time.Second { //in an ideal world this should be zero assuming the instance is synching it's clock
*a, _ = GetAuth("", "", "", time.Time{})
auth, err := GetAuth("", "", "", time.Time{})
if err == nil {
*a = auth
}
}
return a.token
}

View file

@ -13,6 +13,7 @@ var USGovWest = Region{
"",
"https://iam.us-gov.amazonaws.com",
"https://elasticloadbalancing.us-gov-west-1.amazonaws.com",
"",
"https://dynamodb.us-gov-west-1.amazonaws.com",
ServiceInfo{"https://monitoring.us-gov-west-1.amazonaws.com", V2Signature},
"https://autoscaling.us-gov-west-1.amazonaws.com",
@ -36,6 +37,7 @@ var USEast = Region{
"https://email.us-east-1.amazonaws.com",
"https://iam.amazonaws.com",
"https://elasticloadbalancing.us-east-1.amazonaws.com",
"https://kms.us-east-1.amazonaws.com",
"https://dynamodb.us-east-1.amazonaws.com",
ServiceInfo{"https://monitoring.us-east-1.amazonaws.com", V2Signature},
"https://autoscaling.us-east-1.amazonaws.com",
@ -59,6 +61,7 @@ var USWest = Region{
"",
"https://iam.amazonaws.com",
"https://elasticloadbalancing.us-west-1.amazonaws.com",
"https://kms.us-west-1.amazonaws.com",
"https://dynamodb.us-west-1.amazonaws.com",
ServiceInfo{"https://monitoring.us-west-1.amazonaws.com", V2Signature},
"https://autoscaling.us-west-1.amazonaws.com",
@ -82,6 +85,7 @@ var USWest2 = Region{
"https://email.us-west-2.amazonaws.com",
"https://iam.amazonaws.com",
"https://elasticloadbalancing.us-west-2.amazonaws.com",
"https://kms.us-west-2.amazonaws.com",
"https://dynamodb.us-west-2.amazonaws.com",
ServiceInfo{"https://monitoring.us-west-2.amazonaws.com", V2Signature},
"https://autoscaling.us-west-2.amazonaws.com",
@ -105,6 +109,7 @@ var EUWest = Region{
"https://email.eu-west-1.amazonaws.com",
"https://iam.amazonaws.com",
"https://elasticloadbalancing.eu-west-1.amazonaws.com",
"https://kms.eu-west-1.amazonaws.com",
"https://dynamodb.eu-west-1.amazonaws.com",
ServiceInfo{"https://monitoring.eu-west-1.amazonaws.com", V2Signature},
"https://autoscaling.eu-west-1.amazonaws.com",
@ -128,6 +133,7 @@ var EUCentral = Region{
"",
"https://iam.amazonaws.com",
"https://elasticloadbalancing.eu-central-1.amazonaws.com",
"https://kms.eu-central-1.amazonaws.com",
"https://dynamodb.eu-central-1.amazonaws.com",
ServiceInfo{"https://monitoring.eu-central-1.amazonaws.com", V2Signature},
"https://autoscaling.eu-central-1.amazonaws.com",
@ -151,6 +157,7 @@ var APSoutheast = Region{
"",
"https://iam.amazonaws.com",
"https://elasticloadbalancing.ap-southeast-1.amazonaws.com",
"https://kms.ap-southeast-1.amazonaws.com",
"https://dynamodb.ap-southeast-1.amazonaws.com",
ServiceInfo{"https://monitoring.ap-southeast-1.amazonaws.com", V2Signature},
"https://autoscaling.ap-southeast-1.amazonaws.com",
@ -174,6 +181,7 @@ var APSoutheast2 = Region{
"",
"https://iam.amazonaws.com",
"https://elasticloadbalancing.ap-southeast-2.amazonaws.com",
"https://kms.ap-southeast-2.amazonaws.com",
"https://dynamodb.ap-southeast-2.amazonaws.com",
ServiceInfo{"https://monitoring.ap-southeast-2.amazonaws.com", V2Signature},
"https://autoscaling.ap-southeast-2.amazonaws.com",
@ -197,6 +205,7 @@ var APNortheast = Region{
"",
"https://iam.amazonaws.com",
"https://elasticloadbalancing.ap-northeast-1.amazonaws.com",
"https://kms.ap-northeast-1.amazonaws.com",
"https://dynamodb.ap-northeast-1.amazonaws.com",
ServiceInfo{"https://monitoring.ap-northeast-1.amazonaws.com", V2Signature},
"https://autoscaling.ap-northeast-1.amazonaws.com",
@ -220,6 +229,7 @@ var SAEast = Region{
"",
"https://iam.amazonaws.com",
"https://elasticloadbalancing.sa-east-1.amazonaws.com",
"https://kms.sa-east-1.amazonaws.com",
"https://dynamodb.sa-east-1.amazonaws.com",
ServiceInfo{"https://monitoring.sa-east-1.amazonaws.com", V2Signature},
"https://autoscaling.sa-east-1.amazonaws.com",
@ -229,3 +239,27 @@ var SAEast = Region{
"https://cloudformation.sa-east-1.amazonaws.com",
"https://elasticache.sa-east-1.amazonaws.com",
}
var CNNorth1 = Region{
"cn-north-1",
"https://ec2.cn-north-1.amazonaws.com.cn",
"https://s3.cn-north-1.amazonaws.com.cn",
"",
true,
true,
"",
"https://sns.cn-north-1.amazonaws.com.cn",
"https://sqs.cn-north-1.amazonaws.com.cn",
"",
"https://iam.cn-north-1.amazonaws.com.cn",
"https://elasticloadbalancing.cn-north-1.amazonaws.com.cn",
"",
"https://dynamodb.cn-north-1.amazonaws.com.cn",
ServiceInfo{"https://monitoring.cn-north-1.amazonaws.com.cn", V4Signature},
"https://autoscaling.cn-north-1.amazonaws.com.cn",
ServiceInfo{"https://rds.cn-north-1.amazonaws.com.cn", V4Signature},
"",
"https://sts.cn-north-1.amazonaws.com.cn",
"",
"",
}

View file

@ -25,6 +25,7 @@ import (
"net/http"
"net/http/httputil"
"net/url"
"path"
"strconv"
"strings"
"time"
@ -70,9 +71,8 @@ type Options struct {
ContentMD5 string
ContentDisposition string
Range string
StorageClass StorageClass
// What else?
//// The following become headers so they are []strings rather than strings... I think
// x-amz-storage-class []string
}
type CopyOptions struct {
@ -96,7 +96,7 @@ var attempts = aws.AttemptStrategy{
// New creates a new S3.
func New(auth aws.Auth, region aws.Region) *S3 {
return &S3{auth, region, 0, 0, 0, aws.V2Signature}
return &S3{auth, region, 0, 0, aws.V2Signature, 0}
}
// Bucket returns a Bucket with the given name.
@ -164,6 +164,13 @@ const (
BucketOwnerFull = ACL("bucket-owner-full-control")
)
type StorageClass string
const (
ReducedRedundancy = StorageClass("REDUCED_REDUNDANCY")
StandardStorage = StorageClass("STANDARD")
)
// PutBucket creates a new bucket.
//
// See http://goo.gl/ndjnR for details.
@ -401,6 +408,10 @@ func (o Options) addHeaders(headers map[string][]string) {
if len(o.ContentDisposition) != 0 {
headers["Content-Disposition"] = []string{o.ContentDisposition}
}
if len(o.StorageClass) != 0 {
headers["x-amz-storage-class"] = []string{string(o.StorageClass)}
}
for k, v := range o.Meta {
headers["x-amz-meta-"+k] = v
}
@ -816,8 +827,8 @@ func (b *Bucket) SignedURLWithMethod(method, path string, expires time.Time, par
// UploadSignedURL returns a signed URL that allows anyone holding the URL
// to upload the object at path. The signature is valid until expires.
// contenttype is a string like image/png
// path is the resource name in s3 terminalogy like images/ali.png [obviously exclusing the bucket name itself]
func (b *Bucket) UploadSignedURL(path, method, content_type string, expires time.Time) string {
// name is the resource name in s3 terminology like images/ali.png [obviously excluding the bucket name itself]
func (b *Bucket) UploadSignedURL(name, method, content_type string, expires time.Time) string {
expire_date := expires.Unix()
if method != "POST" {
method = "PUT"
@ -830,7 +841,7 @@ func (b *Bucket) UploadSignedURL(path, method, content_type string, expires time
tokenData = "x-amz-security-token:" + a.Token() + "\n"
}
stringToSign := method + "\n\n" + content_type + "\n" + strconv.FormatInt(expire_date, 10) + "\n" + tokenData + "/" + b.Name + "/" + path
stringToSign := method + "\n\n" + content_type + "\n" + strconv.FormatInt(expire_date, 10) + "\n" + tokenData + "/" + path.Join(b.Name, name)
secretKey := a.SecretKey
accessId := a.AccessKey
mac := hmac.New(sha1.New, []byte(secretKey))
@ -844,7 +855,7 @@ func (b *Bucket) UploadSignedURL(path, method, content_type string, expires time
log.Println("ERROR sining url for S3 upload", err)
return ""
}
signedurl.Path += path
signedurl.Path = name
params := url.Values{}
params.Add("AWSAccessKeyId", accessId)
params.Add("Expires", strconv.FormatInt(expire_date, 10))

View file

@ -230,6 +230,22 @@ func (s *S) TestPutObject(c *check.C) {
c.Assert(req.Header["X-Amz-Acl"], check.DeepEquals, []string{"private"})
}
func (s *S) TestPutObjectReducedRedundancy(c *check.C) {
testServer.Response(200, nil, "")
b := s.s3.Bucket("bucket")
err := b.Put("name", []byte("content"), "content-type", s3.Private, s3.Options{StorageClass: s3.ReducedRedundancy})
c.Assert(err, check.IsNil)
req := testServer.WaitRequest()
c.Assert(req.Method, check.Equals, "PUT")
c.Assert(req.URL.Path, check.Equals, "/bucket/name")
c.Assert(req.Header["Date"], check.Not(check.DeepEquals), []string{""})
c.Assert(req.Header["Content-Type"], check.DeepEquals, []string{"content-type"})
c.Assert(req.Header["Content-Length"], check.DeepEquals, []string{"7"})
c.Assert(req.Header["X-Amz-Storage-Class"], check.DeepEquals, []string{"REDUCED_REDUNDANCY"})
}
// PutCopy docs: http://goo.gl/mhEHtA
func (s *S) TestPutCopy(c *check.C) {
testServer.Response(200, nil, PutCopyResultDump)

View file

@ -11,6 +11,7 @@ import (
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
"net/url"
@ -51,6 +52,10 @@ type Config struct {
// all other regions.
// http://docs.amazonwebservices.com/AmazonS3/latest/API/ErrorResponses.html
Send409Conflict bool
// Address on which to listen. By default, a random port is assigned by the
// operating system and the server listens on localhost.
ListenAddress string
}
func (c *Config) send409Conflict() bool {
@ -76,6 +81,7 @@ type bucket struct {
acl s3.ACL
ctime time.Time
objects map[string]*object
multipartUploads map[string][]*multipartUploadPart
}
type object struct {
@ -86,6 +92,12 @@ type object struct {
data []byte
}
type multipartUploadPart struct {
data []byte
etag string
lastModified time.Time
}
// A resource encapsulates the subject of an HTTP request.
// The resource referred to may or may not exist
// when the request is made.
@ -97,7 +109,13 @@ type resource interface {
}
func NewServer(config *Config) (*Server, error) {
l, err := net.Listen("tcp", "localhost:0")
listenAddress := "localhost:0"
if config != nil && config.ListenAddress != "" {
listenAddress = config.ListenAddress
}
l, err := net.Listen("tcp", listenAddress)
if err != nil {
return nil, fmt.Errorf("cannot listen on localhost: %v", err)
}
@ -217,10 +235,8 @@ var unimplementedBucketResourceNames = map[string]bool{
}
var unimplementedObjectResourceNames = map[string]bool{
"uploadId": true,
"acl": true,
"torrent": true,
"uploads": true,
}
var pathRegexp = regexp.MustCompile("/(([^/]+)(/(.*))?)?")
@ -421,6 +437,7 @@ func (r bucketResource) put(a *action) interface{} {
name: r.name,
// TODO default acl
objects: make(map[string]*object),
multipartUploads: make(map[string][]*multipartUploadPart),
}
a.srv.buckets[r.name] = r.bucket
created = true
@ -615,12 +632,29 @@ func (objr objectResource) put(a *action) interface{} {
// TODO x-amz-server-side-encryption
// TODO x-amz-storage-class
// TODO is this correct, or should we erase all previous metadata?
obj := objr.object
if obj == nil {
obj = &object{
name: objr.name,
meta: make(http.Header),
uploadId := a.req.URL.Query().Get("uploadId")
// Check that the upload ID is valid if this is a multipart upload
if uploadId != "" {
if _, ok := objr.bucket.multipartUploads[uploadId]; !ok {
fatalf(404, "NoSuchUpload", "The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.")
}
partNumberStr := a.req.URL.Query().Get("partNumber")
if partNumberStr == "" {
fatalf(400, "InvalidRequest", "Missing partNumber parameter")
}
partNumber, err := strconv.ParseUint(partNumberStr, 10, 32)
if err != nil {
fatalf(400, "InvalidRequest", "partNumber is not a number")
}
// Parts are 1-indexed for multipart uploads
if uint(partNumber)-1 != uint(len(objr.bucket.multipartUploads[uploadId])) {
fatalf(400, "InvalidRequest", "Invalid part number")
}
}
@ -646,6 +680,22 @@ func (objr objectResource) put(a *action) interface{} {
fatalf(400, "IncompleteBody", "You did not provide the number of bytes specified by the Content-Length HTTP header")
}
etag := fmt.Sprintf("\"%x\"", gotHash)
a.w.Header().Add("ETag", etag)
if uploadId == "" {
// For traditional uploads
// TODO is this correct, or should we erase all previous metadata?
obj := objr.object
if obj == nil {
obj = &object{
name: objr.name,
meta: make(http.Header),
}
}
// PUT request has been successful - save data and metadata
for key, values := range a.req.Header {
key = http.CanonicalHeaderKey(key)
@ -657,15 +707,143 @@ func (objr objectResource) put(a *action) interface{} {
obj.checksum = gotHash
obj.mtime = time.Now()
objr.bucket.objects[objr.name] = obj
} else {
// For multipart commit
parts := objr.bucket.multipartUploads[uploadId]
part := &multipartUploadPart{
data,
etag,
time.Now(),
}
objr.bucket.multipartUploads[uploadId] = append(parts, part)
}
return nil
}
func (objr objectResource) delete(a *action) interface{} {
uploadId := a.req.URL.Query().Get("uploadId")
if uploadId == "" {
// Traditional object delete
delete(objr.bucket.objects, objr.name)
} else {
// Multipart commit abort
_, ok := objr.bucket.multipartUploads[uploadId]
if !ok {
fatalf(404, "NoSuchUpload", "The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.")
}
delete(objr.bucket.multipartUploads, uploadId)
}
return nil
}
func (objr objectResource) post(a *action) interface{} {
// Check if we're initializing a multipart upload
if _, ok := a.req.URL.Query()["uploads"]; ok {
type multipartInitResponse struct {
XMLName struct{} `xml:"InitiateMultipartUploadResult"`
Bucket string
Key string
UploadId string
}
uploadId := strconv.FormatInt(rand.Int63(), 16)
objr.bucket.multipartUploads[uploadId] = []*multipartUploadPart{}
return &multipartInitResponse{
Bucket: objr.bucket.name,
Key: objr.name,
UploadId: uploadId,
}
}
// Check if we're completing a multipart upload
if uploadId := a.req.URL.Query().Get("uploadId"); uploadId != "" {
type multipartCompleteRequestPart struct {
XMLName struct{} `xml:"Part"`
PartNumber uint
ETag string
}
type multipartCompleteRequest struct {
XMLName struct{} `xml:"CompleteMultipartUpload"`
Part []multipartCompleteRequestPart
}
type multipartCompleteResponse struct {
XMLName struct{} `xml:"CompleteMultipartUploadResult"`
Location string
Bucket string
Key string
ETag string
}
parts, ok := objr.bucket.multipartUploads[uploadId]
if !ok {
fatalf(404, "NoSuchUpload", "The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.")
}
req := &multipartCompleteRequest{}
if err := xml.NewDecoder(a.req.Body).Decode(req); err != nil {
fatalf(400, "InvalidRequest", err.Error())
}
if len(req.Part) != len(parts) {
fatalf(400, "InvalidRequest", fmt.Sprintf("Number of parts does not match: expected %d, received %d", len(parts), len(req.Part)))
}
sum := md5.New()
data := &bytes.Buffer{}
w := io.MultiWriter(sum, data)
for i, p := range parts {
reqPart := req.Part[i]
if reqPart.PartNumber != uint(1+i) {
fatalf(400, "InvalidRequest", "Bad part number")
}
if reqPart.ETag != p.etag {
fatalf(400, "InvalidRequest", fmt.Sprintf("Invalid etag for part %d", reqPart.PartNumber))
}
w.Write(p.data)
}
delete(objr.bucket.multipartUploads, uploadId)
obj := objr.object
if obj == nil {
obj = &object{
name: objr.name,
meta: make(http.Header),
}
}
obj.data = data.Bytes()
obj.checksum = sum.Sum(nil)
obj.mtime = time.Now()
objr.bucket.objects[objr.name] = obj
objectLocation := fmt.Sprintf("http://%s/%s/%s", a.srv.listener.Addr().String(), objr.bucket.name, objr.name)
return &multipartCompleteResponse{
Location: objectLocation,
Bucket: objr.bucket.name,
Key: objr.name,
ETag: uploadId,
}
}
fatalf(400, "MethodNotAllowed", "The specified method is not allowed against this resource")
return nil
}

View file

@ -6,8 +6,8 @@ machine:
post:
# Install many go versions
- gvm install go1.3.3 -B --name=old
- gvm install go1.4 -B --name=stable
# - gvm install go1.3.3 -B --name=old
- gvm install go1.4.2 -B --name=stable
# - gvm install tip --name=bleed
environment:
@ -28,10 +28,10 @@ machine:
dependencies:
pre:
# Copy the code to the gopath of all go versions
- >
gvm use old &&
mkdir -p "$(dirname $BASE_OLD)" &&
cp -R "$CHECKOUT" "$BASE_OLD"
# - >
# gvm use old &&
# mkdir -p "$(dirname $BASE_OLD)" &&
# cp -R "$CHECKOUT" "$BASE_OLD"
- >
gvm use stable &&
@ -45,8 +45,8 @@ dependencies:
override:
# Install dependencies for every copied clone/go version
- gvm use old && go get github.com/tools/godep:
pwd: $BASE_OLD
# - gvm use old && go get github.com/tools/godep:
# pwd: $BASE_OLD
- gvm use stable && go get github.com/tools/godep:
pwd: $BASE_STABLE
@ -63,7 +63,7 @@ dependencies:
test:
pre:
# Output the go versions we are going to test
- gvm use old && go version
# - gvm use old && go version
- gvm use stable && go version
# - gvm use bleed && go version
@ -81,9 +81,9 @@ test:
override:
# Test every version we have (but stable)
- gvm use old; godep go test -test.v -test.short ./...:
timeout: 600
pwd: $BASE_OLD
# - gvm use old; godep go test -test.v -test.short ./...:
# timeout: 600
# pwd: $BASE_OLD
# - gvm use bleed; go test -test.v -test.short ./...:
# timeout: 600
@ -103,10 +103,11 @@ test:
# Aggregate and report to coveralls
- gvm use stable; go list ./... | xargs -L 1 -I{} cat "$GOPATH/src/{}/coverage.out" | grep -v "$CIRCLE_PAIN" >> ~/goverage.report:
pwd: $BASE_STABLE
- gvm use stable; goveralls -service circleci -coverprofile=/home/ubuntu/goverage.report -repotoken $COVERALLS_TOKEN:
pwd: $BASE_STABLE
# - gvm use stable; goveralls -service circleci -coverprofile=/home/ubuntu/goverage.report -repotoken $COVERALLS_TOKEN:
# pwd: $BASE_STABLE
## Notes
# Disabled coveralls reporting: build breaking sending coverage data to coveralls
# Disabled the -race detector due to massive memory usage.
# Do we want these as well?
# - go get code.google.com/p/go.tools/cmd/goimports

View file

@ -9,6 +9,9 @@ storage:
layerinfo: inmemory
filesystem:
rootdirectory: /tmp/registry-dev
maintenance:
uploadpurging:
enabled: false
http:
addr: :5000
secret: asecretforlocaldevelopment
@ -39,3 +42,4 @@ notifications:
threshold: 10
backoff: 1s
disabled: true

View file

@ -188,6 +188,8 @@ func (storage Storage) Type() string {
// Return only key in this map
for k := range storage {
switch k {
case "maintenance":
// allow configuration of maintenance
case "cache":
// allow configuration of caching
default:
@ -217,6 +219,8 @@ func (storage *Storage) UnmarshalYAML(unmarshal func(interface{}) error) error {
types := make([]string, 0, len(storageMap))
for k := range storageMap {
switch k {
case "maintenance":
// allow for configuration of maintenance
case "cache":
// allow configuration of caching
default:

View file

@ -302,7 +302,7 @@ func (irw *instrumentedResponseWriter) Flush() {
func (irw *instrumentedResponseWriter) Value(key interface{}) interface{} {
if keyStr, ok := key.(string); ok {
if keyStr == "http.response" {
return irw.ResponseWriter
return irw
}
if !strings.HasPrefix(keyStr, "http.response.") {
@ -322,9 +322,7 @@ func (irw *instrumentedResponseWriter) Value(key interface{}) interface{} {
case "written":
return irw.written
case "status":
if irw.status != 0 {
return irw.status
}
case "contenttype":
contentType := irw.Header().Get("Content-Type")
if contentType != "" {

View file

@ -132,8 +132,21 @@ func TestWithResponseWriter(t *testing.T) {
trw := testResponseWriter{}
ctx, rw := WithResponseWriter(Background(), &trw)
if ctx.Value("http.response") != &trw {
t.Fatalf("response not available in context: %v != %v", ctx.Value("http.response"), &trw)
if ctx.Value("http.response") != rw {
t.Fatalf("response not available in context: %v != %v", ctx.Value("http.response"), rw)
}
grw, err := GetResponseWriter(ctx)
if err != nil {
t.Fatalf("error getting response writer: %v", err)
}
if grw != rw {
t.Fatalf("unexpected response writer returned: %#v != %#v", grw, rw)
}
if ctx.Value("http.response.status") != 0 {
t.Fatalf("response status should always be a number and should be zero here: %v != 0", ctx.Value("http.response.status"))
}
if n, err := rw.Write(make([]byte, 1024)); err != nil {

36
contrib/apache/README.MD Normal file
View file

@ -0,0 +1,36 @@
# Apache HTTPd sample for Registry v1, v2 and mirror
3 containers involved
* Docker Registry v1 (registry 0.9.1)
* Docker Registry v2 (registry 2.0.0)
* Docker Registry v1 in mirror mode
HTTP for mirror and HTTPS for v1 & v2
* http://registry.example.com proxify Docker Registry 1.0 in Mirror mode
* https://registry.example.com proxify Docker Registry 1.0 or 2.0 in Hosting mode
## 3 Docker containers should be started
* Docker Registry 1.0 in Mirror mode : port 5001
* Docker Registry 1.0 in Hosting mode : port 5000
* Docker Registry 2.0 in Hosting mode : port 5002
### Registry v1
docker run -d -e SETTINGS_FLAVOR=dev -v /var/lib/docker-registry/storage/hosting-v1:/tmp -p 5000:5000 registry:0.9.1"
### Mirror
docker run -d -e SETTINGS_FLAVOR=dev -e STANDALONE=false -e MIRROR_SOURCE=https://registry-1.docker.io -e MIRROR_SOURCE_INDEX=https://index.docker.io \
-e MIRROR_TAGS_CACHE_TTL=172800 -v /var/lib/docker-registry/storage/mirror:/tmp -p 5001:5000 registry:0.9.1"
### Registry v2
docker run -d -e SETTINGS_FLAVOR=dev -v /var/lib/axway/docker-registry/storage/hosting2-v2:/tmp -p 5002:5000 registry:2.0"
# For Hosting mode access
* users should have account (valid-user) to be able to fetch images
* only users using account docker-deployer will be allowed to push images

127
contrib/apache/apache.conf Normal file
View file

@ -0,0 +1,127 @@
#
# Sample Apache 2.x configuration where :
#
<VirtualHost *:80>
ServerName registry.example.com
ServerAlias www.registry.example.com
ProxyRequests off
ProxyPreserveHost on
# no proxy for /error/ (Apache HTTPd errors messages)
ProxyPass /error/ !
ProxyPass /_ping http://localhost:5001/_ping
ProxyPassReverse /_ping http://localhost:5001/_ping
ProxyPass /v1 http://localhost:5001/v1
ProxyPassReverse /v1 http://localhost:5001/v1
# Logs
ErrorLog ${APACHE_LOG_DIR}/mirror_error_log
CustomLog ${APACHE_LOG_DIR}/mirror_access_log combined env=!dontlog
</VirtualHost>
<VirtualHost *:443>
ServerName registry.example.com
ServerAlias www.registry.example.com
SSLEngine on
SSLCertificateFile /etc/apache2/ssl/registry.example.com.crt
SSLCertificateKeyFile /etc/apache2/ssl/registry.example.com.key
# Higher Strength SSL Ciphers
SSLProtocol all -SSLv2 -SSLv3 -TLSv1
SSLCipherSuite RC4-SHA:HIGH
SSLHonorCipherOrder on
# Logs
ErrorLog ${APACHE_LOG_DIR}/registry_error_ssl_log
CustomLog ${APACHE_LOG_DIR}/registry_access_ssl_log combined env=!dontlog
Header set Host "registry.example.com"
Header set "Docker-Distribution-Api-Version" "registry/2.0"
RequestHeader set X-Forwarded-Proto "https"
ProxyRequests off
ProxyPreserveHost on
# no proxy for /error/ (Apache HTTPd errors messages)
ProxyPass /error/ !
#
# Registry v1
#
ProxyPass /v1 http://localhost:5000/v1
ProxyPassReverse /v1 http://localhost:5000/v1
ProxyPass /_ping http://localhost:5000/_ping
ProxyPassReverse /_ping http://localhost:5000/_ping
# Authentication require for push
<Location /v1>
Order deny,allow
Allow from all
AuthName "Registry Authentication"
AuthType basic
AuthUserFile "/etc/apache2/htpasswd/registry-htpasswd"
# Read access to authentified users
<Limit GET HEAD>
Require valid-user
</Limit>
# Write access to docker-deployer account only
<Limit POST PUT DELETE>
Require user docker-deployer
</Limit>
</Location>
# Allow ping to run unauthenticated.
<Location /v1/_ping>
Satisfy any
Allow from all
</Location>
# Allow ping to run unauthenticated.
<Location /_ping>
Satisfy any
Allow from all
</Location>
#
# Registry v2
#
ProxyPass /v2 http://localhost:5002/v2
ProxyPassReverse /v2 http://localhost:5002/v2
<Location /v2>
Order deny,allow
Allow from all
AuthName "Registry Authentication"
AuthType basic
AuthUserFile "/etc/apache2/htpasswd/registry-htpasswd"
# Read access to authentified users
<Limit GET HEAD>
Require valid-user
</Limit>
# Write access to docker-deployer only
<Limit POST PUT DELETE>
Require user docker-deployer
</Limit>
</Location>
</VirtualHost>

View file

@ -6,10 +6,10 @@ import (
"regexp"
)
// TarSumRegexp defines a reguler expression to match tarsum identifiers.
// TarSumRegexp defines a regular expression to match tarsum identifiers.
var TarsumRegexp = regexp.MustCompile("tarsum(?:.[a-z0-9]+)?\\+[a-zA-Z0-9]+:[A-Fa-f0-9]+")
// TarsumRegexpCapturing defines a reguler expression to match tarsum identifiers with
// TarsumRegexpCapturing defines a regular expression to match tarsum identifiers with
// capture groups corresponding to each component.
var TarsumRegexpCapturing = regexp.MustCompile("(tarsum)(.([a-z0-9]+))?\\+([a-zA-Z0-9]+):([A-Fa-f0-9]+)")

2
doc.go
View file

@ -2,6 +2,6 @@
// docker distribution. The goal is to allow users to reliably package, ship
// and store content related to docker images.
//
// This is currently a work in progress. More details are availalbe in the
// This is currently a work in progress. More details are available in the
// README.md.
package distribution

View file

@ -57,6 +57,12 @@ storage:
rootdirectory: /s3/object/name/prefix
cache:
layerinfo: inmemory
maintenance:
uploadpurging:
enabled: true
age: 168h
interval: 24h
dryrun: false
auth:
silly:
realm: silly-realm
@ -130,6 +136,34 @@ options marked as **required**. This indicates that you can omit the parent with
all its children. However, if the parent is included, you must also include all
the children marked **required**.
## Override configuration options
You can use environment variables to override most configuration parameters. The
exception is the `version` variable which cannot be overridden. You can set
environment variables on the command line using the `-e` flag on `docker run` or
from within a Dockerfile using the `ENV` instruction.
To override a configuration option, create an environment variable named
`REGISTRY\variable_` where *`variable`* is the name of the configuration option
and the `_` (underscore) represents indention levels. For example, you can
configure the `rootdirectory` of the `filesystem` storage backend:
```
storage:
filesystem:
rootdirectory: /tmp/registry
```
To override this value, set an environment variable like this:
```
REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY=/tmp/registry/test
```
This variable overrides the `/tmp/registry` value to the `/tmp/registry/test`
directory.
## version
```yaml
@ -235,6 +269,12 @@ storage:
rootdirectory: /s3/object/name/prefix
cache:
layerinfo: inmemory
maintenance:
uploadpurging:
enabled: true
age: 168h
interval: 24h
dryrun: false
```
The storage option is **required** and defines which storage backend is in use.
@ -424,6 +464,27 @@ This storage backend uses Amazon's Simple Storage Service (S3).
</tr>
</table>
### Maintenance
Currently the registry can perform one maintenance function: upload purging. This and future
maintenance functions which are related to storage can be configured under the maintenance section.
### Upload Purging
Upload purging is a background process that periodically removes orphaned files from the upload
directories of the registry. Upload purging is enabled by default. To
configure upload directory purging, the following parameters
must be set.
| Parameter | Required | Description
--------- | -------- | -----------
`enabled` | yes | Set to true to enable upload purging. Default=true. |
`age` | yes | Upload directories which are older than this age will be deleted. Default=168h (1 week)
`interval` | yes | The interval between upload directory purging. Default=24h.
`dryrun` | yes | dryrun can be set to true to obtain a summary of what directories will be deleted. Default=false.
Note: `age` and `interval` are strings containing a number with optional fraction and a unit suffix: e.g. 45m, 2h10m, 168h (1 week).
## auth
@ -1154,6 +1215,7 @@ Configure the behavior of the Redis connection pool.
</tr>
</table>
## Example: Development configuration
The following is a simple example you can use for local development:

View file

@ -208,6 +208,9 @@ storage:
layerinfo: inmemory
filesystem:
rootdirectory: /tmp/registry-dev
maintenance:
uploadpurging:
enabled: false
http:
addr: :5000
secret: asecretforlocaldevelopment

56
docs/osx-setup-guide.md Normal file
View file

@ -0,0 +1,56 @@
# OS X Setup Guide
This guide will walk you through running the new Go based [Docker registry](https://github.com/docker/distribution) on your local OS X machine.
## Checkout the Docker Distribution source tree
```
mkdir -p $GOPATH/src/github.com/docker
git clone https://github.com/docker/distribution.git $GOPATH/src/github.com/docker/distribution
cd $GOPATH/src/github.com/docker/distribution
```
## Build the registry binary
```
GOPATH=$(PWD)/Godeps/_workspace:$GOPATH make binaries
sudo cp bin/registry /usr/local/libexec/registry
```
## Setup
Copy the registry configuration file in place:
```
mkdir /Users/Shared/Registry
cp docs/osx/config.yml /Users/Shared/Registry/config.yml
```
## Running the Docker Registry under launchd
Copy the Docker registry plist into place:
```
plutil -lint docs/osx/com.docker.registry.plist
cp docs/osx/com.docker.registry.plist ~/Library/LaunchAgents/
chmod 644 ~/Library/LaunchAgents/com.docker.registry.plist
```
Start the Docker registry:
```
launchctl load ~/Library/LaunchAgents/com.docker.registry.plist
```
### Restarting the docker registry service
```
launchctl stop com.docker.registry
launchctl start com.docker.registry
```
### Unloading the docker registry service
```
launchctl unload ~/Library/LaunchAgents/com.docker.registry.plist
```

View file

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>com.docker.registry</string>
<key>KeepAlive</key>
<true/>
<key>StandardErrorPath</key>
<string>/Users/Shared/Registry/registry.log</string>
<key>StandardOutPath</key>
<string>/Users/Shared/Registry/registry.log</string>
<key>Program</key>
<string>/usr/local/libexec/registry</string>
<key>ProgramArguments</key>
<array>
<string>/usr/local/libexec/registry</string>
<string>/Users/Shared/Registry/config.yml</string>
</array>
<key>Sockets</key>
<dict>
<key>http-listen-address</key>
<dict>
<key>SockServiceName</key>
<string>5000</string>
<key>SockType</key>
<string>dgram</string>
<key>SockFamily</key>
<string>IPv4</string>
</dict>
<key>http-debug-address</key>
<dict>
<key>SockServiceName</key>
<string>5001</string>
<key>SockType</key>
<string>dgram</string>
<key>SockFamily</key>
<string>IPv4</string>
</dict>
</dict>
</dict>
</plist>

16
docs/osx/config.yml Normal file
View file

@ -0,0 +1,16 @@
version: 0.1
log:
level: info
fields:
service: registry
environment: macbook-air
storage:
cache:
layerinfo: inmemory
filesystem:
rootdirectory: /Users/Shared/Registry
http:
addr: 0.0.0.0:5000
secret: mytokensecret
debug:
addr: localhost:5001

View file

@ -995,7 +995,7 @@ Content-Type: application/json; charset=utf-8
"tag": <tag>,
"fsLayers": [
{
"blobSum": <tarsum>
"blobSum": "<digest>"
},
...
]
@ -1126,7 +1126,7 @@ Content-Type: application/json; charset=utf-8
"tag": <tag>,
"fsLayers": [
{
"blobSum": <tarsum>
"blobSum": "<digest>"
},
...
]
@ -1248,7 +1248,7 @@ Content-Type: application/json; charset=utf-8
"code": "BLOB_UNKNOWN",
"message": "blob unknown to registry",
"detail": {
"digest": <tarsum>
"digest": "<digest>"
}
},
...
@ -1452,7 +1452,7 @@ The error codes that may be included in the response body are enumerated below:
### Blob
Fetch the blob identified by `name` and `digest`. Used to fetch layers by tarsum digest.
Fetch the blob identified by `name` and `digest`. Used to fetch layers by digest.
@ -1800,7 +1800,7 @@ Initiate a resumable blob upload. If successful, an upload location will be prov
##### Initiate Monolithic Blob Upload
```
POST /v2/<name>/blobs/uploads/?digest=<tarsum>
POST /v2/<name>/blobs/uploads/?digest=<digest>
Host: <registry host>
Authorization: <scheme> <token>
Content-Length: <length of blob>
@ -2347,7 +2347,7 @@ Complete the upload specified by `uuid`, optionally appending the body as the fi
```
PUT /v2/<name>/blobs/uploads/<uuid>?digest=<tarsum>
PUT /v2/<name>/blobs/uploads/<uuid>?digest=<digest>
Host: <registry host>
Authorization: <scheme> <token>
Content-Range: <start of range>-<end of range, inclusive>

View file

@ -53,6 +53,7 @@ type httpStatusListener interface {
func (hs *httpSink) Write(events ...Event) error {
hs.mu.Lock()
defer hs.mu.Unlock()
defer hs.client.Transport.(*headerRoundTripper).CloseIdleConnections()
if hs.closed {
return ErrSinkClosed
@ -83,6 +84,7 @@ func (hs *httpSink) Write(events ...Event) error {
return fmt.Errorf("%v: error posting: %v", hs, err)
}
defer resp.Body.Close()
// The notifier will treat any 2xx or 3xx response as accepted by the
// endpoint.

View file

@ -220,7 +220,7 @@ type retryingSink struct {
sink Sink
closed bool
// circuit breaker hueristics
// circuit breaker heuristics
failures struct {
threshold int
recent int
@ -317,7 +317,7 @@ func (rs *retryingSink) wait(backoff time.Duration) {
time.Sleep(backoff)
}
// reset marks a succesful call.
// reset marks a successful call.
func (rs *retryingSink) reset() {
rs.failures.recent = 0
rs.failures.last = time.Time{}
@ -330,7 +330,7 @@ func (rs *retryingSink) failure() {
}
// proceed returns true if the call should proceed based on circuit breaker
// hueristics.
// heuristics.
func (rs *retryingSink) proceed() bool {
return rs.failures.recent < rs.failures.threshold ||
time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff))

View file

@ -135,7 +135,7 @@ const (
"tag": <tag>,
"fsLayers": [
{
"blobSum": <tarsum>
"blobSum": "<digest>"
},
...
]
@ -606,7 +606,7 @@ var routeDescriptors = []RouteDescriptor{
"code": "BLOB_UNKNOWN",
"message": "blob unknown to registry",
"detail": {
"digest": <tarsum>
"digest": "<digest>"
}
},
...
@ -712,7 +712,7 @@ var routeDescriptors = []RouteDescriptor{
Name: RouteNameBlob,
Path: "/v2/{name:" + RepositoryNameRegexp.String() + "}/blobs/{digest:" + digest.DigestRegexp.String() + "}",
Entity: "Blob",
Description: "Fetch the blob identified by `name` and `digest`. Used to fetch layers by tarsum digest.",
Description: "Fetch the blob identified by `name` and `digest`. Used to fetch layers by digest.",
Methods: []MethodDescriptor{
{
@ -898,7 +898,7 @@ var routeDescriptors = []RouteDescriptor{
{
Name: "digest",
Type: "query",
Format: "<tarsum>",
Format: "<digest>",
Regexp: digest.DigestRegexp,
Description: `Digest of uploaded blob. If present, the upload will be completed, in a single request, with contents of the request body as the resulting blob.`,
},
@ -1173,7 +1173,7 @@ var routeDescriptors = []RouteDescriptor{
{
Name: "digest",
Type: "string",
Format: "<tarsum>",
Format: "<digest>",
Regexp: digest.DigestRegexp,
Required: true,
Description: `Digest of uploaded blob.`,

View file

@ -62,7 +62,12 @@ func NewURLBuilderFromRequest(r *http.Request) *URLBuilder {
host := r.Host
forwardedHost := r.Header.Get("X-Forwarded-Host")
if len(forwardedHost) > 0 {
host = forwardedHost
// According to the Apache mod_proxy docs, X-Forwarded-Host can be a
// comma-separated list of hosts, to which each proxy appends the
// requested host. We want to grab the first from this comma-separated
// list.
hosts := strings.SplitN(forwardedHost, ",", 2)
host = strings.TrimSpace(hosts[0])
}
basePath := routeDescriptorsMap[RouteNameBase].Path

View file

@ -151,6 +151,12 @@ func TestBuilderFromRequest(t *testing.T) {
forwardedProtoHeader := make(http.Header, 1)
forwardedProtoHeader.Set("X-Forwarded-Proto", "https")
forwardedHostHeader1 := make(http.Header, 1)
forwardedHostHeader1.Set("X-Forwarded-Host", "first.example.com")
forwardedHostHeader2 := make(http.Header, 1)
forwardedHostHeader2.Set("X-Forwarded-Host", "first.example.com, proxy1.example.com")
testRequests := []struct {
request *http.Request
base string
@ -163,6 +169,14 @@ func TestBuilderFromRequest(t *testing.T) {
request: &http.Request{URL: u, Host: u.Host, Header: forwardedProtoHeader},
base: "https://example.com",
},
{
request: &http.Request{URL: u, Host: u.Host, Header: forwardedHostHeader1},
base: "http://first.example.com",
},
{
request: &http.Request{URL: u, Host: u.Host, Header: forwardedHostHeader2},
base: "http://first.example.com",
},
}
for _, tr := range testRequests {

View file

@ -3,7 +3,7 @@
// An access controller has a simple interface with a single `Authorized`
// method which checks that a given request is authorized to perform one or
// more actions on one or more resources. This method should return a non-nil
// error if the requset is not authorized.
// error if the request is not authorized.
//
// An implementation registers its access controller by name with a constructor
// which accepts an options map for configuring the access controller.
@ -50,7 +50,7 @@ type Resource struct {
}
// Access describes a specific action that is
// requested or allowed for a given recource.
// requested or allowed for a given resource.
type Access struct {
Resource
Action string

View file

@ -7,7 +7,7 @@ import (
)
// joseBase64UrlEncode encodes the given data using the standard base64 url
// encoding format but with all trailing '=' characters ommitted in accordance
// encoding format but with all trailing '=' characters omitted in accordance
// with the jose specification.
// http://tools.ietf.org/html/draft-ietf-jose-json-web-signature-31#section-2
func joseBase64UrlEncode(b []byte) string {

View file

@ -93,7 +93,7 @@ func TestURLPrefix(t *testing.T) {
}
// TestLayerAPI conducts a full of the of the layer api.
// TestLayerAPI conducts a full test of the of the layer api.
func TestLayerAPI(t *testing.T) {
// TODO(stevvooe): This test code is complete junk but it should cover the
// complete flow. This must be broken down and checked against the
@ -246,6 +246,16 @@ func TestLayerAPI(t *testing.T) {
t.Fatalf("response body did not pass verification")
}
// ----------------
// Fetch the layer with an invalid digest
badURL := strings.Replace(layerURL, "tarsum", "trsum", 1)
resp, err = http.Get(badURL)
if err != nil {
t.Fatalf("unexpected error fetching layer: %v", err)
}
checkResponse(t, "fetching layer bad digest", resp, http.StatusBadRequest)
// Missing tests:
// - Upload the same tarsum file under and different repository and
// ensure the content remains uncorrupted.

View file

@ -81,7 +81,18 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
panic(err)
}
startUploadPurger(app.driver, ctxu.GetLogger(app))
purgeConfig := uploadPurgeDefaultConfig()
if mc, ok := configuration.Storage["maintenance"]; ok {
for k, v := range mc {
switch k {
case "uploadpurging":
purgeConfig = v.(map[interface{}]interface{})
}
}
}
startUploadPurger(app.driver, ctxu.GetLogger(app), purgeConfig)
app.driver, err = applyStorageMiddleware(app.driver, configuration.Middleware["storage"])
if err != nil {
@ -365,11 +376,25 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
// future refactoring.
w.WriteHeader(http.StatusBadRequest)
}
app.logError(context, context.Errors)
serveJSON(w, context.Errors)
}
})
}
func (app *App) logError(context context.Context, errors v2.Errors) {
for _, e := range errors.Errors {
c := ctxu.WithValue(context, "err.code", e.Code)
c = ctxu.WithValue(c, "err.message", e.Message)
c = ctxu.WithValue(c, "err.detail", e.Detail)
c = ctxu.WithLogger(c, ctxu.GetLogger(c,
"err.code",
"err.message",
"err.detail"))
ctxu.GetLogger(c).Errorf("An error occured")
}
}
// context constructs the context object for the application. This only be
// called once per request.
func (app *App) context(w http.ResponseWriter, r *http.Request) *Context {
@ -554,26 +579,82 @@ func applyStorageMiddleware(driver storagedriver.StorageDriver, middlewares []co
return driver, nil
}
// uploadPurgeDefaultConfig provides a default configuration for upload
// purging to be used in the absence of configuration in the
// confifuration file
func uploadPurgeDefaultConfig() map[interface{}]interface{} {
config := map[interface{}]interface{}{}
config["enabled"] = true
config["age"] = "168h"
config["interval"] = "24h"
config["dryrun"] = false
return config
}
func badPurgeUploadConfig(reason string) {
panic(fmt.Sprintf("Unable to parse upload purge configuration: %s", reason))
}
// startUploadPurger schedules a goroutine which will periodically
// check upload directories for old files and delete them
func startUploadPurger(storageDriver storagedriver.StorageDriver, log ctxu.Logger) {
rand.Seed(time.Now().Unix())
jitter := time.Duration(rand.Int()%60) * time.Minute
func startUploadPurger(storageDriver storagedriver.StorageDriver, log ctxu.Logger, config map[interface{}]interface{}) {
if config["enabled"] == false {
return
}
// Start with reasonable defaults
// TODO:(richardscothern) make configurable
purgeAge := time.Duration(7 * 24 * time.Hour)
timeBetweenPurges := time.Duration(1 * 24 * time.Hour)
var purgeAgeDuration time.Duration
var err error
purgeAge, ok := config["age"]
if ok {
ageStr, ok := purgeAge.(string)
if !ok {
badPurgeUploadConfig("age is not a string")
}
purgeAgeDuration, err = time.ParseDuration(ageStr)
if err != nil {
badPurgeUploadConfig(fmt.Sprintf("Cannot parse duration: %s", err.Error()))
}
} else {
badPurgeUploadConfig("age missing")
}
var intervalDuration time.Duration
interval, ok := config["interval"]
if ok {
intervalStr, ok := interval.(string)
if !ok {
badPurgeUploadConfig("interval is not a string")
}
intervalDuration, err = time.ParseDuration(intervalStr)
if err != nil {
badPurgeUploadConfig(fmt.Sprintf("Cannot parse interval: %s", err.Error()))
}
} else {
badPurgeUploadConfig("interval missing")
}
var dryRunBool bool
dryRun, ok := config["dryrun"]
if ok {
dryRunBool, ok = dryRun.(bool)
if !ok {
badPurgeUploadConfig("cannot parse dryrun")
}
} else {
badPurgeUploadConfig("dryrun missing")
}
go func() {
rand.Seed(time.Now().Unix())
jitter := time.Duration(rand.Int()%60) * time.Minute
log.Infof("Starting upload purge in %s", jitter)
time.Sleep(jitter)
for {
storage.PurgeUploads(storageDriver, time.Now().Add(-purgeAge), true)
log.Infof("Starting upload purge in %s", timeBetweenPurges)
time.Sleep(timeBetweenPurges)
storage.PurgeUploads(storageDriver, time.Now().Add(-purgeAgeDuration), !dryRunBool)
log.Infof("Starting upload purge in %s", intervalDuration)
time.Sleep(intervalDuration)
}
}()
}

View file

@ -198,7 +198,12 @@ func (luh *layerUploadHandler) PutLayerUploadComplete(w http.ResponseWriter, r *
// may miss a root cause.
// Read in the final chunk, if any.
io.Copy(luh.Upload, r.Body)
if _, err := io.Copy(luh.Upload, r.Body); err != nil {
ctxu.GetLogger(luh).Errorf("unknown error copying into upload: %v", err)
w.WriteHeader(http.StatusInternalServerError)
luh.Errors.Push(v2.ErrorCodeUnknown, err)
return
}
layer, err := luh.Upload.Finish(dgst)
if err != nil {

View file

@ -94,6 +94,9 @@ func New(accountName, accountKey, container, realm string) (*Driver, error) {
}
// Implement the storagedriver.StorageDriver interface.
func (d *driver) Name() string {
return driverName
}
// GetContent retrieves the content stored at "path" as a []byte.
func (d *driver) GetContent(path string) ([]byte, error) {

View file

@ -34,7 +34,7 @@
// }
//
// The type now implements StorageDriver, proxying through Base, without
// exporting an unnessecary field.
// exporting an unnecessary field.
package base
import (
@ -53,7 +53,7 @@ type Base struct {
// GetContent wraps GetContent of underlying storage driver.
func (base *Base) GetContent(path string) ([]byte, error) {
_, done := context.WithTrace(context.Background())
defer done("Base.GetContent")
defer done("%s.GetContent(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) {
return nil, storagedriver.InvalidPathError{Path: path}
@ -65,7 +65,7 @@ func (base *Base) GetContent(path string) ([]byte, error) {
// PutContent wraps PutContent of underlying storage driver.
func (base *Base) PutContent(path string, content []byte) error {
_, done := context.WithTrace(context.Background())
defer done("Base.PutContent")
defer done("%s.PutContent(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) {
return storagedriver.InvalidPathError{Path: path}
@ -77,7 +77,7 @@ func (base *Base) PutContent(path string, content []byte) error {
// ReadStream wraps ReadStream of underlying storage driver.
func (base *Base) ReadStream(path string, offset int64) (io.ReadCloser, error) {
_, done := context.WithTrace(context.Background())
defer done("Base.ReadStream")
defer done("%s.ReadStream(%q, %d)", base.Name(), path, offset)
if offset < 0 {
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
@ -93,7 +93,7 @@ func (base *Base) ReadStream(path string, offset int64) (io.ReadCloser, error) {
// WriteStream wraps WriteStream of underlying storage driver.
func (base *Base) WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error) {
_, done := context.WithTrace(context.Background())
defer done("Base.WriteStream")
defer done("%s.WriteStream(%q, %d)", base.Name(), path, offset)
if offset < 0 {
return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
@ -109,7 +109,7 @@ func (base *Base) WriteStream(path string, offset int64, reader io.Reader) (nn i
// Stat wraps Stat of underlying storage driver.
func (base *Base) Stat(path string) (storagedriver.FileInfo, error) {
_, done := context.WithTrace(context.Background())
defer done("Base.Stat")
defer done("%s.Stat(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) {
return nil, storagedriver.InvalidPathError{Path: path}
@ -121,7 +121,7 @@ func (base *Base) Stat(path string) (storagedriver.FileInfo, error) {
// List wraps List of underlying storage driver.
func (base *Base) List(path string) ([]string, error) {
_, done := context.WithTrace(context.Background())
defer done("Base.List")
defer done("%s.List(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
return nil, storagedriver.InvalidPathError{Path: path}
@ -133,7 +133,7 @@ func (base *Base) List(path string) ([]string, error) {
// Move wraps Move of underlying storage driver.
func (base *Base) Move(sourcePath string, destPath string) error {
_, done := context.WithTrace(context.Background())
defer done("Base.Move")
defer done("%s.Move(%q, %q", base.Name(), sourcePath, destPath)
if !storagedriver.PathRegexp.MatchString(sourcePath) {
return storagedriver.InvalidPathError{Path: sourcePath}
@ -147,7 +147,7 @@ func (base *Base) Move(sourcePath string, destPath string) error {
// Delete wraps Delete of underlying storage driver.
func (base *Base) Delete(path string) error {
_, done := context.WithTrace(context.Background())
defer done("Base.Move")
defer done("%s.Delete(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) {
return storagedriver.InvalidPathError{Path: path}
@ -159,7 +159,7 @@ func (base *Base) Delete(path string) error {
// URLFor wraps URLFor of underlying storage driver.
func (base *Base) URLFor(path string, options map[string]interface{}) (string, error) {
_, done := context.WithTrace(context.Background())
defer done("Base.URLFor")
defer done("%s.URLFor(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) {
return "", storagedriver.InvalidPathError{Path: path}

View file

@ -71,6 +71,10 @@ func New(rootDirectory string) *Driver {
// Implement the storagedriver.StorageDriver interface
func (d *driver) Name() string {
return driverName
}
// GetContent retrieves the content stored at "path" as a []byte.
func (d *driver) GetContent(path string) ([]byte, error) {
rc, err := d.ReadStream(path, 0)

View file

@ -64,6 +64,10 @@ func New() *Driver {
// Implement the storagedriver.StorageDriver interface.
func (d *driver) Name() string {
return driverName
}
// GetContent retrieves the content stored at "path" as a []byte.
func (d *driver) GetContent(path string) ([]byte, error) {
d.mutex.RLock()

View file

@ -101,7 +101,7 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) {
}
case "ReadStream":
path, _ := request.Parameters["Path"].(string)
// Depending on serialization method, Offset may be convereted to any int/uint type
// Depending on serialization method, Offset may be converted to any int/uint type
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int()
reader, err := driver.ReadStream(path, offset)
var response ReadStreamResponse
@ -116,9 +116,9 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) {
}
case "WriteStream":
path, _ := request.Parameters["Path"].(string)
// Depending on serialization method, Offset may be convereted to any int/uint type
// Depending on serialization method, Offset may be converted to any int/uint type
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int()
// Depending on serialization method, Size may be convereted to any int/uint type
// Depending on serialization method, Size may be converted to any int/uint type
size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(int64(0))).Int()
reader, _ := request.Parameters["Reader"].(io.ReadCloser)
err := driver.WriteStream(path, offset, size, reader)

View file

@ -20,12 +20,15 @@ import (
"io"
"io/ioutil"
"net/http"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/AdRoll/goamz/aws"
"github.com/AdRoll/goamz/s3"
"github.com/Sirupsen/logrus"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
@ -72,6 +75,9 @@ type driver struct {
ChunkSize int64
Encrypt bool
RootDirectory string
pool sync.Pool // pool []byte buffers used for WriteStream
zeros []byte // shared, zero-valued buffer used for WriteStream
}
type baseEmbed struct {
@ -148,9 +154,23 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
chunkSize := int64(defaultChunkSize)
chunkSizeParam, ok := parameters["chunksize"]
if ok {
chunkSize, ok = chunkSizeParam.(int64)
if !ok || chunkSize < minChunkSize {
return nil, fmt.Errorf("The chunksize parameter should be a number that is larger than 5*1024*1024")
switch v := chunkSizeParam.(type) {
case string:
vv, err := strconv.ParseInt(v, 0, 64)
if err != nil {
return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
}
chunkSize = vv
case int64:
chunkSize = v
case int, uint, int32, uint32, uint64:
chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int()
default:
return nil, fmt.Errorf("invalid valud for chunksize: %#v", chunkSizeParam)
}
if chunkSize < minChunkSize {
return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize)
}
}
@ -224,6 +244,11 @@ func New(params DriverParameters) (*Driver, error) {
ChunkSize: params.ChunkSize,
Encrypt: params.Encrypt,
RootDirectory: params.RootDirectory,
zeros: make([]byte, params.ChunkSize),
}
d.pool.New = func() interface{} {
return make([]byte, d.ChunkSize)
}
return &Driver{
@ -237,6 +262,10 @@ func New(params DriverParameters) (*Driver, error) {
// Implement the storagedriver.StorageDriver interface
func (d *driver) Name() string {
return driverName
}
// GetContent retrieves the content stored at "path" as a []byte.
func (d *driver) GetContent(path string) ([]byte, error) {
content, err := d.Bucket.Get(d.s3Path(path))
@ -281,14 +310,14 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
var putErrChan chan error
parts := []s3.Part{}
var part s3.Part
done := make(chan struct{}) // stopgap to free up waiting goroutines
multi, err := d.Bucket.InitMulti(d.s3Path(path), d.getContentType(), getPermissions(), d.getOptions())
if err != nil {
return 0, err
}
buf := make([]byte, d.ChunkSize)
zeroBuf := make([]byte, d.ChunkSize)
buf := d.getbuf()
// We never want to leave a dangling multipart upload, our only consistent state is
// when there is a whole object at path. This is in order to remain consistent with
@ -314,6 +343,9 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
}
}
}
d.putbuf(buf) // needs to be here to pick up new buf value
close(done) // free up any waiting goroutines
}()
// Fills from 0 to total from current
@ -367,21 +399,77 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
}
go func(bytesRead int, from int64, buf []byte) {
// parts and partNumber are safe, because this function is the only one modifying them and we
// force it to be executed serially.
if bytesRead > 0 {
part, putErr := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
if putErr != nil {
putErrChan <- putErr
defer d.putbuf(buf) // this buffer gets dropped after this call
// DRAGONS(stevvooe): There are few things one might want to know
// about this section. First, the putErrChan is expecting an error
// and a nil or just a nil to come through the channel. This is
// covered by the silly defer below. The other aspect is the s3
// retry backoff to deal with RequestTimeout errors. Even though
// the underlying s3 library should handle it, it doesn't seem to
// be part of the shouldRetry function (see AdRoll/goamz/s3).
defer func() {
select {
case putErrChan <- nil: // for some reason, we do this no matter what.
case <-done:
return // ensure we don't leak the goroutine
}
}()
if bytesRead <= 0 {
return
}
var err error
var part s3.Part
loop:
for retries := 0; retries < 5; retries++ {
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
if err == nil {
break // success!
}
// NOTE(stevvooe): This retry code tries to only retry under
// conditions where the s3 package does not. We may add s3
// error codes to the below if we see others bubble up in the
// application. Right now, the most troubling is
// RequestTimeout, which seems to only triggered when a tcp
// connection to s3 slows to a crawl. If the RequestTimeout
// ends up getting added to the s3 library and we don't see
// other errors, this retry loop can be removed.
switch err := err.(type) {
case *s3.Error:
switch err.Code {
case "RequestTimeout":
// allow retries on only this error.
default:
break loop
}
}
backoff := 100 * time.Millisecond * time.Duration(retries+1)
logrus.Errorf("error putting part, retrying after %v: %v", err, backoff.String())
time.Sleep(backoff)
}
if err != nil {
logrus.Errorf("error putting part, aborting: %v", err)
select {
case putErrChan <- err:
case <-done:
return // don't leak the goroutine
}
}
// parts and partNumber are safe, because this function is the
// only one modifying them and we force it to be executed
// serially.
parts = append(parts, part)
partNumber++
}
putErrChan <- nil
}(bytesRead, from, buf)
buf = make([]byte, d.ChunkSize)
buf = d.getbuf() // use a new buffer for the next call
return nil
}
@ -429,7 +517,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
fromZeroFillSmall := func(from, to int64) error {
bytesRead = 0
for from+int64(bytesRead) < to {
nn, err := bytes.NewReader(zeroBuf).Read(buf[from+int64(bytesRead) : to])
nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to])
bytesRead += nn
if err != nil {
return err
@ -443,7 +531,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
fromZeroFillLarge := func(from, to int64) error {
bytesRead64 := int64(0)
for to-(from+bytesRead64) >= d.ChunkSize {
part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf))
part, err := multi.PutPart(int(partNumber), bytes.NewReader(d.zeros))
if err != nil {
return err
}
@ -724,3 +812,13 @@ func getPermissions() s3.ACL {
func (d *driver) getContentType() string {
return "application/octet-stream"
}
// getbuf returns a buffer from the driver's pool with length d.ChunkSize.
func (d *driver) getbuf() []byte {
return d.pool.Get().([]byte)
}
func (d *driver) putbuf(p []byte) {
copy(p, d.zeros)
d.pool.Put(p)
}

View file

@ -35,6 +35,11 @@ const CurrentVersion Version = "0.1"
// StorageDriver defines methods that a Storage Driver must implement for a
// filesystem-like key/value object storage.
type StorageDriver interface {
// Name returns the human-readable "name" of the driver, useful in error
// messages and logging. By convention, this will just be the registration
// name, but drivers may provide other information here.
Name() string
// GetContent retrieves the content stored at "path" as a []byte.
// This should primarily be used for small objects.
GetContent(path string) ([]byte, error)

View file

@ -435,7 +435,7 @@ func (suite *DriverSuite) testContinueStreamAppend(c *check.C, chunkSize int64)
c.Assert(err, check.IsNil)
c.Assert(received, check.DeepEquals, fullContents)
// Writing past size of file extends file (no offest error). We would like
// Writing past size of file extends file (no offset error). We would like
// to write chunk 4 one chunk length past chunk 3. It should be successful
// and the resulting file will be 5 chunks long, with a chunk of all
// zeros.

View file

@ -336,7 +336,7 @@ func seekerSize(seeker io.ReadSeeker) (int64, error) {
// createTestLayer creates a simple test layer in the provided driver under
// tarsum dgst, returning the sha256 digest location. This is implemented
// peicemeal and should probably be replaced by the uploader when it's ready.
// piecemeal and should probably be replaced by the uploader when it's ready.
func writeTestLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper, name string, dgst digest.Digest, content io.Reader) (digest.Digest, error) {
h := sha256.New()
rd := io.TeeReader(content, h)

View file

@ -65,7 +65,7 @@ func (ls *layerStore) Upload() (distribution.LayerUpload, error) {
uuid := uuid.New()
startedAt := time.Now().UTC()
path, err := ls.repository.registry.pm.path(uploadDataPathSpec{
path, err := ls.repository.pm.path(uploadDataPathSpec{
name: ls.repository.Name(),
uuid: uuid,
})
@ -74,7 +74,7 @@ func (ls *layerStore) Upload() (distribution.LayerUpload, error) {
return nil, err
}
startedAtPath, err := ls.repository.registry.pm.path(uploadStartedAtPathSpec{
startedAtPath, err := ls.repository.pm.path(uploadStartedAtPathSpec{
name: ls.repository.Name(),
uuid: uuid,
})
@ -95,7 +95,7 @@ func (ls *layerStore) Upload() (distribution.LayerUpload, error) {
// state of the upload.
func (ls *layerStore) Resume(uuid string) (distribution.LayerUpload, error) {
ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Resume")
startedAtPath, err := ls.repository.registry.pm.path(uploadStartedAtPathSpec{
startedAtPath, err := ls.repository.pm.path(uploadStartedAtPathSpec{
name: ls.repository.Name(),
uuid: uuid,
})
@ -152,7 +152,7 @@ func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (di
func (ls *layerStore) path(dgst digest.Digest) (string, error) {
// We must traverse this path through the link to enforce ownership.
layerLinkPath, err := ls.repository.registry.pm.path(layerLinkPathSpec{name: ls.repository.Name(), digest: dgst})
layerLinkPath, err := ls.repository.pm.path(layerLinkPathSpec{name: ls.repository.Name(), digest: dgst})
if err != nil {
return "", err
}

View file

@ -46,16 +46,37 @@ func (lw *layerWriter) StartedAt() time.Time {
// uploaded layer. The final size and checksum are validated against the
// contents of the uploaded layer. The checksum should be provided in the
// format <algorithm>:<hex digest>.
func (lw *layerWriter) Finish(digest digest.Digest) (distribution.Layer, error) {
func (lw *layerWriter) Finish(dgst digest.Digest) (distribution.Layer, error) {
ctxu.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Finish")
if err := lw.bufferedFileWriter.Close(); err != nil {
return nil, err
}
canonical, err := lw.validateLayer(digest)
if err != nil {
var (
canonical digest.Digest
err error
)
// HACK(stevvooe): To deal with s3's lack of consistency, attempt to retry
// validation on failure. Three attempts are made, backing off
// retries*100ms each time.
for retries := 0; ; retries++ {
canonical, err = lw.validateLayer(dgst)
if err == nil {
break
}
ctxu.GetLoggerWithField(lw.layerStore.repository.ctx, "retries", retries).
Errorf("error validating layer: %v", err)
if retries < 3 {
time.Sleep(100 * time.Millisecond * time.Duration(retries+1))
continue
}
return nil, err
}
if err := lw.moveLayer(canonical); err != nil {
@ -64,7 +85,7 @@ func (lw *layerWriter) Finish(digest digest.Digest) (distribution.Layer, error)
}
// Link the layer blob into the repository.
if err := lw.linkLayer(canonical, digest); err != nil {
if err := lw.linkLayer(canonical, dgst); err != nil {
return nil, err
}
@ -137,7 +158,7 @@ type hashStateEntry struct {
// getStoredHashStates returns a slice of hashStateEntries for this upload.
func (lw *layerWriter) getStoredHashStates() ([]hashStateEntry, error) {
uploadHashStatePathPrefix, err := lw.layerStore.repository.registry.pm.path(uploadHashStatePathSpec{
uploadHashStatePathPrefix, err := lw.layerStore.repository.pm.path(uploadHashStatePathSpec{
name: lw.layerStore.repository.Name(),
uuid: lw.uuid,
alg: lw.resumableDigester.Digest().Algorithm(),
@ -182,7 +203,7 @@ func (lw *layerWriter) resumeHashAt(offset int64) error {
}
if offset == int64(lw.resumableDigester.Len()) {
// State of digester is already at the requseted offset.
// State of digester is already at the requested offset.
return nil
}
@ -250,7 +271,7 @@ func (lw *layerWriter) resumeHashAt(offset int64) error {
}
func (lw *layerWriter) storeHashState() error {
uploadHashStatePath, err := lw.layerStore.repository.registry.pm.path(uploadHashStatePathSpec{
uploadHashStatePath, err := lw.layerStore.repository.pm.path(uploadHashStatePathSpec{
name: lw.layerStore.repository.Name(),
uuid: lw.uuid,
alg: lw.resumableDigester.Digest().Algorithm(),
@ -324,6 +345,8 @@ func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error)
}
if !verified {
ctxu.GetLoggerWithField(lw.layerStore.repository.ctx, "canonical", dgst).
Errorf("canonical digest does match provided digest")
return "", distribution.ErrLayerInvalidDigest{
Digest: dgst,
Reason: fmt.Errorf("content does not match digest"),
@ -337,7 +360,7 @@ func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error)
// identified by dgst. The layer should be validated before commencing the
// move.
func (lw *layerWriter) moveLayer(dgst digest.Digest) error {
blobPath, err := lw.layerStore.repository.registry.pm.path(blobDataPathSpec{
blobPath, err := lw.layerStore.repository.pm.path(blobDataPathSpec{
digest: dgst,
})
@ -403,7 +426,7 @@ func (lw *layerWriter) linkLayer(canonical digest.Digest, aliases ...digest.Dige
}
seenDigests[dgst] = struct{}{}
layerLinkPath, err := lw.layerStore.repository.registry.pm.path(layerLinkPathSpec{
layerLinkPath, err := lw.layerStore.repository.pm.path(layerLinkPathSpec{
name: lw.layerStore.repository.Name(),
digest: dgst,
})
@ -412,7 +435,7 @@ func (lw *layerWriter) linkLayer(canonical digest.Digest, aliases ...digest.Dige
return err
}
if err := lw.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(canonical)); err != nil {
if err := lw.layerStore.repository.driver.PutContent(layerLinkPath, []byte(canonical)); err != nil {
return err
}
}
@ -424,7 +447,7 @@ func (lw *layerWriter) linkLayer(canonical digest.Digest, aliases ...digest.Dige
// instance. An error will be returned if the clean up cannot proceed. If the
// resources are already not present, no error will be returned.
func (lw *layerWriter) removeResources() error {
dataPath, err := lw.layerStore.repository.registry.pm.path(uploadDataPathSpec{
dataPath, err := lw.layerStore.repository.pm.path(uploadDataPathSpec{
name: lw.layerStore.repository.Name(),
uuid: lw.uuid,
})

View file

@ -387,7 +387,7 @@ type layerLinkPathSpec struct {
func (layerLinkPathSpec) pathSpec() {}
// blobAlgorithmReplacer does some very simple path sanitization for user
// input. Mostly, this is to provide some heirachry for tarsum digests. Paths
// input. Mostly, this is to provide some hierarchy for tarsum digests. Paths
// should be "safe" before getting this far due to strict digest requirements
// but we can add further path conversion here, if needed.
var blobAlgorithmReplacer = strings.NewReplacer(