# coding=utf-8

import json as py_json
import time
import unittest
import base64

from urllib import urlencode
from urlparse import urlparse, urlunparse, parse_qs
from datetime import datetime, timedelta
from httmock import urlmatch, HTTMock

import jwt

from Crypto.PublicKey import RSA
from flask import url_for
from jwkest.jwk import RSAKey

from app import app
from data import model
from data.database import ServiceKeyApprovalType
from endpoints import keyserver
from endpoints.api import api, api_bp
from endpoints.api.user import Signin
from endpoints.keyserver import jwk_with_kid
from endpoints.csrf import OAUTH_CSRF_TOKEN_NAME
from endpoints.web import web as web_bp
from endpoints.oauthlogin import oauthlogin as oauthlogin_bp
from initdb import setup_database_for_testing, finished_database_for_testing
from test.helpers import assert_action_logged

def _register_blueprint_once(blueprint, url_prefix):
  """ Registers `blueprint` on the shared app under `url_prefix`, ignoring the ValueError that
      signals the blueprint was already registered (e.g. by another test module).
  """
  try:
    app.register_blueprint(blueprint, url_prefix=url_prefix)
  except ValueError:
    # This blueprint was already registered
    pass


# Every blueprint exercised by the test cases below must be routable on the app.
_register_blueprint_once(oauthlogin_bp, '/oauth')
_register_blueprint_once(web_bp, '')
_register_blueprint_once(keyserver.key_server, '')
_register_blueprint_once(api_bp, '/api')


# Name under which the CSRF token is stored in the test session and sent as a query parameter.
CSRF_TOKEN_KEY = '_csrf_token'
# Fixed CSRF token value installed into the test session by EndpointTestCase.setUp.
CSRF_TOKEN = '123csrfforme'


class EndpointTestCase(unittest.TestCase):
  """ Base class for endpoint tests. Sets up the test database, a Flask test client and request
      context, and a known CSRF token, and provides helpers that issue a request against a named
      endpoint and assert on the resulting status code.
  """
  maxDiff = None

  @staticmethod
  def _add_csrf(without_csrf):
    """ Returns the given URL with the test CSRF token added to its query string. Any query
        parameters already present on the URL are preserved.
    """
    parts = urlparse(without_csrf)
    query = parse_qs(parts[4])
    query[CSRF_TOKEN_KEY] = CSRF_TOKEN

    # parse_qs maps every name to a *list* of values; doseq=True makes urlencode emit one
    # name=value pair per list element instead of encoding the repr of the list itself.
    encoded_query = urlencode(query, doseq=True)
    return urlunparse(list(parts[0:4]) + [encoded_query] + list(parts[5:]))

  def setUp(self):
    setup_database_for_testing(self)
    self.app = app.test_client()
    self.ctx = app.test_request_context()
    self.ctx.__enter__()
    self.setCsrfToken(CSRF_TOKEN)

  def tearDown(self):
    try:
      finished_database_for_testing(self)
    finally:
      # Always leave the request context, even if the database cleanup raised. The three Nones
      # are the context manager protocol's way of saying "no exception in flight".
      self.ctx.__exit__(None, None, None)

  def setCsrfToken(self, token):
    """ Stores `token` as the session's CSRF token, along with a fixed OAuth CSRF token. """
    with self.app.session_transaction() as sess:
      sess[CSRF_TOKEN_KEY] = token
      sess[OAUTH_CSRF_TOKEN_NAME] = 'someoauthtoken'

  def getResponse(self, resource_name, expected_code=200, **kwargs):
    """ GETs the endpoint named `resource_name` (URL built from `kwargs`), asserts the status
        code and returns the response body.
    """
    rv = self.app.get(url_for(resource_name, **kwargs))
    self.assertEqual(rv.status_code, expected_code)
    return rv.data

  def deleteResponse(self, resource_name, headers=None, expected_code=200, **kwargs):
    """ DELETEs the named endpoint, asserts the status code and returns the response body. """
    headers = headers or {}
    rv = self.app.delete(url_for(resource_name, **kwargs), headers=headers)
    self.assertEqual(rv.status_code, expected_code)
    return rv.data

  def deleteEmptyResponse(self, resource_name, headers=None, expected_code=204, **kwargs):
    """ DELETEs the named endpoint and asserts both the status code and an empty body. """
    headers = headers or {}
    rv = self.app.delete(url_for(resource_name, **kwargs), headers=headers)
    self.assertEqual(rv.status_code, expected_code)
    self.assertEqual(rv.data, '') # ensure response body empty

  def putResponse(self, resource_name, headers=None, data=None, expected_code=200, **kwargs):
    """ PUTs `data` (serialized as JSON) to the named endpoint, asserts the status code and
        returns the response body.
    """
    headers = headers or {}
    data = data or {}
    rv = self.app.put(url_for(resource_name, **kwargs), headers=headers, data=py_json.dumps(data))
    self.assertEqual(rv.status_code, expected_code)
    return rv.data

  def postResponse(self, resource_name, headers=None, form=None, with_csrf=True, expected_code=200, **kwargs):
    """ POSTs `form` to the named endpoint and returns the full response object. The CSRF token
        is added to the URL unless `with_csrf` is False; the status code check is skipped when
        `expected_code` is None.
    """
    headers = headers or {}
    form = form or {}
    url = url_for(resource_name, **kwargs)
    if with_csrf:
      url = EndpointTestCase._add_csrf(url)

    rv = self.app.post(url, headers=headers, data=form)
    if expected_code is not None:
      self.assertEqual(rv.status_code, expected_code)

    return rv

  def login(self, username, password):
    """ Signs in through the API Signin resource and asserts that the sign in succeeded. """
    rv = self.app.post(EndpointTestCase._add_csrf(api.url_for(Signin)),
                       data=py_json.dumps(dict(username=username, password=password)),
                       headers={"Content-Type": "application/json"})
    self.assertEqual(rv.status_code, 200)


