Source code for docarray.array.mixins.parallel

import sys
from contextlib import nullcontext
from math import ceil
from types import LambdaType
from typing import (
    Callable,
    TYPE_CHECKING,
    Generator,
    Optional,
    overload,
    TypeVar,
    Union,
)

if TYPE_CHECKING:  # pragma: no cover
    from docarray.typing import T
    from docarray import Document, DocumentArray
    from multiprocessing.pool import ThreadPool, Pool


T_DA = TypeVar('T_DA')


class ParallelMixin:
    """Helper functions that provide parallel map to :class:`DocumentArray`"""

    # overload_inject_start_apply
    @overload
    def apply(
        self: 'T',
        func: Callable[['Document'], 'Document'],
        backend: str = 'thread',
        num_worker: Optional[int] = None,
        show_progress: bool = False,
        pool: Optional[Union['Pool', 'ThreadPool']] = None,
    ) -> 'T':
        """Apply ``func`` to every Document in itself, and return itself after modification.

        :param func: a function that takes a :class:`Document` as input and outputs
            a :class:`Document`.
        :param backend: `thread` for multi-threading and `process` for multi-processing.
            Defaults to `thread`. In general, if ``func`` is IO-bound then `thread` is a
            good choice; if ``func`` is CPU-bound, then you may use `process`. In practice
            you should experiment to find the best value. However, if you wish to modify
            elements in-place, you should always use the `thread` backend, regardless of
            whether ``func`` is IO- or CPU-bound.

            .. warning::
                When using the `process` backend, you should not expect ``func`` to modify
                elements in-place. This is because the multiprocessing backend passes the
                variable via pickle and works in another process. The passed object and
                the original object do **not** share the same memory.

        :param num_worker: the number of parallel workers. If not given, the number of
            CPUs in the system will be used.
        :param pool: use an existing/external process or thread pool. If given, `backend`
            is ignored and you will be responsible for closing the pool.
        :param show_progress: show a progress bar
        """

    # overload_inject_end_apply
    def apply(self: 'T', *args, **kwargs) -> 'T':
        # implementation_stub_inject_start_apply
        """Apply ``func`` to every Document in itself, and return itself after modification.

        :param func: a function that takes a :class:`Document` as input and outputs
            a :class:`Document`.
        :param backend: `thread` for multi-threading and `process` for multi-processing.
            Defaults to `thread`. In general, if ``func`` is IO-bound then `thread` is a
            good choice; if ``func`` is CPU-bound, then you may use `process`. In practice
            you should experiment to find the best value. However, if you wish to modify
            elements in-place, you should always use the `thread` backend, regardless of
            whether ``func`` is IO- or CPU-bound.

            .. warning::
                When using the `process` backend, you should not expect ``func`` to modify
                elements in-place. This is because the multiprocessing backend passes the
                variable via pickle and works in another process. The passed object and
                the original object do **not** share the same memory.

        :param num_worker: the number of parallel workers. If not given, the number of
            CPUs in the system will be used.
        :param pool: use an existing/external process or thread pool. If given, `backend`
            is ignored and you will be responsible for closing the pool.
        :param show_progress: show a progress bar
        :return: itself after modification

        .. # noqa: DAR102
        .. # noqa: DAR202
        .. # noqa: DAR101
        .. # noqa: DAR003
        """
        # implementation_stub_inject_end_apply
        for doc in self.map(*args, **kwargs):
            self[doc.id] = doc
        return self
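    # A minimal usage sketch for ``apply`` (assuming ``docarray`` is installed;
    # the function name ``to_upper`` is illustrative, not part of the library):
    #
    #     from docarray import Document, DocumentArray
    #
    #     def to_upper(doc: Document) -> Document:
    #         doc.text = doc.text.upper()
    #         return doc
    #
    #     da = DocumentArray([Document(text='hello') for _ in range(100)])
    #     da.apply(to_upper, show_progress=True)  # modifies and returns `da` itself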
    def map(
        self,
        func: Callable[['Document'], 'T'],
        backend: str = 'thread',
        num_worker: Optional[int] = None,
        show_progress: bool = False,
        pool: Optional[Union['Pool', 'ThreadPool']] = None,
    ) -> Generator['T', None, None]:
        """Return an iterator that applies ``func`` to every **element** of the iterable
        in parallel, yielding the results.

        .. seealso::
            - To process a batch of elements, please use :meth:`.map_batch`;
            - To return a :class:`DocumentArray`, please use :meth:`.apply`.

        :param func: a function that takes a :class:`Document` as input and outputs
            anything. You can either modify elements in-place (only with the `thread`
            backend) or work later on the returned elements.
        :param backend: `thread` for multi-threading and `process` for multi-processing.
            Defaults to `thread`. In general, if ``func`` is IO-bound then `thread` is a
            good choice; if ``func`` is CPU-bound, then you may use `process`. In practice
            you should experiment to find the best value. However, if you wish to modify
            elements in-place, you should always use the `thread` backend, regardless of
            whether ``func`` is IO- or CPU-bound.

            .. warning::
                When using the `process` backend, you should not expect ``func`` to modify
                elements in-place. This is because the multiprocessing backend passes the
                variable via pickle and works in another process. The passed object and
                the original object do **not** share the same memory.

        :param num_worker: the number of parallel workers. If not given, the number of
            CPUs in the system will be used.
        :param show_progress: show a progress bar
        :param pool: use an existing/external process or thread pool. If given, `backend`
            is ignored and you will be responsible for closing the pool.
        :yield: anything returned from ``func``
        """
        if _is_lambda_or_partial_or_local_function(func) and backend == 'process':
            func = _globalize_lambda_function(func)

        from rich.progress import track

        if pool:
            p = pool
            ctx_p = nullcontext()
        else:
            p = _get_pool(backend, num_worker)
            ctx_p = p

        with ctx_p:
            for x in track(
                p.imap(func, self), total=len(self), disable=not show_progress
            ):
                yield x
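    # A minimal usage sketch for ``map``, assuming ``docarray`` is installed.
    # Note that ``map`` is lazy: iterate over it (or wrap it in ``list``) to
    # actually drive the worker pool:
    #
    #     from docarray import Document, DocumentArray
    #
    #     da = DocumentArray([Document(text='hello world') for _ in range(10)])
    #     # lambdas are fine with the default `thread` backend; with `process`
    #     # they are globalized via ``_globalize_lambda_function`` below
    #     lengths = list(da.map(lambda d: len(d.text)))  # -> [11, 11, ...]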
    # overload_inject_start_apply_batch
    @overload
    def apply_batch(
        self: 'T',
        func: Callable[['DocumentArray'], 'DocumentArray'],
        batch_size: int,
        backend: str = 'thread',
        num_worker: Optional[int] = None,
        shuffle: bool = False,
        show_progress: bool = False,
        pool: Optional[Union['Pool', 'ThreadPool']] = None,
    ) -> 'T':
        """Batches itself into mini-batches, applies ``func`` to every mini-batch, and
        returns itself after the modifications.

        EXAMPLE USAGE

        .. code-block:: python

            from docarray import Document, DocumentArray

            da = DocumentArray([Document(text='The cake is a lie') for _ in range(100)])


            def func(batch):
                batch.texts = [t.upper() for t in batch.texts]
                return batch


            da.apply_batch(func, batch_size=10)
            print(da.texts[:3])

        .. code-block:: text

            ['THE CAKE IS A LIE', 'THE CAKE IS A LIE', 'THE CAKE IS A LIE']

        :param func: a function that takes a :class:`DocumentArray` as input and outputs
            a :class:`DocumentArray`.
        :param backend: `thread` for multi-threading and `process` for multi-processing.
            Defaults to `thread`. In general, if ``func`` is IO-bound then `thread` is a
            good choice; if ``func`` is CPU-bound, then you may use `process`. In practice
            you should experiment to find the best value. However, if you wish to modify
            elements in-place, you should always use the `thread` backend, regardless of
            whether ``func`` is IO- or CPU-bound.

            .. warning::
                When using the `process` backend, you should not expect ``func`` to modify
                elements in-place. This is because the multiprocessing backend passes the
                variable via pickle and works in another process. The passed object and
                the original object do **not** share the same memory.

        :param num_worker: the number of parallel workers. If not given, the number of
            CPUs in the system will be used.
        :param batch_size: Size of each generated batch (except the last batch, which
            might be smaller).
        :param shuffle: If set, shuffle the Documents before dividing into mini-batches.
        :param show_progress: show a progress bar
        :param pool: use an existing/external process or thread pool. If given, `backend`
            is ignored and you will be responsible for closing the pool.
        """

    # overload_inject_end_apply_batch
    def apply_batch(self: 'T', *args, **kwargs) -> 'T':
        # implementation_stub_inject_start_apply_batch
        """Batches itself into mini-batches, applies ``func`` to every mini-batch, and
        returns itself after the modifications.

        EXAMPLE USAGE

        .. code-block:: python

            from docarray import Document, DocumentArray

            da = DocumentArray([Document(text='The cake is a lie') for _ in range(100)])


            def func(batch):
                batch.texts = [t.upper() for t in batch.texts]
                return batch


            da.apply_batch(func, batch_size=10)
            print(da.texts[:3])

        .. code-block:: text

            ['THE CAKE IS A LIE', 'THE CAKE IS A LIE', 'THE CAKE IS A LIE']

        :param func: a function that takes a :class:`DocumentArray` as input and outputs
            a :class:`DocumentArray`.
        :param backend: `thread` for multi-threading and `process` for multi-processing.
            Defaults to `thread`. In general, if ``func`` is IO-bound then `thread` is a
            good choice; if ``func`` is CPU-bound, then you may use `process`. In practice
            you should experiment to find the best value. However, if you wish to modify
            elements in-place, you should always use the `thread` backend, regardless of
            whether ``func`` is IO- or CPU-bound.

            .. warning::
                When using the `process` backend, you should not expect ``func`` to modify
                elements in-place. This is because the multiprocessing backend passes the
                variable via pickle and works in another process. The passed object and
                the original object do **not** share the same memory.

        :param num_worker: the number of parallel workers. If not given, the number of
            CPUs in the system will be used.
        :param batch_size: Size of each generated batch (except the last batch, which
            might be smaller).
        :param shuffle: If set, shuffle the Documents before dividing into mini-batches.
        :param show_progress: show a progress bar
        :param pool: use an existing/external process or thread pool. If given, `backend`
            is ignored and you will be responsible for closing the pool.
        :return: itself after modification

        .. # noqa: DAR102
        .. # noqa: DAR202
        .. # noqa: DAR101
        .. # noqa: DAR003
        """
        # implementation_stub_inject_end_apply_batch
        for _b in self.map_batch(*args, **kwargs):
            self[[doc.id for doc in _b]] = _b
        return self
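    # A sketch of ``apply_batch`` with the `process` backend (the top-level
    # function name ``upper_batch`` is illustrative). Because each batch
    # travels to the worker via pickle, mutations must be *returned* rather
    # than relied upon in-place; the results are written back by id here:
    #
    #     def upper_batch(batch):
    #         batch.texts = [t.upper() for t in batch.texts]
    #         return batch
    #
    #     da.apply_batch(upper_batch, batch_size=10, backend='process')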
    def map_batch(
        self: 'T_DA',
        func: Callable[['DocumentArray'], 'T'],
        batch_size: int,
        backend: str = 'thread',
        num_worker: Optional[int] = None,
        shuffle: bool = False,
        show_progress: bool = False,
        pool: Optional[Union['Pool', 'ThreadPool']] = None,
    ) -> Generator['T', None, None]:
        """Return an iterator that applies ``func`` to every **minibatch** of the iterable
        in parallel, yielding the results. Each element in the returned iterator is a
        :class:`DocumentArray`.

        .. seealso::
            - To process single elements, please use :meth:`.map`;
            - To return a :class:`DocumentArray`, please use :meth:`.apply_batch`.

        :param batch_size: Size of each generated batch (except the last one, which
            might be smaller).
        :param shuffle: If set, shuffle the Documents before dividing into mini-batches.
        :param func: a function that takes a :class:`DocumentArray` as input and outputs
            anything. You can either modify elements in-place (only with the `thread`
            backend) or work later on the returned elements.
        :param backend: whether to use multi-`process` or multi-`thread` as the
            parallelization backend. In general, if ``func`` is IO-bound then `thread` is
            a good choice; if ``func`` is CPU-bound, then you may use `process`. In
            practice you should experiment to find the best value. However, if you wish to
            modify elements in-place, you should always use the `thread` backend,
            regardless of whether ``func`` is IO- or CPU-bound.

            .. warning::
                When using the `process` backend, you should not expect ``func`` to modify
                elements in-place. This is because the multiprocessing backend passes the
                variable via pickle and works in another process. The passed object and
                the original object do **not** share the same memory.

        :param num_worker: the number of parallel workers. If not given, the number of
            CPUs in the system will be used.
        :param show_progress: show a progress bar
        :param pool: use an existing/external pool. If given, `backend` is ignored and
            you will be responsible for closing the pool.
        :yield: anything returned from ``func``
        """
        if _is_lambda_or_partial_or_local_function(func) and backend == 'process':
            func = _globalize_lambda_function(func)

        from rich.progress import track

        if pool:
            p = pool
            ctx_p = nullcontext()
        else:
            p = _get_pool(backend, num_worker)
            ctx_p = p

        with ctx_p:
            for x in track(
                p.imap(func, self.batch(batch_size=batch_size, shuffle=shuffle)),
                total=ceil(len(self) / batch_size),
                disable=not show_progress,
            ):
                yield x
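# A minimal usage sketch for ``map_batch``, assuming ``docarray`` is installed.
# 100 Documents with batch_size=32 yield four batches (32, 32, 32, 4):
#
#     from docarray import Document, DocumentArray
#
#     da = DocumentArray([Document(text='hi') for _ in range(100)])
#     for n in da.map_batch(lambda b: len(b), batch_size=32):
#         print(n)  # 32, 32, 32, 4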
def _get_pool(backend, num_worker):
    """Create a thread or process pool with ``num_worker`` workers."""
    if backend == 'thread':
        from multiprocessing.pool import ThreadPool as Pool

        return Pool(processes=num_worker)
    elif backend == 'process':
        from multiprocessing.pool import Pool

        return Pool(processes=num_worker)
    else:
        raise ValueError(
            f'`backend` must be either `process` or `thread`, received {backend}'
        )


def _is_lambda_or_partial_or_local_function(func):
    """Return True if ``func`` cannot be pickled by reference, i.e. it is a lambda,
    a partial, or a locally defined function."""
    return (
        (isinstance(func, LambdaType) and func.__name__ == '<lambda>')
        or not hasattr(func, '__qualname__')
        or ('<locals>' in func.__qualname__)
    )


def _globalize_lambda_function(func):
    """Wrap ``func`` in a module-level function with a random unique name, so that
    it can be pickled by reference for the `process` backend."""

    def result(*args, **kwargs):
        return func(*args, **kwargs)

    from docarray.helper import random_identity

    result.__name__ = result.__qualname__ = random_identity()
    setattr(sys.modules[result.__module__], result.__name__, result)
    return result
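# Why the globalization above is needed: ``multiprocessing`` pickles functions
# *by reference* (module plus qualified name), so lambdas and local functions
# cannot be sent to a worker process. A sketch of the failure it works around:
#
#     import pickle
#     pickle.dumps(lambda x: x)  # raises pickle.PicklingError
#
# ``_globalize_lambda_function`` installs a module-level alias with a random
# name and points the wrapper's ``__qualname__`` at it, so the reference can be
# resolved again inside the worker (this relies on the worker sharing this
# module's state, e.g. under the `fork` start method).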