Source code for keystone.tests.unit.test_revoke

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.


import datetime
import uuid

import mock
from oslo_utils import timeutils
from six.moves import range
from testtools import matchers

from keystone.common import utils
from keystone import exception
from keystone.models import revoke_model
from keystone.tests import unit
from keystone.tests.unit import test_backend_sql
from keystone.token import provider


def _new_id():
    return uuid.uuid4().hex


def _future_time():
    expire_delta = datetime.timedelta(seconds=1000)
    future_time = timeutils.utcnow() + expire_delta
    return future_time


def _past_time():
    expire_delta = datetime.timedelta(days=-1000)
    past_time = timeutils.utcnow() + expire_delta
    return past_time


def _sample_blank_token():
    issued_delta = datetime.timedelta(minutes=-2)
    issued_at = timeutils.utcnow() + issued_delta
    token_data = revoke_model.blank_token_data(issued_at)
    return token_data


def _matches(event, token_values):
    """See if the token matches the revocation event.

    Used as a secondary check on the logic to Check
    By Tree Below:  This is abrute force approach to checking.
    Compare each attribute from the event with the corresponding
    value from the token.  If the event does not have a value for
    the attribute, a match is still possible.  If the event has a
    value for the attribute, and it does not match the token, no match
    is possible, so skip the remaining checks.

    :param event: one revocation event to match
    :param token_values: dictionary with set of values taken from the
    token
    :returns: True if the token matches the revocation event, indicating the
    token has been revoked
    """
    # The token has three attributes that can match the user_id
    if event.user_id is not None:
        for attribute_name in ['user_id', 'trustor_id', 'trustee_id']:
            if event.user_id == token_values[attribute_name]:
                break
        else:
            return False

    # The token has two attributes that can match the domain_id
    if event.domain_id is not None:
        for attribute_name in ['identity_domain_id', 'assignment_domain_id']:
            if event.domain_id == token_values[attribute_name]:
                break
        else:
            return False

    if event.domain_scope_id is not None:
        if event.domain_scope_id != token_values['assignment_domain_id']:
            return False

    # If any one check does not match, the while token does
    # not match the event. The numerous return False indicate
    # that the token is still valid and short-circuits the
    # rest of the logic.
    attribute_names = ['project_id',
                       'expires_at', 'trust_id', 'consumer_id',
                       'access_token_id', 'audit_id', 'audit_chain_id']
    for attribute_name in attribute_names:
        if getattr(event, attribute_name) is not None:
            if (getattr(event, attribute_name) !=
                    token_values[attribute_name]):
                        return False

    if event.role_id is not None:
        roles = token_values['roles']
        for role in roles:
            if event.role_id == role:
                break
        else:
            return False
    if token_values['issued_at'] > event.issued_before:
        return False
    return True


