Pull out security scanner validation into validator class

This commit is contained in:
Joseph Schorr 2017-02-09 17:28:39 -08:00
parent c0f7530b29
commit 3db4c15459
5 changed files with 168 additions and 29 deletions

View file

@ -0,0 +1,86 @@
import os
import pytest
import shutil
from flask import Flask, jsonify
from flask.ext.login import LoginManager
from peewee import SqliteDatabase
from app import app as application
from data import model
from data.database import (close_db_filter, db)
from data.model.user import LoginWrappedDBUser
from endpoints.api import api_bp
from initdb import initialize_database, populate_database
from path_converters import APIRepositoryPathConverter, RegexConverter
@pytest.fixture()
def app(appconfig):
""" Used by pytest-flask plugin to inject app by test for client See test_security by name injection of client. """
app = Flask(__name__)
login_manager = LoginManager(app)
@app.errorhandler(model.DataModelException)
def handle_dme(ex):
response = jsonify({'message': ex.message})
response.status_code = 400
return response
@login_manager.user_loader
def load_user(user_uuid):
return LoginWrappedDBUser(user_uuid)
app.url_map.converters['regex'] = RegexConverter
app.url_map.converters['apirepopath'] = APIRepositoryPathConverter
app.register_blueprint(api_bp, url_prefix='/api')
app.config.update(appconfig)
return app
@pytest.fixture(scope="session")
def init_db_path(tmpdir_factory):
""" Creates a new db and appropriate configuration. Used for parameter by name injection. """
sqlitedb_file = str(tmpdir_factory.mktemp("data").join("test.db"))
sqlitedb = 'sqlite:///{0}'.format(sqlitedb_file)
conf = {"TESTING": True,
"DEBUG": True,
"DB_URI": sqlitedb}
os.environ['TEST_DATABASE_URI'] = str(sqlitedb)
os.environ['DB_URI'] = str(sqlitedb)
db.initialize(SqliteDatabase(sqlitedb_file))
application.config.update(conf)
application.config.update({"DB_URI": sqlitedb})
initialize_database()
populate_database()
close_db_filter(None)
return str(sqlitedb_file)
@pytest.fixture()
def database_uri(monkeypatch, init_db_path, sqlitedb_file):
""" Creates the db uri. Used for parameter by name injection. """
shutil.copy2(init_db_path, sqlitedb_file)
db.initialize(SqliteDatabase(sqlitedb_file))
db_path = 'sqlite:///{0}'.format(sqlitedb_file)
monkeypatch.setenv("DB_URI", db_path)
return db_path
@pytest.fixture()
def sqlitedb_file(tmpdir):
""" Makes file for db. Used for parameter by name injection. """
test_db_file = tmpdir.mkdir("quaydb").join("test.db")
return str(test_db_file)
@pytest.fixture()
def appconfig(database_uri):
""" Makes conf with database_uri. Used for parameter by name injection """
conf = {
"TESTING": True,
"DEBUG": True,
"DB_URI": database_uri,
"SECRET_KEY": 'superdupersecret!!!1',
}
return conf

View file

@ -0,0 +1,36 @@
import pytest
from util.config.validators import ConfigValidationException
from util.config.validators.validate_secscan import SecurityScannerValidator
from util.secscan.fake import fake_security_scanner
@pytest.mark.parametrize('unvalidated_config', [
({'DISTRIBUTED_STORAGE_PREFERENCE': []}),
])
def test_validate_noop(unvalidated_config, app):
SecurityScannerValidator.validate(unvalidated_config, None, None)
@pytest.mark.parametrize('unvalidated_config, expected_error, error_message', [
({
'TESTING': True,
'DISTRIBUTED_STORAGE_PREFERENCE': [],
'FEATURE_SECURITY_SCANNER': True,
'SECURITY_SCANNER_ENDPOINT': 'http://invalidhost',
}, Exception, 'Connection error when trying to connect to security scanner endpoint'),
({
'TESTING': True,
'DISTRIBUTED_STORAGE_PREFERENCE': [],
'FEATURE_SECURITY_SCANNER': True,
'SECURITY_SCANNER_ENDPOINT': 'http://fakesecurityscanner',
}, None, None),
])
def test_validate(unvalidated_config, expected_error, error_message, app):
with fake_security_scanner(hostname='fakesecurityscanner'):
if expected_error is not None:
with pytest.raises(expected_error) as ipe:
SecurityScannerValidator.validate(unvalidated_config, None, None)
assert ipe.value.message == error_message
else:
SecurityScannerValidator.validate(unvalidated_config, None, None)

View file

@ -0,0 +1,36 @@
import time
from app import app
from boot import setup_jwt_proxy
from util.secscan.api import SecurityScannerAPI
from util.config.validators import BaseValidator, ConfigValidationException
class SecurityScannerValidator(BaseValidator):
name = "security-scanner"
@classmethod
def validate(cls, config, user, user_password):
""" Validates the configuration for talking to a Quay Security Scanner. """
if not config.get('FEATURE_SECURITY_SCANNER', False):
return
client = app.config['HTTPCLIENT']
api = SecurityScannerAPI(app, config, None, client=client, skip_validation=True)
if not config.get('TESTING', False):
# Generate a temporary Quay key to use for signing the outgoing requests.
setup_jwt_proxy()
# We have to wait for JWT proxy to restart with the newly generated key.
max_tries = 5
response = None
while max_tries > 0:
response = api.ping()
if response.status_code == 200:
return
time.sleep(1)
max_tries = max_tries - 1
message = 'Expected 200 status code, got %s: %s' % (response.status_code, response.text)
raise ConfigValidationException('Could not ping security scanner: %s' % message)