This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/database.py
2015-10-23 13:49:23 -04:00

820 lines
25 KiB
Python

import string
import logging
import uuid
import time
import toposort
import resumablehashlib
from random import SystemRandom
from datetime import datetime
from peewee import *
from data.read_slave import ReadSlaveModel
from data.fields import ResumableSHAField, JSONField
from sqlalchemy.engine.url import make_url
from collections import defaultdict
from util.names import urn_generator
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Maps the driver name of a database URL to the peewee database class that
# _db_from_url() instantiates for it.
SCHEME_DRIVERS = {
    'mysql': MySQLDatabase,
    'mysql+pymysql': MySQLDatabase,
    'sqlite': SqliteDatabase,
    'postgresql': PostgresqlDatabase,
    'postgresql+psycopg2': PostgresqlDatabase,
}

# Maps the driver name of a database URL to the peewee `fn` helper that configure() installs
# as db_random_func (Rand for the MySQL drivers, Random for SQLite and PostgreSQL).
SCHEME_RANDOM_FUNCTION = {
    'mysql': fn.Rand,
    'mysql+pymysql': fn.Rand,
    'sqlite': fn.Random,
    'postgresql': fn.Random,
    'postgresql+psycopg2': fn.Random,
}
def real_for_update(query):
    """ Returns the result of calling for_update() on the given query. """
    locking_query = query.for_update()
    return locking_query
def null_for_update(query):
    """ No-op counterpart of real_for_update: hands back the very same query object. """
    untouched = query
    return untouched
# Per-driver overrides for the for-update helper. configure() falls back to real_for_update
# for any driver name that is not listed here; sqlite gets the no-op variant.
SCHEME_SPECIALIZED_FOR_UPDATE = {
    'sqlite': null_for_update,
}
class CallableProxy(Proxy):
    """ Proxy that can itself be called; the call is forwarded to the proxied object. """

    def __call__(self, *args, **kwargs):
        """ Calls the proxied object with the given arguments.

            Raises AttributeError if the proxy has not been initialized yet.
        """
        target = self.obj
        if target is None:
            raise AttributeError('Cannot use uninitialized Proxy.')

        return target(*args, **kwargs)
class CloseForLongOperation(object):
    """ Context manager which closes the database connections on entry, so that they are not
        held open while the nested (long-running) operation executes.
    """

    def __init__(self, config_object):
        # Stored only; nothing in this class reads it back.
        self.config_object = config_object

    def __enter__(self):
        close_db_filter(None)

    def __exit__(self, type, value, traceback):
        # Intentionally a no-op: the next SQL call will reconnect automatically.
        return None
class UseThenDisconnect(object):
    """ Context manager which configures the database on entry and closes its connections
        again on exit.
    """

    def __init__(self, config_object):
        self.config_object = config_object

    def __enter__(self):
        # Set up the module-level database proxies from the stored configuration.
        configure(self.config_object)

    def __exit__(self, type, value, traceback):
        # Tear the connections down whether or not the body raised.
        close_db_filter(None)
class TupleSelector(object):
    """ Runs a peewee query in tuple mode while still allowing each value of a row to be
        looked up by the field (or function expression) that selected it.
    """

    class _TupleWrapper(object):
        """ One result row together with the reference keys of the selected fields. """

        def __init__(self, data, fields):
            self._data = data
            self._fields = fields

        def get(self, field):
            """ Returns the value of this row which was selected by the given field. """
            key = TupleSelector.tuple_reference_key(field)
            position = self._fields.index(key)
            return self._data[position]

    @classmethod
    def tuple_reference_key(cls, field):
        """ Returns a string key for referencing a field in a TupleSelector. """
        node_type = field._node_type

        if node_type == 'field':
            # Plain column: "<column name>:<model class name>".
            return field.name + ':' + field.model_class.__name__

        if node_type == 'func':
            # Function call: its name followed by the comma-joined keys of its arguments.
            argument_keys = [cls.tuple_reference_key(arg) for arg in field.arguments]
            return field.name + ','.join(argument_keys)

        raise Exception('Unknown field type %s in TupleSelector' % field._node_type)

    def __init__(self, query, fields):
        self._query = query.select(*fields).tuples()
        self._fields = [TupleSelector.tuple_reference_key(field) for field in fields]

    def __iter__(self):
        return self._build_iterator()

    def _build_iterator(self):
        """ Lazily wraps every tuple produced by the query. """
        wrap = TupleSelector._TupleWrapper
        for row in self._query:
            yield wrap(row, self._fields)
# Placeholders for the real database objects. configure() initializes `db` from DB_URI and,
# only when DB_READ_SLAVE_URI is set, `read_slave` from that URI.
db = Proxy()
read_slave = Proxy()
# Callable placeholders which configure() points at the driver-specific random function,
# the for-update helper and the transaction factory, respectively.
db_random_func = CallableProxy()
db_for_update = CallableProxy()
db_transaction = CallableProxy()
def validate_database_url(url, db_kwargs, connect_timeout=5):
    """ Checks that a database URL is usable by connecting to it and disconnecting again.

        The caller's db_kwargs is copied before connect_timeout is added to it. Nothing is
        returned; any exception raised while building the driver or connecting propagates.
    """
    timeout_kwargs = db_kwargs.copy()
    timeout_kwargs.update(connect_timeout=connect_timeout)

    candidate = _db_from_url(url, timeout_kwargs)
    candidate.connect()
    candidate.close()
def _db_from_url(url, db_kwargs):
    """ Builds the peewee database object described by the given database URL.

        url: database URL, parsed with sqlalchemy's make_url.
        db_kwargs: extra keyword arguments for the database class. This mapping is NOT
          modified; host, port, user and password taken from the URL are merged into a
          private copy, overriding same-named entries.

        Returns an instance of the class registered in SCHEME_DRIVERS for the URL's driver
        name. Raises KeyError if the driver name is not registered there.
    """
    parsed_url = make_url(url)

    # Work on a copy: configure() passes the same dict for the primary and the read slave, so
    # writing into the caller's dict would leak one URL's credentials into the other connection.
    connect_kwargs = dict(db_kwargs)

    url_overrides = (
        ('host', parsed_url.host),
        ('port', parsed_url.port),
        ('user', parsed_url.username),
        ('password', parsed_url.password),
    )
    for kwarg_name, url_value in url_overrides:
        # Only components actually present in the URL override the supplied kwargs.
        if url_value:
            connect_kwargs[kwarg_name] = url_value

    # Note: sqlite does not support connect_timeout.
    if parsed_url.drivername == 'sqlite':
        connect_kwargs.pop('connect_timeout', None)

    return SCHEME_DRIVERS[parsed_url.drivername](parsed_url.database, **connect_kwargs)
def configure(config_object):
    """ Initializes the module-level database proxies from the given configuration.

        Reads DB_CONNECTION_ARGS and DB_URI (both required), the optional DB_READ_SLAVE_URI,
        and DB_TRANSACTION_FACTORY, which is looked up each time a transaction is requested.
    """
    logger.debug('Configuring database')

    connection_args = dict(config_object['DB_CONNECTION_ARGS'])
    primary_uri = config_object['DB_URI']
    db.initialize(_db_from_url(primary_uri, connection_args))

    # Driver-dependent helpers are chosen from the primary database's driver name.
    primary_driver = make_url(primary_uri).drivername
    db_random_func.initialize(SCHEME_RANDOM_FUNCTION[primary_driver])
    for_update_impl = SCHEME_SPECIALIZED_FOR_UPDATE.get(primary_driver, real_for_update)
    db_for_update.initialize(for_update_impl)

    # The read slave is optional and receives the same connection-args dict as the primary.
    replica_uri = config_object.get('DB_READ_SLAVE_URI', None)
    if replica_uri is not None:
        read_slave.initialize(_db_from_url(replica_uri, connection_args))

    def _db_transaction():
        return config_object['DB_TRANSACTION_FACTORY'](db)

    db_transaction.initialize(_db_transaction)
def random_string_generator(length=16):
    """ Returns a zero-argument function which produces a fresh random string of the given
        length, drawn from uppercase ASCII letters and digits using SystemRandom.
    """
    alphabet = string.ascii_uppercase + string.digits

    def random_string():
        rng = SystemRandom()
        return ''.join(rng.choice(alphabet) for _ in range(length))

    return random_string
def uuid_generator():
    """ Returns the string form of a newly generated random (version 4) UUID. """
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
def get_epoch_timestamp():
    """ Returns the current time as a whole number of seconds since the epoch. """
    return int(time.time())
