# Source code for taskflow.utils.iter_utils

# -*- coding: utf-8 -*-

#    Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import collections
import itertools

import six
from six.moves import range as compat_range


def _ensure_iterable(func):

    @six.wraps(func)
    def wrapper(it, *args, **kwargs):
        if not isinstance(it, collections.Iterable):
            raise ValueError("Iterable expected, but '%s' is not"
                             " iterable" % it)
        return func(it, *args, **kwargs)

    return wrapper


@_ensure_iterable
def fill(it, desired_len, filler=None):
    """Iterates over a provided iterator up to the desired length.

    If the source iterator does not have enough values then the filler
    value is yielded until the desired length is reached.

    Nothing is yielded unless ``desired_len`` is greater than zero, and
    iteration over ``it`` stops as soon as ``desired_len`` values have
    been yielded (no further values are requested from ``it``).
    """
    if desired_len > 0:
        # NOTE: deliberately not named ``count`` so that the module level
        # ``count()`` function is not shadowed in here.
        yielded = 0
        for value in it:
            yield value
            yielded += 1
            if yielded >= desired_len:
                return
        # The source ran dry before the desired length was reached, pad
        # the remainder with the filler value.
        while yielded < desired_len:
            yield filler
            yielded += 1
@_ensure_iterable
def count(it):
    """Returns how many values are in the iterator.

    NOTE: this walks the whole iterator to count its values, so a one-shot
    iterator will be depleted afterwards.
    """
    return sum(1 for _value in it)
def generate_delays(delay, max_delay, multiplier=2):
    """Generator/iterator that provides back delay values.

    The values it generates are multiplied by the given multiplier after
    each iteration (using the max delay as an upper bound). Negative values
    will never be generated... and it will iterate forever (ie it will never
    stop generating values).

    The first value provided back is ``delay`` itself (capped at
    ``max_delay``). A ``ValueError`` is raised right away (before any
    iteration happens) when ``delay`` or ``max_delay`` is negative or when
    ``multiplier`` is less than ``1.0``.
    """
    if max_delay < 0:
        raise ValueError("Provided delay (max) must be greater"
                         " than or equal to zero")
    if delay < 0:
        raise ValueError("Provided delay must start off greater"
                         " than or equal to zero")
    if multiplier < 1.0:
        raise ValueError("Provided multiplier must be greater than"
                         " or equal to 1.0")

    def _gen_it():
        # NOTE(harlowja): Generation is delayed so that validation
        # can happen before generation/iteration... (instead of
        # during generation/iteration)
        curr_delay = delay
        while True:
            # Clamp into [0, max_delay] before handing the value back.
            curr_delay = max(0, min(max_delay, curr_delay))
            yield curr_delay
            curr_delay = curr_delay * multiplier

    return _gen_it()
def unique_seen(its, seen_selector=None):
    """Yields unique values from iterator(s) (and retains order).

    ``its`` is an iterable of iterables; their values are provided back in
    the order encountered, skipping any value already seen. When a
    ``seen_selector`` callback is given, it is called with each value and
    its result (which must be hashable) is what is used to decide whether
    the value was already seen; otherwise the value itself is used.

    A ``ValueError`` is raised right away (before any iteration happens)
    when any member of ``its`` is not iterable.
    """
    # NOTE: the ``collections.Iterable`` alias was removed in Python 3.10,
    # the ABC lives in ``collections.abc`` (which does not exist on
    # Python 2, hence the fallback to the ``collections`` module itself).
    try:
        from collections import abc as collections_abc
    except ImportError:
        import collections as collections_abc

    def _gen_it(all_its):
        # NOTE(harlowja): Generation is delayed so that validation
        # can happen before generation/iteration... (instead of
        # during generation/iteration)
        seen = set()
        for it in all_its:
            for value in it:
                if seen_selector is not None:
                    maybe_seen_value = seen_selector(value)
                else:
                    maybe_seen_value = value
                if maybe_seen_value not in seen:
                    yield value
                    seen.add(maybe_seen_value)

    # Materialize the outer iterable so it can be validated now and then
    # walked again later by the generator.
    all_its = list(its)
    for it in all_its:
        if not isinstance(it, collections_abc.Iterable):
            raise ValueError("Iterable expected, but '%s' is"
                             " not iterable" % it)
    return _gen_it(all_its)
@_ensure_iterable
def find_first_match(it, matcher, not_found_value=None):
    """Searches iterator for first value that matcher callback returns true.

    Values are handed to ``matcher`` one at a time and the first value for
    which it returns something truthy is returned (iteration stops there).
    If no value matches, ``not_found_value`` is returned instead.
    """
    for value in it:
        if matcher(value):
            return value
    return not_found_value
@_ensure_iterable
def while_is_not(it, stop_value):
    """Yields given values from iterator until stop value is passed.

    This uses the ``is`` operator to determine equivalency (and not the
    ``==`` operator).

    NOTE: each value is yielded before it is compared, so the stop value
    itself is the last value yielded (when it is encountered).
    """
    for value in it:
        yield value
        if value is stop_value:
            break
def iter_forever(limit):
    """Yields consecutive integers (starting at zero) until a limit is reached.

    If ``limit`` is negative, we iterate forever; otherwise the values
    ``0`` up to (but not including) ``limit`` are yielded.
    """
    if limit < 0:
        for i in itertools.count():
            yield i
    else:
        for i in compat_range(0, limit):
            yield i