Add ACI signing tests

This commit is contained in:
Joseph Schorr 2016-05-13 18:29:57 -04:00
parent de6b7bc88d
commit 64fe11a5f1
5 changed files with 96 additions and 29 deletions

View file

@ -7,6 +7,8 @@ import string
import resumablehashlib
import binascii
import uuid
import time
import gpgme
import Crypto.Random
from cachetools import lru_cache
@ -352,6 +354,9 @@ class V1RegistryPushMixin(V1RegistryMixin):
if 'size' in image_data:
image_json_data['Size'] = image_data['size']
if 'parent' in image_data:
image_json_data['parent'] = image_data['parent']
self.conduct('PUT', '/v1/images/%s/json' % image_id,
data=json.dumps(image_json_data), auth='sig')
@ -368,7 +373,7 @@ class V1RegistryPushMixin(V1RegistryMixin):
# PUT /v1/repositories/{namespace}/{repository}/tags/latest
self.do_tag(namespace, repository, 'latest', images[0]['id'])
self.do_tag(namespace, repository, 'latest', images[-1]['id'])
# PUT /v1/repositories/{namespace}/{repository}/images
self.conduct('PUT', '/v1/repositories/%s/images' % repo_name,
@ -423,7 +428,7 @@ class V1RegistryPullMixin(V1RegistryMixin):
response = self.conduct('GET', image_prefix + 'layer', auth='sig')
# Ensure we can parse the layer bytes and that they contain the contents.
self.assertContents(images[index], response)
self.assertContents(images[len(images) - index - 1], response)
class V2RegistryMixin(BaseRegistryMixin):
@ -529,7 +534,7 @@ class V2RegistryPushMixin(V2RegistryMixin):
builder = SignedManifestBuilder(namespace, repository, tag_name)
full_contents = {}
for image_data in images:
for image_data in reversed(images):
full_contents[image_data['id']] = _get_full_contents(image_data, additional_fields=munge_shas)
checksum = 'sha256:' + hashlib.sha256(full_contents[image_data['id']]).hexdigest()
if invalid:
@ -544,7 +549,7 @@ class V2RegistryPushMixin(V2RegistryMixin):
# Push the image's layers.
checksums = {}
for image_data in images:
for image_data in reversed(images):
image_id = image_data['id']
layer_bytes = full_contents[image_data['id']]
chunks = image_data.get('chunks')
@ -662,7 +667,7 @@ class V2RegistryPullMixin(V2RegistryMixin):
# Verify the layers.
blobs = {}
for index, layer in enumerate(manifest_data['fsLayers']):
for index, layer in enumerate(reversed(manifest_data['fsLayers'])):
blob_id = layer['blobSum']
result = self.conduct('GET', '/v2/%s/blobs/%s' % (repo_name, blob_id),
expected_code=200, auth='jwt')
@ -726,15 +731,15 @@ class RegistryTestsMixin(object):
return
images = [
{
'id': 'baseid',
'contents': 'The base image',
},
{
'id': 'latestid',
'contents': 'the latest image',
'parent': 'baseid',
},
{
'id': 'baseid',
'contents': 'The base image',
}
]
# Push a new repository.
@ -756,16 +761,16 @@ class RegistryTestsMixin(object):
return
images = [
{
'id': 'baseid',
'contents': 'The base image',
},
{
'id': 'latestid',
'contents': 'The latest image',
'unicode': u'the Pawe\xc5\x82 Kami\xc5\x84ski image',
'parent': 'baseid',
},
{
'id': 'baseid',
'contents': 'The base image',
}
]
# Push a new repository.
@ -1260,15 +1265,15 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
def test_multiple_layers(self):
# Push a manifest with multiple layers.
images = [
{
'id': 'baseid',
'contents': 'The base image',
},
{
'id': 'latestid',
'contents': 'the latest image',
'parent': 'baseid',
},
{
'id': 'baseid',
'contents': 'The base image',
}
]
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images)
@ -1438,10 +1443,34 @@ class ACIConversionTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerT
""" Tests for registry ACI conversion. """
def get_converted_image(self):
response = self.conduct('GET', '/c1/aci/localhost:5000/devtable/newrepo/latest/aci/linux/amd64', auth='sig')
response = self.conduct('GET', '/c1/aci/localhost:5000/devtable/newrepo/latest/aci/linux/amd64/', auth='sig')
tar = tarfile.open(fileobj=StringIO(response.content))
return tar, response.content
def get_converted_signature(self):
# Give time for the signature to be written before continuing.
time.sleep(1)
response = self.conduct('GET', '/c1/aci/localhost:5000/devtable/newrepo/latest/aci.asc/linux/amd64/', auth='sig')
return response.content
def _verify_signature(self, signature, converted):
sig_bytes = StringIO(signature)
content_bytes = StringIO(converted)
ctx = gpgme.Context()
sigs = ctx.verify(sig_bytes, content_bytes, None)
self.assertEqual(len(sigs), 1)
self.assertEqual(sigs[0].summary, 0)
self.assertEqual(sigs[0].fpr, '07692864E17025DD1BEA88E44632047EEEB32221')
self.assertEqual(sigs[0].status, None)
self.assertEqual(sigs[0].notations, [])
self.assertEqual(sigs[0].exp_timestamp, 0)
self.assertEqual(sigs[0].wrong_key_usage, False)
self.assertEqual(sigs[0].validity, gpgme.VALIDITY_UNKNOWN)
self.assertEqual(sigs[0].validity_reason, None)
def test_basic_conversion(self):
initial_images = [
{
@ -1454,7 +1483,10 @@ class ACIConversionTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerT
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images)
# Pull the squashed version of the tag.
tar, _ = self.get_converted_image()
tar, converted = self.get_converted_image()
signature = self.get_converted_signature()
# Verify the manifest.
self.assertEquals(['manifest', 'rootfs', 'rootfs/contents'], tar.getnames())
manifest = json.loads(tar.extractfile(tar.getmember('manifest')).read())
@ -1488,16 +1520,19 @@ class ACIConversionTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerT
self.assertEquals(manifest, expected_manifest)
self.assertEquals('the initial image', tar.extractfile(tar.getmember('rootfs/contents')).read())
# Verify the signature.
self._verify_signature(signature, converted)
def test_multilayer_conversion(self):
images = [
{
'id': 'baseid',
'contents': 'The base image',
},
{
'id': 'latestid',
'contents': 'the latest image',
'parent': 'baseid',
},
{
'id': 'baseid',
'contents': 'The base image',
}
]
@ -1505,10 +1540,15 @@ class ACIConversionTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerT
self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images)
# Pull the squashed version of the tag.
tar, _ = self.get_converted_image()
tar, converted = self.get_converted_image()
signature = self.get_converted_signature()
self.assertEquals(['manifest', 'rootfs', 'rootfs/contents'], tar.getnames())
self.assertEquals('the latest image', tar.extractfile(tar.getmember('rootfs/contents')).read())
# Verify the signature.
self._verify_signature(signature, converted)
class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestCase):
""" Tests for registry squashing. """
@ -1573,15 +1613,15 @@ class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestC
def test_multilayer_squashing(self):
images = [
{
'id': 'baseid',
'contents': 'The base image',
},
{
'id': 'latestid',
'contents': 'the latest image',
'parent': 'baseid',
},
{
'id': 'baseid',
'contents': 'The base image',
}
]
# Create the repo.

BIN
test/signing-private.gpg Normal file

Binary file not shown.

19
test/signing-public.gpg Normal file
View file

@ -0,0 +1,19 @@
-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v2
mQENBFTVMzABCAC8jcnCrNHKk0LgyZTdTFtf9Qm2bK27Y0EyyI8tWefUt4LhQRCA
14dksJVzqWBtpHJnqkYUwfoXZmdz4e9fSS1mmoiHlDwzkuNXx2J1HAnXSxgNMV1D
JQmfxhKQzFTgkTEN03txPZrOMrDNIZSw0gkAbiBGuQXk9/HNGbzdjkd3vk1GF7Vk
v1vITmWQG+QQi7H8zR1NYYuFQb5cdDDuOoQWHXNMIZmK27StZ6MUot3NlquZbs1q
5Gr1HHog0qx+0uYn441zghZ9R1JqaAig0V3eJ8UAbTIMZPO09UUBQKC7O7OgOX/H
92zGWGwkTMUqJNJUr/dj5ocQbpFk8X3yz+d9ABEBAAG0RFF1YXkuaW8gQUNJIENv
bnZlcnRlciAoQUNJIGNvbnZlcnNpb24gc2lnbmluZyBrZXkpIDxzdXBwb3J0QHF1
YXkuaW8+iQE5BBMBAgAjBQJU1TMwAhsDBwsJCAcDAgEGFQgCCQoLBBYCAwECHgEC
F4AACgkQRjIEfu6zIiHo9Af+MCE4bUOrQ6yrHSPHebHwSARULaTB0Rlj4BAXlv+A
nUJDaaYaYExo8SHZMWF5X4d4mh57DJOsIXMjIWNKpf9/0hpxRu+P8p77YtXOOeRS
3xFdq7cOK1yQ8h/iRoXyLaxAFgWvVH+Ttmx4DLr+NsyzEQBjADeBCcF4YR9OZ7fj
ZYsoq68hH0W7zgZTSrCgvyGxdpu+UWWk/eV/foktxKBMV8K2GmAwyOlsAm59PgUI
EhfFH0WAEx6+jsMFLkn7USPWomFeyMwJJEiMKYCwALWIbNz1/1dtrZZs2QmBcjAu
AMFQhx8fykH4ON8a6fpS3TOEzf0HV1NX295O8wb8vS9B7w==
=aImY
-----END PGP PUBLIC KEY BLOCK-----

View file

@ -54,6 +54,7 @@ class TestConfig(DefaultConfig):
FEATURE_GITHUB_BUILD = True
FEATURE_BITTORRENT = True
FEATURE_ACI_CONVERSION = True
INSTANCE_SERVICE_KEY_LOCATION = 'test/data/test.pem'
@ -65,3 +66,10 @@ class TestConfig(DefaultConfig):
SECURITY_SCANNER_API_VERSION = 'v1'
SECURITY_SCANNER_ENGINE_VERSION_TARGET = 1
SECURITY_SCANNER_API_TIMEOUT_SECONDS = 1
SIGNING_ENGINE = 'gpg2'
GPG2_PRIVATE_KEY_NAME = 'EEB32221'
GPG2_PRIVATE_KEY_FILENAME = 'signing-private.gpg'
GPG2_PUBLIC_KEY_FILENAME = 'signing-public.gpg'

View file

@ -44,7 +44,7 @@ class QueueFile(object):
handled = True
if handled:
return
return ''
else:
raise result.exception