Add testing of the new secscan-for-local endpoint and fix a bug
This commit is contained in:
parent
67031e4e33
commit
232fa42897
3 changed files with 40 additions and 6 deletions
|
@ -32,7 +32,6 @@ from util.security import strictjwt
|
|||
|
||||
import endpoints.decorated
|
||||
import json
|
||||
import features
|
||||
import hashlib
|
||||
import logging
|
||||
import bencode
|
||||
|
@ -40,7 +39,6 @@ import bencode
|
|||
import tarfile
|
||||
import shutil
|
||||
|
||||
from jwkest.jws import SIGNER_ALGS
|
||||
from jwkest.jwk import RSAKey
|
||||
from Crypto.PublicKey import RSA
|
||||
|
||||
|
@ -1153,7 +1151,6 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
|
|||
# Attempt to pull by digest.
|
||||
self.do_pull('devtable', 'newrepo', 'devtable', 'password', manifest_id=digest)
|
||||
|
||||
|
||||
def test_pull_invalid_image_tag(self):
|
||||
# Add a new repository under the user, so we have a real repository to pull.
|
||||
self.do_push('devtable', 'newrepo', 'devtable', 'password')
|
||||
|
@ -1163,7 +1160,6 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
|
|||
self.do_pull('devtable', 'newrepo', 'devtable', 'password', manifest_id='invalid',
|
||||
expect_failure=FailureCodes.INVALID_REGISTRY)
|
||||
|
||||
|
||||
def test_partial_upload_below_5mb(self):
|
||||
chunksize = 1024 * 1024 * 2
|
||||
size = chunksize * 3
|
||||
|
|
|
@ -11,6 +11,7 @@ from util.secscan.analyzer import LayerAnalyzer
|
|||
from util.secscan.notifier import process_notification_data
|
||||
from data import model
|
||||
from workers.security_notification_worker import SecurityNotificationWorker
|
||||
from endpoints.v2 import v2_bp
|
||||
|
||||
|
||||
ADMIN_ACCESS_USER = 'devtable'
|
||||
|
@ -155,6 +156,43 @@ class TestSecurityScanner(unittest.TestCase):
|
|||
self.assertIsNone(result)
|
||||
|
||||
|
||||
def test_analyze_layer_nodirectdownload_success(self):
|
||||
# Disable direct download in fake storage.
|
||||
storage.put_content(['local_us'], 'supports_direct_download', 'false')
|
||||
|
||||
try:
|
||||
app.register_blueprint(v2_bp, url_prefix='/v2')
|
||||
except:
|
||||
# Already registered.
|
||||
pass
|
||||
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||
self.assertFalse(layer.security_indexed)
|
||||
self.assertEquals(-1, layer.security_indexed_engine)
|
||||
|
||||
# Ensure that the download is a registry+JWT download.
|
||||
uri, auth_header = self.api._get_image_url_and_auth(layer)
|
||||
self.assertIsNotNone(uri)
|
||||
self.assertIsNotNone(auth_header)
|
||||
|
||||
# Ensure the download doesn't work without the header.
|
||||
rv = self.app.head(uri)
|
||||
self.assertEquals(rv.status_code, 401)
|
||||
|
||||
# Ensure the download works with the header. Note we use a HEAD here, as GET causes DB
|
||||
# access which messes with the test runner's rollback.
|
||||
rv = self.app.head(uri, headers=[('authorization', auth_header)])
|
||||
self.assertEquals(rv.status_code, 200)
|
||||
|
||||
# Ensure the code works when called via analyze.
|
||||
with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content):
|
||||
analyzer = LayerAnalyzer(app.config, self.api)
|
||||
analyzer.analyze_recursively(layer)
|
||||
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||
self.assertAnalyzed(layer, True, 1)
|
||||
|
||||
|
||||
def test_analyze_layer_success(self):
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||
self.assertFalse(layer.security_indexed)
|
||||
|
|
|
@ -74,7 +74,7 @@ class SecurityScannerAPI(object):
|
|||
repository_and_namespace = '/'.join([namespace_name, repo_name])
|
||||
|
||||
# Generate the JWT which will authorize this
|
||||
audience = 'security_scanner'
|
||||
audience = self._app.config['SERVER_HOSTNAME']
|
||||
context, subject = build_context_and_subject(None, None, None)
|
||||
access = [{
|
||||
'type': 'repository',
|
||||
|
@ -83,7 +83,7 @@ class SecurityScannerAPI(object):
|
|||
}]
|
||||
auth_jwt = generate_jwt_object(audience, subject, context, access, TOKEN_VALIDITY_LIFETIME_S,
|
||||
self._config)
|
||||
auth_header = 'Bearer: {}'.format(auth_jwt)
|
||||
auth_header = 'Bearer {}'.format(auth_jwt)
|
||||
|
||||
with self._app.test_request_context('/'):
|
||||
relative_layer_url = url_for('v2.download_blob', repository=repository_and_namespace,
|
||||
|
|
Reference in a new issue