95 lines
3.4 KiB
Python
95 lines
3.4 KiB
Python
|
from sqlalchemy import (Table, MetaData, Column, ForeignKey, Integer, String, Boolean, Text,
|
||
|
DateTime, Date, BigInteger, Index, text)
|
||
|
from peewee import (PrimaryKeyField, CharField, BooleanField, DateTimeField, TextField,
|
||
|
ForeignKeyField, BigIntegerField, IntegerField, DateField)
|
||
|
|
||
|
|
||
|
OPTIONS_TO_COPY = [
|
||
|
'null',
|
||
|
'default',
|
||
|
'primary_key',
|
||
|
]
|
||
|
|
||
|
|
||
|
OPTION_TRANSLATIONS = {
|
||
|
'null': 'nullable',
|
||
|
}
|
||
|
|
||
|
def gen_sqlalchemy_metadata(peewee_model_list):
|
||
|
metadata = MetaData(naming_convention={
|
||
|
"ix": 'ix_%(column_0_label)s',
|
||
|
"uq": "uq_%(table_name)s_%(column_0_name)s",
|
||
|
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
|
||
|
"pk": "pk_%(table_name)s"
|
||
|
})
|
||
|
|
||
|
for model in peewee_model_list:
|
||
|
meta = model._meta
|
||
|
|
||
|
all_indexes = set(meta.indexes)
|
||
|
fulltext_indexes = []
|
||
|
|
||
|
columns = []
|
||
|
for field in meta.sorted_fields:
|
||
|
alchemy_type = None
|
||
|
col_args = []
|
||
|
col_kwargs = {}
|
||
|
if isinstance(field, PrimaryKeyField):
|
||
|
alchemy_type = Integer
|
||
|
elif isinstance(field, CharField):
|
||
|
alchemy_type = String(field.max_length)
|
||
|
elif isinstance(field, BooleanField):
|
||
|
alchemy_type = Boolean
|
||
|
elif isinstance(field, DateTimeField):
|
||
|
alchemy_type = DateTime
|
||
|
elif isinstance(field, DateField):
|
||
|
alchemy_type = Date
|
||
|
elif isinstance(field, TextField):
|
||
|
alchemy_type = Text
|
||
|
elif isinstance(field, ForeignKeyField):
|
||
|
alchemy_type = Integer
|
||
|
all_indexes.add(((field.name, ), field.unique))
|
||
|
if not field.deferred:
|
||
|
target_name = '%s.%s' % (field.rel_model._meta.table_name, field.rel_field.column_name)
|
||
|
col_args.append(ForeignKey(target_name))
|
||
|
elif isinstance(field, BigIntegerField):
|
||
|
alchemy_type = BigInteger
|
||
|
elif isinstance(field, IntegerField):
|
||
|
alchemy_type = Integer
|
||
|
else:
|
||
|
raise RuntimeError('Unknown column type: %s' % field)
|
||
|
|
||
|
if hasattr(field, '__fulltext__'):
|
||
|
# Add the fulltext index for the field, based on whether we are under MySQL or Postgres.
|
||
|
fulltext_indexes.append(field.name)
|
||
|
|
||
|
for option_name in OPTIONS_TO_COPY:
|
||
|
alchemy_option_name = (OPTION_TRANSLATIONS[option_name]
|
||
|
if option_name in OPTION_TRANSLATIONS else option_name)
|
||
|
if alchemy_option_name not in col_kwargs:
|
||
|
option_val = getattr(field, option_name)
|
||
|
col_kwargs[alchemy_option_name] = option_val
|
||
|
|
||
|
if field.unique or field.index:
|
||
|
all_indexes.add(((field.name, ), field.unique))
|
||
|
|
||
|
new_col = Column(field.column_name, alchemy_type, *col_args, **col_kwargs)
|
||
|
columns.append(new_col)
|
||
|
|
||
|
new_table = Table(meta.table_name, metadata, *columns)
|
||
|
|
||
|
for col_prop_names, unique in all_indexes:
|
||
|
col_names = [meta.fields[prop_name].column_name for prop_name in col_prop_names]
|
||
|
index_name = '%s_%s' % (meta.table_name, '_'.join(col_names))
|
||
|
col_refs = [getattr(new_table.c, col_name) for col_name in col_names]
|
||
|
Index(index_name, *col_refs, unique=unique)
|
||
|
|
||
|
for col_field_name in fulltext_indexes:
|
||
|
index_name = '%s_%s__fulltext' % (meta.table_name, col_field_name)
|
||
|
col_ref = getattr(new_table.c, col_field_name)
|
||
|
Index(index_name, col_ref, postgresql_ops={col_field_name: 'gin_trgm_ops'},
|
||
|
postgresql_using='gin',
|
||
|
mysql_prefix='FULLTEXT')
|
||
|
|
||
|
return metadata
|