DocsGPT/application/worker.py

import os
import shutil
import string
import sys
import zipfile
from urllib.parse import urljoin
import nltk
import requests
from application.core.settings import settings
from application.parser.file.bulk import SimpleDirectoryReader
from application.parser.remote.remote_creator import RemoteCreator
from application.parser.open_ai_func import call_openai_api
from application.parser.schema.base import Document
from application.parser.token_func import group_split

try:
    nltk.download("punkt", quiet=True)
    nltk.download("averaged_perceptron_tagger", quiet=True)
except FileExistsError:
    pass


# Define a function to extract metadata from a given filename.
def metadata_from_filename(title):
    store = "/".join(title.split("/")[1:3])
    return {"title": title, "store": store}
# Define a function to generate a random string of a given length.
def generate_random_string(length):
    return "".join([string.ascii_letters[i % 52] for i in range(length)])


current_dir = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)


# Define the main function for ingesting and processing documents.
def ingest_worker(self, directory, formats, name_job, filename, user):
    """Ingest and process documents.

    Args:
        self: Reference to the instance of the task (a bound Celery task).
        directory (str): Directory to ingest into ('inputs' or 'temp').
        formats (list of str): File extensions to consider for ingestion (e.g. [".rst", ".md"]).
        name_job (str): Name of the job for this ingestion task.
        filename (str): Name of the file to be ingested.
        user (str): Identifier of the user initiating the ingestion.

    Returns:
        dict: Information about the completed ingestion task, including the
        input parameters and a "limited" flag.

    See the usage sketch after this function for invoking it outside of Celery.
    """
    # directory = 'inputs' or 'temp'
    # formats = [".rst", ".md"]
    input_files = None
    recursive = True
    limit = None
    exclude = True
    # name_job = 'job1'
    # filename = 'install.rst'
    # user = 'local'
    sample = False
    token_check = True
    min_tokens = 150
    max_tokens = 1250
    full_path = directory + "/" + user + "/" + name_job
    print(full_path, file=sys.stderr)

    # download the original file from the main app via /api/download (requires the API_URL setting)
    file_data = {"name": name_job, "file": filename, "user": user}
    response = requests.get(
        urljoin(settings.API_URL, "/api/download"), params=file_data
    )
    # the response body is the raw file content
    print(response, file=sys.stderr)
    file = response.content
    if not os.path.exists(full_path):
        os.makedirs(full_path)
    with open(full_path + "/" + filename, "wb") as f:
        f.write(file)

    # check if the file is a .zip archive and extract it
    if filename.endswith(".zip"):
        with zipfile.ZipFile(full_path + "/" + filename, "r") as zip_ref:
            zip_ref.extractall(full_path)
        os.remove(full_path + "/" + filename)

    self.update_state(state="PROGRESS", meta={"current": 1})
    raw_docs = SimpleDirectoryReader(
        input_dir=full_path,
        input_files=input_files,
        recursive=recursive,
        required_exts=formats,
        num_files_limit=limit,
        exclude_hidden=exclude,
        file_metadata=metadata_from_filename,
    ).load_data()
    raw_docs = group_split(
        documents=raw_docs,
        min_tokens=min_tokens,
        max_tokens=max_tokens,
        token_check=token_check,
    )
    docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
    call_openai_api(docs, full_path, self)
    self.update_state(state="PROGRESS", meta={"current": 100})
    if sample:
        for i in range(min(5, len(raw_docs))):
            print(raw_docs[i].text)

    # get files from outputs/inputs/index.faiss and outputs/inputs/index.pkl
    # and send them to the server (provide user and name in form)
    file_data = {"name": name_job, "user": user}
    if settings.VECTOR_STORE == "faiss":
        files = {
            "file_faiss": open(full_path + "/index.faiss", "rb"),
            "file_pkl": open(full_path + "/index.pkl", "rb"),
        }
        response = requests.post(
            urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
        )
        response = requests.get(
            urljoin(settings.API_URL, "/api/delete_old?path=" + full_path)
        )
    else:
        response = requests.post(
            urljoin(settings.API_URL, "/api/upload_index"), data=file_data
        )

    # delete the local copy of the ingested data
    shutil.rmtree(full_path)
    return {
        "directory": directory,
        "formats": formats,
        "name_job": name_job,
        "filename": filename,
        "user": user,
        "limited": False,
    }
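
# A minimal sketch for exercising ingest_worker outside of Celery (assumption:
# a stand-in object exposing update_state() is enough for a local dry run; in
# production the caller is a bound task, which is what supplies `self`):
#
#   class _FakeTask:
#       def update_state(self, state=None, meta=None):
#           print(state, meta)
#
#   ingest_worker(_FakeTask(), "temp", [".rst", ".md"], "job1", "install.rst", "local")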


def remote_worker(self, source_data, name_job, user, directory="temp", loader="url"):
    """Ingest and process documents from a remote source (e.g. URLs) instead of an uploaded file."""
    # sample = False
    token_check = True
    min_tokens = 150
    max_tokens = 1250
    full_path = directory + "/" + user + "/" + name_job
    if not os.path.exists(full_path):
        os.makedirs(full_path)
    self.update_state(state="PROGRESS", meta={"current": 1})

    # source_data is {"data": [url]} for url-type tasks, i.e. just the URLs
    # Use RemoteCreator to load data from the URL
    remote_loader = RemoteCreator.create_loader(loader)
    raw_docs = remote_loader.load_data(source_data)
    docs = group_split(
        documents=raw_docs,
        min_tokens=min_tokens,
        max_tokens=max_tokens,
        token_check=token_check,
    )
    # docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
    call_openai_api(docs, full_path, self)
    self.update_state(state="PROGRESS", meta={"current": 100})

    # Proceed with uploading and cleaning up, as in ingest_worker
    file_data = {"name": name_job, "user": user}
    if settings.VECTOR_STORE == "faiss":
        files = {
            "file_faiss": open(full_path + "/index.faiss", "rb"),
            "file_pkl": open(full_path + "/index.pkl", "rb"),
        }
        requests.post(
            urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
        )
        requests.get(urljoin(settings.API_URL, "/api/delete_old?path=" + full_path))
    else:
        requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data)

    # delete the local copy of the ingested data
    shutil.rmtree(full_path)
    return {"urls": source_data, "name_job": name_job, "user": user, "limited": False}