Skip to content

Utilities#

from great_ai.utilities import *

NLP tools#

Well-tested tools that can be used in production with confidence. The toolbox of feature-extraction functions is expected to grow to cover other domains as well.

clean(text, ignore_xml=False, ignore_latex=False, remove_brackets=False, convert_to_ascii=False) #

Clean all XML, LaTeX, PDF-extraction, and Unicode artifacts from the text.

The cleaning is quite heavy-weight and can be destructive. However, when working with text, this is usually required to achieve sufficient cleanliness before further processing.

Optionally, the text can be turned into ASCII. Carefully consider whether this is absolutely needed for your use-case.

Examples:

>>> clean('<h2 color="red">Bj\\"{o}rn is \t \\textit{happy} 🙂 &lt;3</h2>')
'Björn is happy 🙂 <3'
>>> clean(
...    '<h2 color="red">Bj\\"{o}rn    is \t \\textit{happy} 🙂 &lt;3</h2>',
...    convert_to_ascii=True
... )
'Bjorn is happy <3'
>>> clean(
... '<h2 color="red">Bj\\"{o}rn       is \t \\textit{happy} 🙂 &lt;3</h2>',
... ignore_xml=True
... )
'<h2 color="red">Björn is happy 🙂 lt;3</h2>'

Parameters:

Name Type Description Default
text str

Text to be cleaned.

required
ignore_xml bool

Do not process/remove XML-tags.

False
ignore_latex bool

Do not process/remove LaTeX-tags.

False
remove_brackets bool

Do not remove brackets ([])

False
convert_to_ascii bool

Strip (or convert) non-ascii characters.

False

Returns:

Type Description
str

The cleaned input text with sensibly collapsed whitespace and optionally no markup.

Source code in great_ai/utilities/clean.py
def clean(
    text: str,
    ignore_xml: bool = False,
    ignore_latex: bool = False,
    remove_brackets: bool = False,
    convert_to_ascii: bool = False,
) -> str:
    """Clean all XML, LaTeX, PDF-extraction, and Unicode artifacts from the text.

    The cleaning is quite heavy-weight and can be destructive. However, when working
    with text, this is usually required to achieve sufficient cleanliness before further
    processing.

    Optionally, the text can be turned into ASCII. Carefully consider whether this is
    absolutely needed for your use-case.

    Examples:
        >>> clean('<h2 color="red">Bj\\\\"{o}rn is \\t \\\\textit{happy} 🙂 &lt;3</h2>')
        'Björn is happy 🙂 <3'

        >>> clean(
        ...    '<h2 color="red">Bj\\\\"{o}rn    is \\t \\\\textit{happy} 🙂 &lt;3</h2>',
        ...    convert_to_ascii=True
        ... )
        'Bjorn is happy <3'

        >>> clean(
        ... '<h2 color="red">Bj\\\\"{o}rn       is \\t \\\\textit{happy} 🙂 &lt;3</h2>',
        ... ignore_xml=True
        ... )
        '<h2 color="red">Björn is happy 🙂 lt;3</h2>'

    Args:
        text: Text to be cleaned.
        ignore_xml: Do not process/remove XML-tags.
        ignore_latex: Do not process/remove LaTeX-tags.
        remove_brackets: Do not remove brackets ([])
        convert_to_ascii: Strip (or convert) non-ascii characters.

    Returns:
        The cleaned input text with sensibly collapsed whitespace and optionally no
            markup.
    """

    if not ignore_xml:
        text = re.sub(r"<[^>]*>", " ", text)
        text = html.unescape(text)

    if not ignore_latex:
        text = text.replace("%", "\\%")  # escape LaTeX comments before parsing as LaTeX

        try:
            text = latex.latex_to_text(text, tolerant_parsing=True, strict_braces=False)
            text = text.replace("%s", " ")
        except:
            logger.exception("Latex parsing error")

    if convert_to_ascii:
        text = unicodedata.normalize("NFKD", text)

        try:
            text.encode("ASCII", errors="strict")
        except UnicodeEncodeError:
            text = "".join([c for c in text if not unicodedata.combining(c)])
            text = unidecode.unidecode(text)

    text = re.sub(
        r"\b[a-zA-Z](?:[\t ]+[a-zA-Z]\b)+", lambda m: re.sub(r"[\t ]", "", m[0]), text
    )  # A R T I C L E => ARTICLE

    if remove_brackets:
        text = re.sub(r"\[[^\]]*\]", " ", text)

    # fix hypens: break- word => break-word
    text = re.sub(r"(\S)-\s+", r"\1-", text)
    text = re.sub(r"\s+-(\S)", r"-\1", text)

    # collapse whitespace
    text = re.sub(r"\s+", " ", text)

    # fix punctuation
    text = re.sub(rf" ([{joined_left_punctuations}])", r"\1", text)
    text = re.sub(rf"([{joined_right_punctuations}]) ", r"\1", text)

    text = text.strip()

    return text

