feat: added reddit loader

pull/877/head
Siddhant Rai 2 months ago
parent 80a4a094af
commit 60cfea1126

.gitignore vendored

@@ -75,6 +75,7 @@ target/
 # Jupyter Notebook
 .ipynb_checkpoints
+**/*.ipynb
 
 # IPython
 profile_default/

application/parser/remote/reddit_loader.py

@@ -0,0 +1,27 @@
+from application.parser.remote.base import BaseRemote
+from langchain_community.document_loaders import RedditPostsLoader
+
+
+class RedditPostsLoaderRemote(BaseRemote):
+    def load_data(self, inputs):
+        client_id = inputs.get("client_id")
+        client_secret = inputs.get("client_secret")
+        user_agent = inputs.get("user_agent")
+        categories = inputs.get("categories", ["new", "hot"])
+        mode = inputs.get("mode", "subreddit")
+        search_queries = inputs.get("search_queries")
+        self.loader = RedditPostsLoader(
+            client_id=client_id,
+            client_secret=client_secret,
+            user_agent=user_agent,
+            categories=categories,
+            mode=mode,
+            search_queries=search_queries,
+        )
+        documents = []
+        try:
+            documents.extend(self.loader.load())
+        except Exception as e:
+            print(f"Error processing Data: {e}")
+        print(f"Loaded {len(documents)} documents from Reddit")
+        return documents[:5]
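For reference, a minimal sketch of how the new loader could be driven directly (credential values, user agent, and subreddit names below are placeholders, and it assumes BaseRemote needs no constructor arguments):

from application.parser.remote.reddit_loader import RedditPostsLoaderRemote

# Placeholder Reddit API credentials -- real values come from the request payload.
inputs = {
    "client_id": "YOUR_CLIENT_ID",
    "client_secret": "YOUR_CLIENT_SECRET",
    "user_agent": "docsgpt-reddit-ingest",
    "mode": "subreddit",                    # default when omitted
    "categories": ["new", "hot"],           # default when omitted
    "search_queries": ["machinelearning"],  # subreddits to pull posts from
}

loader = RedditPostsLoaderRemote()
docs = loader.load_data(inputs)             # the loader caps the result at 5 documents
print(f"{len(docs)} documents")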

@@ -1,13 +1,15 @@
 from application.parser.remote.sitemap_loader import SitemapLoader
 from application.parser.remote.crawler_loader import CrawlerLoader
 from application.parser.remote.web_loader import WebLoader
+from application.parser.remote.reddit_loader import RedditPostsLoaderRemote
 
 
 class RemoteCreator:
     loaders = {
-        'url': WebLoader,
-        'sitemap': SitemapLoader,
-        'crawler': CrawlerLoader
+        "url": WebLoader,
+        "sitemap": SitemapLoader,
+        "crawler": CrawlerLoader,
+        "reddit": RedditPostsLoaderRemote,
     }
 
     @classmethod
@@ -15,4 +17,4 @@ class RemoteCreator:
         loader_class = cls.loaders.get(type.lower())
         if not loader_class:
             raise ValueError(f"No LLM class found for type {type}")
         return loader_class(*args, **kwargs)
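A quick sketch of the factory path this change enables (the module path in the import is assumed from the repository layout, and "rss" is just an example of an unregistered type):

from application.parser.remote.remote_creator import RemoteCreator

loader = RemoteCreator.create_loader("reddit")   # -> RedditPostsLoaderRemote instance
print(type(loader).__name__)

try:
    RemoteCreator.create_loader("rss")           # not in the loaders mapping
except ValueError as err:
    print(err)                                   # "No LLM class found for type rss"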

@@ -15,23 +15,27 @@ from application.parser.schema.base import Document
 from application.parser.token_func import group_split
 
 try:
-    nltk.download('punkt', quiet=True)
-    nltk.download('averaged_perceptron_tagger', quiet=True)
+    nltk.download("punkt", quiet=True)
+    nltk.download("averaged_perceptron_tagger", quiet=True)
 except FileExistsError:
     pass
 
 
 # Define a function to extract metadata from a given filename.
 def metadata_from_filename(title):
-    store = '/'.join(title.split('/')[1:3])
-    return {'title': title, 'store': store}
+    store = "/".join(title.split("/")[1:3])
+    return {"title": title, "store": store}
 
 
 # Define a function to generate a random string of a given length.
 def generate_random_string(length):
-    return ''.join([string.ascii_letters[i % 52] for i in range(length)])
+    return "".join([string.ascii_letters[i % 52] for i in range(length)])
 
-current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+current_dir = os.path.dirname(
+    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+)
 
 
 # Define the main function for ingesting and processing documents.
 def ingest_worker(self, directory, formats, name_job, filename, user):
@@ -62,38 +66,52 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
     token_check = True
     min_tokens = 150
     max_tokens = 1250
-    full_path = directory + '/' + user + '/' + name_job
+    full_path = directory + "/" + user + "/" + name_job
 
     import sys
 
     print(full_path, file=sys.stderr)
     # check if API_URL env variable is set
-    file_data = {'name': name_job, 'file': filename, 'user': user}
-    response = requests.get(urljoin(settings.API_URL, "/api/download"), params=file_data)
+    file_data = {"name": name_job, "file": filename, "user": user}
+    response = requests.get(
+        urljoin(settings.API_URL, "/api/download"), params=file_data
+    )
     # check if file is in the response
     print(response, file=sys.stderr)
     file = response.content
 
     if not os.path.exists(full_path):
         os.makedirs(full_path)
-    with open(full_path + '/' + filename, 'wb') as f:
+    with open(full_path + "/" + filename, "wb") as f:
         f.write(file)
 
     # check if file is .zip and extract it
-    if filename.endswith('.zip'):
-        with zipfile.ZipFile(full_path + '/' + filename, 'r') as zip_ref:
+    if filename.endswith(".zip"):
+        with zipfile.ZipFile(full_path + "/" + filename, "r") as zip_ref:
             zip_ref.extractall(full_path)
-        os.remove(full_path + '/' + filename)
+        os.remove(full_path + "/" + filename)
 
-    self.update_state(state='PROGRESS', meta={'current': 1})
+    self.update_state(state="PROGRESS", meta={"current": 1})
 
-    raw_docs = SimpleDirectoryReader(input_dir=full_path, input_files=input_files, recursive=recursive,
-                                     required_exts=formats, num_files_limit=limit,
-                                     exclude_hidden=exclude, file_metadata=metadata_from_filename).load_data()
-    raw_docs = group_split(documents=raw_docs, min_tokens=min_tokens, max_tokens=max_tokens, token_check=token_check)
+    raw_docs = SimpleDirectoryReader(
+        input_dir=full_path,
+        input_files=input_files,
+        recursive=recursive,
+        required_exts=formats,
+        num_files_limit=limit,
+        exclude_hidden=exclude,
+        file_metadata=metadata_from_filename,
+    ).load_data()
+    raw_docs = group_split(
+        documents=raw_docs,
+        min_tokens=min_tokens,
+        max_tokens=max_tokens,
+        token_check=token_check,
+    )
 
     docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
     call_openai_api(docs, full_path, self)
-    self.update_state(state='PROGRESS', meta={'current': 100})
+    self.update_state(state="PROGRESS", meta={"current": 100})
 
     if sample:
         for i in range(min(5, len(raw_docs))):
@@ -101,70 +119,80 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
     # get files from outputs/inputs/index.faiss and outputs/inputs/index.pkl
    # and send them to the server (provide user and name in form)
-    file_data = {'name': name_job, 'user': user}
+    file_data = {"name": name_job, "user": user}
     if settings.VECTOR_STORE == "faiss":
-        files = {'file_faiss': open(full_path + '/index.faiss', 'rb'),
-                 'file_pkl': open(full_path + '/index.pkl', 'rb')}
-        response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data)
-        response = requests.get(urljoin(settings.API_URL, "/api/delete_old?path=" + full_path))
+        files = {
+            "file_faiss": open(full_path + "/index.faiss", "rb"),
+            "file_pkl": open(full_path + "/index.pkl", "rb"),
+        }
+        response = requests.post(
+            urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
+        )
+        response = requests.get(
+            urljoin(settings.API_URL, "/api/delete_old?path=" + full_path)
+        )
     else:
-        response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data)
+        response = requests.post(
+            urljoin(settings.API_URL, "/api/upload_index"), data=file_data
+        )
 
     # delete local
     shutil.rmtree(full_path)
 
     return {
-        'directory': directory,
-        'formats': formats,
-        'name_job': name_job,
-        'filename': filename,
-        'user': user,
-        'limited': False
+        "directory": directory,
+        "formats": formats,
+        "name_job": name_job,
+        "filename": filename,
+        "user": user,
+        "limited": False,
     }
 
 
-def remote_worker(self, source_data, name_job, user, directory = 'temp', loader = 'url'):
+def remote_worker(self, source_data, name_job, user, directory="temp", loader="url"):
     # sample = False
     token_check = True
     min_tokens = 150
     max_tokens = 1250
-    full_path = directory + '/' + user + '/' + name_job
+    full_path = directory + "/" + user + "/" + name_job
 
     if not os.path.exists(full_path):
         os.makedirs(full_path)
-    self.update_state(state='PROGRESS', meta={'current': 1})
+    self.update_state(state="PROGRESS", meta={"current": 1})
 
     # source_data {"data": [url]} for url type task just urls
     # Use RemoteCreator to load data from URL
     remote_loader = RemoteCreator.create_loader(loader)
     raw_docs = remote_loader.load_data(source_data)
-    docs = group_split(documents=raw_docs, min_tokens=min_tokens, max_tokens=max_tokens, token_check=token_check)
+    docs = group_split(
+        documents=raw_docs,
+        min_tokens=min_tokens,
+        max_tokens=max_tokens,
+        token_check=token_check,
+    )
 
-    #docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
+    # docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
     call_openai_api(docs, full_path, self)
-    self.update_state(state='PROGRESS', meta={'current': 100})
+    self.update_state(state="PROGRESS", meta={"current": 100})
 
     # Proceed with uploading and cleaning as in the original function
-    file_data = {'name': name_job, 'user': user}
+    file_data = {"name": name_job, "user": user}
     if settings.VECTOR_STORE == "faiss":
-        files = {'file_faiss': open(full_path + '/index.faiss', 'rb'),
-                 'file_pkl': open(full_path + '/index.pkl', 'rb')}
-        requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data)
+        files = {
+            "file_faiss": open(full_path + "/index.faiss", "rb"),
+            "file_pkl": open(full_path + "/index.pkl", "rb"),
+        }
+        requests.post(
+            urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
+        )
         requests.get(urljoin(settings.API_URL, "/api/delete_old?path=" + full_path))
     else:
         requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data)
 
     shutil.rmtree(full_path)
 
-    return {
-        'urls': source_data,
-        'name_job': name_job,
-        'user': user,
-        'limited': False
-    }
+    return {"urls": source_data, "name_job": name_job, "user": user, "limited": False}
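Because remote_worker hands source_data straight to the loader's load_data, a Reddit ingestion job needs the credentials dict as its payload. A standalone sketch of the load-and-split portion under that assumption (placeholder credentials; the upload and cleanup steps are omitted, and the remote_creator module path is assumed from the repository layout):

from application.parser.remote.remote_creator import RemoteCreator
from application.parser.token_func import group_split

source_data = {                          # forwarded verbatim to load_data
    "client_id": "YOUR_CLIENT_ID",
    "client_secret": "YOUR_CLIENT_SECRET",
    "user_agent": "docsgpt-reddit-ingest",
    "search_queries": ["selfhosted"],    # subreddits for the default "subreddit" mode
}

remote_loader = RemoteCreator.create_loader("reddit")
raw_docs = remote_loader.load_data(source_data)
docs = group_split(
    documents=raw_docs, min_tokens=150, max_tokens=1250, token_check=True
)
print(f"{len(docs)} chunks ready for embedding")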

@@ -35,10 +35,10 @@ function Dropdown({
           isOpen
             ? typeof selectedValue === 'string'
               ? 'rounded-t-xl'
-              : 'rounded-t-2xl'
+              : 'rounded-t-3xl'
             : typeof selectedValue === 'string'
               ? 'rounded-xl'
-              : 'rounded-full'
+              : 'rounded-3xl'
         }`}
       >
         {typeof selectedValue === 'string' ? (

@@ -21,6 +21,7 @@ export default function Upload({
     { label: 'Crawler', value: 'crawler' },
     // { label: 'Sitemap', value: 'sitemap' },
     { label: 'Link', value: 'url' },
+    { label: 'Reddit', value: 'reddit' },
   ];
   const [urlType, setUrlType] = useState<{ label: string; value: string }>({
     label: 'Link',
