Skip to content

Bytes

docarray.typing.bytes

AudioBytes

Bases: BaseBytes

Bytes that store audio data and that can be loaded into an audio tensor

Source code in docarray/typing/bytes/audio_bytes.py
@_register_proto(proto_type_name='audio_bytes')
class AudioBytes(BaseBytes):
    """
    Bytes that store audio data and that can be loaded into an audio tensor
    """

    def load(self) -> Tuple[AudioNdArray, int]:
        """
        Decode these [`AudioBytes`][docarray.typing.AudioBytes] into an
        [`AudioNdArray`][docarray.typing.AudioNdArray] together with the frame rate.

        Decoding is delegated to `pydub`.

        ---

        ```python
        from typing import Optional
        from docarray import BaseDoc
        from docarray.typing import AudioBytes, AudioNdArray, AudioUrl


        class MyAudio(BaseDoc):
            url: AudioUrl
            tensor: Optional[AudioNdArray] = None
            bytes_: Optional[AudioBytes] = None
            frame_rate: Optional[float] = None


        doc = MyAudio(url='https://www.kozco.com/tech/piano2.wav')
        doc.bytes_ = doc.url.load_bytes()
        doc.tensor, doc.frame_rate = doc.bytes_.load()

        # Note: this is equivalent to

        doc.tensor, doc.frame_rate = doc.url.load()

        assert isinstance(doc.tensor, AudioNdArray)
        ```

        ---
        :return: tuple of an [`AudioNdArray`][docarray.typing.AudioNdArray] holding the
            samples divided by `2 ** (8 * sample_width - 1)`, and the frame rate
            reported by the decoded segment.
        """
        # The return value is not needed; presumably this call only verifies that
        # pydub is installed before the import below.
        import_library('pydub', raise_error=True)
        from pydub import AudioSegment

        decoded = AudioSegment.from_file(io.BytesIO(self))

        # Samples exactly as pydub hands them out, wrapped in a NumPy array.
        # No explicit dtype cast is applied here.
        raw_samples = np.array(decoded.get_array_of_samples())

        # Divide by the full-scale magnitude for this sample width so that the
        # values end up in the range -1.0 .. +1.0.
        full_scale = 2 ** (decoded.sample_width * 8 - 1)
        normalised = raw_samples / full_scale
        return parse_obj_as(AudioNdArray, normalised), decoded.frame_rate

load()

Load the Audio from the AudioBytes into an AudioNdArray.


from typing import Optional
from docarray import BaseDoc
from docarray.typing import AudioBytes, AudioNdArray, AudioUrl


class MyAudio(BaseDoc):
    url: AudioUrl
    tensor: Optional[AudioNdArray] = None
    bytes_: Optional[AudioBytes] = None
    frame_rate: Optional[float] = None


doc = MyAudio(url='https://www.kozco.com/tech/piano2.wav')
doc.bytes_ = doc.url.load_bytes()
doc.tensor, doc.frame_rate = doc.bytes_.load()

# Note: this is equivalent to

doc.tensor, doc.frame_rate = doc.url.load()

assert isinstance(doc.tensor, AudioNdArray)

Returns:

Type Description
Tuple[AudioNdArray, int]

tuple of an AudioNdArray representing the audio bytes content, and an integer representing the frame rate.

Source code in docarray/typing/bytes/audio_bytes.py
def load(self) -> Tuple[AudioNdArray, int]:
    """
    Decode these [`AudioBytes`][docarray.typing.AudioBytes] into an
    [`AudioNdArray`][docarray.typing.AudioNdArray] together with the frame rate.

    Decoding is delegated to `pydub`.

    ---

    ```python
    from typing import Optional
    from docarray import BaseDoc
    from docarray.typing import AudioBytes, AudioNdArray, AudioUrl


    class MyAudio(BaseDoc):
        url: AudioUrl
        tensor: Optional[AudioNdArray] = None
        bytes_: Optional[AudioBytes] = None
        frame_rate: Optional[float] = None


    doc = MyAudio(url='https://www.kozco.com/tech/piano2.wav')
    doc.bytes_ = doc.url.load_bytes()
    doc.tensor, doc.frame_rate = doc.bytes_.load()

    # Note: this is equivalent to

    doc.tensor, doc.frame_rate = doc.url.load()

    assert isinstance(doc.tensor, AudioNdArray)
    ```

    ---
    :return: tuple of an [`AudioNdArray`][docarray.typing.AudioNdArray] holding the
        samples divided by `2 ** (8 * sample_width - 1)`, and the frame rate
        reported by the decoded segment.
    """
    # The return value is not needed; presumably this call only verifies that
    # pydub is installed before the import below.
    import_library('pydub', raise_error=True)
    from pydub import AudioSegment

    decoded = AudioSegment.from_file(io.BytesIO(self))

    # Samples exactly as pydub hands them out, wrapped in a NumPy array.
    # No explicit dtype cast is applied here.
    raw_samples = np.array(decoded.get_array_of_samples())

    # Divide by the full-scale magnitude for this sample width so that the
    # values end up in the range -1.0 .. +1.0.
    full_scale = 2 ** (decoded.sample_width * 8 - 1)
    normalised = raw_samples / full_scale
    return parse_obj_as(AudioNdArray, normalised), decoded.frame_rate

ImageBytes

Bases: BaseBytes

Bytes that store an image and that can be loaded into an image tensor

