Parallelization
See also

map(): to parallel-process Documents one by one, returning an iterator of elements;
map_batch(): to parallel-process minibatches of a DocumentArray, returning an iterator of DocumentArrays;
apply(): like map(), but modifies the DocumentArray in place;
apply_batch(): like map_batch(), but modifies the DocumentArray in place.
Working with large DocumentArrays in an element-wise manner can be time-consuming. The naive way is to run a for-loop and enumerate Documents one by one. DocArray provides map() to speed things up considerably. It's like Python's built-in map() function, but it applies the function to every element of the DocumentArray in parallel. There is also map_batch() that works on the minibatch level.
map() returns an iterator of processed Documents. If you only modify elements in place and don't need the return values, you can use apply() instead:
from docarray import DocumentArray
da = DocumentArray(...)
da.apply(func)
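For contrast, here is a minimal sketch of the map() counterpart; uppercase_text is a toy function made up for illustration, and map() hands back an iterator that you loop over (or collect) to consume the results:

from docarray import Document, DocumentArray

da = DocumentArray([Document(text=f'hello {i}') for i in range(3)])


def uppercase_text(d):
    d.text = d.text.upper()
    return d


# iterate over the processed Documents as they come back
for d in da.map(uppercase_text):
    print(d.text)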
In practice, apply() is often more popular than map(). However, map() has its own charm, as we shall see in the next section.
Let’s see an example, where we want to preprocess about 6,000 image Documents. First we fill the URI of each Document:
from docarray import DocumentArray
docs = DocumentArray.from_files('*.jpg') # 6016 image Documents with .uri set
Now let’s load and preprocess the Documents:
def foo(d):
    return (
        d.load_uri_to_image_tensor()
        .set_image_tensor_normalization()
        .set_image_tensor_channel_axis(-1, 0)
    )
This loads the image from the file into .tensor, does some normalization, and sets the channel axis. Now, let's see the time difference when we do things sequentially compared to using .apply():
for d in docs:
    foo(d)

docs.apply(foo)
foo-loop ... foo-loop takes 11.5 seconds
apply ... apply takes 3.5 seconds
You can see a significant speedup with .apply().
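The exact numbers depend on your machine. If you want to reproduce the comparison yourself, a plain standard-library way to time both variants (reusing docs and foo from above) looks like this; note that the second run re-processes the same Documents, which is fine here because foo reloads each image from its URI:

import time

t0 = time.perf_counter()
for d in docs:
    foo(d)
print(f'for-loop takes {time.perf_counter() - t0:.1f} seconds')

t0 = time.perf_counter()
docs.apply(foo)
print(f'apply takes {time.perf_counter() - t0:.1f} seconds')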
By default, parallelization is conducted with the thread backend, i.e. multi-threading. It also supports the process backend by setting .apply(..., backend='process').
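For example, to run the same preprocessing with multiprocessing (a sketch that reuses docs and foo from above; num_worker is optional and only illustrates pinning the number of workers):

# 'process' spawns worker processes instead of threads; when running this as a
# script, keep such calls under the usual `if __name__ == '__main__':` guard.
docs.apply(foo, backend='process', num_worker=4)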
When to choose the process or thread backend?

It depends on your func in .apply(func). There are three options:
First, if you want func to modify elements in place, then only use the thread backend. With the process backend you can only rely on the return values of .map() – the modification that happens inside func is lost (see the sketch after this list).
Second, follow what people often suggest: IO-bound func uses thread, CPU-bound func uses process.
Last, ignore everything above. Test it for yourself and use whatever's faster.
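To illustrate the first point, here is a minimal sketch (the Documents and the to_upper function below are made up for illustration). With the thread backend an in-place change sticks; with the process backend each worker operates on a copy, so you collect the Documents returned by map() instead:

from docarray import Document, DocumentArray

da = DocumentArray([Document(text='hello') for _ in range(4)])


def to_upper(d):
    d.text = d.text.upper()  # in-place modification
    return d


# thread backend: workers share memory, so the in-place change is kept
da.apply(to_upper, backend='thread')

# process backend: the change happens in a copy inside the worker process,
# so rely on the Documents returned by map() instead
upper_da = DocumentArray(d for d in da.map(to_upper, backend='process'))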
Use map_batch() to overlap CPU & GPU computation
As I said, map()/map_batch() has its own charm: it returns an iterator (of batches) where each partial result is immediately available, regardless of whether your function is still running on the remaining elements. You can leverage this to speed up computation, especially when working with a CPU-GPU pipeline.
Let’s see an example: Say we have a DocumentArray with 1,024 Documents. Assuming we can run a CPU job for a 16-Document batch in 1 second per core; and we can run a GPU job for a 16-Document batch in 2 seconds per core. Say we have four CPU cores and one GPU core as the total resources.
Question: how long will it take to process 1,024 Documents?
import time

from docarray import DocumentArray

da = DocumentArray.empty(1024)


def cpu_job(da):
    print(f'cpu on {len(da)} docs')
    time.sleep(1)
    return da


def gpu_job(da):
    print(f'GPU on {len(da)} docs')
    time.sleep(2)
Before jumping to the code, let’s first whiteboard it with simple math:
CPU time: 1024/16/4 * 1s = 16s
GPU time: 1024/16 * 2s = 128s
Total time: 16s + 128s = 144s
So 144s, right? Yes, if we implement it with apply(), it takes around 144s.
However, we can do better. What if we overlap the computation of CPU and GPU? The whole procedure is GPU-bound anyway. If we can ensure the GPU works on every batch as soon as it's ready from the CPU, rather than waiting until all batches are ready, then we can save a lot of time. To be precise, we could do it in 129s.
Why 129s? Why not 128s?
If you immediately know the answer, send your CV to us!
Because the very first batch must be done by the CPU first. This is inevitable, which makes the first second non-overlapping. The rest of the time will be overlapped and dominated by the GPU’s 128s. Hence, 1s + 128s = 129s.
Okay, let’s program these two ways and validate our guess:
da.apply_batch(cpu_job, batch_size=16, num_worker=4)
for b in da.batch(batch_size=16):
    gpu_job(b)

for b in da.map_batch(cpu_job, batch_size=16, num_worker=4):
    gpu_job(b)
Which gives you:
apply: 144.476s
map: 129.326s
Hopefully this sheds light on solving the data-draining/blocking problem when you use DocArray in a CPU-GPU pipeline.
Use map_batch() to overlap CPU and network time
Such a technique and mindset can be extended to other pipelines that have potential data-blocking issues. For example, in the implementation of push(), you'll find code similar to the following:
def gen():
    yield _head

    def _get_chunk(_batch):
        return b''.join(
            d._to_stream_bytes(protocol='protobuf', compress='gzip')
            for d in _batch
        ), len(_batch)

    for chunk, num_doc_in_chunk in self.map_batch(_get_chunk, batch_size=32):
        yield chunk

    yield _tail


response = requests.post(
    'https://api.hubble.jina.ai/v2/rpc/artifact.upload',
    data=gen(),
    headers=headers,
)
This overlaps the time spent sending network requests (IO-bound) with the time spent serializing DocumentArrays (CPU-bound) and hence improves performance a lot.
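The same pattern is easy to reuse in your own code. As a sketch, suppose you have a DocumentArray da, serialize minibatches yourself, and send each chunk with a hypothetical upload_chunk() function; map_batch() keeps serializing the next batches while the current chunk is still on the wire:

def serialize(batch):
    # CPU-bound: turn a minibatch DocumentArray into compressed bytes
    return batch.to_bytes(protocol='protobuf', compress='gzip')


for chunk in da.map_batch(serialize, batch_size=64, num_worker=4):
    upload_chunk(chunk)  # IO-bound: hypothetical network call, overlapping with serialization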