mistral.engine package

Submodules

mistral.engine.action_handler module

mistral.engine.action_queue module

mistral.engine.actions module

class mistral.engine.actions.Action(action_def, action_ex=None, task_ex=None)

Bases: object

Action.

Represents a workflow action and defines the interface that the Mistral engine or its components can use to manipulate actions.

complete(result)

Complete action and process its result.

Parameters:result – Action result.
fail(msg)
is_sync(input_dict)

Determines if action is synchronous.

Parameters:input_dict – Dictionary with input parameters.
run(input_dict, target, index=0, desc='', save=True, safe_rerun=False)

Immediately run action.

This method runs the action without scheduling its run for a later time. From the engine’s perspective, the action is processed in synchronous mode.

Parameters:
  • input_dict – Action input.
  • target – Target (group of action executors).
  • index – Action execution index. Makes sense for some types.
  • desc – Action execution description.
  • save – True if action execution object needs to be saved.
  • safe_rerun – If True, the action is re-run if the executor dies during execution.
Returns:

Action output.

schedule(input_dict, target, index=0, desc='', safe_rerun=False)

Schedule action run.

This method schedules an action run so that the engine can receive its result later. In this sense, the action runs in asynchronous mode from the engine’s perspective (not to be confused with executor asynchrony, where the executor doesn’t immediately send a result).

Parameters:
  • input_dict – Action input.
  • target – Target (group of action executors).
  • index – Action execution index. Makes sense for some types.
  • desc – Action execution description.
  • safe_rerun – If True, the action is re-run if the executor dies during execution.
validate_input(input_dict)

Validates action input parameters.

Parameters:input_dict – Dictionary with input parameters.
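
The difference between run() and schedule() described above can be illustrated with a toy model. This is only a sketch, not Mistral code: ToyAction, its pending queue, and drain() are hypothetical names, and the executor is simulated by a plain Python callable.

```python
from collections import deque


class ToyAction:
    """Hypothetical stand-in for an action; not the Mistral class."""

    def __init__(self, func):
        self.func = func
        self.pending = deque()  # scheduled runs waiting for an executor
        self.results = []       # results delivered later via complete()

    def run(self, input_dict):
        # Synchronous from the caller's perspective: the output is
        # returned immediately.
        return self.func(**input_dict)

    def schedule(self, input_dict):
        # Asynchronous from the caller's perspective: nothing is returned
        # here; the result arrives later through complete().
        self.pending.append(input_dict)

    def complete(self, result):
        self.results.append(result)

    def drain(self):
        # Simulates an executor picking up the scheduled runs.
        while self.pending:
            self.complete(self.func(**self.pending.popleft()))


action = ToyAction(lambda a, b: a + b)
sync_output = action.run({"a": 1, "b": 2})
action.schedule({"a": 10, "b": 20})
before = list(action.results)  # still empty: nothing has executed yet
action.drain()
print(sync_output, before, action.results)
```

In this model the caller of schedule() learns the result only after something else (here drain()) has executed the run and called complete().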
class mistral.engine.actions.AdHocAction(action_def, action_ex=None, task_ex=None)

Bases: mistral.engine.actions.PythonAction

Ad-hoc action.

validate_input(input_dict)
class mistral.engine.actions.PythonAction(action_def, action_ex=None, task_ex=None)

Bases: mistral.engine.actions.Action

Regular Python action.

complete(*args, **kwargs)
is_sync(input_dict)
run(*args, **kwargs)
schedule(*args, **kwargs)
validate_input(input_dict)
class mistral.engine.actions.WorkflowAction(action_def, action_ex=None, task_ex=None)

Bases: mistral.engine.actions.Action

Workflow action.

complete(*args, **kwargs)
is_sync(input_dict)
run(*args, **kwargs)
schedule(*args, **kwargs)
validate_input(input_dict)

mistral.engine.base module

class mistral.engine.base.Engine

Bases: object

Engine interface.

on_action_complete(action_ex_id, result, wf_action=False, async_=False)

Accepts action result and continues the workflow.

The action execution result here is the result that comes from the action or workflow associated with the task.

Parameters:
  • action_ex_id – Action execution id.
  • result – Action/workflow result. Instance of mistral.workflow.base.Result
  • wf_action – If True it means that the given id points to a workflow execution rather than action execution. It happens when a nested workflow execution sends its result to a parent workflow.
  • async_ – If True, run action in asynchronous mode (without waiting for completion).
Returns:

Action (or workflow, if wf_action=True) execution object.

pause_workflow(wf_ex_id)

Pauses workflow.

Parameters:wf_ex_id – Execution id.
Returns:Workflow execution object.
rerun_workflow(task_ex_id, reset=True, env=None)

Rerun workflow from the specified task.

Parameters:
  • task_ex_id – Task execution id.
  • reset – If True, reset task state including deleting its action executions.
  • env – Workflow environment.
Returns:

Workflow execution object.

resume_workflow(wf_ex_id, env=None)

Resumes workflow.

Parameters:
  • wf_ex_id – Execution id.
  • env – Workflow environment.
Returns:

Workflow execution object.

rollback_workflow(wf_ex_id)

Rolls back workflow execution.

Parameters:wf_ex_id – Execution id.
Returns:Workflow execution object.
start_action(action_name, action_input, description=None, **params)

Starts the specific action.

Parameters:
  • action_name – Action name.
  • action_input – Action input data as a dictionary.
  • description – Execution description.
  • params – Additional options for action running.
Returns:

Action execution object.

start_workflow(wf_identifier, wf_input, description='', **params)

Starts the specified workflow.

Parameters:
  • wf_identifier – Workflow ID or name. Workflow ID is recommended; use of the workflow name is to be deprecated as of Mitaka.
  • wf_input – Workflow input data as a dictionary.
  • description – Execution description.
  • params – Additional workflow type specific parameters.
Returns:

Workflow execution object.

stop_workflow(wf_ex_id, state, message)

Stops workflow.

Parameters:
  • wf_ex_id – Workflow execution id.
  • state – State assigned to the workflow. Permitted states are SUCCESS or ERROR.
  • message – Optional information string.
Returns:

Workflow execution.
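
As a rough illustration of how the workflow lifecycle methods of this interface relate to each other, the sketch below tracks execution state in memory. It is not the Mistral implementation: InMemoryEngine and the dictionary layout are hypothetical, and the RUNNING and PAUSED state names are assumptions (only SUCCESS and ERROR are named above).

```python
import uuid

# SUCCESS and ERROR come from the stop_workflow description; RUNNING and
# PAUSED are assumed names used only for this illustration.
RUNNING, PAUSED, SUCCESS, ERROR = "RUNNING", "PAUSED", "SUCCESS", "ERROR"


class InMemoryEngine:
    """Hypothetical engine that only tracks workflow execution state."""

    def __init__(self):
        self.executions = {}

    def start_workflow(self, wf_identifier, wf_input, description="", **params):
        wf_ex = {
            "id": str(uuid.uuid4()),
            "workflow": wf_identifier,
            "input": dict(wf_input),
            "description": description,
            "params": params,
            "state": RUNNING,
            "state_info": None,
        }
        self.executions[wf_ex["id"]] = wf_ex
        return wf_ex

    def pause_workflow(self, wf_ex_id):
        wf_ex = self.executions[wf_ex_id]
        if wf_ex["state"] == RUNNING:
            wf_ex["state"] = PAUSED
        return wf_ex

    def resume_workflow(self, wf_ex_id, env=None):
        wf_ex = self.executions[wf_ex_id]
        if wf_ex["state"] == PAUSED:
            if env:
                # Merge the supplied environment into the stored one.
                wf_ex["params"].setdefault("env", {}).update(env)
            wf_ex["state"] = RUNNING
        return wf_ex

    def stop_workflow(self, wf_ex_id, state, message=None):
        if state not in (SUCCESS, ERROR):
            raise ValueError("state must be SUCCESS or ERROR")
        wf_ex = self.executions[wf_ex_id]
        wf_ex["state"] = state
        wf_ex["state_info"] = message
        return wf_ex


engine = InMemoryEngine()
wf_ex = engine.start_workflow("my_workflow", {"x": 1}, description="demo")
states = [wf_ex["state"]]
states.append(engine.pause_workflow(wf_ex["id"])["state"])
states.append(engine.resume_workflow(wf_ex["id"], env={"k": "v"})["state"])
states.append(engine.stop_workflow(wf_ex["id"], SUCCESS, "done")["state"])
print(states)
```

Each method returns the execution object, mirroring the "Returns" entries above; everything else about the stored structure is illustrative.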

class mistral.engine.base.EventEngine

Bases: object

Action event trigger interface.

create_event_trigger(trigger, events)
delete_event_trigger(trigger, events)
class mistral.engine.base.Executor

Bases: object

Action executor interface.

run_action(action_ex_id, action_class_str, attributes, action_params, safe_rerun, redelivered=False)

Runs action.

Parameters:
  • action_ex_id – Corresponding action execution id.
  • action_class_str – Path to action class in dot notation.
  • attributes – Attributes to be set on the action class.
  • action_params – Action parameters.
  • safe_rerun – Tells if given action can be safely rerun.
  • redelivered – Tells if given action was run before on another executor.
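
The action_class_str, attributes, and action_params parameters suggest a load-configure-instantiate sequence. The sketch below shows one way to do that with the standard library. It is an assumption about the general technique, not the executor's actual code: load_class and build_action are hypothetical helpers, and datetime.timedelta merely stands in for an action class.

```python
import importlib


def load_class(dotted_path):
    """Resolve 'package.module.ClassName' to the class object."""
    module_name, _, class_name = dotted_path.rpartition(".")
    if not module_name:
        raise ValueError("expected a dotted path, got %r" % dotted_path)
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


def build_action(action_class_str, attributes, action_params):
    base = load_class(action_class_str)
    # Assumption: attributes are applied to a throwaway subclass so that
    # the imported class itself is not mutated.
    cls = type(base.__name__, (base,), dict(attributes))
    return cls(**action_params)


# datetime.timedelta is used here only because it is always importable.
action = build_action(
    "datetime.timedelta",
    {"label": "demo"},
    {"hours": 1, "minutes": 30},
)
print(action.total_seconds(), action.label)
```

Applying the attributes to a subclass is a design choice of this sketch; setting them directly on the loaded class would also match the parameter description.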
class mistral.engine.base.TaskPolicy

Bases: object

Task policy.

Provides an interface for performing any work after a task has completed. One example of a task policy is the ‘retry’ policy, which makes the engine run a task repeatedly if it finishes with a failure.

after_task_complete(task_ex, task_spec)

Called right after task completes.

Parameters:
  • task_ex – Completed task DB model.
  • task_spec – Completed task specification.
before_task_start(task_ex, task_spec)

Called right before the task starts.

Parameters:
  • task_ex – DB model for task that is about to start.
  • task_spec – Task specification.
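
To show how the two hooks bracket a task, here is a minimal sketch under stated assumptions: the TaskPolicy class below is a local stand-in with no-op hooks, AuditPolicy and run_task are hypothetical, and a plain dictionary replaces the task DB model.

```python
class TaskPolicy:
    """Local stand-in for the interface above, with no-op hooks."""

    def before_task_start(self, task_ex, task_spec):
        pass

    def after_task_complete(self, task_ex, task_spec):
        pass


class AuditPolicy(TaskPolicy):
    """Hypothetical policy that records when each hook fires."""

    def __init__(self):
        self.events = []

    def before_task_start(self, task_ex, task_spec):
        self.events.append(("before", task_ex["name"]))

    def after_task_complete(self, task_ex, task_spec):
        self.events.append(("after", task_ex["name"], task_ex["state"]))


def run_task(task_ex, task_spec, policies, body):
    # Hypothetical driver: call every policy before the task body runs
    # and again after it has produced a final state.
    for policy in policies:
        policy.before_task_start(task_ex, task_spec)
    task_ex["state"] = body()
    for policy in policies:
        policy.after_task_complete(task_ex, task_spec)
    return task_ex


audit = AuditPolicy()
task = run_task({"name": "task1", "state": "IDLE"}, {}, [audit], lambda: "SUCCESS")
print(audit.events)
```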

mistral.engine.default_engine module

class mistral.engine.default_engine.DefaultEngine

Bases: mistral.engine.base.Engine

on_action_complete(*args, **kw)
pause_workflow(wf_ex_id)
rerun_workflow(*args, **kw)
resume_workflow(*args, **kw)
rollback_workflow(wf_ex_id)
start_action(*args, **kw)
start_workflow(*args, **kw)
stop_workflow(wf_ex_id, state, message=None)

mistral.engine.default_executor module

class mistral.engine.default_executor.DefaultExecutor

Bases: mistral.engine.base.Executor

run_action(*args, **kwargs)

Runs action.

Parameters:
  • action_ex_id – Action execution id.
  • action_class_str – Path to action class in dot notation.
  • attributes – Attributes to be set on the action class.
  • action_params – Action parameters.
  • safe_rerun – Tells if given action can be safely rerun.
  • redelivered – Tells if given action was run before on another executor.

mistral.engine.dispatcher module

mistral.engine.engine_server module

class mistral.engine.engine_server.EngineServer(engine, setup_profiler=True)

Bases: mistral.service.base.MistralService

Engine server.

This class manages engine life-cycle and gets registered as an RPC endpoint to process engine specific calls. It also registers a cluster member associated with this instance of engine.

on_action_complete(rpc_ctx, action_ex_id, result, wf_action)

Receives RPC calls to communicate action result to engine.

Parameters:
  • rpc_ctx – RPC request context.
  • action_ex_id – Action execution id.
  • result – Action result data.
  • wf_action – True if given id points to a workflow execution.
Returns:

Action execution.

pause_workflow(rpc_ctx, execution_id)

Receives calls over RPC to pause workflows on engine.

Parameters:
  • rpc_ctx – Request context.
  • execution_id – Workflow execution id.
Returns:

Workflow execution.

rerun_workflow(rpc_ctx, task_ex_id, reset=True, env=None)

Receives calls over RPC to rerun workflows on engine.

Parameters:
  • rpc_ctx – RPC request context.
  • task_ex_id – Task execution id.
  • reset – If true, then purge action execution for the task.
  • env – Environment variables to update.
Returns:

Workflow execution.

resume_workflow(rpc_ctx, wf_ex_id, env=None)

Receives calls over RPC to resume workflows on engine.

Parameters:
  • rpc_ctx – RPC request context.
  • wf_ex_id – Workflow execution id.
  • env – Environment variables to update.
Returns:

Workflow execution.

rollback_workflow(rpc_ctx, execution_id)

Receives calls over RPC to rollback workflows on engine.

Parameters:
  • rpc_ctx – RPC request context.
  • execution_id – Workflow execution id.
Returns:Workflow execution.
start()
start_action(rpc_ctx, action_name, action_input, description, params)

Receives calls over RPC to start actions on engine.

Parameters:
  • rpc_ctx – RPC request context.
  • action_name – name of the Action.
  • action_input – input dictionary for Action.
  • description – description of new Action execution.
  • params – extra parameters to run Action.
Returns:

Action execution.

start_workflow(rpc_ctx, workflow_identifier, workflow_input, description, params)

Receives calls over RPC to start workflows on engine.

Parameters:
  • rpc_ctx – RPC request context.
  • workflow_identifier – Workflow definition identifier.
  • workflow_input – Workflow input.
  • description – Workflow execution description.
  • params – Additional workflow type specific parameters.
Returns:

Workflow execution.

stop(graceful=False)
stop_workflow(rpc_ctx, execution_id, state, message=None)

Receives calls over RPC to stop workflows on engine.

Sets execution state to SUCCESS or ERROR. No more tasks will be scheduled. Running tasks won’t be killed, but their results will be ignored.

Parameters:
  • rpc_ctx – RPC request context.
  • execution_id – Workflow execution id.
  • state – State assigned to the workflow. Permitted states are SUCCESS or ERROR.
  • message – Optional information string.
Returns:

Workflow execution.

mistral.engine.executor_server module

class mistral.engine.executor_server.ExecutorServer(executor, setup_profiler=True)

Bases: mistral.service.base.MistralService

Executor server.

This class manages executor life-cycle and gets registered as an RPC endpoint to process executor specific calls. It also registers a cluster member associated with this instance of executor.

run_action(rpc_ctx, action_ex_id, action_class_str, attributes, params, safe_rerun)

Receives calls over RPC to run action on executor.

Parameters:
  • rpc_ctx – RPC request context dictionary.
  • action_ex_id – Action execution id.
  • action_class_str – Action class name.
  • attributes – Action class attributes.
  • params – Action input parameters.
  • safe_rerun – Tells if given action can be safely rerun.
Returns:

Action result.

start()
stop(graceful=False)

mistral.engine.policies module

class mistral.engine.policies.ConcurrencyPolicy(concurrency)

Bases: mistral.engine.base.TaskPolicy

before_task_start(task_ex, task_spec)
class mistral.engine.policies.PauseBeforePolicy(expression)

Bases: mistral.engine.base.TaskPolicy

before_task_start(task_ex, task_spec)
class mistral.engine.policies.RetryPolicy(count, delay, break_on, continue_on)

Bases: mistral.engine.base.TaskPolicy

after_task_complete(task_ex, task_spec)

Possible cases:

  1. state = SUCCESS: if continue_on is not specified, there is no need to move to the next iteration. If current:count has reached retry:count, the policy breaks the loop (regardless of the continue_on condition). Otherwise, the continue_on condition is checked: if it is True, the next iteration is scheduled; otherwise the policy breaks the loop.
  2. retry:count = 5, current:count = 2, state = ERROR: the state becomes IDLE/DELAYED and current:count = 3.
  3. retry:count = 5, current:count = 4, state = ERROR: iterations are complete, therefore state = #{state} and current:count = 4.
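
The cases above can be condensed into a small decision function. This is a sketch under stated assumptions, not the policy's actual code: retry_decision is a hypothetical name, break_on is not modelled because the cases do not describe it, and the boundary check (retry only while current_count + 1 < retry_count) is inferred from cases 2 and 3.

```python
def retry_decision(state, current_count, retry_count, continue_on=None):
    """Return "retry" or "break" for one completed iteration.

    continue_on is None when the condition is not specified; otherwise
    it is the already evaluated boolean result of the condition.
    """
    # Assumed boundary, inferred from cases 2 and 3: with retry_count = 5,
    # current_count = 2 retries and current_count = 4 stops.
    iterations_left = current_count + 1 < retry_count

    if state == "SUCCESS":
        if continue_on is None:
            return "break"   # nothing asks for another iteration
        if not iterations_left:
            return "break"   # count exhausted, continue_on is ignored
        return "retry" if continue_on else "break"

    if state == "ERROR":
        return "retry" if iterations_left else "break"

    # Any other state is left alone in this sketch.
    return "break"


print(retry_decision("ERROR", 2, 5))
print(retry_decision("ERROR", 4, 5))
print(retry_decision("SUCCESS", 0, 5, continue_on=True))
```

On "retry", case 2 indicates that the task state becomes IDLE/DELAYED and the counter is incremented; on "break", the state and counter are left as they are.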

class mistral.engine.policies.TimeoutPolicy(timeout_sec)

Bases: mistral.engine.base.TaskPolicy

before_task_start(task_ex, task_spec)
class mistral.engine.policies.WaitAfterPolicy(delay)

Bases: mistral.engine.base.TaskPolicy

after_task_complete(task_ex, task_spec)
class mistral.engine.policies.WaitBeforePolicy(delay)

Bases: mistral.engine.base.TaskPolicy

before_task_start(task_ex, task_spec)

mistral.engine.task_handler module

mistral.engine.tasks module

class mistral.engine.tasks.RegularTask(wf_ex, wf_spec, task_spec, ctx, task_ex=None, unique_key=None, waiting=False)

Bases: mistral.engine.tasks.Task

Regular task.

Takes care of processing regular tasks with one action.

on_action_complete(*args, **kwargs)
run(*args, **kwargs)
class mistral.engine.tasks.Task(wf_ex, wf_spec, task_spec, ctx, task_ex=None, unique_key=None, waiting=False)

Bases: object

Task.

Represents a workflow task and defines the interface that the Mistral engine or its components can use to manipulate tasks.

complete(*args, **kwargs)

Complete task and set specified state.

Method sets specified task state and runs all necessary post completion logic such as publishing workflow variables and scheduling new workflow commands.

Parameters:
  • state – New task state.
  • state_info – New state information (e.g. an error message).
defer(*args, **kwargs)

Defers task.

This method puts the task into a waiting state.

is_completed()
is_created()
is_state_changed()
is_waiting()
on_action_complete(action_ex)

Handle action completion.

Parameters:action_ex – Action execution.
reset()
run()

Runs task.

set_state(*args, **kwargs)

Sets task state without executing post completion logic.

Parameters:
  • state – New task state.
  • state_info – New state information (e.g. an error message).
  • processed – New “processed” flag value.
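
The distinction between complete() and set_state() can be sketched as follows. ToyTask is a hypothetical stand-in, and the "post completion logic" is reduced to two placeholder lists; the real logic (publishing workflow variables, scheduling new workflow commands) is not reproduced here.

```python
class ToyTask:
    """Hypothetical stand-in for a task; not the Mistral class."""

    def __init__(self, name):
        self.name = name
        self.state = "RUNNING"
        self.state_info = None
        self.processed = False
        self.published = {}      # placeholder for published variables
        self.next_commands = []  # placeholder for scheduled commands

    def set_state(self, state, state_info, processed=None):
        # Only updates the state fields; no post completion logic.
        self.state = state
        self.state_info = state_info
        if processed is not None:
            self.processed = processed

    def complete(self, state, state_info=None):
        # Sets the state and then runs the post completion steps.
        self.set_state(state, state_info)
        self.published[self.name] = state
        self.next_commands.append("after:" + self.name)


a = ToyTask("a")
a.set_state("ERROR", "boom", processed=True)
b = ToyTask("b")
b.complete("SUCCESS")
print(a.state, a.published, a.next_commands)
print(b.state, b.published, b.next_commands)
```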
class mistral.engine.tasks.WithItemsTask(wf_ex, wf_spec, task_spec, ctx, task_ex=None, unique_key=None, waiting=False)

Bases: mistral.engine.tasks.RegularTask

With-items task.

Takes care of processing “with-items” tasks.

is_with_items_completed()
on_action_complete(*args, **kwargs)

mistral.engine.utils module

mistral.engine.workflow_handler module

mistral.engine.workflows module

class mistral.engine.workflows.Workflow(wf_def, wf_ex=None)

Bases: object

Workflow.

Represents a workflow and defines the interface that the Mistral engine or its components can use to manipulate workflows.

check_and_complete(*args, **kwargs)

Completes the workflow if it needs to be completed.

The method simply checks whether any tasks are not in a terminal state. If there are none, it performs all the logic needed to finalize the workflow (calculating output, etc.).

Returns:Number of incomplete tasks.
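
The check described here can be sketched in a few lines. This is an illustration only: the dictionary layout, the set of terminal states, and the rule for choosing the final workflow state are all assumptions, not taken from the Mistral source.

```python
# Assumed terminal states; only SUCCESS and ERROR are named in this document.
TERMINAL_STATES = {"SUCCESS", "ERROR"}


def check_and_complete(wf_ex):
    """Finalize wf_ex if every task is terminal; return incomplete count."""
    incomplete = [t for t in wf_ex["tasks"] if t["state"] not in TERMINAL_STATES]

    if not incomplete:
        # Assumed finalization rule: the workflow fails if any task failed.
        failed = any(t["state"] == "ERROR" for t in wf_ex["tasks"])
        wf_ex["state"] = "ERROR" if failed else "SUCCESS"
        wf_ex["output"] = {t["name"]: t.get("result") for t in wf_ex["tasks"]}

    return len(incomplete)


wf_ex = {
    "state": "RUNNING",
    "tasks": [
        {"name": "t1", "state": "SUCCESS", "result": 1},
        {"name": "t2", "state": "RUNNING"},
    ],
}
first = check_and_complete(wf_ex)   # t2 is still running
wf_ex["tasks"][1].update(state="SUCCESS", result=2)
second = check_and_complete(wf_ex)  # all tasks are terminal now
print(first, second, wf_ex["state"], wf_ex["output"])
```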

lock(*args, **kwargs)
rerun(task_ex, reset=True, env=None)

Rerun workflow from the given task.

Parameters:
  • task_ex – Task execution that the workflow needs to rerun from.
  • reset – If True, reset task state including deleting its action executions.
  • env – Environment.
resume(env=None)

Resume workflow.

Parameters:env – Environment.
set_state(*args, **kwargs)
start(*args, **kwargs)

Start workflow.

Parameters:
  • input_dict – Workflow input.
  • desc – Workflow execution description.
  • params – Workflow type specific parameters.
stop(state, msg=None)

Stop workflow.

Parameters:
  • state – New workflow state.
  • msg – Additional explaining message.

Module contents