Modify blob upload API
- Ensures new uploads and resumed upload statuses always return an offset of 0. This allows future clients which support resumable upload to not attempt resumable upload on this version which does not support it. - Add PATCH support for streaming data on upload. - Add messaging to specification that PATCH with content range is currently not supported. - Update PUT blob to only support full data or no data, no more last chunk messaging as it was not supported. closes #470 Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
This commit is contained in:
parent
7f3a57fdbb
commit
123546212c
3 changed files with 215 additions and 57 deletions
|
@ -209,6 +209,13 @@ func TestLayerAPI(t *testing.T) {
|
|||
uploadURLBase, uploadUUID = startPushLayer(t, env.builder, imageName)
|
||||
pushLayer(t, env.builder, imageName, layerDigest, uploadURLBase, layerFile)
|
||||
|
||||
// ------------------------------------------
|
||||
// Now, push just a chunk
|
||||
layerFile.Seek(0, 0)
|
||||
|
||||
uploadURLBase, uploadUUID = startPushLayer(t, env.builder, imageName)
|
||||
uploadURLBase, dgst := pushChunk(t, env.builder, imageName, uploadURLBase, layerFile, layerLength)
|
||||
finishUpload(t, env.builder, imageName, uploadURLBase, dgst)
|
||||
// ------------------------
|
||||
// Use a head request to see if the layer exists.
|
||||
resp, err = http.Head(layerURL)
|
||||
|
@ -616,6 +623,75 @@ func pushLayer(t *testing.T, ub *v2.URLBuilder, name string, dgst digest.Digest,
|
|||
return resp.Header.Get("Location")
|
||||
}
|
||||
|
||||
func finishUpload(t *testing.T, ub *v2.URLBuilder, name string, uploadURLBase string, dgst digest.Digest) string {
|
||||
resp, err := doPushLayer(t, ub, name, dgst, uploadURLBase, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error doing push layer request: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
checkResponse(t, "putting monolithic chunk", resp, http.StatusCreated)
|
||||
|
||||
expectedLayerURL, err := ub.BuildBlobURL(name, dgst)
|
||||
if err != nil {
|
||||
t.Fatalf("error building expected layer url: %v", err)
|
||||
}
|
||||
|
||||
checkHeaders(t, resp, http.Header{
|
||||
"Location": []string{expectedLayerURL},
|
||||
"Content-Length": []string{"0"},
|
||||
"Docker-Content-Digest": []string{dgst.String()},
|
||||
})
|
||||
|
||||
return resp.Header.Get("Location")
|
||||
}
|
||||
|
||||
func doPushChunk(t *testing.T, uploadURLBase string, body io.Reader) (*http.Response, digest.Digest, error) {
|
||||
u, err := url.Parse(uploadURLBase)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error parsing pushLayer url: %v", err)
|
||||
}
|
||||
|
||||
u.RawQuery = url.Values{
|
||||
"_state": u.Query()["_state"],
|
||||
}.Encode()
|
||||
|
||||
uploadURL := u.String()
|
||||
|
||||
digester := digest.NewCanonicalDigester()
|
||||
|
||||
req, err := http.NewRequest("PATCH", uploadURL, io.TeeReader(body, digester))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating new request: %v", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/octet-stream")
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
|
||||
return resp, digester.Digest(), err
|
||||
}
|
||||
|
||||
func pushChunk(t *testing.T, ub *v2.URLBuilder, name string, uploadURLBase string, body io.Reader, length int64) (string, digest.Digest) {
|
||||
resp, dgst, err := doPushChunk(t, uploadURLBase, body)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error doing push layer request: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
checkResponse(t, "putting chunk", resp, http.StatusAccepted)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("error generating sha256 digest of body")
|
||||
}
|
||||
|
||||
checkHeaders(t, resp, http.Header{
|
||||
"Range": []string{fmt.Sprintf("0-%d", length-1)},
|
||||
"Content-Length": []string{"0"},
|
||||
})
|
||||
|
||||
return resp.Header.Get("Location"), dgst
|
||||
}
|
||||
|
||||
func checkResponse(t *testing.T, msg string, resp *http.Response, expectedStatus int) {
|
||||
if resp.StatusCode != expectedStatus {
|
||||
t.Logf("unexpected status %s: %v != %v", msg, resp.StatusCode, expectedStatus)
|
||||
|
|
|
@ -23,11 +23,10 @@ func layerUploadDispatcher(ctx *Context, r *http.Request) http.Handler {
|
|||
}
|
||||
|
||||
handler := http.Handler(handlers.MethodHandler{
|
||||
"POST": http.HandlerFunc(luh.StartLayerUpload),
|
||||
"GET": http.HandlerFunc(luh.GetUploadStatus),
|
||||
"HEAD": http.HandlerFunc(luh.GetUploadStatus),
|
||||
// TODO(stevvooe): Must implement patch support.
|
||||
// "PATCH": http.HandlerFunc(luh.PutLayerChunk),
|
||||
"POST": http.HandlerFunc(luh.StartLayerUpload),
|
||||
"GET": http.HandlerFunc(luh.GetUploadStatus),
|
||||
"HEAD": http.HandlerFunc(luh.GetUploadStatus),
|
||||
"PATCH": http.HandlerFunc(luh.PatchLayerData),
|
||||
"PUT": http.HandlerFunc(luh.PutLayerUploadComplete),
|
||||
"DELETE": http.HandlerFunc(luh.CancelLayerUpload),
|
||||
})
|
||||
|
@ -133,7 +132,7 @@ func (luh *layerUploadHandler) StartLayerUpload(w http.ResponseWriter, r *http.R
|
|||
luh.Upload = upload
|
||||
defer luh.Upload.Close()
|
||||
|
||||
if err := luh.layerUploadResponse(w, r); err != nil {
|
||||
if err := luh.layerUploadResponse(w, r, true); err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError) // Error conditions here?
|
||||
luh.Errors.Push(v2.ErrorCodeUnknown, err)
|
||||
return
|
||||
|
@ -151,7 +150,10 @@ func (luh *layerUploadHandler) GetUploadStatus(w http.ResponseWriter, r *http.Re
|
|||
return
|
||||
}
|
||||
|
||||
if err := luh.layerUploadResponse(w, r); err != nil {
|
||||
// TODO(dmcgowan): Set last argument to false in layerUploadResponse when
|
||||
// resumable upload is supported. This will enable returning a non-zero
|
||||
// range for clients to begin uploading at an offset.
|
||||
if err := luh.layerUploadResponse(w, r, true); err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError) // Error conditions here?
|
||||
luh.Errors.Push(v2.ErrorCodeUnknown, err)
|
||||
return
|
||||
|
@ -161,11 +163,45 @@ func (luh *layerUploadHandler) GetUploadStatus(w http.ResponseWriter, r *http.Re
|
|||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// PutLayerUploadComplete takes the final request of a layer upload. The final
|
||||
// chunk may include all the layer data, the final chunk of layer data or no
|
||||
// layer data. Any data provided is received and verified. If successful, the
|
||||
// layer is linked into the blob store and 201 Created is returned with the
|
||||
// canonical url of the layer.
|
||||
// PatchLayerData writes data to an upload.
|
||||
func (luh *layerUploadHandler) PatchLayerData(w http.ResponseWriter, r *http.Request) {
|
||||
if luh.Upload == nil {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
luh.Errors.Push(v2.ErrorCodeBlobUploadUnknown)
|
||||
return
|
||||
}
|
||||
|
||||
ct := r.Header.Get("Content-Type")
|
||||
if ct != "" && ct != "application/octet-stream" {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
// TODO(dmcgowan): encode error
|
||||
return
|
||||
}
|
||||
|
||||
// TODO(dmcgowan): support Content-Range header to seek and write range
|
||||
|
||||
// Copy the data
|
||||
if _, err := io.Copy(luh.Upload, r.Body); err != nil {
|
||||
ctxu.GetLogger(luh).Errorf("unknown error copying into upload: %v", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
luh.Errors.Push(v2.ErrorCodeUnknown, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := luh.layerUploadResponse(w, r, false); err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError) // Error conditions here?
|
||||
luh.Errors.Push(v2.ErrorCodeUnknown, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}
|
||||
|
||||
// PutLayerUploadComplete takes the final request of a layer upload. The
|
||||
// request may include all the layer data or no layer data. Any data
|
||||
// provided is received and verified. If successful, the layer is linked
|
||||
// into the blob store and 201 Created is returned with the canonical
|
||||
// url of the layer.
|
||||
func (luh *layerUploadHandler) PutLayerUploadComplete(w http.ResponseWriter, r *http.Request) {
|
||||
if luh.Upload == nil {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
|
@ -190,14 +226,11 @@ func (luh *layerUploadHandler) PutLayerUploadComplete(w http.ResponseWriter, r *
|
|||
return
|
||||
}
|
||||
|
||||
// TODO(stevvooe): Check the incoming range header here, per the
|
||||
// specification. LayerUpload should be seeked (sought?) to that position.
|
||||
|
||||
// TODO(stevvooe): Consider checking the error on this copy.
|
||||
// Theoretically, problems should be detected during verification but we
|
||||
// may miss a root cause.
|
||||
|
||||
// Read in the final chunk, if any.
|
||||
// Read in the data, if any.
|
||||
if _, err := io.Copy(luh.Upload, r.Body); err != nil {
|
||||
ctxu.GetLogger(luh).Errorf("unknown error copying into upload: %v", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
|
@ -260,13 +293,19 @@ func (luh *layerUploadHandler) CancelLayerUpload(w http.ResponseWriter, r *http.
|
|||
|
||||
// layerUploadResponse provides a standard request for uploading layers and
|
||||
// chunk responses. This sets the correct headers but the response status is
|
||||
// left to the caller.
|
||||
func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *http.Request) error {
|
||||
// left to the caller. The fresh argument is used to ensure that new layer
|
||||
// uploads always start at a 0 offset. This allows disabling resumable push
|
||||
// by always returning a 0 offset on check status.
|
||||
func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *http.Request, fresh bool) error {
|
||||
|
||||
offset, err := luh.Upload.Seek(0, os.SEEK_CUR)
|
||||
if err != nil {
|
||||
ctxu.GetLogger(luh).Errorf("unable get current offset of layer upload: %v", err)
|
||||
return err
|
||||
var offset int64
|
||||
if !fresh {
|
||||
var err error
|
||||
offset, err = luh.Upload.Seek(0, os.SEEK_CUR)
|
||||
if err != nil {
|
||||
ctxu.GetLogger(luh).Errorf("unable get current offset of layer upload: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(stevvooe): Need a better way to manage the upload state automatically.
|
||||
|
@ -291,10 +330,15 @@ func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *htt
|
|||
return err
|
||||
}
|
||||
|
||||
endRange := offset
|
||||
if endRange > 0 {
|
||||
endRange = endRange - 1
|
||||
}
|
||||
|
||||
w.Header().Set("Docker-Upload-UUID", luh.UUID)
|
||||
w.Header().Set("Location", uploadURL)
|
||||
w.Header().Set("Content-Length", "0")
|
||||
w.Header().Set("Range", fmt.Sprintf("0-%d", luh.State.Offset))
|
||||
w.Header().Set("Range", fmt.Sprintf("0-%d", endRange))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue