Source code for docarray.array.storage.elastic.getsetdel

from typing import Iterable, Dict, Sequence

from docarray.array.storage.base.getsetdel import BaseGetSetDelMixin
from docarray.array.storage.base.helper import Offset2ID
from docarray import Document


[docs]class GetSetDelMixin(BaseGetSetDelMixin): """Provide concrete implementation for ``__getitem__``, ``__setitem__``, and ``__delitem__`` for ``DocumentArrayElastic``""" MAX_ES_RETURNED_DOCS = 10000 def _document_to_elastic(self, doc: 'Document') -> Dict: extra_columns = { col: doc.tags.get(col) for col, _ in self._config.columns.items() } request = { '_op_type': 'index', '_id': doc.id, '_index': self._config.index_name, 'embedding': self._map_embedding(doc.embedding), 'blob': doc.to_base64(), **extra_columns, } if self._config.tag_indices: for index in self._config.tag_indices: request[index] = doc.tags.get(index) if doc.text: request['text'] = doc.text return request def _getitem(self, doc_id: str) -> 'Document': """Helper method for getting item with elastic as storage :param doc_id: id of the document :raises KeyError: raise error when elastic id does not exist in storage :return: Document """ try: result = self._client.get(index=self._config.index_name, id=doc_id) doc = Document.from_base64(result['_source']['blob']) return doc except Exception as ex: raise KeyError(doc_id) from ex def _get_doc_by_id(self, _id: str) -> 'Document': """Concrete implementation of base class' ``_get_doc_by_id`` :param _id: the id of the document :return: the retrieved document from elastic """ return self._getitem(_id) def _get_docs_by_ids(self, ids: Sequence[str]) -> Iterable['Document']: """Concrete implementation of base class' ``_get_docs_by_ids`` :param ids: ids of the document :return: Iterable[Document] """ accumulated_docs = [] accumulated_docs_id_not_found = [] if not ids: return accumulated_docs # Handle if doc len is more than MAX_ES_RETURNED_DOCS for pos in range(0, len(ids), self.MAX_ES_RETURNED_DOCS): es_docs = self._client.mget( index=self._config.index_name, ids=ids[pos : pos + self.MAX_ES_RETURNED_DOCS], )['docs'] for doc in es_docs: if doc['found']: accumulated_docs.append( Document.from_base64(doc['_source']['blob']) ) else: accumulated_docs_id_not_found.append(doc['_id']) if accumulated_docs_id_not_found: raise KeyError(accumulated_docs_id_not_found, accumulated_docs) return accumulated_docs def _set_doc_by_id(self, _id: str, value: 'Document'): """Concrete implementation of base class' ``_set_doc_by_id`` :param _id: the id of doc to update :param value: the document to update to """ if _id != value.id: self._del_doc_by_id(_id) request = [self._document_to_elastic(value)] self._send_requests(request) self._refresh(self._config.index_name) def _set_docs_by_ids(self, ids, docs: Iterable['Document'], mismatch_ids: Dict): """Overridden implementation of _set_docs_by_ids in order to add docs in batches and flush at the end :param ids: the ids used for indexing """ for _id, doc in zip(ids, docs): self._set_doc_by_id(_id, doc) self._refresh(self._config.index_name) def _del_doc_by_id(self, _id: str): """Concrete implementation of base class' ``_del_doc_by_id`` :param _id: the id of the document to delete """ if self._doc_id_exists(_id): self._client.delete(index=self._config.index_name, id=_id) self._refresh(self._config.index_name) def _clear_storage(self): """Concrete implementation of base class' ``_clear_storage``""" self._client.indices.delete(index=self._config.index_name) self._build_index() def _load_offset2ids(self): if self._list_like: ids = self._get_offset2ids_meta() self._offset2ids = Offset2ID(ids, list_like=self._list_like) else: self._offset2ids = Offset2ID([], list_like=self._list_like) def _save_offset2ids(self): if self._list_like: self._update_offset2ids_meta()