This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.

167 lines
7.6 KiB
Raw Normal View History

from collections import defaultdict
from datetime import datetime, timedelta
from auth.permissions import ReadRepositoryPermission
from data import model, oci_model
from endpoints.api.repository_models_interface import RepositoryDataInterface, RepositoryBaseElement, Repository, \
ApplicationRepository, ImageRepositoryRepository, Tag, Channel, Release, Count
def create_channel(channel, releases_channels_map):
    """Build a ``Channel`` element for *channel* and index it by its linked tag.

    Args:
        channel: Channel object; its ``linked_tag.lifetime_start`` is read here.
        releases_channels_map: Mutable mapping that ``get_repo`` creates as a
            ``defaultdict(list)`` and later hands to every ``Release``. It is
            updated in place.

    Returns:
        A ``Channel`` element.
    """
    # NOTE(review): the reviewed copy of this file had lost the first two
    # ``Channel`` arguments and never touched ``releases_channels_map``.
    # ``channel.name`` / ``channel.linked_tag.name`` and the map update below are
    # reconstructed from how ``get_repo`` consumes the map -- confirm against
    # ``repository_models_interface.Channel``.
    linked_tag = channel.linked_tag
    # setdefault works for both a plain dict and the defaultdict(list) callers pass.
    releases_channels_map.setdefault(linked_tag.name, []).append(channel.name)
    return Channel(channel.name, linked_tag.name, linked_tag.lifetime_start)
