forked from Archives/langchain
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
302 lines
11 KiB
Python
302 lines
11 KiB
Python
"""Wrapper around OpenAI embedding models."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import (
|
|
Any,
|
|
Callable,
|
|
Dict,
|
|
List,
|
|
Literal,
|
|
Optional,
|
|
Sequence,
|
|
Set,
|
|
Tuple,
|
|
Union,
|
|
)
|
|
|
|
import numpy as np
|
|
from pydantic import BaseModel, Extra, root_validator
|
|
from tenacity import (
|
|
before_sleep_log,
|
|
retry,
|
|
retry_if_exception_type,
|
|
stop_after_attempt,
|
|
wait_exponential,
|
|
)
|
|
|
|
from langchain.embeddings.base import Embeddings
|
|
from langchain.utils import get_from_dict_or_env
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _create_retry_decorator(embeddings: OpenAIEmbeddings) -> Callable[[Any], Any]:
|
|
import openai
|
|
|
|
min_seconds = 4
|
|
max_seconds = 10
|
|
# Wait 2^x * 1 second between each retry starting with
|
|
# 4 seconds, then up to 10 seconds, then 10 seconds afterwards
|
|
return retry(
|
|
reraise=True,
|
|
stop=stop_after_attempt(embeddings.max_retries),
|
|
wait=wait_exponential(multiplier=1, min=min_seconds, max=max_seconds),
|
|
retry=(
|
|
retry_if_exception_type(openai.error.Timeout)
|
|
| retry_if_exception_type(openai.error.APIError)
|
|
| retry_if_exception_type(openai.error.APIConnectionError)
|
|
| retry_if_exception_type(openai.error.RateLimitError)
|
|
| retry_if_exception_type(openai.error.ServiceUnavailableError)
|
|
),
|
|
before_sleep=before_sleep_log(logger, logging.WARNING),
|
|
)
|
|
|
|
|
|
def embed_with_retry(embeddings: OpenAIEmbeddings, **kwargs: Any) -> Any:
|
|
"""Use tenacity to retry the embedding call."""
|
|
retry_decorator = _create_retry_decorator(embeddings)
|
|
|
|
@retry_decorator
|
|
def _embed_with_retry(**kwargs: Any) -> Any:
|
|
return embeddings.client.create(**kwargs)
|
|
|
|
return _embed_with_retry(**kwargs)
|
|
|
|
|
|
class OpenAIEmbeddings(BaseModel, Embeddings):
|
|
"""Wrapper around OpenAI embedding models.
|
|
|
|
To use, you should have the ``openai`` python package installed, and the
|
|
environment variable ``OPENAI_API_KEY`` set with your API key or pass it
|
|
as a named parameter to the constructor.
|
|
|
|
Example:
|
|
.. code-block:: python
|
|
|
|
from langchain.embeddings import OpenAIEmbeddings
|
|
openai = OpenAIEmbeddings(openai_api_key="my-api-key")
|
|
|
|
In order to use the library with Microsoft Azure endpoints, you need to set
|
|
the OPENAI_API_TYPE, OPENAI_API_BASE, OPENAI_API_KEY and OPENAI_API_VERSION.
|
|
The OPENAI_API_TYPE must be set to 'azure' and the others correspond to
|
|
the properties of your endpoint.
|
|
In addition, the deployment name must be passed as the model parameter.
|
|
|
|
Example:
|
|
.. code-block:: python
|
|
|
|
import os
|
|
os.environ["OPENAI_API_TYPE"] = "azure"
|
|
os.environ["OPENAI_API_BASE"] = "https://<your-endpoint.openai.azure.com/"
|
|
os.environ["OPENAI_API_KEY"] = "your AzureOpenAI key"
|
|
os.environ["OPENAI_API_VERSION"] = "2023-03-15-preview"
|
|
|
|
from langchain.embeddings.openai import OpenAIEmbeddings
|
|
embeddings = OpenAIEmbeddings(
|
|
deployment="your-embeddings-deployment-name",
|
|
model="your-embeddings-model-name",
|
|
api_base="https://your-endpoint.openai.azure.com/",
|
|
api_type="azure",
|
|
)
|
|
text = "This is a test query."
|
|
query_result = embeddings.embed_query(text)
|
|
|
|
"""
|
|
|
|
client: Any #: :meta private:
|
|
model: str = "text-embedding-ada-002"
|
|
deployment: str = model # to support Azure OpenAI Service custom deployment names
|
|
openai_api_version: Optional[str] = None
|
|
# to support Azure OpenAI Service custom endpoints
|
|
openai_api_base: Optional[str] = None
|
|
# to support Azure OpenAI Service custom endpoints
|
|
openai_api_type: Optional[str] = None
|
|
embedding_ctx_length: int = 8191
|
|
openai_api_key: Optional[str] = None
|
|
openai_organization: Optional[str] = None
|
|
allowed_special: Union[Literal["all"], Set[str]] = set()
|
|
disallowed_special: Union[Literal["all"], Set[str], Sequence[str]] = "all"
|
|
chunk_size: int = 1000
|
|
"""Maximum number of texts to embed in each batch"""
|
|
max_retries: int = 6
|
|
"""Maximum number of retries to make when generating."""
|
|
request_timeout: Optional[Union[float, Tuple[float, float]]] = None
|
|
"""Timeout in seconds for the OpenAPI request."""
|
|
headers: Any = None
|
|
|
|
class Config:
|
|
"""Configuration for this pydantic object."""
|
|
|
|
extra = Extra.forbid
|
|
|
|
@root_validator()
|
|
def validate_environment(cls, values: Dict) -> Dict:
|
|
"""Validate that api key and python package exists in environment."""
|
|
openai_api_key = get_from_dict_or_env(
|
|
values, "openai_api_key", "OPENAI_API_KEY"
|
|
)
|
|
openai_api_base = get_from_dict_or_env(
|
|
values,
|
|
"openai_api_base",
|
|
"OPENAI_API_BASE",
|
|
default="",
|
|
)
|
|
openai_api_type = get_from_dict_or_env(
|
|
values,
|
|
"openai_api_type",
|
|
"OPENAI_API_TYPE",
|
|
default="",
|
|
)
|
|
if openai_api_type in ("azure", "azure_ad", "azuread"):
|
|
default_api_version = "2022-12-01"
|
|
else:
|
|
default_api_version = ""
|
|
openai_api_version = get_from_dict_or_env(
|
|
values,
|
|
"openai_api_version",
|
|
"OPENAI_API_VERSION",
|
|
default=default_api_version,
|
|
)
|
|
openai_organization = get_from_dict_or_env(
|
|
values,
|
|
"openai_organization",
|
|
"OPENAI_ORGANIZATION",
|
|
default="",
|
|
)
|
|
try:
|
|
import openai
|
|
|
|
openai.api_key = openai_api_key
|
|
if openai_organization:
|
|
openai.organization = openai_organization
|
|
if openai_api_base:
|
|
openai.api_base = openai_api_base
|
|
if openai_api_type:
|
|
openai.api_version = openai_api_version
|
|
if openai_api_type:
|
|
openai.api_type = openai_api_type
|
|
values["client"] = openai.Embedding
|
|
except ImportError:
|
|
raise ValueError(
|
|
"Could not import openai python package. "
|
|
"Please install it with `pip install openai`."
|
|
)
|
|
return values
|
|
|
|
# please refer to
|
|
# https://github.com/openai/openai-cookbook/blob/main/examples/Embedding_long_inputs.ipynb
|
|
def _get_len_safe_embeddings(
|
|
self, texts: List[str], *, engine: str, chunk_size: Optional[int] = None
|
|
) -> List[List[float]]:
|
|
embeddings: List[List[float]] = [[] for _ in range(len(texts))]
|
|
try:
|
|
import tiktoken
|
|
|
|
tokens = []
|
|
indices = []
|
|
encoding = tiktoken.model.encoding_for_model(self.model)
|
|
for i, text in enumerate(texts):
|
|
if self.model.endswith("001"):
|
|
# See: https://github.com/openai/openai-python/issues/418#issuecomment-1525939500
|
|
# replace newlines, which can negatively affect performance.
|
|
text = text.replace("\n", " ")
|
|
token = encoding.encode(
|
|
text,
|
|
allowed_special=self.allowed_special,
|
|
disallowed_special=self.disallowed_special,
|
|
)
|
|
for j in range(0, len(token), self.embedding_ctx_length):
|
|
tokens += [token[j : j + self.embedding_ctx_length]]
|
|
indices += [i]
|
|
|
|
batched_embeddings = []
|
|
_chunk_size = chunk_size or self.chunk_size
|
|
for i in range(0, len(tokens), _chunk_size):
|
|
response = embed_with_retry(
|
|
self,
|
|
input=tokens[i : i + _chunk_size],
|
|
engine=self.deployment,
|
|
request_timeout=self.request_timeout,
|
|
headers=self.headers,
|
|
)
|
|
batched_embeddings += [r["embedding"] for r in response["data"]]
|
|
|
|
results: List[List[List[float]]] = [[] for _ in range(len(texts))]
|
|
num_tokens_in_batch: List[List[int]] = [[] for _ in range(len(texts))]
|
|
for i in range(len(indices)):
|
|
results[indices[i]].append(batched_embeddings[i])
|
|
num_tokens_in_batch[indices[i]].append(len(tokens[i]))
|
|
|
|
for i in range(len(texts)):
|
|
_result = results[i]
|
|
if len(_result) == 0:
|
|
average = embed_with_retry(
|
|
self,
|
|
input="",
|
|
engine=self.deployment,
|
|
request_timeout=self.request_timeout,
|
|
headers=self.headers,
|
|
)["data"][0]["embedding"]
|
|
else:
|
|
average = np.average(
|
|
_result, axis=0, weights=num_tokens_in_batch[i]
|
|
)
|
|
embeddings[i] = (average / np.linalg.norm(average)).tolist()
|
|
|
|
return embeddings
|
|
|
|
except ImportError:
|
|
raise ValueError(
|
|
"Could not import tiktoken python package. "
|
|
"This is needed in order to for OpenAIEmbeddings. "
|
|
"Please install it with `pip install tiktoken`."
|
|
)
|
|
|
|
def _embedding_func(self, text: str, *, engine: str) -> List[float]:
|
|
"""Call out to OpenAI's embedding endpoint."""
|
|
# handle large input text
|
|
if len(text) > self.embedding_ctx_length:
|
|
return self._get_len_safe_embeddings([text], engine=engine)[0]
|
|
else:
|
|
if self.model.endswith("001"):
|
|
# See: https://github.com/openai/openai-python/issues/418#issuecomment-1525939500
|
|
# replace newlines, which can negatively affect performance.
|
|
text = text.replace("\n", " ")
|
|
return embed_with_retry(
|
|
self,
|
|
input=[text],
|
|
engine=engine,
|
|
request_timeout=self.request_timeout,
|
|
headers=self.headers,
|
|
)["data"][0]["embedding"]
|
|
|
|
def embed_documents(
|
|
self, texts: List[str], chunk_size: Optional[int] = 0
|
|
) -> List[List[float]]:
|
|
"""Call out to OpenAI's embedding endpoint for embedding search docs.
|
|
|
|
Args:
|
|
texts: The list of texts to embed.
|
|
chunk_size: The chunk size of embeddings. If None, will use the chunk size
|
|
specified by the class.
|
|
|
|
Returns:
|
|
List of embeddings, one for each text.
|
|
"""
|
|
# NOTE: to keep things simple, we assume the list may contain texts longer
|
|
# than the maximum context and use length-safe embedding function.
|
|
return self._get_len_safe_embeddings(texts, engine=self.deployment)
|
|
|
|
def embed_query(self, text: str) -> List[float]:
|
|
"""Call out to OpenAI's embedding endpoint for embedding query text.
|
|
|
|
Args:
|
|
text: The text to embed.
|
|
|
|
Returns:
|
|
Embedding for the text.
|
|
"""
|
|
embedding = self._embedding_func(text, engine=self.deployment)
|
|
return embedding
|