refactor: Combine workflow steps into a single file to make it easy to port to other projects

doc-sources
namuan 1 year ago
parent ef2bbe2164
commit 55dcc91c9d

@@ -1,11 +1,230 @@
from .ask_question import AskQuestion
from .combine_all_text import CombineAllText
from .convert_images_to_text import ConvertImagesToText
from .convert_pdf_to_pages import ConvertPDFToImages
from .create_index import CreateIndex
from .find_interesting_blocks import FindInterestingBlocks
from .load_index import LoadIndex
from .verify_input_file import VerifyInputFile
import logging
import pickle
from pathlib import Path
from typing import Any
import faiss # type: ignore
import openai
from langchain import OpenAI, VectorDBQA
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores.faiss import FAISS
from py_executable_checklist.workflow import WorkflowBase
from py_executable_checklist.workflow import run_command
from pypdf import PdfReader
from doc_search import retry
class VerifyInputFile(WorkflowBase):
    """
    Verify input file and return pdf stats
    """

    input_pdf_path: Path
    start_page: int
    end_page: int

    def execute(self) -> dict:
        reader = PdfReader(self.input_pdf_path)
        total_pages = len(reader.pages)
        start_page = self.start_page if self.start_page != -1 else 1
        end_page = self.end_page if self.end_page != -1 else total_pages
        return {"start_page": start_page, "end_page": end_page, "total_pages": total_pages}
class ConvertPDFToImages(WorkflowBase):
    """
    Convert PDF to images using ImageMagick
    """

    input_pdf_path: Path
    app_dir: Path
    start_page: int
    end_page: int

    def execute(self) -> dict:
        pdf_file_name = self.input_pdf_path.stem
        output_dir = self.app_dir / "OutputDir/dr-doc-search" / pdf_file_name / "images"
        output_dir.mkdir(parents=True, exist_ok=True)
        for i in range(self.start_page, self.end_page):
            input_file_page = f"{self.input_pdf_path}[{i}]"
            image_path = output_dir / f"output-{i}.png"
            if image_path.exists():
                continue
            convert_command = f"""convert -density 150 -trim -background white -alpha remove -quality 100 -sharpen 0x1.0 {input_file_page} -quality 100 {image_path}"""
            run_command(convert_command)
        return {"pdf_images_path": output_dir}
class ConvertImagesToText(WorkflowBase):
    """
    Convert images to text using Tesseract OCR
    """

    pdf_images_path: Path
    input_pdf_path: Path
    app_dir: Path

    def execute(self) -> dict:
        pdf_file_name = self.input_pdf_path.stem
        output_dir = self.app_dir / "OutputDir/dr-doc-search" / pdf_file_name / "scanned"
        output_dir.mkdir(parents=True, exist_ok=True)
        for image_path in self.pdf_images_path.glob("*.png"):
            image_name = image_path.stem
            text_path = output_dir / f"{image_name}"
            if text_path.with_suffix(".txt").exists():
                continue
            tesseract_command = f"tesseract {image_path} {text_path} --oem 1 -l eng"
            run_command(tesseract_command)
        return {"pages_text_path": output_dir}
class CombineAllText(WorkflowBase):
    """
    Combine all text files in the pages_text_path directory into one large text file and chunk it using Splitter
    """

    pages_text_path: Path

    def execute(self) -> dict:
        text = ""
        for file in self.pages_text_path.glob("*.txt"):
            text += file.read_text()
        text_splitter = CharacterTextSplitter(chunk_size=2000, chunk_overlap=0)
        texts = text_splitter.split_text(text)
        return {
            "chunked_text_list": texts,
        }
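
# Chunking sketch (standalone; langchain's CharacterTextSplitter splits on "\n\n" by
# default): each returned chunk is at most ~2000 characters, unless a single
# unsplittable block is longer. page_text below is a hypothetical input:
#     texts = CharacterTextSplitter(chunk_size=2000, chunk_overlap=0).split_text(page_text)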
class CreateIndex(WorkflowBase):
    """
    Create index for embedding search
    """

    input_pdf_path: Path
    app_dir: Path
    overwrite_index: bool
    chunked_text_list: list[str]

    @retry(exceptions=openai.error.RateLimitError, tries=2, delay=60, back_off=2)
    def append_to_index(self, docsearch: FAISS, text: str, embeddings: OpenAIEmbeddings) -> None:
        # add_texts mutates the existing store; from_texts would build and discard a fresh one
        docsearch.add_texts([text])

    def execute(self) -> dict:
        pdf_file_name = self.input_pdf_path.stem
        output_dir = self.app_dir / "OutputDir/dr-doc-search" / pdf_file_name / "index"
        output_dir.mkdir(parents=True, exist_ok=True)
        faiss_db = output_dir / "index.pkl"
        index_path = output_dir / "docsearch.index"
        if not self.overwrite_index and faiss_db.exists():
            logging.info("Index already exists at %s", faiss_db)
            return {"index_path": index_path, "faiss_db": faiss_db}
        else:
            logging.info(
                "Creating index at %s (overwrite_index == %s, index file exists == %s)",
                faiss_db,
                self.overwrite_index,
                faiss_db.exists(),
            )

        embeddings = OpenAIEmbeddings()
        docsearch: FAISS = FAISS.from_texts(self.chunked_text_list[:2], embeddings)
        for text in self.chunked_text_list[2:]:
            self.append_to_index(docsearch, text, embeddings)

        faiss.write_index(docsearch.index, index_path.as_posix())
        with open(faiss_db, "wb") as f:
            pickle.dump(docsearch, f)

        return {"index_path": index_path, "faiss_db": faiss_db}
class LoadIndex(WorkflowBase):
    """
    Load existing index for embedding search
    """

    index_path: Path
    faiss_db: Path

    def execute(self) -> dict:
        if not self.faiss_db.exists():
            raise FileNotFoundError(f"FAISS DB file not found: {self.faiss_db}")

        index = faiss.read_index(self.index_path.as_posix())
        with open(self.faiss_db, "rb") as f:
            search_index = pickle.load(f)
        search_index.index = index
        return {"search_index": search_index}
class FindInterestingBlocks(WorkflowBase):
    """
    Find text blocks relevant to the question using embedding search
    """

    input_question: str
    search_index: Any

    def prompt_from_question(self, question: str) -> str:
        return f"""Instructions:
- You are a text-based search engine.
- Provide keywords and summary which should be relevant to answer the question.
- Retain as much information as needed to answer the question later.
Question:
{question}"""

    def execute(self) -> dict:
        prompt = self.prompt_from_question(self.input_question)
        docs = self.search_index.similarity_search(prompt)
        return {"selected_blocks": docs[0].page_content}
class AskQuestion(WorkflowBase):
    """
    Answer the question using the selected blocks and the search index
    """

    input_question: str
    selected_blocks: str
    search_index: Any

    def prompt_from_question(self, question: str, selected_blocks: str) -> str:
        return f"""
Instructions:
- Answer and guide the human when they ask for it.
- Provide detailed responses that relate to the human's prompt.
Summarize text.
{selected_blocks}
- Human:
{question}
AI:"""

    def execute(self) -> dict:
        prompt = self.prompt_from_question(self.input_question, self.selected_blocks)
        qa = VectorDBQA.from_llm(llm=OpenAI(), vectorstore=self.search_index)
        output = self.send_prompt(prompt, qa)
        return {"output": output}

    @retry(exceptions=openai.error.RateLimitError, tries=2, delay=60, back_off=2)
    def send_prompt(self, prompt: str, qa: VectorDBQA) -> Any:
        return qa.run(prompt)
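
# The retry decorator from doc_search re-issues the call when OpenAI raises
# openai.error.RateLimitError: up to 2 tries, 60s initial delay, back-off factor 2.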
def workflow_steps() -> list:

@@ -1,41 +0,0 @@
from typing import Any

import openai
from langchain import OpenAI, VectorDBQA
from py_executable_checklist.workflow import WorkflowBase

from doc_search import retry


class AskQuestion(WorkflowBase):
    """
    Answer the question using the selected blocks and the search index
    """

    input_question: str
    selected_blocks: str
    search_index: Any

    def prompt_from_question(self, question: str, selected_blocks: str) -> str:
        return f"""
Instructions:
- Answer and guide the human when they ask for it.
- Provide detailed responses that relate to the human's prompt.
Summarize text.
{selected_blocks}
- Human:
{question}
AI:"""

    def execute(self) -> dict:
        prompt = self.prompt_from_question(self.input_question, self.selected_blocks)
        qa = VectorDBQA.from_llm(llm=OpenAI(), vectorstore=self.search_index)
        output = self.send_prompt(prompt, qa)
        return {"output": output}

    @retry(exceptions=openai.error.RateLimitError, tries=2, delay=60, back_off=2)
    def send_prompt(self, prompt: str, qa: VectorDBQA) -> Any:
        return qa.run(prompt)

@@ -1,24 +0,0 @@
from pathlib import Path

from langchain.text_splitter import CharacterTextSplitter
from py_executable_checklist.workflow import WorkflowBase


class CombineAllText(WorkflowBase):
    """
    Combine all text files in the pages_text_path directory into one large text file and chunk it using Splitter
    """

    pages_text_path: Path

    def execute(self) -> dict:
        text = ""
        for file in self.pages_text_path.glob("*.txt"):
            text += file.read_text()
        text_splitter = CharacterTextSplitter(chunk_size=2000, chunk_overlap=0)
        texts = text_splitter.split_text(text)
        return {
            "chunked_text_list": texts,
        }

@@ -1,29 +0,0 @@
from pathlib import Path

from py_executable_checklist.workflow import WorkflowBase, run_command


class ConvertImagesToText(WorkflowBase):
    """
    Convert images to text using Tesseract OCR
    """

    pdf_images_path: Path
    input_pdf_path: Path
    app_dir: Path

    def execute(self) -> dict:
        pdf_file_name = self.input_pdf_path.stem
        output_dir = self.app_dir / "OutputDir/dr-doc-search" / pdf_file_name / "scanned"
        output_dir.mkdir(parents=True, exist_ok=True)
        for image_path in self.pdf_images_path.glob("*.png"):
            image_name = image_path.stem
            text_path = output_dir / f"{image_name}"
            if text_path.with_suffix(".txt").exists():
                continue
            tesseract_command = f"tesseract {image_path} {text_path} --oem 1 -l eng"
            run_command(tesseract_command)
        return {"pages_text_path": output_dir}

@@ -1,29 +0,0 @@
from pathlib import Path

from py_executable_checklist.workflow import WorkflowBase, run_command


class ConvertPDFToImages(WorkflowBase):
    """
    Convert PDF to images using ImageMagick
    """

    input_pdf_path: Path
    app_dir: Path
    start_page: int
    end_page: int

    def execute(self) -> dict:
        pdf_file_name = self.input_pdf_path.stem
        output_dir = self.app_dir / "OutputDir/dr-doc-search" / pdf_file_name / "images"
        output_dir.mkdir(parents=True, exist_ok=True)
        for i in range(self.start_page, self.end_page):
            input_file_page = f"{self.input_pdf_path}[{i}]"
            image_path = output_dir / f"output-{i}.png"
            if image_path.exists():
                continue
            convert_command = f"""convert -density 150 -trim -background white -alpha remove -quality 100 -sharpen 0x1.0 {input_file_page} -quality 100 {image_path}"""
            run_command(convert_command)
        return {"pdf_images_path": output_dir}

@@ -1,55 +0,0 @@
import logging
import pickle
from pathlib import Path

import faiss  # type: ignore
import openai
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores.faiss import FAISS
from py_executable_checklist.workflow import WorkflowBase

from doc_search import retry


class CreateIndex(WorkflowBase):
    """
    Create index for embedding search
    """

    input_pdf_path: Path
    app_dir: Path
    overwrite_index: bool
    chunked_text_list: list[str]

    @retry(exceptions=openai.error.RateLimitError, tries=2, delay=60, back_off=2)
    def append_to_index(self, docsearch: FAISS, text: str, embeddings: OpenAIEmbeddings) -> None:
        docsearch.from_texts([text], embeddings)

    def execute(self) -> dict:
        pdf_file_name = self.input_pdf_path.stem
        output_dir = self.app_dir / "OutputDir/dr-doc-search" / pdf_file_name / "index"
        output_dir.mkdir(parents=True, exist_ok=True)
        faiss_db = output_dir / "index.pkl"
        index_path = output_dir / "docsearch.index"
        if not self.overwrite_index and faiss_db.exists():
            logging.info("Index already exists at %s", faiss_db)
            return {"index_path": index_path, "faiss_db": faiss_db}
        else:
            logging.info(
                "Creating index at %s either because overwrite_index == %s or index file exists == %s",
                faiss_db,
                self.overwrite_index,
                faiss_db.exists(),
            )

        embeddings = OpenAIEmbeddings()
        docsearch: FAISS = FAISS.from_texts(self.chunked_text_list[:2], embeddings)
        for text in self.chunked_text_list[2:]:
            self.append_to_index(docsearch, text, embeddings)

        faiss.write_index(docsearch.index, index_path.as_posix())
        with open(faiss_db, "wb") as f:
            pickle.dump(docsearch, f)

        return {"index_path": index_path, "faiss_db": faiss_db}

@@ -1,26 +0,0 @@
from typing import Any

from py_executable_checklist.workflow import WorkflowBase


class FindInterestingBlocks(WorkflowBase):
    """
    Find text blocks relevant to the question using embedding search
    """

    input_question: str
    search_index: Any

    def prompt_from_question(self, question: str) -> str:
        return f"""Instructions:
- You are a text-based search engine.
- Provide keywords and summary which should be relevant to answer the question.
- Retain as much information as needed to answer the question later.
Question:
{question}"""

    def execute(self) -> dict:
        prompt = self.prompt_from_question(self.input_question)
        docs = self.search_index.similarity_search(prompt)
        return {"selected_blocks": docs[0].page_content}

@@ -1,25 +0,0 @@
import pickle
from pathlib import Path

import faiss  # type: ignore
from py_executable_checklist.workflow import WorkflowBase


class LoadIndex(WorkflowBase):
    """
    Load existing index for embedding search
    """

    index_path: Path
    faiss_db: Path

    def execute(self) -> dict:
        if not self.faiss_db.exists():
            raise FileNotFoundError(f"FAISS DB file not found: {self.faiss_db}")

        index = faiss.read_index(self.index_path.as_posix())
        with open(self.faiss_db, "rb") as f:
            search_index = pickle.load(f)
        search_index.index = index
        return {"search_index": search_index}

@@ -1,22 +0,0 @@
from pathlib import Path

from py_executable_checklist.workflow import WorkflowBase
from pypdf import PdfReader


class VerifyInputFile(WorkflowBase):
    """
    Verify input file and return pdf stats
    """

    input_pdf_path: Path
    start_page: int
    end_page: int

    def execute(self) -> dict:
        reader = PdfReader(self.input_pdf_path)
        total_pages = len(reader.pages)
        start_page = self.start_page if self.start_page != -1 else 1
        end_page = self.end_page if self.end_page != -1 else total_pages
        return {"start_page": start_page, "end_page": end_page, "total_pages": total_pages}