Add support for read slave databases.
This commit is contained in:
parent
d851feef6e
commit
5645b6da32
3 changed files with 148 additions and 6 deletions
|
@ -5,6 +5,7 @@ import uuid
|
|||
from random import SystemRandom
|
||||
from datetime import datetime
|
||||
from peewee import *
|
||||
from data.read_slave import ReadSlaveModel
|
||||
from sqlalchemy.engine.url import make_url
|
||||
from urlparse import urlparse
|
||||
|
||||
|
@ -19,10 +20,10 @@ SCHEME_DRIVERS = {
|
|||
}
|
||||
|
||||
db = Proxy()
|
||||
read_slave = Proxy()
|
||||
|
||||
def configure(config_object):
|
||||
db_kwargs = dict(config_object['DB_CONNECTION_ARGS'])
|
||||
parsed_url = make_url(config_object['DB_URI'])
|
||||
def _db_from_url(url, db_kwargs):
|
||||
parsed_url = make_url(url)
|
||||
|
||||
if parsed_url.host:
|
||||
db_kwargs['host'] = parsed_url.host
|
||||
|
@ -33,8 +34,16 @@ def configure(config_object):
|
|||
if parsed_url.password:
|
||||
db_kwargs['passwd'] = parsed_url.password
|
||||
|
||||
real_db = SCHEME_DRIVERS[parsed_url.drivername](parsed_url.database, **db_kwargs)
|
||||
db.initialize(real_db)
|
||||
return SCHEME_DRIVERS[parsed_url.drivername](parsed_url.database, **db_kwargs)
|
||||
|
||||
def configure(config_object):
  """Initialize the database proxies from the application config.

  Binds the master (write) database proxy from ``DB_URI``, and binds the
  optional read-slave proxy when ``DB_READ_SLAVE_URI`` is present in the
  config.  When the read slave is left uninitialized, reads fall back to
  the master (see ReadSlaveModel._get_read_database).
  """
  db_kwargs = dict(config_object['DB_CONNECTION_ARGS'])
  write_db_uri = config_object['DB_URI']
  db.initialize(_db_from_url(write_db_uri, db_kwargs))

  # NOTE(review): _db_from_url mutates db_kwargs, so host/user/password
  # parsed from the write URI carry over into the read-slave call when the
  # slave URI omits them — confirm this sharing is intended.
  read_slave_uri = config_object.get('DB_READ_SLAVE_URI', None)
  if read_slave_uri is not None:
    read_slave.initialize(_db_from_url(read_slave_uri, db_kwargs))
|
||||
|
||||
|
||||
def random_string_generator(length=16):
|
||||
|
@ -49,9 +58,10 @@ def uuid_generator():
|
|||
return str(uuid.uuid4())
|
||||
|
||||
|
||||
class BaseModel(Model):
|
||||
class BaseModel(ReadSlaveModel):
  # Common base for all models: writes go to the master proxy, SELECTs are
  # routed to the read-slave proxy when it has been initialized.
  class Meta:
    database = db
    read_slaves = (read_slave,)
