mirror of
https://github.com/hwchase17/langchain
synced 2024-10-31 15:20:26 +00:00
227 lines
6.8 KiB
Plaintext
227 lines
6.8 KiB
Plaintext
|
{
|
||
|
"cells": [
|
||
|
{
|
||
|
"cell_type": "markdown",
|
||
|
"id": "1f3a5ebf",
|
||
|
"metadata": {},
|
||
|
"source": [
|
||
|
"# Airbyte CDK"
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "markdown",
|
||
|
"id": "35ac77b1-449b-44f7-b8f3-3494d55c286e",
|
||
|
"metadata": {},
|
||
|
"source": [
|
||
|
">[Airbyte](https://github.com/airbytehq/airbyte) is a data integration platform for ELT pipelines from APIs, databases & files to warehouses & lakes. It has the largest catalog of ELT connectors to data warehouses and databases.\n",
|
||
|
"\n",
|
||
|
"A lot of source connectors are implemented using the [Airbyte CDK](https://docs.airbyte.com/connector-development/cdk-python/). This loader allows to run any of these connectors and return the data as documents."
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "markdown",
|
||
|
"id": "3b06fbde",
|
||
|
"metadata": {},
|
||
|
"source": [
|
||
|
"## Installation"
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "markdown",
|
||
|
"id": "e3e9dc79",
|
||
|
"metadata": {},
|
||
|
"source": [
|
||
|
"First, you need to install the `airbyte-cdk` python package."
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "code",
|
||
|
"execution_count": null,
|
||
|
"id": "4d35e4e0",
|
||
|
"metadata": {},
|
||
|
"outputs": [],
|
||
|
"source": [
|
||
|
"#!pip install airbyte-cdk"
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "markdown",
|
||
|
"id": "085aa658",
|
||
|
"metadata": {},
|
||
|
"source": [
|
||
|
"Then, either install an existing connector from the [Airbyte Github repository](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors) or create your own connector using the [Airbyte CDK](https://docs.airbyte.io/connector-development/connector-development).\n",
|
||
|
"\n",
|
||
|
"For example, to install the Github connector, run"
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "code",
|
||
|
"execution_count": null,
|
||
|
"id": "f6d04ef4",
|
||
|
"metadata": {},
|
||
|
"outputs": [],
|
||
|
"source": [
|
||
|
"#!pip install \"source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github\""
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "markdown",
|
||
|
"id": "36069b74",
|
||
|
"metadata": {},
|
||
|
"source": [
|
||
|
"Some sources are also published as regular packages on PyPI"
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "markdown",
|
||
|
"id": "ae855210",
|
||
|
"metadata": {},
|
||
|
"source": [
|
||
|
"## Example"
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "markdown",
|
||
|
"id": "02208f52",
|
||
|
"metadata": {},
|
||
|
"source": [
|
||
|
"Now you can create an `AirbyteCDKLoader` based on the imported source. It takes a `config` object that's passed to the connector. You also have to pick the stream you want to retrieve records from by name (`stream_name`). Check the connectors documentation page and spec definition for more information on the config object and available streams. For the Github connectors these are:\n",
|
||
|
"* [https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-github/source_github/spec.json](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-github/source_github/spec.json).\n",
|
||
|
"* [https://docs.airbyte.com/integrations/sources/github/](https://docs.airbyte.com/integrations/sources/github/)"
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "code",
|
||
|
"execution_count": null,
|
||
|
"id": "89a99e58",
|
||
|
"metadata": {},
|
||
|
"outputs": [],
|
||
|
"source": [
|
||
|
"\n",
|
||
|
"from langchain.document_loaders.airbyte import AirbyteCDKLoader\n",
|
||
|
"from source_github.source import SourceGithub # plug in your own source here\n",
|
||
|
"\n",
|
||
|
"config = {\n",
|
||
|
" # your github configuration\n",
|
||
|
" \"credentials\": {\n",
|
||
|
" \"api_url\": \"api.github.com\",\n",
|
||
|
" \"personal_access_token\": \"<token>\"\n",
|
||
|
" },\n",
|
||
|
" \"repository\": \"<repo>\",\n",
|
||
|
" \"start_date\": \"<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>\"\n",
|
||
|
"}\n",
|
||
|
"\n",
|
||
|
"issues_loader = AirbyteCDKLoader(source_class=SourceGithub, config=config, stream_name=\"issues\")"
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "markdown",
|
||
|
"id": "2cea23fc",
|
||
|
"metadata": {},
|
||
|
"source": [
|
||
|
"Now you can load documents the usual way"
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "code",
|
||
|
"execution_count": null,
|
||
|
"id": "dae75cdb",
|
||
|
"metadata": {},
|
||
|
"outputs": [],
|
||
|
"source": [
|
||
|
"docs = issues_loader.load()"
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "markdown",
|
||
|
"id": "4a93dc2a",
|
||
|
"metadata": {},
|
||
|
"source": [
|
||
|
"As `load` returns a list, it will block until all documents are loaded. To have better control over this process, you can also you the `lazy_load` method which returns an iterator instead:"
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "code",
|
||
|
"execution_count": null,
|
||
|
"id": "1782db09",
|
||
|
"metadata": {},
|
||
|
"outputs": [],
|
||
|
"source": [
|
||
|
"docs_iterator = issues_loader.lazy_load()"
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "markdown",
|
||
|
"id": "3a124086",
|
||
|
"metadata": {},
|
||
|
"source": [
|
||
|
"Keep in mind that by default the page content is empty and the metadata object contains all the information from the record. To create documents in a different, pass in a record_handler function when creating the loader:"
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "code",
|
||
|
"execution_count": null,
|
||
|
"id": "5671395d",
|
||
|
"metadata": {},
|
||
|
"outputs": [],
|
||
|
"source": [
|
||
|
"from langchain.docstore.document import Document\n",
|
||
|
"\n",
|
||
|
"def handle_record(record, id):\n",
|
||
|
" return Document(page_content=record.data[\"title\"] + \"\\n\" + (record.data[\"body\"] or \"\"), metadata=record.data)\n",
|
||
|
"\n",
|
||
|
"issues_loader = AirbyteCDKLoader(source_class=SourceGithub, config=config, stream_name=\"issues\", record_handler=handle_record)\n",
|
||
|
"\n",
|
||
|
"docs = issues_loader.load()"
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "markdown",
|
||
|
"id": "223eb8bc",
|
||
|
"metadata": {},
|
||
|
"source": [
|
||
|
"## Incremental loads\n",
|
||
|
"\n",
|
||
|
"Some streams allow incremental loading, this means the source keeps track of synced records and won't load them again. This is useful for sources that have a high volume of data and are updated frequently.\n",
|
||
|
"\n",
|
||
|
"To take advantage of this, store the `last_state` property of the loader and pass it in when creating the loader again. This will ensure that only new records are loaded."
|
||
|
]
|
||
|
},
|
||
|
{
|
||
|
"cell_type": "code",
|
||
|
"execution_count": null,
|
||
|
"id": "7061e735",
|
||
|
"metadata": {},
|
||
|
"outputs": [],
|
||
|
"source": [
|
||
|
"last_state = issues_loader.last_state # store safely\n",
|
||
|
"\n",
|
||
|
"incremental_issue_loader = AirbyteCDKLoader(source_class=SourceGithub, config=config, stream_name=\"issues\", state=last_state)\n",
|
||
|
"\n",
|
||
|
"new_docs = incremental_issue_loader.load()"
|
||
|
]
|
||
|
}
|
||
|
],
|
||
|
"metadata": {
|
||
|
"kernelspec": {
|
||
|
"display_name": "Python 3 (ipykernel)",
|
||
|
"language": "python",
|
||
|
"name": "python3"
|
||
|
},
|
||
|
"language_info": {
|
||
|
"codemirror_mode": {
|
||
|
"name": "ipython",
|
||
|
"version": 3
|
||
|
},
|
||
|
"file_extension": ".py",
|
||
|
"mimetype": "text/x-python",
|
||
|
"name": "python",
|
||
|
"nbconvert_exporter": "python",
|
||
|
"pygments_lexer": "ipython3",
|
||
|
"version": "3.10.6"
|
||
|
}
|
||
|
},
|
||
|
"nbformat": 4,
|
||
|
"nbformat_minor": 5
|
||
|
}
|