{ "cells": [ { "cell_type": "markdown", "id": "1f3a5ebf", "metadata": {}, "source": [ "# Airbyte CDK" ] }, { "cell_type": "markdown", "id": "35ac77b1-449b-44f7-b8f3-3494d55c286e", "metadata": {}, "source": [ ">[Airbyte](https://github.com/airbytehq/airbyte) is a data integration platform for ELT pipelines from APIs, databases & files to warehouses & lakes. It has the largest catalog of ELT connectors to data warehouses and databases.\n", "\n", "Many source connectors are implemented using the [Airbyte CDK](https://docs.airbyte.com/connector-development/cdk-python/). This loader lets you run any of these connectors and returns the data as documents." ] }, { "cell_type": "markdown", "id": "3b06fbde", "metadata": {}, "source": [ "## Installation" ] }, { "cell_type": "markdown", "id": "e3e9dc79", "metadata": {}, "source": [ "First, you need to install the `airbyte-cdk` Python package." ] }, { "cell_type": "code", "execution_count": null, "id": "4d35e4e0", "metadata": {}, "outputs": [], "source": [ "#!pip install airbyte-cdk" ] }, { "cell_type": "markdown", "id": "085aa658", "metadata": {}, "source": [ "Then, either install an existing connector from the [Airbyte GitHub repository](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors) or create your own connector using the [Airbyte CDK](https://docs.airbyte.io/connector-development/connector-development).\n", "\n", "For example, to install the GitHub connector, run:" ] }, { "cell_type": "code", "execution_count": null, "id": "f6d04ef4", "metadata": {}, "outputs": [], "source": [ "#!pip install \"source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github\"" ] }, { "cell_type": "markdown", "id": "36069b74", "metadata": {}, "source": [ "Some sources are also published as regular packages on PyPI." ] }, { "cell_type": "markdown", "id": "ae855210", "metadata": {}, "source": [ "## Example" ] }, { "cell_type": "markdown", "id":
"02208f52", "metadata": {}, "source": [ "Now you can create an `AirbyteCDKLoader` based on the imported source. It takes a `config` object that's passed to the connector. You also have to pick the stream you want to retrieve records from by name (`stream_name`). Check the connector's documentation page and spec definition for more information on the config object and available streams. For the GitHub connector, these are:\n", "* [https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-github/source_github/spec.json](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-github/source_github/spec.json)\n", "* [https://docs.airbyte.com/integrations/sources/github/](https://docs.airbyte.com/integrations/sources/github/)" ] }, { "cell_type": "code", "execution_count": null, "id": "89a99e58", "metadata": {}, "outputs": [], "source": [ "\n", "from langchain.document_loaders.airbyte import AirbyteCDKLoader\n", "from source_github.source import SourceGithub # plug in your own source here\n", "\n", "config = {\n", "    # your github configuration\n", "    \"credentials\": {\n", "        \"api_url\": \"api.github.com\",\n", "        \"personal_access_token\": \"\"\n", "    },\n", "    \"repository\": \"\",\n", "    \"start_date\": \"\"\n", "}\n", "\n", "issues_loader = AirbyteCDKLoader(source_class=SourceGithub, config=config, stream_name=\"issues\")" ] }, { "cell_type": "markdown", "id": "2cea23fc", "metadata": {}, "source": [ "Now you can load documents the usual way:" ] }, { "cell_type": "code", "execution_count": null, "id": "dae75cdb", "metadata": {}, "outputs": [], "source": [ "docs = issues_loader.load()" ] }, { "cell_type": "markdown", "id": "4a93dc2a", "metadata": {}, "source": [ "As `load` returns a list, it will block until all documents are loaded.
To have better control over this process, you can also use the `lazy_load` method, which returns an iterator instead:" ] }, { "cell_type": "code", "execution_count": null, "id": "1782db09", "metadata": {}, "outputs": [], "source": [ "docs_iterator = issues_loader.lazy_load()" ] }, { "cell_type": "markdown", "id": "3a124086", "metadata": {}, "source": [ "Keep in mind that by default the page content is empty and the metadata object contains all the information from the record. To create documents in a different way, pass in a `record_handler` function when creating the loader:" ] }, { "cell_type": "code", "execution_count": null, "id": "5671395d", "metadata": {}, "outputs": [], "source": [ "from langchain.docstore.document import Document\n", "\n", "def handle_record(record, id):\n", "    return Document(page_content=record.data[\"title\"] + \"\\n\" + (record.data[\"body\"] or \"\"), metadata=record.data)\n", "\n", "issues_loader = AirbyteCDKLoader(source_class=SourceGithub, config=config, stream_name=\"issues\", record_handler=handle_record)\n", "\n", "docs = issues_loader.load()" ] }, { "cell_type": "markdown", "id": "223eb8bc", "metadata": {}, "source": [ "## Incremental loads\n", "\n", "Some streams allow incremental loading: the source keeps track of synced records and won't load them again. This is useful for sources that have a high volume of data and are updated frequently.\n", "\n", "To take advantage of this, store the `last_state` property of the loader and pass it in when creating the loader again. This will ensure that only new records are loaded."
] }, { "cell_type": "code", "execution_count": null, "id": "7061e735", "metadata": {}, "outputs": [], "source": [ "last_state = issues_loader.last_state # store safely\n", "\n", "incremental_issue_loader = AirbyteCDKLoader(source_class=SourceGithub, config=config, stream_name=\"issues\", state=last_state)\n", "\n", "new_docs = incremental_issue_loader.load()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.6" } }, "nbformat": 4, "nbformat_minor": 5 }