Source code in docarray/typing/bytes/image_bytes.py
@_register_proto(proto_type_name='image_bytes')
class ImageBytes(BaseBytes):
    """
    Bytes that store an image and that can be loaded into an image tensor
    """

    def load_pil(
        self,
    ) -> 'PILImage.Image':
        """
        Open the image held in these bytes as a `PIL.Image.Image` instance

        ---

        ```python
        from pydantic import parse_obj_as

        from docarray import BaseDoc
        from docarray.typing import ImageUrl

        img_url = "https://upload.wikimedia.org/wikipedia/commons/8/80/Dag_Sebastian_Ahlander_at_G%C3%B6teborg_Book_Fair_2012b.jpg"

        img_url = parse_obj_as(ImageUrl, img_url)
        img = img_url.load_pil()

        from PIL.Image import Image

        assert isinstance(img, Image)
        ```

        ---
        :return: a Pillow image
        """
        # The return value is not needed; presumably this call only verifies that
        # Pillow is installed before the import below.
        import_library('PIL', raise_error=True)
        from PIL import Image as PILImage

        stream = BytesIO(self)
        return PILImage.open(stream)

    def load(
        self,
        width: Optional[int] = None,
        height: Optional[int] = None,
        axis_layout: Tuple[str, str, str] = ('H', 'W', 'C'),
    ) -> ImageNdArray:
        """
        Decode these [`ImageBytes`][docarray.typing.ImageBytes] into an
        [`ImageNdArray`][docarray.typing.ImageNdArray], optionally resizing the
        image and reordering its axes.

        ---

        ```python
        from docarray import BaseDoc
        from docarray.typing import ImageNdArray, ImageUrl


        class MyDoc(BaseDoc):
            img_url: ImageUrl


        doc = MyDoc(
            img_url="https://upload.wikimedia.org/wikipedia/commons/8/80/"
            "Dag_Sebastian_Ahlander_at_G%C3%B6teborg_Book_Fair_2012b.jpg"
        )

        img_tensor = doc.img_url.load()
        assert isinstance(img_tensor, ImageNdArray)

        img_tensor = doc.img_url.load(height=224, width=224)
        assert img_tensor.shape == (224, 224, 3)

        layout = ('C', 'W', 'H')
        img_tensor = doc.img_url.load(height=100, width=200, axis_layout=layout)
        assert img_tensor.shape == (3, 200, 100)
        ```

        ---

        :param width: width of the image tensor. When only `height` is given,
            the image keeps its own width.
        :param height: height of the image tensor. When only `width` is given,
            the image keeps its own height.
        :param axis_layout: ordering of the different image axes.
            'H' = height, 'W' = width, 'C' = color channel
        :return: [`ImageNdArray`][docarray.typing.ImageNdArray] representing the image as RGB values
        """
        pil_img = self.load_pil()

        if width or height:
            # A missing (or zero) dimension falls back to the image's own size.
            target_size = (width or pil_img.width, height or pil_img.height)
            pil_img = pil_img.resize(target_size)

        try:
            rgb_img = pil_img.convert('RGB')
            pixels = np.array(rgb_img)
        except Exception:
            # Best effort: if the RGB conversion fails, use the pixels as they are.
            pixels = np.array(pil_img)

        reordered = self._move_channel_axis(pixels, axis_layout=axis_layout)
        return parse_obj_as(ImageNdArray, reordered)

    @staticmethod
    def _move_channel_axis(
        tensor: np.ndarray, axis_layout: Tuple[str, str, str] = ('H', 'W', 'C')
    ) -> np.ndarray:
        """Permute the axes of an (H, W, C) tensor into the order given by `axis_layout`."""
        # Position of each named axis in the incoming tensor.
        source_axis = {'H': 0, 'W': 1, 'C': 2}
        order = tuple(source_axis[name] for name in axis_layout)
        return np.transpose(tensor, axes=order)

load(width=None, height=None, axis_layout=('H', 'W', 'C'))

Load the image from the ImageBytes into an ImageNdArray.


from docarray import BaseDoc
from docarray.typing import ImageNdArray, ImageUrl


class MyDoc(BaseDoc):
    img_url: ImageUrl


doc = MyDoc(
    img_url="https://upload.wikimedia.org/wikipedia/commons/8/80/"
    "Dag_Sebastian_Ahlander_at_G%C3%B6teborg_Book_Fair_2012b.jpg"
)

img_tensor = doc.img_url.load()
assert isinstance(img_tensor, ImageNdArray)

img_tensor = doc.img_url.load(height=224, width=224)
assert img_tensor.shape == (224, 224, 3)

layout = ('C', 'W', 'H')
img_tensor = doc.img_url.load(height=100, width=200, axis_layout=layout)
assert img_tensor.shape == (3, 200, 100)

Parameters:

Name Type Description Default
width Optional[int]

width of the image tensor.

None
height Optional[int]

height of the image tensor.

None
axis_layout Tuple[str, str, str]

ordering of the different image axes. 'H' = height, 'W' = width, 'C' = color channel

('H', 'W', 'C')

Returns:

Type Description
ImageNdArray

ImageNdArray representing the image as RGB values

Source code in docarray/typing/bytes/image_bytes.py
def load(
    self,
    width: Optional[int] = None,
    height: Optional[int] = None,
    axis_layout: Tuple[str, str, str] = ('H', 'W', 'C'),
) -> ImageNdArray:
    """
    Decode these [`ImageBytes`][docarray.typing.ImageBytes] into an
    [`ImageNdArray`][docarray.typing.ImageNdArray], optionally resizing the
    image and reordering its axes.

    ---

    ```python
    from docarray import BaseDoc
    from docarray.typing import ImageNdArray, ImageUrl


    class MyDoc(BaseDoc):
        img_url: ImageUrl


    doc = MyDoc(
        img_url="https://upload.wikimedia.org/wikipedia/commons/8/80/"
        "Dag_Sebastian_Ahlander_at_G%C3%B6teborg_Book_Fair_2012b.jpg"
    )

    img_tensor = doc.img_url.load()
    assert isinstance(img_tensor, ImageNdArray)

    img_tensor = doc.img_url.load(height=224, width=224)
    assert img_tensor.shape == (224, 224, 3)

    layout = ('C', 'W', 'H')
    img_tensor = doc.img_url.load(height=100, width=200, axis_layout=layout)
    assert img_tensor.shape == (3, 200, 100)
    ```

    ---

    :param width: width of the image tensor. When only `height` is given,
        the image keeps its own width.
    :param height: height of the image tensor. When only `width` is given,
        the image keeps its own height.
    :param axis_layout: ordering of the different image axes.
        'H' = height, 'W' = width, 'C' = color channel
    :return: [`ImageNdArray`][docarray.typing.ImageNdArray] representing the image as RGB values
    """
    pil_img = self.load_pil()

    if width or height:
        # A missing (or zero) dimension falls back to the image's own size.
        target_size = (width or pil_img.width, height or pil_img.height)
        pil_img = pil_img.resize(target_size)

    try:
        rgb_img = pil_img.convert('RGB')
        pixels = np.array(rgb_img)
    except Exception:
        # Best effort: if the RGB conversion fails, use the pixels as they are.
        pixels = np.array(pil_img)

    reordered = self._move_channel_axis(pixels, axis_layout=axis_layout)
    return parse_obj_as(ImageNdArray, reordered)

load_pil()

Load the image from the bytes into a PIL.Image.Image instance


from pydantic import parse_obj_as

from docarray import BaseDoc
from docarray.typing import ImageUrl

img_url = "https://upload.wikimedia.org/wikipedia/commons/8/80/Dag_Sebastian_Ahlander_at_G%C3%B6teborg_Book_Fair_2012b.jpg"

img_url = parse_obj_as(ImageUrl, img_url)
img = img_url.load_pil()

from PIL.Image import Image

assert isinstance(img, Image)

Returns:

Type Description
Image

a Pillow image

Source code in docarray/typing/bytes/image_bytes.py
def load_pil(
    self,
) -> 'PILImage.Image':
    """
    Open the image held in these bytes as a `PIL.Image.Image` instance

    ---

    ```python
    from pydantic import parse_obj_as

    from docarray import BaseDoc
    from docarray.typing import ImageUrl

    img_url = "https://upload.wikimedia.org/wikipedia/commons/8/80/Dag_Sebastian_Ahlander_at_G%C3%B6teborg_Book_Fair_2012b.jpg"

    img_url = parse_obj_as(ImageUrl, img_url)
    img = img_url.load_pil()

    from PIL.Image import Image

    assert isinstance(img, Image)
    ```

    ---
    :return: a Pillow image
    """
    # The return value is not needed; presumably this call only verifies that
    # Pillow is installed before the import below.
    import_library('PIL', raise_error=True)
    from PIL import Image as PILImage

    stream = BytesIO(self)
    return PILImage.open(stream)

VideoBytes

Bases: BaseBytes

Bytes that store a video and that can be loaded into a video tensor

Source code in docarray/typing/bytes/video_bytes.py
@_register_proto(proto_type_name='video_bytes')
class VideoBytes(BaseBytes):
    """
    Bytes that store a video and that can be loaded into a video tensor
    """

    def load(self, **kwargs) -> VideoLoadResult:
        """
        Load the video from the bytes into a VideoLoadResult object consisting of:

        - a [`VideoNdArray`][docarray.typing.VideoNdArray] (`VideoLoadResult.video`)
        - an [`AudioNdArray`][docarray.typing.AudioNdArray] (`VideoLoadResult.audio`)
        - an [`NdArray`][docarray.typing.NdArray] containing the key frame indices (`VideoLoadResult.key_frame_indices`).

        ---

        ```python
        from docarray import BaseDoc
        from docarray.typing import AudioNdArray, NdArray, VideoNdArray, VideoUrl


        class MyDoc(BaseDoc):
            video_url: VideoUrl


        doc = MyDoc(
            video_url='https://github.com/docarray/docarray/blob/main/tests/toydata/mov_bbb.mp4?raw=true'
        )

        video, audio, key_frame_indices = doc.video_url.load()
        assert isinstance(video, VideoNdArray)
        assert isinstance(audio, AudioNdArray)
        assert isinstance(key_frame_indices, NdArray)
        ```

        ---


        :param kwargs: supports all keyword arguments that are being supported by
            av.open() as described [here](https://pyav.org/docs/stable/api/_globals.html?highlight=open#av.open)
        :return: a `VideoLoadResult` instance with video, audio and keyframe indices
        :raises ValueError: from `np.stack` when the container yields no video frames.
        """
        if TYPE_CHECKING:
            import av
        else:
            av = import_library('av')

        audio_frames: List[np.ndarray] = []
        video_frames: List[np.ndarray] = []
        keyframe_indices: List[int] = []

        with av.open(BytesIO(self), **kwargs) as container:
            for frame in container.decode():
                # isinstance (rather than an exact type comparison) is the
                # idiomatic type check; frames of any other kind are ignored.
                if isinstance(frame, av.audio.frame.AudioFrame):
                    audio_frames.append(frame.to_ndarray())
                elif isinstance(frame, av.video.frame.VideoFrame):
                    if frame.key_frame == 1:
                        # Record the position this frame is about to take
                        # in `video_frames`.
                        keyframe_indices.append(len(video_frames))

                    video_frames.append(frame.to_ndarray(format='rgb24'))

        if not audio_frames:
            # np.stack rejects an empty list, so build an empty array instead.
            audio = parse_obj_as(AudioNdArray, np.array(audio_frames))
        else:
            audio = parse_obj_as(AudioNdArray, np.stack(audio_frames))

        video = parse_obj_as(VideoNdArray, np.stack(video_frames))
        indices = parse_obj_as(NdArray, keyframe_indices)

        return VideoLoadResult(video=video, audio=audio, key_frame_indices=indices)

load(**kwargs)

Load the video from the bytes into a VideoLoadResult object consisting of:

  • a VideoNdArray (VideoLoadResult.video)
  • an AudioNdArray (VideoLoadResult.audio)
  • an NdArray containing the key frame indices (VideoLoadResult.key_frame_indices).

from docarray import BaseDoc
from docarray.typing import AudioNdArray, NdArray, VideoNdArray, VideoUrl


class MyDoc(BaseDoc):
    video_url: VideoUrl


doc = MyDoc(
    video_url='https://github.com/docarray/docarray/blob/main/tests/toydata/mov_bbb.mp4?raw=true'
)

video, audio, key_frame_indices = doc.video_url.load()
assert isinstance(video, VideoNdArray)
assert isinstance(audio, AudioNdArray)
assert isinstance(key_frame_indices, NdArray)

Parameters:

Name Type Description Default
kwargs

supports all keyword arguments that are being supported by av.open() as described here

{}

Returns:

Type Description
VideoLoadResult

a VideoLoadResult instance with video, audio and keyframe indices

Source code in docarray/typing/bytes/video_bytes.py
def load(self, **kwargs) -> VideoLoadResult:
    """
    Load the video from the bytes into a VideoLoadResult object consisting of:

    - a [`VideoNdArray`][docarray.typing.VideoNdArray] (`VideoLoadResult.video`)
    - an [`AudioNdArray`][docarray.typing.AudioNdArray] (`VideoLoadResult.audio`)
    - an [`NdArray`][docarray.typing.NdArray] containing the key frame indices (`VideoLoadResult.key_frame_indices`).

    ---

    ```python
    from docarray import BaseDoc
    from docarray.typing import AudioNdArray, NdArray, VideoNdArray, VideoUrl


    class MyDoc(BaseDoc):
        video_url: VideoUrl


    doc = MyDoc(
        video_url='https://github.com/docarray/docarray/blob/main/tests/toydata/mov_bbb.mp4?raw=true'
    )

    video, audio, key_frame_indices = doc.video_url.load()
    assert isinstance(video, VideoNdArray)
    assert isinstance(audio, AudioNdArray)
    assert isinstance(key_frame_indices, NdArray)
    ```

    ---


    :param kwargs: supports all keyword arguments that are being supported by
        av.open() as described [here](https://pyav.org/docs/stable/api/_globals.html?highlight=open#av.open)
    :return: a `VideoLoadResult` instance with video, audio and keyframe indices
    :raises ValueError: from `np.stack` when the container yields no video frames.
    """
    if TYPE_CHECKING:
        import av
    else:
        av = import_library('av')

    audio_frames: List[np.ndarray] = []
    video_frames: List[np.ndarray] = []
    keyframe_indices: List[int] = []

    with av.open(BytesIO(self), **kwargs) as container:
        for frame in container.decode():
            # isinstance (rather than an exact type comparison) is the
            # idiomatic type check; frames of any other kind are ignored.
            if isinstance(frame, av.audio.frame.AudioFrame):
                audio_frames.append(frame.to_ndarray())
            elif isinstance(frame, av.video.frame.VideoFrame):
                if frame.key_frame == 1:
                    # Record the position this frame is about to take
                    # in `video_frames`.
                    keyframe_indices.append(len(video_frames))

                video_frames.append(frame.to_ndarray(format='rgb24'))

    if not audio_frames:
        # np.stack rejects an empty list, so build an empty array instead.
        audio = parse_obj_as(AudioNdArray, np.array(audio_frames))
    else:
        audio = parse_obj_as(AudioNdArray, np.stack(audio_frames))

    video = parse_obj_as(VideoNdArray, np.stack(video_frames))
    indices = parse_obj_as(NdArray, keyframe_indices)

    return VideoLoadResult(video=video, audio=audio, key_frame_indices=indices)

audio_bytes

AudioBytes

Bases: BaseBytes

Bytes that store audio data and that can be loaded into an audio tensor

Source code in docarray/typing/bytes/audio_bytes.py
@_register_proto(proto_type_name='audio_bytes')
class AudioBytes(BaseBytes):
    """
    Bytes that store audio data and that can be loaded into an audio tensor
    """

    def load(self) -> Tuple[AudioNdArray, int]:
        """
        Decode these [`AudioBytes`][docarray.typing.AudioBytes] into an
        [`AudioNdArray`][docarray.typing.AudioNdArray] together with the frame rate.

        Decoding is delegated to `pydub`.

        ---

        ```python
        from typing import Optional
        from docarray import BaseDoc
        from docarray.typing import AudioBytes, AudioNdArray, AudioUrl


        class MyAudio(BaseDoc):
            url: AudioUrl
            tensor: Optional[AudioNdArray] = None
            bytes_: Optional[AudioBytes] = None
            frame_rate: Optional[float] = None


        doc = MyAudio(url='https://www.kozco.com/tech/piano2.wav')
        doc.bytes_ = doc.url.load_bytes()
        doc.tensor, doc.frame_rate = doc.bytes_.load()

        # Note: this is equivalent to

        doc.tensor, doc.frame_rate = doc.url.load()

        assert isinstance(doc.tensor, AudioNdArray)
        ```

        ---
        :return: tuple of an [`AudioNdArray`][docarray.typing.AudioNdArray] holding the
            samples divided by `2 ** (8 * sample_width - 1)`, and the frame rate
            reported by the decoded segment.
        """
        # The return value is not needed; presumably this call only verifies that
        # pydub is installed before the import below.
        import_library('pydub', raise_error=True)
        from pydub import AudioSegment

        decoded = AudioSegment.from_file(io.BytesIO(self))

        # Samples exactly as pydub hands them out, wrapped in a NumPy array.
        # No explicit dtype cast is applied here.
        raw_samples = np.array(decoded.get_array_of_samples())

        # Divide by the full-scale magnitude for this sample width so that the
        # values end up in the range -1.0 .. +1.0.
        full_scale = 2 ** (decoded.sample_width * 8 - 1)
        normalised = raw_samples / full_scale
        return parse_obj_as(AudioNdArray, normalised), decoded.frame_rate
load()

Load the Audio from the AudioBytes into an AudioNdArray.


from typing import Optional
from docarray import BaseDoc
from docarray.typing import AudioBytes, AudioNdArray, AudioUrl


class MyAudio(BaseDoc):
    url: AudioUrl
    tensor: Optional[AudioNdArray] = None
    bytes_: Optional[AudioBytes] = None
    frame_rate: Optional[float] = None


doc = MyAudio(url='https://www.kozco.com/tech/piano2.wav')
doc.bytes_ = doc.url.load_bytes()
doc.tensor, doc.frame_rate = doc.bytes_.load()

# Note: this is equivalent to

doc.tensor, doc.frame_rate = doc.url.load()

assert isinstance(doc.tensor, AudioNdArray)

Returns:

Type Description
Tuple[AudioNdArray, int]

tuple of an AudioNdArray representing the audio bytes content, and an integer representing the frame rate.

Source code in docarray/typing/bytes/audio_bytes.py
def load(self) -> Tuple[AudioNdArray, int]:
    """
    Decode these [`AudioBytes`][docarray.typing.AudioBytes] into an
    [`AudioNdArray`][docarray.typing.AudioNdArray] together with the frame rate.

    Decoding is delegated to `pydub`.

    ---

    ```python
    from typing import Optional
    from docarray import BaseDoc
    from docarray.typing import AudioBytes, AudioNdArray, AudioUrl


    class MyAudio(BaseDoc):
        url: AudioUrl
        tensor: Optional[AudioNdArray] = None
        bytes_: Optional[AudioBytes] = None
        frame_rate: Optional[float] = None


    doc = MyAudio(url='https://www.kozco.com/tech/piano2.wav')
    doc.bytes_ = doc.url.load_bytes()
    doc.tensor, doc.frame_rate = doc.bytes_.load()

    # Note: this is equivalent to

    doc.tensor, doc.frame_rate = doc.url.load()

    assert isinstance(doc.tensor, AudioNdArray)
    ```

    ---
    :return: tuple of an [`AudioNdArray`][docarray.typing.AudioNdArray] holding the
        samples divided by `2 ** (8 * sample_width - 1)`, and the frame rate
        reported by the decoded segment.
    """
    # The return value is not needed; presumably this call only verifies that
    # pydub is installed before the import below.
    import_library('pydub', raise_error=True)
    from pydub import AudioSegment

    decoded = AudioSegment.from_file(io.BytesIO(self))

    # Samples exactly as pydub hands them out, wrapped in a NumPy array.
    # No explicit dtype cast is applied here.
    raw_samples = np.array(decoded.get_array_of_samples())

    # Divide by the full-scale magnitude for this sample width so that the
    # values end up in the range -1.0 .. +1.0.
    full_scale = 2 ** (decoded.sample_width * 8 - 1)
    normalised = raw_samples / full_scale
    return parse_obj_as(AudioNdArray, normalised), decoded.frame_rate

base_bytes

BaseBytes

Bases: bytes, AbstractType

Bytes type for docarray

Source code in docarray/typing/bytes/base_bytes.py
class BaseBytes(bytes, AbstractType):
    """
    Bytes type for docarray

    Subclass of the built-in `bytes` that adds validation and protobuf
    conversion hooks shared by the more specific bytes types.
    """

    @classmethod
    def _docarray_validate(
        cls: Type[T],
        value: Any,
    ) -> T:
        """
        Validate `value` with `bytes_validator` and wrap the result in `cls`.

        :param value: the object to validate
        :return: an instance of `cls` built from the validated value
        """
        # NOTE(review): `bytes_validator` is defined outside this listing;
        # presumably it coerces/validates the input as bytes -- confirm.
        value = bytes_validator(value)
        return cls(value)

    @classmethod
    def from_protobuf(cls: Type[T], pb_msg: T) -> T:
        """
        Build an instance of `cls` from a protobuf payload via `parse_obj_as`.

        :param pb_msg: the payload to parse
        :return: an instance of `cls`
        """
        return parse_obj_as(cls, pb_msg)

    def _to_node_protobuf(self: T) -> 'NodeProto':
        """
        Wrap these bytes in a `NodeProto`, using them as the `blob` field.

        :return: a `NodeProto` whose `type` is this class's `_proto_type_name`
        """
        # Imported inside the method rather than at module level.
        from docarray.proto import NodeProto

        # NOTE(review): `_proto_type_name` is not set in this class; presumably
        # it is provided by the base class or a registration decorator -- confirm.
        return NodeProto(blob=self, type=self._proto_type_name)

    # The schema hook below is only defined when `is_pydantic_v2` is truthy.
    if is_pydantic_v2:

        @classmethod
        @abstractmethod
        def __get_pydantic_core_schema__(
            cls, _source_type: Any, _handler: 'GetCoreSchemaHandler'
        ) -> 'core_schema.CoreSchema':
            """
            Return a core schema combining `core_schema.bytes_schema()` with
            `cls.validate` as an after-validator.
            """
            # NOTE(review): `cls.validate` is not defined in this class;
            # presumably inherited from `AbstractType` -- confirm.
            return core_schema.general_after_validator_function(
                cls.validate,
                core_schema.bytes_schema(),
            )

image_bytes

ImageBytes

Bases: BaseBytes

Bytes that store an image and that can be loaded into an image tensor

Source code in docarray/typing/bytes/image_bytes.py
@_register_proto(proto_type_name='image_bytes')
class ImageBytes(BaseBytes):
    """
    Bytes that store an image and that can be loaded into an image tensor
    """

    def load_pil(
        self,
    ) -> 'PILImage.Image':
        """
        Open the image held in these bytes as a `PIL.Image.Image` instance

        ---

        ```python
        from pydantic import parse_obj_as

        from docarray import BaseDoc
        from docarray.typing import ImageUrl

        img_url = "https://upload.wikimedia.org/wikipedia/commons/8/80/Dag_Sebastian_Ahlander_at_G%C3%B6teborg_Book_Fair_2012b.jpg"

        img_url = parse_obj_as(ImageUrl, img_url)
        img = img_url.load_pil()

        from PIL.Image import Image

        assert isinstance(img, Image)
        ```

        ---
        :return: a Pillow image
        """
        # The return value is not needed; presumably this call only verifies that
        # Pillow is installed before the import below.
        import_library('PIL', raise_error=True)
        from PIL import Image as PILImage

        stream = BytesIO(self)
        return PILImage.open(stream)

    def load(
        self,
        width: Optional[int] = None,
        height: Optional[int] = None,
        axis_layout: Tuple[str, str, str] = ('H', 'W', 'C'),
    ) -> ImageNdArray:
        """
        Decode these [`ImageBytes`][docarray.typing.ImageBytes] into an
        [`ImageNdArray`][docarray.typing.ImageNdArray], optionally resizing the
        image and reordering its axes.

        ---

        ```python
        from docarray import BaseDoc
        from docarray.typing import ImageNdArray, ImageUrl


        class MyDoc(BaseDoc):
            img_url: ImageUrl


        doc = MyDoc(
            img_url="https://upload.wikimedia.org/wikipedia/commons/8/80/"
            "Dag_Sebastian_Ahlander_at_G%C3%B6teborg_Book_Fair_2012b.jpg"
        )

        img_tensor = doc.img_url.load()
        assert isinstance(img_tensor, ImageNdArray)

        img_tensor = doc.img_url.load(height=224, width=224)
        assert img_tensor.shape == (224, 224, 3)

        layout = ('C', 'W', 'H')
        img_tensor = doc.img_url.load(height=100, width=200, axis_layout=layout)
        assert img_tensor.shape == (3, 200, 100)
        ```

        ---

        :param width: width of the image tensor. When only `height` is given,
            the image keeps its own width.
        :param height: height of the image tensor. When only `width` is given,
            the image keeps its own height.
        :param axis_layout: ordering of the different image axes.
            'H' = height, 'W' = width, 'C' = color channel
        :return: [`ImageNdArray`][docarray.typing.ImageNdArray] representing the image as RGB values
        """
        pil_img = self.load_pil()

        if width or height:
            # A missing (or zero) dimension falls back to the image's own size.
            target_size = (width or pil_img.width, height or pil_img.height)
            pil_img = pil_img.resize(target_size)

        try:
            rgb_img = pil_img.convert('RGB')
            pixels = np.array(rgb_img)
        except Exception:
            # Best effort: if the RGB conversion fails, use the pixels as they are.
            pixels = np.array(pil_img)

        reordered = self._move_channel_axis(pixels, axis_layout=axis_layout)
        return parse_obj_as(ImageNdArray, reordered)

    @staticmethod
    def _move_channel_axis(
        tensor: np.ndarray, axis_layout: Tuple[str, str, str] = ('H', 'W', 'C')
    ) -> np.ndarray:
        """Permute the axes of an (H, W, C) tensor into the order given by `axis_layout`."""
        # Position of each named axis in the incoming tensor.
        source_axis = {'H': 0, 'W': 1, 'C': 2}
        order = tuple(source_axis[name] for name in axis_layout)
        return np.transpose(tensor, axes=order)
load(width=None, height=None, axis_layout=('H', 'W', 'C'))

Load the image from the ImageBytes into an ImageNdArray.


from docarray import BaseDoc
from docarray.typing import ImageNdArray, ImageUrl


class MyDoc(BaseDoc):
    img_url: ImageUrl


doc = MyDoc(
    img_url="https://upload.wikimedia.org/wikipedia/commons/8/80/"
    "Dag_Sebastian_Ahlander_at_G%C3%B6teborg_Book_Fair_2012b.jpg"
)

img_tensor = doc.img_url.load()
assert isinstance(img_tensor, ImageNdArray)

img_tensor = doc.img_url.load(height=224, width=224)
assert img_tensor.shape == (224, 224, 3)

layout = ('C', 'W', 'H')
img_tensor = doc.img_url.load(height=100, width=200, axis_layout=layout)
assert img_tensor.shape == (3, 200, 100)

Parameters:

Name Type Description Default
width Optional[int]

width of the image tensor.

None
height Optional[int]

height of the image tensor.

None
axis_layout Tuple[str, str, str]

ordering of the different image axes. 'H' = height, 'W' = width, 'C' = color channel

('H', 'W', 'C')

Returns:

Type Description
ImageNdArray

ImageNdArray representing the image as RGB values

Source code in docarray/typing/bytes/image_bytes.py
def load(
    self,
    width: Optional[int] = None,
    height: Optional[int] = None,
    axis_layout: Tuple[str, str, str] = ('H', 'W', 'C'),
) -> ImageNdArray:
    """
    Decode these [`ImageBytes`][docarray.typing.ImageBytes] into an
    [`ImageNdArray`][docarray.typing.ImageNdArray], optionally resizing the
    image and reordering its axes.

    ---

    ```python
    from docarray import BaseDoc
    from docarray.typing import ImageNdArray, ImageUrl


    class MyDoc(BaseDoc):
        img_url: ImageUrl


    doc = MyDoc(
        img_url="https://upload.wikimedia.org/wikipedia/commons/8/80/"
        "Dag_Sebastian_Ahlander_at_G%C3%B6teborg_Book_Fair_2012b.jpg"
    )

    img_tensor = doc.img_url.load()
    assert isinstance(img_tensor, ImageNdArray)

    img_tensor = doc.img_url.load(height=224, width=224)
    assert img_tensor.shape == (224, 224, 3)

    layout = ('C', 'W', 'H')
    img_tensor = doc.img_url.load(height=100, width=200, axis_layout=layout)
    assert img_tensor.shape == (3, 200, 100)
    ```

    ---

    :param width: width of the image tensor. When only `height` is given,
        the image keeps its own width.
    :param height: height of the image tensor. When only `width` is given,
        the image keeps its own height.
    :param axis_layout: ordering of the different image axes.
        'H' = height, 'W' = width, 'C' = color channel
    :return: [`ImageNdArray`][docarray.typing.ImageNdArray] representing the image as RGB values
    """
    pil_img = self.load_pil()

    if width or height:
        # A missing (or zero) dimension falls back to the image's own size.
        target_size = (width or pil_img.width, height or pil_img.height)
        pil_img = pil_img.resize(target_size)

    try:
        rgb_img = pil_img.convert('RGB')
        pixels = np.array(rgb_img)
    except Exception:
        # Best effort: if the RGB conversion fails, use the pixels as they are.
        pixels = np.array(pil_img)

    reordered = self._move_channel_axis(pixels, axis_layout=axis_layout)
    return parse_obj_as(ImageNdArray, reordered)
load_pil()

Load the image from the bytes into a PIL.Image.Image instance


from pydantic import parse_obj_as

from docarray import BaseDoc
from docarray.typing import ImageUrl

img_url = "https://upload.wikimedia.org/wikipedia/commons/8/80/Dag_Sebastian_Ahlander_at_G%C3%B6teborg_Book_Fair_2012b.jpg"

img_url = parse_obj_as(ImageUrl, img_url)
img = img_url.load_pil()

from PIL.Image import Image

assert isinstance(img, Image)

Returns:

Type Description
Image

a Pillow image

Source code in docarray/typing/bytes/image_bytes.py
def load_pil(
    self,
) -> 'PILImage.Image':
    """
    Open the image held in these bytes as a `PIL.Image.Image` instance

    ---

    ```python
    from pydantic import parse_obj_as

    from docarray import BaseDoc
    from docarray.typing import ImageUrl

    img_url = "https://upload.wikimedia.org/wikipedia/commons/8/80/Dag_Sebastian_Ahlander_at_G%C3%B6teborg_Book_Fair_2012b.jpg"

    img_url = parse_obj_as(ImageUrl, img_url)
    img = img_url.load_pil()

    from PIL.Image import Image

    assert isinstance(img, Image)
    ```

    ---
    :return: a Pillow image
    """
    # The return value is not needed; presumably this call only verifies that
    # Pillow is installed before the import below.
    import_library('PIL', raise_error=True)
    from PIL import Image as PILImage

    stream = BytesIO(self)
    return PILImage.open(stream)

video_bytes

VideoBytes

Bases: BaseBytes

Bytes that store a video and that can be loaded into a video tensor

Source code in docarray/typing/bytes/video_bytes.py
@_register_proto(proto_type_name='video_bytes')
class VideoBytes(BaseBytes):
    """
    Bytes that store a video and that can be loaded into a video tensor
    """

    def load(self, **kwargs) -> VideoLoadResult:
        """
        Load the video from the bytes into a VideoLoadResult object consisting of:

        - a [`VideoNdArray`][docarray.typing.VideoNdArray] (`VideoLoadResult.video`)
        - an [`AudioNdArray`][docarray.typing.AudioNdArray] (`VideoLoadResult.audio`)
        - an [`NdArray`][docarray.typing.NdArray] containing the key frame indices (`VideoLoadResult.key_frame_indices`).

        If no audio frame is decoded, the audio part is built from an empty array.

        ---

        ```python
        from docarray import BaseDoc
        from docarray.typing import AudioNdArray, NdArray, VideoNdArray, VideoUrl


        class MyDoc(BaseDoc):
            video_url: VideoUrl


        doc = MyDoc(
            video_url='https://github.com/docarray/docarray/blob/main/tests/toydata/mov_bbb.mp4?raw=true'
        )

        video, audio, key_frame_indices = doc.video_url.load()
        assert isinstance(video, VideoNdArray)
        assert isinstance(audio, AudioNdArray)
        assert isinstance(key_frame_indices, NdArray)
        ```

        ---


        :param kwargs: supports all keyword arguments that are supported by
            av.open() as described [here](https://pyav.org/docs/stable/api/_globals.html?highlight=open#av.open)
        :return: a `VideoLoadResult` instance with video, audio and keyframe indices
        """
        if TYPE_CHECKING:
            import av
        else:
            av = import_library('av')

        audio_frames: List[np.ndarray] = []
        video_frames: List[np.ndarray] = []
        keyframe_indices: List[int] = []

        # Decode the in-memory bytes; the container is closed on leaving the block.
        with av.open(BytesIO(self), **kwargs) as container:
            for frame in container.decode():
                if isinstance(frame, av.audio.frame.AudioFrame):
                    audio_frames.append(frame.to_ndarray())
                elif isinstance(frame, av.video.frame.VideoFrame):
                    if frame.key_frame == 1:
                        # Index the key frame by its position among the video
                        # frames collected so far (i.e. before it is appended).
                        keyframe_indices.append(len(video_frames))

                    video_frames.append(frame.to_ndarray(format='rgb24'))

        if audio_frames:
            audio = parse_obj_as(AudioNdArray, np.stack(audio_frames))
        else:
            # np.stack cannot handle an empty list, so fall back to an empty array.
            audio = parse_obj_as(AudioNdArray, np.array(audio_frames))

        # NOTE: np.stack needs at least one array, so input that yields no
        # video frame is not supported here and raises.
        video = parse_obj_as(VideoNdArray, np.stack(video_frames))
        indices = parse_obj_as(NdArray, keyframe_indices)

        return VideoLoadResult(video=video, audio=audio, key_frame_indices=indices)
load(**kwargs)

Load the video from the bytes into a VideoLoadResult object consisting of:

  • a VideoNdArray (VideoLoadResult.video)
  • an AudioNdArray (VideoLoadResult.audio)
  • an NdArray containing the key frame indices (VideoLoadResult.key_frame_indices).

# Example: load video, audio and key frame indices from a video URL.
from docarray import BaseDoc
from docarray.typing import AudioNdArray, NdArray, VideoNdArray, VideoUrl


# Minimal document with a single video URL field.
class MyDoc(BaseDoc):
    video_url: VideoUrl


doc = MyDoc(
    video_url='https://github.com/docarray/docarray/blob/main/tests/toydata/mov_bbb.mp4?raw=true'
)

# The load result unpacks into three parts: video, audio, key frame indices.
video, audio, key_frame_indices = doc.video_url.load()
assert isinstance(video, VideoNdArray)
assert isinstance(audio, AudioNdArray)
assert isinstance(key_frame_indices, NdArray)

Parameters:

Name Type Description Default
kwargs

supports all keyword arguments accepted by av.open(), as described here

{}

Returns:

Type Description
VideoLoadResult

a VideoLoadResult instance with video, audio and keyframe indices

Source code in docarray/typing/bytes/video_bytes.py
def load(self, **kwargs) -> VideoLoadResult:
    """
    Load the video from the bytes into a VideoLoadResult object consisting of:

    - a [`VideoNdArray`][docarray.typing.VideoNdArray] (`VideoLoadResult.video`)
    - an [`AudioNdArray`][docarray.typing.AudioNdArray] (`VideoLoadResult.audio`)
    - an [`NdArray`][docarray.typing.NdArray] containing the key frame indices (`VideoLoadResult.key_frame_indices`).

    If no audio frame is decoded, the audio part is built from an empty array.

    ---

    ```python
    from docarray import BaseDoc
    from docarray.typing import AudioNdArray, NdArray, VideoNdArray, VideoUrl


    class MyDoc(BaseDoc):
        video_url: VideoUrl


    doc = MyDoc(
        video_url='https://github.com/docarray/docarray/blob/main/tests/toydata/mov_bbb.mp4?raw=true'
    )

    video, audio, key_frame_indices = doc.video_url.load()
    assert isinstance(video, VideoNdArray)
    assert isinstance(audio, AudioNdArray)
    assert isinstance(key_frame_indices, NdArray)
    ```

    ---


    :param kwargs: supports all keyword arguments that are supported by
        av.open() as described [here](https://pyav.org/docs/stable/api/_globals.html?highlight=open#av.open)
    :return: a `VideoLoadResult` instance with video, audio and keyframe indices
    """
    if TYPE_CHECKING:
        import av
    else:
        av = import_library('av')

    audio_frames: List[np.ndarray] = []
    video_frames: List[np.ndarray] = []
    keyframe_indices: List[int] = []

    # Decode the in-memory bytes; the container is closed on leaving the block.
    with av.open(BytesIO(self), **kwargs) as container:
        for frame in container.decode():
            if isinstance(frame, av.audio.frame.AudioFrame):
                audio_frames.append(frame.to_ndarray())
            elif isinstance(frame, av.video.frame.VideoFrame):
                if frame.key_frame == 1:
                    # Index the key frame by its position among the video
                    # frames collected so far (i.e. before it is appended).
                    keyframe_indices.append(len(video_frames))

                video_frames.append(frame.to_ndarray(format='rgb24'))

    if audio_frames:
        audio = parse_obj_as(AudioNdArray, np.stack(audio_frames))
    else:
        # np.stack cannot handle an empty list, so fall back to an empty array.
        audio = parse_obj_as(AudioNdArray, np.array(audio_frames))

    # NOTE: np.stack needs at least one array, so input that yields no
    # video frame is not supported here and raises.
    video = parse_obj_as(VideoNdArray, np.stack(video_frames))
    indices = parse_obj_as(NdArray, keyframe_indices)

    return VideoLoadResult(video=video, audio=audio, key_frame_indices=indices)