This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.

123 lines
3.3 KiB
Raw Normal View History

import json
import logging
import uuid
from abc import ABCMeta, abstractmethod
from datetime import datetime
from six import add_metaclass
from alembic import op
from sqlalchemy import text
from util.abchelpers import nooper
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def escape_table_name(table_name):
    """ Returns `table_name` in the form to embed in a raw SQL string for the
        database currently bound to the Alembic operations context.

        On PostgreSQL the name is wrapped in double quotes; for every other
        dialect it is returned unchanged.
    """
    # `op.get_bind()` returns a connection object, which never compares equal
    # to a string; the dialect name has to be read from the bound engine.
    if op.get_bind().engine.name == 'postgresql':
        # Needed for the `user` table.
        return '"%s"' % table_name

    return table_name
class DataTypes(object):
    """ Factories producing placeholder values for populating test data.

        Every factory except `Foreign` is called with no arguments and returns the
        value to store. `Foreign` takes a table name and returns such a
        zero-argument factory.
    """

    @staticmethod
    def DateTime():
        """ Returns the current local date and time. """
        # NOTE(review): the body of this method was missing in the reviewed
        # source; `datetime.now()` is a reconstruction -- confirm.
        return datetime.now()

    @staticmethod
    def Date():
        """ Returns the current local date and time. """
        # NOTE(review): the body of this method was missing in the reviewed
        # source; `datetime.now()` is a reconstruction -- confirm.
        return datetime.now()

    @staticmethod
    def String():
        """ Returns a fixed string value. """
        return 'somestringvalue'

    @staticmethod
    def UTF8Char():
        """ Returns a fixed string value distinct from the one used by `String`. """
        return 'some other value'

    @staticmethod
    def UUID():
        """ Returns a freshly generated random UUID in its string form. """
        return str(uuid.uuid4())

    @staticmethod
    def JSON():
        """ Returns a small JSON object serialized to a string. """
        return json.dumps(dict(foo='bar', baz='meh'))

    @staticmethod
    def Boolean():
        """ Returns a truthy value: `True` on PostgreSQL, the integer 1 otherwise. """
        # The dialect name lives on the bound engine; the bind itself is a
        # connection object and never equals a string.
        if op.get_bind().engine.name == 'postgresql':
            return True

        return 1

    @staticmethod
    def BigInteger():
        """ Returns an integer larger than the signed 32-bit maximum. """
        return 21474836470

    @staticmethod
    def Integer():
        """ Returns a fixed small integer. """
        return 42

    @staticmethod
    def Foreign(table_name):
        """ Returns a zero-argument callable that, when invoked, selects the `id` of
            one row from `table_name` and returns it.

            The returned callable raises `Exception` if the table has no rows.
        """
        def get_index():
            result = op.get_bind().execute(
                "SELECT id FROM %s LIMIT 1" % escape_table_name(table_name))
            try:
                return list(result)[0][0]
            except IndexError:
                raise Exception('Could not find row for table %s' % table_name)
            finally:
                # Release the result's resources whether or not a row was found.
                result.close()

        return get_index
@add_metaclass(ABCMeta)
class MigrationTester(object):
    """ Implements an interface for adding testing capabilities to the
        data model migration system in Alembic.
    """
    # Exposes the test value factories to users of a tester instance.
    TestDataType = DataTypes

    @abstractmethod
    def populate_table(self, table_name, fields):
        """ Called to populate a table with the given fields filled in with testing data.

            `fields` is a sequence of `(field_name, field_type)` pairs, where
            `field_type` is a zero-argument callable producing the value to store.
        """

    @abstractmethod
    def populate_column(self, table_name, col_name, field_type):
        """ Called to populate a column in a table to be filled in with testing data.

            `field_type` is a zero-argument callable producing the value to store.
        """


# NOTE(review): `nooper` is defined in util.abchelpers and is not visible here;
# presumably it supplies do-nothing implementations of the abstract methods so
# that this class can be instantiated -- confirm against that module.
@nooper
class NoopTester(MigrationTester):
    """ No-op version of the tester, designed for production workloads. """
class PopulateTestDataTester(MigrationTester):
    """ Tester that writes generated values into the database through the
        connection bound to the Alembic operations context.
    """

    def populate_table(self, table_name, fields):
        """ Inserts a single row into `table_name`.

            `fields` is an iterable of `(field_name, field_type)` pairs; each
            `field_type` is called with no arguments to produce the value stored
            under `field_name`.
        """
        # `fields` is walked several times below, so materialize it once to
        # also support one-shot iterables.
        fields = list(fields)

        columns = {field_name: field_type() for field_name, field_type in fields}
        field_name_vars = [':' + field_name for field_name, _ in fields]

        # Column names are left bare on PostgreSQL and backtick-quoted on every
        # other dialect. The dialect name is read from the bound engine because
        # the bind itself is a connection object, not a string.
        if op.get_bind().engine.name == 'postgresql':
            field_names = ["%s" % field_name for field_name, _ in fields]
        else:
            field_names = ["`%s`" % field_name for field_name, _ in fields]

        table_name = escape_table_name(table_name)
        query = text('INSERT INTO %s (%s) VALUES (%s)' % (table_name, ', '.join(field_names),
                                                          ', '.join(field_name_vars)))
        logger.info("Executing test query %s with values %s", query, columns.values())
        op.get_bind().execute(query, **columns)

    def populate_column(self, table_name, col_name, field_type):
        """ Sets `col_name` on one existing row of `table_name`.

            The value is produced by calling `field_type` with no arguments. The
            row is the one whose id `DataTypes.Foreign(table_name)` returns.
        """
        col_value = field_type()
        # Resolve the row id before the table name is escaped; the lookup
        # applies its own escaping.
        row_id = DataTypes.Foreign(table_name)()

        table_name = escape_table_name(table_name)
        update_text = text("UPDATE %s SET %s=:col_value where ID=:row_id" % (table_name, col_name))
        logger.info("Executing test query %s with value %s on row %s", update_text, col_value,
                    row_id)
        op.get_bind().execute(update_text, col_value=col_value, row_id=row_id)