This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/model/repository.py
2016-12-13 18:40:58 -05:00

467 lines
17 KiB
Python

import logging
import random
from datetime import timedelta, datetime
from peewee import JOIN_LEFT_OUTER, fn, SQL, IntegrityError
from cachetools import ttl_cache
from data.model import (DataModelException, tag, db_transaction, storage, permission,
_basequery)
from data.database import (Repository, Namespace, RepositoryTag, Star, Image, User,
Visibility, RepositoryPermission, RepositoryActionCount,
Role, RepositoryAuthorizedEmail, TagManifest, DerivedStorageForImage,
Label, TagManifestLabel, db_for_update, get_epoch_timestamp,
db_random_func, db_concat_func)
logger = logging.getLogger(__name__)
def get_repository_count():
return Repository.select().count()
def get_public_repo_visibility():
return _basequery.get_public_repo_visibility()
def create_repository(namespace, name, creating_user, visibility='private'):
private = Visibility.get(name=visibility)
namespace_user = User.get(username=namespace)
repo = Repository.create(name=name, visibility=private, namespace_user=namespace_user)
admin = Role.get(name='admin')
if creating_user and not creating_user.organization:
RepositoryPermission.create(user=creating_user, repository=repo, role=admin)
if creating_user.username != namespace:
# Permission prototypes only work for orgs
permission.apply_default_permissions(repo, creating_user)
return repo
def get_repository(namespace_name, repository_name):
try:
return _basequery.get_existing_repository(namespace_name, repository_name)
except Repository.DoesNotExist:
return None
def purge_repository(namespace_name, repository_name):
""" Completely delete all traces of the repository. Will return True upon
complete success, and False upon partial or total failure. Garbage
collection is incremental and repeatable, so this return value does
not need to be checked or responded to.
"""
repo = _basequery.get_existing_repository(namespace_name, repository_name)
# Delete all tags to allow gc to reclaim storage
previously_referenced = tag.purge_all_tags(repo)
unreferenced_image_q = Image.select(Image.id).where(Image.repository == repo)
if len(previously_referenced) > 0:
unreferenced_image_q = (unreferenced_image_q
.where(~(Image.id << list(previously_referenced))))
unreferenced_candidates = set(img[0] for img in unreferenced_image_q.tuples())
# Gc to remove the images and storage
all_repo_images = previously_referenced | unreferenced_candidates
successful_gc = garbage_collect_repo(repo, all_repo_images)
if not successful_gc:
return False
# Delete the rest of the repository metadata
fetched = _basequery.get_existing_repository(namespace_name, repository_name)
fetched.delete_instance(recursive=True, delete_nullable=False)
return True
@ttl_cache(maxsize=1, ttl=600)
def _get_gc_expiration_policies():
policy_tuples_query = (Namespace
.select(Namespace.removed_tag_expiration_s)
.distinct()
.limit(100) # This sucks but it's the only way to limit memory
.tuples())
return [policy[0] for policy in policy_tuples_query]
def get_random_gc_policy():
""" Return a single random policy from the database to use when garbage collecting.
"""
return random.choice(_get_gc_expiration_policies())
def find_repository_with_garbage(limit_to_gc_policy_s):
expiration_timestamp = get_epoch_timestamp() - limit_to_gc_policy_s
try:
candidates = (RepositoryTag
.select(RepositoryTag.repository)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(~(RepositoryTag.lifetime_end_ts >> None),
(RepositoryTag.lifetime_end_ts <= expiration_timestamp),
(Namespace.removed_tag_expiration_s == limit_to_gc_policy_s))
.limit(500)
.distinct()
.alias('candidates'))
found = (RepositoryTag
.select(candidates.c.repository_id)
.from_(candidates)
.order_by(db_random_func())
.get())
if found is None:
return
return Repository.get(Repository.id == found.repository_id)
except RepositoryTag.DoesNotExist:
return None
except Repository.DoesNotExist:
return None
def garbage_collect_repo(repo, extra_candidate_set=None):
""" Garbage collect the specified repository object. This will remove all
images, derived images, and other associated metadata, for images which
are no longer referenced by a tag or another image which is itself
tagged. Returns True if garbage collection was completed without error
and False otherwise. Retries are safe and work incrementally, so this
return value does not need to be checked or handled.
"""
logger.debug('Garbage collecting repository %s', repo.id)
storage_id_whitelist = set()
candidate_orphan_image_set = tag.garbage_collect_tags(repo)
if extra_candidate_set:
candidate_orphan_image_set.update(extra_candidate_set)
if not len(candidate_orphan_image_set):
logger.debug('No candidate images for GC for repo: %s', repo.id)
return True
candidates_orphans = list(candidate_orphan_image_set)
with db_transaction():
Candidate = Image.alias()
Tagged = Image.alias()
ancestor_superset = Tagged.ancestors ** db_concat_func(Candidate.ancestors, Candidate.id, '/%')
# We are going to compute all images which are being referenced in two ways:
# First, we will find all images which have their ancestor paths appear in
# another image. Secondly, we union in all of the candidate images which are
# directly referenced by a tag. This can be used in a subquery to directly
# find which candidates are being referenced without any client side
# computation or extra round trips.
ancestor_referenced = (Candidate
.select(Candidate.id)
.join(Tagged, on=ancestor_superset)
.join(RepositoryTag, on=(Tagged.id == RepositoryTag.image))
.where(RepositoryTag.repository == repo.id,
Candidate.id << candidates_orphans))
direct_referenced = (RepositoryTag
.select(RepositoryTag.image)
.where(RepositoryTag.repository == repo.id,
RepositoryTag.image << candidates_orphans))
referenced_candidates = (direct_referenced | ancestor_referenced)
# We desire two pieces of information from the database from the following
# query: all of the image ids which are associated with this repository,
# and the storages which are associated with those images. In order to
# fetch just this information, and bypass all of the peewee model parsing
# code, which is overkill for just two fields, we use a tuple query, and
# feed that directly to the dictionary tuple constructor which takes an
# iterable of tuples containing [(k, v), (k, v), ...]
unreferenced_candidates = (Image
.select(Image.id, Image.storage)
.where(Image.id << candidates_orphans,
~(Image.id << referenced_candidates))
.tuples())
unreferecend_images_to_storages = dict(unreferenced_candidates)
to_remove = unreferecend_images_to_storages.keys()
if len(to_remove) > 0:
logger.info('Cleaning up unreferenced images: %s', to_remove)
storage_id_whitelist = set(unreferecend_images_to_storages.values())
# Lookup any derived images for the images to remove.
derived = DerivedStorageForImage.select().where(
DerivedStorageForImage.source_image << to_remove)
has_derived = False
for derived_image in derived:
has_derived = True
storage_id_whitelist.add(derived_image.derivative_id)
# Delete any derived images and the images themselves.
if has_derived:
try:
(DerivedStorageForImage
.delete()
.where(DerivedStorageForImage.source_image << to_remove)
.execute())
except IntegrityError:
logger.info('Could not GC derived images %s; will try again soon', to_remove)
return False
try:
Image.delete().where(Image.id << to_remove).execute()
except IntegrityError:
logger.info('Could not GC images %s; will try again soon', to_remove)
return False
if len(to_remove) > 0:
logger.info('Garbage collecting storage for images: %s', to_remove)
storage.garbage_collect_storage(storage_id_whitelist)
return True
def star_repository(user, repository):
""" Stars a repository. """
star = Star.create(user=user.id, repository=repository.id)
star.save()
def unstar_repository(user, repository):
""" Unstars a repository. """
try:
(Star
.delete()
.where(Star.repository == repository.id, Star.user == user.id)
.execute())
except Star.DoesNotExist:
raise DataModelException('Star not found.')
def get_user_starred_repositories(user):
""" Retrieves all of the repositories a user has starred. """
query = (Repository
.select(Repository, User, Visibility, Repository.id.alias('rid'))
.join(Star)
.switch(Repository)
.join(User)
.switch(Repository)
.join(Visibility)
.where(Star.user == user))
return query
def repository_is_starred(user, repository):
""" Determines whether a user has starred a repository or not. """
try:
(Star
.select()
.where(Star.repository == repository.id, Star.user == user.id)
.get())
return True
except Star.DoesNotExist:
return False
def get_when_last_modified(repository_ids):
if not repository_ids:
return {}
tuples = (RepositoryTag
.select(RepositoryTag.repository, fn.Max(RepositoryTag.lifetime_start_ts))
.where(RepositoryTag.repository << repository_ids)
.group_by(RepositoryTag.repository)
.tuples())
last_modified_map = {}
for record in tuples:
last_modified_map[record[0]] = record[1]
return last_modified_map
def get_visible_repositories(username, namespace=None, include_public=False, start_id=None,
limit=None):
""" Returns the repositories visible to the given user (if any).
"""
if not include_public and not username:
# Short circuit by returning a query that will find no repositories. We need to return a query
# here, as it will be modified by other queries later on.
return Repository.select(Repository.id.alias('rid')).where(Repository.id == -1)
query = (Repository
.select(Repository.name, Repository.id.alias('rid'),
Repository.description, Namespace.username, Repository.visibility)
.switch(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id)))
if username:
# Note: We only need the permissions table if we will filter based on a user's permissions.
query = query.switch(Repository).distinct().join(RepositoryPermission, JOIN_LEFT_OUTER)
query = _basequery.filter_to_repos_for_user(query, username, namespace, include_public,
start_id=start_id)
if limit is not None:
query = query.limit(limit).order_by(SQL('rid'))
return query
def get_sorted_matching_repositories(prefix, only_public, checker, limit=10):
""" Returns repositories matching the given prefix string and passing the given checker
function.
"""
last_week = datetime.now() - timedelta(weeks=1)
results = []
existing_ids = []
def get_search_results(search_clause, with_count=False):
if len(results) >= limit:
return
select_items = [Repository, Namespace]
if with_count:
select_items.append(fn.Sum(RepositoryActionCount.count).alias('count'))
query = (Repository
.select(*select_items)
.join(Namespace, on=(Namespace.id == Repository.namespace_user))
.switch(Repository)
.where(search_clause)
.group_by(Repository.id, Namespace.id))
if only_public:
query = query.where(Repository.visibility == _basequery.get_public_repo_visibility())
if existing_ids:
query = query.where(~(Repository.id << existing_ids))
if with_count:
query = (query
.switch(Repository)
.join(RepositoryActionCount)
.where(RepositoryActionCount.date >= last_week)
.order_by(fn.Sum(RepositoryActionCount.count).desc()))
for result in query:
if len(results) >= limit:
return results
# Note: We compare IDs here, instead of objects, because calling .visibility on the
# Repository will kick off a new SQL query to retrieve that visibility enum value. We don't
# join the visibility table in SQL, as well, because it is ungodly slow in MySQL :-/
result.is_public = result.visibility_id == _basequery.get_public_repo_visibility().id
result.count = result.count if with_count else 0
if not checker(result):
continue
results.append(result)
existing_ids.append(result.id)
# For performance reasons, we conduct the repo name and repo namespace searches on their
# own. This also affords us the ability to give higher precedence to repository names matching
# over namespaces, which is semantically correct.
get_search_results(_basequery.prefix_search(Repository.name, prefix), with_count=True)
get_search_results(_basequery.prefix_search(Repository.name, prefix), with_count=False)
get_search_results(_basequery.prefix_search(Namespace.username, prefix), with_count=True)
get_search_results(_basequery.prefix_search(Namespace.username, prefix), with_count=False)
return results
def lookup_repository(repo_id):
try:
return Repository.get(Repository.id == repo_id)
except Repository.DoesNotExist:
return None
def is_repository_public(repository):
return repository.visibility == _basequery.get_public_repo_visibility()
def repository_is_public(namespace_name, repository_name):
try:
(Repository
.select()
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.switch(Repository)
.join(Visibility)
.where(Namespace.username == namespace_name, Repository.name == repository_name,
Visibility.name == 'public')
.get())
return True
except Repository.DoesNotExist:
return False
def set_repository_visibility(repo, visibility):
visibility_obj = Visibility.get(name=visibility)
if not visibility_obj:
return
repo.visibility = visibility_obj
repo.save()
def get_email_authorized_for_repo(namespace, repository, email):
try:
return (RepositoryAuthorizedEmail
.select(RepositoryAuthorizedEmail, Repository, Namespace)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(Namespace.username == namespace, Repository.name == repository,
RepositoryAuthorizedEmail.email == email)
.get())
except RepositoryAuthorizedEmail.DoesNotExist:
return None
def create_email_authorization_for_repo(namespace_name, repository_name, email):
try:
repo = _basequery.get_existing_repository(namespace_name, repository_name)
except Repository.DoesNotExist:
raise DataModelException('Invalid repository %s/%s' %
(namespace_name, repository_name))
return RepositoryAuthorizedEmail.create(repository=repo, email=email, confirmed=False)
def confirm_email_authorization_for_repo(code):
try:
found = (RepositoryAuthorizedEmail
.select(RepositoryAuthorizedEmail, Repository, Namespace)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(RepositoryAuthorizedEmail.code == code)
.get())
except RepositoryAuthorizedEmail.DoesNotExist:
raise DataModelException('Invalid confirmation code.')
found.confirmed = True
found.save()
return found
def list_popular_public_repos(action_count_threshold, time_span):
cutoff = datetime.now() - time_span
return (Repository
.select(Namespace.username, Repository.name)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.switch(Repository)
.join(RepositoryActionCount)
.where(RepositoryActionCount.date >= cutoff,
Repository.visibility == get_public_repo_visibility())
.group_by(RepositoryActionCount.repository, Repository.name, Namespace.username)
.having(fn.Sum(RepositoryActionCount.count) >= action_count_threshold)
.tuples())