forked from Archives/langchain
Harrison/big query (#2100)
Co-authored-by: lu-cashmoney <lucas.corley@gmail.com>
This commit is contained in:
parent
eff5eed719
commit
410bf37fb8
202
docs/modules/indexes/document_loaders/examples/bigquery.ipynb
Normal file
202
docs/modules/indexes/document_loaders/examples/bigquery.ipynb
Normal file
@ -0,0 +1,202 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# BigQuery Loader\n",
|
||||
"\n",
|
||||
"Load a BigQuery query with one document per row."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from langchain.document_loaders import BigQueryLoader"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 3,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"BASE_QUERY = '''\n",
|
||||
"SELECT\n",
|
||||
" id,\n",
|
||||
" dna_sequence,\n",
|
||||
" organism\n",
|
||||
"FROM (\n",
|
||||
" SELECT\n",
|
||||
" ARRAY (\n",
|
||||
" SELECT\n",
|
||||
" AS STRUCT 1 AS id, \"ATTCGA\" AS dna_sequence, \"Lokiarchaeum sp. (strain GC14_75).\" AS organism\n",
|
||||
" UNION ALL\n",
|
||||
" SELECT\n",
|
||||
" AS STRUCT 2 AS id, \"AGGCGA\" AS dna_sequence, \"Heimdallarchaeota archaeon (strain LC_2).\" AS organism\n",
|
||||
" UNION ALL\n",
|
||||
" SELECT\n",
|
||||
" AS STRUCT 3 AS id, \"TCCGGA\" AS dna_sequence, \"Acidianus hospitalis (strain W1).\" AS organism) AS new_array),\n",
|
||||
" UNNEST(new_array)\n",
|
||||
"'''"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Basic Usage"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 6,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"loader = BigQueryLoader(BASE_QUERY)\n",
|
||||
"\n",
|
||||
"data = loader.load()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 7,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"[Document(page_content='id: 1\\ndna_sequence: ATTCGA\\norganism: Lokiarchaeum sp. (strain GC14_75).', lookup_str='', metadata={}, lookup_index=0), Document(page_content='id: 2\\ndna_sequence: AGGCGA\\norganism: Heimdallarchaeota archaeon (strain LC_2).', lookup_str='', metadata={}, lookup_index=0), Document(page_content='id: 3\\ndna_sequence: TCCGGA\\norganism: Acidianus hospitalis (strain W1).', lookup_str='', metadata={}, lookup_index=0)]\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"print(data)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Specifying Which Columns are Content vs Metadata"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 8,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"loader = BigQueryLoader(BASE_QUERY, page_content_columns=[\"dna_sequence\", \"organism\"], metadata_columns=[\"id\"])\n",
|
||||
"\n",
|
||||
"data = loader.load()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 9,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"[Document(page_content='dna_sequence: ATTCGA\\norganism: Lokiarchaeum sp. (strain GC14_75).', lookup_str='', metadata={'id': 1}, lookup_index=0), Document(page_content='dna_sequence: AGGCGA\\norganism: Heimdallarchaeota archaeon (strain LC_2).', lookup_str='', metadata={'id': 2}, lookup_index=0), Document(page_content='dna_sequence: TCCGGA\\norganism: Acidianus hospitalis (strain W1).', lookup_str='', metadata={'id': 3}, lookup_index=0)]\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"print(data)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Adding Source to Metadata"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 18,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# Note that the `id` column is being returned twice, with one instance aliased as `source`\n",
|
||||
"ALIASED_QUERY = '''\n",
|
||||
"SELECT\n",
|
||||
" id,\n",
|
||||
" dna_sequence,\n",
|
||||
" organism,\n",
|
||||
" id as source\n",
|
||||
"FROM (\n",
|
||||
" SELECT\n",
|
||||
" ARRAY (\n",
|
||||
" SELECT\n",
|
||||
" AS STRUCT 1 AS id, \"ATTCGA\" AS dna_sequence, \"Lokiarchaeum sp. (strain GC14_75).\" AS organism\n",
|
||||
" UNION ALL\n",
|
||||
" SELECT\n",
|
||||
" AS STRUCT 2 AS id, \"AGGCGA\" AS dna_sequence, \"Heimdallarchaeota archaeon (strain LC_2).\" AS organism\n",
|
||||
" UNION ALL\n",
|
||||
" SELECT\n",
|
||||
" AS STRUCT 3 AS id, \"TCCGGA\" AS dna_sequence, \"Acidianus hospitalis (strain W1).\" AS organism) AS new_array),\n",
|
||||
" UNNEST(new_array)\n",
|
||||
"'''"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 19,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"loader = BigQueryLoader(ALIASED_QUERY, metadata_columns=[\"source\"])\n",
|
||||
"\n",
|
||||
"data = loader.load()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 20,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"[Document(page_content='id: 1\\ndna_sequence: ATTCGA\\norganism: Lokiarchaeum sp. (strain GC14_75).\\nsource: 1', lookup_str='', metadata={'source': 1}, lookup_index=0), Document(page_content='id: 2\\ndna_sequence: AGGCGA\\norganism: Heimdallarchaeota archaeon (strain LC_2).\\nsource: 2', lookup_str='', metadata={'source': 2}, lookup_index=0), Document(page_content='id: 3\\ndna_sequence: TCCGGA\\norganism: Acidianus hospitalis (strain W1).\\nsource: 3', lookup_str='', metadata={'source': 3}, lookup_index=0)]\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"print(data)"
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3 (ipykernel)",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.1"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -8,6 +8,7 @@ from langchain.document_loaders.azure_blob_storage_container import (
|
||||
from langchain.document_loaders.azure_blob_storage_file import (
|
||||
AzureBlobStorageFileLoader,
|
||||
)
|
||||
from langchain.document_loaders.bigquery import BigQueryLoader
|
||||
from langchain.document_loaders.blackboard import BlackboardLoader
|
||||
from langchain.document_loaders.college_confidential import CollegeConfidentialLoader
|
||||
from langchain.document_loaders.conllu import CoNLLULoader
|
||||
@ -122,4 +123,5 @@ __all__ = [
|
||||
"AzureBlobStorageContainerLoader",
|
||||
"SitemapLoader",
|
||||
"DuckDBLoader",
|
||||
"BigQueryLoader",
|
||||
]
|
||||
|
57
langchain/document_loaders/bigquery.py
Normal file
57
langchain/document_loaders/bigquery.py
Normal file
@ -0,0 +1,57 @@
|
||||
from typing import List, Optional
|
||||
|
||||
from langchain.docstore.document import Document
|
||||
from langchain.document_loaders.base import BaseLoader
|
||||
|
||||
|
||||
class BigQueryLoader(BaseLoader):
|
||||
"""Loads a query result from BigQuery into a list of documents.
|
||||
|
||||
Each document represents one row of the result. The `page_content_columns`
|
||||
are written into the `page_content` of the document. The `metadata_columns`
|
||||
are written into the `metadata` of the document. By default, all columns
|
||||
are written into the `page_content` and none into the `metadata`.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
query: str,
|
||||
project: Optional[str] = None,
|
||||
page_content_columns: Optional[List[str]] = None,
|
||||
metadata_columns: Optional[List[str]] = None,
|
||||
):
|
||||
self.query = query
|
||||
self.project = project
|
||||
self.page_content_columns = page_content_columns
|
||||
self.metadata_columns = metadata_columns
|
||||
|
||||
def load(self) -> List[Document]:
|
||||
try:
|
||||
from google.cloud import bigquery
|
||||
except ImportError as ex:
|
||||
raise ValueError(
|
||||
"Could not import google-cloud-bigquery python package. "
|
||||
"Please install it with `pip install google-cloud-bigquery`."
|
||||
) from ex
|
||||
|
||||
bq_client = bigquery.Client(self.project)
|
||||
query_result = bq_client.query(self.query).result()
|
||||
docs: List[Document] = []
|
||||
|
||||
page_content_columns = self.page_content_columns
|
||||
metadata_columns = self.metadata_columns
|
||||
|
||||
if page_content_columns is None:
|
||||
page_content_columns = [column.name for column in query_result.schema]
|
||||
if metadata_columns is None:
|
||||
metadata_columns = []
|
||||
|
||||
for row in query_result:
|
||||
page_content = "\n".join(
|
||||
f"{k}: {v}" for k, v in row.items() if k in page_content_columns
|
||||
)
|
||||
metadata = {k: v for k, v in row.items() if k in metadata_columns}
|
||||
doc = Document(page_content=page_content, metadata=metadata)
|
||||
docs.append(doc)
|
||||
|
||||
return docs
|
50
tests/integration_tests/document_loaders/test_bigquery.py
Normal file
50
tests/integration_tests/document_loaders/test_bigquery.py
Normal file
@ -0,0 +1,50 @@
|
||||
import pytest
|
||||
|
||||
from langchain.document_loaders.bigquery import BigQueryLoader
|
||||
|
||||
try:
|
||||
from google.cloud import bigquery # noqa: F401
|
||||
|
||||
bigquery_installed = True
|
||||
except ImportError:
|
||||
bigquery_installed = False
|
||||
|
||||
|
||||
@pytest.mark.skipif(not bigquery_installed, reason="bigquery not installed")
|
||||
def test_bigquery_loader_no_options() -> None:
|
||||
loader = BigQueryLoader("SELECT 1 AS a, 2 AS b")
|
||||
docs = loader.load()
|
||||
|
||||
assert len(docs) == 1
|
||||
assert docs[0].page_content == "a: 1\nb: 2"
|
||||
assert docs[0].metadata == {}
|
||||
|
||||
|
||||
@pytest.mark.skipif(not bigquery_installed, reason="bigquery not installed")
|
||||
def test_bigquery_loader_page_content_columns() -> None:
|
||||
loader = BigQueryLoader(
|
||||
"SELECT 1 AS a, 2 AS b UNION ALL SELECT 3 AS a, 4 AS b",
|
||||
page_content_columns=["a"],
|
||||
)
|
||||
docs = loader.load()
|
||||
|
||||
assert len(docs) == 2
|
||||
assert docs[0].page_content == "a: 1"
|
||||
assert docs[0].metadata == {}
|
||||
|
||||
assert docs[1].page_content == "a: 3"
|
||||
assert docs[1].metadata == {}
|
||||
|
||||
|
||||
@pytest.mark.skipif(not bigquery_installed, reason="bigquery not installed")
|
||||
def test_bigquery_loader_metadata_columns() -> None:
|
||||
loader = BigQueryLoader(
|
||||
"SELECT 1 AS a, 2 AS b",
|
||||
page_content_columns=["a"],
|
||||
metadata_columns=["b"],
|
||||
)
|
||||
docs = loader.load()
|
||||
|
||||
assert len(docs) == 1
|
||||
assert docs[0].page_content == "a: 1"
|
||||
assert docs[0].metadata == {"b": 2}
|
Loading…
Reference in New Issue
Block a user