Merge branch 'master' into nomenclature
Conflicts: test/data/test.db
commit f4681f2c18
60 changed files with 1716 additions and 496 deletions
data/database.py (116 lines changed)
@@ -35,6 +35,36 @@ class CallableProxy(Proxy):
       raise AttributeError('Cannot use uninitialized Proxy.')
     return self.obj(*args, **kwargs)
 
+
+class CloseForLongOperation(object):
+  """ Helper object which disconnects the database then reconnects after the nested operation
+      completes.
+  """
+
+  def __init__(self, config_object):
+    self.config_object = config_object
+
+  def __enter__(self):
+    close_db_filter(None)
+
+  def __exit__(self, type, value, traceback):
+    # Note: Nothing to do. The next SQL call will reconnect automatically.
+    pass
+
+
+class UseThenDisconnect(object):
+  """ Helper object for conducting work with a database and then tearing it down. """
+
+  def __init__(self, config_object):
+    self.config_object = config_object
+
+  def __enter__(self):
+    configure(self.config_object)
+
+  def __exit__(self, type, value, traceback):
+    close_db_filter(None)
+
+
 db = Proxy()
 read_slave = Proxy()
 db_random_func = CallableProxy()
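Note: both helpers are plain context managers, so callers can scope connection lifetime with a with block. A minimal usage sketch, assuming the imports resolve against this module (the worker functions are hypothetical placeholders):

    from data.database import CloseForLongOperation, UseThenDisconnect

    def run_batch_job(config_object):
      # Hold a configured connection only for the duration of the work.
      with UseThenDisconnect(config_object):
        run_queries()  # hypothetical database work

    def download_large_blob(config_object):
      # Drop the connection during slow non-database work; the next SQL
      # call after the block reconnects automatically.
      with CloseForLongOperation(config_object):
        fetch_from_storage()  # hypothetical long operation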
@@ -56,6 +86,7 @@ def _db_from_url(url, db_kwargs):
 
 
 def configure(config_object):
+  logger.debug('Configuring database')
   db_kwargs = dict(config_object['DB_CONNECTION_ARGS'])
   write_db_uri = config_object['DB_URI']
   db.initialize(_db_from_url(write_db_uri, db_kwargs))
@@ -90,6 +121,15 @@ def close_db_filter(_):
   read_slave.close()
 
 
+class QuayUserField(ForeignKeyField):
+  def __init__(self, allows_robots=False, *args, **kwargs):
+    self.allows_robots = allows_robots
+    if not 'rel_model' in kwargs:
+      kwargs['rel_model'] = User
+
+    super(QuayUserField, self).__init__(*args, **kwargs)
+
+
 class BaseModel(ReadSlaveModel):
   class Meta:
     database = db
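Note: QuayUserField behaves exactly like a ForeignKeyField pointing at User (rel_model defaults to User); allows_robots is extra metadata consumed by the deletion logic in the next hunk. A sketch of declaring one on a hypothetical model:

    class HypotheticalAuditLog(BaseModel):
      # rel_model defaults to User, so only the robot metadata is specified.
      actor = QuayUserField(allows_robots=True, index=True)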
@@ -109,6 +149,19 @@ class User(BaseModel):
   invalid_login_attempts = IntegerField(default=0)
   last_invalid_login = DateTimeField(default=datetime.utcnow)
 
+  def delete_instance(self, recursive=False, delete_nullable=False):
+    # If we are deleting a robot account, only execute the subset of queries necessary.
+    if self.robot:
+      # For all the model dependencies, only delete those that allow robots.
+      for query, fk in self.dependencies(search_nullable=True):
+        if isinstance(fk, QuayUserField) and fk.allows_robots:
+          model = fk.model_class
+          model.delete().where(query).execute()
+
+      # Delete the instance itself.
+      super(User, self).delete_instance(recursive=False, delete_nullable=False)
+    else:
+      super(User, self).delete_instance(recursive=recursive, delete_nullable=delete_nullable)
 
 class TeamRole(BaseModel):
   name = CharField(index=True)
@@ -116,7 +169,7 @@ class TeamRole(BaseModel):
 
 class Team(BaseModel):
   name = CharField(index=True)
-  organization = ForeignKeyField(User, index=True)
+  organization = QuayUserField(index=True)
   role = ForeignKeyField(TeamRole)
   description = TextField(default='')
 
@@ -130,7 +183,7 @@ class Team(BaseModel):
 
 
 class TeamMember(BaseModel):
-  user = ForeignKeyField(User, index=True)
+  user = QuayUserField(allows_robots=True, index=True)
   team = ForeignKeyField(Team, index=True)
 
   class Meta:
@@ -144,7 +197,7 @@ class TeamMember(BaseModel):
 
 class TeamMemberInvite(BaseModel):
   # Note: Either user OR email will be filled in, but not both.
-  user = ForeignKeyField(User, index=True, null=True)
+  user = QuayUserField(index=True, null=True)
   email = CharField(null=True)
   team = ForeignKeyField(Team, index=True)
   inviter = ForeignKeyField(User, related_name='inviter')
@@ -156,7 +209,7 @@ class LoginService(BaseModel):
 
 
 class FederatedLogin(BaseModel):
-  user = ForeignKeyField(User, index=True)
+  user = QuayUserField(allows_robots=True, index=True)
   service = ForeignKeyField(LoginService, index=True)
   service_ident = CharField()
   metadata_json = TextField(default='{}')
@@ -178,7 +231,7 @@ class Visibility(BaseModel):
 
 
 class Repository(BaseModel):
-  namespace_user = ForeignKeyField(User, null=True)
+  namespace_user = QuayUserField(null=True)
   name = CharField()
   visibility = ForeignKeyField(Visibility)
   description = TextField(null=True)
@@ -192,6 +245,24 @@ class Repository(BaseModel):
       (('namespace_user', 'name'), True),
     )
 
+  def delete_instance(self, recursive=False, delete_nullable=False):
+    # Note: peewee generates extra nested deletion statements here that are slow and unnecessary.
+    # Therefore, we define our own deletion order here and use the dependency system to verify it.
+    ordered_dependencies = [RepositoryAuthorizedEmail, RepositoryTag, Image, LogEntry,
+                            RepositoryBuild, RepositoryBuildTrigger, RepositoryNotification,
+                            RepositoryPermission, AccessToken]
+
+    for query, fk in self.dependencies(search_nullable=True):
+      model = fk.model_class
+      if not model in ordered_dependencies:
+        raise Exception('Missing repository deletion dependency: %s', model)
+
+    for model in ordered_dependencies:
+      model.delete().where(model.repository == self).execute()
+
+    # Delete the repository itself.
+    super(Repository, self).delete_instance(recursive=False, delete_nullable=False)
+
 
 class Role(BaseModel):
   name = CharField(index=True, unique=True)
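One nit in the guard above: Exception does not interpolate printf-style arguments (that convention belongs to the logging calls), so the raised message would read literally 'Missing repository deletion dependency: %s'. An explicit format fixes it:

    raise Exception('Missing repository deletion dependency: %s' % model)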
@@ -199,7 +270,7 @@ class Role(BaseModel):
 
 class RepositoryPermission(BaseModel):
   team = ForeignKeyField(Team, index=True, null=True)
-  user = ForeignKeyField(User, index=True, null=True)
+  user = QuayUserField(allows_robots=True, index=True, null=True)
   repository = ForeignKeyField(Repository, index=True)
   role = ForeignKeyField(Role)
 
@@ -213,12 +284,12 @@ class RepositoryPermission(BaseModel):
 
 
 class PermissionPrototype(BaseModel):
-  org = ForeignKeyField(User, index=True, related_name='orgpermissionproto')
+  org = QuayUserField(index=True, related_name='orgpermissionproto')
   uuid = CharField(default=uuid_generator)
-  activating_user = ForeignKeyField(User, index=True, null=True,
-                                    related_name='userpermissionproto')
-  delegate_user = ForeignKeyField(User, related_name='receivingpermission',
-                                  null=True)
+  activating_user = QuayUserField(allows_robots=True, index=True, null=True,
+                                  related_name='userpermissionproto')
+  delegate_user = QuayUserField(allows_robots=True, related_name='receivingpermission',
+                                null=True)
   delegate_team = ForeignKeyField(Team, related_name='receivingpermission',
                                   null=True)
   role = ForeignKeyField(Role)
@@ -249,16 +320,16 @@ class RepositoryBuildTrigger(BaseModel):
   uuid = CharField(default=uuid_generator)
   service = ForeignKeyField(BuildTriggerService, index=True)
   repository = ForeignKeyField(Repository, index=True)
-  connected_user = ForeignKeyField(User)
+  connected_user = QuayUserField()
   auth_token = CharField()
   config = TextField(default='{}')
   write_token = ForeignKeyField(AccessToken, null=True)
-  pull_robot = ForeignKeyField(User, null=True, related_name='triggerpullrobot')
+  pull_robot = QuayUserField(allows_robots=True, null=True, related_name='triggerpullrobot')
 
 
 class EmailConfirmation(BaseModel):
   code = CharField(default=random_string_generator(), unique=True, index=True)
-  user = ForeignKeyField(User)
+  user = QuayUserField()
   pw_reset = BooleanField(default=False)
   new_email = CharField(null=True)
   email_confirm = BooleanField(default=False)
@@ -315,7 +386,7 @@ class Image(BaseModel):
   # to be globally unique we can't treat them as such for permissions and
   # security reasons. So rather than Repository <-> Image being many to many
   # each image now belongs to exactly one repository.
-  docker_image_id = CharField()
+  docker_image_id = CharField(index=True)
   repository = ForeignKeyField(Repository)
 
   # '/' separated list of ancestory ids, e.g. /1/2/6/7/10/
@@ -365,7 +436,7 @@ class RepositoryBuild(BaseModel):
   started = DateTimeField(default=datetime.now)
   display_name = CharField()
   trigger = ForeignKeyField(RepositoryBuildTrigger, null=True, index=True)
-  pull_robot = ForeignKeyField(User, null=True, related_name='buildpullrobot')
+  pull_robot = QuayUserField(null=True, related_name='buildpullrobot')
   logs_archived = BooleanField(default=False)
 
 
@@ -384,11 +455,10 @@ class LogEntryKind(BaseModel):
 
 class LogEntry(BaseModel):
   kind = ForeignKeyField(LogEntryKind, index=True)
-  account = ForeignKeyField(User, index=True, related_name='account')
-  performer = ForeignKeyField(User, index=True, null=True,
-                              related_name='performer')
+  account = QuayUserField(index=True, related_name='account')
+  performer = QuayUserField(allows_robots=True, index=True, null=True,
+                            related_name='performer')
   repository = ForeignKeyField(Repository, index=True, null=True)
-  access_token = ForeignKeyField(AccessToken, null=True)
   datetime = DateTimeField(default=datetime.now, index=True)
   ip = CharField(null=True)
   metadata_json = TextField(default='{}')
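Dropping the access_token column here pairs with the log_action hunk near the end of this diff, where the corresponding keyword argument is removed; after both changes the create call reads:

    LogEntry.create(kind=kind, account=account, performer=performer,
                    repository=repository, ip=ip,
                    metadata_json=json.dumps(metadata), datetime=timestamp)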
@@ -399,7 +469,7 @@ class OAuthApplication(BaseModel):
   client_secret = CharField(default=random_string_generator(length=40))
   redirect_uri = CharField()
   application_uri = CharField()
-  organization = ForeignKeyField(User)
+  organization = QuayUserField()
 
   name = CharField()
   description = TextField(default='')
@@ -416,7 +486,7 @@ class OAuthAuthorizationCode(BaseModel):
 class OAuthAccessToken(BaseModel):
   uuid = CharField(default=uuid_generator, index=True)
   application = ForeignKeyField(OAuthApplication)
-  authorized_user = ForeignKeyField(User)
+  authorized_user = QuayUserField()
   scope = CharField()
   access_token = CharField(index=True)
   token_type = CharField(default='Bearer')
@@ -432,7 +502,7 @@ class NotificationKind(BaseModel):
 class Notification(BaseModel):
   uuid = CharField(default=uuid_generator, index=True)
   kind = ForeignKeyField(NotificationKind, index=True)
-  target = ForeignKeyField(User, index=True)
+  target = QuayUserField(index=True)
   metadata_json = TextField(default='{}')
   created = DateTimeField(default=datetime.now, index=True)
   dismissed = BooleanField(default=False)
@@ -5,7 +5,7 @@ import os
 from alembic import context
 from sqlalchemy import engine_from_config, pool
 from logging.config import fileConfig
-from urllib import unquote
+from urllib import unquote, quote
 from peewee import SqliteDatabase
 
 from data.database import all_models, db
@@ -24,6 +24,11 @@ if 'GENMIGRATE' in os.environ:
 else:
   db_uri = 'postgresql://postgres@%s/genschema' % (docker_host_ip)
 
+if 'DB_URI' in os.environ:
+  db_uri = os.environ['DB_URI']
+
+app.config['DB_URI'] = db_uri
+
 config = context.config
 config.set_main_option('sqlalchemy.url', db_uri)
 
@@ -69,7 +74,7 @@ def run_migrations_online():
 
   """
 
-  if isinstance(db.obj, SqliteDatabase) and not 'GENMIGRATE' in os.environ:
+  if isinstance(db.obj, SqliteDatabase) and not 'GENMIGRATE' in os.environ and not 'DB_URI' in os.environ:
    print ('Skipping Sqlite migration!')
     return
 
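With these hunks an explicit DB_URI wins over the GENMIGRATE-derived defaults and also disables the SQLite short-circuit. A sketch of driving a migration run against a specific database, reusing the genschema URI from the hunk above (the exact target is hypothetical):

    import os
    os.environ['DB_URI'] = 'postgresql://postgres@localhost/genschema'  # hypothetical target
    # env.py now copies this into app.config['DB_URI'] and sqlalchemy.url,
    # and run_migrations_online() no longer skips when the proxy is SQLite.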
@@ -5,8 +5,8 @@ up_mysql() {
   docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=password -d mysql
 
   # Sleep for 5s to get MySQL get started.
-  echo 'Sleeping for 5...'
-  sleep 5
+  echo 'Sleeping for 10...'
+  sleep 10
 
   # Add the database to mysql.
   docker run --rm --link mysql:mysql mysql sh -c 'echo "create database genschema" | mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -ppassword'
@@ -0,0 +1,28 @@
+"""Add log entry kind for verbs
+
+Revision ID: 204abf14783d
+Revises: 2430f55c41d5
+Create Date: 2014-10-29 15:38:06.100915
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '204abf14783d'
+down_revision = '2430f55c41d5'
+
+from alembic import op
+import sqlalchemy as sa
+
+def upgrade(tables):
+  op.bulk_insert(tables.logentrykind,
+                 [
+                  {'id': 46, 'name':'repo_verb'},
+                 ])
+
+
+def downgrade(tables):
+  op.execute(
+    (tables.logentrykind.delete()
+     .where(tables.logentrykind.c.name == op.inline_literal('repo_verb')))
+
+  )
@@ -16,7 +16,9 @@ from util.uncompressedsize import backfill_sizes_from_data
 
 
 def upgrade(tables):
-  backfill_sizes_from_data()
+  # Note: Doing non-alembic operations inside alembic can cause a deadlock. This call has been
+  # moved to runmigration.sh.
+  pass
 
 def downgrade(tables):
   pass
@@ -0,0 +1,26 @@
+"""Add an index to the docker_image_id field
+
+Revision ID: 313d297811c4
+Revises: 204abf14783d
+Create Date: 2014-11-13 12:40:57.414787
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '313d297811c4'
+down_revision = '204abf14783d'
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+
+def upgrade(tables):
+    ### commands auto generated by Alembic - please adjust! ###
+    op.create_index('image_docker_image_id', 'image', ['docker_image_id'], unique=False)
+    ### end Alembic commands ###
+
+
+def downgrade(tables):
+    ### commands auto generated by Alembic - please adjust! ###
+    op.drop_index('image_docker_image_id', table_name='image')
+    ### end Alembic commands ###
@@ -12,7 +12,6 @@ down_revision = '82297d834ad'
 
 from alembic import op
 import sqlalchemy as sa
-from sqlalchemy.dialects import mysql
 
 def upgrade(tables):
   op.bulk_insert(tables.logentrykind,
@@ -17,7 +17,7 @@ from sqlalchemy.dialects import mysql
 def upgrade(tables):
     ### commands auto generated by Alembic - please adjust! ###
     op.add_column('user', sa.Column('invalid_login_attempts', sa.Integer(), nullable=False, server_default="0"))
-    op.add_column('user', sa.Column('last_invalid_login', sa.DateTime(), nullable=False, server_default=sa.func.now()))
+    op.add_column('user', sa.Column('last_invalid_login', sa.DateTime(), nullable=False))
     ### end Alembic commands ###
 
 
@@ -3,7 +3,7 @@ import logging
 import dateutil.parser
 import json
 
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, date
 
 from data.database import (User, Repository, Image, AccessToken, Role, RepositoryPermission,
                            Visibility, RepositoryTag, EmailConfirmation, FederatedLogin,
@@ -14,7 +14,7 @@ from data.database import (User, Repository, Image, AccessToken, Role, Repositor
                            ExternalNotificationEvent, ExternalNotificationMethod,
                            RepositoryNotification, RepositoryAuthorizedEmail, TeamMemberInvite,
                            DerivedImageStorage, ImageStorageTransformation, random_string_generator,
-                           db, BUILD_PHASE)
+                           db, BUILD_PHASE, QuayUserField)
 from peewee import JOIN_LEFT_OUTER, fn
 from util.validation import (validate_username, validate_email, validate_password,
                              INVALID_PASSWORD_MESSAGE)
@@ -288,6 +288,7 @@ def delete_robot(robot_username):
   try:
     robot = User.get(username=robot_username, robot=True)
     robot.delete_instance(recursive=True, delete_nullable=True)
+
   except User.DoesNotExist:
     raise InvalidRobotException('Could not find robot with username: %s' %
                                 robot_username)
@@ -632,7 +633,7 @@ def get_matching_users(username_prefix, robot_namespace=None,
 
   query = (User
            .select(User.username, User.robot)
-           .group_by(User.username)
+           .group_by(User.username, User.robot)
            .where(direct_user_query))
 
   if organization:
@@ -829,8 +830,10 @@ def _filter_to_repos_for_user(query, username=None, namespace=None,
   if namespace:
     where_clause = where_clause & (Namespace.username == namespace)
 
+  # TODO(jschorr, jake): Figure out why the old join on Visibility was so darn slow and
+  # remove this hack.
   if include_public:
-    new_clause = (Visibility.name == 'public')
+    new_clause = (Repository.visibility == _get_public_repo_visibility())
     if where_clause:
       where_clause = where_clause | new_clause
     else:
@@ -839,6 +842,16 @@ def _filter_to_repos_for_user(query, username=None, namespace=None,
   return query.where(where_clause)
 
 
+_public_repo_visibility_cache = None
+def _get_public_repo_visibility():
+  global _public_repo_visibility_cache
+
+  if not _public_repo_visibility_cache:
+    _public_repo_visibility_cache = Visibility.get(name='public')
+
+  return _public_repo_visibility_cache
+
+
 def get_matching_repositories(repo_term, username=None):
   namespace_term = repo_term
   name_term = repo_term
|
|||
return None
|
||||
|
||||
|
||||
def get_repo_image(namespace_name, repository_name, image_id):
|
||||
def get_repo_image(namespace_name, repository_name, docker_image_id):
|
||||
def limit_to_image_id(query):
|
||||
return query.where(Image.docker_image_id == image_id)
|
||||
return query.where(Image.docker_image_id == docker_image_id).limit(1)
|
||||
|
||||
query = _get_repository_images(namespace_name, repository_name, limit_to_image_id)
|
||||
try:
|
||||
return query.get()
|
||||
except Image.DoesNotExist:
|
||||
return None
|
||||
|
||||
|
||||
def get_repo_image_extended(namespace_name, repository_name, docker_image_id):
|
||||
def limit_to_image_id(query):
|
||||
return query.where(Image.docker_image_id == docker_image_id).limit(1)
|
||||
|
||||
images = _get_repository_images_base(namespace_name, repository_name, limit_to_image_id)
|
||||
if not images:
|
||||
return None
|
||||
else:
|
||||
return images[0]
|
||||
|
||||
return images[0]
|
||||
|
||||
def repository_is_public(namespace_name, repository_name):
|
||||
try:
|
||||
|
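The lookup is now split: get_repo_image returns just the Image row via the lightweight _get_repository_images query added later in this diff, while get_repo_image_extended keeps the old heavyweight behavior of joining in storage placements. A hedged usage sketch with hypothetical namespace and repository names:

    image = get_repo_image('devtable', 'simple', docker_image_id)           # Image row only
    full = get_repo_image_extended('devtable', 'simple', docker_image_id)   # includes storage locations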
@@ -1161,20 +1184,21 @@ def __translate_ancestry(old_ancestry, translations, repository, username, prefe
   if old_ancestry == '/':
     return '/'
 
-  def translate_id(old_id):
+  def translate_id(old_id, docker_image_id):
     logger.debug('Translating id: %s', old_id)
     if old_id not in translations:
-      # Figure out which docker_image_id the old id refers to, then find a
-      # a local one
-      old = Image.select(Image.docker_image_id).where(Image.id == old_id).get()
-      image_in_repo = find_create_or_link_image(old.docker_image_id, repository, username,
+      image_in_repo = find_create_or_link_image(docker_image_id, repository, username,
                                                 translations, preferred_location)
       translations[old_id] = image_in_repo.id
 
     return translations[old_id]
 
+  # Select all the ancestor Docker IDs in a single query.
   old_ids = [int(id_str) for id_str in old_ancestry.split('/')[1:-1]]
-  new_ids = [str(translate_id(old_id)) for old_id in old_ids]
+  query = Image.select(Image.id, Image.docker_image_id).where(Image.id << old_ids)
+  old_images = {i.id: i.docker_image_id for i in query}
+
+  # Translate the old images into new ones.
+  new_ids = [str(translate_id(old_id, old_images[old_id])) for old_id in old_ids]
   return '/%s/' % '/'.join(new_ids)
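The key change here is eliminating an N+1 query: translate_id previously issued one SELECT per ancestor, while the new code fetches every ancestor's docker_image_id in a single IN query and passes it down. The same pattern in isolation, with hypothetical ids:

    old_ids = [int(id_str) for id_str in '/1/2/6/'.split('/')[1:-1]]             # [1, 2, 6]
    query = Image.select(Image.id, Image.docker_image_id).where(Image.id << old_ids)
    old_images = {i.id: i.docker_image_id for i in query}                        # one round trip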
@@ -1186,36 +1210,22 @@ def _create_storage(location_name):
   return storage
 
 
-def find_create_or_link_image(docker_image_id, repository, username, translations,
-                              preferred_location):
+def _find_or_link_image(existing_image, repository, username, translations, preferred_location):
   # TODO(jake): This call is currently recursively done under a single transaction. Can we make
   # it instead be done under a set of transactions?
   with config.app_config['DB_TRANSACTION_FACTORY'](db):
     # Check for an existing image, under the transaction, to make sure it doesn't already exist.
     repo_image = get_repo_image(repository.namespace_user.username, repository.name,
-                                docker_image_id)
+                                existing_image.docker_image_id)
     if repo_image:
       return repo_image
 
-    query = (Image
-             .select(Image, ImageStorage)
-             .distinct()
-             .join(ImageStorage)
-             .switch(Image)
-             .join(Repository)
-             .join(Visibility)
-             .switch(Repository)
-             .join(RepositoryPermission, JOIN_LEFT_OUTER)
-             .switch(Repository)
-             .join(Namespace, on=(Repository.namespace_user == Namespace.id))
-             .where(ImageStorage.uploading == False))
-
-    query = (_filter_to_repos_for_user(query, username)
-             .where(Image.docker_image_id == docker_image_id))
-
-    new_image_ancestry = '/'
-    origin_image_id = None
+    # Make sure the existing base image still exists.
     try:
-      to_copy = query.get()
+      to_copy = Image.select().join(ImageStorage).where(Image.id == existing_image.id).get()
 
       msg = 'Linking image to existing storage with docker id: %s and uuid: %s'
-      logger.debug(msg, docker_image_id, to_copy.storage.uuid)
+      logger.debug(msg, existing_image.docker_image_id, to_copy.storage.uuid)
 
       new_image_ancestry = __translate_ancestry(to_copy.ancestors, translations, repository,
                                                 username, preferred_location)
|
|||
storage = to_copy.storage
|
||||
storage.locations = {placement.location.name
|
||||
for placement in storage.imagestorageplacement_set}
|
||||
origin_image_id = to_copy.id
|
||||
|
||||
new_image = Image.create(docker_image_id=existing_image.docker_image_id,
|
||||
repository=repository, storage=storage,
|
||||
ancestors=new_image_ancestry)
|
||||
|
||||
logger.debug('Storing translation %s -> %s', existing_image.id, new_image.id)
|
||||
translations[existing_image.id] = new_image.id
|
||||
return new_image
|
||||
except Image.DoesNotExist:
|
||||
logger.debug('Creating new storage for docker id: %s', docker_image_id)
|
||||
storage = _create_storage(preferred_location)
|
||||
|
||||
logger.debug('Storage locations: %s', storage.locations)
|
||||
|
||||
new_image = Image.create(docker_image_id=docker_image_id,
|
||||
repository=repository, storage=storage,
|
||||
ancestors=new_image_ancestry)
|
||||
|
||||
logger.debug('new_image storage locations: %s', new_image.storage.locations)
|
||||
return None
|
||||
|
||||
|
||||
if origin_image_id:
|
||||
logger.debug('Storing translation %s -> %s', origin_image_id, new_image.id)
|
||||
translations[origin_image_id] = new_image.id
|
||||
def find_create_or_link_image(docker_image_id, repository, username, translations,
|
||||
preferred_location):
|
||||
|
||||
return new_image
|
||||
# First check for the image existing in the repository. If found, we simply return it.
|
||||
repo_image = get_repo_image(repository.namespace_user.username, repository.name,
|
||||
docker_image_id)
|
||||
if repo_image:
|
||||
return repo_image
|
||||
|
||||
# We next check to see if there is an existing storage the new image can link to.
|
||||
existing_image_query = (Image
|
||||
.select(Image, ImageStorage)
|
||||
.distinct()
|
||||
.join(ImageStorage)
|
||||
.switch(Image)
|
||||
.join(Repository)
|
||||
.join(RepositoryPermission, JOIN_LEFT_OUTER)
|
||||
.switch(Repository)
|
||||
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
|
||||
.where(ImageStorage.uploading == False))
|
||||
|
||||
existing_image_query = (_filter_to_repos_for_user(existing_image_query, username)
|
||||
.where(Image.docker_image_id == docker_image_id))
|
||||
|
||||
# If there is an existing image, we try to translate its ancestry and copy its storage.
|
||||
new_image = None
|
||||
try:
|
||||
logger.debug('Looking up existing image for ID: %s', docker_image_id)
|
||||
existing_image = existing_image_query.get()
|
||||
|
||||
logger.debug('Existing image %s found for ID: %s', existing_image.id, docker_image_id)
|
||||
new_image = _find_or_link_image(existing_image, repository, username, translations,
|
||||
preferred_location)
|
||||
if new_image:
|
||||
return new_image
|
||||
except Image.DoesNotExist:
|
||||
logger.debug('No existing image found for ID: %s', docker_image_id)
|
||||
pass
|
||||
|
||||
# Otherwise, create a new storage directly.
|
||||
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
||||
# Final check for an existing image, under the transaction.
|
||||
repo_image = get_repo_image(repository.namespace_user.username, repository.name,
|
||||
docker_image_id)
|
||||
if repo_image:
|
||||
return repo_image
|
||||
|
||||
logger.debug('Creating new storage for docker id: %s', docker_image_id)
|
||||
storage = _create_storage(preferred_location)
|
||||
|
||||
return Image.create(docker_image_id=docker_image_id,
|
||||
repository=repository, storage=storage,
|
||||
ancestors='/')
|
||||
|
||||
|
||||
def find_or_create_derived_storage(source, transformation_name, preferred_location):
|
||||
|
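The refactor unbundles the old single-transaction function into three stages, with the expensive cross-repository lookup now running outside any transaction. In outline (a sketch of the flow, not the literal code above):

    def find_create_or_link_image_flow(docker_image_id, repository, username,
                                       translations, preferred_location):
      # 1. Image already in this repository? Return it (no transaction).
      # 2. Same docker_image_id visible to the user elsewhere? Try to link to
      #    its storage via _find_or_link_image, which re-checks and creates
      #    the Image row under its own transaction.
      # 3. Otherwise open a transaction, re-check, and create fresh storage
      #    with an empty ancestry ('/').
      ...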
@@ -1355,6 +1411,15 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created
     fetched.storage.save()
     return fetched
 
+def _get_repository_images(namespace_name, repository_name, query_modifier):
+  query = (Image
+           .select()
+           .join(Repository)
+           .join(Namespace, on=(Repository.namespace_user == Namespace.id))
+           .where(Repository.name == repository_name, Namespace.username == namespace_name))
+
+  query = query_modifier(query)
+  return query
 
 def _get_repository_images_base(namespace_name, repository_name, query_modifier):
   query = (ImageStoragePlacement
@@ -1391,6 +1456,20 @@ def _get_repository_images_base(namespace_name, repository_name, query_modifier)
   return images.values()
 
 
+def lookup_repository_images(namespace_name, repository_name, docker_image_ids):
+  return (Image
+          .select()
+          .join(Repository)
+          .join(Namespace, on=(Repository.namespace_user == Namespace.id))
+          .where(Repository.name == repository_name, Namespace.username == namespace_name,
+                 Image.docker_image_id << docker_image_ids))
+
+def get_matching_repository_images(namespace_name, repository_name, docker_image_ids):
+  def modify_query(q):
+    return q.where(Image.docker_image_id << docker_image_ids)
+
+  return _get_repository_images_base(namespace_name, repository_name, modify_query)
+
 def get_repository_images(namespace_name, repository_name):
   return _get_repository_images_base(namespace_name, repository_name, lambda q: q)
 
@@ -1406,7 +1485,12 @@ def list_repository_tags(namespace_name, repository_name):
 
 
 def garbage_collect_repository(namespace_name, repository_name):
+  storage_id_whitelist = {}
+
   with config.app_config['DB_TRANSACTION_FACTORY'](db):
+    # TODO (jake): We could probably select this and all the images in a single query using
+    # a different kind of join.
+
     # Get a list of all images used by tags in the repository
     tag_query = (RepositoryTag
                  .select(RepositoryTag, Image, ImageStorage)
|
|||
referenced_anscestors = referenced_anscestors.union(set(ancestor_list))
|
||||
referenced_anscestors.add(tag.image.id)
|
||||
|
||||
all_repo_images = get_repository_images(namespace_name, repository_name)
|
||||
all_repo_images = _get_repository_images(namespace_name, repository_name, lambda q: q)
|
||||
all_images = {int(img.id): img for img in all_repo_images}
|
||||
to_remove = set(all_images.keys()).difference(referenced_anscestors)
|
||||
|
||||
if len(to_remove) > 0:
|
||||
logger.info('Cleaning up unreferenced images: %s', to_remove)
|
||||
storage_id_whitelist = {all_images[to_remove_id].storage.id for to_remove_id in to_remove}
|
||||
|
||||
Image.delete().where(Image.id << list(to_remove)).execute()
|
||||
|
||||
garbage_collect_storage(storage_id_whitelist)
|
||||
if len(to_remove) > 0:
|
||||
logger.info('Garbage collecting storage for images: %s', to_remove)
|
||||
garbage_collect_storage(storage_id_whitelist)
|
||||
|
||||
return len(to_remove)
|
||||
|
||||
|
||||
def garbage_collect_storage(storage_id_whitelist):
|
||||
# We are going to make the conscious decision to not delete image storage inside the transaction
|
||||
# This may end up producing garbage in s3, trading off for higher availability in the database
|
||||
if len(storage_id_whitelist) == 0:
|
||||
return
|
||||
|
||||
def placements_query_to_paths_set(placements_query):
|
||||
return {(placement.location.name, config.store.image_path(placement.storage.uuid))
|
||||
for placement in placements_query}
|
||||
|
||||
def orphaned_storage_query(select_base_query, candidates):
|
||||
def orphaned_storage_query(select_base_query, candidates, group_by):
|
||||
return (select_base_query
|
||||
.switch(ImageStorage)
|
||||
.join(Image, JOIN_LEFT_OUTER)
|
||||
|
@ -1455,14 +1541,19 @@ def garbage_collect_storage(storage_id_whitelist):
|
|||
.join(DerivedImageStorage, JOIN_LEFT_OUTER,
|
||||
on=(ImageStorage.id == DerivedImageStorage.derivative))
|
||||
.where(ImageStorage.id << list(candidates))
|
||||
.group_by(ImageStorage)
|
||||
.group_by(*group_by)
|
||||
.having((fn.Count(Image.id) == 0) & (fn.Count(DerivedImageStorage.id) == 0)))
|
||||
|
||||
logger.debug('Garbage collecting storage from candidates: %s', storage_id_whitelist)
|
||||
# Note: We remove the derived image storage in its own transaction as a way to reduce the
|
||||
# time that the transaction holds on the database indicies. This could result in a derived
|
||||
# image storage being deleted for an image storage which is later reused during this time,
|
||||
# but since these are caches anyway, it isn't terrible and worth the tradeoff (for now).
|
||||
logger.debug('Garbage collecting derived storage from candidates: %s', storage_id_whitelist)
|
||||
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
||||
# Find out which derived storages will be removed, and add them to the whitelist
|
||||
orphaned_from_candidates = list(orphaned_storage_query(ImageStorage.select(ImageStorage.id),
|
||||
storage_id_whitelist))
|
||||
storage_id_whitelist,
|
||||
(ImageStorage.id,)))
|
||||
|
||||
if len(orphaned_from_candidates) > 0:
|
||||
derived_to_remove = (ImageStorage
|
||||
|
@ -1478,6 +1569,12 @@ def garbage_collect_storage(storage_id_whitelist):
|
|||
.where(DerivedImageStorage.source << orphaned_from_candidates)
|
||||
.execute())
|
||||
|
||||
# Note: Both of these deletes must occur in the same transaction (unfortunately) because a
|
||||
# storage without any placement is invalid, and a placement cannot exist without a storage.
|
||||
# TODO(jake): We might want to allow for null storages on placements, which would allow us to
|
||||
# delete the storages, then delete the placements in a non-transaction.
|
||||
logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
|
||||
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
||||
# Track all of the data that should be removed from blob storage
|
||||
placements_to_remove = orphaned_storage_query(ImageStoragePlacement
|
||||
.select(ImageStoragePlacement,
|
||||
|
@ -1486,7 +1583,10 @@ def garbage_collect_storage(storage_id_whitelist):
|
|||
.join(ImageStorageLocation)
|
||||
.switch(ImageStoragePlacement)
|
||||
.join(ImageStorage),
|
||||
storage_id_whitelist)
|
||||
storage_id_whitelist,
|
||||
(ImageStorage, ImageStoragePlacement,
|
||||
ImageStorageLocation))
|
||||
|
||||
paths_to_remove = placements_query_to_paths_set(placements_to_remove.clone())
|
||||
|
||||
# Remove the placements for orphaned storages
|
||||
|
@ -1499,14 +1599,17 @@ def garbage_collect_storage(storage_id_whitelist):
|
|||
|
||||
# Remove the all orphaned storages
|
||||
orphaned_storages = list(orphaned_storage_query(ImageStorage.select(ImageStorage.id),
|
||||
storage_id_whitelist))
|
||||
storage_id_whitelist,
|
||||
(ImageStorage.id,)))
|
||||
if len(orphaned_storages) > 0:
|
||||
(ImageStorage
|
||||
.delete()
|
||||
.where(ImageStorage.id << orphaned_storages)
|
||||
.execute())
|
||||
|
||||
# Delete the actual blob storage
|
||||
# We are going to make the conscious decision to not delete image storage blobs inside
|
||||
# transactions.
|
||||
# This may end up producing garbage in s3, trading off for higher availability in the database.
|
||||
for location_name, image_path in paths_to_remove:
|
||||
logger.debug('Removing %s from %s', image_path, location_name)
|
||||
config.store.remove({location_name}, image_path)
|
||||
|
@ -1527,7 +1630,7 @@ def get_tag_image(namespace_name, repository_name, tag_name):
|
|||
|
||||
|
||||
def get_image_by_id(namespace_name, repository_name, docker_image_id):
|
||||
image = get_repo_image(namespace_name, repository_name, docker_image_id)
|
||||
image = get_repo_image_extended(namespace_name, repository_name, docker_image_id)
|
||||
if not image:
|
||||
raise DataModelException('Unable to find image \'%s\' for repo \'%s/%s\'' %
|
||||
(docker_image_id, namespace_name, repository_name))
|
||||
|
@ -1714,7 +1817,7 @@ def purge_repository(namespace_name, repository_name):
|
|||
|
||||
# Delete the rest of the repository metadata
|
||||
fetched = _get_repository(namespace_name, repository_name)
|
||||
fetched.delete_instance(recursive=True)
|
||||
fetched.delete_instance(recursive=True, delete_nullable=True)
|
||||
|
||||
|
||||
def get_private_repo_count(username):
|
||||
|
@ -1758,11 +1861,10 @@ def get_repository_delegate_tokens(namespace_name, repository_name):
|
|||
|
||||
def get_repo_delegate_token(namespace_name, repository_name, code):
|
||||
repo_query = get_repository_delegate_tokens(namespace_name, repository_name)
|
||||
found = list(repo_query.where(AccessToken.code == code))
|
||||
|
||||
if found:
|
||||
return found[0]
|
||||
else:
|
||||
try:
|
||||
return repo_query.where(AccessToken.code == code).get()
|
||||
except AccessToken.DoesNotExist:
|
||||
raise InvalidTokenException('Unable to find token with code: %s' % code)
|
||||
|
||||
|
||||
|
@ -1937,9 +2039,9 @@ def list_logs(start_time, end_time, performer=None, repository=None, namespace=N
|
|||
if namespace:
|
||||
joined = joined.where(User.username == namespace)
|
||||
|
||||
return joined.where(
|
||||
return list(joined.where(
|
||||
LogEntry.datetime >= start_time,
|
||||
LogEntry.datetime < end_time).order_by(LogEntry.datetime.desc())
|
||||
LogEntry.datetime < end_time).order_by(LogEntry.datetime.desc()))
|
||||
|
||||
|
||||
def log_action(kind_name, user_or_organization_name, performer=None,
|
||||
|
@ -1951,7 +2053,7 @@ def log_action(kind_name, user_or_organization_name, performer=None,
|
|||
kind = LogEntryKind.get(LogEntryKind.name == kind_name)
|
||||
account = User.get(User.username == user_or_organization_name)
|
||||
LogEntry.create(kind=kind, account=account, performer=performer,
|
||||
repository=repository, access_token=access_token, ip=ip,
|
||||
repository=repository, ip=ip,
|
||||
metadata_json=json.dumps(metadata), datetime=timestamp)
|
||||
|
||||
|
||||
|
@ -2239,6 +2341,18 @@ def confirm_team_invite(code, user):
|
|||
found.delete_instance()
|
||||
return (team, inviter)
|
||||
|
||||
|
||||
def get_repository_usage():
|
||||
one_month_ago = date.today() - timedelta(weeks=4)
|
||||
repo_pull = LogEntryKind.get(name = 'pull_repo')
|
||||
repo_verb = LogEntryKind.get(name = 'repo_verb')
|
||||
return (LogEntry.select(LogEntry.ip, LogEntry.repository)
|
||||
.where((LogEntry.kind == repo_pull) | (LogEntry.kind == repo_verb))
|
||||
.where(~(LogEntry.repository >> None))
|
||||
.where(LogEntry.datetime >= one_month_ago)
|
||||
.group_by(LogEntry.ip, LogEntry.repository)
|
||||
.count())
|
||||
|
||||
def archivable_buildlogs_query():
|
||||
presumed_dead_date = datetime.utcnow() - PRESUMED_DEAD_BUILD_AGE
|
||||
return (RepositoryBuild.select()
|
||||
|
|
|
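get_repository_usage counts distinct (ip, repository) pull and verb log entries over the trailing four weeks, which is why the repo_verb LogEntryKind migration appears earlier in this diff. A hypothetical reporting hook:

    usage = get_repository_usage()
    logger.debug('Distinct (ip, repository) pulls over the last month: %s', usage)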
@@ -42,10 +42,10 @@ class WorkQueue(object):
     return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
 
   def update_metrics(self):
-    if self._reporter is None:
-      return
-
     with self._transaction_factory(db):
+      if self._reporter is None:
+        return
+
       now = datetime.utcnow()
       name_match_query = self._name_match_query()
 
@@ -55,7 +55,7 @@ class WorkQueue(object):
       avialable_query = self._available_jobs(now, name_match_query, running_query)
       available_count = avialable_query.select(QueueItem.queue_name).distinct().count()
 
-    self._reporter(self._currently_processing, running_count, running_count + available_count)
+      self._reporter(self._currently_processing, running_count, running_count + available_count)
 
   def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
     """