import string import logging import uuid import time import toposort import resumablehashlib from random import SystemRandom from datetime import datetime from peewee import * from data.read_slave import ReadSlaveModel from data.fields import ResumableSHAField, JSONField from sqlalchemy.engine.url import make_url from collections import defaultdict from util.names import urn_generator logger = logging.getLogger(__name__) SCHEME_DRIVERS = { 'mysql': MySQLDatabase, 'mysql+pymysql': MySQLDatabase, 'sqlite': SqliteDatabase, 'postgresql': PostgresqlDatabase, 'postgresql+psycopg2': PostgresqlDatabase, } SCHEME_RANDOM_FUNCTION = { 'mysql': fn.Rand, 'mysql+pymysql': fn.Rand, 'sqlite': fn.Random, 'postgresql': fn.Random, 'postgresql+psycopg2': fn.Random, } def real_for_update(query): return query.for_update() def null_for_update(query): return query SCHEME_SPECIALIZED_FOR_UPDATE = { 'sqlite': null_for_update, } class CallableProxy(Proxy): def __call__(self, *args, **kwargs): if self.obj is None: raise AttributeError('Cannot use uninitialized Proxy.') return self.obj(*args, **kwargs) class CloseForLongOperation(object): """ Helper object which disconnects the database then reconnects after the nested operation completes. """ def __init__(self, config_object): self.config_object = config_object def __enter__(self): close_db_filter(None) def __exit__(self, type, value, traceback): # Note: Nothing to do. The next SQL call will reconnect automatically. pass class UseThenDisconnect(object): """ Helper object for conducting work with a database and then tearing it down. """ def __init__(self, config_object): self.config_object = config_object def __enter__(self): configure(self.config_object) def __exit__(self, type, value, traceback): close_db_filter(None) class TupleSelector(object): """ Helper class for selecting tuples from a peewee query and easily accessing them as if they were objects. """ class _TupleWrapper(object): def __init__(self, data, fields): self._data = data self._fields = fields def get(self, field): return self._data[self._fields.index(TupleSelector.tuple_reference_key(field))] @classmethod def tuple_reference_key(cls, field): """ Returns a string key for referencing a field in a TupleSelector. """ if field._node_type == 'func': return field.name + ','.join([cls.tuple_reference_key(arg) for arg in field.arguments]) if field._node_type == 'field': return field.name + ':' + field.model_class.__name__ raise Exception('Unknown field type %s in TupleSelector' % field._node_type) def __init__(self, query, fields): self._query = query.select(*fields).tuples() self._fields = [TupleSelector.tuple_reference_key(field) for field in fields] def __iter__(self): return self._build_iterator() def _build_iterator(self): for tuple_data in self._query: yield TupleSelector._TupleWrapper(tuple_data, self._fields) db = Proxy() read_slave = Proxy() db_random_func = CallableProxy() db_for_update = CallableProxy() db_transaction = CallableProxy() def validate_database_url(url, db_kwargs, connect_timeout=5): db_kwargs = db_kwargs.copy() db_kwargs['connect_timeout'] = connect_timeout driver = _db_from_url(url, db_kwargs) driver.connect() driver.close() def _db_from_url(url, db_kwargs): parsed_url = make_url(url) if parsed_url.host: db_kwargs['host'] = parsed_url.host if parsed_url.port: db_kwargs['port'] = parsed_url.port if parsed_url.username: db_kwargs['user'] = parsed_url.username if parsed_url.password: db_kwargs['password'] = parsed_url.password # Note: sqlite does not support connect_timeout. if parsed_url.drivername == 'sqlite' and 'connect_timeout' in db_kwargs: del db_kwargs['connect_timeout'] return SCHEME_DRIVERS[parsed_url.drivername](parsed_url.database, **db_kwargs) def configure(config_object): logger.debug('Configuring database') db_kwargs = dict(config_object['DB_CONNECTION_ARGS']) write_db_uri = config_object['DB_URI'] db.initialize(_db_from_url(write_db_uri, db_kwargs)) parsed_write_uri = make_url(write_db_uri) db_random_func.initialize(SCHEME_RANDOM_FUNCTION[parsed_write_uri.drivername]) db_for_update.initialize(SCHEME_SPECIALIZED_FOR_UPDATE.get(parsed_write_uri.drivername, real_for_update)) read_slave_uri = config_object.get('DB_READ_SLAVE_URI', None) if read_slave_uri is not None: read_slave.initialize(_db_from_url(read_slave_uri, db_kwargs)) def _db_transaction(): return config_object['DB_TRANSACTION_FACTORY'](db) db_transaction.initialize(_db_transaction) def random_string_generator(length=16): def random_string(): random = SystemRandom() return ''.join([random.choice(string.ascii_uppercase + string.digits) for _ in range(length)]) return random_string def uuid_generator(): return str(uuid.uuid4()) get_epoch_timestamp = lambda: int(time.time()) def close_db_filter(_): if not db.is_closed(): logger.debug('Disconnecting from database.') db.close() if read_slave.obj is not None and not read_slave.is_closed(): logger.debug('Disconnecting from read slave.') read_slave.close() class QuayUserField(ForeignKeyField): def __init__(self, allows_robots=False, robot_null_delete=False, *args, **kwargs): self.allows_robots = allows_robots self.robot_null_delete = robot_null_delete if not 'rel_model' in kwargs: kwargs['rel_model'] = User super(QuayUserField, self).__init__(*args, **kwargs) class BaseModel(ReadSlaveModel): class Meta: database = db read_slaves = (read_slave,) def __getattribute__(self, name): """ Adds _id accessors so that foreign key field IDs can be looked up without making a database roundtrip. """ if name.endswith('_id'): field_name = name[0:len(name) - 3] if field_name in self._meta.fields: return self._data.get(field_name) return super(BaseModel, self).__getattribute__(name) class User(BaseModel): uuid = CharField(default=uuid_generator, max_length=36, null=True) username = CharField(unique=True, index=True) password_hash = CharField(null=True) email = CharField(unique=True, index=True, default=random_string_generator(length=64)) verified = BooleanField(default=False) stripe_id = CharField(index=True, null=True) organization = BooleanField(default=False, index=True) robot = BooleanField(default=False, index=True) invoice_email = BooleanField(default=False) invalid_login_attempts = IntegerField(default=0) last_invalid_login = DateTimeField(default=datetime.utcnow) removed_tag_expiration_s = IntegerField(default=1209600) # Two weeks enabled = BooleanField(default=True) def delete_instance(self, recursive=False, delete_nullable=False): # If we are deleting a robot account, only execute the subset of queries necessary. if self.robot: # For all the model dependencies, only delete those that allow robots. for query, fk in reversed(list(self.dependencies(search_nullable=True))): if isinstance(fk, QuayUserField) and fk.allows_robots: model = fk.model_class if fk.robot_null_delete: model.update(**{fk.name: None}).where(query).execute() else: model.delete().where(query).execute() # Delete the instance itself. super(User, self).delete_instance(recursive=False, delete_nullable=False) else: super(User, self).delete_instance(recursive=recursive, delete_nullable=delete_nullable) Namespace = User.alias() class TeamRole(BaseModel): name = CharField(index=True) class Team(BaseModel): name = CharField(index=True) organization = QuayUserField(index=True) role = ForeignKeyField(TeamRole) description = TextField(default='') class Meta: database = db read_slaves = (read_slave,) indexes = ( # A team name must be unique within an organization (('name', 'organization'), True), ) class TeamMember(BaseModel): user = QuayUserField(allows_robots=True, index=True) team = ForeignKeyField(Team, index=True) class Meta: database = db read_slaves = (read_slave,) indexes = ( # A user may belong to a team only once (('user', 'team'), True), ) class TeamMemberInvite(BaseModel): # Note: Either user OR email will be filled in, but not both. user = QuayUserField(index=True, null=True) email = CharField(null=True) team = ForeignKeyField(Team, index=True) inviter = ForeignKeyField(User, related_name='inviter') invite_token = CharField(default=urn_generator(['teaminvite'])) class LoginService(BaseModel): name = CharField(unique=True, index=True) class FederatedLogin(BaseModel): user = QuayUserField(allows_robots=True, index=True) service = ForeignKeyField(LoginService, index=True) service_ident = CharField() metadata_json = TextField(default='{}') class Meta: database = db read_slaves = (read_slave,) indexes = ( # create a unique index on service and the local service id (('service', 'service_ident'), True), # a user may only have one federated login per service (('service', 'user'), True), ) class Visibility(BaseModel): name = CharField(index=True, unique=True) class Repository(BaseModel): namespace_user = QuayUserField(null=True) name = CharField() visibility = ForeignKeyField(Visibility) description = TextField(null=True) badge_token = CharField(default=uuid_generator) class Meta: database = db read_slaves = (read_slave,) indexes = ( # create a unique index on namespace and name (('namespace_user', 'name'), True), ) def delete_instance(self, recursive=False, delete_nullable=False): if not recursive: raise RuntimeError('Non-recursive delete on repository.') # These models don't need to use transitive deletes, because the referenced objects # are cleaned up directly skip_transitive_deletes = {RepositoryTag, RepositoryBuild, RepositoryBuildTrigger, BlobUpload, Image} # We need to sort the ops so that models get cleaned in order of their dependencies ops = reversed(list(self.dependencies(delete_nullable))) filtered_ops = [] dependencies = defaultdict(set) for query, fk in ops: # We only want to skip transitive deletes, which are done using subqueries in the form of # DELETE FROM in . If an op is not using a subquery, we allow it to be # applied directly. if fk.model_class not in skip_transitive_deletes or query.op != 'in': filtered_ops.append((query, fk)) if query.op == 'in': dependencies[fk.model_class.__name__].add(query.rhs.model_class.__name__) elif query.op == '=': dependencies[fk.model_class.__name__].add(Repository.__name__) else: raise RuntimeError('Unknown operator in recursive repository delete query') sorted_models = list(reversed(toposort.toposort_flatten(dependencies))) def sorted_model_key(query_fk_tuple): cmp_query, cmp_fk = query_fk_tuple if cmp_query.op == 'in': return -1 return sorted_models.index(cmp_fk.model_class.__name__) filtered_ops.sort(key=sorted_model_key) with db_transaction(): for query, fk in filtered_ops: model = fk.model_class if fk.null and not delete_nullable: model.update(**{fk.name: None}).where(query).execute() else: model.delete().where(query).execute() return self.delete().where(self._pk_expr()).execute() class Star(BaseModel): user = ForeignKeyField(User, index=True) repository = ForeignKeyField(Repository, index=True) created = DateTimeField(default=datetime.now) class Meta: database = db read_slaves = (read_slave,) indexes = ( # create a unique index on user and repository (('user', 'repository'), True), ) class Role(BaseModel): name = CharField(index=True, unique=True) class RepositoryPermission(BaseModel): team = ForeignKeyField(Team, index=True, null=True) user = QuayUserField(allows_robots=True, index=True, null=True) repository = ForeignKeyField(Repository, index=True) role = ForeignKeyField(Role) class Meta: database = db read_slaves = (read_slave,) indexes = ( (('team', 'repository'), True), (('user', 'repository'), True), ) class PermissionPrototype(BaseModel): org = QuayUserField(index=True, related_name='orgpermissionproto') uuid = CharField(default=uuid_generator) activating_user = QuayUserField(allows_robots=True, index=True, null=True, related_name='userpermissionproto') delegate_user = QuayUserField(allows_robots=True,related_name='receivingpermission', null=True) delegate_team = ForeignKeyField(Team, related_name='receivingpermission', null=True) role = ForeignKeyField(Role) class Meta: database = db read_slaves = (read_slave,) indexes = ( (('org', 'activating_user'), False), ) class AccessTokenKind(BaseModel): name = CharField(unique=True, index=True) class AccessToken(BaseModel): friendly_name = CharField(null=True) code = CharField(default=random_string_generator(length=64), unique=True, index=True) repository = ForeignKeyField(Repository) created = DateTimeField(default=datetime.now) role = ForeignKeyField(Role) temporary = BooleanField(default=True) kind = ForeignKeyField(AccessTokenKind, null=True) class BuildTriggerService(BaseModel): name = CharField(index=True, unique=True) class RepositoryBuildTrigger(BaseModel): uuid = CharField(default=uuid_generator) service = ForeignKeyField(BuildTriggerService, index=True) repository = ForeignKeyField(Repository, index=True) connected_user = QuayUserField() auth_token = CharField(null=True) private_key = TextField(null=True) config = TextField(default='{}') write_token = ForeignKeyField(AccessToken, null=True) pull_robot = QuayUserField(allows_robots=True, null=True, related_name='triggerpullrobot', robot_null_delete=True) class EmailConfirmation(BaseModel): code = CharField(default=random_string_generator(), unique=True, index=True) user = QuayUserField() pw_reset = BooleanField(default=False) new_email = CharField(null=True) email_confirm = BooleanField(default=False) created = DateTimeField(default=datetime.now) class ImageStorage(BaseModel): uuid = CharField(default=uuid_generator, index=True, unique=True) image_size = BigIntegerField(null=True) uncompressed_size = BigIntegerField(null=True) uploading = BooleanField(default=True, null=True) cas_path = BooleanField(default=True) content_checksum = CharField(null=True, index=True) class ImageStorageTransformation(BaseModel): name = CharField(index=True, unique=True) class ImageStorageSignatureKind(BaseModel): name = CharField(index=True, unique=True) class ImageStorageSignature(BaseModel): storage = ForeignKeyField(ImageStorage, index=True) kind = ForeignKeyField(ImageStorageSignatureKind) signature = TextField(null=True) uploading = BooleanField(default=True, null=True) class Meta: database = db read_slaves = (read_slave,) indexes = ( (('kind', 'storage'), True), ) class ImageStorageLocation(BaseModel): name = CharField(unique=True, index=True) class ImageStoragePlacement(BaseModel): storage = ForeignKeyField(ImageStorage) location = ForeignKeyField(ImageStorageLocation) class Meta: database = db read_slaves = (read_slave,) indexes = ( # An image can only be placed in the same place once (('storage', 'location'), True), ) class UserRegion(BaseModel): user = QuayUserField(index=True, allows_robots=False) location = ForeignKeyField(ImageStorageLocation) indexes = ( (('user', 'location'), True), ) _ImageProxy = Proxy() class Image(BaseModel): # This class is intentionally denormalized. Even though images are supposed # to be globally unique we can't treat them as such for permissions and # security reasons. So rather than Repository <-> Image being many to many # each image now belongs to exactly one repository. docker_image_id = CharField(index=True) repository = ForeignKeyField(Repository) # '/' separated list of ancestory ids, e.g. /1/2/6/7/10/ ancestors = CharField(index=True, default='/', max_length=64535, null=True) storage = ForeignKeyField(ImageStorage, index=True, null=True) created = DateTimeField(null=True) comment = TextField(null=True) command = TextField(null=True) aggregate_size = BigIntegerField(null=True) v1_json_metadata = TextField(null=True) v1_checksum = CharField(null=True) security_indexed = BooleanField(default=False, index=True) security_indexed_engine = IntegerField(default=-1, index=True) # We use a proxy here instead of 'self' in order to disable the foreign key constraint parent = ForeignKeyField(_ImageProxy, index=True, null=True, related_name='children') class Meta: database = db read_slaves = (read_slave,) indexes = ( # we don't really want duplicates (('repository', 'docker_image_id'), True), (('security_indexed_engine', 'security_indexed'), False), ) _ImageProxy.initialize(Image) class DerivedStorageForImage(BaseModel): source_image = ForeignKeyField(Image) derivative = ForeignKeyField(ImageStorage) transformation = ForeignKeyField(ImageStorageTransformation) class Meta: database = db read_slaves = (read_slave,) indexes = ( (('source_image', 'transformation'), True), ) class RepositoryTag(BaseModel): name = CharField() image = ForeignKeyField(Image) repository = ForeignKeyField(Repository) lifetime_start_ts = IntegerField(default=get_epoch_timestamp) lifetime_end_ts = IntegerField(null=True, index=True) hidden = BooleanField(default=False) reversion = BooleanField(default=False) class Meta: database = db read_slaves = (read_slave,) indexes = ( (('repository', 'name'), False), # This unique index prevents deadlocks when concurrently moving and deleting tags (('repository', 'name', 'lifetime_end_ts'), True), ) class TagManifest(BaseModel): tag = ForeignKeyField(RepositoryTag, index=True, unique=True) digest = CharField(index=True, unique=True) json_data = TextField() class BUILD_PHASE(object): """ Build phases enum """ ERROR = 'error' INTERNAL_ERROR = 'internalerror' BUILD_SCHEDULED = 'build-scheduled' UNPACKING = 'unpacking' PULLING = 'pulling' BUILDING = 'building' PUSHING = 'pushing' WAITING = 'waiting' COMPLETE = 'complete' class QueueItem(BaseModel): queue_name = CharField(index=True, max_length=1024) body = TextField() available_after = DateTimeField(default=datetime.utcnow, index=True) available = BooleanField(default=True, index=True) processing_expires = DateTimeField(null=True, index=True) retries_remaining = IntegerField(default=5, index=True) class RepositoryBuild(BaseModel): uuid = CharField(default=uuid_generator, index=True) repository = ForeignKeyField(Repository, index=True) access_token = ForeignKeyField(AccessToken) resource_key = CharField(index=True, null=True) job_config = TextField() phase = CharField(default=BUILD_PHASE.WAITING) started = DateTimeField(default=datetime.now) display_name = CharField() trigger = ForeignKeyField(RepositoryBuildTrigger, null=True, index=True) pull_robot = QuayUserField(null=True, related_name='buildpullrobot', allows_robots=True, robot_null_delete=True) logs_archived = BooleanField(default=False) queue_id = CharField(null=True, index=True) class LogEntryKind(BaseModel): name = CharField(index=True, unique=True) class LogEntry(BaseModel): kind = ForeignKeyField(LogEntryKind, index=True) account = QuayUserField(index=True, related_name='account') performer = QuayUserField(allows_robots=True, index=True, null=True, related_name='performer', robot_null_delete=True) repository = ForeignKeyField(Repository, index=True, null=True) datetime = DateTimeField(default=datetime.now, index=True) ip = CharField(null=True) metadata_json = TextField(default='{}') class Meta: database = db read_slaves = (read_slave,) indexes = ( # create an index on repository and date (('repository', 'datetime'), False), # create an index on repository, date and kind (('repository', 'datetime', 'kind'), False), ) class RepositoryActionCount(BaseModel): repository = ForeignKeyField(Repository, index=True) count = IntegerField() date = DateField(index=True) class Meta: database = db read_slaves = (read_slave,) indexes = ( # create a unique index on repository and date (('repository', 'date'), True), ) class OAuthApplication(BaseModel): client_id = CharField(index=True, default=random_string_generator(length=20)) client_secret = CharField(default=random_string_generator(length=40)) redirect_uri = CharField() application_uri = CharField() organization = QuayUserField() name = CharField() description = TextField(default='') avatar_email = CharField(null=True, db_column='gravatar_email') class OAuthAuthorizationCode(BaseModel): application = ForeignKeyField(OAuthApplication) code = CharField(index=True) scope = CharField() data = TextField() # Context for the code, such as the user class OAuthAccessToken(BaseModel): uuid = CharField(default=uuid_generator, index=True) application = ForeignKeyField(OAuthApplication) authorized_user = QuayUserField() scope = CharField() access_token = CharField(index=True) token_type = CharField(default='Bearer') expires_at = DateTimeField() refresh_token = CharField(index=True, null=True) data = TextField() # This is context for which this token was generated, such as the user class NotificationKind(BaseModel): name = CharField(index=True, unique=True) class Notification(BaseModel): uuid = CharField(default=uuid_generator, index=True) kind = ForeignKeyField(NotificationKind, index=True) target = QuayUserField(index=True) metadata_json = TextField(default='{}') created = DateTimeField(default=datetime.now, index=True) dismissed = BooleanField(default=False) class ExternalNotificationEvent(BaseModel): name = CharField(index=True, unique=True) class ExternalNotificationMethod(BaseModel): name = CharField(index=True, unique=True) class RepositoryNotification(BaseModel): uuid = CharField(default=uuid_generator, index=True) repository = ForeignKeyField(Repository, index=True) event = ForeignKeyField(ExternalNotificationEvent) method = ForeignKeyField(ExternalNotificationMethod) title = CharField(null=True) config_json = TextField() event_config_json = TextField(default='{}') class RepositoryAuthorizedEmail(BaseModel): repository = ForeignKeyField(Repository, index=True) email = CharField() code = CharField(default=random_string_generator(), unique=True, index=True) confirmed = BooleanField(default=False) class Meta: database = db read_slaves = (read_slave,) indexes = ( # create a unique index on email and repository (('email', 'repository'), True), ) class BlobUpload(BaseModel): repository = ForeignKeyField(Repository, index=True) uuid = CharField(index=True, unique=True) byte_count = IntegerField(default=0) sha_state = ResumableSHAField(null=True, default=resumablehashlib.sha256) location = ForeignKeyField(ImageStorageLocation) storage_metadata = JSONField(null=True, default={}) class Meta: database = db read_slaves = (read_slave,) indexes = ( # create a unique index on email and repository (('repository', 'uuid'), True), ) class QuayService(BaseModel): name = CharField(index=True, unique=True) class QuayRegion(BaseModel): name = CharField(index=True, unique=True) class QuayRelease(BaseModel): service = ForeignKeyField(QuayService) version = CharField() region = ForeignKeyField(QuayRegion) reverted = BooleanField(default=False) created = DateTimeField(default=datetime.now, index=True) class Meta: database = db read_slaves = (read_slave,) indexes = ( # unique release per region (('service', 'version', 'region'), True), # get recent releases (('service', 'region', 'created'), False), ) all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission, Visibility, RepositoryTag, EmailConfirmation, FederatedLogin, LoginService, QueueItem, RepositoryBuild, Team, TeamMember, TeamRole, LogEntryKind, LogEntry, PermissionPrototype, ImageStorage, BuildTriggerService, RepositoryBuildTrigger, OAuthApplication, OAuthAuthorizationCode, OAuthAccessToken, NotificationKind, Notification, ImageStorageLocation, ImageStoragePlacement, ExternalNotificationEvent, ExternalNotificationMethod, RepositoryNotification, RepositoryAuthorizedEmail, ImageStorageTransformation, TeamMemberInvite, ImageStorageSignature, ImageStorageSignatureKind, AccessTokenKind, Star, RepositoryActionCount, TagManifest, UserRegion, QuayService, QuayRegion, QuayRelease, BlobUpload, DerivedStorageForImage]