get_sentences(text, ignore_partial=False, true_case=False, remove_punctuation=False) #

Return the list of sentences found in the input text.

Use syntok to segment the sentences. Further processing can be enabled with optional arguments.

Examples:

>>> get_sentences('This is a sentence. This is a half')
['This is a sentence.', 'This is a half']
>>> get_sentences('This is a sentence. This is a half', ignore_partial=True)
['This is a sentence.']
>>> get_sentences('I like Apple.', true_case=True, remove_punctuation=True)
['i like Apple']

Parameters:

Name Type Description Default
text str

Text to be segmented into sentences.

required
ignore_partial bool

Filter out sentences that are not capitalised/don't end with a punctuation.

False
true_case bool

Crude method: lowercase the first word of each sentence.

False
remove_punctuation bool

Remove all kinds of punctuation.

False

Returns:

Type Description
List[str]

The found sentences (with partial sentences optionally filtered out).

Source code in great_ai/utilities/get_sentences.py
def get_sentences(
    text: str,
    ignore_partial: bool = False,
    true_case: bool = False,
    remove_punctuation: bool = False,
) -> List[str]:
    """Return the list of sentences found in the input text.

    Use [syntok](https://github.com/fnl/syntok) to segment the sentences. Further
    processing can be enabled with optional arguments.

    Examples:
        >>> get_sentences('This is a sentence. This is a half')
        ['This is a sentence.', 'This is a half']

        >>> get_sentences('This is a sentence. This is a half', ignore_partial=True)
        ['This is a sentence.']

        >>> get_sentences('I like Apple.', true_case=True, remove_punctuation=True)
        ['i like Apple']

    Args:
        text: Text to be segmented into sentences.
        ignore_partial: Filter out sentences that are not capitalised/don't end with a
            punctuation.
        true_case: Crude method: lowercase the first word of each sentence.
        remove_punctuation: Remove all kinds of punctuation.

    Returns:
        The found sentences (with partial sentences optionally filtered out).
    """

    tokenizer = Tokenizer(
        emit_hyphen_or_underscore_sep=True, replace_not_contraction=False
    )
    token_stream = tokenizer.tokenize(text)

    def process(sentence: str) -> str:
        if true_case:
            sentence = sentence[0].lower() + sentence[1:]  # very crude method
        if remove_punctuation:
            sentence = re.sub(punctuations_pattern, " ", sentence)
        return sentence.strip()

    sentences = [
        process(tokenizer.to_text(sentence)) for sentence in segment(token_stream)
    ]

    if ignore_partial:
        sentences = [
            sentence
            for sentence in sentences
            if sentence[0].isupper() and sentence[-1] in sentence_ending_punctuations
        ]

    return sentences

predict_language(text) #

Predict the language code from text.

A thin wrapper over langcodes for convenient language tagging.

Examples:

>>> predict_language('This is a sentence.')
'en'

Parameters:

Name Type Description Default
text Optional[str]

Text used for prediction.

required

Returns:

Type Description
str

The predicted language code (en, en-US) or und if a prediction could not be made.

Source code in great_ai/utilities/language/predict_language.py
def predict_language(text: Optional[str]) -> str:
    """Predict the language code from text.

    A thin wrapper over [langcodes](https://github.com/rspeer/langcodes) for convenient
    language tagging.

    Examples:
        >>> predict_language('This is a sentence.')
        'en'

    Args:
        text: Text used for prediction.

    Returns:
        The predicted language code (en, en-US) or `und` if a prediction could not be
            made.
    """

    if not text:
        return Language.make().to_tag()

    try:
        language_code = detect(text)
    except LangDetectException:
        return Language.make().to_tag()

    return Language.get(language_code).to_tag()

english_name_of_language(language_code) #

Human-friendly English name of language from its language_code.

A thin wrapper over langcodes for convenient language tagging.

Examples:

>>> english_name_of_language('en-US')
'English (United States)'
>>> english_name_of_language('und')
'Unknown language'

Parameters:

Name Type Description Default
language_code Optional[str]

Language code, for example, returned by great_ai.utilities.language.predict_language.predict_language.

required

Returns:

Type Description
str

English name of language.

Source code in great_ai/utilities/language/english_name_of_language.py
def english_name_of_language(language_code: Optional[str]) -> str:
    """Human-friendly English name of language from its `language_code`.

    A thin wrapper over [langcodes](https://github.com/rspeer/langcodes) for convenient
    language tagging.

    Examples:
        >>> english_name_of_language('en-US')
        'English (United States)'

        >>> english_name_of_language('und')
        'Unknown language'

    Args:
        language_code: Language code, for example, returned by
            [great_ai.utilities.language.predict_language.predict_language][].

    Returns:
        English name of language.
    """

    if not language_code:
        language_code = "und"

    return Language.get(language_code).display_name()

is_english(language_code) #

Decide whether the language_code is of an English language.

A thin wrapper over langcodes for convenient language tagging.

Examples:

>>> is_english('en-US')
True
>>> is_english(None)
False
>>> is_english('und')
False

