From 7befc048090ce849ae85a81ee1772e6772239ab5 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 25 Mar 2014 15:17:02 -0400 Subject: [PATCH] Fix API usage tests to send the proper CSRF token and add a "invalid CSRF token" test --- test/test_api_usage.py | 48 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/test/test_api_usage.py b/test/test_api_usage.py index 0a5847d80..977c20f2c 100644 --- a/test/test_api_usage.py +++ b/test/test_api_usage.py @@ -1,6 +1,9 @@ import unittest import json as py_json +from urllib import urlencode +from urlparse import urlparse, urlunparse, parse_qs + from endpoints.api import api_bp, api from endpoints.webhooks import webhooks from endpoints.trigger import BuildTrigger as BuildTriggerBase @@ -61,17 +64,37 @@ NEW_USER_DETAILS = { FAKE_APPLICATION_CLIENT_ID = 'deadbeef' +CSRF_TOKEN_KEY = '_csrf_token' +CSRF_TOKEN = '123csrfforme' + class ApiTestCase(unittest.TestCase): + @staticmethod + def _add_csrf(without_csrf): + parts = urlparse(without_csrf) + query = parse_qs(parts[4]) + query[CSRF_TOKEN_KEY] = CSRF_TOKEN + return urlunparse(list(parts[0:4]) + [urlencode(query)] + list(parts[5:])) + + def url_for(self, resource_name, params={}): + url = api.url_for(resource_name, **params) + url = ApiTestCase._add_csrf(url) + return url + def setUp(self): setup_database_for_testing(self) self.app = app.test_client() self.ctx = app.test_request_context() self.ctx.__enter__() + self.setCsrfToken(CSRF_TOKEN) def tearDown(self): finished_database_for_testing(self) self.ctx.__exit__(True, None, None) + def setCsrfToken(self, token): + with self.app.session_transaction() as sess: + sess[CSRF_TOKEN_KEY] = token + def getJsonResponse(self, resource_name, params={}, expected_code=200): rv = self.app.get(api.url_for(resource_name, **params)) self.assertEquals(expected_code, rv.status_code) @@ -80,7 +103,7 @@ class ApiTestCase(unittest.TestCase): return parsed def postResponse(self, resource_name, params={}, data={}, expected_code=200): - rv = self.app.post(api.url_for(resource_name, **params), + rv = self.app.post(self.url_for(resource_name, params), data=py_json.dumps(data), headers={"Content-Type": "application/json"}) self.assertEquals(rv.status_code, expected_code) @@ -92,13 +115,13 @@ class ApiTestCase(unittest.TestCase): return rv.data def deleteResponse(self, resource_name, params={}, expected_code=204): - rv = self.app.delete(api.url_for(resource_name, **params)) + rv = self.app.delete(self.url_for(resource_name, params)) self.assertEquals(rv.status_code, expected_code) return rv.data def postJsonResponse(self, resource_name, params={}, data={}, expected_code=200): - rv = self.app.post(api.url_for(resource_name, **params), + rv = self.app.post(self.url_for(resource_name, params), data=py_json.dumps(data), headers={"Content-Type": "application/json"}) @@ -112,7 +135,7 @@ class ApiTestCase(unittest.TestCase): def putJsonResponse(self, resource_name, params={}, data={}, expected_code=200): - rv = self.app.put(api.url_for(resource_name, **params), + rv = self.app.put(self.url_for(resource_name, params), data=py_json.dumps(data), headers={"Content-Type": "application/json"}) @@ -128,6 +151,23 @@ class ApiTestCase(unittest.TestCase): return self.postJsonResponse(Signin, data=dict(username=username, password=password)) +class TestCSRFFailure(ApiTestCase): + def test_csrf_failure(self): + self.login(READ_ACCESS_USER) + + # Make sure a simple post call succeeds. + self.putJsonResponse(User, + data=dict(password='newpasswordiscool')) + + # Change the session's CSRF token. + self.setCsrfToken('someinvalidtoken') + + # Verify that the call now fails. + self.putJsonResponse(User, + data=dict(password='newpasswordiscool'), + expected_code=403) + + class TestDiscovery(ApiTestCase): def test_discovery(self): json = self.getJsonResponse(DiscoveryResource)