import logging
import random

from datetime import timedelta, datetime

from peewee import JOIN_LEFT_OUTER, fn, SQL, IntegrityError
from cachetools import ttl_cache

from data.model import (config, DataModelException, tag, db_transaction, storage, permission,
                        _basequery)
from data.database import (Repository, Namespace, RepositoryTag, Star, Image, ImageStorage, User,
                           Visibility, RepositoryPermission, RepositoryActionCount, Role,
                           RepositoryAuthorizedEmail, TagManifest, DerivedStorageForImage,
                           Label, TagManifestLabel, db_for_update, get_epoch_timestamp,
                           db_random_func, db_concat_func)
from data.text import prefix_search
from util.itertoolrecipes import take


logger = logging.getLogger(__name__)


def get_repo_kind_name(repo):
    """ Returns the name of the kind of the given repository, resolved from its `kind_id`. """
    return Repository.kind.get_name(repo.kind_id)


def get_repository_count():
    """ Returns the total number of repository rows. """
    return Repository.select().count()


def get_public_repo_visibility():
    """ Returns the visibility object used for public repositories. """
    return _basequery.get_public_repo_visibility()


def create_repository(namespace, name, creating_user, visibility='private', repo_kind='image'):
    """ Creates and returns a new repository named `name` under the namespace `namespace`.

    An initial RepositoryActionCount row (count 0, dated one day ago) is created for the
    repository. If `creating_user` is given and is not an organization, it is granted the
    admin role on the new repository. If `creating_user` is given and its username differs
    from the namespace, the default permissions are applied as well.
    """
    visibility_obj = Visibility.get(name=visibility)
    namespace_user = User.get(username=namespace)
    repo = Repository.create(name=name, visibility=visibility_obj,
                             namespace_user=namespace_user,
                             kind=Repository.kind.get_id(repo_kind))
    admin = Role.get(name='admin')

    yesterday = datetime.now() - timedelta(days=1)
    RepositoryActionCount.create(repository=repo, count=0, date=yesterday)

    if creating_user and not creating_user.organization:
        RepositoryPermission.create(user=creating_user, repository=repo, role=admin)

    # `creating_user` may be falsy (see the check above), so guard before reading
    # its username instead of raising AttributeError.
    if creating_user and creating_user.username != namespace:
        # Permission prototypes only work for orgs
        permission.apply_default_permissions(repo, creating_user)

    return repo


def get_repository(namespace_name, repository_name, kind_filter=None):
    """ Returns the matching repository, or None if no such repository exists. """
    try:
        return _basequery.get_existing_repository(namespace_name, repository_name,
                                                  kind_filter=kind_filter)
    except Repository.DoesNotExist:
        return None


def get_or_create_repository(namespace, name, creating_user, visibility='private',
                             repo_kind='image'):
    """ Returns the repository of kind `repo_kind` with the given namespace and name,
        creating it via `create_repository` if it was not found.
    """
    repo = get_repository(namespace, name, repo_kind)
    if repo is None:
        repo = create_repository(namespace, name, creating_user, visibility, repo_kind)
    return repo


def purge_repository(namespace_name, repository_name):
    """ Completely delete all traces of the repository. Will return True upon
        complete success, and False upon partial or total failure. Garbage
        collection is incremental and repeatable, so this return value does
        not need to be checked or responded to.
    """
    try:
        repo = _basequery.get_existing_repository(namespace_name, repository_name)
    except Repository.DoesNotExist:
        return False

    # Delete all tags to allow gc to reclaim storage
    previously_referenced = tag.purge_all_tags(repo)
    unreferenced_image_q = Image.select(Image.id).where(Image.repository == repo)

    if previously_referenced:
        unreferenced_image_q = (unreferenced_image_q
                                .where(~(Image.id << list(previously_referenced))))

    unreferenced_candidates = set(img[0] for img in unreferenced_image_q.tuples())

    # Gc to remove the images and storage
    all_repo_images = previously_referenced | unreferenced_candidates
    successful_gc = garbage_collect_repo(repo, all_repo_images)
    if not successful_gc:
        return False

    # Delete the rest of the repository metadata. The repository is looked up again
    # and the purge is reported as failed if it no longer exists at this point.
    try:
        fetched = _basequery.get_existing_repository(namespace_name, repository_name)
    except Repository.DoesNotExist:
        return False

    fetched.delete_instance(recursive=True, delete_nullable=False)
    return True


@ttl_cache(maxsize=1, ttl=600)
def _get_gc_expiration_policies():
    """ Returns a list of up to 100 distinct `removed_tag_expiration_s` values found on
        namespaces. The result is cached (ttl=600).
    """
    policy_tuples_query = (Namespace
                           .select(Namespace.removed_tag_expiration_s)
                           .distinct()
                           .limit(100)  # This sucks but it's the only way to limit memory
                           .tuples())
    return [policy[0] for policy in policy_tuples_query]


def get_random_gc_policy():
    """ Return a single random policy from the database to use when garbage collecting.

        Note: `random.choice` raises IndexError if no policies were found.
    """
    return random.choice(_get_gc_expiration_policies())


def find_repository_with_garbage(limit_to_gc_policy_s):
    """ Returns a randomly chosen repository, under a namespace whose
        `removed_tag_expiration_s` equals `limit_to_gc_policy_s`, that has at least one tag
        whose lifetime ended at or before (now - limit_to_gc_policy_s). Returns None if no
        such repository was found.
    """
    expiration_timestamp = get_epoch_timestamp() - limit_to_gc_policy_s

    try:
        # Bounded subquery of distinct repositories holding expired tags.
        candidates = (RepositoryTag
                      .select(RepositoryTag.repository)
                      .join(Repository)
                      .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                      .where(~(RepositoryTag.lifetime_end_ts >> None),
                             (RepositoryTag.lifetime_end_ts <= expiration_timestamp),
                             (Namespace.removed_tag_expiration_s == limit_to_gc_policy_s))
                      .limit(500)
                      .distinct()
                      .alias('candidates'))

        # Pick one candidate at random. get() signals "no rows" by raising DoesNotExist
        # (handled below), so its result needs no None check.
        found = (RepositoryTag
                 .select(candidates.c.repository_id)
                 .from_(candidates)
                 .order_by(db_random_func())
                 .get())

        return Repository.get(Repository.id == found.repository_id)
    except RepositoryTag.DoesNotExist:
        return None
    except Repository.DoesNotExist:
        return None


def garbage_collect_repo(repo, extra_candidate_set=None):
    """ Garbage collect the specified repository object. This will remove all
        images, derived images, and other associated metadata, for images which
        are no longer referenced by a tag or another image which is itself
        tagged. Returns True if garbage collection was completed without error
        and False otherwise. Retries are safe and work incrementally, so this
        return value does not need to be checked or handled.

        `extra_candidate_set`, if given, is a collection of image ids that are
        considered for removal in addition to those reported by the tag GC.
    """
    logger.debug('Garbage collecting repository %s', repo.id)

    storage_id_whitelist = set()
    candidate_orphan_image_set = tag.garbage_collect_tags(repo)

    if extra_candidate_set:
        candidate_orphan_image_set.update(extra_candidate_set)

    if not candidate_orphan_image_set:
        logger.debug('No candidate images for GC for repo: %s', repo.id)
        return True

    candidates_orphans = list(candidate_orphan_image_set)

    with db_transaction():
        Candidate = Image.alias()
        Tagged = Image.alias()
        ancestor_superset = Tagged.ancestors ** db_concat_func(Candidate.ancestors,
                                                               Candidate.id, '/%')

        # We are going to compute all images which are being referenced in two ways:
        # First, we will find all images which have their ancestor paths appear in
        # another image. Secondly, we union in all of the candidate images which are
        # directly referenced by a tag. This can be used in a subquery to directly
        # find which candidates are being referenced without any client side
        # computation or extra round trips.
        ancestor_referenced = (Candidate
                               .select(Candidate.id)
                               .join(Tagged, on=ancestor_superset)
                               .join(RepositoryTag, on=(Tagged.id == RepositoryTag.image))
                               .where(RepositoryTag.repository == repo.id,
                                      Candidate.id << candidates_orphans))

        direct_referenced = (RepositoryTag
                             .select(RepositoryTag.image)
                             .where(RepositoryTag.repository == repo.id,
                                    RepositoryTag.image << candidates_orphans))

        referenced_candidates = (direct_referenced | ancestor_referenced)

        # We desire a few pieces of information from the database from the following
        # query: all of the image ids which are associated with this repository,
        # and the storages which are associated with those images.
        unreferenced_candidates = (Image
                                   .select(Image.id, Image.docker_image_id, ImageStorage.id,
                                           ImageStorage.uuid)
                                   .join(ImageStorage)
                                   .where(Image.id << candidates_orphans,
                                          ~(Image.id << referenced_candidates)))

        image_ids_to_remove = [candidate.id for candidate in unreferenced_candidates]
        if image_ids_to_remove:
            logger.info('Cleaning up unreferenced images: %s', image_ids_to_remove)
            storage_id_whitelist = {candidate.storage_id
                                    for candidate in unreferenced_candidates}

            # Lookup any derived images for the images to remove.
            derived = DerivedStorageForImage.select().where(
                DerivedStorageForImage.source_image << image_ids_to_remove)

            has_derived = False
            for derived_image in derived:
                has_derived = True
                storage_id_whitelist.add(derived_image.derivative_id)

            # Delete any derived images and the images themselves.
            if has_derived:
                try:
                    (DerivedStorageForImage
                     .delete()
                     .where(DerivedStorageForImage.source_image << image_ids_to_remove)
                     .execute())
                except IntegrityError:
                    logger.info('Could not GC derived images %s; will try again soon',
                                image_ids_to_remove)
                    return False

            try:
                Image.delete().where(Image.id << image_ids_to_remove).execute()
            except IntegrityError:
                logger.info('Could not GC images %s; will try again soon', image_ids_to_remove)
                return False

    # If any images were removed, GC any orphaned storages.
    if image_ids_to_remove:
        logger.info('Garbage collecting storage for images: %s', image_ids_to_remove)
        storage_ids_removed = set(storage.garbage_collect_storage(storage_id_whitelist))

        # If any storages were removed and cleanup callbacks are registered, call them with
        # the images+storages removed.
        if storage_ids_removed and config.image_cleanup_callbacks:
            image_storages_removed = [candidate for candidate in unreferenced_candidates
                                      if candidate.storage_id in storage_ids_removed]
            for callback in config.image_cleanup_callbacks:
                callback(image_storages_removed)

    return True


def star_repository(user, repository):
    """ Stars a repository. """
    # Model.create() already persists the new row, so no additional save() is needed.
    Star.create(user=user.id, repository=repository.id)


def unstar_repository(user, repository):
    """ Unstars a repository.

        Raises DataModelException if the user had not starred the repository.
    """
    deleted = (Star
               .delete()
               .where(Star.repository == repository.id, Star.user == user.id)
               .execute())

    # A DELETE query never raises DoesNotExist; the number of affected rows is the only
    # signal that there was no star to remove.
    if not deleted:
        raise DataModelException('Star not found.')


def get_user_starred_repositories(user, repo_kind='image'):
    """ Retrieves all of the repositories a user has starred. """
    query = (Repository
             .select(Repository, User, Visibility, Repository.id.alias('rid'))
             .join(Star)
             .switch(Repository)
             .join(User)
             .switch(Repository)
             .join(Visibility)
             .where(Star.user == user, Repository.kind == Repository.kind.get_id(repo_kind)))

    return query


def repository_is_starred(user, repository):
    """ Determines whether a user has starred a repository or not. """
    try:
        (Star
         .select()
         .where(Star.repository == repository.id, Star.user == user.id)
         .get())
        return True
    except Star.DoesNotExist:
        return False


def get_when_last_modified(repository_ids):
    """ Returns a dict mapping each given repository id that has tags to the maximum
        `lifetime_start_ts` among its tags. Returns an empty dict if no ids are given.
    """
    if not repository_ids:
        return {}

    tuples = (RepositoryTag
              .select(RepositoryTag.repository, fn.Max(RepositoryTag.lifetime_start_ts))
              .where(RepositoryTag.repository << repository_ids)
              .group_by(RepositoryTag.repository)
              .tuples())

    return {record[0]: record[1] for record in tuples}


def get_visible_repositories(username, namespace=None, repo_kind='image', include_public=False,
                             start_id=None, limit=None):
    """ Returns the repositories visible to the given user (if any). """
    if not include_public and not username:
        # Short circuit by returning a query that will find no repositories. We need to
        # return a query here, as it will be modified by other queries later on.
        return Repository.select(Repository.id.alias('rid')).where(Repository.id == -1)

    query = (Repository
             .select(Repository.name, Repository.id.alias('rid'), Repository.description,
                     Namespace.username, Repository.visibility, Repository.kind)
             .switch(Repository)
             .join(Namespace, on=(Repository.namespace_user == Namespace.id)))

    if username:
        # Note: We only need the permissions table if we will filter based on a user's
        # permissions.
        query = query.switch(Repository).distinct().join(RepositoryPermission, JOIN_LEFT_OUTER)

    query = _basequery.filter_to_repos_for_user(query, username, namespace, repo_kind,
                                                include_public, start_id=start_id)

    if limit is not None:
        query = query.limit(limit).order_by(SQL('rid'))

    return query