class OAuthLoginTestCase(EndpointTestCase):
  """ Exercises the OAuth login callback and attach endpoints, with the external identity
      providers replaced by HTTMock handlers.
  """

  def invoke_oauth_tests(self, callback_endpoint, attach_endpoint, service_name, service_ident,
                         new_username):
    """ Runs the OAuth checks against the callback endpoint, deletes the user found afterwards,
        then runs the same checks against the attach endpoint while logged in as `devtable`.
    """
    # Test callback.
    created = self.invoke_oauth_test(callback_endpoint, service_name, service_ident, new_username)

    # Delete the created user.
    model.user.delete_user(created, [])

    # Test attach.
    self.login('devtable', 'password')
    self.invoke_oauth_test(attach_endpoint, service_name, service_ident, 'devtable')

  def invoke_oauth_test(self, endpoint_name, service_name, service_ident, username):
    """ Hits `oauthlogin.<endpoint_name>` with missing/invalid/valid CSRF state and code values,
        then asserts that `username` exists with a federated login for `service_name` whose
        identifier is `service_ident`. Returns the user that was found.
    """
    # No CSRF.
    self.getResponse('oauthlogin.' + endpoint_name, expected_code=403)

    # Invalid CSRF.
    self.getResponse('oauthlogin.' + endpoint_name, state='somestate', expected_code=403)

    # Valid CSRF, invalid code.
    self.getResponse('oauthlogin.' + endpoint_name, state='someoauthtoken',
                     code='invalidcode', expected_code=400)

    # Valid CSRF, valid code.
    self.getResponse('oauthlogin.' + endpoint_name, state='someoauthtoken',
                     code='somecode', expected_code=302)

    # Ensure the user was added/modified.
    found_user = model.user.get_user(username)
    self.assertIsNotNone(found_user)

    federated_login = model.user.lookup_federated_login(found_user, service_name)
    self.assertIsNotNone(federated_login)
    self.assertEqual(federated_login.service_ident, service_ident)
    return found_user

  def test_google_oauth(self):
    @urlmatch(netloc=r'accounts.google.com', path='/o/oauth2/token')
    def account_handler(_, request):
      # Use `in` rather than `find(...) > 0`: the latter misses a match at the very start of
      # the body, where find() returns 0.
      if "code=somecode" in request.body:
        content = {'access_token': 'someaccesstoken'}
        return py_json.dumps(content)
      else:
        return {'status_code': 400, 'content': '{"message": "Invalid code"}'}

    @urlmatch(netloc=r'www.googleapis.com', path='/oauth2/v1/userinfo')
    def user_handler(_, __):
      content = {
        'id': 'someid',
        'email': 'someemail@example.com',
        'verified_email': True,
      }
      return py_json.dumps(content)

    with HTTMock(account_handler, user_handler):
      self.invoke_oauth_tests('google_oauth_callback', 'google_oauth_attach', 'google',
                              'someid', 'someemail')

  def test_github_oauth(self):
    @urlmatch(netloc=r'github.com', path='/login/oauth/access_token')
    def account_handler(url, _):
      # Use `in` rather than `find(...) > 0`: the latter misses a match at the very start of
      # the query string, where find() returns 0.
      if "code=somecode" in url.query:
        content = {'access_token': 'someaccesstoken'}
        return py_json.dumps(content)
      else:
        return {'status_code': 400, 'content': '{"message": "Invalid code"}'}

    @urlmatch(netloc=r'github.com', path='/api/v3/user')
    def user_handler(_, __):
      content = {
        'id': 'someid',
        'login': 'someusername'
      }
      return py_json.dumps(content)

    @urlmatch(netloc=r'github.com', path='/api/v3/user/emails')
    def email_handler(_, __):
      content = [{
        'email': 'someemail@example.com',
        'verified': True,
        'primary': True,
      }]
      return py_json.dumps(content)

    # NOTE(review): email_handler is deliberately listed before user_handler. Presumably the
    # urlmatch path is a prefix-anchored pattern, so '/api/v3/user' would also match
    # '/api/v3/user/emails' if it were tried first -- confirm against httmock before reordering.
    with HTTMock(account_handler, email_handler, user_handler):
      self.invoke_oauth_tests('github_oauth_callback', 'github_oauth_attach', 'github',
                              'someid', 'someusername')

  def test_dex_oauth(self):
    # TODO(jschorr): Add tests for invalid and expired keys.

    # Generate a public/private key pair for the OIDC transaction.
    private_key = RSA.generate(2048)
    jwk = RSAKey(key=private_key.publickey()).serialize()
    token = jwt.encode({
      'iss': 'https://oidcserver/',
      'aud': 'someclientid',
      'sub': 'someid',
      'exp': int(time.time()) + 60,
      'iat': int(time.time()),
      'nbf': int(time.time()),
      'email': 'someemail@example.com',
      'email_verified': True,
    }, private_key.exportKey('PEM'), 'RS256')

    @urlmatch(netloc=r'oidcserver', path='/.well-known/openid-configuration')
    def wellknown_handler(url, _):
      return py_json.dumps({
        'authorization_endpoint': 'http://oidcserver/auth',
        'token_endpoint': 'http://oidcserver/token',
        'jwks_uri': 'http://oidcserver/keys',
      })

    @urlmatch(netloc=r'oidcserver', path='/token')
    def account_handler(url, request):
      # Use `in` rather than `find(...) > 0`: the latter misses a match at the very start of
      # the body, where find() returns 0.
      if "code=somecode" in request.body:
        return py_json.dumps({
          'access_token': token,
        })
      else:
        return {'status_code': 400, 'content': '{"message": "Invalid code"}'}

    @urlmatch(netloc=r'oidcserver', path='/keys')
    def keys_handler(_, __):
      # Serve the public half of the key that signed the token above.
      return py_json.dumps({
        "keys": [jwk],
      })

    with HTTMock(wellknown_handler, account_handler, keys_handler):
      self.invoke_oauth_tests('dex_oauth_callback', 'dex_oauth_attach', 'dex',
                              'someid', 'someemail')


