Merge branch 'master' into star

Jimmy Zelinskie 2015-02-18 17:36:58 -05:00
commit 917dd6b674
229 changed files with 10807 additions and 3003 deletions

View file

@@ -1,14 +1,15 @@
import string
import logging
import uuid
import time
from random import SystemRandom
from datetime import datetime
from peewee import (Proxy, MySQLDatabase, SqliteDatabase, PostgresqlDatabase, fn, CharField,
BooleanField, IntegerField, DateTimeField, ForeignKeyField, TextField,
BigIntegerField)
from peewee import *
from data.read_slave import ReadSlaveModel
from sqlalchemy.engine.url import make_url
from data.read_slave import ReadSlaveModel
from util.names import urn_generator
@@ -31,6 +32,16 @@ SCHEME_RANDOM_FUNCTION = {
'postgresql+psycopg2': fn.Random,
}
def real_for_update(query):
return query.for_update()
def null_for_update(query):
return query
SCHEME_SPECIALIZED_FOR_UPDATE = {
'sqlite': null_for_update,
}
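Note: with this dispatch table, callers never branch on the database driver themselves. A minimal sketch of the intended behavior (the resolve_for_update helper is hypothetical, written here only for illustration):

# SQLite has no row-level locking, so its queries pass through untouched;
# every other driver gets SELECT ... FOR UPDATE semantics.
def resolve_for_update(query, drivername):
    handler = SCHEME_SPECIALIZED_FOR_UPDATE.get(drivername, real_for_update)
    return handler(query)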
class CallableProxy(Proxy):
def __call__(self, *args, **kwargs):
if self.obj is None:
@@ -70,6 +81,15 @@ class UseThenDisconnect(object):
db = Proxy()
read_slave = Proxy()
db_random_func = CallableProxy()
db_for_update = CallableProxy()
def validate_database_url(url, connect_timeout=5):
driver = _db_from_url(url, {
'connect_timeout': connect_timeout
})
driver.connect()
driver.close()
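A hedged usage sketch (the URI below is hypothetical): validate_database_url exists so callers can fail fast instead of hanging on a dead host.

# Raises a driver error (e.g. OperationalError) within the timeout if the
# database is unreachable, rather than blocking indefinitely.
validate_database_url('mysql+pymysql://quay:password@db.example.com/quay',
                      connect_timeout=3)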
def _db_from_url(url, db_kwargs):
@@ -84,6 +104,10 @@ def _db_from_url(url, db_kwargs):
if parsed_url.password:
db_kwargs['password'] = parsed_url.password
# Note: sqlite does not support connect_timeout.
if parsed_url.drivername == 'sqlite' and 'connect_timeout' in db_kwargs:
del db_kwargs['connect_timeout']
return SCHEME_DRIVERS[parsed_url.drivername](parsed_url.database, **db_kwargs)
@@ -95,6 +119,8 @@ def configure(config_object):
parsed_write_uri = make_url(write_db_uri)
db_random_func.initialize(SCHEME_RANDOM_FUNCTION[parsed_write_uri.drivername])
db_for_update.initialize(SCHEME_SPECIALIZED_FOR_UPDATE.get(parsed_write_uri.drivername,
real_for_update))
read_slave_uri = config_object.get('DB_READ_SLAVE_URI', None)
if read_slave_uri is not None:
@@ -113,6 +139,9 @@ def uuid_generator():
return str(uuid.uuid4())
_get_epoch_timestamp = lambda: int(time.time())
def close_db_filter(_):
if not db.is_closed():
logger.debug('Disconnecting from database.')
@@ -124,8 +153,9 @@ def close_db_filter(_):
class QuayUserField(ForeignKeyField):
def __init__(self, allows_robots=False, *args, **kwargs):
def __init__(self, allows_robots=False, robot_null_delete=False, *args, **kwargs):
self.allows_robots = allows_robots
self.robot_null_delete = robot_null_delete
if not 'rel_model' in kwargs:
kwargs['rel_model'] = User
@@ -151,6 +181,7 @@ class User(BaseModel):
invoice_email = BooleanField(default=False)
invalid_login_attempts = IntegerField(default=0)
last_invalid_login = DateTimeField(default=datetime.utcnow)
removed_tag_expiration_s = IntegerField(default=1209600) # Two weeks
def delete_instance(self, recursive=False, delete_nullable=False):
# If we are deleting a robot account, only execute the subset of queries necessary.
@@ -159,7 +190,11 @@ class User(BaseModel):
for query, fk in self.dependencies(search_nullable=True):
if isinstance(fk, QuayUserField) and fk.allows_robots:
model = fk.model_class
model.delete().where(query).execute()
if fk.robot_null_delete:
model.update(**{fk.name: None}).where(query).execute()
else:
model.delete().where(query).execute()
# Delete the instance itself.
super(User, self).delete_instance(recursive=False, delete_nullable=False)
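For context, a sketch of the two behaviors the new robot_null_delete flag selects between (both field declarations are hypothetical; LogEntry.performer opts in further down this diff):

# When a robot account is deleted, rows referencing it through a
# QuayUserField are either removed outright (the old behavior) or kept
# with the reference nulled out, preserving history such as log entries.
owner = QuayUserField(allows_robots=True)                              # row deleted
performer = QuayUserField(allows_robots=True, robot_null_delete=True)  # set to NULL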
@@ -319,6 +354,10 @@ class PermissionPrototype(BaseModel):
)
class AccessTokenKind(BaseModel):
name = CharField(unique=True, index=True)
class AccessToken(BaseModel):
friendly_name = CharField(null=True)
code = CharField(default=random_string_generator(length=64), unique=True,
@@ -327,6 +366,7 @@ class AccessToken(BaseModel):
created = DateTimeField(default=datetime.now)
role = ForeignKeyField(Role)
temporary = BooleanField(default=True)
kind = ForeignKeyField(AccessTokenKind, null=True)
class BuildTriggerService(BaseModel):
@@ -368,6 +408,24 @@ class ImageStorageTransformation(BaseModel):
name = CharField(index=True, unique=True)
class ImageStorageSignatureKind(BaseModel):
name = CharField(index=True, unique=True)
class ImageStorageSignature(BaseModel):
storage = ForeignKeyField(ImageStorage, index=True)
kind = ForeignKeyField(ImageStorageSignatureKind)
signature = TextField(null=True)
uploading = BooleanField(default=True, null=True)
class Meta:
database = db
read_slaves = (read_slave,)
indexes = (
(('kind', 'storage'), True),
)
class DerivedImageStorage(BaseModel):
source = ForeignKeyField(ImageStorage, null=True, related_name='source')
derivative = ForeignKeyField(ImageStorage, related_name='derivative')
@@ -424,12 +482,15 @@ class RepositoryTag(BaseModel):
name = CharField()
image = ForeignKeyField(Image)
repository = ForeignKeyField(Repository)
lifetime_start_ts = IntegerField(default=_get_epoch_timestamp)
lifetime_end_ts = IntegerField(null=True, index=True)
hidden = BooleanField(default=False)
class Meta:
database = db
read_slaves = (read_slave,)
indexes = (
(('repository', 'name'), True),
(('repository', 'name'), False),
)
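The (repository, name) index loses its uniqueness because, once tags carry lifetimes, many historical rows can share a name; at most one should be alive at a time. A minimal sketch of the liveness rule (mirroring the _tag_alive helper added later in this diff):

import time

def is_alive(tag):
    # A tag is alive while its lifetime has not been ended yet.
    return tag.lifetime_end_ts is None or tag.lifetime_end_ts > int(time.time())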
@@ -441,23 +502,10 @@ class BUILD_PHASE(object):
PULLING = 'pulling'
BUILDING = 'building'
PUSHING = 'pushing'
WAITING = 'waiting'
COMPLETE = 'complete'
class RepositoryBuild(BaseModel):
uuid = CharField(default=uuid_generator, index=True)
repository = ForeignKeyField(Repository, index=True)
access_token = ForeignKeyField(AccessToken)
resource_key = CharField(index=True)
job_config = TextField()
phase = CharField(default='waiting')
started = DateTimeField(default=datetime.now)
display_name = CharField()
trigger = ForeignKeyField(RepositoryBuildTrigger, null=True, index=True)
pull_robot = QuayUserField(null=True, related_name='buildpullrobot')
logs_archived = BooleanField(default=False)
class QueueItem(BaseModel):
queue_name = CharField(index=True, max_length=1024)
body = TextField()
@@ -467,6 +515,21 @@ class QueueItem(BaseModel):
retries_remaining = IntegerField(default=5)
class RepositoryBuild(BaseModel):
uuid = CharField(default=uuid_generator, index=True)
repository = ForeignKeyField(Repository, index=True)
access_token = ForeignKeyField(AccessToken)
resource_key = CharField(index=True)
job_config = TextField()
phase = CharField(default=BUILD_PHASE.WAITING)
started = DateTimeField(default=datetime.now)
display_name = CharField()
trigger = ForeignKeyField(RepositoryBuildTrigger, null=True, index=True)
pull_robot = QuayUserField(null=True, related_name='buildpullrobot')
logs_archived = BooleanField(default=False)
queue_item = ForeignKeyField(QueueItem, null=True, index=True)
class LogEntryKind(BaseModel):
name = CharField(index=True, unique=True)
@@ -475,7 +538,7 @@ class LogEntry(BaseModel):
kind = ForeignKeyField(LogEntryKind, index=True)
account = QuayUserField(index=True, related_name='account')
performer = QuayUserField(allows_robots=True, index=True, null=True,
related_name='performer')
related_name='performer', robot_null_delete=True)
repository = ForeignKeyField(Repository, index=True, null=True)
datetime = DateTimeField(default=datetime.now, index=True)
ip = CharField(null=True)
@@ -566,4 +629,5 @@ all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission,
Notification, ImageStorageLocation, ImageStoragePlacement,
ExternalNotificationEvent, ExternalNotificationMethod, RepositoryNotification,
RepositoryAuthorizedEmail, ImageStorageTransformation, DerivedImageStorage,
TeamMemberInvite, Star]
TeamMemberInvite, ImageStorageSignature, ImageStorageSignatureKind,
AccessTokenKind, Star]

View file

@@ -18,7 +18,8 @@ config.set_main_option('sqlalchemy.url', unquote(app.config['DB_URI']))
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
if config.config_file_name:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support

View file

@@ -2,13 +2,14 @@ set -e
DOCKER_IP=`echo $DOCKER_HOST | sed 's/tcp:\/\///' | sed 's/:.*//'`
MYSQL_CONFIG_OVERRIDE="{\"DB_URI\":\"mysql+pymysql://root:password@$DOCKER_IP/genschema\"}"
PERCONA_CONFIG_OVERRIDE="{\"DB_URI\":\"mysql+pymysql://root@$DOCKER_IP/genschema\"}"
PGSQL_CONFIG_OVERRIDE="{\"DB_URI\":\"postgresql://postgres@$DOCKER_IP/genschema\"}"
up_mysql() {
# Run a SQL database on port 3306 inside of Docker.
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=password -d mysql
# Sleep for 5s to let MySQL get started.
# Sleep for 10s to let MySQL get started.
echo 'Sleeping for 10...'
sleep 10
@@ -21,6 +22,40 @@ down_mysql() {
docker rm mysql
}
up_mariadb() {
# Run a SQL database on port 3306 inside of Docker.
docker run --name mariadb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=password -d mariadb
# Sleep for 10s to let MariaDB get started.
echo 'Sleeping for 10...'
sleep 10
# Add the database to mysql.
docker run --rm --link mariadb:mariadb mariadb sh -c 'echo "create database genschema" | mysql -h"$MARIADB_PORT_3306_TCP_ADDR" -P"$MARIADB_PORT_3306_TCP_PORT" -uroot -ppassword'
}
down_mariadb() {
docker kill mariadb
docker rm mariadb
}
up_percona() {
# Run a SQL database on port 3306 inside of Docker.
docker run --name percona -p 3306:3306 -d dockerfile/percona
# Sleep for 10s
echo 'Sleeping for 10...'
sleep 10
# Add the database to mysql.
docker run --rm --link percona:percona dockerfile/percona sh -c 'echo "create database genschema" | mysql -h $PERCONA_PORT_3306_TCP_ADDR'
}
down_percona() {
docker kill percona
docker rm percona
}
up_postgres() {
# Run a SQL database on port 5432 inside of Docker.
docker run --name postgres -p 5432:5432 -d postgres
@@ -73,6 +108,26 @@ test_migrate $MYSQL_CONFIG_OVERRIDE
set -e
down_mysql
# Test via MariaDB.
echo '> Starting MariaDB'
up_mariadb
echo '> Testing Migration (mariadb)'
set +e
test_migrate $MYSQL_CONFIG_OVERRIDE
set -e
down_mariadb
# Test via Percona.
echo '> Starting Percona'
up_percona
echo '> Testing Migration (percona)'
set +e
test_migrate $PERCONA_CONFIG_OVERRIDE
set -e
down_percona
# Test via Postgres.
echo '> Starting Postgres'
up_postgres

View file

@@ -0,0 +1,30 @@
"""Add build queue item reference to the repositorybuild table
Revision ID: 14fe12ade3df
Revises: 5ad999136045
Create Date: 2015-02-12 16:11:57.814645
"""
# revision identifiers, used by Alembic.
revision = '14fe12ade3df'
down_revision = '5ad999136045'
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
def upgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.add_column('repositorybuild', sa.Column('queue_item_id', sa.Integer(), nullable=True))
op.create_index('repositorybuild_queue_item_id', 'repositorybuild', ['queue_item_id'], unique=False)
op.create_foreign_key(op.f('fk_repositorybuild_queue_item_id_queueitem'), 'repositorybuild', 'queueitem', ['queue_item_id'], ['id'])
### end Alembic commands ###
def downgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(op.f('fk_repositorybuild_queue_item_id_queueitem'), 'repositorybuild', type_='foreignkey')
op.drop_index('repositorybuild_queue_item_id', table_name='repositorybuild')
op.drop_column('repositorybuild', 'queue_item_id')
### end Alembic commands ###

View file

@@ -0,0 +1,37 @@
"""Actually remove the column access_token_id
Revision ID: 1d2d86d09fcd
Revises: 14fe12ade3df
Create Date: 2015-02-12 16:27:30.260797
"""
# revision identifiers, used by Alembic.
revision = '1d2d86d09fcd'
down_revision = '14fe12ade3df'
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
from sqlalchemy.exc import InternalError
def upgrade(tables):
### commands auto generated by Alembic - please adjust! ###
try:
op.drop_constraint(u'fk_logentry_access_token_id_accesstoken', 'logentry', type_='foreignkey')
op.drop_index('logentry_access_token_id', table_name='logentry')
op.drop_column('logentry', 'access_token_id')
except InternalError:
pass
### end Alembic commands ###
def downgrade(tables):
### commands auto generated by Alembic - please adjust! ###
try:
op.add_column('logentry', sa.Column('access_token_id', mysql.INTEGER(display_width=11), autoincrement=False, nullable=True))
op.create_foreign_key(u'fk_logentry_access_token_id_accesstoken', 'logentry', 'accesstoken', ['access_token_id'], ['id'])
op.create_index('logentry_access_token_id', 'logentry', ['access_token_id'], unique=False)
except InternalError:
pass
### end Alembic commands ###

View file

@@ -0,0 +1,25 @@
"""mysql max index lengths
Revision ID: 228d1af6af1c
Revises: 5b84373e5db
Create Date: 2015-01-06 14:35:24.651424
"""
# revision identifiers, used by Alembic.
revision = '228d1af6af1c'
down_revision = '5b84373e5db'
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
def upgrade(tables):
op.drop_index('queueitem_queue_name', table_name='queueitem')
op.create_index('queueitem_queue_name', 'queueitem', ['queue_name'], unique=False, mysql_length=767)
op.drop_index('image_ancestors', table_name='image')
op.create_index('image_ancestors', 'image', ['ancestors'], unique=False, mysql_length=767)
def downgrade(tables):
pass

View file

@@ -0,0 +1,44 @@
"""Add access token kinds type
Revision ID: 3e2d38b52a75
Revises: 1d2d86d09fcd
Create Date: 2015-02-17 12:03:26.422485
"""
# revision identifiers, used by Alembic.
revision = '3e2d38b52a75'
down_revision = '1d2d86d09fcd'
from alembic import op
import sqlalchemy as sa
def upgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.create_table('accesstokenkind',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.PrimaryKeyConstraint('id', name=op.f('pk_accesstokenkind'))
)
op.create_index('accesstokenkind_name', 'accesstokenkind', ['name'], unique=True)
op.add_column(u'accesstoken', sa.Column('kind_id', sa.Integer(), nullable=True))
op.create_index('accesstoken_kind_id', 'accesstoken', ['kind_id'], unique=False)
op.create_foreign_key(op.f('fk_accesstoken_kind_id_accesstokenkind'), 'accesstoken', 'accesstokenkind', ['kind_id'], ['id'])
### end Alembic commands ###
op.bulk_insert(tables.accesstokenkind,
[
{'id': 1, 'name':'build-worker'},
{'id': 2, 'name':'pushpull-token'},
])
def downgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(op.f('fk_accesstoken_kind_id_accesstokenkind'), 'accesstoken', type_='foreignkey')
op.drop_index('accesstoken_kind_id', table_name='accesstoken')
op.drop_column(u'accesstoken', 'kind_id')
op.drop_index('accesstokenkind_name', table_name='accesstokenkind')
op.drop_table('accesstokenkind')
### end Alembic commands ###

View file

@@ -0,0 +1,26 @@
"""Allow tags to be marked as hidden.
Revision ID: 4ef04c61fcf9
Revises: 509d2857566f
Create Date: 2015-02-18 16:34:16.586129
"""
# revision identifiers, used by Alembic.
revision = '4ef04c61fcf9'
down_revision = '509d2857566f'
from alembic import op
import sqlalchemy as sa
def upgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.add_column('repositorytag', sa.Column('hidden', sa.Boolean(), nullable=False, server_default=sa.sql.expression.false()))
### end Alembic commands ###
def downgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.drop_column('repositorytag', 'hidden')
### end Alembic commands ###

View file

@@ -0,0 +1,36 @@
"""Track the lifetime start and end for tags to allow the state of a repository to be rewound.
Revision ID: 509d2857566f
Revises: 3e2d38b52a75
Create Date: 2015-02-13 14:35:38.939049
"""
# revision identifiers, used by Alembic.
revision = '509d2857566f'
down_revision = '3e2d38b52a75'
from alembic import op
import sqlalchemy as sa
def upgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.add_column('repositorytag', sa.Column('lifetime_end_ts', sa.Integer(), nullable=True))
op.add_column('repositorytag', sa.Column('lifetime_start_ts', sa.Integer(), nullable=False, server_default="0"))
op.create_index('repositorytag_lifetime_end_ts', 'repositorytag', ['lifetime_end_ts'], unique=False)
op.drop_index('repositorytag_repository_id_name', table_name='repositorytag')
op.create_index('repositorytag_repository_id_name', 'repositorytag', ['repository_id', 'name'], unique=False)
op.add_column('user', sa.Column('removed_tag_expiration_s', sa.Integer(), nullable=False, server_default="1209600"))
### end Alembic commands ###
def downgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.drop_column('user', 'removed_tag_expiration_s')
op.drop_index('repositorytag_repository_id_name', table_name='repositorytag')
op.create_index('repositorytag_repository_id_name', 'repositorytag', ['repository_id', 'name'], unique=True)
op.drop_index('repositorytag_lifetime_end_ts', table_name='repositorytag')
op.drop_column('repositorytag', 'lifetime_start_ts')
op.drop_column('repositorytag', 'lifetime_end_ts')
### end Alembic commands ###

View file

@@ -53,7 +53,7 @@ def upgrade(tables):
op.create_index('queueitem_available', 'queueitem', ['available'], unique=False)
op.create_index('queueitem_available_after', 'queueitem', ['available_after'], unique=False)
op.create_index('queueitem_processing_expires', 'queueitem', ['processing_expires'], unique=False)
op.create_index('queueitem_queue_name', 'queueitem', ['queue_name'], unique=False)
op.create_index('queueitem_queue_name', 'queueitem', ['queue_name'], unique=False, mysql_length=767)
op.create_table('role',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
@@ -376,7 +376,7 @@ def upgrade(tables):
sa.ForeignKeyConstraint(['storage_id'], ['imagestorage.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index('image_ancestors', 'image', ['ancestors'], unique=False)
op.create_index('image_ancestors', 'image', ['ancestors'], unique=False, mysql_length=767)
op.create_index('image_repository_id', 'image', ['repository_id'], unique=False)
op.create_index('image_repository_id_docker_image_id', 'image', ['repository_id', 'docker_image_id'], unique=True)
op.create_index('image_storage_id', 'image', ['storage_id'], unique=False)

View file

@@ -0,0 +1,55 @@
"""Add signature storage
Revision ID: 5ad999136045
Revises: 228d1af6af1c
Create Date: 2015-02-05 15:01:54.989573
"""
# revision identifiers, used by Alembic.
revision = '5ad999136045'
down_revision = '228d1af6af1c'
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
def upgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.create_table('imagestoragesignaturekind',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.PrimaryKeyConstraint('id', name=op.f('pk_imagestoragesignaturekind'))
)
op.create_index('imagestoragesignaturekind_name', 'imagestoragesignaturekind', ['name'], unique=True)
op.create_table('imagestoragesignature',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('storage_id', sa.Integer(), nullable=False),
sa.Column('kind_id', sa.Integer(), nullable=False),
sa.Column('signature', sa.Text(), nullable=True),
sa.Column('uploading', sa.Boolean(), nullable=True),
sa.ForeignKeyConstraint(['kind_id'], ['imagestoragesignaturekind.id'], name=op.f('fk_imagestoragesignature_kind_id_imagestoragesignaturekind')),
sa.ForeignKeyConstraint(['storage_id'], ['imagestorage.id'], name=op.f('fk_imagestoragesignature_storage_id_imagestorage')),
sa.PrimaryKeyConstraint('id', name=op.f('pk_imagestoragesignature'))
)
op.create_index('imagestoragesignature_kind_id', 'imagestoragesignature', ['kind_id'], unique=False)
op.create_index('imagestoragesignature_kind_id_storage_id', 'imagestoragesignature', ['kind_id', 'storage_id'], unique=True)
op.create_index('imagestoragesignature_storage_id', 'imagestoragesignature', ['storage_id'], unique=False)
### end Alembic commands ###
op.bulk_insert(tables.imagestoragetransformation,
[
{'id': 2, 'name':'aci'},
])
op.bulk_insert(tables.imagestoragesignaturekind,
[
{'id': 1, 'name':'gpg2'},
])
def downgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.drop_table('imagestoragesignature')
op.drop_table('imagestoragesignaturekind')
### end Alembic commands ###

View file

@@ -0,0 +1,24 @@
"""Convert slack webhook data
Revision ID: 5b84373e5db
Revises: 1c5b738283a5
Create Date: 2014-12-16 12:02:55.167744
"""
# revision identifiers, used by Alembic.
revision = '5b84373e5db'
down_revision = '1c5b738283a5'
from alembic import op
import sqlalchemy as sa
from util.migrateslackwebhook import run_slackwebhook_migration
def upgrade(tables):
run_slackwebhook_migration()
def downgrade(tables):
pass

View file

@@ -2,8 +2,10 @@ import bcrypt
import logging
import dateutil.parser
import json
import time
from datetime import datetime, timedelta, date
from uuid import uuid4
from data.database import (User, Repository, Image, AccessToken, Role, RepositoryPermission,
Visibility, RepositoryTag, EmailConfirmation, FederatedLogin,
@@ -14,7 +16,9 @@ from data.database import (User, Repository, Image, AccessToken, Role, Repositor
ExternalNotificationEvent, ExternalNotificationMethod,
RepositoryNotification, RepositoryAuthorizedEmail, TeamMemberInvite,
DerivedImageStorage, ImageStorageTransformation, random_string_generator,
db, BUILD_PHASE, QuayUserField, Star)
db, BUILD_PHASE, QuayUserField, ImageStorageSignature, QueueItem,
ImageStorageSignatureKind, validate_database_url, db_for_update,
AccessTokenKind, Star)
from peewee import JOIN_LEFT_OUTER, fn
from util.validation import (validate_username, validate_email, validate_password,
INVALID_PASSWORD_MESSAGE)
@@ -105,12 +109,15 @@ class TooManyLoginAttemptsException(Exception):
self.retry_after = retry_after
def _get_repository(namespace_name, repository_name):
return (Repository
.select(Repository, Namespace)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(Namespace.username == namespace_name, Repository.name == repository_name)
.get())
def _get_repository(namespace_name, repository_name, for_update=False):
query = (Repository
.select(Repository, Namespace)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(Namespace.username == namespace_name, Repository.name == repository_name))
if for_update:
query = db_for_update(query)
return query.get()
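A hedged example of the new flag (namespace and repository names hypothetical); the locked variant only has an effect inside a surrounding transaction, and is a no-op on SQLite:

repo = _get_repository('devtable', 'simple')                      # plain read
locked = _get_repository('devtable', 'simple', for_update=True)   # row lock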
def hash_password(password, salt=None):
@@ -164,8 +171,7 @@ def _create_user(username, email):
pass
try:
new_user = User.create(username=username, email=email)
return new_user
return User.create(username=username, email=email)
except Exception as ex:
raise DataModelException(ex.message)
@@ -295,6 +301,9 @@ def delete_robot(robot_username):
def _list_entity_robots(entity_name):
""" Return the list of robots for the specified entity. This MUST return a query, not a
materialized list so that callers can use db_for_update.
"""
return (User
.select()
.join(FederatedLogin)
@@ -901,14 +910,17 @@ def change_password(user, new_password):
delete_notifications_by_kind(user, 'password_required')
def change_username(user, new_username):
def change_username(user_id, new_username):
(username_valid, username_issue) = validate_username(new_username)
if not username_valid:
raise InvalidUsernameException('Invalid username %s: %s' % (new_username, username_issue))
with config.app_config['DB_TRANSACTION_FACTORY'](db):
# Reload the user for update
user = db_for_update(User.select().where(User.id == user_id)).get()
# Rename the robots
for robot in _list_entity_robots(user.username):
for robot in db_for_update(_list_entity_robots(user.username)):
_, robot_shortname = parse_robot_username(robot.username)
new_robot_name = format_robot_username(new_username, robot_shortname)
robot.username = new_robot_name
@@ -924,6 +936,11 @@ def change_invoice_email(user, invoice_email):
user.save()
def change_user_tag_expiration(user, tag_expiration_s):
user.removed_tag_expiration_s = tag_expiration_s
user.save()
def update_email(user, new_email, auto_verify=False):
user.email = new_email
user.verified = auto_verify
@@ -1087,6 +1104,26 @@ def get_repository(namespace_name, repository_name):
return None
def get_image(repo, dockerfile_id):
try:
return Image.get(Image.docker_image_id == dockerfile_id, Image.repository == repo)
except Image.DoesNotExist:
return None
def find_child_image(repo, parent_image, command):
try:
return (Image.select()
.join(ImageStorage)
.switch(Image)
.where(Image.ancestors % '%/' + parent_image.id + '/%',
ImageStorage.command == command)
.order_by(ImageStorage.created.desc())
.get())
except Image.DoesNotExist:
return None
def get_repo_image(namespace_name, repository_name, docker_image_id):
def limit_to_image_id(query):
return query.where(Image.docker_image_id == docker_image_id).limit(1)
@@ -1249,9 +1286,9 @@ def _find_or_link_image(existing_image, repository, username, translations, pref
storage.locations = {placement.location.name
for placement in storage.imagestorageplacement_set}
new_image = Image.create(docker_image_id=existing_image.docker_image_id,
repository=repository, storage=storage,
ancestors=new_image_ancestry)
new_image = Image.create(docker_image_id=existing_image.docker_image_id,
repository=repository, storage=storage,
ancestors=new_image_ancestry)
logger.debug('Storing translation %s -> %s', existing_image.id, new_image.id)
translations[existing_image.id] = new_image.id
@@ -1315,7 +1352,28 @@ def find_create_or_link_image(docker_image_id, repository, username, translation
ancestors='/')
def find_or_create_derived_storage(source, transformation_name, preferred_location):
def find_or_create_storage_signature(storage, signature_kind):
found = lookup_storage_signature(storage, signature_kind)
if found is None:
kind = ImageStorageSignatureKind.get(name=signature_kind)
found = ImageStorageSignature.create(storage=storage, kind=kind)
return found
def lookup_storage_signature(storage, signature_kind):
kind = ImageStorageSignatureKind.get(name=signature_kind)
try:
return (ImageStorageSignature
.select()
.where(ImageStorageSignature.storage == storage,
ImageStorageSignature.kind == kind)
.get())
except ImageStorageSignature.DoesNotExist:
return None
def find_derived_storage(source, transformation_name):
try:
found = (ImageStorage
.select(ImageStorage, DerivedImageStorage)
@@ -1328,11 +1386,19 @@ def find_or_create_derived_storage(source, transformation_name, preferred_locati
found.locations = {placement.location.name for placement in found.imagestorageplacement_set}
return found
except ImageStorage.DoesNotExist:
logger.debug('Creating storage derived from source: %s', source.uuid)
trans = ImageStorageTransformation.get(name=transformation_name)
new_storage = _create_storage(preferred_location)
DerivedImageStorage.create(source=source, derivative=new_storage, transformation=trans)
return new_storage
return None
def find_or_create_derived_storage(source, transformation_name, preferred_location):
existing = find_derived_storage(source, transformation_name)
if existing is not None:
return existing
logger.debug('Creating storage derived from source: %s', source.uuid)
trans = ImageStorageTransformation.get(name=transformation_name)
new_storage = _create_storage(preferred_location)
DerivedImageStorage.create(source=source, derivative=new_storage, transformation=trans)
return new_storage
def delete_derived_storage_by_uuid(storage_uuid):
@@ -1401,7 +1467,7 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created
Image.docker_image_id == docker_image_id))
try:
fetched = query.get()
fetched = db_for_update(query).get()
except Image.DoesNotExist:
raise DataModelException('No image with specified id and repository')
@@ -1489,19 +1555,48 @@ def get_repository_images(namespace_name, repository_name):
return _get_repository_images_base(namespace_name, repository_name, lambda q: q)
def list_repository_tags(namespace_name, repository_name):
return (RepositoryTag
.select(RepositoryTag, Image)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.switch(RepositoryTag)
.join(Image)
.where(Repository.name == repository_name, Namespace.username == namespace_name))
def _tag_alive(query):
return query.where((RepositoryTag.lifetime_end_ts >> None) |
(RepositoryTag.lifetime_end_ts > int(time.time())))
def list_repository_tags(namespace_name, repository_name, include_hidden=False):
query = _tag_alive(RepositoryTag
.select(RepositoryTag, Image)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.switch(RepositoryTag)
.join(Image)
.where(Repository.name == repository_name,
Namespace.username == namespace_name))
if not include_hidden:
query = query.where(RepositoryTag.hidden == False)
return query
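Usage sketch (names hypothetical): hidden tags, such as the temporary ones created during pushes, are excluded unless explicitly requested.

visible = list_repository_tags('devtable', 'simple')
everything = list_repository_tags('devtable', 'simple', include_hidden=True)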
def _garbage_collect_tags(namespace_name, repository_name):
to_delete = (RepositoryTag
.select(RepositoryTag.id)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(Repository.name == repository_name, Namespace.username == namespace_name,
~(RepositoryTag.lifetime_end_ts >> None),
(RepositoryTag.lifetime_end_ts + Namespace.removed_tag_expiration_s) <=
int(time.time())))
(RepositoryTag
.delete()
.where(RepositoryTag.id << to_delete)
.execute())
def garbage_collect_repository(namespace_name, repository_name):
storage_id_whitelist = {}
_garbage_collect_tags(namespace_name, repository_name)
with config.app_config['DB_TRANSACTION_FACTORY'](db):
# TODO (jake): We could probably select this and all the images in a single query using
# a different kind of join.
@@ -1535,12 +1630,10 @@ def garbage_collect_repository(namespace_name, repository_name):
if len(to_remove) > 0:
logger.info('Garbage collecting storage for images: %s', to_remove)
garbage_collect_storage(storage_id_whitelist)
return len(to_remove)
_garbage_collect_storage(storage_id_whitelist)
def garbage_collect_storage(storage_id_whitelist):
def _garbage_collect_storage(storage_id_whitelist):
if len(storage_id_whitelist) == 0:
return
@@ -1632,10 +1725,10 @@ def garbage_collect_storage(storage_id_whitelist):
def get_tag_image(namespace_name, repository_name, tag_name):
def limit_to_tag(query):
return (query
.switch(Image)
.join(RepositoryTag)
.where(RepositoryTag.name == tag_name))
return _tag_alive(query
.switch(Image)
.join(RepositoryTag)
.where(RepositoryTag.name == tag_name))
images = _get_repository_images_base(namespace_name, repository_name, limit_to_tag)
if not images:
@@ -1643,7 +1736,6 @@ def get_tag_image(namespace_name, repository_name, tag_name):
else:
return images[0]
def get_image_by_id(namespace_name, repository_name, docker_image_id):
image = get_repo_image_extended(namespace_name, repository_name, docker_image_id)
if not image:
@@ -1672,45 +1764,69 @@ def get_parent_images(namespace_name, repository_name, image_obj):
def create_or_update_tag(namespace_name, repository_name, tag_name,
tag_docker_image_id):
try:
repo = _get_repository(namespace_name, repository_name)
except Repository.DoesNotExist:
raise DataModelException('Invalid repository %s/%s' % (namespace_name, repository_name))
try:
image = Image.get(Image.docker_image_id == tag_docker_image_id, Image.repository == repo)
except Image.DoesNotExist:
raise DataModelException('Invalid image with id: %s' % tag_docker_image_id)
with config.app_config['DB_TRANSACTION_FACTORY'](db):
try:
repo = _get_repository(namespace_name, repository_name)
except Repository.DoesNotExist:
raise DataModelException('Invalid repository %s/%s' % (namespace_name, repository_name))
try:
tag = RepositoryTag.get(RepositoryTag.repository == repo, RepositoryTag.name == tag_name)
tag.image = image
tag.save()
except RepositoryTag.DoesNotExist:
tag = RepositoryTag.create(repository=repo, image=image, name=tag_name)
try:
image = Image.get(Image.docker_image_id == tag_docker_image_id, Image.repository == repo)
except Image.DoesNotExist:
raise DataModelException('Invalid image with id: %s' % tag_docker_image_id)
return tag
now_ts = int(time.time())
try:
# When we move a tag, we really end the timeline of the old one and create a new one
query = _tag_alive(RepositoryTag
.select()
.where(RepositoryTag.repository == repo, RepositoryTag.name == tag_name))
tag = query.get()
tag.lifetime_end_ts = now_ts
tag.save()
except RepositoryTag.DoesNotExist:
# No tag that needs to be ended
pass
return RepositoryTag.create(repository=repo, image=image, name=tag_name,
lifetime_start_ts=now_ts)
def delete_tag(namespace_name, repository_name, tag_name):
try:
found = (RepositoryTag
.select()
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(Repository.name == repository_name, Namespace.username == namespace_name,
RepositoryTag.name == tag_name)
.get())
with config.app_config['DB_TRANSACTION_FACTORY'](db):
try:
query = _tag_alive(RepositoryTag
.select(RepositoryTag, Repository)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(Repository.name == repository_name,
Namespace.username == namespace_name,
RepositoryTag.name == tag_name))
found = db_for_update(query).get()
except RepositoryTag.DoesNotExist:
msg = ('Invalid repository tag \'%s\' on repository \'%s/%s\'' %
(tag_name, namespace_name, repository_name))
raise DataModelException(msg)
except RepositoryTag.DoesNotExist:
msg = ('Invalid repository tag \'%s\' on repository \'%s/%s\'' %
(tag_name, namespace_name, repository_name))
raise DataModelException(msg)
found.delete_instance()
found.lifetime_end_ts = int(time.time())
found.save()
def delete_all_repository_tags(namespace_name, repository_name):
def create_temporary_hidden_tag(repo, image, expiration_s):
""" Create a tag with a defined timeline, that will not appear in the UI or CLI. Returns the name
of the temporary tag. """
now_ts = int(time.time())
expire_ts = now_ts + expiration_s
tag_name = str(uuid4())
RepositoryTag.create(repository=repo, image=image, name=tag_name, lifetime_start_ts=now_ts,
lifetime_end_ts=expire_ts, hidden=True)
return tag_name
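A hedged usage sketch: pin an image while a longer operation completes, without the tag ever surfacing in tag listings.

# Protects `image` and its ancestry from garbage collection for ten
# minutes; the generated UUID name is returned for later lookup.
temp_tag_name = create_temporary_hidden_tag(repo, image, expiration_s=600)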
def purge_all_repository_tags(namespace_name, repository_name):
""" Immediately purge all repository tags without respecting the lifeline procedure """
try:
repo = _get_repository(namespace_name, repository_name)
except Repository.DoesNotExist:
@@ -1825,7 +1941,7 @@ def set_team_repo_permission(team_name, namespace_name, repository_name,
def purge_repository(namespace_name, repository_name):
# Delete all tags to allow gc to reclaim storage
delete_all_repository_tags(namespace_name, repository_name)
purge_all_repository_tags(namespace_name, repository_name)
# Gc to remove the images and storage
garbage_collect_repository(namespace_name, repository_name)
@@ -1845,10 +1961,14 @@ def get_private_repo_count(username):
.count())
def create_access_token(repository, role):
def create_access_token(repository, role, kind=None, friendly_name=None):
role = Role.get(Role.name == role)
kind_ref = None
if kind is not None:
kind_ref = AccessTokenKind.get(AccessTokenKind.name == kind)
new_token = AccessToken.create(repository=repository, temporary=True,
role=role)
role=role, kind=kind_ref, friendly_name=friendly_name)
return new_token
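An example call under the new signature (the role and friendly name are hypothetical; the kind values are those seeded by migration 3e2d38b52a75 above):

token = create_access_token(repo, 'write', kind='build-worker',
                            friendly_name='Repository Build Token')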
@@ -1967,10 +2087,10 @@ def create_repository_build(repo, access_token, job_config_obj, dockerfile_id,
pull_robot = lookup_robot(pull_robot_name)
return RepositoryBuild.create(repository=repo, access_token=access_token,
job_config=json.dumps(job_config_obj),
display_name=display_name, trigger=trigger,
resource_key=dockerfile_id,
pull_robot=pull_robot)
job_config=json.dumps(job_config_obj),
display_name=display_name, trigger=trigger,
resource_key=dockerfile_id,
pull_robot=pull_robot)
def get_pull_robot_name(trigger):
@@ -2255,11 +2375,20 @@ def delete_user(user):
# TODO: also delete any repository data associated
def check_health():
def check_health(app_config):
# Attempt to connect to the database first. If the DB is not responding,
# validate_database_url will time out quickly, as opposed to a normal
# connect, which would just hang (thus breaking the health check).
try:
validate_database_url(app_config['DB_URI'], connect_timeout=3)
except Exception:
logger.exception('Could not connect to the database')
return False
# We will connect to the db, check that it contains some log entry kinds
try:
found_count = LogEntryKind.select().count()
return found_count > 0
return bool(list(LogEntryKind.select().limit(1)))
except:
return False
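A sketch of how a health endpoint might consume this (the wiring is hypothetical, not part of this diff):

healthy = check_health(app.config)
response_code = 200 if healthy else 503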
@@ -2365,6 +2494,32 @@ def confirm_team_invite(code, user):
found.delete_instance()
return (team, inviter)
def cancel_repository_build(build):
with config.app_config['DB_TRANSACTION_FACTORY'](db):
# Reload the build for update.
try:
build = db_for_update(RepositoryBuild.select().where(RepositoryBuild.id == build.id)).get()
except RepositoryBuild.DoesNotExist:
return False
if build.phase != BUILD_PHASE.WAITING or not build.queue_item:
return False
# Load the build queue item for update.
try:
queue_item = db_for_update(QueueItem.select()
.where(QueueItem.id == build.queue_item.id)).get()
except QueueItem.DoesNotExist:
return False
# Check the queue item.
if not queue_item.available or queue_item.retries_remaining == 0:
return False
# Delete the queue item and build.
queue_item.delete_instance(recursive=True)
build.delete_instance()
return True
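Hypothetical caller: cancellation only succeeds while the build is still queued, so a False return should be treated as "too late to cancel".

if not cancel_repository_build(build):
    # The build already left the queue (or was never queued) and can no
    # longer be cancelled through this path.
    raise DataModelException('Build is already running and cannot be cancelled')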
def get_repository_usage():
one_month_ago = date.today() - timedelta(weeks=4)

View file

@@ -1,11 +1,17 @@
from datetime import datetime, timedelta
from data.database import QueueItem, db
from data.database import QueueItem, db, db_for_update
from util.morecollections import AttrDict
MINIMUM_EXTENSION = timedelta(seconds=20)
class NoopWith:
def __enter__(self):
pass
def __exit__(self, type, value, traceback):
pass
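For reference, a roughly equivalent no-op guard can be built from the standard library (a sketch, not what this file uses):

from contextlib import contextmanager

@contextmanager
def noop_with():
    yield  # no setup, no teardown; mirrors NoopWith above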
class WorkQueue(object):
def __init__(self, queue_name, transaction_factory,
@@ -31,31 +37,50 @@ class WorkQueue(object):
QueueItem.processing_expires > now,
QueueItem.queue_name ** name_match_query))
def _available_jobs(self, now, name_match_query, running_query):
def _available_jobs(self, now, name_match_query):
return (QueueItem
.select()
.where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now,
((QueueItem.available == True) | (QueueItem.processing_expires <= now)),
QueueItem.retries_remaining > 0, ~(QueueItem.queue_name << running_query)))
QueueItem.retries_remaining > 0))
def _available_jobs_not_running(self, now, name_match_query, running_query):
return (self
._available_jobs(now, name_match_query)
.where(~(QueueItem.queue_name << running_query)))
def _name_match_query(self):
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
def update_metrics(self):
if self._reporter is None:
return
def _item_by_id_for_update(self, queue_id):
return db_for_update(QueueItem.select().where(QueueItem.id == queue_id)).get()
with self._transaction_factory(db):
def get_metrics(self, require_transaction=True):
guard = self._transaction_factory(db) if require_transaction else NoopWith()
with guard:
now = datetime.utcnow()
name_match_query = self._name_match_query()
running_query = self._running_jobs(now, name_match_query)
running_count = running_query.distinct().count()
avialable_query = self._available_jobs(now, name_match_query, running_query)
available_count = avialable_query.select(QueueItem.queue_name).distinct().count()
available_query = self._available_jobs(now, name_match_query)
available_count = available_query.select(QueueItem.queue_name).distinct().count()
self._reporter(self._currently_processing, running_count, running_count + available_count)
available_not_running_query = self._available_jobs_not_running(now, name_match_query,
running_query)
available_not_running_count = (available_not_running_query.select(QueueItem.queue_name)
.distinct().count())
return (running_count, available_not_running_count, available_count)
def update_metrics(self):
if self._reporter is None:
return
(running_count, available_not_running_count, available_count) = self.get_metrics()
self._reporter(self._currently_processing, running_count,
running_count + available_not_running_count)
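With the counters split out of update_metrics, callers can read them without a reporter configured; a hedged sketch (the queue variable is hypothetical):

running, available_not_running, available = queue.get_metrics()
# `available` counts every runnable queue name; `available_not_running`
# excludes names that already have a job in flight.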
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
"""
@@ -73,24 +98,31 @@ class WorkQueue(object):
params['available_after'] = available_date
with self._transaction_factory(db):
QueueItem.create(**params)
return QueueItem.create(**params)
def get(self, processing_time=300):
"""
Get an available item and mark it as unavailable for the default of five
minutes.
minutes. The result of this method must always be composed of simple Python
objects which are JSON serializable, for network portability reasons.
"""
now = datetime.utcnow()
name_match_query = self._name_match_query()
with self._transaction_factory(db):
running = self._running_jobs(now, name_match_query)
avail = self._available_jobs(now, name_match_query, running)
running = self._running_jobs(now, name_match_query)
avail = self._available_jobs_not_running(now, name_match_query, running)
item = None
try:
db_item = avail.order_by(QueueItem.id).get()
item = None
try:
db_item_candidate = avail.order_by(QueueItem.id).get()
with self._transaction_factory(db):
still_available_query = (db_for_update(self
._available_jobs(now, name_match_query)
.where(QueueItem.id == db_item_candidate.id)))
db_item = still_available_query.get()
db_item.available = False
db_item.processing_expires = now + timedelta(seconds=processing_time)
db_item.retries_remaining -= 1
@@ -99,25 +131,26 @@ class WorkQueue(object):
item = AttrDict({
'id': db_item.id,
'body': db_item.body,
'retries_remaining': db_item.retries_remaining
})
self._currently_processing = True
except QueueItem.DoesNotExist:
self._currently_processing = False
except QueueItem.DoesNotExist:
self._currently_processing = False
# Return a view of the queue item rather than an active db object
return item
# Return a view of the queue item rather than an active db object
return item
def complete(self, completed_item):
with self._transaction_factory(db):
completed_item_obj = QueueItem.get(QueueItem.id == completed_item.id)
completed_item_obj = self._item_by_id_for_update(completed_item.id)
completed_item_obj.delete_instance()
self._currently_processing = False
def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
with self._transaction_factory(db):
retry_date = datetime.utcnow() + timedelta(seconds=retry_after)
incomplete_item_obj = QueueItem.get(QueueItem.id == incomplete_item.id)
incomplete_item_obj = self._item_by_id_for_update(incomplete_item.id)
incomplete_item_obj.available_after = retry_date
incomplete_item_obj.available = True
@@ -126,16 +159,14 @@ class WorkQueue(object):
incomplete_item_obj.save()
self._currently_processing = False
return incomplete_item_obj.retries_remaining > 0
@staticmethod
def extend_processing(queue_item, seconds_from_now, retry_count=None,
minimum_extension=MINIMUM_EXTENSION):
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
def extend_processing(self, item, seconds_from_now, minimum_extension=MINIMUM_EXTENSION):
with self._transaction_factory(db):
queue_item = self._item_by_id_for_update(item.id)
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
# Only actually write the new expiration to the db if it moves the expiration some minimum
if new_expiration - queue_item.processing_expires > minimum_extension:
if retry_count is not None:
queue_item.retries_remaining = retry_count
queue_item.processing_expires = new_expiration
queue_item.save()
# Only actually write the new expiration to the db if it moves the expiration some minimum
if new_expiration - queue_item.processing_expires > minimum_extension:
queue_item.processing_expires = new_expiration
queue_item.save()

data/runmigration.py (new file, 20 additions)
View file

@@ -0,0 +1,20 @@
import logging
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.environment import EnvironmentContext
from alembic.migration import __name__ as migration_name
def run_alembic_migration(log_handler=None):
if log_handler:
logging.getLogger(migration_name).addHandler(log_handler)
config = Config()
config.set_main_option("script_location", "data:migrations")
script = ScriptDirectory.from_config(config)
def fn(rev, context):
return script._upgrade_revs('head', rev)
with EnvironmentContext(config, script, fn=fn, destination_rev='head'):
script.run_env()
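A usage sketch for the new helper (the handler choice is hypothetical): attach a log handler so Alembic's output is captured, then upgrade the schema to head.

import logging
from data.runmigration import run_alembic_migration

run_alembic_migration(log_handler=logging.StreamHandler())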