Actually store the generated image storage in the database, and allow it to be garbage collected when the parent image storage is collected.
This commit is contained in:
parent
43555af63d
commit
11bb8e6448
5 changed files with 120 additions and 50 deletions
|
@ -192,7 +192,6 @@ class PermissionPrototype(BaseModel):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class AccessToken(BaseModel):
|
class AccessToken(BaseModel):
|
||||||
friendly_name = CharField(null=True)
|
friendly_name = CharField(null=True)
|
||||||
code = CharField(default=random_string_generator(length=64), unique=True,
|
code = CharField(default=random_string_generator(length=64), unique=True,
|
||||||
|
@ -238,6 +237,23 @@ class ImageStorage(BaseModel):
|
||||||
uploading = BooleanField(default=True, null=True)
|
uploading = BooleanField(default=True, null=True)
|
||||||
|
|
||||||
|
|
||||||
|
class ImageStorageTransformation(BaseModel):
|
||||||
|
name = CharField(index=True, unique=True)
|
||||||
|
|
||||||
|
|
||||||
|
class DerivedImageStorage(BaseModel):
|
||||||
|
source = ForeignKeyField(ImageStorage, null=True, related_name='source')
|
||||||
|
derivative = ForeignKeyField(ImageStorage, related_name='derivative')
|
||||||
|
transformation = ForeignKeyField(ImageStorageTransformation)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
read_slaves = (read_slave,)
|
||||||
|
indexes = (
|
||||||
|
(('source', 'transformation'), True),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class ImageStorageLocation(BaseModel):
|
class ImageStorageLocation(BaseModel):
|
||||||
name = CharField(unique=True, index=True)
|
name = CharField(unique=True, index=True)
|
||||||
|
|
||||||
|
@ -422,4 +438,4 @@ all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission,
|
||||||
OAuthApplication, OAuthAuthorizationCode, OAuthAccessToken, NotificationKind,
|
OAuthApplication, OAuthAuthorizationCode, OAuthAccessToken, NotificationKind,
|
||||||
Notification, ImageStorageLocation, ImageStoragePlacement,
|
Notification, ImageStorageLocation, ImageStoragePlacement,
|
||||||
ExternalNotificationEvent, ExternalNotificationMethod, RepositoryNotification,
|
ExternalNotificationEvent, ExternalNotificationMethod, RepositoryNotification,
|
||||||
RepositoryAuthorizedEmail]
|
RepositoryAuthorizedEmail, ImageStorageTransformation, DerivedImageStorage]
|
||||||
|
|
|
@ -70,6 +70,10 @@ class InvalidBuildTriggerException(DataModelException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidImageException(DataModelException):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class TooManyUsersException(DataModelException):
|
class TooManyUsersException(DataModelException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -1055,6 +1059,14 @@ def __translate_ancestry(old_ancestry, translations, repository, username, prefe
|
||||||
return '/%s/' % '/'.join(new_ids)
|
return '/%s/' % '/'.join(new_ids)
|
||||||
|
|
||||||
|
|
||||||
|
def _create_storage(location_name):
|
||||||
|
storage = ImageStorage.create()
|
||||||
|
location = ImageStorageLocation.get(name=location_name)
|
||||||
|
ImageStoragePlacement.create(location=location, storage=storage)
|
||||||
|
storage.locations = {location_name}
|
||||||
|
return storage
|
||||||
|
|
||||||
|
|
||||||
def find_create_or_link_image(docker_image_id, repository, username, translations,
|
def find_create_or_link_image(docker_image_id, repository, username, translations,
|
||||||
preferred_location):
|
preferred_location):
|
||||||
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
||||||
|
@ -1093,10 +1105,7 @@ def find_create_or_link_image(docker_image_id, repository, username, translation
|
||||||
origin_image_id = to_copy.id
|
origin_image_id = to_copy.id
|
||||||
except Image.DoesNotExist:
|
except Image.DoesNotExist:
|
||||||
logger.debug('Creating new storage for docker id: %s', docker_image_id)
|
logger.debug('Creating new storage for docker id: %s', docker_image_id)
|
||||||
storage = ImageStorage.create()
|
storage = _create_storage(preferred_location)
|
||||||
location = ImageStorageLocation.get(name=preferred_location)
|
|
||||||
ImageStoragePlacement.create(location=location, storage=storage)
|
|
||||||
storage.locations = {preferred_location}
|
|
||||||
|
|
||||||
logger.debug('Storage locations: %s', storage.locations)
|
logger.debug('Storage locations: %s', storage.locations)
|
||||||
|
|
||||||
|
@ -1114,6 +1123,43 @@ def find_create_or_link_image(docker_image_id, repository, username, translation
|
||||||
return new_image
|
return new_image
|
||||||
|
|
||||||
|
|
||||||
|
def find_or_create_derived_storage(source, transformation_name, preferred_location):
|
||||||
|
try:
|
||||||
|
found = (ImageStorage
|
||||||
|
.select(ImageStorage, DerivedImageStorage)
|
||||||
|
.join(DerivedImageStorage, on=(ImageStorage.id == DerivedImageStorage.derivative))
|
||||||
|
.join(ImageStorageTransformation)
|
||||||
|
.where(DerivedImageStorage.source == source,
|
||||||
|
ImageStorageTransformation.name == transformation_name)
|
||||||
|
.get())
|
||||||
|
|
||||||
|
found.locations = {placement.location.name for placement in found.imagestorageplacement_set}
|
||||||
|
return found
|
||||||
|
except ImageStorage.DoesNotExist:
|
||||||
|
logger.debug('Creating storage dervied from source: %s', source.uuid)
|
||||||
|
trans = ImageStorageTransformation.get(name=transformation_name)
|
||||||
|
new_storage = _create_storage(preferred_location)
|
||||||
|
DerivedImageStorage.create(source=source, derivative=new_storage, transformation=trans)
|
||||||
|
return new_storage
|
||||||
|
|
||||||
|
|
||||||
|
def get_storage_by_uuid(storage_uuid):
|
||||||
|
placements = list(ImageStoragePlacement
|
||||||
|
.select(ImageStoragePlacement, ImageStorage, ImageStorageLocation)
|
||||||
|
.join(ImageStorageLocation)
|
||||||
|
.switch(ImageStoragePlacement)
|
||||||
|
.join(ImageStorage)
|
||||||
|
.where(ImageStorage.uuid == storage_uuid))
|
||||||
|
|
||||||
|
if not placements:
|
||||||
|
raise InvalidImageException('No storage found with uuid: %s', storage_uuid)
|
||||||
|
|
||||||
|
found = placements[0].storage
|
||||||
|
found.locations = {placement.location.name for placement in placements}
|
||||||
|
|
||||||
|
return found
|
||||||
|
|
||||||
|
|
||||||
def set_image_size(docker_image_id, namespace_name, repository_name,
|
def set_image_size(docker_image_id, namespace_name, repository_name,
|
||||||
image_size):
|
image_size):
|
||||||
try:
|
try:
|
||||||
|
@ -1252,15 +1298,8 @@ def garbage_collect_repository(namespace_name, repository_name):
|
||||||
|
|
||||||
image_to_remove.delete_instance()
|
image_to_remove.delete_instance()
|
||||||
|
|
||||||
if uuids_to_check_for_gc:
|
def remove_storages(query):
|
||||||
storage_to_remove = (ImageStorage
|
for storage in query:
|
||||||
.select()
|
|
||||||
.join(Image, JOIN_LEFT_OUTER)
|
|
||||||
.group_by(ImageStorage)
|
|
||||||
.where(ImageStorage.uuid << list(uuids_to_check_for_gc))
|
|
||||||
.having(fn.Count(Image.id) == 0))
|
|
||||||
|
|
||||||
for storage in storage_to_remove:
|
|
||||||
logger.debug('Garbage collecting image storage: %s', storage.uuid)
|
logger.debug('Garbage collecting image storage: %s', storage.uuid)
|
||||||
|
|
||||||
image_path = config.store.image_path(storage.uuid)
|
image_path = config.store.image_path(storage.uuid)
|
||||||
|
@ -1269,7 +1308,24 @@ def garbage_collect_repository(namespace_name, repository_name):
|
||||||
placement.delete_instance()
|
placement.delete_instance()
|
||||||
config.store.remove({location_name}, image_path)
|
config.store.remove({location_name}, image_path)
|
||||||
|
|
||||||
storage.delete_instance()
|
storage.delete_instance(recursive=True)
|
||||||
|
|
||||||
|
if uuids_to_check_for_gc:
|
||||||
|
storage_to_remove = (ImageStorage
|
||||||
|
.select()
|
||||||
|
.join(Image, JOIN_LEFT_OUTER)
|
||||||
|
.group_by(ImageStorage)
|
||||||
|
.where(ImageStorage.uuid << list(uuids_to_check_for_gc))
|
||||||
|
.having(fn.Count(Image.id) == 0))
|
||||||
|
|
||||||
|
remove_storages(storage_to_remove)
|
||||||
|
|
||||||
|
# Now remove any derived image storages whose sources have been removed
|
||||||
|
derived_storages_to_remove = (ImageStorage
|
||||||
|
.select()
|
||||||
|
.join(DerivedImageStorage, on=(ImageStorage.id == DerivedImageStorage.derivative))
|
||||||
|
.where(DerivedImageStorage.source >> None))
|
||||||
|
remove_storages(derived_storages_to_remove)
|
||||||
|
|
||||||
return len(to_remove)
|
return len(to_remove)
|
||||||
|
|
||||||
|
|
|
@ -1,26 +1,24 @@
|
||||||
import logging
|
import logging
|
||||||
import json
|
import json
|
||||||
import hashlib
|
|
||||||
|
|
||||||
from flask import (make_response, request, session, Response, redirect,
|
from flask import redirect, Blueprint, abort, send_file
|
||||||
Blueprint, abort, send_file, make_response)
|
|
||||||
|
|
||||||
from app import storage as store, app
|
from app import storage as store, app
|
||||||
from auth.auth import process_auth
|
from auth.auth import process_auth
|
||||||
from auth.permissions import ReadRepositoryPermission
|
from auth.permissions import ReadRepositoryPermission
|
||||||
from data import model
|
from data import model
|
||||||
from endpoints.registry import set_cache_headers
|
from data import database
|
||||||
|
|
||||||
from util.queuefile import QueueFile
|
from util.queuefile import QueueFile
|
||||||
from util.queueprocess import QueueProcess
|
from util.queueprocess import QueueProcess
|
||||||
from util.gzipwrap import GzipWrap
|
from util.gzipwrap import GzipWrap
|
||||||
from util.streamlayerformat import StreamLayerMerger
|
from util.streamlayerformat import StreamLayerMerger
|
||||||
|
|
||||||
from werkzeug.wsgi import wrap_file
|
|
||||||
|
|
||||||
verbs = Blueprint('verbs', __name__)
|
verbs = Blueprint('verbs', __name__)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def _open_stream(namespace, repository, image_list):
|
def _open_stream(namespace, repository, image_list):
|
||||||
def get_next_layer():
|
def get_next_layer():
|
||||||
for current_image_id in image_list:
|
for current_image_id in image_list:
|
||||||
|
@ -32,20 +30,25 @@ def _open_stream(namespace, repository, image_list):
|
||||||
logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path))
|
logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path))
|
||||||
yield current_image_stream
|
yield current_image_stream
|
||||||
|
|
||||||
|
database.configure(app.config)
|
||||||
stream = GzipWrap(StreamLayerMerger(get_next_layer).get_generator())
|
stream = GzipWrap(StreamLayerMerger(get_next_layer).get_generator())
|
||||||
return stream.read
|
return stream.read
|
||||||
|
|
||||||
def _write_synthetic_image_to_storage(namespace, repository, locations,
|
|
||||||
synthetic_image_id, queue_file):
|
def _write_synthetic_image_to_storage(linked_storage_uuid, linked_locations, queue_file):
|
||||||
# TODO: make sure this synthetic image expires!
|
image_path = store.image_layer_path(linked_storage_uuid)
|
||||||
image_path = store.image_layer_path(synthetic_image_id)
|
store.stream_write(linked_locations, image_path, queue_file)
|
||||||
store.stream_write(locations, image_path, queue_file)
|
|
||||||
queue_file.close()
|
queue_file.close()
|
||||||
|
|
||||||
|
database.configure(app.config)
|
||||||
|
done_uploading = model.get_storage_by_uuid(linked_storage_uuid)
|
||||||
|
done_uploading.uploading = False
|
||||||
|
done_uploading.save()
|
||||||
|
|
||||||
|
|
||||||
@verbs.route('/<namespace>/<repository>/<tag>/squash', methods=['GET'])
|
@verbs.route('/<namespace>/<repository>/<tag>/squash', methods=['GET'])
|
||||||
@process_auth
|
@process_auth
|
||||||
@set_cache_headers
|
def get_squashed_tag(namespace, repository, tag):
|
||||||
def get_squashed_tag(namespace, repository, tag, headers):
|
|
||||||
permission = ReadRepositoryPermission(namespace, repository)
|
permission = ReadRepositoryPermission(namespace, repository)
|
||||||
if permission.can() or model.repository_is_public(namespace, repository):
|
if permission.can() or model.repository_is_public(namespace, repository):
|
||||||
# Lookup the requested tag.
|
# Lookup the requested tag.
|
||||||
|
@ -57,34 +60,26 @@ def get_squashed_tag(namespace, repository, tag, headers):
|
||||||
repo_image = model.get_repo_image(namespace, repository, tag_image.docker_image_id)
|
repo_image = model.get_repo_image(namespace, repository, tag_image.docker_image_id)
|
||||||
if not repo_image:
|
if not repo_image:
|
||||||
abort(404)
|
abort(404)
|
||||||
|
|
||||||
# Calculate a synthetic image ID by hashing the *image storage ID* with our
|
|
||||||
# secret. This is done to prevent the ID being guessable/overwritable by
|
|
||||||
# external pushes.
|
|
||||||
unhashed = str(repo_image.storage.id) + ':' + app.config['SECRET_KEY']
|
|
||||||
synthetic_image_id = hashlib.sha256(unhashed).hexdigest()
|
|
||||||
|
|
||||||
# Check to see if the synthetic image ID exists in storage. If so, we just return a 302.
|
derived = model.find_or_create_derived_storage(repo_image.storage, 'squash',
|
||||||
logger.debug('Looking up synthetic image %s', synthetic_image_id)
|
store.preferred_locations[0])
|
||||||
|
if not derived.uploading:
|
||||||
locations = repo_image.storage.locations
|
logger.debug('Derived image %s exists in storage', derived.uuid)
|
||||||
saved_image_path = store.image_layer_path(synthetic_image_id)
|
derived_layer_path = store.image_layer_path(derived.uuid)
|
||||||
if store.exists(locations, saved_image_path):
|
download_url = store.get_direct_download_url(derived.locations, derived_layer_path)
|
||||||
logger.debug('Synthetic image %s exists in storage', synthetic_image_id)
|
|
||||||
download_url = store.get_direct_download_url(locations, saved_image_path)
|
|
||||||
if download_url:
|
if download_url:
|
||||||
logger.debug('Redirecting to download URL for synthetic image %s', synthetic_image_id)
|
logger.debug('Redirecting to download URL for derived image %s', derived.uuid)
|
||||||
return redirect(download_url, code=302)
|
return redirect(download_url)
|
||||||
|
|
||||||
logger.debug('Sending cached synthetic image %s', synthetic_image_id)
|
logger.debug('Sending cached derived image %s', derived.uuid)
|
||||||
return send_file(store.stream_read_file(locations, saved_image_path))
|
return send_file(store.stream_read_file(derived.locations, derived_layer_path))
|
||||||
|
|
||||||
# Load the ancestry for the image.
|
# Load the ancestry for the image.
|
||||||
logger.debug('Building and returning synthetic image %s', synthetic_image_id)
|
logger.debug('Building and returning derived image %s', derived.uuid)
|
||||||
uuid = repo_image.storage.uuid
|
uuid = repo_image.storage.uuid
|
||||||
ancestry_data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
|
ancestry_data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
|
||||||
full_image_list = json.loads(ancestry_data)
|
full_image_list = json.loads(ancestry_data)
|
||||||
|
|
||||||
# Create a queue process to generate the data. The queue files will read from the process
|
# Create a queue process to generate the data. The queue files will read from the process
|
||||||
# and send the results to the client and storage.
|
# and send the results to the client and storage.
|
||||||
args = (namespace, repository, full_image_list)
|
args = (namespace, repository, full_image_list)
|
||||||
|
@ -92,12 +87,12 @@ def get_squashed_tag(namespace, repository, tag, headers):
|
||||||
|
|
||||||
client_queue_file = QueueFile(queue_process.create_queue(), 'client')
|
client_queue_file = QueueFile(queue_process.create_queue(), 'client')
|
||||||
storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')
|
storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')
|
||||||
|
|
||||||
# Start building.
|
# Start building.
|
||||||
queue_process.run()
|
queue_process.run()
|
||||||
|
|
||||||
# Start the storage saving.
|
# Start the storage saving.
|
||||||
storage_args = (namespace, repository, locations, synthetic_image_id, storage_queue_file)
|
storage_args = (derived.uuid, derived.locations, storage_queue_file)
|
||||||
QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args)
|
QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args)
|
||||||
|
|
||||||
# Return the client's data.
|
# Return the client's data.
|
||||||
|
|
|
@ -243,6 +243,8 @@ def initialize_database():
|
||||||
ImageStorageLocation.create(name='local_eu')
|
ImageStorageLocation.create(name='local_eu')
|
||||||
ImageStorageLocation.create(name='local_us')
|
ImageStorageLocation.create(name='local_us')
|
||||||
|
|
||||||
|
ImageStorageTransformation.create(name='squash')
|
||||||
|
|
||||||
# NOTE: These MUST be copied over to NotificationKind, since every external
|
# NOTE: These MUST be copied over to NotificationKind, since every external
|
||||||
# notification can also generate a Quay.io notification.
|
# notification can also generate a Quay.io notification.
|
||||||
ExternalNotificationEvent.create(name='repo_push')
|
ExternalNotificationEvent.create(name='repo_push')
|
||||||
|
|
|
@ -36,3 +36,4 @@ psycopg2
|
||||||
pyyaml
|
pyyaml
|
||||||
git+https://github.com/DevTable/aniso8601-fake.git
|
git+https://github.com/DevTable/aniso8601-fake.git
|
||||||
git+https://github.com/DevTable/anunidecode.git
|
git+https://github.com/DevTable/anunidecode.git
|
||||||
|
gipc
|
Reference in a new issue