This repository was archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/model/blob.py
Joseph Schorr f75f315037 Optimize lookup of shared global blobs
Currently, we only have one (the shared empty layer), but this should make the blob lookups for repositories significantly faster, as we won't need to do the massive join.
2019-02-14 12:46:42 -05:00

204 lines
7.5 KiB
Python

import logging
from datetime import datetime
from uuid import uuid4
from data.model import (tag, _basequery, BlobDoesNotExist, InvalidBlobUpload, db_transaction,
storage as storage_model, InvalidImageException)
from data.database import (Repository, Namespace, ImageStorage, Image, ImageStoragePlacement,
BlobUpload, ImageStorageLocation, db_random_func)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def get_repository_blob_by_digest(repository, blob_digest):
    """ Looks up the blob with the given content digest that is referenced by an image in the
        given repository and is not marked as uploading.

        Returns the result of storage_model.get_storage_by_uuid for the matching storage row.
        Raises BlobDoesNotExist if no row matches or if the storage lookup raises
        InvalidImageException.
    """
    # Only the uuid is selected here; the full record is loaded through the storage model below.
    lookup = ImageStorage.select(ImageStorage.uuid).join(Image)

    # NOTE: `== False` is intentional; it builds a query expression, not a Python boolean.
    lookup = lookup.where(Image.repository == repository,
                          ImageStorage.content_checksum == blob_digest,
                          ImageStorage.uploading == False)

    try:
        matched = lookup.get()
        return storage_model.get_storage_by_uuid(matched.uuid)
    except (ImageStorage.DoesNotExist, InvalidImageException):
        raise BlobDoesNotExist('Blob does not exist with digest: {0}'.format(blob_digest))
def get_repo_blob_by_digest(namespace, repo_name, blob_digest):
    """ Looks up the blob with the given content digest that is referenced by an image in the
        repository identified by namespace username and repository name, and that is not marked
        as uploading.

        Returns the result of storage_model.get_storage_by_uuid for the matching storage row.
        Raises BlobDoesNotExist if no row matches or if the storage lookup raises
        InvalidImageException.
    """
    # Walk ImageStorage -> Image -> Repository -> Namespace so the repository can be matched
    # by its names rather than by a row reference.
    lookup = (ImageStorage
              .select(ImageStorage.uuid)
              .join(Image)
              .join(Repository)
              .join(Namespace, on=(Namespace.id == Repository.namespace_user)))

    # NOTE: `== False` is intentional; it builds a query expression, not a Python boolean.
    lookup = lookup.where(Repository.name == repo_name, Namespace.username == namespace,
                          ImageStorage.content_checksum == blob_digest,
                          ImageStorage.uploading == False)

    try:
        matched = lookup.get()
        return storage_model.get_storage_by_uuid(matched.uuid)
    except (ImageStorage.DoesNotExist, InvalidImageException):
        raise BlobDoesNotExist('Blob does not exist with digest: {0}'.format(blob_digest))
def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_obj, byte_count,
                                    link_expiration_s, uncompressed_byte_count=None):
    """ Ensures a storage row and a placement at `location_obj` exist for the blob with the given
        digest, then temporarily links the blob into the specified repository.

        An existing row has its image_size overwritten with `byte_count`, and its
        uncompressed_size overwritten only when `uncompressed_byte_count` is not None.
        Returns the storage row. All work happens inside a single db_transaction.
    """
    with db_transaction():
        try:
            blob_record = ImageStorage.get(content_checksum=blob_digest)
        except ImageStorage.DoesNotExist:
            # First time this digest is seen: create both the row and its placement.
            blob_record = ImageStorage.create(content_checksum=blob_digest, uploading=False,
                                              image_size=byte_count,
                                              uncompressed_size=uncompressed_byte_count)
            ImageStoragePlacement.create(storage=blob_record, location=location_obj)
        else:
            # Known digest: refresh the recorded sizes.
            blob_record.image_size = byte_count
            if uncompressed_byte_count is not None:
                blob_record.uncompressed_size = uncompressed_byte_count
            blob_record.save()

            # Add a placement for this location only if one is not already recorded.
            try:
                ImageStoragePlacement.get(storage=blob_record, location=location_obj)
            except ImageStoragePlacement.DoesNotExist:
                ImageStoragePlacement.create(storage=blob_record, location=location_obj)

        _temp_link_blob(namespace, repo_name, blob_record, link_expiration_s)
        return blob_record
def temp_link_blob(namespace, repo_name, blob_digest, link_expiration_s):
    """ Temporarily links the blob record with the given digest into the specified repository.

        Returns the storage row on success, or None if no blob record with that digest exists.
        The lookup and the link are performed inside a single db_transaction.
    """
    with db_transaction():
        try:
            blob_record = ImageStorage.get(content_checksum=blob_digest)
        except ImageStorage.DoesNotExist:
            return None
        else:
            _temp_link_blob(namespace, repo_name, blob_record, link_expiration_s)
            return blob_record