def get_app_repository(namespace_name, repository_name):
    """ Find an application repository. Returns None if it does not exist. """
    try:
        return _basequery.get_existing_repository(namespace_name, repository_name,
                                                  kind_filter='application')
    except Repository.DoesNotExist:
        return None


def get_app_search(lookup, username=None, limit=50):
    """ Returns up to `limit` application repositories matching `lookup`, optionally
        filtered to those visible to `username`.
    """
    return get_filtered_matching_repositories(lookup, filter_username=username,
                                              repo_kind='application', offset=0, limit=limit)


def get_filtered_matching_repositories(lookup_value, filter_username=None, repo_kind='image',
                                       offset=0, limit=25):
    """ Returns a list of repositories matching the given lookup value, with optional
        filtering to a specific user. If the user is unspecified, only public repositories
        will be returned. The first `offset` results are skipped and at most `limit`
        results are returned.
    """
    # Build the unfiltered search query.
    unfiltered_query = _get_sorted_matching_repositories(
        lookup_value, repo_kind=repo_kind, include_private=filter_username is not None)

    # Add a filter to the iterator, if necessary.
    if filter_username is not None:
        iterator = _filter_repositories_visible_to_username(unfiltered_query, filter_username,
                                                            limit)
    else:
        # Use a real iterator rather than the re-iterable query: the skip below works by
        # consuming items, which only has an effect when both take() calls share one
        # stream of results.
        iterator = iter(unfiltered_query)

    if offset > 0:
        # Consume (and discard) the skipped results.
        take(offset, iterator)

    # Return the results.
    return list(take(limit, iterator))


def _filter_repositories_visible_to_username(unfiltered_query, filter_username, limit):
    """ Generator yielding the repositories from `unfiltered_query` that are visible to
        `filter_username`. The unfiltered query is read in pages of `limit * 2` rows (at
        most 10 pages), and within each page the repositories are yielded in the order
        in which the unfiltered query returned them.
    """
    encountered = set()
    chunk_count = limit * 2

    for page_number in range(1, 11):  # At most 10 pages, just to be safe
        # Find the next chunk's worth of repository IDs, paginated by the chunk size.
        found_ids = [r.id for r in unfiltered_query.paginate(page_number, chunk_count)]

        # Make sure we haven't encountered these results before. This code is used to
        # handle the case where we've previously seen a result, as pagination is not
        # necessarily stable in SQL databases.
        new_unfiltered_ids = set(found_ids) - encountered
        if not new_unfiltered_ids:
            break

        encountered.update(new_unfiltered_ids)

        # Filter the repositories found to only those visible to the current user.
        query = (Repository
                 .select(Repository, Namespace)
                 .distinct()
                 .join(Namespace, on=(Namespace.id == Repository.namespace_user))
                 .switch(Repository)
                 .join(RepositoryPermission)
                 .where(Repository.id << list(new_unfiltered_ids)))

        filtered = _basequery.filter_to_repos_for_user(query, filter_username)

        # The filtering query carries no ORDER BY, so restore the ranking of the
        # unfiltered query by sorting on each repository's position in this page.
        position = {}
        for index, repo_id in enumerate(found_ids):
            position.setdefault(repo_id, index)

        for filtered_repo in sorted(filtered, key=lambda found: position[found.id]):
            yield filtered_repo

        # If the number of found IDs is less than the chunk count, then we're done.
        if len(found_ids) < chunk_count:
            break


