Source code for docarray.math.distance.tensorflow

from typing import TYPE_CHECKING

import tensorflow as tf

if TYPE_CHECKING:  # pragma: no cover
    from tensorflow import Tensor
    import numpy


def _get_tf_device(device: str):
    return tf.device('/GPU:0') if device == 'cuda' else tf.device('/CPU:0')


[docs]def cosine( x_mat: 'Tensor', y_mat: 'Tensor', eps: float = 1e-7, device: str = 'cpu' ) -> 'numpy.ndarray': """Cosine distance between each row in x_mat and each row in y_mat. :param x_mat: np.ndarray with ndim=2 :param y_mat: np.ndarray with ndim=2 :param eps: a small jitter to avoid divde by zero :param device: the computational device for `embed_model`, can be either `cpu` or `cuda`. :return: np.ndarray with ndim=2 """ with _get_tf_device(device): normalize_a = tf.nn.l2_normalize(x_mat, 1, epsilon=eps) normalize_b = tf.nn.l2_normalize(y_mat, 1, epsilon=eps) distance = 1 - tf.matmul(normalize_a, normalize_b, transpose_b=True) return distance.numpy()
[docs]def sqeuclidean( x_mat: 'Tensor', y_mat: 'Tensor', device: str = 'cpu' ) -> 'numpy.ndarray': """Squared euclidean distance between each row in x_mat and each row in y_mat. :param x_mat: tensorflow array with ndim=2 :param y_mat: tensorflow array with ndim=2 :param device: the computational device for `embed_model`, can be either `cpu` or `cuda`. :return: np.ndarray with ndim=2 """ device = tf.device('/GPU:0') if device == 'cuda' else tf.device('/CPU:0') with _get_tf_device(device): return tf.reduce_sum( (tf.expand_dims(x_mat, 1) - tf.expand_dims(y_mat, 0)) ** 2, 2 ).numpy()
[docs]def euclidean(x_mat: 'Tensor', y_mat: 'Tensor', device: str = 'cpu') -> 'numpy.ndarray': """Euclidean distance between each row in x_mat and each row in y_mat. :param x_mat: tensorflow array with ndim=2 :param y_mat: tensorflow array with ndim=2 :param device: the computational device for `embed_model`, can be either `cpu` or `cuda`. :return: np.ndarray with ndim=2 """ device = tf.device('/GPU:0') if device == 'cuda' else tf.device('/CPU:0') with _get_tf_device(device): return tf.sqrt( tf.reduce_sum((tf.expand_dims(x_mat, 1) - tf.expand_dims(y_mat, 0)) ** 2, 2) ).numpy()