mirror of
https://github.com/arc53/DocsGPT
synced 2024-11-02 03:40:17 +00:00
98a97f34f5
still issues with celery worker.
78 lines
2.9 KiB
Python
78 lines
2.9 KiB
Python
import re
|
|
from math import ceil
|
|
from typing import List
|
|
|
|
import tiktoken
|
|
from application.parser.schema.base import Document
|
|
|
|
|
|
def separate_header_and_body(text):
|
|
header_pattern = r"^(.*?\n){3}"
|
|
match = re.match(header_pattern, text)
|
|
header = match.group(0)
|
|
body = text[len(header):]
|
|
return header, body
|
|
|
|
|
|
def group_documents(documents: List[Document], min_tokens: int, max_tokens: int) -> List[Document]:
|
|
docs = []
|
|
current_group = None
|
|
|
|
for doc in documents:
|
|
doc_len = len(tiktoken.get_encoding("cl100k_base").encode(doc.text))
|
|
|
|
if current_group is None:
|
|
current_group = Document(text=doc.text, doc_id=doc.doc_id, embedding=doc.embedding,
|
|
extra_info=doc.extra_info)
|
|
elif len(tiktoken.get_encoding("cl100k_base").encode(
|
|
current_group.text)) + doc_len < max_tokens and doc_len < min_tokens:
|
|
current_group.text += " " + doc.text
|
|
else:
|
|
docs.append(current_group)
|
|
current_group = Document(text=doc.text, doc_id=doc.doc_id, embedding=doc.embedding,
|
|
extra_info=doc.extra_info)
|
|
|
|
if current_group is not None:
|
|
docs.append(current_group)
|
|
|
|
return docs
|
|
|
|
|
|
def split_documents(documents: List[Document], max_tokens: int) -> List[Document]:
|
|
docs = []
|
|
for doc in documents:
|
|
token_length = len(tiktoken.get_encoding("cl100k_base").encode(doc.text))
|
|
if token_length <= max_tokens:
|
|
docs.append(doc)
|
|
else:
|
|
header, body = separate_header_and_body(doc.text)
|
|
if len(tiktoken.get_encoding("cl100k_base").encode(header)) > max_tokens:
|
|
body = doc.text
|
|
header = ""
|
|
num_body_parts = ceil(token_length / max_tokens)
|
|
part_length = ceil(len(body) / num_body_parts)
|
|
body_parts = [body[i:i + part_length] for i in range(0, len(body), part_length)]
|
|
for i, body_part in enumerate(body_parts):
|
|
new_doc = Document(text=header + body_part.strip(),
|
|
doc_id=f"{doc.doc_id}-{i}",
|
|
embedding=doc.embedding,
|
|
extra_info=doc.extra_info)
|
|
docs.append(new_doc)
|
|
return docs
|
|
|
|
|
|
def group_split(documents: List[Document], max_tokens: int = 2000, min_tokens: int = 150, token_check: bool = True):
|
|
if not token_check:
|
|
return documents
|
|
print("Grouping small documents")
|
|
try:
|
|
documents = group_documents(documents=documents, min_tokens=min_tokens, max_tokens=max_tokens)
|
|
except Exception:
|
|
print("Grouping failed, try running without token_check")
|
|
print("Separating large documents")
|
|
try:
|
|
documents = split_documents(documents=documents, max_tokens=max_tokens)
|
|
except Exception:
|
|
print("Grouping failed, try running without token_check")
|
|
return documents
|