mirror of
https://github.com/arc53/DocsGPT
synced 2024-11-02 03:40:17 +00:00
v1
This commit is contained in:
parent
a44cde33ed
commit
bac25112b7
@ -1,13 +1,17 @@
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import nltk
|
||||
import dotenv
|
||||
import typer
|
||||
import ast
|
||||
import tiktoken
|
||||
from math import ceil
|
||||
|
||||
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from langchain.text_splitter import RecursiveCharacterTextSplitter
|
||||
|
||||
@ -28,6 +32,57 @@ nltk.download('punkt', quiet=True)
|
||||
nltk.download('averaged_perceptron_tagger', quiet=True)
|
||||
|
||||
|
||||
def group_documents(documents: List[Document], min_tokens: int = 50, max_tokens: int = 2000) -> List[Document]:
|
||||
groups = []
|
||||
current_group = None
|
||||
|
||||
for doc in documents:
|
||||
doc_len = len(tiktoken.get_encoding("cl100k_base").encode(doc.text))
|
||||
|
||||
if current_group is None:
|
||||
current_group = Document(text=doc.text, doc_id=doc.doc_id, embedding=doc.embedding,
|
||||
extra_info=doc.extra_info)
|
||||
elif len(tiktoken.get_encoding("cl100k_base").encode(current_group.text)) + doc_len < max_tokens and doc_len >= min_tokens:
|
||||
current_group.text += " " + doc.text
|
||||
else:
|
||||
groups.append(current_group)
|
||||
current_group = Document(text=doc.text, doc_id=doc.doc_id, embedding=doc.embedding,
|
||||
extra_info=doc.extra_info)
|
||||
|
||||
if current_group is not None:
|
||||
groups.append(current_group)
|
||||
|
||||
return groups
|
||||
|
||||
|
||||
def separate_header_and_body(text):
|
||||
header_pattern = r"^(.*?\n){3}"
|
||||
match = re.match(header_pattern, text)
|
||||
header = match.group(0)
|
||||
body = text[len(header):]
|
||||
return header, body
|
||||
|
||||
def split_documents(documents: List[Document], max_tokens: int = 2000) -> List[Document]:
|
||||
new_documents = []
|
||||
for doc in documents:
|
||||
token_length = len(tiktoken.get_encoding("cl100k_base").encode(doc.text))
|
||||
print(token_length)
|
||||
if token_length <= max_tokens:
|
||||
new_documents.append(doc)
|
||||
else:
|
||||
header, body = separate_header_and_body(doc.text)
|
||||
num_body_parts = ceil(token_length / max_tokens)
|
||||
part_length = ceil(len(body) / num_body_parts)
|
||||
body_parts = [body[i:i + part_length] for i in range(0, len(body), part_length)]
|
||||
for i, body_part in enumerate(body_parts):
|
||||
new_doc = Document(text=header + body_part.strip(),
|
||||
doc_id=f"{doc.doc_id}-{i}",
|
||||
embedding=doc.embedding,
|
||||
extra_info=doc.extra_info)
|
||||
new_documents.append(new_doc)
|
||||
return new_documents
|
||||
|
||||
|
||||
#Splits all files in specified folder to documents
|
||||
@app.command()
|
||||
def ingest(yes: bool = typer.Option(False, "-y", "--yes", prompt=False,
|
||||
@ -56,6 +111,11 @@ def ingest(yes: bool = typer.Option(False, "-y", "--yes", prompt=False,
|
||||
raw_docs = SimpleDirectoryReader(input_dir=directory, input_files=file, recursive=recursive,
|
||||
required_exts=formats, num_files_limit=limit,
|
||||
exclude_hidden=exclude).load_data()
|
||||
|
||||
raw_docs = group_documents(raw_docs)
|
||||
raw_docs = split_documents(raw_docs)
|
||||
|
||||
print(raw_docs)
|
||||
raw_docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
|
||||
# Here we split the documents, as needed, into smaller chunks.
|
||||
# We do this due to the context limits of the LLMs.
|
||||
@ -109,3 +169,5 @@ def convert(dir: Optional[str] = typer.Option("inputs",
|
||||
transform_to_docs(functions_dict, classes_dict, formats, dir)
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
|
||||
|
||||
|
@ -29,7 +29,7 @@ class RstParser(BaseParser):
|
||||
remove_whitespaces_excess: bool = True,
|
||||
#Be carefull with remove_characters_excess, might cause data loss
|
||||
remove_characters_excess: bool = True,
|
||||
max_tokens: int = 2048,
|
||||
# max_tokens: int = 2048,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Init params."""
|
||||
@ -41,18 +41,18 @@ class RstParser(BaseParser):
|
||||
self._remove_directives = remove_directives
|
||||
self._remove_whitespaces_excess = remove_whitespaces_excess
|
||||
self._remove_characters_excess = remove_characters_excess
|
||||
self._max_tokens = max_tokens
|
||||
# self._max_tokens = max_tokens
|
||||
|
||||
def tups_chunk_append(self, tups: List[Tuple[Optional[str], str]], current_header: Optional[str], current_text: str):
|
||||
"""Append to tups chunk."""
|
||||
num_tokens = len(tiktoken.get_encoding("cl100k_base").encode(current_text))
|
||||
if num_tokens > self._max_tokens:
|
||||
chunks = [current_text[i:i + self._max_tokens] for i in range(0, len(current_text), self._max_tokens)]
|
||||
for chunk in chunks:
|
||||
tups.append((current_header, chunk))
|
||||
else:
|
||||
tups.append((current_header, current_text))
|
||||
return tups
|
||||
# def tups_chunk_append(self, tups: List[Tuple[Optional[str], str]], current_header: Optional[str], current_text: str):
|
||||
# """Append to tups chunk."""
|
||||
# num_tokens = len(tiktoken.get_encoding("cl100k_base").encode(current_text))
|
||||
# if num_tokens > self._max_tokens:
|
||||
# chunks = [current_text[i:i + self._max_tokens] for i in range(0, len(current_text), self._max_tokens)]
|
||||
# for chunk in chunks:
|
||||
# tups.append((current_header, chunk))
|
||||
# else:
|
||||
# tups.append((current_header, current_text))
|
||||
# return tups
|
||||
|
||||
|
||||
def rst_to_tups(self, rst_text: str) -> List[Tuple[Optional[str], str]]:
|
||||
@ -76,14 +76,14 @@ class RstParser(BaseParser):
|
||||
# removes the next heading from current Document
|
||||
if current_text.endswith(lines[i - 1] + "\n"):
|
||||
current_text = current_text[:len(current_text) - len(lines[i - 1] + "\n")]
|
||||
rst_tups = self.tups_chunk_append(rst_tups, current_header, current_text)
|
||||
rst_tups.append((current_header, current_text))
|
||||
|
||||
current_header = lines[i - 1]
|
||||
current_text = ""
|
||||
else:
|
||||
current_text += line + "\n"
|
||||
|
||||
rst_tups = self.tups_chunk_append(rst_tups, current_header, current_text)
|
||||
rst_tups.append((current_header, current_text))
|
||||
|
||||
#TODO: Format for rst
|
||||
#
|
||||
|
Loading…
Reference in New Issue
Block a user