parent
35437c9f55
commit
4a4eee5e05
10 changed files with 199 additions and 35 deletions
|
@ -568,6 +568,54 @@ class RegistryTestsMixin(object):
|
|||
self.assertEquals('public', logs[0]['performer']['name'])
|
||||
|
||||
|
||||
def test_push_pull_logging_byrobot(self):
|
||||
# Lookup the robot's password.
|
||||
self.conduct_api_login('devtable', 'password')
|
||||
resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/ownerbot')
|
||||
robot_token = json.loads(resp.text)['token']
|
||||
|
||||
# Push a new repository.
|
||||
self.do_push('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token)
|
||||
|
||||
# Retrieve the logs and ensure the push was added.
|
||||
result = self.conduct('GET', '/api/v1/repository/buynlarge/newrepo/logs')
|
||||
logs = result.json()['logs']
|
||||
|
||||
self.assertEquals(1, len(logs))
|
||||
self.assertEquals('push_repo', logs[0]['kind'])
|
||||
self.assertEquals('buynlarge+ownerbot', logs[0]['performer']['name'])
|
||||
|
||||
# Pull the repository.
|
||||
self.do_pull('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token)
|
||||
|
||||
# Retrieve the logs and ensure the pull was added.
|
||||
result = self.conduct('GET', '/api/v1/repository/buynlarge/newrepo/logs')
|
||||
logs = result.json()['logs']
|
||||
|
||||
self.assertEquals(2, len(logs))
|
||||
self.assertEquals('pull_repo', logs[0]['kind'])
|
||||
self.assertEquals('buynlarge+ownerbot', logs[0]['performer']['name'])
|
||||
|
||||
|
||||
def test_push_pull_logging_byoauth(self):
|
||||
# Push the repository.
|
||||
self.do_push('devtable', 'newrepo', 'devtable', 'password')
|
||||
|
||||
# Pull the repository.
|
||||
self.do_pull('devtable', 'newrepo', '$oauthtoken', 'test')
|
||||
|
||||
# Retrieve the logs and ensure the pull was added.
|
||||
self.conduct_api_login('devtable', 'password')
|
||||
result = self.conduct('GET', '/api/v1/repository/devtable/newrepo/logs')
|
||||
logs = result.json()['logs']
|
||||
|
||||
self.assertEquals(2, len(logs))
|
||||
self.assertEquals('pull_repo', logs[0]['kind'])
|
||||
|
||||
self.assertEquals('devtable', logs[0]['performer']['name'])
|
||||
self.assertEquals(1, logs[0]['metadata']['oauth_token_id'])
|
||||
|
||||
|
||||
def test_pull_publicrepo_anonymous(self):
|
||||
# Add a new repository under the public user, so we have a real repository to pull.
|
||||
self.do_push('public', 'newrepo', 'public', 'password')
|
||||
|
|
|
@ -6,9 +6,9 @@ from cryptography.hazmat.backends import default_backend
|
|||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
|
||||
from app import app
|
||||
from endpoints.v2.v2auth import (TOKEN_VALIDITY_LIFETIME_S, load_certificate_bytes,
|
||||
load_private_key, ANONYMOUS_SUB)
|
||||
from auth.jwt_auth import identity_from_bearer_token, load_public_key, InvalidJWTException
|
||||
from endpoints.v2.v2auth import TOKEN_VALIDITY_LIFETIME_S, load_certificate_bytes, load_private_key
|
||||
from auth.jwt_auth import (identity_from_bearer_token, load_public_key, InvalidJWTException,
|
||||
build_context_and_subject, ANONYMOUS_SUB)
|
||||
from util.morecollections import AttrDict
|
||||
|
||||
|
||||
|
@ -27,13 +27,15 @@ class TestRegistryV2Auth(unittest.TestCase):
|
|||
|
||||
def _generate_token_data(self, access=[], audience=TEST_AUDIENCE, user=TEST_USER, iat=None,
|
||||
exp=None, nbf=None, iss=app.config['JWT_AUTH_TOKEN_ISSUER']):
|
||||
|
||||
_, subject = build_context_and_subject(user, None, None)
|
||||
return {
|
||||
'iss': iss,
|
||||
'aud': audience,
|
||||
'nbf': nbf if nbf is not None else int(time.time()),
|
||||
'iat': iat if iat is not None else int(time.time()),
|
||||
'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S),
|
||||
'sub': user.username if user else ANONYMOUS_SUB,
|
||||
'sub': subject,
|
||||
'access': access,
|
||||
}
|
||||
|
||||
|
@ -50,7 +52,7 @@ class TestRegistryV2Auth(unittest.TestCase):
|
|||
return 'Bearer {0}'.format(token_data)
|
||||
|
||||
def _parse_token(self, token):
|
||||
return identity_from_bearer_token(token, MAX_SIGNED_S, self.public_key)
|
||||
return identity_from_bearer_token(token, MAX_SIGNED_S, self.public_key)[0]
|
||||
|
||||
def _generate_public_key(self):
|
||||
key = rsa.generate_private_key(
|
||||
|
|
Reference in a new issue