|
||||
|
||||
|
||||
class User(BaseModel):
|
||||
|
@ -78,6 +88,7 @@ class Team(BaseModel):
|
|||
|
||||
class Meta:
|
||||
database = db
|
||||
read_slaves = (read_slave,)
|
||||
indexes = (
|
||||
# A team name must be unique within an organization
|
||||
(('name', 'organization'), True),
|
||||
|
@ -90,6 +101,7 @@ class TeamMember(BaseModel):
|
|||
|
||||
class Meta:
|
||||
database = db
|
||||
read_slaves = (read_slave,)
|
||||
indexes = (
|
||||
# A user may belong to a team only once
|
||||
(('user', 'team'), True),
|
||||
|
@ -107,6 +119,7 @@ class FederatedLogin(BaseModel):
|
|||
|
||||
class Meta:
|
||||
database = db
|
||||
read_slaves = (read_slave,)
|
||||
indexes = (
|
||||
# create a unique index on service and the local service id
|
||||
(('service', 'service_ident'), True),
|
||||
|
@ -129,6 +142,7 @@ class Repository(BaseModel):
|
|||
|
||||
class Meta:
|
||||
database = db
|
||||
read_slaves = (read_slave,)
|
||||
indexes = (
|
||||
# create a unique index on namespace and name
|
||||
(('namespace', 'name'), True),
|
||||
|
@ -147,6 +161,7 @@ class RepositoryPermission(BaseModel):
|
|||
|
||||
class Meta:
|
||||
database = db
|
||||
read_slaves = (read_slave,)
|
||||
indexes = (
|
||||
(('team', 'repository'), True),
|
||||
(('user', 'repository'), True),
|
||||
|
@ -166,6 +181,7 @@ class PermissionPrototype(BaseModel):
|
|||
|
||||
class Meta:
|
||||
database = db
|
||||
read_slaves = (read_slave,)
|
||||
indexes = (
|
||||
(('org', 'activating_user'), False),
|
||||
)
|
||||
|
@ -232,6 +248,7 @@ class ImageStoragePlacement(BaseModel):
|
|||
|
||||
class Meta:
|
||||
database = db
|
||||
read_slaves = (read_slave,)
|
||||
indexes = (
|
||||
# An image can only be placed in the same place once
|
||||
(('storage', 'location'), True),
|
||||
|
@ -253,6 +270,7 @@ class Image(BaseModel):
|
|||
|
||||
class Meta:
|
||||
database = db
|
||||
read_slaves = (read_slave,)
|
||||
indexes = (
|
||||
# we don't really want duplicates
|
||||
(('repository', 'docker_image_id'), True),
|
||||
|
@ -266,6 +284,7 @@ class RepositoryTag(BaseModel):
|
|||
|
||||
class Meta:
|
||||
database = db
|
||||
read_slaves = (read_slave,)
|
||||
indexes = (
|
||||
(('repository', 'name'), True),
|
||||
)
|
||||
|
|
56
data/read_slave.py
Normal file
56
data/read_slave.py
Normal file
|
@ -0,0 +1,56 @@
|
|||
"""
|
||||
Adapted from:
|
||||
https://github.com/coleifer/peewee/blob/master/playhouse/read_slave.py
|
||||
|
||||
Support for using a dedicated read-slave. The read database is specified as a
|
||||
Model.Meta option, and will be used for SELECT statements:
|
||||
|
||||
|
||||
master = PostgresqlDatabase('master')
|
||||
read_slave = PostgresqlDatabase('read_slave')
|
||||
|
||||
class BaseModel(ReadSlaveModel):
|
||||
class Meta:
|
||||
database = master
|
||||
read_slaves = [read_slave] # This database will be used for SELECTs.
|
||||
|
||||
|
||||
# Now define your models as you would normally.
|
||||
class User(BaseModel):
|
||||
username = CharField()
|
||||
|
||||
# To force a SELECT on the master database, you can instantiate the SelectQuery
|
||||
# by hand:
|
||||
master_select = SelectQuery(User).where(...)
|
||||
"""
|
||||
from peewee import *
|
||||
|
||||
|
||||
class ReadSlaveModel(Model):
  """Model base that round-robins SELECT statements across read slaves.

  Adapted from peewee's playhouse read_slave module.  Databases listed in
  ``Meta.read_slaves`` serve reads; all other statements — and any query
  issued while a transaction is open — use ``Meta.database``.
  """

  @classmethod
  def _get_read_database(cls):
    """Return the database that should serve the next read query."""
    master = cls._meta.database
    slaves = getattr(cls._meta, 'read_slaves', None)

    # No slaves configured, or a transaction is in progress — use the
    # master (presumably so reads observe in-transaction writes; confirm).
    if not slaves or master.transaction_depth() > 0:
      return master

    # Advance the per-class round-robin cursor.
    next_idx = (getattr(cls, '_read_slave_idx', -1) + 1) % len(slaves)
    cls._read_slave_idx = next_idx
    candidate = slaves[next_idx]

    # It's possible the read slave was disabled by not initializing it
    if isinstance(candidate, Proxy) and candidate.obj is None:
      return master

    return candidate

  @classmethod
  def select(cls, *args, **kwargs):
    """Build a SELECT query bound to a read database."""
    query = super(ReadSlaveModel, cls).select(*args, **kwargs)
    query.database = cls._get_read_database()
    return query

  @classmethod
  def raw(cls, *args, **kwargs):
    """Build a raw query, routing it to a read database only for SELECTs."""
    query = super(ReadSlaveModel, cls).raw(*args, **kwargs)
    if query._sql.lower().startswith('select'):
      query.database = cls._get_read_database()
    return query
|
67
tools/migrateimage.py
Normal file
67
tools/migrateimage.py
Normal file
|
@ -0,0 +1,67 @@
|
|||
import argparse
|
||||
import logging
|
||||
|
||||
from data import model
|
||||
from data.database import ImageStoragePlacement, ImageStorageLocation
|
||||
from app import storage
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)


# (path_builder, required) pairs naming every file that can make up an image
# in storage.  Required files must exist in the source location for the
# migration to succeed; optional files are copied only when present.
PATHSPECS = [
  (storage.image_json_path, True),
  (storage.image_layer_path, True),
  (storage.image_ancestry_path, True),
  (storage.image_file_trie_path, False),
  (storage.image_file_diffs_path, False),
]
|
||||
|
||||
|
||||
def migrate_image(image, destination_location):
  """Replicate all storage files for `image` into `destination_location`.

  Streams every file listed in PATHSPECS from the image's current
  location(s) to the destination (skipping files already present there),
  then records an ImageStoragePlacement row so the new placement is
  tracked in the database.

  Raises RuntimeError if a required file is missing from the source.
  """
  logger.debug('Migrating image: %s -> %s', image.docker_image_id, destination_location.name)
  destination_location_set = {destination_location.name}

  for path_func, required in PATHSPECS:
    path = path_func(image.storage.uuid)

    if storage.exists(image.storage.locations, path):
      if not storage.exists(destination_location_set, path):
        logger.debug('Migrating path: %s', path)

        with storage.stream_read_file(image.storage.locations, path) as file_to_migrate:
          storage.stream_write(destination_location_set, path, file_to_migrate)
      else:
        logger.debug('File already present in destination: %s', path)
    elif required:
      # Bug fix: the original passed `path` as a second RuntimeError
      # constructor argument (logging-style); exceptions do not interpolate
      # %s args, so the message must be %-formatted explicitly.
      raise RuntimeError('Required file not present in image to migrate: %s' % path)

  # Successfully migrated, now write the placement
  ImageStoragePlacement.create(location=destination_location, storage=image.storage)
|
||||
|
||||
# Command-line interface: replicate one image (or a whole repository's
# images) into another storage region.
parser = argparse.ArgumentParser(description='Replicate an image storage.')
parser.add_argument('--namespace', type=str, required=True,
                    help='Namespace for the repository containing the image to be replicated')
parser.add_argument('--repository', type=str, required=True,
                    help='Name for the repository containing the image to be replicated')
parser.add_argument('--imageid', type=str, default=None,
                    help='Specific image to migrate, entire repo will be migrated if omitted')
parser.add_argument('--to', type=str, required=True,
                    help='Storage region to which the data should be replicated')


if __name__ == "__main__":
  logging.basicConfig(level=logging.DEBUG)
  logging.getLogger('boto').setLevel(logging.CRITICAL)

  args = parser.parse_args()

  target_location = ImageStorageLocation.get(name=args.to)

  # Either a single named image, or every image in the repository.
  if args.imageid is None:
    to_migrate = model.get_repository_images(args.namespace, args.repository)
  else:
    to_migrate = [model.get_image_by_id(args.namespace, args.repository, args.imageid)]

  for image in to_migrate:
    migrate_image(image, target_location)
|
Reference in a new issue