From 783e7f6939da251d2f76bd592eaaeefa98db299b Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 29 Sep 2023 00:32:19 +0100 Subject: [PATCH 1/4] working es --- application/parser/open_ai_func.py | 7 +- application/requirements.txt | 1 + application/vectorstore/base.py | 2 +- application/vectorstore/elasticsearch.py | 199 +++++++++++++++++++++++ application/vectorstore/faiss.py | 5 +- frontend/package-lock.json | 52 +++--- 6 files changed, 235 insertions(+), 31 deletions(-) create mode 100644 application/vectorstore/elasticsearch.py diff --git a/application/parser/open_ai_func.py b/application/parser/open_ai_func.py index 969165d2..0489eb87 100644 --- a/application/parser/open_ai_func.py +++ b/application/parser/open_ai_func.py @@ -1,8 +1,7 @@ import os import tiktoken -from langchain.embeddings import OpenAIEmbeddings -from langchain.vectorstores import FAISS +from application.vectorstore.faiss import FaissStore from retry import retry @@ -33,11 +32,9 @@ def call_openai_api(docs, folder_name, task_status): os.makedirs(f"{folder_name}") from tqdm import tqdm - docs_test = [docs[0]] - docs.pop(0) c1 = 0 - store = FAISS.from_documents(docs_test, OpenAIEmbeddings(openai_api_key=os.getenv("EMBEDDINGS_KEY"))) + store = FaissStore(path=f"{folder_name}", embeddings_key=os.getenv("EMBEDDINGS_KEY")) # Uncomment for MPNet embeddings # model_name = "sentence-transformers/all-mpnet-base-v2" diff --git a/application/requirements.txt b/application/requirements.txt index d978cb41..68532aa1 100644 --- a/application/requirements.txt +++ b/application/requirements.txt @@ -22,6 +22,7 @@ decorator==5.1.1 dill==0.3.6 dnspython==2.3.0 ecdsa==0.18.0 +elasticsearch==8.9.0 entrypoints==0.4 faiss-cpu==1.7.3 filelock==3.9.0 diff --git a/application/vectorstore/base.py b/application/vectorstore/base.py index ad481744..18a3881b 100644 --- a/application/vectorstore/base.py +++ b/application/vectorstore/base.py @@ -19,7 +19,7 @@ class BaseVectorStore(ABC): def is_azure_configured(self): return settings.OPENAI_API_BASE and settings.OPENAI_API_VERSION and settings.AZURE_DEPLOYMENT_NAME - def _get_docsearch(self, embeddings_name, embeddings_key=None): + def _get_embeddings(self, embeddings_name, embeddings_key=None): embeddings_factory = { "openai_text-embedding-ada-002": OpenAIEmbeddings, "huggingface_sentence-transformers/all-mpnet-base-v2": HuggingFaceHubEmbeddings, diff --git a/application/vectorstore/elasticsearch.py b/application/vectorstore/elasticsearch.py new file mode 100644 index 00000000..b87f851a --- /dev/null +++ b/application/vectorstore/elasticsearch.py @@ -0,0 +1,199 @@ +from application.vectorstore.base import BaseVectorStore +from application.core.settings import settings +import elasticsearch +#from langchain.vectorstores.elasticsearch import ElasticsearchStore + + +class ElasticsearchStore(BaseVectorStore): + _es_connection = None # Class attribute to hold the Elasticsearch connection + + def __init__(self, path, embeddings_key, index_name="docsgpt"): + super().__init__() + self.path = path.replace("/app/application/indexes/", "") + self.embeddings_key = embeddings_key + self.index_name = index_name + + if ElasticsearchStore._es_connection is None: + connection_params = {} + connection_params["cloud_id"] = settings.ELASTIC_CLOUD_ID + connection_params["basic_auth"] = (settings.ELASTIC_USERNAME, settings.ELASTIC_PASSWORD) + ElasticsearchStore._es_connection = elasticsearch.Elasticsearch(**connection_params) + + self.docsearch = ElasticsearchStore._es_connection + + def connect_to_elasticsearch( + *, + es_url 
= None, + cloud_id = None, + api_key = None, + username = None, + password = None, + ): + try: + import elasticsearch + except ImportError: + raise ImportError( + "Could not import elasticsearch python package. " + "Please install it with `pip install elasticsearch`." + ) + + if es_url and cloud_id: + raise ValueError( + "Both es_url and cloud_id are defined. Please provide only one." + ) + + connection_params = {} + + if es_url: + connection_params["hosts"] = [es_url] + elif cloud_id: + connection_params["cloud_id"] = cloud_id + else: + raise ValueError("Please provide either elasticsearch_url or cloud_id.") + + if api_key: + connection_params["api_key"] = api_key + elif username and password: + connection_params["basic_auth"] = (username, password) + + es_client = elasticsearch.Elasticsearch( + **connection_params, + ) + try: + es_client.info() + except Exception as e: + raise e + + return es_client + + def search(self, question, k=2, index_name=settings.ELASTIC_INDEX, *args, **kwargs): + embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, self.embeddings_key) + vector = embeddings.embed_query(question) + knn = { + "filter": [{"match": {"metadata.filename.keyword": self.path}}], + "field": "vector", + "k": k, + "num_candidates": 100, + "query_vector": vector, + } + full_query = { + "knn": knn, + "query": { + "bool": { + "must": [ + { + "match": { + "text": { + "query": question, + } + } + } + ], + "filter": [{"match": {"metadata.filename.keyword": self.path}}], + } + }, + "rank": {"rrf": {}}, + } + resp = self.docsearch.search(index=index_name, query=full_query['query'], size=k, knn=full_query['knn']) + return resp + + def _create_index_if_not_exists( + self, index_name, dims_length + ): + + if self.client.indices.exists(index=index_name): + print(f"Index {index_name} already exists.") + + else: + self.strategy.before_index_setup( + client=self.client, + text_field=self.query_field, + vector_query_field=self.vector_query_field, + ) + + indexSettings = self.index( + dims_length=dims_length, + ) + self.client.indices.create(index=index_name, **indexSettings) + def index( + self, + dims_length, + ): + + + return { + "mappings": { + "properties": { + "vector": { + "type": "dense_vector", + "dims": dims_length, + "index": True, + "similarity": "cosine", + }, + } + } + } + + def add_texts( + self, + texts, + metadatas = None, + ids = None, + refresh_indices = True, + create_index_if_not_exists = True, + bulk_kwargs = None, + **kwargs, + ): + + from elasticsearch.helpers import BulkIndexError, bulk + + bulk_kwargs = bulk_kwargs or {} + import uuid + embeddings = [] + ids = ids or [str(uuid.uuid4()) for _ in texts] + requests = [] + embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, self.embeddings_key) + + vectors = embeddings.embed_documents(list(texts)) + + dims_length = len(vectors[0]) + + if create_index_if_not_exists: + self._create_index_if_not_exists( + index_name=self.index_name, dims_length=dims_length + ) + + for i, (text, vector) in enumerate(zip(texts, vectors)): + metadata = metadatas[i] if metadatas else {} + + requests.append( + { + "_op_type": "index", + "_index": self.index_name, + "text": text, + "vector": vector, + "metadata": metadata, + "_id": ids[i], + } + ) + + + if len(requests) > 0: + try: + success, failed = bulk( + self.client, + requests, + stats_only=True, + refresh=refresh_indices, + **bulk_kwargs, + ) + return ids + except BulkIndexError as e: + print(f"Error adding texts: {e}") + firstError = e.errors[0].get("index", {}).get("error", {}) + 
print(f"First error reason: {firstError.get('reason')}") + raise e + + else: + return [] + diff --git a/application/vectorstore/faiss.py b/application/vectorstore/faiss.py index 9a562dce..d85b6084 100644 --- a/application/vectorstore/faiss.py +++ b/application/vectorstore/faiss.py @@ -8,8 +8,11 @@ class FaissStore(BaseVectorStore): super().__init__() self.path = path self.docsearch = FAISS.load_local( - self.path, self._get_docsearch(settings.EMBEDDINGS_NAME, settings.EMBEDDINGS_KEY) + self.path, self._get_embeddings(settings.EMBEDDINGS_NAME, settings.EMBEDDINGS_KEY) ) def search(self, *args, **kwargs): return self.docsearch.similarity_search(*args, **kwargs) + + def add_texts(self, *args, **kwargs): + return self.docsearch.add_texts(*args, **kwargs) diff --git a/frontend/package-lock.json b/frontend/package-lock.json index ff8a21f6..415c483b 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -1346,9 +1346,9 @@ } }, "node_modules/@typescript-eslint/eslint-plugin/node_modules/semver": { - "version": "7.3.8", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.8.tgz", - "integrity": "sha512-NB1ctGL5rlHrPJtFDVIVzTyQylMLu9N9VICA6HSFJo8MCGVTMW6gfpicwKmmK/dAjTOrqu5l63JJOpDSrAis3A==", + "version": "7.5.4", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz", + "integrity": "sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA==", "dev": true, "dependencies": { "lru-cache": "^6.0.0" @@ -1490,9 +1490,9 @@ } }, "node_modules/@typescript-eslint/typescript-estree/node_modules/semver": { - "version": "7.3.8", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.8.tgz", - "integrity": "sha512-NB1ctGL5rlHrPJtFDVIVzTyQylMLu9N9VICA6HSFJo8MCGVTMW6gfpicwKmmK/dAjTOrqu5l63JJOpDSrAis3A==", + "version": "7.5.4", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz", + "integrity": "sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA==", "dev": true, "dependencies": { "lru-cache": "^6.0.0" @@ -1549,9 +1549,9 @@ } }, "node_modules/@typescript-eslint/utils/node_modules/semver": { - "version": "7.3.8", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.8.tgz", - "integrity": "sha512-NB1ctGL5rlHrPJtFDVIVzTyQylMLu9N9VICA6HSFJo8MCGVTMW6gfpicwKmmK/dAjTOrqu5l63JJOpDSrAis3A==", + "version": "7.5.4", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz", + "integrity": "sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA==", "dev": true, "dependencies": { "lru-cache": "^6.0.0" @@ -1991,9 +1991,9 @@ } }, "node_modules/builtins/node_modules/semver": { - "version": "7.3.8", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.8.tgz", - "integrity": "sha512-NB1ctGL5rlHrPJtFDVIVzTyQylMLu9N9VICA6HSFJo8MCGVTMW6gfpicwKmmK/dAjTOrqu5l63JJOpDSrAis3A==", + "version": "7.5.4", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz", + "integrity": "sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA==", "dev": true, "dependencies": { "lru-cache": "^6.0.0" @@ -2055,9 +2055,9 @@ } }, "node_modules/caniuse-lite": { - "version": "1.0.30001450", - "resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001450.tgz", - "integrity": "sha512-qMBmvmQmFXaSxexkjjfMvD5rnDL0+m+dUMZKoDYsGG8iZN29RuYh9eRoMvKsT6uMAWlyUUGDEQGJJYjzCIO9ew==", + "version": "1.0.30001541", + "resolved": 
"https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001541.tgz", + "integrity": "sha512-bLOsqxDgTqUBkzxbNlSBt8annkDpQB9NdzdTbO2ooJ+eC/IQcvDspDc058g84ejCelF7vHUx57KIOjEecOHXaw==", "dev": true, "funding": [ { @@ -2067,6 +2067,10 @@ { "type": "tidelift", "url": "https://tidelift.com/funding/github/npm/caniuse-lite" + }, + { + "type": "github", + "url": "https://github.com/sponsors/ai" } ] }, @@ -2889,9 +2893,9 @@ } }, "node_modules/eslint-plugin-n/node_modules/semver": { - "version": "7.3.8", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.8.tgz", - "integrity": "sha512-NB1ctGL5rlHrPJtFDVIVzTyQylMLu9N9VICA6HSFJo8MCGVTMW6gfpicwKmmK/dAjTOrqu5l63JJOpDSrAis3A==", + "version": "7.5.4", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz", + "integrity": "sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA==", "dev": true, "dependencies": { "lru-cache": "^6.0.0" @@ -4478,9 +4482,9 @@ } }, "node_modules/lint-staged/node_modules/yaml": { - "version": "2.2.1", - "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.2.1.tgz", - "integrity": "sha512-e0WHiYql7+9wr4cWMx3TVQrNwejKaEe7/rHNmQmqRjazfOP5W8PB6Jpebb5o6fIapbz9o9+2ipcaTM2ZwDI6lw==", + "version": "2.3.2", + "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.3.2.tgz", + "integrity": "sha512-N/lyzTPaJasoDmfV7YTrYCI0G/3ivm/9wdG0aHuheKowWQwGTsK0Eoiw6utmzAnI6pkJa0DUVygvp3spqqEKXg==", "dev": true, "engines": { "node": ">= 14" @@ -6532,9 +6536,9 @@ } }, "node_modules/semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true, "bin": { "semver": "bin/semver.js" From 347cfe253f92f8384a1bf99a9d47095083674d4d Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 29 Sep 2023 17:17:48 +0100 Subject: [PATCH 2/4] elastic2 --- application/api/answer/routes.py | 6 +- application/api/internal/routes.py | 37 ++-- application/api/user/routes.py | 27 ++- application/core/settings.py | 8 + application/parser/open_ai_func.py | 23 ++- application/vectorstore/elasticsearch.py | 204 ++++++++++++---------- application/vectorstore/faiss.py | 16 +- application/vectorstore/vector_creator.py | 16 ++ application/worker.py | 12 +- 9 files changed, 216 insertions(+), 133 deletions(-) create mode 100644 application/vectorstore/vector_creator.py diff --git a/application/api/answer/routes.py b/application/api/answer/routes.py index ae9ef71f..b787115e 100644 --- a/application/api/answer/routes.py +++ b/application/api/answer/routes.py @@ -14,7 +14,7 @@ from transformers import GPT2TokenizerFast from application.core.settings import settings from application.llm.openai import OpenAILLM, AzureOpenAILLM -from application.vectorstore.faiss import FaissStore +from application.vectorstore.vector_creator import VectorCreator from application.error import bad_request @@ -234,7 +234,7 @@ def stream(): vectorstore = get_vectorstore({"active_docs": data["active_docs"]}) else: vectorstore = "" - docsearch = FaissStore(vectorstore, embeddings_key) + docsearch = VectorCreator.create_vectorstore(settings.VECTOR_STORE, vectorstore, embeddings_key) return Response( complete_stream(question, docsearch, @@ -268,7 +268,7 @@ def api_answer(): vectorstore = 
get_vectorstore(data)
     # loading the index and the store and the prompt template
     # Note if you have used other embeddings than OpenAI, you need to change the embeddings
-    docsearch = FaissStore(vectorstore, embeddings_key)
+    docsearch = VectorCreator.create_vectorstore(settings.VECTOR_STORE, vectorstore, embeddings_key)
 
     if is_azure_configured():
         llm = AzureOpenAILLM(
diff --git a/application/api/internal/routes.py b/application/api/internal/routes.py
index ca6da174..e8a1b80b 100644
--- a/application/api/internal/routes.py
+++ b/application/api/internal/routes.py
@@ -34,25 +34,26 @@ def upload_index_files():
     if "name" not in request.form:
         return {"status": "no name"}
     job_name = secure_filename(request.form["name"])
-    if "file_faiss" not in request.files:
-        print("No file part")
-        return {"status": "no file"}
-    file_faiss = request.files["file_faiss"]
-    if file_faiss.filename == "":
-        return {"status": "no file name"}
-    if "file_pkl" not in request.files:
-        print("No file part")
-        return {"status": "no file"}
-    file_pkl = request.files["file_pkl"]
-    if file_pkl.filename == "":
-        return {"status": "no file name"}
-
-    # saves index files
     save_dir = os.path.join(current_dir, "indexes", user, job_name)
-    if not os.path.exists(save_dir):
-        os.makedirs(save_dir)
-    file_faiss.save(os.path.join(save_dir, "index.faiss"))
-    file_pkl.save(os.path.join(save_dir, "index.pkl"))
+    if settings.VECTOR_STORE == "faiss":
+        if "file_faiss" not in request.files:
+            print("No file part")
+            return {"status": "no file"}
+        file_faiss = request.files["file_faiss"]
+        if file_faiss.filename == "":
+            return {"status": "no file name"}
+        if "file_pkl" not in request.files:
+            print("No file part")
+            return {"status": "no file"}
+        file_pkl = request.files["file_pkl"]
+        if file_pkl.filename == "":
+            return {"status": "no file name"}
+        # saves index files
+
+        if not os.path.exists(save_dir):
+            os.makedirs(save_dir)
+        file_faiss.save(os.path.join(save_dir, "index.faiss"))
+        file_pkl.save(os.path.join(save_dir, "index.pkl"))
     # create entry in vectors_collection
     vectors_collection.insert_one(
         {
diff --git a/application/api/user/routes.py b/application/api/user/routes.py
index f04631d2..2b1d505a 100644
--- a/application/api/user/routes.py
+++ b/application/api/user/routes.py
@@ -11,6 +11,8 @@ from celery.result import AsyncResult
 
 from application.api.user.tasks import ingest
 from application.core.settings import settings
+from application.vectorstore.vector_creator import VectorCreator
+
 mongo = MongoClient(settings.MONGO_URI)
 db = mongo["docsgpt"]
 conversations_collection = db["conversations"]
@@ -90,10 +92,17 @@ def delete_old():
         return {"status": "error"}
     path_clean = "/".join(dirs)
     vectors_collection.delete_one({"location": path})
-    try:
-        shutil.rmtree(path_clean)
-    except FileNotFoundError:
-        pass
+    if settings.VECTOR_STORE == "faiss":
+        try:
+            shutil.rmtree(os.path.join(current_dir, path_clean))
+        except FileNotFoundError:
+            pass
+    else:
+        vectorstore = VectorCreator.create_vectorstore(
+            settings.VECTOR_STORE, path=os.path.join(current_dir, path_clean)
+        )
+        vectorstore.delete_index()
+
     return {"status": "ok"}
 
 @user.route("/api/upload", methods=["POST"])
@@ -173,11 +182,11 @@ def combined_json():
                 "location": "local",
             }
         )
-
-    data_remote = requests.get("https://d3dg1063dc54p9.cloudfront.net/combined.json").json()
-    for index in data_remote:
-        index["location"] = "remote"
-        data.append(index)
+    if settings.VECTOR_STORE == "faiss":
+        data_remote = requests.get("https://d3dg1063dc54p9.cloudfront.net/combined.json").json()
+        for 
index in data_remote: + index["location"] = "remote" + data.append(index) return jsonify(data) diff --git a/application/core/settings.py b/application/core/settings.py index d127c293..7ec4aae8 100644 --- a/application/core/settings.py +++ b/application/core/settings.py @@ -13,6 +13,7 @@ class Settings(BaseSettings): TOKENS_MAX_HISTORY: int = 150 SELF_HOSTED_MODEL: bool = False UPLOAD_FOLDER: str = "inputs" + VECTOR_STORE: str = "elasticsearch" # "faiss" or "elasticsearch" API_URL: str = "http://localhost:7091" # backend url for celery worker @@ -23,6 +24,13 @@ class Settings(BaseSettings): AZURE_DEPLOYMENT_NAME: str = None # azure deployment name for answering AZURE_EMBEDDINGS_DEPLOYMENT_NAME: str = None # azure deployment name for embeddings + # elasticsearch + ELASTIC_CLOUD_ID: str = 'Docsgpt:ZXUtY2VudHJhbC0xLmF3cy5jbG91ZC5lcy5pbzo0NDMkYmNiZDYxZDE0ODE0NDNhMTkxNDU2YmI2MWViNzUxNTkkN2IwODMxZWYwMDI0NDFiOGJiNzgxZmQzYjI0MjIxYjA=' # cloud id for elasticsearch + ELASTIC_USERNAME: str = 'elastic' # username for elasticsearch + ELASTIC_PASSWORD: str = 'eSwoSbAhIWkXBsRdvhZxGPwc' # password for elasticsearch + ELASTIC_URL: str = None # url for elasticsearch + ELASTIC_INDEX: str = "docsgptbeta" # index name for elasticsearch + path = Path(__file__).parent.parent.absolute() settings = Settings(_env_file=path.joinpath(".env"), _env_file_encoding="utf-8") diff --git a/application/parser/open_ai_func.py b/application/parser/open_ai_func.py index 0489eb87..ede635a8 100644 --- a/application/parser/open_ai_func.py +++ b/application/parser/open_ai_func.py @@ -1,7 +1,8 @@ import os import tiktoken -from application.vectorstore.faiss import FaissStore +from application.vectorstore.vector_creator import VectorCreator +from application.core.settings import settings from retry import retry @@ -33,9 +34,22 @@ def call_openai_api(docs, folder_name, task_status): from tqdm import tqdm c1 = 0 + if settings.VECTOR_STORE == "faiss": + docs_init = [docs[0]] + docs.pop(0) - store = FaissStore(path=f"{folder_name}", embeddings_key=os.getenv("EMBEDDINGS_KEY")) - + store = VectorCreator.create_vectorstore( + settings.VECTOR_STORE, + docs_init = docs_init, + path=f"{folder_name}", + embeddings_key=os.getenv("EMBEDDINGS_KEY") + ) + else: + store = VectorCreator.create_vectorstore( + settings.VECTOR_STORE, + path=f"{folder_name}", + embeddings_key=os.getenv("EMBEDDINGS_KEY") + ) # Uncomment for MPNet embeddings # model_name = "sentence-transformers/all-mpnet-base-v2" # hf = HuggingFaceEmbeddings(model_name=model_name) @@ -54,7 +68,8 @@ def call_openai_api(docs, folder_name, task_status): store.save_local(f"{folder_name}") break c1 += 1 - store.save_local(f"{folder_name}") + if settings.VECTOR_STORE == "faiss": + store.save_local(f"{folder_name}") def get_user_permission(docs, folder_name): diff --git a/application/vectorstore/elasticsearch.py b/application/vectorstore/elasticsearch.py index b87f851a..ca98c5ea 100644 --- a/application/vectorstore/elasticsearch.py +++ b/application/vectorstore/elasticsearch.py @@ -1,22 +1,38 @@ from application.vectorstore.base import BaseVectorStore from application.core.settings import settings import elasticsearch -#from langchain.vectorstores.elasticsearch import ElasticsearchStore + +class Document(str): + """Class for storing a piece of text and associated metadata.""" + + page_content: str + """String text.""" + metadata: dict + """Arbitrary metadata""" class ElasticsearchStore(BaseVectorStore): _es_connection = None # Class attribute to hold the Elasticsearch connection - def 
__init__(self, path, embeddings_key, index_name="docsgpt"): + def __init__(self, path, embeddings_key, index_name=settings.ELASTIC_INDEX): super().__init__() - self.path = path.replace("/app/application/indexes/", "") + self.path = path.replace("application/indexes/", "") self.embeddings_key = embeddings_key self.index_name = index_name if ElasticsearchStore._es_connection is None: connection_params = {} - connection_params["cloud_id"] = settings.ELASTIC_CLOUD_ID - connection_params["basic_auth"] = (settings.ELASTIC_USERNAME, settings.ELASTIC_PASSWORD) + if settings.ELASTIC_URL: + connection_params["hosts"] = [settings.ELASTIC_URL] + connection_params["http_auth"] = (settings.ELASTIC_USERNAME, settings.ELASTIC_PASSWORD) + elif settings.ELASTIC_CLOUD_ID: + connection_params["cloud_id"] = settings.ELASTIC_CLOUD_ID + connection_params["basic_auth"] = (settings.ELASTIC_USERNAME, settings.ELASTIC_PASSWORD) + else: + raise ValueError("Please provide either elasticsearch_url or cloud_id.") + + + ElasticsearchStore._es_connection = elasticsearch.Elasticsearch(**connection_params) self.docsearch = ElasticsearchStore._es_connection @@ -94,106 +110,112 @@ class ElasticsearchStore(BaseVectorStore): }, "rank": {"rrf": {}}, } - resp = self.docsearch.search(index=index_name, query=full_query['query'], size=k, knn=full_query['knn']) - return resp + resp = self.docsearch.search(index=self.index_name, query=full_query['query'], size=k, knn=full_query['knn']) + # create Documnets objects from the results page_content ['_source']['text'], metadata ['_source']['metadata'] + import sys + print(self.path, file=sys.stderr) + print(resp, file=sys.stderr) + doc_list = [] + for hit in resp['hits']['hits']: + + doc_list.append(Document(page_content = hit['_source']['text'], metadata = hit['_source']['metadata'])) + return doc_list - def _create_index_if_not_exists( - self, index_name, dims_length - ): + def _create_index_if_not_exists( + self, index_name, dims_length + ): - if self.client.indices.exists(index=index_name): - print(f"Index {index_name} already exists.") + if self._es_connection.indices.exists(index=index_name): + print(f"Index {index_name} already exists.") - else: - self.strategy.before_index_setup( - client=self.client, - text_field=self.query_field, - vector_query_field=self.vector_query_field, - ) + else: - indexSettings = self.index( - dims_length=dims_length, - ) - self.client.indices.create(index=index_name, **indexSettings) - def index( - self, - dims_length, - ): + indexSettings = self.index( + dims_length=dims_length, + ) + self._es_connection.indices.create(index=index_name, **indexSettings) - - return { - "mappings": { - "properties": { - "vector": { - "type": "dense_vector", - "dims": dims_length, - "index": True, - "similarity": "cosine", - }, - } + def index( + self, + dims_length, + ): + return { + "mappings": { + "properties": { + "vector": { + "type": "dense_vector", + "dims": dims_length, + "index": True, + "similarity": "cosine", + }, } } + } - def add_texts( - self, - texts, - metadatas = None, - ids = None, - refresh_indices = True, - create_index_if_not_exists = True, - bulk_kwargs = None, - **kwargs, + def add_texts( + self, + texts, + metadatas = None, + ids = None, + refresh_indices = True, + create_index_if_not_exists = True, + bulk_kwargs = None, + **kwargs, ): - - from elasticsearch.helpers import BulkIndexError, bulk + + from elasticsearch.helpers import BulkIndexError, bulk - bulk_kwargs = bulk_kwargs or {} - import uuid - embeddings = [] - ids = ids or [str(uuid.uuid4()) 
for _ in texts] - requests = [] - embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, self.embeddings_key) + bulk_kwargs = bulk_kwargs or {} + import uuid + embeddings = [] + ids = ids or [str(uuid.uuid4()) for _ in texts] + requests = [] + embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, self.embeddings_key) - vectors = embeddings.embed_documents(list(texts)) + vectors = embeddings.embed_documents(list(texts)) - dims_length = len(vectors[0]) + dims_length = len(vectors[0]) - if create_index_if_not_exists: - self._create_index_if_not_exists( - index_name=self.index_name, dims_length=dims_length + if create_index_if_not_exists: + self._create_index_if_not_exists( + index_name=self.index_name, dims_length=dims_length + ) + + for i, (text, vector) in enumerate(zip(texts, vectors)): + metadata = metadatas[i] if metadatas else {} + + requests.append( + { + "_op_type": "index", + "_index": self.index_name, + "text": text, + "vector": vector, + "metadata": metadata, + "_id": ids[i], + } + ) + + + if len(requests) > 0: + try: + success, failed = bulk( + self._es_connection, + requests, + stats_only=True, + refresh=refresh_indices, + **bulk_kwargs, ) + return ids + except BulkIndexError as e: + print(f"Error adding texts: {e}") + firstError = e.errors[0].get("index", {}).get("error", {}) + print(f"First error reason: {firstError.get('reason')}") + raise e - for i, (text, vector) in enumerate(zip(texts, vectors)): - metadata = metadatas[i] if metadatas else {} + else: + return [] - requests.append( - { - "_op_type": "index", - "_index": self.index_name, - "text": text, - "vector": vector, - "metadata": metadata, - "_id": ids[i], - } - ) - - - if len(requests) > 0: - try: - success, failed = bulk( - self.client, - requests, - stats_only=True, - refresh=refresh_indices, - **bulk_kwargs, - ) - return ids - except BulkIndexError as e: - print(f"Error adding texts: {e}") - firstError = e.errors[0].get("index", {}).get("error", {}) - print(f"First error reason: {firstError.get('reason')}") - raise e - - else: - return [] + def delete_index(self): + self._es_connection.delete_by_query(index=self.index_name, query={"match": { + "metadata.filename.keyword": self.path}},) diff --git a/application/vectorstore/faiss.py b/application/vectorstore/faiss.py index d85b6084..5c5cee70 100644 --- a/application/vectorstore/faiss.py +++ b/application/vectorstore/faiss.py @@ -4,15 +4,23 @@ from application.core.settings import settings class FaissStore(BaseVectorStore): - def __init__(self, path, embeddings_key): + def __init__(self, path, embeddings_key, docs_init=None): super().__init__() self.path = path - self.docsearch = FAISS.load_local( - self.path, self._get_embeddings(settings.EMBEDDINGS_NAME, settings.EMBEDDINGS_KEY) - ) + if docs_init: + self.docsearch = FAISS.from_documents( + docs_init, self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key) + ) + else: + self.docsearch = FAISS.load_local( + self.path, self._get_embeddings(settings.EMBEDDINGS_NAME, settings.EMBEDDINGS_KEY) + ) def search(self, *args, **kwargs): return self.docsearch.similarity_search(*args, **kwargs) def add_texts(self, *args, **kwargs): return self.docsearch.add_texts(*args, **kwargs) + + def save_local(self, *args, **kwargs): + return self.docsearch.save_local(*args, **kwargs) diff --git a/application/vectorstore/vector_creator.py b/application/vectorstore/vector_creator.py new file mode 100644 index 00000000..cbc491f5 --- /dev/null +++ b/application/vectorstore/vector_creator.py @@ -0,0 +1,16 @@ +from 
application.vectorstore.faiss import FaissStore +from application.vectorstore.elasticsearch import ElasticsearchStore + + +class VectorCreator: + vectorstores = { + 'faiss': FaissStore, + 'elasticsearch':ElasticsearchStore + } + + @classmethod + def create_vectorstore(cls, type, *args, **kwargs): + vectorstore_class = cls.vectorstores.get(type.lower()) + if not vectorstore_class: + raise ValueError(f"No vectorstore class found for type {type}") + return vectorstore_class(*args, **kwargs) \ No newline at end of file diff --git a/application/worker.py b/application/worker.py index 91c19c30..141aa881 100644 --- a/application/worker.py +++ b/application/worker.py @@ -83,11 +83,15 @@ def ingest_worker(self, directory, formats, name_job, filename, user): # get files from outputs/inputs/index.faiss and outputs/inputs/index.pkl # and send them to the server (provide user and name in form) file_data = {'name': name_job, 'user': user} - files = {'file_faiss': open(full_path + '/index.faiss', 'rb'), - 'file_pkl': open(full_path + '/index.pkl', 'rb')} - response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data) + if settings.VECTOR_STORE == "faiss": + files = {'file_faiss': open(full_path + '/index.faiss', 'rb'), + 'file_pkl': open(full_path + '/index.pkl', 'rb')} + response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data) + response = requests.get(urljoin(settings.API_URL, "/api/delete_old?path=" + full_path)) + else: + response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data) - response = requests.get(urljoin(settings.API_URL, "/api/delete_old?path=")) + # delete local shutil.rmtree(full_path) From d85eb83ea2364b27e394f74ce85cdfa5f2f5fb75 Mon Sep 17 00:00:00 2001 From: Alex Date: Sat, 30 Sep 2023 15:25:31 +0100 Subject: [PATCH 3/4] elastic search fixes --- application/vectorstore/elasticsearch.py | 22 +++++++++++----------- application/worker.py | 4 +++- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/application/vectorstore/elasticsearch.py b/application/vectorstore/elasticsearch.py index ca98c5ea..9df8800b 100644 --- a/application/vectorstore/elasticsearch.py +++ b/application/vectorstore/elasticsearch.py @@ -5,10 +5,13 @@ import elasticsearch class Document(str): """Class for storing a piece of text and associated metadata.""" - page_content: str - """String text.""" - metadata: dict - """Arbitrary metadata""" + def __new__(cls, page_content: str, metadata: dict): + instance = super().__new__(cls, page_content) + instance.page_content = page_content + instance.metadata = metadata + return instance + + class ElasticsearchStore(BaseVectorStore): @@ -16,7 +19,7 @@ class ElasticsearchStore(BaseVectorStore): def __init__(self, path, embeddings_key, index_name=settings.ELASTIC_INDEX): super().__init__() - self.path = path.replace("application/indexes/", "") + self.path = path.replace("application/indexes/", "").rstrip("/") self.embeddings_key = embeddings_key self.index_name = index_name @@ -86,7 +89,7 @@ class ElasticsearchStore(BaseVectorStore): embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, self.embeddings_key) vector = embeddings.embed_query(question) knn = { - "filter": [{"match": {"metadata.filename.keyword": self.path}}], + "filter": [{"match": {"metadata.store.keyword": self.path}}], "field": "vector", "k": k, "num_candidates": 100, @@ -105,16 +108,13 @@ class ElasticsearchStore(BaseVectorStore): } } ], - "filter": [{"match": 
{"metadata.filename.keyword": self.path}}], + "filter": [{"match": {"metadata.store.keyword": self.path}}], } }, "rank": {"rrf": {}}, } resp = self.docsearch.search(index=self.index_name, query=full_query['query'], size=k, knn=full_query['knn']) # create Documnets objects from the results page_content ['_source']['text'], metadata ['_source']['metadata'] - import sys - print(self.path, file=sys.stderr) - print(resp, file=sys.stderr) doc_list = [] for hit in resp['hits']['hits']: @@ -217,5 +217,5 @@ class ElasticsearchStore(BaseVectorStore): def delete_index(self): self._es_connection.delete_by_query(index=self.index_name, query={"match": { - "metadata.filename.keyword": self.path}},) + "metadata.store.keyword": self.path}},) diff --git a/application/worker.py b/application/worker.py index 141aa881..5c87c707 100644 --- a/application/worker.py +++ b/application/worker.py @@ -21,7 +21,9 @@ except FileExistsError: def metadata_from_filename(title): - return {'title': title} + store = title.split('/') + store = store[1] + '/' + store[2] + return {'title': title, 'store': store} def generate_random_string(length): From 3eacfb91aa30dab64c9c5a061a5fbbae67c5d9e7 Mon Sep 17 00:00:00 2001 From: Alex Date: Sat, 30 Sep 2023 15:32:37 +0100 Subject: [PATCH 4/4] fix314 --- application/core/settings.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/application/core/settings.py b/application/core/settings.py index 947038e7..c3109144 100644 --- a/application/core/settings.py +++ b/application/core/settings.py @@ -25,11 +25,11 @@ class Settings(BaseSettings): AZURE_EMBEDDINGS_DEPLOYMENT_NAME: str = None # azure deployment name for embeddings # elasticsearch - ELASTIC_CLOUD_ID: str = 'Docsgpt:ZXUtY2VudHJhbC0xLmF3cy5jbG91ZC5lcy5pbzo0NDMkYmNiZDYxZDE0ODE0NDNhMTkxNDU2YmI2MWViNzUxNTkkN2IwODMxZWYwMDI0NDFiOGJiNzgxZmQzYjI0MjIxYjA=' # cloud id for elasticsearch - ELASTIC_USERNAME: str = 'elastic' # username for elasticsearch - ELASTIC_PASSWORD: str = 'eSwoSbAhIWkXBsRdvhZxGPwc' # password for elasticsearch + ELASTIC_CLOUD_ID: str = None # cloud id for elasticsearch + ELASTIC_USERNAME: str = None # username for elasticsearch + ELASTIC_PASSWORD: str = None # password for elasticsearch ELASTIC_URL: str = None # url for elasticsearch - ELASTIC_INDEX: str = "docsgptbeta" # index name for elasticsearch + ELASTIC_INDEX: str = "docsgpt" # index name for elasticsearch path = Path(__file__).parent.parent.absolute()