Integrate Clickhouse as Vector Store (#5650)


#### Description

This PR integrates the open source version of ClickHouse as a vector
store. It is easy to run for local development, and it eases LangChain
adoption for enterprises that already operate large-scale ClickHouse
deployments.

ClickHouse is an open source, real-time OLAP database with full SQL
support and a wide range of functions to assist users in writing
analytical queries. Some of these functions and data structures perform
distance operations between vectors, [enabling ClickHouse to be used as
a vector
database](https://clickhouse.com/blog/vector-search-clickhouse-p1).
Recently added capabilities like [Approximate Nearest
Neighbour (ANN)
indices](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/annindexes)
support faster approximate matching of vectors and are a promising
development that should further enhance ClickHouse's vector matching
capabilities.
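
For illustration only (not part of this PR), ranking rows by vector
distance is plain SQL; here is a minimal sketch with `clickhouse-connect`,
assuming a hypothetical table `demo` with an `Array(Float32)` column `vec`:

```python
import clickhouse_connect

# Hypothetical schema:
#   CREATE TABLE demo (id UInt32, vec Array(Float32))
#   ENGINE = MergeTree ORDER BY id
client = clickhouse_connect.get_client(host="localhost", port=8123)
rows = client.query(
    "SELECT id, L2Distance(vec, [0.1, 0.2, 0.3]) AS dist "
    "FROM demo ORDER BY dist ASC LIMIT 5"
).result_rows  # [(id, dist), ...], nearest first
```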

In LangChain, some ClickHouse-based commercial-variant vector stores
such as
[Chroma](https://github.com/hwchase17/langchain/blob/master/langchain/vectorstores/chroma.py)
and
[MyScale](https://github.com/hwchase17/langchain/blob/master/langchain/vectorstores/myscale.py)
are already integrated. However, for enterprises with large-scale
ClickHouse cluster deployments, it is more straightforward to upgrade
their existing ClickHouse infrastructure than to migrate to another,
similar vector store solution, so we believe integrating the open
source version of ClickHouse as a vector store is a valid requirement.

As `clickhouse-connect` is already used by other integrations, this
PR introduces no new dependencies.
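
For reference, end-to-end usage of the new integration looks like the
sketch below (it assumes a local ClickHouse server on the default HTTP
port 8123; the table name is illustrative):

```python
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Clickhouse, ClickhouseSettings

settings = ClickhouseSettings(table="langchain_demo")
docsearch = Clickhouse.from_texts(
    ["ClickHouse is an open source OLAP database"],
    OpenAIEmbeddings(),
    config=settings,
)
docs = docsearch.similarity_search("what is clickhouse?", k=1)
```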

#### Before submitting


1. Added a test for the integration:
https://github.com/haoch/langchain/blob/clickhouse/tests/integration_tests/vectorstores/test_clickhouse.py
2. Added an example notebook and document showing its use: 
* Notebook:
https://github.com/haoch/langchain/blob/clickhouse/docs/modules/indexes/vectorstores/examples/clickhouse.ipynb
* Doc:
https://github.com/haoch/langchain/blob/clickhouse/docs/integrations/clickhouse.md


#### Who can review?

Tag maintainers/contributors who might be interested:

 
@hwchase17 @dev2049 Could you please help review?

---------

Co-authored-by: Dev 2049 <dev.dev2049@gmail.com>

@@ -0,0 +1,52 @@
# ClickHouse
This page covers how to use ClickHouse Vector Search within LangChain.
[ClickHouse](https://clickhouse.com) is an open source real-time OLAP database with full SQL support and a wide range of functions to assist users in writing analytical queries. Some of these functions and data structures perform distance operations between vectors, enabling ClickHouse to be used as a vector database.
Due to the fully parallelized query pipeline, ClickHouse can process vector search operations very quickly, especially when performing exact matching through a linear scan over all rows, delivering processing speed comparable to dedicated vector databases.
High compression levels, tunable through custom compression codecs, enable very large datasets to be stored and queried. ClickHouse is not memory-bound, allowing multi-TB datasets containing embeddings to be queried.
The capabilities for computing the distance between two vectors are just another SQL function and can be effectively combined with more traditional SQL filtering and aggregation capabilities. This allows vectors to be stored and queried alongside metadata, and even rich text, enabling a broad array of use cases and applications.
Finally, experimental ClickHouse capabilities like [Approximate Nearest Neighbour (ANN) indices](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/annindexes) support faster approximate matching of vectors and are a promising development that should further enhance ClickHouse's vector matching capabilities.
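As a quick illustration of combining vector distance with ordinary SQL filtering (a sketch using `clickhouse-connect`; the `images` table and its columns are hypothetical):
```python
import clickhouse_connect

client = clickhouse_connect.get_client(host="localhost", port=8123)
# Rank only the rows matching a normal SQL predicate, nearest vectors first.
result = client.query(
    "SELECT id, caption, L2Distance(embedding, [0.2, 0.1, 0.9]) AS dist "
    "FROM images WHERE category = 'animals' ORDER BY dist ASC LIMIT 10"
)
for row in result.result_rows:
    print(row)
```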
## Installation
- Install clickhouse server by [binary](https://clickhouse.com/docs/en/install) or [docker image](https://hub.docker.com/r/clickhouse/clickhouse-server/)
- Install the Python SDK with `pip install clickhouse-connect`
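You can verify that the server and driver work with a quick connectivity check (assumes the default HTTP port 8123 and no authentication):
```python
import clickhouse_connect

client = clickhouse_connect.get_client(host="localhost", port=8123)
print(client.command("SELECT version()"))  # e.g. '23.4.2.11'
```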
### Configure clickhouse vector index
Customize the `ClickhouseSettings` object with your parameters:
```python
from langchain.vectorstores import Clickhouse, ClickhouseSettings
config = ClickhouseSettings(host="<clickhouse-server-host>", port=8123, ...)
index = Clickhouse(embedding_function, config)
index.add_documents(...)
```
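Since `ClickhouseSettings` is a pydantic `BaseSettings`, every parameter can instead be supplied through environment variables with the `clickhouse_` prefix (or a `.env` file); the variable values below are illustrative:
```python
import os

from langchain.vectorstores import ClickhouseSettings

os.environ["CLICKHOUSE_HOST"] = "clickhouse.internal.example.com"
os.environ["CLICKHOUSE_TABLE"] = "docs_embeddings"

config = ClickhouseSettings()  # picks up CLICKHOUSE_* from the environment
```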
## Wrappers
Supported functions:
- `add_texts`
- `add_documents`
- `from_texts`
- `from_documents`
- `similarity_search`
- `asimilarity_search`
- `similarity_search_by_vector`
- `asimilarity_search_by_vector`
- `similarity_search_with_relevance_scores`
### VectorStore
There exists a wrapper around the open source ClickHouse database, allowing you to use it as a vectorstore,
whether for semantic search or similar example retrieval.
To import this vectorstore:
```python
from langchain.vectorstores import Clickhouse
```
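A minimal semantic search sketch (assumes a running local server; the texts and table name are illustrative):
```python
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Clickhouse, ClickhouseSettings

docsearch = Clickhouse.from_texts(
    ["harrison worked at kensho"],
    OpenAIEmbeddings(),
    config=ClickhouseSettings(table="langchain_example"),
)
docs_and_scores = docsearch.similarity_search_with_relevance_scores(
    "where did harrison work?", k=1
)
```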
For a more detailed walkthrough of the ClickHouse wrapper, see [this notebook](../modules/indexes/vectorstores/examples/clickhouse.ipynb)

@@ -0,0 +1,399 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "683953b3",
"metadata": {},
"source": [
"# ClickHouse Vector Search\n",
"\n",
"> [ClickHouse](https://clickhouse.com/) is the fastest and most resource efficient open-source database for real-time apps and analytics with full SQL support and a wide range of functions to assist users in writing analytical queries. Lately added data structures and distance search functions (like `L2Distance`) as well as [approximate nearest neighbor search indexes](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/annindexes) enable ClickHouse to be used as a high performance and scalable vector database to store and search vectors with SQL.\n",
"\n",
"This notebook shows how to use functionality related to the `ClickHouse` vector search."
]
},
{
"cell_type": "markdown",
"id": "43ead5d5-2c1f-4dce-a69a-cb00e4f9d6f0",
"metadata": {},
"source": [
"## Setting up envrionments"
]
},
{
"cell_type": "markdown",
"id": "b2c434bc",
"metadata": {},
"source": [
"Setting up local clickhouse server with docker (optional)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "249a7751",
"metadata": {
"ExecuteTime": {
"end_time": "2023-06-03T08:43:43.035606Z",
"start_time": "2023-06-03T08:43:42.618531Z"
}
},
"outputs": [],
"source": [
"! docker run -d -p 8123:8123 -p9000:9000 --name langchain-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server:23.4.2.11"
]
},
{
"cell_type": "markdown",
"id": "7bd3c1c0",
"metadata": {},
"source": [
"Setup up clickhouse client driver"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9d614bf8",
"metadata": {},
"outputs": [],
"source": [
"!pip install clickhouse-connect"
]
},
{
"cell_type": "markdown",
"id": "15a1d477-9cdb-4d82-b019-96951ecb2b72",
"metadata": {},
"source": [
"We want to use OpenAIEmbeddings so we have to get the OpenAI API Key."
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "91003ea5-0c8c-436c-a5de-aaeaeef2f458",
"metadata": {
"ExecuteTime": {
"end_time": "2023-06-03T08:49:35.383673Z",
"start_time": "2023-06-03T08:49:33.984547Z"
}
},
"outputs": [],
"source": [
"import os\n",
"import getpass\n",
"\n",
"if not os.environ['OPENAI_API_KEY']:\n",
" os.environ['OPENAI_API_KEY'] = getpass.getpass('OpenAI API Key:')"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "aac9563e",
"metadata": {
"ExecuteTime": {
"end_time": "2023-06-03T08:33:31.554934Z",
"start_time": "2023-06-03T08:33:31.549590Z"
},
"tags": []
},
"outputs": [],
"source": [
"from langchain.embeddings.openai import OpenAIEmbeddings\n",
"from langchain.text_splitter import CharacterTextSplitter\n",
"from langchain.vectorstores import Clickhouse, ClickhouseSettings"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "a3c3999a",
"metadata": {
"ExecuteTime": {
"end_time": "2023-06-03T08:33:32.527387Z",
"start_time": "2023-06-03T08:33:32.501312Z"
},
"tags": []
},
"outputs": [],
"source": [
"from langchain.document_loaders import TextLoader\n",
"loader = TextLoader('../../../state_of_the_union.txt')\n",
"documents = loader.load()\n",
"text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)\n",
"docs = text_splitter.split_documents(documents)\n",
"\n",
"embeddings = OpenAIEmbeddings()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "6e104aee",
"metadata": {
"ExecuteTime": {
"end_time": "2023-06-03T08:33:35.503823Z",
"start_time": "2023-06-03T08:33:33.745832Z"
}
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Inserting data...: 100%|██████████| 42/42 [00:00<00:00, 2801.49it/s]\n"
]
}
],
"source": [
"for d in docs:\n",
" d.metadata = {'some': 'metadata'}\n",
"settings = ClickhouseSettings(table=\"clickhouse_vector_search_example\")\n",
"docsearch = Clickhouse.from_documents(docs, embeddings, config=settings)\n",
"\n",
"query = \"What did the president say about Ketanji Brown Jackson\"\n",
"docs = docsearch.similarity_search(query)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "9c608226",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Tonight. I call on the Senate to: Pass the Freedom to Vote Act. Pass the John Lewis Voting Rights Act. And while youre at it, pass the Disclose Act so Americans can know who is funding our elections. \n",
"\n",
"Tonight, Id like to honor someone who has dedicated his life to serve this country: Justice Stephen Breyer—an Army veteran, Constitutional scholar, and retiring Justice of the United States Supreme Court. Justice Breyer, thank you for your service. \n",
"\n",
"One of the most serious constitutional responsibilities a President has is nominating someone to serve on the United States Supreme Court. \n",
"\n",
"And I did that 4 days ago, when I nominated Circuit Court of Appeals Judge Ketanji Brown Jackson. One of our nations top legal minds, who will continue Justice Breyers legacy of excellence.\n"
]
}
],
"source": [
"print(docs[0].page_content)"
]
},
{
"cell_type": "markdown",
"id": "e3a8b105",
"metadata": {},
"source": [
"## Get connection info and data schema"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "69996818",
"metadata": {
"ExecuteTime": {
"end_time": "2023-06-03T08:28:58.252991Z",
"start_time": "2023-06-03T08:28:58.197560Z"
},
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[92m\u001b[1mdefault.clickhouse_vector_search_example @ localhost:8123\u001b[0m\n",
"\n",
"\u001b[1musername: None\u001b[0m\n",
"\n",
"Table Schema:\n",
"---------------------------------------------------\n",
"|\u001b[94mid \u001b[0m|\u001b[96mNullable(String) \u001b[0m|\n",
"|\u001b[94mdocument \u001b[0m|\u001b[96mNullable(String) \u001b[0m|\n",
"|\u001b[94membedding \u001b[0m|\u001b[96mArray(Float32) \u001b[0m|\n",
"|\u001b[94mmetadata \u001b[0m|\u001b[96mObject('json') \u001b[0m|\n",
"|\u001b[94muuid \u001b[0m|\u001b[96mUUID \u001b[0m|\n",
"---------------------------------------------------\n",
"\n"
]
}
],
"source": [
"print(str(docsearch))"
]
},
{
"cell_type": "markdown",
"id": "324ac147",
"metadata": {},
"source": [
"### Clickhouse table schema"
]
},
{
"cell_type": "markdown",
"id": "b5bd7c5b",
"metadata": {},
"source": [
"> Clickhouse table will be automatically created if not exist by default. Advanced users could pre-create the table with optimized settings. For distributed Clickhouse cluster with sharding, table engine should be configured as `Distributed`."
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "54f4f561",
"metadata": {
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Clickhouse Table DDL:\n",
"\n",
"CREATE TABLE IF NOT EXISTS default.clickhouse_vector_search_example(\n",
" id Nullable(String),\n",
" document Nullable(String),\n",
" embedding Array(Float32),\n",
" metadata JSON,\n",
" uuid UUID DEFAULT generateUUIDv4(),\n",
" CONSTRAINT cons_vec_len CHECK length(embedding) = 1536,\n",
" INDEX vec_idx embedding TYPE annoy(100,'L2Distance') GRANULARITY 1000\n",
") ENGINE = MergeTree ORDER BY uuid SETTINGS index_granularity = 8192\n"
]
}
],
"source": [
"print(f\"Clickhouse Table DDL:\\n\\n{docsearch.schema}\")"
]
},
{
"cell_type": "markdown",
"id": "f59360c0",
"metadata": {},
"source": [
"## Filtering\n",
"\n",
"You can have direct access to ClickHouse SQL where statement. You can write `WHERE` clause following standard SQL.\n",
"\n",
"**NOTE**: Please be aware of SQL injection, this interface must not be directly called by end-user.\n",
"\n",
"If you custimized your `column_map` under your setting, you search with filter like this:"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "232055f6",
"metadata": {
"ExecuteTime": {
"end_time": "2023-06-03T08:29:36.680805Z",
"start_time": "2023-06-03T08:29:34.963676Z"
}
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Inserting data...: 100%|██████████| 42/42 [00:00<00:00, 6939.56it/s]\n"
]
}
],
"source": [
"from langchain.vectorstores import Clickhouse, ClickhouseSettings\n",
"from langchain.document_loaders import TextLoader\n",
"\n",
"loader = TextLoader('../../../state_of_the_union.txt')\n",
"documents = loader.load()\n",
"text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)\n",
"docs = text_splitter.split_documents(documents)\n",
"\n",
"embeddings = OpenAIEmbeddings()\n",
"\n",
"for i, d in enumerate(docs):\n",
" d.metadata = {'doc_id': i}\n",
"\n",
"docsearch = Clickhouse.from_documents(docs, embeddings)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "ddbcee77",
"metadata": {
"ExecuteTime": {
"end_time": "2023-06-03T08:29:43.487436Z",
"start_time": "2023-06-03T08:29:43.040831Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0.6779101415357189 {'doc_id': 0} Madam Speaker, Madam...\n",
"0.6997970363474885 {'doc_id': 8} And so many families...\n",
"0.7044504914336727 {'doc_id': 1} Groups of citizens b...\n",
"0.7053558702165094 {'doc_id': 6} And Im taking robus...\n"
]
}
],
"source": [
"meta = docsearch.metadata_column\n",
"output = docsearch.similarity_search_with_relevance_scores('What did the president say about Ketanji Brown Jackson?', \n",
" k=4, where_str=f\"{meta}.doc_id<10\")\n",
"for d, dist in output:\n",
" print(dist, d.metadata, d.page_content[:20] + '...')"
]
},
{
"cell_type": "markdown",
"id": "a359ed74",
"metadata": {},
"source": [
"## Deleting your data"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "fb6a9d36",
"metadata": {
"ExecuteTime": {
"end_time": "2023-06-03T08:30:24.822384Z",
"start_time": "2023-06-03T08:30:24.798571Z"
}
},
"outputs": [],
"source": [
"docsearch.drop()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.2"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

@@ -4,6 +4,7 @@ from langchain.vectorstores.annoy import Annoy
from langchain.vectorstores.atlas import AtlasDB
from langchain.vectorstores.base import VectorStore
from langchain.vectorstores.chroma import Chroma
from langchain.vectorstores.clickhouse import Clickhouse, ClickhouseSettings
from langchain.vectorstores.deeplake import DeepLake
from langchain.vectorstores.docarray import DocArrayHnswSearch, DocArrayInMemorySearch
from langchain.vectorstores.elastic_vector_search import ElasticVectorSearch
@@ -51,4 +52,6 @@ __all__ = [
"DocArrayHnswSearch",
"DocArrayInMemorySearch",
"Typesense",
"Clickhouse",
"ClickhouseSettings",
]

@@ -0,0 +1,465 @@
"""Wrapper around open source ClickHouse VectorSearch capability."""
from __future__ import annotations
import json
import logging
from hashlib import sha1
from threading import Thread
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from pydantic import BaseSettings
from langchain.docstore.document import Document
from langchain.embeddings.base import Embeddings
from langchain.vectorstores.base import VectorStore
logger = logging.getLogger()
def has_mul_sub_str(s: str, *args: Any) -> bool:
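# Return True only if every one of *args occurs as a substring of s.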
for a in args:
if a not in s:
return False
return True
class ClickhouseSettings(BaseSettings):
"""ClickHouse Client Configuration
Attribute:
clickhouse_host (str) : An URL to connect to MyScale backend.
Defaults to 'localhost'.
clickhouse_port (int) : URL port to connect with HTTP. Defaults to 8443.
username (str) : Username to login. Defaults to None.
password (str) : Password to login. Defaults to None.
index_type (str): index type string.
index_param (list): index build parameter.
index_query_params(dict): index query parameters.
database (str) : Database name to find the table. Defaults to 'default'.
table (str) : Table name to operate on.
Defaults to 'vector_table'.
metric (str) : Metric to compute distance,
supported are ('angular', 'euclidean', 'manhattan', 'hamming',
'dot'). Defaults to 'angular'.
https://github.com/spotify/annoy/blob/main/src/annoymodule.cc#L149-L169
column_map (Dict) : Column type map to project column name onto langchain
semantics. Must have keys: `text`, `id`, `vector`,
must be same size to number of columns. For example:
.. code-block:: python
{
'id': 'text_id',
'uuid': 'global_unique_id',
'embedding': 'text_embedding',
'document': 'text_plain',
'metadata': 'metadata_dictionary_in_json',
}
Defaults to identity map.
"""
host: str = "localhost"
port: int = 8123
username: Optional[str] = None
password: Optional[str] = None
index_type: str = "annoy"
# Annoy supports L2Distance and cosineDistance.
index_param: Optional[Union[List, Dict]] = [100, "'L2Distance'"]
index_query_params: Dict[str, str] = {}
column_map: Dict[str, str] = {
"id": "id",
"uuid": "uuid",
"document": "document",
"embedding": "embedding",
"metadata": "metadata",
}
database: str = "default"
table: str = "langchain"
metric: str = "angular"
def __getitem__(self, item: str) -> Any:
return getattr(self, item)
class Config:
env_file = ".env"
env_prefix = "clickhouse_"
env_file_encoding = "utf-8"
class Clickhouse(VectorStore):
"""Wrapper around ClickHouse vector database
You need a `clickhouse-connect` python package, and a valid account
to connect to ClickHouse.
ClickHouse can not only search with simple vector indexes,
it also supports complex query with multiple conditions,
constraints and even sub-queries.
For more information, please visit
[ClickHouse official site](https://clickhouse.com/clickhouse)
"""
def __init__(
self,
embedding: Embeddings,
config: Optional[ClickhouseSettings] = None,
**kwargs: Any,
) -> None:
"""ClickHouse Wrapper to LangChain
embedding_function (Embeddings):
config (ClickHouseSettings): Configuration to ClickHouse Client
Other keyword arguments will pass into
[clickhouse-connect](https://docs.clickhouse.com/)
"""
try:
from clickhouse_connect import get_client
except ImportError:
raise ValueError(
"Could not import clickhouse connect python package. "
"Please install it with `pip install clickhouse-connect`."
)
try:
from tqdm import tqdm
self.pgbar = tqdm
except ImportError:
# Fall back to a pass-through when tqdm is not installed
self.pgbar = lambda x, **kwargs: x
super().__init__()
if config is not None:
self.config = config
else:
self.config = ClickhouseSettings()
assert self.config
assert self.config.host and self.config.port
assert (
self.config.column_map
and self.config.database
and self.config.table
and self.config.metric
)
for k in ["id", "embedding", "document", "metadata", "uuid"]:
assert k in self.config.column_map
assert self.config.metric in [
"angular",
"euclidean",
"manhattan",
"hamming",
"dot",
]
# initialize the schema
dim = len(embedding.embed_query("test"))
index_params = (
(
",".join([f"'{k}={v}'" for k, v in self.config.index_param.items()])
if self.config.index_param
else ""
)
if isinstance(self.config.index_param, Dict)
else ",".join([str(p) for p in self.config.index_param])
if isinstance(self.config.index_param, List)
else self.config.index_param
)
self.schema = f"""\
CREATE TABLE IF NOT EXISTS {self.config.database}.{self.config.table}(
{self.config.column_map['id']} Nullable(String),
{self.config.column_map['document']} Nullable(String),
{self.config.column_map['embedding']} Array(Float32),
{self.config.column_map['metadata']} JSON,
{self.config.column_map['uuid']} UUID DEFAULT generateUUIDv4(),
CONSTRAINT cons_vec_len CHECK length({self.config.column_map['embedding']}) = {dim},
INDEX vec_idx {self.config.column_map['embedding']} TYPE \
{self.config.index_type}({index_params}) GRANULARITY 1000
) ENGINE = MergeTree ORDER BY uuid SETTINGS index_granularity = 8192\
"""
self.dim = dim
self.BS = "\\"
self.must_escape = ("\\", "'")
self.embedding_function = embedding
self.dist_order = "ASC" # Only support ConsingDistance and L2Distance
# Create a connection to clickhouse
self.client = get_client(
host=self.config.host,
port=self.config.port,
username=self.config.username,
password=self.config.password,
**kwargs,
)
# Enable JSON type
self.client.command("SET allow_experimental_object_type=1")
# Enable Annoy index
self.client.command("SET allow_experimental_annoy_index=1")
self.client.command(self.schema)
def escape_str(self, value: str) -> str:
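# Prefix backslashes and single quotes with a backslash so values can be
# embedded safely in single-quoted SQL string literals.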
return "".join(f"{self.BS}{c}" if c in self.must_escape else c for c in value)
def _build_insert_sql(self, transac: Iterable, column_names: Iterable[str]) -> str:
ks = ",".join(column_names)
_data = []
for n in transac:
n = ",".join([f"'{self.escape_str(str(_n))}'" for _n in n])
_data.append(f"({n})")
i_str = f"""
INSERT INTO TABLE
{self.config.database}.{self.config.table}({ks})
VALUES
{','.join(_data)}
"""
return i_str
def _insert(self, transac: Iterable, column_names: Iterable[str]) -> None:
_insert_query = self._build_insert_sql(transac, column_names)
self.client.command(_insert_query)
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
batch_size: int = 32,
ids: Optional[Iterable[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Insert more texts through the embeddings and add to the VectorStore.
Args:
texts: Iterable of strings to add to the VectorStore.
ids: Optional list of ids to associate with the texts.
batch_size: Batch size of insertion.
metadatas: Optional metadata dicts for the texts.
Returns:
List of ids from adding the texts into the VectorStore.
"""
# Embed and create the documents
ids = ids or [sha1(t.encode("utf-8")).hexdigest() for t in texts]
colmap_ = self.config.column_map
transac = []
column_names = {
colmap_["id"]: ids,
colmap_["document"]: texts,
colmap_["embedding"]: self.embedding_function.embed_documents(list(texts)),
}
metadatas = metadatas or [{} for _ in texts]
column_names[colmap_["metadata"]] = map(json.dumps, metadatas)
assert len(set(colmap_) - set(column_names)) >= 0
keys, values = zip(*column_names.items())
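# Batches are handed to short-lived worker threads: while one batch is
# being inserted in the background, the loop accumulates the next one;
# t.join() keeps at most one insert in flight at a time.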
try:
t = None
for v in self.pgbar(
zip(*values), desc="Inserting data...", total=len(metadatas)
):
assert (
len(v[keys.index(self.config.column_map["embedding"])]) == self.dim
)
transac.append(v)
if len(transac) == batch_size:
if t:
t.join()
t = Thread(target=self._insert, args=[transac, keys])
t.start()
transac = []
if len(transac) > 0:
if t:
t.join()
self._insert(transac, keys)
return [i for i in ids]
except Exception as e:
logger.error(f"\033[91m\033[1m{type(e)}\033[0m \033[95m{str(e)}\033[0m")
return []
@classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[Dict[Any, Any]]] = None,
config: Optional[ClickhouseSettings] = None,
text_ids: Optional[Iterable[str]] = None,
batch_size: int = 32,
**kwargs: Any,
) -> Clickhouse:
"""Create ClickHouse wrapper with existing texts
Args:
embedding_function (Embeddings): Function to extract text embedding
texts (Iterable[str]): List or tuple of strings to be added
config (ClickHouseSettings, Optional): ClickHouse configuration
text_ids (Optional[Iterable], optional): IDs for the texts.
Defaults to None.
batch_size (int, optional): Batchsize when transmitting data to ClickHouse.
Defaults to 32.
metadata (List[dict], optional): metadata to texts. Defaults to None.
Other keyword arguments will pass into
[clickhouse-connect](https://clickhouse.com/docs/en/integrations/python#clickhouse-connect-driver-api)
Returns:
ClickHouse Index
"""
ctx = cls(embedding, config, **kwargs)
ctx.add_texts(texts, ids=text_ids, batch_size=batch_size, metadatas=metadatas)
return ctx
def __repr__(self) -> str:
"""Text representation for ClickHouse Vector Store, prints backends, username
and schemas. Easy to use with `str(ClickHouse())`
Returns:
repr: string to show connection info and data schema
"""
_repr = f"\033[92m\033[1m{self.config.database}.{self.config.table} @ "
_repr += f"{self.config.host}:{self.config.port}\033[0m\n\n"
_repr += f"\033[1musername: {self.config.username}\033[0m\n\nTable Schema:\n"
_repr += "-" * 51 + "\n"
for r in self.client.query(
f"DESC {self.config.database}.{self.config.table}"
).named_results():
_repr += (
f"|\033[94m{r['name']:24s}\033[0m|\033[96m{r['type']:24s}\033[0m|\n"
)
_repr += "-" * 51 + "\n"
return _repr
def _build_query_sql(
self, q_emb: List[float], topk: int, where_str: Optional[str] = None
) -> str:
q_emb_str = ",".join(map(str, q_emb))
if where_str:
where_str = f"PREWHERE {where_str}"
else:
where_str = ""
settings_strs = []
if self.config.index_query_params:
for k in self.config.index_query_params:
settings_strs.append(f"{k}={self.config.index_query_params[k]}")
# A query may carry only one trailing SETTINGS clause, with the
# key=value pairs comma-separated.
settings_clause = f"SETTINGS {', '.join(settings_strs)}" if settings_strs else ""
q_str = f"""
SELECT {self.config.column_map['document']},
{self.config.column_map['metadata']}, dist
FROM {self.config.database}.{self.config.table}
{where_str}
ORDER BY L2Distance({self.config.column_map['embedding']}, [{q_emb_str}])
AS dist {self.dist_order}
LIMIT {topk} {settings_clause}
"""
return q_str
def similarity_search(
self, query: str, k: int = 4, where_str: Optional[str] = None, **kwargs: Any
) -> List[Document]:
"""Perform a similarity search with ClickHouse
Args:
query (str): query string
k (int, optional): Top K neighbors to retrieve. Defaults to 4.
where_str (Optional[str], optional): where condition string.
Defaults to None.
NOTE: Please do not let end users fill this in directly, and always
be mindful of SQL injection. When dealing with metadata, remember to
use `{self.metadata_column}.attribute` instead of `attribute`
alone. The default name for it is `metadata`.
Returns:
List[Document]: List of Documents
"""
return self.similarity_search_by_vector(
self.embedding_function.embed_query(query), k, where_str, **kwargs
)
def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
where_str: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Perform a similarity search with ClickHouse by vectors
Args:
query (str): query string
k (int, optional): Top K neighbors to retrieve. Defaults to 4.
where_str (Optional[str], optional): where condition string.
Defaults to None.
NOTE: Please do not let end-user to fill this and always be aware
of SQL injection. When dealing with metadatas, remember to
use `{self.metadata_column}.attribute` instead of `attribute`
alone. The default name for it is `metadata`.
Returns:
List[Document]: List of (Document, similarity)
"""
q_str = self._build_query_sql(embedding, k, where_str)
try:
return [
Document(
page_content=r[self.config.column_map["document"]],
metadata=r[self.config.column_map["metadata"]],
)
for r in self.client.query(q_str).named_results()
]
except Exception as e:
logger.error(f"\033[91m\033[1m{type(e)}\033[0m \033[95m{str(e)}\033[0m")
return []
def similarity_search_with_relevance_scores(
self, query: str, k: int = 4, where_str: Optional[str] = None, **kwargs: Any
) -> List[Tuple[Document, float]]:
"""Perform a similarity search with ClickHouse
Args:
query (str): query string
k (int, optional): Top K neighbors to retrieve. Defaults to 4.
where_str (Optional[str], optional): where condition string.
Defaults to None.
NOTE: Please do not let end users fill this in directly, and always
be mindful of SQL injection. When dealing with metadata, remember to
use `{self.metadata_column}.attribute` instead of `attribute`
alone. The default name for it is `metadata`.
Returns:
List[Tuple[Document, float]]: List of (document, relevance score) tuples
"""
q_str = self._build_query_sql(
self.embedding_function.embed_query(query), k, where_str
)
try:
return [
(
Document(
page_content=r[self.config.column_map["document"]],
metadata=r[self.config.column_map["metadata"]],
),
r["dist"],
)
for r in self.client.query(q_str).named_results()
]
except Exception as e:
logger.error(f"\033[91m\033[1m{type(e)}\033[0m \033[95m{str(e)}\033[0m")
return []
def drop(self) -> None:
"""
Helper function: Drop data
"""
self.client.command(
f"DROP TABLE IF EXISTS {self.config.database}.{self.config.table}"
)
@property
def metadata_column(self) -> str:
return self.config.column_map["metadata"]

@@ -0,0 +1,108 @@
"""Test ClickHouse functionality."""
import pytest
from langchain.docstore.document import Document
from langchain.vectorstores import Clickhouse, ClickhouseSettings
from tests.integration_tests.vectorstores.fake_embeddings import FakeEmbeddings
def test_clickhouse() -> None:
"""Test end to end construction and search."""
texts = ["foo", "bar", "baz"]
config = ClickhouseSettings()
config.table = "test_clickhouse"
docsearch = Clickhouse.from_texts(texts, FakeEmbeddings(), config=config)
output = docsearch.similarity_search("foo", k=1)
assert output == [Document(page_content="foo", metadata={"_dummy": 0})]
docsearch.drop()
@pytest.mark.asyncio
async def test_clickhouse_async() -> None:
"""Test end to end construction and search."""
texts = ["foo", "bar", "baz"]
config = ClickhouseSettings()
config.table = "test_clickhouse_async"
docsearch = Clickhouse.from_texts(
texts=texts, embedding=FakeEmbeddings(), config=config
)
output = await docsearch.asimilarity_search("foo", k=1)
assert output == [Document(page_content="foo", metadata={"_dummy": 0})]
docsearch.drop()
def test_clickhouse_with_metadatas() -> None:
"""Test end to end construction and search."""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": str(i)} for i in range(len(texts))]
config = ClickhouseSettings()
config.table = "test_clickhouse_with_metadatas"
docsearch = Clickhouse.from_texts(
texts=texts,
embedding=FakeEmbeddings(),
config=config,
metadatas=metadatas,
)
output = docsearch.similarity_search("foo", k=1)
assert output == [Document(page_content="foo", metadata={"page": "0"})]
docsearch.drop()
def test_clickhouse_with_metadatas_with_relevance_scores() -> None:
"""Test end to end construction and scored search."""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": str(i)} for i in range(len(texts))]
config = ClickhouseSettings()
config.table = "test_clickhouse_with_metadatas_with_relevance_scores"
docsearch = Clickhouse.from_texts(
texts=texts, embedding=FakeEmbeddings(), metadatas=metadatas, config=config
)
output = docsearch.similarity_search_with_relevance_scores("foo", k=1)
assert output[0][0] == Document(page_content="foo", metadata={"page": "0"})
docsearch.drop()
def test_clickhouse_search_filter() -> None:
"""Test end to end construction and search with metadata filtering."""
texts = ["far", "bar", "baz"]
metadatas = [{"first_letter": "{}".format(text[0])} for text in texts]
config = ClickhouseSettings()
config.table = "test_clickhouse_search_filter"
docsearch = Clickhouse.from_texts(
texts=texts, embedding=FakeEmbeddings(), metadatas=metadatas, config=config
)
output = docsearch.similarity_search(
"far", k=1, where_str=f"{docsearch.metadata_column}.first_letter='f'"
)
assert output == [Document(page_content="far", metadata={"first_letter": "f"})]
output = docsearch.similarity_search(
"bar", k=1, where_str=f"{docsearch.metadata_column}.first_letter='b'"
)
assert output == [Document(page_content="bar", metadata={"first_letter": "b"})]
docsearch.drop()
def test_clickhouse_with_persistence() -> None:
"""Test end to end construction and search, with persistence."""
config = ClickhouseSettings()
config.table = "test_clickhouse_with_persistence"
texts = [
"foo",
"bar",
"baz",
]
docsearch = Clickhouse.from_texts(
texts=texts, embedding=FakeEmbeddings(), config=config
)
output = docsearch.similarity_search("foo", k=1)
assert output == [Document(page_content="foo", metadata={"_dummy": 0})]
# Get a new VectorStore with the same config;
# it will reuse the existing table automatically
# unless you drop it
docsearch = Clickhouse(embedding=FakeEmbeddings(), config=config)
output = docsearch.similarity_search("foo", k=1)
assert output == [Document(page_content="foo", metadata={"_dummy": 0})]
# Clean up
docsearch.drop()