Proper PEP8 formatting

pull/232/head
Anton Larin 1 year ago
parent 7f56f57778
commit 168648e789

@@ -90,10 +90,12 @@ mongo = MongoClient(app.config['MONGO_URI'])
db = mongo["docsgpt"]
vectors_collection = db["vectors"]
async def async_generate(chain, question, chat_history):
result = await chain.arun({"question": question, "chat_history": chat_history})
return result
def run_async_chain(chain, question, chat_history):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
@@ -105,6 +107,7 @@ def run_async_chain(chain, question, chat_history):
result["answer"] = answer
return result
@celery.task(bind=True)
def ingest(self, directory, formats, name_job, filename, user):
resp = ingest_worker(self, directory, formats, name_job, filename, user)
@@ -206,7 +209,7 @@ def api_answer():
combine_docs_chain=doc_chain,
)
chat_history = []
#result = chain({"question": question, "chat_history": chat_history})
# result = chain({"question": question, "chat_history": chat_history})
# generate async with async generate method
result = run_async_chain(chain, question, chat_history)
else:

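For context, a minimal standalone sketch of the event-loop pattern these hunks touch, assuming a LangChain chain that exposes an arun coroutine; the lines the hunks elide are filled in here purely as an illustration:

    import asyncio

    async def async_generate(chain, question, chat_history):
        # await the chain's async entry point and hand back its result
        return await chain.arun({"question": question, "chat_history": chat_history})

    def run_async_chain(chain, question, chat_history):
        # create a dedicated event loop so a synchronous Flask route can drive
        # the coroutine, then close the loop once the answer is ready
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            answer = loop.run_until_complete(async_generate(chain, question, chat_history))
        finally:
            loop.close()
        return {"answer": answer}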
@@ -52,17 +52,17 @@ class SimpleDirectoryReader(BaseReader):
"""
def __init__(
self,
input_dir: Optional[str] = None,
input_files: Optional[List] = None,
exclude_hidden: bool = True,
errors: str = "ignore",
recursive: bool = True,
required_exts: Optional[List[str]] = None,
file_extractor: Optional[Dict[str, BaseParser]] = None,
num_files_limit: Optional[int] = None,
file_metadata: Optional[Callable[[str], Dict]] = None,
chunk_size_max: int = 2048,
self,
input_dir: Optional[str] = None,
input_files: Optional[List] = None,
exclude_hidden: bool = True,
errors: str = "ignore",
recursive: bool = True,
required_exts: Optional[List[str]] = None,
file_extractor: Optional[Dict[str, BaseParser]] = None,
num_files_limit: Optional[int] = None,
file_metadata: Optional[Callable[[str], Dict]] = None,
chunk_size_max: int = 2048,
) -> None:
"""Initialize with parameters."""
super().__init__()
@@ -102,8 +102,8 @@ class SimpleDirectoryReader(BaseReader):
elif self.exclude_hidden and input_file.name.startswith("."):
continue
elif (
self.required_exts is not None
and input_file.suffix not in self.required_exts
self.required_exts is not None
and input_file.suffix not in self.required_exts
):
continue
else:
@@ -114,7 +114,7 @@ class SimpleDirectoryReader(BaseReader):
new_input_files.extend(sub_input_files)
if self.num_files_limit is not None and self.num_files_limit > 0:
new_input_files = new_input_files[0 : self.num_files_limit]
new_input_files = new_input_files[0: self.num_files_limit]
# print total number of files added
logging.debug(

@@ -11,10 +11,10 @@ import tiktoken
import sys
from argparse import ArgumentParser
import ast
import json
dotenv.load_dotenv()
ps = list(Path("inputs").glob("**/*.py"))
data = []
sources = []
@@ -24,7 +24,6 @@ for p in ps:
sources.append(p)
# with open('inputs/client.py', 'r') as f:
# tree = ast.parse(f.read())
@@ -64,11 +63,9 @@ for code in data:
c1 += 1
# save the structure dict as json
import json
with open('structure_dict.json', 'w') as f:
json.dump(structure_dict, f)
# llm = OpenAI(temperature=0)
# prompt = PromptTemplate(
# input_variables=["code"],
@@ -119,8 +116,3 @@ for source, classes in structure_dict.items():
else:
with open(f"outputs/{source_w}", "a") as f:
f.write(f"\n\nFunction: {functions[function]}, \nDocumentation: {response}")

@@ -16,7 +16,6 @@ from parser.js2doc import extract_functions_and_classes as extract_js
from parser.java2doc import extract_functions_and_classes as extract_java
from parser.token_func import group_split
dotenv.load_dotenv()
app = typer.Typer(add_completion=False)
@@ -25,28 +24,28 @@ nltk.download('punkt', quiet=True)
nltk.download('averaged_perceptron_tagger', quiet=True)
#Splits all files in specified folder to documents
# Splits all files in specified folder to documents
@app.command()
def ingest(yes: bool = typer.Option(False, "-y", "--yes", prompt=False,
help="Whether to skip price confirmation"),
help="Whether to skip price confirmation"),
dir: Optional[List[str]] = typer.Option(["inputs"],
help="""List of paths to directory for index creation.
E.g. --dir inputs --dir inputs2"""),
file: Optional[List[str]] = typer.Option(None,
help="""File paths to use (Optional; overrides dir).
help="""File paths to use (Optional; overrides dir).
E.g. --file inputs/1.md --file inputs/2.md"""),
recursive: Optional[bool] = typer.Option(True, help="Whether to recursively search in subdirectories."),
limit: Optional[int] = typer.Option(None, help="Maximum number of files to read."),
formats: Optional[List[str]] = typer.Option([".rst", ".md"],
help="""List of required extensions (list with .)
help="""List of required extensions (list with .)
Currently supported: .rst, .md, .pdf, .docx, .csv, .epub, .html, .mdx"""),
exclude: Optional[bool] = typer.Option(True, help="Whether to exclude hidden files (dotfiles)."),
sample: Optional[bool] = typer.Option(False, help="Whether to output sample of the first 5 split documents."),
sample: Optional[bool] = typer.Option(False,
help="Whether to output sample of the first 5 split documents."),
token_check: Optional[bool] = typer.Option(True, help="Whether to group small documents and split large."),
min_tokens: Optional[int] = typer.Option(150, help="Minimum number of tokens to not group."),
max_tokens: Optional[int] = typer.Option(2000, help="Maximum number of tokens to not split."),
):
"""
Creates index from specified location or files.
By default /inputs folder is used, .rst and .md are parsed.
@@ -59,19 +58,19 @@ def ingest(yes: bool = typer.Option(False, "-y", "--yes", prompt=False,
# Here we split the documents, as needed, into smaller chunks.
# We do this due to the context limits of the LLMs.
raw_docs = group_split(documents=raw_docs, min_tokens=min_tokens, max_tokens=max_tokens, token_check=token_check)
#Old method
raw_docs = group_split(documents=raw_docs, min_tokens=min_tokens, max_tokens=max_tokens,
token_check=token_check)
# Old method
# text_splitter = RecursiveCharacterTextSplitter()
# docs = text_splitter.split_documents(raw_docs)
#Sample feature
# Sample feature
if sample == True:
for i in range(min(5, len(raw_docs))):
print(raw_docs[i].text)
docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
# Here we check for command line arguments for bot calls.
# If no argument exists or the yes is not True, then the
# user permission is requested to call the API.
@@ -98,12 +97,11 @@ def ingest(yes: bool = typer.Option(False, "-y", "--yes", prompt=False,
@app.command()
def convert(dir: Optional[str] = typer.Option("inputs",
help="""Path to directory to make documentation for.
help="""Path to directory to make documentation for.
E.g. --dir inputs """),
formats: Optional[str] = typer.Option("py",
help="""Required language.
help="""Required language.
py, js, java supported for now""")):
"""
Creates documentation linked to original functions from specified location.
By default /inputs folder is used, .py is parsed.
@@ -117,7 +115,7 @@ def convert(dir: Optional[str] = typer.Option("inputs",
else:
raise Exception("Sorry, language not supported yet")
transform_to_docs(functions_dict, classes_dict, formats, dir)
if __name__ == "__main__":
app()
if __name__ == "__main__":
app()

@@ -52,17 +52,17 @@ class SimpleDirectoryReader(BaseReader):
"""
def __init__(
self,
input_dir: Optional[str] = None,
input_files: Optional[List] = None,
exclude_hidden: bool = True,
errors: str = "ignore",
recursive: bool = True,
required_exts: Optional[List[str]] = None,
file_extractor: Optional[Dict[str, BaseParser]] = None,
num_files_limit: Optional[int] = None,
file_metadata: Optional[Callable[[str], Dict]] = None,
chunk_size_max: int = 2048,
self,
input_dir: Optional[str] = None,
input_files: Optional[List] = None,
exclude_hidden: bool = True,
errors: str = "ignore",
recursive: bool = True,
required_exts: Optional[List[str]] = None,
file_extractor: Optional[Dict[str, BaseParser]] = None,
num_files_limit: Optional[int] = None,
file_metadata: Optional[Callable[[str], Dict]] = None,
chunk_size_max: int = 2048,
) -> None:
"""Initialize with parameters."""
super().__init__()
@@ -103,8 +103,8 @@ class SimpleDirectoryReader(BaseReader):
elif self.exclude_hidden and input_file.name.startswith("."):
continue
elif (
self.required_exts is not None
and input_file.suffix not in self.required_exts
self.required_exts is not None
and input_file.suffix not in self.required_exts
):
continue
else:
@@ -115,7 +115,7 @@ class SimpleDirectoryReader(BaseReader):
new_input_files.extend(sub_input_files)
if self.num_files_limit is not None and self.num_files_limit > 0:
new_input_files = new_input_files[0 : self.num_files_limit]
new_input_files = new_input_files[0: self.num_files_limit]
# print total number of files added
logging.debug(

@@ -9,6 +9,7 @@ from typing import Dict, Union
from parser.file.base_parser import BaseParser
class HTMLParser(BaseParser):
"""HTML parser."""
@@ -32,12 +33,12 @@ class HTMLParser(BaseParser):
# Using the unstructured library to convert the html to isd format
# isd sample : isd = [
# {"text": "My Title", "type": "Title"},
# {"text": "My Narrative", "type": "NarrativeText"}
# ]
# {"text": "My Title", "type": "Title"},
# {"text": "My Narrative", "type": "NarrativeText"}
# ]
with open(file, "r", encoding="utf-8") as fp:
elements = partition_html(file=fp)
isd = convert_to_isd(elements)
isd = convert_to_isd(elements)
# Removing non-ASCII characters from isd_el['text']
for isd_el in isd:
@@ -46,15 +47,15 @@ class HTMLParser(BaseParser):
# Removing all the \n characters from isd_el['text'] using regex and replace with single space
# Removing all the extra spaces from isd_el['text'] using regex and replace with single space
for isd_el in isd:
isd_el['text'] = re.sub(r'\n', ' ', isd_el['text'], flags=re.MULTILINE|re.DOTALL)
isd_el['text'] = re.sub(r"\s{2,}"," ", isd_el['text'], flags=re.MULTILINE|re.DOTALL)
isd_el['text'] = re.sub(r'\n', ' ', isd_el['text'], flags=re.MULTILINE | re.DOTALL)
isd_el['text'] = re.sub(r"\s{2,}", " ", isd_el['text'], flags=re.MULTILINE | re.DOTALL)
# more cleaning: extra_whitespaces, dashes, bullets, trailing_punctuation
for isd_el in isd:
clean(isd_el['text'], extra_whitespace=True, dashes=True, bullets=True, trailing_punctuation=True )
clean(isd_el['text'], extra_whitespace=True, dashes=True, bullets=True, trailing_punctuation=True)
# Creating a list of all the indexes of isd_el['type'] = 'Title'
title_indexes = [i for i,isd_el in enumerate(isd) if isd_el['type'] == 'Title']
title_indexes = [i for i, isd_el in enumerate(isd) if isd_el['type'] == 'Title']
# Creating 'Chunks' - List of lists of strings
# each list starting with isd_el['type'] = 'Title' and all the data till the next 'Title'
@@ -64,7 +65,7 @@ class HTMLParser(BaseParser):
Chunks = [[]]
final_chunks = list(list())
for i,isd_el in enumerate(isd):
for i, isd_el in enumerate(isd):
if i in title_indexes:
Chunks.append([])
Chunks[-1].append(isd_el['text'])
@@ -76,7 +77,7 @@ class HTMLParser(BaseParser):
sum += len(str(chunk))
if sum < 25:
Chunks.remove(chunk)
else :
else:
# appending all the approved chunks to final_chunks as a single string
final_chunks.append(" ".join([str(item) for item in chunk]))
return final_chunks

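For reference, the whitespace cleanup applied per element above, restated as a standalone helper (the function name is illustrative, not part of the parser):

    import re

    def squash_whitespace(text: str) -> str:
        # same two substitutions as in HTMLParser: newlines become spaces,
        # then runs of two or more spaces collapse to a single space
        text = re.sub(r'\n', ' ', text, flags=re.MULTILINE | re.DOTALL)
        return re.sub(r'\s{2,}', ' ', text, flags=re.MULTILINE | re.DOTALL)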
@@ -20,13 +20,13 @@ class MarkdownParser(BaseParser):
"""
def __init__(
self,
*args: Any,
remove_hyperlinks: bool = True,
remove_images: bool = True,
max_tokens: int = 2048,
# remove_tables: bool = True,
**kwargs: Any,
self,
*args: Any,
remove_hyperlinks: bool = True,
remove_images: bool = True,
max_tokens: int = 2048,
# remove_tables: bool = True,
**kwargs: Any,
) -> None:
"""Init params."""
super().__init__(*args, **kwargs)
@@ -35,8 +35,8 @@ class MarkdownParser(BaseParser):
self._max_tokens = max_tokens
# self._remove_tables = remove_tables
def tups_chunk_append(self, tups: List[Tuple[Optional[str], str]], current_header: Optional[str], current_text: str):
def tups_chunk_append(self, tups: List[Tuple[Optional[str], str]], current_header: Optional[str],
current_text: str):
"""Append to tups chunk."""
num_tokens = len(tiktoken.get_encoding("cl100k_base").encode(current_text))
if num_tokens > self._max_tokens:
@@ -46,6 +46,7 @@ class MarkdownParser(BaseParser):
else:
tups.append((current_header, current_text))
return tups
def markdown_to_tups(self, markdown_text: str) -> List[Tuple[Optional[str], str]]:
"""Convert a markdown file to a dictionary.
@@ -115,7 +116,7 @@ class MarkdownParser(BaseParser):
return {}
def parse_tups(
self, filepath: Path, errors: str = "ignore"
self, filepath: Path, errors: str = "ignore"
) -> List[Tuple[Optional[str], str]]:
"""Parse file into tuples."""
with open(filepath, "r") as f:
@@ -130,7 +131,7 @@ class MarkdownParser(BaseParser):
return markdown_tups
def parse_file(
self, filepath: Path, errors: str = "ignore"
self, filepath: Path, errors: str = "ignore"
) -> Union[str, List[str]]:
"""Parse file into string."""
tups = self.parse_tups(filepath, errors=errors)

@@ -10,6 +10,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union, cast
from parser.file.base_parser import BaseParser
import tiktoken
class RstParser(BaseParser):
"""reStructuredText parser.
@@ -19,17 +20,17 @@ class RstParser(BaseParser):
"""
def __init__(
self,
*args: Any,
remove_hyperlinks: bool = True,
remove_images: bool = True,
remove_table_excess: bool = True,
remove_interpreters: bool = True,
remove_directives: bool = True,
remove_whitespaces_excess: bool = True,
#Be careful with remove_characters_excess, might cause data loss
remove_characters_excess: bool = True,
**kwargs: Any,
self,
*args: Any,
remove_hyperlinks: bool = True,
remove_images: bool = True,
remove_table_excess: bool = True,
remove_interpreters: bool = True,
remove_directives: bool = True,
remove_whitespaces_excess: bool = True,
# Be careful with remove_characters_excess, might cause data loss
remove_characters_excess: bool = True,
**kwargs: Any,
) -> None:
"""Init params."""
super().__init__(*args, **kwargs)
@@ -41,7 +42,6 @@ class RstParser(BaseParser):
self._remove_whitespaces_excess = remove_whitespaces_excess
self._remove_characters_excess = remove_characters_excess
def rst_to_tups(self, rst_text: str) -> List[Tuple[Optional[str], str]]:
"""Convert a reStructuredText file to a dictionary.
@@ -56,7 +56,8 @@ class RstParser(BaseParser):
for i, line in enumerate(lines):
header_match = re.match(r"^[^\S\n]*[-=]+[^\S\n]*$", line)
if header_match and i > 0 and (len(lines[i - 1].strip()) == len(header_match.group().strip()) or lines[i - 2] == lines[i - 2]):
if header_match and i > 0 and (
len(lines[i - 1].strip()) == len(header_match.group().strip()) or lines[i - 2] == lines[i - 2]):
if current_header is not None:
if current_text == "" or None:
continue
@@ -72,7 +73,7 @@ class RstParser(BaseParser):
rst_tups.append((current_header, current_text))
#TODO: Format for rst
# TODO: Format for rst
#
# if current_header is not None:
# # pass linting, assert keys are defined
@@ -136,7 +137,7 @@ class RstParser(BaseParser):
return {}
def parse_tups(
self, filepath: Path, errors: str = "ignore"
self, filepath: Path, errors: str = "ignore"
) -> List[Tuple[Optional[str], str]]:
"""Parse file into tuples."""
with open(filepath, "r") as f:
@@ -159,7 +160,7 @@ class RstParser(BaseParser):
return rst_tups
def parse_file(
self, filepath: Path, errors: str = "ignore"
self, filepath: Path, errors: str = "ignore"
) -> Union[str, List[str]]:
"""Parse file into string."""
tups = self.parse_tups(filepath, errors=errors)

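The header detection used by rst_to_tups above, isolated as a small predicate for clarity (the helper name is illustrative):

    import re

    def is_rst_header_underline(line: str) -> bool:
        # the same pattern RstParser matches: a line consisting only of '-' or '='
        # characters, optionally padded with spaces, marks the line above as a header
        return re.match(r"^[^\S\n]*[-=]+[^\S\n]*$", line) is not None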
@@ -77,13 +77,13 @@ class PandasCSVParser(BaseParser):
"""
def __init__(
self,
*args: Any,
concat_rows: bool = True,
col_joiner: str = ", ",
row_joiner: str = "\n",
pandas_config: dict = {},
**kwargs: Any
self,
*args: Any,
concat_rows: bool = True,
col_joiner: str = ", ",
row_joiner: str = "\n",
pandas_config: dict = {},
**kwargs: Any
) -> None:
"""Init params."""
super().__init__(*args, **kwargs)

@@ -1,6 +1,7 @@
import os
import javalang
def find_files(directory):
files_list = []
for root, dirs, files in os.walk(directory):
@@ -9,6 +10,7 @@ def find_files(directory):
files_list.append(os.path.join(root, file))
return files_list
def extract_functions(file_path):
with open(file_path, "r") as file:
java_code = file.read()
@@ -28,6 +30,7 @@ def extract_functions(file_path):
methods[method_name] = method_source_code
return methods
def extract_classes(file_path):
with open(file_path, 'r') as file:
source_code = file.read()
@@ -47,6 +50,7 @@ def extract_classes(file_path):
classes[class_name] = class_string
return classes
def extract_functions_and_classes(directory):
files = find_files(directory)
functions_dict = {}
@@ -58,4 +62,4 @@ def extract_functions_and_classes(directory):
classes = extract_classes(file)
if classes:
classes_dict[file] = classes
return functions_dict, classes_dict
return functions_dict, classes_dict

@@ -11,6 +11,7 @@ def find_files(directory):
files_list.append(os.path.join(root, file))
return files_list
def extract_functions(file_path):
with open(file_path, 'r') as file:
source_code = file.read()
@@ -38,6 +39,7 @@ def extract_functions(file_path):
functions[func_name] = escodegen.generate(declaration.init)
return functions
def extract_classes(file_path):
with open(file_path, 'r') as file:
source_code = file.read()
@@ -53,6 +55,7 @@ def extract_classes(file_path):
classes[class_name] = ", ".join(function_names)
return classes
def extract_functions_and_classes(directory):
files = find_files(directory)
functions_dict = {}

@@ -5,28 +5,29 @@ import tiktoken
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings
#from langchain.embeddings import HuggingFaceEmbeddings
#from langchain.embeddings import HuggingFaceInstructEmbeddings
#from langchain.embeddings import CohereEmbeddings
# from langchain.embeddings import HuggingFaceEmbeddings
# from langchain.embeddings import HuggingFaceInstructEmbeddings
# from langchain.embeddings import CohereEmbeddings
from retry import retry
def num_tokens_from_string(string: str, encoding_name: str) -> int:
# Function to convert string to tokens and estimate user cost.
# Function to convert string to tokens and estimate user cost.
encoding = tiktoken.get_encoding(encoding_name)
num_tokens = len(encoding.encode(string))
total_price = ((num_tokens/1000) * 0.0004)
total_price = ((num_tokens / 1000) * 0.0004)
return num_tokens, total_price
@retry(tries=10, delay=60)
def store_add_texts_with_retry(store, i):
store.add_texts([i.page_content], metadatas=[i.metadata])
#store_pine.add_texts([i.page_content], metadatas=[i.metadata])
# store_pine.add_texts([i.page_content], metadatas=[i.metadata])
def call_openai_api(docs, folder_name):
# Function to create a vector store from the documents and save it to disk.
# Function to create a vector store from the documents and save it to disk.
# create output folder if it doesn't exist
if not os.path.exists(f"outputs/{folder_name}"):
@@ -37,21 +38,22 @@ def call_openai_api(docs, folder_name):
# remove the first element from docs
docs.pop(0)
# cut first n docs if you want to restart
#docs = docs[:n]
# docs = docs[:n]
c1 = 0
# pinecone.init(
# api_key="", # find at app.pinecone.io
# environment="us-east1-gcp" # next to api key in console
# )
#index_name = "pandas"
# index_name = "pandas"
store = FAISS.from_documents(docs_test, OpenAIEmbeddings())
#store_pine = Pinecone.from_documents(docs_test, OpenAIEmbeddings(), index_name=index_name)
# store_pine = Pinecone.from_documents(docs_test, OpenAIEmbeddings(), index_name=index_name)
# Uncomment for MPNet embeddings
# model_name = "sentence-transformers/all-mpnet-base-v2"
# hf = HuggingFaceEmbeddings(model_name=model_name)
# store = FAISS.from_documents(docs_test, hf)
for i in tqdm(docs, desc="Embedding 🦖", unit="docs", total=len(docs), bar_format='{l_bar}{bar}| Time Left: {remaining}'):
for i in tqdm(docs, desc="Embedding 🦖", unit="docs", total=len(docs),
bar_format='{l_bar}{bar}| Time Left: {remaining}'):
try:
store_add_texts_with_retry(store, i)
except Exception as e:
@@ -64,20 +66,20 @@ def call_openai_api(docs, folder_name):
c1 += 1
store.save_local(f"outputs/{folder_name}")
def get_user_permission(docs, folder_name):
# Function to ask user permission to call the OpenAI api and spend their OpenAI funds.
# Function to ask user permission to call the OpenAI api and spend their OpenAI funds.
# Here we convert the docs list to a string and calculate the number of OpenAI tokens the string represents.
#docs_content = (" ".join(docs))
# docs_content = (" ".join(docs))
docs_content = ""
for doc in docs:
docs_content += doc.page_content
tokens, total_price = num_tokens_from_string(string=docs_content, encoding_name="cl100k_base")
# Here we print the number of tokens and the approx user cost with some visually appealing formatting.
print(f"Number of Tokens = {format(tokens, ',d')}")
print(f"Approx Cost = ${format(total_price, ',.2f')}")
#Here we check for user permission before calling the API.
# Here we check for user permission before calling the API.
user_input = input("Price Okay? (Y/N) \n").lower()
if user_input == "y":
call_openai_api(docs, folder_name)

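A runnable sketch of the token-and-cost helper reformatted above; the default encoding argument and the example call are additions for illustration only:

    import tiktoken

    def num_tokens_from_string(string: str, encoding_name: str = "cl100k_base"):
        # token count plus a rough cost estimate at $0.0004 per 1K tokens
        encoding = tiktoken.get_encoding(encoding_name)
        num_tokens = len(encoding.encode(string))
        total_price = (num_tokens / 1000) * 0.0004
        return num_tokens, total_price

    tokens, price = num_tokens_from_string("How does DocsGPT ingest documents?")
    print(f"{tokens} tokens, approx ${price:.6f}")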
@@ -5,6 +5,7 @@ from pathlib import Path
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
def find_files(directory):
files_list = []
for root, dirs, files in os.walk(directory):
@@ -13,6 +14,7 @@ def find_files(directory):
files_list.append(os.path.join(root, file))
return files_list
def extract_functions(file_path):
with open(file_path, 'r') as file:
source_code = file.read()
@@ -25,6 +27,7 @@ def extract_functions(file_path):
functions[func_name] = func_def
return functions
def extract_classes(file_path):
with open(file_path, 'r') as file:
source_code = file.read()
@@ -40,6 +43,7 @@ def extract_classes(file_path):
classes[class_name] = ", ".join(function_names)
return classes
def extract_functions_and_classes(directory):
files = find_files(directory)
functions_dict = {}
@@ -53,11 +57,12 @@ def extract_functions_and_classes(directory):
classes_dict[file] = classes
return functions_dict, classes_dict
def parse_functions(functions_dict, formats, dir):
c1 = len(functions_dict)
for i, (source, functions) in enumerate(functions_dict.items(), start=1):
print(f"Processing file {i}/{c1}")
source_w = source.replace(dir+"/", "").replace("."+formats, ".md")
source_w = source.replace(dir + "/", "").replace("." + formats, ".md")
subfolders = "/".join(source_w.split("/")[:-1])
Path(f"outputs/{subfolders}").mkdir(parents=True, exist_ok=True)
for j, (name, function) in enumerate(functions.items(), start=1):
@@ -70,18 +75,19 @@ def parse_functions(functions_dict, formats, dir):
response = llm(prompt.format(code=function))
mode = "a" if Path(f"outputs/{source_w}").exists() else "w"
with open(f"outputs/{source_w}", mode) as f:
f.write(f"\n\n# Function name: {name} \n\nFunction: \n```\n{function}\n```, \nDocumentation: \n{response}")
f.write(
f"\n\n# Function name: {name} \n\nFunction: \n```\n{function}\n```, \nDocumentation: \n{response}")
def parse_classes(classes_dict, formats, dir):
c1 = len(classes_dict)
for i, (source, classes) in enumerate(classes_dict.items()):
print(f"Processing file {i+1}/{c1}")
source_w = source.replace(dir+"/", "").replace("."+formats, ".md")
print(f"Processing file {i + 1}/{c1}")
source_w = source.replace(dir + "/", "").replace("." + formats, ".md")
subfolders = "/".join(source_w.split("/")[:-1])
Path(f"outputs/{subfolders}").mkdir(parents=True, exist_ok=True)
for name, function_names in classes.items():
print(f"Processing Class {i+1}/{c1}")
print(f"Processing Class {i + 1}/{c1}")
prompt = PromptTemplate(
input_variables=["class_name", "functions_names"],
template="Class name: {class_name} \nFunctions: {functions_names}, \nDocumentation: ",
@@ -92,6 +98,7 @@ def parse_classes(classes_dict, formats, dir):
with open(f"outputs/{source_w}", "a" if Path(f"outputs/{source_w}").exists() else "w") as f:
f.write(f"\n\n# Class name: {name} \n\nFunctions: \n{function_names}, \nDocumentation: \n{response}")
def transform_to_docs(functions_dict, classes_dict, formats, dir):
docs_content = ''.join([str(key) + str(value) for key, value in functions_dict.items()])
docs_content += ''.join([str(key) + str(value) for key, value in classes_dict.items()])
@@ -110,4 +117,4 @@ def transform_to_docs(functions_dict, classes_dict, formats, dir):
parse_classes(classes_dict, formats, dir)
print("All done!")
else:
print("The API was not called. No money was spent.")
print("The API was not called. No money was spent.")

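A sketch of how these doc-generation scripts prompt the LLM, following the PromptTemplate usage visible in the hunks above; the template string and the sample code snippet are illustrative:

    from langchain.llms import OpenAI
    from langchain.prompts import PromptTemplate

    llm = OpenAI(temperature=0)
    prompt = PromptTemplate(
        input_variables=["code"],
        template="Code: \n{code}, \nDocumentation: ",
    )
    # format the prompt with one extracted function body and ask the model
    # to write documentation for it
    response = llm(prompt.format(code="def add(a, b):\n    return a + b"))
    print(response)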
@@ -13,6 +13,7 @@ def separate_header_and_body(text):
body = text[len(header):]
return header, body
def group_documents(documents: List[Document], min_tokens: int, max_tokens: int) -> List[Document]:
docs = []
current_group = None
@@ -23,7 +24,8 @@ def group_documents(documents: List[Document], min_tokens: int, max_tokens: int)
if current_group is None:
current_group = Document(text=doc.text, doc_id=doc.doc_id, embedding=doc.embedding,
extra_info=doc.extra_info)
elif len(tiktoken.get_encoding("cl100k_base").encode(current_group.text)) + doc_len < max_tokens and doc_len >= min_tokens:
elif len(tiktoken.get_encoding("cl100k_base").encode(
current_group.text)) + doc_len < max_tokens and doc_len >= min_tokens:
current_group.text += " " + doc.text
else:
docs.append(current_group)
@@ -35,6 +37,7 @@ def group_documents(documents: List[Document], min_tokens: int, max_tokens: int)
return docs
def split_documents(documents: List[Document], max_tokens: int) -> List[Document]:
docs = []
for doc in documents:
@@ -54,6 +57,7 @@ def split_documents(documents: List[Document], max_tokens: int) -> List[Document
docs.append(new_doc)
return docs
def group_split(documents: List[Document], max_tokens: int = 2000, min_tokens: int = 150, token_check: bool = True):
if token_check == False:
return documents

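The grouping condition that the last hunk wraps across lines, restated as a standalone predicate (the helper name is illustrative):

    import tiktoken

    ENCODER = tiktoken.get_encoding("cl100k_base")

    def can_merge(current_text: str, next_text: str, min_tokens: int, max_tokens: int) -> bool:
        # merge only when the combined group stays under max_tokens and the
        # incoming document is itself at least min_tokens long
        doc_len = len(ENCODER.encode(next_text))
        return len(ENCODER.encode(current_text)) + doc_len < max_tokens and doc_len >= min_tokens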