Fix some typos and bugs in the worker.

This commit is contained in:
yackob03 2013-10-18 15:28:16 -04:00
parent a1164269be
commit 4514f5a969
3 changed files with 22 additions and 9 deletions

View file

@ -22,3 +22,12 @@ running:
sudo nginx -c `pwd`/nginx.conf sudo nginx -c `pwd`/nginx.conf
STACK=prod gunicorn -D --workers 4 -b unix:/tmp/gunicorn.sock --worker-class eventlet -t 500 application:application STACK=prod gunicorn -D --workers 4 -b unix:/tmp/gunicorn.sock --worker-class eventlet -t 500 application:application
``` ```
set up the snapshot script:
(instructions in the seo-snapshots directory)[seo-snapshots/README.md]
start the workers:
```
python -m workers.diffsworker -D
```

View file

@ -7,7 +7,7 @@ class WorkQueue(object):
def __init__(self, queue_name): def __init__(self, queue_name):
self.queue_name = queue_name self.queue_name = queue_name
def put(message, available_after=0): def put(self, message, available_after=0):
""" """
Put an item, if it shouldn't be processed for some number of seconds, Put an item, if it shouldn't be processed for some number of seconds,
specify that amount as available_after. specify that amount as available_after.
@ -24,7 +24,7 @@ class WorkQueue(object):
QueueItem.create(**params) QueueItem.create(**params)
def get(processing_time=300): def get(self, processing_time=300):
""" """
Get an available item and mark it as unavailable for the default of five Get an available item and mark it as unavailable for the default of five
minutes. minutes.
@ -35,9 +35,9 @@ class WorkQueue(object):
# TODO the query and the update should be atomic, but for now we only # TODO the query and the update should be atomic, but for now we only
# have one worker. # have one worker.
avaial = QueueItem.select().where(QueueItem.queue_name = self.queue_name, avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name,
QueueItem.available_after <= now, QueueItem.available_after <= now,
available_or_expired) available_or_expired)
found = list(avail.limit(1).order_by(QueueItem.available_after)) found = list(avail.limit(1).order_by(QueueItem.available_after))
@ -51,5 +51,5 @@ class WorkQueue(object):
return None return None
def complete(completed_item): def complete(self, completed_item):
item.delete_instance() item.delete_instance()

View file

@ -1,5 +1,7 @@
import logging import logging
import json import json
import daemon
import time
from apscheduler.scheduler import Scheduler from apscheduler.scheduler import Scheduler
@ -15,7 +17,7 @@ image_diff_queue = WorkQueue('imagediff')
def process_work_items(): def process_work_items():
logger.debug('Getting work item from queue.') logger.debug('Getting work item from queue.')
item = imagediff.get() item = image_diff_queue.get()
if item: if item:
logger.debug('Queue gave us some work: %s' % item.body) logger.debug('Queue gave us some work: %s' % item.body)
@ -26,7 +28,6 @@ def process_work_items():
else: else:
logger.debug('No work today.') logger.debug('No work today.')
FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s' FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s'
logging.basicConfig(format=FORMAT, level=logging.DEBUG) logging.basicConfig(format=FORMAT, level=logging.DEBUG)
@ -34,3 +35,6 @@ sched = Scheduler()
sched.start() sched.start()
sched.add_interval_job(process_work_items, seconds=10) sched.add_interval_job(process_work_items, seconds=10)
while True:
time.sleep(60 * 60 * 24) # sleep one day, basically forever