This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/api/superuser_models_interface.py

444 lines
11 KiB
Python
Raw Normal View History

import json
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from datetime import datetime
from dateutil.relativedelta import relativedelta
from six import add_metaclass
from tzlocal import get_localzone
from app import avatar, superusers
from buildtrigger.basehandler import BuildTriggerHandler
from data import model
from endpoints.api import format_date
from util.morecollections import AttrDict
def user_view(user):
return {
'name': user.username,
'kind': 'user',
'is_robot': user.robot,
}
class BuildTrigger(
namedtuple('BuildTrigger', ['uuid', 'service_name', 'pull_robot', 'can_read', 'can_admin', 'for_build'])):
"""
BuildTrigger represent a trigger that is associated with a build
:type uuid: string
:type service_name: string
:type pull_robot: User
:type can_read: boolean
:type can_admin: boolean
:type for_build: boolean
"""
def to_dict(self):
if not self.uuid:
return None
build_trigger = BuildTriggerHandler.get_handler(self)
build_source = build_trigger.config.get('build_source')
repo_url = build_trigger.get_repository_url() if build_source else None
can_read = self.can_read or self.can_admin
trigger_data = {
'id': self.uuid,
'service': self.service_name,
'is_active': build_trigger.is_active(),
'build_source': build_source if can_read else None,
'repository_url': repo_url if can_read else None,
'config': build_trigger.config if self.can_admin else {},
'can_invoke': self.can_admin,
}
if not self.for_build and self.can_admin and self.pull_robot:
trigger_data['pull_robot'] = user_view(self.pull_robot)
return trigger_data
class RepositoryBuild(namedtuple('RepositoryBuild',
['uuid', 'logs_archived', 'repository_namespace_user_username', 'repository_name',
'can_write', 'can_read', 'pull_robot', 'resource_key', 'trigger', 'display_name',
'started', 'job_config', 'phase', 'status', 'error', 'archive_url'])):
"""
RepositoryBuild represents a build associated with a repostiory
:type uuid: string
:type logs_archived: boolean
:type repository_namespace_user_username: string
:type repository_name: string
:type can_write: boolean
:type can_write: boolean
:type pull_robot: User
:type resource_key: string
:type trigger: Trigger
:type display_name: string
:type started: boolean
:type job_config: {Any -> Any}
:type phase: string
:type status: string
:type error: string
:type archive_url: string
"""
def to_dict(self):
resp = {
'id': self.uuid,
'phase': self.phase,
'started': format_date(self.started),
'display_name': self.display_name,
'status': self.status or {},
'subdirectory': self.job_config.get('build_subdir', ''),
'dockerfile_path': self.job_config.get('build_subdir', ''),
'context': self.job_config.get('context', ''),
'tags': self.job_config.get('docker_tags', []),
'manual_user': self.job_config.get('manual_user', None),
'is_writer': self.can_write,
'trigger': self.trigger.to_dict(),
'trigger_metadata': self.job_config.get('trigger_metadata', None) if self.can_read else None,
'resource_key': self.resource_key,
'pull_robot': user_view(self.pull_robot) if self.pull_robot else None,
'repository': {
'namespace': self.repository_namespace_user_username,
'name': self.repository_name
},
'error': self.error,
}
if self.can_write:
if self.resource_key is not None:
resp['archive_url'] = self.archive_url
elif self.job_config.get('archive_url', None):
resp['archive_url'] = self.job_config['archive_url']
return resp
class Approval(namedtuple('Approval', ['approver', 'approval_type', 'approved_date', 'notes'])):
"""
Approval represents whether a key has been approved or not
:type approver: User
:type approval_type: string
:type approved_date: Date
:type notes: string
"""
def to_dict(self):
return {
'approver': self.approver.to_dict() if self.approver else None,
'approval_type': self.approval_type,
'approved_date': self.approved_date,
'notes': self.notes,
}
class ServiceKey(namedtuple('ServiceKey', ['name', 'kid', 'service', 'jwk', 'metadata', 'created_date',
'expiration_date', 'rotation_duration', 'approval'])):
"""
ServiceKey is an apostille signing key
:type name: string
:type kid: int
:type service: string
:type jwk: string
:type metadata: string
:type created_date: Date
:type expiration_date: Date
:type rotation_duration: Date
:type approval: Approval
"""
def to_dict(self):
return {
'name': self.name,
'kid': self.kid,
'service': self.service,
'jwk': self.jwk,
'metadata': self.metadata,
'created_date': self.created_date,
'expiration_date': self.expiration_date,
'rotation_duration': self.rotation_duration,
'approval': self.approval.to_dict() if self.approval is not None else None,
}
class User(namedtuple('User', ['username', 'email', 'verified', 'enabled', 'robot'])):
"""
User represents a single user.
:type username: string
:type email: string
:type verified: boolean
:type enabled: boolean
:type robot: User
"""
def to_dict(self):
user_data = {
'kind': 'user',
'name': self.username,
'username': self.username,
'email': self.email,
'verified': self.verified,
'avatar': avatar.get_data_for_user(self),
'super_user': superusers.is_superuser(self.username),
'enabled': self.enabled,
}
return user_data
class Organization(namedtuple('Organization', ['username', 'email'])):
"""
Organization represents a single org.
:type username: string
:type email: string
"""
def to_dict(self):
return {
'name': self.username,
'email': self.email,
'avatar': avatar.get_data_for_org(self),
}
class LogEntry(
namedtuple('LogEntry', [
'metadata_json', 'ip', 'datetime', 'performer_email', 'performer_username', 'performer_robot',
'account_organization', 'account_username', 'account_email', 'account_robot', 'kind',
])):
"""
LogEntry a single log entry.
:type metadata_json: string
:type ip: string
:type datetime: string
:type performer_email: int
:type performer_username: string
:type performer_robot: boolean
:type account_organization: boolean
:type account_username: string
:type account_email: string
:type account_robot: boolean
:type kind_id: int
"""
def to_dict(self):
view = {
'kind': self.kind,
'metadata': json.loads(self.metadata_json),
'ip': self.ip,
'datetime': format_date(self.datetime),
}
if self.performer_username:
performer = AttrDict({'username': self.performer_username, 'email': self.performer_email})
performer.robot = None
if self.performer_robot:
performer.robot = self.performer_robot
view['performer'] = {
'kind': 'user',
'name': self.performer_username,
'is_robot': self.performer_robot,
'avatar': avatar.get_data_for_user(performer),
}
if self.account_username:
account = AttrDict({'username': self.account_username, 'email': self.account_email})
if self.account_organization:
view['namespace'] = {
'kind': 'org',
'name': self.account_username,
'avatar': avatar.get_data_for_org(account),
}
else:
account.robot = None
if self.account_robot:
account.robot = self.account_robot
view['namespace'] = {
'kind': 'user',
'name': self.account_username,
'avatar': avatar.get_data_for_user(account),
}
return view
class LogEntryPage(
namedtuple('LogEntryPage', ['logs', 'next_page_token'])):
"""
LogEntryPage represents a single page of logs.
:type logs: [LogEntry]
:type next_page_token: {any -> any}
"""
class AggregatedLogEntry(
namedtuple('AggregatedLogEntry', ['count', 'kind_id', 'day', 'start_time'])):
"""
AggregatedLogEntry represents an aggregated view of logs.
:type count: int
:type kind_id: int
:type day: string
:type start_time: Date
"""
def to_dict(self):
synthetic_date = datetime(self.start_time.year, self.start_time.month, int(self.day), tzinfo=get_localzone())
if synthetic_date.day < self.start_time.day:
synthetic_date = synthetic_date + relativedelta(months=1)
kinds = model.log.get_log_entry_kinds()
view = {
'kind': kinds[self.kind_id],
'count': self.count,
'datetime': format_date(synthetic_date),
}
return view
@add_metaclass(ABCMeta)
class SuperuserDataInterface(object):
"""
Interface that represents all data store interactions required by a superuser api.
"""
@abstractmethod
def get_logs_query(self, start_time, end_time, page_token=None):
"""
Returns a LogEntryPage.
"""
@abstractmethod
def get_aggregated_logs(self, start_time, end_time):
"""
Returns a list of AggregatedLogEntry
"""
@abstractmethod
def get_organizations(self):
"""
Returns a list of Organization
"""
@abstractmethod
def get_active_users(self):
"""
Returns a list of User
"""
@abstractmethod
def create_install_user(self, username, password, email):
"""
Returns the created user and confirmation code for email confirmation
"""
@abstractmethod
def get_nonrobot_user(self, username):
"""
Returns a User
"""
@abstractmethod
def create_reset_password_email_code(self, email):
"""
Returns a recover password code
"""
@abstractmethod
def mark_user_for_deletion(self, username):
"""
Returns None
"""
@abstractmethod
def change_password(self, username, password):
"""
Returns None
"""
@abstractmethod
def update_email(self, username, email, auto_verify):
"""
Returns None
"""
@abstractmethod
def update_enabled(self, username, enabled):
"""
Returns None
"""
@abstractmethod
def take_ownership(self, namespace, authed_user):
"""
Returns id of entity and whether the entity was a user
"""
@abstractmethod
def mark_organization_for_deletion(self, name):
"""
Returns None
"""
@abstractmethod
def change_organization_name(self, old_org_name, new_org_name):
"""
Returns updated Organization
"""
@abstractmethod
def list_all_service_keys(self):
"""
Returns a list of service keys
"""
@abstractmethod
def generate_service_key(self, service, expiration_date, kid=None, name='', metadata=None, rotation_duration=None):
"""
Returns a tuple of private key and public key id
"""
@abstractmethod
def approve_service_key(self, kid, approver, approval_type, notes=''):
"""
Returns the approved Key
"""
@abstractmethod
def get_service_key(self, kid, service=None, alive_only=True, approved_only=True):
"""
Returns ServiceKey
"""
@abstractmethod
def set_key_expiration(self, kid, expiration_date):
"""
Returns None
"""
@abstractmethod
def update_service_key(self, kid, name=None, metadata=None):
"""
Returns None
"""
@abstractmethod
def delete_service_key(self, kid):
"""
Returns deleted ServiceKey
"""
@abstractmethod
def get_repository_build(self, uuid):
"""
Returns RepositoryBuild
"""