mirror of
https://github.com/hwchase17/langchain
synced 2024-11-06 03:20:49 +00:00
e57ebf3922
# Compute row-wise cosine similarity between two equal-width matrices and return the top_k highest scores with their indices, keeping only scores greater than score_threshold. Co-authored-by: Dev 2049 <dev.dev2049@gmail.com>
57 lines
1.9 KiB
Python
57 lines
1.9 KiB
Python
"""Math utils."""
|
|
from typing import List, Optional, Tuple, Union
|
|
|
|
import numpy as np
|
|
|
|
# Type alias for the matrix-like inputs used in this module's signatures:
# a nested list of floats, a list of NumPy arrays, or a single NumPy array.
Matrix = Union[List[List[float]], List[np.ndarray], np.ndarray]
|
|
|
|
|
|
def cosine_similarity(X: Matrix, Y: Matrix) -> np.ndarray:
    """Row-wise cosine similarity between two equal-width matrices.

    Args:
        X: Matrix whose rows are the vectors to compare.
        Y: Matrix whose rows are the vectors to compare against, same width
            as X.

    Returns:
        Array in which entry (i, j) is the cosine similarity between row i of
        X and row j of Y. Entries that would be NaN or infinite (for example
        when a row has zero norm) are set to 0.0. If either input is empty,
        an empty 1-D array is returned.

    Raises:
        ValueError: If X and Y do not have the same number of columns.
    """
    if len(X) == 0 or len(Y) == 0:
        return np.array([])
    X = np.array(X)
    Y = np.array(Y)
    if X.shape[1] != Y.shape[1]:
        raise ValueError(
            f"Number of columns in X and Y must be the same. X has shape {X.shape} "
            f"and Y has shape {Y.shape}."
        )

    X_norm = np.linalg.norm(X, axis=1)
    Y_norm = np.linalg.norm(Y, axis=1)
    # Zero-norm rows make this division produce nan/inf. Those entries are
    # replaced just below, so silence the floating-point warnings here instead
    # of leaking RuntimeWarnings (or errors under strict settings) to callers.
    with np.errstate(divide="ignore", invalid="ignore"):
        similarity = np.dot(X, Y.T) / np.outer(X_norm, Y_norm)
    similarity[np.isnan(similarity) | np.isinf(similarity)] = 0.0
    return similarity
|
|
|
|
|
|
def cosine_similarity_top_k(
    X: Matrix,
    Y: Matrix,
    top_k: Optional[int] = 5,
    score_threshold: Optional[float] = None,
) -> Tuple[List[Tuple[int, int]], List[float]]:
    """Row-wise cosine similarity with optional top-k and score threshold filtering.

    Args:
        X: Matrix.
        Y: Matrix, same width as X.
        top_k: Max number of results to return. ``None`` (or ``0``) means no
            limit.
        score_threshold: If given, only results whose cosine similarity is
            strictly greater than this value are kept. ``None`` disables
            threshold filtering entirely.

    Returns:
        Tuple of two lists. First contains two-tuples of indices (X_idx, Y_idx),
        second contains corresponding cosine similarities. Both lists are
        ordered from highest to lowest similarity.
    """
    if len(X) == 0 or len(Y) == 0:
        return [], []
    score_array = cosine_similarity(X, Y)
    # Flatten once and reuse; flat position p maps to (p // n_cols, p % n_cols).
    flat_scores = score_array.flatten()
    sorted_idxs = flat_scores.argsort()[::-1]
    top_k = top_k or len(sorted_idxs)
    top_idxs = sorted_idxs[:top_k]
    # Compare against None explicitly: a threshold of 0.0 is falsy but is a
    # legitimate cut-off, and with no threshold nothing should be dropped.
    if score_threshold is not None:
        top_idxs = top_idxs[flat_scores[top_idxs] > score_threshold]
    n_cols = score_array.shape[1]
    ret_idxs = [divmod(int(flat_idx), n_cols) for flat_idx in top_idxs]
    scores = flat_scores[top_idxs].tolist()
    return ret_idxs, scores
|