diff --git a/application/api/user/routes.py b/application/api/user/routes.py index 4ab0d00..f61b4ad 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -20,9 +20,12 @@ vectors_collection = db["vectors"] prompts_collection = db["prompts"] feedback_collection = db["feedback"] api_key_collection = db["api_keys"] -user = Blueprint('user', __name__) +user = Blueprint("user", __name__) + +current_dir = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +) -current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @user.route("/api/delete_conversation", methods=["POST"]) def delete_conversation(): @@ -37,21 +40,25 @@ def delete_conversation(): return {"status": "ok"} + @user.route("/api/delete_all_conversations", methods=["POST"]) def delete_all_conversations(): user_id = "local" - conversations_collection.delete_many({"user":user_id}) + conversations_collection.delete_many({"user": user_id}) return {"status": "ok"} + @user.route("/api/get_conversations", methods=["get"]) def get_conversations(): # provides a list of conversations conversations = conversations_collection.find().sort("date", -1).limit(30) list_conversations = [] for conversation in conversations: - list_conversations.append({"id": str(conversation["_id"]), "name": conversation["name"]}) + list_conversations.append( + {"id": str(conversation["_id"]), "name": conversation["name"]} + ) - #list_conversations = [{"id": "default", "name": "default"}, {"id": "jeff", "name": "jeff"}] + # list_conversations = [{"id": "default", "name": "default"}, {"id": "jeff", "name": "jeff"}] return jsonify(list_conversations) @@ -61,7 +68,8 @@ def get_single_conversation(): # provides data for a conversation conversation_id = request.args.get("id") conversation = conversations_collection.find_one({"_id": ObjectId(conversation_id)}) - return jsonify(conversation['queries']) + return jsonify(conversation["queries"]) + @user.route("/api/update_conversation_name", methods=["POST"]) def update_conversation_name(): @@ -69,7 +77,7 @@ def update_conversation_name(): data = request.get_json() id = data["id"] name = data["name"] - conversations_collection.update_one({"_id": ObjectId(id)},{"$set":{"name":name}}) + conversations_collection.update_one({"_id": ObjectId(id)}, {"$set": {"name": name}}) return {"status": "ok"} @@ -80,7 +88,6 @@ def api_feedback(): answer = data["answer"] feedback = data["feedback"] - feedback_collection.insert_one( { "question": question, @@ -90,6 +97,7 @@ def api_feedback(): ) return {"status": "ok"} + @user.route("/api/delete_by_ids", methods=["get"]) def delete_by_ids(): """Delete by ID. These are the IDs in the vectorstore""" @@ -104,6 +112,7 @@ def delete_by_ids(): return {"status": "ok"} return {"status": "error"} + @user.route("/api/delete_old", methods=["get"]) def delete_old(): """Delete old indexes.""" @@ -119,7 +128,7 @@ def delete_old(): if dirs_clean[0] not in ["indexes", "vectors"]: return {"status": "error"} path_clean = "/".join(dirs_clean) - vectors_collection.delete_one({"name": dirs_clean[-1], 'user': dirs_clean[-2]}) + vectors_collection.delete_one({"name": dirs_clean[-1], "user": dirs_clean[-2]}) if settings.VECTOR_STORE == "faiss": try: shutil.rmtree(os.path.join(current_dir, path_clean)) @@ -130,9 +139,10 @@ def delete_old(): settings.VECTOR_STORE, path=os.path.join(current_dir, path_clean) ) vetorstore.delete_index() - + return {"status": "ok"} + @user.route("/api/upload", methods=["POST"]) def upload_file(): """Upload a file to get vectorized and indexed.""" @@ -144,27 +154,29 @@ def upload_file(): job_name = secure_filename(request.form["name"]) # check if the post request has the file part files = request.files.getlist("file") - - if not files or all(file.filename == '' for file in files): + + if not files or all(file.filename == "" for file in files): return {"status": "no file name"} # Directory where files will be saved save_dir = os.path.join(current_dir, settings.UPLOAD_FOLDER, user, job_name) os.makedirs(save_dir, exist_ok=True) - + if len(files) > 1: # Multiple files; prepare them for zip temp_dir = os.path.join(save_dir, "temp") os.makedirs(temp_dir, exist_ok=True) - + for file in files: filename = secure_filename(file.filename) file.save(os.path.join(temp_dir, filename)) - + # Use shutil.make_archive to zip the temp directory - zip_path = shutil.make_archive(base_name=os.path.join(save_dir, job_name), format='zip', root_dir=temp_dir) + zip_path = shutil.make_archive( + base_name=os.path.join(save_dir, job_name), format="zip", root_dir=temp_dir + ) final_filename = os.path.basename(zip_path) - + # Clean up the temporary directory after zipping shutil.rmtree(temp_dir) else: @@ -173,14 +185,19 @@ def upload_file(): final_filename = secure_filename(file.filename) file_path = os.path.join(save_dir, final_filename) file.save(file_path) - + # Call ingest with the single file or zipped file - task = ingest.delay(settings.UPLOAD_FOLDER, [".rst", ".md", ".pdf", ".txt", ".docx", - ".csv", ".epub", ".html", ".mdx"], - job_name, final_filename, user) - + task = ingest.delay( + settings.UPLOAD_FOLDER, + [".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub", ".html", ".mdx"], + job_name, + final_filename, + user, + ) + return {"status": "ok", "task_id": task.id} - + + @user.route("/api/remote", methods=["POST"]) def upload_remote(): """Upload a remote source to get vectorized and indexed.""" @@ -193,25 +210,27 @@ def upload_remote(): if "name" not in request.form: return {"status": "no name"} job_name = secure_filename(request.form["name"]) - # check if the post request has the file part if "data" not in request.form: print("No data") return {"status": "no data"} source_data = request.form["data"] if source_data: - task = ingest_remote.delay(source_data=source_data, job_name=job_name, user=user, loader=source) - # task id + task = ingest_remote.delay( + source_data=source_data, job_name=job_name, user=user, loader=source + ) task_id = task.id return {"status": "ok", "task_id": task_id} else: return {"status": "error"} + @user.route("/api/task_status", methods=["GET"]) def task_status(): """Get celery job status.""" task_id = request.args.get("task_id") from application.celery import celery + task = celery.AsyncResult(task_id) task_meta = task.info return {"status": task.status, "result": task_meta} @@ -253,11 +272,13 @@ def combined_json(): } ) if settings.VECTOR_STORE == "faiss": - data_remote = requests.get("https://d3dg1063dc54p9.cloudfront.net/combined.json").json() + data_remote = requests.get( + "https://d3dg1063dc54p9.cloudfront.net/combined.json" + ).json() for index in data_remote: index["location"] = "remote" data.append(index) - if 'duckduck_search' in settings.RETRIEVERS_ENABLED: + if "duckduck_search" in settings.RETRIEVERS_ENABLED: data.append( { "name": "DuckDuckGo Search", @@ -271,7 +292,7 @@ def combined_json(): "location": "custom", } ) - if 'brave_search' in settings.RETRIEVERS_ENABLED: + if "brave_search" in settings.RETRIEVERS_ENABLED: data.append( { "name": "Brave Search", @@ -302,11 +323,11 @@ def check_docs(): return {"status": "exists"} else: file_url = urlparse(base_path + vectorstore + "index.faiss") - + if ( - file_url.scheme in ['https'] and - file_url.netloc == 'raw.githubusercontent.com' and - file_url.path.startswith('/arc53/DocsHUB/main/') + file_url.scheme in ["https"] + and file_url.netloc == "raw.githubusercontent.com" + and file_url.path.startswith("/arc53/DocsHUB/main/") ): r = requests.get(file_url.geturl()) if r.status_code != 200: @@ -325,6 +346,7 @@ def check_docs(): return {"status": "loaded"} + @user.route("/api/create_prompt", methods=["POST"]) def create_prompt(): data = request.get_json() @@ -343,6 +365,7 @@ def create_prompt(): new_id = str(resp.inserted_id) return {"id": new_id} + @user.route("/api/get_prompts", methods=["GET"]) def get_prompts(): user = "local" @@ -352,30 +375,39 @@ def get_prompts(): list_prompts.append({"id": "creative", "name": "creative", "type": "public"}) list_prompts.append({"id": "strict", "name": "strict", "type": "public"}) for prompt in prompts: - list_prompts.append({"id": str(prompt["_id"]), "name": prompt["name"], "type": "private"}) + list_prompts.append( + {"id": str(prompt["_id"]), "name": prompt["name"], "type": "private"} + ) return jsonify(list_prompts) + @user.route("/api/get_single_prompt", methods=["GET"]) def get_single_prompt(): prompt_id = request.args.get("id") - if prompt_id == 'default': - with open(os.path.join(current_dir, "prompts", "chat_combine_default.txt"), "r") as f: + if prompt_id == "default": + with open( + os.path.join(current_dir, "prompts", "chat_combine_default.txt"), "r" + ) as f: chat_combine_template = f.read() return jsonify({"content": chat_combine_template}) - elif prompt_id == 'creative': - with open(os.path.join(current_dir, "prompts", "chat_combine_creative.txt"), "r") as f: + elif prompt_id == "creative": + with open( + os.path.join(current_dir, "prompts", "chat_combine_creative.txt"), "r" + ) as f: chat_reduce_creative = f.read() return jsonify({"content": chat_reduce_creative}) - elif prompt_id == 'strict': - with open(os.path.join(current_dir, "prompts", "chat_combine_strict.txt"), "r") as f: - chat_reduce_strict = f.read() + elif prompt_id == "strict": + with open( + os.path.join(current_dir, "prompts", "chat_combine_strict.txt"), "r" + ) as f: + chat_reduce_strict = f.read() return jsonify({"content": chat_reduce_strict}) - prompt = prompts_collection.find_one({"_id": ObjectId(prompt_id)}) return jsonify({"content": prompt["content"]}) + @user.route("/api/delete_prompt", methods=["POST"]) def delete_prompt(): data = request.get_json() @@ -387,6 +419,7 @@ def delete_prompt(): ) return {"status": "ok"} + @user.route("/api/update_prompt", methods=["POST"]) def update_prompt_name(): data = request.get_json() @@ -396,27 +429,31 @@ def update_prompt_name(): # check if name is null if name == "": return {"status": "error"} - prompts_collection.update_one({"_id": ObjectId(id)},{"$set":{"name":name, "content": content}}) + prompts_collection.update_one( + {"_id": ObjectId(id)}, {"$set": {"name": name, "content": content}} + ) return {"status": "ok"} - @user.route("/api/get_api_keys", methods=["GET"]) def get_api_keys(): user = "local" keys = api_key_collection.find({"user": user}) list_keys = [] for key in keys: - list_keys.append({ - "id": str(key["_id"]), - "name": key["name"], - "key": key["key"][:4] + "..." + key["key"][-4:], - "source": key["source"], - "prompt_id": key["prompt_id"], - "chunks": key["chunks"] - }) + list_keys.append( + { + "id": str(key["_id"]), + "name": key["name"], + "key": key["key"][:4] + "..." + key["key"][-4:], + "source": key["source"], + "prompt_id": key["prompt_id"], + "chunks": key["chunks"], + } + ) return jsonify(list_keys) + @user.route("/api/create_api_key", methods=["POST"]) def create_api_key(): data = request.get_json() @@ -433,12 +470,13 @@ def create_api_key(): "source": source, "user": user, "prompt_id": prompt_id, - "chunks": chunks + "chunks": chunks, } ) new_id = str(resp.inserted_id) return {"id": new_id, "key": key} + @user.route("/api/delete_api_key", methods=["POST"]) def delete_api_key(): data = request.get_json() @@ -449,4 +487,3 @@ def delete_api_key(): } ) return {"status": "ok"} - diff --git a/application/parser/open_ai_func.py b/application/parser/open_ai_func.py index 784cde0..6a67c53 100644 --- a/application/parser/open_ai_func.py +++ b/application/parser/open_ai_func.py @@ -15,7 +15,7 @@ def num_tokens_from_string(string: str, encoding_name: str) -> int: # Function to convert string to tokens and estimate user cost. encoding = tiktoken.get_encoding(encoding_name) num_tokens = len(encoding.encode(string)) - total_price = ((num_tokens / 1000) * 0.0004) + total_price = (num_tokens / 1000) * 0.0004 return num_tokens, total_price @@ -26,13 +26,13 @@ def store_add_texts_with_retry(store, i): def call_openai_api(docs, folder_name, task_status): - # Function to create a vector store from the documents and save it to disk. + # Function to create a vector store from the documents and save it to disk - # create output folder if it doesn't exist if not os.path.exists(f"{folder_name}"): os.makedirs(f"{folder_name}") from tqdm import tqdm + c1 = 0 if settings.VECTOR_STORE == "faiss": docs_init = [docs[0]] @@ -40,25 +40,32 @@ def call_openai_api(docs, folder_name, task_status): store = VectorCreator.create_vectorstore( settings.VECTOR_STORE, - docs_init = docs_init, + docs_init=docs_init, path=f"{folder_name}", - embeddings_key=os.getenv("EMBEDDINGS_KEY") + embeddings_key=os.getenv("EMBEDDINGS_KEY"), ) else: store = VectorCreator.create_vectorstore( settings.VECTOR_STORE, path=f"{folder_name}", - embeddings_key=os.getenv("EMBEDDINGS_KEY") + embeddings_key=os.getenv("EMBEDDINGS_KEY"), ) # Uncomment for MPNet embeddings # model_name = "sentence-transformers/all-mpnet-base-v2" # hf = HuggingFaceEmbeddings(model_name=model_name) # store = FAISS.from_documents(docs_test, hf) s1 = len(docs) - for i in tqdm(docs, desc="Embedding 🦖", unit="docs", total=len(docs), - bar_format='{l_bar}{bar}| Time Left: {remaining}'): + for i in tqdm( + docs, + desc="Embedding 🦖", + unit="docs", + total=len(docs), + bar_format="{l_bar}{bar}| Time Left: {remaining}", + ): try: - task_status.update_state(state='PROGRESS', meta={'current': int((c1 / s1) * 100)}) + task_status.update_state( + state="PROGRESS", meta={"current": int((c1 / s1) * 100)} + ) store_add_texts_with_retry(store, i) except Exception as e: print(e) @@ -80,7 +87,9 @@ def get_user_permission(docs, folder_name): for doc in docs: docs_content += doc.page_content - tokens, total_price = num_tokens_from_string(string=docs_content, encoding_name="cl100k_base") + tokens, total_price = num_tokens_from_string( + string=docs_content, encoding_name="cl100k_base" + ) # Here we print the number of tokens and the approx user cost with some visually appealing formatting. print(f"Number of Tokens = {format(tokens, ',d')}") print(f"Approx Cost = ${format(total_price, ',.2f')}") diff --git a/application/parser/remote/web_loader.py b/application/parser/remote/web_loader.py index 9fc50c1..98564a0 100644 --- a/application/parser/remote/web_loader.py +++ b/application/parser/remote/web_loader.py @@ -1,22 +1,23 @@ from application.parser.remote.base import BaseRemote +from langchain_community.document_loaders import WebBaseLoader + class WebLoader(BaseRemote): def __init__(self): - from langchain.document_loaders import WebBaseLoader self.loader = WebBaseLoader def load_data(self, inputs): urls = inputs - if isinstance(urls, str): - urls = [urls] # Convert string to list if a single URL is passed - + urls = [urls] documents = [] for url in urls: try: - loader = self.loader([url]) # Process URLs one by one + loader = self.loader( + [url], header_template={"User-Agent": "Mozilla/5.0"} + ) documents.extend(loader.load()) except Exception as e: print(f"Error processing URL {url}: {e}") - continue # Continue with the next URL if an error occurs - return documents \ No newline at end of file + continue + return documents diff --git a/application/worker.py b/application/worker.py index eb28242..aebfa58 100644 --- a/application/worker.py +++ b/application/worker.py @@ -36,6 +36,7 @@ current_dir = os.path.dirname( os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ) + def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5): """ Recursively extract zip files with a limit on recursion depth. @@ -50,7 +51,7 @@ def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5): print(f"Reached maximum recursion depth of {max_depth}") return - with zipfile.ZipFile(zip_path, 'r') as zip_ref: + with zipfile.ZipFile(zip_path, "r") as zip_ref: zip_ref.extractall(extract_to) os.remove(zip_path) # Remove the zip file after extracting @@ -96,7 +97,6 @@ def ingest_worker(self, directory, formats, name_job, filename, user): full_path = os.path.join(directory, user, name_job) import sys - print(full_path, file=sys.stderr) # check if API_URL env variable is set file_data = {"name": name_job, "file": filename, "user": user} @@ -114,7 +114,9 @@ def ingest_worker(self, directory, formats, name_job, filename, user): # check if file is .zip and extract it if filename.endswith(".zip"): - extract_zip_recursive(os.path.join(full_path, filename), full_path, 0, recursion_depth) + extract_zip_recursive( + os.path.join(full_path, filename), full_path, 0, recursion_depth + ) self.update_state(state="PROGRESS", meta={"current": 1}) @@ -176,7 +178,6 @@ def ingest_worker(self, directory, formats, name_job, filename, user): def remote_worker(self, source_data, name_job, user, loader, directory="temp"): - # sample = False token_check = True min_tokens = 150 max_tokens = 1250 @@ -184,12 +185,8 @@ def remote_worker(self, source_data, name_job, user, loader, directory="temp"): if not os.path.exists(full_path): os.makedirs(full_path) - self.update_state(state="PROGRESS", meta={"current": 1}) - # source_data {"data": [url]} for url type task just urls - - # Use RemoteCreator to load data from URL remote_loader = RemoteCreator.create_loader(loader) raw_docs = remote_loader.load_data(source_data) @@ -201,7 +198,6 @@ def remote_worker(self, source_data, name_job, user, loader, directory="temp"): ) # docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs] - call_openai_api(docs, full_path, self) self.update_state(state="PROGRESS", meta={"current": 100})