Source code for ironic.common.inspection_rules.validation

# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import enum

import jsonschema

from ironic.common import args
from ironic.common import exception
from ironic.common.i18n import _
from ironic.common.inspection_rules import actions
from ironic.common.inspection_rules import operators
from ironic.common import utils as common_utils


_CONDITIONS_SCHEMA = None
_ACTIONS_SCHEMA = None


class InspectionPhase(enum.Enum):
    """Enumeration of the phase values an inspection rule may declare.

    ``MAIN`` is the only member defined here; its value is the string
    ``'main'``.
    """

    MAIN = 'main'
def conditions_schema():
    """Return the JSON schema for an inspection rule's ``conditions`` list.

    The schema is built on the first call and stored in the module-level
    ``_CONDITIONS_SCHEMA``; later calls return that same object.

    The accepted ``op`` values are every key of ``operators.OPERATORS``
    followed by the same names prefixed with ``!``.

    :returns: A dict holding the JSON schema for the conditions array.
    """
    global _CONDITIONS_SCHEMA

    # Fast path: the schema has already been built.
    if _CONDITIONS_SCHEMA is not None:
        return _CONDITIONS_SCHEMA

    plain_ops = list(operators.OPERATORS.keys())
    prefixed_ops = ["!%s" % name for name in plain_ops]

    item_properties = {
        "op": {
            "description": "Condition operator",
            "enum": plain_ops + prefixed_ops
        },
        "args": {
            "description": "Arguments for the condition",
            "type": ["array", "object"]
        },
        "multiple": {
            "description": "How to treat multiple values",
            "enum": ["any", "all", "first", "last"]
        },
        "loop": {
            "description": "Loop behavior for conditions",
            "type": ["array", "object"]
        },
    }

    _CONDITIONS_SCHEMA = {
        "title": "Inspection rule conditions schema",
        "type": "array",
        "minItems": 0,
        "items": {
            "type": "object",
            "required": ["op", "args"],
            "properties": item_properties,
            # other properties are validated by plugins
            "additionalProperties": True
        }
    }
    return _CONDITIONS_SCHEMA
def actions_schema():
    """Return the JSON schema for an inspection rule's ``actions`` list.

    The schema is built on the first call and stored in the module-level
    ``_ACTIONS_SCHEMA``; later calls return that same object.

    The accepted ``op`` values are the keys of ``actions.ACTIONS``. At
    least one action is required (``minItems`` is 1).

    :returns: A dict holding the JSON schema for the actions array.
    """
    global _ACTIONS_SCHEMA

    # Fast path: the schema has already been built.
    if _ACTIONS_SCHEMA is not None:
        return _ACTIONS_SCHEMA

    known_actions = list(actions.ACTIONS.keys())

    item_properties = {
        "op": {
            "description": "action operator",
            "enum": known_actions
        },
        "args": {
            "description": "Arguments for the action",
            "type": ["array", "object"]
        },
        "loop": {
            "description": "Loop behavior for actions",
            "type": ["array", "object"]
        },
    }

    _ACTIONS_SCHEMA = {
        "title": "Inspection rule actions schema",
        "type": "array",
        "minItems": 1,
        "items": {
            "type": "object",
            "required": ["op", "args"],
            "properties": item_properties,
            "additionalProperties": True
        }
    }
    return _ACTIONS_SCHEMA
# JSON schema for a complete inspection rule document. Only ``actions`` is
# mandatory, and keys not listed under ``properties`` are rejected.
# NOTE: the conditions/actions sub-schemas are built here, at import time.
SCHEMA = {
    'type': 'object',
    'properties': {
        'uuid': {'type': ['string', 'null']},
        'priority': {'type': 'integer', 'minimum': 0},
        'description': {'type': ['string', 'null'], 'maxLength': 255},
        'sensitive': {'type': ['boolean', 'null']},
        'phase': {'type': ['string', 'null'], 'maxLength': 16},
        'conditions': conditions_schema(),
        'actions': actions_schema(),
    },
    'required': ['actions'],
    'additionalProperties': False,
}

