This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/model/repository.py

452 lines
16 KiB
Python
Raw Normal View History

import logging
import random
from datetime import timedelta, datetime
from peewee import JOIN_LEFT_OUTER, fn, SQL, IntegrityError
from cachetools import ttl_cache
from data.model import (DataModelException, tag, db_transaction, storage, permission,
_basequery)
from data.database import (Repository, Namespace, RepositoryTag, Star, Image, User,
Visibility, RepositoryPermission, RepositoryActionCount,
Role, RepositoryAuthorizedEmail, TagManifest, DerivedStorageForImage,
Label, TagManifestLabel, db_for_update, get_epoch_timestamp,
db_random_func)
logger = logging.getLogger(__name__)
def get_public_repo_visibility():
  """ Thin module-level alias: returns whatever `_basequery.get_public_repo_visibility()`
      returns.
  """
  public_visibility = _basequery.get_public_repo_visibility()
  return public_visibility
def create_repository(namespace, name, creating_user, visibility='private'):
  """ Creates the repository `name` under the namespace `namespace` and returns it.

      `visibility` is the name of the Visibility row to attach (defaults to 'private').

      When `creating_user` is given and is not an organization, it is granted the 'admin' role
      on the new repository. When `creating_user` is given and its username differs from
      `namespace`, the default permissions are applied as well. Both steps are skipped when no
      `creating_user` is supplied.
  """
  visibility_obj = Visibility.get(name=visibility)
  namespace_user = User.get(username=namespace)
  repo = Repository.create(name=name, visibility=visibility_obj, namespace_user=namespace_user)
  admin = Role.get(name='admin')

  # Both permission steps need a user. Only the first one used to check for it, so the username
  # comparison below raised AttributeError when `creating_user` was None.
  if creating_user:
    if not creating_user.organization:
      RepositoryPermission.create(user=creating_user, repository=repo, role=admin)

    if creating_user.username != namespace:
      # Permission prototypes only work for orgs
      permission.apply_default_permissions(repo, creating_user)

  return repo
def get_repository(namespace_name, repository_name):
  """ Looks up the repository `namespace_name/repository_name`, returning None when the lookup
      raises Repository.DoesNotExist.
  """
  try:
    found = _basequery.get_existing_repository(namespace_name, repository_name)
  except Repository.DoesNotExist:
    found = None
  return found
def _purge_all_repository_tags(namespace_name, repository_name):
  """ Immediately purge all repository tags without respecting the lifeline procedure.

      Deletes, in order: the manifest-label mappings of the repository, the labels those
      mappings referenced, the manifests attached to the repository's tags, and finally the
      tags themselves. Returns without deleting anything when the repository has no tags.

      Raises DataModelException when the repository cannot be found.
  """
  try:
    repo = _basequery.get_existing_repository(namespace_name, repository_name)
  except Repository.DoesNotExist:
    raise DataModelException('Invalid repository \'%s/%s\'' %
                             (namespace_name, repository_name))

  # Finds all the tags to delete.
  repo_tags = list(RepositoryTag.select().where(RepositoryTag.repository == repo.id))
  if not repo_tags:
    return

  # Find all labels to delete.
  manifest_labels_query = (TagManifestLabel
                           .select()
                           .where(TagManifestLabel.repository == repo))

  # The label ids are collected before the mapping rows go away, since the mappings are the
  # only place they are read from here.
  label_ids = [manifest_label.label_id for manifest_label in manifest_labels_query]
  if label_ids:
    # Delete all the mapping entries.
    TagManifestLabel.delete().where(TagManifestLabel.repository == repo).execute()

    # Delete all the matching labels.
    Label.delete().where(Label.id << label_ids).execute()

  # Delete all the manifests.
  TagManifest.delete().where(TagManifest.tag << repo_tags).execute()

  # Delete all tags.
  RepositoryTag.delete().where(RepositoryTag.repository == repo.id).execute()
