This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/util/tufmetadata/test/test_tufmetadata.py

135 lines
No EOL
4.8 KiB
Python

import pytest
import requests
from mock import mock,patch
from flask import Flask
from test import testconfig
from test.fixtures import init_db_path
from util.tufmetadata import api
from data import model
valid_response = {
'signed' : {
'type': 'Targets',
'delegations': {
'keys': {},
'roles': {},
},
'expires': '2020-03-30T18:55:26.594764859-04:00',
'targets': {
'latest': {
'hashes': {
'sha256': 'mLmxwTyUrqIRDaz8uaBapfrp3GPERfsDg2kiMujlteo='
},
'length': 1500
}
},
'version': 2
},
'signatures': [
{
'method': 'ecdsa',
'sig': 'yYnJGsYAYEL9PSwXisdG7JUEM1YK2IIKM147K3fWJthF4w+vl3xOm67r5ZNuDK6ss/Ff+x8yljZdT3sE/Hg5mw=='
}
]
}
@pytest.mark.parametrize('tuf_prefix,server_hostname,namespace,repo,gun', [
("quay.dev", "quay.io", "ns", "repo", "quay.dev/ns/repo"),
(None, "quay.io", "ns", "repo", "quay.io/ns/repo"),
("quay.dev/", "quay.io", "ns", "repo", "quay.dev/ns/repo"),
(None, "quay.io/", "ns", "repo", "quay.io/ns/repo"),
(None, "localhost:5000/", "ns", "repo", "localhost:5000/ns/repo"),
(None, "localhost:5000", "ns", "repo", "localhost:5000/ns/repo"),
])
def test_gun(tuf_prefix, server_hostname, namespace, repo, gun):
app = Flask(__name__)
app.config.from_object(testconfig.TestConfig())
app.config['TUF_GUN_PREFIX'] = tuf_prefix
app.config['SERVER_HOSTNAME'] = server_hostname
tuf_api = api.TUFMetadataAPI(app, app.config)
assert gun == tuf_api._gun(namespace, repo)
@pytest.mark.parametrize('response_code,response_body,expected', [
(200, valid_response, (valid_response['signed']['targets'], '2020-03-30T18:55:26.594764859-04:00')),
(200, {'garbage': 'data'}, (None, None))
])
def test_get_metadata(response_code, response_body, expected):
app = Flask(__name__)
app.config.from_object(testconfig.TestConfig())
client = mock.Mock()
request = mock.Mock(status_code=response_code)
request.json.return_value = response_body
client.request.return_value = request
tuf_api = api.TUFMetadataAPI(app, app.config, client=client)
response = tuf_api.get_default_tags_with_expiration('quay', 'quay')
assert response == expected
@pytest.mark.parametrize('connection_error,response_code,exception', [
(True, 200, requests.exceptions.Timeout),
(True, 200, requests.exceptions.ConnectionError),
(False, 200, requests.exceptions.RequestException),
(False, 200, ValueError),
(True, 500, api.Non200ResponseException(mock.Mock(status_code=500))),
(False, 400, api.Non200ResponseException(mock.Mock(status_code=400))),
(False, 404, api.Non200ResponseException(mock.Mock(status_code=404))),
(False, 200, api.InvalidMetadataException)
])
def test_get_metadata_exception(connection_error, response_code, exception):
app = Flask(__name__)
app.config.from_object(testconfig.TestConfig())
request = mock.Mock(status_code=response_code)
client = mock.Mock(request=request)
client.request.side_effect = exception
tuf_api = api.TUFMetadataAPI(app, app.config, client=client)
tags, expiration = tuf_api.get_default_tags_with_expiration('quay', 'quay')
assert tags == None
assert expiration == None
@pytest.mark.parametrize('response_code,expected', [
(200, True),
(400, False),
(401, False),
])
def test_delete_metadata(response_code, expected):
app = Flask(__name__)
app.config.from_object(testconfig.TestConfig())
client = mock.Mock()
request = mock.Mock(status_code=response_code)
client.request.return_value = request
tuf_api = api.TUFMetadataAPI(app, app.config, client=client)
response = tuf_api.delete_metadata('quay', 'quay')
assert response == expected
@pytest.mark.parametrize('response_code,exception', [
(200, requests.exceptions.Timeout),
(200, requests.exceptions.ConnectionError),
(200, requests.exceptions.RequestException),
(200, ValueError),
(500, api.Non200ResponseException(mock.Mock(status_code=500))),
(400, api.Non200ResponseException(mock.Mock(status_code=400))),
(401, api.Non200ResponseException(mock.Mock(status_code=401))),
(404, api.Non200ResponseException(mock.Mock(status_code=404))),
])
def test_delete_metadata_exception(response_code, exception):
app = Flask(__name__)
app.config.from_object(testconfig.TestConfig())
request = mock.Mock(status_code=response_code)
client = mock.Mock(request=request)
client.request.side_effect = exception
tuf_api = api.TUFMetadataAPI(app, app.config, client=client)
response = tuf_api.delete_metadata('quay', 'quay')
assert response == False
def test_purge_repo(init_db_path):
app = Flask(__name__)
app.config.from_object(testconfig.TestConfig())
app.config["DB_URI"] = init_db_path
with patch('app.tuf_metadata_api') as mock_tuf:
model.repository.purge_repository("ns", "repo")
assert mock_tuf.delete_metadata.called_with("ns", "repo")