Refactor security worker

This commit is contained in:
Quentin Machu 2015-11-17 17:42:52 -05:00
parent 206ffc65af
commit 605ed1fc77
6 changed files with 184 additions and 182 deletions

View file

@ -1,5 +1,6 @@
import logging import logging
import dateutil.parser import dateutil.parser
import random
from peewee import JOIN_LEFT_OUTER, fn, SQL from peewee import JOIN_LEFT_OUTER, fn, SQL
from datetime import datetime from datetime import datetime
@ -7,7 +8,8 @@ from datetime import datetime
from data.model import (DataModelException, db_transaction, _basequery, storage, from data.model import (DataModelException, db_transaction, _basequery, storage,
InvalidImageException, config) InvalidImageException, config)
from data.database import (Image, Repository, ImageStoragePlacement, Namespace, ImageStorage, from data.database import (Image, Repository, ImageStoragePlacement, Namespace, ImageStorage,
ImageStorageLocation, RepositoryPermission, db_for_update) ImageStorageLocation, RepositoryPermission, db_for_update,
db_random_func)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -459,3 +461,74 @@ def ensure_image_locations(*names):
data = [{'name': name} for name in insert_names] data = [{'name': name} for name in insert_names]
ImageStorageLocation.insert_many(data).execute() ImageStorageLocation.insert_many(data).execute()
def get_secscan_candidates(engine_version, batch_size):
Parent = Image.alias()
ParentImageStorage = ImageStorage.alias()
rimages = []
# Collect the images without parents
candidates = (Image
.select(Image.id)
.join(ImageStorage)
.where(Image.security_indexed_engine < engine_version,
Image.parent >> None,
ImageStorage.uploading == False)
.limit(batch_size*10))
images = (Image
.select(Image, ImageStorage)
.join(ImageStorage)
.where(Image.id << candidates)
.order_by(db_random_func())
.limit(batch_size))
for image in images:
rimages.append(image)
# Collect the images with analyzed parents.
candidates = (Image
.select(Image.id)
.join(Parent, on=(Image.parent == Parent.id))
.switch(Image)
.join(ImageStorage)
.where(Image.security_indexed_engine < engine_version,
Parent.security_indexed == True,
Parent.security_indexed_engine >= engine_version,
ImageStorage.uploading == False)
.limit(batch_size*10))
images = (Image
.select(Image, ImageStorage, Parent, ParentImageStorage)
.join(Parent, on=(Image.parent == Parent.id))
.join(ParentImageStorage, on=(ParentImageStorage.id == Parent.storage))
.switch(Image)
.join(ImageStorage)
.where(Image.id << candidates)
.order_by(db_random_func())
.limit(batch_size))
for image in images:
rimages.append(image)
# Shuffle the images, otherwise the images without parents will always be on the top
random.shuffle(rimages)
return rimages
def set_secscan_status(image, indexed, version):
query = (Image
.select()
.join(ImageStorage)
.where(Image.docker_image_id == image.docker_image_id,
ImageStorage.uuid == image.storage.uuid))
ids_to_update = [row.id for row in query]
if not ids_to_update:
return
(Image
.update(security_indexed=indexed, security_indexed_engine=version)
.where(Image.id << ids_to_update)
.execute())

View file

@ -225,3 +225,13 @@ def lookup_repo_storages_by_content_checksum(repo, checksums):
.select() .select()
.join(Image) .join(Image)
.where(Image.repository == repo, ImageStorage.content_checksum << checksums)) .where(Image.repository == repo, ImageStorage.content_checksum << checksums))
def get_storage_locations(uuid):
query = (ImageStoragePlacement
.select()
.join(ImageStorageLocation)
.switch(ImageStoragePlacement)
.join(ImageStorage, JOIN_LEFT_OUTER)
.where(ImageStorage.uuid == uuid))
return [location.location.name for location in query]

View file

@ -3,7 +3,7 @@ from uuid import uuid4
from data.model import (image, db_transaction, DataModelException, _basequery, from data.model import (image, db_transaction, DataModelException, _basequery,
InvalidManifestException) InvalidManifestException)
from data.database import (RepositoryTag, Repository, Image, ImageStorage, Namespace, TagManifest, from data.database import (RepositoryTag, Repository, Image, ImageStorage, Namespace, TagManifest,
get_epoch_timestamp, db_for_update) RepositoryNotification, get_epoch_timestamp, db_for_update)
def _tag_alive(query, now_ts=None): def _tag_alive(query, now_ts=None):
@ -18,15 +18,29 @@ def get_matching_tags(docker_image_id, storage_uuid, *args):
given docker_image_id and storage_uuid. """ given docker_image_id and storage_uuid. """
image_query = image.get_repository_image_and_deriving(docker_image_id, storage_uuid) image_query = image.get_repository_image_and_deriving(docker_image_id, storage_uuid)
return (RepositoryTag return _tag_alive(RepositoryTag
.select(*args) .select(*args)
.distinct() .distinct()
.join(Image) .join(Image)
.join(ImageStorage) .join(ImageStorage)
.where(Image.id << image_query, RepositoryTag.lifetime_end_ts >> None, .where(Image.id << image_query, RepositoryTag.hidden == False))
def get_tags_for_image(image_id, *args):
return _tag_alive(RepositoryTag
.select(*args)
.distinct()
.where(RepositoryTag.image == image_id,
RepositoryTag.hidden == False)) RepositoryTag.hidden == False))
def filter_tags_have_repository_event(query, event):
return (query
.switch(RepositoryTag)
.join(Repository)
.join(RepositoryNotification)
.where(RepositoryNotification.event == event))
def list_repository_tags(namespace_name, repository_name, include_hidden=False, def list_repository_tags(namespace_name, repository_name, include_hidden=False,
include_storage=False): include_storage=False):
to_select = (RepositoryTag, Image) to_select = (RepositoryTag, Image)
@ -233,4 +247,3 @@ def _load_repo_manifests(namespace, repo_name):
.join(Repository) .join(Repository)
.join(Namespace, on=(Namespace.id == Repository.namespace_user)) .join(Namespace, on=(Namespace.id == Repository.namespace_user))
.where(Repository.name == repo_name, Namespace.username == namespace)) .where(Repository.name == repo_name, Namespace.username == namespace))

Binary file not shown.

View file

@ -8,6 +8,7 @@ import features
from app import secscan_notification_queue, secscan_api from app import secscan_notification_queue, secscan_api
from data import model from data import model
from data.model.tag import filter_tags_have_repository_event, get_matching_tags
from data.database import (Image, ImageStorage, ExternalNotificationEvent, from data.database import (Image, ImageStorage, ExternalNotificationEvent,
Repository, RepositoryNotification, RepositoryTag) Repository, RepositoryNotification, RepositoryTag)
from endpoints.notificationhelper import spawn_notification from endpoints.notificationhelper import spawn_notification
@ -31,23 +32,19 @@ class SecurityNotificationWorker(QueueWorker):
tag_map = defaultdict(set) tag_map = defaultdict(set)
repository_map = {} repository_map = {}
# Find all tags that contain the layer(s) introducing the vulnerability. # Find all tags that contain the layer(s) introducing the vulnerability,
# in repositories that have the event setup.
content = data['Content'] content = data['Content']
layer_ids = content.get('NewIntroducingLayersIDs', content.get('IntroducingLayersIDs', [])) layer_ids = content.get('NewIntroducingLayersIDs', content.get('IntroducingLayersIDs', []))
for layer_id in layer_ids: for layer_id in layer_ids:
(docker_image_id, storage_uuid) = layer_id.split('.', 2) (docker_image_id, storage_uuid) = layer_id.split('.', 2)
tags = model.tag.get_matching_tags(docker_image_id, storage_uuid, RepositoryTag,
Repository, Image, ImageStorage)
# Additionally filter to tags only in repositories that have the event setup. matching = get_matching_tags(docker_image_id, storage_uuid, RepositoryTag, Repository,
matching = list(tags Image, ImageStorage)
.switch(RepositoryTag) tags = list(filter_tags_have_repository_event(matching, event))
.join(Repository)
.join(RepositoryNotification)
.where(RepositoryNotification.event == event))
check_map = {} check_map = {}
for tag in matching: for tag in tags:
# Verify that the tag's root image has the vulnerability. # Verify that the tag's root image has the vulnerability.
tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid) tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid)
logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id) logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id)

View file

@ -4,121 +4,25 @@ import logging.config
import requests import requests
import features import features
import time import time
import os
import random
from endpoints.notificationhelper import spawn_notification from endpoints.notificationhelper import spawn_notification
from collections import defaultdict from collections import defaultdict
from peewee import JOIN_LEFT_OUTER from app import app, config_provider, storage, secscan_api
from app import app, config_provider, storage, OVERRIDE_CONFIG_DIRECTORY, secscan_api
from workers.worker import Worker from workers.worker import Worker
from data.database import (Image, ImageStorage, ImageStorageLocation, ImageStoragePlacement, from data import model
db_random_func, UseThenDisconnect, RepositoryTag, Repository, from data.model.tag import filter_tags_have_repository_event, get_tags_for_image
ExternalNotificationEvent, RepositoryNotification) from data.model.image import get_secscan_candidates, set_secscan_status
from data.model.storage import get_storage_locations
from data.database import (UseThenDisconnect, ExternalNotificationEvent)
from util.secscan.api import SecurityConfigValidator from util.secscan.api import SecurityConfigValidator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
BATCH_SIZE = 20 BATCH_SIZE = 5
INDEXING_INTERVAL = 10 INDEXING_INTERVAL = 30
API_METHOD_INSERT = '/v1/layers' API_METHOD_INSERT = '/v1/layers'
API_METHOD_VERSION = '/v1/versions/engine' API_METHOD_VERSION = '/v1/versions/engine'
def _get_images_to_export_list(version):
Parent = Image.alias()
ParentImageStorage = ImageStorage.alias()
rimages = []
# Collect the images without parents
candidates = (Image
.select(Image.id, Image.docker_image_id, ImageStorage.uuid)
.join(ImageStorage)
.where(Image.security_indexed_engine < version,
Image.parent_id >> None,
ImageStorage.uploading == False)
.limit(BATCH_SIZE*10)
.alias('candidates'))
images = (Image
.select(candidates.c.id, candidates.c.docker_image_id, candidates.c.uuid)
.from_(candidates)
.order_by(db_random_func())
.tuples()
.limit(BATCH_SIZE))
for image in images:
rimages.append({'image_id': image[0],
'docker_image_id': image[1],
'storage_uuid': image[2],
'parent_docker_image_id': None,
'parent_storage_uuid': None})
# Collect the images with analyzed parents.
candidates = (Image
.select(Image.id,
Image.docker_image_id,
ImageStorage.uuid,
Parent.docker_image_id.alias('parent_docker_image_id'),
ParentImageStorage.uuid.alias('parent_storage_uuid'))
.join(Parent, on=(Image.parent_id == Parent.id))
.join(ParentImageStorage, on=(ParentImageStorage.id == Parent.storage))
.switch(Image)
.join(ImageStorage)
.where(Image.security_indexed_engine < version,
Parent.security_indexed == True,
Parent.security_indexed_engine >= version,
ImageStorage.uploading == False)
.limit(BATCH_SIZE*10)
.alias('candidates'))
images = (Image
.select(candidates.c.id,
candidates.c.docker_image_id,
candidates.c.uuid,
candidates.c.parent_docker_image_id,
candidates.c.parent_storage_uuid)
.from_(candidates)
.order_by(db_random_func())
.tuples()
.limit(BATCH_SIZE))
for image in images:
rimages.append({'image_id': image[0],
'docker_image_id': image[1],
'storage_uuid': image[2],
'parent_docker_image_id': image[3],
'parent_storage_uuid': image[4]})
# Shuffle the images, otherwise the images without parents will always be on the top
random.shuffle(rimages)
return rimages
def _get_storage_locations(uuid):
query = (ImageStoragePlacement
.select()
.join(ImageStorageLocation)
.switch(ImageStoragePlacement)
.join(ImageStorage, JOIN_LEFT_OUTER)
.where(ImageStorage.uuid == uuid))
return [location.location.name for location in query]
def _update_image(image, indexed, version):
query = (Image
.select()
.join(ImageStorage)
.where(Image.docker_image_id == image['docker_image_id'],
ImageStorage.uuid == image['storage_uuid']))
ids_to_update = [row.id for row in query]
if not ids_to_update:
return
(Image
.update(security_indexed=indexed, security_indexed_engine=version)
.where(Image.id << ids_to_update)
.execute())
class SecurityWorker(Worker): class SecurityWorker(Worker):
def __init__(self): def __init__(self):
@ -133,21 +37,22 @@ class SecurityWorker(Worker):
self._keys = validator.keypair() self._keys = validator.keypair()
self.add_operation(self._index_images, INDEXING_INTERVAL) self.add_operation(self._index_images, INDEXING_INTERVAL)
else:
logger.warning('Failed to validate security scan configuration') logger.warning('Failed to validate security scan configuration')
def _get_image_url(self, image): def _get_image_url(self, image):
""" Gets the download URL for an image and if the storage doesn't exist, """ Gets the download URL for an image and if the storage doesn't exist,
marks the image as unindexed. """ marks the image as unindexed. """
path = storage.image_layer_path(image['storage_uuid']) path = model.storage.get_layer_path(image.storage)
locations = self._default_storage_locations locations = self._default_storage_locations
if not storage.exists(locations, path): if not storage.exists(locations, path):
locations = _get_storage_locations(image['storage_uuid']) locations = get_storage_locations(image.storage.uuid)
if not locations or not storage.exists(locations, path): if not locations or not storage.exists(locations, path):
logger.warning('Could not find a valid location to download layer %s', logger.warning('Could not find a valid location to download layer %s.%s',
image['docker_image_id']+'.'+image['storage_uuid']) image.docker_image_id, image.storage.uuid)
_update_image(image, False, self._target_version) set_secscan_status(image, False, self._target_version)
return None return None
uri = storage.get_direct_download_url(locations, path) uri = storage.get_direct_download_url(locations, path)
@ -172,23 +77,21 @@ class SecurityWorker(Worker):
return None return None
request = { request = {
'ID': '%s.%s' % (image['docker_image_id'], image['storage_uuid']), 'ID': '%s.%s' % (image.docker_image_id, image.storage.uuid),
'Path': url, 'Path': url,
} }
if image['parent_docker_image_id'] is not None and image['parent_storage_uuid'] is not None: if image.parent is not None:
request['ParentID'] = '%s.%s' % (image['parent_docker_image_id'], request['ParentID'] = '%s.%s' % (image.parent.docker_image_id,
image['parent_storage_uuid']) image.parent.storage.uuid)
return request return request
def _analyze_image(self, image): def _analyze_image(self, image):
""" Analyzes an image by passing it to Clair. Returns the vulnerabilities detected """ Analyzes an image by passing it to Clair. """
(if any) or None on error.
"""
request = self._new_request(image) request = self._new_request(image)
if request is None: if request is None:
return None return False
# Analyze the image. # Analyze the image.
try: try:
@ -200,7 +103,7 @@ class SecurityWorker(Worker):
jsonResponse = httpResponse.json() jsonResponse = httpResponse.json()
except (requests.exceptions.RequestException, ValueError): except (requests.exceptions.RequestException, ValueError):
logger.exception('An exception occurred when analyzing layer ID %s', request['ID']) logger.exception('An exception occurred when analyzing layer ID %s', request['ID'])
return None return False
# Handle any errors from the security scanner. # Handle any errors from the security scanner.
if httpResponse.status_code != 201: if httpResponse.status_code != 201:
@ -210,11 +113,12 @@ class SecurityWorker(Worker):
request['ID'], jsonResponse['Message']) request['ID'], jsonResponse['Message'])
# Hopefully, there is no version lower than the target one running # Hopefully, there is no version lower than the target one running
_update_image(image, False, self._target_version) set_secscan_status(image, False, self._target_version)
return True
else: else:
logger.warning('Got non-201 when analyzing layer ID %s: %s', request['ID'], jsonResponse) logger.warning('Got non-201 when analyzing layer ID %s: %s', request['ID'], jsonResponse)
return False
return None
# Verify that the version matches. # Verify that the version matches.
api_version = jsonResponse['Version'] api_version = jsonResponse['Version']
@ -222,25 +126,29 @@ class SecurityWorker(Worker):
logger.warning('An engine runs on version %d but the target version is %d') logger.warning('An engine runs on version %d but the target version is %d')
# Mark the image as analyzed. # Mark the image as analyzed.
logger.debug('Layer %s analyzed successfully; Loading vulnerabilities for layer', logger.debug('Layer %s analyzed successfully', image.id)
image['image_id']) set_secscan_status(image, True, api_version)
_update_image(image, True, api_version)
# Lookup the vulnerabilities for the image, now that it is analyzed. return True
def _get_vulnerabilities(self, image):
""" Returns the vulnerabilities detected (if any) or None on error. """
try: try:
response = secscan_api.call('layers/%s/vulnerabilities', None, request['ID']) response = secscan_api.call('layers/%s/vulnerabilities', None,
'%s.%s' % (image.docker_image_id, image.storage.uuid))
logger.debug('Got response %s for vulnerabilities for layer %s', logger.debug('Got response %s for vulnerabilities for layer %s',
response.status_code, image['image_id']) response.status_code, image.id)
if response.status_code == 404: if response.status_code == 404:
return None return None
except (requests.exceptions.RequestException, ValueError): except (requests.exceptions.RequestException, ValueError):
logger.exception('Failed to get vulnerability response for %s', image['image_id']) logger.exception('Failed to get vulnerability response for %s', image.id)
return None return None
return response.json() return response.json()
def _index_images(self): def _index_images(self):
logger.debug('Started indexing') logger.debug('Started indexing')
event = ExternalNotificationEvent.get(name='vulnerability_found')
with UseThenDisconnect(app.config): with UseThenDisconnect(app.config):
while True: while True:
@ -248,7 +156,7 @@ class SecurityWorker(Worker):
images = [] images = []
try: try:
logger.debug('Looking up images to index') logger.debug('Looking up images to index')
images = _get_images_to_export_list(self._target_version) images = get_secscan_candidates(self._target_version, BATCH_SIZE)
except Image.DoesNotExist: except Image.DoesNotExist:
pass pass
@ -258,8 +166,25 @@ class SecurityWorker(Worker):
logger.debug('Found %d images to index', len(images)) logger.debug('Found %d images to index', len(images))
for image in images: for image in images:
# Analyze the image, retrieving the vulnerabilities (if any). # Analyze the image.
sec_data = self._analyze_image(image) analyzed = self._analyze_image(image)
if not analyzed:
return
# Get the tags of the image we analyzed
matching = list(filter_tags_have_repository_event(get_tags_for_image(image.id), event))
repository_map = defaultdict(list)
for tag in matching:
repository_map[tag.repository_id].append(tag)
# If there is at least one tag,
# Lookup the vulnerabilities for the image, now that it is analyzed.
if len(repository_map) > 0:
logger.debug('Loading vulnerabilities for layer %s', image.id)
sec_data = self._get_vulnerabilities(image)
if sec_data is None: if sec_data is None:
continue continue
@ -267,22 +192,7 @@ class SecurityWorker(Worker):
continue continue
# Dispatch events for any detected vulnerabilities # Dispatch events for any detected vulnerabilities
logger.debug('Got vulnerabilities for layer %s: %s', image['image_id'], sec_data) logger.debug('Got vulnerabilities for layer %s: %s', image.id, sec_data)
event = ExternalNotificationEvent.get(name='vulnerability_found')
matching = (RepositoryTag
.select(RepositoryTag, Repository)
.distinct()
.join(Repository)
.join(RepositoryNotification)
.where(RepositoryNotification.event == event,
RepositoryTag.image == image['image_id'],
RepositoryTag.hidden == False,
RepositoryTag.lifetime_end_ts >> None))
repository_map = defaultdict(list)
for tag in matching:
repository_map[tag.repository_id].append(tag)
for repository_id in repository_map: for repository_id in repository_map:
tags = repository_map[repository_id] tags = repository_map[repository_id]
@ -300,7 +210,6 @@ class SecurityWorker(Worker):
spawn_notification(tags[0].repository, 'vulnerability_found', event_data) spawn_notification(tags[0].repository, 'vulnerability_found', event_data)
if __name__ == '__main__': if __name__ == '__main__':
if not features.SECURITY_SCANNER: if not features.SECURITY_SCANNER:
logger.debug('Security scanner disabled; skipping SecurityWorker') logger.debug('Security scanner disabled; skipping SecurityWorker')