From 0ce7858529fb245f863583728a12224cdd16e40a Mon Sep 17 00:00:00 2001
From: Ashley Xu <139821907+ashleyxuu@users.noreply.github.com>
Date: Tue, 2 Jan 2024 15:57:14 -0800
Subject: [PATCH] feat: add Google BigQueryVectorSearch in vectorstore (#14829)

BigQuery vector search lets you use GoogleSQL to do semantic search, using
vector indexes for fast but approximate results, or using brute force for
exact results. This PR integrates the LangChain vectorstore interface with
BigQuery Vector Search.

---------

Co-authored-by: Vlad Kolesnikov
---
 docs/docs/integrations/platforms/google.mdx   |  22 +
 .../vectorstores/bigquery_vector_search.ipynb | 353 ++++++++
 .../vectorstores/__init__.py                  |  10 +
 .../vectorstores/bigquery_vector_search.py    | 835 ++++++++++++++++++
 .../test_bigquery_vector_search.py            | 102 +++
 5 files changed, 1322 insertions(+)
 create mode 100644 docs/docs/integrations/vectorstores/bigquery_vector_search.ipynb
 create mode 100644 libs/community/langchain_community/vectorstores/bigquery_vector_search.py
 create mode 100644 libs/community/tests/integration_tests/vectorstores/test_bigquery_vector_search.py

diff --git a/docs/docs/integrations/platforms/google.mdx b/docs/docs/integrations/platforms/google.mdx
index b3f4d01659..595f132614 100644
--- a/docs/docs/integrations/platforms/google.mdx
+++ b/docs/docs/integrations/platforms/google.mdx
@@ -202,6 +202,28 @@ See a [usage example](/docs/integrations/vectorstores/matchingengine).
 from langchain_community.vectorstores import MatchingEngine
 ```
 
+### Google BigQuery Vector Search
+
+> [Google BigQuery](https://cloud.google.com/bigquery) is a serverless and
+> cost-effective enterprise data warehouse in Google Cloud.
+>
+> BigQuery Vector Search lets you use GoogleSQL to do semantic search, using
+> vector indexes for fast but approximate results, or using brute force for
+> exact results.
+>
+> It can calculate either Euclidean or cosine distance; the LangChain
+> integration defaults to Euclidean distance.
+
+We need to install the BigQuery client library:
+
+```bash
+pip install google-cloud-bigquery
+```
+
+See a [usage example](/docs/integrations/vectorstores/bigquery_vector_search).
+
+```python
+from langchain_community.vectorstores import BigQueryVectorSearch
+```
+
 ### Google ScaNN
 
 >[Google ScaNN](https://github.com/google-research/google-research/tree/master/scann)
diff --git a/docs/docs/integrations/vectorstores/bigquery_vector_search.ipynb b/docs/docs/integrations/vectorstores/bigquery_vector_search.ipynb
new file mode 100644
index 0000000000..236d23529a
--- /dev/null
+++ b/docs/docs/integrations/vectorstores/bigquery_vector_search.ipynb
@@ -0,0 +1,353 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "id": "E_RJy7C1bpCT"
+   },
+   "source": [
+    "# BigQuery Vector Search\n",
+    "> **BigQueryVectorSearch**:\n",
+    "BigQuery vector search lets you use GoogleSQL to do semantic search, using vector indexes for fast approximate results, or using brute force for exact results.\n",
+    "\n",
+    "\n",
+    "This tutorial illustrates how to work with an end-to-end data and embedding management system in LangChain, and provides scalable semantic search in BigQuery."
+ ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "EmPJkpOCckyh" + }, + "source": [ + "## Getting started\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "IR54BmgvdHT_" + }, + "source": [ + "### Install the library" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 1000 + }, + "id": "0ZITIDE160OD", + "outputId": "e184bc0d-6541-4e0a-82d2-1e216db00a2d" + }, + "outputs": [], + "source": [ + "! pip install langchain google-cloud-aiplatform google-cloud-bigquery --upgrade --user" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "v40bB_GMcr9f" + }, + "source": [ + "**Colab only:** Uncomment the following cell to restart the kernel or use the button to restart the kernel. For Vertex AI Workbench you can restart the terminal using the button on top." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "6o0iGVIdDD6K" + }, + "outputs": [], + "source": [ + "# # Automatically restart kernel after installs so that your environment can access the new packages\n", + "# import IPython\n", + "\n", + "# app = IPython.Application.instance()\n", + "# app.kernel.do_shutdown(True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Before you begin" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Set your project ID\n", + "\n", + "If you don't know your project ID, try the following:\n", + "* Run `gcloud config list`.\n", + "* Run `gcloud projects list`.\n", + "* See the support page: [Locate the project ID](https://support.google.com/googleapi/answer/7014113)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# @title Project { display-mode: \"form\" }\n", + "PROJECT_ID = \"\" # @param {type:\"string\"}\n", + "\n", + "# Set the project id\n", + "! gcloud config set project {PROJECT_ID}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Set the region\n", + "\n", + "You can also change the `REGION` variable used by BigQuery. Learn more about [BigQuery regions](https://cloud.google.com/bigquery/docs/locations#supported_locations)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# @title Region { display-mode: \"form\" }\n", + "REGION = \"US\" # @param {type: \"string\"}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Set the dataset and table names\n", + "\n", + "They will be your BigQuery Vector Store." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# @title Dataset and Table { display-mode: \"form\" }\n", + "DATASET = \"my_langchain_dataset\" # @param {type: \"string\"}\n", + "TABLE = \"doc_and_vectors\" # @param {type: \"string\"}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Authenticating your notebook environment\n", + "\n", + "- If you are using **Colab** to run this notebook, uncomment the cell below and continue.\n", + "- If you are using **Vertex AI Workbench**, check out the setup instructions [here](https://github.com/GoogleCloudPlatform/generative-ai/tree/main/setup-env)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from google.colab import auth as google_auth\n", + "\n", + "google_auth.authenticate_user()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "AD3yG49BdLlr" + }, + "source": [ + "## Demo: BigQueryVectorSearch" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create an embedding class instance\n", + "\n", + "You may need to enable Vertex AI API in your project by running\n", + "`gcloud services enable aiplatform.googleapis.com --project {PROJECT_ID}`\n", + "(replace `{PROJECT_ID}` with the name of your project).\n", + "\n", + "You can use any [LangChain embeddings model](https://python.langchain.com/docs/integrations/text_embedding/)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "Vb2RJocV9_LQ", + "outputId": "37f5dc74-2512-47b2-c135-f34c10afdcf4" + }, + "outputs": [], + "source": [ + "from langchain_community.embeddings import VertexAIEmbeddings\n", + "\n", + "embedding = VertexAIEmbeddings(\n", + " model_name=\"textembedding-gecko@latest\", project=PROJECT_ID\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create BigQuery Dataset\n", + "\n", + "Optional step to create the dataset if it doesn't exist." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from google.cloud import bigquery\n", + "\n", + "client = bigquery.Client(project=PROJECT_ID, location=REGION)\n", + "client.create_dataset(dataset=DATASET, exists_ok=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Initialize BigQueryVectorSearch Vector Store with an existing BigQuery dataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from langchain.vectorstores.utils import DistanceStrategy\n", + "from langchain_community.vectorstores import BigQueryVectorSearch\n", + "\n", + "store = BigQueryVectorSearch(\n", + " project_id=PROJECT_ID,\n", + " dataset_name=DATASET,\n", + " table_name=TABLE,\n", + " location=REGION,\n", + " embedding=embedding,\n", + " distance_strategy=DistanceStrategy.EUCLIDEAN_DISTANCE,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Add texts" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "all_texts = [\"Apples and oranges\", \"Cars and airplanes\", \"Pineapple\", \"Train\", \"Banana\"]\n", + "metadatas = [{\"len\": len(t)} for t in all_texts]\n", + "\n", + "store.add_texts(all_texts, metadatas=metadatas)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Search for documents" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "query = \"I'd like a fruit.\"\n", + "docs = store.similarity_search(query)\n", + "print(docs)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Search for documents by vector" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "query_vector = embedding.embed_query(query)\n", + "docs = store.similarity_search_by_vector(query_vector, k=2)\n", + "print(docs)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Search 
for documents with metadata filter"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# This should only return the \"Banana\" document.\n",
+    "docs = store.similarity_search_by_vector(query_vector, filter={\"len\": 6})\n",
+    "print(docs)"
+   ]
+  }
+ ],
+ "metadata": {
+  "colab": {
+   "provenance": [],
+   "toc_visible": true
+  },
+  "kernelspec": {
+   "display_name": "Python 3",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.11.0"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 0
+}
diff --git a/libs/community/langchain_community/vectorstores/__init__.py b/libs/community/langchain_community/vectorstores/__init__.py
index 6d490362fa..0ddaafed85 100644
--- a/libs/community/langchain_community/vectorstores/__init__.py
+++ b/libs/community/langchain_community/vectorstores/__init__.py
@@ -104,6 +104,14 @@ def _import_baiducloud_vector_search() -> Any:
     return BESVectorStore
 
 
+def _import_bigquery() -> Any:
+    from langchain_community.vectorstores.bigquery_vector_search import (
+        BigQueryVectorSearch,
+    )
+
+    return BigQueryVectorSearch
+
+
 def _import_cassandra() -> Any:
     from langchain_community.vectorstores.cassandra import Cassandra
 
@@ -473,6 +481,8 @@ def __getattr__(name: str) -> Any:
         return _import_azuresearch()
     elif name == "Bagel":
         return _import_bageldb()
+    elif name == "BigQueryVectorSearch":
+        return _import_bigquery()
     elif name == "BESVectorStore":
         return _import_baiducloud_vector_search()
     elif name == "Cassandra":
diff --git a/libs/community/langchain_community/vectorstores/bigquery_vector_search.py b/libs/community/langchain_community/vectorstores/bigquery_vector_search.py
new file mode 100644
index 0000000000..d132e7e071
--- /dev/null
+++ b/libs/community/langchain_community/vectorstores/bigquery_vector_search.py
@@ -0,0 +1,835 @@
+"""Vector Store in Google Cloud BigQuery."""
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+import sys
+import uuid
+from datetime import datetime
+from functools import partial
+from threading import Lock, Thread
+from typing import Any, Callable, Dict, List, Optional, Tuple, Type
+
+import numpy as np
+from langchain_core.documents import Document
+from langchain_core.embeddings import Embeddings
+from langchain_core.vectorstores import VectorStore
+
+from langchain_community.vectorstores.utils import (
+    DistanceStrategy,
+    maximal_marginal_relevance,
+)
+
+DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.EUCLIDEAN_DISTANCE
+DEFAULT_DOC_ID_COLUMN_NAME = "doc_id"  # document id
+DEFAULT_TEXT_EMBEDDING_COLUMN_NAME = "text_embedding"  # embedding vectors
+DEFAULT_METADATA_COLUMN_NAME = "metadata"  # document metadata
+DEFAULT_CONTENT_COLUMN_NAME = "content"  # text content, do not rename
+DEFAULT_TOP_K = 4  # default number of documents returned from similarity search
+
+_INDEX_CHECK_PERIOD_SECONDS = 60  # Do not check for the index more often than this.
+
+_vector_table_lock = Lock()  # process-wide BigQueryVectorSearch table lock
+
+
+class BigQueryVectorSearch(VectorStore):
+    """Google Cloud BigQuery vector store.
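+
+    A minimal usage sketch (``my_embedding_model`` stands in for any LangChain
+    ``Embeddings`` implementation; the project, dataset, and table names are
+    placeholders):
+
+    .. code-block:: python
+
+        store = BigQueryVectorSearch(
+            embedding=my_embedding_model,
+            project_id="my-project",
+            dataset_name="my_dataset",
+            table_name="my_table",
+        )
+        ids = store.add_texts(["hello world"])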
+
+    To use, you need the following packages installed:
+        google-cloud-bigquery
+    """
+
+    def __init__(
+        self,
+        embedding: Embeddings,
+        project_id: str,
+        dataset_name: str,
+        table_name: str,
+        location: str = "US",
+        content_field: str = DEFAULT_CONTENT_COLUMN_NAME,
+        metadata_field: str = DEFAULT_METADATA_COLUMN_NAME,
+        text_embedding_field: str = DEFAULT_TEXT_EMBEDDING_COLUMN_NAME,
+        doc_id_field: str = DEFAULT_DOC_ID_COLUMN_NAME,
+        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
+        credentials: Optional[Any] = None,
+    ):
+        """Constructor for BigQueryVectorSearch.
+
+        Args:
+            embedding (Embeddings): Text Embedding model to use.
+            project_id (str): GCP project.
+            dataset_name (str): BigQuery dataset to store documents and embeddings.
+            table_name (str): BigQuery table name.
+            location (str, optional): BigQuery region. Defaults to
+                `US` (multi-region).
+            content_field (str): Specifies the column to store the content.
+                Defaults to `content`.
+            metadata_field (str): Specifies the column to store the metadata.
+                Defaults to `metadata`.
+            text_embedding_field (str): Specifies the column to store
+                the embedding vector.
+                Defaults to `text_embedding`.
+            doc_id_field (str): Specifies the column to store the document id.
+                Defaults to `doc_id`.
+            distance_strategy (DistanceStrategy, optional):
+                Determines the strategy employed for calculating
+                the distance between vectors in the embedding space.
+                Defaults to EUCLIDEAN_DISTANCE.
+                Available options are:
+                - COSINE: Measures the similarity between two vectors of an inner
+                    product space.
+                - EUCLIDEAN_DISTANCE: Computes the Euclidean distance between
+                    two vectors. This metric considers the geometric distance in
+                    the vector space, and might be more suitable for embeddings
+                    that rely on spatial relationships. This is the default
+                    behavior.
+            credentials (Credentials, optional): Custom Google Cloud credentials
+                to use. Defaults to None.
+        """
+        try:
+            from google.cloud import bigquery
+
+            self.bq_client = bigquery.Client(
+                project=project_id, location=location, credentials=credentials
+            )
+        except ModuleNotFoundError:
+            raise ImportError(
+                "Please install or upgrade the google-cloud-bigquery library: "
+                "pip install google-cloud-bigquery"
+            )
+        self._logger = logging.getLogger(__name__)
+        self._creating_index = False
+        self._have_index = False
+        self.embedding_model = embedding
+        self.project_id = project_id
+        self.dataset_name = dataset_name
+        self.table_name = table_name
+        self.location = location
+        self.content_field = content_field
+        self.metadata_field = metadata_field
+        self.text_embedding_field = text_embedding_field
+        self.doc_id_field = doc_id_field
+        self.distance_strategy = distance_strategy
+        self._full_table_id = (
+            f"{self.project_id}."
+            f"{self.dataset_name}."
f"{self.table_name}" + ) + self._logger.debug("Using table `%s`", self.full_table_id) + with _vector_table_lock: + self.vectors_table = self._initialize_table() + self._last_index_check = datetime.min + self._initialize_vector_index() + + def _initialize_table(self) -> Any: + """Validates or creates the BigQuery table.""" + from google.cloud import bigquery + + table_ref = bigquery.TableReference.from_string(self._full_table_id) + table = self.bq_client.create_table(table_ref, exists_ok=True) + changed_schema = False + schema = table.schema.copy() + columns = {c.name: c for c in schema} + if self.doc_id_field not in columns: + changed_schema = True + schema.append( + bigquery.SchemaField(name=self.doc_id_field, field_type="STRING") + ) + elif ( + columns[self.doc_id_field].field_type != "STRING" + or columns[self.doc_id_field].mode == "REPEATED" + ): + raise ValueError(f"Column {self.doc_id_field} must be of " "STRING type") + if self.metadata_field not in columns: + changed_schema = True + schema.append( + bigquery.SchemaField(name=self.metadata_field, field_type="JSON") + ) + elif ( + columns[self.metadata_field].field_type not in ["JSON", "STRING"] + or columns[self.metadata_field].mode == "REPEATED" + ): + raise ValueError( + f"Column {self.metadata_field} must be of STRING or JSON type" + ) + if self.content_field not in columns: + changed_schema = True + schema.append( + bigquery.SchemaField(name=self.content_field, field_type="STRING") + ) + elif ( + columns[self.content_field].field_type != "STRING" + or columns[self.content_field].mode == "REPEATED" + ): + raise ValueError(f"Column {self.content_field} must be of " "STRING type") + if self.text_embedding_field not in columns: + changed_schema = True + schema.append( + bigquery.SchemaField( + name=self.text_embedding_field, + field_type="FLOAT64", + mode="REPEATED", + ) + ) + elif ( + columns[self.text_embedding_field].field_type not in ("FLOAT", "FLOAT64") + or columns[self.text_embedding_field].mode != "REPEATED" + ): + raise ValueError( + f"Column {self.text_embedding_field} must be of " "ARRAY type" + ) + if changed_schema: + self._logger.debug("Updated table `%s` schema.", self.full_table_id) + table.schema = schema + table = self.bq_client.update_table(table, fields=["schema"]) + return table + + def _initialize_vector_index(self) -> Any: + """ + A vector index in BigQuery table enables efficient + approximate vector search. + """ + from google.cloud import bigquery + + if self._have_index or self._creating_index: + # Already have an index or in the process of creating one. + return + if ( + datetime.utcnow() - self._last_index_check + ).total_seconds() < _INDEX_CHECK_PERIOD_SECONDS: + return + with _vector_table_lock: + if self._creating_index or self._have_index: + return + self._last_index_check = datetime.utcnow() + # Check if index exists, create if necessary + check_query = ( + f"SELECT 1 FROM `{self.project_id}.{self.dataset_name}" + ".INFORMATION_SCHEMA.VECTOR_INDEXES` WHERE" + f" table_name = '{self.table_name}'" + ) + job = self.bq_client.query( + check_query, api_method=bigquery.enums.QueryApiMethod.QUERY + ) + if job.result().total_rows == 0: + # Need to create an index. Make it in a separate thread. + self._create_index_in_background() + else: + self._logger.debug("Vector index already exists.") + self._have_index = True + + def _create_index_in_background(self): + if self._have_index or self._creating_index: + # Already have an index or in the process of creating one. 
+ return + self._creating_index = True + self._logger.debug("Trying to create a vector index.") + thread = Thread(target=self._create_index, daemon=True) + thread.start() + + def _create_index(self): + from google.api_core.exceptions import ClientError + + if self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE: + distance_type = "EUCLIDEAN" + elif self.distance_strategy == DistanceStrategy.COSINE: + distance_type = "COSINE" + # Default to EUCLIDEAN_DISTANCE + else: + distance_type = "EUCLIDEAN" + index_name = f"{self.table_name}_langchain_index" + try: + sql = f""" + CREATE VECTOR INDEX IF NOT EXISTS + `{index_name}` + ON `{self.full_table_id}`({self.text_embedding_field}) + OPTIONS(distance_type="{distance_type}", index_type="IVF") + """ + self.bq_client.query(sql).result() + self._have_index = True + except ClientError as ex: + self._logger.debug("Vector index creation failed (%s).", ex.args[0]) + finally: + self._creating_index = False + + def _persist(self, data: Dict[str, Any]) -> None: + """Saves documents and embeddings to BigQuery.""" + from google.cloud import bigquery + + data_len = len(data[list(data.keys())[0]]) + if data_len == 0: + return + + list_of_dicts = [dict(zip(data, t)) for t in zip(*data.values())] + + job_config = bigquery.LoadJobConfig() + job_config.schema = self.vectors_table.schema + job_config.schema_update_options = ( + bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION + ) + job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND + job = self.bq_client.load_table_from_json( + list_of_dicts, self.vectors_table, job_config=job_config + ) + job.result() + + @property + def embeddings(self) -> Optional[Embeddings]: + return self.embedding_model + + @property + def full_table_id(self) -> str: + return self._full_table_id + + def add_texts( + self, + texts: List[str], + metadatas: Optional[List[dict]] = None, + **kwargs: Any, + ) -> List[str]: + """Run more texts through the embeddings and add to the vectorstore. + + Args: + texts: List of strings to add to the vectorstore. + metadatas: Optional list of metadata associated with the texts. + + Returns: + List of ids from adding the texts into the vectorstore. + """ + embs = self.embedding_model.embed_documents(texts) + return self.add_texts_with_embeddings(texts, embs, metadatas, **kwargs) + + def add_texts_with_embeddings( + self, + texts: List[str], + embs: List[List[float]], + metadatas: Optional[List[dict]] = None, + **kwargs: Any, + ) -> List[str]: + """Run more texts through the embeddings and add to the vectorstore. + + Args: + texts: List of strings to add to the vectorstore. + embs: List of lists of floats with text embeddings for texts. + metadatas: Optional list of metadata associated with the texts. + + Returns: + List of ids from adding the texts into the vectorstore. + """ + ids = [uuid.uuid4().hex for _ in texts] + values_dict: Dict[str, List[Any]] = { + self.content_field: texts, + self.doc_id_field: ids, + } + if not metadatas: + metadatas = [] + len_diff = len(ids) - len(metadatas) + add_meta = [None for _ in range(0, len_diff)] + metadatas = [m if m is not None else {} for m in metadatas + add_meta] + values_dict[self.metadata_field] = metadatas + values_dict[self.text_embedding_field] = embs + self._persist(values_dict) + return ids + + def get_documents( + self, ids: Optional[List[str]] = None, filter: Optional[Dict[str, Any]] = None + ) -> List[Document]: + """Search documents by their ids or metadata values. 
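+
+        A sketch of both retrieval modes (``store`` is an initialized
+        BigQueryVectorSearch; the id value is a placeholder):
+
+            docs = store.get_documents(ids=["<doc_id>"])
+            docs = store.get_documents(filter={"kind": "fruit"})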
+
+        Args:
+            ids: List of ids of documents to retrieve from the vectorstore.
+            filter: Filter on metadata properties, e.g.
+                        {
+                            "str_property": "foo",
+                            "int_property": 123
+                        }
+        Returns:
+            List of Documents that match the given ids or metadata filter.
+        """
+        if ids and len(ids) > 0:
+            from google.cloud import bigquery
+
+            job_config = bigquery.QueryJobConfig(
+                query_parameters=[
+                    bigquery.ArrayQueryParameter("ids", "STRING", ids),
+                ]
+            )
+            id_expr = f"{self.doc_id_field} IN UNNEST(@ids)"
+        else:
+            job_config = None
+            id_expr = "TRUE"
+        if filter:
+            filter_expressions = []
+            for i in filter.items():
+                if isinstance(i[1], float):
+                    expr = (
+                        "ABS(CAST(JSON_VALUE("
+                        f"`{self.metadata_field}`,'$.{i[0]}') "
+                        f"AS FLOAT64) - {i[1]}) "
+                        f"<= {sys.float_info.epsilon}"
+                    )
+                else:
+                    val = str(i[1]).replace('"', '\\"')
+                    expr = (
+                        f"JSON_VALUE(`{self.metadata_field}`,'$.{i[0]}')" f' = "{val}"'
+                    )
+                filter_expressions.append(expr)
+            filter_expression_str = " AND ".join(filter_expressions)
+            where_filter_expr = f" AND ({filter_expression_str})"
+        else:
+            where_filter_expr = ""
+
+        job = self.bq_client.query(
+            f"""
+            SELECT * FROM `{self.full_table_id}` WHERE {id_expr}
+            {where_filter_expr}
+            """,
+            job_config=job_config,
+        )
+        docs: List[Document] = []
+        for row in job:
+            metadata = None
+            if self.metadata_field:
+                metadata = row[self.metadata_field]
+            if metadata:
+                metadata = json.loads(metadata)
+            else:
+                metadata = {}
+            metadata["__id"] = row[self.doc_id_field]
+            doc = Document(page_content=row[self.content_field], metadata=metadata)
+            docs.append(doc)
+        return docs
+
+    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
+        """Delete by vector ID or other criteria.
+
+        Args:
+            ids: List of ids to delete.
+            **kwargs: Other keyword arguments that subclasses might use.
+
+        Returns:
+            Optional[bool]: True if deletion is successful,
+            False otherwise, None if not implemented.
+        """
+        if not ids or len(ids) == 0:
+            return True
+        from google.cloud import bigquery
+
+        job_config = bigquery.QueryJobConfig(
+            query_parameters=[
+                bigquery.ArrayQueryParameter("ids", "STRING", ids),
+            ]
+        )
+        self.bq_client.query(
+            f"""
+            DELETE FROM `{self.full_table_id}` WHERE {self.doc_id_field}
+            IN UNNEST(@ids)
+            """,
+            job_config=job_config,
+        ).result()
+        return True
+
+    async def adelete(
+        self, ids: Optional[List[str]] = None, **kwargs: Any
+    ) -> Optional[bool]:
+        """Delete by vector ID or other criteria.
+
+        Args:
+            ids: List of ids to delete.
+            **kwargs: Other keyword arguments that subclasses might use.
+
+        Returns:
+            Optional[bool]: True if deletion is successful,
+            False otherwise, None if not implemented.
+        """
+        return await asyncio.get_running_loop().run_in_executor(
+            None, partial(self.delete, **kwargs), ids
+        )
+
+    def _search_with_score_and_embeddings_by_vector(
+        self,
+        embedding: List[float],
+        k: int = DEFAULT_TOP_K,
+        filter: Optional[Dict[str, Any]] = None,
+        brute_force: bool = False,
+        fraction_lists_to_search: Optional[float] = None,
+    ) -> List[Tuple[Document, List[float], float]]:
+        from google.cloud import bigquery
+
+        # Create an index if no index exists.
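+        # (The call below is cheap once an index exists or is being created;
+        # VECTOR_SEARCH still works without an index, because BigQuery falls
+        # back to exact, brute-force search until the index is ready.)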
+        if not self._have_index and not self._creating_index:
+            self._initialize_vector_index()
+        # Prepare filter
+        filter_expr = "TRUE"
+        if filter:
+            filter_expressions = []
+            for i in filter.items():
+                if isinstance(i[1], float):
+                    expr = (
+                        "ABS(CAST(JSON_VALUE("
+                        f"base.`{self.metadata_field}`,'$.{i[0]}') "
+                        f"AS FLOAT64) - {i[1]}) "
+                        f"<= {sys.float_info.epsilon}"
+                    )
+                else:
+                    val = str(i[1]).replace('"', '\\"')
+                    expr = (
+                        f"JSON_VALUE(base.`{self.metadata_field}`,'$.{i[0]}')"
+                        f' = "{val}"'
+                    )
+                filter_expressions.append(expr)
+            filter_expression_str = " AND ".join(filter_expressions)
+            filter_expr += f" AND ({filter_expression_str})"
+        # Configure and run a query job.
+        job_config = bigquery.QueryJobConfig(
+            query_parameters=[
+                bigquery.ArrayQueryParameter("v", "FLOAT64", embedding),
+            ],
+            use_query_cache=False,
+            priority=bigquery.QueryPriority.BATCH,
+        )
+        if self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
+            distance_type = "EUCLIDEAN"
+        elif self.distance_strategy == DistanceStrategy.COSINE:
+            distance_type = "COSINE"
+        # Default to EUCLIDEAN_DISTANCE
+        else:
+            distance_type = "EUCLIDEAN"
+        if brute_force:
+            options_string = ",options => '{\"use_brute_force\":true}'"
+        elif fraction_lists_to_search:
+            if fraction_lists_to_search == 0 or fraction_lists_to_search >= 1.0:
+                raise ValueError(
+                    "`fraction_lists_to_search` must be between "
+                    "0.0 and 1.0, exclusive"
+                )
+            options_string = (
+                ',options => \'{"fraction_lists_to_search":'
+                f"{fraction_lists_to_search}}}'"
+            )
+        else:
+            options_string = ""
+        query = f"""
+            SELECT
+                base.*,
+                distance AS _vector_search_distance
+            FROM VECTOR_SEARCH(
+                TABLE `{self.full_table_id}`,
+                "{self.text_embedding_field}",
+                (SELECT @v AS {self.text_embedding_field}),
+                distance_type => "{distance_type}",
+                top_k => {k}
+                {options_string}
+            )
+            WHERE {filter_expr}
+            LIMIT {k}
+        """
+        document_tuples: List[Tuple[Document, List[float], float]] = []
+        # TODO(vladkol): Use jobCreationMode=JOB_CREATION_OPTIONAL when available.
+        job = self.bq_client.query(
+            query, job_config=job_config, api_method=bigquery.enums.QueryApiMethod.QUERY
+        )
+        # Process job results.
+        for row in job:
+            metadata = row[self.metadata_field]
+            if metadata:
+                metadata = json.loads(metadata)
+            else:
+                metadata = {}
+            metadata["__id"] = row[self.doc_id_field]
+            doc = Document(page_content=row[self.content_field], metadata=metadata)
+            document_tuples.append(
+                (doc, row[self.text_embedding_field], row["_vector_search_distance"])
+            )
+        return document_tuples
+
+    def similarity_search_with_score_by_vector(
+        self,
+        embedding: List[float],
+        k: int = DEFAULT_TOP_K,
+        filter: Optional[Dict[str, Any]] = None,
+        brute_force: bool = False,
+        fraction_lists_to_search: Optional[float] = None,
+        **kwargs: Any,
+    ) -> List[Tuple[Document, float]]:
+        """Return docs most similar to embedding vector.
+
+        Args:
+            embedding: Embedding to look up documents similar to.
+            k: Number of Documents to return. Defaults to 4.
+            filter: Filter on metadata properties, e.g.
+                        {
+                            "str_property": "foo",
+                            "int_property": 123
+                        }
+            brute_force: Whether to use brute force search. Defaults to False.
+            fraction_lists_to_search: Optional percentage of lists to search,
+                must be in range 0.0 and 1.0, exclusive.
+                If None, uses the service's default, which is 0.05.
+
+        Returns:
+            List of Documents most similar to the query vector, with distances.
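+
+        A sketch (``store`` is an initialized BigQueryVectorSearch):
+
+            emb = store.embedding_model.embed_query("fruit")
+            docs_with_distances = store.similarity_search_with_score_by_vector(
+                emb, k=2
+            )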
+ """ + del kwargs + document_tuples = self._search_with_score_and_embeddings_by_vector( + embedding, k, filter, brute_force, fraction_lists_to_search + ) + return [(doc, distance) for doc, _, distance in document_tuples] + + def similarity_search_by_vector( + self, + embedding: List[float], + k: int = DEFAULT_TOP_K, + filter: Optional[Dict[str, Any]] = None, + brute_force: bool = False, + fraction_lists_to_search: Optional[float] = None, + **kwargs: Any, + ) -> List[Document]: + """Return docs most similar to embedding vector. + + Args: + embedding: Embedding to look up documents similar to. + k: Number of Documents to return. Defaults to 4. + filter: Filter on metadata properties, e.g. + { + "str_property": "foo", + "int_property": 123 + } + brute_force: Whether to use brute force search. Defaults to False. + fraction_lists_to_search: Optional percentage of lists to search, + must be in range 0.0 and 1.0, exclusive. + If Node, uses service's default which is 0.05. + + Returns: + List of Documents most similar to the query vector. + """ + tuples = self.similarity_search_with_score_by_vector( + embedding, k, filter, brute_force, fraction_lists_to_search, **kwargs + ) + return [i[0] for i in tuples] + + def similarity_search_with_score( + self, + query: str, + k: int = DEFAULT_TOP_K, + filter: Optional[Dict[str, Any]] = None, + brute_force: bool = False, + fraction_lists_to_search: Optional[float] = None, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + """Run similarity search with score. + + Args: + query: search query text. + k: Number of Documents to return. Defaults to 4. + filter: Filter on metadata properties, e.g. + { + "str_property": "foo", + "int_property": 123 + } + brute_force: Whether to use brute force search. Defaults to False. + fraction_lists_to_search: Optional percentage of lists to search, + must be in range 0.0 and 1.0, exclusive. + If Node, uses service's default which is 0.05. + + Returns: + List of Documents most similar to the query vector, with similarity scores. + """ + emb = self.embedding_model.embed_query(query) # type: ignore + return self.similarity_search_with_score_by_vector( + emb, k, filter, brute_force, fraction_lists_to_search, **kwargs + ) + + def similarity_search( + self, + query: str, + k: int = DEFAULT_TOP_K, + filter: Optional[Dict[str, Any]] = None, + brute_force: bool = False, + fraction_lists_to_search: Optional[float] = None, + **kwargs: Any, + ) -> List[Document]: + """Run similarity search. + + Args: + query: search query text. + k: Number of Documents to return. Defaults to 4. + filter: Filter on metadata properties, e.g. + { + "str_property": "foo", + "int_property": 123 + } + brute_force: Whether to use brute force search. Defaults to False. + fraction_lists_to_search: Optional percentage of lists to search, + must be in range 0.0 and 1.0, exclusive. + If Node, uses service's default which is 0.05. + + Returns: + List of Documents most similar to the query vector. + """ + tuples = self.similarity_search_with_score( + query, k, filter, brute_force, fraction_lists_to_search, **kwargs + ) + return [i[0] for i in tuples] + + def _select_relevance_score_fn(self) -> Callable[[float], float]: + if self.distance_strategy == DistanceStrategy.COSINE: + return BigQueryVectorSearch._cosine_relevance_score_fn + else: + raise ValueError( + "Relevance score is not supported " + f"for `{self.distance_strategy}` distance." 
+            )
+
+    def max_marginal_relevance_search(
+        self,
+        query: str,
+        k: int = DEFAULT_TOP_K,
+        fetch_k: int = DEFAULT_TOP_K * 5,
+        lambda_mult: float = 0.5,
+        filter: Optional[Dict[str, Any]] = None,
+        brute_force: bool = False,
+        fraction_lists_to_search: Optional[float] = None,
+        **kwargs: Any,
+    ) -> List[Document]:
+        """Return docs selected using the maximal marginal relevance.
+
+        Maximal marginal relevance optimizes for similarity to the query AND
+        diversity among the selected documents.
+
+        Args:
+            query: search query text.
+            k: Number of Documents to return. Defaults to 4.
+            fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
+            lambda_mult: Number between 0 and 1 that determines the degree
+                of diversity among the results, with 0 corresponding
+                to maximum diversity and 1 to minimum diversity.
+                Defaults to 0.5.
+            filter: Filter on metadata properties, e.g.
+                        {
+                            "str_property": "foo",
+                            "int_property": 123
+                        }
+            brute_force: Whether to use brute force search. Defaults to False.
+            fraction_lists_to_search: Optional percentage of lists to search,
+                must be in range 0.0 and 1.0, exclusive.
+                If None, uses the service's default, which is 0.05.
+        Returns:
+            List of Documents selected by maximal marginal relevance.
+        """
+        query_embedding = self.embedding_model.embed_query(  # type: ignore
+            query
+        )
+        doc_tuples = self._search_with_score_and_embeddings_by_vector(
+            query_embedding, fetch_k, filter, brute_force, fraction_lists_to_search
+        )
+        doc_embeddings = [d[1] for d in doc_tuples]
+        mmr_doc_indexes = maximal_marginal_relevance(
+            np.array(query_embedding), doc_embeddings, lambda_mult=lambda_mult, k=k
+        )
+        return [doc_tuples[i][0] for i in mmr_doc_indexes]
+
+    def max_marginal_relevance_search_by_vector(
+        self,
+        embedding: List[float],
+        k: int = DEFAULT_TOP_K,
+        fetch_k: int = DEFAULT_TOP_K * 5,
+        lambda_mult: float = 0.5,
+        filter: Optional[Dict[str, Any]] = None,
+        brute_force: bool = False,
+        fraction_lists_to_search: Optional[float] = None,
+        **kwargs: Any,
+    ) -> List[Document]:
+        """Return docs selected using the maximal marginal relevance.
+
+        Maximal marginal relevance optimizes for similarity to the query AND
+        diversity among the selected documents.
+
+        Args:
+            embedding: Embedding to look up documents similar to.
+            k: Number of Documents to return. Defaults to 4.
+            fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
+            lambda_mult: Number between 0 and 1 that determines the degree
+                of diversity among the results, with 0 corresponding
+                to maximum diversity and 1 to minimum diversity.
+                Defaults to 0.5.
+            filter: Filter on metadata properties, e.g.
+                        {
+                            "str_property": "foo",
+                            "int_property": 123
+                        }
+            brute_force: Whether to use brute force search. Defaults to False.
+            fraction_lists_to_search: Optional percentage of lists to search,
+                must be in range 0.0 and 1.0, exclusive.
+                If None, uses the service's default, which is 0.05.
+        Returns:
+            List of Documents selected by maximal marginal relevance.
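+
+        A sketch (``store`` is an initialized BigQueryVectorSearch):
+
+            emb = store.embedding_model.embed_query("fruit")
+            docs = store.max_marginal_relevance_search_by_vector(
+                emb, k=2, fetch_k=10, lambda_mult=0.5
+            )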
+ """ + doc_tuples = self._search_with_score_and_embeddings_by_vector( + embedding, fetch_k, filter, brute_force, fraction_lists_to_search + ) + doc_embeddings = [d[1] for d in doc_tuples] + mmr_doc_indexes = maximal_marginal_relevance( + np.array(embedding), doc_embeddings, lambda_mult=lambda_mult, k=k + ) + return [doc_tuples[i][0] for i in mmr_doc_indexes] + + async def amax_marginal_relevance_search( + self, + query: str, + k: int = DEFAULT_TOP_K, + fetch_k: int = DEFAULT_TOP_K * 5, + lambda_mult: float = 0.5, + filter: Optional[Dict[str, Any]] = None, + brute_force: bool = False, + fraction_lists_to_search: Optional[float] = None, + **kwargs: Any, + ) -> List[Document]: + """Return docs selected using the maximal marginal relevance.""" + + func = partial( + self.max_marginal_relevance_search, + query, + k=k, + fetch_k=fetch_k, + lambda_mult=lambda_mult, + filter=filter, + brute_force=brute_force, + fraction_lists_to_search=fraction_lists_to_search, + **kwargs, + ) + return await asyncio.get_event_loop().run_in_executor(None, func) + + async def amax_marginal_relevance_search_by_vector( + self, + embedding: List[float], + k: int = DEFAULT_TOP_K, + fetch_k: int = DEFAULT_TOP_K * 5, + lambda_mult: float = 0.5, + filter: Optional[Dict[str, Any]] = None, + brute_force: bool = False, + fraction_lists_to_search: Optional[float] = None, + **kwargs: Any, + ) -> List[Document]: + """Return docs selected using the maximal marginal relevance.""" + return await asyncio.get_running_loop().run_in_executor( + None, + partial(self.max_marginal_relevance_search_by_vector, **kwargs), + embedding, + k, + fetch_k, + lambda_mult, + filter, + brute_force, + fraction_lists_to_search, + ) + + @classmethod + def from_texts( + cls: Type["BigQueryVectorSearch"], + texts: List[str], + embedding: Embeddings, + metadatas: Optional[List[dict]] = None, + **kwargs: Any, + ) -> "BigQueryVectorSearch": + """Return VectorStore initialized from texts and embeddings.""" + vs_obj = BigQueryVectorSearch(embedding=embedding, **kwargs) + vs_obj.add_texts(texts, metadatas) + return vs_obj diff --git a/libs/community/tests/integration_tests/vectorstores/test_bigquery_vector_search.py b/libs/community/tests/integration_tests/vectorstores/test_bigquery_vector_search.py new file mode 100644 index 0000000000..57da87d669 --- /dev/null +++ b/libs/community/tests/integration_tests/vectorstores/test_bigquery_vector_search.py @@ -0,0 +1,102 @@ +"""Test BigQuery Vector Search. +In order to run this test, you need to install Google Cloud BigQuery SDK +pip install google-cloud-bigquery +Your end-user credentials would be used to make the calls (make sure you've run +`gcloud auth login` first). +""" + +import os +import uuid + +import pytest + +from langchain_community.vectorstores.bigquery_vector_search import BigQueryVectorSearch +from tests.integration_tests.vectorstores.fake_embeddings import FakeEmbeddings + +TEST_TABLE_NAME = "langchain_test_table" + + +@pytest.fixture(scope="class") +def store(request: pytest.FixtureRequest) -> BigQueryVectorSearch: + """BigQueryVectorStore tests context. + + In order to run this test, you define PROJECT environment variable + with GCP project id. + + Example: + export PROJECT=... 
+ """ + from google.cloud import bigquery + + bigquery.Client(location="US").create_dataset( + TestBigQueryVectorStore.dataset_name, exists_ok=True + ) + TestBigQueryVectorStore.store = BigQueryVectorSearch( + project_id=os.environ.get("PROJECT", None), + embedding=FakeEmbeddings(), + dataset_name=TestBigQueryVectorStore.dataset_name, + table_name=TEST_TABLE_NAME, + ) + TestBigQueryVectorStore.store.add_texts( + TestBigQueryVectorStore.texts, TestBigQueryVectorStore.metadatas + ) + + def teardown() -> None: + bigquery.Client(location="US").delete_dataset( + TestBigQueryVectorStore.dataset_name, + delete_contents=True, + not_found_ok=True, + ) + + request.addfinalizer(teardown) + return TestBigQueryVectorStore.store + + +class TestBigQueryVectorStore: + """BigQueryVectorStore tests class.""" + + dataset_name = uuid.uuid4().hex + store: BigQueryVectorSearch + texts = ["apple", "ice cream", "Saturn", "candy", "banana"] + metadatas = [ + { + "kind": "fruit", + }, + { + "kind": "treat", + }, + { + "kind": "planet", + }, + { + "kind": "treat", + }, + { + "kind": "fruit", + }, + ] + + def test_semantic_search(self, store: BigQueryVectorSearch) -> None: + """Test on semantic similarity.""" + docs = store.similarity_search("food", k=4) + print(docs) + kinds = [d.metadata["kind"] for d in docs] + assert "fruit" in kinds + assert "treat" in kinds + assert "planet" not in kinds + + def test_semantic_search_filter_fruits(self, store: BigQueryVectorSearch) -> None: + """Test on semantic similarity with metadata filter.""" + docs = store.similarity_search("food", filter={"kind": "fruit"}) + kinds = [d.metadata["kind"] for d in docs] + assert "fruit" in kinds + assert "treat" not in kinds + assert "planet" not in kinds + + def test_get_doc_by_filter(self, store: BigQueryVectorSearch) -> None: + """Test on document retrieval with metadata filter.""" + docs = store.get_documents(filter={"kind": "fruit"}) + kinds = [d.metadata["kind"] for d in docs] + assert "fruit" in kinds + assert "treat" not in kinds + assert "planet" not in kinds