Add feature flag to force all direct download URLs to be proxied

Fixes #1667
Joseph Schorr 2016-08-24 12:55:33 -04:00
parent 2b00c644b5
commit dd2e086a20
12 changed files with 350 additions and 34 deletions

app.py

@ -173,7 +173,8 @@ login_manager = LoginManager(app)
mail = Mail(app)
prometheus = PrometheusPlugin(app)
metric_queue = MetricQueue(prometheus)
storage = Storage(app, metric_queue)
instance_keys = InstanceKeys(app)
storage = Storage(app, metric_queue, instance_keys)
userfiles = Userfiles(app, storage)
log_archive = LogArchive(app, storage)
analytics = Analytics(app)


@ -11,7 +11,7 @@ from .auth_context import set_grant_context, get_grant_context
from .permissions import repository_read_grant, repository_write_grant
from util.names import parse_namespace_repository
from util.http import abort
from util.security.registry_jwt import (ANONYMOUS_SUB, decode_bearer_token,
from util.security.registry_jwt import (ANONYMOUS_SUB, decode_bearer_header,
InvalidBearerTokenException)
from data import model
@ -136,15 +136,15 @@ def get_auth_headers(repository=None, scopes=None):
return headers
def identity_from_bearer_token(bearer_token):
""" Process a bearer token and return the loaded identity, or raise InvalidJWTException if an
def identity_from_bearer_token(bearer_header):
""" Process a bearer header and return the loaded identity, or raise InvalidJWTException if an
identity could not be loaded. Expects tokens and grants in the format of the Docker registry
v2 auth spec: https://docs.docker.com/registry/spec/auth/token/
"""
logger.debug('Validating auth header: %s', bearer_token)
logger.debug('Validating auth header: %s', bearer_header)
try:
payload = decode_bearer_token(bearer_token, instance_keys)
payload = decode_bearer_header(bearer_header, instance_keys)
except InvalidBearerTokenException as bte:
logger.exception('Invalid bearer token: %s', bte)
raise InvalidJWTException(bte)


@ -30,6 +30,36 @@ location /realtime {
proxy_request_buffering off;
}
location ~ ^/_storage_proxy/([^/]+)/([^/]+)/([^/]+)/(.+) {
auth_request /_storage_proxy_auth;
resolver 8.8.8.8;
proxy_pass $2://$3/$4$is_args$args;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $3;
add_header Host $3;
proxy_buffering off;
proxy_request_buffering off;
proxy_read_timeout 60s;
}
location = /_storage_proxy_auth {
proxy_pass http://web_app_server;
proxy_pass_request_body off;
proxy_set_header Content-Length "";
proxy_set_header X-Original-URI $request_uri;
proxy_read_timeout 10;
}
# At the beginning and end of a push/pull, (/v1/repositories|/v2/auth/) is hit by the Docker
# client. By rate-limiting just this endpoint, we can avoid accidentally
# blocking pulls/pushes for images with many layers.
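
For reference, the /_storage_proxy location above splits the proxied path into four capture groups: the signed token ($1), the upstream scheme ($2), the upstream host ($3), and the remaining path ($4), which proxy_pass reassembles as $2://$3/$4 plus any query string. A minimal sketch of the same decomposition in Python; the example path and query string are made up:

import re

# Mirrors the nginx location regex above; the example request path is hypothetical.
STORAGE_PROXY_RE = re.compile(r'^/_storage_proxy/([^/]+)/([^/]+)/([^/]+)/(.+)$')

example = '/_storage_proxy/some.signed.jwt/https/bucket.s3.amazonaws.com/some/blob?X-Amz-Expires=600'
path, _, query = example.partition('?')

token, scheme, host, rest = STORAGE_PROXY_RE.match(path).groups()
upstream = '%s://%s/%s' % (scheme, host, rest)   # what proxy_pass $2://$3/$4 builds
if query:
    upstream += '?' + query                      # re-attached by $is_args$args

print(upstream)  # https://bucket.s3.amazonaws.com/some/blob?X-Amz-Expires=600
# 'token' is what the auth_request subrequest to /_storage_proxy_auth validates.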


@ -211,6 +211,9 @@ class DefaultConfig(object):
# Feature Flag: Whether to require invitations when adding a user to a team.
FEATURE_REQUIRE_TEAM_INVITE = True
# Feature Flag: Whether to proxy all direct download URLs in storage via the registry's nginx.
FEATURE_PROXY_STORAGE = False
# The namespace to use for library repositories.
# Note: This must remain 'library' until Docker removes their hard-coded namespace for libraries.
# See: https://github.com/docker/docker/blob/master/registry/session.go#L320
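
The flag defaults to off; a deployment that wants every direct download routed through its own nginx opts in by overriding the value in whatever config override mechanism it uses. A hypothetical Python-side override:

# Deployment-side override (hypothetical); DefaultConfig keeps the default of False.
FEATURE_PROXY_STORAGE = True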


@ -3,7 +3,9 @@ from storage.cloud import S3Storage, GoogleCloudStorage, RadosGWStorage
from storage.fakestorage import FakeStorage
from storage.distributedstorage import DistributedStorage
from storage.swift import SwiftStorage
from storage.downloadproxy import DownloadProxy
from urlparse import urlparse, parse_qs
STORAGE_DRIVER_CLASSES = {
'LocalStorage': LocalStorage,
@ -23,14 +25,14 @@ def get_storage_driver(metric_queue, storage_params):
class Storage(object):
def __init__(self, app=None, metric_queue=None):
def __init__(self, app=None, metric_queue=None, instance_keys=None):
self.app = app
if app is not None and metric_queue is not None:
self.state = self.init_app(app, metric_queue)
if app is not None:
self.state = self.init_app(app, metric_queue, instance_keys)
else:
self.state = None
def init_app(self, app, metric_queue):
def init_app(self, app, metric_queue, instance_keys):
storages = {}
for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items():
storages[location] = get_storage_driver(metric_queue, storage_params)
@ -40,7 +42,12 @@ class Storage(object):
preference = storages.keys()
default_locations = app.config.get('DISTRIBUTED_STORAGE_DEFAULT_LOCATIONS') or []
d_storage = DistributedStorage(storages, preference, default_locations)
download_proxy = None
if app.config.get('FEATURE_PROXY_STORAGE', False) and instance_keys is not None:
download_proxy = DownloadProxy(app, instance_keys)
d_storage = DistributedStorage(storages, preference, default_locations, download_proxy)
# register extension with app
app.extensions = getattr(app, 'extensions', {})
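
Putting the wiring together: when the flag is set and an InstanceKeys is supplied, init_app attaches a DownloadProxy to the DistributedStorage it builds. A condensed sketch of the call chain, using the names from the app.py diff above:

# Sketch only; 'app' and 'metric_queue' are the objects created earlier in app.py.
instance_keys = InstanceKeys(app)
storage = Storage(app, metric_queue, instance_keys)
# Inside init_app: if FEATURE_PROXY_STORAGE is True and instance_keys is not None,
# a DownloadProxy is constructed and handed to DistributedStorage, which uses it to
# wrap every direct download URL; otherwise behavior is unchanged.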


@ -5,10 +5,8 @@ from functools import wraps
from storage.basestorage import StoragePaths, BaseStorage, BaseStorageV2
logger = logging.getLogger(__name__)
def _location_aware(unbound_func):
@wraps(unbound_func)
def wrapper(self, locations, *args, **kwargs):
@ -27,17 +25,19 @@ def _location_aware(unbound_func):
class DistributedStorage(StoragePaths):
def __init__(self, storages, preferred_locations=None, default_locations=None):
def __init__(self, storages, preferred_locations=None, default_locations=None, proxy=None):
self._storages = dict(storages)
self.preferred_locations = list(preferred_locations or [])
self.default_locations = list(default_locations or [])
self.proxy = proxy
@property
def locations(self):
""" Returns the names of the locations supported. """
return list(self._storages.keys())
get_direct_download_url = _location_aware(BaseStorage.get_direct_download_url)
_get_direct_download_url = _location_aware(BaseStorage.get_direct_download_url)
get_direct_upload_url = _location_aware(BaseStorage.get_direct_upload_url)
get_content = _location_aware(BaseStorage.get_content)
put_content = _location_aware(BaseStorage.put_content)
@ -55,6 +55,19 @@ class DistributedStorage(StoragePaths):
complete_chunked_upload = _location_aware(BaseStorageV2.complete_chunked_upload)
cancel_chunked_upload = _location_aware(BaseStorageV2.cancel_chunked_upload)
def get_direct_download_url(self, locations, path, expires_in=600, requires_cors=False,
head=False):
download_url = self._get_direct_download_url(locations, path, expires_in, requires_cors, head)
if download_url is None:
return None
if self.proxy is None:
return download_url
return self.proxy.proxy_download_url(download_url)
def copy_between(self, path, source_location, destination_location):
""" Copies a file between the source location and the destination location. """
source_storage = self._storages[source_location]
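
With the proxy attached, callers keep calling get_direct_download_url exactly as before; the wrapping above is transparent. A short sketch of the expected behavior (the location name and blob path are made up):

# Illustrative call; not taken from the codebase.
url = storage.get_direct_download_url(['local_us'], 'sha256/ab/abcdef0123')
# - backend returned None          -> None is passed through
# - self.proxy is None (flag off)  -> the backend's signed URL, unchanged
# - self.proxy is set (flag on)    -> proxy.proxy_download_url(...) result, i.e. a
#                                     /_storage_proxy/... URL on this registry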

storage/downloadproxy.py Normal file

@ -0,0 +1,167 @@
import urllib
from urlparse import urlparse, parse_qs
from util.security.registry_jwt import (generate_bearer_token, decode_bearer_token,
InvalidBearerTokenException)
from flask import abort, request
from jsonschema import validate, ValidationError
import logging
logger = logging.getLogger(__name__)
PROXY_STORAGE_MAX_LIFETIME_S = 30 # Seconds
STORAGE_PROXY_SUBJECT = 'storageproxy'
STORAGE_PROXY_ACCESS_TYPE = 'storageproxy'
ACCESS_SCHEMA = {
'type': 'array',
'description': 'List of access granted to the subject',
'items': {
'type': 'object',
'required': [
'type',
'scheme',
'host',
'uri',
],
'properties': {
'type': {
'type': 'string',
'description': 'We only allow storage proxy permissions',
'enum': [
'storageproxy',
],
},
'scheme': {
'type': 'string',
'description': 'The scheme for the storage URL being proxied'
},
'host': {
'type': 'string',
'description': 'The hostname for the storage URL being proxied'
},
'uri': {
'type': 'string',
'description': 'The URI path for the storage URL being proxied'
},
},
},
}
class DownloadProxy(object):
""" Helper class to enable proxying of direct download URLs for storage via the registry's
local NGINX.
"""
def __init__(self, app, instance_keys):
self.app = app
self.instance_keys = instance_keys
app.add_url_rule('/_storage_proxy_auth', '_storage_proxy_auth', self._validate_proxy_url)
def proxy_download_url(self, download_url):
""" Returns a URL to proxy the specified blob download URL.
"""
# Parse the URL to be downloaded into its components (host, path, scheme).
parsed = urlparse(download_url)
path = parsed.path
if parsed.query:
path = path + '?' + parsed.query
if path.startswith('/'):
path = path[1:]
access = {
'type': STORAGE_PROXY_ACCESS_TYPE,
'uri': path,
'host': parsed.netloc,
'scheme': parsed.scheme,
}
# Generate a JWT that signs access to this URL. This JWT will be passed back to the registry
# code when the download commences. Note that we don't add any context here, as it isn't
# needed.
server_hostname = self.app.config['SERVER_HOSTNAME']
token = generate_bearer_token(server_hostname, STORAGE_PROXY_SUBJECT, {}, [access],
PROXY_STORAGE_MAX_LIFETIME_S, self.instance_keys)
url_scheme = self.app.config['PREFERRED_URL_SCHEME']
server_hostname = self.app.config['SERVER_HOSTNAME']
# The proxy path is of the form:
# http(s)://registry_server/_storage_proxy/{token}/{scheme}/{hostname}/rest/of/path/here
encoded_token = urllib.quote(token)
proxy_url = '%s://%s/_storage_proxy/%s/%s/%s/%s' % (url_scheme, server_hostname, encoded_token,
parsed.scheme, parsed.netloc, path)
logger.debug('Proxying via URL %s', proxy_url)
return proxy_url
def _validate_proxy_url(self):
original_uri = request.headers.get('X-Original-URI', None)
if not original_uri:
logger.error('Missing original URI: %s', request.headers)
abort(401)
if not original_uri.startswith('/_storage_proxy/'):
logger.error('Unknown storage proxy path: %s', original_uri)
abort(401)
# The proxy path is of the form:
# /_storage_proxy/{token}/{scheme}/{hostname}/rest/of/path/here
without_prefix = original_uri[len('/_storage_proxy/'):]
parts = without_prefix.split('/', 3)
if len(parts) != 4:
logger.error('Invalid storage proxy path (found %s parts): %s', len(parts), without_prefix)
abort(401)
encoded_token, scheme, host, uri = parts
token = urllib.unquote(encoded_token)
logger.debug('Got token %s for storage proxy auth request %s with parts %s', token,
original_uri, parts)
# Decode the bearer token.
try:
decoded = decode_bearer_token(token, self.instance_keys)
except InvalidBearerTokenException:
logger.exception('Invalid token for storage proxy')
abort(401)
# Ensure it is for the proxy.
if decoded['sub'] != STORAGE_PROXY_SUBJECT:
logger.exception('Invalid subject %s for storage proxy auth', decoded['sub'])
abort(401)
# Validate that the access matches the token format.
access = decoded.get('access', {})
try:
validate(access, ACCESS_SCHEMA)
except ValidationError:
logger.exception('We should not be minting invalid credentials: %s', access)
abort(401)
# For now, we only expect a single access credential.
if len(access) != 1:
logger.exception('We should not be minting invalid credentials: %s', access)
abort(401)
# Ensure the signed access matches the requested URL's pieces.
granted_access = access[0]
if granted_access['scheme'] != scheme:
logger.exception('Mismatch in scheme. %s expected, %s found', granted_access['scheme'],
scheme)
abort(401)
if granted_access['host'] != host:
logger.exception('Mismatch in host. %s expected, %s found', granted_access['host'], host)
abort(401)
if granted_access['uri'] != uri:
logger.exception('Mismatch in uri. %s expected, %s found', granted_access['uri'], uri)
abort(401)
return 'OK'
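
To make the rewriting concrete, here is roughly what proxy_download_url does to a backend URL; the presigned S3-style URL, bucket hostname, and query parameters below are made up, and the token is elided:

proxy = DownloadProxy(app, instance_keys)
proxied = proxy.proxy_download_url(
    'https://bucket.s3.amazonaws.com/sha256/ab/abcdef?X-Amz-Expires=600')
# -> '<PREFERRED_URL_SCHEME>://<SERVER_HOSTNAME>/_storage_proxy/<quoted-token>/https/'
#    'bucket.s3.amazonaws.com/sha256/ab/abcdef?X-Amz-Expires=600'
# nginx later issues the auth_request subrequest handled by _validate_proxy_url, which
# checks that the token's signed (scheme, host, uri) matches the request before the
# blob is streamed.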


@ -18,7 +18,7 @@ class FakeStorage(BaseStorageV2):
def get_direct_download_url(self, path, expires_in=60, requires_cors=False, head=False):
try:
if self.get_content('supports_direct_download') == 'true':
return 'http://somefakeurl'
return 'http://somefakeurl?goes=here'
except:
pass


@ -28,7 +28,7 @@ from initdb import wipe_database, initialize_database, populate_database
from endpoints.csrf import generate_csrf_token
from tempfile import NamedTemporaryFile
from jsonschema import validate as validate_schema
from util.security.registry_jwt import decode_bearer_token
from util.security.registry_jwt import decode_bearer_header
import endpoints.decorated
import json
@ -1543,7 +1543,7 @@ class TorrentTestMixin(V2RegistryPullMixin):
contents = bencode.bdecode(torrent)
# Ensure that there is a webseed.
self.assertEquals(contents['url-list'], 'http://somefakeurl')
self.assertEquals(contents['url-list'], 'http://somefakeurl?goes=here')
# Ensure there is an announce and some pieces.
self.assertIsNotNone(contents.get('info', {}).get('pieces'))
@ -1887,7 +1887,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC
contents = bencode.bdecode(response.content)
# Ensure that there is a webseed.
self.assertEquals(contents['url-list'], 'http://somefakeurl')
self.assertEquals(contents['url-list'], 'http://somefakeurl?goes=here')
# Ensure there is an announce and some pieces.
self.assertIsNotNone(contents.get('info', {}).get('pieces'))
@ -1956,9 +1956,9 @@ class V2LoginTests(V2RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, Base
# Validate the returned token.
encoded = response.json()['token']
token = 'Bearer ' + encoded
header = 'Bearer ' + encoded
payload = decode_bearer_token(token, instance_keys)
payload = decode_bearer_header(header, instance_keys)
self.assertIsNotNone(payload)
if scope is None:


@ -10,7 +10,7 @@ from endpoints.v2.v2auth import TOKEN_VALIDITY_LIFETIME_S
from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException
from util.morecollections import AttrDict
from util.security.registry_jwt import (ANONYMOUS_SUB, build_context_and_subject,
decode_bearer_token, generate_bearer_token)
generate_bearer_token)
TEST_AUDIENCE = app.config['SERVER_HOSTNAME']

test/test_storageproxy.py Normal file

@ -0,0 +1,89 @@
import unittest
import requests
import os
from flask import Flask
from flask.ext.testing import LiveServerTestCase
from initdb import setup_database_for_testing, finished_database_for_testing
from util.security.instancekeys import InstanceKeys
from storage import Storage
_PORT_NUMBER = 5001
class TestStorageProxy(LiveServerTestCase):
def setUp(self):
setup_database_for_testing(self)
def tearDown(self):
finished_database_for_testing(self)
def create_app(self):
global _PORT_NUMBER
_PORT_NUMBER = _PORT_NUMBER + 1
self.test_app = Flask('teststorageproxy')
self.test_app.config['LIVESERVER_PORT'] = _PORT_NUMBER
if os.environ.get('DEBUG') == 'true':
self.test_app.config['DEBUG'] = True
self.test_app.config['TESTING'] = True
self.test_app.config['SERVER_HOSTNAME'] = 'localhost:%s' % _PORT_NUMBER
self.test_app.config['INSTANCE_SERVICE_KEY_KID_LOCATION'] = 'test/data/test.kid'
self.test_app.config['INSTANCE_SERVICE_KEY_LOCATION'] = 'test/data/test.pem'
self.test_app.config['INSTANCE_SERVICE_KEY_SERVICE'] = 'quay'
# UGH... Such a stupid hack!
self.test_app.config['FEATURE_PROXY_STORAGE'] = self.id().find('notinstalled') < 0
self.test_app.config['DISTRIBUTED_STORAGE_CONFIG'] = {
'test': ['FakeStorage', {}],
}
instance_keys = InstanceKeys(self.test_app)
self.storage = Storage(self.test_app, instance_keys=instance_keys)
self.test_app.config['DISTRIBUTED_STORAGE_PREFERENCE'] = ['test']
return self.test_app
def test_storage_proxy_auth_notinstalled(self):
# Activate direct download on the fake storage.
self.storage.put_content(['test'], 'supports_direct_download', 'true')
# Get the unwrapped URL.
direct_download_url = self.storage.get_direct_download_url(['test'], 'somepath')
self.assertEquals(-1, direct_download_url.find('/_storage_proxy/'))
# Ensure that auth returns 404.
headers = {
'X-Original-URI': 'someurihere'
}
resp = requests.get('http://%s/_storage_proxy_auth' % self.test_app.config['SERVER_HOSTNAME'],
headers=headers)
self.assertEquals(404, resp.status_code)
def test_storage_proxy_auth(self):
# Activate direct download on the fake storage.
self.storage.put_content(['test'], 'supports_direct_download', 'true')
# Get the wrapped URL.
direct_download_url = self.storage.get_direct_download_url(['test'], 'somepath')
# Ensure it refers to the storage proxy.
proxy_index = direct_download_url.find('/_storage_proxy/')
self.assertTrue(proxy_index > 0)
# Ensure that auth returns 200 for the URL pieces.
headers = {
'X-Original-URI': direct_download_url[proxy_index:]
}
resp = requests.get('http://%s/_storage_proxy_auth' % self.test_app.config['SERVER_HOSTNAME'],
headers=headers)
self.assertEquals(200, resp.status_code)
if __name__ == '__main__':
unittest.main()


@ -18,6 +18,21 @@ class InvalidBearerTokenException(Exception):
pass
def decode_bearer_header(bearer_header, instance_keys):
""" decode_bearer_header decodes the given bearer header that contains an encoded JWT with both
a Key ID as well as the signed JWT and returns the decoded and validated JWT. On any error,
raises an InvalidBearerTokenException with the reason for failure.
"""
# Extract the jwt token from the header
match = jwtutil.TOKEN_REGEX.match(bearer_header)
if match is None:
raise InvalidBearerTokenException('Invalid bearer token format')
encoded_jwt = match.group(1)
logger.debug('encoded JWT: %s', encoded_jwt)
return decode_bearer_token(encoded_jwt, instance_keys)
def decode_bearer_token(bearer_token, instance_keys):
""" decode_bearer_token decodes the given bearer token that contains both a Key ID as well as the
encoded JWT and returns the decoded and validated JWT. On any error, raises an
@ -25,19 +40,11 @@ def decode_bearer_token(bearer_token, instance_keys):
"""
app_config = instance_keys.app.config
# Extract the jwt token from the header
match = jwtutil.TOKEN_REGEX.match(bearer_token)
if match is None:
raise InvalidBearerTokenException('Invalid bearer token format')
encoded_jwt = match.group(1)
logger.debug('encoded JWT: %s', encoded_jwt)
# Decode the key ID.
headers = jwt.get_unverified_header(encoded_jwt)
headers = jwt.get_unverified_header(bearer_token)
kid = headers.get('kid', None)
if kid is None:
logger.error('Missing kid header on encoded JWT: %s', encoded_jwt)
logger.error('Missing kid header on encoded JWT: %s', bearer_token)
raise InvalidBearerTokenException('Missing kid header')
# Find the matching public key.
@ -51,9 +58,8 @@ def decode_bearer_token(bearer_token, instance_keys):
expected_issuer = instance_keys.service_name
audience = app_config['SERVER_HOSTNAME']
max_signed_s = app_config.get('REGISTRY_JWT_AUTH_MAX_FRESH_S', 3660)
max_exp = jwtutil.exp_max_s_option(max_signed_s)
payload = jwtutil.decode(encoded_jwt, public_key, algorithms=[ALGORITHM], audience=audience,
payload = jwtutil.decode(bearer_token, public_key, algorithms=[ALGORITHM], audience=audience,
issuer=expected_issuer, options=max_exp, leeway=JWT_CLOCK_SKEW_SECONDS)
except jwtutil.InvalidTokenError as ite:
logger.exception('Invalid token reason: %s', ite)
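
The net effect of the split: decode_bearer_header accepts the full Authorization header value and strips the 'Bearer ' prefix via TOKEN_REGEX before delegating, while decode_bearer_token now expects the bare JWT. A short usage sketch (encoded_jwt stands in for a real compact JWS produced by generate_bearer_token):

payload = decode_bearer_header('Bearer ' + encoded_jwt, instance_keys)  # header form
payload = decode_bearer_token(encoded_jwt, instance_keys)               # bare-token form
# Both return the validated claims dict or raise InvalidBearerTokenException; callers
# holding the raw Authorization header use the first form (as registry_jwt_auth.py now
# does), and callers that already have the bare JWT (like the storage proxy) use the second.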