def purge_repository(namespace_name, repository_name):
  """ Removes the repository `namespace_name/repository_name`: its tags are purged first, then
      garbage collection is run, and finally the repository row is deleted recursively.
  """
  # Tags go first so that the GC pass below is able to reclaim the storage.
  _purge_all_repository_tags(namespace_name, repository_name)

  # The GC pass removes the images and their storage.
  garbage_collect_repository(namespace_name, repository_name)

  # Whatever metadata is left is removed together with the repository row itself.
  repo = _basequery.get_existing_repository(namespace_name, repository_name)
  repo.delete_instance(recursive=True, delete_nullable=False)
@ttl_cache(maxsize=1, ttl=600)
def _get_gc_expiration_policies():
  """ Returns the distinct `removed_tag_expiration_s` values found on namespaces, as a list.

      At most 100 values are fetched; the result is cached by the `ttl_cache` decorator
      (maxsize=1, ttl=600).
  """
  distinct_policies = Namespace.select(Namespace.removed_tag_expiration_s).distinct()

  # This sucks but it's the only way to limit memory
  capped_policies = distinct_policies.limit(100)

  return [row[0] for row in capped_policies.tuples()]
def get_random_gc_policy():
  """ Picks one of the GC expiration policies at random, for use when garbage collecting. """
  policies = _get_gc_expiration_policies()
  return random.choice(policies)
def find_repository_with_garbage(limit_to_gc_policy_s):
  """ Returns a randomly chosen repository that has a tag eligible for garbage collection under
      the given policy, or None if no such repository is found.

      A tag qualifies when its `lifetime_end_ts` is set and is no later than
      `get_epoch_timestamp() - limit_to_gc_policy_s`, and its namespace has
      `removed_tag_expiration_s` equal to `limit_to_gc_policy_s`. At most 500 distinct candidate
      rows are considered.
  """
  cutoff_ts = get_epoch_timestamp() - limit_to_gc_policy_s

  try:
    expired_repo_ids = (RepositoryTag
                        .select(RepositoryTag.repository)
                        .join(Repository)
                        .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                        .where(~(RepositoryTag.lifetime_end_ts >> None),
                               (RepositoryTag.lifetime_end_ts <= cutoff_ts),
                               (Namespace.removed_tag_expiration_s == limit_to_gc_policy_s))
                        .limit(500)
                        .distinct()
                        .alias('candidates'))

    # Pick a single row out of the candidate subquery, in random order.
    chosen = (RepositoryTag
              .select(expired_repo_ids.c.repository_id)
              .from_(expired_repo_ids)
              .order_by(db_random_func())
              .get())

    if chosen is not None:
      return Repository.get(Repository.id == chosen.repository_id)
    return None
  except (RepositoryTag.DoesNotExist, Repository.DoesNotExist):
    return None
def garbage_collect_repository(namespace_name, repository_name):
  """ Garbage collects `namespace_name/repository_name`; does nothing if it is not found. """
  repo = get_repository(namespace_name, repository_name)
  if repo is None:
    return

  garbage_collect_repo(repo)
def garbage_collect_repo(repo):
  """ Garbage collects the given repository.

      Runs tag garbage collection, deletes the images of the repository that are neither
      joined to a tag nor listed in the ancestors of such an image (together with their
      DerivedStorageForImage rows), and then passes the storage ids of the deleted images and
      of their derived storages to `storage.garbage_collect_storage`.

      If one of the deletes raises IntegrityError, the failure is logged and the function
      returns early without running the storage garbage collection.

      NOTE(review): the nesting below was reconstructed from a copy of this file that had lost
      its indentation; confirm which statements belong inside the transaction.
  """
  logger.debug('Garbage collecting repository %s', repo.id)
  # Storage ids handed to the storage GC at the end; replaced below if images are removed.
  storage_id_whitelist = set()
  # presumably expires/removes old tags first so their images become unreferenced -- confirm
  # against the `tag` module.
  tag.garbage_collect_tags(repo)
  with db_transaction():
    # Get a list of all images used by tags in the repository
    tagged_images = (Image
                     .select(Image.id, Image.ancestors)
                     .join(RepositoryTag)
                     .where(Image.repository == repo))
    def gen_referenced_ancestors():
      # Yields the id of every ancestor of each tagged image, and then the id of the tagged
      # image itself.
      for tagged_image in tagged_images:
        # The ancestor list is in the format '/1/2/3/', extract just the ids
        ancestor_id_strings = tagged_image.ancestors.split('/')[1:-1]
        for img_id_str in ancestor_id_strings:
          yield int(img_id_str)
        yield tagged_image.id
    referenced_ancestors = set(gen_referenced_ancestors())
    # We desire two pieces of information from the database from the following
    # query: all of the image ids which are associated with this repository,
    # and the storages which are associated with those images. In order to
    # fetch just this information, and bypass all of the peewee model parsing
    # code, which is overkill for just two fields, we use a tuple query, and
    # feed that directly to the dictionary tuple constructor which takes an
    # iterable of tuples containing [(k, v), (k, v), ...]
    all_repo_images = Image.select(Image.id, Image.storage).where(Image.repository == repo).tuples()
    images_to_storages = dict(all_repo_images)
    # Every image of the repository that is not in the referenced set is removable.
    to_remove = list(set(images_to_storages.keys()).difference(referenced_ancestors))
    if len(to_remove) > 0:
      logger.info('Cleaning up unreferenced images: %s', to_remove)
      storage_id_whitelist = {images_to_storages[to_remove_id] for to_remove_id in to_remove}
      # Lookup any derived images for the images to remove.
      derived = DerivedStorageForImage.select().where(
        DerivedStorageForImage.source_image << to_remove)
      has_derived = False
      for derived_image in derived:
        has_derived = True
        # The derived storage is also passed to the storage GC.
        storage_id_whitelist.add(derived_image.derivative_id)
      # Delete any derived images and the images themselves.
      if has_derived:
        try:
          (DerivedStorageForImage
           .delete()
           .where(DerivedStorageForImage.source_image << to_remove)
           .execute())
        except IntegrityError:
          logger.info('Could not GC derived images %s; will try again soon', to_remove)
          # Returning here skips both the image delete and the storage GC below.
          return
      try:
        Image.delete().where(Image.id << to_remove).execute()
      except IntegrityError:
        logger.info('Could not GC images %s; will try again soon', to_remove)
        # Returning here skips the storage GC below.
        return
  # Storage GC only runs when at least one image was removed above.
  if len(to_remove) > 0:
    logger.info('Garbage collecting storage for images: %s', to_remove)
    storage.garbage_collect_storage(storage_id_whitelist)
def star_repository(user, repository):
  """ Stars a repository.

      Creates the Star row linking `user` to `repository`.
  """
  # Model.create() already persists the new row, so the extra save() that used to follow it
  # only issued a redundant second write.
  Star.create(user=user.id, repository=repository.id)
def unstar_repository(user, repository):
  """ Unstars a repository.

      Raises DataModelException if `user` had no star on `repository`.
  """
  # A delete query does not raise DoesNotExist when nothing matches; execute() reports the
  # number of rows removed instead, so that count is what has to be checked.
  deleted = (Star
             .delete()
             .where(Star.repository == repository.id, Star.user == user.id)
             .execute())
  if not deleted:
    raise DataModelException('Star not found.')
def get_user_starred_repositories(user):
  """ Builds the query for all of the repositories `user` has starred.

      Each row selects the Repository, User and Visibility columns, plus the repository id
      aliased as 'rid'. The query itself is returned, unexecuted.
  """
  selection = Repository.select(Repository, User, Visibility, Repository.id.alias('rid'))
  with_stars = selection.join(Star).switch(Repository)
  with_owner = with_stars.join(User).switch(Repository)
  return with_owner.join(Visibility).where(Star.user == user)
def repository_is_starred(user, repository):
  """ Returns True if a Star row exists for `user` and `repository`, False otherwise. """
  star_lookup = Star.select().where(Star.repository == repository.id, Star.user == user.id)

  try:
    star_lookup.get()
  except Star.DoesNotExist:
    return False

  return True
def get_when_last_modified(repository_ids):
  """ Maps each repository in `repository_ids` that has tags to the greatest
      `lifetime_start_ts` among those tags. Returns an empty dict when no ids are given.
  """
  if not repository_ids:
    return {}

  rows = (RepositoryTag
          .select(RepositoryTag.repository, fn.Max(RepositoryTag.lifetime_start_ts))
          .where(RepositoryTag.repository << repository_ids)
          .group_by(RepositoryTag.repository)
          .tuples())

  # Each row is (repository, max lifetime_start_ts).
  return {row[0]: row[1] for row in rows}
def get_visible_repositories(username, namespace=None, include_public=False, start_id=None,
                             limit=None):
  """ Returns the repositories visible to the given user (if any).

      The result is always a query, never a list, so that callers can refine it further. It
      selects the repository name, id (aliased as 'rid'), description and visibility, and the
      namespace username.

      `namespace`, `include_public` and `start_id` are handed to
      `_basequery.filter_to_repos_for_user`, which applies the actual filtering. When `limit` is
      given, the query is limited to that many rows and ordered by 'rid'.
  """
  if not include_public and not username:
    # Short circuit by returning a query that will find no repositories. We need to return a query
    # here, as it will be modified by other queries later on.
    return Repository.select(Repository.id.alias('rid')).where(Repository.id == -1)

  query = (Repository
           .select(Repository.name, Repository.id.alias('rid'),
                   Repository.description, Namespace.username, Repository.visibility)
           .switch(Repository)
           .join(Namespace, on=(Repository.namespace_user == Namespace.id)))

  if username:
    # Note: We only need the permissions table if we will filter based on a user's permissions.
    query = query.switch(Repository).distinct().join(RepositoryPermission, JOIN_LEFT_OUTER)

  query = _basequery.filter_to_repos_for_user(query, username, namespace, include_public,
                                              start_id=start_id)

  if limit is not None:
    query = query.limit(limit).order_by(SQL('rid'))

  return query
def get_sorted_matching_repositories(prefix, only_public, checker, limit=10):
  """ Returns repositories matching the given prefix string and passing the given checker
      function.

      At most `limit` repositories are returned. Matches on the repository name come before
      matches on the namespace username; within each of the two, repositories with action
      counts over the last week come first (highest summed count first), followed by the
      remaining matches. Every returned repository carries two extra attributes, `is_public`
      and `count`.
  """
  one_week_ago = datetime.now() - timedelta(weeks=1)
  matches = []
  seen_ids = []

  def collect_matches(search_clause, with_count=False):
    # Appends to `matches` the repositories found by `search_clause`, until `limit` is reached.
    if len(matches) >= limit:
      return

    columns = [Repository, Namespace]
    if with_count:
      columns.append(fn.Sum(RepositoryActionCount.count).alias('count'))

    query = (Repository
             .select(*columns)
             .join(Namespace, on=(Namespace.id == Repository.namespace_user))
             .switch(Repository)
             .where(search_clause)
             .group_by(Repository.id, Namespace.id))

    if only_public:
      query = query.where(Repository.visibility == _basequery.get_public_repo_visibility())

    # Repositories collected by an earlier pass must not be returned twice.
    if seen_ids:
      query = query.where(~(Repository.id << seen_ids))

    if with_count:
      query = (query
               .switch(Repository)
               .join(RepositoryActionCount)
               .where(RepositoryActionCount.date >= one_week_ago)
               .order_by(fn.Sum(RepositoryActionCount.count).desc()))

    for repo in query:
      if len(matches) >= limit:
        return

      # Note: We compare IDs here, instead of objects, because calling .visibility on the
      # Repository will kick off a new SQL query to retrieve that visibility enum value. We don't
      # join the visibility table in SQL, as well, because it is ungodly slow in MySQL :-/
      repo.is_public = repo.visibility_id == _basequery.get_public_repo_visibility().id
      repo.count = repo.count if with_count else 0

      if checker(repo):
        matches.append(repo)
        seen_ids.append(repo.id)

  # For performance reasons, we conduct the repo name and repo namespace searches on their
  # own. This also affords us the ability to give higher precedence to repository names matching
  # over namespaces, which is semantically correct.
  for searched_column in (Repository.name, Namespace.username):
    for counted in (True, False):
      collect_matches(_basequery.prefix_search(searched_column, prefix), with_count=counted)

  return matches
def lookup_repository(repo_id):
  """ Returns the repository whose id is `repo_id`, or None when there is no such row. """
  try:
    repo = Repository.get(Repository.id == repo_id)
  except Repository.DoesNotExist:
    repo = None
  return repo
def is_repository_public(repository):
  """ Returns the result of comparing `repository.visibility` against the public visibility
      from `_basequery.get_public_repo_visibility()`.
  """
  repo_visibility = repository.visibility
  return repo_visibility == _basequery.get_public_repo_visibility()
def repository_is_public(namespace_name, repository_name):
  """ Returns True if a repository `namespace_name/repository_name` exists whose visibility is
      named 'public', False otherwise.
  """
  public_repo_lookup = (Repository
                        .select()
                        .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                        .switch(Repository)
                        .join(Visibility)
                        .where(Namespace.username == namespace_name,
                               Repository.name == repository_name,
                               Visibility.name == 'public'))

  try:
    public_repo_lookup.get()
  except Repository.DoesNotExist:
    return False

  return True
def set_repository_visibility(repo, visibility):
  """ Sets the visibility of `repo` to the Visibility row named `visibility` and saves it. """
  visibility_obj = Visibility.get(name=visibility)

  # NOTE(review): elsewhere in this module a failed `.get()` is handled through DoesNotExist,
  # so this falsy check may never trigger -- confirm before relying on the silent no-op.
  if visibility_obj:
    repo.visibility = visibility_obj
    repo.save()
def get_email_authorized_for_repo(namespace, repository, email):
  """ Returns the RepositoryAuthorizedEmail row for `email` on the repository
      `namespace/repository`, or None when there is no such row.
  """
  authorization_lookup = (RepositoryAuthorizedEmail
                          .select(RepositoryAuthorizedEmail, Repository, Namespace)
                          .join(Repository)
                          .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                          .where(Namespace.username == namespace,
                                 Repository.name == repository,
                                 RepositoryAuthorizedEmail.email == email))

  try:
    return authorization_lookup.get()
  except RepositoryAuthorizedEmail.DoesNotExist:
    return None
def create_email_authorization_for_repo(namespace_name, repository_name, email):
  """ Creates and returns an unconfirmed RepositoryAuthorizedEmail row for `email` on the
      repository `namespace_name/repository_name`.

      Raises DataModelException when the repository cannot be found.
  """
  try:
    repo = _basequery.get_existing_repository(namespace_name, repository_name)
  except Repository.DoesNotExist:
    message = 'Invalid repository %s/%s' % (namespace_name, repository_name)
    raise DataModelException(message)

  return RepositoryAuthorizedEmail.create(repository=repo, email=email, confirmed=False)
def confirm_email_authorization_for_repo(code):
  """ Marks the RepositoryAuthorizedEmail row carrying the given `code` as confirmed, saves it
      and returns it.

      Raises DataModelException when no row has that code.
  """
  authorization_lookup = (RepositoryAuthorizedEmail
                          .select(RepositoryAuthorizedEmail, Repository, Namespace)
                          .join(Repository)
                          .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                          .where(RepositoryAuthorizedEmail.code == code))

  try:
    authorization = authorization_lookup.get()
  except RepositoryAuthorizedEmail.DoesNotExist:
    raise DataModelException('Invalid confirmation code.')

  authorization.confirmed = True
  authorization.save()
  return authorization
def list_popular_public_repos(action_count_threshold, time_span):
  """ Returns a query of (namespace username, repository name) tuples for the popular public
      repositories.

      A repository is included when its visibility is the public one and the sum of its
      RepositoryActionCount rows dated at or after `datetime.now() - time_span` is at least
      `action_count_threshold`. `time_span` must therefore be something that can be subtracted
      from a datetime, such as a timedelta.
  """
  cutoff = datetime.now() - time_span
  return (Repository
          .select(Namespace.username, Repository.name)
          .join(Namespace, on=(Repository.namespace_user == Namespace.id))
          .switch(Repository)
          .join(RepositoryActionCount)
          .where(RepositoryActionCount.date >= cutoff,
                 Repository.visibility == get_public_repo_visibility())
          .group_by(RepositoryActionCount.repository, Repository.name, Namespace.username)
          .having(fn.Sum(RepositoryActionCount.count) >= action_count_threshold)
          .tuples())