Source code for docarray.array.storage.sqlite.getsetdel

from operator import itemgetter
from typing import Sequence, Iterable

from docarray.array.storage.base.getsetdel import BaseGetSetDelMixin
from docarray.array.storage.base.helper import Offset2ID
from docarray import Document


class GetSetDelMixin(BaseGetSetDelMixin):
    """Implement required and derived functions that power `getitem`, `setitem`, `delitem`

    The statements issued here operate on the SQLite table named by
    ``self._table_name`` and touch its ``doc_id``, ``serialized_value`` and
    ``item_order`` columns.

    ``self._sql``, ``self._commit``, ``self._table_name``, ``self._list_like``,
    ``self._offset2ids`` and ``self._get_docs_by_ids`` are not defined in this
    mixin; they are expected to be provided by the host class or its other
    bases.
    """

    # Maximum number of `?` placeholders sent in one statement. 999 is the
    # default SQLITE_MAX_VARIABLE_NUMBER of SQLite releases before 3.32.0
    # (later releases default to a larger value), so staying at or below it
    # avoids "too many SQL variables" on default builds.
    _MAX_SQL_VARIABLES = 999

    # essential methods start

    def _del_doc_by_id(self, _id: str):
        """Delete the row whose ``doc_id`` equals ``_id``.

        After the delete, the offset order is written back via
        :meth:`_save_offset2ids` and the change is committed.

        :param _id: id of the Document to remove
        """
        self._sql(f'DELETE FROM {self._table_name} WHERE doc_id=?', (_id,))
        self._save_offset2ids()
        # _save_offset2ids only commits when the store is list-like, so an
        # explicit commit is still required here.
        self._commit()

    def _set_doc_by_id(self, _id: str, value: 'Document'):
        """Overwrite the row stored under ``_id`` with ``value``.

        Both ``serialized_value`` and ``doc_id`` are updated, so the row is
        re-keyed to ``value.id`` when that differs from ``_id``. This method
        does not itself check whether any row matched ``_id``.

        :param _id: id of the row to replace
        :param value: the Document to store
        """
        # NOTE(review): `value` is bound directly as a SQL parameter;
        # presumably an sqlite3 adapter for Document is registered elsewhere
        # -- confirm.
        self._sql(
            f'UPDATE {self._table_name} SET serialized_value=?, doc_id=? WHERE doc_id=?',
            (value, value.id, _id),
        )
        self._commit()

    def _get_doc_by_id(self, id: str) -> 'Document':
        """Fetch the ``serialized_value`` stored under ``id``.

        :param id: id of the Document to look up
        :return: the first column of the matching row
        :raises KeyError: if no row has ``doc_id == id``
        """
        cursor = self._sql(
            f'SELECT serialized_value FROM {self._table_name} WHERE doc_id = ?', (id,)
        )
        row = cursor.fetchone()
        if row is None:
            raise KeyError(f'Can not find Document with id=`{id}`')
        # NOTE(review): returned as-is; presumably a converter registered
        # elsewhere turns the stored value back into a Document -- confirm.
        return row[0]

    # essentials end here

    # now start the optimized bulk methods
    def _get_docs_by_offsets(self, offsets: Sequence[int]) -> Iterable['Document']:
        """Resolve each offset to an id and fetch the Documents in one call.

        :param offsets: integer positions to look up in ``self._offset2ids``
        :return: whatever ``self._get_docs_by_ids`` returns for those ids
        """
        ids = [self._offset2ids.get_id(offset) for offset in offsets]
        return self._get_docs_by_ids(ids)

    def _clear_storage(self):
        """Delete every row of the table and commit."""
        self._sql(f'DELETE FROM {self._table_name}')
        self._commit()

    def _del_docs_by_ids(self, ids: Iterable[str]) -> None:
        """Delete all rows whose ``doc_id`` is in ``ids``.

        The ids are deleted in chunks of at most ``_MAX_SQL_VARIABLES`` so a
        large batch does not exceed SQLite's bound-parameter limit. All chunks
        are issued before the single commit at the end. An empty ``ids``
        issues no DELETE at all but still saves the offsets and commits.

        :param ids: ids of the Documents to remove
        """
        # Materialise once so that any iterable (not only sized sequences)
        # can be sliced into chunks.
        ids = list(ids)
        limit = self._MAX_SQL_VARIABLES
        for start in range(0, len(ids), limit):
            chunk = ids[start : start + limit]
            placeholders = ','.join('?' * len(chunk))
            self._sql(
                f'DELETE FROM {self._table_name} WHERE doc_id in ({placeholders})',
                chunk,
            )
        self._save_offset2ids()
        self._commit()

    def _load_offset2ids(self):
        """Rebuild ``self._offset2ids`` from the table.

        For a list-like store the ids are read ordered by ``item_order``;
        otherwise the mapping starts out empty.
        """
        if self._list_like:
            rows = self._sql(
                f"SELECT doc_id FROM {self._table_name} ORDER BY item_order",
            )
            ids = list(map(itemgetter(0), rows))
        else:
            ids = []
        self._offset2ids = Offset2ID(ids, list_like=self._list_like)

    def _save_offset2ids(self):
        """Write the current position of every id into ``item_order``.

        Does nothing unless the store is list-like. One UPDATE is issued per
        id in ``self._offset2ids``, followed by a single commit.
        """
        if not self._list_like:
            return

        # The statement text does not depend on the row, so build it once.
        statement = (
            f'UPDATE {self._table_name} '
            f'SET item_order = ? '
            f'WHERE {self._table_name}.doc_id = ?'
        )
        for offset, doc_id in enumerate(self._offset2ids):
            self._sql(statement, (offset, doc_id))
        self._commit()

    def _del_docs(self, ids):
        """Delete via the parent implementation, then persist the offset order.

        :param ids: forwarded unchanged to ``super()._del_docs``
        """
        super()._del_docs(ids)
        self._save_offset2ids()

    def _del_doc_by_offset(self, offset: int):
        """Delete via the parent implementation, then persist the offset order.

        :param offset: forwarded unchanged to ``super()._del_doc_by_offset``
        """
        super()._del_doc_by_offset(offset)
        self._save_offset2ids()