Source code for docarray.array.storage.elastic.seqlike
from typing import Union, Iterable, Dict, List
import warnings
from docarray.array.storage.base.seqlike import BaseSequenceLikeMixin
from docarray import Document
[docs]class SequenceLikeMixin(BaseSequenceLikeMixin):
"""Implement sequence-like methods for DocumentArray with Elastic as storage"""
def __eq__(self, other):
"""Compare this object to the other, returns True if and only if other
as the same type as self and other has the same meta information
:param other: the other object to check for equality
:return: ``True`` if other is equal to self
"""
# two DAW are considered as the same if they have the same client meta data
return (
type(self) is type(other)
and self._client.get_meta() == other._client.get_meta()
and self._config == other._config
)
def __len__(self):
"""Return the length of :class:`DocumentArray` that uses Elastic as storage
:return: the length of this :class:`DocumentArrayElastic` object
"""
try:
return self._client.count(index=self._config.index_name)["count"]
except:
return 0
def __contains__(self, x: Union[str, 'Document']):
"""Check if ``x`` is contained in this :class:`DocumentArray` with Elastic storage
:param x: the id of the document to check or the document object itself
:return: True if ``x`` is contained in self
"""
if isinstance(x, str):
return self._doc_id_exists(x)
elif isinstance(x, Document):
return self._doc_id_exists(x.id)
else:
return False
def __repr__(self):
"""Return the string representation of :class:`DocumentArrayElastic` object
:return: string representation of this object
"""
return f'<{self.__class__.__name__} (length={len(self)}) at {id(self)}>'
@staticmethod
def _parse_index_ids_from_bulk_info(
accumulated_info: List[Dict],
) -> Dict[str, List[int]]:
"""Parse ids from bulk info of failed send request to ES operation
:param accumulated_info: accumulated info of failed operation
:return: dict containing failed index ids of each operation type
"""
parsed_ids = {}
for info in accumulated_info:
for _op_type in info.keys():
if '_id' in info[_op_type]:
if _op_type not in parsed_ids:
parsed_ids[_op_type] = []
parsed_ids[_op_type].append(info[_op_type]['_id'])
return parsed_ids
def _upload_batch(self, docs: Iterable['Document'], **kwargs) -> List[int]:
requests = [self._document_to_elastic(doc) for doc in docs]
accumulated_info = self._send_requests(requests, **kwargs)
self._refresh(self._config.index_name)
successful_ids = self._parse_index_ids_from_bulk_info(accumulated_info)
if 'index' not in successful_ids:
return []
return successful_ids['index']
def _extend(self, docs: Iterable['Document'], **kwargs):
docs = list(docs)
successful_indexed_ids = self._upload_batch(docs, **kwargs)
if self._list_like:
self._offset2ids.extend(
[
_id
for _id in successful_indexed_ids
if _id not in self._offset2ids.ids
]
)
if len(successful_indexed_ids) != len(docs):
doc_ids = [doc.id for doc in docs]
failed_index_ids = set(doc_ids) - set(successful_indexed_ids)
err_msg = f'fail to add Documents with ids: {failed_index_ids}'
warnings.warn(err_msg)
raise IndexError(err_msg)