Merge pull request #933 from siiddhantt/fix/remote-upload-issue

fix: remote upload error

Branch: pull/934/head
Commit: de0193fffc, authored by Alex 4 weeks ago, committed by GitHub
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
@@ -20,9 +20,12 @@ vectors_collection = db["vectors"]
 prompts_collection = db["prompts"]
 feedback_collection = db["feedback"]
 api_key_collection = db["api_keys"]
-user = Blueprint('user', __name__)
+user = Blueprint("user", __name__)

-current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+current_dir = os.path.dirname(
+    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+)

 @user.route("/api/delete_conversation", methods=["POST"])
 def delete_conversation():
@@ -37,21 +40,25 @@ def delete_conversation():
     return {"status": "ok"}

 @user.route("/api/delete_all_conversations", methods=["POST"])
 def delete_all_conversations():
     user_id = "local"
-    conversations_collection.delete_many({"user":user_id})
+    conversations_collection.delete_many({"user": user_id})
     return {"status": "ok"}

 @user.route("/api/get_conversations", methods=["get"])
 def get_conversations():
     # provides a list of conversations
     conversations = conversations_collection.find().sort("date", -1).limit(30)
     list_conversations = []
     for conversation in conversations:
-        list_conversations.append({"id": str(conversation["_id"]), "name": conversation["name"]})
+        list_conversations.append(
+            {"id": str(conversation["_id"]), "name": conversation["name"]}
+        )
-    #list_conversations = [{"id": "default", "name": "default"}, {"id": "jeff", "name": "jeff"}]
+    # list_conversations = [{"id": "default", "name": "default"}, {"id": "jeff", "name": "jeff"}]
     return jsonify(list_conversations)
@@ -61,7 +68,8 @@ def get_single_conversation():
     # provides data for a conversation
     conversation_id = request.args.get("id")
     conversation = conversations_collection.find_one({"_id": ObjectId(conversation_id)})
-    return jsonify(conversation['queries'])
+    return jsonify(conversation["queries"])

 @user.route("/api/update_conversation_name", methods=["POST"])
 def update_conversation_name():
@@ -69,7 +77,7 @@ def update_conversation_name():
     data = request.get_json()
     id = data["id"]
     name = data["name"]
-    conversations_collection.update_one({"_id": ObjectId(id)},{"$set":{"name":name}})
+    conversations_collection.update_one({"_id": ObjectId(id)}, {"$set": {"name": name}})
     return {"status": "ok"}
@@ -80,7 +88,6 @@ def api_feedback():
     answer = data["answer"]
     feedback = data["feedback"]
     feedback_collection.insert_one(
         {
             "question": question,
@@ -90,6 +97,7 @@ def api_feedback():
     )
     return {"status": "ok"}

 @user.route("/api/delete_by_ids", methods=["get"])
 def delete_by_ids():
     """Delete by ID. These are the IDs in the vectorstore"""
@@ -104,6 +112,7 @@ def delete_by_ids():
         return {"status": "ok"}
     return {"status": "error"}

 @user.route("/api/delete_old", methods=["get"])
 def delete_old():
     """Delete old indexes."""
@@ -119,7 +128,7 @@ def delete_old():
     if dirs_clean[0] not in ["indexes", "vectors"]:
         return {"status": "error"}
     path_clean = "/".join(dirs_clean)
-    vectors_collection.delete_one({"name": dirs_clean[-1], 'user': dirs_clean[-2]})
+    vectors_collection.delete_one({"name": dirs_clean[-1], "user": dirs_clean[-2]})
     if settings.VECTOR_STORE == "faiss":
         try:
             shutil.rmtree(os.path.join(current_dir, path_clean))
@@ -130,9 +139,10 @@ def delete_old():
             settings.VECTOR_STORE, path=os.path.join(current_dir, path_clean)
         )
         vetorstore.delete_index()
     return {"status": "ok"}

 @user.route("/api/upload", methods=["POST"])
 def upload_file():
     """Upload a file to get vectorized and indexed."""
@@ -144,27 +154,29 @@ def upload_file():
     job_name = secure_filename(request.form["name"])
     # check if the post request has the file part
     files = request.files.getlist("file")
-    if not files or all(file.filename == '' for file in files):
+    if not files or all(file.filename == "" for file in files):
         return {"status": "no file name"}
     # Directory where files will be saved
     save_dir = os.path.join(current_dir, settings.UPLOAD_FOLDER, user, job_name)
     os.makedirs(save_dir, exist_ok=True)
     if len(files) > 1:
         # Multiple files; prepare them for zip
         temp_dir = os.path.join(save_dir, "temp")
         os.makedirs(temp_dir, exist_ok=True)
         for file in files:
             filename = secure_filename(file.filename)
             file.save(os.path.join(temp_dir, filename))
         # Use shutil.make_archive to zip the temp directory
-        zip_path = shutil.make_archive(base_name=os.path.join(save_dir, job_name), format='zip', root_dir=temp_dir)
+        zip_path = shutil.make_archive(
+            base_name=os.path.join(save_dir, job_name), format="zip", root_dir=temp_dir
+        )
         final_filename = os.path.basename(zip_path)
         # Clean up the temporary directory after zipping
         shutil.rmtree(temp_dir)
     else:
@@ -173,14 +185,19 @@ def upload_file():
         final_filename = secure_filename(file.filename)
         file_path = os.path.join(save_dir, final_filename)
         file.save(file_path)
     # Call ingest with the single file or zipped file
-    task = ingest.delay(settings.UPLOAD_FOLDER, [".rst", ".md", ".pdf", ".txt", ".docx",
-                        ".csv", ".epub", ".html", ".mdx"],
-                        job_name, final_filename, user)
+    task = ingest.delay(
+        settings.UPLOAD_FOLDER,
+        [".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub", ".html", ".mdx"],
+        job_name,
+        final_filename,
+        user,
+    )
     return {"status": "ok", "task_id": task.id}

 @user.route("/api/remote", methods=["POST"])
 def upload_remote():
     """Upload a remote source to get vectorized and indexed."""
@@ -193,25 +210,27 @@ def upload_remote():
     if "name" not in request.form:
         return {"status": "no name"}
     job_name = secure_filename(request.form["name"])
-    # check if the post request has the file part
     if "data" not in request.form:
         print("No data")
         return {"status": "no data"}
     source_data = request.form["data"]
     if source_data:
-        task = ingest_remote.delay(source_data=source_data, job_name=job_name, user=user, loader=source)
-        # task id
+        task = ingest_remote.delay(
+            source_data=source_data, job_name=job_name, user=user, loader=source
+        )
         task_id = task.id
         return {"status": "ok", "task_id": task_id}
     else:
         return {"status": "error"}

 @user.route("/api/task_status", methods=["GET"])
 def task_status():
     """Get celery job status."""
     task_id = request.args.get("task_id")
     from application.celery import celery
     task = celery.AsyncResult(task_id)
     task_meta = task.info
     return {"status": task.status, "result": task_meta}
@@ -253,11 +272,13 @@ def combined_json():
             }
         )
     if settings.VECTOR_STORE == "faiss":
-        data_remote = requests.get("https://d3dg1063dc54p9.cloudfront.net/combined.json").json()
+        data_remote = requests.get(
+            "https://d3dg1063dc54p9.cloudfront.net/combined.json"
+        ).json()
         for index in data_remote:
             index["location"] = "remote"
             data.append(index)
-    if 'duckduck_search' in settings.RETRIEVERS_ENABLED:
+    if "duckduck_search" in settings.RETRIEVERS_ENABLED:
         data.append(
             {
                 "name": "DuckDuckGo Search",
@@ -271,7 +292,7 @@ def combined_json():
                 "location": "custom",
             }
         )
-    if 'brave_search' in settings.RETRIEVERS_ENABLED:
+    if "brave_search" in settings.RETRIEVERS_ENABLED:
         data.append(
             {
                 "name": "Brave Search",
@@ -302,11 +323,11 @@ def check_docs():
         return {"status": "exists"}
     else:
         file_url = urlparse(base_path + vectorstore + "index.faiss")
         if (
-            file_url.scheme in ['https'] and
-            file_url.netloc == 'raw.githubusercontent.com' and
-            file_url.path.startswith('/arc53/DocsHUB/main/')
+            file_url.scheme in ["https"]
+            and file_url.netloc == "raw.githubusercontent.com"
+            and file_url.path.startswith("/arc53/DocsHUB/main/")
         ):
             r = requests.get(file_url.geturl())
             if r.status_code != 200:
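
The reformatted condition above is an allowlist on the parsed URL: scheme, host, and path prefix must all match before anything is fetched. The same pattern in isolation (host and prefix copied from the diff; the test URLs are made up):

    from urllib.parse import urlparse

    def is_allowed(url: str) -> bool:
        # Only https URLs under the trusted repository path pass the check.
        parts = urlparse(url)
        return (
            parts.scheme in ["https"]
            and parts.netloc == "raw.githubusercontent.com"
            and parts.path.startswith("/arc53/DocsHUB/main/")
        )

    print(is_allowed("https://raw.githubusercontent.com/arc53/DocsHUB/main/a/index.faiss"))  # True
    print(is_allowed("http://evil.example/arc53/DocsHUB/main/a"))  # False
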
@@ -325,6 +346,7 @@ def check_docs():
     return {"status": "loaded"}

 @user.route("/api/create_prompt", methods=["POST"])
 def create_prompt():
     data = request.get_json()
@@ -343,6 +365,7 @@ def create_prompt():
     new_id = str(resp.inserted_id)
     return {"id": new_id}

 @user.route("/api/get_prompts", methods=["GET"])
 def get_prompts():
     user = "local"
@@ -352,30 +375,39 @@ def get_prompts():
     list_prompts.append({"id": "creative", "name": "creative", "type": "public"})
     list_prompts.append({"id": "strict", "name": "strict", "type": "public"})
     for prompt in prompts:
-        list_prompts.append({"id": str(prompt["_id"]), "name": prompt["name"], "type": "private"})
+        list_prompts.append(
+            {"id": str(prompt["_id"]), "name": prompt["name"], "type": "private"}
+        )
     return jsonify(list_prompts)

 @user.route("/api/get_single_prompt", methods=["GET"])
 def get_single_prompt():
     prompt_id = request.args.get("id")
-    if prompt_id == 'default':
-        with open(os.path.join(current_dir, "prompts", "chat_combine_default.txt"), "r") as f:
+    if prompt_id == "default":
+        with open(
+            os.path.join(current_dir, "prompts", "chat_combine_default.txt"), "r"
+        ) as f:
             chat_combine_template = f.read()
         return jsonify({"content": chat_combine_template})
-    elif prompt_id == 'creative':
-        with open(os.path.join(current_dir, "prompts", "chat_combine_creative.txt"), "r") as f:
+    elif prompt_id == "creative":
+        with open(
+            os.path.join(current_dir, "prompts", "chat_combine_creative.txt"), "r"
+        ) as f:
             chat_reduce_creative = f.read()
         return jsonify({"content": chat_reduce_creative})
-    elif prompt_id == 'strict':
-        with open(os.path.join(current_dir, "prompts", "chat_combine_strict.txt"), "r") as f:
-            chat_reduce_strict = f.read()
+    elif prompt_id == "strict":
+        with open(
+            os.path.join(current_dir, "prompts", "chat_combine_strict.txt"), "r"
+        ) as f:
+            chat_reduce_strict = f.read()
         return jsonify({"content": chat_reduce_strict})
     prompt = prompts_collection.find_one({"_id": ObjectId(prompt_id)})
     return jsonify({"content": prompt["content"]})

 @user.route("/api/delete_prompt", methods=["POST"])
 def delete_prompt():
     data = request.get_json()
@@ -387,6 +419,7 @@ def delete_prompt():
     )
     return {"status": "ok"}

 @user.route("/api/update_prompt", methods=["POST"])
 def update_prompt_name():
     data = request.get_json()
@@ -396,27 +429,31 @@ def update_prompt_name():
     # check if name is null
     if name == "":
         return {"status": "error"}
-    prompts_collection.update_one({"_id": ObjectId(id)},{"$set":{"name":name, "content": content}})
+    prompts_collection.update_one(
+        {"_id": ObjectId(id)}, {"$set": {"name": name, "content": content}}
+    )
     return {"status": "ok"}

 @user.route("/api/get_api_keys", methods=["GET"])
 def get_api_keys():
     user = "local"
     keys = api_key_collection.find({"user": user})
     list_keys = []
     for key in keys:
-        list_keys.append({
-            "id": str(key["_id"]),
-            "name": key["name"],
-            "key": key["key"][:4] + "..." + key["key"][-4:],
-            "source": key["source"],
-            "prompt_id": key["prompt_id"],
-            "chunks": key["chunks"]
-        })
+        list_keys.append(
+            {
+                "id": str(key["_id"]),
+                "name": key["name"],
+                "key": key["key"][:4] + "..." + key["key"][-4:],
+                "source": key["source"],
+                "prompt_id": key["prompt_id"],
+                "chunks": key["chunks"],
+            }
+        )
     return jsonify(list_keys)

 @user.route("/api/create_api_key", methods=["POST"])
 def create_api_key():
     data = request.get_json()
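
Worth noting: get_api_keys never returns the full secret, only the first and last four characters survive into the listing. The masking expression on its own (the sample key is invented):

    key = "sk-abcdef1234567890wxyz"
    masked = key[:4] + "..." + key[-4:]
    print(masked)  # sk-a...wxyz
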
@@ -433,12 +470,13 @@ def create_api_key():
             "source": source,
             "user": user,
             "prompt_id": prompt_id,
-            "chunks": chunks
+            "chunks": chunks,
         }
     )
     new_id = str(resp.inserted_id)
     return {"id": new_id, "key": key}

 @user.route("/api/delete_api_key", methods=["POST"])
 def delete_api_key():
     data = request.get_json()
@@ -449,4 +487,3 @@ def delete_api_key():
         }
     )
     return {"status": "ok"}

@@ -15,7 +15,7 @@ def num_tokens_from_string(string: str, encoding_name: str) -> int:
     # Function to convert string to tokens and estimate user cost.
     encoding = tiktoken.get_encoding(encoding_name)
     num_tokens = len(encoding.encode(string))
-    total_price = ((num_tokens / 1000) * 0.0004)
+    total_price = (num_tokens / 1000) * 0.0004
    return num_tokens, total_price
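
A runnable sketch of the estimator above, assuming tiktoken is installed (the $0.0004 per 1K tokens figure is the one hard-coded in the diff):

    import tiktoken

    def num_tokens_from_string(string: str, encoding_name: str):
        # Encode with the named tokenizer and derive an approximate cost.
        encoding = tiktoken.get_encoding(encoding_name)
        num_tokens = len(encoding.encode(string))
        total_price = (num_tokens / 1000) * 0.0004
        return num_tokens, total_price

    tokens, price = num_tokens_from_string("Hello, DocsGPT!", "cl100k_base")
    print(tokens, f"${price:.6f}")
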
@@ -26,13 +26,13 @@ def store_add_texts_with_retry(store, i):
 def call_openai_api(docs, folder_name, task_status):
-    # Function to create a vector store from the documents and save it to disk.
-    # create output folder if it doesn't exist
+    # Function to create a vector store from the documents and save it to disk
     if not os.path.exists(f"{folder_name}"):
         os.makedirs(f"{folder_name}")
     from tqdm import tqdm
     c1 = 0
     if settings.VECTOR_STORE == "faiss":
         docs_init = [docs[0]]
@@ -40,25 +40,32 @@ def call_openai_api(docs, folder_name, task_status):
         store = VectorCreator.create_vectorstore(
             settings.VECTOR_STORE,
-            docs_init = docs_init,
+            docs_init=docs_init,
             path=f"{folder_name}",
-            embeddings_key=os.getenv("EMBEDDINGS_KEY")
+            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
         )
     else:
         store = VectorCreator.create_vectorstore(
             settings.VECTOR_STORE,
             path=f"{folder_name}",
-            embeddings_key=os.getenv("EMBEDDINGS_KEY")
+            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
         )
     # Uncomment for MPNet embeddings
     # model_name = "sentence-transformers/all-mpnet-base-v2"
     # hf = HuggingFaceEmbeddings(model_name=model_name)
     # store = FAISS.from_documents(docs_test, hf)
     s1 = len(docs)
-    for i in tqdm(docs, desc="Embedding 🦖", unit="docs", total=len(docs),
-                  bar_format='{l_bar}{bar}| Time Left: {remaining}'):
+    for i in tqdm(
+        docs,
+        desc="Embedding 🦖",
+        unit="docs",
+        total=len(docs),
+        bar_format="{l_bar}{bar}| Time Left: {remaining}",
+    ):
         try:
-            task_status.update_state(state='PROGRESS', meta={'current': int((c1 / s1) * 100)})
+            task_status.update_state(
+                state="PROGRESS", meta={"current": int((c1 / s1) * 100)}
+            )
             store_add_texts_with_retry(store, i)
         except Exception as e:
             print(e)
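
The embedding loop drives both a tqdm bar and Celery progress updates from the same counter. A trimmed sketch of that progress arithmetic, with update_state stubbed out so it runs standalone (FakeTask is a stand-in for celery's bound task):

    from tqdm import tqdm

    class FakeTask:
        def update_state(self, state, meta):
            # Stand-in for celery Task.update_state.
            pass

    docs = list(range(10))
    task_status = FakeTask()
    s1 = len(docs)
    c1 = 0
    for i in tqdm(docs, desc="Embedding", unit="docs", total=s1,
                  bar_format="{l_bar}{bar}| Time Left: {remaining}"):
        # Integer percentage completed so far, as exposed by the status API.
        task_status.update_state(state="PROGRESS", meta={"current": int((c1 / s1) * 100)})
        c1 += 1
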
@@ -80,7 +87,9 @@ def get_user_permission(docs, folder_name):
     for doc in docs:
         docs_content += doc.page_content
-    tokens, total_price = num_tokens_from_string(string=docs_content, encoding_name="cl100k_base")
+    tokens, total_price = num_tokens_from_string(
+        string=docs_content, encoding_name="cl100k_base"
+    )
     # Here we print the number of tokens and the approx user cost with some visually appealing formatting.
     print(f"Number of Tokens = {format(tokens, ',d')}")
     print(f"Approx Cost = ${format(total_price, ',.2f')}")

@@ -1,22 +1,32 @@
 from application.parser.remote.base import BaseRemote
+from langchain_community.document_loaders import WebBaseLoader
+
+headers = {
+    "User-Agent": "Mozilla/5.0",
+    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*"
+    ";q=0.8",
+    "Accept-Language": "en-US,en;q=0.5",
+    "Referer": "https://www.google.com/",
+    "DNT": "1",
+    "Connection": "keep-alive",
+    "Upgrade-Insecure-Requests": "1",
+}

 class WebLoader(BaseRemote):
     def __init__(self):
-        from langchain.document_loaders import WebBaseLoader
         self.loader = WebBaseLoader

     def load_data(self, inputs):
         urls = inputs
         if isinstance(urls, str):
-            urls = [urls]  # Convert string to list if a single URL is passed
+            urls = [urls]
         documents = []
         for url in urls:
             try:
-                loader = self.loader([url])  # Process URLs one by one
+                loader = self.loader([url], header_template=headers)
                 documents.extend(loader.load())
             except Exception as e:
                 print(f"Error processing URL {url}: {e}")
-                continue  # Continue with the next URL if an error occurs
+                continue
         return documents
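
This hunk is the substance of the fix: WebBaseLoader is now imported from langchain_community and called with header_template, so fetches carry browser-like headers instead of the default Python User-Agent that some sites reject. A minimal sketch of the same call, assuming langchain_community and its bs4 dependency are installed (the URL is just an example):

    from langchain_community.document_loaders import WebBaseLoader

    headers = {"User-Agent": "Mozilla/5.0"}

    # header_template feeds the headers into the underlying HTTP session,
    # so the fetch looks like an ordinary browser request.
    loader = WebBaseLoader(["https://example.com"], header_template=headers)
    docs = loader.load()
    print(len(docs), docs[0].metadata.get("source") if docs else None)
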

@@ -36,6 +36,7 @@ current_dir = os.path.dirname(
     os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 )

 def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
     """
     Recursively extract zip files with a limit on recursion depth.
@@ -50,7 +51,7 @@ def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
         print(f"Reached maximum recursion depth of {max_depth}")
         return
-    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
+    with zipfile.ZipFile(zip_path, "r") as zip_ref:
         zip_ref.extractall(extract_to)
     os.remove(zip_path)  # Remove the zip file after extracting
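
extract_zip_recursive guards against archives nested past max_depth. A self-contained sketch of one plausible shape for the full function, stdlib only; the nested-zip walk is an assumption, since this hunk only shows the extraction and cleanup lines:

    import os
    import zipfile

    def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
        # Stop before a zip-of-zips (or a malicious zip bomb) recurses forever.
        if current_depth > max_depth:
            print(f"Reached maximum recursion depth of {max_depth}")
            return
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(extract_to)
        os.remove(zip_path)  # Remove the zip file after extracting

        # Any zips that were inside get the same treatment, one level deeper
        # (assumed behavior, not shown in this hunk).
        for root, _, files in os.walk(extract_to):
            for name in files:
                if name.endswith(".zip"):
                    extract_zip_recursive(
                        os.path.join(root, name), root, current_depth + 1, max_depth
                    )
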
@@ -96,7 +97,6 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
     full_path = os.path.join(directory, user, name_job)
     import sys
     print(full_path, file=sys.stderr)
     # check if API_URL env variable is set
     file_data = {"name": name_job, "file": filename, "user": user}
@@ -114,7 +114,9 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
     # check if file is .zip and extract it
     if filename.endswith(".zip"):
-        extract_zip_recursive(os.path.join(full_path, filename), full_path, 0, recursion_depth)
+        extract_zip_recursive(
+            os.path.join(full_path, filename), full_path, 0, recursion_depth
+        )
     self.update_state(state="PROGRESS", meta={"current": 1})
@@ -176,7 +178,6 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
 def remote_worker(self, source_data, name_job, user, loader, directory="temp"):
-    # sample = False
     token_check = True
     min_tokens = 150
     max_tokens = 1250
@@ -184,12 +185,8 @@ def remote_worker(self, source_data, name_job, user, loader, directory="temp"):
     if not os.path.exists(full_path):
         os.makedirs(full_path)
     self.update_state(state="PROGRESS", meta={"current": 1})
-    # source_data {"data": [url]} for url type task just urls
-    # Use RemoteCreator to load data from URL
     remote_loader = RemoteCreator.create_loader(loader)
     raw_docs = remote_loader.load_data(source_data)
@@ -201,7 +198,6 @@ def remote_worker(self, source_data, name_job, user, loader, directory="temp"):
     )
     # docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
     call_openai_api(docs, full_path, self)
     self.update_state(state="PROGRESS", meta={"current": 100})
