Source code for monasca_log_api.middleware.role_middleware

# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from oslo_config import cfg
from oslo_log import log
from oslo_middleware import base as om
from webob import response

CONF = cfg.CONF
LOG = log.getLogger(__name__)

role_m_opts = [
    cfg.ListOpt(name='path',
                default='/',
                help='List of paths where middleware applies to'),
    cfg.ListOpt(name='default_roles',
                default=None,
                help='List of roles allowed to enter api'),
    cfg.ListOpt(name='agent_roles',
                default=None,
                help=('List of roles, that if set, mean that request '
                      'comes from agent, thus is authorized in the same '
                      'time'))
]
role_m_group = cfg.OptGroup(name='roles_middleware', title='roles_middleware')

CONF.register_group(role_m_group)
CONF.register_opts(role_m_opts, role_m_group)

_X_IDENTITY_STATUS = 'X-Identity-Status'
_X_ROLES = 'X-Roles'
_X_MONASCA_LOG_AGENT = 'X-MONASCA-LOG-AGENT'
_CONFIRMED_STATUS = 'Confirmed'


def _ensure_lower_roles(roles):
    if not roles:
        return []
    return [role.strip().lower() for role in roles]


def _intersect(a, b):
    return list(set(a) & set(b))


[docs]class RoleMiddleware(om.ConfigurableMiddleware): """Authorization middleware for X-Roles header. RoleMiddleware is responsible for authorizing user's access against **X-Roles** header. Middleware expects authentication to be completed (i.e. keystone middleware has been already called). If tenant is authenticated and authorized middleware exits silently (that is considered a success). Otherwise middleware produces JSON response according to following schema .. code-block:: json { 'title': u'Unauthorized', 'message': explanation (str) } Configuration example .. code-block:: cfg [roles_middleware] path = /v2.0/log default_roles = monasca-user agent_roles = monasca-log-agent Configuration explained: * path (list) - path (or list of paths) middleware should be applied * agent_roles (list) - list of roles that identifies tenant as an agent * default_roles (list) - list of roles that should be authorized Note: Being an agent means that tenant is automatically authorized. Note: Middleware works only for configured paths and for all requests apart from HTTP method **OPTIONS**. """ def __init__(self, application, conf=None): super(RoleMiddleware, self).__init__(application, conf) middleware = CONF.roles_middleware self._path = middleware.path self._default_roles = _ensure_lower_roles(middleware.default_roles) self._agent_roles = _ensure_lower_roles(middleware.agent_roles) LOG.debug('RolesMiddleware initialized for paths=%s', self._path)
[docs] def process_request(self, req): if not self._can_apply_middleware(req): LOG.debug('%s skipped in role middleware', req.path) return None is_authenticated = self._is_authenticated(req) is_authorized, is_agent = self._is_authorized(req) tenant_id = req.headers.get('X-Tenant-Id') req.environ[_X_MONASCA_LOG_AGENT] = is_agent LOG.debug('%s is authenticated=%s, authorized=%s, log_agent=%s', tenant_id, is_authenticated, is_authorized, is_agent) if is_authenticated and is_authorized: LOG.debug('%s has been authenticated and authorized', tenant_id) return # do return nothing to enter API internal # whoops if is_authorized: explanation = u'Failed to authenticate request for %s' % tenant_id else: explanation = (u'Tenant %s is missing a required role to access ' u'this service' % tenant_id) if explanation is not None: LOG.error(explanation) json_body = {u'title': u'Unauthorized', u'message': explanation} return response.Response(status=401, json_body=json_body, content_type='application/json')
def _is_authorized(self, req): headers = req.headers roles = headers.get(_X_ROLES) if not roles: LOG.warning('Couldn\'t locate %s header,or it was empty', _X_ROLES) return False, False else: roles = _ensure_lower_roles(roles.split(',')) is_agent = len(_intersect(roles, self._agent_roles)) > 0 is_authorized = (len(_intersect(roles, self._default_roles)) > 0 or is_agent) return is_authorized, is_agent def _is_authenticated(self, req): headers = req.headers if _X_IDENTITY_STATUS in headers: status = req.headers.get(_X_IDENTITY_STATUS) return _CONFIRMED_STATUS == status return False def _can_apply_middleware(self, req): path = req.path method = req.method if method == 'OPTIONS': return False if self._path: for p in self._path: if path.startswith(p): return True return False # if no configured paths, or nothing matches