Add a fixed number of retries to each item that gets put in the work queue.:

This commit is contained in:
yackob03 2013-11-15 15:49:26 -05:00
parent 82805cb7aa
commit f07690956d
3 changed files with 12 additions and 2 deletions

View file

@ -104,6 +104,11 @@ class Repository(BaseModel):
) )
class Webhook(BaseModel):
repository = ForeignKeyField(Repository)
parameters = TextField()
class Role(BaseModel): class Role(BaseModel):
name = CharField(index=True) name = CharField(index=True)
@ -198,6 +203,7 @@ class QueueItem(BaseModel):
available_after = DateTimeField(default=datetime.now, index=True) available_after = DateTimeField(default=datetime.now, index=True)
available = BooleanField(default=True, index=True) available = BooleanField(default=True, index=True)
processing_expires = DateTimeField(null=True, index=True) processing_expires = DateTimeField(null=True, index=True)
retries_remaining = IntegerField(default=5)
all_models = [User, Repository, Image, AccessToken, Role, all_models = [User, Repository, Image, AccessToken, Role,

View file

@ -7,7 +7,7 @@ class WorkQueue(object):
def __init__(self, queue_name): def __init__(self, queue_name):
self.queue_name = queue_name self.queue_name = queue_name
def put(self, message, available_after=0): def put(self, message, available_after=0, retries_remaining=5):
""" """
Put an item, if it shouldn't be processed for some number of seconds, Put an item, if it shouldn't be processed for some number of seconds,
specify that amount as available_after. specify that amount as available_after.
@ -16,6 +16,7 @@ class WorkQueue(object):
params = { params = {
'queue_name': self.queue_name, 'queue_name': self.queue_name,
'body': message, 'body': message,
'retries_remaining': retries_remaining,
} }
if available_after: if available_after:
@ -37,7 +38,8 @@ class WorkQueue(object):
# have one worker. # have one worker.
avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name, avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name,
QueueItem.available_after <= now, QueueItem.available_after <= now,
available_or_expired) available_or_expired,
QueueItem.retries_remaining > 0)
found = list(avail.limit(1).order_by(QueueItem.available_after)) found = list(avail.limit(1).order_by(QueueItem.available_after))
@ -45,6 +47,7 @@ class WorkQueue(object):
item = found[0] item = found[0]
item.available = False item.available = False
item.processing_expires = now + timedelta(seconds=processing_time) item.processing_expires = now + timedelta(seconds=processing_time)
item.retries_remaining -= 1
item.save() item.save()
return item return item
@ -61,3 +64,4 @@ class WorkQueue(object):
image_diff_queue = WorkQueue('imagediff') image_diff_queue = WorkQueue('imagediff')
dockerfile_build_queue = WorkQueue('dockerfilebuild') dockerfile_build_queue = WorkQueue('dockerfilebuild')
webhook_queue = WorkQueue('webhook')

Binary file not shown.