115 lines
		
	
	
	
		
			3.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			115 lines
		
	
	
	
		
			3.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import hashlib
 | |
| import os
 | |
| 
 | |
| from io import BytesIO
 | |
| 
 | |
| import pytest
 | |
| 
 | |
| from data.registry_model.datatypes import RepositoryReference
 | |
| from data.registry_model.blobuploader import (create_blob_upload, retrieve_blob_upload_manager,
 | |
|                                               upload_blob, BlobUploadException,
 | |
|                                               BlobDigestMismatchException, BlobTooLargeException,
 | |
|                                               BlobUploadSettings)
 | |
| from data.registry_model.registry_pre_oci_model import PreOCIModel
 | |
| 
 | |
| from storage.distributedstorage import DistributedStorage
 | |
| from storage.fakestorage import FakeStorage
 | |
| from test.fixtures import *
 | |
| 
 | |
| @pytest.fixture()
 | |
| def pre_oci_model(initialized_db):
 | |
|   return PreOCIModel()
 | |
| 
 | |
| @pytest.mark.parametrize('chunk_count', [
 | |
|   0,
 | |
|   1,
 | |
|   2,
 | |
|   10,
 | |
| ])
 | |
| @pytest.mark.parametrize('subchunk', [
 | |
|   True,
 | |
|   False,
 | |
| ])
 | |
| def test_basic_upload_blob(chunk_count, subchunk, pre_oci_model):
 | |
|   repository_ref = pre_oci_model.lookup_repository('devtable', 'complex')
 | |
|   storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us'])
 | |
|   settings = BlobUploadSettings('2M', 512 * 1024, 3600)
 | |
|   app_config = {'TESTING': True}
 | |
| 
 | |
|   data = ''
 | |
|   with upload_blob(repository_ref, storage, settings) as manager:
 | |
|     assert manager
 | |
|     assert manager.blob_upload_id
 | |
| 
 | |
|     for index in range(0, chunk_count):
 | |
|       chunk_data = os.urandom(100)
 | |
|       data += chunk_data
 | |
| 
 | |
|       if subchunk:
 | |
|         manager.upload_chunk(app_config, BytesIO(chunk_data))
 | |
|         manager.upload_chunk(app_config, BytesIO(chunk_data), (index * 100) + 50)
 | |
|       else:
 | |
|         manager.upload_chunk(app_config, BytesIO(chunk_data))
 | |
| 
 | |
|     blob = manager.commit_to_blob(app_config)
 | |
| 
 | |
|   # Check the blob.
 | |
|   assert blob.compressed_size == len(data)
 | |
|   assert not blob.uploading
 | |
|   assert blob.digest == 'sha256:' + hashlib.sha256(data).hexdigest()
 | |
| 
 | |
|   # Ensure the blob exists in storage and has the expected data.
 | |
|   assert storage.get_content(['local_us'], blob.storage_path) == data
 | |
| 
 | |
| 
 | |
| def test_cancel_upload(pre_oci_model):
 | |
|   repository_ref = pre_oci_model.lookup_repository('devtable', 'complex')
 | |
|   storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us'])
 | |
|   settings = BlobUploadSettings('2M', 512 * 1024, 3600)
 | |
|   app_config = {'TESTING': True}
 | |
| 
 | |
|   blob_upload_id = None
 | |
|   with upload_blob(repository_ref, storage, settings) as manager:
 | |
|     blob_upload_id = manager.blob_upload_id
 | |
|     assert pre_oci_model.lookup_blob_upload(repository_ref, blob_upload_id) is not None
 | |
| 
 | |
|     manager.upload_chunk(app_config, BytesIO('hello world'))
 | |
| 
 | |
|   # Since the blob was not comitted, the upload should be deleted.
 | |
|   assert blob_upload_id
 | |
|   assert pre_oci_model.lookup_blob_upload(repository_ref, blob_upload_id) is None
 | |
| 
 | |
| 
 | |
| def test_too_large(pre_oci_model):
 | |
|   repository_ref = pre_oci_model.lookup_repository('devtable', 'complex')
 | |
|   storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us'])
 | |
|   settings = BlobUploadSettings('1K', 512 * 1024, 3600)
 | |
|   app_config = {'TESTING': True}
 | |
| 
 | |
|   with upload_blob(repository_ref, storage, settings) as manager:
 | |
|     with pytest.raises(BlobTooLargeException):
 | |
|       manager.upload_chunk(app_config, BytesIO(os.urandom(1024 * 1024 * 2)))
 | |
| 
 | |
| 
 | |
| def test_extra_blob_stream_handlers(pre_oci_model):
 | |
|   handler1_result = []
 | |
|   handler2_result = []
 | |
| 
 | |
|   def handler1(bytes):
 | |
|     handler1_result.append(bytes)
 | |
| 
 | |
|   def handler2(bytes):
 | |
|     handler2_result.append(bytes)
 | |
| 
 | |
|   repository_ref = pre_oci_model.lookup_repository('devtable', 'complex')
 | |
|   storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us'])
 | |
|   settings = BlobUploadSettings('1K', 512 * 1024, 3600)
 | |
|   app_config = {'TESTING': True}
 | |
| 
 | |
|   with upload_blob(repository_ref, storage, settings,
 | |
|                    extra_blob_stream_handlers=[handler1, handler2]) as manager:
 | |
|     manager.upload_chunk(app_config, BytesIO('hello '))
 | |
|     manager.upload_chunk(app_config, BytesIO('world'))
 | |
| 
 | |
|   assert ''.join(handler1_result) == 'hello world'
 | |
|   assert ''.join(handler2_result) == 'hello world'
 |