def close_db_filter(_):
    """ Closes the primary database connection and, if one was configured, the read slave
        connection. The single argument is ignored.
    """
    if not db.is_closed():
        logger.debug('Disconnecting from database.')
        db.close()

    # The read slave proxy stays uninitialized (obj is None) when no read slave is configured.
    replica_configured = read_slave.obj is not None
    if replica_configured and not read_slave.is_closed():
        logger.debug('Disconnecting from read slave.')
        read_slave.close()
class QuayUserField(ForeignKeyField):
    """ Foreign key which points at User unless another rel_model is given, and which carries
        two flags consulted by User.delete_instance when a robot account is deleted:

        allows_robots: rows referencing a robot through this field are cleaned up.
        robot_null_delete: clean up by setting the field to NULL instead of deleting the row.
    """

    def __init__(self, allows_robots=False, robot_null_delete=False, *args, **kwargs):
        self.allows_robots = allows_robots
        self.robot_null_delete = robot_null_delete

        # User is looked up when the field is instantiated, not when this class is defined.
        if 'rel_model' not in kwargs:
            kwargs['rel_model'] = User

        super(QuayUserField, self).__init__(*args, **kwargs)
class BaseModel(ReadSlaveModel):
    """ Common base of the models in this module; binds them to the module-level database
        and read slave proxies.
    """

    class Meta:
        database = db
        read_slaves = (read_slave,)

    def __getattribute__(self, name):
        """ Adds <field>_id accessors which return the stored value of the field directly,
            so that foreign key IDs can be read without a database roundtrip.
        """
        suffix = '_id'
        if name.endswith(suffix):
            field_name = name[:-len(suffix)]
            if field_name in self._meta.fields:
                return self._data.get(field_name)

        # Anything else (including unknown *_id names) uses normal attribute lookup.
        return super(BaseModel, self).__getattribute__(name)
class User(BaseModel):
    """ Account row. The `organization` and `robot` flags mark rows which represent an
        organization or a robot account rather than a regular user.
    """
    uuid = CharField(default=uuid_generator, max_length=36, null=True)
    username = CharField(unique=True, index=True)
    password_hash = CharField(null=True)
    # The email defaults to a random 64 character string when none is supplied.
    email = CharField(unique=True, index=True,
                      default=random_string_generator(length=64))
    verified = BooleanField(default=False)
    stripe_id = CharField(index=True, null=True)
    organization = BooleanField(default=False, index=True)
    robot = BooleanField(default=False, index=True)
    invoice_email = BooleanField(default=False)
    invalid_login_attempts = IntegerField(default=0)
    last_invalid_login = DateTimeField(default=datetime.utcnow)
    removed_tag_expiration_s = IntegerField(default=1209600)  # Two weeks
    enabled = BooleanField(default=True)

    def delete_instance(self, recursive=False, delete_nullable=False):
        """ Deletes this row.

            Regular rows defer entirely to the inherited implementation. Robot rows ignore
            both arguments: only dependencies reached through a QuayUserField with
            allows_robots set are cleaned up, and then the row itself is deleted.
        """
        if not self.robot:
            super(User, self).delete_instance(recursive=recursive,
                                              delete_nullable=delete_nullable)
            return

        # Walk the dependency list back to front, touching only robot-aware foreign keys.
        dependent_ops = list(self.dependencies(search_nullable=True))
        for query, fk in reversed(dependent_ops):
            if not (isinstance(fk, QuayUserField) and fk.allows_robots):
                continue

            model = fk.model_class
            if fk.robot_null_delete:
                # Keep the referencing row, just clear its pointer to the robot.
                model.update(**{fk.name: None}).where(query).execute()
            else:
                model.delete().where(query).execute()

        # Finally remove the robot row itself, non-recursively.
        super(User, self).delete_instance(recursive=False, delete_nullable=False)
# Alias of the User model, created through the model's alias() method.
Namespace = User.alias()
class TeamRole(BaseModel):
    """ Named role which Team rows reference through their `role` foreign key. """
    name = CharField(index=True)
class Team(BaseModel):
    """ Named team which belongs to an organization (a User row) and has a TeamRole. """
    name = CharField(index=True)
    organization = QuayUserField(index=True)
    role = ForeignKeyField(TeamRole)
    description = TextField(default='')

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # A team name must be unique within an organization
            (('name', 'organization'), True),
        )
class TeamMember(BaseModel):
    """ Membership of a user in a Team. The user field is declared with allows_robots. """
    user = QuayUserField(allows_robots=True, index=True)
    team = ForeignKeyField(Team, index=True)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # A user may belong to a team only once
            (('user', 'team'), True),
        )
