forked from Archives/langchain
Confluence DL retry/backoff (#3168)
Implemented retry/backoff logic in response to #2473 --------- Co-authored-by: Justin Flick <jflick@homesite.com>
This commit is contained in:
parent
c03a65c6dc
commit
8faef1a91a
@ -1,9 +1,19 @@
|
||||
"""Load Data from a Confluence Space"""
|
||||
import logging
|
||||
from typing import Any, Callable, List, Optional, Union
|
||||
|
||||
from tenacity import (
|
||||
before_sleep_log,
|
||||
retry,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
from langchain.docstore.document import Document
|
||||
from langchain.document_loaders.base import BaseLoader
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConfluenceLoader(BaseLoader):
|
||||
"""
|
||||
@ -44,8 +54,14 @@ class ConfluenceLoader(BaseLoader):
|
||||
:type oauth2: dict, optional
|
||||
:param cloud: _description_, defaults to True
|
||||
:type cloud: bool, optional
|
||||
:raises ValueError: _description_
|
||||
:raises ImportError: _description_
|
||||
:param number_of_retries: How many times to retry, defaults to 3
|
||||
:type number_of_retries: Optional[int], optional
|
||||
:param min_retry_seconds: defaults to 2
|
||||
:type min_retry_seconds: Optional[int], optional
|
||||
:param max_retry_seconds: defaults to 10
|
||||
:type max_retry_seconds: Optional[int], optional
|
||||
:raises ValueError: Errors while validating input
|
||||
:raises ImportError: Required dependencies not installed.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@ -54,13 +70,19 @@ class ConfluenceLoader(BaseLoader):
|
||||
api_key: Optional[str] = None,
|
||||
username: Optional[str] = None,
|
||||
oauth2: Optional[dict] = None,
|
||||
cloud: bool = True,
|
||||
cloud: Optional[bool] = True,
|
||||
number_of_retries: Optional[int] = 3,
|
||||
min_retry_seconds: Optional[int] = 2,
|
||||
max_retry_seconds: Optional[int] = 10,
|
||||
):
|
||||
errors = ConfluenceLoader.validate_init_args(url, api_key, username, oauth2)
|
||||
if errors:
|
||||
raise ValueError(f"Error(s) while validating input: {errors}")
|
||||
|
||||
self.base_url = url
|
||||
self.number_of_retries = number_of_retries
|
||||
self.min_retry_seconds = min_retry_seconds
|
||||
self.max_retry_seconds = max_retry_seconds
|
||||
|
||||
try:
|
||||
from atlassian import Confluence # noqa: F401
|
||||
@ -196,9 +218,19 @@ class ConfluenceLoader(BaseLoader):
|
||||
|
||||
if page_ids:
|
||||
for page_id in page_ids:
|
||||
page = self.confluence.get_page_by_id(
|
||||
page_id=page_id, expand="body.storage.value"
|
||||
)
|
||||
get_page = retry(
|
||||
reraise=True,
|
||||
stop=stop_after_attempt(
|
||||
self.number_of_retries # type: ignore[arg-type]
|
||||
),
|
||||
wait=wait_exponential(
|
||||
multiplier=1, # type: ignore[arg-type]
|
||||
min=self.min_retry_seconds, # type: ignore[arg-type]
|
||||
max=self.max_retry_seconds, # type: ignore[arg-type]
|
||||
),
|
||||
before_sleep=before_sleep_log(logger, logging.WARNING),
|
||||
)(self.confluence.get_page_by_id)
|
||||
page = get_page(page_id=page_id, expand="body.storage.value")
|
||||
doc = self.process_page(page, include_attachments, text_maker)
|
||||
docs.append(doc)
|
||||
|
||||
@ -227,7 +259,19 @@ class ConfluenceLoader(BaseLoader):
|
||||
page = 0
|
||||
docs = []
|
||||
while page < limit:
|
||||
batch = retrieval_method(**kwargs, start=page)
|
||||
get_pages = retry(
|
||||
reraise=True,
|
||||
stop=stop_after_attempt(
|
||||
self.number_of_retries # type: ignore[arg-type]
|
||||
),
|
||||
wait=wait_exponential(
|
||||
multiplier=1,
|
||||
min=self.min_retry_seconds, # type: ignore[arg-type]
|
||||
max=self.max_retry_seconds, # type: ignore[arg-type]
|
||||
),
|
||||
before_sleep=before_sleep_log(logger, logging.WARNING),
|
||||
)(retrieval_method)
|
||||
batch = get_pages(**kwargs, start=page)
|
||||
if len(batch) < limit:
|
||||
page = limit
|
||||
else:
|
||||
|
Loading…
Reference in New Issue
Block a user