class WebEndpointTestCase(EndpointTestCase):
  """ Smoke and permission tests for the endpoints of the `web` blueprint. """

  def test_index(self):
    """ The index page loads. """
    self.getResponse('web.index')

  def test_robots(self):
    """ The robots endpoint loads. """
    self.getResponse('web.robots')

  def test_sitemap(self):
    """ The sitemap endpoint loads. """
    self.getResponse('web.sitemap')

  def test_repo_view(self):
    """ The repository view loads for devtable/simple. """
    self.getResponse('web.repository', path='devtable/simple')

  def test_org_view(self):
    """ The organization view loads for buynlarge. """
    self.getResponse('web.org_view', path='buynlarge')

  def test_user_view(self):
    """ The user view loads for devtable. """
    self.getResponse('web.user_view', path='devtable')

  def test_confirm_repo_email(self):
    """ Visiting the confirmation link marks the repository e-mail authorization confirmed. """
    namespace, repo_name, address = 'devtable', 'simple', 'foo@bar.com'

    authorization = model.repository.create_email_authorization_for_repo(namespace, repo_name,
                                                                         address)
    self.getResponse('web.confirm_repo_email', code=authorization.code)

    record = model.repository.get_email_authorized_for_repo(namespace, repo_name, address)
    self.assertTrue(record.confirmed)

  def test_confirm_email(self):
    """ Visiting the confirmation link changes the user's e-mail address. """
    new_address = 'foo@bar.com'

    before = model.user.get_user('devtable')
    self.assertNotEquals(before.email, new_address)

    confirmation = model.user.create_confirm_email_code(before, new_address)
    self.getResponse('web.confirm_email', code=confirmation.code, expected_code=302)

    after = model.user.get_user('devtable')
    self.assertEquals(after.email, new_address)

  def test_confirm_recovery(self):
    """ An invalid recovery code yields a 200; a valid one yields a redirect. """
    endpoint = 'web.confirm_recovery'

    # An invalid code first.
    self.getResponse(endpoint, code='someinvalidcode', expected_code=200)

    # Then a freshly created, valid code.
    account = model.user.get_user('devtable')
    recovery = model.user.create_reset_password_email_code(account.email)
    self.getResponse(endpoint, code=recovery.code, expected_code=302)

  def test_confirm_recovery_verified(self):
    """ Completing recovery marks a previously unverified user as verified. """
    # Start from an unverified account.
    account = model.user.get_user('devtable')
    account.verified = False
    account.save()

    recovery = model.user.create_reset_password_email_code(account.email)
    self.getResponse('web.confirm_recovery', code=recovery.code, expected_code=302)

    # The user must now be verified...
    reloaded = model.user.get_user('devtable')
    self.assertTrue(reloaded.verified)

    # ...and must be the current user: this endpoint will 401 if no user.
    self.getResponse('web.receipt', expected_code=404)

  def test_build_status_badge(self):
    """ The build badge is served for public repositories, and for private ones only with the
        repository's badge token.
    """
    endpoint = 'web.build_status_badge'
    private_repo = 'devtable/simple'

    # A repository that does not exist.
    self.getResponse(endpoint, repository='foo/bar', expected_code=404)

    # A public repository.
    self.getResponse(endpoint, repository='public/publicrepo')

    # A private repository, without any token.
    self.getResponse(endpoint, repository=private_repo, expected_code=404)

    # A private repository, with an invalid token.
    self.getResponse(endpoint, repository=private_repo, token='sometoken', expected_code=404)

    # A private repository, with its real badge token.
    repo = model.repository.get_repository('devtable', 'simple')
    self.getResponse(endpoint, repository=private_repo, token=repo.badge_token)

  def test_attach_custom_build_trigger(self):
    """ Attaching a custom build trigger: 401 when anonymous, 403 for a user logged in as
        freshuser, and a redirect when logged in as devtable.
    """
    endpoint = 'web.attach_custom_build_trigger'
    target_repo = 'devtable/simple'

    # Anonymous requests.
    self.getResponse(endpoint, repository='foo/bar', expected_code=401)
    self.getResponse(endpoint, repository=target_repo, expected_code=401)

    # Logged in as freshuser.
    self.login('freshuser', 'password')
    self.getResponse(endpoint, repository=target_repo, expected_code=403)

    # Logged in as devtable.
    self.login('devtable', 'password')
    self.getResponse(endpoint, repository=target_repo, expected_code=302)

  def test_redirect_to_repository(self):
    """ Repository redirects: 404 for foo/bar, 302 for public/publicrepo, and for devtable/simple
        a 403 when anonymous but a 302 once logged in as devtable.
    """
    endpoint = 'web.redirect_to_repository'

    anonymous_expectations = (
      ('foo/bar', 404),
      ('public/publicrepo', 302),
      ('devtable/simple', 403),
    )
    for repository, status in anonymous_expectations:
      self.getResponse(endpoint, repository=repository, expected_code=status)

    self.login('devtable', 'password')
    self.getResponse(endpoint, repository='devtable/simple', expected_code=302)

  def test_redirect_to_namespace(self):
    """ Namespace redirects: 404 for `unknown`, 302 for `devtable` and `buynlarge`. """
    expectations = (
      ('unknown', 404),
      ('devtable', 302),
      ('buynlarge', 302),
    )
    for namespace, status in expectations:
      self.getResponse('web.redirect_to_namespace', namespace=namespace, expected_code=status)


