langchain/libs/community/langchain_community/document_loaders/athena.py

160 lines
5.7 KiB
Python
Raw Normal View History

from __future__ import annotations
import io
import json
import time
from typing import Any, Dict, Iterator, List, Optional, Tuple
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
class AthenaLoader(BaseLoader):
"""Load documents from `AWS Athena`.
Each document represents one row of the result.
- By default, all columns are written into the `page_content` of the document
and none into the `metadata` of the document.
- If `metadata_columns` are provided then these columns are written
into the `metadata` of the document while the rest of the columns
are written into the `page_content` of the document.
To authenticate, the AWS client uses this method to automatically load credentials:
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
If a specific credential profile should be used, you must pass
the name of the profile from the ~/.aws/credentials file that is to be used.
Make sure the credentials / roles used have the required policies to
access the Amazon Textract service.
"""
def __init__(
self,
query: str,
database: str,
s3_output_uri: str,
profile_name: str,
metadata_columns: Optional[List[str]] = None,
):
"""Initialize Athena document loader.
Args:
query: The query to run in Athena.
database: Athena database
s3_output_uri: Athena output path
metadata_columns: Optional. Columns written to Document `metadata`.
"""
self.query = query
self.database = database
self.s3_output_uri = s3_output_uri
self.metadata_columns = metadata_columns if metadata_columns is not None else []
try:
import boto3
except ImportError:
raise ImportError(
"Could not import boto3 python package. "
"Please install it with `pip install boto3`."
)
try:
session = (
boto3.Session(profile_name=profile_name)
if profile_name is not None
else boto3.Session()
)
except Exception as e:
raise ValueError(
"Could not load credentials to authenticate with AWS client. "
"Please check that credentials in the specified "
"profile name are valid."
) from e
self.athena_client = session.client("athena")
self.s3_client = session.client("s3")
def _execute_query(self) -> List[Dict[str, Any]]:
response = self.athena_client.start_query_execution(
QueryString=self.query,
QueryExecutionContext={"Database": self.database},
ResultConfiguration={"OutputLocation": self.s3_output_uri},
)
query_execution_id = response["QueryExecutionId"]
while True:
response = self.athena_client.get_query_execution(
QueryExecutionId=query_execution_id
)
state = response["QueryExecution"]["Status"]["State"]
if state == "SUCCEEDED":
break
elif state == "FAILED":
resp_status = response["QueryExecution"]["Status"]
state_change_reason = resp_status["StateChangeReason"]
err = f"Query Failed: {state_change_reason}"
raise Exception(err)
elif state == "CANCELLED":
raise Exception("Query was cancelled by the user.")
time.sleep(1)
result_set = self._get_result_set(query_execution_id)
return json.loads(result_set.to_json(orient="records"))
def _remove_suffix(self, input_string: str, suffix: str) -> str:
if suffix and input_string.endswith(suffix):
return input_string[: -len(suffix)]
return input_string
def _remove_prefix(self, input_string: str, suffix: str) -> str:
if suffix and input_string.startswith(suffix):
return input_string[len(suffix) :]
return input_string
def _get_result_set(self, query_execution_id: str) -> Any:
try:
import pandas as pd
except ImportError:
raise ImportError(
"Could not import pandas python package. "
"Please install it with `pip install pandas`."
)
output_uri = self.s3_output_uri
tokens = self._remove_prefix(
self._remove_suffix(output_uri, "/"), "s3://"
).split("/")
bucket = tokens[0]
community[patch]: document_loaders: modified athena key logic to handle s3 uris without a prefix (#17526) https://github.com/langchain-ai/langchain/issues/17525 ### Example Code ```python from langchain_community.document_loaders.athena import AthenaLoader database_name = "database" s3_output_path = "s3://bucket-no-prefix" query="""SELECT CAST(extract(hour FROM current_timestamp) AS INTEGER) AS current_hour, CAST(extract(minute FROM current_timestamp) AS INTEGER) AS current_minute, CAST(extract(second FROM current_timestamp) AS INTEGER) AS current_second; """ profile_name = "AdministratorAccess" loader = AthenaLoader( query=query, database=database_name, s3_output_uri=s3_output_path, profile_name=profile_name, ) documents = loader.load() print(documents) ``` ### Error Message and Stack Trace (if applicable) NoSuchKey: An error occurred (NoSuchKey) when calling the GetObject operation: The specified key does not exist ### Description Athena Loader errors when result s3 bucket uri has no prefix. The Loader instance call results in a "NoSuchKey: An error occurred (NoSuchKey) when calling the GetObject operation: The specified key does not exist." error. If s3_output_path contains a prefix like: ```python s3_output_path = "s3://bucket-with-prefix/prefix" ``` Execution works without an error. ## Suggested solution Modify: ```python key = "/".join(tokens[1:]) + "/" + query_execution_id + ".csv" ``` to ```python key = "/".join(tokens[1:]) + ("/" if tokens[1:] else "") + query_execution_id + ".csv" ``` https://github.com/langchain-ai/langchain/blob/9e8a3fc4fff8e20ab5d1f113515ded14906eb6f3/libs/community/langchain_community/document_loaders/athena.py#L128 ### System Info System Information ------------------ > OS: Darwin > OS Version: Darwin Kernel Version 22.6.0: Fri Sep 15 13:41:30 PDT 2023; root:xnu-8796.141.3.700.8~1/RELEASE_ARM64_T8103 > Python Version: 3.9.9 (main, Jan 9 2023, 11:42:03) [Clang 14.0.0 (clang-1400.0.29.102)] Package Information ------------------- > langchain_core: 0.1.23 > langchain: 0.1.7 > langchain_community: 0.0.20 > langsmith: 0.0.87 > langchain_openai: 0.0.6 > langchainhub: 0.1.14 Packages not installed (Not Necessarily a Problem) -------------------------------------------------- The following packages were not found: > langgraph > langserve --------- Co-authored-by: Bagatur <baskaryan@gmail.com>
2024-02-14 19:48:31 +00:00
key = "/".join(tokens[1:] + [query_execution_id]) + ".csv"
obj = self.s3_client.get_object(Bucket=bucket, Key=key)
df = pd.read_csv(io.BytesIO(obj["Body"].read()), encoding="utf8")
return df
def _get_columns(
self, query_result: List[Dict[str, Any]]
) -> Tuple[List[str], List[str]]:
content_columns = []
metadata_columns = []
all_columns = list(query_result[0].keys())
for key in all_columns:
if key in self.metadata_columns:
metadata_columns.append(key)
else:
content_columns.append(key)
return content_columns, metadata_columns
def lazy_load(self) -> Iterator[Document]:
query_result = self._execute_query()
content_columns, metadata_columns = self._get_columns(query_result)
for row in query_result:
page_content = "\n".join(
f"{k}: {v}" for k, v in row.items() if k in content_columns
)
metadata = {
k: v for k, v in row.items() if k in metadata_columns and v is not None
}
doc = Document(page_content=page_content, metadata=metadata)
yield doc