Change derived storage to be based on image

Fixes #971
This commit is contained in:
Joseph Schorr 2015-11-24 12:44:07 -05:00
parent 8d05d40cf7
commit 762cd56e64
10 changed files with 155 additions and 112 deletions

View file

@ -514,6 +514,7 @@ class ImageStorageSignature(BaseModel):
) )
# NOTE: This table is *deprecated* and will be removed in the next PR.
class DerivedImageStorage(BaseModel): class DerivedImageStorage(BaseModel):
source = ForeignKeyField(ImageStorage, null=True, related_name='source') source = ForeignKeyField(ImageStorage, null=True, related_name='source')
derivative = ForeignKeyField(ImageStorage, related_name='derivative') derivative = ForeignKeyField(ImageStorage, related_name='derivative')
@ -596,6 +597,19 @@ class Image(BaseModel):
_ImageProxy.initialize(Image) _ImageProxy.initialize(Image)
class DerivedStorageForImage(BaseModel):
    """Associates a source Image with a storage derived from it by a transformation.

    The unique index below permits at most one row per
    (source_image, transformation) pair.
    """
    source_image = ForeignKeyField(Image)
    derivative = ForeignKeyField(ImageStorage)
    transformation = ForeignKeyField(ImageStorageTransformation)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        # Each entry is (column names, is_unique).
        indexes = ((('source_image', 'transformation'), True),)
class RepositoryTag(BaseModel): class RepositoryTag(BaseModel):
name = CharField() name = CharField()
image = ForeignKeyField(Image) image = ForeignKeyField(Image)
@ -831,4 +845,4 @@ all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission,
RepositoryAuthorizedEmail, ImageStorageTransformation, DerivedImageStorage, RepositoryAuthorizedEmail, ImageStorageTransformation, DerivedImageStorage,
TeamMemberInvite, ImageStorageSignature, ImageStorageSignatureKind, TeamMemberInvite, ImageStorageSignature, ImageStorageSignatureKind,
AccessTokenKind, Star, RepositoryActionCount, TagManifest, UserRegion, AccessTokenKind, Star, RepositoryActionCount, TagManifest, UserRegion,
QuayService, QuayRegion, QuayRelease, BlobUpload] QuayService, QuayRegion, QuayRelease, BlobUpload, DerivedStorageForImage]

View file

@ -0,0 +1,39 @@
"""Add new DerivedStorageForImage table
Revision ID: 5a2445ffe21b
Revises: 1b2bb93ceb82
Create Date: 2015-11-24 11:58:02.956687
"""
# revision identifiers, used by Alembic.
revision = '5a2445ffe21b'
down_revision = '1b2bb93ceb82'
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
def upgrade(tables):
    """Create the ``derivedstorageforimage`` table together with its indexes.

    The table has an integer primary key and three non-nullable integer
    foreign-key columns pointing at ``image``, ``imagestorage`` and
    ``imagestoragetransformation``. ``tables`` is not used by this migration.
    """
    table_name = 'derivedstorageforimage'

    # Originally auto-generated by Alembic; the operations and their order are unchanged.
    op.create_table(
        table_name,
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('source_image_id', sa.Integer(), nullable=False),
        sa.Column('derivative_id', sa.Integer(), nullable=False),
        sa.Column('transformation_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(
            ['derivative_id'], ['imagestorage.id'],
            name=op.f('fk_derivedstorageforimage_derivative_id_imagestorage')),
        sa.ForeignKeyConstraint(
            ['source_image_id'], ['image.id'],
            name=op.f('fk_derivedstorageforimage_source_image_id_image')),
        sa.ForeignKeyConstraint(
            ['transformation_id'], ['imagestoragetransformation.id'],
            name=op.f('fk_derivedstorageforimage_transformation_constraint')),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_derivedstorageforimage')),
    )

    # (index name, indexed columns, is_unique); only the composite index is unique.
    index_specs = (
        ('derivedstorageforimage_derivative_id', ['derivative_id'], False),
        ('derivedstorageforimage_source_image_id', ['source_image_id'], False),
        ('derivedstorageforimage_source_image_id_transformation_id',
         ['source_image_id', 'transformation_id'], True),
        ('derivedstorageforimage_transformation_id', ['transformation_id'], False),
    )
    for index_name, index_columns, is_unique in index_specs:
        op.create_index(index_name, table_name, index_columns, unique=is_unique)
def downgrade(tables):
    """Drop the ``derivedstorageforimage`` table.

    ``tables`` is not used by this migration.
    """
    table_name = 'derivedstorageforimage'
    op.drop_table(table_name)

View file

@ -8,8 +8,8 @@ from datetime import datetime
from data.model import (DataModelException, db_transaction, _basequery, storage, from data.model import (DataModelException, db_transaction, _basequery, storage,
InvalidImageException, config) InvalidImageException, config)
from data.database import (Image, Repository, ImageStoragePlacement, Namespace, ImageStorage, from data.database import (Image, Repository, ImageStoragePlacement, Namespace, ImageStorage,
ImageStorageLocation, RepositoryPermission, db_for_update, ImageStorageLocation, RepositoryPermission, DerivedStorageForImage,
db_random_func) ImageStorageTransformation, db_random_func, db_for_update)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -542,3 +542,48 @@ def set_secscan_status(image, indexed, version):
.update(security_indexed=indexed, security_indexed_engine=version) .update(security_indexed=indexed, security_indexed_engine=version)
.where(Image.id << ids_to_update) .where(Image.id << ids_to_update)
.execute()) .execute())
def find_or_create_derived_storage(source_image, transformation_name, preferred_location):
    """Return the storage derived from ``source_image`` by the named transformation.

    An existing derived storage is returned as-is. Otherwise a new storage is
    created at ``preferred_location`` via ``storage.create_v1_storage``, linked to
    the image and transformation through a ``DerivedStorageForImage`` row, and
    returned.
    """
    derived = find_derived_storage_for_image(source_image, transformation_name)
    if derived is None:
        logger.debug('Creating storage dervied from source image: %s', source_image.id)
        transformation = ImageStorageTransformation.get(name=transformation_name)
        derived = storage.create_v1_storage(preferred_location)
        DerivedStorageForImage.create(source_image=source_image, derivative=derived,
                                      transformation=transformation)

    return derived
def find_derived_storage_for_image(source_image, transformation_name):
    """Look up the storage derived from ``source_image`` by the named transformation.

    Returns the matching ``ImageStorage`` with a ``locations`` attribute set to
    the names of its placement locations, or ``None`` when no such storage exists.
    """
    lookup = (ImageStorage
              .select(ImageStorage, DerivedStorageForImage)
              .join(DerivedStorageForImage)
              .join(ImageStorageTransformation)
              .where(DerivedStorageForImage.source_image == source_image,
                     ImageStorageTransformation.name == transformation_name))

    try:
        derived = lookup.get()
        # Kept inside the try so exception handling matches the lookup itself.
        derived.locations = {placement.location.name
                             for placement in derived.imagestorageplacement_set}
    except ImageStorage.DoesNotExist:
        return None

    return derived
def delete_derived_storage_by_uuid(storage_uuid):
    """Delete the storage with the given UUID, but only if it is a derived storage.

    Does nothing when the UUID lookup raises ``InvalidImageException`` or when no
    ``DerivedStorageForImage`` row references the storage as its derivative.
    """
    try:
        target = storage.get_storage_by_uuid(storage_uuid)
    except InvalidImageException:
        return

    try:
        DerivedStorageForImage.get(derivative=target)
    except DerivedStorageForImage.DoesNotExist:
        return
    else:
        # Confirmed to be a derivative, so remove it with recursive=True.
        target.delete_instance(recursive=True)

View file

