community[minor]: add neptune analytics graph (#20047)

Replacement for PR
[#19772](https://github.com/langchain-ai/langchain/pull/19772).

---------

Co-authored-by: Dave Bechberger <dbechbe@amazon.com>
Co-authored-by: bechbd <bechbd@users.noreply.github.com>
This commit is contained in:
Piyush Jain 2024-04-09 07:20:59 -07:00 committed by GitHub
parent ad9750403b
commit cd7abc495a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 302 additions and 117 deletions

View File

@ -12,12 +12,23 @@
">\n",
">[Cypher](https://en.wikipedia.org/wiki/Cypher_(query_language)) is a declarative graph query language that allows for expressive and efficient data querying in a property graph.\n",
">\n",
">[openCypher](https://opencypher.org/) is an open-source implementation of Cypher."
">[openCypher](https://opencypher.org/) is an open-source implementation of Cypher.",
"# Neptune Open Cypher QA Chain\n",
"This QA chain queries Amazon Neptune using openCypher and returns human readable response\n",
"\n",
"LangChain supports both [Neptune Database](https://docs.aws.amazon.com/neptune/latest/userguide/intro.html) and [Neptune Analytics](https://docs.aws.amazon.com/neptune-analytics/latest/userguide/what-is-neptune-analytics.html) with `NeptuneOpenCypherQAChain` \n",
"\n",
"\n",
"Neptune Database is a serverless graph database designed for optimal scalability and availability. It provides a solution for graph database workloads that need to scale to 100,000 queries per second, Multi-AZ high availability, and multi-Region deployments. You can use Neptune Database for social networking, fraud alerting, and Customer 360 applications.\n",
"\n",
"Neptune Analytics is an analytics database engine that can quickly analyze large amounts of graph data in memory to get insights and find trends. Neptune Analytics is a solution for quickly analyzing existing graph databases or graph datasets stored in a data lake. It uses popular graph analytic algorithms and low-latency analytic queries.\n",
"\n",
"## Using Neptune Database"
]
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@ -30,9 +41,36 @@
"graph = NeptuneGraph(host=host, port=port, use_https=use_https)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Using Neptune Analytics"
]
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain_community.graphs import NeptuneAnalyticsGraph\n",
"\n",
"graph = NeptuneAnalyticsGraph(graph_identifier=\"<neptune-analytics-graph-id>\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using NeptuneOpenCypherQAChain\n",
"\n",
"This QA chain queries Neptune graph database using openCypher and returns human readable response."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
@ -54,7 +92,7 @@
"\n",
"chain = NeptuneOpenCypherQAChain.from_llm(llm=llm, graph=graph)\n",
"\n",
"chain.run(\"how many outgoing routes does the Austin airport have?\")"
"chain.invoke(\"how many outgoing routes does the Austin airport have?\")"
]
}
],

View File

@ -12,6 +12,8 @@ _module_lookup = {
"MemgraphGraph": "langchain_community.graphs.memgraph_graph",
"NebulaGraph": "langchain_community.graphs.nebula_graph",
"Neo4jGraph": "langchain_community.graphs.neo4j_graph",
"BaseNeptuneGraph": "langchain_community.graphs.neptune_graph",
"NeptuneAnalyticsGraph": "langchain_community.graphs.neptune_graph",
"NeptuneGraph": "langchain_community.graphs.neptune_graph",
"NeptuneRdfGraph": "langchain_community.graphs.neptune_rdf_graph",
"NetworkxEntityGraph": "langchain_community.graphs.networkx_graph",

View File

@ -1,3 +1,5 @@
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple, Union
@ -19,7 +21,253 @@ class NeptuneQueryException(Exception):
return self.details
class NeptuneGraph:
class BaseNeptuneGraph(ABC):
@property
def get_schema(self) -> str:
"""Returns the schema of the Neptune database"""
return self.schema
@abstractmethod
def query(self, query: str, params: dict = {}) -> dict:
raise NotImplementedError()
@abstractmethod
def _get_summary(self) -> Dict:
raise NotImplementedError()
def _get_labels(self) -> Tuple[List[str], List[str]]:
"""Get node and edge labels from the Neptune statistics summary"""
summary = self._get_summary()
n_labels = summary["nodeLabels"]
e_labels = summary["edgeLabels"]
return n_labels, e_labels
def _get_triples(self, e_labels: List[str]) -> List[str]:
triple_query = """
MATCH (a)-[e:`{e_label}`]->(b)
WITH a,e,b LIMIT 3000
RETURN DISTINCT labels(a) AS from, type(e) AS edge, labels(b) AS to
LIMIT 10
"""
triple_template = "(:`{a}`)-[:`{e}`]->(:`{b}`)"
triple_schema = []
for label in e_labels:
q = triple_query.format(e_label=label)
data = self.query(q)
for d in data:
triple = triple_template.format(
a=d["from"][0], e=d["edge"], b=d["to"][0]
)
triple_schema.append(triple)
return triple_schema
def _get_node_properties(self, n_labels: List[str], types: Dict) -> List:
node_properties_query = """
MATCH (a:`{n_label}`)
RETURN properties(a) AS props
LIMIT 100
"""
node_properties = []
for label in n_labels:
q = node_properties_query.format(n_label=label)
data = {"label": label, "properties": self.query(q)}
s = set({})
for p in data["properties"]:
for k, v in p["props"].items():
s.add((k, types[type(v).__name__]))
np = {
"properties": [{"property": k, "type": v} for k, v in s],
"labels": label,
}
node_properties.append(np)
return node_properties
def _get_edge_properties(self, e_labels: List[str], types: Dict[str, Any]) -> List:
edge_properties_query = """
MATCH ()-[e:`{e_label}`]->()
RETURN properties(e) AS props
LIMIT 100
"""
edge_properties = []
for label in e_labels:
q = edge_properties_query.format(e_label=label)
data = {"label": label, "properties": self.query(q)}
s = set({})
for p in data["properties"]:
for k, v in p["props"].items():
s.add((k, types[type(v).__name__]))
ep = {
"type": label,
"properties": [{"property": k, "type": v} for k, v in s],
}
edge_properties.append(ep)
return edge_properties
def _refresh_schema(self) -> None:
"""
Refreshes the Neptune graph schema information.
"""
types = {
"str": "STRING",
"float": "DOUBLE",
"int": "INTEGER",
"list": "LIST",
"dict": "MAP",
"bool": "BOOLEAN",
}
n_labels, e_labels = self._get_labels()
triple_schema = self._get_triples(e_labels)
node_properties = self._get_node_properties(n_labels, types)
edge_properties = self._get_edge_properties(e_labels, types)
self.schema = f"""
Node properties are the following:
{node_properties}
Relationship properties are the following:
{edge_properties}
The relationships are the following:
{triple_schema}
"""
class NeptuneAnalyticsGraph(BaseNeptuneGraph):
"""Neptune Analytics wrapper for graph operations.
Args:
client: optional boto3 Neptune client
credentials_profile_name: optional AWS profile name
region_name: optional AWS region, e.g., us-west-2
graph_identifier: the graph identifier for a Neptune Analytics graph
Example:
.. code-block:: python
graph = NeptuneAnalyticsGraph(
graph_identifier='<my-graph-id>'
)
*Security note*: Make sure that the database connection uses credentials
that are narrowly-scoped to only include necessary permissions.
Failure to do so may result in data corruption or loss, since the calling
code may attempt commands that would result in deletion, mutation
of data if appropriately prompted or reading sensitive data if such
data is present in the database.
The best way to guard against such negative outcomes is to (as appropriate)
limit the permissions granted to the credentials used with this tool.
See https://python.langchain.com/docs/security for more information.
"""
def __init__(
self,
graph_identifier: str,
client: Any = None,
credentials_profile_name: Optional[str] = None,
region_name: Optional[str] = None,
) -> None:
"""Create a new Neptune Analytics graph wrapper instance."""
try:
if client is not None:
self.client = client
else:
import boto3
if credentials_profile_name is not None:
session = boto3.Session(profile_name=credentials_profile_name)
else:
# use default credentials
session = boto3.Session()
self.graph_identifier = graph_identifier
if region_name:
self.client = session.client(
"neptune-graph", region_name=region_name
)
else:
self.client = session.client("neptune-graph")
except ImportError:
raise ModuleNotFoundError(
"Could not import boto3 python package. "
"Please install it with `pip install boto3`."
)
except Exception as e:
if type(e).__name__ == "UnknownServiceError":
raise ModuleNotFoundError(
"NeptuneGraph requires a boto3 version 1.34.40 or greater."
"Please install it with `pip install -U boto3`."
) from e
else:
raise ValueError(
"Could not load credentials to authenticate with AWS client. "
"Please check that credentials in the specified "
"profile name are valid."
) from e
try:
self._refresh_schema()
except Exception as e:
raise NeptuneQueryException(
{
"message": "Could not get schema for Neptune database",
"detail": str(e),
}
)
def query(self, query: str, params: dict = {}) -> Dict[str, Any]:
"""Query Neptune database."""
try:
resp = self.client.execute_query(
graphIdentifier=self.graph_identifier,
queryString=query,
parameters=params,
language="OPEN_CYPHER",
)
return json.loads(resp["payload"].read().decode("UTF-8"))["results"]
except Exception as e:
raise NeptuneQueryException(
{
"message": "An error occurred while executing the query.",
"details": str(e),
}
)
def _get_summary(self) -> Dict:
try:
response = self.client.get_graph_summary(
graphIdentifier=self.graph_identifier, mode="detailed"
)
except Exception as e:
raise NeptuneQueryException(
{
"message": ("Summary API error occurred on Neptune Analytics"),
"details": str(e),
}
)
try:
summary = response["graphSummary"]
except Exception:
raise NeptuneQueryException(
{
"message": "Summary API did not return a valid response.",
"details": response.content.decode(),
}
)
else:
return summary
class NeptuneGraph(BaseNeptuneGraph):
"""Neptune wrapper for graph operations.
Args:
@ -60,7 +308,6 @@ class NeptuneGraph:
client: Any = None,
credentials_profile_name: Optional[str] = None,
region_name: Optional[str] = None,
service: str = "neptunedata",
sign: bool = True,
) -> None:
"""Create a new Neptune graph wrapper instance."""
@ -86,13 +333,13 @@ class NeptuneGraph:
client_params["endpoint_url"] = f"{protocol}://{host}:{port}"
if sign:
self.client = session.client(service, **client_params)
self.client = session.client("neptunedata", **client_params)
else:
from botocore import UNSIGNED
from botocore.config import Config
self.client = session.client(
service,
"neptunedata",
**client_params,
config=Config(signature_version=UNSIGNED),
)
@ -125,15 +372,12 @@ class NeptuneGraph:
}
)
@property
def get_schema(self) -> str:
"""Returns the schema of the Neptune database"""
return self.schema
def query(self, query: str, params: dict = {}) -> Dict[str, Any]:
"""Query Neptune database."""
try:
return self.client.execute_open_cypher_query(openCypherQuery=query)
return self.client.execute_open_cypher_query(openCypherQuery=query)[
"results"
]
except Exception as e:
raise NeptuneQueryException(
{
@ -167,104 +411,3 @@ class NeptuneGraph:
)
else:
return summary
def _get_labels(self) -> Tuple[List[str], List[str]]:
"""Get node and edge labels from the Neptune statistics summary"""
summary = self._get_summary()
n_labels = summary["nodeLabels"]
e_labels = summary["edgeLabels"]
return n_labels, e_labels
def _get_triples(self, e_labels: List[str]) -> List[str]:
triple_query = """
MATCH (a)-[e:`{e_label}`]->(b)
WITH a,e,b LIMIT 3000
RETURN DISTINCT labels(a) AS from, type(e) AS edge, labels(b) AS to
LIMIT 10
"""
triple_template = "(:`{a}`)-[:`{e}`]->(:`{b}`)"
triple_schema = []
for label in e_labels:
q = triple_query.format(e_label=label)
data = self.query(q)
for d in data["results"]:
triple = triple_template.format(
a=d["from"][0], e=d["edge"], b=d["to"][0]
)
triple_schema.append(triple)
return triple_schema
def _get_node_properties(self, n_labels: List[str], types: Dict) -> List:
node_properties_query = """
MATCH (a:`{n_label}`)
RETURN properties(a) AS props
LIMIT 100
"""
node_properties = []
for label in n_labels:
q = node_properties_query.format(n_label=label)
data = {"label": label, "properties": self.query(q)["results"]}
s = set({})
for p in data["properties"]:
for k, v in p["props"].items():
s.add((k, types[type(v).__name__]))
np = {
"properties": [{"property": k, "type": v} for k, v in s],
"labels": label,
}
node_properties.append(np)
return node_properties
def _get_edge_properties(self, e_labels: List[str], types: Dict[str, Any]) -> List:
edge_properties_query = """
MATCH ()-[e:`{e_label}`]->()
RETURN properties(e) AS props
LIMIT 100
"""
edge_properties = []
for label in e_labels:
q = edge_properties_query.format(e_label=label)
data = {"label": label, "properties": self.query(q)["results"]}
s = set({})
for p in data["properties"]:
for k, v in p["props"].items():
s.add((k, types[type(v).__name__]))
ep = {
"type": label,
"properties": [{"property": k, "type": v} for k, v in s],
}
edge_properties.append(ep)
return edge_properties
def _refresh_schema(self) -> None:
"""
Refreshes the Neptune graph schema information.
"""
types = {
"str": "STRING",
"float": "DOUBLE",
"int": "INTEGER",
"list": "LIST",
"dict": "MAP",
"bool": "BOOLEAN",
}
n_labels, e_labels = self._get_labels()
triple_schema = self._get_triples(e_labels)
node_properties = self._get_node_properties(n_labels, types)
edge_properties = self._get_edge_properties(e_labels, types)
self.schema = f"""
Node properties are the following:
{node_properties}
Relationship properties are the following:
{edge_properties}
The relationships are the following:
{triple_schema}
"""

View File

@ -5,6 +5,8 @@ EXPECTED_ALL = [
"NetworkxEntityGraph",
"Neo4jGraph",
"NebulaGraph",
"BaseNeptuneGraph",
"NeptuneAnalyticsGraph",
"NeptuneGraph",
"NeptuneRdfGraph",
"KuzuGraph",

View File

@ -3,7 +3,7 @@ from __future__ import annotations
import re
from typing import Any, Dict, List, Optional
from langchain_community.graphs import NeptuneGraph
from langchain_community.graphs import BaseNeptuneGraph
from langchain_core.callbacks import CallbackManagerForChainRun
from langchain_core.language_models import BaseLanguageModel
from langchain_core.prompts.base import BasePromptTemplate
@ -107,7 +107,7 @@ class NeptuneOpenCypherQAChain(Chain):
response = chain.run(query)
"""
graph: NeptuneGraph = Field(exclude=True)
graph: BaseNeptuneGraph = Field(exclude=True)
cypher_generation_chain: LLMChain
qa_chain: LLMChain
input_key: str = "query" #: :meta private: