community[minor]: Add glue catalog loader (#20220)

Add Glue Catalog loader
pull/20520/head
Ravindu Somawansa 2 months ago committed by GitHub
parent aab075345e
commit 5acc7ba622
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,118 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "MwTWzDxYgbrR"
},
"source": [
"# Glue Catalog\n",
"\n",
"\n",
"The [AWS Glue Data Catalog](https://docs.aws.amazon.com/en_en/glue/latest/dg/catalog-and-crawler.html) is a centralized metadata repository that allows you to manage, access, and share metadata about your data stored in AWS. It acts as a metadata store for your data assets, enabling various AWS services and your applications to query and connect to the data they need efficiently.\n",
"\n",
"When you define data sources, transformations, and targets in AWS Glue, the metadata about these elements is stored in the Data Catalog. This includes information about data locations, schema definitions, runtime metrics, and more. It supports various data store types, such as Amazon S3, Amazon RDS, Amazon Redshift, and external databases compatible with JDBC. It is also directly integrated with Amazon Athena, Amazon Redshift Spectrum, and Amazon EMR, allowing these services to directly access and query the data.\n",
"\n",
"The Langchain GlueCatalogLoader will get the schema of all tables inside the given Glue database in the same format as Pandas dtype."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setting up\n",
"\n",
"- Follow [instructions to set up an AWS accoung](https://docs.aws.amazon.com/athena/latest/ug/setting-up.html).\n",
"- Install the boto3 library: `pip install boto3`\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "076NLjfngoWJ"
},
"outputs": [],
"source": [
"from langchain_community.document_loaders.glue_catalog import GlueCatalogLoader"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "XpMRQwU9gu44"
},
"outputs": [],
"source": [
"database_name = \"my_database\"\n",
"profile_name = \"my_profile\"\n",
"\n",
"loader = GlueCatalogLoader(\n",
" database=database_name,\n",
" profile_name=profile_name,\n",
")\n",
"\n",
"schemas = loader.load()\n",
"print(schemas)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example with table filtering\n",
"\n",
"Table filtering allows you to selectively retrieve schema information for a specific subset of tables within a Glue database. Instead of loading the schemas for all tables, you can use the `table_filter` argument to specify exactly which tables you're interested in."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain_community.document_loaders.glue_catalog import GlueCatalogLoader"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"database_name = \"my_database\"\n",
"profile_name = \"my_profile\"\n",
"table_filter = [\"table1\", \"table2\", \"table3\"]\n",
"\n",
"loader = GlueCatalogLoader(\n",
" database=database_name, profile_name=profile_name, table_filter=table_filter\n",
")\n",
"\n",
"schemas = loader.load()\n",
"print(schemas)"
]
}
],
"metadata": {
"colab": {
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

@ -209,6 +209,9 @@ if TYPE_CHECKING:
GithubFileLoader, # noqa: F401
GitHubIssuesLoader, # noqa: F401
)
from langchain_community.document_loaders.glue_catalog import (
GlueCatalogLoader, # noqa: F401
)
from langchain_community.document_loaders.google_speech_to_text import (
GoogleSpeechToTextLoader, # noqa: F401
)
@ -758,6 +761,7 @@ _module_lookup = {
"GitLoader": "langchain_community.document_loaders.git",
"GitbookLoader": "langchain_community.document_loaders.gitbook",
"GithubFileLoader": "langchain_community.document_loaders.github",
"GlueCatalogLoader": "langchain_community.document_loaders.glue_catalog",
"GoogleApiClient": "langchain_community.document_loaders.youtube",
"GoogleApiYoutubeLoader": "langchain_community.document_loaders.youtube",
"GoogleDriveLoader": "langchain_community.document_loaders.googledrive",

@ -0,0 +1,126 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
if TYPE_CHECKING:
from boto3.session import Session
class GlueCatalogLoader(BaseLoader):
"""Load table schemas from AWS Glue.
This loader fetches the schema of each table within a specified AWS Glue database.
The schema details include column names and their data types, similar to pandas
dtype representation.
AWS credentials are automatically loaded using boto3, following the standard AWS
method:
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
If a specific AWS profile is required, it can be specified and will be used to
establish the session.
"""
def __init__(
self,
database: str,
*,
session: Optional[Session] = None,
profile_name: Optional[str] = None,
table_filter: Optional[List[str]] = None,
):
"""Initialize Glue database loader.
Args:
database: The name of the Glue database from which to load table schemas.
session: Optional. A boto3 Session object. If not provided, a new
session will be created.
profile_name: Optional. The name of the AWS profile to use for credentials.
table_filter: Optional. List of table names to fetch schemas for,
fetching all if None.
"""
self.database = database
self.profile_name = profile_name
self.table_filter = table_filter
if session:
self.glue_client = session.client("glue")
else:
self.glue_client = self._initialize_glue_client()
def _initialize_glue_client(self) -> Any:
"""Initialize the AWS Glue client.
Returns:
The initialized AWS Glue client.
Raises:
ValueError: If there is an issue with AWS session/client initialization.
"""
try:
import boto3
except ImportError as e:
raise ImportError(
"boto3 is required to use the GlueCatalogLoader. "
"Please install it with `pip install boto3`."
) from e
try:
session = (
boto3.Session(profile_name=self.profile_name)
if self.profile_name
else boto3.Session()
)
return session.client("glue")
except Exception as e:
raise ValueError("Issue with AWS session/client initialization.") from e
def _fetch_tables(self) -> List[str]:
"""Retrieve all table names in the specified Glue database.
Returns:
A list of table names.
"""
paginator = self.glue_client.get_paginator("get_tables")
table_names = []
for page in paginator.paginate(DatabaseName=self.database):
for table in page["TableList"]:
if self.table_filter is None or table["Name"] in self.table_filter:
table_names.append(table["Name"])
return table_names
def _fetch_table_schema(self, table_name: str) -> Dict[str, str]:
"""Fetch the schema of a specified table.
Args:
table_name: The name of the table for which to fetch the schema.
Returns:
A dictionary mapping column names to their data types.
"""
response = self.glue_client.get_table(
DatabaseName=self.database, Name=table_name
)
columns = response["Table"]["StorageDescriptor"]["Columns"]
return {col["Name"]: col["Type"] for col in columns}
def lazy_load(self) -> Iterator[Document]:
"""Lazily load table schemas as Document objects.
Yields:
Document objects, each representing the schema of a table.
"""
table_names = self._fetch_tables()
for table_name in table_names:
schema = self._fetch_table_schema(table_name)
page_content = (
f"Database: {self.database}\nTable: {table_name}\nSchema:\n"
+ "\n".join(f"{col}: {dtype}" for col, dtype in schema.items())
)
doc = Document(
page_content=page_content, metadata={"table_name": table_name}
)
yield doc

@ -70,6 +70,7 @@ EXPECTED_ALL = [
"GCSFileLoader",
"GeoDataFrameLoader",
"GithubFileLoader",
"GlueCatalogLoader",
"GitHubIssuesLoader",
"GitLoader",
"GitbookLoader",

Loading…
Cancel
Save