langchain/libs/community/langchain_community/llms/yandex.py
Dmitry Tyumentsev e86e66bad7
community[patch]: YandexGPT models - add sleep_interval (#16566)
Added sleep between requests to prevent errors associated with
simultaneous requests.
2024-01-25 09:07:19 -08:00

307 lines
11 KiB
Python

from __future__ import annotations
import logging
from typing import Any, Callable, Dict, List, Mapping, Optional
from langchain_core.callbacks import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models.llms import LLM
from langchain_core.load.serializable import Serializable
from langchain_core.pydantic_v1 import SecretStr, root_validator
from langchain_core.utils import convert_to_secret_str, get_from_dict_or_env
from tenacity import (
before_sleep_log,
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from langchain_community.llms.utils import enforce_stop_tokens
logger = logging.getLogger(__name__)
class _BaseYandexGPT(Serializable):
iam_token: SecretStr = ""
"""Yandex Cloud IAM token for service or user account
with the `ai.languageModels.user` role"""
api_key: SecretStr = ""
"""Yandex Cloud Api Key for service account
with the `ai.languageModels.user` role"""
folder_id: str = ""
"""Yandex Cloud folder ID"""
model_uri: str = ""
"""Model uri to use."""
model_name: str = "yandexgpt-lite"
"""Model name to use."""
model_version: str = "latest"
"""Model version to use."""
temperature: float = 0.6
"""What sampling temperature to use.
Should be a double number between 0 (inclusive) and 1 (inclusive)."""
max_tokens: int = 7400
"""Sets the maximum limit on the total number of tokens
used for both the input prompt and the generated response.
Must be greater than zero and not exceed 7400 tokens."""
stop: Optional[List[str]] = None
"""Sequences when completion generation will stop."""
url: str = "llm.api.cloud.yandex.net:443"
"""The url of the API."""
max_retries: int = 6
"""Maximum number of retries to make when generating."""
sleep_interval: float = 1.0
"""Delay between API requests"""
@property
def _llm_type(self) -> str:
return "yandex_gpt"
@property
def _identifying_params(self) -> Mapping[str, Any]:
"""Get the identifying parameters."""
return {
"model_uri": self.model_uri,
"temperature": self.temperature,
"max_tokens": self.max_tokens,
"stop": self.stop,
"max_retries": self.max_retries,
}
@root_validator()
def validate_environment(cls, values: Dict) -> Dict:
"""Validate that iam token exists in environment."""
iam_token = convert_to_secret_str(
get_from_dict_or_env(values, "iam_token", "YC_IAM_TOKEN", "")
)
values["iam_token"] = iam_token
api_key = convert_to_secret_str(
get_from_dict_or_env(values, "api_key", "YC_API_KEY", "")
)
values["api_key"] = api_key
folder_id = get_from_dict_or_env(values, "folder_id", "YC_FOLDER_ID", "")
values["folder_id"] = folder_id
if api_key.get_secret_value() == "" and iam_token.get_secret_value() == "":
raise ValueError("Either 'YC_API_KEY' or 'YC_IAM_TOKEN' must be provided.")
if values["iam_token"]:
values["_grpc_metadata"] = [
("authorization", f"Bearer {values['iam_token'].get_secret_value()}")
]
if values["folder_id"]:
values["_grpc_metadata"].append(("x-folder-id", values["folder_id"]))
else:
values["_grpc_metadata"] = (
("authorization", f"Api-Key {values['api_key'].get_secret_value()}"),
)
if values["model_uri"] == "" and values["folder_id"] == "":
raise ValueError("Either 'model_uri' or 'folder_id' must be provided.")
if not values["model_uri"]:
values[
"model_uri"
] = f"gpt://{values['folder_id']}/{values['model_name']}/{values['model_version']}"
return values
class YandexGPT(_BaseYandexGPT, LLM):
"""Yandex large language models.
To use, you should have the ``yandexcloud`` python package installed.
There are two authentication options for the service account
with the ``ai.languageModels.user`` role:
- You can specify the token in a constructor parameter `iam_token`
or in an environment variable `YC_IAM_TOKEN`.
- You can specify the key in a constructor parameter `api_key`
or in an environment variable `YC_API_KEY`.
To use the default model specify the folder ID in a parameter `folder_id`
or in an environment variable `YC_FOLDER_ID`.
Or specify the model URI in a constructor parameter `model_uri`
Example:
.. code-block:: python
from langchain_community.llms import YandexGPT
yandex_gpt = YandexGPT(iam_token="t1.9eu...", folder_id="b1g...")
"""
def _call(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
"""Call the Yandex GPT model and return the output.
Args:
prompt: The prompt to pass into the model.
stop: Optional list of stop words to use when generating.
Returns:
The string generated by the model.
Example:
.. code-block:: python
response = YandexGPT("Tell me a joke.")
"""
text = completion_with_retry(self, prompt=prompt)
if stop is not None:
text = enforce_stop_tokens(text, stop)
return text
async def _acall(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
"""Async call the Yandex GPT model and return the output.
Args:
prompt: The prompt to pass into the model.
stop: Optional list of stop words to use when generating.
Returns:
The string generated by the model.
"""
text = await acompletion_with_retry(self, prompt=prompt)
if stop is not None:
text = enforce_stop_tokens(text, stop)
return text
def _make_request(
self: YandexGPT,
prompt: str,
) -> str:
try:
import grpc
from google.protobuf.wrappers_pb2 import DoubleValue, Int64Value
from yandex.cloud.ai.foundation_models.v1.foundation_models_pb2 import (
CompletionOptions,
Message,
)
from yandex.cloud.ai.foundation_models.v1.foundation_models_service_pb2 import ( # noqa: E501
CompletionRequest,
)
from yandex.cloud.ai.foundation_models.v1.foundation_models_service_pb2_grpc import ( # noqa: E501
TextGenerationServiceStub,
)
except ImportError as e:
raise ImportError(
"Please install YandexCloud SDK with `pip install yandexcloud` \
or upgrade it to recent version."
) from e
channel_credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel(self.url, channel_credentials)
request = CompletionRequest(
model_uri=self.model_uri,
completion_options=CompletionOptions(
temperature=DoubleValue(value=self.temperature),
max_tokens=Int64Value(value=self.max_tokens),
),
messages=[Message(role="user", text=prompt)],
)
stub = TextGenerationServiceStub(channel)
res = stub.Completion(request, metadata=self._grpc_metadata)
return list(res)[0].alternatives[0].message.text
async def _amake_request(self: YandexGPT, prompt: str) -> str:
try:
import asyncio
import grpc
from google.protobuf.wrappers_pb2 import DoubleValue, Int64Value
from yandex.cloud.ai.foundation_models.v1.foundation_models_pb2 import (
CompletionOptions,
Message,
)
from yandex.cloud.ai.foundation_models.v1.foundation_models_service_pb2 import ( # noqa: E501
CompletionRequest,
CompletionResponse,
)
from yandex.cloud.ai.foundation_models.v1.foundation_models_service_pb2_grpc import ( # noqa: E501
TextGenerationAsyncServiceStub,
)
from yandex.cloud.operation.operation_service_pb2 import GetOperationRequest
from yandex.cloud.operation.operation_service_pb2_grpc import (
OperationServiceStub,
)
except ImportError as e:
raise ImportError(
"Please install YandexCloud SDK with `pip install yandexcloud` \
or upgrade it to recent version."
) from e
operation_api_url = "operation.api.cloud.yandex.net:443"
channel_credentials = grpc.ssl_channel_credentials()
async with grpc.aio.secure_channel(self.url, channel_credentials) as channel:
request = CompletionRequest(
model_uri=self.model_uri,
completion_options=CompletionOptions(
temperature=DoubleValue(value=self.temperature),
max_tokens=Int64Value(value=self.max_tokens),
),
messages=[Message(role="user", text=prompt)],
)
stub = TextGenerationAsyncServiceStub(channel)
operation = await stub.Completion(request, metadata=self._grpc_metadata)
async with grpc.aio.secure_channel(
operation_api_url, channel_credentials
) as operation_channel:
operation_stub = OperationServiceStub(operation_channel)
while not operation.done:
await asyncio.sleep(1)
operation_request = GetOperationRequest(operation_id=operation.id)
operation = await operation_stub.Get(
operation_request, metadata=self._grpc_metadata
)
completion_response = CompletionResponse()
operation.response.Unpack(completion_response)
return completion_response.alternatives[0].message.text
def _create_retry_decorator(llm: YandexGPT) -> Callable[[Any], Any]:
from grpc import RpcError
min_seconds = llm.sleep_interval
max_seconds = 60
return retry(
reraise=True,
stop=stop_after_attempt(llm.max_retries),
wait=wait_exponential(multiplier=1, min=min_seconds, max=max_seconds),
retry=(retry_if_exception_type((RpcError))),
before_sleep=before_sleep_log(logger, logging.WARNING),
)
def completion_with_retry(llm: YandexGPT, **kwargs: Any) -> Any:
"""Use tenacity to retry the completion call."""
retry_decorator = _create_retry_decorator(llm)
@retry_decorator
def _completion_with_retry(**_kwargs: Any) -> Any:
return _make_request(llm, **_kwargs)
return _completion_with_retry(**kwargs)
async def acompletion_with_retry(llm: YandexGPT, **kwargs: Any) -> Any:
"""Use tenacity to retry the async completion call."""
retry_decorator = _create_retry_decorator(llm)
@retry_decorator
async def _completion_with_retry(**_kwargs: Any) -> Any:
return await _amake_request(llm, **_kwargs)
return await _completion_with_retry(**kwargs)