Overview

This engine schedules tasks to workers, which are separate processes dedicated to executing certain atoms and possibly running on other machines, connected via amqp (or other supported kombu transports).

Note

This engine is under active development; it is usable and does work, but is missing some features (please check the blueprint page for known issues and plans) that would make it more production ready.

Terminology

Client
Code or program or service (or user) that uses this library to define flows and run them via engines.
Transport + protocol
Mechanism (and protocol on top of that mechanism) used to pass information between the client and worker (for example amqp as a transport and a json encoded message format as the protocol).
Executor
The part of the worker-based engine that publishes task requests, so that these requests can be accepted and processed by remote workers.
Worker
Workers are started on remote hosts and each has a list of tasks it can perform (on request). Workers accept and process task requests that are published by an executor. Several requests can be processed simultaneously in separate threads (or processes...). For example, an executor can be passed to the worker and configured to run in as many threads (green or not) as desired.
Proxy
Executors interact with workers via a proxy. The proxy maintains the underlying transport and publishes messages (and invokes callbacks on message reception).

Requirements

  • Transparent: it should work as an ad-hoc replacement for existing (local) engines with minimal, if any, refactoring (e.g. it should be possible to run the same flows on it without changing client code if everything is set up and configured properly).
  • Transport-agnostic: the means of transport should be abstracted so that we can use oslo.messaging, gearmand, amqp, zookeeper, marconi, websockets or anything else that allows for passing information between a client and a worker.
  • Simple: it should be simple to write and deploy.
  • Non-uniformity: it should support non-uniform workers, allowing different workers to execute different sets of atoms depending on each worker's published capabilities.

Design

There are two communication sides, the executor (and associated engine derivative) and the worker, which communicate using a proxy component. The proxy is designed to accept/publish messages from/into a named exchange.

High level architecture

[Image: worker-engine.svg (high level architecture of the worker-based engine)]

Executor and worker communication

Let's consider how communication between an executor and a worker happens. First of all the engine resolves all atom dependencies and schedules the atoms that can be performed at the moment. This uses the same scheduling and dependency resolution logic that is used by every other engine type. The atoms that can be executed immediately (ones that depend on the outputs of other tasks will be executed once those outputs are ready) are then executed by the worker-based engine executor in the following manner:

  1. The executor initiates task execution/reversion using a proxy object.
  2. The proxy publishes the task request (format is described below) into a named exchange using a routing key that delivers the request to a particular worker's topic. The executor then waits for the task request to be accepted and confirmed by a worker. If the executor does not get a confirmation from a worker within the given timeout the task is considered timed out and a timeout exception is raised (a sketch of the request that gets published is shown after this list).
  3. A worker receives a request message and starts a new thread for processing it.
    1. The worker dispatches the request (gets the desired endpoint that actually executes the task).
    2. If dispatch succeeds then the worker sends a confirmation response to the executor; otherwise the worker sends a failure response along with a serialized failure object that describes what failed (and why).
    3. The worker executes the task and, once it has finished, sends the result back to the originating executor (every time a task progress event is triggered, a progress notification is sent to the executor, where it is handled by the engine and dispatched to listeners and so on).
  4. The executor gets the task request confirmation from the worker and the task request state changes from PENDING to RUNNING. Once a task request is in the RUNNING state it can no longer be timed out (considering that the task execution process may take an unpredictable amount of time).
  5. The executor gets the task execution result from the worker and passes it back to the worker-based engine to finish task processing (this repeats for subsequent tasks).
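
Example (a minimal sketch of what gets published in step 2; the task object and its task_uuid are assumed to already exist, having been created by the engine):

from taskflow.engines.worker_based import protocol as pr

# 'execute' matches the action enum in the request schema shown below.
request = pr.Request(task, task_uuid, 'execute', {'x': 111}, timeout=60)

# The proxy publishes this json-serializable payload to the worker's topic.
payload = request.to_dict()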

Note

Failure objects are not directly json-serializable (they contain references to tracebacks which are not serializable), so they are converted to dicts before sending and converted from dicts after receiving on both executor & worker sides (this translation is lossy since the traceback can’t be fully retained, due to its contents containing internal interpreter references and details).
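
Example (a small sketch of this round-trip using taskflow.types.failure.Failure; note that the traceback detail is not fully recoverable on the receiving side):

from taskflow.types import failure

try:
    raise RuntimeError('Woot!')
except Exception:
    fail = failure.Failure()  # captures the active exception information

# Converted into a json-serializable dict before sending...
fail_dict = fail.to_dict()

# ...and (lossily) reconstituted after receiving.
fail_again = failure.Failure.from_dict(fail_dict)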

Protocol

taskflow.engines.worker_based.protocol.make_an_event(new_state)[source]

Turns a new/target state into an event name.

taskflow.engines.worker_based.protocol.build_a_machine(freeze=True)[source]

Builds a state machine that requests are allowed to go through.

taskflow.engines.worker_based.protocol.failure_to_dict(failure)[source]

Attempts to convert a failure object into a jsonifyable dictionary.
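
Example (a small sketch of these helpers; it assumes the protocol module exposes the state constants shown in the request state table below):

from taskflow.engines.worker_based import protocol as pr

# Build the (frozen) state machine that validates request transitions.
machine = pr.build_a_machine()

# Derive the event name used to transition into a given target state.
event = pr.make_an_event(pr.RUNNING)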

class taskflow.engines.worker_based.protocol.Message[source]

Bases: object

Base class for all message types.

to_dict()[source]

Return json-serializable message representation.

class taskflow.engines.worker_based.protocol.Notify(**data)[source]

Bases: taskflow.engines.worker_based.protocol.Message

Represents notify message type.

TYPE = 'NOTIFY'

String constant representing this message type.

RESPONSE_SCHEMA = {
    'type': 'object',
    'additionalProperties': False,
    'required': ['topic', 'tasks'],
    'properties': {
        'topic': {'type': 'string'},
        'tasks': {'type': 'array', 'items': {'type': 'string'}},
    },
}

Expected notify response message schema (in json schema format).

SENDER_SCHEMA = {'type': 'object', 'additionalProperties': False}

Expected sender request message schema (in json schema format).

class taskflow.engines.worker_based.protocol.Request(task, uuid, action, arguments, timeout=60, result=<object object>, failures=None)[source]

Bases: taskflow.engines.worker_based.protocol.Message

Represents request with execution results.

Every request is created in the WAITING state and is expired within the given timeout if it does not transition out of the (WAITING, PENDING) states.

State machine a request goes through as it progresses (or expires):

+------------+------------+---------+----------+---------+
|   Start    |   Event    |   End   | On Enter | On Exit |
+------------+------------+---------+----------+---------+
| FAILURE[$] |     .      |    .    |    .     |    .    |
|  PENDING   | on_failure | FAILURE |    .     |    .    |
|  PENDING   | on_running | RUNNING |    .     |    .    |
|  RUNNING   | on_failure | FAILURE |    .     |    .    |
|  RUNNING   | on_success | SUCCESS |    .     |    .    |
| SUCCESS[$] |     .      |    .    |    .     |    .    |
| WAITING[^] | on_failure | FAILURE |    .     |    .    |
| WAITING[^] | on_pending | PENDING |    .     |    .    |
+------------+------------+---------+----------+---------+

TYPE = 'REQUEST'

String constant representing this message type.

SCHEMA = {
    'type': 'object',
    'required': ['task_cls', 'task_name', 'task_version', 'action'],
    'properties': {
        'task_cls': {'type': 'string'},
        'task_name': {'type': 'string'},
        'task_version': {'oneOf': [{'type': 'string'}, {'type': 'array'}]},
        'action': {'type': 'string', 'enum': ['revert', 'execute']},
        'arguments': {'type': 'object'},
        'failures': {'type': 'object'},
        'result': {},
    },
}

Expected message schema (in json schema format).

current_state

Current state the request is in.

set_result(result)[source]

Sets the response future's result.

expired

Check if request has expired.

When a new request is created its state is set to WAITING, its creation time is stored and a timeout is given via the constructor arguments.

A request is considered to be expired when it has been in the WAITING/PENDING state for more than the given timeout (it is not considered to be expired in any other state).

to_dict()[source]

Return json-serializable request.

For requests that have failed due to some exception, this will convert all failure.Failure objects into dictionaries (which will then be reconstituted by the receiver).

transition_and_log_error(new_state, logger=None)[source]

Transitions and logs an error if the transition raises.

This overlays the transition function and performs nearly the same functionality but, instead of raising if the transition was not valid, it logs a warning to the provided logger and returns False to indicate that the transition was not performed (note that this is different from the transition function, where False means ignored).

transition(*args, **kwargs)[source]

Transitions the request to a new state.

If the transition was performed, it returns True. If the transition was ignored, it returns False. If the transition was not valid (and will not be performed), it raises an InvalidState exception.
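
For example, given a request object req freshly created in the WAITING state, the state table above implies the following behavior (a sketch; it assumes the state constants are exposed on the protocol module):

from taskflow.engines.worker_based import protocol as pr

req.transition(pr.PENDING)  # WAITING -> PENDING is valid, returns True
req.transition(pr.RUNNING)  # PENDING -> RUNNING is valid, returns True
req.transition(pr.PENDING)  # RUNNING -> PENDING is not in the table,
                            # so this raises InvalidState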

static from_dict(data, task_uuid=None)[source]

Parses validated data into a work unit.

All Failure objects that have been converted to dict(s) on the remote side will now be converted back to taskflow.types.failure.Failure objects.

class taskflow.engines.worker_based.protocol.Response(state, **data)[source]

Bases: taskflow.engines.worker_based.protocol.Message

Represents response message type.

TYPE = 'RESPONSE'

String constant representing this message type.

SCHEMA = {
    'type': 'object',
    'additionalProperties': False,
    'required': ['state', 'data'],
    'properties': {
        'state': {'type': 'string',
                  'enum': ['WAITING', 'PENDING', 'RUNNING',
                           'SUCCESS', 'FAILURE', 'EVENT']},
        'data': {'anyOf': [{'$ref': '#/definitions/event'},
                           {'$ref': '#/definitions/completion'},
                           {'$ref': '#/definitions/empty'}]},
    },
    'definitions': {
        'event': {'type': 'object',
                  'additionalProperties': False,
                  'required': ['event_type', 'details'],
                  'properties': {'event_type': {'type': 'string'},
                                 'details': {'type': 'object'}}},
        'completion': {'type': 'object',
                       'additionalProperties': False,
                       'required': ['result'],
                       'properties': {'result': {}}},
        'empty': {'type': 'object', 'additionalProperties': False},
    },
}

Expected message schema (in json schema format).

Examples

Request (execute)

  • task_name - full task name to be performed
  • task_cls - full task class name to be performed
  • action - task action to be performed (e.g. execute, revert)
  • arguments - arguments the task action will be called with
  • result - task execution result (result or Failure) [passed to revert only]

Additionally, the following parameters are added to the request message:

  • reply_to - name of the executor exchange that workers will send responses back to
  • correlation_id - executor request id (since there can be multiple requests being processed simultaneously)

Example:

{
    "action": "execute",
    "arguments": {
        "x": 111
    },
    "task_cls": "taskflow.tests.utils.TaskOneArgOneReturn",
    "task_name": "taskflow.tests.utils.TaskOneArgOneReturn",
    "task_version": [
        1,
        0
    ]
}

Request (revert)

When reverting:

{
    "action": "revert",
    "arguments": {},
    "failures": {
        "taskflow.tests.utils.TaskWithFailure": {
            "exc_type_names": [
                "RuntimeError",
                "StandardError",
                "Exception"
            ],
            "exception_str": "Woot!",
            "traceback_str": "  File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n    result = task.execute(**arguments)\n  File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n    raise RuntimeError('Woot!')\n",
            "version": 1
        }
    },
    "result": [
        "failure",
        {
            "exc_type_names": [
                "RuntimeError",
                "StandardError",
                "Exception"
            ],
            "exception_str": "Woot!",
            "traceback_str": "  File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n    result = task.execute(**arguments)\n  File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n    raise RuntimeError('Woot!')\n",
            "version": 1
        }
    ],
    "task_cls": "taskflow.tests.utils.TaskWithFailure",
    "task_name": "taskflow.tests.utils.TaskWithFailure",
    "task_version": [
        1,
        0
    ]
}

Worker response(s)

When running:

{
    "data": {},
    "state": "RUNNING"
}

When progressing:

{
    "details": {
        "progress": 0.5
    },
    "event_type": "update_progress",
    "state": "EVENT"
}

When succeeded:

{
    "data": {
        "result": 666
    },
    "state": "SUCCESS"
}

When failed:

{
    "data": {
        "result": {
            "exc_type_names": [
                "RuntimeError",
                "StandardError",
                "Exception"
            ],
            "exception_str": "Woot!",
            "traceback_str": "  File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n    result = task.execute(**arguments)\n  File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n    raise RuntimeError('Woot!')\n",
            "version": 1
        }
    },
    "state": "FAILURE"
}

Request state transitions

[Image: WBE request state transition diagram]

WAITING - Request placed on queue (or other kombu message bus/transport) but not yet consumed.

PENDING - A worker has accepted the request and is pending to run it using its executor (threads, processes, or other).

FAILURE - The worker failed while running the request (due to a task exception) or no worker moved/started executing the request (by placing it into the RUNNING state) within the specified time span (this defaults to 60 seconds unless overridden).

RUNNING - The worker's executor (using threads, processes...) has started to run the requested task (once this state is reached any request timeout no longer applies; at this point it is unknown how long the task will run, since it can not be determined whether a task is just taking a long time or has failed).

SUCCESS - Worker finished running task without exception.

Note

During the WAITING and PENDING stages the engine keeps track of how long the request has been alive and, if a timeout is reached, the request will automatically transition to FAILURE and any further transitions from a worker will be disallowed (for example, if a worker accepts the request in the future and sets the task to PENDING, this transition will be logged and ignored). This timeout can be adjusted and/or removed by setting the engine transition_timeout option to a higher/lower value or by setting it to None (to remove the timeout completely). In the future this will be improved to be more dynamic by implementing the blueprints associated with failover and info/resilience.
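
Example (a sketch showing how to remove the timeout completely; flow is assumed to be an already constructed flow):

import taskflow.engines

eng = taskflow.engines.load(flow, engine='worker-based',
                            url='amqp://guest:guest@localhost:5672//',
                            exchange='test-exchange',
                            topics=['test-tasks'],
                            transition_timeout=None)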

Usage

Workers

To use the worker-based engine a set of workers must first be established on remote machines. These workers must be provided a list of task objects, task names or module names (or entrypoints that can be examined for valid tasks) that they can respond to (this is done so that arbitrary code execution is not possible).

For complete parameters and object usage please visit Worker.

Example:

from taskflow.engines.worker_based import worker as w

config = {
    # Transport url used to connect to the broker.
    'url': 'amqp://guest:guest@localhost:5672//',
    # Exchange shared between engines and workers.
    'exchange': 'test-exchange',
    # Topic this worker listens on for requests.
    'topic': 'test-tasks',
    # Tasks this worker is allowed to execute.
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
}
worker = w.Worker(**config)
worker.run()

Engines

To use the worker-based engine a flow must be constructed (which contains tasks that are visible on remote machines) and the specific worker-based engine entrypoint must be selected. Certain configuration options must also be provided so that the transport backend can be configured and initialized correctly. Otherwise the usage should be mostly transparent (and is nearly identical to using any other engine type).

For complete parameters and object usage please see WorkerBasedActionEngine.

Example with amqp transport:

import taskflow.engines
from taskflow.patterns import linear_flow as lf

flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine='worker-based',
                            url='amqp://guest:guest@localhost:5672//',
                            exchange='test-exchange',
                            topics=['topic1', 'topic2'])
eng.run()

Example with filesystem transport:

import taskflow.engines
from taskflow.patterns import linear_flow as lf

flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine='worker-based',
                            exchange='test-exchange',
                            topics=['topic1', 'topic2'],
                            transport='filesystem',
                            transport_options={
                                'data_folder_in': '/tmp/in',
                                'data_folder_out': '/tmp/out',
                            })
eng.run()

Additional supported keyword arguments:

  • executor: a class that provides a WorkerTaskExecutor interface; it will be used for executing, reverting and waiting for remote tasks.

Limitations

  • Atoms inside a flow must receive and accept parameters only in the ways defined in persistence. In other words, the task that is created when a workflow is constructed will not be the same task that is executed on a remote worker (and any internal state not passed via the input and output mechanism can not be transferred). This means resource objects (database handles, file descriptors, sockets, ...) can not be directly sent across to remote workers (instead the configuration that defines how to fetch/create these objects must be sent).
  • Worker-based engines will in the future be able to run lightweight tasks locally to avoid transport overhead for very simple tasks (currently even lightweight tasks are run remotely, which may be non-performant).
  • Fault detection: currently when a worker acknowledges a task the engine will wait for the task result indefinitely (a task may take an indeterminate amount of time to finish). In the future there needs to be a way to limit the duration of a remote worker's execution (and track worker liveness) and possibly spawn the task on a secondary worker if a timeout is reached (i.e. the first worker has died or stopped responding).

Implementations

class taskflow.engines.worker_based.engine.WorkerBasedActionEngine(flow, flow_detail, backend, options)[source]

Bases: taskflow.engines.action_engine.engine.ActionEngine

Worker based action engine.

Specific backend options (extracted from provided engine options):

Parameters:
  • exchange – broker exchange name in which executor / worker communication is performed
  • url – broker connection url (see format in kombu documentation)
  • topics – list of worker topics to communicate with (this will also be learned by listening to the notifications that workers emit).
  • transport – transport to be used (e.g. amqp, memory, etc.)
  • transition_timeout – numeric value (or None for infinite) to wait for submitted remote requests to transition out of the (PENDING, WAITING) request states. When this timeout expires the task the request was made for will have its result become a RequestTimeout exception instead of its normally returned value (or raised exception).
  • transport_options – transport specific options (see: http://kombu.readthedocs.org/ for what these options imply and are expected to be)
  • retry_options – retry specific options (see: DEFAULT_RETRY_OPTIONS)
  • worker_expiry – numeric value (or negative/zero/None for infinite) that defines the number of seconds to continue to send messages to workers that have not responded back to a prior notification/ping request (this defaults to 60 seconds).
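
Example (a sketch combining several of these options; the values are illustrative and flow is assumed to already exist):

import taskflow.engines

eng = taskflow.engines.load(
    flow, engine='worker-based',
    url='amqp://guest:guest@localhost:5672//',
    exchange='test-exchange',
    topics=['test-tasks'],
    transition_timeout=120,
    retry_options={'max_retries': 5, 'interval_start': 1},
    worker_expiry=120)
eng.run()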

Components

Warning

External usage of internal engine functions, components and modules should be kept to a minimum as they may be altered, refactored or moved to other locations without notice (and without the typical deprecation cycle).

class taskflow.engines.worker_based.dispatcher.Handler(process_message, validator=None)[source]

Bases: object

Component(s) that will be called on reception of messages.

process_message

Main callback that is called to process a received message.

This is only called after the format has been validated (using the validator callback if applicable) and only after the message has been acknowledged.

validator

Optional callback that will be activated before processing.

This callback if present is expected to validate the message and raise InvalidFormat if the message is not valid.

class taskflow.engines.worker_based.dispatcher.TypeDispatcher(type_handlers=None, requeue_filters=None)[source]

Bases: object

Receives messages and dispatches to type specific handlers.

type_handlers

Dictionary of message type -> callback to handle that message.

The callback(s) will be activated by looking for a message property ‘type’ and locating a callback in this dictionary that maps to that type; if one is found it is expected to be a callback that accepts two positional parameters; the first being the message data and the second being the message object. If a callback is not found then the message is rejected and it will be up to the underlying message transport to determine what this means/implies...

requeue_filters

List of filters (callbacks) to request a message to be requeued.

The callback(s) will be activated before the message has been acked and it can be used to instruct the dispatcher to requeue the message instead of processing it. The callback, when called, will be provided two positional parameters; the first being the message data and the second being the message object. Using these provided parameters the filter should return a truthy object if the message should be requeued and a falsey object if it should not.
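
Example (a sketch of wiring a type handler and a requeue filter together; it assumes handlers are wrapped with the Handler class above, that requeue_filters can be appended to directly and that the message object exposes kombu's properties dictionary):

from taskflow.engines.worker_based import dispatcher

def on_notify(data, message):
    # Called with the decoded payload and the (already acked) message.
    print("Worker notify received: %s" % (data,))

d = dispatcher.TypeDispatcher(
    type_handlers={'NOTIFY': dispatcher.Handler(on_notify)})

# Requeue (instead of processing) messages that lack a 'type' property.
d.requeue_filters.append(
    lambda data, message: 'type' not in message.properties)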

on_message(data, message)[source]

This method is called on incoming messages.

class taskflow.engines.worker_based.endpoint.Endpoint(task_cls)[source]

Bases: object

Represents a single task with execute/revert methods.

class taskflow.engines.worker_based.executor.WorkerTaskExecutor(uuid, exchange, topics, transition_timeout=60, url=None, transport=None, transport_options=None, retry_options=None, worker_expiry=60)[source]

Bases: taskflow.engines.action_engine.executor.TaskExecutor

Executes tasks on remote workers.

wait_for_workers(workers=1, timeout=None)[source]

Waits for at least the given number of workers to notify they are ready to do work.

NOTE(harlowja): if a timeout is provided this function will wait until that timeout expires; if the number of workers does not reach the desired amount before the timeout expires then this will return how many workers are still needed, otherwise it will return zero.
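
Example (a sketch; executor is assumed to be an already created WorkerTaskExecutor):

executor.start()

# Wait up to 10 seconds for at least 2 workers to announce themselves.
still_needed = executor.wait_for_workers(workers=2, timeout=10)
if still_needed:
    print("Timed out; still waiting on %s worker(s)" % still_needed)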

start()[source]

Starts message processing thread.

stop()[source]

Stops message processing thread.

class taskflow.engines.worker_based.proxy.Proxy(topic, exchange, type_handlers=None, on_wait=None, url=None, transport=None, transport_options=None, retry_options=None)[source]

Bases: object

A proxy processes messages from/to the named exchange.

For internal usage only (not for public consumption).

DEFAULT_RETRY_OPTIONS = {'interval_step': 1, 'max_retries': 3, 'interval_start': 1, 'interval_max': 1}

Settings used (by default) to reconnect under transient failures.

See: http://kombu.readthedocs.org/ (and connection ensure_options) for what these values imply/mean...

dispatcher

Dispatcher internally used to dispatch message(s) that match.

connection_details

Details about the connection (read-only).

is_running

Return whether the proxy is running.

publish(msg, routing_key, reply_to=None, correlation_id=None)[source]

Publish message to the named exchange with given routing key.

start()[source]

Start proxy.

wait()[source]

Wait until proxy is started.

stop()[source]

Stop proxy.

class taskflow.engines.worker_based.worker.Worker(exchange, topic, tasks, executor=None, threads_count=None, url=None, transport=None, transport_options=None, retry_options=None)[source]

Bases: object

Worker that can be started on a remote host for handling task requests.

Parameters:
  • url – broker url
  • exchange – broker exchange name
  • topic – topic name under which the worker is started
  • tasks – list of tasks this worker is capable of performing; items in the list can be one of the following types: 1) a string naming the python module to search for tasks in (or naming a task class); 2) a python module to search for tasks in; 3) a task class object that tasks will be created from
  • executor – custom executor object that can be used for processing requests in separate threads (if not provided one will be created)
  • threads_count – number of threads to be passed to the default executor (used only if an executor is not passed in)
  • transport – transport to be used (e.g. amqp, memory, etc.)
  • transport_options – transport specific options (see: http://kombu.readthedocs.org/ for what these options imply and are expected to be)
  • retry_options – retry specific options (see: DEFAULT_RETRY_OPTIONS)
banner

A banner that can be useful to display before running.

run(display_banner=True, banner_writer=None)[source]

Runs the worker.

wait()[source]

Wait until worker is started.

stop()[source]

Stop worker.
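
Because run() blocks the calling thread, a common pattern is to run the worker in a background thread and use wait()/stop() to control its lifecycle. Example (a sketch reusing the configuration from the usage section above):

import threading

from taskflow.engines.worker_based import worker as w

worker = w.Worker(exchange='test-exchange', topic='test-tasks',
                  tasks=['tasks:TestTask1', 'tasks:TestTask2'],
                  url='amqp://guest:guest@localhost:5672//')

runner = threading.Thread(target=worker.run)
runner.daemon = True
runner.start()

worker.wait()  # blocks until the worker has started

# ... later, when shutting down ...
worker.stop()
runner.join()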

class taskflow.engines.worker_based.types.TopicWorker(topic, tasks, identity=<object object>)[source]

Bases: object

A (read-only) worker and its relevant information + useful methods.

class taskflow.engines.worker_based.types.ProxyWorkerFinder(uuid, proxy, topics, beat_periodicity=5, worker_expiry=60)[source]

Bases: object

Requests and receives responses about workers topic+task details.

total_workers

Number of workers currently known.

wait_for_workers(workers=1, timeout=None)[source]

Waits for at least the given number of workers to notify they are ready to do work.

NOTE(harlowja): if a timeout is provided this function will wait until that timeout expires; if the number of workers does not reach the desired amount before the timeout expires then this will return how many workers are still needed, otherwise it will return zero.

messages_processed

How many notify response messages have been processed.

maybe_publish()[source]

Periodically called to publish a notify message to each topic.

These messages (especially the responses) are how this finder learns about workers and what tasks they can perform (so that workers can then be matched to tasks to run).

process_response(data, message)[source]

Process notify message sent from remote side.

clean()[source]

Cleans out any dead/expired/not responding workers.

Returns how many workers were removed.

reset()[source]

Resets the finder's internal state.

get_worker_for_task(task)[source]

Gets a worker that can perform a given task.