@ -7,8 +7,8 @@ from data.model import (DataModelException, tag, db_transaction, storage, image,
_basequery, config) _basequery, config)
from data.database import (Repository, Namespace, RepositoryTag, Star, Image, ImageStorage, User, from data.database import (Repository, Namespace, RepositoryTag, Star, Image, ImageStorage, User,
Visibility, RepositoryPermission, TupleSelector, RepositoryActionCount, Visibility, RepositoryPermission, TupleSelector, RepositoryActionCount,
Role, RepositoryAuthorizedEmail, TagManifest, db_for_update, Role, RepositoryAuthorizedEmail, TagManifest, DerivedStorageForImage,
get_epoch_timestamp, db_random_func) db_for_update, get_epoch_timestamp, db_random_func)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -108,7 +108,7 @@ def garbage_collect_repository(namespace_name, repository_name):
def garbage_collect_repo(repo): def garbage_collect_repo(repo):
logger.debug('Garbage collecting repository %s', repo.id) logger.debug('Garbage collecting repository %s', repo.id)
storage_id_whitelist = {} storage_id_whitelist = set()
tag.garbage_collect_tags(repo) tag.garbage_collect_tags(repo)
with db_transaction(): with db_transaction():
@ -142,6 +142,21 @@ def garbage_collect_repo(repo):
if len(to_remove) > 0: if len(to_remove) > 0:
logger.info('Cleaning up unreferenced images: %s', to_remove) logger.info('Cleaning up unreferenced images: %s', to_remove)
storage_id_whitelist = {images_to_storages[to_remove_id] for to_remove_id in to_remove} storage_id_whitelist = {images_to_storages[to_remove_id] for to_remove_id in to_remove}
# Lookup any derived images for the images to remove.
derived = DerivedStorageForImage.select().where(
DerivedStorageForImage.source_image << to_remove)
has_derived = False
for derived_image in derived:
has_derived = True
storage_id_whitelist.add(derived_image.derivative_id)
# Delete any derived images and the images themselves.
if has_derived:
DerivedStorageForImage.delete().where(
DerivedStorageForImage.source_image << to_remove).execute()
Image.delete().where(Image.id << to_remove).execute() Image.delete().where(Image.id << to_remove).execute()
if len(to_remove) > 0: if len(to_remove) > 0:

View file

@ -3,7 +3,7 @@ import logging
from peewee import JOIN_LEFT_OUTER, fn, SQL from peewee import JOIN_LEFT_OUTER, fn, SQL
from data.model import config, db_transaction, InvalidImageException from data.model import config, db_transaction, InvalidImageException
from data.database import (ImageStorage, Image, DerivedImageStorage, ImageStoragePlacement, from data.database import (ImageStorage, Image, DerivedStorageForImage, ImageStoragePlacement,
ImageStorageLocation, ImageStorageTransformation, ImageStorageSignature, ImageStorageLocation, ImageStorageTransformation, ImageStorageSignature,
ImageStorageSignatureKind, Repository, Namespace) ImageStorageSignatureKind, Repository, Namespace)
@ -17,18 +17,6 @@ def add_storage_placement(storage, location_name):
ImageStoragePlacement.create(location=location, storage=storage) ImageStoragePlacement.create(location=location, storage=storage)
def find_or_create_derived_storage(source, transformation_name, preferred_location):
    """Return the storage derived from ``source`` by the named transformation.

    An existing derived storage is returned as-is. Otherwise a new storage is
    created at ``preferred_location`` via ``create_v1_storage``, linked to the
    source storage and transformation through a ``DerivedImageStorage`` row, and
    returned.
    """
    derived = find_derived_storage(source, transformation_name)
    if derived is None:
        logger.debug('Creating storage dervied from source: %s', source.uuid)
        transformation = ImageStorageTransformation.get(name=transformation_name)
        derived = create_v1_storage(preferred_location)
        DerivedImageStorage.create(source=source, derivative=derived,
                                   transformation=transformation)

    return derived