class TeamMemberInvite(BaseModel):
    """ Invitation to a Team, addressed to a user or an email and issued by `inviter`. """
    # Note: Either user OR email will be filled in, but not both.
    user = QuayUserField(index=True, null=True)
    email = CharField(null=True)
    team = ForeignKeyField(Team, index=True)
    inviter = ForeignKeyField(User, related_name='inviter')
    # Default value comes from the callable returned by urn_generator(['teaminvite']).
    invite_token = CharField(default=urn_generator(['teaminvite']))
class LoginService(BaseModel):
    """ Uniquely named login service which FederatedLogin rows reference. """
    name = CharField(unique=True, index=True)
class FederatedLogin(BaseModel):
    """ Ties a user to an identifier (service_ident) on a LoginService. """
    user = QuayUserField(allows_robots=True, index=True)
    service = ForeignKeyField(LoginService, index=True)
    service_ident = CharField()
    # Text column which defaults to an empty JSON object.
    metadata_json = TextField(default='{}')

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # create a unique index on service and the local service id
            (('service', 'service_ident'), True),
            # a user may only have one federated login per service
            (('service', 'user'), True),
        )
class Visibility(BaseModel):
    """ Uniquely named visibility value which Repository rows reference. """
    name = CharField(index=True, unique=True)
class Repository(BaseModel):
    """ Repository owned by a namespace user; (namespace_user, name) is unique. """
    namespace_user = QuayUserField(null=True)
    name = CharField()
    visibility = ForeignKeyField(Visibility)
    description = TextField(null=True)
    badge_token = CharField(default=uuid_generator)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # create a unique index on namespace and name
            (('namespace_user', 'name'), True),
        )

    def delete_instance(self, recursive=False, delete_nullable=False):
        """ Deletes this repository together with the rows that depend on it.

            Only recursive deletes are supported; RuntimeError is raised otherwise, and also
            when a dependency query uses an operator other than 'in' or '='. The dependent
            rows and the repository row are removed inside one db_transaction() block.
            Returns the result of executing the final delete of the repository row.
        """
        if not recursive:
            raise RuntimeError('Non-recursive delete on repository.')

        # These models don't need to use transitive deletes, because the referenced objects
        # are cleaned up directly
        skip_transitive_deletes = {RepositoryTag, RepositoryBuild, RepositoryBuildTrigger, BlobUpload}

        # We need to sort the ops so that models get cleaned in order of their dependencies
        ops = reversed(list(self.dependencies(delete_nullable)))
        filtered_ops = []

        # Model name -> names of the models its rows reference, fed to the topological sort.
        dependencies = defaultdict(set)

        for query, fk in ops:
            # We only want to skip transitive deletes, which are done using subqueries in the form of
            # DELETE FROM <table> in <subquery>. If an op is not using a subquery, we allow it to be
            # applied directly.
            if fk.model_class not in skip_transitive_deletes or query.op != 'in':
                filtered_ops.append((query, fk))

            # The dependency edge is recorded for every op, including the skipped ones.
            if query.op == 'in':
                dependencies[fk.model_class.__name__].add(query.rhs.model_class.__name__)
            elif query.op == '=':
                dependencies[fk.model_class.__name__].add(Repository.__name__)
            else:
                raise RuntimeError('Unknown operator in recursive repository delete query')

        # Reverse the topological order so that dependents come before what they depend on.
        sorted_models = list(reversed(toposort.toposort_flatten(dependencies)))

        def sorted_model_key(query_fk_tuple):
            cmp_query, cmp_fk = query_fk_tuple
            # Subquery ('in') ops sort ahead of all direct ops.
            if cmp_query.op == 'in':
                return -1
            return sorted_models.index(cmp_fk.model_class.__name__)

        filtered_ops.sort(key=sorted_model_key)

        with db_transaction():
            for query, fk in filtered_ops:
                model = fk.model_class
                # Nullable references are cleared rather than deleted unless delete_nullable is set.
                if fk.null and not delete_nullable:
                    model.update(**{fk.name: None}).where(query).execute()
                else:
                    model.delete().where(query).execute()

            return self.delete().where(self._pk_expr()).execute()
