diff --git a/data/model/image.py b/data/model/image.py index 943d9f383..b22f70ce3 100644 --- a/data/model/image.py +++ b/data/model/image.py @@ -1,5 +1,6 @@ import logging import dateutil.parser +import random from peewee import JOIN_LEFT_OUTER, fn, SQL from datetime import datetime @@ -7,7 +8,8 @@ from datetime import datetime from data.model import (DataModelException, db_transaction, _basequery, storage, InvalidImageException, config) from data.database import (Image, Repository, ImageStoragePlacement, Namespace, ImageStorage, - ImageStorageLocation, RepositoryPermission, db_for_update) + ImageStorageLocation, RepositoryPermission, db_for_update, + db_random_func) logger = logging.getLogger(__name__) @@ -459,3 +461,74 @@ def ensure_image_locations(*names): data = [{'name': name} for name in insert_names] ImageStorageLocation.insert_many(data).execute() + +def get_secscan_candidates(engine_version, batch_size): + Parent = Image.alias() + ParentImageStorage = ImageStorage.alias() + rimages = [] + + # Collect the images without parents + candidates = (Image + .select(Image.id) + .join(ImageStorage) + .where(Image.security_indexed_engine < engine_version, + Image.parent >> None, + ImageStorage.uploading == False) + .limit(batch_size*10)) + + images = (Image + .select(Image, ImageStorage) + .join(ImageStorage) + .where(Image.id << candidates) + .order_by(db_random_func()) + .limit(batch_size)) + + for image in images: + rimages.append(image) + + # Collect the images with analyzed parents. + candidates = (Image + .select(Image.id) + .join(Parent, on=(Image.parent == Parent.id)) + .switch(Image) + .join(ImageStorage) + .where(Image.security_indexed_engine < engine_version, + Parent.security_indexed == True, + Parent.security_indexed_engine >= engine_version, + ImageStorage.uploading == False) + .limit(batch_size*10)) + + images = (Image + .select(Image, ImageStorage, Parent, ParentImageStorage) + .join(Parent, on=(Image.parent == Parent.id)) + .join(ParentImageStorage, on=(ParentImageStorage.id == Parent.storage)) + .switch(Image) + .join(ImageStorage) + .where(Image.id << candidates) + .order_by(db_random_func()) + .limit(batch_size)) + + for image in images: + rimages.append(image) + + # Shuffle the images, otherwise the images without parents will always be on the top + random.shuffle(rimages) + + return rimages + + +def set_secscan_status(image, indexed, version): + query = (Image + .select() + .join(ImageStorage) + .where(Image.docker_image_id == image.docker_image_id, + ImageStorage.uuid == image.storage.uuid)) + + ids_to_update = [row.id for row in query] + if not ids_to_update: + return + + (Image + .update(security_indexed=indexed, security_indexed_engine=version) + .where(Image.id << ids_to_update) + .execute()) diff --git a/data/model/storage.py b/data/model/storage.py index 77ab9148c..d3e452d9f 100644 --- a/data/model/storage.py +++ b/data/model/storage.py @@ -225,3 +225,13 @@ def lookup_repo_storages_by_content_checksum(repo, checksums): .select() .join(Image) .where(Image.repository == repo, ImageStorage.content_checksum << checksums)) + +def get_storage_locations(uuid): + query = (ImageStoragePlacement + .select() + .join(ImageStorageLocation) + .switch(ImageStoragePlacement) + .join(ImageStorage, JOIN_LEFT_OUTER) + .where(ImageStorage.uuid == uuid)) + + return [location.location.name for location in query] diff --git a/data/model/tag.py b/data/model/tag.py index 1a7932347..6883a39e3 100644 --- a/data/model/tag.py +++ b/data/model/tag.py @@ -3,7 +3,7 @@ from uuid import uuid4 from data.model import (image, db_transaction, DataModelException, _basequery, InvalidManifestException) from data.database import (RepositoryTag, Repository, Image, ImageStorage, Namespace, TagManifest, - get_epoch_timestamp, db_for_update) + RepositoryNotification, get_epoch_timestamp, db_for_update) def _tag_alive(query, now_ts=None): @@ -18,15 +18,29 @@ def get_matching_tags(docker_image_id, storage_uuid, *args): given docker_image_id and storage_uuid. """ image_query = image.get_repository_image_and_deriving(docker_image_id, storage_uuid) - return (RepositoryTag - .select(*args) - .distinct() - .join(Image) - .join(ImageStorage) - .where(Image.id << image_query, RepositoryTag.lifetime_end_ts >> None, - RepositoryTag.hidden == False)) + return _tag_alive(RepositoryTag + .select(*args) + .distinct() + .join(Image) + .join(ImageStorage) + .where(Image.id << image_query, RepositoryTag.hidden == False)) +def get_tags_for_image(image_id, *args): + return _tag_alive(RepositoryTag + .select(*args) + .distinct() + .where(RepositoryTag.image == image_id, + RepositoryTag.hidden == False)) + + +def filter_tags_have_repository_event(query, event): + return (query + .switch(RepositoryTag) + .join(Repository) + .join(RepositoryNotification) + .where(RepositoryNotification.event == event)) + def list_repository_tags(namespace_name, repository_name, include_hidden=False, include_storage=False): to_select = (RepositoryTag, Image) @@ -233,4 +247,3 @@ def _load_repo_manifests(namespace, repo_name): .join(Repository) .join(Namespace, on=(Namespace.id == Repository.namespace_user)) .where(Repository.name == repo_name, Namespace.username == namespace)) - diff --git a/test/data/test.db b/test/data/test.db index 33cbdddfe..e39fc8a87 100644 Binary files a/test/data/test.db and b/test/data/test.db differ diff --git a/workers/security_notification_worker.py b/workers/security_notification_worker.py index e54847abf..603633343 100644 --- a/workers/security_notification_worker.py +++ b/workers/security_notification_worker.py @@ -8,6 +8,7 @@ import features from app import secscan_notification_queue, secscan_api from data import model +from data.model.tag import filter_tags_have_repository_event, get_matching_tags from data.database import (Image, ImageStorage, ExternalNotificationEvent, Repository, RepositoryNotification, RepositoryTag) from endpoints.notificationhelper import spawn_notification @@ -31,23 +32,19 @@ class SecurityNotificationWorker(QueueWorker): tag_map = defaultdict(set) repository_map = {} - # Find all tags that contain the layer(s) introducing the vulnerability. + # Find all tags that contain the layer(s) introducing the vulnerability, + # in repositories that have the event setup. content = data['Content'] layer_ids = content.get('NewIntroducingLayersIDs', content.get('IntroducingLayersIDs', [])) for layer_id in layer_ids: (docker_image_id, storage_uuid) = layer_id.split('.', 2) - tags = model.tag.get_matching_tags(docker_image_id, storage_uuid, RepositoryTag, - Repository, Image, ImageStorage) - # Additionally filter to tags only in repositories that have the event setup. - matching = list(tags - .switch(RepositoryTag) - .join(Repository) - .join(RepositoryNotification) - .where(RepositoryNotification.event == event)) + matching = get_matching_tags(docker_image_id, storage_uuid, RepositoryTag, Repository, + Image, ImageStorage) + tags = list(filter_tags_have_repository_event(matching, event)) check_map = {} - for tag in matching: + for tag in tags: # Verify that the tag's root image has the vulnerability. tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid) logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id) diff --git a/workers/securityworker.py b/workers/securityworker.py index 2ce3fac9e..786085abe 100644 --- a/workers/securityworker.py +++ b/workers/securityworker.py @@ -4,121 +4,25 @@ import logging.config import requests import features import time -import os -import random from endpoints.notificationhelper import spawn_notification from collections import defaultdict -from peewee import JOIN_LEFT_OUTER -from app import app, config_provider, storage, OVERRIDE_CONFIG_DIRECTORY, secscan_api +from app import app, config_provider, storage, secscan_api from workers.worker import Worker -from data.database import (Image, ImageStorage, ImageStorageLocation, ImageStoragePlacement, - db_random_func, UseThenDisconnect, RepositoryTag, Repository, - ExternalNotificationEvent, RepositoryNotification) +from data import model +from data.model.tag import filter_tags_have_repository_event, get_tags_for_image +from data.model.image import get_secscan_candidates, set_secscan_status +from data.model.storage import get_storage_locations +from data.database import (UseThenDisconnect, ExternalNotificationEvent) from util.secscan.api import SecurityConfigValidator logger = logging.getLogger(__name__) -BATCH_SIZE = 20 -INDEXING_INTERVAL = 10 +BATCH_SIZE = 5 +INDEXING_INTERVAL = 30 API_METHOD_INSERT = '/v1/layers' API_METHOD_VERSION = '/v1/versions/engine' -def _get_images_to_export_list(version): - Parent = Image.alias() - ParentImageStorage = ImageStorage.alias() - rimages = [] - - # Collect the images without parents - candidates = (Image - .select(Image.id, Image.docker_image_id, ImageStorage.uuid) - .join(ImageStorage) - .where(Image.security_indexed_engine < version, - Image.parent_id >> None, - ImageStorage.uploading == False) - .limit(BATCH_SIZE*10) - .alias('candidates')) - - images = (Image - .select(candidates.c.id, candidates.c.docker_image_id, candidates.c.uuid) - .from_(candidates) - .order_by(db_random_func()) - .tuples() - .limit(BATCH_SIZE)) - - for image in images: - rimages.append({'image_id': image[0], - 'docker_image_id': image[1], - 'storage_uuid': image[2], - 'parent_docker_image_id': None, - 'parent_storage_uuid': None}) - - # Collect the images with analyzed parents. - candidates = (Image - .select(Image.id, - Image.docker_image_id, - ImageStorage.uuid, - Parent.docker_image_id.alias('parent_docker_image_id'), - ParentImageStorage.uuid.alias('parent_storage_uuid')) - .join(Parent, on=(Image.parent_id == Parent.id)) - .join(ParentImageStorage, on=(ParentImageStorage.id == Parent.storage)) - .switch(Image) - .join(ImageStorage) - .where(Image.security_indexed_engine < version, - Parent.security_indexed == True, - Parent.security_indexed_engine >= version, - ImageStorage.uploading == False) - .limit(BATCH_SIZE*10) - .alias('candidates')) - - images = (Image - .select(candidates.c.id, - candidates.c.docker_image_id, - candidates.c.uuid, - candidates.c.parent_docker_image_id, - candidates.c.parent_storage_uuid) - .from_(candidates) - .order_by(db_random_func()) - .tuples() - .limit(BATCH_SIZE)) - - for image in images: - rimages.append({'image_id': image[0], - 'docker_image_id': image[1], - 'storage_uuid': image[2], - 'parent_docker_image_id': image[3], - 'parent_storage_uuid': image[4]}) - - # Shuffle the images, otherwise the images without parents will always be on the top - random.shuffle(rimages) - return rimages - -def _get_storage_locations(uuid): - query = (ImageStoragePlacement - .select() - .join(ImageStorageLocation) - .switch(ImageStoragePlacement) - .join(ImageStorage, JOIN_LEFT_OUTER) - .where(ImageStorage.uuid == uuid)) - - return [location.location.name for location in query] - -def _update_image(image, indexed, version): - query = (Image - .select() - .join(ImageStorage) - .where(Image.docker_image_id == image['docker_image_id'], - ImageStorage.uuid == image['storage_uuid'])) - - ids_to_update = [row.id for row in query] - if not ids_to_update: - return - - (Image - .update(security_indexed=indexed, security_indexed_engine=version) - .where(Image.id << ids_to_update) - .execute()) - class SecurityWorker(Worker): def __init__(self): @@ -133,21 +37,22 @@ class SecurityWorker(Worker): self._keys = validator.keypair() self.add_operation(self._index_images, INDEXING_INTERVAL) - logger.warning('Failed to validate security scan configuration') + else: + logger.warning('Failed to validate security scan configuration') def _get_image_url(self, image): """ Gets the download URL for an image and if the storage doesn't exist, marks the image as unindexed. """ - path = storage.image_layer_path(image['storage_uuid']) + path = model.storage.get_layer_path(image.storage) locations = self._default_storage_locations if not storage.exists(locations, path): - locations = _get_storage_locations(image['storage_uuid']) + locations = get_storage_locations(image.storage.uuid) if not locations or not storage.exists(locations, path): - logger.warning('Could not find a valid location to download layer %s', - image['docker_image_id']+'.'+image['storage_uuid']) - _update_image(image, False, self._target_version) + logger.warning('Could not find a valid location to download layer %s.%s', + image.docker_image_id, image.storage.uuid) + set_secscan_status(image, False, self._target_version) return None uri = storage.get_direct_download_url(locations, path) @@ -172,23 +77,21 @@ class SecurityWorker(Worker): return None request = { - 'ID': '%s.%s' % (image['docker_image_id'], image['storage_uuid']), + 'ID': '%s.%s' % (image.docker_image_id, image.storage.uuid), 'Path': url, } - if image['parent_docker_image_id'] is not None and image['parent_storage_uuid'] is not None: - request['ParentID'] = '%s.%s' % (image['parent_docker_image_id'], - image['parent_storage_uuid']) + if image.parent is not None: + request['ParentID'] = '%s.%s' % (image.parent.docker_image_id, + image.parent.storage.uuid) return request def _analyze_image(self, image): - """ Analyzes an image by passing it to Clair. Returns the vulnerabilities detected - (if any) or None on error. - """ + """ Analyzes an image by passing it to Clair. """ request = self._new_request(image) if request is None: - return None + return False # Analyze the image. try: @@ -200,7 +103,7 @@ class SecurityWorker(Worker): jsonResponse = httpResponse.json() except (requests.exceptions.RequestException, ValueError): logger.exception('An exception occurred when analyzing layer ID %s', request['ID']) - return None + return False # Handle any errors from the security scanner. if httpResponse.status_code != 201: @@ -210,11 +113,12 @@ class SecurityWorker(Worker): request['ID'], jsonResponse['Message']) # Hopefully, there is no version lower than the target one running - _update_image(image, False, self._target_version) + set_secscan_status(image, False, self._target_version) + + return True else: logger.warning('Got non-201 when analyzing layer ID %s: %s', request['ID'], jsonResponse) - - return None + return False # Verify that the version matches. api_version = jsonResponse['Version'] @@ -222,25 +126,29 @@ class SecurityWorker(Worker): logger.warning('An engine runs on version %d but the target version is %d') # Mark the image as analyzed. - logger.debug('Layer %s analyzed successfully; Loading vulnerabilities for layer', - image['image_id']) - _update_image(image, True, api_version) + logger.debug('Layer %s analyzed successfully', image.id) + set_secscan_status(image, True, api_version) - # Lookup the vulnerabilities for the image, now that it is analyzed. + return True + + def _get_vulnerabilities(self, image): + """ Returns the vulnerabilities detected (if any) or None on error. """ try: - response = secscan_api.call('layers/%s/vulnerabilities', None, request['ID']) + response = secscan_api.call('layers/%s/vulnerabilities', None, + '%s.%s' % (image.docker_image_id, image.storage.uuid)) logger.debug('Got response %s for vulnerabilities for layer %s', - response.status_code, image['image_id']) + response.status_code, image.id) if response.status_code == 404: return None except (requests.exceptions.RequestException, ValueError): - logger.exception('Failed to get vulnerability response for %s', image['image_id']) + logger.exception('Failed to get vulnerability response for %s', image.id) return None return response.json() def _index_images(self): logger.debug('Started indexing') + event = ExternalNotificationEvent.get(name='vulnerability_found') with UseThenDisconnect(app.config): while True: @@ -248,7 +156,7 @@ class SecurityWorker(Worker): images = [] try: logger.debug('Looking up images to index') - images = _get_images_to_export_list(self._target_version) + images = get_secscan_candidates(self._target_version, BATCH_SIZE) except Image.DoesNotExist: pass @@ -258,48 +166,49 @@ class SecurityWorker(Worker): logger.debug('Found %d images to index', len(images)) for image in images: - # Analyze the image, retrieving the vulnerabilities (if any). - sec_data = self._analyze_image(image) - if sec_data is None: - continue + # Analyze the image. + analyzed = self._analyze_image(image) + if not analyzed: + return - if not sec_data.get('Vulnerabilities'): - continue - - # Dispatch events for any detected vulnerabilities - logger.debug('Got vulnerabilities for layer %s: %s', image['image_id'], sec_data) - event = ExternalNotificationEvent.get(name='vulnerability_found') - matching = (RepositoryTag - .select(RepositoryTag, Repository) - .distinct() - .join(Repository) - .join(RepositoryNotification) - .where(RepositoryNotification.event == event, - RepositoryTag.image == image['image_id'], - RepositoryTag.hidden == False, - RepositoryTag.lifetime_end_ts >> None)) + # Get the tags of the image we analyzed + matching = list(filter_tags_have_repository_event(get_tags_for_image(image.id), event)) repository_map = defaultdict(list) for tag in matching: repository_map[tag.repository_id].append(tag) - for repository_id in repository_map: - tags = repository_map[repository_id] + # If there is at least one tag, + # Lookup the vulnerabilities for the image, now that it is analyzed. + if len(repository_map) > 0: + logger.debug('Loading vulnerabilities for layer %s', image.id) + sec_data = self._get_vulnerabilities(image) - for vuln in sec_data['Vulnerabilities']: - event_data = { - 'tags': [tag.name for tag in tags], - 'vulnerability': { - 'id': vuln['ID'], - 'description': vuln['Description'], - 'link': vuln['Link'], - 'priority': vuln['Priority'], - }, - } + if sec_data is None: + continue - spawn_notification(tags[0].repository, 'vulnerability_found', event_data) + if not sec_data.get('Vulnerabilities'): + continue + # Dispatch events for any detected vulnerabilities + logger.debug('Got vulnerabilities for layer %s: %s', image.id, sec_data) + + for repository_id in repository_map: + tags = repository_map[repository_id] + + for vuln in sec_data['Vulnerabilities']: + event_data = { + 'tags': [tag.name for tag in tags], + 'vulnerability': { + 'id': vuln['ID'], + 'description': vuln['Description'], + 'link': vuln['Link'], + 'priority': vuln['Priority'], + }, + } + + spawn_notification(tags[0].repository, 'vulnerability_found', event_data) if __name__ == '__main__': if not features.SECURITY_SCANNER: