Source code for docarray.array.storage.redis.seqlike

from typing import Iterable, Union

from docarray import Document, DocumentArray
from docarray.array.storage.base.seqlike import BaseSequenceLikeMixin


class SequenceLikeMixin(BaseSequenceLikeMixin):
    """Implement sequence-like methods for DocumentArray with Redis as storage"""

    def __eq__(self, other):
        """Compare this object to the other, returns True if and only if other
        has the same type as self and other has the same meta information

        :param other: the other object to check for equality
        :return: ``True`` if other is equal to self
        """
        # two DA are considered as the same if they have the same client meta data;
        # the type check comes first so ``other._client`` / ``other._config`` are
        # only touched when ``other`` is of the exact same type
        return (
            type(self) is type(other)
            and self._client.client_info() == other._client.client_info()
            and self._config == other._config
        )

    def __len__(self):
        """Return the length of :class:`DocumentArray` that uses Redis as storage

        When the array is list-like, the length is the number of tracked
        offset-to-id entries. Otherwise the number of keys matching
        ``"<index_name>:*"`` is counted through a Lua script.

        :return: the length of this :class:`DocumentArrayRedis` object
        """
        if self._list_like:
            return len(self._offset2ids)
        try:
            lua_script = f'return #redis.pcall("keys", "{self._config.index_name}:*")'
            cmd = self._client.register_script(lua_script)
            return cmd()
        except Exception:
            # best effort: a failure to register or run the script is reported
            # as an empty array. ``Exception`` (rather than a bare ``except``)
            # lets KeyboardInterrupt / SystemExit propagate.
            return 0

    def __contains__(self, x: Union[str, 'Document']):
        """Check if ``x`` is contained in this :class:`DocumentArray` with Redis storage

        :param x: the id of the document to check or the document object itself
        :return: True if ``x`` is contained in self
        """
        if isinstance(x, str):
            return self._doc_id_exists(x)
        elif isinstance(x, Document):
            return self._doc_id_exists(x.id)
        else:
            # neither an id nor a Document: cannot be a member
            return False

    def __repr__(self):
        """Return the string representation of :class:`DocumentArrayRedis` object

        :return: string representation of this object
        """
        return f'<DocumentArray[Redis] (length={len(self)}) at {id(self)}>'

    def _upload_batch(self, batch_of_docs: Iterable['Document']):
        """Write a batch of documents to Redis through a single pipeline

        Each document is converted with ``_document_to_redis`` and stored as a
        hash under the key ``_doc_prefix + doc.id``.

        :param batch_of_docs: the documents to store
        """
        pipe = self._client.pipeline()
        for doc in batch_of_docs:
            payload = self._document_to_redis(doc)
            pipe.hset(self._doc_prefix + doc.id, mapping=payload)
        # queued commands are sent together here
        pipe.execute()

    def _extend(self, docs: Iterable['Document']):
        """Append documents to the storage in batches of ``_config.batch_size``

        :param docs: the documents to add
        """
        da = DocumentArray(docs)
        for batch_of_docs in da.batch(self._config.batch_size):
            self._upload_batch(batch_of_docs)
            if self._list_like:
                # keep the offset-to-id mapping in step with what was uploaded
                self._offset2ids.extend(batch_of_docs[:, 'id'])