diff --git a/app.py b/app.py index c6c7eecbf..953af51d8 100644 --- a/app.py +++ b/app.py @@ -236,6 +236,7 @@ else: database.configure(app.config) model.config.app_config = app.config model.config.store = storage +model.config.register_image_cleanup_callback(secscan_api.cleanup_layers) @login_manager.user_loader def load_user(user_uuid): diff --git a/data/model/__init__.py b/data/model/__init__.py index 23a1a647c..331b2f0b3 100644 --- a/data/model/__init__.py +++ b/data/model/__init__.py @@ -107,6 +107,10 @@ class Config(object): def __init__(self): self.app_config = None self.store = None + self.image_cleanup_callbacks = [] + + def register_image_cleanup_callback(self, callback): + self.image_cleanup_callbacks.append(callback) config = Config() diff --git a/data/model/repository.py b/data/model/repository.py index 0281bc592..11bcafcf1 100644 --- a/data/model/repository.py +++ b/data/model/repository.py @@ -5,9 +5,9 @@ from datetime import timedelta, datetime from peewee import JOIN_LEFT_OUTER, fn, SQL, IntegrityError from cachetools import ttl_cache -from data.model import (DataModelException, tag, db_transaction, storage, permission, +from data.model import (config, DataModelException, tag, db_transaction, storage, permission, _basequery) -from data.database import (Repository, Namespace, RepositoryTag, Star, Image, User, +from data.database import (Repository, Namespace, RepositoryTag, Star, Image, ImageStorage, User, Visibility, RepositoryPermission, RepositoryActionCount, Role, RepositoryAuthorizedEmail, TagManifest, DerivedStorageForImage, Label, TagManifestLabel, db_for_update, get_epoch_timestamp, @@ -173,29 +173,24 @@ def garbage_collect_repo(repo, extra_candidate_set=None): referenced_candidates = (direct_referenced | ancestor_referenced) - # We desire two pieces of information from the database from the following + # We desire a few pieces of information from the database from the following # query: all of the image ids which are associated with this repository, - # and the storages which are associated with those images. In order to - # fetch just this information, and bypass all of the peewee model parsing - # code, which is overkill for just two fields, we use a tuple query, and - # feed that directly to the dictionary tuple constructor which takes an - # iterable of tuples containing [(k, v), (k, v), ...] + # and the storages which are associated with those images. unreferenced_candidates = (Image - .select(Image.id, Image.storage) + .select(Image.id, Image.docker_image_id, + ImageStorage.id, ImageStorage.uuid) + .join(ImageStorage) .where(Image.id << candidates_orphans, - ~(Image.id << referenced_candidates)) - .tuples()) + ~(Image.id << referenced_candidates))) - unreferecend_images_to_storages = dict(unreferenced_candidates) - to_remove = unreferecend_images_to_storages.keys() - - if len(to_remove) > 0: - logger.info('Cleaning up unreferenced images: %s', to_remove) - storage_id_whitelist = set(unreferecend_images_to_storages.values()) + image_ids_to_remove = [candidate.id for candidate in unreferenced_candidates] + if len(image_ids_to_remove) > 0: + logger.info('Cleaning up unreferenced images: %s', image_ids_to_remove) + storage_id_whitelist = set([candidate.storage_id for candidate in unreferenced_candidates]) # Lookup any derived images for the images to remove. derived = DerivedStorageForImage.select().where( - DerivedStorageForImage.source_image << to_remove) + DerivedStorageForImage.source_image << image_ids_to_remove) has_derived = False for derived_image in derived: @@ -207,21 +202,30 @@ def garbage_collect_repo(repo, extra_candidate_set=None): try: (DerivedStorageForImage .delete() - .where(DerivedStorageForImage.source_image << to_remove) + .where(DerivedStorageForImage.source_image << image_ids_to_remove) .execute()) except IntegrityError: - logger.info('Could not GC derived images %s; will try again soon', to_remove) + logger.info('Could not GC derived images %s; will try again soon', image_ids_to_remove) return False try: - Image.delete().where(Image.id << to_remove).execute() + Image.delete().where(Image.id << image_ids_to_remove).execute() except IntegrityError: - logger.info('Could not GC images %s; will try again soon', to_remove) + logger.info('Could not GC images %s; will try again soon', image_ids_to_remove) return False - if len(to_remove) > 0: - logger.info('Garbage collecting storage for images: %s', to_remove) - storage.garbage_collect_storage(storage_id_whitelist) + # If any images were removed, GC any orphaned storages. + if len(image_ids_to_remove) > 0: + logger.info('Garbage collecting storage for images: %s', image_ids_to_remove) + storage_ids_removed = set(storage.garbage_collect_storage(storage_id_whitelist)) + + # If any storages were removed and cleanup callbacks are registered, call them with + # the images+storages removed. + if storage_ids_removed and config.image_cleanup_callbacks: + image_storages_removed = [candidate for candidate in unreferenced_candidates + if candidate.storage_id in storage_ids_removed] + for callback in config.image_cleanup_callbacks: + callback(image_storages_removed) return True diff --git a/data/model/storage.py b/data/model/storage.py index 0c7bf80ba..61153714e 100644 --- a/data/model/storage.py +++ b/data/model/storage.py @@ -72,8 +72,12 @@ def _orphaned_storage_query(candidate_ids): def garbage_collect_storage(storage_id_whitelist): + """ Performs GC on a possible subset of the storage's with the IDs found in the + whitelist. The storages in the whitelist will be checked, and any orphaned will + be removed, with those IDs being returned. + """ if len(storage_id_whitelist) == 0: - return + return [] def placements_query_to_paths_set(placements_query): return {(get_image_location_for_id(placement.location_id).name, @@ -89,7 +93,7 @@ def garbage_collect_storage(storage_id_whitelist): orphaned_storage_ids = _orphaned_storage_query(storage_id_whitelist) if len(orphaned_storage_ids) == 0: # Nothing to GC. - return + return [] placements_to_remove = list(ImageStoragePlacement .select() @@ -133,6 +137,8 @@ def garbage_collect_storage(storage_id_whitelist): logger.debug('Removing %s from %s', image_path, location_name) config.store.remove({location_name}, image_path) + return orphaned_storage_ids + def create_v1_storage(location_name): storage = ImageStorage.create(cas_path=False) diff --git a/test/test_gc.py b/test/test_gc.py index c366a5a83..5155d354f 100644 --- a/test/test_gc.py +++ b/test/test_gc.py @@ -146,20 +146,43 @@ class TestGarbageCollection(unittest.TestCase): return len(label_ids - referenced_by_manifest) @contextmanager - def assert_no_new_dangling_storages_or_labels(self): + def assert_gc_integrity(self, expect_storage_removed=True): """ Specialized assertion for ensuring that GC cleans up all dangling storages - and labels. + and labels, invokes the callback for images removed and doesn't invoke the + callback for images *not* removed. """ # TODO: Consider also asserting the number of DB queries being performed. + + # Add a callback for when images are removed. + removed_image_storages = [] + model.config.register_image_cleanup_callback(removed_image_storages.extend) + + # Store the number of dangling storages and labels. existing_storage_count = self._get_dangling_storage_count() existing_label_count = self._get_dangling_label_count() yield + + # Ensure the number of dangling storages and labels has not changed. updated_storage_count = self._get_dangling_storage_count() self.assertEqual(updated_storage_count, existing_storage_count) updated_label_count = self._get_dangling_label_count() self.assertEqual(updated_label_count, existing_label_count) + # Ensure that for each call to the image+storage cleanup callback, the image and its + # storage is not found *anywhere* in the database. + for removed_image_and_storage in removed_image_storages: + with self.assertRaises(Image.DoesNotExist): + Image.get(id=removed_image_and_storage.id) + + with self.assertRaises(ImageStorage.DoesNotExist): + ImageStorage.get(id=removed_image_and_storage.storage_id) + + with self.assertRaises(ImageStorage.DoesNotExist): + ImageStorage.get(uuid=removed_image_and_storage.storage.uuid) + + self.assertEquals(expect_storage_removed, bool(removed_image_storages)) + def test_has_garbage(self): """ Remove all existing repositories, then add one without garbage, check, then add one with garbage, and check again. @@ -212,14 +235,14 @@ class TestGarbageCollection(unittest.TestCase): def test_one_tag(self): """ Create a repository with a single tag, then remove that tag and verify that the repository is now empty. """ - with self.assert_no_new_dangling_storages_or_labels(): + with self.assert_gc_integrity(): repository = self.createRepository(latest=['i1', 'i2', 'i3']) self.deleteTag(repository, 'latest') self.assertDeleted(repository, 'i1', 'i2', 'i3') def test_two_tags_unshared_images(self): """ Repository has two tags with no shared images between them. """ - with self.assert_no_new_dangling_storages_or_labels(): + with self.assert_gc_integrity(): repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['f1', 'f2']) self.deleteTag(repository, 'latest') self.assertDeleted(repository, 'i1', 'i2', 'i3') @@ -229,7 +252,7 @@ class TestGarbageCollection(unittest.TestCase): """ Repository has two tags with shared images. Deleting the tag should only remove the unshared images. """ - with self.assert_no_new_dangling_storages_or_labels(): + with self.assert_gc_integrity(): repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1']) self.deleteTag(repository, 'latest') self.assertDeleted(repository, 'i2', 'i3') @@ -239,7 +262,7 @@ class TestGarbageCollection(unittest.TestCase): """ Two repositories with different images. Removing the tag from one leaves the other's images intact. """ - with self.assert_no_new_dangling_storages_or_labels(): + with self.assert_gc_integrity(): repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1') repository2 = self.createRepository(latest=['j1', 'j2', 'j3'], name='repo2') @@ -252,7 +275,7 @@ class TestGarbageCollection(unittest.TestCase): """ Two repositories with shared images. Removing the tag from one leaves the other's images intact. """ - with self.assert_no_new_dangling_storages_or_labels(): + with self.assert_gc_integrity(): repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1') repository2 = self.createRepository(latest=['i1', 'i2', 'j1'], name='repo2') @@ -265,7 +288,7 @@ class TestGarbageCollection(unittest.TestCase): """ Two repositories under different namespaces should result in the images being deleted but not completely removed from the database. """ - with self.assert_no_new_dangling_storages_or_labels(): + with self.assert_gc_integrity(): repository1 = self.createRepository(namespace=ADMIN_ACCESS_USER, latest=['i1', 'i2', 'i3']) repository2 = self.createRepository(namespace=PUBLIC_USER, latest=['i1', 'i2', 'i3']) @@ -277,7 +300,7 @@ class TestGarbageCollection(unittest.TestCase): """ Repository has multiple tags with shared images. Selectively deleting the tags, and verifying at each step. """ - with self.assert_no_new_dangling_storages_or_labels(): + with self.assert_gc_integrity(): repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'], third=['t1', 't2', 't3'], fourth=['i1', 'f1']) @@ -314,7 +337,7 @@ class TestGarbageCollection(unittest.TestCase): self.assertDeleted(repository, 'i1') def test_empty_gc(self): - with self.assert_no_new_dangling_storages_or_labels(): + with self.assert_gc_integrity(expect_storage_removed=False): repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'], third=['t1', 't2', 't3'], fourth=['i1', 'f1']) @@ -324,7 +347,7 @@ class TestGarbageCollection(unittest.TestCase): def test_time_machine_no_gc(self): """ Repository has two tags with shared images. Deleting the tag should not remove any images """ - with self.assert_no_new_dangling_storages_or_labels(): + with self.assert_gc_integrity(expect_storage_removed=False): repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1']) self._set_tag_expiration_policy(repository.namespace_user.username, 60*60*24) @@ -336,7 +359,7 @@ class TestGarbageCollection(unittest.TestCase): """ Repository has two tags with shared images. Deleting the second tag should cause the images for the first deleted tag to gc. """ - with self.assert_no_new_dangling_storages_or_labels(): + with self.assert_gc_integrity(): repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1']) self._set_tag_expiration_policy(repository.namespace_user.username, 1) @@ -351,6 +374,37 @@ class TestGarbageCollection(unittest.TestCase): self.assertDeleted(repository, 'i2', 'i3') self.assertNotDeleted(repository, 'i1', 'f1') + def test_images_shared_storage(self): + """ Repository with two tags, both with the same shared storage. Deleting the first + tag should delete the first image, but *not* its storage. + """ + with self.assert_gc_integrity(expect_storage_removed=False): + repository = self.createRepository() + + # Add two tags, each with their own image, but with the same storage. + image_storage = model.storage.create_v1_storage(storage.preferred_locations[0]) + + first_image = Image.create(docker_image_id='i1', + repository=repository, storage=image_storage, + ancestors='/') + + second_image = Image.create(docker_image_id='i2', + repository=repository, storage=image_storage, + ancestors='/') + + model.tag.store_tag_manifest(repository.namespace_user.username, repository.name, + 'first', first_image.docker_image_id, + 'sha:someshahere', '{}') + + model.tag.store_tag_manifest(repository.namespace_user.username, repository.name, + 'second', second_image.docker_image_id, + 'sha:someshahere', '{}') + + # Delete the first tag. + self.deleteTag(repository, 'first') + self.assertDeleted(repository, 'i1') + self.assertNotDeleted(repository, 'i2') + if __name__ == '__main__': unittest.main() diff --git a/test/test_secscan.py b/test/test_secscan.py index 7a40d81a9..b1df0aa70 100644 --- a/test/test_secscan.py +++ b/test/test_secscan.py @@ -726,5 +726,34 @@ class TestSecurityScanner(unittest.TestCase): self.assertIsNone(notification_queue.get()) + def test_layer_gc(self): + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) + + # Delete the prod tag so that only the `latest` tag remains. + model.tag.delete_tag(ADMIN_ACCESS_USER, SIMPLE_REPO, 'prod') + + with fake_security_scanner() as security_scanner: + # Analyze the layer. + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, security_scanner, True, 1) + self.assertTrue(security_scanner.has_layer(security_scanner.layer_id(layer))) + + namespace_user = model.user.get_user(ADMIN_ACCESS_USER) + model.user.change_user_tag_expiration(namespace_user, 0) + + # Delete the tag in the repository and GC. + model.tag.delete_tag(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + time.sleep(1) + + repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) + model.repository.garbage_collect_repo(repo) + + # Ensure that the security scanner no longer has the image. + self.assertFalse(security_scanner.has_layer(security_scanner.layer_id(layer))) + + if __name__ == '__main__': unittest.main() diff --git a/util/secscan/api.py b/util/secscan/api.py index c2f62f3c2..432c2cdcd 100644 --- a/util/secscan/api.py +++ b/util/secscan/api.py @@ -1,8 +1,10 @@ import logging + +from urlparse import urljoin + import requests from flask import url_for -from urlparse import urljoin from data.database import CloseForLongOperation from data import model @@ -40,11 +42,17 @@ class APIRequestFailure(Exception): _API_METHOD_INSERT = 'layers' _API_METHOD_GET_LAYER = 'layers/%s' +_API_METHOD_DELETE_LAYER = 'layers/%s' _API_METHOD_MARK_NOTIFICATION_READ = 'notifications/%s' _API_METHOD_GET_NOTIFICATION = 'notifications/%s' _API_METHOD_PING = 'metrics' +def compute_layer_id(layer): + """ Returns the ID for the layer in the security scanner. """ + return '%s.%s' % (layer.docker_image_id, layer.storage.uuid) + + class SecurityScannerAPI(object): """ Helper class for talking to the Security Scan service (Clair). """ def __init__(self, app, config, storage, client=None, skip_validation=False): @@ -62,7 +70,6 @@ class SecurityScannerAPI(object): self._default_storage_locations = config['DISTRIBUTED_STORAGE_PREFERENCE'] self._target_version = config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 2) - def _get_image_url_and_auth(self, image): """ Returns a tuple of the url and the auth header value that must be used to fetch the layer data itself. If the image can't be addressed, we return @@ -74,8 +81,8 @@ class SecurityScannerAPI(object): if not self._storage.exists(locations, path): locations = get_storage_locations(image.storage.uuid) if not locations or not self._storage.exists(locations, path): - logger.warning('Could not find a valid location to download layer %s.%s out of %s', - image.docker_image_id, image.storage.uuid, locations) + logger.warning('Could not find a valid location to download layer %s out of %s', + compute_layer_id(image), locations) return None, None uri = self._storage.get_direct_download_url(locations, path) @@ -106,17 +113,16 @@ class SecurityScannerAPI(object): return uri, auth_header - - def _new_analyze_request(self, image): - """ Create the request body to submit the given image for analysis. If the image's URL cannot + def _new_analyze_request(self, layer): + """ Create the request body to submit the given layer for analysis. If the layer's URL cannot be found, returns None. """ - url, auth_header = self._get_image_url_and_auth(image) + url, auth_header = self._get_image_url_and_auth(layer) if url is None: return None layer_request = { - 'Name': '%s.%s' % (image.docker_image_id, image.storage.uuid), + 'Name': compute_layer_id(layer), 'Path': url, 'Format': 'Docker', } @@ -126,14 +132,23 @@ class SecurityScannerAPI(object): 'Authorization': auth_header, } - if image.parent.docker_image_id and image.parent.storage.uuid: - layer_request['ParentName'] = '%s.%s' % (image.parent.docker_image_id, - image.parent.storage.uuid) + if layer.parent.docker_image_id and layer.parent.storage.uuid: + layer_request['ParentName'] = compute_layer_id(layer.parent) return { 'Layer': layer_request, } + def cleanup_layers(self, layers): + """ Callback invoked by garbage collection to cleanup any layers that no longer + need to be stored in the security scanner. + """ + if self._config is None: + # Security scanner not enabled. + return + + for layer in layers: + self.delete_layer(layer) def ping(self): """ Calls GET on the metrics endpoint of the security scanner to ensure it is running @@ -151,6 +166,17 @@ class SecurityScannerAPI(object): logger.exception('Exception when trying to connect to security scanner endpoint') raise Exception('Exception when trying to connect to security scanner endpoint') + def delete_layer(self, layer): + """ Calls DELETE on the given layer in the security scanner, removing it from + its database. + """ + layer_id = compute_layer_id(layer) + try: + response = self._call('DELETE', _API_METHOD_DELETE_LAYER % layer_id) + return response.status_code / 100 == 2 + except requests.exceptions.RequestException: + logger.exception('Failed to delete layer: %s', layer_id) + return False def analyze_layer(self, layer): """ Posts the given layer to the security scanner for analysis, blocking until complete. @@ -201,7 +227,6 @@ class SecurityScannerAPI(object): # Return the parsed API version. return json_response['Layer']['IndexedByVersion'] - def check_layer_vulnerable(self, layer_id, cve_name): """ Checks to see if the layer with the given ID is vulnerable to the specified CVE. """ layer_data = self._get_layer_data(layer_id, include_vulnerabilities=True) @@ -215,7 +240,6 @@ class SecurityScannerAPI(object): return False - def get_notification(self, notification_name, layer_limit=100, page=None): """ Gets the data for a specific notification, with optional page token. Returns a tuple of the data (None on failure) and whether to retry. @@ -245,7 +269,6 @@ class SecurityScannerAPI(object): return json_response, False - def mark_notification_read(self, notification_name): """ Marks a security scanner notification as read. """ try: @@ -255,13 +278,11 @@ class SecurityScannerAPI(object): logger.exception('Failed to mark notification as read: %s', notification_name) return False - def get_layer_data(self, layer, include_features=False, include_vulnerabilities=False): """ Returns the layer data for the specified layer. On error, returns None. """ - layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) + layer_id = compute_layer_id(layer) return self._get_layer_data(layer_id, include_features, include_vulnerabilities) - def _get_layer_data(self, layer_id, include_features=False, include_vulnerabilities=False): try: params = {} @@ -288,7 +309,6 @@ class SecurityScannerAPI(object): return json_response - def _call(self, method, relative_url, params=None, body=None): """ Issues an HTTP call to the sec API at the given relative URL. This function disconnects from the database while awaiting a response diff --git a/util/secscan/fake.py b/util/secscan/fake.py index 0ed5c12f5..a5f1190bd 100644 --- a/util/secscan/fake.py +++ b/util/secscan/fake.py @@ -5,7 +5,7 @@ import urlparse from contextlib import contextmanager from httmock import urlmatch, HTTMock, all_requests -from util.secscan.api import UNKNOWN_PARENT_LAYER_ERROR_MSG +from util.secscan.api import UNKNOWN_PARENT_LAYER_ERROR_MSG, compute_layer_id @contextmanager def fake_security_scanner(hostname='fakesecurityscanner'): @@ -79,7 +79,7 @@ class FakeSecurityScanner(object): def layer_id(self, layer): """ Returns the Quay Security Scanner layer ID for the given layer (Image row). """ - return '%s.%s' % (layer.docker_image_id, layer.storage.uuid) + return compute_layer_id(layer) def add_layer(self, layer_id): """ Adds a layer to the security scanner, with no features or vulnerabilities. """ @@ -191,6 +191,20 @@ class FakeSecurityScanner(object): 'content': json.dumps({'Layer': layer_data}), } + @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers/(.+)', method='DELETE') + def remove_layer_mock(url, _): + layer_id = url.path[len('/v1/layers/'):] + if not layer_id in self.layers: + return { + 'status_code': 404, + 'content': json.dumps({'Error': {'Message': 'Unknown layer'}}), + } + + self.layers.pop(layer_id) + return { + 'status_code': 204, 'content': '', + } + @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers', method='POST') def post_layer_mock(_, request): body_data = json.loads(request.body) @@ -293,5 +307,5 @@ class FakeSecurityScanner(object): def response_content(url, _): raise Exception('Unknown endpoint: ' + str(url)) - return [get_layer_mock, post_layer_mock, get_notification, delete_notification, - response_content] + return [get_layer_mock, post_layer_mock, remove_layer_mock, get_notification, + delete_notification, response_content]