Parameters:

Name Type Description Default
language_code Optional[str]

Language code, for example, returned by `great_ai.utilities.language.predict_language.predict_language.

required

Returns:

Type Description
bool

Boolean indicating whether it's English.

Source code in great_ai/utilities/language/is_english.py
def is_english(language_code: Optional[str]) -> bool:
    """Decide whether the `language_code` is of an English language.

    A thin wrapper over [langcodes](https://github.com/rspeer/langcodes) for convenient
    language tagging.

    Examples:
        >>> is_english('en-US')
        True

        >>> is_english(None)
        False

        >>> is_english('und')
        False

    Args:
        language_code: Language code, for example, returned by
            `[great_ai.utilities.language.predict_language.predict_language][].

    Returns:
        Boolean indicating whether it's English.
    """
    if not language_code:
        language_code = "und"

    language_code = standardize_tag(language_code)
    return tag_distance(language_code, "en") < 15

evaluate_ranking(expected, actual_scores, target_recall, title='', disable_interpolation=False, axes=None, output_svg=None, reverse_order=False, plot=True) #

Render the Precision-Recall curve of a ranking.

And improved version of scikit-learn's PR-curve

Parameters:

Name Type Description Default
expected List[T]

Expected ordering of the elements (rank if it's an integer, alphabetical if a string)

required
actual_scores List[float]

Actual ranking scores (need not be on the same scale as expected)

required
title Optional[str]

Title of the plot.

''
disable_interpolation bool

Do not interpolate.

False
axes Optional[Axes]

Matplotlib axes for plotting inside a subplot.

None
output_svg Optional[Path]

If specified, save the chart as an svg to the given Path.

None
reverse_order bool

Reverse the ranking specified by expected.

False
plot bool

Display a plot on the screen.

True

Returns:

Type Description
Dict[T, float]

Precision values at given recall.

Source code in great_ai/utilities/evaluate_ranking/evaluate_ranking.py
def evaluate_ranking(
    expected: List[T],
    actual_scores: List[float],
    target_recall: float,
    title: Optional[str] = "",
    disable_interpolation: bool = False,
    axes: Optional[plt.Axes] = None,
    output_svg: Optional[Path] = None,
    reverse_order: bool = False,
    plot: bool = True,
) -> Dict[T, float]:
    """Render the Precision-Recall curve of a ranking.

    And improved version of scikit-learn's [PR-curve](https://scikit-learn.org/stable/auto_examples/model_selection/plot_precision_recall.html#sphx-glr-auto-examples-model-selection-plot-precision-recall-py)

    Args:
        expected: Expected ordering of the elements
            (rank if it's an integer, alphabetical if a string)
        actual_scores: Actual ranking scores (need not be on the same scale as
            `expected`)
        title: Title of the plot.
        disable_interpolation: Do not interpolate.
        axes: Matplotlib axes for plotting inside a subplot.
        output_svg: If specified, save the chart as an svg to the given Path.
        reverse_order: Reverse the ranking specified by `expected`.
        plot: Display a plot on the screen.

    Returns:
        Precision values at given recall.
    """

    assert 0 <= target_recall <= 1

    if plot:
        if axes is None:
            fig = plt.figure(figsize=(10, 10))
            fig.patch.set_facecolor("white")
            ax = plt.axes()
        else:
            ax = axes

    classes = sorted(unique(expected), reverse=reverse_order)
    str_classes = [str(c) for c in classes]

    with matplotlib.rc_context({"font.size": 20}):
        if plot:
            ax.set_xmargin(0)

            draw_f1_iso_lines(axes=ax)

        results: Dict[T, float] = {}
        for i in range(len(classes) - 1):
            binarized_expected = [
                (v < classes[i]) if reverse_order else (v > classes[i])
                for v in expected
            ]

            sorted_expected_actual = sorted(
                zip(binarized_expected, actual_scores), key=lambda v: v[1], reverse=True
            )
            precision = []
            recall = []
            correct = 0
            for all, (e, score) in enumerate(sorted_expected_actual, start=1):
                correct += int(e)
                precision.append(correct / all)
                recall.append(all / len(sorted_expected_actual))

            if not disable_interpolation:
                for j in range(len(precision) - 2, -1, -1):
                    precision[j] = max(precision[j], precision[j + 1])

            closest_recall_index = np.argmin(np.abs(np.array(recall) - target_recall))
            precision_at_closest_recall = precision[closest_recall_index]
            average_precision = average_precision_score(
                binarized_expected, actual_scores
            )
            results[classes[i]] = precision_at_closest_recall

            if plot:
                ax.plot(
                    recall,
                    precision,
                    label=f"{'|'.join(str_classes[:i + 1])}{'|'.join(str_classes[i+1:])} (P@{target_recall:.2f}={precision_at_closest_recall:.2f}, AP={average_precision:.2f})",
                )

        if plot:
            ax.legend(loc="upper right")
            ax.axvline(x=target_recall, linestyle="--", color="#55c6bb", linewidth=2.0)

            if title is None:
                title = "Ranking evaluation"

            ax.set_title(f'{title} ({" < ".join(str_classes)})', pad=20)

            ax.set_xlabel("Recall")
            ax.set_ylabel("Precision")

            ax.set_xticks([target_recall] + sorted(ax.get_xticks()))

        if plot and output_svg is None:
            if axes is None:
                plt.show()
        elif output_svg:
            plt.savefig(output_svg, format="svg")

    return results

Parallel processing#

Multiprocessing and multithreading-based parallelism with support for async functions. Its main purpose is to implement great_ai.GreatAI.process_batch, however, the parallel processing functions are also convenient for covering other types of mapping needs with a friendlier API than joblib or multiprocess.

simple_parallel_map(func, input_values, *, chunk_size=None, concurrency=None) #

Execute a map operation on an list mimicking the API of the built-in map().

A thin-wrapper over parallel_map. For more options, consult the documentation of parallel_map.

Examples:

>>> import math
>>> list(simple_parallel_map(math.sqrt, [9, 4, 1]))
[3.0, 2.0, 1.0]

Parameters:

Name Type Description Default
func Callable[[T], Union[V, Awaitable[V]]]

The function that should be applied to each element of input_values. It can async, in that case, a new event loop is started for each chunk.

required
input_values Sequence[T]

An iterable of items that func is applied to.

required
chunk_size Optional[int]

Tune the number of items processed in each step. Larger numbers result in smaller communication overhead but less parallelism at the start and end. If chunk_size has a __len__ property, the chunk_size is calculated automatically if not given.

None
concurrency Optional[int]

Number of new processes to start. Shouldn't be too much more than the number of physical cores.

None

Returns:

Type Description
List[V]

An iterable of results obtained from applying func to each input value.

Raises:

Type Description
WorkerException

If there was an error in the func function in a background process.

Source code in great_ai/utilities/parallel_map/simple_parallel_map.py
def simple_parallel_map(
    func: Callable[[T], Union[V, Awaitable[V]]],
    input_values: Sequence[T],
    *,
    chunk_size: Optional[int] = None,
    concurrency: Optional[int] = None,
) -> List[V]:
    """Execute a map operation on an list mimicking the API of the built-in `map()`.

    A thin-wrapper over [parallel_map][great_ai.utilities.parallel_map.parallel_map.parallel_map].
    For more options, consult the documentation of
    [parallel_map][great_ai.utilities.parallel_map.parallel_map.parallel_map].

    Examples:
        >>> import math
        >>> list(simple_parallel_map(math.sqrt, [9, 4, 1]))
        [3.0, 2.0, 1.0]

    Args:
        func: The function that should be applied to each element of `input_values`.
            It can `async`, in that case, a new event loop is started for each chunk.
        input_values: An iterable of items that `func` is applied to.
        chunk_size: Tune the number of items processed in each step. Larger numbers
            result in smaller communication overhead but less parallelism at the start
            and end. If `chunk_size` has a `__len__` property, the `chunk_size` is
            calculated automatically if not given.
        concurrency: Number of new processes to start. Shouldn't be too much more than
            the number of physical cores.

    Returns:
        An iterable of results obtained from applying `func` to each input value.

    Raises:
        WorkerException: If there was an error in the `func` function in a background
            process.
    """

    input_values = list(input_values)  # in case the input is mistakenly not a sequence
    generator = parallel_map(
        func=func,
        input_values=input_values,
        chunk_size=chunk_size,
        concurrency=concurrency,
    )

    return list(
        tqdm(
            generator,
            total=len(input_values),
        )
    )

parallel_map(func, input_values, *, chunk_size=None, ignore_exceptions=False, concurrency=None, unordered=False) #

Execute a map operation on an iterable stream.

A custom parallel map operation supporting both synchronous and async map functions. The func function is serialised with dill. Exceptions encountered in the map function are sent to the host process where they are either raised (default) or ignored.

The new processes are forked if the OS allows it, otherwise, new Python processes are bootstrapped which can incur some start-up cost. Each process processes a single chunk at once.

Examples:

>>> import math
>>> list(parallel_map(math.sqrt, [9, 4, 1], concurrency=2))
[3.0, 2.0, 1.0]

Parameters:

Name Type Description Default
func Callable[[T], Union[V, Awaitable[V]]]

The function that should be applied to each element of input_values. It can async, in that case, a new event loop is started for each chunk.

required
input_values Union[Iterable[T], Sequence[T]]

An iterable of items that func is applied to.

required
chunk_size Optional[int]

Tune the number of items processed in each step. Larger numbers result in smaller communication overhead but less parallelism at the start and end. If chunk_size has a __len__ property, the chunk_size is calculated automatically if not given.

None
ignore_exceptions bool

Ignore chunks if next() raises an exception on input_values. And return None if func raised an exception in a worker process.

False
concurrency Optional[int]

Number of new processes to start. Shouldn't be too much more than the number of physical cores.

None
unordered bool

Do not preserve the order of the elements, yield them as soon as they have been processed. This decreases the latency caused by difficult-to-process items.

False

Yields:

Type Description
Iterable[Optional[V]]

The next result obtained from applying func to each input value. May contain None-s if ignore_exceptions=True. May have different order than the input if unordered=True.

Raises:

Type Description
WorkerException

If there was an error in the func function in a background process and ignore_exceptions=False.

Source code in great_ai/utilities/parallel_map/parallel_map.py
def parallel_map(
    func: Callable[[T], Union[V, Awaitable[V]]],
    input_values: Union[Iterable[T], Sequence[T]],
    *,
    chunk_size: Optional[int] = None,
    ignore_exceptions: bool = False,
    concurrency: Optional[int] = None,
    unordered: bool = False,
) -> Iterable[Optional[V]]:
    """Execute a map operation on an iterable stream.

    A custom parallel map operation supporting both synchronous and `async` map
    functions. The `func` function is serialised with `dill`. Exceptions encountered in
    the map function are sent to the host process where they are either raised (default)
    or ignored.

    The new processes are forked if the OS allows it, otherwise, new Python processes
    are bootstrapped which can incur some start-up cost. Each process processes a single
    chunk at once.

    Examples:
        >>> import math
        >>> list(parallel_map(math.sqrt, [9, 4, 1], concurrency=2))
        [3.0, 2.0, 1.0]

    Args:
        func: The function that should be applied to each element of `input_values`.
            It can `async`, in that case, a new event loop is started for each chunk.
        input_values: An iterable of items that `func` is applied to.
        chunk_size: Tune the number of items processed in each step. Larger numbers
            result in smaller communication overhead but less parallelism at the start
            and end. If `chunk_size` has a `__len__` property, the `chunk_size` is
            calculated automatically if not given.
        ignore_exceptions: Ignore chunks if `next()` raises an exception on
            `input_values`. And return `None` if `func` raised an exception in a worker
            process.
        concurrency: Number of new processes to start. Shouldn't be too much more than
            the number of physical cores.
        unordered: Do not preserve the order of the elements, yield them as soon as they
            have been processed. This decreases the latency caused by
            difficult-to-process items.

    Yields:
        The next result obtained from applying `func` to each input value. May
            contain `None`-s if `ignore_exceptions=True`. May have different order than
            the input if `unordered=True`.

    Raises:
        WorkerException: If there was an error in the `func` function in a background
            process and `ignore_exceptions=False`.
    """

    config = get_config(
        function=func,
        input_values=input_values,
        chunk_size=chunk_size,
        concurrency=concurrency,
    )

    ctx = (
        mp.get_context("fork")
        if "fork" in mp.get_all_start_methods()
        else mp.get_context("spawn")
    )
    ctx.freeze_support()
    manager = ctx.Manager()
    input_queue = manager.Queue(config.concurrency * 2)
    output_queue = manager.Queue(config.concurrency * 2)

    should_stop = ctx.Event()
    serialized_map_function = dill.dumps(func, byref=True, recurse=False)

    processes = [
        ctx.Process(  # type: ignore
            name=f"parallel_map_{config.function_name}_{i}",
            target=mapper_function,
            daemon=True,
            kwargs=dict(
                input_queue=input_queue,
                output_queue=output_queue,
                should_stop=should_stop,
                func=serialized_map_function,
            ),
        )
        for i in range(config.concurrency)
    ]

    for p in processes:
        p.start()

    try:
        yield from manage_communication(
            input_values=input_values,
            chunk_size=config.chunk_size,
            input_queue=input_queue,
            output_queue=output_queue,
            unordered=unordered,
            ignore_exceptions=ignore_exceptions,
        )
        should_stop.set()
    except WorkerException:
        should_stop.set()
        raise
    except Exception:
        for p in processes:
            p.terminate()
            p.kill()
        raise
    finally:
        for p in processes:
            p.join()  # terminated processes have to be joined else they remain zombies
            p.close()

        manager.shutdown()

threaded_parallel_map(func, input_values, *, chunk_size=None, ignore_exceptions=False, concurrency=None, unordered=False) #

Execute a map operation on an iterable stream.

Similar to parallel_map but uses threads instead of processes. Hence, it is not helpful in CPU-bound situations.

A custom parallel map operation supporting both synchronous and async map functions. Exceptions encountered in the map function are sent to the host thread where they are either raised (default) or ignored. Each process processes a single chunk at once.

Examples:

>>> list(threaded_parallel_map(lambda x: x ** 2, [1, 2, 3]))
[1, 4, 9]

Parameters:

Name Type Description Default
func Callable[[T], Union[V, Awaitable[V]]]

The function that should be applied to each element of input_values. It can async, in that case, a new event loop is started for each chunk.

required
input_values Union[Iterable[T], Sequence[T]]

An iterable of items that func is applied to.

required
chunk_size Optional[int]

Tune the number of items processed in each step. Larger numbers result in smaller communication overhead but less parallelism at the start and end. If chunk_size has a __len__ property, the chunk_size is calculated automatically if not given.

None
ignore_exceptions bool

Ignore chunks if next() raises an exception on input_values. And return None if func raised an exception in a worker process.

False
concurrency Optional[int]

Number of new threads to start.

None
unordered bool

Do not preserve the order of the elements, yield them as soon as they have been processed. This decreases the latency caused by difficult-to-process items.

False

Yields:

Type Description
Iterable[Optional[V]]

The next result obtained from applying func to each input value. May contain None-s if ignore_exceptions=True. May have different order than the input if unordered=True.

Raises:

Type Description
WorkerException

If there was an error in the func function in a background thread and ignore_exceptions=False.

Source code in great_ai/utilities/parallel_map/threaded_parallel_map.py
def threaded_parallel_map(
    func: Callable[[T], Union[V, Awaitable[V]]],
    input_values: Union[Iterable[T], Sequence[T]],
    *,
    chunk_size: Optional[int] = None,
    ignore_exceptions: bool = False,
    concurrency: Optional[int] = None,
    unordered: bool = False,
) -> Iterable[Optional[V]]:
    """Execute a map operation on an iterable stream.

    Similar to [parallel_map][great_ai.utilities.parallel_map.parallel_map.parallel_map]
    but uses threads instead of processes. Hence, it is not helpful in CPU-bound
    situations.

    A custom parallel map operation supporting both synchronous and `async` map
    functions. Exceptions encountered in the map function are sent to the host thread
    where they are either raised (default) or ignored. Each process processes a single
    chunk at once.

    Examples:
        >>> list(threaded_parallel_map(lambda x: x ** 2, [1, 2, 3]))
        [1, 4, 9]

    Args:
        func: The function that should be applied to each element of `input_values`.
            It can `async`, in that case, a new event loop is started for each chunk.
        input_values: An iterable of items that `func` is applied to.
        chunk_size: Tune the number of items processed in each step. Larger numbers
            result in smaller communication overhead but less parallelism at the start
            and end. If `chunk_size` has a `__len__` property, the `chunk_size` is
            calculated automatically if not given.
        ignore_exceptions: Ignore chunks if `next()` raises an exception on
            `input_values`. And return `None` if `func` raised an exception in a worker
            process.
        concurrency: Number of new threads to start.
        unordered: Do not preserve the order of the elements, yield them as soon as they
            have been processed. This decreases the latency caused by
            difficult-to-process items.

    Yields:
        The next result obtained from applying `func` to each input value. May
            contain `None`-s if `ignore_exceptions=True`. May have different order than
            the input if `unordered=True`.

    Raises:
        WorkerException: If there was an error in the `func` function in a background
            thread and `ignore_exceptions=False`.
    """

    config = get_config(
        function=func,
        input_values=input_values,
        chunk_size=chunk_size,
        concurrency=concurrency,
    )

    input_queue: queue.Queue = queue.Queue(config.concurrency * 2)
    output_queue: queue.Queue = queue.Queue(config.concurrency * 2)
    should_stop = threading.Event()

    threads = [
        threading.Thread(
            name=f"threaded_parallel_map_{config.function_name}_{i}",
            target=mapper_function,
            daemon=True,
            kwargs=dict(
                input_queue=input_queue,
                output_queue=output_queue,
                should_stop=should_stop,
                func=func,
            ),
        )
        for i in range(config.concurrency)
    ]

    for t in threads:
        t.start()

    yield from manage_communication(
        input_values=input_values,
        chunk_size=config.chunk_size,
        input_queue=input_queue,
        output_queue=output_queue,
        unordered=unordered,
        ignore_exceptions=ignore_exceptions,
    )
    should_stop.set()
    for t in threads:
        t.join(1)

Composable parallel processing#

Because both threaded_parallel_map and parallel_map have a streaming interface, it is easy to compose them and end up with, for example, a process for each CPU core with its own thread-pool or event-loop. Longer pipelines are also easy to imagine. The chunking methods help in these compositions.

chunk(values, chunk_size) #

Turn an iterable of items into an iterable of lists (chunks) of items.

Each returned chunk is of length chunk_size except the last one the length of which is between 1 and chunk_size.

Useful for parallel processing.

Examples:

>>> list(chunk(range(10), chunk_size=3))
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

Parameters:

Name Type Description Default
values Iterable[T]

The stream of items to pack into chunks.

required
chunk_size int

Desired length of each (but the last) chunk.

required

Yields:

Type Description
Iterable[List[T]]

The next chunk.

Source code in great_ai/utilities/chunk.py
def chunk(values: Iterable[T], chunk_size: int) -> Iterable[List[T]]:
    """Turn an iterable of items into an iterable of lists (chunks) of items.

    Each returned chunk is of length `chunk_size` except the last one the length of
    which is between 1 and `chunk_size`.

    Useful for parallel processing.

    Examples:
        >>> list(chunk(range(10), chunk_size=3))
        [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

    Args:
        values: The stream of items to pack into chunks.
        chunk_size: Desired length of each (but the last) chunk.

    Yields:
        The next chunk.
    """

    assert chunk_size >= 1

    result: List[T] = []
    for v in values:
        result.append(v)
        if len(result) == chunk_size:
            yield result
            result = []

    if len(result) > 0:
        yield result

unchunk(chunks) #

Turn a stream of chunks of items into a stream of items (flatten operation).

The inverse operation of chunk. Useful for parallel processing.

Similar to itertools.chain but ignores None chunks.

Examples:

>>> list(unchunk([[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Parameters:

Name Type Description Default
chunks Iterable[Optional[Iterable[T]]]

Stream of chunks to unpack.

required

Yields:

Type Description
Iterable[T]

The next item in the flattened iterable.

Source code in great_ai/utilities/unchunk.py
def unchunk(chunks: Iterable[Optional[Iterable[T]]]) -> Iterable[T]:
    """Turn a stream of chunks of items into a stream of items (flatten operation).

    The inverse operation of [chunk][great_ai.utilities.chunk.chunk].
    Useful for parallel processing.

    Similar to itertools.chain but ignores `None` chunks.

    Examples:
        >>> list(unchunk([[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]))
        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    Args:
        chunks: Stream of chunks to unpack.

    Yields:
        The next item in the flattened iterable.
    """

    for chunk in chunks:
        if chunk is not None:
            yield from chunk

Operations#

ConfigFile #

Bases: Mapping[str, str]

A small and safe INI-style configuration loader with dict and ENV support.

The values can be accessed using both dot- and index-notation. It is compatible with the dict interface.

File format example:

# comments are allowed everywhere

key = value  # you can leave or omit whitespace around the equal-sign
my_hashtag = "#great_ai" # the r-value can be quoted with " or ' or `.

my_var = my_default_value  # Default values can be given to env-vars,
                           # see next line. The default value must come first.

my_var = ENV:MY_ENV_VAR    # If the value starts with the `ENV:` prefix,
                           # it is looked up from the environment variables.

Examples:

>>> ConfigFile('tests/utilities/data/simple.conf')
ConfigFile(path=tests/utilities/data/simple.conf) {'zeroth_key': 'test', 'first_key': 'András'}
>>> ConfigFile('tests/utilities/data/simple.conf').zeroth_key
'test'
>>> ConfigFile('tests/utilities/data/simple.conf').second_key
Traceback (most recent call last):
...
KeyError: 'Key `second_key` is not found in configuration file ...
>>> a = ConfigFile('tests/utilities/data/simple.conf')
>>> {**a}
{'zeroth_key': 'test', 'first_key': 'András'}
Source code in great_ai/utilities/config_file/config_file.py
class ConfigFile(Mapping[str, str]):
    """A small and safe `INI`-style configuration loader with `dict` and `ENV` support.

    The values can be accessed using both dot- and index-notation. It is compatible
    with the `dict` interface.

    File format example:

    ```toml
    # comments are allowed everywhere

    key = value  # you can leave or omit whitespace around the equal-sign
    my_hashtag = "#great_ai" # the r-value can be quoted with " or ' or `.

    my_var = my_default_value  # Default values can be given to env-vars,
                               # see next line. The default value must come first.

    my_var = ENV:MY_ENV_VAR    # If the value starts with the `ENV:` prefix,
                               # it is looked up from the environment variables.
    ```

    Examples:
        >>> ConfigFile('tests/utilities/data/simple.conf')
        ConfigFile(path=tests/utilities/data/simple.conf) {'zeroth_key': 'test', 'first_key': 'András'}

        >>> ConfigFile('tests/utilities/data/simple.conf').zeroth_key
        'test'

        >>> ConfigFile('tests/utilities/data/simple.conf').second_key
        Traceback (most recent call last):
        ...
        KeyError: 'Key `second_key` is not found in configuration file ...

        >>> a = ConfigFile('tests/utilities/data/simple.conf')
        >>> {**a}
        {'zeroth_key': 'test', 'first_key': 'András'}

    """

    ENVIRONMENT_VARIABLE_KEY_PREFIX = "ENV"

    def __init__(self, path: Union[Path, str], *, ignore_missing: bool = False) -> None:
        """Load and parse a configuration file.

        Everything is eager-loaded, thus, exceptions may be thrown here.

        Args:
            path: Local path of the configuration file.
            ignore_missing: Don't raise an exception on missing environment variables.

        Raises:
            FileNotFoundError: If there is no file at the specified path.
            ParseError: If the provided file does not conform to the expected format.
            KeyError: If there is duplication in the keys.
            ValueError: If an environment variable is referenced but it is not set in
                the system and `ignore_missing=False`.
        """

        if not isinstance(path, Path):
            path = Path(path)

        if not path.exists():
            raise FileNotFoundError(path.absolute())

        self._ignore_missing = ignore_missing

        self._path = path
        self._key_values: Dict[str, str] = {}

        self._parse()

    @property
    def path(self) -> Path:
        """Original path from where the configuration was loaded."""
        return self._path

    def _parse(self) -> None:
        with open(self._path, encoding="utf-8") as f:
            lines: str = f.read()

        matches = pattern.findall(lines)
        for key, *values in matches:
            try:
                value = next(v for v in values if v)
            except StopIteration:
                raise ParseError(
                    f"""Cannot parse config file ({
                        self._path.absolute()
                        }), error at key `{key}`"""
                )

            already_exists = key in self._key_values
            if already_exists and not value.startswith(
                f"{self.ENVIRONMENT_VARIABLE_KEY_PREFIX}:"
            ):
                raise KeyError(
                    f"Key `{key}` has been already defined and its value is `{self._key_values[key]}`"
                )

            if value.startswith(f"{self.ENVIRONMENT_VARIABLE_KEY_PREFIX}:"):
                _, value = value.split(":")
                if value not in os.environ:
                    issue = f"""The value of `{key}` contains the "{
                        self.ENVIRONMENT_VARIABLE_KEY_PREFIX
                    }` prefix but `{value}` is not defined as an environment variable"""
                    if already_exists:
                        logger.warning(
                            f"""{issue}, using the default value defined above (`{
                                self._key_values[key]
                            }`)"""
                        )
                        continue
                    elif self._ignore_missing:
                        logger.warning(issue)
                    else:
                        raise ValueError(
                            f"{issue} and no default value has been provided"
                        )
                else:
                    value = os.environ[value]

            self._key_values[key] = value

    def __getattr__(self, key: str) -> str:
        if key in self._key_values:
            return self._key_values[key]
        raise KeyError(
            f"Key `{key}` is not found in configuration file ({self._path.absolute()})"
        )

    __getitem__ = __getattr__

    def __iter__(self) -> Iterator[str]:
        return iter(self._key_values)

    def __len__(self) -> int:
        return len(self._key_values)

    def keys(self) -> KeysView[str]:
        return self._key_values.keys()

    def values(self) -> ValuesView[str]:
        return self._key_values.values()

    def items(self) -> ItemsView[str, str]:
        return self._key_values.items()

    def __repr__(self) -> str:
        return f"{type(self).__name__}(path={self._path.as_posix()}) {self._key_values}"

path: Path property #

Original path from where the configuration was loaded.

__init__(path, *, ignore_missing=False) #

Load and parse a configuration file.

Everything is eager-loaded, thus, exceptions may be thrown here.

Parameters:

Name Type Description Default
path Union[Path, str]

Local path of the configuration file.

required
ignore_missing bool

Don't raise an exception on missing environment variables.

False

Raises:

Type Description
FileNotFoundError

If there is no file at the specified path.

ParseError

If the provided file does not conform to the expected format.

KeyError

If there is duplication in the keys.

ValueError

If an environment variable is referenced but it is not set in the system and ignore_missing=False.

Source code in great_ai/utilities/config_file/config_file.py
def __init__(self, path: Union[Path, str], *, ignore_missing: bool = False) -> None:
    """Load and parse a configuration file.

    Everything is eager-loaded, thus, exceptions may be thrown here.

    Args:
        path: Local path of the configuration file.
        ignore_missing: Don't raise an exception on missing environment variables.

    Raises:
        FileNotFoundError: If there is no file at the specified path.
        ParseError: If the provided file does not conform to the expected format.
        KeyError: If there is duplication in the keys.
        ValueError: If an environment variable is referenced but it is not set in
            the system and `ignore_missing=False`.
    """

    if not isinstance(path, Path):
        path = Path(path)

    if not path.exists():
        raise FileNotFoundError(path.absolute())

    self._ignore_missing = ignore_missing

    self._path = path
    self._key_values: Dict[str, str] = {}

    self._parse()

get_logger(name, level=logging.INFO, disable_colors=False) #

Return a customised logger used throughout the GreatAI codebase.

Uses colors, and only prints timestamps when not running inside notebook.

Source code in great_ai/utilities/logger/get_logger.py
def get_logger(
    name: str, level: int = logging.INFO, disable_colors: bool = False
) -> logging.Logger:
    """Return a customised logger used throughout the GreatAI codebase.

    Uses colors, and only prints timestamps when not running inside notebook.
    """

    if name not in loggers:
        logger = logging.getLogger(name)
        logger.setLevel(level)

        try:
            get_ipython()  # type: ignore
            log_format = "%(message)s"
        except NameError:
            # will fail outside of a notebook https://ipython.org/
            log_format = "%(asctime)s | %(levelname)8s | %(message)s"

        stdout_handler = logging.StreamHandler()
        stdout_handler.setLevel(level)
        if not disable_colors:
            stdout_handler.setFormatter(CustomFormatter(log_format))

        logger.addHandler(stdout_handler)
        loggers[name] = logger

    return loggers[name]

Last update: July 15, 2022