def garbage_collect_storage(storage_id_whitelist): def garbage_collect_storage(storage_id_whitelist):
if len(storage_id_whitelist) == 0: if len(storage_id_whitelist) == 0:
return return
@ -42,37 +30,11 @@ def garbage_collect_storage(storage_id_whitelist):
.switch(ImageStorage) .switch(ImageStorage)
.join(Image, JOIN_LEFT_OUTER) .join(Image, JOIN_LEFT_OUTER)
.switch(ImageStorage) .switch(ImageStorage)
.join(DerivedImageStorage, JOIN_LEFT_OUTER, .join(DerivedStorageForImage, JOIN_LEFT_OUTER,
on=(ImageStorage.id == DerivedImageStorage.derivative)) on=(ImageStorage.id == DerivedStorageForImage.derivative))
.where(ImageStorage.id << list(candidates)) .where(ImageStorage.id << list(candidates))
.group_by(*group_by) .group_by(*group_by)
.having((fn.Count(Image.id) == 0) & (fn.Count(DerivedImageStorage.id) == 0))) .having((fn.Count(Image.id) == 0) & (fn.Count(DerivedStorageForImage.id) == 0)))
# Note: We remove the derived image storage in its own transaction as a way to reduce the
# time that the transaction holds on the database indices. This could result in a derived
# image storage being deleted for an image storage which is later reused during this time,
# but since these are caches anyway, it isn't terrible and worth the tradeoff (for now).
logger.debug('Garbage collecting derived storage from candidates: %s', storage_id_whitelist)
with db_transaction():
# Find out which derived storages will be removed, and add them to the whitelist
# The comma after ImageStorage.id is VERY important, it makes it a tuple, which is a sequence
orphaned_from_candidates = list(orphaned_storage_query(ImageStorage.select(ImageStorage.id),
storage_id_whitelist,
(ImageStorage.id,)))
if len(orphaned_from_candidates) > 0:
derived_to_remove = (ImageStorage
.select(ImageStorage.id)
.join(DerivedImageStorage,
on=(ImageStorage.id == DerivedImageStorage.derivative))
.where(DerivedImageStorage.source << orphaned_from_candidates))
storage_id_whitelist.update({derived.id for derived in derived_to_remove})
# Remove the dervived image storages with sources of orphaned storages
(DerivedImageStorage
.delete()
.where(DerivedImageStorage.source << orphaned_from_candidates)
.execute())
# Note: Both of these deletes must occur in the same transaction (unfortunately) because a # Note: Both of these deletes must occur in the same transaction (unfortunately) because a
# storage without any placement is invalid, and a placement cannot exist without a storage. # storage without any placement is invalid, and a placement cannot exist without a storage.
@ -151,36 +113,6 @@ def lookup_storage_signature(storage, signature_kind):
return None return None
def find_derived_storage(source, transformation_name):
    """Look up the storage derived from ``source`` by the named transformation.

    Returns the matching ``ImageStorage`` with a ``locations`` attribute set to
    the names of its placement locations, or ``None`` when no such storage exists.
    """
    # The join condition is explicit: match on the derivative column.
    lookup = (ImageStorage
              .select(ImageStorage, DerivedImageStorage)
              .join(DerivedImageStorage,
                    on=(ImageStorage.id == DerivedImageStorage.derivative))
              .join(ImageStorageTransformation)
              .where(DerivedImageStorage.source == source,
                     ImageStorageTransformation.name == transformation_name))

    try:
        derived = lookup.get()
        # Kept inside the try so exception handling matches the lookup itself.
        derived.locations = {placement.location.name
                             for placement in derived.imagestorageplacement_set}
    except ImageStorage.DoesNotExist:
        return None

    return derived
