141 lines
		
	
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			141 lines
		
	
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import json
 | |
| import tarfile
 | |
| 
 | |
| from abc import ABCMeta, abstractmethod
 | |
| from collections import namedtuple
 | |
| from cStringIO import StringIO
 | |
| from enum import Enum, unique
 | |
| from six import add_metaclass
 | |
| 
 | |
| from image.docker.schema2 import EMPTY_LAYER_BYTES
 | |
| 
 | |
# Image record. `id`, `parent_id` and `bytes` are required; the trailing five fields take the
# defaults installed on `__new__` below.
Image = namedtuple('Image', 'id parent_id bytes size config created urls is_empty')

# Defaults are right-aligned against the field list: size, config, created and urls default
# to None and is_empty defaults to False.
Image.__new__.__defaults__ = (None,) * 4 + (False,)

# Plain records with the fields (manifests, headers) and (manifests, image_ids) respectively.
PushResult = namedtuple('PushResult', 'manifests headers')
PullResult = namedtuple('PullResult', 'manifests image_ids')
 | |
| 
 | |
| 
 | |
def layer_bytes_for_contents(contents, mode='|gz', other_files=None, empty=False):
  """ Builds an in-memory tar archive around the given contents and returns its raw bytes.

      The archive always holds a regular file named `contents` containing `contents`, followed
      by one regular file per entry of `other_files` (a mapping of file name -> file contents)
      when that mapping is given. Every member is written with an mtime of 1.

      `mode` is appended to 'w' to form the mode handed to `tarfile.open`, so the default of
      '|gz' yields a gzip-compressed stream. Text contents are encoded as UTF-8 before being
      written. When `empty` is true nothing is built and EMPTY_LAYER_BYTES is returned.
  """
  if empty:
    return EMPTY_LAYER_BYTES

  # io.BytesIO is available on both Python 2 and Python 3 (cStringIO is Python 2 only). The
  # import is local so that this function does not depend on any new module-level name.
  from io import BytesIO

  text_type = type(u'')

  def as_bytes(value):
    # Tar members are raw bytes; encoding first keeps the recorded size equal to what is written.
    return value.encode('utf-8') if isinstance(value, text_type) else value

  members = [('contents', contents)]
  if other_files is not None:
    # `.items()` works on both Python 2 and Python 3, unlike `.iteritems()`.
    members.extend(other_files.items())

  layer_data = BytesIO()
  tar_file = tarfile.open(fileobj=layer_data, mode='w' + mode)
  try:
    for name, data in members:
      data = as_bytes(data)

      tar_file_info = tarfile.TarInfo(name=name)
      tar_file_info.type = tarfile.REGTYPE
      tar_file_info.size = len(data)
      tar_file_info.mtime = 1

      tar_file.addfile(tar_file_info, BytesIO(data))
  finally:
    # Always release the tar writer, even if adding a member fails part-way through.
    tar_file.close()

  # The buffer is only complete once the tar writer has been closed above.
  layer_bytes = layer_data.getvalue()
  layer_data.close()
  return layer_bytes
 | |
| 
 | |
| 
 | |
@unique
class Failures(Enum):
  """ Enumerates the failures a caller can declare as expected.

      Each value is the lower-case, hyphen-separated spelling of its member name, and
      `@unique` rejects any two members sharing a value.
  """
  UNAUTHENTICATED = 'unauthenticated'
  UNAUTHORIZED = 'unauthorized'
  INVALID_REGISTRY = 'invalid-registry'
  INVALID_REPOSITORY = 'invalid-repository'
  SLASH_REPOSITORY = 'slash-repository'
  APP_REPOSITORY = 'app-repository'
  UNKNOWN_TAG = 'unknown-tag'
  ANONYMOUS_NOT_ALLOWED = 'anonymous-not-allowed'
  DISALLOWED_LIBRARY_NAMESPACE = 'disallowed-library-namespace'
  MISSING_TAG = 'missing-tag'
  INVALID_TAG = 'invalid-tag'
  INVALID_MANIFEST = 'invalid-manifest'
  INVALID_IMAGES = 'invalid-images'
  UNSUPPORTED_CONTENT_TYPE = 'unsupported-content-type'
  INVALID_BLOB = 'invalid-blob'
  NAMESPACE_DISABLED = 'namespace-disabled'
  UNAUTHORIZED_FOR_MOUNT = 'unauthorized-for-mount'
  GEO_BLOCKED = 'geo-blocked'
 | |
| 
 | |
| 
 | |
class ProtocolOptions(object):
  """ Holder for optional protocol settings; every attribute starts at its default. """

  def __init__(self):
    # (attribute name, default value) pairs, assigned in the order listed.
    defaults = (
      ('scopes', None),
      ('cancel_blob_upload', False),
      ('manifest_invalid_blob_references', False),
      ('chunks_for_upload', None),
      ('skip_head_checks', False),
      ('manifest_content_type', None),
      ('accept_mimetypes', None),
      ('mount_blobs', None),
      ('push_by_manifest_digest', False),
      ('request_addr', None),
      ('skip_blob_push_checks', False),
      ('ensure_ascii', True),
    )
    for attribute, default in defaults:
      setattr(self, attribute, default)
 | |
| 
 | |
| 
 | |
@add_metaclass(ABCMeta)
class RegistryProtocol(object):
  """ Interface for protocols.

      Subclasses implement `login`, `pull`, `push` and `delete`, and may fill in FAILURE_CODES,
      which `conduct` consults to turn an expected failure into an expected status code.
  """
  # Maps protocol step -> {expected failure -> status code}. Read by `conduct` when its
  # `expected_status` argument is a tuple.
  FAILURE_CODES = {}

  @abstractmethod
  def login(self, session, username, password, scopes, expect_success):
    """ Performs the login flow with the given credentials, over the given scopes. """

  @abstractmethod
  def pull(self, session, namespace, repo_name, tag_names, images, credentials=None,
           expected_failure=None, options=None):
    """ Pulls the given tag via the given session, using the given credentials, and
        ensures the given images match.
    """

  @abstractmethod
  def push(self, session, namespace, repo_name, tag_names, images, credentials=None,
           expected_failure=None, options=None):
    """ Pushes the specified images as the given tag via the given session, using
        the given credentials.
    """

  @abstractmethod
  def delete(self, session, namespace, repo_name, tag_names, credentials=None,
             expected_failure=None, options=None):
    """ Deletes some tags. """

  def repo_name(self, namespace, repo_name):
    """ Returns `namespace/repo_name` when a namespace is given, otherwise just `repo_name`. """
    if namespace:
      return '%s/%s' % (namespace, repo_name)

    return repo_name

  def conduct(self, session, method, url, expected_status=200, params=None, data=None,
              json_data=None, headers=None, auth=None, options=None):
    """ Issues a request on the session and asserts that the response has the expected status.

        If `json_data` is given, it is serialized to UTF-8 encoded JSON, replaces `data`, and
        the Content-Type header is set to application/json. If `options.request_addr` is set,
        it is sent in the X-Override-Remote-Addr-For-Testing header. The caller's `headers`
        dict is never modified; additions are merged into a copy.

        `expected_status` is either a status code or a tuple of
        (default status, expected failure, protocol step). In the tuple form, when the expected
        failure is not None, the status registered in FAILURE_CODES for that step and failure
        is used, falling back to the default status when none is registered.

        Returns the response object. Raises AssertionError if the status code does not match.
    """
    added_headers = {}
    if json_data is not None:
      data = json.dumps(json_data).encode('utf-8')
      added_headers['Content-Type'] = 'application/json'

    if options and options.request_addr:
      added_headers['X-Override-Remote-Addr-For-Testing'] = options.request_addr

    if added_headers:
      # Merge into a fresh dict so a headers dict owned (and possibly reused) by the caller
      # does not pick up these entries as a side effect.
      merged_headers = dict(headers or {})
      merged_headers.update(added_headers)
      headers = merged_headers

    if isinstance(expected_status, tuple):
      expected_status, expected_failure, protocol_step = expected_status
      if expected_failure is not None:
        failures = self.__class__.FAILURE_CODES.get(protocol_step, {})
        expected_status = failures.get(expected_failure, expected_status)

    result = session.request(method, url, params=params, data=data, headers=headers, auth=auth)
    msg = "Expected response %s, got %s: %s" % (expected_status, result.status_code, result.text)
    assert result.status_code == expected_status, msg
    return result
 |