class OAuthTestCase(EndpointTestCase):
  """ Tests for the `web.authorize_application` OAuth authorization endpoint: login, client,
      scope and redirect URI validation, CSRF enforcement, direct (Basic auth) authorization
      and rate limiting of failed credentials.
  """

  @staticmethod
  def _authorize_form(client_id='deadbeef', redirect_uri='http://localhost:8000/o2c.html',
                      scope='user:admin'):
    """ Builds the authorization form. Per the original notes in these tests, the default
        client (`deadbeef`) is defined in initdb.py.
    """
    return {
      'client_id': client_id,
      'redirect_uri': redirect_uri,
      'scope': scope,
    }

  @staticmethod
  def _basic_auth_headers(username, password):
    """ Builds request headers carrying HTTP Basic credentials for the given user. """
    return dict(authorization='Basic ' + base64.b64encode(username + ':' + password))

  def test_authorize_nologin(self):
    form = self._authorize_form(client_id='someclient',
                                redirect_uri='http://localhost:5000/foobar')

    self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=401)

  def test_authorize_invalidclient(self):
    self.login('devtable', 'password')

    form = self._authorize_form(client_id='someclient',
                                redirect_uri='http://localhost:5000/foobar')

    resp = self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=302)
    self.assertEqual('http://localhost:5000/foobar?error=unauthorized_client', resp.headers['Location'])

  def test_authorize_invalidscope(self):
    self.login('devtable', 'password')

    form = self._authorize_form(scope='invalid:scope')

    resp = self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=302)
    self.assertEqual('http://localhost:8000/o2c.html?error=invalid_scope', resp.headers['Location'])

  def test_authorize_invalidredirecturi(self):
    self.login('devtable', 'password')

    form = self._authorize_form(redirect_uri='http://some/invalid/uri')

    self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=400)

  def test_authorize_success(self):
    self.login('devtable', 'password')

    form = self._authorize_form()

    resp = self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=302)
    self.assertTrue('access_token=' in resp.headers['Location'])

  def test_authorize_nocsrf(self):
    self.login('devtable', 'password')

    form = self._authorize_form()

    self.postResponse('web.authorize_application', form=form, with_csrf=False, expected_code=403)

  def test_authorize_nocsrf_withinvalidheader(self):
    self.login('devtable', 'password')

    form = self._authorize_form()

    headers = dict(authorization='Some random header')
    self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=401)

  def test_authorize_nocsrf_withbadheader(self):
    self.login('devtable', 'password')

    form = self._authorize_form()

    headers = self._basic_auth_headers('devtable', 'invalidpassword')
    self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=401)

  def test_authorize_nocsrf_correctheader(self):
    form = self._authorize_form()

    # Try without the client id being in the whitelist.
    headers = self._basic_auth_headers('devtable', 'password')
    self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=403)

    # Add the client ID to the whitelist and try again. The app (and its config) is shared by
    # every test in the process, so make sure the previous state is restored afterwards rather
    # than leaking the whitelist into later tests.
    whitelist_key = 'DIRECT_OAUTH_CLIENTID_WHITELIST'
    if whitelist_key in app.config:
      self.addCleanup(app.config.__setitem__, whitelist_key, app.config[whitelist_key])
    else:
      self.addCleanup(app.config.pop, whitelist_key, None)

    app.config[whitelist_key] = ['deadbeef']

    # NOTE(review): this request still sends the CSRF token (with_csrf=True) even though the
    # test is named "nocsrf" -- confirm whether with_csrf=False was intended here.
    headers = self._basic_auth_headers('devtable', 'password')
    resp = self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=True, expected_code=302)
    self.assertTrue('access_token=' in resp.headers['Location'])

  def test_authorize_nocsrf_ratelimiting(self):
    form = self._authorize_form()

    # Try without the client id being in the whitelist a few times, making sure we eventually get rate limited.
    headers = self._basic_auth_headers('devtable', 'invalidpassword')
    self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=401)

    # Keep retrying with the bad credentials; a 429 must show up within the first five
    # retries, otherwise the sixth retry fails the test.
    max_retries = 5
    for attempt in range(1, max_retries + 2):
      r = self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=None)
      self.assertNotEqual(200, r.status_code)
      if attempt > max_retries:
        self.fail('Exponential backoff did not fire')

      if r.status_code == 429:
        break