def delete_derived_storage_by_uuid(storage_uuid):
    """Delete the storage with the given UUID, but only if it is a derived storage.

    Does nothing when the UUID lookup raises ``InvalidImageException`` or when no
    ``DerivedImageStorage`` row references the storage as its derivative.
    """
    try:
        target = get_storage_by_uuid(storage_uuid)
    except InvalidImageException:
        return

    try:
        DerivedImageStorage.get(derivative=target)
    except DerivedImageStorage.DoesNotExist:
        return
    else:
        # Confirmed to be a derivative, so remove it with recursive=True.
        target.delete_instance(recursive=True)
def _get_storage(query_modifier): def _get_storage(query_modifier):
query = (ImageStoragePlacement query = (ImageStoragePlacement
.select(ImageStoragePlacement, ImageStorage, ImageStorageLocation) .select(ImageStoragePlacement, ImageStorage, ImageStorageLocation)

View file

@ -1,7 +1,6 @@
import logging import logging
import json import json
import hashlib import hashlib
import uuid
from flask import redirect, Blueprint, abort, send_file, make_response from flask import redirect, Blueprint, abort, send_file, make_response
@ -86,7 +85,7 @@ def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_location
logger.debug('Exception when building %s image %s: %s', verb, linked_storage_uuid, ex) logger.debug('Exception when building %s image %s: %s', verb, linked_storage_uuid, ex)
with database.UseThenDisconnect(app.config): with database.UseThenDisconnect(app.config):
model.storage.delete_derived_storage_by_uuid(linked_storage_uuid) model.image.delete_derived_storage_by_uuid(linked_storage_uuid)
queue_file.add_exception_handler(handle_exception) queue_file.add_exception_handler(handle_exception)
@ -137,10 +136,10 @@ def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwarg
# Verify that the image exists and that we have access to it. # Verify that the image exists and that we have access to it.
store = Storage(app) store = Storage(app)
result = _verify_repo_verb(store, namespace, repository, tag, verb, checker) result = _verify_repo_verb(store, namespace, repository, tag, verb, checker)
(repo_image, tag_image, image_json) = result (repo_image, _, _) = result
# Lookup the derived image storage for the verb. # Lookup the derived image storage for the verb.
derived = model.storage.find_derived_storage(repo_image.storage, verb) derived = model.image.find_derived_storage_for_image(repo_image, verb)
if derived is None or derived.uploading: if derived is None or derived.uploading:
return make_response('', 202) return make_response('', 202)
@ -166,27 +165,26 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
# Log the action. # Log the action.
track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs) track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs)
# Lookup/create the derived image storage for the verb. # Lookup/create the derived image storage for the verb and repo image.
#derived = model.storage.find_or_create_derived_storage(repo_image.storage, verb, derived = model.image.find_or_create_derived_storage(repo_image, verb,
# store.preferred_locations[0]) store.preferred_locations[0])
#if not derived.uploading: if not derived.uploading:
# logger.debug('Derived %s image %s exists in storage', verb, derived.uuid) logger.debug('Derived %s image %s exists in storage', verb, derived.uuid)
# derived_layer_path = model.storage.get_layer_path(derived) derived_layer_path = model.storage.get_layer_path(derived)
# download_url = store.get_direct_download_url(derived.locations, derived_layer_path) download_url = store.get_direct_download_url(derived.locations, derived_layer_path)
# if download_url: if download_url:
# logger.debug('Redirecting to download URL for derived %s image %s', verb, derived.uuid) logger.debug('Redirecting to download URL for derived %s image %s', verb, derived.uuid)
# return redirect(download_url) return redirect(download_url)
# # Close the database handle here for this process before we send the long download. # Close the database handle here for this process before we send the long download.
# database.close_db_filter(None) database.close_db_filter(None)
# logger.debug('Sending cached derived %s image %s', verb, derived.uuid) logger.debug('Sending cached derived %s image %s', verb, derived.uuid)
# return send_file(store.stream_read_file(derived.locations, derived_layer_path)) return send_file(store.stream_read_file(derived.locations, derived_layer_path))
derived_uuid = str(uuid.uuid4())
logger.debug('Building and returning derived %s image %s', verb, derived_uuid) logger.debug('Building and returning derived %s image %s', verb, derived.uuid)
# Load the image's JSON layer. # Load the image's JSON layer.
if not image_json: if not image_json:
@ -207,23 +205,23 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
args, finished=_cleanup) args, finished=_cleanup)
client_queue_file = QueueFile(queue_process.create_queue(), 'client') client_queue_file = QueueFile(queue_process.create_queue(), 'client')
#storage_queue_file = QueueFile(queue_process.create_queue(), 'storage') storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')
# If signing is required, add a QueueFile for signing the image as we stream it out. # If signing is required, add a QueueFile for signing the image as we stream it out.
#signing_queue_file = None signing_queue_file = None
#if sign and signer.name: if sign and signer.name:
# signing_queue_file = QueueFile(queue_process.create_queue(), 'signing') signing_queue_file = QueueFile(queue_process.create_queue(), 'signing')
# Start building. # Start building.
queue_process.run() queue_process.run()
# Start the storage saving. # Start the storage saving.
#storage_args = (verb, derived_uuid, derived.locations, storage_queue_file) storage_args = (verb, derived.uuid, derived.locations, storage_queue_file)
#QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args, finished=_cleanup) QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args, finished=_cleanup)
#if sign and signer.name: if sign and signer.name:
# signing_args = (verb, derived_uuid, signing_queue_file) signing_args = (verb, derived.uuid, signing_queue_file)
# QueueProcess.run_process(_sign_sythentic_image, signing_args, finished=_cleanup) QueueProcess.run_process(_sign_sythentic_image, signing_args, finished=_cleanup)
# Close the database handle here for this process before we send the long download. # Close the database handle here for this process before we send the long download.
database.close_db_filter(None) database.close_db_filter(None)