class Star(BaseModel):
    """ Associates a user with a repository; at most one row per (user, repository). """
    user = ForeignKeyField(User, index=True)
    repository = ForeignKeyField(Repository, index=True)
    # Defaults to datetime.now at row creation.
    created = DateTimeField(default=datetime.now)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # create a unique index on user and repository
            (('user', 'repository'), True),
        )
class Role(BaseModel):
    """ Uniquely named role referenced by permission, prototype and access token rows. """
    name = CharField(index=True, unique=True)
class RepositoryPermission(BaseModel):
    """ Grants a Role on a repository to a team or to a user (both columns are nullable). """
    team = ForeignKeyField(Team, index=True, null=True)
    user = QuayUserField(allows_robots=True, index=True, null=True)
    repository = ForeignKeyField(Repository, index=True)
    role = ForeignKeyField(Role)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # Unique per (team, repository) and per (user, repository).
            (('team', 'repository'), True),
            (('user', 'repository'), True),
        )
class PermissionPrototype(BaseModel):
    """ Per-organization prototype which names a Role, an optional activating user and an
        optional delegate user or team.
    """
    org = QuayUserField(index=True, related_name='orgpermissionproto')
    uuid = CharField(default=uuid_generator)
    activating_user = QuayUserField(allows_robots=True, index=True, null=True,
                                    related_name='userpermissionproto')
    delegate_user = QuayUserField(allows_robots=True,related_name='receivingpermission',
                                  null=True)
    delegate_team = ForeignKeyField(Team, related_name='receivingpermission',
                                    null=True)
    role = ForeignKeyField(Role)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # Non-unique lookup index on (org, activating_user).
            (('org', 'activating_user'), False),
        )
class AccessTokenKind(BaseModel):
    """ Uniquely named kind which AccessToken rows may reference. """
    name = CharField(unique=True, index=True)
class AccessToken(BaseModel):
    """ Token carrying a Role on a single repository, identified by a unique code. """
    friendly_name = CharField(null=True)
    # Defaults to a random 64 character string.
    code = CharField(default=random_string_generator(length=64), unique=True,
                     index=True)
    repository = ForeignKeyField(Repository)
    created = DateTimeField(default=datetime.now)
    role = ForeignKeyField(Role)
    temporary = BooleanField(default=True)
    kind = ForeignKeyField(AccessTokenKind, null=True)
class BuildTriggerService(BaseModel):
    """ Uniquely named service which RepositoryBuildTrigger rows reference. """
    name = CharField(index=True, unique=True)
class RepositoryBuildTrigger(BaseModel):
    """ Build trigger attached to a repository, backed by a BuildTriggerService and the
        user who connected it.
    """
    uuid = CharField(default=uuid_generator)
    service = ForeignKeyField(BuildTriggerService, index=True)
    repository = ForeignKeyField(Repository, index=True)
    connected_user = QuayUserField()
    auth_token = CharField(null=True)
    private_key = TextField(null=True)
    # Text column which defaults to an empty JSON object.
    config = TextField(default='{}')
    write_token = ForeignKeyField(AccessToken, null=True)
    # robot_null_delete: deleting the robot clears this column instead of deleting the trigger.
    pull_robot = QuayUserField(allows_robots=True, null=True, related_name='triggerpullrobot',
                               robot_null_delete=True)

    # TODO(jschorr): Remove this column once we verify the backfill has succeeded.
    used_legacy_github = BooleanField(null=True, default=False)
class EmailConfirmation(BaseModel):
    """ Confirmation code issued to a user, with flags for password reset and email
        confirmation and an optional new email address.
    """
    # Defaults to a random string of the generator's default length (16).
    code = CharField(default=random_string_generator(), unique=True, index=True)
    user = QuayUserField()
    pw_reset = BooleanField(default=False)
    new_email = CharField(null=True)
    email_confirm = BooleanField(default=False)
    created = DateTimeField(default=datetime.now)
class ImageStorage(BaseModel):
    """ Storage record identified by a unique uuid, with optional checksum and sizes. """
    uuid = CharField(default=uuid_generator, index=True, unique=True)
    checksum = CharField(null=True)
    image_size = BigIntegerField(null=True)
    uncompressed_size = BigIntegerField(null=True)
    # Nullable flag which defaults to True for new rows.
    uploading = BooleanField(default=True, null=True)
    cas_path = BooleanField(default=True)
class ImageStorageTransformation(BaseModel):
    """ Uniquely named transformation which DerivedImageStorage rows reference. """
    name = CharField(index=True, unique=True)
class ImageStorageSignatureKind(BaseModel):
    """ Uniquely named kind which ImageStorageSignature rows reference. """
    name = CharField(index=True, unique=True)
class ImageStorageSignature(BaseModel):
    """ Signature of a given kind for an ImageStorage; one row per (kind, storage). """
    storage = ForeignKeyField(ImageStorage, index=True)
    kind = ForeignKeyField(ImageStorageSignatureKind)
    signature = TextField(null=True)
    # Nullable flag which defaults to True for new rows.
    uploading = BooleanField(default=True, null=True)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # Unique per (kind, storage).
            (('kind', 'storage'), True),
        )
class DerivedImageStorage(BaseModel):
    """ Links a source ImageStorage to a derivative ImageStorage through a transformation. """
    source = ForeignKeyField(ImageStorage, null=True, related_name='source')
    derivative = ForeignKeyField(ImageStorage, related_name='derivative')
    transformation = ForeignKeyField(ImageStorageTransformation)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # Unique per (source, transformation).
            (('source', 'transformation'), True),
        )
class ImageStorageLocation(BaseModel):
    """ Uniquely named storage location referenced by placements, user regions and uploads. """
    name = CharField(unique=True, index=True)
class ImageStoragePlacement(BaseModel):
    """ Records that an ImageStorage is placed in an ImageStorageLocation. """
    storage = ForeignKeyField(ImageStorage)
    location = ForeignKeyField(ImageStorageLocation)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # An image can only be placed in the same place once
            (('storage', 'location'), True),
        )
class UserRegion(BaseModel):
    """ Associates a (non-robot) user with an ImageStorageLocation. """
    user = QuayUserField(index=True, allows_robots=False)
    location = ForeignKeyField(ImageStorageLocation)

    # NOTE(review): unlike every other model in this module, `indexes` is declared as a plain
    # class attribute here rather than inside a `class Meta:` block, so peewee presumably does
    # not create this unique (user, location) index -- confirm whether that is intended.
    indexes = (
        (('user', 'location'), True),
    )
class Image(BaseModel):
    """ Image belonging to exactly one repository; unique per (repository, docker_image_id). """
    # This class is intentionally denormalized. Even though images are supposed
    # to be globally unique we can't treat them as such for permissions and
    # security reasons. So rather than Repository <-> Image being many to many
    # each image now belongs to exactly one repository.
    docker_image_id = CharField(index=True)
    repository = ForeignKeyField(Repository)

    # '/' separated list of ancestor ids, e.g. /1/2/6/7/10/
    ancestors = CharField(index=True, default='/', max_length=64535, null=True)

    storage = ForeignKeyField(ImageStorage, index=True, null=True)

    created = DateTimeField(null=True)
    comment = TextField(null=True)
    command = TextField(null=True)
    aggregate_size = BigIntegerField(null=True)
    v1_json_metadata = TextField(null=True)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # we don't really want duplicates
            (('repository', 'docker_image_id'), True),
        )
class RepositoryTag(BaseModel):
    """ Named tag in a repository which points at an Image and has a lifetime expressed as
        integer start and (nullable) end timestamps.
    """
    name = CharField()
    image = ForeignKeyField(Image)
    repository = ForeignKeyField(Repository)
    # Defaults to the current epoch time in whole seconds.
    lifetime_start_ts = IntegerField(default=get_epoch_timestamp)
    lifetime_end_ts = IntegerField(null=True, index=True)
    hidden = BooleanField(default=False)
    reversion = BooleanField(default=False)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            (('repository', 'name'), False),

            # This unique index prevents deadlocks when concurrently moving and deleting tags
            (('repository', 'name', 'lifetime_end_ts'), True),
        )
class TagManifest(BaseModel):
    """ Manifest JSON and its unique digest for a RepositoryTag; at most one row per tag. """
    tag = ForeignKeyField(RepositoryTag, index=True, unique=True)
    digest = CharField(index=True, unique=True)
    json_data = TextField()
class BUILD_PHASE(object):
    """ Build phases enum """
    # Plain string constants; RepositoryBuild.phase defaults to WAITING.
    ERROR = 'error'
    INTERNAL_ERROR = 'internalerror'
    BUILD_SCHEDULED = 'build-scheduled'
    UNPACKING = 'unpacking'
    PULLING = 'pulling'
    BUILDING = 'building'
    PUSHING = 'pushing'
    WAITING = 'waiting'
    COMPLETE = 'complete'
