From 8faef1a91a0c79124e63f3519cb7461ce113630f Mon Sep 17 00:00:00 2001 From: Justin Flick Date: Wed, 19 Apr 2023 22:50:39 -0500 Subject: [PATCH] Confluence DL retry/backoff (#3168) Implemented retry/backoff logic in response to #2473 --------- Co-authored-by: Justin Flick --- langchain/document_loaders/confluence.py | 58 +++++++++++++++++++++--- 1 file changed, 51 insertions(+), 7 deletions(-) diff --git a/langchain/document_loaders/confluence.py b/langchain/document_loaders/confluence.py index b8b8218094..56598e64a8 100644 --- a/langchain/document_loaders/confluence.py +++ b/langchain/document_loaders/confluence.py @@ -1,9 +1,19 @@ """Load Data from a Confluence Space""" +import logging from typing import Any, Callable, List, Optional, Union +from tenacity import ( + before_sleep_log, + retry, + stop_after_attempt, + wait_exponential, +) + from langchain.docstore.document import Document from langchain.document_loaders.base import BaseLoader +logger = logging.getLogger(__name__) + class ConfluenceLoader(BaseLoader): """ @@ -44,8 +54,14 @@ class ConfluenceLoader(BaseLoader): :type oauth2: dict, optional :param cloud: _description_, defaults to True :type cloud: bool, optional - :raises ValueError: _description_ - :raises ImportError: _description_ + :param number_of_retries: How many times to retry, defaults to 3 + :type number_of_retries: Optional[int], optional + :param min_retry_seconds: defaults to 2 + :type min_retry_seconds: Optional[int], optional + :param max_retry_seconds: defaults to 10 + :type max_retry_seconds: Optional[int], optional + :raises ValueError: Errors while validating input + :raises ImportError: Required dependencies not installed. 
""" def __init__( @@ -54,13 +70,19 @@ class ConfluenceLoader(BaseLoader): api_key: Optional[str] = None, username: Optional[str] = None, oauth2: Optional[dict] = None, - cloud: bool = True, + cloud: Optional[bool] = True, + number_of_retries: Optional[int] = 3, + min_retry_seconds: Optional[int] = 2, + max_retry_seconds: Optional[int] = 10, ): errors = ConfluenceLoader.validate_init_args(url, api_key, username, oauth2) if errors: raise ValueError(f"Error(s) while validating input: {errors}") self.base_url = url + self.number_of_retries = number_of_retries + self.min_retry_seconds = min_retry_seconds + self.max_retry_seconds = max_retry_seconds try: from atlassian import Confluence # noqa: F401 @@ -196,9 +218,19 @@ class ConfluenceLoader(BaseLoader): if page_ids: for page_id in page_ids: - page = self.confluence.get_page_by_id( - page_id=page_id, expand="body.storage.value" - ) + get_page = retry( + reraise=True, + stop=stop_after_attempt( + self.number_of_retries # type: ignore[arg-type] + ), + wait=wait_exponential( + multiplier=1, # type: ignore[arg-type] + min=self.min_retry_seconds, # type: ignore[arg-type] + max=self.max_retry_seconds, # type: ignore[arg-type] + ), + before_sleep=before_sleep_log(logger, logging.WARNING), + )(self.confluence.get_page_by_id) + page = get_page(page_id=page_id, expand="body.storage.value") doc = self.process_page(page, include_attachments, text_maker) docs.append(doc) @@ -227,7 +259,19 @@ class ConfluenceLoader(BaseLoader): page = 0 docs = [] while page < limit: - batch = retrieval_method(**kwargs, start=page) + get_pages = retry( + reraise=True, + stop=stop_after_attempt( + self.number_of_retries # type: ignore[arg-type] + ), + wait=wait_exponential( + multiplier=1, + min=self.min_retry_seconds, # type: ignore[arg-type] + max=self.max_retry_seconds, # type: ignore[arg-type] + ), + before_sleep=before_sleep_log(logger, logging.WARNING), + )(retrieval_method) + batch = get_pages(**kwargs, start=page) if len(batch) < limit: page = 
limit else: