From c0374d71c94fadf835cc0b218d9fd1e97efd45f9 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 24 Feb 2016 16:01:27 -0500 Subject: [PATCH] Refactor the security worker and API calls and add a bunch of tests --- app.py | 3 +- data/database.py | 3 + endpoints/api/secscan.py | 42 ++--- endpoints/common.py | 3 +- endpoints/notificationevent.py | 2 +- initdb.py | 46 ++--- requirements-nover.txt | 3 +- requirements.txt | 1 + test/test_api_security.py | 24 +-- test/test_api_usage.py | 71 ++++++++ test/test_secscan.py | 242 +++++++++++++++++++++++++ test/testconfig.py | 5 +- util/secscan/__init__.py | 90 +++++++++ util/secscan/analyzer.py | 138 ++++++++++++++ util/secscan/api.py | 321 +++++++++++++++------------------ util/secscan/validator.py | 65 +++++++ workers/securityworker.py | 208 +-------------------- 17 files changed, 811 insertions(+), 456 deletions(-) create mode 100644 test/test_secscan.py create mode 100644 util/secscan/analyzer.py create mode 100644 util/secscan/validator.py diff --git a/app.py b/app.py index d31c49397..1a7375000 100644 --- a/app.py +++ b/app.py @@ -188,8 +188,7 @@ dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf reporter=MetricQueueReporter(metric_queue)) notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf) secscan_notification_queue = WorkQueue(app.config['SECSCAN_NOTIFICATION_QUEUE_NAME'], tf) - -secscan_api = SecurityScannerAPI(app, config_provider) +secscan_api = SecurityScannerAPI(app.config, config_provider, storage) # Check for a key in config. If none found, generate a new signing key for Docker V2 manifests. _v2_key_path = os.path.join(OVERRIDE_CONFIG_DIRECTORY, DOCKER_V2_SIGNINGKEY_FILENAME) diff --git a/data/database.py b/data/database.py index 84f8b63db..e7f090de2 100644 --- a/data/database.py +++ b/data/database.py @@ -113,6 +113,9 @@ class CloseForLongOperation(object): self.config_object = config_object def __enter__(self): + if self.config_object.get('TESTING') == True: + return + close_db_filter(None) def __exit__(self, type, value, traceback): diff --git a/endpoints/api/secscan.py b/endpoints/api/secscan.py index 146227a76..47197295f 100644 --- a/endpoints/api/secscan.py +++ b/endpoints/api/secscan.py @@ -2,14 +2,13 @@ import logging import features -import json -import requests from app import secscan_api from data import model from endpoints.api import (require_repo_read, NotFound, DownstreamIssue, path_param, RepositoryParamResource, resource, nickname, show_if, parse_args, query_param, truthy_bool) +from util.secscan.api import APIRequestFailure logger = logging.getLogger(__name__) @@ -22,30 +21,6 @@ class SCAN_STATUS(object): QUEUED = 'queued' -def _call_security_api(relative_url, *args, **kwargs): - """ Issues an HTTP call to the sec API at the given relative URL. """ - try: - response = secscan_api.call(relative_url, None, *args, **kwargs) - except requests.exceptions.Timeout: - raise DownstreamIssue(payload=dict(message='API call timed out')) - except requests.exceptions.ConnectionError: - raise DownstreamIssue(payload=dict(message='Could not connect to downstream service')) - - if response.status_code == 404: - raise NotFound() - - try: - response_data = json.loads(response.text) - except ValueError: - raise DownstreamIssue(payload=dict(message='Non-json response from downstream service')) - - if response.status_code / 100 != 2: - logger.warning('Got %s status code to call: %s', response.status_code, response.text) - raise DownstreamIssue(payload=dict(message=response_data['Message'])) - - return response_data - - def _get_status(repo_image): if repo_image.security_indexed_engine is not None and repo_image.security_indexed_engine >= 0: return SCAN_STATUS.SCANNED if repo_image.security_indexed else SCAN_STATUS.FAILED @@ -78,11 +53,16 @@ class RepositoryImageSecurity(RepositoryParamResource): 'status': _get_status(repo_image), } - layer_id = '%s.%s' % (repo_image.docker_image_id, repo_image.storage.uuid) - if parsed_args.vulnerabilities: - data = _call_security_api('layers/%s?vulnerabilities', layer_id) - else: - data = _call_security_api('layers/%s?features', layer_id) + try: + if parsed_args.vulnerabilities: + data = secscan_api.get_layer_data(repo_image, include_vulnerabilities=True) + else: + data = secscan_api.get_layer_data(repo_image, include_features=True) + except APIRequestFailure as arf: + raise DownstreamIssue(arf.message) + + if data is None: + raise NotFound() return { 'status': _get_status(repo_image), diff --git a/endpoints/common.py b/endpoints/common.py index b4c897dad..e3e37aa07 100644 --- a/endpoints/common.py +++ b/endpoints/common.py @@ -18,11 +18,10 @@ from app import app, oauth_apps, LoginWrappedDBUser from auth.permissions import QuayDeferredPermissionUser from auth import scopes -from werkzeug.routing import BaseConverter from functools import wraps from config import frontend_visible_config from external_libraries import get_external_javascript, get_external_css -from util.secscan.api import PRIORITY_LEVELS +from util.secscan import PRIORITY_LEVELS from util.names import parse_namespace_repository import features diff --git a/endpoints/notificationevent.py b/endpoints/notificationevent.py index b0b58f44e..925247809 100644 --- a/endpoints/notificationevent.py +++ b/endpoints/notificationevent.py @@ -5,7 +5,7 @@ import json from datetime import datetime from notificationhelper import build_event_data from util.jinjautil import get_template_env -from util.secscan.api import PRIORITY_LEVELS, get_priority_for_index +from util.secscan import PRIORITY_LEVELS, get_priority_for_index template_env = get_template_env("events") logger = logging.getLogger(__name__) diff --git a/initdb.py b/initdb.py index 0e79a4d37..63cebbeb4 100644 --- a/initdb.py +++ b/initdb.py @@ -72,7 +72,7 @@ def __gen_image_uuid(repo, image_num): global_image_num = count() -def __create_subtree(repo, structure, creator_username, parent, tag_map): +def __create_subtree(with_storage, repo, structure, creator_username, parent, tag_map): num_nodes, subtrees, last_node_tags = structure # create the nodes @@ -91,7 +91,7 @@ def __create_subtree(repo, structure, creator_username, parent, tag_map): model.storage.save_torrent_info(new_image.storage, 1, 'deadbeef') # Write some data for the storage. - if os.environ.get('WRITE_STORAGE_FILES'): + if with_storage or os.environ.get('WRITE_STORAGE_FILES'): storage_paths = StoragePaths() paths = [storage_paths.v1_image_layer_path] @@ -147,10 +147,10 @@ def __create_subtree(repo, structure, creator_username, parent, tag_map): found_tag.save() for subtree in subtrees: - __create_subtree(repo, subtree, creator_username, new_image, tag_map) + __create_subtree(with_storage, repo, subtree, creator_username, new_image, tag_map) -def __generate_repository(user_obj, name, description, is_public, permissions, structure): +def __generate_repository(with_storage, user_obj, name, description, is_public, permissions, structure): repo = model.repository.create_repository(user_obj.username, name, user_obj) if is_public: @@ -165,9 +165,9 @@ def __generate_repository(user_obj, name, description, is_public, permissions, s if isinstance(structure, list): for leaf in structure: - __create_subtree(repo, leaf, user_obj.username, None, {}) + __create_subtree(with_storage, repo, leaf, user_obj.username, None, {}) else: - __create_subtree(repo, structure, user_obj.username, None, {}) + __create_subtree(with_storage, repo, structure, user_obj.username, None, {}) return repo @@ -181,7 +181,7 @@ def finished_database_for_testing(testcase): """ testcases[testcase]['savepoint'].__exit__(True, None, None) -def setup_database_for_testing(testcase): +def setup_database_for_testing(testcase, with_storage=False, force_rebuild=False): """ Called when a testcase has started using the database, indicating that the database should be setup (if not already) and a savepoint created. """ @@ -190,13 +190,13 @@ def setup_database_for_testing(testcase): if not IS_TESTING_REAL_DATABASE and not isinstance(db.obj, SqliteDatabase): raise RuntimeError('Attempted to wipe production database!') - if not db_initialized_for_testing.is_set(): + if not db_initialized_for_testing.is_set() or force_rebuild: logger.debug('Setting up DB for testing.') # Setup the database. wipe_database() initialize_database() - populate_database() + populate_database(with_storage=with_storage) models_missing_data = find_models_missing_data() if models_missing_data: @@ -359,7 +359,7 @@ def wipe_database(): drop_model_tables(all_models, fail_silently=True) -def populate_database(minimal=False): +def populate_database(minimal=False, with_storage=False): logger.debug('Populating the DB with test data.') new_user_1 = model.user.create_user('devtable', 'password', 'jschorr@devtable.com') @@ -428,15 +428,15 @@ def populate_database(minimal=False): metadata=notification_metadata) - __generate_repository(new_user_4, 'randomrepo', 'Random repo repository.', False, + __generate_repository(with_storage, new_user_4, 'randomrepo', 'Random repo repository.', False, [], (4, [], ['latest', 'prod'])) - simple_repo = __generate_repository(new_user_1, 'simple', 'Simple repository.', False, + simple_repo = __generate_repository(with_storage, new_user_1, 'simple', 'Simple repository.', False, [], (4, [], ['latest', 'prod'])) model.blob.initiate_upload(new_user_1.username, simple_repo.name, str(uuid4()), 'local_us', {}) model.notification.create_repo_notification(simple_repo, 'repo_push', 'quay_notification', {}, {}) - __generate_repository(new_user_1, 'sharedtags', + __generate_repository(with_storage, new_user_1, 'sharedtags', 'Shared tags repository', False, [(new_user_2, 'read'), (dtrobot[0], 'read')], (2, [(3, [], ['v2.0', 'v2.1', 'v2.2']), @@ -444,10 +444,10 @@ def populate_database(minimal=False): ['staging', '8423b58']), (1, [], None)], None)], None)) - __generate_repository(new_user_1, 'history', 'Historical repository.', False, + __generate_repository(with_storage, new_user_1, 'history', 'Historical repository.', False, [], (4, [(2, [], 'latest'), (3, [], '#latest')], None)) - __generate_repository(new_user_1, 'complex', + __generate_repository(with_storage, new_user_1, 'complex', 'Complex repository with many branches and tags.', False, [(new_user_2, 'read'), (dtrobot[0], 'read')], (2, [(3, [], 'v2.0'), @@ -455,7 +455,7 @@ def populate_database(minimal=False): 'staging'), (1, [], None)], None)], None)) - __generate_repository(new_user_1, 'gargantuan', None, False, [], + __generate_repository(with_storage, new_user_1, 'gargantuan', None, False, [], (2, [(3, [], 'v2.0'), (1, [(1, [(1, [], ['latest', 'prod'])], 'staging'), @@ -465,21 +465,21 @@ def populate_database(minimal=False): (1, [(1, [], 'v5.0'), (1, [], 'v6.0')], None)], None)) - __generate_repository(new_user_2, 'publicrepo', + __generate_repository(with_storage, new_user_2, 'publicrepo', 'Public repository pullable by the world.', True, [], (10, [], 'latest')) - __generate_repository(outside_org, 'coolrepo', + __generate_repository(with_storage, outside_org, 'coolrepo', 'Some cool repo.', False, [], (5, [], 'latest')) - __generate_repository(new_user_1, 'shared', + __generate_repository(with_storage, new_user_1, 'shared', 'Shared repository, another user can write.', False, [(new_user_2, 'write'), (reader, 'read')], (5, [], 'latest')) - building = __generate_repository(new_user_1, 'building', + building = __generate_repository(with_storage, new_user_1, 'building', 'Empty repository which is building.', False, [], (0, [], None)) @@ -564,10 +564,10 @@ def populate_database(minimal=False): owners.description = 'Owners have unfetterd access across the entire org.' owners.save() - org_repo = __generate_repository(org, 'orgrepo', 'Repository owned by an org.', False, + org_repo = __generate_repository(with_storage, org, 'orgrepo', 'Repository owned by an org.', False, [(outside_org, 'read')], (4, [], ['latest', 'prod'])) - __generate_repository(org, 'anotherorgrepo', 'Another repository owned by an org.', False, + __generate_repository(with_storage, org, 'anotherorgrepo', 'Another repository owned by an org.', False, [], (4, [], ['latest', 'prod'])) creators = model.team.create_team('creators', org, 'creator', 'Creators of orgrepo.') @@ -583,7 +583,7 @@ def populate_database(minimal=False): model.team.add_user_to_team(creatorbot, creators) model.team.add_user_to_team(creatoruser, creators) - __generate_repository(new_user_1, 'superwide', None, False, [], + __generate_repository(with_storage, new_user_1, 'superwide', None, False, [], [(10, [], 'latest2'), (2, [], 'latest3'), (2, [(1, [], 'latest11'), (2, [], 'latest12')], diff --git a/requirements-nover.txt b/requirements-nover.txt index 487a8edd4..c671f52b6 100644 --- a/requirements-nover.txt +++ b/requirements-nover.txt @@ -60,4 +60,5 @@ bintrees redlock semantic-version bencode -cryptography==1.1.2 # Remove version when https://github.com/pyca/cryptography/issues/2690 fixed \ No newline at end of file +cryptography==1.1.2 # Remove version when https://github.com/pyca/cryptography/issues/2690 fixed +httmock \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index c2d5b216d..4feffb10d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,6 +32,7 @@ greenlet==0.4.9 gunicorn==18.0 hiredis==0.2.0 html5lib==0.9999999 +httmock==1.2.4 idna==2.0 ipaddress==1.0.16 iso8601==0.1.11 diff --git a/test/test_api_security.py b/test/test_api_security.py index 8df483c69..c785aaee4 100644 --- a/test/test_api_security.py +++ b/test/test_api_security.py @@ -49,7 +49,7 @@ from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserMana SuperUserSendRecoveryEmail, ChangeLog, SuperUserOrganizationManagement, SuperUserOrganizationList, SuperUserAggregateLogs) -from endpoints.api.secscan import RepositoryImagePackages, RepositoryImageVulnerabilities +from endpoints.api.secscan import RepositoryImageSecurity try: @@ -4170,28 +4170,10 @@ class TestOrganizationInvoiceField(ApiTestCase): self._run_test('DELETE', 201, 'devtable', None) -class TestRepositoryImageVulnerabilities(ApiTestCase): +class TestRepositoryImageSecurity(ApiTestCase): def setUp(self): ApiTestCase.setUp(self) - self._set_url(RepositoryImageVulnerabilities, repository='devtable/simple', imageid='fake') - - def test_get_anonymous(self): - self._run_test('GET', 401, None, None) - - def test_get_freshuser(self): - self._run_test('GET', 403, 'freshuser', None) - - def test_get_reader(self): - self._run_test('GET', 403, 'reader', None) - - def test_get_devtable(self): - self._run_test('GET', 404, 'devtable', None) - - -class TestRepositoryImagePackages(ApiTestCase): - def setUp(self): - ApiTestCase.setUp(self) - self._set_url(RepositoryImagePackages, repository='devtable/simple', imageid='fake') + self._set_url(RepositoryImageSecurity, repository='devtable/simple', imageid='fake') def test_get_anonymous(self): self._run_test('GET', 401, None, None) diff --git a/test/test_api_usage.py b/test/test_api_usage.py index 0269e3747..c46a2f8ee 100644 --- a/test/test_api_usage.py +++ b/test/test_api_usage.py @@ -11,6 +11,7 @@ from urllib import urlencode from urlparse import urlparse, urlunparse, parse_qs from playhouse.test_utils import assert_query_count, _QueryLogHandler +from httmock import urlmatch, HTTMock from endpoints.api import api_bp, api from endpoints.building import PreparedBuild @@ -52,6 +53,7 @@ from endpoints.api.repository import RepositoryList, RepositoryVisibility, Repos from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPermission, RepositoryTeamPermissionList, RepositoryUserPermissionList) from endpoints.api.superuser import SuperUserLogs, SuperUserList, SuperUserManagement +from endpoints.api.secscan import RepositoryImageSecurity from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile, SuperUserCreateInitialSuperUser) @@ -3430,6 +3432,75 @@ class TestSuperUserConfig(ApiTestCase): self.assertTrue(json['exists']) + +@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') +def get_layer_success_mock(url, request): + vulnerabilities = [ + { + "Name": "CVE-2014-9471", + "Namespace": "debian:8", + "Description": "The parse_datetime function in GNU coreutils allows remote attackers to cause a denial of service (crash) or possibly execute arbitrary code via a crafted date string, as demonstrated by the \"--date=TZ=\"123\"345\" @1\" string to the touch or date command.", + "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", + "Severity": "Low", + "FixedBy": "9.23-5" + } + ] + + features = [ + { + "Name": "coreutils", + "Namespace": "debian:8", + "Version": "8.23-4", + "Vulnerabilities": vulnerabilities, + } + ] + + if not request.url.endswith('?vulnerabilities'): + vulnerabilities = [] + + if not request.url.endswith('?features'): + features = [] + + return py_json.dumps({ + "Layer": { + "Name": "17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52", + "Namespace": "debian:8", + "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2", + "IndexedByVersion": 1, + "Features": features + } + }) + + + +class TestRepositoryImageSecurity(ApiTestCase): + def test_get_vulnerabilities(self): + self.login(ADMIN_ACCESS_USER) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, 'simple', 'latest') + + # Grab the security info for the tag. It should be queued. + response = self.getJsonResponse(RepositoryImageSecurity, + params=dict(repository=ADMIN_ACCESS_USER + '/simple', + imageid=layer.docker_image_id, + vulnerabilities='true')) + self.assertEquals('queued', response['status']) + + # Mark the layer as indexed. + layer.security_indexed = True + layer.security_indexed_engine = app.config['SECURITY_SCANNER']['ENGINE_VERSION_TARGET'] + layer.save() + + # Grab the security info again. + with HTTMock(get_layer_success_mock): + response = self.getJsonResponse(RepositoryImageSecurity, + params=dict(repository=ADMIN_ACCESS_USER + '/simple', + imageid=layer.docker_image_id, + vulnerabilities='true')) + self.assertEquals('scanned', response['status']) + self.assertEquals(1, response['data']['Layer']['IndexedByVersion']) + + class TestSuperUserManagement(ApiTestCase): def test_get_user(self): self.login(ADMIN_ACCESS_USER) diff --git a/test/test_secscan.py b/test/test_secscan.py new file mode 100644 index 000000000..9afc9ee21 --- /dev/null +++ b/test/test_secscan.py @@ -0,0 +1,242 @@ +import unittest +import json +from httmock import urlmatch, all_requests, HTTMock + +from app import app, config_provider, storage, notification_queue +from initdb import setup_database_for_testing, finished_database_for_testing +from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException +from util.secscan.analyzer import LayerAnalyzer +from data import model + + +ADMIN_ACCESS_USER = 'devtable' +SIMPLE_REPO = 'simple' + +_PORT_NUMBER = 5001 + +@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') +def get_layer_failure_mock(url, request): + return {'status_code': 404, 'content': json.dumps({'Error': {'Message': 'Unknown layer'}})} + + +@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$') +def analyze_layer_badrequest_mock(url, request): + return {'status_code': 400, 'content': json.dumps({'Error': {'Message': 'Bad request'}})} + + +@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$') +def analyze_layer_internal_mock(url, request): + return {'status_code': 500, 'content': json.dumps({'Error': {'Message': 'Internal server error'}})} + + +@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$') +def analyze_layer_failure_mock(url, request): + return {'status_code': 422, 'content': json.dumps({'Error': {'Message': 'Bad layer'}})} + + +@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$') +def analyze_layer_success_mock(url, request): + return {'status_code': 201, 'content': json.dumps({ + "Layer": { + "Name": "523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6", + "Path": "/mnt/layers/523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6/layer.tar", + "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2", + "Format": "Docker", + "IndexedByVersion": 1 + } + })} + + +@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') +def get_layer_success_mock(url, request): + vulnerabilities = [ + { + "Name": "CVE-2014-9471", + "Namespace": "debian:8", + "Description": "The parse_datetime function in GNU coreutils allows remote attackers to cause a denial of service (crash) or possibly execute arbitrary code via a crafted date string, as demonstrated by the \"--date=TZ=\"123\"345\" @1\" string to the touch or date command.", + "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", + "Severity": "Low", + "FixedBy": "9.23-5" + } + ] + + features = [ + { + "Name": "coreutils", + "Namespace": "debian:8", + "Version": "8.23-4", + "Vulnerabilities": vulnerabilities, + } + ] + + if not request.url.endswith('?vulnerabilities'): + vulnerabilities = [] + + if not request.url.endswith('?features'): + features = [] + + return json.dumps({ + "Layer": { + "Name": "17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52", + "Namespace": "debian:8", + "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2", + "IndexedByVersion": 1, + "Features": features + } + }) + + +@all_requests +def response_content(url, request): + return {'status_code': 500, 'content': 'Unknown endpoint'} + + +class TestSecurityScanner(unittest.TestCase): + def setUp(self): + # Enable direct download in fake storage. + storage.put_content(['local_us'], 'supports_direct_download', 'true') + + # Setup the database with fake storage. + setup_database_for_testing(self, with_storage=True, force_rebuild=True) + self.app = app.test_client() + self.ctx = app.test_request_context() + self.ctx.__enter__() + + self.api = SecurityScannerAPI(app.config, config_provider, storage) + + def tearDown(self): + storage.put_content(['local_us'], 'supports_direct_download', 'false') + finished_database_for_testing(self) + self.ctx.__exit__(True, None, None) + + def assertAnalyzed(self, layer, isAnalyzed, engineVersion): + self.assertEquals(isAnalyzed, layer.security_indexed) + self.assertEquals(engineVersion, layer.security_indexed_engine) + + # Ensure all parent layers are marked as analyzed. + parents = model.image.get_parent_images(ADMIN_ACCESS_USER, SIMPLE_REPO, layer) + for parent in parents: + self.assertEquals(isAnalyzed, parent.security_indexed) + self.assertEquals(engineVersion, parent.security_indexed_engine) + + + def test_get_layer_success(self): + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + with HTTMock(get_layer_success_mock, response_content): + result = self.api.get_layer_data(layer, include_vulnerabilities=True) + self.assertIsNotNone(result) + self.assertEquals(result['Layer']['Name'], '17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52') + + + def test_get_layer_failure(self): + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + with HTTMock(get_layer_failure_mock, response_content): + result = self.api.get_layer_data(layer, include_vulnerabilities=True) + self.assertIsNone(result) + + + def test_analyze_layer_success(self): + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertFalse(layer.security_indexed) + self.assertEquals(-1, layer.security_indexed_engine) + + with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content): + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, True, 1) + + + def test_analyze_layer_failure(self): + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertFalse(layer.security_indexed) + self.assertEquals(-1, layer.security_indexed_engine) + + with HTTMock(analyze_layer_failure_mock, response_content): + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, False, 1) + + + def test_analyze_layer_internal_error(self): + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertFalse(layer.security_indexed) + self.assertEquals(-1, layer.security_indexed_engine) + + with HTTMock(analyze_layer_internal_mock, response_content): + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, False, -1) + + + def test_analyze_layer_bad_request(self): + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertFalse(layer.security_indexed) + self.assertEquals(-1, layer.security_indexed_engine) + + with HTTMock(analyze_layer_badrequest_mock, response_content): + analyzer = LayerAnalyzer(app.config, self.api) + try: + analyzer.analyze_recursively(layer) + except AnalyzeLayerException: + return + + self.fail('Expected exception on bad request') + + + def test_analyze_layer_missing_storage(self): + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertFalse(layer.security_indexed) + self.assertEquals(-1, layer.security_indexed_engine) + + # Delete the storage for the layer. + path = model.storage.get_layer_path(layer.storage) + locations = app.config['DISTRIBUTED_STORAGE_PREFERENCE'] + storage.remove(locations, path) + + with HTTMock(analyze_layer_success_mock, response_content): + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertEquals(False, layer.security_indexed) + self.assertEquals(1, layer.security_indexed_engine) + + + def test_analyze_layer_success_events(self): + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertFalse(layer.security_indexed) + self.assertEquals(-1, layer.security_indexed_engine) + + # Ensure there are no existing events. + self.assertIsNone(notification_queue.get()) + + # Add a repo event for the layer. + repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) + model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100}) + + with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content): + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, True, 1) + + # Ensure an event was written for the tag. + queue_item = notification_queue.get() + self.assertIsNotNone(queue_item) + + body = json.loads(queue_item.body) + self.assertEquals(['latest', 'prod'], body['event_data']['tags']) + self.assertEquals('CVE-2014-9471', body['event_data']['vulnerability']['id']) + self.assertEquals('Low', body['event_data']['vulnerability']['priority']) + self.assertTrue(body['event_data']['vulnerability']['has_fix']) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/test/testconfig.py b/test/testconfig.py index 0146a0988..994d3d8c4 100644 --- a/test/testconfig.py +++ b/test/testconfig.py @@ -26,7 +26,7 @@ class TestConfig(DefaultConfig): DB_URI = os.environ.get('TEST_DATABASE_URI', 'sqlite:///{0}'.format(TEST_DB_FILE.name)) DB_CONNECTION_ARGS = { 'threadlocals': True, - 'autorollback': True + 'autorollback': True, } @staticmethod @@ -59,7 +59,8 @@ class TestConfig(DefaultConfig): FEATURE_SECURITY_SCANNER = True SECURITY_SCANNER = { - 'ENDPOINT': 'http://localhost/some/invalid/path', + 'ENDPOINT': 'http://mockclairservice/', + 'API_VERSION': 'v1', 'ENGINE_VERSION_TARGET': 1, 'API_CALL_TIMEOUT': 1 } \ No newline at end of file diff --git a/util/secscan/__init__.py b/util/secscan/__init__.py index e69de29bb..d09cadd73 100644 --- a/util/secscan/__init__.py +++ b/util/secscan/__init__.py @@ -0,0 +1,90 @@ +# NOTE: This objects are used directly in the external-notification-data and vulnerability-service +# on the frontend, so be careful with changing their existing keys. +PRIORITY_LEVELS = { + 'Unknown': { + 'title': 'Unknown', + 'index': '6', + 'level': 'info', + + 'description': 'Unknown is either a security problem that has not been assigned ' + + 'to a priority yet or a priority that our system did not recognize', + 'banner_required': False + }, + + 'Negligible': { + 'title': 'Negligible', + 'index': '5', + 'level': 'info', + + 'description': 'Negligible is technically a security problem, but is only theoretical ' + + 'in nature, requires a very special situation, has almost no install base, ' + + 'or does no real damage.', + 'banner_required': False + }, + + 'Low': { + 'title': 'Low', + 'index': '4', + 'level': 'warning', + + 'description': 'Low is a security problem, but is hard to exploit due to environment, ' + + 'requires a user-assisted attack, a small install base, or does very ' + + 'little damage.', + 'banner_required': False + }, + + 'Medium': { + 'title': 'Medium', + 'value': 'Medium', + 'index': '3', + 'level': 'warning', + + 'description': 'Medium is a real security problem, and is exploitable for many people. ' + + 'Includes network daemon denial of service attacks, cross-site scripting, ' + + 'and gaining user privileges.', + 'banner_required': False + }, + + 'High': { + 'title': 'High', + 'value': 'High', + 'index': '2', + 'level': 'warning', + + 'description': 'High is a real problem, exploitable for many people in a default installation. ' + + 'Includes serious remote denial of services, local root privilege escalations, ' + + 'or data loss.', + 'banner_required': False + }, + + 'Critical': { + 'title': 'Critical', + 'value': 'Critical', + 'index': '1', + 'level': 'error', + + 'description': 'Critical is a world-burning problem, exploitable for nearly all people in ' + + 'a installation of the package. Includes remote root privilege escalations, ' + + 'or massive data loss.', + 'banner_required': True + }, + + 'Defcon1': { + 'title': 'Defcon 1', + 'value': 'Defcon1', + 'index': '0', + 'level': 'error', + + 'description': 'Defcon1 is a Critical problem which has been manually highlighted ' + + 'by the Quay team. It requires immediate attention.', + 'banner_required': True + } +} + + +def get_priority_for_index(index): + for priority in PRIORITY_LEVELS: + if PRIORITY_LEVELS[priority]['index'] == index: + return priority + + return 'Unknown' \ No newline at end of file diff --git a/util/secscan/analyzer.py b/util/secscan/analyzer.py new file mode 100644 index 000000000..4533e933e --- /dev/null +++ b/util/secscan/analyzer.py @@ -0,0 +1,138 @@ +import logging +import logging.config + +from collections import defaultdict + +from endpoints.notificationhelper import spawn_notification +from data.database import Image, ExternalNotificationEvent +from data.model.tag import filter_tags_have_repository_event, get_tags_for_image +from data.model.image import set_secscan_status, get_image_with_storage_and_parent_base +from util.secscan.api import APIRequestFailure + +logger = logging.getLogger(__name__) + + +class LayerAnalyzer(object): + """ Helper class to perform analysis of a layer via the security scanner. """ + def __init__(self, config, api): + secscan_config = config.get('SECURITY_SCANNER') + + self._api = api + self._target_version = secscan_config['ENGINE_VERSION_TARGET'] + + + def analyze_recursively(self, layer): + """ Analyzes a layer and all its parents. + + Return a tuple of two bools: + - The first one tells us if the layer and its parents analyzed successfully. + - The second one is set to False when another call pre-empted the candidate's analysis + for us. + """ + if layer.parent_id and layer.parent.security_indexed_engine < self._target_version: + # The image has a parent that is not analyzed yet with this engine. + # Get the parent to get it's own parent and recurse. + try: + base_query = get_image_with_storage_and_parent_base() + parent_layer = base_query.where(Image.id == layer.parent_id).get() + except Image.DoesNotExist: + logger.warning("Image %s has Image %s as parent but doesn't exist.", layer.id, + layer.parent_id) + + return False, set_secscan_status(layer, False, self._target_version) + + cont, _ = self.analyze_recursively(parent_layer) + if not cont: + # The analysis failed for some reason and did not mark the layer as failed, + # thus we should not try to analyze the children of that layer. + # Interrupt the recursive analysis and return as no-one pre-empted us. + return False, True + + # Now we know all parents are analyzed. + return self._analyze(layer) + + + def _analyze(self, layer): + """ Analyzes a single layer. + + Return a tuple of two bools: + - The first one tells us if we should evaluate its children. + - The second one is set to False when another worker pre-empted the candidate's analysis + for us. + """ + + # If the parent couldn't be analyzed with the target version or higher, we can't analyze + # this image. Mark it as failed with the current target version. + if (layer.parent_id and not layer.parent.security_indexed and + layer.parent.security_indexed_engine >= self._target_version): + return True, set_secscan_status(layer, False, self._target_version) + + # Analyze the image. + logger.info('Analyzing layer %s', layer.docker_image_id) + (analyzed_version, should_requeue) = self._api.analyze_layer(layer) + + # If analysis failed, then determine whether we need to requeue. + if not analyzed_version: + if should_requeue: + # If the layer needs to be requeued, return that the children cannot be analyzed (at this + # time) and there was no collision with another worker. + return False, False + else: + # If the layer cannot be requeued, we allow the children to be analyzed, because the code + # path above will mark them as not analyzable, and we mark the image itself as not being + # analyzable. + return True, set_secscan_status(layer, False, self._target_version) + + # Mark the image as analyzed. + logger.info('Analyzed layer %s successfully with version %s', layer.docker_image_id, + analyzed_version) + set_status = set_secscan_status(layer, True, analyzed_version) + + # If we are the one who've done the job successfully first, get the vulnerabilities and + # send notifications to the repos that have a tag on that layer. + if set_status: + # Get the tags of the layer we analyzed. + repository_map = defaultdict(list) + event = ExternalNotificationEvent.get(name='vulnerability_found') + matching = list(filter_tags_have_repository_event(get_tags_for_image(layer.id), event)) + + for tag in matching: + repository_map[tag.repository_id].append(tag) + + # If there is at least one tag, + # Lookup the vulnerabilities for the image, now that it is analyzed. + if len(repository_map) > 0: + logger.debug('Loading data for layer %s', layer.id) + try: + layer_data = self._api.get_layer_data(layer, include_vulnerabilities=True) + except APIRequestFailure: + layer_data = None + + if layer_data is not None: + # Dispatch events for any detected vulnerabilities + logger.debug('Got data for layer %s: %s', layer.id, layer_data) + found_features = layer_data['Layer']['Features'] + for repository_id in repository_map: + tags = repository_map[repository_id] + + for feature in found_features: + if 'Vulnerabilities' not in feature: + continue + + for vulnerability in feature['Vulnerabilities']: + event_data = { + 'tags': [tag.name for tag in tags], + 'vulnerability': { + 'id': vulnerability['Name'], + 'description': vulnerability.get('Description', None), + 'link': vulnerability.get('Link', None), + 'has_fix': 'FixedBy' in vulnerability, + + # TODO: Change this key name if/when we change the event format. + 'priority': vulnerability.get('Severity', 'Unknown'), + }, + } + + spawn_notification(tags[0].repository, 'vulnerability_found', event_data) + + return True, set_status diff --git a/util/secscan/api.py b/util/secscan/api.py index 5041ec8ff..fd4d369fe 100644 --- a/util/secscan/api.py +++ b/util/secscan/api.py @@ -1,205 +1,180 @@ -import features import logging import requests from data.database import CloseForLongOperation +from data import model +from data.model.storage import get_storage_locations + from urlparse import urljoin +from util.secscan.validator import SecurityConfigValidator logger = logging.getLogger(__name__) -# NOTE: This objects are used directly in the external-notification-data and vulnerability-service -# on the frontend, so be careful with changing their existing keys. -PRIORITY_LEVELS = { - 'Unknown': { - 'title': 'Unknown', - 'index': '6', - 'level': 'info', +class AnalyzeLayerException(Exception): + """ Exception raised when a layer fails to analyze due to a *client-side* issue. """ - 'description': 'Unknown is either a security problem that has not been assigned ' + - 'to a priority yet or a priority that our system did not recognize', - 'banner_required': False - }, - - 'Negligible': { - 'title': 'Negligible', - 'index': '5', - 'level': 'info', - - 'description': 'Negligible is technically a security problem, but is only theoretical ' + - 'in nature, requires a very special situation, has almost no install base, ' + - 'or does no real damage.', - 'banner_required': False - }, - - 'Low': { - 'title': 'Low', - 'index': '4', - 'level': 'warning', - - 'description': 'Low is a security problem, but is hard to exploit due to environment, ' + - 'requires a user-assisted attack, a small install base, or does very ' + - 'little damage.', - 'banner_required': False - }, - - 'Medium': { - 'title': 'Medium', - 'value': 'Medium', - 'index': '3', - 'level': 'warning', - - 'description': 'Medium is a real security problem, and is exploitable for many people. ' + - 'Includes network daemon denial of service attacks, cross-site scripting, ' + - 'and gaining user privileges.', - 'banner_required': False - }, - - 'High': { - 'title': 'High', - 'value': 'High', - 'index': '2', - 'level': 'warning', - - 'description': 'High is a real problem, exploitable for many people in a default installation. ' + - 'Includes serious remote denial of services, local root privilege escalations, ' + - 'or data loss.', - 'banner_required': False - }, - - 'Critical': { - 'title': 'Critical', - 'value': 'Critical', - 'index': '1', - 'level': 'error', - - 'description': 'Critical is a world-burning problem, exploitable for nearly all people in ' + - 'a installation of the package. Includes remote root privilege escalations, ' + - 'or massive data loss.', - 'banner_required': True - }, - - 'Defcon1': { - 'title': 'Defcon 1', - 'value': 'Defcon1', - 'index': '0', - 'level': 'error', - - 'description': 'Defcon1 is a Critical problem which has been manually highlighted ' + - 'by the Quay team. It requires immediate attention.', - 'banner_required': True - } -} +class APIRequestFailure(Exception): + """ Exception raised when there is a failure to conduct an API request. """ -def get_priority_for_index(index): - for priority in PRIORITY_LEVELS: - if PRIORITY_LEVELS[priority]['index'] == index: - return priority - - return 'Unknown' - -class SecurityConfigValidator(object): - def __init__(self, app, config_provider): - self._config_provider = config_provider - - if not features.SECURITY_SCANNER: - return - - self._security_config = app.config['SECURITY_SCANNER'] - if self._security_config is None: - return - - self._certificate = self._get_filepath('CA_CERTIFICATE_FILENAME') or False - self._public_key = self._get_filepath('PUBLIC_KEY_FILENAME') - self._private_key = self._get_filepath('PRIVATE_KEY_FILENAME') - - if self._public_key and self._private_key: - self._keys = (self._public_key, self._private_key) - else: - self._keys = None - - def _get_filepath(self, key): - config = self._security_config - - if key in config: - with self._config_provider.get_volume_file(config[key]) as f: - return f.name - - return None - - def cert(self): - return self._certificate - - def keypair(self): - return self._keys - - def valid(self): - if not features.SECURITY_SCANNER: - return False - - if not self._security_config: - logger.debug('Missing SECURITY_SCANNER block in configuration') - return False - - if not 'ENDPOINT' in self._security_config: - logger.debug('Missing ENDPOINT field in SECURITY_SCANNER configuration') - return False - - endpoint = self._security_config['ENDPOINT'] or '' - if not endpoint.startswith('http://') and not endpoint.startswith('https://'): - logger.debug('ENDPOINT field in SECURITY_SCANNER configuration must start with http or https') - return False - - if endpoint.startswith('https://') and (self._certificate is False or self._keys is None): - logger.debug('Certificate and key pair required for talking to security worker over HTTPS') - return False - - return True +_API_METHOD_INSERT = 'layers' +_API_METHOD_GET_LAYER = 'layers/%s' +_API_METHOD_GET_WITH_VULNERABILITIES_FLAG = '?vulnerabilities' +_API_METHOD_GET_WITH_FEATURES_FLAG = '?features' class SecurityScannerAPI(object): """ Helper class for talking to the Security Scan service (Clair). """ - def __init__(self, app, config_provider): - self.app = app + def __init__(self, config, config_provider, storage): + self.config = config self.config_provider = config_provider + + self._storage = storage self._security_config = None - config_validator = SecurityConfigValidator(app, config_provider) + config_validator = SecurityConfigValidator(config, config_provider) if not config_validator.valid(): logger.warning('Invalid config provided to SecurityScannerAPI') return - self._security_config = app.config.get('SECURITY_SCANNER') + self._default_storage_locations = config['DISTRIBUTED_STORAGE_PREFERENCE'] + + self._security_config = config.get('SECURITY_SCANNER') + self._target_version = self._security_config['ENGINE_VERSION_TARGET'] + self._certificate = config_validator.cert() self._keys = config_validator.keypair() - def check_layer_vulnerable(self, layer_id, cve_id): - """ Checks with Clair whether the given layer is vulnerable to the given CVE. """ - try: - body = { - 'LayersIDs': [layer_id] + + def _get_image_url(self, image): + """ Gets the download URL for an image and if the storage doesn't exist, + returns None. + """ + path = model.storage.get_layer_path(image.storage) + locations = self._default_storage_locations + + if not self._storage.exists(locations, path): + locations = get_storage_locations(image.storage.uuid) + + if not locations or not self._storage.exists(locations, path): + logger.warning('Could not find a valid location to download layer %s.%s out of %s', + image.docker_image_id, image.storage.uuid, locations) + return None + + uri = self._storage.get_direct_download_url(locations, path) + if uri is None: + # Handle local storage. + local_storage_enabled = False + for storage_type, _ in self.config.get('DISTRIBUTED_STORAGE_CONFIG', {}).values(): + if storage_type == 'LocalStorage': + local_storage_enabled = True + + if local_storage_enabled: + # TODO: fix to use the proper local storage path. + uri = path + else: + logger.warning('Could not get image URL and local storage was not enabled') + return None + + return uri + + + def _new_analyze_request(self, image): + """ Create the request body to submit the given image for analysis. If the image's URL cannot + be found, returns None. + """ + url = self._get_image_url(image) + if url is None: + return None + + request = { + 'Layer': { + 'Name': '%s.%s' % (image.docker_image_id, image.storage.uuid), + 'Path': url, + 'Format': 'Docker' } - response = self.call('vulnerabilities/%s/affected-layers', body, cve_id) - except requests.exceptions.RequestException: - logger.exception('Got exception when trying to call Clair endpoint') - return False + } - if response.status_code != 200: - return False + if image.parent.docker_image_id and image.parent.storage.uuid: + request['Layer']['ParentName'] = '%s.%s' % (image.parent.docker_image_id, + image.parent.storage.uuid) + return request + + + def analyze_layer(self, layer): + """ Posts the given layer to the security scanner for analysis, blocking until complete. + Returns a tuple containing the analysis version (on success, None on failure) and + whether the request should be retried. + """ + request = self._new_analyze_request(layer) + if not request: + return None, False + + logger.info('Analyzing layer %s', request['Layer']['Name']) try: - response_data = response.json() - except ValueError: - logger.exception('Got exception when trying to parse Clair response') - return False + response = self._call(_API_METHOD_INSERT, request) + json_response = response.json() + except requests.exceptions.Timeout: + logger.exception('Timeout when trying to post layer data response for %s', layer.id) + return None, True + except requests.exceptions.ConnectionError: + logger.exception('Connection error when trying to post layer data response for %s', layer.id) + return None, True + except (requests.exceptions.RequestException, ValueError): + logger.exception('Failed to post layer data response for %s', layer.id) + return None, False - if (not layer_id in response_data or - not response_data[layer_id].get('Vulnerable', False)): - return False + # Handle any errors from the security scanner. + if response.status_code != 201: + message = json_response.get('Error').get('Message', '') + logger.warning('A warning event occurred when analyzing layer %s (status code %s): %s', + request['Layer']['Name'], response.status_code, message) - return True + # 400 means the layer could not be analyzed due to a bad request. + if response.status_code == 400: + logger.error('Bad request when calling security scanner for layer %s: %s', + response.status_code, json_response) + raise AnalyzeLayerException('Bad request to security scanner') - def call(self, relative_url, body=None, *args, **kwargs): + # 422 means that the layer could not be analyzed: + # - the layer could not be extracted (manifest?) + # - the layer operating system / package manager is unsupported + return None, response.status_code != 422 + + api_version = json_response['Layer']['IndexedByVersion'] + return api_version, False + + + def get_layer_data(self, layer, include_features=False, include_vulnerabilities=False): + """ Returns the layer data for the specified layer. On error, returns None. """ + layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) + try: + flag = '' + if include_features: + flag = _API_METHOD_GET_WITH_FEATURES_FLAG + + if include_vulnerabilities: + flag = _API_METHOD_GET_WITH_VULNERABILITIES_FLAG + + response = self._call(_API_METHOD_GET_LAYER + flag, None, layer_id) + logger.debug('Got response %s for vulnerabilities for layer %s', + response.status_code, layer_id) + except requests.exceptions.Timeout: + raise APIRequestFailure('API call timed out') + except requests.exceptions.ConnectionError: + raise APIRequestFailure('Could not connect to security service') + except (requests.exceptions.RequestException, ValueError): + logger.exception('Failed to get layer data response for %s', layer.id) + raise APIRequestFailure() + + if response.status_code == 404: + return None + + return response.json() + + + def _call(self, relative_url, body=None, *args, **kwargs): """ Issues an HTTP call to the sec API at the given relative URL. This function disconnects from the database while awaiting a response from the API server. @@ -211,14 +186,16 @@ class SecurityScannerAPI(object): api_url = urljoin(security_config['ENDPOINT'], '/' + security_config['API_VERSION']) + '/' url = urljoin(api_url, relative_url % args) - client = self.app.config['HTTPCLIENT'] + client = self.config['HTTPCLIENT'] timeout = security_config.get('API_TIMEOUT_SECONDS', 1) logger.debug('Looking up sec information: %s', url) - with CloseForLongOperation(self.app.config): + with CloseForLongOperation(self.config): if body is not None: + logger.debug('POSTing security URL %s', url) return client.post(url, json=body, params=kwargs, timeout=timeout, cert=self._keys, verify=self._certificate) else: + logger.debug('GETing security URL %s', url) return client.get(url, params=kwargs, timeout=timeout, cert=self._keys, verify=self._certificate) diff --git a/util/secscan/validator.py b/util/secscan/validator.py new file mode 100644 index 000000000..44739a825 --- /dev/null +++ b/util/secscan/validator.py @@ -0,0 +1,65 @@ +import features +import logging + +logger = logging.getLogger(__name__) + + +class SecurityConfigValidator(object): + """ Helper class for validating the security scanner configuration. """ + def __init__(self, config, config_provider): + self._config_provider = config_provider + + if not features.SECURITY_SCANNER: + return + + self._security_config = config['SECURITY_SCANNER'] + if self._security_config is None: + return + + self._certificate = self._get_filepath('CA_CERTIFICATE_FILENAME') or False + self._public_key = self._get_filepath('PUBLIC_KEY_FILENAME') + self._private_key = self._get_filepath('PRIVATE_KEY_FILENAME') + + if self._public_key and self._private_key: + self._keys = (self._public_key, self._private_key) + else: + self._keys = None + + def _get_filepath(self, key): + config = self._security_config + + if key in config: + with self._config_provider.get_volume_file(config[key]) as f: + return f.name + + return None + + def cert(self): + return self._certificate + + def keypair(self): + return self._keys + + def valid(self): + if not features.SECURITY_SCANNER: + return False + + if not self._security_config: + logger.debug('Missing SECURITY_SCANNER block in configuration') + return False + + if not 'ENDPOINT' in self._security_config: + logger.debug('Missing ENDPOINT field in SECURITY_SCANNER configuration') + return False + + endpoint = self._security_config['ENDPOINT'] or '' + if not endpoint.startswith('http://') and not endpoint.startswith('https://'): + logger.debug('ENDPOINT field in SECURITY_SCANNER configuration must start with http or https') + return False + + if endpoint.startswith('https://') and (self._certificate is False or self._keys is None): + logger.debug('Certificate and key pair required for talking to security worker over HTTPS') + return False + + return True + diff --git a/workers/securityworker.py b/workers/securityworker.py index 51998e068..a39bccb71 100644 --- a/workers/securityworker.py +++ b/workers/securityworker.py @@ -1,98 +1,37 @@ import logging import logging.config -import requests import features import time from peewee import fn -from collections import defaultdict -from app import app, config_provider, storage, secscan_api -from endpoints.notificationhelper import spawn_notification +from app import app, config_provider, secscan_api from workers.worker import Worker -from data import model -from data.database import (Image, UseThenDisconnect, ExternalNotificationEvent) -from data.model.tag import filter_tags_have_repository_event, get_tags_for_image -from data.model.image import set_secscan_status, get_image_with_storage_and_parent_base -from data.model.storage import get_storage_locations +from data.database import Image, UseThenDisconnect +from data.model.image import get_image_with_storage_and_parent_base from util.secscan.api import SecurityConfigValidator +from util.secscan.analyzer import LayerAnalyzer from util.migrate.allocator import yield_random_entries BATCH_SIZE = 50 INDEXING_INTERVAL = 30 -API_METHOD_INSERT = '/v1/layers' -API_METHOD_GET_WITH_VULNERABILITIES = '/v1/layers/%s?vulnerabilities' logger = logging.getLogger(__name__) class SecurityWorker(Worker): def __init__(self): super(SecurityWorker, self).__init__() - validator = SecurityConfigValidator(app, config_provider) + validator = SecurityConfigValidator(app.config, config_provider) if validator.valid(): secscan_config = app.config.get('SECURITY_SCANNER') - self._api = secscan_config['ENDPOINT'] self._target_version = secscan_config['ENGINE_VERSION_TARGET'] - self._default_storage_locations = app.config['DISTRIBUTED_STORAGE_PREFERENCE'] - self._cert = validator.cert() - self._keys = validator.keypair() + self._analyzer = LayerAnalyzer(app.config, secscan_api) self.add_operation(self._index_images, INDEXING_INTERVAL) else: logger.warning('Failed to validate security scan configuration') - def _new_request(self, image): - """ Create the request body to submit the given image for analysis. """ - url = self._get_image_url(image) - if url is None: - return None - - request = { - 'Layer': { - 'Name': '%s.%s' % (image.docker_image_id, image.storage.uuid), - 'Path': url, - 'Format': 'Docker' - } - } - - if image.parent.docker_image_id and image.parent.storage.uuid: - request['Layer']['ParentName'] = '%s.%s' % (image.parent.docker_image_id, - image.parent.storage.uuid) - - return request - - def _get_image_url(self, image): - """ Gets the download URL for an image and if the storage doesn't exist, - marks the image as unindexed. """ - path = model.storage.get_layer_path(image.storage) - locations = self._default_storage_locations - - if not storage.exists(locations, path): - locations = get_storage_locations(image.storage.uuid) - - if not locations or not storage.exists(locations, path): - logger.warning('Could not find a valid location to download layer %s.%s', - image.docker_image_id, image.storage.uuid) - set_secscan_status(image, False, self._target_version) - return None - - uri = storage.get_direct_download_url(locations, path) - if uri is None: - # Handle local storage - local_storage_enabled = False - for storage_type, _ in app.config.get('DISTRIBUTED_STORAGE_CONFIG', {}).values(): - if storage_type == 'LocalStorage': - local_storage_enabled = True - - if local_storage_enabled: - uri = path - else: - logger.warning('Could not get image URL and local storage was not enabled') - return None - - return uri - def _index_images(self): def batch_query(): base_query = get_image_with_storage_and_parent_base() @@ -106,144 +45,11 @@ class SecurityWorker(Worker): with UseThenDisconnect(app.config): for candidate, abt in yield_random_entries(batch_query, Image.id, BATCH_SIZE, max_id, min_id): - _, continue_batch = self._analyze_recursively(candidate) + _, continue_batch = self._analyzer.analyze_recursively(candidate) if not continue_batch: logger.info('Another worker pre-empted us for layer: %s', candidate.id) abt.set() - def _analyze_recursively(self, layer): - """ Analyzes a layer and all its parents """ - if layer.parent_id and layer.parent.security_indexed_engine < self._target_version: - # The image has a parent that is not analyzed yet with this engine. - # Get the parent to get it's own parent and recurse. - try: - base_query = get_image_with_storage_and_parent_base() - parent_layer = base_query.where(Image.id == layer.parent_id).get() - except Image.DoesNotExist: - logger.warning("Image %s has Image %s as parent but doesn't exist.", layer.id, - layer.parent_id) - - return False, set_secscan_status(layer, False, self._target_version) - - cont, _ = self._analyze_recursively(parent_layer) - if not cont: - # The analysis failed for some reason and did not mark the layer as failed, - # thus we should not try to analyze the children of that layer. - # Interrupt the recursive analysis and return as no-one pre-empted us. - return False, True - - # Now we know all parents are analyzed. - return self._analyze(layer) - - def _analyze(self, layer): - """ Analyzes a single layer. - Return two bools, the first one tells us if we should evaluate its children, the second - one is set to False when another worker pre-empted the candidate's analysis for us. """ - - # If the parent couldn't be analyzed with the target version or higher, we can't analyze - # this image. Mark it as failed with the current target version. - if (layer.parent_id and not layer.parent.security_indexed and - layer.parent.security_indexed_engine >= self._target_version): - return True, set_secscan_status(layer, False, self._target_version) - - request = self._new_request(layer) - if request is None: - return False, True - - # Analyze the image. - try: - logger.info('Analyzing layer %s', request['Layer']['Name']) - # Using invalid certificates doesn't return proper errors because of - # https://github.com/shazow/urllib3/issues/556 - http_response = requests.post(self._api + API_METHOD_INSERT, json=request, - cert=self._keys, verify=self._cert) - json_response = http_response.json() - except (requests.exceptions.RequestException, ValueError): - logger.exception('An exception occurred when analyzing layer %s', request['Layer']['Name']) - return False, True - - # Handle any errors from the security scanner. - if http_response.status_code != 201: - message = json_response.get('Error').get('Message', '') - logger.warning('A warning event occurred when analyzing layer %s (status code %s): %s', - request['Layer']['Name'], http_response.status_code, message) - - # 422 means that the layer could not be analyzed: - # - the layer could not be extracted (manifest?) - # - the layer operating system / package manager is unsupported - # Set the layer as failed. - if http_response.status_code == 422: - return True, set_secscan_status(layer, False, self._target_version) - else: - return False, True - - # Verify that the version matches. - api_version = json_response['Layer']['IndexedByVersion'] - if api_version < self._target_version: - logger.warning('An engine runs on version %d but the target version is %d', api_version, - self._target_version) - - # Mark the image as analyzed. - logger.info('Analyzed layer %s successfully', request['Layer']['Name']) - set_status = set_secscan_status(layer, True, api_version) - - # If we are the one who've done the job successfully first, get the vulnerabilities and - # send notifications to the repos that have a tag on that layer. - # TODO(josephschorr): Adapt this depending on the new notification format we adopt. - # if set_status: - # # Get the tags of the layer we analyzed. - # repository_map = defaultdict(list) - # event = ExternalNotificationEvent.get(name='vulnerability_found') - # matching = list(filter_tags_have_repository_event(get_tags_for_image(layer.id), event)) - # - # for tag in matching: - # repository_map[tag.repository_id].append(tag) - # - # # If there is at least one tag, - # # Lookup the vulnerabilities for the image, now that it is analyzed. - # if len(repository_map) > 0: - # logger.debug('Loading vulnerabilities for layer %s', layer.id) - # sec_data = self._get_vulnerabilities(layer) - # - # if sec_data is not None: - # # Dispatch events for any detected vulnerabilities - # logger.debug('Got vulnerabilities for layer %s: %s', layer.id, sec_data) - # - # for repository_id in repository_map: - # tags = repository_map[repository_id] - # - # for vuln in sec_data['Vulnerabilities']: - # event_data = { - # 'tags': [tag.name for tag in tags], - # 'vulnerability': { - # 'id': vuln['Name'], - # 'description': vuln['Description'], - # 'link': vuln['Link'], - # 'priority': vuln['Priority'], - # }, - # } - # - # spawn_notification(tags[0].repository, 'vulnerability_found', event_data) - - return True, set_status - - def _get_vulnerabilities(self, layer): - """ Returns the vulnerabilities detected (if any) or None on error. """ - try: - response = secscan_api.call(self._api + API_METHOD_GET_WITH_VULNERABILITIES, None, - '%s.%s' % (layer.docker_image_id, layer.storage.uuid)) - - logger.debug('Got response %s for vulnerabilities for layer %s', - response.status_code, layer.id) - - if response.status_code == 404: - return None - except (requests.exceptions.RequestException, ValueError): - logger.exception('Failed to get vulnerability response for %s', layer.id) - return None - - return response.json() - if __name__ == '__main__': if not features.SECURITY_SCANNER: