community: Retry retriable errors in Neo4j (#26211)

Co-authored-by: Erick Friis <erick@langchain.dev>
pull/26560/head^2
Tomaz Bratanic 3 days ago committed by GitHub
parent acbb4e4701
commit 03b9aca55d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -411,7 +411,9 @@ class Neo4jGraph(GraphStore):
return self.structured_schema return self.structured_schema
def query( def query(
self, query: str, params: dict = {}, retry_on_session_expired: bool = True self,
query: str,
params: dict = {},
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
"""Query Neo4j database. """Query Neo4j database.
@ -423,26 +425,44 @@ class Neo4jGraph(GraphStore):
List[Dict[str, Any]]: The list of dictionaries containing the query results. List[Dict[str, Any]]: The list of dictionaries containing the query results.
""" """
from neo4j import Query from neo4j import Query
from neo4j.exceptions import CypherSyntaxError, SessionExpired from neo4j.exceptions import Neo4jError
with self._driver.session(database=self._database) as session:
try: try:
data = session.run(Query(text=query, timeout=self.timeout), params) data, _, _ = self._driver.execute_query(
Query(text=query, timeout=self.timeout),
database=self._database,
parameters_=params,
)
json_data = [r.data() for r in data] json_data = [r.data() for r in data]
if self.sanitize: if self.sanitize:
json_data = [value_sanitize(el) for el in json_data] json_data = [value_sanitize(el) for el in json_data]
return json_data return json_data
except CypherSyntaxError as e: except Neo4jError as e:
raise ValueError(f"Generated Cypher Statement is not valid\n{e}") if not (
except ( (
SessionExpired ( # isCallInTransactionError
) as e: # Session expired is a transient error that can be retried e.code == "Neo.DatabaseError.Statement.ExecutionFailed"
if retry_on_session_expired: or e.code
return self.query( == "Neo.DatabaseError.Transaction.TransactionStartFailed"
query, params=params, retry_on_session_expired=False
) )
else: and "in an implicit transaction" in e.message
raise e )
or ( # isPeriodicCommitError
e.code == "Neo.ClientError.Statement.SemanticError"
and (
"in an open transaction is not possible" in e.message
or "tried to execute in an explicit transaction" in e.message
)
)
):
raise
# fallback to allow implicit transactions
with self._driver.session() as session:
data = session.run(Query(text=query, timeout=self.timeout), params)
json_data = [r.data() for r in data]
if self.sanitize:
json_data = [value_sanitize(el) for el in json_data]
return json_data
def refresh_schema(self) -> None: def refresh_schema(self) -> None:
""" """

@ -595,11 +595,8 @@ class Neo4jVector(VectorStore):
query: str, query: str,
*, *,
params: Optional[dict] = None, params: Optional[dict] = None,
retry_on_session_expired: bool = True,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
""" """Query Neo4j database with retries and exponential backoff.
This method sends a Cypher query to the connected Neo4j database
and returns the results as a list of dictionaries.
Args: Args:
query (str): The Cypher query to execute. query (str): The Cypher query to execute.
@ -608,24 +605,38 @@ class Neo4jVector(VectorStore):
Returns: Returns:
List[Dict[str, Any]]: List of dictionaries containing the query results. List[Dict[str, Any]]: List of dictionaries containing the query results.
""" """
from neo4j.exceptions import CypherSyntaxError, SessionExpired from neo4j import Query
from neo4j.exceptions import Neo4jError
params = params or {} params = params or {}
with self._driver.session(database=self._database) as session:
try: try:
data = session.run(query, params) data, _, _ = self._driver.execute_query(
query, database=self._database, parameters_=params
)
return [r.data() for r in data] return [r.data() for r in data]
except CypherSyntaxError as e: except Neo4jError as e:
raise ValueError(f"Cypher Statement is not valid\n{e}") if not (
except ( (
SessionExpired ( # isCallInTransactionError
) as e: # Session expired is a transient error that can be retried e.code == "Neo.DatabaseError.Statement.ExecutionFailed"
if retry_on_session_expired: or e.code
return self.query( == "Neo.DatabaseError.Transaction.TransactionStartFailed"
query, params=params, retry_on_session_expired=False
) )
else: and "in an implicit transaction" in e.message
raise e )
or ( # isPeriodicCommitError
e.code == "Neo.ClientError.Statement.SemanticError"
and (
"in an open transaction is not possible" in e.message
or "tried to execute in an explicit transaction" in e.message
)
)
):
raise
# Fallback to allow implicit transactions
with self._driver.session() as session:
data = session.run(Query(text=query), params)
return [r.data() for r in data]
def verify_version(self) -> None: def verify_version(self) -> None:
""" """

Loading…
Cancel
Save