diff --git a/langchain/math_utils.py b/langchain/math_utils.py index f9e5e0dd..76c6ed3d 100644 --- a/langchain/math_utils.py +++ b/langchain/math_utils.py @@ -1,5 +1,5 @@ """Math utils.""" -from typing import List, Union +from typing import List, Optional, Tuple, Union import numpy as np @@ -23,3 +23,34 @@ def cosine_similarity(X: Matrix, Y: Matrix) -> np.ndarray: similarity = np.dot(X, Y.T) / np.outer(X_norm, Y_norm) similarity[np.isnan(similarity) | np.isinf(similarity)] = 0.0 return similarity + + +def cosine_similarity_top_k( + X: Matrix, + Y: Matrix, + top_k: Optional[int] = 5, + score_threshold: Optional[float] = None, +) -> Tuple[List[Tuple[int, int]], List[float]]: + """Row-wise cosine similarity with optional top-k and score threshold filtering. + + Args: + X: Matrix. + Y: Matrix, same width as X. + top_k: Max number of results to return. + score_threshold: Minimum cosine similarity of results. + + Returns: + Tuple of two lists. First contains two-tuples of indices (X_idx, Y_idx), + second contains corresponding cosine similarities. + """ + if len(X) == 0 or len(Y) == 0: + return [], [] + score_array = cosine_similarity(X, Y) + sorted_idxs = score_array.flatten().argsort()[::-1] + top_k = top_k or len(sorted_idxs) + top_idxs = sorted_idxs[:top_k] + score_threshold = score_threshold or -1.0 + top_idxs = top_idxs[score_array.flatten()[top_idxs] > score_threshold] + ret_idxs = [(x // score_array.shape[1], x % score_array.shape[1]) for x in top_idxs] + scores = score_array.flatten()[top_idxs].tolist() + return ret_idxs, scores diff --git a/tests/unit_tests/test_math_utils.py b/tests/unit_tests/test_math_utils.py index 34b390a5..6b9126fe 100644 --- a/tests/unit_tests/test_math_utils.py +++ b/tests/unit_tests/test_math_utils.py @@ -2,8 +2,19 @@ from typing import List import numpy as np +import pytest -from langchain.math_utils import cosine_similarity +from langchain.math_utils import cosine_similarity, cosine_similarity_top_k + + +@pytest.fixture +def X() -> List[List[float]]: + return [[1.0, 2.0, 3.0], [0.0, 1.0, 0.0], [1.0, 2.0, 0.0]] + + +@pytest.fixture +def Y() -> List[List[float]]: + return [[0.5, 1.0, 1.5], [1.0, 0.0, 0.0], [2.0, 5.0, 2.0], [0.0, 0.0, 0.0]] def test_cosine_similarity_zero() -> None: @@ -27,13 +38,41 @@ def test_cosine_similarity_empty() -> None: assert len(cosine_similarity(empty_list, np.random.random((3, 3)))) == 0 -def test_cosine_similarity() -> None: - X = [[1.0, 2.0, 3.0], [0.0, 1.0, 0.0], [1.0, 2.0, 0.0]] - Y = [[0.5, 1.0, 1.5], [1.0, 0.0, 0.0], [2.0, 5.0, 2.0]] +def test_cosine_similarity(X: List[List[float]], Y: List[List[float]]) -> None: expected = [ - [1.0, 0.26726124, 0.83743579], - [0.53452248, 0.0, 0.87038828], - [0.5976143, 0.4472136, 0.93419873], + [1.0, 0.26726124, 0.83743579, 0.0], + [0.53452248, 0.0, 0.87038828, 0.0], + [0.5976143, 0.4472136, 0.93419873, 0.0], ] actual = cosine_similarity(X, Y) assert np.allclose(expected, actual) + + +def test_cosine_similarity_top_k(X: List[List[float]], Y: List[List[float]]) -> None: + expected_idxs = [(0, 0), (2, 2), (1, 2), (0, 2), (2, 0)] + expected_scores = [1.0, 0.93419873, 0.87038828, 0.83743579, 0.5976143] + actual_idxs, actual_scores = cosine_similarity_top_k(X, Y) + assert actual_idxs == expected_idxs + assert np.allclose(expected_scores, actual_scores) + + +def test_cosine_similarity_score_threshold( + X: List[List[float]], Y: List[List[float]] +) -> None: + expected_idxs = [(0, 0), (2, 2)] + expected_scores = [1.0, 0.93419873] + actual_idxs, actual_scores = cosine_similarity_top_k( + X, Y, top_k=None, score_threshold=0.9 + ) + assert actual_idxs == expected_idxs + assert np.allclose(expected_scores, actual_scores) + + +def test_cosine_similarity_top_k_and_score_threshold( + X: List[List[float]], Y: List[List[float]] +) -> None: + expected_idxs = [(0, 0), (2, 2), (1, 2), (0, 2)] + expected_scores = [1.0, 0.93419873, 0.87038828, 0.83743579] + actual_idxs, actual_scores = cosine_similarity_top_k(X, Y, score_threshold=0.8) + assert actual_idxs == expected_idxs + assert np.allclose(expected_scores, actual_scores)