Source code for docarray.array.storage.opensearch.backend

import copy
import uuid
from typing import (
    Optional,
    TYPE_CHECKING,
    Union,
    Dict,
    List,
    Mapping,
    Any,
    Tuple,
    Iterable,
)
from dataclasses import dataclass, field

from docarray.array.storage.base.backend import BaseBackendMixin, TypeMap
from docarray import Document
from docarray.helper import dataclass_from_dict, _safe_cast_int

import numpy as np

from opensearchpy import OpenSearch
from opensearchpy.helpers import parallel_bulk
import warnings

if TYPE_CHECKING:
    from docarray.typing import DocumentArraySourceType, ArrayType


@dataclass
class OpenSearchConfig:
    n_dim: int  # dims in opensearch
    distance: str = 'cosinesimil'  # similarity in opensearch
    hosts: Union[
        str, List[Union[str, Mapping[str, Union[str, int]]]], None
    ] = 'http://localhost:9900'
    index_name: Optional[str] = None
    list_like: bool = True
    opensearch_config: Dict[str, Any] = field(default_factory=dict)
    index_text: bool = False
    tag_indices: List[str] = field(default_factory=list)
    batch_size: int = 64
    ef_construction: Optional[int] = 512
    m: Optional[int] = 16
    columns: Optional[Union[List[Tuple[str, str]], Dict[str, str]]] = None
    engine: str = 'nmslib'
    ef_search: Optional[int] = None
    encoder: Optional[str] = None
    algorithm: str = 'hnsw'
    root_id: bool = True
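
For orientation, a minimal sketch of how this config is consumed: it is the `config` argument of a storage-backed `DocumentArray`. Only `n_dim` has no default; the example assumes an OpenSearch instance is reachable at the default host above, and the index name shown is illustrative.

from docarray import DocumentArray

da = DocumentArray(
    storage='opensearch',
    config={'n_dim': 128, 'index_name': 'my_index'},  # 'my_index' is a placeholder
)
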
_banned_indexname_chars = [
    '[', ' ', '"', '*', '\\', '<', '|', ',', '>', '/', '?', ']',
]


def _sanitize_index_name(name):
    new_name = name
    for char in _banned_indexname_chars:
        new_name = new_name.replace(char, '')
    return new_name
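
The helper simply drops every character that OpenSearch rejects in index names. A quick illustration (the input string is made up):

assert _sanitize_index_name('my index?<v2>') == 'myindexv2'
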
class BackendMixin(BaseBackendMixin):
    # mapping from column type names to OpenSearch field types and Python converters
    TYPE_MAP = {
        'str': TypeMap(type='text', converter=str),
        'float': TypeMap(type='float', converter=float),
        'int': TypeMap(type='integer', converter=_safe_cast_int),
        'double': TypeMap(type='double', converter=float),
        'long': TypeMap(type='long', converter=_safe_cast_int),
        'bool': TypeMap(type='boolean', converter=bool),
    }

    def _init_storage(
        self,
        _docs: Optional['DocumentArraySourceType'] = None,
        config: Optional[Union[OpenSearchConfig, Dict]] = None,
        **kwargs,
    ):
        config = copy.deepcopy(config)
        if not config:
            raise ValueError('Empty config is not allowed for OpenSearch storage')
        elif isinstance(config, dict):
            config = dataclass_from_dict(OpenSearchConfig, config)

        if config.index_name is None:
            id = uuid.uuid4().hex
            config.index_name = 'index_name__' + id

        self._index_name_offset2id = 'offset2id__' + config.index_name
        self._config = config
        self._config.columns = self._normalize_columns(self._config.columns)
        self.n_dim = self._config.n_dim
        self._client = self._build_client()
        self._list_like = self._config.list_like
        self._build_offset2id_index()

        # Note: super()._init_storage() calls _load_offset2ids,
        # which calls _get_offset2ids_meta
        super()._init_storage(**kwargs)

        if _docs is None:
            return
        elif isinstance(_docs, Iterable):
            self.extend(_docs)
        else:
            if isinstance(_docs, Document):
                self.append(_docs)

    def _ensure_unique_config(
        self,
        config_root: dict,
        config_subindex: dict,
        config_joined: dict,
        subindex_name: str,
    ) -> dict:
        if 'index_name' not in config_subindex:
            unique_index_name = _sanitize_index_name(
                config_joined['index_name'] + '_subindex_' + subindex_name
            )
            config_joined['index_name'] = unique_index_name
        return config_joined

    def _build_offset2id_index(self):
        if not self._client.indices.exists(index=self._index_name_offset2id):
            self._client.indices.create(index=self._index_name_offset2id, ignore=[404])

    def _get_offset2ids_meta(self) -> List:
        """Return the offset2ids stored in opensearch

        :return: a list containing ids

        :raises ValueError: error is raised if the OpenSearch client is not
            initialized or no offsets are found
        """
        if not self._client:
            raise ValueError('OpenSearch client does not exist')

        n_docs = self._client.count(index=self._index_name_offset2id)['count']

        if n_docs != 0:
            offsets = [x for x in range(n_docs)]
            resp = self._client.mget(
                index=self._index_name_offset2id, body={'ids': offsets}
            )
            ids = [x['_source']['blob'] for x in resp['docs']]
            return ids
        else:
            return []

    def _build_client(self):
        client = OpenSearch(
            hosts=self._config.hosts,
            **self._config.opensearch_config,
        )

        schema = self._build_schema_from_opensearch_config(self._config)

        # create the index with kNN enabled if it does not exist yet
        if not client.indices.exists(index=self._config.index_name):
            client.indices.create(
                index=self._config.index_name,
                body={
                    'settings': {
                        'index': {'knn': True, 'knn.algo_param.ef_search': 100}
                    },
                    'mappings': schema['mappings'],
                },
            )

        client.indices.refresh(index=self._config.index_name)
        return client

    def _build_schema_from_opensearch_config(
        self, opensearch_config: OpenSearchConfig
    ):
        da_schema = {
            'mappings': {
                'dynamic': 'true',
                '_source': {'enabled': 'true'},
                'properties': {
                    'embedding': {
                        'type': 'knn_vector',
                        'dimension': opensearch_config.n_dim,
                        'method': {
                            'name': opensearch_config.algorithm,
                            'space_type': opensearch_config.distance,
                            'engine': opensearch_config.engine,
                            'parameters': {
                                # 'ef_search': opensearch_config.ef_search,  # TODO: add
                                'ef_construction': opensearch_config.ef_construction,
                                'm': opensearch_config.m,
                                # 'encoder': opensearch_config.encoder,  # TODO: add
                            },
                        },
                    },
                    'text': {'type': 'text', 'index': opensearch_config.index_text},
                },
            }
        }
        if opensearch_config.tag_indices:
            for index in opensearch_config.tag_indices:
                da_schema['mappings']['properties'][index] = {
                    'type': 'text',
                    'index': True,
                }

        for col, coltype in self._config.columns.items():
            da_schema['mappings']['properties'][col] = {
                'type': self._map_type(coltype),
                'index': True,
            }

        if self._config.m or self._config.ef_construction:
            index_options = {
                'm': self._config.m or 16,
                'ef_construction': self._config.ef_construction or 512,
            }
            if self._config.ef_search:
                index_options['ef_search'] = self._config.ef_search
            if self._config.encoder:
                index_options['encoder'] = self._config.encoder
            da_schema['mappings']['properties']['embedding']['method'][
                'parameters'
            ] = index_options
        return da_schema

    def _refresh(self, index_name):
        self._client.indices.refresh(index=index_name)

    def _doc_id_exists(self, doc_id):
        return self._client.exists(index=self._config.index_name, id=doc_id)

    def _send_requests(self, request, **kwargs) -> List[Dict]:
        """Send bulk request to OpenSearch and gather the successful info"""

        # for backward compatibility
        if 'chunk_size' not in kwargs:
            kwargs['chunk_size'] = self._config.batch_size

        accumulated_info = []
        for success, info in parallel_bulk(
            self._client,
            request,
            raise_on_error=False,
            raise_on_exception=False,
            **kwargs,
        ):
            if not success:
                warnings.warn(str(info))
            else:
                accumulated_info.append(info)

        return accumulated_info

    def _update_offset2ids_meta(self):
        """Update the offset2ids in opensearch"""

        if self._client.indices.exists(index=self._index_name_offset2id):
            requests = [
                {
                    '_op_type': 'index',
                    '_id': offset_,  # note offset goes here because it's what we want to get by
                    '_index': self._index_name_offset2id,
                    'blob': f'{id_}',
                }  # id here
                for offset_, id_ in enumerate(self._offset2ids.ids)
            ]
            self._send_requests(requests)
            self._client.indices.refresh(index=self._index_name_offset2id)

            # Clean trailing unused offsets
            offset_count = self._client.count(index=self._index_name_offset2id)
            unused_offsets = range(len(self._offset2ids.ids), offset_count['count'])

            if len(unused_offsets) > 0:
                requests = [
                    {
                        '_op_type': 'delete',
                        '_id': offset_,  # note offset goes here because it's what we want to get by
                        '_index': self._index_name_offset2id,
                    }
                    for offset_ in unused_offsets
                ]
                self._send_requests(requests)
                self._client.indices.refresh(index=self._index_name_offset2id)

    def _map_embedding(self, embedding: 'ArrayType') -> List[float]:
        from docarray.math.helper import EPSILON

        if embedding is None:
            embedding = np.zeros(self.n_dim) + EPSILON
        else:
            from docarray.math.ndarray import to_numpy_array

            embedding = to_numpy_array(embedding)

            if embedding.ndim > 1:
                embedding = np.asarray(embedding).squeeze()

        # all-zero vectors are shifted by a tiny epsilon so that
        # cosine similarity stays well-defined
        if np.all(embedding == 0):
            embedding = embedding + EPSILON

        return embedding

    def __getstate__(self):
        # the OpenSearch client holds live connections and cannot be pickled
        d = dict(self.__dict__)
        del d['_client']
        return d

    def __setstate__(self, state):
        self.__dict__ = state
        self._client = self._build_client()
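
Because `__getstate__` removes the live `OpenSearch` client before pickling and `__setstate__` rebuilds it through `_build_client`, an OpenSearch-backed array survives serialization. A minimal sketch, again assuming an OpenSearch instance is reachable at the configured host:

import pickle

from docarray import DocumentArray

da = DocumentArray(storage='opensearch', config={'n_dim': 128})
restored = pickle.loads(pickle.dumps(da))  # the client is reconnected on unpickle
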