Merge pull request #1439 from coreos-inc/fixtestlocalsecscan

Add testing of the new secscan-for-local endpoint and fix a bug
This commit is contained in:
josephschorr 2016-05-04 22:02:59 -04:00
commit 836a99083e
3 changed files with 40 additions and 6 deletions

View file

@ -32,7 +32,6 @@ from util.security import strictjwt
import endpoints.decorated import endpoints.decorated
import json import json
import features
import hashlib import hashlib
import logging import logging
import bencode import bencode
@ -40,7 +39,6 @@ import bencode
import tarfile import tarfile
import shutil import shutil
from jwkest.jws import SIGNER_ALGS
from jwkest.jwk import RSAKey from jwkest.jwk import RSAKey
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
@ -1153,7 +1151,6 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
# Attempt to pull by digest. # Attempt to pull by digest.
self.do_pull('devtable', 'newrepo', 'devtable', 'password', manifest_id=digest) self.do_pull('devtable', 'newrepo', 'devtable', 'password', manifest_id=digest)
def test_pull_invalid_image_tag(self): def test_pull_invalid_image_tag(self):
# Add a new repository under the user, so we have a real repository to pull. # Add a new repository under the user, so we have a real repository to pull.
self.do_push('devtable', 'newrepo', 'devtable', 'password') self.do_push('devtable', 'newrepo', 'devtable', 'password')
@ -1163,7 +1160,6 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
self.do_pull('devtable', 'newrepo', 'devtable', 'password', manifest_id='invalid', self.do_pull('devtable', 'newrepo', 'devtable', 'password', manifest_id='invalid',
expect_failure=FailureCodes.INVALID_REGISTRY) expect_failure=FailureCodes.INVALID_REGISTRY)
def test_partial_upload_below_5mb(self): def test_partial_upload_below_5mb(self):
chunksize = 1024 * 1024 * 2 chunksize = 1024 * 1024 * 2
size = chunksize * 3 size = chunksize * 3

View file

@ -11,6 +11,7 @@ from util.secscan.analyzer import LayerAnalyzer
from util.secscan.notifier import process_notification_data from util.secscan.notifier import process_notification_data
from data import model from data import model
from workers.security_notification_worker import SecurityNotificationWorker from workers.security_notification_worker import SecurityNotificationWorker
from endpoints.v2 import v2_bp
ADMIN_ACCESS_USER = 'devtable' ADMIN_ACCESS_USER = 'devtable'
@ -155,6 +156,43 @@ class TestSecurityScanner(unittest.TestCase):
self.assertIsNone(result) self.assertIsNone(result)
def test_analyze_layer_nodirectdownload_success(self):
# Disable direct download in fake storage.
storage.put_content(['local_us'], 'supports_direct_download', 'false')
try:
app.register_blueprint(v2_bp, url_prefix='/v2')
except:
# Already registered.
pass
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertFalse(layer.security_indexed)
self.assertEquals(-1, layer.security_indexed_engine)
# Ensure that the download is a registry+JWT download.
uri, auth_header = self.api._get_image_url_and_auth(layer)
self.assertIsNotNone(uri)
self.assertIsNotNone(auth_header)
# Ensure the download doesn't work without the header.
rv = self.app.head(uri)
self.assertEquals(rv.status_code, 401)
# Ensure the download works with the header. Note we use a HEAD here, as GET causes DB
# access which messes with the test runner's rollback.
rv = self.app.head(uri, headers=[('authorization', auth_header)])
self.assertEquals(rv.status_code, 200)
# Ensure the code works when called via analyze.
with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content):
analyzer = LayerAnalyzer(app.config, self.api)
analyzer.analyze_recursively(layer)
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertAnalyzed(layer, True, 1)
def test_analyze_layer_success(self): def test_analyze_layer_success(self):
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertFalse(layer.security_indexed) self.assertFalse(layer.security_indexed)

View file

@ -74,7 +74,7 @@ class SecurityScannerAPI(object):
repository_and_namespace = '/'.join([namespace_name, repo_name]) repository_and_namespace = '/'.join([namespace_name, repo_name])
# Generate the JWT which will authorize this # Generate the JWT which will authorize this
audience = 'security_scanner' audience = self._app.config['SERVER_HOSTNAME']
context, subject = build_context_and_subject(None, None, None) context, subject = build_context_and_subject(None, None, None)
access = [{ access = [{
'type': 'repository', 'type': 'repository',
@ -83,7 +83,7 @@ class SecurityScannerAPI(object):
}] }]
auth_jwt = generate_jwt_object(audience, subject, context, access, TOKEN_VALIDITY_LIFETIME_S, auth_jwt = generate_jwt_object(audience, subject, context, access, TOKEN_VALIDITY_LIFETIME_S,
self._config) self._config)
auth_header = 'Bearer: {}'.format(auth_jwt) auth_header = 'Bearer {}'.format(auth_jwt)
with self._app.test_request_context('/'): with self._app.test_request_context('/'):
relative_layer_url = url_for('v2.download_blob', repository=repository_and_namespace, relative_layer_url = url_for('v2.download_blob', repository=repository_and_namespace,