Source code for docarray.array.storage.opensearch.find

from typing import (
    TYPE_CHECKING,
    TypeVar,
    Sequence,
    List,
    Union,
    Optional,
    Dict,
)

import numpy as np

from docarray import Document, DocumentArray
from docarray.math import ndarray
from docarray.math.helper import EPSILON
from docarray.math.ndarray import to_numpy_array
from docarray.score import NamedScore
from docarray.array.mixins.find import FindMixin as BaseFindMixin


# tensorflow and torch are imported only while type checking, so neither
# framework has to be installed for this module to be imported at runtime.
if TYPE_CHECKING:  # pragma: no cover
    import tensorflow
    import torch

    # Types accepted as a `query` argument in this module's annotations.
    # The name exists only under TYPE_CHECKING, so annotations must refer to
    # it as the string 'OpenSearchArrayType'.
    OpenSearchArrayType = TypeVar(
        'OpenSearchArrayType',
        np.ndarray,
        tensorflow.Tensor,
        torch.Tensor,
        Sequence[float],
        Dict,
    )


class FindMixin(BaseFindMixin):
    """Search operations for the OpenSearch storage backend.

    Every method sends a search request through ``self._client`` against the
    index named by ``self._config.index_name`` and rebuilds Documents from the
    base64 ``blob`` field stored in each hit.
    """

    @staticmethod
    def _hits_to_docarray(hits, with_embedding: bool = False) -> 'DocumentArray':
        """Convert raw search hits into a DocumentArray.

        :param hits: sequence of hit dicts, as found under ``resp['hits']['hits']``
        :param with_embedding: if True, also copy ``_source['embedding']`` onto
            each rebuilt Document
        :return: DocumentArray with one Document per hit, in hit order; each
            Document carries the hit's ``_score`` under ``scores['score']``
        """
        da = DocumentArray()
        for hit in hits:
            source = hit['_source']
            doc = Document.from_base64(source['blob'])
            doc.scores['score'] = NamedScore(value=hit['_score'])
            if with_embedding:
                doc.embedding = source['embedding']
            da.append(doc)
        return da

    def _find_similar_vectors(
        self,
        query: 'OpenSearchArrayType',
        filter: Optional[Dict] = None,
        limit: int = 10,
        **kwargs,
    ) -> 'DocumentArray':
        """Return vector search results for a single query vector.

        The search is issued as a ``script_score`` query using the ``knn``
        script language with the ``knn_score`` source. When ``filter`` is set
        it is placed in the inner query of ``script_score`` (pre-filtering),
        so only documents matching the filter are scored; otherwise
        ``match_all`` is used.

        :param query: query vector used for vector search
        :param filter: filter clause wrapped into ``{'bool': {'filter': ...}}``
        :param limit: number of items to be retrieved
        :param kwargs: may contain ``distance`` to override the configured
            distance metric for this request
        :return: DocumentArray containing the closest Documents to the query,
            each with its score and stored embedding
        """
        query = to_numpy_array(query)
        if np.all(query == 0):
            # Shift an all-zero vector by EPSILON; presumably this avoids a
            # zero-norm query in norm-based metrics -- TODO confirm.
            query = query + EPSILON

        filter_query = {'bool': {'filter': filter}} if filter else {'match_all': {}}

        knn_query = {
            'size': limit,
            'query': {
                'script_score': {
                    'query': filter_query,
                    'script': {
                        'lang': 'knn',
                        'source': 'knn_score',
                        'params': {
                            'field': 'embedding',
                            'query_value': query,
                            'space_type': self._get_distance_metric(
                                kwargs.get('distance')
                            ),
                        },
                    },
                }
            },
        }

        resp = self._client.search(index=self._config.index_name, body=knn_query)
        return self._hits_to_docarray(resp['hits']['hits'], with_embedding=True)

    def _get_distance_metric(self, distance=None):
        """Return ``distance`` if it is truthy, else the configured ``self._config.distance``."""
        return distance if distance else self._config.distance

    def _find_similar_documents_from_text(
        self,
        query: str,
        index: str = 'text',
        filter: Optional[Union[dict, list]] = None,
        limit: int = 10,
    ) -> 'DocumentArray':
        """Return keyword matches for a single text query.

        :param query: text used for keyword search
        :param index: name of the document field matched against ``query``
        :param filter: value passed as the ``filter`` clause of the bool query
        :param limit: number of items to be retrieved
        :return: DocumentArray containing the matching Documents, each with
            its score
        """
        search_body = {
            '_source': ['id', 'blob', 'text'],
            'size': limit,
            'query': {
                'bool': {
                    'must': [
                        {'match': {index: query}},
                    ],
                    'filter': filter,
                }
            },
        }

        resp = self._client.search(index=self._config.index_name, body=search_body)
        return self._hits_to_docarray(resp['hits']['hits'][:limit])

    def _find_by_text(
        self,
        query: Union[str, List[str]],
        index: str = 'text',
        filter: Optional[Union[dict, list]] = None,
        limit: int = 10,
    ) -> List['DocumentArray']:
        """Run a keyword search for one or several text queries.

        :param query: a single text or a list of texts
        :param index: name of the document field matched against each text
        :param filter: value passed as the ``filter`` clause of each bool query
        :param limit: number of items to be retrieved per query
        :return: list with one DocumentArray per query text (a single string
            yields a one-element list)
        """
        if isinstance(query, str):
            query = [query]

        return [
            self._find_similar_documents_from_text(
                q,
                index=index,
                filter=filter,
                limit=limit,
            )
            for q in query
        ]

    def _find(
        self,
        query: 'OpenSearchArrayType',
        limit: int = 10,
        filter: Optional[Dict] = None,
        **kwargs,
    ) -> List['DocumentArray']:
        """Return nearest neighbors for a batch of input queries.

        :param query: input supported to be stored in OpenSearch. This includes
            any from the list '[np.ndarray, tensorflow.Tensor, torch.Tensor,
            Sequence[float]]'
        :param limit: number of retrieved items per query
        :param filter: filter query used for pre-filtering
        :param kwargs: forwarded to :meth:`_find_similar_vectors`
        :return: list with one DocumentArray per query row, each containing
            the closest Documents to that query
        """
        query = np.array(query).astype(np.float32)
        num_rows, n_dim = ndarray.get_array_rows(query)
        if n_dim != 2:
            # Force a (num_rows, -1) layout so iterating yields one query
            # vector at a time.
            query = query.reshape((num_rows, -1))

        return [
            self._find_similar_vectors(q, filter=filter, limit=limit, **kwargs)
            for q in query
        ]

    def _find_with_filter(
        self, query: Dict, limit: Optional[Union[int, float]] = 20
    ) -> 'DocumentArray':
        """Return the Documents matching a raw query.

        :param query: query dict sent as the ``query`` of the search body
        :param limit: value sent as ``size`` and used to truncate the hits
        :return: DocumentArray containing the matching Documents, each with
            its score
        """
        resp = self._client.search(
            index=self._config.index_name, body={'query': query, 'size': limit}
        )
        return self._hits_to_docarray(resp['hits']['hits'][:limit])

    def _filter(
        self, query: Dict, limit: Optional[Union[int, float]] = 20
    ) -> 'DocumentArray':
        """Return the Documents matching ``query``; delegates to :meth:`_find_with_filter`."""
        return self._find_with_filter(query, limit=limit)