def _temp_link_blob(namespace, repo_name, storage, link_expiration_s):
    """ Links `storage` into the named repository through a placeholder image and a temporary
        hidden tag.

        Note: Should *always* be called by a parent under a transaction.
    """
    target_repo = _basequery.get_existing_repository(namespace, repo_name)

    # The placeholder image gets a random id; it is a temporary link into the repository that
    # is to be replaced by the v1 metadata later.
    placeholder_id = str(uuid4())
    placeholder = Image.create(storage=storage, docker_image_id=placeholder_id,
                               repository=target_repo)

    # A temporary hidden tag references the placeholder image.
    tag.create_temporary_hidden_tag(target_repo, placeholder, link_expiration_s)
def get_stale_blob_upload(stale_timespan):
    """ Returns a randomly chosen blob upload created at least `stale_timespan` ago, with its
        storage location selected alongside it, or None if there is no such upload.
    """
    cutoff = datetime.now() - stale_timespan

    # Subquery of at most 500 stale uploads; building it does not hit the database.
    stale_pool = (BlobUpload
                  .select()
                  .where(BlobUpload.created <= cutoff)
                  .limit(500)
                  .distinct()
                  .alias('candidates'))

    try:
        # Pick one candidate id using the database's random ordering function.
        picked = (BlobUpload
                  .select(stale_pool.c.id)
                  .from_(stale_pool)
                  .order_by(db_random_func())
                  .get())
        if not picked:
            return None

        # Reload the chosen upload together with its storage location.
        full_query = (BlobUpload
                      .select(BlobUpload, ImageStorageLocation)
                      .join(ImageStorageLocation)
                      .where(BlobUpload.id == picked.id))
        return full_query.get()
    except BlobUpload.DoesNotExist:
        return None
def get_blob_upload_by_uuid(upload_uuid):
    """ Returns the blob upload whose uuid equals `upload_uuid`, or None if there is none. """
    by_uuid = BlobUpload.select().where(BlobUpload.uuid == upload_uuid)
    try:
        return by_uuid.get()
    except BlobUpload.DoesNotExist:
        return None
def get_blob_upload(namespace, repo_name, upload_uuid):
    """ Loads the blob upload with the given uuid that belongs to the repository identified by
        namespace username and repository name, selecting its storage location alongside it.

        Raises InvalidBlobUpload if no such upload exists.
    """
    # Join the location first, then switch back to BlobUpload so the repository and
    # namespace joins hang off the upload rather than off the location.
    with_location = (BlobUpload
                     .select(BlobUpload, ImageStorageLocation)
                     .join(ImageStorageLocation)
                     .switch(BlobUpload))
    with_repo = (with_location
                 .join(Repository)
                 .join(Namespace, on=(Namespace.id == Repository.namespace_user)))
    scoped = with_repo.where(Repository.name == repo_name, Namespace.username == namespace,
                             BlobUpload.uuid == upload_uuid)

    try:
        return scoped.get()
    except BlobUpload.DoesNotExist:
        raise InvalidBlobUpload()
def initiate_upload(namespace, repo_name, uuid, location_name, storage_metadata):
    """ Creates and returns a new BlobUpload row for the named repository, using the given
        uuid, the id of the storage location named `location_name`, and the given storage
        metadata.
    """
    target_repo = _basequery.get_existing_repository(namespace, repo_name)
    target_location = storage_model.get_image_location_for_name(location_name)

    new_upload_fields = dict(repository=target_repo,
                             location=target_location.id,
                             uuid=uuid,
                             storage_metadata=storage_metadata)
    return BlobUpload.create(**new_upload_fields)
def get_shared_blob(digest):
    """ Looks up the ImageStorage blob with the given digest that is not marked as uploading,
        returning None when there is no such row.

        This method is *only* to be used for shared blobs that are globally accessible, such as
        the special empty gzipped tar layer that Docker no longer pushes to us.
    """
    try:
        shared = ImageStorage.get(content_checksum=digest, uploading=False)
    except ImageStorage.DoesNotExist:
        shared = None
    return shared
def get_or_create_shared_blob(digest, byte_data, storage):
    """ Returns the ImageStorage blob with the given digest or, if not present,
        adds a row and writes the given byte data to the storage engine.

        This method is *only* to be used for shared blobs that are globally
        accessible, such as the special empty gzipped tar layer that Docker
        no longer pushes to us.

        `storage` is the storage engine; its first preferred location receives the data.
        Returns the existing or newly created ImageStorage row. Any exception raised while
        writing the data or recording the placement is logged and re-raised after the
        partially-created row has been deleted.
    """
    try:
        return ImageStorage.get(content_checksum=digest, uploading=False)
    except ImageStorage.DoesNotExist:
        pass

    # Resolve the destination *before* inserting the row: if there is no preferred location
    # or its ImageStorageLocation row is missing, we fail here without leaving behind an
    # orphaned record that is stuck in the uploading state.
    preferred = storage.preferred_locations[0]
    location_obj = ImageStorageLocation.get(name=preferred)

    # The row starts out as uploading and is only flipped once the data has been written
    # and the placement recorded.
    record = ImageStorage.create(image_size=len(byte_data), content_checksum=digest,
                                 cas_path=True, uploading=True)
    try:
        storage.put_content([preferred], storage_model.get_layer_path(record), byte_data)
        ImageStoragePlacement.create(storage=record, location=location_obj)
        record.uploading = False
        record.save()
    except:
        # Deliberately bare: the half-created row must be removed no matter what was raised,
        # and the exception is always re-raised to the caller.
        logger.exception('Exception when trying to write special layer %s', digest)
        record.delete_instance()
        raise

    return record