mirror of
https://github.com/hwchase17/langchain
synced 2024-11-16 06:13:16 +00:00
288 lines
9.9 KiB
Python
288 lines
9.9 KiB
Python
from typing import Any, Callable, Iterator, Mapping, Optional
|
|
|
|
from langchain_core.documents import Document
|
|
from langchain_core.utils.utils import guard_import
|
|
|
|
from langchain_community.document_loaders.base import BaseLoader
|
|
|
|
RecordHandler = Callable[[Any, Optional[str]], Document]
|
|
|
|
|
|
class AirbyteCDKLoader(BaseLoader):
|
|
"""Load with an `Airbyte` source connector implemented using the `CDK`."""
|
|
|
|
def __init__(
|
|
self,
|
|
config: Mapping[str, Any],
|
|
source_class: Any,
|
|
stream_name: str,
|
|
record_handler: Optional[RecordHandler] = None,
|
|
state: Optional[Any] = None,
|
|
) -> None:
|
|
"""Initializes the loader.
|
|
|
|
Args:
|
|
config: The config to pass to the source connector.
|
|
source_class: The source connector class.
|
|
stream_name: The name of the stream to load.
|
|
record_handler: A function that takes in a record and an optional id and
|
|
returns a Document. If None, the record will be used as the document.
|
|
Defaults to None.
|
|
state: The state to pass to the source connector. Defaults to None.
|
|
"""
|
|
from airbyte_cdk.models.airbyte_protocol import AirbyteRecordMessage
|
|
from airbyte_cdk.sources.embedded.base_integration import (
|
|
BaseEmbeddedIntegration,
|
|
)
|
|
from airbyte_cdk.sources.embedded.runner import CDKRunner
|
|
|
|
class CDKIntegration(BaseEmbeddedIntegration):
|
|
"""A wrapper around the CDK integration."""
|
|
|
|
def _handle_record(
|
|
self, record: AirbyteRecordMessage, id: Optional[str]
|
|
) -> Document:
|
|
if record_handler:
|
|
return record_handler(record, id)
|
|
return Document(page_content="", metadata=record.data)
|
|
|
|
self._integration = CDKIntegration(
|
|
config=config,
|
|
runner=CDKRunner(source=source_class(), name=source_class.__name__),
|
|
)
|
|
self._stream_name = stream_name
|
|
self._state = state
|
|
|
|
def lazy_load(self) -> Iterator[Document]:
|
|
return self._integration._load_data(
|
|
stream_name=self._stream_name, state=self._state
|
|
)
|
|
|
|
@property
|
|
def last_state(self) -> Any:
|
|
return self._integration.last_state
|
|
|
|
|
|
class AirbyteHubspotLoader(AirbyteCDKLoader):
|
|
"""Load from `Hubspot` using an `Airbyte` source connector."""
|
|
|
|
def __init__(
|
|
self,
|
|
config: Mapping[str, Any],
|
|
stream_name: str,
|
|
record_handler: Optional[RecordHandler] = None,
|
|
state: Optional[Any] = None,
|
|
) -> None:
|
|
"""Initializes the loader.
|
|
|
|
Args:
|
|
config: The config to pass to the source connector.
|
|
stream_name: The name of the stream to load.
|
|
record_handler: A function that takes in a record and an optional id and
|
|
returns a Document. If None, the record will be used as the document.
|
|
Defaults to None.
|
|
state: The state to pass to the source connector. Defaults to None.
|
|
"""
|
|
source_class = guard_import(
|
|
"source_hubspot", pip_name="airbyte-source-hubspot"
|
|
).SourceHubspot
|
|
super().__init__(
|
|
config=config,
|
|
source_class=source_class,
|
|
stream_name=stream_name,
|
|
record_handler=record_handler,
|
|
state=state,
|
|
)
|
|
|
|
|
|
class AirbyteStripeLoader(AirbyteCDKLoader):
|
|
"""Load from `Stripe` using an `Airbyte` source connector."""
|
|
|
|
def __init__(
|
|
self,
|
|
config: Mapping[str, Any],
|
|
stream_name: str,
|
|
record_handler: Optional[RecordHandler] = None,
|
|
state: Optional[Any] = None,
|
|
) -> None:
|
|
"""Initializes the loader.
|
|
|
|
Args:
|
|
config: The config to pass to the source connector.
|
|
stream_name: The name of the stream to load.
|
|
record_handler: A function that takes in a record and an optional id and
|
|
returns a Document. If None, the record will be used as the document.
|
|
Defaults to None.
|
|
state: The state to pass to the source connector. Defaults to None.
|
|
"""
|
|
source_class = guard_import(
|
|
"source_stripe", pip_name="airbyte-source-stripe"
|
|
).SourceStripe
|
|
super().__init__(
|
|
config=config,
|
|
source_class=source_class,
|
|
stream_name=stream_name,
|
|
record_handler=record_handler,
|
|
state=state,
|
|
)
|
|
|
|
|
|
class AirbyteTypeformLoader(AirbyteCDKLoader):
|
|
"""Load from `Typeform` using an `Airbyte` source connector."""
|
|
|
|
def __init__(
|
|
self,
|
|
config: Mapping[str, Any],
|
|
stream_name: str,
|
|
record_handler: Optional[RecordHandler] = None,
|
|
state: Optional[Any] = None,
|
|
) -> None:
|
|
"""Initializes the loader.
|
|
|
|
Args:
|
|
config: The config to pass to the source connector.
|
|
stream_name: The name of the stream to load.
|
|
record_handler: A function that takes in a record and an optional id and
|
|
returns a Document. If None, the record will be used as the document.
|
|
Defaults to None.
|
|
state: The state to pass to the source connector. Defaults to None.
|
|
"""
|
|
source_class = guard_import(
|
|
"source_typeform", pip_name="airbyte-source-typeform"
|
|
).SourceTypeform
|
|
super().__init__(
|
|
config=config,
|
|
source_class=source_class,
|
|
stream_name=stream_name,
|
|
record_handler=record_handler,
|
|
state=state,
|
|
)
|
|
|
|
|
|
class AirbyteZendeskSupportLoader(AirbyteCDKLoader):
|
|
"""Load from `Zendesk Support` using an `Airbyte` source connector."""
|
|
|
|
def __init__(
|
|
self,
|
|
config: Mapping[str, Any],
|
|
stream_name: str,
|
|
record_handler: Optional[RecordHandler] = None,
|
|
state: Optional[Any] = None,
|
|
) -> None:
|
|
"""Initializes the loader.
|
|
|
|
Args:
|
|
config: The config to pass to the source connector.
|
|
stream_name: The name of the stream to load.
|
|
record_handler: A function that takes in a record and an optional id and
|
|
returns a Document. If None, the record will be used as the document.
|
|
Defaults to None.
|
|
state: The state to pass to the source connector. Defaults to None.
|
|
"""
|
|
source_class = guard_import(
|
|
"source_zendesk_support", pip_name="airbyte-source-zendesk-support"
|
|
).SourceZendeskSupport
|
|
super().__init__(
|
|
config=config,
|
|
source_class=source_class,
|
|
stream_name=stream_name,
|
|
record_handler=record_handler,
|
|
state=state,
|
|
)
|
|
|
|
|
|
class AirbyteShopifyLoader(AirbyteCDKLoader):
|
|
"""Load from `Shopify` using an `Airbyte` source connector."""
|
|
|
|
def __init__(
|
|
self,
|
|
config: Mapping[str, Any],
|
|
stream_name: str,
|
|
record_handler: Optional[RecordHandler] = None,
|
|
state: Optional[Any] = None,
|
|
) -> None:
|
|
"""Initializes the loader.
|
|
|
|
Args:
|
|
config: The config to pass to the source connector.
|
|
stream_name: The name of the stream to load.
|
|
record_handler: A function that takes in a record and an optional id and
|
|
returns a Document. If None, the record will be used as the document.
|
|
Defaults to None.
|
|
state: The state to pass to the source connector. Defaults to None.
|
|
"""
|
|
source_class = guard_import(
|
|
"source_shopify", pip_name="airbyte-source-shopify"
|
|
).SourceShopify
|
|
super().__init__(
|
|
config=config,
|
|
source_class=source_class,
|
|
stream_name=stream_name,
|
|
record_handler=record_handler,
|
|
state=state,
|
|
)
|
|
|
|
|
|
class AirbyteSalesforceLoader(AirbyteCDKLoader):
|
|
"""Load from `Salesforce` using an `Airbyte` source connector."""
|
|
|
|
def __init__(
|
|
self,
|
|
config: Mapping[str, Any],
|
|
stream_name: str,
|
|
record_handler: Optional[RecordHandler] = None,
|
|
state: Optional[Any] = None,
|
|
) -> None:
|
|
"""Initializes the loader.
|
|
|
|
Args:
|
|
config: The config to pass to the source connector.
|
|
stream_name: The name of the stream to load.
|
|
record_handler: A function that takes in a record and an optional id and
|
|
returns a Document. If None, the record will be used as the document.
|
|
Defaults to None.
|
|
state: The state to pass to the source connector. Defaults to None.
|
|
"""
|
|
source_class = guard_import(
|
|
"source_salesforce", pip_name="airbyte-source-salesforce"
|
|
).SourceSalesforce
|
|
super().__init__(
|
|
config=config,
|
|
source_class=source_class,
|
|
stream_name=stream_name,
|
|
record_handler=record_handler,
|
|
state=state,
|
|
)
|
|
|
|
|
|
class AirbyteGongLoader(AirbyteCDKLoader):
|
|
"""Load from `Gong` using an `Airbyte` source connector."""
|
|
|
|
def __init__(
|
|
self,
|
|
config: Mapping[str, Any],
|
|
stream_name: str,
|
|
record_handler: Optional[RecordHandler] = None,
|
|
state: Optional[Any] = None,
|
|
) -> None:
|
|
"""Initializes the loader.
|
|
|
|
Args:
|
|
config: The config to pass to the source connector.
|
|
stream_name: The name of the stream to load.
|
|
record_handler: A function that takes in a record and an optional id and
|
|
returns a Document. If None, the record will be used as the document.
|
|
Defaults to None.
|
|
state: The state to pass to the source connector. Defaults to None.
|
|
"""
|
|
source_class = guard_import(
|
|
"source_gong", pip_name="airbyte-source-gong"
|
|
).SourceGong
|
|
super().__init__(
|
|
config=config,
|
|
source_class=source_class,
|
|
stream_name=stream_name,
|
|
record_handler=record_handler,
|
|
state=state,
|
|
)
|