DocsGPT/scripts/parser/token_func.py
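
"""Token-aware utilities for DocsGPT ingestion: group small parsed documents and split oversized ones using tiktoken token counts."""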

import re
from math import ceil
from typing import List
import tiktoken
from parser.schema.base import Document


def separate_header_and_body(text):
    """Split text into its first three lines (the header) and the remaining body."""
    header_pattern = r"^(.*?\n){3}"
    match = re.match(header_pattern, text)
    if match is None:
        # Text with fewer than three lines has no separable header.
        return "", text
    header = match.group(0)
    body = text[len(header):]
    return header, body


def group_documents(documents: List[Document], min_tokens: int, max_tokens: int) -> List[Document]:
    """Merge consecutive documents shorter than min_tokens into groups that stay under max_tokens."""
    docs = []
    current_group = None

    for doc in documents:
        doc_len = len(tiktoken.get_encoding("cl100k_base").encode(doc.text))

        if current_group is None:
            current_group = Document(text=doc.text, doc_id=doc.doc_id, embedding=doc.embedding,
                                     extra_info=doc.extra_info)
        elif len(tiktoken.get_encoding("cl100k_base").encode(
                current_group.text)) + doc_len < max_tokens and doc_len < min_tokens:
            # Only absorb the document if it is small and the group stays under the limit.
            current_group.text += " " + doc.text
        else:
            docs.append(current_group)
            current_group = Document(text=doc.text, doc_id=doc.doc_id, embedding=doc.embedding,
                                     extra_info=doc.extra_info)

    if current_group is not None:
        docs.append(current_group)

    return docs


def split_documents(documents: List[Document], max_tokens: int) -> List[Document]:
    """Split documents longer than max_tokens into parts, repeating the header in each part."""
    docs = []
    for doc in documents:
        token_length = len(tiktoken.get_encoding("cl100k_base").encode(doc.text))
        if token_length <= max_tokens:
            docs.append(doc)
        else:
            header, body = separate_header_and_body(doc.text)
            if len(tiktoken.get_encoding("cl100k_base").encode(header)) > max_tokens:
                # The header alone exceeds the limit, so split the whole text without repeating it.
                body = doc.text
                header = ""
            num_body_parts = ceil(token_length / max_tokens)
            part_length = ceil(len(body) / num_body_parts)
            body_parts = [body[i:i + part_length] for i in range(0, len(body), part_length)]
            for i, body_part in enumerate(body_parts):
                new_doc = Document(text=header + body_part.strip(),
                                   doc_id=f"{doc.doc_id}-{i}",
                                   embedding=doc.embedding,
                                   extra_info=doc.extra_info)
                docs.append(new_doc)
    return docs


def group_split(documents: List[Document], max_tokens: int = 2000, min_tokens: int = 150, token_check: bool = True):
    """Group small documents, then split oversized ones, so each result fits within max_tokens."""
    if not token_check:
        return documents
    print("Grouping small documents")
    try:
        documents = group_documents(documents=documents, min_tokens=min_tokens, max_tokens=max_tokens)
    except Exception:
        print("Grouping failed, try running without token_check")
    print("Separating large documents")
    try:
        documents = split_documents(documents=documents, max_tokens=max_tokens)
    except Exception:
        print("Separating failed, try running without token_check")
    return documents
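

# Minimal usage sketch (illustrative only, not part of the original module).
# It assumes Document accepts the same keyword arguments used above
# (text, doc_id, embedding, extra_info); adjust to the real schema if it differs.
if __name__ == "__main__":
    sample_docs = [
        Document(text="Title\nAuthor\nDate\nShort section.", doc_id="doc-0",
                 embedding=None, extra_info=None),
        Document(text="Title\nAuthor\nDate\n" + "A much longer section. " * 800, doc_id="doc-1",
                 embedding=None, extra_info=None),
    ]
    # Small documents get grouped; oversized ones get split to stay under max_tokens.
    processed = group_split(documents=sample_docs, max_tokens=2000, min_tokens=150)
    for d in processed:
        print(d.doc_id, len(tiktoken.get_encoding("cl100k_base").encode(d.text)))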