This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/util/migrate/backfill_checksums.py
Joseph Schorr 0f3db709ea Add a vulnerability_found event for notice when we detect a vuln
Fixes #637

Note: This PR does *not* actually raise the event; it merely adds support for it
2015-11-06 15:22:18 -05:00

66 lines
2.4 KiB
Python

import logging
from app import storage as store
from data.database import ImageStorage, ImageStoragePlacement, ImageStorageLocation, JOIN_LEFT_OUTER
from digest import checksums
logger = logging.getLogger(__name__)
# TODO: Fix this to use random
# TODO: Copy in all referenced peewee models, as a later migration changes these
def _get_imagestorages_with_locations(query_modifier):
    """Load image storages together with the names of the locations holding them.

    Selects ImageStoragePlacement rows joined to their ImageStorageLocation and
    (via a left outer join) their ImageStorage, lets the caller narrow the query,
    and then groups the resulting placement rows by storage id.

    Args:
        query_modifier: callable that receives the base query and returns the
            query to actually run (e.g. with extra ``where``/``limit`` clauses).

    Returns:
        The values of a dict keyed by storage id: one storage object per
        distinct id, each carrying a ``locations`` set of location names.
    """
    base_query = (ImageStoragePlacement
                  .select(ImageStoragePlacement, ImageStorage, ImageStorageLocation)
                  .join(ImageStorageLocation)
                  .switch(ImageStoragePlacement)
                  .join(ImageStorage, JOIN_LEFT_OUTER))

    # Run the query eagerly so every placement row is in memory before grouping.
    placements = list(query_modifier(base_query))

    storages_by_id = {}
    for placement in placements:
        candidate = placement.storage
        known = storages_by_id.get(candidate.id)
        if known is None:
            # First placement seen for this storage: it becomes the canonical
            # instance that accumulates the location names.
            candidate.locations = set()
            storages_by_id[candidate.id] = candidate
            known = candidate
        known.locations.add(placement.location.name)

    return storages_by_id.values()
def backfill_checksum(imagestorage_with_locations):
    """Compute and persist the checksum for one image storage.

    Reads the image JSON and streams the layer file from the storage's
    locations, passes the stream and the JSON (plus a trailing newline) to
    ``checksums.sha256_file`` and saves the result as ``sha256:<digest>``.

    If any ordinary error occurs along the way, the failure is logged and the
    storage is saved with the placeholder checksum ``unknown:<uuid>`` instead,
    so that it no longer has a null checksum.

    Args:
        imagestorage_with_locations: storage object exposing ``uuid``, a
            ``locations`` collection, a writable ``checksum`` attribute and a
            ``save()`` method.

    Raises:
        KeyboardInterrupt, SystemExit: deliberately NOT swallowed, so that
            interrupting the migration stops it instead of stamping the
            in-flight storage with a bogus ``unknown:`` checksum.
        Exception: whatever ``save()`` raises while writing the fallback value.
    """
    storage = imagestorage_with_locations
    try:
        json_data = store.get_content(storage.locations, store.image_json_path(storage.uuid))
        layer_path = store.image_layer_path(storage.uuid)
        with store.stream_read_file(storage.locations, layer_path) as fp:
            storage.checksum = 'sha256:{0}'.format(checksums.sha256_file(fp, json_data + '\n'))
        storage.save()
    except Exception:
        # Best effort: record why the checksum could not be computed, then mark
        # the row with a non-null placeholder.
        logger.exception('backfill_checksum: Could not compute checksum for storage %s; '
                         'marking it as unknown', storage.uuid)
        storage.checksum = 'unknown:{0}'.format(storage.uuid)
        storage.save()
def backfill_checksums():
    """Backfill the checksum of every uploaded image storage that lacks one.

    Works in batches: each pass fetches up to 100 placement rows whose storage
    has a null checksum and is not uploading, and hands every resulting storage
    to ``backfill_checksum``. The loop ends when a pass returns no storages.

    Side effect: raises this module's logger to DEBUG level.
    """
    logger.setLevel(logging.DEBUG)
    logger.debug('backfill_checksums: Starting')
    logger.debug('backfill_checksums: This can be a LONG RUNNING OPERATION. Please wait!')

    def limit_to_empty_checksum(query):
        # Both operands are peewee column expressions; `== False` builds SQL and
        # must not be rewritten as `is False` / `not ...`.
        missing_checksum = ImageStorage.checksum >> None
        not_uploading = ImageStorage.uploading == False
        return query.where(missing_checksum, not_uploading).limit(100)

    batch = _get_imagestorages_with_locations(limit_to_empty_checksum)
    while len(batch) != 0:
        for pending in batch:
            backfill_checksum(pending)
        # Processed rows now have a non-null checksum, so the same filter
        # yields the next batch.
        batch = _get_imagestorages_with_locations(limit_to_empty_checksum)

    logger.debug('backfill_checksums: Completed')
if __name__ == "__main__":
    # Script entry point: emit DEBUG output overall, but restrict the peewee
    # and boto loggers to CRITICAL so their messages do not drown the progress
    # lines, then run the backfill.
    logging.basicConfig(level=logging.DEBUG)
    for quiet_logger_name in ('peewee', 'boto'):
        logging.getLogger(quiet_logger_name).setLevel(logging.CRITICAL)
    backfill_checksums()