diff --git a/api_test.go b/api_test.go index 6e8c403c..2298d3b7 100644 --- a/api_test.go +++ b/api_test.go @@ -459,6 +459,8 @@ func pushLayer(t *testing.T, ub *v2.URLBuilder, name string, dgst digest.Digest, } u.RawQuery = url.Values{ + "_state": u.Query()["_state"], + "digest": []string{dgst.String()}, // TODO(stevvooe): Layer upload can be completed with and without size diff --git a/app.go b/app.go index d8276cec..1790b3ae 100644 --- a/app.go +++ b/app.go @@ -29,6 +29,8 @@ type App struct { // services contains the main services instance for the application. services *storage.Services + tokenProvider tokenProvider + accessController auth.AccessController } @@ -62,6 +64,7 @@ func NewApp(configuration configuration.Configuration) *App { app.driver = driver app.services = storage.NewServices(app.driver) + app.tokenProvider = newHMACTokenProvider(configuration.Cluster.Secret) authType := configuration.Auth.Type() diff --git a/configuration/configuration.go b/configuration/configuration.go index 6ac64147..9d991dfa 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -33,6 +33,12 @@ type Configuration struct { // Addr specifies the bind address for the registry instance. Addr string `yaml:"addr"` } `yaml:"http"` + + // Cluster contains configuration parameters for clustering the registry. + Cluster struct { + // Secret specifies the secret key which HMAC tokens are created with. + Secret string `yaml:"secret"` + } `yaml:"cluster"` } // v0_1Configuration is a Version 0.1 Configuration struct diff --git a/layerupload.go b/layerupload.go index 49b10789..865bf12c 100644 --- a/layerupload.go +++ b/layerupload.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "net/http" + "net/url" "strconv" "github.com/Sirupsen/logrus" @@ -32,9 +33,17 @@ func layerUploadDispatcher(ctx *Context, r *http.Request) http.Handler { if luh.UUID != "" { luh.log = luh.log.WithField("uuid", luh.UUID) - layers := ctx.services.Layers() - upload, err := layers.Resume(luh.UUID) + state, err := ctx.tokenProvider.LayerUploadStateFromToken(r.FormValue("_state")) + if err != nil { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + logrus.Infof("error resolving upload: %v", err) + w.WriteHeader(http.StatusInternalServerError) + luh.Errors.Push(v2.ErrorCodeUnknown, err) + }) + } + layers := ctx.services.Layers() + upload, err := layers.Resume(state) if err != nil && err != storage.ErrLayerUploadUnknown { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { logrus.Infof("error resolving upload: %v", err) @@ -162,7 +171,14 @@ func (luh *layerUploadHandler) CancelLayerUpload(w http.ResponseWriter, r *http. // chunk responses. This sets the correct headers but the response status is // left to the caller. func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *http.Request) error { - uploadURL, err := luh.urlBuilder.BuildBlobUploadChunkURL(luh.Upload.Name(), luh.Upload.UUID()) + values := make(url.Values) + stateToken, err := luh.Context.tokenProvider.LayerUploadStateToToken(storage.LayerUploadState{Name: luh.Upload.Name(), UUID: luh.Upload.UUID(), Offset: luh.Upload.Offset()}) + if err != nil { + logrus.Infof("error building upload state token: %s", err) + return err + } + values.Set("_state", stateToken) + uploadURL, err := luh.urlBuilder.BuildBlobUploadChunkURL(luh.Upload.Name(), luh.Upload.UUID(), values) if err != nil { logrus.Infof("error building upload url: %s", err) return err diff --git a/storage/layer_test.go b/storage/layer_test.go index 166d803a..be5ac57d 100644 --- a/storage/layer_test.go +++ b/storage/layer_test.go @@ -58,7 +58,7 @@ func TestSimpleLayerUpload(t *testing.T) { } // Do a resume, get unknown upload - layerUpload, err = ls.Resume(layerUpload.UUID()) + layerUpload, err = ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}) if err != ErrLayerUploadUnknown { t.Fatalf("unexpected error resuming upload, should be unkown: %v", err) } @@ -90,7 +90,7 @@ func TestSimpleLayerUpload(t *testing.T) { layerUpload.Close() // Do a resume, for good fun - layerUpload, err = ls.Resume(layerUpload.UUID()) + layerUpload, err = ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}) if err != nil { t.Fatalf("unexpected error resuming upload: %v", err) } @@ -103,7 +103,7 @@ func TestSimpleLayerUpload(t *testing.T) { } // After finishing an upload, it should no longer exist. - if _, err := ls.Resume(layerUpload.UUID()); err != ErrLayerUploadUnknown { + if _, err := ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}); err != ErrLayerUploadUnknown { t.Fatalf("expected layer upload to be unknown, got %v", err) } diff --git a/storage/layerstore.go b/storage/layerstore.go index d945b767..42bd0f4f 100644 --- a/storage/layerstore.go +++ b/storage/layerstore.go @@ -76,8 +76,8 @@ func (ls *layerStore) Upload(name string) (LayerUpload, error) { // Resume continues an in progress layer upload, returning the current // state of the upload. -func (ls *layerStore) Resume(uuid string) (LayerUpload, error) { - lus, err := ls.uploadStore.GetState(uuid) +func (ls *layerStore) Resume(lus LayerUploadState) (LayerUpload, error) { + _, err := ls.uploadStore.GetState(lus.UUID) if err != nil { return nil, err diff --git a/storage/layerupload.go b/storage/layerupload.go index 4c991d7c..47c12501 100644 --- a/storage/layerupload.go +++ b/storage/layerupload.go @@ -1,8 +1,8 @@ package storage import ( - "encoding/json" "fmt" + "io" "io/ioutil" "os" "path/filepath" @@ -13,8 +13,6 @@ import ( "github.com/docker/distribution/manifest" "github.com/docker/distribution/storagedriver" "github.com/docker/docker/pkg/tarsum" - - "io" ) // LayerUploadState captures the state serializable state of the layer upload. @@ -61,7 +59,6 @@ type layerUploadStore interface { New(name string) (LayerUploadState, error) Open(uuid string) (layerFile, error) GetState(uuid string) (LayerUploadState, error) - SaveState(lus LayerUploadState) error DeleteState(uuid string) error } @@ -171,11 +168,6 @@ func (luc *layerUploadController) Write(p []byte) (int, error) { luc.LayerUploadState.Offset += int64(n) - if err := luc.uploadStore.SaveState(luc.LayerUploadState); err != nil { - // TODO(stevvooe): This failure case may require more thought. - return n, err - } - return n, err } @@ -384,10 +376,6 @@ func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) return lus, err } - if err := llufs.SaveState(lus); err != nil { - return lus, err - } - return lus, nil } @@ -402,43 +390,18 @@ func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) { } func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) { - // TODO(stevvoe): Storing this state on the local file system is an - // intermediate stop gap. This technique is unlikely to handle any kind of - // concurrency very well. - var lus LayerUploadState - fp, err := os.Open(llufs.path(uuid, "state.json")) - if err != nil { + + if _, err := os.Stat(llufs.path(uuid, "")); err != nil { if os.IsNotExist(err) { return lus, ErrLayerUploadUnknown } return lus, err } - defer fp.Close() - - dec := json.NewDecoder(fp) - if err := dec.Decode(&lus); err != nil { - return lus, err - } - return lus, nil } -func (llufs *localFSLayerUploadStore) SaveState(lus LayerUploadState) error { - p, err := json.Marshal(lus) - if err != nil { - return err - } - - err = ioutil.WriteFile(llufs.path(lus.UUID, "state.json"), p, 0644) - if os.IsNotExist(err) { - return ErrLayerUploadUnknown - } - - return err -} - func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error { if err := os.RemoveAll(llufs.path(uuid, "")); err != nil { if os.IsNotExist(err) { diff --git a/storage/manifeststore_test.go b/storage/manifeststore_test.go index a6a00aa1..991028e5 100644 --- a/storage/manifeststore_test.go +++ b/storage/manifeststore_test.go @@ -153,6 +153,6 @@ func (mockedExistenceLayerService) Upload(name string) (LayerUpload, error) { panic("not implemented") } -func (mockedExistenceLayerService) Resume(uuid string) (LayerUpload, error) { +func (mockedExistenceLayerService) Resume(lus LayerUploadState) (LayerUpload, error) { panic("not implemented") } diff --git a/storage/services.go b/storage/services.go index a6025581..15008f84 100644 --- a/storage/services.go +++ b/storage/services.go @@ -83,5 +83,5 @@ type LayerService interface { // Resume continues an in progress layer upload, returning the current // state of the upload. - Resume(uuid string) (LayerUpload, error) + Resume(layerUploadState LayerUploadState) (LayerUpload, error) } diff --git a/tokens.go b/tokens.go new file mode 100644 index 00000000..46caf81a --- /dev/null +++ b/tokens.go @@ -0,0 +1,65 @@ +package registry + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "fmt" + + "github.com/docker/distribution/storage" +) + +// tokenProvider contains methods for serializing and deserializing state from token strings. +type tokenProvider interface { + // LayerUploadStateFromToken retrieves the LayerUploadState for a given state token. + LayerUploadStateFromToken(stateToken string) (storage.LayerUploadState, error) + + // LayerUploadStateToToken returns a token string representing the given LayerUploadState. + LayerUploadStateToToken(layerUploadState storage.LayerUploadState) (string, error) +} + +type hmacTokenProvider struct { + secret string +} + +func newHMACTokenProvider(secret string) tokenProvider { + return &hmacTokenProvider{secret: secret} +} + +// LayerUploadStateFromToken deserializes the given HMAC stateToken and validates the prefix HMAC +func (ts *hmacTokenProvider) LayerUploadStateFromToken(stateToken string) (storage.LayerUploadState, error) { + var lus storage.LayerUploadState + + tokenBytes, err := base64.URLEncoding.DecodeString(stateToken) + if err != nil { + return lus, err + } + mac := hmac.New(sha256.New, []byte(ts.secret)) + + if len(tokenBytes) < mac.Size() { + return lus, fmt.Errorf("Invalid token") + } + + macBytes := tokenBytes[:mac.Size()] + messageBytes := tokenBytes[mac.Size():] + + mac.Write(messageBytes) + if !hmac.Equal(mac.Sum(nil), macBytes) { + return lus, fmt.Errorf("Invalid token") + } + + if err := json.Unmarshal(messageBytes, &lus); err != nil { + return lus, err + } + + return lus, nil +} + +// LayerUploadStateToToken serializes the given LayerUploadState to JSON with an HMAC prepended +func (ts *hmacTokenProvider) LayerUploadStateToToken(lus storage.LayerUploadState) (string, error) { + mac := hmac.New(sha256.New, []byte(ts.secret)) + stateJSON := fmt.Sprintf("{\"Name\": \"%s\", \"UUID\": \"%s\", \"Offset\": %d}", lus.Name, lus.UUID, lus.Offset) + mac.Write([]byte(stateJSON)) + return base64.URLEncoding.EncodeToString(append(mac.Sum(nil), stateJSON...)), nil +}