Binary file not shown.

View file

@ -56,6 +56,6 @@ up_postgres
echo '> Running Full Test Suite (postgres)' echo '> Running Full Test Suite (postgres)'
set +e set +e
run_tests "postgresql://postgres@127.0.0.1/genschema" run_tests "postgresql://postgres@192.168.99.100/genschema"
set -e set -e
down_postgres down_postgres

View file

@ -1568,9 +1568,9 @@ class TestDeleteRepository(ApiTestCase):
# Create some repository action count entries. # Create some repository action count entries.
RepositoryActionCount.create(repository=repository, date=datetime.datetime.now(), count=1) RepositoryActionCount.create(repository=repository, date=datetime.datetime.now(), count=1)
RepositoryActionCount.create(repository=repository, RepositoryActionCount.create(repository=repository,
date=datetime.datetime.now() - datetime.timedelta(days=1), count=2) date=datetime.datetime.now() - datetime.timedelta(days=2), count=2)
RepositoryActionCount.create(repository=repository, RepositoryActionCount.create(repository=repository,
date=datetime.datetime.now() - datetime.timedelta(days=3), count=6) date=datetime.datetime.now() - datetime.timedelta(days=5), count=6)
# Delete the repository. # Delete the repository.
self.deleteResponse(Repository, params=dict(repository=self.COMPLEX_REPO)) self.deleteResponse(Repository, params=dict(repository=self.COMPLEX_REPO))

View file

@ -40,8 +40,8 @@ class TestGarbageCollection(unittest.TestCase):
image.storage.save() image.storage.save()
# Create derived images as well. # Create derived images as well.
for i in range(0, 2): model.image.find_or_create_derived_storage(image, 'squash', preferred)
model.storage.find_or_create_derived_storage(image.storage, 'squash', preferred) model.image.find_or_create_derived_storage(image, 'aci', preferred)
# Add some additional placements to the image. # Add some additional placements to the image.
for location_name in ['local_eu']: for location_name in ['local_eu']: