464 lines
17 KiB
Python
464 lines
17 KiB
Python
import logging
|
|
import random
|
|
|
|
from datetime import timedelta, datetime
|
|
from peewee import JOIN_LEFT_OUTER, fn, SQL, IntegrityError
|
|
from cachetools import ttl_cache
|
|
|
|
from data.model import (DataModelException, tag, db_transaction, storage, permission,
|
|
_basequery)
|
|
from data.database import (Repository, Namespace, RepositoryTag, Star, Image, User,
|
|
Visibility, RepositoryPermission, RepositoryActionCount,
|
|
Role, RepositoryAuthorizedEmail, TagManifest, DerivedStorageForImage,
|
|
Label, TagManifestLabel, db_for_update, get_epoch_timestamp,
|
|
db_random_func, db_concat_func)
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_public_repo_visibility():
    """ Returns whatever `_basequery.get_public_repo_visibility()` reports; this is a thin
        module-level alias for that helper.
    """
    public_visibility = _basequery.get_public_repo_visibility()
    return public_visibility
|
|
|
|
|
|
def create_repository(namespace, name, creating_user, visibility='private'):
    """ Creates the repository `name` under the namespace whose username is `namespace`.

        namespace: username of the User row that owns the repository.
        name: name of the new repository.
        creating_user: the user creating the repository; may be None.
        visibility: name of the Visibility row to assign (defaults to 'private').

        A creating user that is not an organization is granted the 'admin' role on the new
        repository. When the creating user's username differs from the namespace, default
        permissions are applied through `permission.apply_default_permissions`.

        Returns the created Repository. Failures raised by the Visibility, User and Role
        lookups are not caught here.
    """
    repo_visibility = Visibility.get(name=visibility)
    namespace_user = User.get(username=namespace)
    repo = Repository.create(name=name, visibility=repo_visibility,
                             namespace_user=namespace_user)
    admin = Role.get(name='admin')

    if creating_user and not creating_user.organization:
        RepositoryPermission.create(user=creating_user, repository=repo, role=admin)

    # The check above tolerates a missing creating user, so this one must as well:
    # dereferencing `.username` on None would raise AttributeError.
    if creating_user and creating_user.username != namespace:
        # Permission prototypes only work for orgs
        permission.apply_default_permissions(repo, creating_user)

    return repo
|
|
|
|
|
|
def get_repository(namespace_name, repository_name):
    """ Looks the repository up through `_basequery.get_existing_repository` and returns it,
        or returns None when that lookup raises Repository.DoesNotExist.
    """
    try:
        found = _basequery.get_existing_repository(namespace_name, repository_name)
    except Repository.DoesNotExist:
        return None
    return found
|
|
|
|
|
|
def purge_repository(namespace_name, repository_name):
    """ Removes a repository together with its tags, images and remaining metadata.

        Returns True when everything was deleted and False when garbage collection
        reported a failure (in which case the repository row itself is left in place).
        Garbage collection is incremental and repeatable, so callers are not required to
        act on the return value. A failed repository lookup is not caught here.
    """
    repo = _basequery.get_existing_repository(namespace_name, repository_name)

    # Purging every tag first is what lets GC reclaim the storage.
    previously_referenced = tag.purge_all_tags(repo)

    # Images of this repository that the purged tags did not point at.
    remaining_images_q = Image.select(Image.id).where(Image.repository == repo)
    if len(previously_referenced):
        already_known = list(previously_referenced)
        remaining_images_q = remaining_images_q.where(~(Image.id << already_known))

    unreferenced_candidates = {row[0] for row in remaining_images_q.tuples()}

    # Hand every image of the repository to GC so images and storage are removed.
    all_repo_images = previously_referenced | unreferenced_candidates
    if not garbage_collect_repo(repo, all_repo_images):
        return False

    # Re-fetch the repository and delete the rest of its metadata.
    to_delete = _basequery.get_existing_repository(namespace_name, repository_name)
    to_delete.delete_instance(recursive=True, delete_nullable=False)
    return True
|
|
|
|
|
|
@ttl_cache(maxsize=1, ttl=600)
def _get_gc_expiration_policies():
    """ Returns a list with at most 100 distinct `removed_tag_expiration_s` values read from
        Namespace rows. The result is cached by the `ttl_cache` decorator (maxsize=1, ttl=600).
    """
    distinct_policies = Namespace.select(Namespace.removed_tag_expiration_s).distinct()

    # Capping the row count is crude, but it is the only way to bound memory here.
    capped_rows = distinct_policies.limit(100).tuples()

    return [row[0] for row in capped_rows]
|
|
|
|
|
|
def get_random_gc_policy():
    """ Picks one entry at random from the (cached) list of GC expiration policies and
        returns it, for use when garbage collecting.
    """
    known_policies = _get_gc_expiration_policies()
    return random.choice(known_policies)
|
|
|
|
|
|
def find_repository_with_garbage(limit_to_gc_policy_s):
    """ Returns one randomly chosen Repository that has at least one tag whose lifetime
        ended at or before (now - `limit_to_gc_policy_s`) and whose namespace uses exactly
        that expiration policy. Returns None when no such repository is found.
    """
    expiration_timestamp = get_epoch_timestamp() - limit_to_gc_policy_s

    try:
        tag_has_ended = ~(RepositoryTag.lifetime_end_ts >> None)
        ended_before_cutoff = RepositoryTag.lifetime_end_ts <= expiration_timestamp
        namespace_uses_policy = Namespace.removed_tag_expiration_s == limit_to_gc_policy_s

        # Bounded set of repositories that own expired tags under this policy.
        candidates = (RepositoryTag
                      .select(RepositoryTag.repository)
                      .join(Repository)
                      .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                      .where(tag_has_ended, ended_before_cutoff, namespace_uses_policy)
                      .limit(500)
                      .distinct()
                      .alias('candidates'))

        # Choose one of those candidates in random order.
        random_pick_q = (RepositoryTag
                         .select(candidates.c.repository_id)
                         .from_(candidates)
                         .order_by(db_random_func()))
        found = random_pick_q.get()

        if found is not None:
            return Repository.get(Repository.id == found.repository_id)
        return None
    except (RepositoryTag.DoesNotExist, Repository.DoesNotExist):
        return None
|
|
|
|
|
|
def garbage_collect_repo(repo, extra_candidate_set=None):
    """ Garbage collect the specified repository object. This will remove all
        images, derived images, and other associated metadata, for images which
        are no longer referenced by a tag or another image which is itself
        tagged. Returns True if garbage collection was completed without error
        and False otherwise. Retries are safe and work incrementally, so this
        return value does not need to be checked or handled.

        repo: the repository object to collect; only `repo.id` is read here.
        extra_candidate_set: optional iterable of image ids to consider in addition to
          the candidates returned by `tag.garbage_collect_tags(repo)`.
    """
    logger.debug('Garbage collecting repository %s', repo.id)

    # Storage ids that the final storage GC pass is allowed to examine.
    storage_id_whitelist = set()
    candidate_orphan_image_set = tag.garbage_collect_tags(repo)

    if extra_candidate_set:
        candidate_orphan_image_set.update(extra_candidate_set)

    # Nothing to examine: report success without opening a transaction.
    if not len(candidate_orphan_image_set):
        logger.debug('No candidate images for GC for repo: %s', repo.id)
        return True

    candidates_orphans = list(candidate_orphan_image_set)

    with db_transaction():
        Candidate = Image.alias()
        Tagged = Image.alias()
        # Pattern match of a tagged image's ancestors string against the candidate's
        # ancestors followed by the candidate id and '/%' (presumably a LIKE-style prefix
        # match, i.e. the candidate is an ancestor of the tagged image).
        ancestor_superset = Tagged.ancestors ** db_concat_func(Candidate.ancestors, Candidate.id, '/%')

        # We are going to compute all images which are being referenced in two ways:
        # First, we will find all images which have their ancestor paths appear in
        # another image. Secondly, we union in all of the candidate images which are
        # directly referenced by a tag. This can be used in a subquery to directly
        # find which candidates are being referenced without any client side
        # computation or extra round trips.
        ancestor_referenced = (Candidate
                               .select(Candidate.id)
                               .join(Tagged, on=ancestor_superset)
                               .join(RepositoryTag, on=(Tagged.id == RepositoryTag.image))
                               .where(RepositoryTag.repository == repo.id,
                                      Candidate.id << candidates_orphans))

        direct_referenced = (RepositoryTag
                             .select(RepositoryTag.image)
                             .where(RepositoryTag.repository == repo.id,
                                    RepositoryTag.image << candidates_orphans))

        referenced_candidates = (direct_referenced | ancestor_referenced)

        # We desire two pieces of information from the database from the following
        # query: all of the image ids which are associated with this repository,
        # and the storages which are associated with those images. In order to
        # fetch just this information, and bypass all of the peewee model parsing
        # code, which is overkill for just two fields, we use a tuple query, and
        # feed that directly to the dictionary tuple constructor which takes an
        # iterable of tuples containing [(k, v), (k, v), ...]
        unreferenced_candidates = (Image
                                   .select(Image.id, Image.storage)
                                   .where(Image.id << candidates_orphans,
                                          ~(Image.id << referenced_candidates))
                                   .tuples())

        # Maps image id -> storage for every candidate that nothing references.
        unreferecend_images_to_storages = dict(unreferenced_candidates)
        to_remove = unreferecend_images_to_storages.keys()

        if len(to_remove) > 0:
            logger.info('Cleaning up unreferenced images: %s', to_remove)
            storage_id_whitelist = set(unreferecend_images_to_storages.values())

            # Lookup any derived images for the images to remove.
            derived = DerivedStorageForImage.select().where(
                DerivedStorageForImage.source_image << to_remove)

            # Record the derived storages too, and remember whether any exist so the
            # delete below is only issued when needed.
            has_derived = False
            for derived_image in derived:
                has_derived = True
                storage_id_whitelist.add(derived_image.derivative_id)

            # Delete any derived images and the images themselves.
            if has_derived:
                try:
                    (DerivedStorageForImage
                     .delete()
                     .where(DerivedStorageForImage.source_image << to_remove)
                     .execute())
                except IntegrityError:
                    # Returning from inside the `with` block leaves the transaction early;
                    # a later run retries.
                    logger.info('Could not GC derived images %s; will try again soon', to_remove)
                    return False

            try:
                Image.delete().where(Image.id << to_remove).execute()
            except IntegrityError:
                logger.info('Could not GC images %s; will try again soon', to_remove)
                return False

    # Storage collection runs after the transaction block has exited.
    if len(to_remove) > 0:
        logger.info('Garbage collecting storage for images: %s', to_remove)
        storage.garbage_collect_storage(storage_id_whitelist)

    return True
|
|
|
|
|
|
def star_repository(user, repository):
    """ Stars a repository.

        Inserts a Star row linking `user.id` to `repository.id`. `Star.create` already
        persists the new row, so no follow-up `save()` (which would only re-write the
        same values) is issued. Returns None.
    """
    Star.create(user=user.id, repository=repository.id)
|
|
|
|
|
|
def unstar_repository(user, repository):
    """ Unstars a repository by deleting the Star rows that link `user.id` to
        `repository.id`. Raises DataModelException if Star.DoesNotExist is raised.
    """
    # NOTE(review): a delete query's execute() presumably reports a row count rather than
    # raising DoesNotExist, so this handler may never fire -- confirm against peewee.
    try:
        removal = Star.delete().where(Star.repository == repository.id,
                                      Star.user == user.id)
        removal.execute()
    except Star.DoesNotExist:
        raise DataModelException('Star not found.')
|
|
|
|
|
|
def get_user_starred_repositories(user):
    """ Builds and returns the query for all repositories starred by `user`. Each row
        selects Repository, User and Visibility, plus the repository id aliased as 'rid'.
    """
    starred = Repository.select(Repository, User, Visibility, Repository.id.alias('rid'))
    starred = starred.join(Star).switch(Repository)
    starred = starred.join(User).switch(Repository)
    starred = starred.join(Visibility)
    return starred.where(Star.user == user)
|
|
|
|
|
|
def repository_is_starred(user, repository):
    """ Returns True if a Star row exists for `user.id` and `repository.id`, else False. """
    star_lookup = Star.select().where(Star.repository == repository.id,
                                      Star.user == user.id)
    try:
        star_lookup.get()
    except Star.DoesNotExist:
        return False
    return True
|
|
|
|
|
|
def get_when_last_modified(repository_ids):
    """ Returns a dict mapping each given repository id to the maximum `lifetime_start_ts`
        among its tags. Repositories without tags are absent from the result; an empty or
        missing `repository_ids` yields an empty dict without querying.
    """
    if not repository_ids:
        return {}

    latest_tag_start = fn.Max(RepositoryTag.lifetime_start_ts)
    rows = (RepositoryTag
            .select(RepositoryTag.repository, latest_tag_start)
            .where(RepositoryTag.repository << repository_ids)
            .group_by(RepositoryTag.repository)
            .tuples())

    return {row[0]: row[1] for row in rows}
|
|
|
|
|
|
def get_visible_repositories(username, namespace=None, include_public=False, start_id=None,
                             limit=None):
    """ Returns a query for the repositories visible to the given user (if any).

        The query selects name, id (aliased 'rid'), description, namespace username and
        visibility. Filtering is delegated to `_basequery.filter_to_repos_for_user`. When
        `limit` is given, the query is limited and ordered by 'rid'.
    """
    if not (include_public or username):
        # Nothing can be visible. A query (not a list) is still returned because callers
        # go on to modify it.
        return Repository.select(Repository.id.alias('rid')).where(Repository.id == -1)

    selected_fields = [Repository.name, Repository.id.alias('rid'), Repository.description,
                       Namespace.username, Repository.visibility]
    query = (Repository
             .select(*selected_fields)
             .switch(Repository)
             .join(Namespace, on=(Repository.namespace_user == Namespace.id)))

    if username:
        # The permissions table is only needed when filtering on a user's permissions.
        query = query.switch(Repository).distinct().join(RepositoryPermission, JOIN_LEFT_OUTER)

    query = _basequery.filter_to_repos_for_user(query, username, namespace, include_public,
                                                start_id=start_id)

    if limit is None:
        return query
    return query.limit(limit).order_by(SQL('rid'))
|
|
|
|
|
|
def get_sorted_matching_repositories(prefix, only_public, checker, limit=10):
    """ Returns repositories matching the given prefix string and passing the given checker
        function.

        prefix: prefix handed to `_basequery.prefix_search` for repository names and then
          for namespace usernames.
        only_public: when truthy, restricts every search to repositories whose visibility
          equals the public visibility.
        checker: called with each result row; rows for which it returns a falsy value are
          skipped.
        limit: maximum number of results collected (defaults to 10).

        Returns the list of accepted rows; each row gets `is_public` and `count` set.
    """
    last_week = datetime.now() - timedelta(weeks=1)
    # Both lists are shared with (and mutated by) the nested helper below.
    results = []
    existing_ids = []

    def get_search_results(search_clause, with_count=False):
        # Appends matches for `search_clause` to `results` until `limit` is reached.
        if len(results) >= limit:
            return

        select_items = [Repository, Namespace]
        if with_count:
            select_items.append(fn.Sum(RepositoryActionCount.count).alias('count'))

        query = (Repository
                 .select(*select_items)
                 .join(Namespace, on=(Namespace.id == Repository.namespace_user))
                 .switch(Repository)
                 .where(search_clause)
                 .group_by(Repository.id, Namespace.id))

        if only_public:
            query = query.where(Repository.visibility == _basequery.get_public_repo_visibility())

        # Skip repositories already collected by an earlier search pass.
        if existing_ids:
            query = query.where(~(Repository.id << existing_ids))

        # Counted passes only consider action counts dated within the last week and order
        # by the summed count, highest first.
        if with_count:
            query = (query
                     .switch(Repository)
                     .join(RepositoryActionCount)
                     .where(RepositoryActionCount.date >= last_week)
                     .order_by(fn.Sum(RepositoryActionCount.count).desc()))

        for result in query:
            if len(results) >= limit:
                return results

            # Note: We compare IDs here, instead of objects, because calling .visibility on the
            # Repository will kick off a new SQL query to retrieve that visibility enum value. We don't
            # join the visibility table in SQL, as well, because it is ungodly slow in MySQL :-/
            result.is_public = result.visibility_id == _basequery.get_public_repo_visibility().id
            # Uncounted passes did not select a count, so the attribute is set to 0.
            result.count = result.count if with_count else 0

            if not checker(result):
                continue

            results.append(result)
            existing_ids.append(result.id)

    # For performance reasons, we conduct the repo name and repo namespace searches on their
    # own. This also affords us the ability to give higher precedence to repository names matching
    # over namespaces, which is semantically correct.
    get_search_results(_basequery.prefix_search(Repository.name, prefix), with_count=True)
    get_search_results(_basequery.prefix_search(Repository.name, prefix), with_count=False)

    get_search_results(_basequery.prefix_search(Namespace.username, prefix), with_count=True)
    get_search_results(_basequery.prefix_search(Namespace.username, prefix), with_count=False)

    return results
|
|
|
|
|
|
def lookup_repository(repo_id):
    """ Returns the Repository whose id equals `repo_id`, or None if there is none. """
    try:
        repo = Repository.get(Repository.id == repo_id)
    except Repository.DoesNotExist:
        repo = None
    return repo
|
|
|
|
|
|
def is_repository_public(repository):
    """ Returns the result of comparing `repository.visibility` with the public visibility
        reported by `_basequery.get_public_repo_visibility()`.
    """
    repo_visibility = repository.visibility
    public_visibility = _basequery.get_public_repo_visibility()
    return repo_visibility == public_visibility
|
|
|
|
|
|
def repository_is_public(namespace_name, repository_name):
    """ Returns True if a repository with the given namespace username and name exists and
        its Visibility is named 'public'; returns False otherwise.
    """
    public_match = (Repository
                    .select()
                    .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                    .switch(Repository)
                    .join(Visibility)
                    .where(Namespace.username == namespace_name,
                           Repository.name == repository_name,
                           Visibility.name == 'public'))
    try:
        public_match.get()
    except Repository.DoesNotExist:
        return False
    return True
|
|
|
|
|
|
def set_repository_visibility(repo, visibility):
    """ Looks up the Visibility row named `visibility`, assigns it to `repo.visibility` and
        saves the repository. Does nothing if the lookup yields a falsy value.
    """
    target_visibility = Visibility.get(name=visibility)
    if target_visibility:
        repo.visibility = target_visibility
        repo.save()
|
|
|
|
|
|
def get_email_authorized_for_repo(namespace, repository, email):
    """ Returns the RepositoryAuthorizedEmail row for `email` on the repository identified by
        namespace username and repository name, or None when no such row exists.
    """
    try:
        authorization_q = (RepositoryAuthorizedEmail
                           .select(RepositoryAuthorizedEmail, Repository, Namespace)
                           .join(Repository)
                           .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                           .where(Namespace.username == namespace,
                                  Repository.name == repository,
                                  RepositoryAuthorizedEmail.email == email))
        authorization = authorization_q.get()
    except RepositoryAuthorizedEmail.DoesNotExist:
        return None
    return authorization
|
|
|
|
|
|
def create_email_authorization_for_repo(namespace_name, repository_name, email):
    """ Creates and returns an unconfirmed RepositoryAuthorizedEmail row for `email` on the
        given repository. Raises DataModelException when the repository does not exist.
    """
    try:
        repo = _basequery.get_existing_repository(namespace_name, repository_name)
    except Repository.DoesNotExist:
        message = 'Invalid repository %s/%s' % (namespace_name, repository_name)
        raise DataModelException(message)

    authorization = RepositoryAuthorizedEmail.create(repository=repo, email=email,
                                                     confirmed=False)
    return authorization
|
|
|
|
|
|
def confirm_email_authorization_for_repo(code):
    """ Marks the RepositoryAuthorizedEmail row whose `code` matches as confirmed, saves it
        and returns it. Raises DataModelException when no row has that code.
    """
    try:
        pending_q = (RepositoryAuthorizedEmail
                     .select(RepositoryAuthorizedEmail, Repository, Namespace)
                     .join(Repository)
                     .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                     .where(RepositoryAuthorizedEmail.code == code))
        found = pending_q.get()
    except RepositoryAuthorizedEmail.DoesNotExist:
        raise DataModelException('Invalid confirmation code.')

    found.confirmed = True
    found.save()
    return found
|
|
|
|
|
|
def list_popular_public_repos(action_count_threshold, time_span):
    """ Returns a tuples query of (namespace username, repository name) for public
        repositories whose summed action counts dated on or after (now - `time_span`) are
        at least `action_count_threshold`.
    """
    cutoff = datetime.now() - time_span

    popular = (Repository
               .select(Namespace.username, Repository.name)
               .join(Namespace, on=(Repository.namespace_user == Namespace.id))
               .switch(Repository)
               .join(RepositoryActionCount))
    popular = popular.where(RepositoryActionCount.date >= cutoff,
                            Repository.visibility == get_public_repo_visibility())
    popular = popular.group_by(RepositoryActionCount.repository, Repository.name,
                               Namespace.username)
    popular = popular.having(fn.Sum(RepositoryActionCount.count) >= action_count_threshold)
    return popular.tuples()
|