From a780136337fafdb8f46f28405f89f8d535e37af7 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Fri, 10 Mar 2017 15:51:44 -0500 Subject: [PATCH] workers.securityworker: revert to image querying --- data/model/image.py | 2 +- workers/securityworker.py | 23 ++++++++++++++--------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/data/model/image.py b/data/model/image.py index cd207c468..5ec12682e 100644 --- a/data/model/image.py +++ b/data/model/image.py @@ -490,7 +490,7 @@ def total_image_count(): return Image.select().count() -def get_image_id(): +def get_image_pk_field(): """ Returns the primary key for Image DB model """ return Image.id diff --git a/workers/securityworker.py b/workers/securityworker.py index 964d3c5b1..9356cbad5 100644 --- a/workers/securityworker.py +++ b/workers/securityworker.py @@ -1,20 +1,21 @@ import logging.config import time +from math import floor, log10 + import features from app import app, secscan_api, prometheus from workers.worker import Worker from data.database import UseThenDisconnect -from data.model.tag import (get_tags_images_eligible_for_scan, get_tag_pk_field, - get_max_id_for_sec_scan, get_min_id_for_sec_scan) +from data.model.image import (get_images_eligible_for_scan, get_image_pk_field, + get_max_id_for_sec_scan, get_min_id_for_sec_scan) from util.secscan.api import SecurityConfigValidator from util.secscan.analyzer import LayerAnalyzer, PreemptedException from util.migrate.allocator import yield_random_entries from endpoints.v2 import v2_bp -BATCH_SIZE = 50 DEFAULT_INDEXING_INTERVAL = 30 @@ -33,8 +34,9 @@ class SecurityWorker(Worker): self._analyzer = LayerAnalyzer(app.config, secscan_api) # Get the ID of the first image we want to analyze. - self._min_id = app.config.get('SECURITY_SCANNER_INDEXING_MIN_ID', - get_min_id_for_sec_scan(self._target_version)) + self._min_id = app.config.get('SECURITY_SCANNER_INDEXING_MIN_ID') + if self._min_id is None: + self._min_id = get_min_id_for_sec_scan(self._target_version) interval = app.config.get('SECURITY_SCANNER_INDEXING_INTERVAL', DEFAULT_INDEXING_INTERVAL) self.add_operation(self._index_images, interval) @@ -43,7 +45,7 @@ class SecurityWorker(Worker): def _index_images(self): def batch_query(): - return get_tags_images_eligible_for_scan(self._target_version) + return get_images_eligible_for_scan(self._target_version) # Get the ID of the last image we can analyze. Will be None if there are no images in the # database. @@ -53,17 +55,20 @@ class SecurityWorker(Worker): max_unscanned_images_gauge.Set(max_id) + # 4^log10(total) gives us a scalable batch size into the billions. + batch_size = 4 ** int(floor(log10(max(10, max_id - self._min_id)))) + with UseThenDisconnect(app.config): to_scan_generator = yield_random_entries( batch_query, - get_tag_pk_field(), - BATCH_SIZE, + get_image_pk_field(), + batch_size, max_id, self._min_id, ) for candidate, abt, num_remaining in to_scan_generator: try: - self._analyzer.analyze_recursively(candidate.image) + self._analyzer.analyze_recursively(candidate) except PreemptedException: logger.info('Another worker pre-empted us for layer: %s', candidate.id) abt.set()