mirror of
https://github.com/arc53/DocsGPT
synced 2024-11-02 03:40:17 +00:00
elastic2
This commit is contained in:
parent
783e7f6939
commit
347cfe253f
@ -14,7 +14,7 @@ from transformers import GPT2TokenizerFast
|
||||
|
||||
from application.core.settings import settings
|
||||
from application.llm.openai import OpenAILLM, AzureOpenAILLM
|
||||
from application.vectorstore.faiss import FaissStore
|
||||
from application.vectorstore.vector_creator import VectorCreator
|
||||
from application.error import bad_request
|
||||
|
||||
|
||||
@ -234,7 +234,7 @@ def stream():
|
||||
vectorstore = get_vectorstore({"active_docs": data["active_docs"]})
|
||||
else:
|
||||
vectorstore = ""
|
||||
docsearch = FaissStore(vectorstore, embeddings_key)
|
||||
docsearch = VectorCreator.create_vectorstore(settings.VECTOR_STORE, vectorstore, embeddings_key)
|
||||
|
||||
return Response(
|
||||
complete_stream(question, docsearch,
|
||||
@ -268,7 +268,7 @@ def api_answer():
|
||||
vectorstore = get_vectorstore(data)
|
||||
# loading the index and the store and the prompt template
|
||||
# Note if you have used other embeddings than OpenAI, you need to change the embeddings
|
||||
docsearch = FaissStore(vectorstore, embeddings_key)
|
||||
docsearch = VectorCreator.create_vectorstore(settings.VECTOR_STORE, vectorstore, embeddings_key)
|
||||
|
||||
if is_azure_configured():
|
||||
llm = AzureOpenAILLM(
|
||||
|
@ -34,25 +34,26 @@ def upload_index_files():
|
||||
if "name" not in request.form:
|
||||
return {"status": "no name"}
|
||||
job_name = secure_filename(request.form["name"])
|
||||
if "file_faiss" not in request.files:
|
||||
print("No file part")
|
||||
return {"status": "no file"}
|
||||
file_faiss = request.files["file_faiss"]
|
||||
if file_faiss.filename == "":
|
||||
return {"status": "no file name"}
|
||||
if "file_pkl" not in request.files:
|
||||
print("No file part")
|
||||
return {"status": "no file"}
|
||||
file_pkl = request.files["file_pkl"]
|
||||
if file_pkl.filename == "":
|
||||
return {"status": "no file name"}
|
||||
|
||||
# saves index files
|
||||
save_dir = os.path.join(current_dir, "indexes", user, job_name)
|
||||
if not os.path.exists(save_dir):
|
||||
os.makedirs(save_dir)
|
||||
file_faiss.save(os.path.join(save_dir, "index.faiss"))
|
||||
file_pkl.save(os.path.join(save_dir, "index.pkl"))
|
||||
if settings.VECTOR_STORE == "faiss":
|
||||
if "file_faiss" not in request.files:
|
||||
print("No file part")
|
||||
return {"status": "no file"}
|
||||
file_faiss = request.files["file_faiss"]
|
||||
if file_faiss.filename == "":
|
||||
return {"status": "no file name"}
|
||||
if "file_pkl" not in request.files:
|
||||
print("No file part")
|
||||
return {"status": "no file"}
|
||||
file_pkl = request.files["file_pkl"]
|
||||
if file_pkl.filename == "":
|
||||
return {"status": "no file name"}
|
||||
# saves index files
|
||||
|
||||
if not os.path.exists(save_dir):
|
||||
os.makedirs(save_dir)
|
||||
file_faiss.save(os.path.join(save_dir, "index.faiss"))
|
||||
file_pkl.save(os.path.join(save_dir, "index.pkl"))
|
||||
# create entry in vectors_collection
|
||||
vectors_collection.insert_one(
|
||||
{
|
||||
|
@ -11,6 +11,8 @@ from celery.result import AsyncResult
|
||||
from application.api.user.tasks import ingest
|
||||
|
||||
from application.core.settings import settings
|
||||
from application.vectorstore.vector_creator import VectorCreator
|
||||
|
||||
mongo = MongoClient(settings.MONGO_URI)
|
||||
db = mongo["docsgpt"]
|
||||
conversations_collection = db["conversations"]
|
||||
@ -90,10 +92,17 @@ def delete_old():
|
||||
return {"status": "error"}
|
||||
path_clean = "/".join(dirs)
|
||||
vectors_collection.delete_one({"location": path})
|
||||
try:
|
||||
shutil.rmtree(path_clean)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
if settings.VECTOR_STORE == "faiss":
|
||||
try:
|
||||
shutil.rmtree(os.path.join(current_dir, path_clean))
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
else:
|
||||
vetorstore = VectorCreator.create_vectorstore(
|
||||
settings.VECTOR_STORE, path=os.path.join(current_dir, path_clean)
|
||||
)
|
||||
vetorstore.delete_index()
|
||||
|
||||
return {"status": "ok"}
|
||||
|
||||
@user.route("/api/upload", methods=["POST"])
|
||||
@ -173,11 +182,11 @@ def combined_json():
|
||||
"location": "local",
|
||||
}
|
||||
)
|
||||
|
||||
data_remote = requests.get("https://d3dg1063dc54p9.cloudfront.net/combined.json").json()
|
||||
for index in data_remote:
|
||||
index["location"] = "remote"
|
||||
data.append(index)
|
||||
if settings.VECTOR_STORE == "faiss":
|
||||
data_remote = requests.get("https://d3dg1063dc54p9.cloudfront.net/combined.json").json()
|
||||
for index in data_remote:
|
||||
index["location"] = "remote"
|
||||
data.append(index)
|
||||
|
||||
return jsonify(data)
|
||||
|
||||
|
@ -13,6 +13,7 @@ class Settings(BaseSettings):
|
||||
TOKENS_MAX_HISTORY: int = 150
|
||||
SELF_HOSTED_MODEL: bool = False
|
||||
UPLOAD_FOLDER: str = "inputs"
|
||||
VECTOR_STORE: str = "elasticsearch" # "faiss" or "elasticsearch"
|
||||
|
||||
API_URL: str = "http://localhost:7091" # backend url for celery worker
|
||||
|
||||
@ -23,6 +24,13 @@ class Settings(BaseSettings):
|
||||
AZURE_DEPLOYMENT_NAME: str = None # azure deployment name for answering
|
||||
AZURE_EMBEDDINGS_DEPLOYMENT_NAME: str = None # azure deployment name for embeddings
|
||||
|
||||
# elasticsearch
|
||||
ELASTIC_CLOUD_ID: str = 'Docsgpt:ZXUtY2VudHJhbC0xLmF3cy5jbG91ZC5lcy5pbzo0NDMkYmNiZDYxZDE0ODE0NDNhMTkxNDU2YmI2MWViNzUxNTkkN2IwODMxZWYwMDI0NDFiOGJiNzgxZmQzYjI0MjIxYjA=' # cloud id for elasticsearch
|
||||
ELASTIC_USERNAME: str = 'elastic' # username for elasticsearch
|
||||
ELASTIC_PASSWORD: str = 'eSwoSbAhIWkXBsRdvhZxGPwc' # password for elasticsearch
|
||||
ELASTIC_URL: str = None # url for elasticsearch
|
||||
ELASTIC_INDEX: str = "docsgptbeta" # index name for elasticsearch
|
||||
|
||||
|
||||
path = Path(__file__).parent.parent.absolute()
|
||||
settings = Settings(_env_file=path.joinpath(".env"), _env_file_encoding="utf-8")
|
||||
|
@ -1,7 +1,8 @@
|
||||
import os
|
||||
|
||||
import tiktoken
|
||||
from application.vectorstore.faiss import FaissStore
|
||||
from application.vectorstore.vector_creator import VectorCreator
|
||||
from application.core.settings import settings
|
||||
from retry import retry
|
||||
|
||||
|
||||
@ -33,9 +34,22 @@ def call_openai_api(docs, folder_name, task_status):
|
||||
|
||||
from tqdm import tqdm
|
||||
c1 = 0
|
||||
if settings.VECTOR_STORE == "faiss":
|
||||
docs_init = [docs[0]]
|
||||
docs.pop(0)
|
||||
|
||||
store = FaissStore(path=f"{folder_name}", embeddings_key=os.getenv("EMBEDDINGS_KEY"))
|
||||
|
||||
store = VectorCreator.create_vectorstore(
|
||||
settings.VECTOR_STORE,
|
||||
docs_init = docs_init,
|
||||
path=f"{folder_name}",
|
||||
embeddings_key=os.getenv("EMBEDDINGS_KEY")
|
||||
)
|
||||
else:
|
||||
store = VectorCreator.create_vectorstore(
|
||||
settings.VECTOR_STORE,
|
||||
path=f"{folder_name}",
|
||||
embeddings_key=os.getenv("EMBEDDINGS_KEY")
|
||||
)
|
||||
# Uncomment for MPNet embeddings
|
||||
# model_name = "sentence-transformers/all-mpnet-base-v2"
|
||||
# hf = HuggingFaceEmbeddings(model_name=model_name)
|
||||
@ -54,7 +68,8 @@ def call_openai_api(docs, folder_name, task_status):
|
||||
store.save_local(f"{folder_name}")
|
||||
break
|
||||
c1 += 1
|
||||
store.save_local(f"{folder_name}")
|
||||
if settings.VECTOR_STORE == "faiss":
|
||||
store.save_local(f"{folder_name}")
|
||||
|
||||
|
||||
def get_user_permission(docs, folder_name):
|
||||
|
@ -1,22 +1,38 @@
|
||||
from application.vectorstore.base import BaseVectorStore
|
||||
from application.core.settings import settings
|
||||
import elasticsearch
|
||||
#from langchain.vectorstores.elasticsearch import ElasticsearchStore
|
||||
|
||||
class Document(str):
|
||||
"""Class for storing a piece of text and associated metadata."""
|
||||
|
||||
page_content: str
|
||||
"""String text."""
|
||||
metadata: dict
|
||||
"""Arbitrary metadata"""
|
||||
|
||||
|
||||
class ElasticsearchStore(BaseVectorStore):
|
||||
_es_connection = None # Class attribute to hold the Elasticsearch connection
|
||||
|
||||
def __init__(self, path, embeddings_key, index_name="docsgpt"):
|
||||
def __init__(self, path, embeddings_key, index_name=settings.ELASTIC_INDEX):
|
||||
super().__init__()
|
||||
self.path = path.replace("/app/application/indexes/", "")
|
||||
self.path = path.replace("application/indexes/", "")
|
||||
self.embeddings_key = embeddings_key
|
||||
self.index_name = index_name
|
||||
|
||||
if ElasticsearchStore._es_connection is None:
|
||||
connection_params = {}
|
||||
connection_params["cloud_id"] = settings.ELASTIC_CLOUD_ID
|
||||
connection_params["basic_auth"] = (settings.ELASTIC_USERNAME, settings.ELASTIC_PASSWORD)
|
||||
if settings.ELASTIC_URL:
|
||||
connection_params["hosts"] = [settings.ELASTIC_URL]
|
||||
connection_params["http_auth"] = (settings.ELASTIC_USERNAME, settings.ELASTIC_PASSWORD)
|
||||
elif settings.ELASTIC_CLOUD_ID:
|
||||
connection_params["cloud_id"] = settings.ELASTIC_CLOUD_ID
|
||||
connection_params["basic_auth"] = (settings.ELASTIC_USERNAME, settings.ELASTIC_PASSWORD)
|
||||
else:
|
||||
raise ValueError("Please provide either elasticsearch_url or cloud_id.")
|
||||
|
||||
|
||||
|
||||
ElasticsearchStore._es_connection = elasticsearch.Elasticsearch(**connection_params)
|
||||
|
||||
self.docsearch = ElasticsearchStore._es_connection
|
||||
@ -94,106 +110,112 @@ class ElasticsearchStore(BaseVectorStore):
|
||||
},
|
||||
"rank": {"rrf": {}},
|
||||
}
|
||||
resp = self.docsearch.search(index=index_name, query=full_query['query'], size=k, knn=full_query['knn'])
|
||||
return resp
|
||||
resp = self.docsearch.search(index=self.index_name, query=full_query['query'], size=k, knn=full_query['knn'])
|
||||
# create Documnets objects from the results page_content ['_source']['text'], metadata ['_source']['metadata']
|
||||
import sys
|
||||
print(self.path, file=sys.stderr)
|
||||
print(resp, file=sys.stderr)
|
||||
doc_list = []
|
||||
for hit in resp['hits']['hits']:
|
||||
|
||||
doc_list.append(Document(page_content = hit['_source']['text'], metadata = hit['_source']['metadata']))
|
||||
return doc_list
|
||||
|
||||
def _create_index_if_not_exists(
|
||||
self, index_name, dims_length
|
||||
):
|
||||
def _create_index_if_not_exists(
|
||||
self, index_name, dims_length
|
||||
):
|
||||
|
||||
if self.client.indices.exists(index=index_name):
|
||||
print(f"Index {index_name} already exists.")
|
||||
if self._es_connection.indices.exists(index=index_name):
|
||||
print(f"Index {index_name} already exists.")
|
||||
|
||||
else:
|
||||
self.strategy.before_index_setup(
|
||||
client=self.client,
|
||||
text_field=self.query_field,
|
||||
vector_query_field=self.vector_query_field,
|
||||
)
|
||||
else:
|
||||
|
||||
indexSettings = self.index(
|
||||
dims_length=dims_length,
|
||||
)
|
||||
self.client.indices.create(index=index_name, **indexSettings)
|
||||
def index(
|
||||
self,
|
||||
dims_length,
|
||||
):
|
||||
indexSettings = self.index(
|
||||
dims_length=dims_length,
|
||||
)
|
||||
self._es_connection.indices.create(index=index_name, **indexSettings)
|
||||
|
||||
|
||||
return {
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"vector": {
|
||||
"type": "dense_vector",
|
||||
"dims": dims_length,
|
||||
"index": True,
|
||||
"similarity": "cosine",
|
||||
},
|
||||
}
|
||||
def index(
|
||||
self,
|
||||
dims_length,
|
||||
):
|
||||
return {
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"vector": {
|
||||
"type": "dense_vector",
|
||||
"dims": dims_length,
|
||||
"index": True,
|
||||
"similarity": "cosine",
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def add_texts(
|
||||
self,
|
||||
texts,
|
||||
metadatas = None,
|
||||
ids = None,
|
||||
refresh_indices = True,
|
||||
create_index_if_not_exists = True,
|
||||
bulk_kwargs = None,
|
||||
**kwargs,
|
||||
def add_texts(
|
||||
self,
|
||||
texts,
|
||||
metadatas = None,
|
||||
ids = None,
|
||||
refresh_indices = True,
|
||||
create_index_if_not_exists = True,
|
||||
bulk_kwargs = None,
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
from elasticsearch.helpers import BulkIndexError, bulk
|
||||
|
||||
from elasticsearch.helpers import BulkIndexError, bulk
|
||||
|
||||
bulk_kwargs = bulk_kwargs or {}
|
||||
import uuid
|
||||
embeddings = []
|
||||
ids = ids or [str(uuid.uuid4()) for _ in texts]
|
||||
requests = []
|
||||
embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, self.embeddings_key)
|
||||
bulk_kwargs = bulk_kwargs or {}
|
||||
import uuid
|
||||
embeddings = []
|
||||
ids = ids or [str(uuid.uuid4()) for _ in texts]
|
||||
requests = []
|
||||
embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, self.embeddings_key)
|
||||
|
||||
vectors = embeddings.embed_documents(list(texts))
|
||||
vectors = embeddings.embed_documents(list(texts))
|
||||
|
||||
dims_length = len(vectors[0])
|
||||
dims_length = len(vectors[0])
|
||||
|
||||
if create_index_if_not_exists:
|
||||
self._create_index_if_not_exists(
|
||||
index_name=self.index_name, dims_length=dims_length
|
||||
if create_index_if_not_exists:
|
||||
self._create_index_if_not_exists(
|
||||
index_name=self.index_name, dims_length=dims_length
|
||||
)
|
||||
|
||||
for i, (text, vector) in enumerate(zip(texts, vectors)):
|
||||
metadata = metadatas[i] if metadatas else {}
|
||||
|
||||
requests.append(
|
||||
{
|
||||
"_op_type": "index",
|
||||
"_index": self.index_name,
|
||||
"text": text,
|
||||
"vector": vector,
|
||||
"metadata": metadata,
|
||||
"_id": ids[i],
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
if len(requests) > 0:
|
||||
try:
|
||||
success, failed = bulk(
|
||||
self._es_connection,
|
||||
requests,
|
||||
stats_only=True,
|
||||
refresh=refresh_indices,
|
||||
**bulk_kwargs,
|
||||
)
|
||||
return ids
|
||||
except BulkIndexError as e:
|
||||
print(f"Error adding texts: {e}")
|
||||
firstError = e.errors[0].get("index", {}).get("error", {})
|
||||
print(f"First error reason: {firstError.get('reason')}")
|
||||
raise e
|
||||
|
||||
for i, (text, vector) in enumerate(zip(texts, vectors)):
|
||||
metadata = metadatas[i] if metadatas else {}
|
||||
else:
|
||||
return []
|
||||
|
||||
requests.append(
|
||||
{
|
||||
"_op_type": "index",
|
||||
"_index": self.index_name,
|
||||
"text": text,
|
||||
"vector": vector,
|
||||
"metadata": metadata,
|
||||
"_id": ids[i],
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
if len(requests) > 0:
|
||||
try:
|
||||
success, failed = bulk(
|
||||
self.client,
|
||||
requests,
|
||||
stats_only=True,
|
||||
refresh=refresh_indices,
|
||||
**bulk_kwargs,
|
||||
)
|
||||
return ids
|
||||
except BulkIndexError as e:
|
||||
print(f"Error adding texts: {e}")
|
||||
firstError = e.errors[0].get("index", {}).get("error", {})
|
||||
print(f"First error reason: {firstError.get('reason')}")
|
||||
raise e
|
||||
|
||||
else:
|
||||
return []
|
||||
def delete_index(self):
|
||||
self._es_connection.delete_by_query(index=self.index_name, query={"match": {
|
||||
"metadata.filename.keyword": self.path}},)
|
||||
|
||||
|
@ -4,15 +4,23 @@ from application.core.settings import settings
|
||||
|
||||
class FaissStore(BaseVectorStore):
|
||||
|
||||
def __init__(self, path, embeddings_key):
|
||||
def __init__(self, path, embeddings_key, docs_init=None):
|
||||
super().__init__()
|
||||
self.path = path
|
||||
self.docsearch = FAISS.load_local(
|
||||
self.path, self._get_embeddings(settings.EMBEDDINGS_NAME, settings.EMBEDDINGS_KEY)
|
||||
)
|
||||
if docs_init:
|
||||
self.docsearch = FAISS.from_documents(
|
||||
docs_init, self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key)
|
||||
)
|
||||
else:
|
||||
self.docsearch = FAISS.load_local(
|
||||
self.path, self._get_embeddings(settings.EMBEDDINGS_NAME, settings.EMBEDDINGS_KEY)
|
||||
)
|
||||
|
||||
def search(self, *args, **kwargs):
|
||||
return self.docsearch.similarity_search(*args, **kwargs)
|
||||
|
||||
def add_texts(self, *args, **kwargs):
|
||||
return self.docsearch.add_texts(*args, **kwargs)
|
||||
|
||||
def save_local(self, *args, **kwargs):
|
||||
return self.docsearch.save_local(*args, **kwargs)
|
||||
|
16
application/vectorstore/vector_creator.py
Normal file
16
application/vectorstore/vector_creator.py
Normal file
@ -0,0 +1,16 @@
|
||||
from application.vectorstore.faiss import FaissStore
|
||||
from application.vectorstore.elasticsearch import ElasticsearchStore
|
||||
|
||||
|
||||
class VectorCreator:
|
||||
vectorstores = {
|
||||
'faiss': FaissStore,
|
||||
'elasticsearch':ElasticsearchStore
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def create_vectorstore(cls, type, *args, **kwargs):
|
||||
vectorstore_class = cls.vectorstores.get(type.lower())
|
||||
if not vectorstore_class:
|
||||
raise ValueError(f"No vectorstore class found for type {type}")
|
||||
return vectorstore_class(*args, **kwargs)
|
@ -83,11 +83,15 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
|
||||
# get files from outputs/inputs/index.faiss and outputs/inputs/index.pkl
|
||||
# and send them to the server (provide user and name in form)
|
||||
file_data = {'name': name_job, 'user': user}
|
||||
files = {'file_faiss': open(full_path + '/index.faiss', 'rb'),
|
||||
'file_pkl': open(full_path + '/index.pkl', 'rb')}
|
||||
response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data)
|
||||
if settings.VECTOR_STORE == "faiss":
|
||||
files = {'file_faiss': open(full_path + '/index.faiss', 'rb'),
|
||||
'file_pkl': open(full_path + '/index.pkl', 'rb')}
|
||||
response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data)
|
||||
response = requests.get(urljoin(settings.API_URL, "/api/delete_old?path=" + full_path))
|
||||
else:
|
||||
response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data)
|
||||
|
||||
response = requests.get(urljoin(settings.API_URL, "/api/delete_old?path="))
|
||||
|
||||
# delete local
|
||||
shutil.rmtree(full_path)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user