# Composite validator built from SCHEMA plus a per-key check on ``uuid``.
# NOTE(review): presumably intended for API argument validation via the
# ``args`` helpers -- confirm against the callers.
VALIDATOR = args.and_valid(
    args.schema(SCHEMA),
    args.dict_valid(uuid=args.uuid),
)
def _iter_checkable_entries(entries):
    """Yield the entries that are safe to hand to plugin validation.

    Only dicts carrying a string ``op`` are yielded. Anything else has
    already been reported by the JSON schema check, so it is skipped here
    instead of crashing the plugin lookup.

    :param entries: The ``conditions`` or ``actions`` value of a rule.
    """
    if not isinstance(entries, list):
        return
    for entry in entries:
        if isinstance(entry, dict) and isinstance(entry.get('op'), str):
            yield entry


def validate_inspection_rule(rule):
    """Validate an inspection rule using the JSON schema.

    Validation runs in several stages: the JSON schema, the phase, the
    priority range and finally the per-plugin argument checks. Every
    problem found is collected so that all of them are reported at once.

    A missing or empty ``conditions`` entry is replaced in ``rule`` by an
    empty list before validation.

    :param rule: The inspection rule to validate.
    :raises: Invalid if the rule is invalid.
    """
    if not rule.get('conditions'):
        rule['conditions'] = []

    errors = []

    try:
        jsonschema.validate(rule, SCHEMA)
    except jsonschema.ValidationError as e:
        errors.append(_('Validation failed for inspection rule: %s') % e)

    valid_phases = [p.value for p in InspectionPhase]
    phase = rule.get('phase', InspectionPhase.MAIN.value)
    if phase not in valid_phases:
        errors.append(
            _('Invalid phase: %(phase)s. Valid phases are: %(valid)s') % {
                'phase': phase,
                'valid': ', '.join(valid_phases)
            })

    priority = rule.get('priority', 0)
    # 'built_in' is not part of SCHEMA, so it is normally absent; a rule
    # without it is treated as user-defined instead of raising KeyError.
    built_in = rule.get('built_in', False)
    # A non-numeric priority was already reported by the schema check and
    # cannot be range-compared.
    if isinstance(priority, (int, float)) and not built_in:
        if priority < 0:
            errors.append(
                _("Priority cannot be negative for user-defined rules."))
        if priority > 9999:
            errors.append(
                _("Priority must be between 0 and 9999 for user-defined "
                  "rules."))

    # Additional plugin-specific validation
    for condition in _iter_checkable_entries(rule.get('conditions')):
        # NOTE: do not unpack into "_" here, it is the i18n function.
        op, _inverted = common_utils.parse_inverted_operator(
            condition['op'])
        plugin = operators.get_operator(op)
        if not plugin or not callable(plugin):
            errors.append(
                _('Unsupported condition operator: %s') % op)
            # There is nothing to instantiate, so skip parameter checks.
            continue
        try:
            plugin().validate(**condition)
        except ValueError as exc:
            errors.append(_('Invalid parameters for condition operator '
                            '%(op)s: %(error)s') % {'op': op, 'error': exc})

    for action in _iter_checkable_entries(rule.get('actions')):
        plugin = actions.get_action(action['op'])
        if not plugin or not callable(plugin):
            errors.append(_('Unsupported action operator: %s') % action['op'])
            # There is nothing to instantiate, so skip parameter checks.
            continue
        try:
            plugin().validate(**action)
        except ValueError as exc:
            errors.append(_('Invalid parameters for action operator %(op)s: '
                            '%(error)s') % {'op': action['op'], 'error': exc})

    if not errors:
        return
    if len(errors) == 1:
        raise exception.Invalid(errors[0])
    raise exception.Invalid(_('Multiple validation errors occurred: '
                              '%s') % '; '.join(errors))