Merge master in delta
This commit is contained in:
commit
48949627e0
105 changed files with 3330 additions and 1758 deletions
|
@ -29,6 +29,16 @@ SCHEME_RANDOM_FUNCTION = {
|
|||
'postgresql+psycopg2': fn.Random,
|
||||
}
|
||||
|
||||
def real_for_update(query):
|
||||
return query.for_update()
|
||||
|
||||
def null_for_update(query):
|
||||
return query
|
||||
|
||||
SCHEME_SPECIALIZED_FOR_UPDATE = {
|
||||
'sqlite': null_for_update,
|
||||
}
|
||||
|
||||
class CallableProxy(Proxy):
|
||||
def __call__(self, *args, **kwargs):
|
||||
if self.obj is None:
|
||||
|
@ -68,6 +78,15 @@ class UseThenDisconnect(object):
|
|||
db = Proxy()
|
||||
read_slave = Proxy()
|
||||
db_random_func = CallableProxy()
|
||||
db_for_update = CallableProxy()
|
||||
|
||||
|
||||
def validate_database_url(url, connect_timeout=5):
|
||||
driver = _db_from_url(url, {
|
||||
'connect_timeout': connect_timeout
|
||||
})
|
||||
driver.connect()
|
||||
driver.close()
|
||||
|
||||
|
||||
def _db_from_url(url, db_kwargs):
|
||||
|
@ -82,6 +101,10 @@ def _db_from_url(url, db_kwargs):
|
|||
if parsed_url.password:
|
||||
db_kwargs['password'] = parsed_url.password
|
||||
|
||||
# Note: sqlite does not support connect_timeout.
|
||||
if parsed_url.drivername == 'sqlite' and 'connect_timeout' in db_kwargs:
|
||||
del db_kwargs['connect_timeout']
|
||||
|
||||
return SCHEME_DRIVERS[parsed_url.drivername](parsed_url.database, **db_kwargs)
|
||||
|
||||
|
||||
|
@ -93,6 +116,8 @@ def configure(config_object):
|
|||
|
||||
parsed_write_uri = make_url(write_db_uri)
|
||||
db_random_func.initialize(SCHEME_RANDOM_FUNCTION[parsed_write_uri.drivername])
|
||||
db_for_update.initialize(SCHEME_SPECIALIZED_FOR_UPDATE.get(parsed_write_uri.drivername,
|
||||
real_for_update))
|
||||
|
||||
read_slave_uri = config_object.get('DB_READ_SLAVE_URI', None)
|
||||
if read_slave_uri is not None:
|
||||
|
@ -122,8 +147,9 @@ def close_db_filter(_):
|
|||
|
||||
|
||||
class QuayUserField(ForeignKeyField):
|
||||
def __init__(self, allows_robots=False, *args, **kwargs):
|
||||
def __init__(self, allows_robots=False, robot_null_delete=False, *args, **kwargs):
|
||||
self.allows_robots = allows_robots
|
||||
self.robot_null_delete = robot_null_delete
|
||||
if not 'rel_model' in kwargs:
|
||||
kwargs['rel_model'] = User
|
||||
|
||||
|
@ -157,7 +183,11 @@ class User(BaseModel):
|
|||
for query, fk in self.dependencies(search_nullable=True):
|
||||
if isinstance(fk, QuayUserField) and fk.allows_robots:
|
||||
model = fk.model_class
|
||||
model.delete().where(query).execute()
|
||||
|
||||
if fk.robot_null_delete:
|
||||
model.update(**{fk.name: None}).where(query).execute()
|
||||
else:
|
||||
model.delete().where(query).execute()
|
||||
|
||||
# Delete the instance itself.
|
||||
super(User, self).delete_instance(recursive=False, delete_nullable=False)
|
||||
|
@ -352,6 +382,24 @@ class ImageStorageTransformation(BaseModel):
|
|||
name = CharField(index=True, unique=True)
|
||||
|
||||
|
||||
class ImageStorageSignatureKind(BaseModel):
|
||||
name = CharField(index=True, unique=True)
|
||||
|
||||
|
||||
class ImageStorageSignature(BaseModel):
|
||||
storage = ForeignKeyField(ImageStorage, index=True)
|
||||
kind = ForeignKeyField(ImageStorageSignatureKind)
|
||||
signature = TextField(null=True)
|
||||
uploading = BooleanField(default=True, null=True)
|
||||
|
||||
class Meta:
|
||||
database = db
|
||||
read_slaves = (read_slave,)
|
||||
indexes = (
|
||||
(('kind', 'storage'), True),
|
||||
)
|
||||
|
||||
|
||||
class DerivedImageStorage(BaseModel):
|
||||
source = ForeignKeyField(ImageStorage, null=True, related_name='source')
|
||||
derivative = ForeignKeyField(ImageStorage, related_name='derivative')
|
||||
|
@ -459,7 +507,7 @@ class LogEntry(BaseModel):
|
|||
kind = ForeignKeyField(LogEntryKind, index=True)
|
||||
account = QuayUserField(index=True, related_name='account')
|
||||
performer = QuayUserField(allows_robots=True, index=True, null=True,
|
||||
related_name='performer')
|
||||
related_name='performer', robot_null_delete=True)
|
||||
repository = ForeignKeyField(Repository, index=True, null=True)
|
||||
datetime = DateTimeField(default=datetime.now, index=True)
|
||||
ip = CharField(null=True)
|
||||
|
@ -550,4 +598,4 @@ all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission,
|
|||
Notification, ImageStorageLocation, ImageStoragePlacement,
|
||||
ExternalNotificationEvent, ExternalNotificationMethod, RepositoryNotification,
|
||||
RepositoryAuthorizedEmail, ImageStorageTransformation, DerivedImageStorage,
|
||||
TeamMemberInvite]
|
||||
TeamMemberInvite, ImageStorageSignature, ImageStorageSignatureKind]
|
||||
|
|
|
@ -2,13 +2,14 @@ set -e
|
|||
|
||||
DOCKER_IP=`echo $DOCKER_HOST | sed 's/tcp:\/\///' | sed 's/:.*//'`
|
||||
MYSQL_CONFIG_OVERRIDE="{\"DB_URI\":\"mysql+pymysql://root:password@$DOCKER_IP/genschema\"}"
|
||||
PERCONA_CONFIG_OVERRIDE="{\"DB_URI\":\"mysql+pymysql://root@$DOCKER_IP/genschema\"}"
|
||||
PGSQL_CONFIG_OVERRIDE="{\"DB_URI\":\"postgresql://postgres@$DOCKER_IP/genschema\"}"
|
||||
|
||||
up_mysql() {
|
||||
# Run a SQL database on port 3306 inside of Docker.
|
||||
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=password -d mysql
|
||||
|
||||
# Sleep for 5s to get MySQL get started.
|
||||
# Sleep for 10s to get MySQL get started.
|
||||
echo 'Sleeping for 10...'
|
||||
sleep 10
|
||||
|
||||
|
@ -21,6 +22,40 @@ down_mysql() {
|
|||
docker rm mysql
|
||||
}
|
||||
|
||||
up_mariadb() {
|
||||
# Run a SQL database on port 3306 inside of Docker.
|
||||
docker run --name mariadb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=password -d mariadb
|
||||
|
||||
# Sleep for 10s to get MySQL get started.
|
||||
echo 'Sleeping for 10...'
|
||||
sleep 10
|
||||
|
||||
# Add the database to mysql.
|
||||
docker run --rm --link mariadb:mariadb mariadb sh -c 'echo "create database genschema" | mysql -h"$MARIADB_PORT_3306_TCP_ADDR" -P"$MARIADB_PORT_3306_TCP_PORT" -uroot -ppassword'
|
||||
}
|
||||
|
||||
down_mariadb() {
|
||||
docker kill mariadb
|
||||
docker rm mariadb
|
||||
}
|
||||
|
||||
up_percona() {
|
||||
# Run a SQL database on port 3306 inside of Docker.
|
||||
docker run --name percona -p 3306:3306 -d dockerfile/percona
|
||||
|
||||
# Sleep for 10s
|
||||
echo 'Sleeping for 10...'
|
||||
sleep 10
|
||||
|
||||
# Add the daabase to mysql.
|
||||
docker run --rm --link percona:percona dockerfile/percona sh -c 'echo "create database genschema" | mysql -h $PERCONA_PORT_3306_TCP_ADDR'
|
||||
}
|
||||
|
||||
down_percona() {
|
||||
docker kill percona
|
||||
docker rm percona
|
||||
}
|
||||
|
||||
up_postgres() {
|
||||
# Run a SQL database on port 5432 inside of Docker.
|
||||
docker run --name postgres -p 5432:5432 -d postgres
|
||||
|
@ -73,6 +108,26 @@ test_migrate $MYSQL_CONFIG_OVERRIDE
|
|||
set -e
|
||||
down_mysql
|
||||
|
||||
# Test via MariaDB.
|
||||
echo '> Starting MariaDB'
|
||||
up_mariadb
|
||||
|
||||
echo '> Testing Migration (mariadb)'
|
||||
set +e
|
||||
test_migrate $MYSQL_CONFIG_OVERRIDE
|
||||
set -e
|
||||
down_mariadb
|
||||
|
||||
# Test via Percona.
|
||||
echo '> Starting Percona'
|
||||
up_percona
|
||||
|
||||
echo '> Testing Migration (percona)'
|
||||
set +e
|
||||
test_migrate $PERCONA_CONFIG_OVERRIDE
|
||||
set -e
|
||||
down_percona
|
||||
|
||||
# Test via Postgres.
|
||||
echo '> Starting Postgres'
|
||||
up_postgres
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
"""mysql max index lengths
|
||||
|
||||
Revision ID: 228d1af6af1c
|
||||
Revises: 5b84373e5db
|
||||
Create Date: 2015-01-06 14:35:24.651424
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '228d1af6af1c'
|
||||
down_revision = '5b84373e5db'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import mysql
|
||||
|
||||
def upgrade(tables):
|
||||
op.drop_index('queueitem_queue_name', table_name='queueitem')
|
||||
op.create_index('queueitem_queue_name', 'queueitem', ['queue_name'], unique=False, mysql_length=767)
|
||||
|
||||
op.drop_index('image_ancestors', table_name='image')
|
||||
op.create_index('image_ancestors', 'image', ['ancestors'], unique=False, mysql_length=767)
|
||||
|
||||
def downgrade(tables):
|
||||
pass
|
|
@ -53,7 +53,7 @@ def upgrade(tables):
|
|||
op.create_index('queueitem_available', 'queueitem', ['available'], unique=False)
|
||||
op.create_index('queueitem_available_after', 'queueitem', ['available_after'], unique=False)
|
||||
op.create_index('queueitem_processing_expires', 'queueitem', ['processing_expires'], unique=False)
|
||||
op.create_index('queueitem_queue_name', 'queueitem', ['queue_name'], unique=False)
|
||||
op.create_index('queueitem_queue_name', 'queueitem', ['queue_name'], unique=False, mysql_length=767)
|
||||
op.create_table('role',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('name', sa.String(length=255), nullable=False),
|
||||
|
@ -376,7 +376,7 @@ def upgrade(tables):
|
|||
sa.ForeignKeyConstraint(['storage_id'], ['imagestorage.id'], ),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
)
|
||||
op.create_index('image_ancestors', 'image', ['ancestors'], unique=False)
|
||||
op.create_index('image_ancestors', 'image', ['ancestors'], unique=False, mysql_length=767)
|
||||
op.create_index('image_repository_id', 'image', ['repository_id'], unique=False)
|
||||
op.create_index('image_repository_id_docker_image_id', 'image', ['repository_id', 'docker_image_id'], unique=True)
|
||||
op.create_index('image_storage_id', 'image', ['storage_id'], unique=False)
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
"""Add signature storage
|
||||
|
||||
Revision ID: 5ad999136045
|
||||
Revises: 228d1af6af1c
|
||||
Create Date: 2015-02-05 15:01:54.989573
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '5ad999136045'
|
||||
down_revision = '228d1af6af1c'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import mysql
|
||||
|
||||
def upgrade(tables):
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('imagestoragesignaturekind',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('name', sa.String(length=255), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id', name=op.f('pk_imagestoragesignaturekind'))
|
||||
)
|
||||
op.create_index('imagestoragesignaturekind_name', 'imagestoragesignaturekind', ['name'], unique=True)
|
||||
op.create_table('imagestoragesignature',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('storage_id', sa.Integer(), nullable=False),
|
||||
sa.Column('kind_id', sa.Integer(), nullable=False),
|
||||
sa.Column('signature', sa.Text(), nullable=True),
|
||||
sa.Column('uploading', sa.Boolean(), nullable=True),
|
||||
sa.ForeignKeyConstraint(['kind_id'], ['imagestoragesignaturekind.id'], name=op.f('fk_imagestoragesignature_kind_id_imagestoragesignaturekind')),
|
||||
sa.ForeignKeyConstraint(['storage_id'], ['imagestorage.id'], name=op.f('fk_imagestoragesignature_storage_id_imagestorage')),
|
||||
sa.PrimaryKeyConstraint('id', name=op.f('pk_imagestoragesignature'))
|
||||
)
|
||||
op.create_index('imagestoragesignature_kind_id', 'imagestoragesignature', ['kind_id'], unique=False)
|
||||
op.create_index('imagestoragesignature_kind_id_storage_id', 'imagestoragesignature', ['kind_id', 'storage_id'], unique=True)
|
||||
op.create_index('imagestoragesignature_storage_id', 'imagestoragesignature', ['storage_id'], unique=False)
|
||||
### end Alembic commands ###
|
||||
|
||||
op.bulk_insert(tables.imagestoragetransformation,
|
||||
[
|
||||
{'id': 2, 'name':'aci'},
|
||||
])
|
||||
|
||||
op.bulk_insert(tables.imagestoragesignaturekind,
|
||||
[
|
||||
{'id': 1, 'name':'gpg2'},
|
||||
])
|
||||
|
||||
|
||||
def downgrade(tables):
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_table('imagestoragesignature')
|
||||
op.drop_table('imagestoragesignaturekind')
|
||||
### end Alembic commands ###
|
|
@ -0,0 +1,24 @@
|
|||
"""Convert slack webhook data
|
||||
|
||||
Revision ID: 5b84373e5db
|
||||
Revises: 1c5b738283a5
|
||||
Create Date: 2014-12-16 12:02:55.167744
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '5b84373e5db'
|
||||
down_revision = '1c5b738283a5'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
from util.migrateslackwebhook import run_slackwebhook_migration
|
||||
|
||||
|
||||
def upgrade(tables):
|
||||
run_slackwebhook_migration()
|
||||
|
||||
|
||||
def downgrade(tables):
|
||||
pass
|
|
@ -14,7 +14,8 @@ from data.database import (User, Repository, Image, AccessToken, Role, Repositor
|
|||
ExternalNotificationEvent, ExternalNotificationMethod,
|
||||
RepositoryNotification, RepositoryAuthorizedEmail, TeamMemberInvite,
|
||||
DerivedImageStorage, ImageStorageTransformation, random_string_generator,
|
||||
db, BUILD_PHASE, QuayUserField)
|
||||
db, BUILD_PHASE, QuayUserField, ImageStorageSignature,
|
||||
ImageStorageSignatureKind, validate_database_url, db_for_update)
|
||||
from peewee import JOIN_LEFT_OUTER, fn
|
||||
from util.validation import (validate_username, validate_email, validate_password,
|
||||
INVALID_PASSWORD_MESSAGE)
|
||||
|
@ -295,6 +296,9 @@ def delete_robot(robot_username):
|
|||
|
||||
|
||||
def _list_entity_robots(entity_name):
|
||||
""" Return the list of robots for the specified entity. This MUST return a query, not a
|
||||
materialized list so that callers can use db_for_update.
|
||||
"""
|
||||
return (User
|
||||
.select()
|
||||
.join(FederatedLogin)
|
||||
|
@ -903,14 +907,17 @@ def change_password(user, new_password):
|
|||
delete_notifications_by_kind(user, 'password_required')
|
||||
|
||||
|
||||
def change_username(user, new_username):
|
||||
def change_username(user_id, new_username):
|
||||
(username_valid, username_issue) = validate_username(new_username)
|
||||
if not username_valid:
|
||||
raise InvalidUsernameException('Invalid username %s: %s' % (new_username, username_issue))
|
||||
|
||||
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
||||
# Reload the user for update
|
||||
user = db_for_update(User.select().where(User.id == user_id)).get()
|
||||
|
||||
# Rename the robots
|
||||
for robot in _list_entity_robots(user.username):
|
||||
for robot in db_for_update(_list_entity_robots(user.username)):
|
||||
_, robot_shortname = parse_robot_username(robot.username)
|
||||
new_robot_name = format_robot_username(new_username, robot_shortname)
|
||||
robot.username = new_robot_name
|
||||
|
@ -1270,9 +1277,9 @@ def _find_or_link_image(existing_image, repository, username, translations, pref
|
|||
storage.locations = {placement.location.name
|
||||
for placement in storage.imagestorageplacement_set}
|
||||
|
||||
new_image = Image.create(docker_image_id=existing_image.docker_image_id,
|
||||
repository=repository, storage=storage,
|
||||
ancestors=new_image_ancestry)
|
||||
new_image = Image.create(docker_image_id=existing_image.docker_image_id,
|
||||
repository=repository, storage=storage,
|
||||
ancestors=new_image_ancestry)
|
||||
|
||||
logger.debug('Storing translation %s -> %s', existing_image.id, new_image.id)
|
||||
translations[existing_image.id] = new_image.id
|
||||
|
@ -1336,7 +1343,28 @@ def find_create_or_link_image(docker_image_id, repository, username, translation
|
|||
ancestors='/')
|
||||
|
||||
|
||||
def find_or_create_derived_storage(source, transformation_name, preferred_location):
|
||||
def find_or_create_storage_signature(storage, signature_kind):
|
||||
found = lookup_storage_signature(storage, signature_kind)
|
||||
if found is None:
|
||||
kind = ImageStorageSignatureKind.get(name=signature_kind)
|
||||
found = ImageStorageSignature.create(storage=storage, kind=kind)
|
||||
|
||||
return found
|
||||
|
||||
|
||||
def lookup_storage_signature(storage, signature_kind):
|
||||
kind = ImageStorageSignatureKind.get(name=signature_kind)
|
||||
try:
|
||||
return (ImageStorageSignature
|
||||
.select()
|
||||
.where(ImageStorageSignature.storage == storage,
|
||||
ImageStorageSignature.kind == kind)
|
||||
.get())
|
||||
except ImageStorageSignature.DoesNotExist:
|
||||
return None
|
||||
|
||||
|
||||
def find_derived_storage(source, transformation_name):
|
||||
try:
|
||||
found = (ImageStorage
|
||||
.select(ImageStorage, DerivedImageStorage)
|
||||
|
@ -1349,11 +1377,19 @@ def find_or_create_derived_storage(source, transformation_name, preferred_locati
|
|||
found.locations = {placement.location.name for placement in found.imagestorageplacement_set}
|
||||
return found
|
||||
except ImageStorage.DoesNotExist:
|
||||
logger.debug('Creating storage dervied from source: %s', source.uuid)
|
||||
trans = ImageStorageTransformation.get(name=transformation_name)
|
||||
new_storage = _create_storage(preferred_location)
|
||||
DerivedImageStorage.create(source=source, derivative=new_storage, transformation=trans)
|
||||
return new_storage
|
||||
return None
|
||||
|
||||
|
||||
def find_or_create_derived_storage(source, transformation_name, preferred_location):
|
||||
existing = find_derived_storage(source, transformation_name)
|
||||
if existing is not None:
|
||||
return existing
|
||||
|
||||
logger.debug('Creating storage dervied from source: %s', source.uuid)
|
||||
trans = ImageStorageTransformation.get(name=transformation_name)
|
||||
new_storage = _create_storage(preferred_location)
|
||||
DerivedImageStorage.create(source=source, derivative=new_storage, transformation=trans)
|
||||
return new_storage
|
||||
|
||||
|
||||
def delete_derived_storage_by_uuid(storage_uuid):
|
||||
|
@ -1422,7 +1458,7 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created
|
|||
Image.docker_image_id == docker_image_id))
|
||||
|
||||
try:
|
||||
fetched = query.get()
|
||||
fetched = db_for_update(query).get()
|
||||
except Image.DoesNotExist:
|
||||
raise DataModelException('No image with specified id and repository')
|
||||
|
||||
|
@ -2275,11 +2311,20 @@ def delete_user(user):
|
|||
# TODO: also delete any repository data associated
|
||||
|
||||
|
||||
def check_health():
|
||||
def check_health(app_config):
|
||||
# Attempt to connect to the database first. If the DB is not responding,
|
||||
# using the validate_database_url will timeout quickly, as opposed to
|
||||
# making a normal connect which will just hang (thus breaking the health
|
||||
# check).
|
||||
try:
|
||||
validate_database_url(app_config['DB_URI'], connect_timeout=3)
|
||||
except Exception:
|
||||
logger.exception('Could not connect to the database')
|
||||
return False
|
||||
|
||||
# We will connect to the db, check that it contains some log entry kinds
|
||||
try:
|
||||
found_count = LogEntryKind.select().count()
|
||||
return found_count > 0
|
||||
return bool(list(LogEntryKind.select().limit(1)))
|
||||
except:
|
||||
return False
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
from datetime import datetime, timedelta
|
||||
|
||||
from data.database import QueueItem, db
|
||||
from data.database import QueueItem, db, db_for_update
|
||||
from util.morecollections import AttrDict
|
||||
|
||||
|
||||
|
@ -31,16 +31,24 @@ class WorkQueue(object):
|
|||
QueueItem.processing_expires > now,
|
||||
QueueItem.queue_name ** name_match_query))
|
||||
|
||||
def _available_jobs(self, now, name_match_query, running_query):
|
||||
def _available_jobs(self, now, name_match_query):
|
||||
return (QueueItem
|
||||
.select()
|
||||
.where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now,
|
||||
((QueueItem.available == True) | (QueueItem.processing_expires <= now)),
|
||||
QueueItem.retries_remaining > 0, ~(QueueItem.queue_name << running_query)))
|
||||
QueueItem.retries_remaining > 0))
|
||||
|
||||
def _available_jobs_not_running(self, now, name_match_query, running_query):
|
||||
return (self
|
||||
._available_jobs(now, name_match_query)
|
||||
.where(~(QueueItem.queue_name << running_query)))
|
||||
|
||||
def _name_match_query(self):
|
||||
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
|
||||
|
||||
def _item_by_id_for_update(self, queue_id):
|
||||
return db_for_update(QueueItem.select().where(QueueItem.id == queue_id)).get()
|
||||
|
||||
def update_metrics(self):
|
||||
if self._reporter is None:
|
||||
return
|
||||
|
@ -52,7 +60,7 @@ class WorkQueue(object):
|
|||
running_query = self._running_jobs(now, name_match_query)
|
||||
running_count = running_query.distinct().count()
|
||||
|
||||
avialable_query = self._available_jobs(now, name_match_query, running_query)
|
||||
avialable_query = self._available_jobs_not_running(now, name_match_query, running_query)
|
||||
available_count = avialable_query.select(QueueItem.queue_name).distinct().count()
|
||||
|
||||
self._reporter(self._currently_processing, running_count, running_count + available_count)
|
||||
|
@ -78,19 +86,26 @@ class WorkQueue(object):
|
|||
def get(self, processing_time=300):
|
||||
"""
|
||||
Get an available item and mark it as unavailable for the default of five
|
||||
minutes.
|
||||
minutes. The result of this method must always be composed of simple
|
||||
python objects which are JSON serializable for network portability reasons.
|
||||
"""
|
||||
now = datetime.utcnow()
|
||||
|
||||
name_match_query = self._name_match_query()
|
||||
|
||||
with self._transaction_factory(db):
|
||||
running = self._running_jobs(now, name_match_query)
|
||||
avail = self._available_jobs(now, name_match_query, running)
|
||||
running = self._running_jobs(now, name_match_query)
|
||||
avail = self._available_jobs_not_running(now, name_match_query, running)
|
||||
|
||||
item = None
|
||||
try:
|
||||
db_item = avail.order_by(QueueItem.id).get()
|
||||
item = None
|
||||
try:
|
||||
db_item_candidate = avail.order_by(QueueItem.id).get()
|
||||
|
||||
with self._transaction_factory(db):
|
||||
still_available_query = (db_for_update(self
|
||||
._available_jobs(now, name_match_query)
|
||||
.where(QueueItem.id == db_item_candidate.id)))
|
||||
|
||||
db_item = still_available_query.get()
|
||||
db_item.available = False
|
||||
db_item.processing_expires = now + timedelta(seconds=processing_time)
|
||||
db_item.retries_remaining -= 1
|
||||
|
@ -102,22 +117,22 @@ class WorkQueue(object):
|
|||
})
|
||||
|
||||
self._currently_processing = True
|
||||
except QueueItem.DoesNotExist:
|
||||
self._currently_processing = False
|
||||
except QueueItem.DoesNotExist:
|
||||
self._currently_processing = False
|
||||
|
||||
# Return a view of the queue item rather than an active db object
|
||||
return item
|
||||
# Return a view of the queue item rather than an active db object
|
||||
return item
|
||||
|
||||
def complete(self, completed_item):
|
||||
with self._transaction_factory(db):
|
||||
completed_item_obj = QueueItem.get(QueueItem.id == completed_item.id)
|
||||
completed_item_obj = self._item_by_id_for_update(completed_item.id)
|
||||
completed_item_obj.delete_instance()
|
||||
self._currently_processing = False
|
||||
|
||||
def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
|
||||
with self._transaction_factory(db):
|
||||
retry_date = datetime.utcnow() + timedelta(seconds=retry_after)
|
||||
incomplete_item_obj = QueueItem.get(QueueItem.id == incomplete_item.id)
|
||||
incomplete_item_obj = self._item_by_id_for_update(incomplete_item.id)
|
||||
incomplete_item_obj.available_after = retry_date
|
||||
incomplete_item_obj.available = True
|
||||
|
||||
|
@ -127,16 +142,12 @@ class WorkQueue(object):
|
|||
incomplete_item_obj.save()
|
||||
self._currently_processing = False
|
||||
|
||||
@staticmethod
|
||||
def extend_processing(queue_item_info, seconds_from_now, retry_count=None,
|
||||
minimum_extension=MINIMUM_EXTENSION):
|
||||
queue_item = QueueItem.get(QueueItem.id == queue_item_info.id)
|
||||
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
|
||||
def extend_processing(self, item, seconds_from_now, minimum_extension=MINIMUM_EXTENSION):
|
||||
with self._transaction_factory(db):
|
||||
queue_item = self._item_by_id_for_update(item.id)
|
||||
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
|
||||
|
||||
# Only actually write the new expiration to the db if it moves the expiration some minimum
|
||||
if new_expiration - queue_item.processing_expires > minimum_extension:
|
||||
if retry_count is not None:
|
||||
queue_item.retries_remaining = retry_count
|
||||
|
||||
queue_item.processing_expires = new_expiration
|
||||
queue_item.save()
|
||||
# Only actually write the new expiration to the db if it moves the expiration some minimum
|
||||
if new_expiration - queue_item.processing_expires > minimum_extension:
|
||||
queue_item.processing_expires = new_expiration
|
||||
queue_item.save()
|
||||
|
|
Reference in a new issue