Utilities

Warning

External usage of internal utility functions and modules should be kept to a minimum as they may be altered, refactored or moved to other locations without notice (and without the typical deprecation cycle).

Async

taskflow.utils.async_utils.make_completed_future(result)[source]

Make and return a future completed with a given result.

Banner

taskflow.utils.banner.make_banner(what, chapters)[source]

Makes a taskflow banner string.

For example:

>>> from taskflow.utils import banner
>>> chapters = {
...     'Connection details': {
...         'Topic': 'hello',
...     },
...     'Powered by': {
...         'Executor': 'parallel',
...     },
... }
>>> print(banner.make_banner('Worker', chapters))

This will output:

___    __
 |    |_
 |ask |low v1.26.1
*Worker*
Connection details:
  Topic => hello
Powered by:
  Executor => parallel

Eventlet

taskflow.utils.eventlet_utils.check_for_eventlet(exc=None)[source]

Check if eventlet is available and if not raise a runtime error.

Parameters

exc (exception) – exception to raise instead of raising a runtime error

Iterators

taskflow.utils.iter_utils.fill(it, desired_len, filler=None)[source]

Iterates over a provided iterator up to the desired length.

If the source iterator does not have enough values then the filler value is yielded until the desired length is reached.
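
The padding behavior can be sketched in plain Python. This is an illustrative re-implementation, not taskflow's actual code; the name fill_sketch is hypothetical.

```python
import itertools


def fill_sketch(it, desired_len, filler=None):
    """Yield up to desired_len values from it, padding with filler."""
    produced = 0
    for value in itertools.islice(it, desired_len):
        yield value
        produced += 1
    # The source ran out early: pad with the filler value.
    for _ in range(desired_len - produced):
        yield filler


padded = list(fill_sketch(iter([1, 2]), 4, filler=0))
truncated = list(fill_sketch(iter([1, 2, 3]), 2))
print(padded)
print(truncated)
```

A source that is longer than the desired length is simply cut off, so the result always has exactly desired_len items.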

taskflow.utils.iter_utils.count(it)[source]

Returns how many values are in the iterator (depletes the iterator).

taskflow.utils.iter_utils.generate_delays(delay, max_delay, multiplier=2)[source]

Generator/iterator that provides delay values.

The values it generates increase by a given multiple after each iteration (using the max delay as an upper bound). Negative values will never be generated, and it will iterate forever (i.e. it will never stop generating values).
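
A minimal sketch of this capped, multiplicative growth follows. It is written from the description above rather than copied from taskflow, and generate_delays_sketch is a hypothetical name.

```python
import itertools


def generate_delays_sketch(delay, max_delay, multiplier=2):
    """Yield delays that grow by multiplier, capped at max_delay, forever."""
    current = delay
    while True:
        # Clamp into [0, max_delay] so negative values are never produced.
        current = max(0, min(max_delay, current))
        yield current
        current = current * multiplier


# The generator never stops, so only a bounded slice of it is consumed.
first_six = list(itertools.islice(generate_delays_sketch(1, 10), 6))
print(first_six)  # [1, 2, 4, 8, 10, 10]
```

Because the generator is infinite, callers typically pull one delay per retry attempt (or slice it as shown) instead of converting it to a list directly.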

taskflow.utils.iter_utils.unique_seen(its, seen_selector=None)[source]

Yields unique values from iterator(s) (and retains order).

taskflow.utils.iter_utils.find_first_match(it, matcher, not_found_value=None)[source]

Searches the iterator for the first value for which the matcher callback returns true.

taskflow.utils.iter_utils.while_is_not(it, stop_value)[source]

Yields given values from iterator until stop value is passed.

This uses the is operator to determine equivalency (and not the == operator).
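
The difference between identity and equality matters when the stop value has equal look-alikes. The sketch below is not taskflow's code; while_is_not_sketch is a hypothetical name, and yielding the stop value itself before stopping is an assumption of this sketch.

```python
def while_is_not_sketch(it, stop_value):
    """Yield values until the exact stop_value object has been yielded."""
    for value in it:
        yield value
        # Identity check: an equal but distinct object does not stop iteration.
        if value is stop_value:
            break


stop = [0]
lookalike = [0]  # equal to stop (==) but a different object
values = ["a", lookalike, "b", stop, "c"]
result = list(while_is_not_sketch(values, stop))
print(len(result))
```

Here the look-alike list passes through untouched, and iteration ends only when the very same object as stop is seen.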

taskflow.utils.iter_utils.iter_forever(limit)[source]

Yields values from iterator until a limit is reached.

If the limit is negative, iteration continues forever.

Kazoo

taskflow.utils.kazoo_utils.prettify_failures(failures, limit=-1)[source]

Prettifies a checked commit’s failures (ignores sensitive data…).

exception taskflow.utils.kazoo_utils.KazooTransactionException(message, failures)[source]

Bases: kazoo.exceptions.KazooException

Exception raised when a checked commit fails.

taskflow.utils.kazoo_utils.checked_commit(txn)[source]

Commits a kazoo transaction and validates the result.

NOTE(harlowja): Until https://github.com/python-zk/kazoo/pull/224 is fixed or a similar pull request is merged we have to work around the transaction failing silently.

taskflow.utils.kazoo_utils.finalize_client(client)[source]

Stops and closes a client, even if it wasn’t started.

taskflow.utils.kazoo_utils.check_compatible(client, min_version=None, max_version=None)[source]

Checks if a kazoo client is backed by a zookeeper server version.

This check will verify that the zookeeper server version that the client is connected to satisfies a given minimum (inclusive) and maximum (inclusive) version range. If the server is not in the provided version range then an exception is raised indicating this.
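
The inclusive range test can be illustrated with tuple comparison. This is only a sketch: version_in_range is a hypothetical helper that returns a boolean, whereas the documented function raises an exception on a mismatch, and representing versions as tuples of integers is an assumption.

```python
def version_in_range(server_version, min_version=None, max_version=None):
    """Return True if server_version lies within [min_version, max_version]."""
    server = tuple(int(part) for part in server_version)
    if min_version is not None:
        if server < tuple(int(part) for part in min_version):
            return False
    if max_version is not None:
        if server > tuple(int(part) for part in max_version):
            return False
    return True


print(version_in_range((3, 4, 6), min_version=(3, 4, 0), max_version=(3, 5, 0)))
print(version_in_range((3, 6, 0), max_version=(3, 5, 0)))
```

Both bounds are inclusive, so a server exactly at the minimum or maximum version is accepted.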

taskflow.utils.kazoo_utils.make_client(conf)[source]

Creates a kazoo client given a configuration dictionary.

Parameters

conf (dict) – configuration dictionary that will be used to configure the created client

The keys that will be extracted are:

  • read_only: boolean that specifies whether to allow connections to read only servers, defaults to False

  • randomize_hosts: boolean that specifies whether to randomize host lists provided, defaults to False

  • command_retry: a kazoo retry object (or dict of options which will be used for creating one) that will be used for retrying commands that are executed

  • connection_retry: a kazoo retry object (or dict of options which will be used for creating one) that will be used for retrying connection failures that occur

  • hosts: a string, list, set (or dict with host keys) that will specify the hosts the kazoo client should be connected to, if none is provided then localhost:2181 will be used by default

  • timeout: a float value that specifies the default timeout that the kazoo client will use

  • handler: a kazoo handler object that can be used to provide the client with alternate async strategies (the default is thread based, but gevent, or eventlet ones can be provided as needed)

  • keyfile: SSL keyfile to use for authentication

  • keyfile_password: SSL keyfile password

  • certfile: SSL certfile to use for authentication

  • ca: SSL CA file to use for authentication

  • use_ssl: argument to control whether SSL is used or not

  • verify_certs: when using SSL, argument to bypass certs verification
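
As an illustration, a configuration dictionary using some of the keys listed above might look like the following. The host names and values are made up, and the option names inside the retry dictionaries (max_tries, delay) are assumed to match kazoo's retry object parameters.

```python
# Hypothetical configuration; host names and values are illustrative only.
example_conf = {
    "hosts": ["zk1.example.com:2181", "zk2.example.com:2181"],
    "timeout": 10.0,
    "read_only": False,
    "randomize_hosts": True,
    # Dicts of options are documented as an alternative to retry objects;
    # the option names used here are assumptions.
    "command_retry": {"max_tries": 3, "delay": 0.5},
    "connection_retry": {"max_tries": 5},
    "use_ssl": False,
}

# With taskflow and kazoo installed, the dict would be passed as:
#   from taskflow.utils import kazoo_utils
#   client = kazoo_utils.make_client(example_conf)
print(sorted(example_conf))
```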

Kombu

class taskflow.utils.kombu_utils.DelayedPretty(message)[source]

Bases: object

Wraps a message and delays prettifying it until requested.

TODO(harlowja): remove this when https://github.com/celery/kombu/pull/454/ is merged and a release is made that contains it (since that pull request is equivalent and/or better than this).

Miscellaneous

class taskflow.utils.misc.StrEnum(value)[source]

Bases: str, enum.Enum

An enumeration that is also a string and can be compared to strings.

class taskflow.utils.misc.StringIO(initial_value='', newline='\n')[source]

Bases: _io.StringIO

String buffer with some small additions.

class taskflow.utils.misc.BytesIO(initial_bytes=b'')[source]

Bases: _io.BytesIO

Byte buffer with some small additions.

taskflow.utils.misc.get_hostname(unknown_hostname='<unknown>')[source]

Gets the machine’s hostname; if unable to, returns an invalid one.

taskflow.utils.misc.match_type(obj, matchers)[source]

Matches a given object using the given matchers list/iterable.

NOTE(harlowja): each element of the provided list/iterable must be a tuple of (valid types, result).

Returns the result (the second element of the provided tuple) if a type match occurs, otherwise None if no matches are found.
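
The matcher format is easiest to see in a small example. The function below is a sketch written from the description, not taskflow's implementation, and match_type_sketch is a hypothetical name.

```python
def match_type_sketch(obj, matchers):
    """Return the result paired with the first matching type entry."""
    for valid_types, result in matchers:
        # valid_types may be a single type or a tuple of types.
        if isinstance(obj, valid_types):
            return result
    return None


matchers = [
    ((list, tuple), "sequence"),
    (dict, "mapping"),
]
print(match_type_sketch([1, 2], matchers))
print(match_type_sketch({"a": 1}, matchers))
print(match_type_sketch(42, matchers))
```

Matchers are tried in order, so earlier entries take priority when an object satisfies more than one of them.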

taskflow.utils.misc.countdown_iter(start_at, decr=1)[source]

Generator that decrements after each generation until <= zero.

NOTE(harlowja): we can likely remove this when we can use an itertools.count that takes a step (on py2.6 which we still support that step parameter does not exist and therefore can’t be used).

taskflow.utils.misc.extract_driver_and_conf(conf, conf_key)[source]

Common function to get a driver name and its configuration.

taskflow.utils.misc.reverse_enumerate(items)[source]

Like reversed(enumerate(items)) but with less copying/cloning…
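
In Python 3, reversed(enumerate(items)) raises a TypeError because enumerate objects are not reversible, so the naive alternative is reversed(list(enumerate(items))), which copies every pair. A sketch that avoids the copy by indexing from the end (hypothetical name, and assuming items supports len() and indexing):

```python
def reverse_enumerate_sketch(items):
    """Yield (index, item) pairs from the last item to the first."""
    for index in range(len(items) - 1, -1, -1):
        yield index, items[index]


pairs = list(reverse_enumerate_sketch("abc"))
print(pairs)  # [(2, 'c'), (1, 'b'), (0, 'a')]
```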

taskflow.utils.misc.merge_uri(uri, conf)[source]

Merges a parsed uri into the given configuration dictionary.

Merges the username, password, hostname, port, and query parameters of a URI into the given configuration dictionary (it does not overwrite existing configuration keys if they already exist) and returns the merged configuration.

NOTE(harlowja): does not merge the path, scheme or fragment.
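
The non-overwriting merge can be sketched with the standard library. This is not taskflow's implementation: merge_uri_sketch is a hypothetical name, it parses a URI string itself instead of receiving a parsed URI, and the configuration key names used (username, password, hostname, port) are assumptions.

```python
from urllib.parse import parse_qsl, urlsplit


def merge_uri_sketch(uri, conf):
    """Merge URI components into conf without overwriting existing keys."""
    parsed = urlsplit(uri)
    candidates = {
        "username": parsed.username,
        "password": parsed.password,
        "hostname": parsed.hostname,
        "port": parsed.port,
    }
    # Query parameters are merged as well; path, scheme and fragment are not.
    candidates.update(parse_qsl(parsed.query))
    for key, value in candidates.items():
        if value is not None:
            # setdefault keeps any value the caller already configured.
            conf.setdefault(key, value)
    return conf


conf = {"port": 9999}
merged = merge_uri_sketch(
    "mysql://bob:secret@db.example.com:3306/taskflow?charset=utf8", conf)
print(merged["port"], merged["hostname"], merged["charset"])
```

The pre-existing port of 9999 survives the merge, while keys that were missing from the configuration are filled in from the URI.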

taskflow.utils.misc.find_subclasses(locations, base_cls, exclude_hidden=True)[source]

Finds subclass types in the given locations.

This examines the given locations for types which are subclasses of the base class type provided and returns the found subclasses (or fails with exceptions if this introspection cannot be accomplished).

If a string is provided as one of the locations it will be imported and examined if it is a subclass of the base class. If a module is given, all of its members will be examined for attributes which are subclasses of the base class. If a type itself is given it will be examined for being a subclass of the base class.

taskflow.utils.misc.pick_first_not_none(*values)[source]

Returns first of values that is not None (or None if all are/were).
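
The check is against None specifically, not truthiness, so falsy values such as 0 or an empty string are valid results. A sketch under that reading (hypothetical name, not taskflow's code):

```python
def pick_first_not_none_sketch(*values):
    """Return the first value that is not None, or None if there is none."""
    for value in values:
        if value is not None:
            return value
    return None


print(pick_first_not_none_sketch(None, 0, 5))  # 0
print(pick_first_not_none_sketch(None, None))  # None
```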

taskflow.utils.misc.parse_uri(uri)[source]

Parses a uri into its components.

taskflow.utils.misc.disallow_when_frozen(excp_cls)[source]

Frozen checking/raising method decorator.

taskflow.utils.misc.clamp(value, minimum, maximum, on_clamped=None)[source]

Clamps a value to ensure it is >= minimum and <= maximum.
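
A sketch of clamping with a notification callback follows. It is not taskflow's implementation; clamp_sketch is a hypothetical name, and calling on_clamped with no arguments whenever the value had to be adjusted is an assumption.

```python
def clamp_sketch(value, minimum, maximum, on_clamped=None):
    """Clamp value into [minimum, maximum], notifying when it was adjusted."""
    if minimum > maximum:
        raise ValueError("minimum must not exceed maximum")
    clamped = max(minimum, min(maximum, value))
    if clamped != value and on_clamped is not None:
        on_clamped()
    return clamped


events = []
result = clamp_sketch(15, 0, 10, on_clamped=lambda: events.append("clamped"))
untouched = clamp_sketch(5, 0, 10, on_clamped=lambda: events.append("clamped"))
print(result, untouched, events)
```

The callback is useful for logging a warning when a configured value falls outside the supported range.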

taskflow.utils.misc.fix_newlines(text, replacement='\n')[source]

Fixes text that may end with the wrong newline by replacing it with the right newline.

taskflow.utils.misc.binary_encode(text, encoding='utf-8', errors='strict')[source]

Encodes a text string into a binary string using given encoding.

Does nothing if data is already a binary string (raises on unknown types).

taskflow.utils.misc.binary_decode(data, encoding='utf-8', errors='strict')[source]

Decodes a binary string into a text string using given encoding.

Does nothing if data is already a text string (raises on unknown types).
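
The pass-through and error behavior of the two helpers above can be sketched as follows. These are illustrative re-implementations with hypothetical names, and raising TypeError for unknown types is an assumption.

```python
def binary_encode_sketch(text, encoding="utf-8", errors="strict"):
    """Encode text to bytes; bytes pass through unchanged."""
    if isinstance(text, bytes):
        return text
    if isinstance(text, str):
        return text.encode(encoding, errors)
    raise TypeError("Expected text or binary, not %r" % type(text))


def binary_decode_sketch(data, encoding="utf-8", errors="strict"):
    """Decode bytes to text; text passes through unchanged."""
    if isinstance(data, str):
        return data
    if isinstance(data, bytes):
        return data.decode(encoding, errors)
    raise TypeError("Expected text or binary, not %r" % type(data))


encoded = binary_encode_sketch("héllo")
decoded = binary_decode_sketch(encoded)
print(encoded, decoded)
```

Making both functions idempotent lets callers normalize values of uncertain type without checking first.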

taskflow.utils.misc.decode_msgpack(raw_data, root_types=(<class 'dict'>, ))[source]

Parse raw data to get decoded object.

Decodes a msgpack encoded ‘blob’ from a given raw data binary string and checks that the root type of that decoded object is in the allowed set of types (by default a dict should be the root type).

taskflow.utils.misc.decode_json(raw_data, root_types=(<class 'dict'>, ))[source]

Parse raw data to get decoded object.

Decodes a JSON encoded ‘blob’ from a given raw data binary string and checks that the root type of that decoded object is in the allowed set of types (by default a dict should be the root type).
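
The root type check can be sketched with the standard json module. This is not taskflow's code; decode_json_sketch is a hypothetical name, and assuming UTF-8 input and raising ValueError on a disallowed root type are assumptions.

```python
import json


def decode_json_sketch(raw_data, root_types=(dict,)):
    """Decode JSON bytes and verify the type of the top-level object."""
    # Decoding and parsing errors both surface as ValueError subclasses.
    data = json.loads(raw_data.decode("utf-8"))
    if root_types and not isinstance(data, tuple(root_types)):
        allowed = ", ".join(t.__name__ for t in root_types)
        raise ValueError("Root type %s is not one of: %s"
                         % (type(data).__name__, allowed))
    return data


print(decode_json_sketch(b'{"a": 1}'))
print(decode_json_sketch(b"[1, 2]", root_types=(dict, list)))
```

Restricting the root type guards against well-formed but unexpected payloads, such as a bare list or number where a mapping is required.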

class taskflow.utils.misc.cachedproperty(fget=None, require_lock=True)[source]

Bases: object

A thread-safe descriptor property that is only evaluated once.

This caching descriptor can be placed on instance methods to translate those methods into properties that will be cached in the instance (avoiding repeated attribute checking logic to do the equivalent).

NOTE(harlowja): by default the property that will be saved will be under the decorated methods name prefixed with an underscore. For example if we were to attach this descriptor to an instance method ‘get_thing(self)’ the cached property would be stored under ‘_get_thing’ in the self object after the first call to ‘get_thing’ occurs.
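
The caching and naming scheme described above can be sketched with a small descriptor. This is a simplified illustration, not taskflow's implementation: cached_property_sketch is a hypothetical name, it always locks (there is no require_lock option), and it uses a single lock per descriptor.

```python
import threading


class cached_property_sketch:
    """Descriptor that computes a value once and stores it on the instance."""

    def __init__(self, fget):
        self._fget = fget
        # Cached value is stored under the method name with a leading "_".
        self._attr_name = "_" + fget.__name__
        self._lock = threading.RLock()

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        try:
            return getattr(instance, self._attr_name)
        except AttributeError:
            with self._lock:
                # Re-check under the lock in case another thread won the race.
                try:
                    return getattr(instance, self._attr_name)
                except AttributeError:
                    value = self._fget(instance)
                    setattr(instance, self._attr_name, value)
                    return value


class Report:
    def __init__(self):
        self.calls = 0

    @cached_property_sketch
    def get_thing(self):
        self.calls += 1
        return [1, 2, 3]


report = Report()
first = report.get_thing
second = report.get_thing
print(report.calls, report._get_thing)
```

The second access returns the stored object without calling the method again, and the value is visible on the instance as _get_thing.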

taskflow.utils.misc.millis_to_datetime(milliseconds)[source]

Converts number of milliseconds (from epoch) into a datetime object.

taskflow.utils.misc.get_version_string(obj)[source]

Gets an object’s version as a string.

Returns the string representation of the object’s version taken from its ‘version’ attribute, or None if the object does not have such an attribute or its version is None.

taskflow.utils.misc.sequence_minus(seq1, seq2)[source]

Calculate difference of two sequences.

Result contains the elements from first sequence that are not present in second sequence, in original order. Works even if sequence elements are not hashable.
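
Because the elements may be unhashable, a set-based difference is not an option; list removal, which relies only on equality, works instead. The sketch below is written from the description (hypothetical name), and removing one occurrence from the first sequence per element of the second is an assumption about how duplicates are treated.

```python
def sequence_minus_sketch(seq1, seq2):
    """Return the items of seq1 not found in seq2, keeping original order."""
    result = list(seq1)
    for item in seq2:
        try:
            # list.remove compares with ==, so no hashing is needed.
            result.remove(item)
        except ValueError:
            pass
    return result


difference = sequence_minus_sketch([[1], [2], [3]], [[2], [9]])
print(difference)  # [[1], [3]]
```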

taskflow.utils.misc.as_int(obj, quiet=False)[source]

Converts an arbitrary value into an integer.

taskflow.utils.misc.capture_failure()[source]

Captures the occurring exception and provides a failure object back.

This will save the current exception information and yield back a failure object for the caller to use (it will raise a runtime error if no active exception is being handled).

This is useful since in some cases the exception context can be cleared, resulting in an attempt to save None after an exception handler is run. This can happen when eventlet switches greenthreads, or when code running inside an exception handler raises and catches another exception. In both cases the exception context will be cleared.

To work around this, we save the exception state, yield a failure and then run other code.

For example:

>>> from taskflow.utils import misc
>>>
>>> def cleanup():
...     pass
...
>>>
>>> def save_failure(f):
...     print("Saving %s" % f)
...
>>>
>>> try:
...     raise IOError("Broken")
... except Exception:
...     with misc.capture_failure() as fail:
...         print("Activating cleanup")
...         cleanup()
...         save_failure(fail)
...
Activating cleanup
Saving Failure: IOError: Broken

taskflow.utils.misc.is_iterable(obj)[source]

Tests an object to determine whether it is iterable.

This function will test the specified object to determine whether it is iterable. String types (both str and unicode) are ignored and will return False.

Parameters

obj – object to be tested for iterable

Returns

True if object is iterable and is not a string
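
A sketch of this test for Python 3, where str is the only text type, is shown below. It is an illustration with a hypothetical name, not taskflow's implementation.

```python
from collections.abc import Iterable


def is_iterable_sketch(obj):
    """Return True for iterables, except strings, which count as scalars."""
    return not isinstance(obj, str) and isinstance(obj, Iterable)


print(is_iterable_sketch([1, 2]))   # True
print(is_iterable_sketch("text"))   # False
print(is_iterable_sketch(42))       # False
```

Excluding strings avoids the common bug of accidentally iterating over the characters of a single string argument.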

taskflow.utils.misc.safe_copy_dict(obj)[source]

Copy an existing dictionary or default to empty dict…

This will return an empty dict if the given object is falsey, otherwise it will create a dict from the given object (which, if provided a dictionary object, will make a shallow copy of that object).
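
The shallow-copy caveat is worth seeing concretely. The sketch below follows the description (hypothetical name, not taskflow's code).

```python
def safe_copy_dict_sketch(obj):
    """Return a new dict built from obj, or an empty dict if obj is falsey."""
    if not obj:
        return {}
    return dict(obj)


original = {"tags": ["a"]}
copied = safe_copy_dict_sketch(original)
copied["extra"] = 1          # top-level change: original is unaffected
copied["tags"].append("b")   # nested change: the shared list is mutated
print(original)
print(safe_copy_dict_sketch(None))
```

Top-level keys are independent after the copy, but nested mutable values are still shared between the original and the copy.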

Persistence

taskflow.utils.persistence_utils.temporary_log_book(backend=None)[source]

Creates a temporary logbook for temporary usage in the given backend.

Mainly useful for tests and other use cases where a temporary logbook is needed for a short period of time.

taskflow.utils.persistence_utils.temporary_flow_detail(backend=None, meta=None)[source]

Creates a temporary flow detail and logbook in the given backend.

Mainly useful for tests and other use cases where a temporary flow detail and a temporary logbook are needed for a short period of time.

taskflow.utils.persistence_utils.create_flow_detail(flow, book=None, backend=None, meta=None)[source]

Creates a flow detail for a flow & adds & saves it in a logbook.

This will create a flow detail for the given flow using the flow name, add it to the provided logbook, use the given backend to save the logbook, and then return the created flow detail.

If no book is provided a temporary one will be created automatically (no reference to the logbook will be returned, so this should nearly always be provided or only used in situations where no logbook is needed, for example in tests). If no backend is provided then no saving will occur and the created flow detail will not be persisted even if the flow detail was added to a given (or temporarily generated) logbook.

Redis

class taskflow.utils.redis_utils.RedisClient(*args, **kwargs)[source]

Bases: redis.client.Redis

A redis client that can be closed (and raises on-usage after closed).

TODO(harlowja): if https://github.com/andymccurdy/redis-py/issues/613 ever gets resolved or merged or other then we can likely remove this.

execute_command(*args, **options)

Execute a command and return a parsed response.

transaction(func, *watches, **kwargs)

Convenience method for executing the callable func as a transaction while watching all keys specified in watches. The ‘func’ callable should expect a single argument which is a Pipeline object.

pubsub(**kwargs)

Return a Publish/Subscribe object. With this object, you can subscribe to channels and listen for messages that get published to them.

class taskflow.utils.redis_utils.UnknownExpire(value)[source]

Bases: enum.IntEnum

Non-expiry (not ttls) results returned from get_expiry().

See: http://redis.io/commands/ttl or http://redis.io/commands/pttl

DOES_NOT_EXPIRE = -1

The command returns -1 if the key exists but has no associated expire.

KEY_NOT_FOUND = -2

The command returns -2 if the key does not exist.
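
One way to separate these sentinel results from real time-to-live values is sketched below. The enum mirrors the two documented members; interpret_ttl is a hypothetical helper, and how get_expiry() actually performs this mapping is an assumption.

```python
import enum


class UnknownExpireSketch(enum.IntEnum):
    """Sentinel TTL results, mirroring the two documented members."""
    DOES_NOT_EXPIRE = -1
    KEY_NOT_FOUND = -2


def interpret_ttl(raw_ttl):
    """Map sentinel TTL values to enum members; pass real TTLs through."""
    try:
        return UnknownExpireSketch(raw_ttl)
    except ValueError:
        return raw_ttl


print(interpret_ttl(-1) is UnknownExpireSketch.DOES_NOT_EXPIRE)
print(interpret_ttl(30))
```

Because the members are integers, callers can still compare them with the raw values returned by the server.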

taskflow.utils.redis_utils.get_expiry(client, key, prior_version=None)[source]

Gets an expiry for a key (using best determined ttl method).

taskflow.utils.redis_utils.apply_expiry(client, key, expiry, prior_version=None)[source]

Applies an expiry to a key (using best determined expiry method).

taskflow.utils.redis_utils.is_server_new_enough(client, min_version, default=False, prior_version=None)[source]

Checks if a client is attached to a new enough redis server.

Schema

taskflow.utils.schema_utils.schema_validate(data, schema)[source]

Validates given data using provided json schema.

Threading

taskflow.utils.threading_utils.is_alive(thread)[source]

Helper to determine if a thread is alive (handles none safely).

taskflow.utils.threading_utils.get_ident()[source]

Return the ‘thread identifier’ of the current thread.

taskflow.utils.threading_utils.get_optimal_thread_count(default=2)[source]

Try to guess optimal thread count for current system.

taskflow.utils.threading_utils.daemon_thread(target, *args, **kwargs)[source]

Makes a daemon thread that calls the given target when started.
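
A minimal sketch of such a helper using the standard threading module follows. The name daemon_thread_sketch is hypothetical, and returning the thread unstarted is an assumption based on the phrase "when started".

```python
import threading


def daemon_thread_sketch(target, *args, **kwargs):
    """Build (but do not start) a daemon thread that runs target."""
    thread = threading.Thread(target=target, args=args, kwargs=kwargs)
    # Daemon threads do not keep the interpreter alive at exit.
    thread.daemon = True
    return thread


results = []
worker = daemon_thread_sketch(results.append, "ran")
worker.start()
worker.join()
print(results)  # ['ran']
```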

taskflow.utils.threading_utils.no_op(*args, **kwargs)[source]

Function that does nothing.

class taskflow.utils.threading_utils.ThreadBundle[source]

Bases: object

A group/bundle of threads that start/stop together.

bind(thread_factory, before_start=None, after_start=None, before_join=None, after_join=None)[source]

Adds a thread (to-be) into this bundle (with given callbacks).

NOTE(harlowja): callbacks provided should not attempt to call mutating methods (stop(), start(), bind() …) on this object as that will result in deadlock since the lock on this object is not meant to be (and is not) reentrant…

start()[source]

Creates & starts all associated threads (that are not running).

stop()[source]

Stops & joins all associated threads (that have been started).