[docs]class RevokeTests(object):
[docs] def test_list(self): self.revoke_api.revoke_by_user(user_id=1) self.assertEqual(1, len(self.revoke_api.list_events())) self.revoke_api.revoke_by_user(user_id=2) self.assertEqual(2, len(self.revoke_api.list_events()))
[docs] def test_list_since(self): self.revoke_api.revoke_by_user(user_id=1) self.revoke_api.revoke_by_user(user_id=2) past = timeutils.utcnow() - datetime.timedelta(seconds=1000) self.assertEqual(2, len(self.revoke_api.list_events(last_fetch=past))) future = timeutils.utcnow() + datetime.timedelta(seconds=1000) self.assertEqual(0, len(self.revoke_api.list_events(last_fetch=future)))
[docs] def test_past_expiry_are_removed(self): user_id = 1 self.revoke_api.revoke_by_expiration(user_id, _future_time()) self.assertEqual(1, len(self.revoke_api.list_events())) event = revoke_model.RevokeEvent() event.revoked_at = _past_time() self.revoke_api.revoke(event) self.assertEqual(1, len(self.revoke_api.list_events()))
@mock.patch.object(timeutils, 'utcnow')
[docs] def test_expired_events_removed_validate_token_success(self, mock_utcnow): def _sample_token_values(): token = _sample_blank_token() token['expires_at'] = utils.isotime(_future_time(), subsecond=True) return token now = datetime.datetime.utcnow() now_plus_2h = now + datetime.timedelta(hours=2) mock_utcnow.return_value = now # Build a token and validate it. This will seed the cache for the # future 'synchronize' call. token_values = _sample_token_values() user_id = _new_id() self.revoke_api.revoke_by_user(user_id) token_values['user_id'] = user_id self.assertRaises(exception.TokenNotFound, self.revoke_api.check_token, token_values) # Move our clock forward by 2h, build a new token and validate it. # 'synchronize' should now be exercised and remove old expired events mock_utcnow.return_value = now_plus_2h self.revoke_api.revoke_by_expiration(_new_id(), now_plus_2h) # should no longer throw an exception self.revoke_api.check_token(token_values)
[docs] def test_revoke_by_expiration_project_and_domain_fails(self): user_id = _new_id() expires_at = utils.isotime(_future_time(), subsecond=True) domain_id = _new_id() project_id = _new_id() self.assertThat( lambda: self.revoke_api.revoke_by_expiration( user_id, expires_at, domain_id=domain_id, project_id=project_id), matchers.raises(exception.UnexpectedError))
[docs]class SqlRevokeTests(test_backend_sql.SqlTests, RevokeTests):
[docs] def config_overrides(self): super(SqlRevokeTests, self).config_overrides() self.config_fixture.config( group='token', provider='pki', revoke_by_id=False)
[docs]class RevokeTreeTests(unit.TestCase):
[docs] def setUp(self): super(RevokeTreeTests, self).setUp() self.events = [] self.tree = revoke_model.RevokeTree() self._sample_data()
def _sample_data(self): user_ids = [] project_ids = [] role_ids = [] for i in range(0, 3): user_ids.append(_new_id()) project_ids.append(_new_id()) role_ids.append(_new_id()) project_tokens = [] i = len(project_tokens) project_tokens.append(_sample_blank_token()) project_tokens[i]['user_id'] = user_ids[0] project_tokens[i]['project_id'] = project_ids[0] project_tokens[i]['roles'] = [role_ids[1]] i = len(project_tokens) project_tokens.append(_sample_blank_token()) project_tokens[i]['user_id'] = user_ids[1] project_tokens[i]['project_id'] = project_ids[0] project_tokens[i]['roles'] = [role_ids[0]] i = len(project_tokens) project_tokens.append(_sample_blank_token()) project_tokens[i]['user_id'] = user_ids[0] project_tokens[i]['project_id'] = project_ids[1] project_tokens[i]['roles'] = [role_ids[0]] token_to_revoke = _sample_blank_token() token_to_revoke['user_id'] = user_ids[0] token_to_revoke['project_id'] = project_ids[0] token_to_revoke['roles'] = [role_ids[0]] self.project_tokens = project_tokens self.user_ids = user_ids self.project_ids = project_ids self.role_ids = role_ids self.token_to_revoke = token_to_revoke def _assertTokenRevoked(self, token_data): self.assertTrue(any([_matches(e, token_data) for e in self.events])) return self.assertTrue(self.tree.is_revoked(token_data), 'Token should be revoked') def _assertTokenNotRevoked(self, token_data): self.assertFalse(any([_matches(e, token_data) for e in self.events])) return self.assertFalse(self.tree.is_revoked(token_data), 'Token should not be revoked') def _revoke_by_user(self, user_id): return self.tree.add_event( revoke_model.RevokeEvent(user_id=user_id)) def _revoke_by_audit_id(self, audit_id): event = self.tree.add_event( revoke_model.RevokeEvent(audit_id=audit_id)) self.events.append(event) return event def _revoke_by_audit_chain_id(self, audit_chain_id, project_id=None, domain_id=None): event = self.tree.add_event( revoke_model.RevokeEvent(audit_chain_id=audit_chain_id, project_id=project_id, domain_id=domain_id) ) self.events.append(event) return event def _revoke_by_expiration(self, user_id, expires_at, project_id=None, domain_id=None): event = self.tree.add_event( revoke_model.RevokeEvent(user_id=user_id, expires_at=expires_at, project_id=project_id, domain_id=domain_id)) self.events.append(event) return event def _revoke_by_grant(self, role_id, user_id=None, domain_id=None, project_id=None): event = self.tree.add_event( revoke_model.RevokeEvent(user_id=user_id, role_id=role_id, domain_id=domain_id, project_id=project_id)) self.events.append(event) return event def _revoke_by_user_and_project(self, user_id, project_id): event = self.tree.add_event( revoke_model.RevokeEvent(project_id=project_id, user_id=user_id)) self.events.append(event) return event def _revoke_by_project_role_assignment(self, project_id, role_id): event = self.tree.add_event( revoke_model.RevokeEvent(project_id=project_id, role_id=role_id)) self.events.append(event) return event def _revoke_by_domain_role_assignment(self, domain_id, role_id): event = self.tree.add_event( revoke_model.RevokeEvent(domain_id=domain_id, role_id=role_id)) self.events.append(event) return event def _revoke_by_domain(self, domain_id): event = self.tree.add_event( revoke_model.RevokeEvent(domain_id=domain_id)) self.events.append(event) def _user_field_test(self, field_name): user_id = _new_id() event = self._revoke_by_user(user_id) self.events.append(event) token_data_u1 = _sample_blank_token() token_data_u1[field_name] = user_id self._assertTokenRevoked(token_data_u1) token_data_u2 = _sample_blank_token() token_data_u2[field_name] = _new_id() self._assertTokenNotRevoked(token_data_u2) self.tree.remove_event(event) self.events.remove(event) self._assertTokenNotRevoked(token_data_u1)
[docs] def test_revoke_by_user(self): self._user_field_test('user_id')
[docs] def test_revoke_by_user_matches_trustee(self): self._user_field_test('trustee_id')
[docs] def test_revoke_by_user_matches_trustor(self): self._user_field_test('trustor_id')
[docs] def test_by_user_expiration(self): future_time = _future_time() user_id = 1 event = self._revoke_by_expiration(user_id, future_time) token_data_1 = _sample_blank_token() token_data_1['user_id'] = user_id token_data_1['expires_at'] = future_time.replace(microsecond=0) self._assertTokenRevoked(token_data_1) token_data_2 = _sample_blank_token() token_data_2['user_id'] = user_id expire_delta = datetime.timedelta(seconds=2000) future_time = timeutils.utcnow() + expire_delta token_data_2['expires_at'] = future_time self._assertTokenNotRevoked(token_data_2) self.remove_event(event) self._assertTokenNotRevoked(token_data_1)
[docs] def test_revoke_by_audit_id(self): audit_id = provider.audit_info(parent_audit_id=None)[0] token_data_1 = _sample_blank_token() # Audit ID and Audit Chain ID are populated with the same value # if the token is an original token token_data_1['audit_id'] = audit_id token_data_1['audit_chain_id'] = audit_id event = self._revoke_by_audit_id(audit_id) self._assertTokenRevoked(token_data_1) audit_id_2 = provider.audit_info(parent_audit_id=audit_id)[0] token_data_2 = _sample_blank_token() token_data_2['audit_id'] = audit_id_2 token_data_2['audit_chain_id'] = audit_id self._assertTokenNotRevoked(token_data_2) self.remove_event(event) self._assertTokenNotRevoked(token_data_1)
[docs] def test_revoke_by_audit_chain_id(self): audit_id = provider.audit_info(parent_audit_id=None)[0] token_data_1 = _sample_blank_token() # Audit ID and Audit Chain ID are populated with the same value # if the token is an original token token_data_1['audit_id'] = audit_id token_data_1['audit_chain_id'] = audit_id event = self._revoke_by_audit_chain_id(audit_id) self._assertTokenRevoked(token_data_1) audit_id_2 = provider.audit_info(parent_audit_id=audit_id)[0] token_data_2 = _sample_blank_token() token_data_2['audit_id'] = audit_id_2 token_data_2['audit_chain_id'] = audit_id self._assertTokenRevoked(token_data_2) self.remove_event(event) self._assertTokenNotRevoked(token_data_1) self._assertTokenNotRevoked(token_data_2)
[docs] def test_by_user_project(self): # When a user has a project-scoped token and the project-scoped token # is revoked then the token is revoked. user_id = _new_id() project_id = _new_id() future_time = _future_time() token_data = _sample_blank_token() token_data['user_id'] = user_id token_data['project_id'] = project_id token_data['expires_at'] = future_time.replace(microsecond=0) self._revoke_by_expiration(user_id, future_time, project_id=project_id) self._assertTokenRevoked(token_data)
[docs] def test_by_user_domain(self): # When a user has a domain-scoped token and the domain-scoped token # is revoked then the token is revoked. user_id = _new_id() domain_id = _new_id() future_time = _future_time() token_data = _sample_blank_token() token_data['user_id'] = user_id token_data['assignment_domain_id'] = domain_id token_data['expires_at'] = future_time.replace(microsecond=0) self._revoke_by_expiration(user_id, future_time, domain_id=domain_id) self._assertTokenRevoked(token_data)
[docs] def remove_event(self, event): self.events.remove(event) self.tree.remove_event(event)
[docs] def test_by_project_grant(self): token_to_revoke = self.token_to_revoke tokens = self.project_tokens self._assertTokenNotRevoked(token_to_revoke) for token in tokens: self._assertTokenNotRevoked(token) event = self._revoke_by_grant(role_id=self.role_ids[0], user_id=self.user_ids[0], project_id=self.project_ids[0]) self._assertTokenRevoked(token_to_revoke) for token in tokens: self._assertTokenNotRevoked(token) self.remove_event(event) self._assertTokenNotRevoked(token_to_revoke) for token in tokens: self._assertTokenNotRevoked(token) token_to_revoke['roles'] = [self.role_ids[0], self.role_ids[1], self.role_ids[2]] event = self._revoke_by_grant(role_id=self.role_ids[0], user_id=self.user_ids[0], project_id=self.project_ids[0]) self._assertTokenRevoked(token_to_revoke) self.remove_event(event) self._assertTokenNotRevoked(token_to_revoke) event = self._revoke_by_grant(role_id=self.role_ids[1], user_id=self.user_ids[0], project_id=self.project_ids[0]) self._assertTokenRevoked(token_to_revoke) self.remove_event(event) self._assertTokenNotRevoked(token_to_revoke) self._revoke_by_grant(role_id=self.role_ids[0], user_id=self.user_ids[0], project_id=self.project_ids[0]) self._revoke_by_grant(role_id=self.role_ids[1], user_id=self.user_ids[0], project_id=self.project_ids[0]) self._revoke_by_grant(role_id=self.role_ids[2], user_id=self.user_ids[0], project_id=self.project_ids[0]) self._assertTokenRevoked(token_to_revoke)
[docs] def test_by_project_and_user_and_role(self): user_id1 = _new_id() user_id2 = _new_id() project_id = _new_id() self.events.append(self._revoke_by_user(user_id1)) self.events.append( self._revoke_by_user_and_project(user_id2, project_id)) token_data = _sample_blank_token() token_data['user_id'] = user_id2 token_data['project_id'] = project_id self._assertTokenRevoked(token_data)
[docs] def test_by_domain_user(self): # If revoke a domain, then a token for a user in the domain is revoked user_id = _new_id() domain_id = _new_id() token_data = _sample_blank_token() token_data['user_id'] = user_id token_data['identity_domain_id'] = domain_id self._revoke_by_domain(domain_id) self._assertTokenRevoked(token_data)
[docs] def test_by_domain_project(self): # If revoke a domain, then a token scoped to a project in the domain # is revoked. user_id = _new_id() user_domain_id = _new_id() project_id = _new_id() project_domain_id = _new_id() token_data = _sample_blank_token() token_data['user_id'] = user_id token_data['identity_domain_id'] = user_domain_id token_data['project_id'] = project_id token_data['assignment_domain_id'] = project_domain_id self._revoke_by_domain(project_domain_id) self._assertTokenRevoked(token_data)
[docs] def test_by_domain_domain(self): # If revoke a domain, then a token scoped to the domain is revoked. user_id = _new_id() user_domain_id = _new_id() domain_id = _new_id() token_data = _sample_blank_token() token_data['user_id'] = user_id token_data['identity_domain_id'] = user_domain_id token_data['assignment_domain_id'] = domain_id self._revoke_by_domain(domain_id) self._assertTokenRevoked(token_data)
def _assertEmpty(self, collection): return self.assertEqual(0, len(collection), "collection not empty") def _assertEventsMatchIteration(self, turn): self.assertEqual(1, len(self.tree.revoke_map)) self.assertEqual(turn + 1, len(self.tree.revoke_map ['trust_id=*'] ['consumer_id=*'] ['access_token_id=*'] ['audit_id=*'] ['audit_chain_id=*'])) # two different functions add domain_ids, +1 for None self.assertEqual(2 * turn + 1, len(self.tree.revoke_map ['trust_id=*'] ['consumer_id=*'] ['access_token_id=*'] ['audit_id=*'] ['audit_chain_id=*'] ['expires_at=*'])) # two different functions add project_ids, +1 for None self.assertEqual(2 * turn + 1, len(self.tree.revoke_map ['trust_id=*'] ['consumer_id=*'] ['access_token_id=*'] ['audit_id=*'] ['audit_chain_id=*'] ['expires_at=*'] ['domain_id=*'])) # 10 users added self.assertEqual(turn, len(self.tree.revoke_map ['trust_id=*'] ['consumer_id=*'] ['access_token_id=*'] ['audit_id=*'] ['audit_chain_id=*'] ['expires_at=*'] ['domain_id=*'] ['project_id=*']))
[docs] def test_cleanup(self): events = self.events self._assertEmpty(self.tree.revoke_map) expiry_base_time = _future_time() for i in range(0, 10): events.append( self._revoke_by_user(_new_id())) args = (_new_id(), expiry_base_time + datetime.timedelta(seconds=i)) events.append( self._revoke_by_expiration(*args)) self.assertEqual(i + 2, len(self.tree.revoke_map ['trust_id=*'] ['consumer_id=*'] ['access_token_id=*'] ['audit_id=*'] ['audit_chain_id=*']), 'adding %s to %s' % (args, self.tree.revoke_map)) events.append( self._revoke_by_project_role_assignment(_new_id(), _new_id())) events.append( self._revoke_by_domain_role_assignment(_new_id(), _new_id())) events.append( self._revoke_by_domain_role_assignment(_new_id(), _new_id())) events.append( self._revoke_by_user_and_project(_new_id(), _new_id())) self._assertEventsMatchIteration(i + 1) for event in self.events: self.tree.remove_event(event) self._assertEmpty(self.tree.revoke_map)

Project Source