Merge pull request #2257 from coreos-inc/clair-gc-take2

feat(gc): Garbage collection for security scanning
josephschorr 2017-01-17 14:49:36 -05:00 committed by GitHub
commit aafcb592a6
8 changed files with 194 additions and 62 deletions
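At a glance, the change wires Quay's image garbage collector into the security scanner so that Clair layers are deleted once their backing images and storages are gone. A condensed, non-literal sketch of the flow (every name below appears in the diffs that follow):

    # app startup (app.py): the secscan API registers itself as an image-cleanup callback
    model.config.register_image_cleanup_callback(secscan_api.cleanup_layers)

    # data.model: garbage_collect_repo() selects the unreferenced Image rows joined to their
    # ImageStorage rows, deletes them, and garbage_collect_storage() now returns the storage
    # IDs it actually removed; each registered callback then receives those removed rows.

    # util.secscan.api: cleanup_layers() issues DELETE layers/<docker_image_id>.<storage_uuid>
    # (compute_layer_id) against Clair for every removed image+storage pair.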

app.py

@@ -236,6 +236,7 @@ else:
database.configure(app.config)
model.config.app_config = app.config
model.config.store = storage
model.config.register_image_cleanup_callback(secscan_api.cleanup_layers)
@login_manager.user_loader
def load_user(user_uuid):


@@ -107,6 +107,10 @@ class Config(object):
def __init__(self):
self.app_config = None
self.store = None
self.image_cleanup_callbacks = []
def register_image_cleanup_callback(self, callback):
self.image_cleanup_callbacks.append(callback)
config = Config()
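The callback list keeps data.model free of any security-scanner import: whoever owns the higher-level service registers a callable at startup, and the model layer simply iterates the list after GC. A minimal sketch with a hypothetical callback (log_removed is not part of the commit):

    import logging
    logger = logging.getLogger(__name__)

    def log_removed(image_storages):
        # each entry is an Image row joined to its ImageStorage, as selected in garbage_collect_repo
        for removed in image_storages:
            logger.info('GCed image %s (storage %s)', removed.docker_image_id, removed.storage.uuid)

    config.register_image_cleanup_callback(log_removed)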


@@ -5,9 +5,9 @@ from datetime import timedelta, datetime
from peewee import JOIN_LEFT_OUTER, fn, SQL, IntegrityError
from cachetools import ttl_cache
- from data.model import (DataModelException, tag, db_transaction, storage, permission,
+ from data.model import (config, DataModelException, tag, db_transaction, storage, permission,
_basequery)
- from data.database import (Repository, Namespace, RepositoryTag, Star, Image, User,
+ from data.database import (Repository, Namespace, RepositoryTag, Star, Image, ImageStorage, User,
Visibility, RepositoryPermission, RepositoryActionCount,
Role, RepositoryAuthorizedEmail, TagManifest, DerivedStorageForImage,
Label, TagManifestLabel, db_for_update, get_epoch_timestamp,
@@ -173,29 +173,24 @@ def garbage_collect_repo(repo, extra_candidate_set=None):
referenced_candidates = (direct_referenced | ancestor_referenced)
- # We desire two pieces of information from the database from the following
+ # We desire a few pieces of information from the database from the following
# query: all of the image ids which are associated with this repository,
- # and the storages which are associated with those images. In order to
- # fetch just this information, and bypass all of the peewee model parsing
- # code, which is overkill for just two fields, we use a tuple query, and
- # feed that directly to the dictionary tuple constructor which takes an
- # iterable of tuples containing [(k, v), (k, v), ...]
+ # and the storages which are associated with those images.
unreferenced_candidates = (Image
- .select(Image.id, Image.storage)
+ .select(Image.id, Image.docker_image_id,
+ ImageStorage.id, ImageStorage.uuid)
+ .join(ImageStorage)
.where(Image.id << candidates_orphans,
- ~(Image.id << referenced_candidates))
- .tuples())
+ ~(Image.id << referenced_candidates)))
- unreferecend_images_to_storages = dict(unreferenced_candidates)
- to_remove = unreferecend_images_to_storages.keys()
- if len(to_remove) > 0:
- logger.info('Cleaning up unreferenced images: %s', to_remove)
- storage_id_whitelist = set(unreferecend_images_to_storages.values())
+ image_ids_to_remove = [candidate.id for candidate in unreferenced_candidates]
+ if len(image_ids_to_remove) > 0:
+ logger.info('Cleaning up unreferenced images: %s', image_ids_to_remove)
+ storage_id_whitelist = set([candidate.storage_id for candidate in unreferenced_candidates])
# Lookup any derived images for the images to remove.
derived = DerivedStorageForImage.select().where(
- DerivedStorageForImage.source_image << to_remove)
+ DerivedStorageForImage.source_image << image_ids_to_remove)
has_derived = False
for derived_image in derived:
@@ -207,21 +202,30 @@ def garbage_collect_repo(repo, extra_candidate_set=None):
try:
(DerivedStorageForImage
.delete()
- .where(DerivedStorageForImage.source_image << to_remove)
+ .where(DerivedStorageForImage.source_image << image_ids_to_remove)
.execute())
except IntegrityError:
- logger.info('Could not GC derived images %s; will try again soon', to_remove)
+ logger.info('Could not GC derived images %s; will try again soon', image_ids_to_remove)
return False
try:
- Image.delete().where(Image.id << to_remove).execute()
+ Image.delete().where(Image.id << image_ids_to_remove).execute()
except IntegrityError:
- logger.info('Could not GC images %s; will try again soon', to_remove)
+ logger.info('Could not GC images %s; will try again soon', image_ids_to_remove)
return False
- if len(to_remove) > 0:
- logger.info('Garbage collecting storage for images: %s', to_remove)
- storage.garbage_collect_storage(storage_id_whitelist)
+ # If any images were removed, GC any orphaned storages.
+ if len(image_ids_to_remove) > 0:
+ logger.info('Garbage collecting storage for images: %s', image_ids_to_remove)
+ storage_ids_removed = set(storage.garbage_collect_storage(storage_id_whitelist))
+ # If any storages were removed and cleanup callbacks are registered, call them with
+ # the images+storages removed.
+ if storage_ids_removed and config.image_cleanup_callbacks:
+ image_storages_removed = [candidate for candidate in unreferenced_candidates
+ if candidate.storage_id in storage_ids_removed]
+ for callback in config.image_cleanup_callbacks:
+ callback(image_storages_removed)
return True
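A brief aside on the query change above, assuming peewee's usual join semantics (this snippet is illustrative, not part of the commit): dropping .tuples() means each candidate comes back as an Image model instance, so the raw foreign key and the joined ImageStorage row are both available without extra queries.

    # Illustrative only; field names as used in the diff above.
    for candidate in unreferenced_candidates:
        candidate.id               # Image primary key, used for the DELETE
        candidate.docker_image_id  # Docker-assigned image ID, needed to build the Clair layer ID
        candidate.storage_id       # raw FK used to build storage_id_whitelist
        candidate.storage.uuid     # joined ImageStorage row, needed by the cleanup callback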


@@ -72,8 +72,12 @@ def _orphaned_storage_query(candidate_ids):
def garbage_collect_storage(storage_id_whitelist):
""" Performs GC on a possible subset of the storage's with the IDs found in the
whitelist. The storages in the whitelist will be checked, and any orphaned will
be removed, with those IDs being returned.
"""
if len(storage_id_whitelist) == 0:
- return
+ return []
def placements_query_to_paths_set(placements_query):
return {(get_image_location_for_id(placement.location_id).name,
@@ -89,7 +93,7 @@ def garbage_collect_storage(storage_id_whitelist):
orphaned_storage_ids = _orphaned_storage_query(storage_id_whitelist)
if len(orphaned_storage_ids) == 0:
# Nothing to GC.
- return
+ return []
placements_to_remove = list(ImageStoragePlacement
.select()
@@ -133,6 +137,8 @@ def garbage_collect_storage(storage_id_whitelist):
logger.debug('Removing %s from %s', image_path, location_name)
config.store.remove({location_name}, image_path)
return orphaned_storage_ids
def create_v1_storage(location_name):
storage = ImageStorage.create(cas_path=False)


@@ -146,20 +146,43 @@ class TestGarbageCollection(unittest.TestCase):
return len(label_ids - referenced_by_manifest)
@contextmanager
- def assert_no_new_dangling_storages_or_labels(self):
+ def assert_gc_integrity(self, expect_storage_removed=True):
""" Specialized assertion for ensuring that GC cleans up all dangling storages
- and labels.
+ and labels, invokes the callback for images removed and doesn't invoke the
+ callback for images *not* removed.
"""
# TODO: Consider also asserting the number of DB queries being performed.
# Add a callback for when images are removed.
removed_image_storages = []
model.config.register_image_cleanup_callback(removed_image_storages.extend)
# Store the number of dangling storages and labels.
existing_storage_count = self._get_dangling_storage_count()
existing_label_count = self._get_dangling_label_count()
yield
# Ensure the number of dangling storages and labels has not changed.
updated_storage_count = self._get_dangling_storage_count()
self.assertEqual(updated_storage_count, existing_storage_count)
updated_label_count = self._get_dangling_label_count()
self.assertEqual(updated_label_count, existing_label_count)
# Ensure that for each call to the image+storage cleanup callback, the image and its
# storage is not found *anywhere* in the database.
for removed_image_and_storage in removed_image_storages:
with self.assertRaises(Image.DoesNotExist):
Image.get(id=removed_image_and_storage.id)
with self.assertRaises(ImageStorage.DoesNotExist):
ImageStorage.get(id=removed_image_and_storage.storage_id)
with self.assertRaises(ImageStorage.DoesNotExist):
ImageStorage.get(uuid=removed_image_and_storage.storage.uuid)
self.assertEquals(expect_storage_removed, bool(removed_image_storages))
def test_has_garbage(self):
""" Remove all existing repositories, then add one without garbage, check, then add one with
garbage, and check again.
@@ -212,14 +235,14 @@ class TestGarbageCollection(unittest.TestCase):
def test_one_tag(self):
""" Create a repository with a single tag, then remove that tag and verify that the repository
is now empty. """
- with self.assert_no_new_dangling_storages_or_labels():
+ with self.assert_gc_integrity():
repository = self.createRepository(latest=['i1', 'i2', 'i3'])
self.deleteTag(repository, 'latest')
self.assertDeleted(repository, 'i1', 'i2', 'i3')
def test_two_tags_unshared_images(self):
""" Repository has two tags with no shared images between them. """
- with self.assert_no_new_dangling_storages_or_labels():
+ with self.assert_gc_integrity():
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['f1', 'f2'])
self.deleteTag(repository, 'latest')
self.assertDeleted(repository, 'i1', 'i2', 'i3')
@@ -229,7 +252,7 @@ class TestGarbageCollection(unittest.TestCase):
""" Repository has two tags with shared images. Deleting the tag should only remove the
unshared images.
"""
- with self.assert_no_new_dangling_storages_or_labels():
+ with self.assert_gc_integrity():
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
self.deleteTag(repository, 'latest')
self.assertDeleted(repository, 'i2', 'i3')
@@ -239,7 +262,7 @@ class TestGarbageCollection(unittest.TestCase):
""" Two repositories with different images. Removing the tag from one leaves the other's
images intact.
"""
- with self.assert_no_new_dangling_storages_or_labels():
+ with self.assert_gc_integrity():
repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
repository2 = self.createRepository(latest=['j1', 'j2', 'j3'], name='repo2')
@@ -252,7 +275,7 @@ class TestGarbageCollection(unittest.TestCase):
""" Two repositories with shared images. Removing the tag from one leaves the other's
images intact.
"""
- with self.assert_no_new_dangling_storages_or_labels():
+ with self.assert_gc_integrity():
repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
repository2 = self.createRepository(latest=['i1', 'i2', 'j1'], name='repo2')
@@ -265,7 +288,7 @@ class TestGarbageCollection(unittest.TestCase):
""" Two repositories under different namespaces should result in the images being deleted
but not completely removed from the database.
"""
- with self.assert_no_new_dangling_storages_or_labels():
+ with self.assert_gc_integrity():
repository1 = self.createRepository(namespace=ADMIN_ACCESS_USER, latest=['i1', 'i2', 'i3'])
repository2 = self.createRepository(namespace=PUBLIC_USER, latest=['i1', 'i2', 'i3'])
@@ -277,7 +300,7 @@ class TestGarbageCollection(unittest.TestCase):
""" Repository has multiple tags with shared images. Selectively deleting the tags, and
verifying at each step.
"""
- with self.assert_no_new_dangling_storages_or_labels():
+ with self.assert_gc_integrity():
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
@@ -314,7 +337,7 @@ class TestGarbageCollection(unittest.TestCase):
self.assertDeleted(repository, 'i1')
def test_empty_gc(self):
- with self.assert_no_new_dangling_storages_or_labels():
+ with self.assert_gc_integrity(expect_storage_removed=False):
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
@@ -324,7 +347,7 @@ class TestGarbageCollection(unittest.TestCase):
def test_time_machine_no_gc(self):
""" Repository has two tags with shared images. Deleting the tag should not remove any images
"""
- with self.assert_no_new_dangling_storages_or_labels():
+ with self.assert_gc_integrity(expect_storage_removed=False):
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
self._set_tag_expiration_policy(repository.namespace_user.username, 60*60*24)
@@ -336,7 +359,7 @@ class TestGarbageCollection(unittest.TestCase):
""" Repository has two tags with shared images. Deleting the second tag should cause the images
for the first deleted tag to gc.
"""
- with self.assert_no_new_dangling_storages_or_labels():
+ with self.assert_gc_integrity():
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
self._set_tag_expiration_policy(repository.namespace_user.username, 1)
@@ -351,6 +374,37 @@ class TestGarbageCollection(unittest.TestCase):
self.assertDeleted(repository, 'i2', 'i3')
self.assertNotDeleted(repository, 'i1', 'f1')
def test_images_shared_storage(self):
""" Repository with two tags, both with the same shared storage. Deleting the first
tag should delete the first image, but *not* its storage.
"""
with self.assert_gc_integrity(expect_storage_removed=False):
repository = self.createRepository()
# Add two tags, each with their own image, but with the same storage.
image_storage = model.storage.create_v1_storage(storage.preferred_locations[0])
first_image = Image.create(docker_image_id='i1',
repository=repository, storage=image_storage,
ancestors='/')
second_image = Image.create(docker_image_id='i2',
repository=repository, storage=image_storage,
ancestors='/')
model.tag.store_tag_manifest(repository.namespace_user.username, repository.name,
'first', first_image.docker_image_id,
'sha:someshahere', '{}')
model.tag.store_tag_manifest(repository.namespace_user.username, repository.name,
'second', second_image.docker_image_id,
'sha:someshahere', '{}')
# Delete the first tag.
self.deleteTag(repository, 'first')
self.assertDeleted(repository, 'i1')
self.assertNotDeleted(repository, 'i2')
if __name__ == '__main__':
unittest.main()


@@ -726,5 +726,34 @@ class TestSecurityScanner(unittest.TestCase):
self.assertIsNone(notification_queue.get())
def test_layer_gc(self):
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
# Delete the prod tag so that only the `latest` tag remains.
model.tag.delete_tag(ADMIN_ACCESS_USER, SIMPLE_REPO, 'prod')
with fake_security_scanner() as security_scanner:
# Analyze the layer.
analyzer = LayerAnalyzer(app.config, self.api)
analyzer.analyze_recursively(layer)
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertAnalyzed(layer, security_scanner, True, 1)
self.assertTrue(security_scanner.has_layer(security_scanner.layer_id(layer)))
namespace_user = model.user.get_user(ADMIN_ACCESS_USER)
model.user.change_user_tag_expiration(namespace_user, 0)
# Delete the tag in the repository and GC.
model.tag.delete_tag(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
time.sleep(1)
repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
model.repository.garbage_collect_repo(repo)
# Ensure that the security scanner no longer has the image.
self.assertFalse(security_scanner.has_layer(security_scanner.layer_id(layer)))
if __name__ == '__main__':
unittest.main()


@@ -1,8 +1,10 @@
import logging
+ from urlparse import urljoin
+ import requests
from flask import url_for
- from urlparse import urljoin
from data.database import CloseForLongOperation
from data import model
@@ -40,11 +42,17 @@ class APIRequestFailure(Exception):
_API_METHOD_INSERT = 'layers'
_API_METHOD_GET_LAYER = 'layers/%s'
_API_METHOD_DELETE_LAYER = 'layers/%s'
_API_METHOD_MARK_NOTIFICATION_READ = 'notifications/%s'
_API_METHOD_GET_NOTIFICATION = 'notifications/%s'
_API_METHOD_PING = 'metrics'
def compute_layer_id(layer):
""" Returns the ID for the layer in the security scanner. """
return '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
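For illustration only (hypothetical IDs, not taken from the commit), the scanner-side name produced by compute_layer_id is simply the two identifiers joined with a dot:

    compute_layer_id(layer)
    # -> 'deadbeefcafe.5b2f2a3e-0db7-4a64-98a1-6e4c3f1f9d20'
    #    where layer.docker_image_id == 'deadbeefcafe'
    #    and   layer.storage.uuid    == '5b2f2a3e-0db7-4a64-98a1-6e4c3f1f9d20'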
class SecurityScannerAPI(object):
""" Helper class for talking to the Security Scan service (Clair). """
def __init__(self, app, config, storage, client=None, skip_validation=False):
@@ -62,7 +70,6 @@ class SecurityScannerAPI(object):
self._default_storage_locations = config['DISTRIBUTED_STORAGE_PREFERENCE']
self._target_version = config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 2)
def _get_image_url_and_auth(self, image):
""" Returns a tuple of the url and the auth header value that must be used
to fetch the layer data itself. If the image can't be addressed, we return
@@ -74,8 +81,8 @@ class SecurityScannerAPI(object):
if not self._storage.exists(locations, path):
locations = get_storage_locations(image.storage.uuid)
if not locations or not self._storage.exists(locations, path):
- logger.warning('Could not find a valid location to download layer %s.%s out of %s',
- image.docker_image_id, image.storage.uuid, locations)
+ logger.warning('Could not find a valid location to download layer %s out of %s',
+ compute_layer_id(image), locations)
return None, None
uri = self._storage.get_direct_download_url(locations, path)
@@ -106,17 +113,16 @@ class SecurityScannerAPI(object):
return uri, auth_header
- def _new_analyze_request(self, image):
- """ Create the request body to submit the given image for analysis. If the image's URL cannot
+ def _new_analyze_request(self, layer):
+ """ Create the request body to submit the given layer for analysis. If the layer's URL cannot
be found, returns None.
"""
- url, auth_header = self._get_image_url_and_auth(image)
+ url, auth_header = self._get_image_url_and_auth(layer)
if url is None:
return None
layer_request = {
- 'Name': '%s.%s' % (image.docker_image_id, image.storage.uuid),
+ 'Name': compute_layer_id(layer),
'Path': url,
'Format': 'Docker',
}
@@ -126,14 +132,23 @@ class SecurityScannerAPI(object):
'Authorization': auth_header,
}
- if image.parent.docker_image_id and image.parent.storage.uuid:
- layer_request['ParentName'] = '%s.%s' % (image.parent.docker_image_id,
- image.parent.storage.uuid)
+ if layer.parent.docker_image_id and layer.parent.storage.uuid:
+ layer_request['ParentName'] = compute_layer_id(layer.parent)
return {
'Layer': layer_request,
}
def cleanup_layers(self, layers):
""" Callback invoked by garbage collection to cleanup any layers that no longer
need to be stored in the security scanner.
"""
if self._config is None:
# Security scanner not enabled.
return
for layer in layers:
self.delete_layer(layer)
def ping(self):
""" Calls GET on the metrics endpoint of the security scanner to ensure it is running
@@ -151,6 +166,17 @@ class SecurityScannerAPI(object):
logger.exception('Exception when trying to connect to security scanner endpoint')
raise Exception('Exception when trying to connect to security scanner endpoint')
def delete_layer(self, layer):
""" Calls DELETE on the given layer in the security scanner, removing it from
its database.
"""
layer_id = compute_layer_id(layer)
try:
response = self._call('DELETE', _API_METHOD_DELETE_LAYER % layer_id)
return response.status_code / 100 == 2
except requests.exceptions.RequestException:
logger.exception('Failed to delete layer: %s', layer_id)
return False
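A note on the success check in delete_layer above: given the Python 2 runtime implied by the urlparse import in this file, status_code / 100 == 2 is integer division, so any 2xx response counts as success. Illustrative values only:

    204 / 100 == 2    # True: the fake scanner's DELETE mock below answers 204 No Content
    404 / 100 == 2    # False: unknown layer, so delete_layer() returns False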
def analyze_layer(self, layer):
""" Posts the given layer to the security scanner for analysis, blocking until complete.
@@ -201,7 +227,6 @@ class SecurityScannerAPI(object):
# Return the parsed API version.
return json_response['Layer']['IndexedByVersion']
def check_layer_vulnerable(self, layer_id, cve_name):
""" Checks to see if the layer with the given ID is vulnerable to the specified CVE. """
layer_data = self._get_layer_data(layer_id, include_vulnerabilities=True)
@@ -215,7 +240,6 @@ class SecurityScannerAPI(object):
return False
def get_notification(self, notification_name, layer_limit=100, page=None):
""" Gets the data for a specific notification, with optional page token.
Returns a tuple of the data (None on failure) and whether to retry.
@@ -245,7 +269,6 @@ class SecurityScannerAPI(object):
return json_response, False
def mark_notification_read(self, notification_name):
""" Marks a security scanner notification as read. """
try:
@@ -255,13 +278,11 @@ class SecurityScannerAPI(object):
logger.exception('Failed to mark notification as read: %s', notification_name)
return False
def get_layer_data(self, layer, include_features=False, include_vulnerabilities=False):
""" Returns the layer data for the specified layer. On error, returns None. """
- layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
+ layer_id = compute_layer_id(layer)
return self._get_layer_data(layer_id, include_features, include_vulnerabilities)
def _get_layer_data(self, layer_id, include_features=False, include_vulnerabilities=False):
try:
params = {}
@@ -288,7 +309,6 @@ class SecurityScannerAPI(object):
return json_response
def _call(self, method, relative_url, params=None, body=None):
""" Issues an HTTP call to the sec API at the given relative URL.
This function disconnects from the database while awaiting a response


@@ -5,7 +5,7 @@ import urlparse
from contextlib import contextmanager
from httmock import urlmatch, HTTMock, all_requests
- from util.secscan.api import UNKNOWN_PARENT_LAYER_ERROR_MSG
+ from util.secscan.api import UNKNOWN_PARENT_LAYER_ERROR_MSG, compute_layer_id
@contextmanager
def fake_security_scanner(hostname='fakesecurityscanner'):
@@ -79,7 +79,7 @@ class FakeSecurityScanner(object):
def layer_id(self, layer):
""" Returns the Quay Security Scanner layer ID for the given layer (Image row). """
- return '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
+ return compute_layer_id(layer)
def add_layer(self, layer_id):
""" Adds a layer to the security scanner, with no features or vulnerabilities. """
@@ -191,6 +191,20 @@ class FakeSecurityScanner(object):
'content': json.dumps({'Layer': layer_data}),
}
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers/(.+)', method='DELETE')
def remove_layer_mock(url, _):
layer_id = url.path[len('/v1/layers/'):]
if not layer_id in self.layers:
return {
'status_code': 404,
'content': json.dumps({'Error': {'Message': 'Unknown layer'}}),
}
self.layers.pop(layer_id)
return {
'status_code': 204, 'content': '',
}
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers', method='POST')
def post_layer_mock(_, request):
body_data = json.loads(request.body)
@@ -293,5 +307,5 @@ class FakeSecurityScanner(object):
def response_content(url, _):
raise Exception('Unknown endpoint: ' + str(url))
- return [get_layer_mock, post_layer_mock, get_notification, delete_notification,
- response_content]
+ return [get_layer_mock, post_layer_mock, remove_layer_mock, get_notification,
+ delete_notification, response_content]
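A quick usage sketch of the new DELETE handler (hypothetical layer ID; assumes, as the existing tests imply, that fake_security_scanner() installs these HTTMock handlers for the duration of the with block):

    import requests

    with fake_security_scanner() as scanner:
        scanner.add_layer('someimageid.somestorageuuid')
        # remove_layer_mock answers 204 and drops the layer; a repeat DELETE would get a 404
        resp = requests.delete('http://fakesecurityscanner/v1/layers/someimageid.somestorageuuid')
        assert resp.status_code == 204
        assert not scanner.has_layer('someimageid.somestorageuuid')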