From dd2e086a20831d22d821010aa2373f815f4261a4 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 24 Aug 2016 12:55:33 -0400 Subject: [PATCH] Add feature flag to force all direct download URLs to be proxied Fixes #1667 --- app.py | 3 +- auth/registry_jwt_auth.py | 10 +- conf/nginx/server-base.conf | 30 ++++++ config.py | 3 + storage/__init__.py | 17 +++- storage/distributedstorage.py | 21 ++++- storage/downloadproxy.py | 167 ++++++++++++++++++++++++++++++++++ storage/fakestorage.py | 2 +- test/registry_tests.py | 10 +- test/test_registry_v2_auth.py | 2 +- test/test_storageproxy.py | 89 ++++++++++++++++++ util/security/registry_jwt.py | 30 +++--- 12 files changed, 350 insertions(+), 34 deletions(-) create mode 100644 storage/downloadproxy.py create mode 100644 test/test_storageproxy.py diff --git a/app.py b/app.py index 579dd0229..0d28ad708 100644 --- a/app.py +++ b/app.py @@ -173,7 +173,8 @@ login_manager = LoginManager(app) mail = Mail(app) prometheus = PrometheusPlugin(app) metric_queue = MetricQueue(prometheus) -storage = Storage(app, metric_queue) +instance_keys = InstanceKeys(app) +storage = Storage(app, metric_queue, instance_keys) userfiles = Userfiles(app, storage) log_archive = LogArchive(app, storage) analytics = Analytics(app) diff --git a/auth/registry_jwt_auth.py b/auth/registry_jwt_auth.py index a4c48d749..014757751 100644 --- a/auth/registry_jwt_auth.py +++ b/auth/registry_jwt_auth.py @@ -11,7 +11,7 @@ from .auth_context import set_grant_context, get_grant_context from .permissions import repository_read_grant, repository_write_grant from util.names import parse_namespace_repository from util.http import abort -from util.security.registry_jwt import (ANONYMOUS_SUB, decode_bearer_token, +from util.security.registry_jwt import (ANONYMOUS_SUB, decode_bearer_header, InvalidBearerTokenException) from data import model @@ -136,15 +136,15 @@ def get_auth_headers(repository=None, scopes=None): return headers -def identity_from_bearer_token(bearer_token): - """ Process a bearer token and return the loaded identity, or raise InvalidJWTException if an +def identity_from_bearer_token(bearer_header): + """ Process a bearer header and return the loaded identity, or raise InvalidJWTException if an identity could not be loaded. 
Expects tokens and grants in the format of the Docker registry v2 auth spec: https://docs.docker.com/registry/spec/auth/token/ """ - logger.debug('Validating auth header: %s', bearer_token) + logger.debug('Validating auth header: %s', bearer_header) try: - payload = decode_bearer_token(bearer_token, instance_keys) + payload = decode_bearer_header(bearer_header, instance_keys) except InvalidBearerTokenException as bte: logger.exception('Invalid bearer token: %s', bte) raise InvalidJWTException(bte) diff --git a/conf/nginx/server-base.conf b/conf/nginx/server-base.conf index 985e04a7b..983874e40 100644 --- a/conf/nginx/server-base.conf +++ b/conf/nginx/server-base.conf @@ -30,6 +30,36 @@ location /realtime { proxy_request_buffering off; } +location ~ ^/_storage_proxy/([^/]+)/([^/]+)/([^/]+)/(.+) { + auth_request /_storage_proxy_auth; + + resolver 8.8.8.8; + + proxy_pass $2://$3/$4$is_args$args; + + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header Host $3; + + add_header Host $3; + + proxy_buffering off; + proxy_request_buffering off; + + proxy_read_timeout 60s; +} + + +location = /_storage_proxy_auth { + proxy_pass http://web_app_server; + proxy_pass_request_body off; + proxy_set_header Content-Length ""; + + proxy_set_header X-Original-URI $request_uri; + + proxy_read_timeout 10; +} + # At the begining and end of a push/pull, (/v1/repositories|/v2/auth/) is hit by the Docker # client. By rate-limiting just this endpoint, we can avoid accidentally # blocking pulls/pushes for images with many layers. diff --git a/config.py b/config.py index 957c03c93..f07a068cd 100644 --- a/config.py +++ b/config.py @@ -211,6 +211,9 @@ class DefaultConfig(object): # Feature Flag: Whether to require invitations when adding a user to a team. FEATURE_REQUIRE_TEAM_INVITE = True + # Feature Flag: Whether to proxy all direct download URLs in storage via the registry's nginx. + FEATURE_PROXY_STORAGE = False + # The namespace to use for library repositories. # Note: This must remain 'library' until Docker removes their hard-coded namespace for libraries. 
# See: https://github.com/docker/docker/blob/master/registry/session.go#L320 diff --git a/storage/__init__.py b/storage/__init__.py index f78a8f5e7..fd29b2ac6 100644 --- a/storage/__init__.py +++ b/storage/__init__.py @@ -3,7 +3,9 @@ from storage.cloud import S3Storage, GoogleCloudStorage, RadosGWStorage from storage.fakestorage import FakeStorage from storage.distributedstorage import DistributedStorage from storage.swift import SwiftStorage +from storage.downloadproxy import DownloadProxy +from urlparse import urlparse, parse_qs STORAGE_DRIVER_CLASSES = { 'LocalStorage': LocalStorage, @@ -23,14 +25,14 @@ def get_storage_driver(metric_queue, storage_params): class Storage(object): - def __init__(self, app=None, metric_queue=None): + def __init__(self, app=None, metric_queue=None, instance_keys=None): self.app = app - if app is not None and metric_queue is not None: - self.state = self.init_app(app, metric_queue) + if app is not None: + self.state = self.init_app(app, metric_queue, instance_keys) else: self.state = None - def init_app(self, app, metric_queue): + def init_app(self, app, metric_queue, instance_keys): storages = {} for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items(): storages[location] = get_storage_driver(metric_queue, storage_params) @@ -40,7 +42,12 @@ class Storage(object): preference = storages.keys() default_locations = app.config.get('DISTRIBUTED_STORAGE_DEFAULT_LOCATIONS') or [] - d_storage = DistributedStorage(storages, preference, default_locations) + + download_proxy = None + if app.config.get('FEATURE_PROXY_STORAGE', False) and instance_keys is not None: + download_proxy = DownloadProxy(app, instance_keys) + + d_storage = DistributedStorage(storages, preference, default_locations, download_proxy) # register extension with app app.extensions = getattr(app, 'extensions', {}) diff --git a/storage/distributedstorage.py b/storage/distributedstorage.py index 47be02a86..070cac05b 100644 --- a/storage/distributedstorage.py +++ b/storage/distributedstorage.py @@ -5,10 +5,8 @@ from functools import wraps from storage.basestorage import StoragePaths, BaseStorage, BaseStorageV2 - logger = logging.getLogger(__name__) - def _location_aware(unbound_func): @wraps(unbound_func) def wrapper(self, locations, *args, **kwargs): @@ -27,17 +25,19 @@ def _location_aware(unbound_func): class DistributedStorage(StoragePaths): - def __init__(self, storages, preferred_locations=None, default_locations=None): + def __init__(self, storages, preferred_locations=None, default_locations=None, proxy=None): self._storages = dict(storages) self.preferred_locations = list(preferred_locations or []) self.default_locations = list(default_locations or []) + self.proxy = proxy @property def locations(self): """ Returns the names of the locations supported. 
""" return list(self._storages.keys()) - get_direct_download_url = _location_aware(BaseStorage.get_direct_download_url) + _get_direct_download_url = _location_aware(BaseStorage.get_direct_download_url) + get_direct_upload_url = _location_aware(BaseStorage.get_direct_upload_url) get_content = _location_aware(BaseStorage.get_content) put_content = _location_aware(BaseStorage.put_content) @@ -55,6 +55,19 @@ class DistributedStorage(StoragePaths): complete_chunked_upload = _location_aware(BaseStorageV2.complete_chunked_upload) cancel_chunked_upload = _location_aware(BaseStorageV2.cancel_chunked_upload) + + def get_direct_download_url(self, locations, path, expires_in=600, requires_cors=False, + head=False): + download_url = self._get_direct_download_url(locations, path, expires_in, requires_cors, head) + if download_url is None: + return None + + if self.proxy is None: + return download_url + + return self.proxy.proxy_download_url(download_url) + + def copy_between(self, path, source_location, destination_location): """ Copies a file between the source location and the destination location. """ source_storage = self._storages[source_location] diff --git a/storage/downloadproxy.py b/storage/downloadproxy.py new file mode 100644 index 000000000..41b0727aa --- /dev/null +++ b/storage/downloadproxy.py @@ -0,0 +1,167 @@ +import urllib +from urlparse import urlparse, parse_qs +from util.security.registry_jwt import (generate_bearer_token, decode_bearer_token, + InvalidBearerTokenException) + +from flask import abort, request +from jsonschema import validate, ValidationError + +import logging + +logger = logging.getLogger(__name__) + + +PROXY_STORAGE_MAX_LIFETIME_S = 30 # Seconds +STORAGE_PROXY_SUBJECT = 'storageproxy' +STORAGE_PROXY_ACCESS_TYPE = 'storageproxy' + +ACCESS_SCHEMA = { + 'type': 'array', + 'description': 'List of access granted to the subject', + 'items': { + 'type': 'object', + 'required': [ + 'type', + 'scheme', + 'host', + 'uri', + ], + 'properties': { + 'type': { + 'type': 'string', + 'description': 'We only allow storage proxy permissions', + 'enum': [ + 'storageproxy', + ], + }, + 'scheme': { + 'type': 'string', + 'description': 'The scheme for the storage URL being proxied' + }, + 'host': { + 'type': 'string', + 'description': 'The hostname for the storage URL being proxied' + }, + 'uri': { + 'type': 'string', + 'description': 'The URI path for the storage URL being proxied' + }, + }, + }, +} + + +class DownloadProxy(object): + """ Helper class to enable proxying of direct download URLs for storage via the registry's + local NGINX. + """ + def __init__(self, app, instance_keys): + self.app = app + self.instance_keys = instance_keys + + app.add_url_rule('/_storage_proxy_auth', '_storage_proxy_auth', self._validate_proxy_url) + + def proxy_download_url(self, download_url): + """ Returns a URL to proxy the specified blob download URL. + """ + # Parse the URL to be downloaded into its components (host, path, scheme). + parsed = urlparse(download_url) + + path = parsed.path + if parsed.query: + path = path + '?' + parsed.query + + if path.startswith('/'): + path = path[1:] + + access = { + 'type': STORAGE_PROXY_ACCESS_TYPE, + 'uri': path, + 'host': parsed.netloc, + 'scheme': parsed.scheme, + } + + # Generate a JWT that signs access to this URL. This JWT will be passed back to the registry + # code when the download commences. Note that we don't add any context here, as it isn't + # needed. 
+ server_hostname = self.app.config['SERVER_HOSTNAME'] + token = generate_bearer_token(server_hostname, STORAGE_PROXY_SUBJECT, {}, [access], + PROXY_STORAGE_MAX_LIFETIME_S, self.instance_keys) + + url_scheme = self.app.config['PREFERRED_URL_SCHEME'] + server_hostname = self.app.config['SERVER_HOSTNAME'] + + # The proxy path is of the form: + # http(s)://registry_server/_storage_proxy/{token}/{scheme}/{hostname}/rest/of/path/here + encoded_token = urllib.quote(token) + proxy_url = '%s://%s/_storage_proxy/%s/%s/%s/%s' % (url_scheme, server_hostname, encoded_token, + parsed.scheme, parsed.netloc, path) + logger.debug('Proxying via URL %s', proxy_url) + return proxy_url + + + def _validate_proxy_url(self): + original_uri = request.headers.get('X-Original-URI', None) + if not original_uri: + logger.error('Missing original URI: %s', request.headers) + abort(401) + + if not original_uri.startswith('/_storage_proxy/'): + logger.error('Unknown storage proxy path: %s', original_uri) + abort(401) + + # The proxy path is of the form: + # /_storage_proxy/{token}/{scheme}/{hostname}/rest/of/path/here + without_prefix = original_uri[len('/_storage_proxy/'):] + parts = without_prefix.split('/', 3) + if len(parts) != 4: + logger.error('Invalid storage proxy path (found %s parts): %s', len(parts), without_prefix) + abort(401) + + encoded_token, scheme, host, uri = parts + token = urllib.unquote(encoded_token) + + logger.debug('Got token %s for storage proxy auth request %s with parts %s', token, + original_uri, parts) + + # Decode the bearer token. + try: + decoded = decode_bearer_token(token, self.instance_keys) + except InvalidBearerTokenException: + logger.exception('Invalid token for storage proxy') + abort(401) + + # Ensure it is for the proxy. + if decoded['sub'] != STORAGE_PROXY_SUBJECT: + logger.exception('Invalid subject %s for storage proxy auth', decoded['subject']) + abort(401) + + # Validate that the access matches the token format. + access = decoded.get('access', {}) + try: + validate(access, ACCESS_SCHEMA) + except ValidationError: + logger.exception('We should not be minting invalid credentials: %s', access) + abort(401) + + # For now, we only expect a single access credential. + if len(access) != 1: + logger.exception('We should not be minting invalid credentials: %s', access) + abort(401) + + # Ensure the signed access matches the requested URL's pieces. + granted_access = access[0] + if granted_access['scheme'] != scheme: + logger.exception('Mismatch in scheme. %s expected, %s found', granted_access['scheme'], + scheme) + abort(401) + + if granted_access['host'] != host: + logger.exception('Mismatch in host. %s expected, %s found', granted_access['host'], host) + abort(401) + + if granted_access['uri'] != uri: + logger.exception('Mismatch in uri. 
%s expected, %s found', granted_access['uri'], uri) + abort(401) + + return 'OK' diff --git a/storage/fakestorage.py b/storage/fakestorage.py index 0fe0b4213..894b6d261 100644 --- a/storage/fakestorage.py +++ b/storage/fakestorage.py @@ -18,7 +18,7 @@ class FakeStorage(BaseStorageV2): def get_direct_download_url(self, path, expires_in=60, requires_cors=False, head=False): try: if self.get_content('supports_direct_download') == 'true': - return 'http://somefakeurl' + return 'http://somefakeurl?goes=here' except: pass diff --git a/test/registry_tests.py b/test/registry_tests.py index a5c42fb56..4bd91109a 100644 --- a/test/registry_tests.py +++ b/test/registry_tests.py @@ -28,7 +28,7 @@ from initdb import wipe_database, initialize_database, populate_database from endpoints.csrf import generate_csrf_token from tempfile import NamedTemporaryFile from jsonschema import validate as validate_schema -from util.security.registry_jwt import decode_bearer_token +from util.security.registry_jwt import decode_bearer_header import endpoints.decorated import json @@ -1543,7 +1543,7 @@ class TorrentTestMixin(V2RegistryPullMixin): contents = bencode.bdecode(torrent) # Ensure that there is a webseed. - self.assertEquals(contents['url-list'], 'http://somefakeurl') + self.assertEquals(contents['url-list'], 'http://somefakeurl?goes=here') # Ensure there is an announce and some pieces. self.assertIsNotNone(contents.get('info', {}).get('pieces')) @@ -1887,7 +1887,7 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC contents = bencode.bdecode(response.content) # Ensure that there is a webseed. - self.assertEquals(contents['url-list'], 'http://somefakeurl') + self.assertEquals(contents['url-list'], 'http://somefakeurl?goes=here') # Ensure there is an announce and some pieces. self.assertIsNotNone(contents.get('info', {}).get('pieces')) @@ -1956,9 +1956,9 @@ class V2LoginTests(V2RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, Base # Validate the returned token. 
encoded = response.json()['token'] - token = 'Bearer ' + encoded + header = 'Bearer ' + encoded - payload = decode_bearer_token(token, instance_keys) + payload = decode_bearer_header(header, instance_keys) self.assertIsNotNone(payload) if scope is None: diff --git a/test/test_registry_v2_auth.py b/test/test_registry_v2_auth.py index 039dcd7da..be3d40c03 100644 --- a/test/test_registry_v2_auth.py +++ b/test/test_registry_v2_auth.py @@ -10,7 +10,7 @@ from endpoints.v2.v2auth import TOKEN_VALIDITY_LIFETIME_S from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException from util.morecollections import AttrDict from util.security.registry_jwt import (ANONYMOUS_SUB, build_context_and_subject, - decode_bearer_token, generate_bearer_token) + generate_bearer_token) TEST_AUDIENCE = app.config['SERVER_HOSTNAME'] diff --git a/test/test_storageproxy.py b/test/test_storageproxy.py new file mode 100644 index 000000000..b314c1793 --- /dev/null +++ b/test/test_storageproxy.py @@ -0,0 +1,89 @@ +import unittest +import requests +import os + +from flask import Flask +from flask.ext.testing import LiveServerTestCase +from initdb import setup_database_for_testing, finished_database_for_testing +from util.security.instancekeys import InstanceKeys +from storage import Storage + +_PORT_NUMBER = 5001 + +class TestStorageProxy(LiveServerTestCase): + def setUp(self): + setup_database_for_testing(self) + + def tearDown(self): + finished_database_for_testing(self) + + def create_app(self): + global _PORT_NUMBER + _PORT_NUMBER = _PORT_NUMBER + 1 + + self.test_app = Flask('teststorageproxy') + self.test_app.config['LIVESERVER_PORT'] = _PORT_NUMBER + + if os.environ.get('DEBUG') == 'true': + self.test_app.config['DEBUG'] = True + + self.test_app.config['TESTING'] = True + self.test_app.config['SERVER_HOSTNAME'] = 'localhost:%s' % _PORT_NUMBER + + self.test_app.config['INSTANCE_SERVICE_KEY_KID_LOCATION'] = 'test/data/test.kid' + self.test_app.config['INSTANCE_SERVICE_KEY_LOCATION'] = 'test/data/test.pem' + self.test_app.config['INSTANCE_SERVICE_KEY_SERVICE'] = 'quay' + + # UGH... Such a stupid hack! + self.test_app.config['FEATURE_PROXY_STORAGE'] = self.id().find('notinstalled') < 0 + + self.test_app.config['DISTRIBUTED_STORAGE_CONFIG'] = { + 'test': ['FakeStorage', {}], + } + + instance_keys = InstanceKeys(self.test_app) + self.storage = Storage(self.test_app, instance_keys=instance_keys) + self.test_app.config['DISTRIBUTED_STORAGE_PREFERENCE'] = ['test'] + return self.test_app + + def test_storage_proxy_auth_notinstalled(self): + # Active direct download on the fake storage. + self.storage.put_content(['test'], 'supports_direct_download', 'true') + + # Get the unwrapped URL. + direct_download_url = self.storage.get_direct_download_url(['test'], 'somepath') + self.assertEquals(-1, direct_download_url.find('/_storage_proxy/')) + + # Ensure that auth returns 404. + headers = { + 'X-Original-URI': 'someurihere' + } + + resp = requests.get('http://%s/_storage_proxy_auth' % self.test_app.config['SERVER_HOSTNAME'], + headers=headers) + self.assertEquals(404, resp.status_code) + + + def test_storage_proxy_auth(self): + # Active direct download on the fake storage. + self.storage.put_content(['test'], 'supports_direct_download', 'true') + + # Get the wrapped URL. + direct_download_url = self.storage.get_direct_download_url(['test'], 'somepath') + + # Ensure it refers to the storage proxy. 
+ proxy_index = direct_download_url.find('/_storage_proxy/') + self.assertTrue(proxy_index > 0) + + # Ensure that auth returns 200 for the URL pieces. + headers = { + 'X-Original-URI': direct_download_url[proxy_index:] + } + + resp = requests.get('http://%s/_storage_proxy_auth' % self.test_app.config['SERVER_HOSTNAME'], + headers=headers) + self.assertEquals(200, resp.status_code) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/util/security/registry_jwt.py b/util/security/registry_jwt.py index 85d621d40..212fa4f37 100644 --- a/util/security/registry_jwt.py +++ b/util/security/registry_jwt.py @@ -18,6 +18,21 @@ class InvalidBearerTokenException(Exception): pass +def decode_bearer_header(bearer_header, instance_keys): + """ decode_bearer_header decodes the given bearer header that contains an encoded JWT with both + a Key ID as well as the signed JWT and returns the decoded and validated JWT. On any error, + raises an InvalidBearerTokenException with the reason for failure. + """ + # Extract the jwt token from the header + match = jwtutil.TOKEN_REGEX.match(bearer_header) + if match is None: + raise InvalidBearerTokenException('Invalid bearer token format') + + encoded_jwt = match.group(1) + logger.debug('encoded JWT: %s', encoded_jwt) + return decode_bearer_token(encoded_jwt, instance_keys) + + def decode_bearer_token(bearer_token, instance_keys): """ decode_bearer_token decodes the given bearer token that contains both a Key ID as well as the encoded JWT and returns the decoded and validated JWT. On any error, raises an @@ -25,19 +40,11 @@ def decode_bearer_token(bearer_token, instance_keys): """ app_config = instance_keys.app.config - # Extract the jwt token from the header - match = jwtutil.TOKEN_REGEX.match(bearer_token) - if match is None: - raise InvalidBearerTokenException('Invalid bearer token format') - - encoded_jwt = match.group(1) - logger.debug('encoded JWT: %s', encoded_jwt) - # Decode the key ID. - headers = jwt.get_unverified_header(encoded_jwt) + headers = jwt.get_unverified_header(bearer_token) kid = headers.get('kid', None) if kid is None: - logger.error('Missing kid header on encoded JWT: %s', encoded_jwt) + logger.error('Missing kid header on encoded JWT: %s', bearer_token) raise InvalidBearerTokenException('Missing kid header') # Find the matching public key. @@ -51,9 +58,8 @@ def decode_bearer_token(bearer_token, instance_keys): expected_issuer = instance_keys.service_name audience = app_config['SERVER_HOSTNAME'] max_signed_s = app_config.get('REGISTRY_JWT_AUTH_MAX_FRESH_S', 3660) - max_exp = jwtutil.exp_max_s_option(max_signed_s) - payload = jwtutil.decode(encoded_jwt, public_key, algorithms=[ALGORITHM], audience=audience, + payload = jwtutil.decode(bearer_token, public_key, algorithms=[ALGORITHM], audience=audience, issuer=expected_issuer, options=max_exp, leeway=JWT_CLOCK_SKEW_SECONDS) except jwtutil.InvalidTokenError as ite: logger.exception('Invalid token reason: %s', ite)
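
For reference, a minimal standalone sketch (not part of the patch) of the URL round trip the new code performs. It mirrors only the string handling in DownloadProxy.proxy_download_url and _validate_proxy_url; the token is a placeholder rather than a real signed JWT, and the hostnames (quay.example.com, s3.example.com) and helper names (build_proxy_url, split_proxy_uri) are invented for illustration. The feature itself is switched on by setting FEATURE_PROXY_STORAGE = True in config.

import urllib
from urlparse import urlparse

PROXY_PREFIX = '/_storage_proxy/'


def build_proxy_url(url_scheme, server_hostname, token, download_url):
  # Same shape as DownloadProxy.proxy_download_url: the storage URL's query string is
  # folded into the proxied path so nginx can forward it via $is_args$args.
  parsed = urlparse(download_url)
  path = parsed.path.lstrip('/')
  if parsed.query:
    path = path + '?' + parsed.query

  return '%s://%s%s%s/%s/%s/%s' % (url_scheme, server_hostname, PROXY_PREFIX,
                                   urllib.quote(token), parsed.scheme, parsed.netloc, path)


def split_proxy_uri(original_uri):
  # Same parsing as _validate_proxy_url: drop the prefix, then split into exactly four
  # pieces -- encoded token, scheme, host and the remaining URI (query string included).
  without_prefix = original_uri[len(PROXY_PREFIX):]
  encoded_token, scheme, host, uri = without_prefix.split('/', 3)
  return urllib.unquote(encoded_token), scheme, host, uri


if __name__ == '__main__':
  fake_token = 'not.a.real.jwt'  # placeholder; the real code mints a short-lived signed JWT
  proxied = build_proxy_url('https', 'quay.example.com', fake_token,
                            'https://s3.example.com/some/blob?X-Amz-Signature=abc')
  print proxied
  # -> https://quay.example.com/_storage_proxy/not.a.real.jwt/https/s3.example.com/some/blob?X-Amz-Signature=abc

  parsed = urlparse(proxied)
  print split_proxy_uri(parsed.path + '?' + parsed.query)
  # -> ('not.a.real.jwt', 'https', 's3.example.com', 'some/blob?X-Amz-Signature=abc')

Carrying the token in the path, rather than in a header, is what lets the nginx auth_request subrequest hand the full proxied URL to /_storage_proxy_auth through X-Original-URI while the request body is stripped.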