Source code for docarray.array.storage.weaviate.getsetdel

from typing import Iterable, Dict

from docarray.array.storage.base.getsetdel import BaseGetSetDelMixin
from docarray.array.storage.base.helper import Offset2ID
from docarray import Document


class GetSetDelMixin(BaseGetSetDelMixin):
    """Provide concrete implementation for ``__getitem__``, ``__setitem__``,
    and ``__delitem__`` for ``DocumentArrayWeaviate``"""

    def _getitem(self, wid: str) -> 'Document':
        """Helper method for getting item with weaviate as storage

        :param wid: weaviate id
        :raises KeyError: raise error when weaviate id does not exist in storage
        :return: Document
        """
        try:
            resp = self._client.data_object.get_by_id(
                wid, with_vector=True, class_name=self._class_name
            )
            # The document is rebuilt from the base64 payload kept under the
            # ``_serialized`` property of the returned object.
            return Document.from_base64(
                resp['properties']['_serialized'], **self._serialize_config
            )
        except Exception as ex:
            # Any failure while fetching or decoding is surfaced as a
            # ``KeyError`` on the requested id, with the cause chained.
            raise KeyError(wid) from ex

    def _get_doc_by_id(self, _id: str) -> 'Document':
        """Concrete implementation of base class' ``_get_doc_by_id``

        :param _id: the id of the document
        :return: the retrieved document from weaviate
        """
        return self._getitem(self._map_id(_id))

    def _set_doc_by_id(self, _id: str, value: 'Document', flush: bool = True):
        """Concrete implementation of base class' ``_set_doc_by_id``

        :param _id: the id of doc to update
        :param value: the document to update to
        :param flush: if ``True``, flush the client batch right after the
            document has been added to it
        """
        # When the new document carries a different id, drop the object that
        # is stored under the old id before adding the new one.
        if _id != value.id:
            self._del_doc_by_id(_id)

        payload = self._doc2weaviate_create_payload(value)
        self._client.batch.add_data_object(**payload)
        if flush:
            self._client.batch.flush()

    def _set_docs_by_ids(self, ids, docs: Iterable['Document'], mismatch_ids: Dict):
        """Overridden implementation of ``_set_docs_by_ids`` in order to add
        docs in batches and flush at the end

        :param ids: the ids used for indexing
        :param docs: the documents to store, paired positionally with ``ids``
        :param mismatch_ids: not used by this implementation
        """
        for _id, doc in zip(ids, docs):
            # Defer flushing so the whole batch is sent once after the loop.
            self._set_doc_by_id(_id, doc, flush=False)
        self._client.batch.flush()

    def _del_doc_by_id(self, _id: str):
        """Concrete implementation of base class' ``_del_doc_by_id``

        :param _id: the id of the document to delete
        """
        # Map the id once and reuse it for both the existence check and the
        # delete call.
        wid = self._map_id(_id)
        if self._client.data_object.exists(wid, class_name=self._class_name):
            self._client.data_object.delete(wid, class_name=self._class_name)

    def _clear_storage(self):
        """Concrete implementation of base class' ``_clear_storage``"""
        if self._class_name:
            # Remove both the data class and the meta class, then rebuild the
            # schema via ``_load_or_create_weaviate_schema``.
            self._client.schema.delete_class(self._class_name)
            self._client.schema.delete_class(self._meta_name)
            self._load_or_create_weaviate_schema()

    def _load_offset2ids(self):
        """Initialise ``self._offset2ids``.

        In list-like mode the ids (and ``self._offset2ids_wid``) are read via
        ``_get_offset2ids_meta``; otherwise an empty mapping is created.
        """
        if self._list_like:
            ids, self._offset2ids_wid = self._get_offset2ids_meta()
            self._offset2ids = Offset2ID(ids, list_like=self._list_like)
        else:
            self._offset2ids = Offset2ID([], list_like=self._list_like)

    def _save_offset2ids(self):
        """Call ``_update_offset2ids_meta`` when in list-like mode; otherwise
        do nothing."""
        if self._list_like:
            self._update_offset2ids_meta()