registry: Refactor requestfactory to use http.RoundTrippers

This patch removes the need for requestFactories and decorators
by implementing http.RoundTripper transports instead.

It refactors some challenging-to-read code.

NewSession now takes an *http.Client that can already have a
custom Transport, it will add its own auth transport by wrapping
it.

The idea is that callers of http.Client should not bother
setting custom headers for every handler but instead it should
be transparent to the callers of a same context.

This patch is needed for future refactorings of registry,
namely refactoring of the v1 client code.

Signed-off-by: Tibor Vass <tibor@docker.com>
This commit is contained in:
Tibor Vass 2015-05-14 07:12:54 -07:00
parent f13f3a774f
commit 89bd48481c
9 changed files with 373 additions and 353 deletions

View file

@ -3,6 +3,7 @@ package registry
import (
"bytes"
"crypto/sha256"
"errors"
// this is required for some certificates
_ "crypto/sha512"
"encoding/hex"
@ -20,64 +21,105 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/cliconfig"
"github.com/docker/docker/pkg/httputils"
"github.com/docker/docker/pkg/requestdecorator"
"github.com/docker/docker/pkg/tarsum"
)
type Session struct {
authConfig *cliconfig.AuthConfig
reqFactory *requestdecorator.RequestFactory
indexEndpoint *Endpoint
jar *cookiejar.Jar
timeout TimeoutType
client *http.Client
// TODO(tiborvass): remove authConfig
authConfig *cliconfig.AuthConfig
}
func NewSession(authConfig *cliconfig.AuthConfig, factory *requestdecorator.RequestFactory, endpoint *Endpoint, timeout bool) (r *Session, err error) {
r = &Session{
authConfig: authConfig,
indexEndpoint: endpoint,
// authTransport handles the auth layer when communicating with a v1 registry (private or official)
//
// For private v1 registries, set alwaysSetBasicAuth to true.
//
// For the official v1 registry, if there isn't already an Authorization header in the request,
// but there is an X-Docker-Token header set to true, then Basic Auth will be used to set the Authorization header.
// After sending the request with the provided base http.RoundTripper, if an X-Docker-Token header, representing
// a token, is present in the response, then it gets cached and sent in the Authorization header of all subsequent
// requests.
//
// If the server sends a token without the client having requested it, it is ignored.
//
// This RoundTripper also has a CancelRequest method important for correct timeout handling.
type authTransport struct {
http.RoundTripper
*cliconfig.AuthConfig
alwaysSetBasicAuth bool
token []string
}
func (tr *authTransport) RoundTrip(req *http.Request) (*http.Response, error) {
req = cloneRequest(req)
if tr.alwaysSetBasicAuth {
req.SetBasicAuth(tr.Username, tr.Password)
return tr.RoundTripper.RoundTrip(req)
}
if timeout {
r.timeout = ReceiveTimeout
}
var askedForToken bool
r.jar, err = cookiejar.New(nil)
// Don't override
if req.Header.Get("Authorization") == "" {
if req.Header.Get("X-Docker-Token") == "true" {
req.SetBasicAuth(tr.Username, tr.Password)
askedForToken = true
} else if len(tr.token) > 0 {
req.Header.Set("Authorization", "Token "+strings.Join(tr.token, ","))
}
}
resp, err := tr.RoundTripper.RoundTrip(req)
if err != nil {
return nil, err
}
if askedForToken && len(resp.Header["X-Docker-Token"]) > 0 {
tr.token = resp.Header["X-Docker-Token"]
}
return resp, nil
}
// TODO(tiborvass): remove authConfig param once registry client v2 is vendored
func NewSession(client *http.Client, authConfig *cliconfig.AuthConfig, endpoint *Endpoint) (r *Session, err error) {
r = &Session{
authConfig: authConfig,
client: client,
indexEndpoint: endpoint,
}
var alwaysSetBasicAuth bool
// If we're working with a standalone private registry over HTTPS, send Basic Auth headers
// alongside our requests.
if r.indexEndpoint.VersionString(1) != IndexServerAddress() && r.indexEndpoint.URL.Scheme == "https" {
info, err := r.indexEndpoint.Ping()
// alongside all our requests.
if endpoint.VersionString(1) != IndexServerAddress() && endpoint.URL.Scheme == "https" {
info, err := endpoint.Ping()
if err != nil {
return nil, err
}
if info.Standalone && authConfig != nil && factory != nil {
logrus.Debugf("Endpoint %s is eligible for private registry. Enabling decorator.", r.indexEndpoint.String())
dec := requestdecorator.NewAuthDecorator(authConfig.Username, authConfig.Password)
factory.AddDecorator(dec)
if info.Standalone && authConfig != nil {
logrus.Debugf("Endpoint %s is eligible for private registry. Enabling decorator.", endpoint.String())
alwaysSetBasicAuth = true
}
}
r.reqFactory = factory
return r, nil
}
client.Transport = &authTransport{RoundTripper: client.Transport, AuthConfig: authConfig, alwaysSetBasicAuth: alwaysSetBasicAuth}
func (r *Session) doRequest(req *http.Request) (*http.Response, *http.Client, error) {
return doRequest(req, r.jar, r.timeout, r.indexEndpoint.IsSecure)
jar, err := cookiejar.New(nil)
if err != nil {
return nil, errors.New("cookiejar.New is not supposed to return an error")
}
client.Jar = jar
return r, nil
}
// Retrieve the history of a given image from the Registry.
// Return a list of the parent's json (requested image included)
func (r *Session) GetRemoteHistory(imgID, registry string, token []string) ([]string, error) {
req, err := r.reqFactory.NewRequest("GET", registry+"images/"+imgID+"/ancestry", nil)
if err != nil {
return nil, err
}
setTokenAuth(req, token)
res, _, err := r.doRequest(req)
func (r *Session) GetRemoteHistory(imgID, registry string) ([]string, error) {
res, err := r.client.Get(registry + "images/" + imgID + "/ancestry")
if err != nil {
return nil, err
}
@ -89,27 +131,18 @@ func (r *Session) GetRemoteHistory(imgID, registry string, token []string) ([]st
return nil, httputils.NewHTTPRequestError(fmt.Sprintf("Server error: %d trying to fetch remote history for %s", res.StatusCode, imgID), res)
}
jsonString, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("Error while reading the http response: %s", err)
var history []string
if err := json.NewDecoder(res.Body).Decode(&history); err != nil {
return nil, fmt.Errorf("Error while reading the http response: %v", err)
}
logrus.Debugf("Ancestry: %s", jsonString)
history := new([]string)
if err := json.Unmarshal(jsonString, history); err != nil {
return nil, err
}
return *history, nil
logrus.Debugf("Ancestry: %v", history)
return history, nil
}
// Check if an image exists in the Registry
func (r *Session) LookupRemoteImage(imgID, registry string, token []string) error {
req, err := r.reqFactory.NewRequest("GET", registry+"images/"+imgID+"/json", nil)
if err != nil {
return err
}
setTokenAuth(req, token)
res, _, err := r.doRequest(req)
func (r *Session) LookupRemoteImage(imgID, registry string) error {
res, err := r.client.Get(registry + "images/" + imgID + "/json")
if err != nil {
return err
}
@ -121,14 +154,8 @@ func (r *Session) LookupRemoteImage(imgID, registry string, token []string) erro
}
// Retrieve an image from the Registry.
func (r *Session) GetRemoteImageJSON(imgID, registry string, token []string) ([]byte, int, error) {
// Get the JSON
req, err := r.reqFactory.NewRequest("GET", registry+"images/"+imgID+"/json", nil)
if err != nil {
return nil, -1, fmt.Errorf("Failed to download json: %s", err)
}
setTokenAuth(req, token)
res, _, err := r.doRequest(req)
func (r *Session) GetRemoteImageJSON(imgID, registry string) ([]byte, int, error) {
res, err := r.client.Get(registry + "images/" + imgID + "/json")
if err != nil {
return nil, -1, fmt.Errorf("Failed to download json: %s", err)
}
@ -147,44 +174,44 @@ func (r *Session) GetRemoteImageJSON(imgID, registry string, token []string) ([]
jsonString, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, -1, fmt.Errorf("Failed to parse downloaded json: %s (%s)", err, jsonString)
return nil, -1, fmt.Errorf("Failed to parse downloaded json: %v (%s)", err, jsonString)
}
return jsonString, imageSize, nil
}
func (r *Session) GetRemoteImageLayer(imgID, registry string, token []string, imgSize int64) (io.ReadCloser, error) {
func (r *Session) GetRemoteImageLayer(imgID, registry string, imgSize int64) (io.ReadCloser, error) {
var (
retries = 5
statusCode = 0
client *http.Client
res *http.Response
err error
imageURL = fmt.Sprintf("%simages/%s/layer", registry, imgID)
)
req, err := r.reqFactory.NewRequest("GET", imageURL, nil)
req, err := http.NewRequest("GET", imageURL, nil)
if err != nil {
return nil, fmt.Errorf("Error while getting from the server: %s\n", err)
return nil, fmt.Errorf("Error while getting from the server: %v", err)
}
setTokenAuth(req, token)
// TODO: why are we doing retries at this level?
// These retries should be generic to both v1 and v2
for i := 1; i <= retries; i++ {
statusCode = 0
res, client, err = r.doRequest(req)
if err != nil {
logrus.Debugf("Error contacting registry: %s", err)
if res != nil {
if res.Body != nil {
res.Body.Close()
}
statusCode = res.StatusCode
}
if i == retries {
return nil, fmt.Errorf("Server error: Status %d while fetching image layer (%s)",
statusCode, imgID)
}
time.Sleep(time.Duration(i) * 5 * time.Second)
continue
res, err = r.client.Do(req)
if err == nil {
break
}
break
logrus.Debugf("Error contacting registry %s: %v", registry, err)
if res != nil {
if res.Body != nil {
res.Body.Close()
}
statusCode = res.StatusCode
}
if i == retries {
return nil, fmt.Errorf("Server error: Status %d while fetching image layer (%s)",
statusCode, imgID)
}
time.Sleep(time.Duration(i) * 5 * time.Second)
}
if res.StatusCode != 200 {
@ -195,13 +222,13 @@ func (r *Session) GetRemoteImageLayer(imgID, registry string, token []string, im
if res.Header.Get("Accept-Ranges") == "bytes" && imgSize > 0 {
logrus.Debugf("server supports resume")
return httputils.ResumableRequestReaderWithInitialResponse(client, req, 5, imgSize, res), nil
return httputils.ResumableRequestReaderWithInitialResponse(r.client, req, 5, imgSize, res), nil
}
logrus.Debugf("server doesn't support resume")
return res.Body, nil
}
func (r *Session) GetRemoteTags(registries []string, repository string, token []string) (map[string]string, error) {
func (r *Session) GetRemoteTags(registries []string, repository string) (map[string]string, error) {
if strings.Count(repository, "/") == 0 {
// This will be removed once the Registry supports auto-resolution on
// the "library" namespace
@ -209,13 +236,7 @@ func (r *Session) GetRemoteTags(registries []string, repository string, token []
}
for _, host := range registries {
endpoint := fmt.Sprintf("%srepositories/%s/tags", host, repository)
req, err := r.reqFactory.NewRequest("GET", endpoint, nil)
if err != nil {
return nil, err
}
setTokenAuth(req, token)
res, _, err := r.doRequest(req)
res, err := r.client.Get(endpoint)
if err != nil {
return nil, err
}
@ -263,16 +284,13 @@ func (r *Session) GetRepositoryData(remote string) (*RepositoryData, error) {
logrus.Debugf("[registry] Calling GET %s", repositoryTarget)
req, err := r.reqFactory.NewRequest("GET", repositoryTarget, nil)
req, err := http.NewRequest("GET", repositoryTarget, nil)
if err != nil {
return nil, err
}
if r.authConfig != nil && len(r.authConfig.Username) > 0 {
req.SetBasicAuth(r.authConfig.Username, r.authConfig.Password)
}
// this will set basic auth in r.client.Transport and send cached X-Docker-Token headers for all subsequent requests
req.Header.Set("X-Docker-Token", "true")
res, _, err := r.doRequest(req)
res, err := r.client.Do(req)
if err != nil {
return nil, err
}
@ -292,11 +310,6 @@ func (r *Session) GetRepositoryData(remote string) (*RepositoryData, error) {
return nil, httputils.NewHTTPRequestError(fmt.Sprintf("Error: Status %d trying to pull repository %s: %q", res.StatusCode, remote, errBody), res)
}
var tokens []string
if res.Header.Get("X-Docker-Token") != "" {
tokens = res.Header["X-Docker-Token"]
}
var endpoints []string
if res.Header.Get("X-Docker-Endpoints") != "" {
endpoints, err = buildEndpointsList(res.Header["X-Docker-Endpoints"], r.indexEndpoint.VersionString(1))
@ -322,29 +335,29 @@ func (r *Session) GetRepositoryData(remote string) (*RepositoryData, error) {
return &RepositoryData{
ImgList: imgsData,
Endpoints: endpoints,
Tokens: tokens,
}, nil
}
func (r *Session) PushImageChecksumRegistry(imgData *ImgData, registry string, token []string) error {
func (r *Session) PushImageChecksumRegistry(imgData *ImgData, registry string) error {
logrus.Debugf("[registry] Calling PUT %s", registry+"images/"+imgData.ID+"/checksum")
u := registry + "images/" + imgData.ID + "/checksum"
req, err := r.reqFactory.NewRequest("PUT", registry+"images/"+imgData.ID+"/checksum", nil)
logrus.Debugf("[registry] Calling PUT %s", u)
req, err := http.NewRequest("PUT", u, nil)
if err != nil {
return err
}
setTokenAuth(req, token)
req.Header.Set("X-Docker-Checksum", imgData.Checksum)
req.Header.Set("X-Docker-Checksum-Payload", imgData.ChecksumPayload)
res, _, err := r.doRequest(req)
res, err := r.client.Do(req)
if err != nil {
return fmt.Errorf("Failed to upload metadata: %s", err)
return fmt.Errorf("Failed to upload metadata: %v", err)
}
defer res.Body.Close()
if len(res.Cookies()) > 0 {
r.jar.SetCookies(req.URL, res.Cookies())
r.client.Jar.SetCookies(req.URL, res.Cookies())
}
if res.StatusCode != 200 {
errBody, err := ioutil.ReadAll(res.Body)
@ -363,18 +376,19 @@ func (r *Session) PushImageChecksumRegistry(imgData *ImgData, registry string, t
}
// Push a local image to the registry
func (r *Session) PushImageJSONRegistry(imgData *ImgData, jsonRaw []byte, registry string, token []string) error {
func (r *Session) PushImageJSONRegistry(imgData *ImgData, jsonRaw []byte, registry string) error {
logrus.Debugf("[registry] Calling PUT %s", registry+"images/"+imgData.ID+"/json")
u := registry + "images/" + imgData.ID + "/json"
req, err := r.reqFactory.NewRequest("PUT", registry+"images/"+imgData.ID+"/json", bytes.NewReader(jsonRaw))
logrus.Debugf("[registry] Calling PUT %s", u)
req, err := http.NewRequest("PUT", u, bytes.NewReader(jsonRaw))
if err != nil {
return err
}
req.Header.Add("Content-type", "application/json")
setTokenAuth(req, token)
res, _, err := r.doRequest(req)
res, err := r.client.Do(req)
if err != nil {
return fmt.Errorf("Failed to upload metadata: %s", err)
}
@ -398,9 +412,11 @@ func (r *Session) PushImageJSONRegistry(imgData *ImgData, jsonRaw []byte, regist
return nil
}
func (r *Session) PushImageLayerRegistry(imgID string, layer io.Reader, registry string, token []string, jsonRaw []byte) (checksum string, checksumPayload string, err error) {
func (r *Session) PushImageLayerRegistry(imgID string, layer io.Reader, registry string, jsonRaw []byte) (checksum string, checksumPayload string, err error) {
logrus.Debugf("[registry] Calling PUT %s", registry+"images/"+imgID+"/layer")
u := registry + "images/" + imgID + "/layer"
logrus.Debugf("[registry] Calling PUT %s", u)
tarsumLayer, err := tarsum.NewTarSum(layer, false, tarsum.Version0)
if err != nil {
@ -411,17 +427,16 @@ func (r *Session) PushImageLayerRegistry(imgID string, layer io.Reader, registry
h.Write([]byte{'\n'})
checksumLayer := io.TeeReader(tarsumLayer, h)
req, err := r.reqFactory.NewRequest("PUT", registry+"images/"+imgID+"/layer", checksumLayer)
req, err := http.NewRequest("PUT", u, checksumLayer)
if err != nil {
return "", "", err
}
req.Header.Add("Content-Type", "application/octet-stream")
req.ContentLength = -1
req.TransferEncoding = []string{"chunked"}
setTokenAuth(req, token)
res, _, err := r.doRequest(req)
res, err := r.client.Do(req)
if err != nil {
return "", "", fmt.Errorf("Failed to upload layer: %s", err)
return "", "", fmt.Errorf("Failed to upload layer: %v", err)
}
if rc, ok := layer.(io.Closer); ok {
if err := rc.Close(); err != nil {
@ -444,19 +459,18 @@ func (r *Session) PushImageLayerRegistry(imgID string, layer io.Reader, registry
// push a tag on the registry.
// Remote has the format '<user>/<repo>
func (r *Session) PushRegistryTag(remote, revision, tag, registry string, token []string) error {
func (r *Session) PushRegistryTag(remote, revision, tag, registry string) error {
// "jsonify" the string
revision = "\"" + revision + "\""
path := fmt.Sprintf("repositories/%s/tags/%s", remote, tag)
req, err := r.reqFactory.NewRequest("PUT", registry+path, strings.NewReader(revision))
req, err := http.NewRequest("PUT", registry+path, strings.NewReader(revision))
if err != nil {
return err
}
req.Header.Add("Content-type", "application/json")
setTokenAuth(req, token)
req.ContentLength = int64(len(revision))
res, _, err := r.doRequest(req)
res, err := r.client.Do(req)
if err != nil {
return err
}
@ -491,7 +505,8 @@ func (r *Session) PushImageJSONIndex(remote string, imgList []*ImgData, validate
logrus.Debugf("[registry] PUT %s", u)
logrus.Debugf("Image list pushed to index:\n%s", imgListJSON)
headers := map[string][]string{
"Content-type": {"application/json"},
"Content-type": {"application/json"},
// this will set basic auth in r.client.Transport and send cached X-Docker-Token headers for all subsequent requests
"X-Docker-Token": {"true"},
}
if validate {
@ -526,9 +541,6 @@ func (r *Session) PushImageJSONIndex(remote string, imgList []*ImgData, validate
}
return nil, httputils.NewHTTPRequestError(fmt.Sprintf("Error: Status %d trying to push repository %s: %q", res.StatusCode, remote, errBody), res)
}
if res.Header.Get("X-Docker-Token") == "" {
return nil, fmt.Errorf("Index response didn't contain an access token")
}
tokens = res.Header["X-Docker-Token"]
logrus.Debugf("Auth token: %v", tokens)
@ -539,8 +551,7 @@ func (r *Session) PushImageJSONIndex(remote string, imgList []*ImgData, validate
if err != nil {
return nil, err
}
}
if validate {
} else {
if res.StatusCode != 204 {
errBody, err := ioutil.ReadAll(res.Body)
if err != nil {
@ -551,22 +562,20 @@ func (r *Session) PushImageJSONIndex(remote string, imgList []*ImgData, validate
}
return &RepositoryData{
Tokens: tokens,
Endpoints: endpoints,
}, nil
}
func (r *Session) putImageRequest(u string, headers map[string][]string, body []byte) (*http.Response, error) {
req, err := r.reqFactory.NewRequest("PUT", u, bytes.NewReader(body))
req, err := http.NewRequest("PUT", u, bytes.NewReader(body))
if err != nil {
return nil, err
}
req.SetBasicAuth(r.authConfig.Username, r.authConfig.Password)
req.ContentLength = int64(len(body))
for k, v := range headers {
req.Header[k] = v
}
response, _, err := r.doRequest(req)
response, err := r.client.Do(req)
if err != nil {
return nil, err
}
@ -580,15 +589,7 @@ func shouldRedirect(response *http.Response) bool {
func (r *Session) SearchRepositories(term string) (*SearchResults, error) {
logrus.Debugf("Index server: %s", r.indexEndpoint)
u := r.indexEndpoint.VersionString(1) + "search?q=" + url.QueryEscape(term)
req, err := r.reqFactory.NewRequest("GET", u, nil)
if err != nil {
return nil, err
}
if r.authConfig != nil && len(r.authConfig.Username) > 0 {
req.SetBasicAuth(r.authConfig.Username, r.authConfig.Password)
}
req.Header.Set("X-Docker-Token", "true")
res, _, err := r.doRequest(req)
res, err := r.client.Get(u)
if err != nil {
return nil, err
}
@ -600,6 +601,7 @@ func (r *Session) SearchRepositories(term string) (*SearchResults, error) {
return result, json.NewDecoder(res.Body).Decode(result)
}
// TODO(tiborvass): remove this once registry client v2 is vendored
func (r *Session) GetAuthConfig(withPasswd bool) *cliconfig.AuthConfig {
password := ""
if withPasswd {
@ -611,9 +613,3 @@ func (r *Session) GetAuthConfig(withPasswd bool) *cliconfig.AuthConfig {
Email: r.authConfig.Email,
}
}
func setTokenAuth(req *http.Request, token []string) {
if req.Header.Get("Authorization") == "" { // Don't override
req.Header.Set("Authorization", "Token "+strings.Join(token, ","))
}
}