def _get_sorted_matching_repositories(lookup_value, repo_kind='image', include_private=False):
    """ Returns a query of repositories matching the given lookup string, with optional
        inclusion of private repositories. Results are ordered by the summed action counts
        of the last week, descending. Note that this method does *not* filter results
        based on visibility to users.
    """
    last_week = datetime.now() - timedelta(weeks=1)

    query = (Repository
             .select(Repository, Namespace)
             .join(Namespace, on=(Namespace.id == Repository.namespace_user))
             .where(Repository.name.match(lookup_value) |
                    Repository.description.match(lookup_value),
                    Repository.kind == Repository.kind.get_id(repo_kind))
             .group_by(Repository.id, Namespace.id))

    if not include_private:
        query = query.where(Repository.visibility == _basequery.get_public_repo_visibility())

    query = (query
             .switch(Repository)
             .join(RepositoryActionCount)
             .where(RepositoryActionCount.date >= last_week)
             .order_by(fn.Sum(RepositoryActionCount.count).desc()))

    return query


def lookup_repository(repo_id):
    """ Returns the repository with the given id, or None if it does not exist. """
    try:
        return Repository.get(Repository.id == repo_id)
    except Repository.DoesNotExist:
        return None


def is_repository_public(repository):
    """ Returns whether the given repository object has the public visibility. """
    return repository.visibility_id == _basequery.get_public_repo_visibility().id


def repository_is_public(namespace_name, repository_name):
    """ Returns whether a repository with the given namespace and name exists and has
        the 'public' visibility.
    """
    try:
        (Repository
         .select()
         .join(Namespace, on=(Repository.namespace_user == Namespace.id))
         .switch(Repository)
         .join(Visibility)
         .where(Namespace.username == namespace_name,
                Repository.name == repository_name,
                Visibility.name == 'public')
         .get())
        return True
    except Repository.DoesNotExist:
        return False


def set_repository_visibility(repo, visibility):
    """ Sets the visibility of the repository to the visibility with the given name and
        saves the repository.
    """
    visibility_obj = Visibility.get(name=visibility)
    # NOTE(review): a model get() presumably raises DoesNotExist rather than returning
    # None, which would make this check dead code -- confirm before relying on it.
    if not visibility_obj:
        return

    repo.visibility = visibility_obj
    repo.save()


def get_email_authorized_for_repo(namespace, repository, email):
    """ Returns the RepositoryAuthorizedEmail row for the given email address on the given
        repository, or None if there is none.
    """
    try:
        return (RepositoryAuthorizedEmail
                .select(RepositoryAuthorizedEmail, Repository, Namespace)
                .join(Repository)
                .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                .where(Namespace.username == namespace,
                       Repository.name == repository,
                       RepositoryAuthorizedEmail.email == email)
                .get())
    except RepositoryAuthorizedEmail.DoesNotExist:
        return None


def create_email_authorization_for_repo(namespace_name, repository_name, email):
    """ Creates and returns an unconfirmed email authorization for the given repository.

        Raises DataModelException if the repository does not exist.
    """
    try:
        repo = _basequery.get_existing_repository(namespace_name, repository_name)
    except Repository.DoesNotExist:
        raise DataModelException('Invalid repository %s/%s' %
                                 (namespace_name, repository_name))

    return RepositoryAuthorizedEmail.create(repository=repo, email=email, confirmed=False)


def confirm_email_authorization_for_repo(code):
    """ Marks the email authorization with the given code as confirmed and returns it.

        Raises DataModelException if no authorization has that code.
    """
    try:
        found = (RepositoryAuthorizedEmail
                 .select(RepositoryAuthorizedEmail, Repository, Namespace)
                 .join(Repository)
                 .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                 .where(RepositoryAuthorizedEmail.code == code)
                 .get())
    except RepositoryAuthorizedEmail.DoesNotExist:
        raise DataModelException('Invalid confirmation code.')

    found.confirmed = True
    found.save()

    return found


def list_popular_public_repos(action_count_threshold, time_span, repo_kind='image'):
    """ Returns (namespace username, repository name) tuples for the public repositories
        of the given kind whose summed action count since (now - time_span) is at least
        `action_count_threshold`.
    """
    cutoff = datetime.now() - time_span
    return (Repository
            .select(Namespace.username, Repository.name)
            .join(Namespace, on=(Repository.namespace_user == Namespace.id))
            .switch(Repository)
            .join(RepositoryActionCount)
            .where(RepositoryActionCount.date >= cutoff,
                   Repository.visibility == get_public_repo_visibility(),
                   Repository.kind == Repository.kind.get_id(repo_kind))
            .group_by(RepositoryActionCount.repository, Repository.name, Namespace.username)
            .having(fn.Sum(RepositoryActionCount.count) >= action_count_threshold)
            .tuples())