55 lines
		
	
	
	
		
			1.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			55 lines
		
	
	
	
		
			1.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from datetime import datetime, timedelta
 | |
| 
 | |
| from database import QueueItem
 | |
| 
 | |
| 
 | |
| class WorkQueue(object):
 | |
|   def __init__(self, queue_name):
 | |
|     self.queue_name = queue_name
 | |
| 
 | |
|   def put(self, message, available_after=0):
 | |
|     """
 | |
|     Put an item, if it shouldn't be processed for some number of seconds,
 | |
|     specify that amount as available_after.
 | |
|     """
 | |
| 
 | |
|     params = {
 | |
|       'queue_name': self.queue_name,
 | |
|       'body': message,
 | |
|     }
 | |
| 
 | |
|     if available_after:
 | |
|       available_date = datetime.now() + timedelta(seconds=available_after)
 | |
|       params['available_after'] = available_date
 | |
| 
 | |
|     QueueItem.create(**params)
 | |
| 
 | |
|   def get(self, processing_time=300):
 | |
|     """
 | |
|     Get an available item and mark it as unavailable for the default of five
 | |
|     minutes.
 | |
|     """
 | |
|     now = datetime.now()
 | |
|     available_or_expired = (QueueItem.available == True |
 | |
|                             QueueItem.processing_expires <= now)
 | |
| 
 | |
|     # TODO the query and the update should be atomic, but for now we only
 | |
|     # have one worker.
 | |
|     avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name,
 | |
|                                      QueueItem.available_after <= now,
 | |
|                                      available_or_expired)
 | |
| 
 | |
|     found = list(avail.limit(1).order_by(QueueItem.available_after))
 | |
| 
 | |
|     if found:
 | |
|       item = found[0]
 | |
|       item.available = False
 | |
|       item.processing_expires = now + timedelta(seconds=processing_time)
 | |
|       item.save()
 | |
| 
 | |
|       return item
 | |
| 
 | |
|     return None
 | |
| 
 | |
|   def complete(self, completed_item):
 | |
|     item.delete_instance()
 |