From 60cfea112609df2ffdc48b97d05493a458479154 Mon Sep 17 00:00:00 2001
From: Siddhant Rai
Date: Sat, 16 Mar 2024 20:22:05 +0530
Subject: [PATCH 1/3] feat: added reddit loader

---
 .gitignore                                  |   1 +
 application/parser/remote/reddit_loader.py  |  27 ++++
 application/parser/remote/remote_creator.py |  10 +-
 application/worker.py                       | 132 ++++++++++++--------
 frontend/src/components/Dropdown.tsx        |   4 +-
 frontend/src/upload/Upload.tsx              |   1 +
 6 files changed, 117 insertions(+), 58 deletions(-)
 create mode 100644 application/parser/remote/reddit_loader.py

diff --git a/.gitignore b/.gitignore
index d7747efb..ac5ff190 100644
--- a/.gitignore
+++ b/.gitignore
@@ -75,6 +75,7 @@ target/

 # Jupyter Notebook
 .ipynb_checkpoints
+**/*.ipynb

 # IPython
 profile_default/
diff --git a/application/parser/remote/reddit_loader.py b/application/parser/remote/reddit_loader.py
new file mode 100644
index 00000000..f377717b
--- /dev/null
+++ b/application/parser/remote/reddit_loader.py
@@ -0,0 +1,27 @@
+from application.parser.remote.base import BaseRemote
+from langchain_community.document_loaders import RedditPostsLoader
+
+
+class RedditPostsLoaderRemote(BaseRemote):
+    def load_data(self, inputs):
+        client_id = inputs.get("client_id")
+        client_secret = inputs.get("client_secret")
+        user_agent = inputs.get("user_agent")
+        categories = inputs.get("categories", ["new", "hot"])
+        mode = inputs.get("mode", "subreddit")
+        search_queries = inputs.get("search_queries")
+        self.loader = RedditPostsLoader(
+            client_id=client_id,
+            client_secret=client_secret,
+            user_agent=user_agent,
+            categories=categories,
+            mode=mode,
+            search_queries=search_queries,
+        )
+        documents = []
+        try:
+            documents.extend(self.loader.load())
+        except Exception as e:
+            print(f"Error processing Data: {e}")
+        print(f"Loaded {len(documents)} documents from Reddit")
+        return documents[:5]
diff --git a/application/parser/remote/remote_creator.py b/application/parser/remote/remote_creator.py
index e45333d4..d2a58f8d 100644
--- a/application/parser/remote/remote_creator.py
+++ b/application/parser/remote/remote_creator.py
@@ -1,13 +1,15 @@
 from application.parser.remote.sitemap_loader import SitemapLoader
 from application.parser.remote.crawler_loader import CrawlerLoader
 from application.parser.remote.web_loader import WebLoader
+from application.parser.remote.reddit_loader import RedditPostsLoaderRemote


 class RemoteCreator:
     loaders = {
-        'url': WebLoader,
-        'sitemap': SitemapLoader,
-        'crawler': CrawlerLoader
+        "url": WebLoader,
+        "sitemap": SitemapLoader,
+        "crawler": CrawlerLoader,
+        "reddit": RedditPostsLoaderRemote,
     }

     @classmethod
@@ -15,4 +17,4 @@ class RemoteCreator:
         loader_class = cls.loaders.get(type.lower())
         if not loader_class:
             raise ValueError(f"No LLM class found for type {type}")
-        return loader_class(*args, **kwargs)
\ No newline at end of file
+        return loader_class(*args, **kwargs)
diff --git a/application/worker.py b/application/worker.py
index 21bb319f..b783c335 100644
--- a/application/worker.py
+++ b/application/worker.py
@@ -15,23 +15,27 @@ from application.parser.schema.base import Document
 from application.parser.token_func import group_split

 try:
-    nltk.download('punkt', quiet=True)
-    nltk.download('averaged_perceptron_tagger', quiet=True)
+    nltk.download("punkt", quiet=True)
+    nltk.download("averaged_perceptron_tagger", quiet=True)
 except FileExistsError:
     pass


 # Define a function to extract metadata from a given filename.
 def metadata_from_filename(title):
-    store = '/'.join(title.split('/')[1:3])
-    return {'title': title, 'store': store}
+    store = "/".join(title.split("/")[1:3])
+    return {"title": title, "store": store}


 # Define a function to generate a random string of a given length.
 def generate_random_string(length):
-    return ''.join([string.ascii_letters[i % 52] for i in range(length)])
+    return "".join([string.ascii_letters[i % 52] for i in range(length)])
+
+
+current_dir = os.path.dirname(
+    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+)

-current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

 # Define the main function for ingesting and processing documents.
 def ingest_worker(self, directory, formats, name_job, filename, user):
@@ -62,38 +66,52 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
     token_check = True
     min_tokens = 150
     max_tokens = 1250
-    full_path = directory + '/' + user + '/' + name_job
+    full_path = directory + "/" + user + "/" + name_job
     import sys
+
     print(full_path, file=sys.stderr)
     # check if API_URL env variable is set
-    file_data = {'name': name_job, 'file': filename, 'user': user}
-    response = requests.get(urljoin(settings.API_URL, "/api/download"), params=file_data)
+    file_data = {"name": name_job, "file": filename, "user": user}
+    response = requests.get(
+        urljoin(settings.API_URL, "/api/download"), params=file_data
+    )
     # check if file is in the response
     print(response, file=sys.stderr)
     file = response.content

     if not os.path.exists(full_path):
         os.makedirs(full_path)
-    with open(full_path + '/' + filename, 'wb') as f:
+    with open(full_path + "/" + filename, "wb") as f:
         f.write(file)

     # check if file is .zip and extract it
-    if filename.endswith('.zip'):
-        with zipfile.ZipFile(full_path + '/' + filename, 'r') as zip_ref:
+    if filename.endswith(".zip"):
+        with zipfile.ZipFile(full_path + "/" + filename, "r") as zip_ref:
             zip_ref.extractall(full_path)
-        os.remove(full_path + '/' + filename)
+        os.remove(full_path + "/" + filename)

-    self.update_state(state='PROGRESS', meta={'current': 1})
+    self.update_state(state="PROGRESS", meta={"current": 1})

-    raw_docs = SimpleDirectoryReader(input_dir=full_path, input_files=input_files, recursive=recursive,
-                                     required_exts=formats, num_files_limit=limit,
-                                     exclude_hidden=exclude, file_metadata=metadata_from_filename).load_data()
-    raw_docs = group_split(documents=raw_docs, min_tokens=min_tokens, max_tokens=max_tokens, token_check=token_check)
+    raw_docs = SimpleDirectoryReader(
+        input_dir=full_path,
+        input_files=input_files,
+        recursive=recursive,
+        required_exts=formats,
+        num_files_limit=limit,
+        exclude_hidden=exclude,
+        file_metadata=metadata_from_filename,
+    ).load_data()
+    raw_docs = group_split(
+        documents=raw_docs,
+        min_tokens=min_tokens,
+        max_tokens=max_tokens,
+        token_check=token_check,
+    )
     docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]

     call_openai_api(docs, full_path, self)
-    self.update_state(state='PROGRESS', meta={'current': 100})
+    self.update_state(state="PROGRESS", meta={"current": 100})

     if sample:
         for i in range(min(5, len(raw_docs))):
@@ -101,70 +119,80 @@ def ingest_worker(self, directory, formats, name_job, filename, user):

     # get files from outputs/inputs/index.faiss and outputs/inputs/index.pkl
     # and send them to the server (provide user and name in form)
-    file_data = {'name': name_job, 'user': user}
+    file_data = {"name": name_job, "user": user}
     if settings.VECTOR_STORE == "faiss":
-        files = {'file_faiss': open(full_path + '/index.faiss', 'rb'),
-                 'file_pkl': open(full_path + '/index.pkl', 'rb')}
-        response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data)
-        response = requests.get(urljoin(settings.API_URL, "/api/delete_old?path=" + full_path))
+        files = {
+            "file_faiss": open(full_path + "/index.faiss", "rb"),
+            "file_pkl": open(full_path + "/index.pkl", "rb"),
+        }
+        response = requests.post(
+            urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
+        )
+        response = requests.get(
+            urljoin(settings.API_URL, "/api/delete_old?path=" + full_path)
+        )
     else:
-        response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data)
+        response = requests.post(
+            urljoin(settings.API_URL, "/api/upload_index"), data=file_data
+        )

-    # delete local
     shutil.rmtree(full_path)

     return {
-        'directory': directory,
-        'formats': formats,
-        'name_job': name_job,
-        'filename': filename,
-        'user': user,
-        'limited': False
+        "directory": directory,
+        "formats": formats,
+        "name_job": name_job,
+        "filename": filename,
+        "user": user,
+        "limited": False,
     }


-def remote_worker(self, source_data, name_job, user, directory = 'temp', loader = 'url'):
+
+def remote_worker(self, source_data, name_job, user, directory="temp", loader="url"):
     # sample = False
     token_check = True
     min_tokens = 150
     max_tokens = 1250
-    full_path = directory + '/' + user + '/' + name_job
+    full_path = directory + "/" + user + "/" + name_job
     if not os.path.exists(full_path):
         os.makedirs(full_path)
-    self.update_state(state='PROGRESS', meta={'current': 1})
-
+    self.update_state(state="PROGRESS", meta={"current": 1})
+
     # source_data {"data": [url]} for url type task just urls
-
+
     # Use RemoteCreator to load data from URL
     remote_loader = RemoteCreator.create_loader(loader)
     raw_docs = remote_loader.load_data(source_data)
-    docs = group_split(documents=raw_docs, min_tokens=min_tokens, max_tokens=max_tokens, token_check=token_check)
+    docs = group_split(
+        documents=raw_docs,
+        min_tokens=min_tokens,
+        max_tokens=max_tokens,
+        token_check=token_check,
+    )

-    #docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
+    # docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
     call_openai_api(docs, full_path, self)
-    self.update_state(state='PROGRESS', meta={'current': 100})
-
-
+    self.update_state(state="PROGRESS", meta={"current": 100})

     # Proceed with uploading and cleaning as in the original function
-    file_data = {'name': name_job, 'user': user}
+    file_data = {"name": name_job, "user": user}
     if settings.VECTOR_STORE == "faiss":
-        files = {'file_faiss': open(full_path + '/index.faiss', 'rb'),
-                 'file_pkl': open(full_path + '/index.pkl', 'rb')}
-        requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data)
+        files = {
+            "file_faiss": open(full_path + "/index.faiss", "rb"),
+            "file_pkl": open(full_path + "/index.pkl", "rb"),
+        }
+        requests.post(
+            urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
+        )
        requests.get(urljoin(settings.API_URL, "/api/delete_old?path=" + full_path))
     else:
        requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data)

     shutil.rmtree(full_path)

-    return {
-        'urls': source_data,
-        'name_job': name_job,
-        'user': user,
-        'limited': False
-    }
\ No newline at end of file
+    return {"urls": source_data, "name_job": name_job, "user": user, "limited": False}
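
A usage sketch, not part of the patch: with the pieces above, remote_worker resolves the new loader through RemoteCreator and hands it the credential and query fields that RedditPostsLoaderRemote.load_data reads. The values below are placeholders for real Reddit API credentials, and in this first patch load_data expects a plain dict and returns at most five documents.

    # Illustrative sketch only; field names follow reddit_loader.py above.
    from application.parser.remote.remote_creator import RemoteCreator

    reddit_inputs = {
        "client_id": "YOUR_REDDIT_CLIENT_ID",          # placeholder
        "client_secret": "YOUR_REDDIT_CLIENT_SECRET",  # placeholder
        "user_agent": "docsgpt-ingest",                # placeholder agent string
        "categories": ["new", "hot"],                  # default when omitted
        "mode": "subreddit",                           # default when omitted
        "search_queries": ["docsgpt", "langchain"],
    }

    loader = RemoteCreator.create_loader("reddit")   # -> RedditPostsLoaderRemote()
    documents = loader.load_data(reddit_inputs)      # capped at 5 documents in this patch
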
diff --git a/frontend/src/components/Dropdown.tsx b/frontend/src/components/Dropdown.tsx
index 5654b430..7a4936b3 100644
--- a/frontend/src/components/Dropdown.tsx
+++ b/frontend/src/components/Dropdown.tsx
@@ -35,10 +35,10 @@ function Dropdown({
           isOpen
             ? typeof selectedValue === 'string'
               ? 'rounded-t-xl'
-              : 'rounded-t-2xl'
+              : 'rounded-t-3xl'
             : typeof selectedValue === 'string'
               ? 'rounded-xl'
-              : 'rounded-full'
+              : 'rounded-3xl'
         }`}
       >
         {typeof selectedValue === 'string' ? (
diff --git a/frontend/src/upload/Upload.tsx b/frontend/src/upload/Upload.tsx
index dae5656b..6870ee26 100644
--- a/frontend/src/upload/Upload.tsx
+++ b/frontend/src/upload/Upload.tsx
@@ -21,6 +21,7 @@ export default function Upload({
     { label: 'Crawler', value: 'crawler' },
     // { label: 'Sitemap', value: 'sitemap' },
     { label: 'Link', value: 'url' },
+    { label: 'Reddit', value: 'reddit' },
   ];
   const [urlType, setUrlType] = useState<{ label: string; value: string }>({
     label: 'Link',

From eed1bfbe50e191dbdf0e5d7aca15618cd77e7612 Mon Sep 17 00:00:00 2001
From: Siddhant Rai
Date: Tue, 26 Mar 2024 16:07:44 +0530
Subject: [PATCH 2/3] feat: fields to handle reddit loader + minor changes

---
 application/parser/remote/reddit_loader.py |  22 ++--
 application/worker.py                      |   2 +-
 frontend/src/upload/Upload.tsx             | 134 +++++++++++++++++----
 3 files changed, 120 insertions(+), 38 deletions(-)

diff --git a/application/parser/remote/reddit_loader.py b/application/parser/remote/reddit_loader.py
index f377717b..3c9f93ea 100644
--- a/application/parser/remote/reddit_loader.py
+++ b/application/parser/remote/reddit_loader.py
@@ -4,12 +4,13 @@ from langchain_community.document_loaders import RedditPostsLoader

 class RedditPostsLoaderRemote(BaseRemote):
     def load_data(self, inputs):
-        client_id = inputs.get("client_id")
-        client_secret = inputs.get("client_secret")
-        user_agent = inputs.get("user_agent")
-        categories = inputs.get("categories", ["new", "hot"])
-        mode = inputs.get("mode", "subreddit")
-        search_queries = inputs.get("search_queries")
+        data = eval(inputs)
+        client_id = data.get("client_id")
+        client_secret = data.get("client_secret")
+        user_agent = data.get("user_agent")
+        categories = data.get("categories", ["new", "hot"])
+        mode = data.get("mode", "subreddit")
+        search_queries = data.get("search_queries")
         self.loader = RedditPostsLoader(
             client_id=client_id,
             client_secret=client_secret,
@@ -17,11 +18,8 @@ class RedditPostsLoaderRemote(BaseRemote):
             categories=categories,
             mode=mode,
             search_queries=search_queries,
+            number_posts=10,
         )
-        documents = []
-        try:
-            documents.extend(self.loader.load())
-        except Exception as e:
-            print(f"Error processing Data: {e}")
+        documents = self.loader.load()
         print(f"Loaded {len(documents)} documents from Reddit")
-        return documents[:5]
+        return documents
diff --git a/application/worker.py b/application/worker.py
index b783c335..3891fde9 100644
--- a/application/worker.py
+++ b/application/worker.py
@@ -149,7 +149,7 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
     }


-def remote_worker(self, source_data, name_job, user, directory="temp", loader="url"):
+def remote_worker(self, source_data, name_job, user, loader, directory="temp"):
     # sample = False
     token_check = True
     min_tokens = 150
diff --git a/frontend/src/upload/Upload.tsx b/frontend/src/upload/Upload.tsx
index 6870ee26..1614375d 100644
--- a/frontend/src/upload/Upload.tsx
+++ b/frontend/src/upload/Upload.tsx
@@ -17,6 +17,12 @@ export default function Upload({
   const [docName, setDocName] = useState('');
   const [urlName, setUrlName] = useState('');
   const [url, setUrl] = useState('');
+  const [redditData, setRedditData] = useState({
+    client_id: '',
+    client_secret: '',
+    user_agent: '',
+    search_queries: [''],
+  });
   const urlOptions: { label: string; value: string }[] = [
     { label: 'Crawler', value: 'crawler' },
     // { label: 'Sitemap', value: 'sitemap' },
@@ -164,7 +170,6 @@ export default function Upload({
   };

   const uploadRemote = () => {
-    console.log('here');
     const formData = new FormData();
     formData.append('name', urlName);
     formData.append('user', 'local');
     if (
       formData.append('source', urlType?.value);
     }
     formData.append('data', url);
+    if (
+      redditData.client_id.length > 0 &&
+      redditData.client_secret.length > 0
+    ) {
+      formData.set('name', 'other');
+      formData.set('data', JSON.stringify(redditData));
+    }
     const apiHost = import.meta.env.VITE_API_HOST;
     const xhr = new XMLHttpRequest();
     xhr.upload.addEventListener('progress', (event) => {
@@ -203,6 +215,19 @@
       ['.docx'],
     },
   });
+  const handleChange = (e: React.ChangeEvent) => {
+    const { name, value } = e.target;
+    if (name === 'search_queries' && value.length > 0) {
+      setRedditData({
+        ...redditData,
+        [name]: value.split(',').map((item) => item.trim()),
+      });
+    } else
+      setRedditData({
+        ...redditData,
+        [name]: value,
+      });
+  };
   let view;
   if (progress?.type === 'UPLOAD') {
     view = ;
@@ -282,30 +307,89 @@ export default function Upload({
               setUrlType(value)
             }
           />
-            setUrlName(e.target.value)}
-          >
-
-            Name
-
-            setUrl(e.target.value)}
-          >
-
-            Link
-
+          {urlType.label !== 'Reddit' ? (
+            <>
+                setUrlName(e.target.value)}
+              >
+
+                Name
+
+                setUrl(e.target.value)}
+              >
+
+                Link
+
+          ) : (
+            <>
+
+                Client ID
+
+
+                Client secret
+
+
+                User agent
+
+
+                Search queries
+
+          )}
         )}
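
A note on the parsing change in this patch: load_data now receives the serialized form field and evaluates it with eval. Since the frontend sends the output of JSON.stringify(redditData), the same payload should parse with json.loads, which avoids evaluating arbitrary expressions from request data. A minimal sketch of that stricter variant, illustrative rather than part of the patch:

    import json

    def parse_reddit_inputs(raw: str) -> dict:
        # Parse the JSON string produced by the Upload form instead of eval-ing it;
        # malformed input raises json.JSONDecodeError instead of being executed.
        data = json.loads(raw)
        return {
            "client_id": data.get("client_id"),
            "client_secret": data.get("client_secret"),
            "user_agent": data.get("user_agent"),
            "categories": data.get("categories", ["new", "hot"]),
            "mode": data.get("mode", "subreddit"),
            "search_queries": data.get("search_queries"),
        }
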
From e01071426f1fc989f463943cfbbb16bf32086be9 Mon Sep 17 00:00:00 2001
From: Siddhant Rai
Date: Wed, 27 Mar 2024 19:20:55 +0530
Subject: [PATCH 3/3] feat: field to pass number of posts as a parameter

---
 application/parser/remote/reddit_loader.py |  3 ++-
 frontend/src/upload/Upload.tsx             | 14 ++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/application/parser/remote/reddit_loader.py b/application/parser/remote/reddit_loader.py
index 3c9f93ea..0230653a 100644
--- a/application/parser/remote/reddit_loader.py
+++ b/application/parser/remote/reddit_loader.py
@@ -11,6 +11,7 @@ class RedditPostsLoaderRemote(BaseRemote):
         categories = data.get("categories", ["new", "hot"])
         mode = data.get("mode", "subreddit")
         search_queries = data.get("search_queries")
+        number_posts = data.get("number_posts", 10)
         self.loader = RedditPostsLoader(
             client_id=client_id,
             client_secret=client_secret,
@@ -18,7 +19,7 @@ class RedditPostsLoaderRemote(BaseRemote):
             categories=categories,
             mode=mode,
             search_queries=search_queries,
-            number_posts=10,
+            number_posts=number_posts,
         )
         documents = self.loader.load()
         print(f"Loaded {len(documents)} documents from Reddit")
diff --git a/frontend/src/upload/Upload.tsx b/frontend/src/upload/Upload.tsx
index 1614375d..45fc4e1a 100644
--- a/frontend/src/upload/Upload.tsx
+++ b/frontend/src/upload/Upload.tsx
@@ -22,6 +22,7 @@ export default function Upload({
     client_secret: '',
     user_agent: '',
     search_queries: [''],
+    number_posts: 10,
   });
   const urlOptions: { label: string; value: string }[] = [
     { label: 'Crawler', value: 'crawler' },
@@ -388,6 +389,19 @@ export default function Upload({
               Search queries
+
+
+              Number of posts
+
           )}