class KeyServerTestCase(EndpointTestCase):
  """ Tests for the key server endpoints: listing, fetching, publishing/rotating and deleting
      service keys, with requests authenticated by JWTs signed with service key private keys.
  """

  def _get_test_jwt_payload(self):
    """ Returns JWT claims for `sample_service` that are valid now and expire in 60 seconds. """
    now = int(time.time())
    return {
      'iss': 'sample_service',
      'aud': keyserver.JWT_AUDIENCE,
      'exp': now + 60,
      'iat': now,
      'nbf': now,
    }

  def test_list_service_keys(self):
    # Retrieve all the keys.
    all_keys = model.service_keys.list_all_keys()
    visible_jwks = [jwk_with_kid(key) for key in model.service_keys.list_service_keys('sample_service')]

    # Collect the keys that must not be listed: other services, unapproved, or expired. They are
    # stored in the same jwk_with_kid form as visible_jwks so the assertNotIn below compares
    # like with like (presumably the raw key.jwk lacks the kid and could never match a returned
    # entry, which would make that assertion vacuous).
    invisible_jwks = []
    for key in all_keys:
      is_expired = key.expiration_date and key.expiration_date <= datetime.utcnow()
      if key.service != 'sample_service' or key.approval is None or is_expired:
        invisible_jwks.append(jwk_with_kid(key))

    rv = self.getResponse('key_server.list_service_keys', service='sample_service')
    jwkset = py_json.loads(rv)

    # Make sure the hidden keys are not returned and the visible ones are returned.
    self.assertTrue(len(visible_jwks) > 0)
    self.assertTrue(len(invisible_jwks) > 0)
    self.assertEqual(len(visible_jwks), len(jwkset['keys']))

    for jwk in jwkset['keys']:
      self.assertIn(jwk, visible_jwks)
      self.assertNotIn(jwk, invisible_jwks)

  def test_get_service_key(self):
    # 200 for an approved key
    self.getResponse('key_server.get_service_key', service='sample_service', kid='kid1')

    # 409 for an unapproved key
    self.getResponse('key_server.get_service_key', service='sample_service', kid='kid3',
                     expected_code=409)

    # 404 for a non-existent key
    self.getResponse('key_server.get_service_key', service='sample_service', kid='kid9999',
                     expected_code=404)

    # 403 for an approved but expired key that is inside of the 2 week window.
    self.getResponse('key_server.get_service_key', service='sample_service', kid='kid6',
                     expected_code=403)

    # 404 for an approved, expired key that is outside of the 2 week window.
    self.getResponse('key_server.get_service_key', service='sample_service', kid='kid7',
                     expected_code=404)

  def test_put_service_key(self):
    # No Authorization header should yield a 400
    self.putResponse('key_server.put_service_key', service='sample_service', kid='kid420',
                     expected_code=400)

    # Mint a JWT with our test payload
    private_key = RSA.generate(2048)
    jwk = RSAKey(key=private_key.publickey()).serialize()
    payload = self._get_test_jwt_payload()
    token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256')

    # Invalid service name should yield a 400.
    self.putResponse('key_server.put_service_key', service='sample service', kid='kid420',
                     headers={
                       'Authorization': 'Bearer %s' % token,
                       'Content-Type': 'application/json',
                     }, data=jwk, expected_code=400)

    # Publish a new key
    with assert_action_logged('service_key_create'):
      self.putResponse('key_server.put_service_key', service='sample_service', kid='kid420',
                       headers={
                         'Authorization': 'Bearer %s' % token,
                         'Content-Type': 'application/json',
                       }, data=jwk, expected_code=202)

    # Ensure that the key exists but is unapproved.
    self.getResponse('key_server.get_service_key', service='sample_service', kid='kid420',
                     expected_code=409)

    # Attempt to rotate the key. Since not approved, it will fail.
    token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256', headers={'kid': 'kid420'})
    self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969',
                     headers={
                       'Authorization': 'Bearer %s' % token,
                       'Content-Type': 'application/json',
                     }, data=jwk, expected_code=403)

    # Approve the key.
    model.service_keys.approve_service_key('kid420', 1, ServiceKeyApprovalType.SUPERUSER)

    # Rotate that new key
    with assert_action_logged('service_key_rotate'):
      token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256', headers={'kid': 'kid420'})
      self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969',
                       headers={
                         'Authorization': 'Bearer %s' % token,
                         'Content-Type': 'application/json',
                       }, data=jwk, expected_code=200)

    # Rotation should only work when signed by the previous key: sign with a brand new private
    # key while still claiming to be kid420.
    private_key = RSA.generate(2048)
    jwk = RSAKey(key=private_key.publickey()).serialize()
    token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256', headers={'kid': 'kid420'})
    self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969',
                     headers={
                       'Authorization': 'Bearer %s' % token,
                       'Content-Type': 'application/json',
                     }, data=jwk, expected_code=403)

  def test_attempt_delete_service_key_with_no_kid_signer(self):
    # Generate a single key; it is not approved.
    private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='first')

    # Mint a JWT with our test payload but *no kid*.
    token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
                       headers={})

    # Attempt to delete the key using the kid-less token; this should yield a 400.
    self.deleteResponse('key_server.delete_service_key',
                        headers={'Authorization': 'Bearer %s' % token},
                        expected_code=400, service='sample_service', kid='first')

  def test_attempt_delete_service_key_with_expired_key(self):
    # Generate two keys, approving the first.
    private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='first')
    model.service_keys.approve_service_key('first', 1, ServiceKeyApprovalType.SUPERUSER)
    model.service_keys.generate_service_key('sample_service', None, kid='second')

    # Mint a JWT with our test payload, signed by the first key.
    token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
                       headers={'kid': 'first'})

    # Set the expiration of the first to now - some time.
    model.service_keys.set_key_expiration('first', datetime.utcnow() - timedelta(seconds=100))

    # Using the credentials of the (now expired) first key, attempt to delete the unapproved
    # second key.
    self.deleteResponse('key_server.delete_service_key',
                        headers={'Authorization': 'Bearer %s' % token},
                        expected_code=403, service='sample_service', kid='second')

    # Set the expiration to the future and delete the key.
    model.service_keys.set_key_expiration('first', datetime.utcnow() + timedelta(seconds=100))

    with assert_action_logged('service_key_delete'):
      self.deleteEmptyResponse('key_server.delete_service_key',
                               headers={'Authorization': 'Bearer %s' % token},
                               expected_code=204, service='sample_service', kid='second')

  def test_delete_unapproved_service_key(self):
    # No Authorization header should yield a 400
    self.deleteResponse('key_server.delete_service_key', expected_code=400,
                        service='sample_service', kid='kid1')

    # Generate an unapproved key.
    private_key, _ = model.service_keys.generate_service_key('sample_service', None,
                                                             kid='unapprovedkeyhere')

    # Mint a JWT with our test payload
    token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
                       headers={'kid': 'unapprovedkeyhere'})

    # Delete our unapproved key with itself.
    with assert_action_logged('service_key_delete'):
      self.deleteEmptyResponse('key_server.delete_service_key',
                               headers={'Authorization': 'Bearer %s' % token},
                               expected_code=204, service='sample_service',
                               kid='unapprovedkeyhere')

  def test_delete_chained_service_key(self):
    # No Authorization header should yield a 400
    self.deleteResponse('key_server.delete_service_key', expected_code=400,
                        service='sample_service', kid='kid1')

    # Generate two keys.
    private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='kid123')
    model.service_keys.generate_service_key('sample_service', None, kid='kid321')

    # Mint a JWT with our test payload, signed by the first key (kid123).
    token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
                       headers={'kid': 'kid123'})

    # Using the credentials of the still-unapproved first key, attempt to delete the second key.
    self.deleteResponse('key_server.delete_service_key',
                        headers={'Authorization': 'Bearer %s' % token},
                        expected_code=403, service='sample_service', kid='kid321')

    # Approve the first key (the one that signed the token).
    model.service_keys.approve_service_key('kid123', 1, ServiceKeyApprovalType.SUPERUSER)

    # Using the credentials of our approved key, delete our unapproved key
    with assert_action_logged('service_key_delete'):
      self.deleteEmptyResponse('key_server.delete_service_key',
                               headers={'Authorization': 'Bearer %s' % token},
                               expected_code=204, service='sample_service', kid='kid321')

    # Attempt to delete a key signed by a key from a different service
    bad_token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
                           headers={'kid': 'kid5'})
    self.deleteResponse('key_server.delete_service_key',
                        headers={'Authorization': 'Bearer %s' % bad_token},
                        expected_code=403, service='sample_service', kid='kid123')

    # Delete a self-signed, approved key
    with assert_action_logged('service_key_delete'):
      self.deleteEmptyResponse('key_server.delete_service_key',
                               headers={'Authorization': 'Bearer %s' % token},
                               expected_code=204, service='sample_service', kid='kid123')


# Run every test case in this module when the file is executed directly as a script.
if __name__ == '__main__':
  unittest.main()