experimental[patch]: Enhance LLMGraphTransformer with async processing and improved readability (#19205)

- [x] **PR title**: "experimental: Enhance LLMGraphTransformer with
async processing and improved readability"


- [x] **PR message**: 
- **Description:** This pull request refactors the `process_response`
and `convert_to_graph_documents` methods in the LLMGraphTransformer
class to improve code readability and adds async versions of these
methods for concurrent processing.
    The main changes include:
- Simplifying list comprehensions and conditional logic in the
process_response method for better readability.
- Adding async versions aprocess_response and
aconvert_to_graph_documents to enable concurrent processing of
documents.
These enhancements aim to improve the overall efficiency and
maintainability of the `LLMGraphTransformer` class.
  - **Issue:** N/A
  - **Dependencies:** No additional dependencies required.
  - **Twitter handle:** @jjovalle99


- [x] **Add tests and docs**: N/A (This PR does not introduce a new
integration)


- [x] **Lint and test**: Ran make format, make lint, and make test from
the root of the modified package(s). All tests pass successfully.

Additional notes:

- The changes made in this PR are backwards compatible and do not
introduce any breaking changes.
- The PR touches only the `LLMGraphTransformer` class within the
experimental package.

---------

Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com>
pull/18474/head^2
Juan Jose Miguel Ovalle Villamil 3 months ago committed by GitHub
parent f12cb0bea4
commit 1fe10a3e3d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,3 +1,4 @@
import asyncio
from typing import Any, List, Optional, Sequence
from langchain_community.graphs.graph_document import GraphDocument, Node, Relationship
@ -207,29 +208,20 @@ class LLMGraphTransformer:
"""
text = document.page_content
raw_schema = self.chain.invoke({"input": text})
if raw_schema.nodes:
nodes = [map_to_base_node(node) for node in raw_schema.nodes]
else:
nodes = []
if raw_schema.relationships:
relationships = [
map_to_base_relationship(rel) for rel in raw_schema.relationships
]
else:
relationships = []
nodes = (
[map_to_base_node(node) for node in raw_schema.nodes]
if raw_schema.nodes
else []
)
relationships = (
[map_to_base_relationship(rel) for rel in raw_schema.relationships]
if raw_schema.relationships
else []
)
# Strict mode filtering
if self.strict_mode and (self.allowed_nodes or self.allowed_relationships):
if self.allowed_relationships and self.allowed_nodes:
nodes = [node for node in nodes if node.type in self.allowed_nodes]
relationships = [
rel
for rel in relationships
if rel.type in self.allowed_relationships
and rel.source.type in self.allowed_nodes
and rel.target.type in self.allowed_nodes
]
elif self.allowed_nodes and not self.allowed_relationships:
if self.allowed_nodes:
nodes = [node for node in nodes if node.type in self.allowed_nodes]
relationships = [
rel
@ -237,17 +229,14 @@ class LLMGraphTransformer:
if rel.source.type in self.allowed_nodes
and rel.target.type in self.allowed_nodes
]
if self.allowed_relationships and not self.allowed_nodes:
if self.allowed_relationships:
relationships = [
rel
for rel in relationships
if rel.type in self.allowed_relationships
]
graph_document = GraphDocument(
nodes=nodes, relationships=relationships, source=document
)
return graph_document
return GraphDocument(nodes=nodes, relationships=relationships, source=document)
def convert_to_graph_documents(
self, documents: Sequence[Document]
@ -261,8 +250,54 @@ class LLMGraphTransformer:
Returns:
Sequence[GraphDocument]: The transformed documents as graphs.
"""
results = []
for document in documents:
graph_document = self.process_response(document)
results.append(graph_document)
return [self.process_response(document) for document in documents]
async def aprocess_response(self, document: Document) -> GraphDocument:
"""
Asynchronously processes a single document, transforming it into a
graph document.
"""
text = document.page_content
raw_schema = await self.chain.ainvoke({"input": text})
nodes = (
[map_to_base_node(node) for node in raw_schema.nodes]
if raw_schema.nodes
else []
)
relationships = (
[map_to_base_relationship(rel) for rel in raw_schema.relationships]
if raw_schema.relationships
else []
)
if self.strict_mode and (self.allowed_nodes or self.allowed_relationships):
if self.allowed_nodes:
nodes = [node for node in nodes if node.type in self.allowed_nodes]
relationships = [
rel
for rel in relationships
if rel.source.type in self.allowed_nodes
and rel.target.type in self.allowed_nodes
]
if self.allowed_relationships:
relationships = [
rel
for rel in relationships
if rel.type in self.allowed_relationships
]
return GraphDocument(nodes=nodes, relationships=relationships, source=document)
async def aconvert_to_graph_documents(
self, documents: Sequence[Document]
) -> List[GraphDocument]:
"""
Asynchronously convert a sequence of documents into graph documents.
"""
tasks = [
asyncio.create_task(self.aprocess_response(document))
for document in documents
]
results = await asyncio.gather(*tasks)
return results

Loading…
Cancel
Save