From 7a546fbbdedc7c2a1fee4ac01515474ee7184327 Mon Sep 17 00:00:00 2001 From: Adam Young Date: Thu, 29 May 2014 13:56:17 -0400 Subject: [PATCH] Block delegation escalation of privilege Forbids doing the following with either a trust or oauth based token: creating a trust approving a request_token listing request tokens Change-Id: I1528f9dd003f5e03cbc50b78e1b32dbbf85ffcc2 Closes-Bug: 1324592 --- keystone/common/controller.py | 36 ++++++++++++- keystone/contrib/oauth1/controllers.py | 12 +++++ keystone/exception.py | 2 +- keystone/tests/test_v3_auth.py | 74 +++++++++++++++++++++++++ keystone/tests/test_v3_oauth1.py | 99 ++++++++++++++++++++++++++++++++++ keystone/trust/controllers.py | 9 ++++ 6 files changed, 230 insertions(+), 2 deletions(-) diff --git a/keystone/common/controller.py b/keystone/common/controller.py index faadc09d5d3a43e510eb3485ffbc4dc0cdb59a2f..d6c33dfae4de1b7a50aa11d0f87eaaadc8aab88a 100644 --- a/keystone/common/controller.py +++ b/keystone/common/controller.py @@ -44,7 +44,7 @@ def _build_policy_check_credentials(self, action, context, kwargs): # it would otherwise need to reload the token_ref from backing store. wsgi.validate_token_bind(context, token_ref) - creds = {} + creds = {'is_delegated_auth': False} if 'token_data' in token_ref and 'token' in token_ref['token_data']: #V3 Tokens token_data = token_ref['token_data']['token'] @@ -66,9 +66,32 @@ def _build_policy_check_credentials(self, action, context, kwargs): creds['roles'] = [] for role in token_data['roles']: creds['roles'].append(role['name']) + + trust = token_data.get('OS-TRUST:trust') + if trust is None: + creds['trust_id'] = None + creds['trustor_id'] = None + creds['trustee_id'] = None + else: + creds['trust_id'] = trust['id'] + creds['trustor_id'] = trust['trustor_user']['id'] + creds['trustee_id'] = trust['trustee_user']['id'] + creds['is_delegated_auth'] = True + + oauth1 = token_data.get('OS-OAUTH1') + if oauth1 is None: + creds['consumer_id'] = None + creds['access_token_id'] = None + else: + creds['consumer_id'] = oauth1['consumer_id'] + creds['access_token_id'] = oauth1['access_token_id'] + creds['is_delegated_auth'] = True + else: #v2 Tokens creds = token_ref.get('metadata', {}).copy() + creds['is_delegated_auth'] = False + try: creds['user_id'] = token_ref['user'].get('id') except AttributeError: @@ -81,6 +104,16 @@ def _build_policy_check_credentials(self, action, context, kwargs): # NOTE(vish): this is pretty inefficient creds['roles'] = [self.identity_api.get_role(role)['name'] for role in creds.get('roles', [])] + trust = token_ref.get('trust') + if trust is None: + creds['trust_id'] = None + creds['trustor_id'] = None + creds['trustee_id'] = None + else: + creds['trust_id'] = trust.get('id') + creds['trustor_id'] = trust.get('trustor_id') + creds['trustee_id'] = trust.get('trustee_id') + creds['is_delegated_auth'] = True return creds @@ -155,6 +188,7 @@ def protected(callback=None): policy_dict.update(kwargs) self.policy_api.enforce(creds, action, flatten(policy_dict)) LOG.debug(_('RBAC: Authorization granted')) + context['environment'] = {'KEYSTONE_AUTH_CONTEXT': creds} return f(self, context, *args, **kwargs) return inner return wrapper diff --git a/keystone/contrib/oauth1/controllers.py b/keystone/contrib/oauth1/controllers.py index b8c24419285914e24eb233921babd4a745f0ebfb..d4024dffa34b14a749569ae9705169c92b03e0bf 100644 --- a/keystone/contrib/oauth1/controllers.py +++ b/keystone/contrib/oauth1/controllers.py @@ -86,6 +86,12 @@ class AccessTokenCrudV3(controller.V3Controller): @controller.protected() def list_access_tokens(self, context, user_id): + auth_context = context.get('environment', + {}).get('KEYSTONE_AUTH_CONTEXT', {}) + if auth_context.get('is_delegated_auth'): + raise exception.Forbidden( + _('Cannot list request tokens' + ' with a token issued via delegation.')) refs = self.oauth_api.list_access_tokens(user_id) formatted_refs = ([self._format_token_entity(x) for x in refs]) return AccessTokenCrudV3.wrap_collection(context, formatted_refs) @@ -314,6 +320,12 @@ class OAuthControllerV3(controller.V3Controller): there is not another easy way to make sure the user knows which roles are being requested before authorizing. """ + auth_context = context.get('environment', + {}).get('KEYSTONE_AUTH_CONTEXT', {}) + if auth_context.get('is_delegated_auth'): + raise exception.Forbidden( + _('Cannot authorize a request token' + ' with a token issued via delegation.')) req_token = self.oauth_api.get_request_token(request_token_id) diff --git a/keystone/exception.py b/keystone/exception.py index 9c54c099aaebcf6f8a8663859f621fe06a073c30..27ba83922ca4d0c802d271de573406ca5987cf62 100644 --- a/keystone/exception.py +++ b/keystone/exception.py @@ -20,7 +20,7 @@ from keystone.common import config from keystone.openstack.common import log as logging from keystone.openstack.common import strutils - +from keystone.openstack.common.gettextutils import _ CONF = config.CONF LOG = logging.getLogger(__name__) diff --git a/keystone/tests/test_v3_auth.py b/keystone/tests/test_v3_auth.py index e89e29f3276178133c2dad731723051ab0a13be7..b4cd335479856229051b174fde0967cd218fbbd1 100644 --- a/keystone/tests/test_v3_auth.py +++ b/keystone/tests/test_v3_auth.py @@ -2150,6 +2150,80 @@ class TestTrustAuth(TestAuthInfo): self.assertEqual(r.result['token']['project']['name'], self.project['name']) + def test_impersonation_token_cannot_create_new_trust(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=True, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r) + + auth_data = self.build_authentication_request( + user_id=self.trustee_user['id'], + password=self.trustee_user['password'], + trust_id=trust['id']) + r = self.post('/auth/tokens', body=auth_data) + + trust_token = r.headers['X-Subject-Token'] + + # Build second trust + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=True, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + self.post('/OS-TRUST/trusts', + body={'trust': ref}, + token=trust_token, + expected_status=403) + + def assertTrustTokensRevoked(self, trust_id): + revocation_response = self.get('/OS-REVOKE/events', + expected_status=200) + revocation_events = revocation_response.json_body['events'] + found = False + for event in revocation_events: + if event.get('OS-TRUST:trust_id') == trust_id: + found = True + self.assertTrue(found, 'event with trust_id %s not found in list' % + trust_id) + + def test_delete_trust_revokes_tokens(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=False, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r) + trust_id = trust['id'] + auth_data = self.build_authentication_request( + user_id=self.trustee_user['id'], + password=self.trustee_user['password'], + trust_id=trust_id) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidProjectTrustScopedTokenResponse( + r, self.trustee_user) + trust_token = r.headers['X-Subject-Token'] + self.delete('/OS-TRUST/trusts/%(trust_id)s' % { + 'trust_id': trust_id}, + expected_status=204) + headers = {'X-Subject-Token': trust_token} + self.head('/auth/tokens', headers=headers, expected_status=404) + self.assertTrustTokensRevoked(trust_id) + def test_delete_trust(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, diff --git a/keystone/tests/test_v3_oauth1.py b/keystone/tests/test_v3_oauth1.py index 73a34d7221473509ada5345a08f9c7bed94af6b5..de37ff02b9f2729ac9e0fb8ea01a38f420394b8e 100644 --- a/keystone/tests/test_v3_oauth1.py +++ b/keystone/tests/test_v3_oauth1.py @@ -16,6 +16,7 @@ import copy import os +import tempfile import urlparse import uuid @@ -26,6 +27,8 @@ from keystone import contrib from keystone.contrib import oauth1 from keystone.contrib.oauth1 import controllers from keystone.openstack.common import importutils +from keystone.openstack.common import jsonutils +from keystone.policy.backends import rules from keystone.tests import test_v3 @@ -447,6 +450,102 @@ class AuthTokenTests(OAuthFlowTests): self.assertTrue(len(tokens) > 0) self.assertTrue(keystone_token_uuid in tokens) + def _create_trust_get_token(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.user_id, + project_id=self.project_id, + impersonation=True, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r) + + auth_data = self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + trust_id=trust['id']) + r = self.post('/auth/tokens', body=auth_data) + + trust_token = r.headers['X-Subject-Token'] + return trust_token + + def _approve_request_token_url(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + self.consumer = oauth1.Consumer(consumer_id, consumer_secret) + self.assertIsNotNone(self.consumer.key) + + url, headers = self._create_request_token(self.consumer, + self.project_id) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + request_key = credentials.get('oauth_token')[0] + request_secret = credentials.get('oauth_token_secret')[0] + self.request_token = oauth1.Token(request_key, request_secret) + self.assertIsNotNone(self.request_token.key) + + url = self._authorize_request_token(request_key) + + return url + + def test_oauth_token_cannot_create_new_trust(self): + self.test_oauth_flow() + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.user_id, + project_id=self.project_id, + impersonation=True, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + self.post('/OS-TRUST/trusts', + body={'trust': ref}, + token=self.keystone_token_id, + expected_status=403) + + def test_oauth_token_cannot_authorize_request_token(self): + self.test_oauth_flow() + url = self._approve_request_token_url() + body = {'roles': [{'id': self.role_id}]} + self.put(url, body=body, token=self.keystone_token_id, + expected_status=403) + + def test_oauth_token_cannot_list_request_tokens(self): + self._set_policy({"identity:list_access_tokens": [], + "identity:create_consumer": [], + "identity:authorize_request_token": []}) + self.test_oauth_flow() + url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id + self.get(url, token=self.keystone_token_id, + expected_status=403) + + def _set_policy(self, new_policy): + _unused, self.tmpfilename = tempfile.mkstemp() + #self.orig_policy_file = CONF.policy_file + rules.reset() + self.opt(policy_file=self.tmpfilename) + with open(self.tmpfilename, "w") as policyfile: + policyfile.write(jsonutils.dumps(new_policy)) + self.addCleanup(os.remove, self.tmpfilename) + + def test_trust_token_cannot_authorize_request_token(self): + trust_token = self._create_trust_get_token() + url = self._approve_request_token_url() + body = {'roles': [{'id': self.role_id}]} + self.put(url, body=body, token=trust_token, expected_status=403) + + def test_trust_token_cannot_list_request_tokens(self): + self._set_policy({"identity:list_access_tokens": [], + "identity:create_trust": []}) + trust_token = self._create_trust_get_token() + url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id + self.get(url, token=trust_token, expected_status=403) + class MaliciousOAuth1Tests(OAuth1Tests): diff --git a/keystone/trust/controllers.py b/keystone/trust/controllers.py index 1d54f51a70854e50c12cca73f0442fdec4d13a94..7fdc8c29dc09f0910521c2198409f6a79ab090e8 100644 --- a/keystone/trust/controllers.py +++ b/keystone/trust/controllers.py @@ -144,6 +144,15 @@ class TrustV3(controller.V3Controller): # TODO(ayoung): instead of raising ValidationError on the first # problem, return a collection of all the problems. + + # Explicitly prevent a trust token from creating a new trust. + auth_context = context.get('environment', + {}).get('KEYSTONE_AUTH_CONTEXT', {}) + if auth_context.get('is_delegated_auth'): + raise exception.Forbidden( + _('Cannot create a trust' + ' with a token issued via delegation.')) + if not trust: raise exception.ValidationError(attribute='trust', target='request') -- 1.9.0