class PreOCIModel(RepositoryDataInterface):
    """
    PreOCIModel implements the repository data model using a database schema
    before it was changed to support the OCI specification.
    """

    def check_repository_usage(self, username, plan_found):
        """Raise or clear the ``over_private_usage`` notification for a namespace.

        Args:
            username: Name of the namespace whose private repository count is checked.
            plan_found: Mapping with a ``'privateRepos'`` entry, or ``None``; ``None``
                is treated as zero allowed private repositories.
        """
        private_repos = model.user.get_private_repo_count(username)
        # NOTE(review): the ``else`` branches in this method were missing from the
        # reviewed copy; without them ``plan_found['privateRepos']`` is evaluated
        # even when ``plan_found`` is None and the notification is created and then
        # immediately deleted. Confirm the restored branching.
        repos_allowed = 0 if plan_found is None else plan_found['privateRepos']

        user_or_org = model.user.get_namespace_user(username)
        if private_repos > repos_allowed:
            model.notification.create_unique_notification('over_private_usage', user_or_org,
                                                          {'namespace': username})
        else:
            model.notification.delete_notifications_by_kind(user_or_org, 'over_private_usage')

    def purge_repository(self, namespace_name, repository_name):
        """Purge the repository and return the ``username`` of its namespace user."""
        model.repository.purge_repository(namespace_name, repository_name)
        user = model.user.get_namespace_user(namespace_name)
        return user.username

    def set_description(self, namespace_name, repository_name, description):
        """Look up the repository and set its description."""
        repo = model.repository.get_repository(namespace_name, repository_name)
        model.repository.set_description(repo, description)

    def set_trust(self, namespace_name, repository_name, trust):
        """Look up the repository and set its trust flag."""
        repo = model.repository.get_repository(namespace_name, repository_name)
        model.repository.set_trust(repo, trust)

    def set_repository_visibility(self, namespace_name, repository_name, visibility):
        """Look up the repository and set its visibility."""
        repo = model.repository.get_repository(namespace_name, repository_name)
        model.repository.set_repository_visibility(repo, visibility)

    def get_repo_list(self, starred, user, repo_kind, namespace, username, public, page_token,
                      last_modified, popularity):
        """List repositories as ``RepositoryBaseElement`` objects.

        Args:
            starred: When truthy, list the repositories starred by ``user``.
            user: User object passed to the starred-repository lookups.
            repo_kind: Repository kind, forwarded as ``kind_filter`` and stored on
                each returned element.
            namespace: When truthy (and ``starred`` is falsy), restrict the lookup
                to this namespace; this path is not paginated.
            username: Name used for the visibility lookups; when truthy the
                user's starred repositories are also marked in the results.
            public: Forwarded as ``include_public``.
            page_token: Pagination token; only used on the paginated path.
            last_modified: When truthy, look up last-modified times.
            popularity: When truthy, look up action sums.

        Returns:
            A ``(elements, next_page_token)`` tuple. ``next_page_token`` stays
            ``None`` unless the paginated path was taken.
        """
        next_page_token = None

        # Lookup the requested repositories (either starred or non-starred.)
        if starred:
            # Return the full list of repos starred by the current user that are still
            # visible to them.
            # NOTE(review): the second permission argument, the ``.can()`` call and the
            # ``kind_filter`` keyword were cut off in the reviewed copy -- confirm.
            unfiltered_repos = model.repository.get_user_starred_repositories(
                user, kind_filter=repo_kind)
            repos = [
                repo for repo in unfiltered_repos
                if ReadRepositoryPermission(repo.namespace_user.username, repo.name).can()
            ]
        elif namespace:
            # Repositories filtered by namespace do not need pagination (their results
            # are fairly small), so we just do the lookup directly.
            repos = list(
                model.repository.get_visible_repositories(username=username,
                                                          include_public=public,
                                                          namespace=namespace,
                                                          kind_filter=repo_kind))
        else:
            # Determine the starting offset for pagination. Note that we don't use the
            # normal model.modelutil.paginate method here, as that does not operate over
            # UNION queries, which get_visible_repositories will return if there is a
            # logged-in user (for performance reasons).
            #
            # Also note the +1 on the limit, as paginate_query uses the extra result to
            # determine whether there is a next page.
            #
            # NOTE(review): REPOS_PER_PAGE is expected to be a module-level constant;
            # the trailing ``kind_filter``/``id_alias`` keywords were cut off in the
            # reviewed copy and are inferred from the other branches and from the
            # ``repo.rid`` accesses below -- confirm.
            start_id = model.modelutil.pagination_start(page_token)
            repo_query = model.repository.get_visible_repositories(
                username=username, include_public=public, start_id=start_id,
                limit=REPOS_PER_PAGE + 1, kind_filter=repo_kind)
            repos, next_page_token = model.modelutil.paginate_query(
                repo_query, limit=REPOS_PER_PAGE, id_alias='rid')

        # Collect the IDs of the repositories found for subsequent lookup of popularity
        # and/or last modified.
        last_modified_map = {}
        action_sum_map = {}
        if last_modified or popularity:
            repository_ids = [repo.rid for repo in repos]
            if last_modified:
                last_modified_map = model.repository.get_when_last_modified(repository_ids)
            if popularity:
                action_sum_map = model.log.get_repositories_action_sums(repository_ids)

        # Collect the IDs of the repositories that are starred for the user, so we can
        # mark them in the returned results.
        star_set = set()
        if username:
            starred_repos = model.repository.get_user_starred_repositories(user)
            # NOTE(review): the comprehension element was missing; ``starred.id`` is
            # presumed -- confirm.
            star_set = {starred.id for starred in starred_repos}

        # NOTE(review): ``repo.name``, ``repo.id``, ``removed_tag_expiration_s`` and
        # ``last_modified_map.get(repo.rid)`` were missing from the reviewed copy. They
        # are restored to match the 13 positional arguments used in ``get_repo`` and
        # because ``last_modified_map`` was otherwise never read -- confirm.
        return [
            RepositoryBaseElement(
                repo.namespace_user.username, repo.name, repo.id in star_set,
                repo.visibility_id == model.repository.get_public_repo_visibility().id,
                repo_kind, repo.description, repo.namespace_user.organization,
                repo.namespace_user.removed_tag_expiration_s,
                last_modified_map.get(repo.rid),
                action_sum_map.get(repo.rid), last_modified, popularity, username)
            for repo in repos
        ], next_page_token

    def repo_exists(self, namespace_name, repository_name):
        """Return True when ``get_repository`` finds the repository, else False."""
        return model.repository.get_repository(namespace_name, repository_name) is not None

    def create_repo(self, namespace_name, repository_name, owner, description,
                    visibility='private', repo_kind='image'):
        """Create a repository, set its description and return a ``Repository``.

        NOTE(review): the end of the signature and of the ``create_repository`` call
        were cut off in the reviewed copy; the ``repo_kind`` parameter and its
        ``'image'`` default are presumed -- confirm against the interface.
        """
        repo = model.repository.create_repository(namespace_name, repository_name, owner,
                                                  visibility, repo_kind=repo_kind)
        model.repository.set_description(repo, description)
        return Repository(namespace_name, repository_name)

    def get_repo(self, namespace_name, repository_name, user):
        """Return the detailed view of one repository, or None if it is not found.

        Args:
            namespace_name: Namespace of the repository.
            repository_name: Name of the repository.
            user: User used for the starred check; a falsy value means "not starred".

        Returns:
            ``None`` when the repository does not exist, an ``ApplicationRepository``
            when its kind name is ``'application'``, otherwise an
            ``ImageRepositoryRepository``.
        """
        repo = model.repository.get_repository(namespace_name, repository_name)
        if repo is None:
            return None

        is_starred = model.repository.repository_is_starred(user, repo) if user else False
        is_public = model.repository.is_repository_public(repo)
        # NOTE(review): the kind argument was missing; ``repo.kind.name`` is presumed
        # because ``base.kind_name`` is compared against a kind name below -- confirm.
        base = RepositoryBaseElement(
            namespace_name, repository_name, is_starred, is_public, repo.kind.name,
            repo.description, repo.namespace_user.organization,
            repo.namespace_user.removed_tag_expiration_s, None, None, False, False, False)

        # Note: This is *temporary* code for the new OCI model stuff.
        if base.kind_name == 'application':
            # NOTE(review): the right-hand side of ``channels`` and the first
            # ``Release`` argument were missing; both are presumed -- confirm.
            channels = oci_model.channel.get_repo_channels(repo)
            releases = oci_model.release.get_release_objs(repo)
            releases_channels_map = defaultdict(list)
            # The channel list is built first so that create_channel can fill
            # releases_channels_map before the releases reference it.
            channel_elements = [
                create_channel(channel, releases_channels_map) for channel in channels
            ]
            release_elements = [
                Release(release.name, release.lifetime_start, releases_channels_map)
                for release in releases
            ]
            return ApplicationRepository(base, channel_elements, release_elements)

        tags = model.tag.list_active_repo_tags(repo)
        # NOTE(review): the left operand was missing; ``datetime.now()`` is presumed
        # (``datetime`` is imported by this module), and MAX_DAYS_IN_3_MONTHS is
        # expected to be a module-level constant -- confirm both.
        start_date = datetime.now() - timedelta(days=MAX_DAYS_IN_3_MONTHS)
        counts = model.log.get_repository_action_counts(repo, start_date)

        # NOTE(review): ``tag.name`` and ``count.date`` were missing -- confirm.
        return ImageRepositoryRepository(base, [
            Tag(tag.name, tag.image.docker_image_id, tag.image.aggregate_size,
                tag.lifetime_start_ts, tag.tagmanifest.digest, tag.lifetime_end_ts)
            for tag in tags
        ], [Count(count.date, count.count) for count in counts], repo.badge_token,
                                         repo.trust_enabled)
# Module-level PreOCIModel instance created at import time.
pre_oci_model = PreOCIModel()