diff --git a/docs/docs_skeleton/docs/integrations/document_transformers/docai.ipynb b/docs/docs_skeleton/docs/integrations/document_transformers/docai.ipynb
index 418e8746b6..6446e2a217 100644
--- a/docs/docs_skeleton/docs/integrations/document_transformers/docai.ipynb
+++ b/docs/docs_skeleton/docs/integrations/document_transformers/docai.ipynb
@@ -2,39 +2,45 @@
  "cells": [
   {
    "cell_type": "markdown",
-   "id": "310fce10-e051-40db-89b0-5b5bb85cd145",
+   "id": "b317191d",
    "metadata": {},
    "source": [
-    "# Document AI\n"
+    "# Google Cloud Document AI\n"
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "f95ac25b-f025-40c3-95b8-77919fc4da7f",
+   "id": "a19e6f94",
    "metadata": {},
    "source": [
-    ">[Document AI](https://cloud.google.com/document-ai/docs/overview) is a `Google Cloud Platform` service to transform unstructured data from documents into structured data, making it easier to understand, analyze, and consume. "
+    "Document AI is a document understanding platform from Google Cloud to transform unstructured data from documents into structured data, making it easier to understand, analyze, and consume.\n",
+    "\n",
+    "Learn more:\n",
+    "\n",
+    "- [Document AI overview](https://cloud.google.com/document-ai/docs/overview)\n",
+    "- [Document AI videos and labs](https://cloud.google.com/document-ai/docs/videos)\n",
+    "- [Try it!](https://cloud.google.com/document-ai/docs/drag-and-drop)\n"
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "275f2193-248f-4565-a872-93a89589cf2b",
+   "id": "184c0af8",
    "metadata": {},
    "source": [
     "The module contains a `PDF` parser based on DocAI from Google Cloud.\n",
     "\n",
-    "You need to install two libraries to use this parser:"
+    "You need to install two libraries to use this parser:\n"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "34132fab-0069-4942-b68b-5b093ccfc92a",
+   "id": "c86b2f59",
    "metadata": {},
    "outputs": [],
    "source": [
-    "!pip install google-cloud-documentai\n",
-    "!pip install google-cloud-documentai-toolbox"
+    "%pip install google-cloud-documentai\n",
+    "%pip install google-cloud-documentai-toolbox\n"
    ]
   },
   {
@@ -42,8 +48,9 @@
    "id": "51946817-798c-4d11-abd6-db2ae53a0270",
    "metadata": {},
    "source": [
-    "First, you need to set up a [`GCS` bucket and create your own OCR processor](https://cloud.google.com/document-ai/docs/create-processor) \n",
-    "The `GCS_OUTPUT_PATH` should be a path to a folder on GCS (starting with `gs://`) and a processor name should look like `projects/PROJECT_NUMBER/locations/LOCATION/processors/PROCESSOR_ID`. You can get it either programmatically or copy from the `Prediction endpoint` section of the `Processor details` tab in the Google Cloud Console."
+    "First, you need to set up a Google Cloud Storage (GCS) bucket and create your own Optical Character Recognition (OCR) processor as described here: https://cloud.google.com/document-ai/docs/create-processor\n",
+    "\n",
+    "The `GCS_OUTPUT_PATH` should be a path to a folder on GCS (starting with `gs://`) and a `PROCESSOR_NAME` should look like `projects/PROJECT_NUMBER/locations/LOCATION/processors/PROCESSOR_ID` or `projects/PROJECT_NUMBER/locations/LOCATION/processors/PROCESSOR_ID/processorVersions/PROCESSOR_VERSION_ID`. You can get it either programmatically or copy from the `Prediction endpoint` section of the `Processor details` tab in the Google Cloud Console.\n"
    ]
   },
   {
@@ -53,9 +60,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "PROJECT = \"PUT_SOMETHING_HERE\"\n",
-    "GCS_OUTPUT_PATH = \"PUT_SOMETHING_HERE\"\n",
-    "PROCESSOR_NAME = \"PUT_SOMETHING_HERE\""
+    "GCS_OUTPUT_PATH = \"gs://BUCKET_NAME/FOLDER_PATH\"\n",
+    "PROCESSOR_NAME = \"projects/PROJECT_NUMBER/locations/LOCATION/processors/PROCESSOR_ID\"\n"
    ]
   },
   {
@@ -66,7 +72,7 @@
    "outputs": [],
    "source": [
     "from langchain.document_loaders.blob_loaders import Blob\n",
-    "from langchain.document_loaders.parsers import DocAIParser"
+    "from langchain.document_loaders.parsers import DocAIParser\n"
    ]
   },
   {
@@ -74,7 +80,7 @@
    "id": "fad2bcca-1c0e-4888-b82d-15823ba57e60",
    "metadata": {},
    "source": [
-    "Now, let's create a parser:"
+    "Now, create a `DocAIParser`.\n"
    ]
   },
   {
@@ -84,7 +90,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "parser = DocAIParser(location=\"us\", processor_name=PROCESSOR_NAME, gcs_output_path=GCS_OUTPUT_PATH)"
+    "parser = DocAIParser(\n",
+    "    location=\"us\", processor_name=PROCESSOR_NAME, gcs_output_path=GCS_OUTPUT_PATH)\n"
    ]
   },
   {
@@ -92,7 +99,11 @@
    "id": "b8b5a3ff-650a-4ad3-a73a-395f86e4c9e1",
    "metadata": {},
    "source": [
-    "Let's go and parse an Alphabet's take from here: https://abc.xyz/assets/a7/5b/9e5ae0364b12b4c883f3cf748226/goog-exhibit-99-1-q1-2023-19.pdf. Copy it to your GCS bucket first, and adjust the path below."
+    "For this example, you can use an Alphabet earnings report that's uploaded to a public GCS bucket.\n",
+    "\n",
+    "[2022Q1_alphabet_earnings_release.pdf](https://storage.googleapis.com/cloud-samples-data/gen-app-builder/search/alphabet-investor-pdfs/2022Q1_alphabet_earnings_release.pdf)\n",
+    "\n",
+    "Pass the document to the `lazy_parse()` method to parse it.\n"
    ]
   },
   {
@@ -102,17 +113,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "blob = Blob(path=\"gs://vertex-pgt/examples/goog-exhibit-99-1-q1-2023-19.pdf\")"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 5,
-   "id": "6ef84fad-2981-456d-a6b4-3a6a1a46d511",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "docs = list(parser.lazy_parse(blob))"
+    "blob = Blob(path=\"gs://cloud-samples-data/gen-app-builder/search/alphabet-investor-pdfs/2022Q1_alphabet_earnings_release.pdf\")\n"
    ]
   },
   {
@@ -120,7 +121,7 @@
    "id": "3f8e4ee1-e07d-4c29-a120-4d56aae91859",
    "metadata": {},
    "source": [
-    "We'll get one document per page, 11 in total:"
+    "We'll get one document per page, 11 in total:\n"
    ]
   },
   {
@@ -138,7 +139,8 @@
     }
    ],
    "source": [
-    "print(len(docs))"
+    "docs = list(parser.lazy_parse(blob))\n",
+    "print(len(docs))\n"
    ]
   },
   {
@@ -146,7 +148,7 @@
    "id": "b104ae56-011b-4abe-ac07-e999c69494c5",
    "metadata": {},
    "source": [
-    "You can run end-to-end parsing of a blob one-by-one. If you have many documents, it might be a better approach to batch them together and maybe even detach parsing from handling the results of parsing."
+    "You can run end-to-end parsing of a blob one-by-one. If you have many documents, it might be a better approach to batch them together and maybe even detach parsing from handling the results of parsing.\n"
    ]
   },
   {
@@ -165,7 +167,7 @@
    ],
    "source": [
     "operations = parser.docai_parse([blob])\n",
-    "print([op.operation.name for op in operations])"
+    "print([op.operation.name for op in operations])\n"
    ]
   },
   {
@@ -173,7 +175,7 @@
    "id": "a2d24d63-c2c7-454c-9df3-2a9cf51309a6",
    "metadata": {},
    "source": [
-    "You can check whether operations are finished:"
+    "You can check whether operations are finished:\n"
    ]
   },
   {
@@ -194,7 +196,7 @@
     }
    ],
    "source": [
-    "parser.is_running(operations)"
+    "parser.is_running(operations)\n"
    ]
   },
   {
@@ -202,7 +204,7 @@
    "id": "602ca0bc-080a-4a4e-a413-0e705aeab189",
    "metadata": {},
    "source": [
-    "And when they're finished, you can parse the results:"
+    "And when they're finished, you can parse the results:\n"
    ]
   },
   {
@@ -223,7 +225,7 @@
     }
    ],
    "source": [
-    "parser.is_running(operations)"
+    "parser.is_running(operations)\n"
    ]
   },
   {
@@ -242,7 +244,7 @@
    ],
    "source": [
     "results = parser.get_results(operations)\n",
-    "print(results[0])"
+    "print(results[0])\n"
    ]
   },
   {
@@ -250,7 +252,7 @@
    "id": "87e5b606-1679-46c7-9577-4cf9bc93a752",
    "metadata": {},
    "source": [
-    "And now we can finally generate Documents from parsed results:"
+    "And now we can finally generate Documents from parsed results:\n"
    ]
   },
   {
@@ -260,7 +262,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "docs = list(parser.parse_from_results(results))"
+    "docs = list(parser.parse_from_results(results))\n"
    ]
   },
   {
@@ -278,7 +280,7 @@
     }
    ],
    "source": [
-    "print(len(docs))"
+    "print(len(docs))\n"
    ]
   }
  ],
@@ -290,7 +292,7 @@
    "uri": "gcr.io/deeplearning-platform-release/base-cpu:m109"
   },
   "kernelspec": {
-   "display_name": "Python 3 (ipykernel)",
+   "display_name": "Python 3",
    "language": "python",
    "name": "python3"
   },
@@ -304,7 +306,7 @@
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
-   "version": "3.10.12"
+   "version": "3.10.11"
  }
 },
 "nbformat": 4,
diff --git a/libs/langchain/langchain/document_loaders/parsers/docai.py b/libs/langchain/langchain/document_loaders/parsers/docai.py
index dd6913ac6e..8bdfce1176 100644
--- a/libs/langchain/langchain/document_loaders/parsers/docai.py
+++ b/libs/langchain/langchain/document_loaders/parsers/docai.py
@@ -1,4 +1,4 @@
-"""Module contains a PDF parser based on DocAI from Google Cloud.
+"""Module contains a PDF parser based on Document AI from Google Cloud.
 
 You need to install two libraries to use this parser:
     pip install google-cloud-documentai
@@ -24,13 +24,19 @@ logger = logging.getLogger(__name__)
 
 @dataclass
 class DocAIParsingResults:
-    """A dataclass to store DocAI parsing results."""
+    """A dataclass to store Document AI parsing results."""
 
     source_path: str
     parsed_path: str
 
 
 class DocAIParser(BaseBlobParser):
+    """`Google Cloud Document AI` parser.
+
+    For a detailed explanation of Document AI, refer to the product documentation.
+    https://cloud.google.com/document-ai/docs/overview
+    """
+
     def __init__(
         self,
         *,
@@ -43,19 +49,16 @@ class DocAIParser(BaseBlobParser):
         Args:
             client: a DocumentProcessorServiceClient to use
-            location: a GCP location where a DOcAI parser is located
-            gcs_output_path: a path on GCS to store parsing results
-            processor_name: name of a processor
+            location: a Google Cloud location where a Document AI processor is located
+            gcs_output_path: a path on Google Cloud Storage to store parsing results
+            processor_name: full resource name of a Document AI processor or processor
+                version
 
         You should provide either a client or location (and then a client
             would be instantiated).
         """
-        if client and location:
-            raise ValueError(
-                "You should provide either a client or a location but not both "
-                "of them."
-            )
-        if not client and not location:
+
+        if bool(client) == bool(location):
             raise ValueError(
                 "You must specify either a client or a location to instantiate "
                 "a client."
             )
@@ -69,11 +72,11 @@ class DocAIParser(BaseBlobParser):
         try:
             from google.api_core.client_options import ClientOptions
             from google.cloud.documentai import DocumentProcessorServiceClient
-        except ImportError:
+        except ImportError as exc:
             raise ImportError(
                 "documentai package not found, please install it with"
                 " `pip install google-cloud-documentai`"
-            )
+            ) from exc
             options = ClientOptions(
                 api_endpoint=f"{location}-documentai.googleapis.com"
             )
@@ -85,11 +88,86 @@ class DocAIParser(BaseBlobParser):
         Args:
             blobs: a Blob to parse
 
-        This is a long-running operations! A recommended way is to batch
-        documents together and use `batch_parse` method.
+        This is a long-running operation. A recommended way is to batch
+        documents together and use the `batch_parse()` method.
         """
         yield from self.batch_parse([blob], gcs_output_path=self._gcs_output_path)
 
+    def online_process(
+        self,
+        blob: Blob,
+        enable_native_pdf_parsing: bool = True,
+        field_mask: Optional[str] = None,
+        page_range: Optional[List[int]] = None,
+    ) -> Iterator[Document]:
+        """Parses a blob lazily using online processing.
+
+        Args:
+            blob: a blob to parse.
+            enable_native_pdf_parsing: enable pdf embedded text extraction
+            field_mask: a comma-separated list of which fields to include in the
+                Document AI response.
+                suggested: "text,pages.pageNumber,pages.layout"
+            page_range: list of page numbers to parse. If `None`,
+                entire document will be parsed.
+        """
+        try:
+            from google.cloud import documentai
+            from google.cloud.documentai_v1.types import (
+                IndividualPageSelector,
+                OcrConfig,
+                ProcessOptions,
+            )
+        except ImportError as exc:
+            raise ImportError(
+                "documentai package not found, please install it with"
+                " `pip install google-cloud-documentai`"
+            ) from exc
+        try:
+            from google.cloud.documentai_toolbox.wrappers.document import (
+                Document as WrappedDocument,
+            )
+        except ImportError as exc:
+            raise ImportError(
+                "documentai_toolbox package not found, please install it with"
+                " `pip install google-cloud-documentai-toolbox`"
+            ) from exc
+        ocr_config = (
+            OcrConfig(enable_native_pdf_parsing=enable_native_pdf_parsing)
+            if enable_native_pdf_parsing
+            else None
+        )
+        individual_page_selector = (
+            IndividualPageSelector(pages=page_range) if page_range else None
+        )
+
+        response = self._client.process_document(
+            documentai.ProcessRequest(
+                name=self._processor_name,
+                gcs_document=documentai.GcsDocument(
+                    gcs_uri=blob.path,
+                    mime_type=blob.mimetype or "application/pdf",
+                ),
+                process_options=ProcessOptions(
+                    ocr_config=ocr_config,
+                    individual_page_selector=individual_page_selector,
+                ),
+                skip_human_review=True,
+                field_mask=field_mask,
+            )
+        )
+        wrapped_document = WrappedDocument.from_documentai_document(response.document)
+        yield from (
+            Document(
+                page_content=page.text,
+                metadata={
+                    "page": page.page_number,
+                    "source": wrapped_document.gcs_input_uri,
+                },
+            )
+            for page in wrapped_document.pages
+        )
+
     def batch_parse(
         self,
         blobs: Sequence[Blob],
@@ -100,13 +178,13 @@ class DocAIParser(BaseBlobParser):
         """Parses a list of blobs lazily.
 
         Args:
-            blobs: a list of blobs to parse
-            gcs_output_path: a path on GCS to store parsing results
-            timeout_sec: a timeout to wait for DocAI to complete, in seconds
+            blobs: a list of blobs to parse.
+            gcs_output_path: a path on Google Cloud Storage to store parsing results.
+            timeout_sec: a timeout to wait for Document AI to complete, in seconds.
             check_in_interval_sec: an interval to wait until next check
                 whether parsing operations have been completed, in seconds
 
-        This is a long-running operations! A recommended way is to decouple
-        parsing from creating Langchain Documents:
+        This is a long-running operation. A recommended way is to decouple
+        parsing from creating LangChain Documents:
         >>> operations = parser.docai_parse(blobs, gcs_path)
         >>> parser.is_running(operations)
         You can get operations names and save them:
         >>> names = [op.operation.name for op in operations]
         And when all operations are finished, you can use their results:
         >>> operations = parser.operations_from_names(operation_names)
         >>> results = parser.get_results(operations)
         >>> docs = parser.parse_from_results(results)
         """
-        output_path = gcs_output_path if gcs_output_path else self._gcs_output_path
-        if output_path is None:
-            raise ValueError("An output path on GCS should be provided!")
+        output_path = gcs_output_path or self._gcs_output_path
+        if not output_path:
+            raise ValueError(
+                "An output path on Google Cloud Storage should be provided."
+            )
         operations = self.docai_parse(blobs, gcs_output_path=output_path)
         operation_names = [op.operation.name for op in operations]
         logger.debug(
-            f"Started parsing with DocAI, submitted operations {operation_names}"
+            "Started parsing with Document AI, submitted operations %s", operation_names
         )
-        is_running, time_elapsed = True, 0
-        while is_running:
-            is_running = self.is_running(operations)
-            if not is_running:
-                break
+        time_elapsed = 0
+        while self.is_running(operations):
             time.sleep(check_in_interval_sec)
             time_elapsed += check_in_interval_sec
             if time_elapsed > timeout_sec:
-                raise ValueError(
+                raise TimeoutError(
                     "Timeout exceeded! Check operations " f"{operation_names} later!"
                 )
             logger.debug(".")
@@ -144,32 +221,32 @@ class DocAIParser(BaseBlobParser):
         self, results: List[DocAIParsingResults]
     ) -> Iterator[Document]:
         try:
-            from google.cloud.documentai_toolbox.wrappers.document import _get_shards
-            from google.cloud.documentai_toolbox.wrappers.page import _text_from_layout
-        except ImportError:
+            from google.cloud.documentai_toolbox.utilities.gcs_utilities import (
+                split_gcs_uri,
+            )
+            from google.cloud.documentai_toolbox.wrappers.document import (
+                Document as WrappedDocument,
+            )
+        except ImportError as exc:
             raise ImportError(
                 "documentai_toolbox package not found, please install it with"
                 " `pip install google-cloud-documentai-toolbox`"
-            )
+            ) from exc
         for result in results:
-            output_gcs = result.parsed_path.split("/")
-            gcs_bucket_name = output_gcs[2]
-            gcs_prefix = "/".join(output_gcs[3:]) + "/"
-            shards = _get_shards(gcs_bucket_name, gcs_prefix)
-            docs, page_number = [], 1
-            for shard in shards:
-                for page in shard.pages:
-                    docs.append(
-                        Document(
-                            page_content=_text_from_layout(page.layout, shard.text),
-                            metadata={
-                                "page": page_number,
-                                "source": result.source_path,
-                            },
-                        )
-                    )
-                    page_number += 1
-            yield from docs
+            gcs_bucket_name, gcs_prefix = split_gcs_uri(result.parsed_path)
+            wrapped_document = WrappedDocument.from_gcs(
+                gcs_bucket_name, gcs_prefix, gcs_input_uri=result.source_path
+            )
+            yield from (
+                Document(
+                    page_content=page.text,
+                    metadata={
+                        "page": page.page_number,
+                        "source": wrapped_document.gcs_input_uri,
+                    },
+                )
+                for page in wrapped_document.pages
+            )
 
@@ -177,116 +254,127 @@
     def operations_from_names(self, operation_names: List[str]) -> List["Operation"]:
         """Initializes Long-Running Operations from their names."""
         try:
             from google.longrunning.operations_pb2 import (
                 GetOperationRequest,  # type: ignore
             )
-        except ImportError:
+        except ImportError as exc:
             raise ImportError(
-                "documentai package not found, please install it with"
+                "long running operations package not found, please install it with"
                 " `pip install gapic-google-longrunning`"
-            )
+            ) from exc
 
-        operations = []
-        for name in operation_names:
-            request = GetOperationRequest(name=name)
-            operations.append(self._client.get_operation(request=request))
-        return operations
+        return [
+            self._client.get_operation(request=GetOperationRequest(name=name))
+            for name in operation_names
+        ]
 
     def is_running(self, operations: List["Operation"]) -> bool:
-        for op in operations:
-            if not op.done():
-                return True
-        return False
+        return any(not op.done() for op in operations)
 
     def docai_parse(
         self,
         blobs: Sequence[Blob],
         *,
         gcs_output_path: Optional[str] = None,
-        batch_size: int = 4000,
+        processor_name: Optional[str] = None,
+        batch_size: int = 1000,
         enable_native_pdf_parsing: bool = True,
+        field_mask: Optional[str] = None,
     ) -> List["Operation"]:
-        """Runs Google DocAI PDF parser on a list of blobs.
+        """Runs Google Document AI PDF Batch Processing on a list of blobs.
 
         Args:
             blobs: a list of blobs to be parsed
             gcs_output_path: a path (folder) on GCS to store results
+            processor_name: name of a Document AI processor.
             batch_size: amount of documents per batch
             enable_native_pdf_parsing: a config option for the parser
-
-        DocAI has a limit on the amount of documents per batch, that's why split a
-        batch into mini-batches. Parsing is an async long-running operation
-        on Google Cloud and results are stored in a output GCS bucket.
+            field_mask: a comma-separated list of which fields to include in the
+                Document AI response.
+                suggested: "text,pages.pageNumber,pages.layout"
+
+        Document AI has a 1000 file limit per batch, so batches larger than that need
+        to be split into multiple requests.
+        Batch processing is an async long-running operation
+        and results are stored in an output GCS bucket.
         """
         try:
             from google.cloud import documentai
             from google.cloud.documentai_v1.types import OcrConfig, ProcessOptions
-        except ImportError:
+        except ImportError as exc:
             raise ImportError(
                 "documentai package not found, please install it with"
                 " `pip install google-cloud-documentai`"
-            )
+            ) from exc
 
-        if not self._processor_name:
-            raise ValueError("Processor name is not defined, aborting!")
-        output_path = gcs_output_path if gcs_output_path else self._gcs_output_path
+        output_path = gcs_output_path or self._gcs_output_path
         if output_path is None:
-            raise ValueError("An output path on GCS should be provided!")
+            raise ValueError(
+                "An output path on Google Cloud Storage should be provided."
+            )
+        processor_name = processor_name or self._processor_name
+        if processor_name is None:
+            raise ValueError("A Document AI processor name should be provided.")
 
         operations = []
         for batch in batch_iterate(size=batch_size, iterable=blobs):
-            documents = []
-            for blob in batch:
-                gcs_document = documentai.GcsDocument(
-                    gcs_uri=blob.path, mime_type="application/pdf"
-                )
-                documents.append(gcs_document)
-            gcs_documents = documentai.GcsDocuments(documents=documents)
             input_config = documentai.BatchDocumentsInputConfig(
-                gcs_documents=gcs_documents
+                gcs_documents=documentai.GcsDocuments(
+                    documents=[
+                        documentai.GcsDocument(
+                            gcs_uri=blob.path,
+                            mime_type=blob.mimetype or "application/pdf",
+                        )
+                        for blob in batch
+                    ]
+                )
             )
-            gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig(
-                gcs_uri=output_path, field_mask=None
-            )
             output_config = documentai.DocumentOutputConfig(
-                gcs_output_config=gcs_output_config
+                gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(
+                    gcs_uri=output_path, field_mask=field_mask
+                )
             )
-            if enable_native_pdf_parsing:
-                process_options = ProcessOptions(
+            process_options = (
+                ProcessOptions(
                     ocr_config=OcrConfig(
                         enable_native_pdf_parsing=enable_native_pdf_parsing
                     )
                 )
-            else:
-                process_options = ProcessOptions()
-            request = documentai.BatchProcessRequest(
-                name=self._processor_name,
-                input_documents=input_config,
-                document_output_config=output_config,
-                process_options=process_options,
+                if enable_native_pdf_parsing
+                else None
+            )
+            operations.append(
+                self._client.batch_process_documents(
+                    documentai.BatchProcessRequest(
+                        name=processor_name,
+                        input_documents=input_config,
+                        document_output_config=output_config,
+                        process_options=process_options,
+                        skip_human_review=True,
+                    )
+                )
             )
-            operations.append(self._client.batch_process_documents(request))
         return operations
 
     def get_results(self, operations: List["Operation"]) -> List[DocAIParsingResults]:
         try:
             from google.cloud.documentai_v1 import BatchProcessMetadata
-        except ImportError:
+        except ImportError as exc:
             raise ImportError(
                 "documentai package not found, please install it with"
                 " `pip install google-cloud-documentai`"
-            )
+            ) from exc
 
-        results = []
-        for op in operations:
-            if isinstance(op.metadata, BatchProcessMetadata):
-                metadata = op.metadata
-            else:
-                metadata = BatchProcessMetadata.deserialize(op.metadata.value)
-            for status in metadata.individual_process_statuses:
-                source = status.input_gcs_source
-                output = status.output_gcs_destination
-                results.append(
-                    DocAIParsingResults(source_path=source, parsed_path=output)
-                )
-        return results
+        return [
+            DocAIParsingResults(
+                source_path=status.input_gcs_source,
+                parsed_path=status.output_gcs_destination,
+            )
+            for op in operations
+            for status in (
+                op.metadata.individual_process_statuses
+                if isinstance(op.metadata, BatchProcessMetadata)
+                else BatchProcessMetadata.deserialize(
+                    op.metadata.value
+                ).individual_process_statuses
+            )
+        ]
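
Usage sketch (for illustration, not part of the patch): a minimal end-to-end example of the two parsing paths this change provides, using only the methods defined in the diff above. The bucket, folder, and processor names are placeholders, not real resources; replace them with your own GCS folder and Document AI processor.

    import time

    from langchain.document_loaders.blob_loaders import Blob
    from langchain.document_loaders.parsers import DocAIParser

    # Placeholders: point these at your own GCS folder and processor.
    parser = DocAIParser(
        location="us",
        processor_name="projects/PROJECT_NUMBER/locations/us/processors/PROCESSOR_ID",
        gcs_output_path="gs://BUCKET_NAME/FOLDER_PATH",
    )
    blob = Blob(path="gs://BUCKET_NAME/FOLDER_PATH/document.pdf")

    # Online processing: synchronous, suited to a single small document.
    docs = list(
        parser.online_process(blob, field_mask="text,pages.pageNumber,pages.layout")
    )

    # Batch processing: asynchronous; submission is decoupled from result handling.
    operations = parser.docai_parse([blob])
    while parser.is_running(operations):
        time.sleep(5)  # poll until the long-running operations complete
    results = parser.get_results(operations)
    docs = list(parser.parse_from_results(results))
    print(len(docs))

The batch path mirrors the recommended flow from the `batch_parse()` docstring: submit operations, optionally persist their names, and turn the results into LangChain Documents once the operations finish.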