diff --git a/docs/docs/integrations/graphs/amazon_neptune_open_cypher.ipynb b/docs/docs/integrations/graphs/amazon_neptune_open_cypher.ipynb index 2fba8ee16d..520ebc8a42 100644 --- a/docs/docs/integrations/graphs/amazon_neptune_open_cypher.ipynb +++ b/docs/docs/integrations/graphs/amazon_neptune_open_cypher.ipynb @@ -12,12 +12,23 @@ ">\n", ">[Cypher](https://en.wikipedia.org/wiki/Cypher_(query_language)) is a declarative graph query language that allows for expressive and efficient data querying in a property graph.\n", ">\n", - ">[openCypher](https://opencypher.org/) is an open-source implementation of Cypher." + ">[openCypher](https://opencypher.org/) is an open-source implementation of Cypher.", + "# Neptune Open Cypher QA Chain\n", + "This QA chain queries Amazon Neptune using openCypher and returns human readable response\n", + "\n", + "LangChain supports both [Neptune Database](https://docs.aws.amazon.com/neptune/latest/userguide/intro.html) and [Neptune Analytics](https://docs.aws.amazon.com/neptune-analytics/latest/userguide/what-is-neptune-analytics.html) with `NeptuneOpenCypherQAChain` \n", + "\n", + "\n", + "Neptune Database is a serverless graph database designed for optimal scalability and availability. It provides a solution for graph database workloads that need to scale to 100,000 queries per second, Multi-AZ high availability, and multi-Region deployments. You can use Neptune Database for social networking, fraud alerting, and Customer 360 applications.\n", + "\n", + "Neptune Analytics is an analytics database engine that can quickly analyze large amounts of graph data in memory to get insights and find trends. Neptune Analytics is a solution for quickly analyzing existing graph databases or graph datasets stored in a data lake. It uses popular graph analytic algorithms and low-latency analytic queries.\n", + "\n", + "## Using Neptune Database" ] }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -30,9 +41,36 @@ "graph = NeptuneGraph(host=host, port=port, use_https=use_https)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Using Neptune Analytics" + ] + }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from langchain_community.graphs import NeptuneAnalyticsGraph\n", + "\n", + "graph = NeptuneAnalyticsGraph(graph_identifier=\"\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Using NeptuneOpenCypherQAChain\n", + "\n", + "This QA chain queries Neptune graph database using openCypher and returns human readable response." + ] + }, + { + "cell_type": "code", + "execution_count": null, "metadata": {}, "outputs": [ { @@ -54,7 +92,7 @@ "\n", "chain = NeptuneOpenCypherQAChain.from_llm(llm=llm, graph=graph)\n", "\n", - "chain.run(\"how many outgoing routes does the Austin airport have?\")" + "chain.invoke(\"how many outgoing routes does the Austin airport have?\")" ] } ], diff --git a/libs/community/langchain_community/graphs/__init__.py b/libs/community/langchain_community/graphs/__init__.py index f714246866..e188e00dbe 100644 --- a/libs/community/langchain_community/graphs/__init__.py +++ b/libs/community/langchain_community/graphs/__init__.py @@ -12,6 +12,8 @@ _module_lookup = { "MemgraphGraph": "langchain_community.graphs.memgraph_graph", "NebulaGraph": "langchain_community.graphs.nebula_graph", "Neo4jGraph": "langchain_community.graphs.neo4j_graph", + "BaseNeptuneGraph": "langchain_community.graphs.neptune_graph", + "NeptuneAnalyticsGraph": "langchain_community.graphs.neptune_graph", "NeptuneGraph": "langchain_community.graphs.neptune_graph", "NeptuneRdfGraph": "langchain_community.graphs.neptune_rdf_graph", "NetworkxEntityGraph": "langchain_community.graphs.networkx_graph", diff --git a/libs/community/langchain_community/graphs/neptune_graph.py b/libs/community/langchain_community/graphs/neptune_graph.py index d1ee0db14e..f7566ecffe 100644 --- a/libs/community/langchain_community/graphs/neptune_graph.py +++ b/libs/community/langchain_community/graphs/neptune_graph.py @@ -1,3 +1,5 @@ +import json +from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional, Tuple, Union @@ -19,7 +21,253 @@ class NeptuneQueryException(Exception): return self.details -class NeptuneGraph: +class BaseNeptuneGraph(ABC): + @property + def get_schema(self) -> str: + """Returns the schema of the Neptune database""" + return self.schema + + @abstractmethod + def query(self, query: str, params: dict = {}) -> dict: + raise NotImplementedError() + + @abstractmethod + def _get_summary(self) -> Dict: + raise NotImplementedError() + + def _get_labels(self) -> Tuple[List[str], List[str]]: + """Get node and edge labels from the Neptune statistics summary""" + summary = self._get_summary() + n_labels = summary["nodeLabels"] + e_labels = summary["edgeLabels"] + return n_labels, e_labels + + def _get_triples(self, e_labels: List[str]) -> List[str]: + triple_query = """ + MATCH (a)-[e:`{e_label}`]->(b) + WITH a,e,b LIMIT 3000 + RETURN DISTINCT labels(a) AS from, type(e) AS edge, labels(b) AS to + LIMIT 10 + """ + + triple_template = "(:`{a}`)-[:`{e}`]->(:`{b}`)" + triple_schema = [] + for label in e_labels: + q = triple_query.format(e_label=label) + data = self.query(q) + for d in data: + triple = triple_template.format( + a=d["from"][0], e=d["edge"], b=d["to"][0] + ) + triple_schema.append(triple) + + return triple_schema + + def _get_node_properties(self, n_labels: List[str], types: Dict) -> List: + node_properties_query = """ + MATCH (a:`{n_label}`) + RETURN properties(a) AS props + LIMIT 100 + """ + node_properties = [] + for label in n_labels: + q = node_properties_query.format(n_label=label) + data = {"label": label, "properties": self.query(q)} + s = set({}) + for p in data["properties"]: + for k, v in p["props"].items(): + s.add((k, types[type(v).__name__])) + + np = { + "properties": [{"property": k, "type": v} for k, v in s], + "labels": label, + } + node_properties.append(np) + + return node_properties + + def _get_edge_properties(self, e_labels: List[str], types: Dict[str, Any]) -> List: + edge_properties_query = """ + MATCH ()-[e:`{e_label}`]->() + RETURN properties(e) AS props + LIMIT 100 + """ + edge_properties = [] + for label in e_labels: + q = edge_properties_query.format(e_label=label) + data = {"label": label, "properties": self.query(q)} + s = set({}) + for p in data["properties"]: + for k, v in p["props"].items(): + s.add((k, types[type(v).__name__])) + + ep = { + "type": label, + "properties": [{"property": k, "type": v} for k, v in s], + } + edge_properties.append(ep) + + return edge_properties + + def _refresh_schema(self) -> None: + """ + Refreshes the Neptune graph schema information. + """ + + types = { + "str": "STRING", + "float": "DOUBLE", + "int": "INTEGER", + "list": "LIST", + "dict": "MAP", + "bool": "BOOLEAN", + } + n_labels, e_labels = self._get_labels() + triple_schema = self._get_triples(e_labels) + node_properties = self._get_node_properties(n_labels, types) + edge_properties = self._get_edge_properties(e_labels, types) + + self.schema = f""" + Node properties are the following: + {node_properties} + Relationship properties are the following: + {edge_properties} + The relationships are the following: + {triple_schema} + """ + + +class NeptuneAnalyticsGraph(BaseNeptuneGraph): + """Neptune Analytics wrapper for graph operations. + + Args: + client: optional boto3 Neptune client + credentials_profile_name: optional AWS profile name + region_name: optional AWS region, e.g., us-west-2 + graph_identifier: the graph identifier for a Neptune Analytics graph + + Example: + .. code-block:: python + + graph = NeptuneAnalyticsGraph( + graph_identifier='' + ) + + *Security note*: Make sure that the database connection uses credentials + that are narrowly-scoped to only include necessary permissions. + Failure to do so may result in data corruption or loss, since the calling + code may attempt commands that would result in deletion, mutation + of data if appropriately prompted or reading sensitive data if such + data is present in the database. + The best way to guard against such negative outcomes is to (as appropriate) + limit the permissions granted to the credentials used with this tool. + + See https://python.langchain.com/docs/security for more information. + """ + + def __init__( + self, + graph_identifier: str, + client: Any = None, + credentials_profile_name: Optional[str] = None, + region_name: Optional[str] = None, + ) -> None: + """Create a new Neptune Analytics graph wrapper instance.""" + + try: + if client is not None: + self.client = client + else: + import boto3 + + if credentials_profile_name is not None: + session = boto3.Session(profile_name=credentials_profile_name) + else: + # use default credentials + session = boto3.Session() + + self.graph_identifier = graph_identifier + + if region_name: + self.client = session.client( + "neptune-graph", region_name=region_name + ) + else: + self.client = session.client("neptune-graph") + + except ImportError: + raise ModuleNotFoundError( + "Could not import boto3 python package. " + "Please install it with `pip install boto3`." + ) + except Exception as e: + if type(e).__name__ == "UnknownServiceError": + raise ModuleNotFoundError( + "NeptuneGraph requires a boto3 version 1.34.40 or greater." + "Please install it with `pip install -U boto3`." + ) from e + else: + raise ValueError( + "Could not load credentials to authenticate with AWS client. " + "Please check that credentials in the specified " + "profile name are valid." + ) from e + + try: + self._refresh_schema() + except Exception as e: + raise NeptuneQueryException( + { + "message": "Could not get schema for Neptune database", + "detail": str(e), + } + ) + + def query(self, query: str, params: dict = {}) -> Dict[str, Any]: + """Query Neptune database.""" + try: + resp = self.client.execute_query( + graphIdentifier=self.graph_identifier, + queryString=query, + parameters=params, + language="OPEN_CYPHER", + ) + return json.loads(resp["payload"].read().decode("UTF-8"))["results"] + except Exception as e: + raise NeptuneQueryException( + { + "message": "An error occurred while executing the query.", + "details": str(e), + } + ) + + def _get_summary(self) -> Dict: + try: + response = self.client.get_graph_summary( + graphIdentifier=self.graph_identifier, mode="detailed" + ) + except Exception as e: + raise NeptuneQueryException( + { + "message": ("Summary API error occurred on Neptune Analytics"), + "details": str(e), + } + ) + + try: + summary = response["graphSummary"] + except Exception: + raise NeptuneQueryException( + { + "message": "Summary API did not return a valid response.", + "details": response.content.decode(), + } + ) + else: + return summary + + +class NeptuneGraph(BaseNeptuneGraph): """Neptune wrapper for graph operations. Args: @@ -60,7 +308,6 @@ class NeptuneGraph: client: Any = None, credentials_profile_name: Optional[str] = None, region_name: Optional[str] = None, - service: str = "neptunedata", sign: bool = True, ) -> None: """Create a new Neptune graph wrapper instance.""" @@ -86,13 +333,13 @@ class NeptuneGraph: client_params["endpoint_url"] = f"{protocol}://{host}:{port}" if sign: - self.client = session.client(service, **client_params) + self.client = session.client("neptunedata", **client_params) else: from botocore import UNSIGNED from botocore.config import Config self.client = session.client( - service, + "neptunedata", **client_params, config=Config(signature_version=UNSIGNED), ) @@ -125,15 +372,12 @@ class NeptuneGraph: } ) - @property - def get_schema(self) -> str: - """Returns the schema of the Neptune database""" - return self.schema - def query(self, query: str, params: dict = {}) -> Dict[str, Any]: """Query Neptune database.""" try: - return self.client.execute_open_cypher_query(openCypherQuery=query) + return self.client.execute_open_cypher_query(openCypherQuery=query)[ + "results" + ] except Exception as e: raise NeptuneQueryException( { @@ -167,104 +411,3 @@ class NeptuneGraph: ) else: return summary - - def _get_labels(self) -> Tuple[List[str], List[str]]: - """Get node and edge labels from the Neptune statistics summary""" - summary = self._get_summary() - n_labels = summary["nodeLabels"] - e_labels = summary["edgeLabels"] - return n_labels, e_labels - - def _get_triples(self, e_labels: List[str]) -> List[str]: - triple_query = """ - MATCH (a)-[e:`{e_label}`]->(b) - WITH a,e,b LIMIT 3000 - RETURN DISTINCT labels(a) AS from, type(e) AS edge, labels(b) AS to - LIMIT 10 - """ - - triple_template = "(:`{a}`)-[:`{e}`]->(:`{b}`)" - triple_schema = [] - for label in e_labels: - q = triple_query.format(e_label=label) - data = self.query(q) - for d in data["results"]: - triple = triple_template.format( - a=d["from"][0], e=d["edge"], b=d["to"][0] - ) - triple_schema.append(triple) - - return triple_schema - - def _get_node_properties(self, n_labels: List[str], types: Dict) -> List: - node_properties_query = """ - MATCH (a:`{n_label}`) - RETURN properties(a) AS props - LIMIT 100 - """ - node_properties = [] - for label in n_labels: - q = node_properties_query.format(n_label=label) - data = {"label": label, "properties": self.query(q)["results"]} - s = set({}) - for p in data["properties"]: - for k, v in p["props"].items(): - s.add((k, types[type(v).__name__])) - - np = { - "properties": [{"property": k, "type": v} for k, v in s], - "labels": label, - } - node_properties.append(np) - - return node_properties - - def _get_edge_properties(self, e_labels: List[str], types: Dict[str, Any]) -> List: - edge_properties_query = """ - MATCH ()-[e:`{e_label}`]->() - RETURN properties(e) AS props - LIMIT 100 - """ - edge_properties = [] - for label in e_labels: - q = edge_properties_query.format(e_label=label) - data = {"label": label, "properties": self.query(q)["results"]} - s = set({}) - for p in data["properties"]: - for k, v in p["props"].items(): - s.add((k, types[type(v).__name__])) - - ep = { - "type": label, - "properties": [{"property": k, "type": v} for k, v in s], - } - edge_properties.append(ep) - - return edge_properties - - def _refresh_schema(self) -> None: - """ - Refreshes the Neptune graph schema information. - """ - - types = { - "str": "STRING", - "float": "DOUBLE", - "int": "INTEGER", - "list": "LIST", - "dict": "MAP", - "bool": "BOOLEAN", - } - n_labels, e_labels = self._get_labels() - triple_schema = self._get_triples(e_labels) - node_properties = self._get_node_properties(n_labels, types) - edge_properties = self._get_edge_properties(e_labels, types) - - self.schema = f""" - Node properties are the following: - {node_properties} - Relationship properties are the following: - {edge_properties} - The relationships are the following: - {triple_schema} - """ diff --git a/libs/community/tests/unit_tests/graphs/test_imports.py b/libs/community/tests/unit_tests/graphs/test_imports.py index 272400085f..d7311ae323 100644 --- a/libs/community/tests/unit_tests/graphs/test_imports.py +++ b/libs/community/tests/unit_tests/graphs/test_imports.py @@ -5,6 +5,8 @@ EXPECTED_ALL = [ "NetworkxEntityGraph", "Neo4jGraph", "NebulaGraph", + "BaseNeptuneGraph", + "NeptuneAnalyticsGraph", "NeptuneGraph", "NeptuneRdfGraph", "KuzuGraph", diff --git a/libs/langchain/langchain/chains/graph_qa/neptune_cypher.py b/libs/langchain/langchain/chains/graph_qa/neptune_cypher.py index 8fec19f5e1..2b9447e70c 100644 --- a/libs/langchain/langchain/chains/graph_qa/neptune_cypher.py +++ b/libs/langchain/langchain/chains/graph_qa/neptune_cypher.py @@ -3,7 +3,7 @@ from __future__ import annotations import re from typing import Any, Dict, List, Optional -from langchain_community.graphs import NeptuneGraph +from langchain_community.graphs import BaseNeptuneGraph from langchain_core.callbacks import CallbackManagerForChainRun from langchain_core.language_models import BaseLanguageModel from langchain_core.prompts.base import BasePromptTemplate @@ -107,7 +107,7 @@ class NeptuneOpenCypherQAChain(Chain): response = chain.run(query) """ - graph: NeptuneGraph = Field(exclude=True) + graph: BaseNeptuneGraph = Field(exclude=True) cypher_generation_chain: LLMChain qa_chain: LLMChain input_key: str = "query" #: :meta private: