From 2eabf1a2914e88da0e4f28b0d3becbb6ec19b3e3 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 18 Oct 2016 23:44:08 -0400 Subject: [PATCH] Fix tests and test provider for real license format --- endpoints/api/superuser.py | 2 +- test/test_license.py | 509 ++++++++++++++++++++++----- util/config/provider/testprovider.py | 17 +- util/license.py | 5 +- 4 files changed, 436 insertions(+), 97 deletions(-) diff --git a/endpoints/api/superuser.py b/endpoints/api/superuser.py index b5a84f137..5f635e642 100644 --- a/endpoints/api/superuser.py +++ b/endpoints/api/superuser.py @@ -860,7 +860,7 @@ class SuperUserLicense(ApiResource): raise InvalidRequest('License is insufficient') return { - 'decoded': decoded_license.subscription, + 'decoded': {}, 'success': True } diff --git a/test/test_license.py b/test/test_license.py index 9dc920d61..c10f6a23e 100644 --- a/test/test_license.py +++ b/test/test_license.py @@ -9,9 +9,13 @@ from Crypto.PublicKey import RSA from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.serialization import load_der_public_key -from util.license import decode_license, LICENSE_PRODUCT_NAME, LicenseValidationError +from util.license import (decode_license, LicenseDecodeError, ExpirationType, + MONTHLY_GRACE_PERIOD, YEARLY_GRACE_PERIOD, TRIAL_GRACE_PERIOD) +def get_date(delta): + return str(datetime.now() + delta) + class TestLicense(unittest.TestCase): def keys(self): with open('test/data/test.pem') as f: @@ -21,12 +25,12 @@ class TestLicense(unittest.TestCase): backend=default_backend()) return (public_key, private_key) - def create_license(self, license_data): + def create_license(self, license_data, keys=None): jwt_data = { 'license': json.dumps(license_data), } - (public_key, private_key) = self.keys() + (public_key, private_key) = keys or self.keys() # Encode the license with the JWT key. encoded = jwt.encode(jwt_data, private_key, algorithm='RS256') @@ -34,102 +38,435 @@ class TestLicense(unittest.TestCase): # Decode it into a license object. return decode_license(encoded, public_key_instance=public_key) - def get_license(self, expiration_delta=None, **kwargs): - license_data = { - 'expirationDate': str(datetime.now() + expiration_delta), + def test_license_decodeerror_invalid(self): + with self.assertRaises(LicenseDecodeError): + decode_license('some random stuff') + + def test_license_decodeerror_badkey(self): + (_, private_key) = self.keys() + jwt_data = { + 'license': json.dumps({}), } - if kwargs: - sub = { - 'productName': LICENSE_PRODUCT_NAME, - } + encoded_stuff = jwt.encode(jwt_data, private_key, algorithm='RS256') + with self.assertRaises(LicenseDecodeError): + # Note that since we don't give a key here, the prod one will be used, and it should fail. + decode_license(encoded_stuff) - sub['trialOnly'] = kwargs.get('trial_only', False) - sub['inTrial'] = kwargs.get('in_trial', False) - sub['entitlements'] = kwargs.get('entitlements', []) + def assertValid(self, license, config=None): + results = license.validate(config or {}) + is_met = all([r.is_met() for r in results]) + self.assertTrue(is_met, [r for r in results if not r.is_met()]) - if 'trial_end' in kwargs: - sub['trialEnd'] = str(datetime.now() + kwargs['trial_end']) + def assertNotValid(self, license, config=None, requirement=None, expired=None): + results = license.validate(config or {}) + is_met = all([r.is_met() for r in results]) + self.assertFalse(is_met) - if 'service_end' in kwargs: - sub['serviceEnd'] = str(datetime.now() + kwargs['service_end']) + invalid_results = [r for r in results if not r.is_met()] + if requirement is not None: + self.assertEquals(invalid_results[0].requirement.name, requirement) - if 'duration' in kwargs: - sub['durationPeriod'] = kwargs['duration'] + if expired is not None: + self.assertEquals(invalid_results[0].entitlement.expiration.expiration_type, expired) - license_data['subscriptions'] = {'somesub': sub} - - decoded_license = self.create_license(license_data) - return decoded_license - - def test_license_itself_expired(self): - # License is expired. - license = self.get_license(timedelta(days=-30)) - - def test_no_qe_subscription(self): - # License is not expired, but there is no QE sub, so not valid. - license = self.get_license(timedelta(days=30)) - - def test_trial_withingrace(self): - license = self.get_license(timedelta(days=30), trial_only=True, trial_end=timedelta(days=-1)) - self.assertFalse(license.is_expired) - - def test_trial_outsidegrace(self): - license = self.get_license(timedelta(days=30), trial_only=True, trial_end=timedelta(days=-10)) - self.assertTrue(license.is_expired) - - def test_trial_intrial_withingrace(self): - license = self.get_license(timedelta(days=30), in_trial=True, service_end=timedelta(days=-1)) - self.assertFalse(license.is_expired) - - def test_trial_intrial_outsidegrace(self): - license = self.get_license(timedelta(days=30), in_trial=True, service_end=timedelta(days=-10)) - self.assertTrue(license.is_expired) - - def test_monthly_license_valid(self): - license = self.get_license(timedelta(days=30), service_end=timedelta(days=10), duration='months') - self.assertFalse(license.is_expired) - - def test_monthly_license_withingrace(self): - license = self.get_license(timedelta(days=30), service_end=timedelta(days=-10), duration='months') - self.assertFalse(license.is_expired) - - def test_monthly_license_outsidegrace(self): - license = self.get_license(timedelta(days=30), service_end=timedelta(days=-40), duration='months') - self.assertTrue(license.is_expired) - - def test_yearly_license_withingrace(self): - license = self.get_license(timedelta(days=30), service_end=timedelta(days=-40), duration='years') - self.assertFalse(license.is_expired) - - def test_yearly_license_outsidegrace(self): - license = self.get_license(timedelta(days=30), service_end=timedelta(days=-100), duration='years') - self.assertTrue(license.is_expired) - - def test_valid_license(self): - license = self.get_license(timedelta(days=300), service_end=timedelta(days=40), duration='years') - self.assertFalse(license.is_expired) - - def test_validate_basic_license(self): - decoded = self.get_license(timedelta(days=30), service_end=timedelta(days=40), - duration='months', entitlements={}) - decoded.validate({'DISTRIBUTED_STORAGE_CONFIG': [{}]}) - - def test_validate_storage_entitlement_valid(self): - decoded = self.get_license(timedelta(days=30), service_end=timedelta(days=40), entitlements={ - 'software.quay.regions': 2, + def test_missing_subscriptions(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), }) - decoded.validate({'DISTRIBUTED_STORAGE_CONFIG': [{}]}) + self.assertNotValid(license, requirement='software.quay') - def test_validate_storage_entitlement_invalid(self): - decoded = self.get_license(timedelta(days=30), service_end=timedelta(days=40), entitlements={ - 'software.quay.regions': 1, + def test_empty_subscriptions(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": {}, }) - with self.assertRaises(LicenseValidationError): - decoded.validate({'DISTRIBUTED_STORAGE_CONFIG': [{}, {}]}) + self.assertNotValid(license, requirement='software.quay') + def test_missing_quay_entitlement(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "serviceEnd": get_date(timedelta(days=10)), + "entitlements": { + "software.quay.regions": 0, + }, + }, + }, + }) + + self.assertNotValid(license, requirement='software.quay') + + def test_valid_quay_entitlement(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "serviceEnd": get_date(timedelta(days=10)), + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + }, + }) + + self.assertValid(license) + + def test_missing_expiration(self): + license = self.create_license({ + "subscriptions": { + "somesub": { + "serviceEnd": get_date(timedelta(days=10)), + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + }, + }) + + self.assertNotValid(license, expired=ExpirationType.license_wide) + + def test_expired_license(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=-10)), + "subscriptions": { + "somesub": { + "serviceEnd": get_date(timedelta(days=10)), + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + }, + }) + + self.assertNotValid(license, expired=ExpirationType.license_wide) + + def test_expired_sub_implicit_monthly_withingrace(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "serviceEnd": get_date(MONTHLY_GRACE_PERIOD * -1 + timedelta(days=1)), + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + }, + }) + + self.assertValid(license) + + def test_expired_sub_monthly_withingrace(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "serviceEnd": get_date(MONTHLY_GRACE_PERIOD * -1 + timedelta(days=1)), + "durationPeriod": "monthly", + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + }, + }) + + self.assertValid(license) + + def test_expired_sub_monthly_outsidegrace(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "serviceEnd": get_date(MONTHLY_GRACE_PERIOD * -1 + timedelta(days=-1)), + "durationPeriod": "monthly", + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + }, + }) + + self.assertNotValid(license, expired=ExpirationType.monthly) + + def test_expired_sub_yearly_withingrace(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "serviceEnd": get_date(YEARLY_GRACE_PERIOD * -1 + timedelta(days=1)), + "durationPeriod": "yearly", + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + }, + }) + + self.assertValid(license) + + def test_expired_sub_yearly_outsidegrace(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "serviceEnd": get_date(YEARLY_GRACE_PERIOD * -1 + timedelta(days=-1)), + "durationPeriod": "yearly", + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + }, + }) + + self.assertNotValid(license, expired=ExpirationType.yearly) + + def test_expired_sub_intrial_withingrace(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "serviceEnd": get_date(TRIAL_GRACE_PERIOD * -1 + timedelta(days=1)), + "inTrial": True, + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + }, + }) + + self.assertValid(license) + + def test_expired_sub_intrial_outsidegrace(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "serviceEnd": get_date(TRIAL_GRACE_PERIOD * -1 + timedelta(days=-1)), + "inTrial": True, + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + }, + }) + + self.assertNotValid(license, expired=ExpirationType.in_trial) + + def test_expired_sub_trialonly_withingrace(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "trialEnd": get_date(TRIAL_GRACE_PERIOD * -1 + timedelta(days=1)), + "trialOnly": True, + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + }, + }) + + self.assertValid(license) + + def test_expired_sub_trialonly_outsidegrace(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "trialEnd": get_date(TRIAL_GRACE_PERIOD * -1 + timedelta(days=-1)), + "trialOnly": True, + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + }, + }) + + self.assertNotValid(license, expired=ExpirationType.trial_only) + + def test_valid_quay_entitlement_regions(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "serviceEnd": get_date(timedelta(days=10)), + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + }, + }) + + config = { + 'DISTRIBUTED_STORAGE_CONFIG': [ + {'name': 'first'}, + ], + } + + self.assertValid(license, config=config) + + def test_invalid_quay_entitlement_regions(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "serviceEnd": get_date(timedelta(days=10)), + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + }, + }) + + config = { + 'DISTRIBUTED_STORAGE_CONFIG': [ + {'name': 'first'}, + {'name': 'second'}, + ], + } + + self.assertNotValid(license, config=config, requirement='software.quay.regions') + + def test_valid_regions_across_multiple_sub(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "serviceEnd": get_date(timedelta(days=10)), + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + "anothersub": { + "serviceEnd": get_date(timedelta(days=20)), + "entitlements": { + "software.quay.regions": 5, + }, + }, + }, + }) + + config = { + 'DISTRIBUTED_STORAGE_CONFIG': [ + {'name': 'first'}, + {'name': 'second'}, + ], + } + + self.assertValid(license, config=config) + + def test_valid_regions_across_multiple_sub_one_expired(self): + # Setup a license with one sub having too few regions, and another having enough, but it is + # expired. + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "serviceEnd": get_date(timedelta(days=10)), + "entitlements": { + "software.quay": 1, + "software.quay.regions": 1, + }, + }, + "anothersub": { + "trialEnd": get_date(TRIAL_GRACE_PERIOD * -1 + timedelta(days=-1)), + "trialOnly": True, + "entitlements": { + "software.quay.regions": 5, + }, + }, + }, + }) + + config = { + 'DISTRIBUTED_STORAGE_CONFIG': [ + {'name': 'first'}, + {'name': 'second'}, + ], + } + + self.assertNotValid(license, config=config, requirement='software.quay.regions', + expired=ExpirationType.trial_only) + + def test_valid_regions_across_multiple_sub_one_expired(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "trialEnd": get_date(TRIAL_GRACE_PERIOD * -1 + timedelta(days=-1)), + "trialOnly": True, + "entitlements": { + "software.quay": 1, + "software.quay.regions": 3, + }, + }, + "anothersub": { + "serviceEnd": get_date(timedelta(days=20)), + "entitlements": { + "software.quay": 1, + "software.quay.regions": 5, + }, + }, + }, + }) + + config = { + 'DISTRIBUTED_STORAGE_CONFIG': [ + {'name': 'first'}, + {'name': 'second'}, + ], + } + + self.assertValid(license, config=config) + + def test_quay_is_under_expired_sub(self): + license = self.create_license({ + "expirationDate": get_date(timedelta(days=10)), + "subscriptions": { + "somesub": { + "trialEnd": get_date(TRIAL_GRACE_PERIOD * -1 + timedelta(days=-1)), + "trialOnly": True, + "entitlements": { + "software.quay": 1, + "software.quay.regions": 3, + }, + }, + "anothersub": { + "serviceEnd": get_date(timedelta(days=20)), + "entitlements": { + "software.quay.regions": 5, + }, + }, + }, + }) + + config = { + 'DISTRIBUTED_STORAGE_CONFIG': [ + {'name': 'first'}, + {'name': 'second'}, + ], + } + + self.assertNotValid(license, config=config, expired=ExpirationType.trial_only, + requirement='software.quay') if __name__ == '__main__': unittest.main() diff --git a/util/config/provider/testprovider.py b/util/config/provider/testprovider.py index 6f9400b50..ecf2376e9 100644 --- a/util/config/provider/testprovider.py +++ b/util/config/provider/testprovider.py @@ -1,21 +1,22 @@ import json import io +from datetime import datetime, timedelta from util.config.provider.baseprovider import BaseProvider +from util.license import (EntitlementValidationResult, Entitlement, Expiration, ExpirationType, + EntitlementRequirement) REAL_FILES = ['test/data/signing-private.gpg', 'test/data/signing-public.gpg'] class TestLicense(object): - @property - def subscription(self): - return {} - - @property - def is_expired(self): - return False + def validate_entitlement_requirement(self, entitlement_req, check_time): + expiration = Expiration(ExpirationType.license_wide, datetime.now() + timedelta(days=31)) + entitlement = Entitlement('fake', 0, 'someprod', expiration) + fakereq = EntitlementRequirement('fake', 0) + return EntitlementValidationResult(fakereq, datetime.now(), entitlement) def validate(self, config): - pass + return [self.validate_entitlement_requirement(None, None)] class TestConfigProvider(BaseProvider): """ Implementation of the config provider for testing. Everything is kept in-memory instead on diff --git a/util/license.py b/util/license.py index c8bc4b107..694a0b54b 100644 --- a/util/license.py +++ b/util/license.py @@ -220,7 +220,6 @@ class License(object): # We assume monthly license unless specified otherwise return Expiration(ExpirationType.monthly, service_end, MONTHLY_GRACE_PERIOD) - def validate(self, config): """ Returns a list of EntitlementValidationResult objects, one per requirement. """ @@ -284,8 +283,10 @@ class LicenseValidator(Thread): synchronization primitive. """ def __init__(self, config_provider, *args, **kwargs): + config = config_provider.get_config() or {} + self._config_provider = config_provider - self._entitlement_requirements = _gen_entitlement_requirements(config_provider.get_config()) + self._entitlement_requirements = _gen_entitlement_requirements(config) # multiprocessing.Value does not ensure consistent write-after-reads, but we don't need that. self._license_is_insufficient = multiprocessing.Value(c_bool, True)