class QueueItem(BaseModel):
    """ Item in a named queue, with a text body plus availability, expiry and retry columns. """
    queue_name = CharField(index=True, max_length=1024)
    body = TextField()
    # Defaults to datetime.utcnow at row creation.
    available_after = DateTimeField(default=datetime.utcnow, index=True)
    available = BooleanField(default=True, index=True)
    processing_expires = DateTimeField(null=True, index=True)
    retries_remaining = IntegerField(default=5, index=True)
class RepositoryBuild(BaseModel):
    """ Build of a repository, optionally started by a RepositoryBuildTrigger. """
    uuid = CharField(default=uuid_generator, index=True)
    repository = ForeignKeyField(Repository, index=True)
    access_token = ForeignKeyField(AccessToken)
    resource_key = CharField(index=True, null=True)
    job_config = TextField()
    # One of the BUILD_PHASE string constants; new rows start in WAITING.
    phase = CharField(default=BUILD_PHASE.WAITING)
    started = DateTimeField(default=datetime.now)
    display_name = CharField()
    trigger = ForeignKeyField(RepositoryBuildTrigger, null=True, index=True)
    # robot_null_delete: deleting the robot clears this column instead of deleting the build.
    pull_robot = QuayUserField(null=True, related_name='buildpullrobot', allows_robots=True,
                               robot_null_delete=True)
    logs_archived = BooleanField(default=False)
    queue_id = CharField(null=True, index=True)
class LogEntryKind(BaseModel):
    """ Uniquely named kind which LogEntry rows reference. """
    name = CharField(index=True, unique=True)
class LogEntry(BaseModel):
    """ Log record of a given kind for an account, with an optional performer and repository. """
    kind = ForeignKeyField(LogEntryKind, index=True)
    account = QuayUserField(index=True, related_name='account')
    # robot_null_delete: deleting a robot performer clears this column and keeps the entry.
    performer = QuayUserField(allows_robots=True, index=True, null=True,
                              related_name='performer', robot_null_delete=True)
    repository = ForeignKeyField(Repository, index=True, null=True)
    # Within this class body the name `datetime` still refers to the imported datetime class
    # when the default is evaluated; afterwards it names this field on the model.
    datetime = DateTimeField(default=datetime.now, index=True)
    ip = CharField(null=True)
    # Text column which defaults to an empty JSON object.
    metadata_json = TextField(default='{}')

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # create an index on repository and date
            (('repository', 'datetime'), False),

            # create an index on repository, date and kind
            (('repository', 'datetime', 'kind'), False),
        )
class RepositoryActionCount(BaseModel):
    """ Integer count stored per repository and date; one row per (repository, date). """
    repository = ForeignKeyField(Repository, index=True)
    count = IntegerField()
    date = DateField(index=True)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # create a unique index on repository and date
            (('repository', 'date'), True),
        )
class OAuthApplication(BaseModel):
    """ OAuth application owned by an organization (a User row). """
    # client_id and client_secret default to random strings of 20 and 40 characters.
    client_id = CharField(index=True, default=random_string_generator(length=20))
    client_secret = CharField(default=random_string_generator(length=40))
    redirect_uri = CharField()
    application_uri = CharField()
    organization = QuayUserField()

    name = CharField()
    description = TextField(default='')
    # Stored in the database column named gravatar_email.
    avatar_email = CharField(null=True, db_column='gravatar_email')
class OAuthAuthorizationCode(BaseModel):
    """ Authorization code issued for an OAuthApplication, with its scope and context data. """
    application = ForeignKeyField(OAuthApplication)
    code = CharField(index=True)
    scope = CharField()
    data = TextField()  # Context for the code, such as the user
class OAuthAccessToken(BaseModel):
    """ Access token issued for an OAuthApplication on behalf of an authorized user. """
    uuid = CharField(default=uuid_generator, index=True)
    application = ForeignKeyField(OAuthApplication)
    authorized_user = QuayUserField()
    scope = CharField()
    access_token = CharField(index=True)
    token_type = CharField(default='Bearer')
    expires_at = DateTimeField()
    refresh_token = CharField(index=True, null=True)
    data = TextField()  # This is context for which this token was generated, such as the user
class NotificationKind(BaseModel):
    """ Uniquely named kind which Notification rows reference. """
    name = CharField(index=True, unique=True)
class Notification(BaseModel):
    """ Notification of a given kind targeted at a user, with JSON metadata stored as text. """
    uuid = CharField(default=uuid_generator, index=True)
    kind = ForeignKeyField(NotificationKind, index=True)
    target = QuayUserField(index=True)
    # Text column which defaults to an empty JSON object.
    metadata_json = TextField(default='{}')
    created = DateTimeField(default=datetime.now, index=True)
    dismissed = BooleanField(default=False)
class ExternalNotificationEvent(BaseModel):
    """ Uniquely named event which RepositoryNotification rows reference. """
    name = CharField(index=True, unique=True)
class ExternalNotificationMethod(BaseModel):
    """ Uniquely named method which RepositoryNotification rows reference. """
    name = CharField(index=True, unique=True)
class RepositoryNotification(BaseModel):
    """ Notification configured on a repository: an event, a method, an optional title and a
        JSON configuration stored as text.
    """
    uuid = CharField(default=uuid_generator, index=True)
    repository = ForeignKeyField(Repository, index=True)
    event = ForeignKeyField(ExternalNotificationEvent)
    method = ForeignKeyField(ExternalNotificationMethod)
    title = CharField(null=True)
    config_json = TextField()
class RepositoryAuthorizedEmail(BaseModel):
    """ Email address recorded for a repository, with a unique code and a confirmed flag. """
    repository = ForeignKeyField(Repository, index=True)
    email = CharField()
    # Defaults to a random string of the generator's default length (16).
    code = CharField(default=random_string_generator(), unique=True, index=True)
    confirmed = BooleanField(default=False)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # create a unique index on email and repository
            (('email', 'repository'), True),
        )
class BlobUpload(BaseModel):
    """ Upload of a blob into a repository, identified by a unique uuid. Tracks the byte
        count so far, a resumable SHA-256 state, the storage location and optional JSON
        storage metadata.
    """
    repository = ForeignKeyField(Repository, index=True)
    uuid = CharField(index=True, unique=True)
    byte_count = IntegerField(default=0)
    # The default is the sha256 constructor, so each new row starts from a fresh hash state.
    sha_state = ResumableSHAField(null=True, default=resumablehashlib.sha256)
    location = ForeignKeyField(ImageStorageLocation)
    # Use the dict constructor rather than a `{}` literal: a literal is a single object that
    # could be shared (and mutated) across instances, a callable default is invoked per row.
    storage_metadata = JSONField(null=True, default=dict)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # create a unique index on repository and uuid
            (('repository', 'uuid'), True),
        )
class QuayService(BaseModel):
    """ Uniquely named service which QuayRelease rows reference. """
    name = CharField(index=True, unique=True)
class QuayRegion(BaseModel):
    """ Uniquely named region which QuayRelease rows reference. """
    name = CharField(index=True, unique=True)
class QuayRelease(BaseModel):
    """ Version of a service released in a region, with a reverted flag and creation time. """
    service = ForeignKeyField(QuayService)
    version = CharField()
    region = ForeignKeyField(QuayRegion)
    reverted = BooleanField(default=False)
    created = DateTimeField(default=datetime.now, index=True)

    class Meta:
        database = db
        read_slaves = (read_slave,)
        indexes = (
            # unique release per region
            (('service', 'version', 'region'), True),

            # get recent releases
            (('service', 'region', 'created'), False),
        )
# Model classes declared in this module, collected in one list (the Namespace alias is not
# included). Presumably consumed by schema/migration tooling -- confirm against callers.
all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission, Visibility,
              RepositoryTag, EmailConfirmation, FederatedLogin, LoginService, QueueItem,
              RepositoryBuild, Team, TeamMember, TeamRole, LogEntryKind, LogEntry,
              PermissionPrototype, ImageStorage, BuildTriggerService, RepositoryBuildTrigger,
              OAuthApplication, OAuthAuthorizationCode, OAuthAccessToken, NotificationKind,
              Notification, ImageStorageLocation, ImageStoragePlacement,
              ExternalNotificationEvent, ExternalNotificationMethod, RepositoryNotification,
              RepositoryAuthorizedEmail, ImageStorageTransformation, DerivedImageStorage,
              TeamMemberInvite, ImageStorageSignature, ImageStorageSignatureKind,
              AccessTokenKind, Star, RepositoryActionCount, TagManifest, UserRegion,
              QuayService, QuayRegion, QuayRelease, BlobUpload]