473 lines
13 KiB
Python
473 lines
13 KiB
Python
import unittest
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
import jwt
|
|
import json
|
|
|
|
from Crypto.PublicKey import RSA
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives.serialization import load_der_public_key
|
|
|
|
from util.license import (decode_license, LicenseDecodeError, ExpirationType,
|
|
MONTHLY_GRACE_PERIOD, YEARLY_GRACE_PERIOD, TRIAL_GRACE_PERIOD)
|
|
|
|
|
|
def get_date(delta):
|
|
return str(datetime.now() + delta)
|
|
|
|
class TestLicense(unittest.TestCase):
|
|
def keys(self):
|
|
with open('test/data/test.pem') as f:
|
|
private_key = f.read()
|
|
|
|
public_key = load_der_public_key(RSA.importKey(private_key).publickey().exportKey('DER'),
|
|
backend=default_backend())
|
|
return (public_key, private_key)
|
|
|
|
def create_license(self, license_data, keys=None):
|
|
jwt_data = {
|
|
'license': json.dumps(license_data),
|
|
}
|
|
|
|
(public_key, private_key) = keys or self.keys()
|
|
|
|
# Encode the license with the JWT key.
|
|
encoded = jwt.encode(jwt_data, private_key, algorithm='RS256')
|
|
|
|
# Decode it into a license object.
|
|
return decode_license(encoded, public_key_instance=public_key)
|
|
|
|
def test_license_decodeerror_invalid(self):
|
|
with self.assertRaises(LicenseDecodeError):
|
|
decode_license('some random stuff')
|
|
|
|
def test_license_decodeerror_badkey(self):
|
|
(_, private_key) = self.keys()
|
|
jwt_data = {
|
|
'license': json.dumps({}),
|
|
}
|
|
|
|
encoded_stuff = jwt.encode(jwt_data, private_key, algorithm='RS256')
|
|
with self.assertRaises(LicenseDecodeError):
|
|
# Note that since we don't give a key here, the prod one will be used, and it should fail.
|
|
decode_license(encoded_stuff)
|
|
|
|
def assertValid(self, license, config=None):
|
|
results = license.validate(config or {})
|
|
is_met = all([r.is_met() for r in results])
|
|
self.assertTrue(is_met, [r for r in results if not r.is_met()])
|
|
|
|
def assertNotValid(self, license, config=None, requirement=None, expired=None):
|
|
results = license.validate(config or {})
|
|
is_met = all([r.is_met() for r in results])
|
|
self.assertFalse(is_met)
|
|
|
|
invalid_results = [r for r in results if not r.is_met()]
|
|
if requirement is not None:
|
|
self.assertEquals(invalid_results[0].requirement.name, requirement)
|
|
|
|
if expired is not None:
|
|
self.assertEquals(invalid_results[0].entitlement.expiration.expiration_type, expired)
|
|
|
|
def test_missing_subscriptions(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
})
|
|
|
|
self.assertNotValid(license, requirement='software.quay')
|
|
|
|
def test_empty_subscriptions(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {},
|
|
})
|
|
|
|
self.assertNotValid(license, requirement='software.quay')
|
|
|
|
def test_missing_quay_entitlement(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(timedelta(days=10)),
|
|
"entitlements": {
|
|
"software.quay.regions": 0,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
self.assertNotValid(license, requirement='software.quay')
|
|
|
|
def test_valid_quay_entitlement(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(timedelta(days=10)),
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
self.assertValid(license)
|
|
|
|
def test_missing_expiration(self):
|
|
license = self.create_license({
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(timedelta(days=10)),
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
self.assertNotValid(license, expired=ExpirationType.license_wide)
|
|
|
|
def test_expired_license(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=-10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(timedelta(days=10)),
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
self.assertNotValid(license, expired=ExpirationType.license_wide)
|
|
|
|
def test_expired_sub_implicit_monthly_withingrace(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(MONTHLY_GRACE_PERIOD * -1 + timedelta(days=1)),
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
self.assertValid(license)
|
|
|
|
def test_expired_sub_monthly_withingrace(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(MONTHLY_GRACE_PERIOD * -1 + timedelta(days=1)),
|
|
"durationPeriod": "monthly",
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
self.assertValid(license)
|
|
|
|
def test_expired_sub_monthly_outsidegrace(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(MONTHLY_GRACE_PERIOD * -1 + timedelta(days=-1)),
|
|
"durationPeriod": "monthly",
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
self.assertNotValid(license, expired=ExpirationType.monthly)
|
|
|
|
def test_expired_sub_yearly_withingrace(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(YEARLY_GRACE_PERIOD * -1 + timedelta(days=1)),
|
|
"durationPeriod": "yearly",
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
self.assertValid(license)
|
|
|
|
def test_expired_sub_yearly_outsidegrace(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(YEARLY_GRACE_PERIOD * -1 + timedelta(days=-1)),
|
|
"durationPeriod": "yearly",
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
self.assertNotValid(license, expired=ExpirationType.yearly)
|
|
|
|
def test_expired_sub_intrial_withingrace(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(TRIAL_GRACE_PERIOD * -1 + timedelta(days=1)),
|
|
"inTrial": True,
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
self.assertValid(license)
|
|
|
|
def test_expired_sub_intrial_outsidegrace(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(TRIAL_GRACE_PERIOD * -1 + timedelta(days=-1)),
|
|
"inTrial": True,
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
self.assertNotValid(license, expired=ExpirationType.in_trial)
|
|
|
|
def test_expired_sub_trialonly_withingrace(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"trialEnd": get_date(TRIAL_GRACE_PERIOD * -1 + timedelta(days=1)),
|
|
"trialOnly": True,
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
self.assertValid(license)
|
|
|
|
def test_expired_sub_trialonly_outsidegrace(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"trialEnd": get_date(TRIAL_GRACE_PERIOD * -1 + timedelta(days=-1)),
|
|
"trialOnly": True,
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
self.assertNotValid(license, expired=ExpirationType.trial_only)
|
|
|
|
def test_valid_quay_entitlement_regions(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(timedelta(days=10)),
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
config = {
|
|
'DISTRIBUTED_STORAGE_CONFIG': [
|
|
{'name': 'first'},
|
|
],
|
|
}
|
|
|
|
self.assertValid(license, config=config)
|
|
|
|
def test_invalid_quay_entitlement_regions(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(timedelta(days=10)),
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
config = {
|
|
'DISTRIBUTED_STORAGE_CONFIG': [
|
|
{'name': 'first'},
|
|
{'name': 'second'},
|
|
],
|
|
}
|
|
|
|
self.assertNotValid(license, config=config, requirement='software.quay.regions')
|
|
|
|
def test_valid_regions_across_multiple_sub(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(timedelta(days=10)),
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
"anothersub": {
|
|
"serviceEnd": get_date(timedelta(days=20)),
|
|
"entitlements": {
|
|
"software.quay.regions": 5,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
config = {
|
|
'DISTRIBUTED_STORAGE_CONFIG': [
|
|
{'name': 'first'},
|
|
{'name': 'second'},
|
|
],
|
|
}
|
|
|
|
self.assertValid(license, config=config)
|
|
|
|
def test_valid_regions_across_multiple_sub_one_expired(self):
|
|
# Setup a license with one sub having too few regions, and another having enough, but it is
|
|
# expired.
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"serviceEnd": get_date(timedelta(days=10)),
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 1,
|
|
},
|
|
},
|
|
"anothersub": {
|
|
"trialEnd": get_date(TRIAL_GRACE_PERIOD * -1 + timedelta(days=-1)),
|
|
"trialOnly": True,
|
|
"entitlements": {
|
|
"software.quay.regions": 5,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
config = {
|
|
'DISTRIBUTED_STORAGE_CONFIG': [
|
|
{'name': 'first'},
|
|
{'name': 'second'},
|
|
],
|
|
}
|
|
|
|
self.assertNotValid(license, config=config, requirement='software.quay.regions',
|
|
expired=ExpirationType.trial_only)
|
|
|
|
def test_valid_regions_across_multiple_sub_one_expired(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"trialEnd": get_date(TRIAL_GRACE_PERIOD * -1 + timedelta(days=-1)),
|
|
"trialOnly": True,
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 3,
|
|
},
|
|
},
|
|
"anothersub": {
|
|
"serviceEnd": get_date(timedelta(days=20)),
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 5,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
config = {
|
|
'DISTRIBUTED_STORAGE_CONFIG': [
|
|
{'name': 'first'},
|
|
{'name': 'second'},
|
|
],
|
|
}
|
|
|
|
self.assertValid(license, config=config)
|
|
|
|
def test_quay_is_under_expired_sub(self):
|
|
license = self.create_license({
|
|
"expirationDate": get_date(timedelta(days=10)),
|
|
"subscriptions": {
|
|
"somesub": {
|
|
"trialEnd": get_date(TRIAL_GRACE_PERIOD * -1 + timedelta(days=-1)),
|
|
"trialOnly": True,
|
|
"entitlements": {
|
|
"software.quay": 1,
|
|
"software.quay.regions": 3,
|
|
},
|
|
},
|
|
"anothersub": {
|
|
"serviceEnd": get_date(timedelta(days=20)),
|
|
"entitlements": {
|
|
"software.quay.regions": 5,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
config = {
|
|
'DISTRIBUTED_STORAGE_CONFIG': [
|
|
{'name': 'first'},
|
|
{'name': 'second'},
|
|
],
|
|
}
|
|
|
|
self.assertNotValid(license, config=config, expired=ExpirationType.trial_only,
|
|
requirement='software.quay')
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|
|
|