diff --git a/application/requirements.txt b/application/requirements.txt index e46c91f..9e8f73b 100644 --- a/application/requirements.txt +++ b/application/requirements.txt @@ -20,24 +20,32 @@ idna==3.4 imagesize==1.4.1 itsdangerous==2.1.2 Jinja2==3.1.2 -langchain==0.0.76 +joblib==1.2.0 +langchain==0.0.81 lxml==4.9.2 MarkupSafe==2.1.2 marshmallow==3.19.0 marshmallow-enum==1.5.1 multidict==6.0.4 mypy-extensions==0.4.3 +nltk==3.8.1 numpy==1.24.1 openai==0.26.4 packaging==23.0 +pandas==1.5.3 +Pillow==9.4.0 pycryptodomex==3.17 pydantic==1.10.4 Pygments==2.14.0 +PyPDF2==3.0.1 +python-dateutil==2.8.2 python-dotenv==0.21.1 +python-pptx==0.6.21 pytz==2022.7.1 PyYAML==6.0 regex==2022.10.31 requests==2.28.2 +six==1.16.0 snowballstemmer==2.2.0 Sphinx==6.1.3 sphinxcontrib-applehelp==1.0.4 @@ -47,6 +55,7 @@ sphinxcontrib-jsmath==1.0.1 sphinxcontrib-qthelp==1.0.3 sphinxcontrib-serializinghtml==1.1.5 SQLAlchemy==1.4.46 +tenacity==8.2.1 tiktoken==0.1.2 tokenizers==0.13.2 tqdm==4.64.1 @@ -55,4 +64,5 @@ typing-inspect==0.8.0 typing_extensions==4.4.0 urllib3==1.26.14 Werkzeug==2.2.2 +XlsxWriter==3.0.8 yarl==1.8.2 diff --git a/scripts/ingest.py b/scripts/ingest.py new file mode 100644 index 0000000..e805b76 --- /dev/null +++ b/scripts/ingest.py @@ -0,0 +1,37 @@ +import sys +import nltk +import dotenv + +from langchain.text_splitter import RecursiveCharacterTextSplitter + +from parser.file.bulk import SimpleDirectoryReader +from parser.schema.base import Document +from parser.open_ai_func import call_openai_api, get_user_permission + +dotenv.load_dotenv() + +#Specify your folder HERE +directory_to_ingest = 'data_test' + +nltk.download('punkt') +nltk.download('averaged_perceptron_tagger') + +#Splits all files in specified folder to documents +raw_docs = SimpleDirectoryReader(input_dir=directory_to_ingest).load_data() +raw_docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs] +# Here we split the documents, as needed, into smaller chunks. +# We do this due to the context limits of the LLMs. +text_splitter = RecursiveCharacterTextSplitter() +docs = text_splitter.split_documents(raw_docs) + +# Here we check for command line arguments for bot calls. +# If no argument exists or the permission_bypass_flag argument is not '-y', +# user permission is requested to call the API. 
+if len(sys.argv) > 1: + permission_bypass_flag = sys.argv[1] + if permission_bypass_flag == '-y': + call_openai_api(docs) + else: + get_user_permission(docs) +else: + get_user_permission(docs) \ No newline at end of file diff --git a/scripts/parser/file/base.py b/scripts/parser/file/base.py new file mode 100644 index 0000000..c2777a0 --- /dev/null +++ b/scripts/parser/file/base.py @@ -0,0 +1,20 @@ +"""Base reader class.""" +from abc import abstractmethod +from typing import Any, List + +from langchain.docstore.document import Document as LCDocument + +from parser.schema.base import Document + + +class BaseReader: + """Utilities for loading data from a directory.""" + + @abstractmethod + def load_data(self, *args: Any, **load_kwargs: Any) -> List[Document]: + """Load data from the input directory.""" + + def load_langchain_documents(self, **load_kwargs: Any) -> List[LCDocument]: + """Load data in LangChain document format.""" + docs = self.load_data(**load_kwargs) + return [d.to_langchain_format() for d in docs] diff --git a/scripts/parser/file/base_parser.py b/scripts/parser/file/base_parser.py new file mode 100644 index 0000000..753a56f --- /dev/null +++ b/scripts/parser/file/base_parser.py @@ -0,0 +1,38 @@ +"""Base parser and config class.""" + +from abc import abstractmethod +from pathlib import Path +from typing import Dict, List, Optional, Union + + +class BaseParser: + """Base class for all parsers.""" + + def __init__(self, parser_config: Optional[Dict] = None): + """Init params.""" + self._parser_config = parser_config + + def init_parser(self) -> None: + """Init parser and store it.""" + parser_config = self._init_parser() + self._parser_config = parser_config + + @property + def parser_config_set(self) -> bool: + """Check if parser config is set.""" + return self._parser_config is not None + + @property + def parser_config(self) -> Dict: + """Get the parser config.""" + if self._parser_config is None: + raise ValueError("Parser config not set.") + return self._parser_config + + @abstractmethod + def _init_parser(self) -> Dict: + """Initialize the parser with the config.""" + + @abstractmethod + def parse_file(self, file: Path, errors: str = "ignore") -> Union[str, List[str]]: + """Parse file.""" diff --git a/scripts/parser/file/bulk.py b/scripts/parser/file/bulk.py new file mode 100644 index 0000000..7808186 --- /dev/null +++ b/scripts/parser/file/bulk.py @@ -0,0 +1,158 @@ +"""Simple reader that reads files of different formats from a directory.""" +import logging +from pathlib import Path +from typing import Callable, Dict, List, Optional, Union + +from parser.file.base import BaseReader +from parser.file.base_parser import BaseParser +from parser.file.docs_parser import DocxParser, PDFParser +from parser.file.epub_parser import EpubParser +from parser.file.markdown_parser import MarkdownParser +from parser.file.rst_parser import RstParser +from parser.file.tabular_parser import PandasCSVParser +from parser.schema.base import Document + +DEFAULT_FILE_EXTRACTOR: Dict[str, BaseParser] = { + ".pdf": PDFParser(), + ".docx": DocxParser(), + ".csv": PandasCSVParser(), + ".epub": EpubParser(), + ".md": MarkdownParser(), + ".rst": RstParser(), +} + + +class SimpleDirectoryReader(BaseReader): + """Simple directory reader. + + Can read files into separate documents, or concatenate + them into a single document. + + Args: + input_dir (str): Path to the directory. 
+ input_files (List): List of file paths to read (Optional; overrides input_dir) + exclude_hidden (bool): Whether to exclude hidden files (dotfiles). + errors (str): how encoding and decoding errors are to be handled, + see https://docs.python.org/3/library/functions.html#open + recursive (bool): Whether to recursively search in subdirectories. + False by default. + required_exts (Optional[List[str]]): List of required extensions. + Default is None. + file_extractor (Optional[Dict[str, BaseParser]]): A mapping of file + extension to a BaseParser class that specifies how to convert that file + to text. See DEFAULT_FILE_EXTRACTOR. + num_files_limit (Optional[int]): Maximum number of files to read. + Default is None. + file_metadata (Optional[Callable[str, Dict]]): A function that takes + in a filename and returns a Dict of metadata for the Document. + Default is None. + """ + + def __init__( + self, + input_dir: Optional[str] = None, + input_files: Optional[List] = None, + exclude_hidden: bool = True, + errors: str = "ignore", + recursive: bool = True, + required_exts: Optional[List[str]] = None, + file_extractor: Optional[Dict[str, BaseParser]] = None, + num_files_limit: Optional[int] = None, + file_metadata: Optional[Callable[[str], Dict]] = None, + ) -> None: + """Initialize with parameters.""" + super().__init__() + + if not input_dir and not input_files: + raise ValueError("Must provide either `input_dir` or `input_files`.") + + self.errors = errors + + self.recursive = recursive + self.exclude_hidden = exclude_hidden + self.required_exts = required_exts + self.num_files_limit = num_files_limit + + if input_files: + self.input_files = [] + for path in input_files: + input_file = Path(path) + self.input_files.append(input_file) + elif input_dir: + self.input_dir = Path(input_dir) + self.input_files = self._add_files(self.input_dir) + + self.file_extractor = file_extractor or DEFAULT_FILE_EXTRACTOR + self.file_metadata = file_metadata + + def _add_files(self, input_dir: Path) -> List[Path]: + """Add files.""" + input_files = sorted(input_dir.iterdir()) + new_input_files = [] + dirs_to_explore = [] + for input_file in input_files: + if input_file.is_dir(): + if self.recursive: + dirs_to_explore.append(input_file) + elif self.exclude_hidden and input_file.name.startswith("."): + continue + elif ( + self.required_exts is not None + and input_file.suffix not in self.required_exts + ): + continue + else: + new_input_files.append(input_file) + + for dir_to_explore in dirs_to_explore: + sub_input_files = self._add_files(dir_to_explore) + new_input_files.extend(sub_input_files) + + if self.num_files_limit is not None and self.num_files_limit > 0: + new_input_files = new_input_files[0 : self.num_files_limit] + + # print total number of files added + logging.debug( + f"> [SimpleDirectoryReader] Total files added: {len(new_input_files)}" + ) + + return new_input_files + + def load_data(self, concatenate: bool = False) -> List[Document]: + """Load data from the input directory. + + Args: + concatenate (bool): whether to concatenate all files into one document. + If set to True, file metadata is ignored. + False by default. + + Returns: + List[Document]: A list of documents. 
+ + """ + data: Union[str, List[str]] = "" + data_list: List[str] = [] + metadata_list = [] + for input_file in self.input_files: + if input_file.suffix in self.file_extractor: + parser = self.file_extractor[input_file.suffix] + if not parser.parser_config_set: + parser.init_parser() + data = parser.parse_file(input_file, errors=self.errors) + else: + # do standard read + with open(input_file, "r", errors=self.errors) as f: + data = f.read() + if isinstance(data, List): + data_list.extend(data) + else: + data_list.append(str(data)) + if self.file_metadata is not None: + metadata_list.append(self.file_metadata(str(input_file))) + + if concatenate: + return [Document("\n".join(data_list))] + elif self.file_metadata is not None: + return [Document(d, extra_info=m) for d, m in zip(data_list, metadata_list)] + else: + return [Document(d) for d in data_list] diff --git a/scripts/parser/file/docs_parser.py b/scripts/parser/file/docs_parser.py new file mode 100644 index 0000000..0cde407 --- /dev/null +++ b/scripts/parser/file/docs_parser.py @@ -0,0 +1,59 @@ +"""Docs parser. + +Contains parsers for docx, pdf files. + +""" +from pathlib import Path +from typing import Dict + +from parser.file.base_parser import BaseParser + + +class PDFParser(BaseParser): + """PDF parser.""" + + def _init_parser(self) -> Dict: + """Init parser.""" + return {} + + def parse_file(self, file: Path, errors: str = "ignore") -> str: + """Parse file.""" + try: + import PyPDF2 + except ImportError: + raise ValueError("PyPDF2 is required to read PDF files.") + text_list = [] + with open(file, "rb") as fp: + # Create a PDF object + pdf = PyPDF2.PdfReader(fp) + + # Get the number of pages in the PDF document + num_pages = len(pdf.pages) + + # Iterate over every page + for page in range(num_pages): + # Extract the text from the page + page_text = pdf.pages[page].extract_text() + text_list.append(page_text) + text = "\n".join(text_list) + + return text + + +class DocxParser(BaseParser): + """Docx parser.""" + + def _init_parser(self) -> Dict: + """Init parser.""" + return {} + + def parse_file(self, file: Path, errors: str = "ignore") -> str: + """Parse file.""" + try: + import docx2txt + except ImportError: + raise ValueError("docx2txt is required to read Microsoft Word files.") + + text = docx2txt.process(file) + + return text diff --git a/scripts/parser/file/epub_parser.py b/scripts/parser/file/epub_parser.py new file mode 100644 index 0000000..6ece5ec --- /dev/null +++ b/scripts/parser/file/epub_parser.py @@ -0,0 +1,43 @@ +"""Epub parser. + +Contains parsers for epub files. +""" + +from pathlib import Path +from typing import Dict + +from parser.file.base_parser import BaseParser + + +class EpubParser(BaseParser): + """Epub Parser.""" + + def _init_parser(self) -> Dict: + """Init parser.""" + return {} + + def parse_file(self, file: Path, errors: str = "ignore") -> str: + """Parse file.""" + try: + import ebooklib + from ebooklib import epub + except ImportError: + raise ValueError("`EbookLib` is required to read Epub files.") + try: + import html2text + except ImportError: + raise ValueError("`html2text` is required to parse Epub files.") + + text_list = [] + book = epub.read_epub(file, options={"ignore_ncx": True}) + + # Iterate through all chapters. + for item in book.get_items(): + # Chapters are typically located in epub documents items. 
+ if item.get_type() == ebooklib.ITEM_DOCUMENT: + text_list.append( + html2text.html2text(item.get_content().decode("utf-8")) + ) + + text = "\n".join(text_list) + return text diff --git a/scripts/parser/file/markdown_parser.py b/scripts/parser/file/markdown_parser.py new file mode 100644 index 0000000..5c94ace --- /dev/null +++ b/scripts/parser/file/markdown_parser.py @@ -0,0 +1,130 @@ +"""Markdown parser. + +Contains parser for md files. + +""" +import re +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Union, cast + +from parser.file.base_parser import BaseParser + + +class MarkdownParser(BaseParser): + """Markdown parser. + + Extract text from markdown files. + Returns a list of (header, text) tuples with the text found under each header. + + """ + + def __init__( + self, + *args: Any, + remove_hyperlinks: bool = True, + remove_images: bool = True, + # remove_tables: bool = True, + **kwargs: Any, + ) -> None: + """Init params.""" + super().__init__(*args, **kwargs) + self._remove_hyperlinks = remove_hyperlinks + self._remove_images = remove_images + # self._remove_tables = remove_tables + + def markdown_to_tups(self, markdown_text: str) -> List[Tuple[Optional[str], str]]: + """Convert markdown text into a list of (header, text) tuples. + + Each header is paired with the text that appears under it. + + """ + markdown_tups: List[Tuple[Optional[str], str]] = [] + lines = markdown_text.split("\n") + + current_header = None + current_text = "" + + for line in lines: + header_match = re.match(r"^#+\s", line) + if header_match: + if current_header is not None and current_text != "": + markdown_tups.append((current_header, current_text)) + + current_header = line + current_text = "" + else: + current_text += line + "\n" + markdown_tups.append((current_header, current_text)) + + if current_header is not None: + # pass linting, assert keys are defined + markdown_tups = [ + (re.sub(r"#", "", cast(str, key)).strip(), re.sub(r"<.*?>", "", value)) + for key, value in markdown_tups + ] + else: + markdown_tups = [ + (key, re.sub("\n", "", value)) for key, value in markdown_tups + ] + + return markdown_tups + + def remove_images(self, content: str) -> str: + """Remove image links from the markdown content.""" + pattern = r"!{1}\[\[(.*)\]\]" + content = re.sub(pattern, "", content) + return content + + # def remove_tables(self, content: str) -> List[List[str]]: + # """Convert markdown tables to nested lists.""" + # table_rows_pattern = r"((\r?\n){2}|^)([^\r\n]*\|[^\r\n]*(\r?\n)?)+(?=(\r?\n){2}|$)" + # table_cells_pattern = r"([^\|\r\n]*)\|" + # + # table_rows = re.findall(table_rows_pattern, content, re.MULTILINE) + # table_lists = [] + # for row in table_rows: + # cells = re.findall(table_cells_pattern, row[2]) + # cells = [cell.strip() for cell in cells if cell.strip()] + # table_lists.append(cells) + # return str(table_lists) + + def remove_hyperlinks(self, content: str) -> str: + """Remove hyperlinks, keeping only the link text.""" + pattern = r"\[(.*?)\]\((.*?)\)" + content = re.sub(pattern, r"\1", content) + return content + + def _init_parser(self) -> Dict: + """Initialize the parser with the config.""" + return {} + + def parse_tups( + self, filepath: Path, errors: str = "ignore" + ) -> List[Tuple[Optional[str], str]]: + """Parse file into tuples.""" + with open(filepath, "r") as f: + content = f.read() + if self._remove_hyperlinks: + content = self.remove_hyperlinks(content) + if self._remove_images: + content = self.remove_images(content) 
+ # if self._remove_tables: + # content = self.remove_tables(content) + markdown_tups = self.markdown_to_tups(content) + return markdown_tups + + def parse_file( + self, filepath: Path, errors: str = "ignore" + ) -> Union[str, List[str]]: + """Parse file into string.""" + tups = self.parse_tups(filepath, errors=errors) + results = [] + # TODO: don't include headers right now + for header, value in tups: + if header is None: + results.append(value) + else: + results.append(f"\n\n{header}\n{value}") + return results diff --git a/scripts/parser/file/rst_parser.py b/scripts/parser/file/rst_parser.py new file mode 100644 index 0000000..0b887d4 --- /dev/null +++ b/scripts/parser/file/rst_parser.py @@ -0,0 +1,151 @@ +"""reStructuredText parser. + +Contains parser for rst files. + +""" +import re +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Union, cast + +from parser.file.base_parser import BaseParser + + +class RstParser(BaseParser): + """reStructuredText parser. + + Extract text from .rst files. + Returns a list of (header, text) tuples with the text found under each header. + + """ + + def __init__( + self, + *args: Any, + remove_hyperlinks: bool = True, + remove_images: bool = True, + remove_table_excess: bool = True, + remove_whitespaces_excess: bool = True, + # Be careful with remove_characters_excess, it might cause data loss + remove_characters_excess: bool = True, + **kwargs: Any, + ) -> None: + """Init params.""" + super().__init__(*args, **kwargs) + self._remove_hyperlinks = remove_hyperlinks + self._remove_images = remove_images + self._remove_table_excess = remove_table_excess + self._remove_whitespaces_excess = remove_whitespaces_excess + self._remove_characters_excess = remove_characters_excess + + def rst_to_tups(self, rst_text: str) -> List[Tuple[Optional[str], str]]: + """Convert reStructuredText into a list of (header, text) tuples. + + Each header is paired with the text that appears under it. + + """ + rst_tups: List[Tuple[Optional[str], str]] = [] + lines = rst_text.split("\n") + + current_header = None + current_text = "" + + for i, line in enumerate(lines): + header_match = re.match(r"^[^\S\n]*[-=]+[^\S\n]*$", line) + if header_match and i > 0 and len(lines[i - 1].strip()) == len(header_match.group().strip()): + if current_header is not None and current_text != "": + # remove the next heading's title line from the current document + if current_text.endswith(lines[i - 1] + "\n"): + current_text = current_text[:len(current_text) - len(lines[i - 1] + "\n")] + rst_tups.append((current_header, current_text)) + + current_header = lines[i - 1] + current_text = "" + else: + current_text += line + "\n" + rst_tups.append((current_header, current_text)) + + #TODO: Format for rst + # + # if current_header is not None: + # # pass linting, assert keys are defined + # rst_tups = [ + # (re.sub(r"#", "", cast(str, key)).strip(), re.sub(r"<.*?>", "", value)) + # for key, value in rst_tups + # ] + # else: + # rst_tups = [ + # (key, re.sub("\n", "", value)) for key, value in rst_tups + # ] + + if current_header is None: + rst_tups = [ + (key, re.sub("\n", "", value)) for key, value in rst_tups + ] + return rst_tups + + def remove_images(self, content: str) -> str: + pattern = r"\.\. image:: (.*)" + content = re.sub(pattern, "", content) + return content + + def remove_hyperlinks(self, content: str) -> str: + pattern = r"`(.*?) <(.*?)>`_" + content = re.sub(pattern, r"\1", content) + return content + + def remove_table_excess(self, content: str) -> str: + """Remove grid table separators.""" + pattern = r"^\+[-]+\+[-]+\+$" + content = re.sub(pattern, "", content, flags=re.MULTILINE) + return content + + def remove_whitespaces_excess(self, content: List[Tuple[str, Any]]) -> List[Tuple[str, Any]]: + """Collapse runs of two or more whitespace characters into a single space.""" + pattern = r"\s{2,}" + content = [(key, re.sub(pattern, " ", value)) for key, value in content] + return content + + def remove_characters_excess(self, content: List[Tuple[str, Any]]) -> List[Tuple[str, Any]]: + """Truncate runs of three or more identical non-whitespace characters to three.""" + pattern = r"(\S)\1{2,}" + content = [(key, re.sub(pattern, r"\1\1\1", value, flags=re.MULTILINE)) for key, value in content] + return content + + def _init_parser(self) -> Dict: + """Initialize the parser with the config.""" + return {} + + def parse_tups( + self, filepath: Path, errors: str = "ignore" + ) -> List[Tuple[Optional[str], str]]: + """Parse file into tuples.""" + with open(filepath, "r") as f: + content = f.read() + if self._remove_hyperlinks: + content = self.remove_hyperlinks(content) + if self._remove_images: + content = self.remove_images(content) + if self._remove_table_excess: + content = self.remove_table_excess(content) + rst_tups = self.rst_to_tups(content) + if self._remove_whitespaces_excess: + rst_tups = self.remove_whitespaces_excess(rst_tups) + if self._remove_characters_excess: + rst_tups = self.remove_characters_excess(rst_tups) + return rst_tups + + def parse_file( + self, filepath: Path, errors: str = "ignore" + ) -> Union[str, List[str]]: + """Parse file into string.""" + tups = self.parse_tups(filepath, errors=errors) + results = [] + # TODO: don't include headers right now + for header, value in tups: + if header is None: + results.append(value) + else: + results.append(f"\n\n{header}\n{value}") + return results diff --git a/scripts/parser/file/tabular_parser.py b/scripts/parser/file/tabular_parser.py new file mode 100644 index 0000000..bbb875e --- /dev/null +++ b/scripts/parser/file/tabular_parser.py @@ -0,0 +1,115 @@ +"""Tabular parser. + +Contains parsers for tabular data files. + +""" +from pathlib import Path +from typing import Any, Dict, List, Union + +from parser.file.base_parser import BaseParser + + +class CSVParser(BaseParser): + """CSV parser. + + Args: + concat_rows (bool): whether to concatenate all rows into one document. + If set to False, a Document will be created for each row. + True by default. + + """ + + def __init__(self, *args: Any, concat_rows: bool = True, **kwargs: Any) -> None: + """Init params.""" + super().__init__(*args, **kwargs) + self._concat_rows = concat_rows + + def _init_parser(self) -> Dict: + """Init parser.""" + return {} + + def parse_file(self, file: Path, errors: str = "ignore") -> Union[str, List[str]]: + """Parse file. + + Returns: + Union[str, List[str]]: a string or a List of strings. + + """ + try: + import csv + except ImportError: + raise ValueError("csv module is required to read CSV files.") + text_list = [] + with open(file, "r") as fp: + csv_reader = csv.reader(fp) + for row in csv_reader: + text_list.append(", ".join(row)) + if self._concat_rows: + return "\n".join(text_list) + else: + return text_list + + +class PandasCSVParser(BaseParser): + r"""Pandas-based CSV parser. + + Parses CSVs using the separator detection from the Pandas `read_csv` function. 
+ If special parameters are required, use the `pandas_config` dict. + + Args: + concat_rows (bool): whether to concatenate all rows into one document. + If set to False, a Document will be created for each row. + True by default. + + col_joiner (str): Separator to use for joining cols per row. + Set to ", " by default. + + row_joiner (str): Separator to use for joining each row. + Only used when `concat_rows=True`. + Set to "\n" by default. + + pandas_config (dict): Options for the `pandas.read_csv` function call. + Refer to https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html + for more information. + Set to empty dict by default; this means pandas will try to figure + out the separators, table head, etc. on its own. + + """ + + def __init__( + self, + *args: Any, + concat_rows: bool = True, + col_joiner: str = ", ", + row_joiner: str = "\n", + pandas_config: dict = {}, + **kwargs: Any + ) -> None: + """Init params.""" + super().__init__(*args, **kwargs) + self._concat_rows = concat_rows + self._col_joiner = col_joiner + self._row_joiner = row_joiner + self._pandas_config = pandas_config + + def _init_parser(self) -> Dict: + """Init parser.""" + return {} + + def parse_file(self, file: Path, errors: str = "ignore") -> Union[str, List[str]]: + """Parse file.""" + try: + import pandas as pd + except ImportError: + raise ValueError("pandas module is required to read CSV files.") + + df = pd.read_csv(file, **self._pandas_config) + + text_list = df.apply( + lambda row: (self._col_joiner).join(row.astype(str).tolist()), axis=1 + ).tolist() + + if self._concat_rows: + return (self._row_joiner).join(text_list) + else: + return text_list diff --git a/scripts/parser/open_ai_func.py b/scripts/parser/open_ai_func.py new file mode 100644 index 0000000..500e488 --- /dev/null +++ b/scripts/parser/open_ai_func.py @@ -0,0 +1,44 @@ +import faiss +import pickle +import tiktoken +from typing import Tuple +from langchain.vectorstores import FAISS +from langchain.embeddings import OpenAIEmbeddings + + +def num_tokens_from_string(string: str, encoding_name: str) -> Tuple[int, float]: + """Convert a string to tokens and estimate the user cost of embedding it.""" + encoding = tiktoken.get_encoding(encoding_name) + num_tokens = len(encoding.encode(string)) + total_price = ((num_tokens/1000) * 0.0004) + return num_tokens, total_price + +def call_openai_api(docs): + """Create a vector store from the documents and save it to disk.""" + store = FAISS.from_documents(docs, OpenAIEmbeddings()) + faiss.write_index(store.index, "docs.index") + store.index = None + + with open("faiss_store.pkl", "wb") as f: + pickle.dump(store, f) + +def get_user_permission(docs): + """Ask the user for permission to call the OpenAI API and spend their OpenAI funds.""" + # Here we convert the docs list to a string and calculate the number of OpenAI tokens the string represents. + #docs_content = (" ".join(docs)) + docs_content = "" + for doc in docs: + docs_content += doc.page_content + + + tokens, total_price = num_tokens_from_string(string=docs_content, encoding_name="cl100k_base") + # Here we print the number of tokens and the approx user cost with some visually appealing formatting. + print(f"Number of Tokens = {format(tokens, ',d')}") + print(f"Approx Cost = ${format(total_price, ',.2f')}") + # Here we check for user permission before calling the API. + user_input = input("Price Okay? (Y/N) \n").lower() + if user_input == "y": + call_openai_api(docs) + elif user_input == "": + call_openai_api(docs) + else: + print("The API was not called. No money was spent.") \ No newline at end of file diff --git a/scripts/parser/schema/base.py b/scripts/parser/schema/base.py new file mode 100644 index 0000000..0871f88 --- /dev/null +++ b/scripts/parser/schema/base.py @@ -0,0 +1,35 @@ +"""Base schema for readers.""" +from dataclasses import dataclass + +from langchain.docstore.document import Document as LCDocument + +from parser.schema.schema import BaseDocument + + +@dataclass +class Document(BaseDocument): + """Generic interface for a data document. + + This document connects to data sources. + + """ + + def __post_init__(self) -> None: + """Post init.""" + if self.text is None: + raise ValueError("text field not set.") + + @classmethod + def get_type(cls) -> str: + """Get Document type.""" + return "Document" + + def to_langchain_format(self) -> LCDocument: + """Convert struct to LangChain document format.""" + metadata = self.extra_info or {} + return LCDocument(page_content=self.text, metadata=metadata) + + @classmethod + def from_langchain_format(cls, doc: LCDocument) -> "Document": + """Convert struct from LangChain document format.""" + return cls(text=doc.page_content, extra_info=doc.metadata) diff --git a/scripts/parser/schema/schema.py b/scripts/parser/schema/schema.py new file mode 100644 index 0000000..ec467e5 --- /dev/null +++ b/scripts/parser/schema/schema.py @@ -0,0 +1,64 @@ +"""Base schema for data structures.""" +from abc import abstractmethod +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +from dataclasses_json import DataClassJsonMixin + + +@dataclass +class BaseDocument(DataClassJsonMixin): + """Base document. + + Generic abstract interface that captures both index structs + as well as documents. + + """ + + # TODO: consolidate fields from Document/IndexStruct into base class + text: Optional[str] = None + doc_id: Optional[str] = None + embedding: Optional[List[float]] = None + + # extra fields + extra_info: Optional[Dict[str, Any]] = None + + @classmethod + @abstractmethod + def get_type(cls) -> str: + """Get Document type.""" + + def get_text(self) -> str: + """Get text.""" + if self.text is None: + raise ValueError("text field not set.") + return self.text + + def get_doc_id(self) -> str: + """Get doc_id.""" + if self.doc_id is None: + raise ValueError("doc_id not set.") + return self.doc_id + + @property + def is_doc_id_none(self) -> bool: + """Check if doc_id is None.""" + return self.doc_id is None + + def get_embedding(self) -> List[float]: + """Get embedding. + + Errors if embedding is None. + + """ + if self.embedding is None: + raise ValueError("embedding not set.") + return self.embedding + + @property + def extra_info_str(self) -> Optional[str]: + """Extra info string.""" + if self.extra_info is None: + return None + + return "\n".join([f"{k}: {str(v)}" for k, v in self.extra_info.items()])