Make the worker post json. Add a retry timeout after an incompletel queue item is processed. Submit webhook jobs to the queue on a successful push.
This commit is contained in:
		
							parent
							
								
									f55e4074e0
								
							
						
					
					
						commit
						e787d8b2d8
					
				
					 4 changed files with 31 additions and 4 deletions
				
			
		|  | @ -32,6 +32,7 @@ start the workers: | |||
| ``` | ||||
| STACK=prod python -m workers.diffsworker -D | ||||
| STACK=prod python -m workers.dockerfilebuild -D | ||||
| STACK=prod python -m workers.webhookworker -D | ||||
| ``` | ||||
| 
 | ||||
| bouncing the servers: | ||||
|  |  | |||
|  | @ -56,7 +56,9 @@ class WorkQueue(object): | |||
|   def complete(self, completed_item): | ||||
|     completed_item.delete_instance() | ||||
| 
 | ||||
|   def incomplete(self, incomplete_item): | ||||
|   def incomplete(self, incomplete_item, retry_after=300): | ||||
|     retry_date = datetime.now() + timedelta(seconds=retry_after) | ||||
|     incomplete_item.available_after = retry_date | ||||
|     incomplete_item.available = True | ||||
|     incomplete_item.save() | ||||
| 
 | ||||
|  |  | |||
|  | @ -6,6 +6,7 @@ from flask import request, make_response, jsonify, abort | |||
| from functools import wraps | ||||
| 
 | ||||
| from data import model | ||||
| from data.queue import webhook_queue | ||||
| from app import app, mixpanel | ||||
| from auth.auth import (process_auth, get_authenticated_user, | ||||
|                        get_validated_token) | ||||
|  | @ -178,18 +179,39 @@ def update_images(namespace, repository): | |||
|   permission = ModifyRepositoryPermission(namespace, repository) | ||||
| 
 | ||||
|   if permission.can(): | ||||
|     repository = model.get_repository(namespace, repository) | ||||
|     if not repository: | ||||
|     repo = model.get_repository(namespace, repository) | ||||
|     if not repo: | ||||
|       # Make sure the repo actually exists. | ||||
|       abort(404) | ||||
| 
 | ||||
|     image_with_checksums = json.loads(request.data) | ||||
| 
 | ||||
|     updated_tags = {} | ||||
|     for image in image_with_checksums: | ||||
|       logger.debug('Setting checksum for image id: %s to %s' % | ||||
|                    (image['id'], image['checksum'])) | ||||
|       updated_tags[image['Tag']] = image['id'] | ||||
|       model.set_image_checksum(image['id'], repository, image['checksum']) | ||||
| 
 | ||||
|     # Generate a job for each webhook that has been added to this repo | ||||
|     webhooks = model.list_webhooks(namespace, repository) | ||||
|     for webhook in webhooks: | ||||
|       webhook_data = json.loads(webhook.parameters) | ||||
|       repo_string = '%s/%s' % (namespace, repository) | ||||
|       logger.debug('Creating webhook for repository \'%s\' for url \'%s\'' % | ||||
|                    (repo_string, webhook_data['url'])) | ||||
|       webhook_data['payload'] = { | ||||
|         'repository': repo_string, | ||||
|         'namespace': namespace, | ||||
|         'name': repository, | ||||
|         'docker_url': 'quay.io/%s' % repo_string, | ||||
|         'homepage': 'https://quay.io/repository/%s' % repo_string, | ||||
|         'visibility': repo.visibility.name, | ||||
|         'updated_tags': updated_tags, | ||||
|         'pushed_image_count': len(image_with_checksums),  | ||||
|       } | ||||
|       webhook_queue.put(json.dumps(webhook_data)) | ||||
| 
 | ||||
|     return make_response('Updated', 204) | ||||
| 
 | ||||
|   abort(403) | ||||
|  |  | |||
|  | @ -2,6 +2,7 @@ import logging | |||
| import daemon | ||||
| import argparse | ||||
| import requests | ||||
| import json | ||||
| 
 | ||||
| from data.queue import webhook_queue | ||||
| from workers.worker import Worker | ||||
|  | @ -20,9 +21,10 @@ class WebhookWorker(Worker): | |||
|   def process_queue_item(self, job_details): | ||||
|     url = job_details['url'] | ||||
|     payload = job_details['payload'] | ||||
|     headers = {'Content-type': 'application/json'} | ||||
| 
 | ||||
|     try: | ||||
|       resp = requests.post(url, data=payload) | ||||
|       resp = requests.post(url, data=json.dumps(payload), headers=headers) | ||||
|       if resp.status_code/100 != 2: | ||||
|         logger.error('%s response for webhook to url: %s' % (resp.status_code, | ||||
|                                                              url)) | ||||
|  |  | |||
		Reference in a new issue