nvidia-trt, nvidia-ai-endpoints: move to repo (#18814)

NVIDIA integrations are now maintained in https://github.com/langchain-ai/langchain-nvidia
Erick Friis 3 months ago committed by GitHub
parent e54a49b697
commit ad29806255

@ -24,6 +24,10 @@ jobs:
with:
repository: langchain-ai/langchain-datastax
path: langchain-datastax
- uses: actions/checkout@v4
with:
repository: langchain-ai/langchain-nvidia
path: langchain-nvidia
- name: Set Git config
working-directory: langchain
@ -42,10 +46,14 @@ jobs:
rm -rf \
langchain/libs/partners/google-genai \
langchain/libs/partners/google-vertexai \
langchain/libs/partners/astradb
langchain/libs/partners/astradb \
langchain/libs/partners/nvidia-trt \
langchain/libs/partners/nvidia-ai-endpoints
mv langchain-google/libs/genai langchain/libs/partners/google-genai
mv langchain-google/libs/vertexai langchain/libs/partners/google-vertexai
mv langchain-datastax/libs/astradb langchain/libs/partners/astradb
mv langchain-nvidia/libs/ai-endpoints langchain/libs/partners/nvidia-ai-endpoints
mv langchain-nvidia/libs/trt langchain/libs/partners/nvidia-trt
- name: Set up Python ${{ env.PYTHON_VERSION }} + Poetry ${{ env.POETRY_VERSION }}
uses: "./langchain/.github/actions/poetry_setup"

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2023 LangChain, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -1,62 +0,0 @@
.PHONY: all format lint test tests integration_tests help
# Default target executed when no arguments are given to make.
all: help
# Define a variable for the test file path.
TEST_FILE ?= tests/unit_tests/
test:
poetry run pytest $(TEST_FILE)
tests:
poetry run pytest $(TEST_FILE)
check_imports: $(shell find langchain_nvidia_ai_endpoints -name '*.py')
poetry run python ./scripts/check_imports.py $^
integration_tests:
poetry run pytest tests/integration_tests
######################
# LINTING AND FORMATTING
######################
# Define a variable for Python and notebook files.
PYTHON_FILES=.
MYPY_CACHE=.mypy_cache
lint format: PYTHON_FILES=.
lint_diff format_diff: PYTHON_FILES=$(shell git diff --name-only --diff-filter=d master | grep -E '\.py$$|\.ipynb$$')
lint_package: PYTHON_FILES=langchain_nvidia_ai_endpoints
lint_tests: PYTHON_FILES=tests
lint_tests: MYPY_CACHE=.mypy_cache_test
lint lint_diff lint_package lint_tests:
./scripts/check_pydantic.sh .
./scripts/lint_imports.sh
poetry run ruff .
[ "$(PYTHON_FILES)" = "" ] || poetry run ruff format $(PYTHON_FILES) --diff
[ "$(PYTHON_FILES)" = "" ] || poetry run mypy $(PYTHON_FILES)
format format_diff:
poetry run ruff format $(PYTHON_FILES)
poetry run ruff --select I --fix $(PYTHON_FILES)
spell_check:
poetry run codespell --toml pyproject.toml
spell_fix:
poetry run codespell --toml pyproject.toml -w
######################
# HELP
######################
help:
@echo '----'
@echo 'format - run code formatters'
@echo 'lint - run linters'
@echo 'test - run unit tests'
@echo 'tests - run unit tests'
@echo 'test TEST_FILE=<test_file> - run all tests in file'

@ -1,358 +1,3 @@
# langchain-nvidia-ai-endpoints
The `langchain-nvidia-ai-endpoints` package contains LangChain integrations for chat models and embeddings powered by the [NVIDIA AI Foundation Model](https://www.nvidia.com/en-us/ai-data-science/foundation-models/) playground environment.
> [NVIDIA AI Foundation Endpoints](https://www.nvidia.com/en-us/ai-data-science/foundation-models/) give users easy access to hosted endpoints for generative AI models like Llama-2, SteerLM, Mistral, etc. Using the API, you can query live endpoints available on the [NVIDIA GPU Cloud (NGC)](https://catalog.ngc.nvidia.com/ai-foundation-models) to get quick results from a DGX-hosted cloud compute environment. All models are source-accessible and can be deployed on your own compute cluster.
Below is an example of how to use some common functionality surrounding text-generation and embedding models.
## Installation
```python
%pip install -U --quiet langchain-nvidia-ai-endpoints
```
## Setup
**To get started:**
1. Create a free account with the [NVIDIA GPU Cloud](https://catalog.ngc.nvidia.com/) service, which hosts AI solution catalogs, containers, models, etc.
2. Navigate to `Catalog > AI Foundation Models > (Model with API endpoint)`.
3. Select the `API` option and click `Generate Key`.
4. Save the generated key as `NVIDIA_API_KEY`. From there, you should have access to the endpoints.
```python
import getpass
import os
if not os.environ.get("NVIDIA_API_KEY", "").startswith("nvapi-"):
nvidia_api_key = getpass.getpass("Enter your NVIDIA AIPLAY API key: ")
assert nvidia_api_key.startswith("nvapi-"), f"{nvidia_api_key[:5]}... is not a valid key"
os.environ["NVIDIA_API_KEY"] = nvidia_api_key
```
```python
## Core LC Chat Interface
from langchain_nvidia_ai_endpoints import ChatNVIDIA
llm = ChatNVIDIA(model="mixtral_8x7b")
result = llm.invoke("Write a ballad about LangChain.")
print(result.content)
```
## Stream, Batch, and Async
These models natively support streaming, and, as is the case with all LangChain LLMs, they expose a `batch` method to handle concurrent requests, as well as async methods for `invoke`, `stream`, and `batch`. Below are a few examples.
```python
print(llm.batch(["What's 2*3?", "What's 2*6?"]))
# Or via the async API
# await llm.abatch(["What's 2*3?", "What's 2*6?"])
```
```python
for chunk in llm.stream("How far can a seagull fly in one day?"):
# Show the token separations
print(chunk.content, end="|")
```
```python
async for chunk in llm.astream("How long does it take for monarch butterflies to migrate?"):
print(chunk.content, end="|")
```
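The async single-call path follows the same pattern; a minimal sketch of `ainvoke`, assuming you are already inside an async context (e.g. a notebook cell):
```python
# Async invocation; run inside an async context such as a Jupyter cell or asyncio.run
result = await llm.ainvoke("How far can a seagull fly in one day?")
print(result.content)
```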
## Supported models
Querying `available_models` will still give you all of the other models offered by your API credentials.
The `playground_` prefix is optional.
```python
list(llm.available_models)
# ['playground_llama2_13b',
# 'playground_llama2_code_13b',
# 'playground_clip',
# 'playground_fuyu_8b',
# 'playground_mistral_7b',
# 'playground_nvolveqa_40k',
# 'playground_yi_34b',
# 'playground_nemotron_steerlm_8b',
# 'playground_nv_llama2_rlhf_70b',
# 'playground_llama2_code_34b',
# 'playground_mixtral_8x7b',
# 'playground_neva_22b',
# 'playground_steerlm_llama_70b',
# 'playground_nemotron_qa_8b',
# 'playground_sdxl']
```
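For example, the following two constructions should resolve to the same hosted endpoint (a sketch based on the prefix handling described above):
```python
from langchain_nvidia_ai_endpoints import ChatNVIDIA
# Both names are accepted; the `playground_` prefix is added automatically when needed
llm_a = ChatNVIDIA(model="mixtral_8x7b")
llm_b = ChatNVIDIA(model="playground_mixtral_8x7b")
```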
## Model types
All of the models above are supported and can be accessed via `ChatNVIDIA`.
Some model types support unique prompting techniques and chat messages. We will review a few important ones below.
**To find out more about a specific model, please navigate to the API section of an AI Foundation Model [as linked here](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/ai-foundation/models/codellama-13b/api).**
### General Chat
Models such as `llama2_13b` and `mixtral_8x7b` are good all-around models that you can use with any LangChain chat messages. Example below.
```python
from langchain_nvidia_ai_endpoints import ChatNVIDIA
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_messages(
[
("system", "You are a helpful AI assistant named Fred."),
("user", "{input}")
]
)
chain = (
prompt
| ChatNVIDIA(model="llama2_13b")
| StrOutputParser()
)
for txt in chain.stream({"input": "What's your name?"}):
print(txt, end="")
```
### Code Generation
These models accept the same arguments and input structure as regular chat models, but they tend to perform better on code-generation and structured code tasks. An example of this is `llama2_code_13b`.
```python
prompt = ChatPromptTemplate.from_messages(
[
("system", "You are an expert coding AI. Respond only in valid python; no narration whatsoever."),
("user", "{input}")
]
)
chain = (
prompt
| ChatNVIDIA(model="llama2_code_13b")
| StrOutputParser()
)
for txt in chain.stream({"input": "How do I solve this fizz buzz problem?"}):
print(txt, end="")
```
## Steering LLMs
> [SteerLM-optimized models](https://developer.nvidia.com/blog/announcing-steerlm-a-simple-and-practical-technique-to-customize-llms-during-inference/) support "dynamic steering" of model outputs at inference time.
This lets you "control" the complexity, verbosity, and creativity of the model via integer labels on a scale from 0 to 9. Under the hood, these are passed as a special type of assistant message to the model.
The "steer" models support this type of input, such as `steerlm_llama_70b`
```python
from langchain_nvidia_ai_endpoints import ChatNVIDIA
llm = ChatNVIDIA(model="steerlm_llama_70b")
# Try making it uncreative and not verbose
complex_result = llm.invoke(
"What's a PB&J?",
labels={"creativity": 0, "complexity": 3, "verbosity": 0}
)
print("Un-creative\n")
print(complex_result.content)
# Try making it very creative and verbose
print("\n\nCreative\n")
creative_result = llm.invoke(
"What's a PB&J?",
labels={"creativity": 9, "complexity": 3, "verbosity": 9}
)
print(creative_result.content)
```
#### Use within LCEL
The labels are passed as invocation params. You can `bind` these to the LLM using its `bind` method to include them within a declarative, functional chain. Below is an example.
```python
from langchain_nvidia_ai_endpoints import ChatNVIDIA
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_messages(
[
("system", "You are a helpful AI assistant named Fred."),
("user", "{input}")
]
)
chain = (
prompt
| ChatNVIDIA(model="steerlm_llama_70b").bind(labels={"creativity": 9, "complexity": 0, "verbosity": 9})
| StrOutputParser()
)
for txt in chain.stream({"input": "Why is a PB&J?"}):
print(txt, end="")
```
## Multimodal
NVIDIA also supports multimodal inputs, meaning you can provide both images and text for the model to reason over.
These models also accept `labels`, similar to the Steering LLMs above. In addition to `creativity`, `complexity`, and `verbosity`, these models support a `quality` toggle.
An example model supporting multimodal inputs is `playground_neva_22b`.
These models accept LangChain's standard image formats. Below are examples.
```python
import requests
image_url = "https://picsum.photos/seed/kitten/300/200"
image_content = requests.get(image_url).content
```
Initialize the model like so:
```python
from langchain_nvidia_ai_endpoints import ChatNVIDIA
llm = ChatNVIDIA(model="playground_neva_22b")
```
#### Passing an image as a URL
```python
from langchain_core.messages import HumanMessage
llm.invoke(
[
HumanMessage(content=[
{"type": "text", "text": "Describe this image:"},
{"type": "image_url", "image_url": {"url": image_url}},
])
])
```
```python
### You can specify the labels for steering here as well. You can try setting a low verbosity, for instance
from langchain_core.messages import HumanMessage
llm.invoke(
[
HumanMessage(content=[
{"type": "text", "text": "Describe this image:"},
{"type": "image_url", "image_url": {"url": image_url}},
])
],
labels={
"creativity": 0,
"quality": 9,
"complexity": 0,
"verbosity": 0
}
)
```
#### Passing an image as a base64 encoded string
```python
import base64
b64_string = base64.b64encode(image_content).decode('utf-8')
llm.invoke(
[
HumanMessage(content=[
{"type": "text", "text": "Describe this image:"},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64_string}"}},
])
])
```
#### Directly within the string
The NVIDIA API uniquely accepts base64-encoded images inlined within `<img>` HTML tags. While this isn't interoperable with other LLMs, you can directly prompt the model accordingly.
```python
base64_with_mime_type = f"data:image/png;base64,{b64_string}"
llm.invoke(
f'What\'s in this image?\n<img src="{base64_with_mime_type}" />'
)
```
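#### Passing an image as a local file path
The client's image handling also appears to accept a local file path in place of a URL, reading and base64-encoding the file before sending it. A minimal sketch, where `./example.png` is a placeholder path:
```python
from langchain_core.messages import HumanMessage
# A local path is read and converted to a base64 data URL by the client
llm.invoke(
    [
        HumanMessage(content=[
            {"type": "text", "text": "Describe this image:"},
            {"type": "image_url", "image_url": {"url": "./example.png"}},
        ])
    ]
)
```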
## RAG: Context models
NVIDIA also has Q&A models that support a special "context" chat message containing retrieved context (such as documents within a RAG chain). This is useful to avoid prompt-injecting the model.
**Note:** Only "user" (human) and "context" chat messages are supported for these models, not system or AI messages useful in conversational flows.
The `_qa_` models like `nemotron_qa_8b` support this.
```python
from langchain_nvidia_ai_endpoints import ChatNVIDIA
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.messages import ChatMessage
prompt = ChatPromptTemplate.from_messages(
[
ChatMessage(role="context", content="Parrots and Cats have signed the peace accord."),
("user", "{input}")
]
)
llm = ChatNVIDIA(model="nemotron_qa_8b")
chain = (
prompt
| llm
| StrOutputParser()
)
chain.invoke({"input": "What was signed?"})
```
## Embeddings
You can also connect to embeddings models through this package. Below is an example:
```python
from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings
embedder = NVIDIAEmbeddings(model="nvolveqa_40k")
embedder.embed_query("What's the temperature today?")
embedder.embed_documents([
"The temperature is 42 degrees.",
"Class is dismissed at 9 PM."
])
```
By default the embedding model will use the "passage" type for documents and "query" type for queries, but you can fix this on the instance.
```python
query_embedder = NVIDIAEmbeddings(model="nvolveqa_40k", model_type="query")
doc_embedder = NVIDIAEmbeddings(model="nvolveqa_40k", model_type="passage")
```
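For example, a brief sketch reusing the two instances defined above:
```python
# Each instance now embeds with a fixed type, regardless of the entry point used
query_vector = query_embedder.embed_query("What's the temperature today?")
doc_vectors = doc_embedder.embed_documents([
    "The temperature is 42 degrees.",
    "Class is dismissed at 9 PM.",
])
```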
This package has moved!
https://github.com/langchain-ai/langchain-nvidia/tree/main/libs/ai-endpoints

@ -1,45 +0,0 @@
"""
**LangChain NVIDIA AI Foundation Model Playground Integration**
This comprehensive module integrates NVIDIA's state-of-the-art AI Foundation Models, featuring advanced models for conversational AI and semantic embeddings, into the LangChain framework. It provides robust classes for seamless interaction with NVIDIA's AI models, particularly tailored for enriching conversational experiences and enhancing semantic understanding in various applications.
**Features:**
1. **Chat Models (`ChatNVIDIA`):** This class serves as the primary interface for interacting with NVIDIA's Foundation chat models. Users can effortlessly utilize NVIDIA's advanced models like 'Mistral' to engage in rich, context-aware conversations, applicable across diverse domains from customer support to interactive storytelling.
2. **Semantic Embeddings (`NVIDIAEmbeddings`):** The module offers capabilities to generate sophisticated embeddings using NVIDIA's AI models. These embeddings are instrumental for tasks like semantic analysis, text similarity assessments, and contextual understanding, significantly enhancing the depth of NLP applications.
**Installation:**
Install this module easily using pip:
```python
pip install langchain-nvidia-ai-endpoints
```
## Utilizing Chat Models:
After setting up the environment, interact with NVIDIA AI Foundation models:
```python
from langchain_nvidia_ai_endpoints import ChatNVIDIA
ai_chat_model = ChatNVIDIA(model="llama2_13b")
response = ai_chat_model.invoke("Tell me about the LangChain integration.")
```
## Generating Semantic Embeddings:
Use NVIDIA's models for creating embeddings, useful in various NLP tasks:
```python
from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings
embed_model = NVIDIAEmbeddings(model="nvolveqa_40k")
embedding_output = embed_model.embed_query("Exploring AI capabilities.")
```
""" # noqa: E501
from langchain_nvidia_ai_endpoints.chat_models import ChatNVIDIA
from langchain_nvidia_ai_endpoints.embeddings import NVIDIAEmbeddings
__all__ = ["ChatNVIDIA", "NVIDIAEmbeddings"]

@ -1,482 +0,0 @@
from __future__ import annotations
import json
import logging
import time
from functools import partial
from typing import (
Any,
AsyncIterator,
Callable,
Dict,
Generator,
Iterator,
List,
Optional,
Sequence,
Tuple,
Union,
)
import aiohttp
import requests
from langchain_core.pydantic_v1 import (
BaseModel,
Field,
PrivateAttr,
SecretStr,
root_validator,
)
from langchain_core.utils import get_from_dict_or_env
from requests.models import Response
logger = logging.getLogger(__name__)
class NVEModel(BaseModel):
"""
Underlying Client for interacting with the AI Foundation Model Function API.
Leveraged by the NVIDIABaseModel to provide a simple requests-oriented interface.
Direct abstraction over NGC-recommended streaming/non-streaming Python solutions.
NOTE: Models in the playground do not currently support raw text continuation.
"""
## Core defaults. These probably should not be changed
fetch_url_format: str = Field("https://api.nvcf.nvidia.com/v2/nvcf/pexec/status/")
call_invoke_base: str = Field("https://api.nvcf.nvidia.com/v2/nvcf/pexec/functions")
func_list_format: str = Field("https://api.nvcf.nvidia.com/v2/nvcf/functions")
get_session_fn: Callable = Field(requests.Session)
get_asession_fn: Callable = Field(aiohttp.ClientSession)
nvidia_api_key: SecretStr = Field(
...,
description="API key for NVIDIA Foundation Endpoints. Starts with `nvapi-`",
)
is_staging: bool = Field(False, description="Whether to use staging API")
## Generation arguments
timeout: float = Field(60, ge=0, description="Timeout for waiting on response (s)")
interval: float = Field(0.02, ge=0, description="Interval for pulling response")
last_inputs: dict = Field({}, description="Last inputs sent over to the server")
payload_fn: Callable = Field(lambda d: d, description="Function to process payload")
headers_tmpl: dict = Field(
...,
description="Headers template for API calls."
" Should contain `call` and `stream` keys.",
)
_available_functions: Optional[List[dict]] = PrivateAttr(default=None)
_available_models: Optional[dict] = PrivateAttr(default=None)
@property
def headers(self) -> dict:
"""Return headers with API key injected"""
headers_ = self.headers_tmpl.copy()
for header in headers_.values():
if "{nvidia_api_key}" in header["Authorization"]:
header["Authorization"] = header["Authorization"].format(
nvidia_api_key=self.nvidia_api_key.get_secret_value(),
)
return headers_
@root_validator(pre=True)
def validate_model(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and update model arguments, including API key and formatting"""
values["nvidia_api_key"] = get_from_dict_or_env(
values,
"nvidia_api_key",
"NVIDIA_API_KEY",
)
if "nvapi-" not in values.get("nvidia_api_key", ""):
raise ValueError("Invalid NVAPI key detected. Should start with `nvapi-`")
values["is_staging"] = "nvapi-stg-" in values["nvidia_api_key"]
if "headers_tmpl" not in values:
call_kvs = {
"Accept": "application/json",
}
stream_kvs = {
"Accept": "text/event-stream",
"content-type": "application/json",
}
shared_kvs = {
"Authorization": "Bearer {nvidia_api_key}",
"User-Agent": "langchain-nvidia-ai-endpoints",
}
values["headers_tmpl"] = {
"call": {**call_kvs, **shared_kvs},
"stream": {**stream_kvs, **shared_kvs},
}
return values
@root_validator(pre=False)
def validate_model_post(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Additional validation after default values have been put in"""
values["stagify"] = partial(cls._stagify, is_staging=values["is_staging"])
values["fetch_url_format"] = values["stagify"](values.get("fetch_url_format"))
values["call_invoke_base"] = values["stagify"](values.get("call_invoke_base"))
return values
@property
def available_models(self) -> dict:
"""List the available models that can be invoked."""
if self._available_models is not None:
return self._available_models
live_fns = [v for v in self.available_functions if v.get("status") == "ACTIVE"]
self._available_models = {v["name"]: v["id"] for v in live_fns}
return self._available_models
@property
def available_functions(self) -> List[dict]:
"""List the available functions that can be invoked."""
if self._available_functions is not None:
return self._available_functions
invoke_url = self._stagify(self.func_list_format, self.is_staging)
query_res = self.query(invoke_url)
if "functions" not in query_res:
raise ValueError(
f"Unexpected response when querying {invoke_url}\n{query_res}"
)
self._available_functions = query_res["functions"]
return self._available_functions
@staticmethod
def _stagify(path: str, is_staging: bool) -> str:
"""Helper method to switch between staging and production endpoints"""
if is_staging and "stg.api" not in path:
return path.replace("api.", "stg.api.")
if not is_staging and "stg.api" in path:
return path.replace("stg.api.", "api.")
return path
####################################################################################
## Core utilities for posting and getting from NV Endpoints
def _post(self, invoke_url: str, payload: dict = {}) -> Tuple[Response, Any]:
"""Method for posting to the AI Foundation Model Function API."""
self.last_inputs = {
"url": invoke_url,
"headers": self.headers["call"],
"json": self.payload_fn(payload),
"stream": False,
}
session = self.get_session_fn()
response = session.post(**self.last_inputs)
self._try_raise(response)
return response, session
def _get(self, invoke_url: str, payload: dict = {}) -> Tuple[Response, Any]:
"""Method for getting from the AI Foundation Model Function API."""
self.last_inputs = {
"url": invoke_url,
"headers": self.headers["call"],
"json": self.payload_fn(payload),
"stream": False,
}
session = self.get_session_fn()
last_response = session.get(**self.last_inputs)
self._try_raise(last_response)
return last_response, session
def _wait(self, response: Response, session: Any) -> Response:
"""Wait for a response from API after an initial response is made"""
start_time = time.time()
while response.status_code == 202:
time.sleep(self.interval)
if (time.time() - start_time) > self.timeout:
raise TimeoutError(
f"Timeout reached without a successful response."
f"\nLast response: {str(response)}"
)
request_id = response.headers.get("NVCF-REQID", "")
response = session.get(
self.fetch_url_format + request_id,
headers=self.headers["call"],
)
self._try_raise(response)
return response
def _try_raise(self, response: Response) -> None:
"""Try to raise an error from a response"""
## (VK) Several systems can throw errors. This tries to coerce all of them
## If we can't predictably pull out request id, then dump response
try:
response.raise_for_status()
except requests.HTTPError:
try:
rd = response.json()
if "detail" in rd and "reqId" in rd.get("detail", ""):
rd_buf = "- " + str(rd["detail"])
rd_buf = rd_buf.replace(": ", ", Error: ").replace(", ", "\n- ")
rd["detail"] = rd_buf
except json.JSONDecodeError:
rd = response.__dict__
rd = rd.get("_content", rd)
if isinstance(rd, bytes):
rd = rd.decode("utf-8")[5:] ## remove "data:" prefix
try:
rd = json.loads(rd)
except Exception:
rd = {"detail": rd}
status = rd.get("status", "###")
title = rd.get("title", rd.get("error", "Unknown Error"))
header = f"[{status}] {title}"
body = ""
if "requestId" in rd:
if "detail" in rd:
body += f"{rd['detail']}\n"
body += "RequestID: " + rd["requestId"]
else:
body = rd.get("detail", rd)
if str(status) == "401":
body += "\nPlease check or regenerate your API key."
raise Exception(f"{header}\n{body}") from None
####################################################################################
## Simple query interface to show the set of model options
def query(self, invoke_url: str, payload: dict = {}) -> dict:
"""Simple method for an end-to-end get query. Returns result dictionary"""
response, session = self._get(invoke_url, payload)
response = self._wait(response, session)
output = self._process_response(response)[0]
return output
def _process_response(self, response: Union[str, Response]) -> List[dict]:
"""General-purpose response processing for single responses and streams"""
if hasattr(response, "json"): ## For single response (i.e. non-streaming)
try:
return [response.json()]
except json.JSONDecodeError:
response = str(response.__dict__)
if isinstance(response, str): ## For set of responses (i.e. streaming)
msg_list = []
for msg in response.split("\n\n"):
if "{" not in msg:
continue
msg_list += [json.loads(msg[msg.find("{") :])]
return msg_list
raise ValueError(f"Received ill-formed response: {response}")
def _get_invoke_url(
self, model_name: Optional[str] = None, invoke_url: Optional[str] = None
) -> str:
"""Helper method to get invoke URL from a model name, URL, or endpoint stub"""
if not invoke_url:
if not model_name:
raise ValueError("URL or model name must be specified to invoke")
if model_name in self.available_models:
invoke_url = self.available_models[model_name]
elif f"playground_{model_name}" in self.available_models:
invoke_url = self.available_models[f"playground_{model_name}"]
else:
available_models_str = "\n".join(
[f"{k} - {v}" for k, v in self.available_models.items()]
)
raise ValueError(
f"Unknown model name {model_name} specified."
"\nAvailable models are:\n"
f"{available_models_str}"
)
if not invoke_url:
# For mypy
raise ValueError("URL or model name must be specified to invoke")
# Why is this even needed?
if "http" not in invoke_url:
invoke_url = f"{self.call_invoke_base}/{invoke_url}"
return invoke_url
####################################################################################
## Generation interface to allow users to generate new values from endpoints
def get_req(
self,
model_name: Optional[str] = None,
payload: dict = {},
invoke_url: Optional[str] = None,
stop: Optional[Sequence[str]] = None,
) -> Response:
"""Post to the API."""
invoke_url = self._get_invoke_url(model_name, invoke_url)
if payload.get("stream", False) is True:
payload = {**payload, "stream": False}
response, session = self._post(invoke_url, payload)
return self._wait(response, session)
def get_req_generation(
self,
model_name: Optional[str] = None,
payload: dict = {},
invoke_url: Optional[str] = None,
stop: Optional[Sequence[str]] = None,
) -> dict:
"""Method for an end-to-end post query with NVE post-processing."""
response = self.get_req(model_name, payload, invoke_url)
output, _ = self.postprocess(response, stop=stop)
return output
def postprocess(
self, response: Union[str, Response], stop: Optional[Sequence[str]] = None
) -> Tuple[dict, bool]:
"""Parses a response from the AI Foundation Model Function API.
Strongly assumes that the API will return a single response.
"""
msg_list = self._process_response(response)
msg, is_stopped = self._aggregate_msgs(msg_list)
msg, is_stopped = self._early_stop_msg(msg, is_stopped, stop=stop)
return msg, is_stopped
def _aggregate_msgs(self, msg_list: Sequence[dict]) -> Tuple[dict, bool]:
"""Dig out relevant details of aggregated message"""
content_buffer: Dict[str, Any] = dict()
content_holder: Dict[Any, Any] = dict()
is_stopped = False
for msg in msg_list:
if "choices" in msg:
## Tease out ['choices'][0]...['delta'/'message']
msg = msg.get("choices", [{}])[0]
is_stopped = msg.get("finish_reason", "") == "stop"
msg = msg.get("delta", msg.get("message", {"content": ""}))
elif "data" in msg:
## Tease out ['data'][0]...['embedding']
msg = msg.get("data", [{}])[0]
content_holder = msg
for k, v in msg.items():
if k in ("content",) and k in content_buffer:
content_buffer[k] += v
else:
content_buffer[k] = v
if is_stopped:
break
content_holder = {**content_holder, **content_buffer}
return content_holder, is_stopped
def _early_stop_msg(
self, msg: dict, is_stopped: bool, stop: Optional[Sequence[str]] = None
) -> Tuple[dict, bool]:
"""Try to early-terminate streaming or generation by iterating over stop list"""
content = msg.get("content", "")
if content and stop:
for stop_str in stop:
if stop_str and stop_str in content:
msg["content"] = content[: content.find(stop_str) + 1]
is_stopped = True
return msg, is_stopped
####################################################################################
## Streaming interface to allow you to iterate through progressive generations
def get_req_stream(
self,
model: Optional[str] = None,
payload: dict = {},
invoke_url: Optional[str] = None,
stop: Optional[Sequence[str]] = None,
) -> Iterator:
invoke_url = self._get_invoke_url(model, invoke_url)
if payload.get("stream", True) is False:
payload = {**payload, "stream": True}
self.last_inputs = {
"url": invoke_url,
"headers": self.headers["stream"],
"json": payload,
"stream": True,
}
response = self.get_session_fn().post(**self.last_inputs)
self._try_raise(response)
call = self.copy()
def out_gen() -> Generator[dict, Any, Any]:
## Good for client, since it allows self.last_inputs
for line in response.iter_lines():
if line and line.strip() != b"data: [DONE]":
line = line.decode("utf-8")
msg, final_line = call.postprocess(line, stop=stop)
yield msg
if final_line:
break
self._try_raise(response)
return (r for r in out_gen())
####################################################################################
## Asynchronous streaming interface to allow multiple generations to happen at once.
async def get_req_astream(
self,
model: Optional[str] = None,
payload: dict = {},
invoke_url: Optional[str] = None,
stop: Optional[Sequence[str]] = None,
) -> AsyncIterator:
invoke_url = self._get_invoke_url(model, invoke_url)
if payload.get("stream", True) is False:
payload = {**payload, "stream": True}
self.last_inputs = {
"url": invoke_url,
"headers": self.headers["stream"],
"json": payload,
}
async with self.get_asession_fn() as session:
async with session.post(**self.last_inputs) as response:
self._try_raise(response)
async for line in response.content.iter_any():
if line and line.strip() != b"data: [DONE]":
line = line.decode("utf-8")
msg, final_line = self.postprocess(line, stop=stop)
yield msg
if final_line:
break
class _NVIDIAClient(BaseModel):
"""
Higher-Level AI Foundation Model Function API Client with argument defaults.
Is subclassed by ChatNVIDIA to provide a simple LangChain interface.
"""
client: NVEModel = Field(NVEModel)
model: str = Field(..., description="Name of the model to invoke")
####################################################################################
@root_validator(pre=True)
def validate_client(cls, values: Any) -> Any:
"""Validate and update client arguments, including API key and formatting"""
if not values.get("client"):
values["client"] = NVEModel(**values)
return values
@classmethod
def is_lc_serializable(cls) -> bool:
return True
@property
def available_functions(self) -> List[dict]:
"""Map the available functions that can be invoked."""
return self.client.available_functions
@property
def available_models(self) -> dict:
"""Map the available models that can be invoked."""
return self.client.available_models
@staticmethod
def get_available_functions(**kwargs: Any) -> List[dict]:
"""Map the available functions that can be invoked. Callable from class"""
return NVEModel(**kwargs).available_functions
@staticmethod
def get_available_models(**kwargs: Any) -> dict:
"""Map the available models that can be invoked. Callable from class"""
return NVEModel(**kwargs).available_models
def get_model_details(self, model: Optional[str] = None) -> dict:
"""Get more meta-details about a model retrieved by a given name"""
if model is None:
model = self.model
model_key = self.client._get_invoke_url(model).split("/")[-1]
known_fns = self.client.available_functions
fn_spec = [f for f in known_fns if f.get("id") == model_key][0]
return fn_spec

@ -1,307 +0,0 @@
"""Chat Model Components Derived from ChatModel/NVIDIA"""
from __future__ import annotations
import base64
import io
import logging
import os
import sys
import urllib.parse
from typing import (
Any,
AsyncIterator,
Dict,
Iterator,
List,
Mapping,
Optional,
Sequence,
Union,
)
import requests
from langchain_core.callbacks.manager import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models.chat_models import SimpleChatModel
from langchain_core.messages import BaseMessage, ChatMessage, ChatMessageChunk
from langchain_core.outputs import ChatGenerationChunk
from langchain_core.pydantic_v1 import Field
from langchain_nvidia_ai_endpoints import _common as nvidia_ai_endpoints
try:
import PIL.Image
has_pillow = True
except ImportError:
has_pillow = False
logger = logging.getLogger(__name__)
def _is_openai_parts_format(part: dict) -> bool:
return "type" in part
def _is_url(s: str) -> bool:
try:
result = urllib.parse.urlparse(s)
return all([result.scheme, result.netloc])
except Exception as e:
logger.debug(f"Unable to parse URL: {e}")
return False
def _is_b64(s: str) -> bool:
return s.startswith("data:image")
def _resize_image(img_data: bytes, max_dim: int = 1024) -> str:
if not has_pillow:
print( # noqa: T201
"Pillow is required to resize images down to reasonable scale."
" Please install it using `pip install pillow`."
" For now, not resizing; may cause NVIDIA API to fail."
)
return base64.b64encode(img_data).decode("utf-8")
image = PIL.Image.open(io.BytesIO(img_data))
max_dim_size = max(image.size)
aspect_ratio = max_dim / max_dim_size
new_h = int(image.size[1] * aspect_ratio)
new_w = int(image.size[0] * aspect_ratio)
resized_image = image.resize((new_w, new_h), PIL.Image.Resampling.LANCZOS)
output_buffer = io.BytesIO()
resized_image.save(output_buffer, format="JPEG")
output_buffer.seek(0)
resized_b64_string = base64.b64encode(output_buffer.read()).decode("utf-8")
return resized_b64_string
def _url_to_b64_string(image_source: str) -> str:
b64_template = "data:image/png;base64,{b64_string}"
try:
if _is_url(image_source):
response = requests.get(image_source)
response.raise_for_status()
encoded = base64.b64encode(response.content).decode("utf-8")
if sys.getsizeof(encoded) > 200000:
## (VK) Temporary fix. NVIDIA API has a limit of 250KB for the input.
encoded = _resize_image(response.content)
return b64_template.format(b64_string=encoded)
elif _is_b64(image_source):
return image_source
elif os.path.exists(image_source):
with open(image_source, "rb") as f:
encoded = base64.b64encode(f.read()).decode("utf-8")
return b64_template.format(b64_string=encoded)
else:
raise ValueError(
"The provided string is not a valid URL, base64, or file path."
)
except Exception as e:
raise ValueError(f"Unable to process the provided image source: {e}")
class ChatNVIDIA(nvidia_ai_endpoints._NVIDIAClient, SimpleChatModel):
"""NVIDIA chat model.
Example:
.. code-block:: python
from langchain_nvidia_ai_endpoints import ChatNVIDIA
model = ChatNVIDIA(model="llama2_13b")
response = model.invoke("Hello")
"""
temperature: Optional[float] = Field(description="Sampling temperature in [0, 1]")
max_tokens: Optional[int] = Field(description="Maximum # of tokens to generate")
top_p: Optional[float] = Field(description="Top-p for distribution sampling")
seed: Optional[int] = Field(description="The seed for deterministic results")
bad: Optional[Sequence[str]] = Field(description="Bad words to avoid (cased)")
stop: Optional[Sequence[str]] = Field(description="Stop words (cased)")
labels: Optional[Dict[str, float]] = Field(description="Steering parameters")
@property
def _llm_type(self) -> str:
"""Return type of NVIDIA AI Foundation Model Interface."""
return "chat-nvidia-ai-playground"
def _call(
self,
messages: List[BaseMessage],
stop: Optional[Sequence[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
"""Invoke on a single list of chat messages."""
inputs = self.custom_preprocess(messages)
responses = self.get_generation(inputs=inputs, stop=stop, **kwargs)
outputs = self.custom_postprocess(responses)
return outputs
def _get_filled_chunk(
self, text: str, role: Optional[str] = "assistant"
) -> ChatGenerationChunk:
"""Fill the generation chunk."""
return ChatGenerationChunk(message=ChatMessageChunk(content=text, role=role))
def _stream(
self,
messages: List[BaseMessage],
stop: Optional[Sequence[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
"""Allows streaming to model!"""
inputs = self.custom_preprocess(messages)
for response in self.get_stream(inputs=inputs, stop=stop, **kwargs):
chunk = self._get_filled_chunk(self.custom_postprocess(response))
if run_manager:
run_manager.on_llm_new_token(chunk.text, chunk=chunk)
yield chunk
async def _astream(
self,
messages: List[BaseMessage],
stop: Optional[Sequence[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> AsyncIterator[ChatGenerationChunk]:
inputs = self.custom_preprocess(messages)
async for response in self.get_astream(inputs=inputs, stop=stop, **kwargs):
chunk = self._get_filled_chunk(self.custom_postprocess(response))
if run_manager:
await run_manager.on_llm_new_token(chunk.text, chunk=chunk)
yield chunk
def custom_preprocess(
self, msg_list: Sequence[BaseMessage]
) -> List[Dict[str, str]]:
return [self.preprocess_msg(m) for m in msg_list]
def _process_content(self, content: Union[str, List[Union[dict, str]]]) -> str:
if isinstance(content, str):
return content
string_array: list = []
for part in content:
if isinstance(part, str):
string_array.append(part)
elif isinstance(part, Mapping):
# OpenAI Format
if _is_openai_parts_format(part):
if part["type"] == "text":
string_array.append(str(part["text"]))
elif part["type"] == "image_url":
img_url = part["image_url"]
if isinstance(img_url, dict):
if "url" not in img_url:
raise ValueError(
f"Unrecognized message image format: {img_url}"
)
img_url = img_url["url"]
b64_string = _url_to_b64_string(img_url)
string_array.append(f'<img src="{b64_string}" />')
else:
raise ValueError(
f"Unrecognized message part type: {part['type']}"
)
else:
raise ValueError(f"Unrecognized message part format: {part}")
return "".join(string_array)
def preprocess_msg(self, msg: BaseMessage) -> Dict[str, str]:
if isinstance(msg, BaseMessage):
role_convert = {"ai": "assistant", "human": "user"}
if isinstance(msg, ChatMessage):
role = msg.role
else:
role = msg.type
role = role_convert.get(role, role)
content = self._process_content(msg.content)
return {"role": role, "content": content}
raise ValueError(f"Invalid message: {repr(msg)} of type {type(msg)}")
def custom_postprocess(self, msg: dict) -> str:
if "content" in msg:
return msg["content"]
elif "b64_json" in msg:
return msg["b64_json"]
return str(msg)
######################################################################################
## Core client-side interfaces
def get_generation(
self,
inputs: Sequence[Dict],
**kwargs: Any,
) -> dict:
"""Call to client generate method with call scope"""
stop = kwargs.get("stop", None)
payload = self.get_payload(inputs=inputs, stream=False, **kwargs)
out = self.client.get_req_generation(self.model, stop=stop, payload=payload)
return out
def get_stream(
self,
inputs: Sequence[Dict],
**kwargs: Any,
) -> Iterator:
"""Call to client stream method with call scope"""
stop = kwargs.get("stop", None)
payload = self.get_payload(inputs=inputs, stream=True, **kwargs)
return self.client.get_req_stream(self.model, stop=stop, payload=payload)
def get_astream(
self,
inputs: Sequence[Dict],
**kwargs: Any,
) -> AsyncIterator:
"""Call to client astream methods with call scope"""
stop = kwargs.get("stop", None)
payload = self.get_payload(inputs=inputs, stream=True, **kwargs)
return self.client.get_req_astream(self.model, stop=stop, payload=payload)
def get_payload(self, inputs: Sequence[Dict], **kwargs: Any) -> dict:
"""Generates payload for the _NVIDIAClient API to send to service."""
attr_kwargs = {
"temperature": self.temperature,
"max_tokens": self.max_tokens,
"top_p": self.top_p,
"seed": self.seed,
"bad": self.bad,
"stop": self.stop,
"labels": self.labels,
}
attr_kwargs = {k: v for k, v in attr_kwargs.items() if v is not None}
new_kwargs = {**attr_kwargs, **kwargs}
return self.prep_payload(inputs=inputs, **new_kwargs)
def prep_payload(self, inputs: Sequence[Dict], **kwargs: Any) -> dict:
"""Prepares a message or list of messages for the payload"""
messages = [self.prep_msg(m) for m in inputs]
if kwargs.get("labels"):
# (WFH) Labels are currently (?) always passed as an assistant
# suffix message, but this API seems less stable.
messages += [{"labels": kwargs.pop("labels"), "role": "assistant"}]
if kwargs.get("stop") is None:
kwargs.pop("stop")
return {"messages": messages, **kwargs}
def prep_msg(self, msg: Union[str, dict, BaseMessage]) -> dict:
"""Helper Method: Ensures a message is a dictionary with a role and content."""
if isinstance(msg, str):
# (WFH) this shouldn't ever be reached but leaving this here bcs
# it's a Chesterton's fence I'm unwilling to touch
return dict(role="user", content=msg)
if isinstance(msg, dict):
if msg.get("content", None) is None:
raise ValueError(f"Message {msg} has no content")
return msg
raise ValueError(f"Unknown message received: {msg} of type {type(msg)}")

@ -1,59 +0,0 @@
"""Embeddings Components Derived from NVEModel/Embeddings"""
from typing import List, Literal, Optional
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import Field
from langchain_nvidia_ai_endpoints._common import _NVIDIAClient
class NVIDIAEmbeddings(_NVIDIAClient, Embeddings):
"""NVIDIA's AI Foundation Retriever Question-Answering Asymmetric Model."""
max_length: int = Field(2048, ge=1, le=2048)
max_batch_size: int = Field(default=50)
model_type: Optional[Literal["passage", "query"]] = Field(
"passage", description="The type of text to be embedded."
)
def _embed(
self, texts: List[str], model_type: Literal["passage", "query"]
) -> List[List[float]]:
"""Embed a single text entry to either passage or query type"""
response = self.client.get_req(
model_name=self.model,
payload={
"input": texts,
"model": model_type,
"encoding_format": "float",
},
)
response.raise_for_status()
result = response.json()
data = result["data"]
if not isinstance(data, list):
raise ValueError(f"Expected a list of embeddings. Got: {data}")
embedding_list = [(res["embedding"], res["index"]) for res in data]
return [x[0] for x in sorted(embedding_list, key=lambda x: x[1])]
def embed_query(self, text: str) -> List[float]:
"""Input pathway for query embeddings."""
return self._embed([text], model_type=self.model_type or "query")[0]
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Input pathway for document embeddings."""
# From https://catalog.ngc.nvidia.com/orgs/nvidia/teams/ai-foundation/models/nvolve-40k/documentation
# The input must not exceed the 2048 max input characters and inputs above 512
# model tokens will be truncated. The input array must not exceed 50 input
# strings.
all_embeddings = []
for i in range(0, len(texts), self.max_batch_size):
batch = texts[i : i + self.max_batch_size]
truncated = [
text[: self.max_length] if len(text) > self.max_length else text
for text in batch
]
all_embeddings.extend(
self._embed(truncated, model_type=self.model_type or "passage")
)
return all_embeddings

File diff suppressed because it is too large.

@ -1,96 +0,0 @@
[tool.poetry]
name = "langchain-nvidia-ai-endpoints"
version = "0.0.3"
description = "An integration package connecting NVIDIA AI Endpoints and LangChain"
authors = []
readme = "README.md"
repository = "https://github.com/langchain-ai/langchain"
license = "MIT"
[tool.poetry.urls]
"Source Code" = "https://github.com/langchain-ai/langchain/tree/master/libs/partners/nvidia-ai-endpoints"
[tool.poetry.dependencies]
python = ">=3.8.1,<4.0"
langchain-core = "^0.1"
aiohttp = "^3.9.1"
[tool.poetry.group.test]
optional = true
[tool.poetry.group.test.dependencies]
pytest = "^7.3.0"
freezegun = "^1.2.2"
pytest-mock = "^3.10.0"
syrupy = "^4.0.2"
pytest-watcher = "^0.3.4"
pytest-asyncio = "^0.21.1"
langchain-core = { path = "../../core", develop = true }
[tool.poetry.group.codespell]
optional = true
[tool.poetry.group.codespell.dependencies]
codespell = "^2.2.0"
[tool.poetry.group.test_integration]
optional = true
[tool.poetry.group.test_integration.dependencies]
[tool.poetry.group.lint]
optional = true
[tool.poetry.group.lint.dependencies]
ruff = "^0.1.5"
[tool.poetry.group.typing.dependencies]
mypy = "^0.991"
langchain-core = { path = "../../core", develop = true }
types-requests = "^2.31.0.10"
types-pillow = "^10.2.0.20240125"
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
langchain-core = { path = "../../core", develop = true }
[tool.ruff.lint]
select = [
"E", # pycodestyle
"F", # pyflakes
"I", # isort
"T201", # print
]
[tool.mypy]
disallow_untyped_defs = "True"
exclude = ["notebooks", "examples", "example_data", "langchain_core/pydantic"]
[tool.coverage.run]
omit = ["tests/*"]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
# --strict-markers will raise errors on unknown marks.
# https://docs.pytest.org/en/7.1.x/how-to/mark.html#raising-errors-on-unknown-marks
#
# https://docs.pytest.org/en/7.1.x/reference/reference.html
# --strict-config any warnings encountered while parsing the `pytest`
# section of the configuration file raise errors.
#
# https://github.com/tophat/syrupy
# --snapshot-warn-unused Prints a warning on unused snapshots rather than fail the test suite.
addopts = "--snapshot-warn-unused --strict-markers --strict-config --durations=5"
# Registering custom markers.
# https://docs.pytest.org/en/7.1.x/example/markers.html#registering-markers
markers = [
"requires: mark tests as requiring a specific library",
"asyncio: mark tests as requiring asyncio",
"compile: mark placeholder test used to compile integration tests without running them",
]
asyncio_mode = "auto"

@ -1,17 +0,0 @@
import sys
import traceback
from importlib.machinery import SourceFileLoader
if __name__ == "__main__":
files = sys.argv[1:]
has_failure = False
for file in files:
try:
SourceFileLoader("x", file).load_module()
except Exception:
has_failure = True
print(file) # noqa: T201
traceback.print_exc()
print() # noqa: T201
sys.exit(1 if has_failure else 0)

@ -1,27 +0,0 @@
#!/bin/bash
#
# This script searches for lines starting with "import pydantic" or "from pydantic"
# in tracked files within a Git repository.
#
# Usage: ./scripts/check_pydantic.sh /path/to/repository
# Check if a path argument is provided
if [ $# -ne 1 ]; then
echo "Usage: $0 /path/to/repository"
exit 1
fi
repository_path="$1"
# Search for lines matching the pattern within the specified repository
result=$(git -C "$repository_path" grep -E '^import pydantic|^from pydantic')
# Check if any matching lines were found
if [ -n "$result" ]; then
echo "ERROR: The following lines need to be updated:"
echo "$result"
echo "Please replace the code with an import from langchain_core.pydantic_v1."
echo "For example, replace 'from pydantic import BaseModel'"
echo "with 'from langchain_core.pydantic_v1 import BaseModel'"
exit 1
fi

@ -1,17 +0,0 @@
#!/bin/bash
set -eu
# Initialize a variable to keep track of errors
errors=0
# make sure not importing from langchain or langchain_experimental
git --no-pager grep '^from langchain\.' . && errors=$((errors+1))
git --no-pager grep '^from langchain_experimental\.' . && errors=$((errors+1))
# Decide on an exit status based on the errors
if [ "$errors" -gt 0 ]; then
exit 1
else
exit 0
fi

@ -1,96 +0,0 @@
"""Test ChatNVIDIA chat model."""
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langchain_nvidia_ai_endpoints.chat_models import ChatNVIDIA
def test_chat_ai_endpoints() -> None:
"""Test ChatNVIDIA wrapper."""
chat = ChatNVIDIA(
model="llama2_13b",
temperature=0.7,
)
message = HumanMessage(content="Hello")
response = chat.invoke([message])
assert isinstance(response, BaseMessage)
assert isinstance(response.content, str)
def test_chat_ai_endpoints_model() -> None:
"""Test wrapper handles model."""
chat = ChatNVIDIA(model="mistral")
assert chat.model == "mistral"
def test_chat_ai_endpoints_system_message() -> None:
"""Test wrapper with system message."""
chat = ChatNVIDIA(model="llama2_13b", max_tokens=36)
system_message = SystemMessage(content="You are to chat with the user.")
human_message = HumanMessage(content="Hello")
response = chat([system_message, human_message])
assert isinstance(response, BaseMessage)
assert isinstance(response.content, str)
## TODO: Not sure if we want to support the n syntax. Trash or keep test
def test_ai_endpoints_streaming() -> None:
"""Test streaming tokens from ai endpoints."""
llm = ChatNVIDIA(model="llama2_13b", max_tokens=36)
for token in llm.stream("I'm Pickle Rick"):
assert isinstance(token.content, str)
async def test_ai_endpoints_astream() -> None:
"""Test streaming tokens from ai endpoints."""
llm = ChatNVIDIA(model="llama2_13b", max_tokens=35)
async for token in llm.astream("I'm Pickle Rick"):
assert isinstance(token.content, str)
async def test_ai_endpoints_abatch() -> None:
"""Test streaming tokens."""
llm = ChatNVIDIA(model="llama2_13b", max_tokens=36)
result = await llm.abatch(["I'm Pickle Rick", "I'm not Pickle Rick"])
for token in result:
assert isinstance(token.content, str)
async def test_ai_endpoints_abatch_tags() -> None:
"""Test batch tokens."""
llm = ChatNVIDIA(model="llama2_13b", max_tokens=55)
result = await llm.abatch(
["I'm Pickle Rick", "I'm not Pickle Rick"], config={"tags": ["foo"]}
)
for token in result:
assert isinstance(token.content, str)
def test_ai_endpoints_batch() -> None:
"""Test batch tokens."""
llm = ChatNVIDIA(model="llama2_13b", max_tokens=60)
result = llm.batch(["I'm Pickle Rick", "I'm not Pickle Rick"])
for token in result:
assert isinstance(token.content, str)
async def test_ai_endpoints_ainvoke() -> None:
"""Test invoke tokens."""
llm = ChatNVIDIA(model="llama2_13b", max_tokens=60)
result = await llm.ainvoke("I'm Pickle Rick", config={"tags": ["foo"]})
assert isinstance(result.content, str)
def test_ai_endpoints_invoke() -> None:
"""Test invoke tokens."""
llm = ChatNVIDIA(model="llama2_13b", max_tokens=60)
result = llm.invoke("I'm Pickle Rick", config=dict(tags=["foo"]))
assert isinstance(result.content, str)

@ -1,7 +0,0 @@
import pytest
@pytest.mark.compile
def test_placeholder() -> None:
"""Used for compiling integration tests without running any real tests."""
pass

@ -1,48 +0,0 @@
"""Test NVIDIA AI Foundation Model Embeddings.
Note: These tests are designed to validate the functionality of NVIDIAEmbeddings.
"""
from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings
def test_nvai_play_embedding_documents() -> None:
"""Test NVIDIA embeddings for documents."""
documents = ["foo bar"]
embedding = NVIDIAEmbeddings(model="nvolveqa_40k")
output = embedding.embed_documents(documents)
assert len(output) == 1
assert len(output[0]) == 1024  # Assuming embedding size is 1024
def test_nvai_play_embedding_documents_multiple() -> None:
"""Test NVIDIA embeddings for multiple documents."""
documents = ["foo bar", "bar foo", "foo"]
embedding = NVIDIAEmbeddings(model="nvolveqa_40k")
output = embedding.embed_documents(documents)
assert len(output) == 3
assert all(len(doc) == 1024 for doc in output)
def test_nvai_play_embedding_query() -> None:
"""Test NVIDIA embeddings for a single query."""
query = "foo bar"
embedding = NVIDIAEmbeddings(model="nvolveqa_40k")
output = embedding.embed_query(query)
assert len(output) == 1024
async def test_nvai_play_embedding_async_query() -> None:
"""Test NVIDIA async embeddings for a single query."""
query = "foo bar"
embedding = NVIDIAEmbeddings(model="nvolveqa_40k")
output = await embedding.aembed_query(query)
assert len(output) == 1024
async def test_nvai_play_embedding_async_documents() -> None:
"""Test NVIDIA async embeddings for multiple documents."""
documents = ["foo bar", "bar foo", "foo"]
embedding = NVIDIAEmbeddings(model="nvolveqa_40k")
output = await embedding.aembed_documents(documents)
assert len(output) == 3
assert all(len(doc) == 1024 for doc in output)

@ -1,16 +0,0 @@
"""Test chat model integration."""
from langchain_nvidia_ai_endpoints.chat_models import ChatNVIDIA
def test_integration_initialization() -> None:
"""Test chat model initialization."""
ChatNVIDIA(
model="llama2_13b",
nvidia_api_key="nvapi-...",
temperature=0.5,
top_p=0.9,
max_tokens=50,
)
ChatNVIDIA(model="mistral", nvidia_api_key="nvapi-...")

@ -1,7 +0,0 @@
from langchain_nvidia_ai_endpoints import __all__
EXPECTED_ALL = ["ChatNVIDIA", "NVIDIAEmbeddings"]
def test_all_imports() -> None:
assert sorted(EXPECTED_ALL) == sorted(__all__)

@ -1 +0,0 @@
__pycache__

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2023 LangChain, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -1,59 +0,0 @@
.PHONY: all format lint test tests integration_tests docker_tests help extended_tests
# Default target executed when no arguments are given to make.
all: help
# Define a variable for the test file path.
TEST_FILE ?= tests/unit_tests/
test:
poetry run pytest $(TEST_FILE)
tests:
poetry run pytest $(TEST_FILE)
######################
# LINTING AND FORMATTING
######################
# Define a variable for Python and notebook files.
PYTHON_FILES=.
MYPY_CACHE=.mypy_cache
lint format: PYTHON_FILES=.
lint_diff format_diff: PYTHON_FILES=$(shell git diff --relative=libs/partners/nvidia-trt --name-only --diff-filter=d master | grep -E '\.py$$|\.ipynb$$')
lint_package: PYTHON_FILES=langchain_nvidia_trt
lint_tests: PYTHON_FILES=tests
lint_tests: MYPY_CACHE=.mypy_cache_test
lint lint_diff lint_package lint_tests:
poetry run ruff .
poetry run ruff format $(PYTHON_FILES) --diff
poetry run ruff --select I $(PYTHON_FILES)
mkdir $(MYPY_CACHE); poetry run mypy $(PYTHON_FILES) --cache-dir $(MYPY_CACHE)
format format_diff:
poetry run ruff format $(PYTHON_FILES)
poetry run ruff --select I --fix $(PYTHON_FILES)
spell_check:
poetry run codespell --toml pyproject.toml
spell_fix:
poetry run codespell --toml pyproject.toml -w
check_imports: $(shell find langchain_nvidia_trt -name '*.py')
poetry run python ./scripts/check_imports.py $^
######################
# HELP
######################
help:
@echo '----'
@echo 'check_imports - check imports'
@echo 'format - run code formatters'
@echo 'lint - run linters'
@echo 'test - run unit tests'
@echo 'tests - run unit tests'
@echo 'test TEST_FILE=<test_file> - run all tests in file'

@ -1 +1,3 @@
# langchain-nvidia-trt
This package has moved!
https://github.com/langchain-ai/langchain-nvidia/tree/main/libs/trt

@ -1,106 +0,0 @@
{
"cells": [
{
"cell_type": "raw",
"id": "67db2992",
"metadata": {},
"source": [
"---\n",
"sidebar_label: TritonTensorRT\n",
"---"
]
},
{
"cell_type": "markdown",
"id": "b56b221d",
"metadata": {},
"source": [
"# Nvidia Triton+TRT-LLM\n",
"\n",
"Nvidia's Triton is an inference server that provides an API style access to hosted LLM models. Likewise, Nvidia TensorRT-LLM, often abbreviated as TRT-LLM, is a GPU accelerated SDK for running optimizations and inference on LLM models. This connector allows for Langchain to remotely interact with a Triton inference server over GRPC or HTTP to performance accelerated inference operations.\n",
"\n",
"[Triton Inference Server Github](https://github.com/triton-inference-server/server)\n",
"\n",
"\n",
"## TritonTensorRTLLM\n",
"\n",
"This example goes over how to use LangChain to interact with `TritonTensorRT` LLMs. To install, run the following command:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "59c710c4",
"metadata": {},
"outputs": [],
"source": [
"# install package\n",
"%pip install -U langchain-nvidia-trt"
]
},
{
"cell_type": "markdown",
"id": "0ee90032",
"metadata": {},
"source": [
"## Create the Triton+TRT-LLM instance\n",
"\n",
"Remember that a Triton instance represents a running server instance therefore you should ensure you have a valid server configuration running and change the `localhost:8001` to the correct IP/hostname:port combination for your server.\n",
"\n",
"An example of setting up this environment can be found at Nvidia's (GenerativeAIExamples Github Repo)[https://github.com/NVIDIA/GenerativeAIExamples/tree/main/RetrievalAugmentedGeneration]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "035dea0f",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from langchain_core.prompts import PromptTemplate\n",
"from langchain_nvidia_trt.llms import TritonTensorRTLLM\n",
"\n",
"template = \"\"\"Question: {question}\n",
"\n",
"Answer: Let's think step by step.\"\"\"\n",
"\n",
"prompt = PromptTemplate.from_template(template)\n",
"\n",
"# Connect to the TRT-LLM Llama-2 model running on the Triton server at the url below\n",
"triton_llm = TritonTensorRTLLM(server_url =\"localhost:8001\", model_name=\"ensemble\", tokens=500)\n",
"\n",
"chain = prompt | triton_llm \n",
"\n",
"chain.invoke({\"question\": \"What is LangChain?\"})"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.9"
},
"vscode": {
"interpreter": {
"hash": "e971737741ff4ec9aff7dc6155a1060a59a8a6d52c757dbbe66bf8ee389494b1"
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}

@ -1,3 +0,0 @@
from langchain_nvidia_trt.llms import TritonTensorRTLLM
__all__ = ["TritonTensorRTLLM"]

@ -1,411 +0,0 @@
from __future__ import annotations
import json
import queue
import random
import time
from functools import partial
from typing import Any, Dict, Iterator, List, Optional, Sequence, Union
import google.protobuf.json_format
import numpy as np
import tritonclient.grpc as grpcclient
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models import BaseLLM
from langchain_core.outputs import Generation, GenerationChunk, LLMResult
from langchain_core.pydantic_v1 import Field, root_validator
from tritonclient.grpc.service_pb2 import ModelInferResponse
from tritonclient.utils import np_to_triton_dtype
class TritonTensorRTError(Exception):
"""Base exception for TritonTensorRT."""
class TritonTensorRTRuntimeError(TritonTensorRTError, RuntimeError):
"""Runtime error for TritonTensorRT."""
class TritonTensorRTLLM(BaseLLM):
"""TRTLLM triton models.
Arguments:
server_url: (str) The URL of the Triton inference server to use.
model_name: (str) The name of the Triton TRT model to use.
temperature: (float) The temperature to use for sampling.
top_p: (float) The top-p value to use for sampling.
top_k: (int) The top-k value to use for sampling.
beam_width: (int) The number of beams to use for beam search.
repetition_penalty: (float) The penalty to apply to repeated tokens.
length_penalty: (float) The penalty to apply to the length of the output.
tokens: (int) The maximum number of tokens to generate.
client: The client object used to communicate with the inference server
verbose_client: flag to pass to the client on creation
Example:
.. code-block:: python
from langchain_nvidia_trt import TritonTensorRTLLM
model = TritonTensorRTLLM(server_url="localhost:8001", model_name="ensemble")
"""
server_url: Optional[str] = Field(None, alias="server_url")
model_name: str = Field(
..., description="The name of the model to use, such as 'ensemble'."
)
## Optional args for the model
temperature: float = 1.0
top_p: float = 0
top_k: int = 1
tokens: int = 100
beam_width: int = 1
repetition_penalty: float = 1.0
length_penalty: float = 1.0
client: grpcclient.InferenceServerClient
stop: List[str] = Field(
default_factory=lambda: ["</s>"], description="Stop tokens."
)
seed: int = Field(42, description="The seed to use for random generation.")
load_model: bool = Field(
True,
description="Request the inference server to load the specified model.\
Certain Triton configurations do not allow for this operation.",
)
verbose_client: bool = False
def __del__(self):
"""Ensure the client streaming connection is properly shutdown"""
self.client.close()
@root_validator(pre=True, allow_reuse=True)
def validate_environment(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Validate that python package exists in environment."""
if not values.get("client"):
values["client"] = grpcclient.InferenceServerClient(
values["server_url"], verbose=values.get("verbose_client", False)
)
return values
@property
def _llm_type(self) -> str:
"""Return type of LLM."""
return "nvidia-trt-llm"
@property
def _model_default_parameters(self) -> Dict[str, Any]:
return {
"tokens": self.tokens,
"top_k": self.top_k,
"top_p": self.top_p,
"temperature": self.temperature,
"repetition_penalty": self.repetition_penalty,
"length_penalty": self.length_penalty,
"beam_width": self.beam_width,
}
@property
def _identifying_params(self) -> Dict[str, Any]:
"""Get all the identifying parameters."""
return {
"server_url": self.server_url,
"model_name": self.model_name,
**self._model_default_parameters,
}
def _get_invocation_params(self, **kwargs: Any) -> Dict[str, Any]:
return {**self._model_default_parameters, **kwargs}
def get_model_list(self) -> List[str]:
"""Get a list of models loaded in the triton server."""
res = self.client.get_model_repository_index(as_json=True)
return [model["name"] for model in res["models"]]
def _load_model(self, model_name: str, timeout: int = 1000) -> None:
"""Load a model into the server."""
if self.client.is_model_ready(model_name):
return
self.client.load_model(model_name)
t0 = time.perf_counter()
t1 = t0
while not self.client.is_model_ready(model_name) and t1 - t0 < timeout:
t1 = time.perf_counter()
if not self.client.is_model_ready(model_name):
raise TritonTensorRTRuntimeError(
f"Failed to load {model_name} on Triton in {timeout}s"
)
def _generate(
self,
prompts: List[str],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> LLMResult:
self._load_model(self.model_name)
invocation_params = self._get_invocation_params(**kwargs)
stop_words = stop if stop is not None else self.stop
generations = []
# TODO: We should handle the native batching instead.
for prompt in prompts:
invoc_params = {**invocation_params, "prompt": [[prompt]]}
result: str = self._request(
self.model_name,
stop=stop_words,
**invoc_params,
)
generations.append([Generation(text=result, generation_info={})])
return LLMResult(generations=generations)
def _stream(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Iterator[GenerationChunk]:
self._load_model(self.model_name)
invocation_params = self._get_invocation_params(**kwargs, prompt=[[prompt]])
stop_words = stop if stop is not None else self.stop
inputs = self._generate_inputs(stream=True, **invocation_params)
outputs = self._generate_outputs()
result_queue = self._invoke_triton(self.model_name, inputs, outputs, stop_words)
for token in result_queue:
if run_manager:
run_manager.on_llm_new_token(token)
yield GenerationChunk(text=token)
self.client.stop_stream()
##### BELOW ARE METHODS PREVIOUSLY ONLY IN THE GRPC CLIENT
def _request(
self,
model_name: str,
prompt: Sequence[Sequence[str]],
stop: Optional[List[str]] = None,
**params: Any,
) -> str:
"""Request inferencing from the triton server."""
# create model inputs and outputs
inputs = self._generate_inputs(stream=False, prompt=prompt, **params)
outputs = self._generate_outputs()
result_queue = self._invoke_triton(self.model_name, inputs, outputs, stop)
result_str = ""
try:
for token in result_queue:
if isinstance(token, Exception):
raise token
result_str += token
finally:
self.client.stop_stream()
return result_str
def _invoke_triton(self, model_name, inputs, outputs, stop_words):
if not self.client.is_model_ready(model_name):
raise RuntimeError("Cannot request streaming, model is not loaded")
request_id = str(random.randint(1, 9999999)) # nosec
result_queue = StreamingResponseGenerator(
self,
request_id,
force_batch=False,
stop_words=stop_words,
)
self.client.start_stream(
callback=partial(
self._stream_callback,
result_queue,
stop_words=stop_words,
)
)
# Even though this request may not be a streaming request, certain configurations
# in Triton prevent the gRPC server from accepting non-streaming connections.
# Therefore we call the streaming API and combine the streamed results.
self.client.async_stream_infer(
model_name=model_name,
inputs=inputs,
outputs=outputs,
request_id=request_id,
)
return result_queue
def _generate_outputs(
self,
) -> List[grpcclient.InferRequestedOutput]:
"""Generate the expected output structure."""
return [grpcclient.InferRequestedOutput("text_output")]
def _prepare_tensor(
self, name: str, input_data: np.ndarray
) -> grpcclient.InferInput:
"""Prepare an input data structure."""
t = grpcclient.InferInput(
name, input_data.shape, np_to_triton_dtype(input_data.dtype)
)
t.set_data_from_numpy(input_data)
return t
def _generate_inputs(
self,
prompt: Sequence[Sequence[str]],
tokens: int = 300,
temperature: float = 1.0,
top_k: float = 1,
top_p: float = 0,
beam_width: int = 1,
repetition_penalty: float = 1,
length_penalty: float = 1.0,
stream: bool = True,
) -> List[grpcclient.InferInput]:
"""Create the input for the triton inference server."""
query = np.array(prompt).astype(object)
request_output_len = np.array([tokens]).astype(np.uint32).reshape((1, -1))
runtime_top_k = np.array([top_k]).astype(np.uint32).reshape((1, -1))
runtime_top_p = np.array([top_p]).astype(np.float32).reshape((1, -1))
temperature_array = np.array([temperature]).astype(np.float32).reshape((1, -1))
len_penalty = np.array([length_penalty]).astype(np.float32).reshape((1, -1))
repetition_penalty_array = (
np.array([repetition_penalty]).astype(np.float32).reshape((1, -1))
)
random_seed = np.array([self.seed]).astype(np.uint64).reshape((1, -1))
beam_width_array = np.array([beam_width]).astype(np.uint32).reshape((1, -1))
streaming_data = np.array([[stream]], dtype=bool)
inputs = [
self._prepare_tensor("text_input", query),
self._prepare_tensor("max_tokens", request_output_len),
self._prepare_tensor("top_k", runtime_top_k),
self._prepare_tensor("top_p", runtime_top_p),
self._prepare_tensor("temperature", temperature_array),
self._prepare_tensor("length_penalty", len_penalty),
self._prepare_tensor("repetition_penalty", repetition_penalty_array),
self._prepare_tensor("random_seed", random_seed),
self._prepare_tensor("beam_width", beam_width_array),
self._prepare_tensor("stream", streaming_data),
]
return inputs
def _send_stop_signals(self, model_name: str, request_id: str) -> None:
"""Send the stop signal to the Triton Inference server."""
stop_inputs = self._generate_stop_signals()
self.client.async_stream_infer(
model_name,
stop_inputs,
request_id=request_id,
parameters={"Streaming": True},
)
def _generate_stop_signals(
self,
) -> List[grpcclient.InferInput]:
"""Generate the signal to stop the stream."""
inputs = [
grpcclient.InferInput("input_ids", [1, 1], "INT32"),
grpcclient.InferInput("input_lengths", [1, 1], "INT32"),
grpcclient.InferInput("request_output_len", [1, 1], "UINT32"),
grpcclient.InferInput("stop", [1, 1], "BOOL"),
]
inputs[0].set_data_from_numpy(np.empty([1, 1], dtype=np.int32))
inputs[1].set_data_from_numpy(np.zeros([1, 1], dtype=np.int32))
inputs[2].set_data_from_numpy(np.array([[0]], dtype=np.uint32))
inputs[3].set_data_from_numpy(np.array([[True]], dtype="bool"))
return inputs
@staticmethod
def _process_result(result: Dict[str, str]) -> str:
"""Post-process the result from the server."""
message = ModelInferResponse()
google.protobuf.json_format.Parse(json.dumps(result), message)
infer_result = grpcclient.InferResult(message)
np_res = infer_result.as_numpy("text_output")
generated_text = ""
if np_res is not None:
generated_text = "".join([token.decode() for token in np_res])
return generated_text
def _stream_callback(
self,
result_queue: queue.Queue[Union[Optional[Dict[str, str]], str]],
result: grpcclient.InferResult,
error: str,
stop_words: List[str],
) -> None:
"""Add streamed result to queue."""
if error:
result_queue.put(error)
else:
response_raw: dict = result.get_response(as_json=True)
# TODO: Check the response is a map rather than a string
if "outputs" in response_raw:
# the very last response might have no output, just the final flag
response = self._process_result(response_raw)
if response in stop_words:
result_queue.put(None)
else:
result_queue.put(response)
if response_raw["parameters"]["triton_final_response"]["bool_param"]:
# end of the generation
result_queue.put(None)
def stop_stream(
self, model_name: str, request_id: str, signal: bool = True
) -> None:
"""Close the streaming connection."""
if signal:
self._send_stop_signals(model_name, request_id)
self.client.stop_stream()
class StreamingResponseGenerator(queue.Queue):
"""A Generator that provides the inference results from an LLM."""
def __init__(
self,
llm: TritonTensorRTLLM,
request_id: str,
force_batch: bool,
stop_words: Sequence[str],
) -> None:
"""Instantiate the generator class."""
super().__init__()
self.llm = llm
self.request_id = request_id
self._batch = force_batch
self._stop_words = stop_words
def __iter__(self) -> StreamingResponseGenerator:
"""Return self as a generator."""
return self
def __next__(self) -> str:
"""Return the next retrieved token."""
val = self.get()
if val is None or val in self._stop_words:
self.llm.stop_stream(
self.llm.model_name, self.request_id, signal=not self._batch
)
raise StopIteration()
return val
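Taken together, _stream, _invoke_triton, and StreamingResponseGenerator above implement token streaming over a single gRPC stream. A minimal sketch of how that path is exercised from user code, mirroring the integration tests further down; the server address and model name are placeholders for a running Triton + TRT-LLM deployment:
from langchain_nvidia_trt.llms import TritonTensorRTLLM

# Placeholder server address and model name; a reachable Triton server with a
# TRT-LLM model (e.g. "ensemble") is assumed, as in the integration tests.
llm = TritonTensorRTLLM(server_url="localhost:8001", model_name="ensemble")

# Each iteration pulls one token off the StreamingResponseGenerator queue;
# the stream is closed when a stop word or the final-response flag arrives.
for token in llm.stream("What is LangChain?"):
    print(token, end="", flush=True)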

@ -1,4 +0,0 @@
[mypy]
# Empty global config
[mypy-tritonclient.*]
ignore_missing_imports = True

File diff suppressed because it is too large

@ -1,96 +0,0 @@
[tool.poetry]
name = "langchain-nvidia-trt"
version = "0.0.1"
description = "An integration package connecting TritonTensorRT and LangChain"
authors = []
readme = "README.md"
repository = "https://github.com/langchain-ai/langchain"
license = "MIT"
[tool.poetry.urls]
"Source Code" = "https://github.com/langchain-ai/langchain/tree/master/libs/partners/nvidia-trt"
[tool.poetry.dependencies]
python = ">=3.8.1,<4.0"
langchain-core = "^0.1"
tritonclient = { extras = ["grpc"], version = "^2.42.0" }
lint = "^1.2.1"
types-protobuf = "^4.24.0.4"
protobuf = "^3.5.0"
[tool.poetry.group.test]
optional = true
[tool.poetry.group.test.dependencies]
pytest = "^7.3.0"
freezegun = "^1.2.2"
pytest-mock = "^3.10.0"
syrupy = "^4.0.2"
pytest-watcher = "^0.3.4"
pytest-asyncio = "^0.21.1"
langchain-core = { path = "../../core", develop = true }
[tool.poetry.group.codespell]
optional = true
[tool.poetry.group.codespell.dependencies]
codespell = "^2.2.0"
[tool.poetry.group.test_integration]
optional = true
[tool.poetry.group.test_integration.dependencies]
[tool.poetry.group.lint]
optional = true
[tool.poetry.group.lint.dependencies]
ruff = "^0.1.5"
[tool.poetry.group.typing.dependencies]
mypy = "^0.991"
langchain-core = { path = "../../core", develop = true }
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
langchain-core = { path = "../../core", develop = true }
[tool.ruff.lint]
select = [
"E", # pycodestyle
"F", # pyflakes
"I", # isort
"T201", # print
]
[tool.mypy]
disallow_untyped_defs = "True"
[tool.coverage.run]
omit = ["tests/*"]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
# --strict-markers will raise errors on unknown marks.
# https://docs.pytest.org/en/7.1.x/how-to/mark.html#raising-errors-on-unknown-marks
#
# https://docs.pytest.org/en/7.1.x/reference/reference.html
# --strict-config any warnings encountered while parsing the `pytest`
# section of the configuration file raise errors.
#
# https://github.com/tophat/syrupy
# --snapshot-warn-unused Prints a warning on unused snapshots rather than failing the test suite.
addopts = "--snapshot-warn-unused --strict-markers --strict-config --durations=5"
# Registering custom markers.
# https://docs.pytest.org/en/7.1.x/example/markers.html#registering-markers
markers = [
"requires: mark tests as requiring a specific library",
"asyncio: mark tests as requiring asyncio",
"compile: mark placeholder test used to compile integration tests without running them",
]
asyncio_mode = "auto"

@ -1,17 +0,0 @@
import sys
import traceback
from importlib.machinery import SourceFileLoader
if __name__ == "__main__":
files = sys.argv[1:]
has_failure = False
for file in files:
try:
SourceFileLoader("x", file).load_module()
except Exception:
has_failure = True
print(file) # noqa: T201
traceback.print_exc()
print() # noqa: T201
sys.exit(1 if has_failure else 0)

@ -1,27 +0,0 @@
#!/bin/bash
#
# This script searches for lines starting with "import pydantic" or "from pydantic"
# in tracked files within a Git repository.
#
# Usage: ./scripts/check_pydantic.sh /path/to/repository
# Check if a path argument is provided
if [ $# -ne 1 ]; then
echo "Usage: $0 /path/to/repository"
exit 1
fi
repository_path="$1"
# Search for lines matching the pattern within the specified repository
result=$(git -C "$repository_path" grep -E '^import pydantic|^from pydantic')
# Check if any matching lines were found
if [ -n "$result" ]; then
echo "ERROR: The following lines need to be updated:"
echo "$result"
echo "Please replace the code with an import from langchain_core.pydantic_v1."
echo "For example, replace 'from pydantic import BaseModel'"
echo "with 'from langchain_core.pydantic_v1 import BaseModel'"
exit 1
fi

@ -1,17 +0,0 @@
#!/bin/bash
set -eu
# Initialize a variable to keep track of errors
errors=0
# make sure not importing from langchain or langchain_experimental
git --no-pager grep '^from langchain\.' . && errors=$((errors+1))
git --no-pager grep '^from langchain_experimental\.' . && errors=$((errors+1))
# Decide on an exit status based on the errors
if [ "$errors" -gt 0 ]; then
exit 1
else
exit 0
fi

@ -1,7 +0,0 @@
import pytest
@pytest.mark.compile
def test_placeholder() -> None:
"""Used for compiling integration tests without running any real tests."""
pass

@ -1,74 +0,0 @@
"""Test TritonTensorRTLLM llm."""
import pytest
from langchain_nvidia_trt.llms import TritonTensorRTLLM
_MODEL_NAME = "ensemble"
@pytest.mark.skip(reason="Need a working Triton server")
def test_stream() -> None:
"""Test streaming tokens from NVIDIA TRT."""
llm = TritonTensorRTLLM(model_name=_MODEL_NAME)
for token in llm.stream("I'm Pickle Rick"):
assert isinstance(token, str)
@pytest.mark.skip(reason="Need a working Triton server")
async def test_astream() -> None:
"""Test streaming tokens from NVIDIA TRT."""
llm = TritonTensorRTLLM(model_name=_MODEL_NAME)
async for token in llm.astream("I'm Pickle Rick"):
assert isinstance(token, str)
@pytest.mark.skip(reason="Need a working Triton server")
async def test_abatch() -> None:
"""Test streaming tokens from TritonTensorRTLLM."""
llm = TritonTensorRTLLM(model_name=_MODEL_NAME)
result = await llm.abatch(["I'm Pickle Rick", "I'm not Pickle Rick"])
for token in result:
assert isinstance(token, str)
@pytest.mark.skip(reason="Need a working Triton server")
async def test_abatch_tags() -> None:
"""Test batch tokens from TritonTensorRTLLM."""
llm = TritonTensorRTLLM(model_name=_MODEL_NAME)
result = await llm.abatch(
["I'm Pickle Rick", "I'm not Pickle Rick"], config={"tags": ["foo"]}
)
for token in result:
assert isinstance(token, str)
@pytest.mark.skip(reason="Need a working Triton server")
def test_batch() -> None:
"""Test batch tokens from TritonTensorRTLLM."""
llm = TritonTensorRTLLM(model_name=_MODEL_NAME)
result = llm.batch(["I'm Pickle Rick", "I'm not Pickle Rick"])
for token in result:
assert isinstance(token, str)
@pytest.mark.skip(reason="Need a working Triton server")
async def test_ainvoke() -> None:
"""Test invoke tokens from TritonTensorRTLLM."""
llm = TritonTensorRTLLM(model_name=_MODEL_NAME)
result = await llm.ainvoke("I'm Pickle Rick", config={"tags": ["foo"]})
assert isinstance(result, str)
@pytest.mark.skip(reason="Need a working Triton server")
def test_invoke() -> None:
"""Test invoke tokens from TritonTensorRTLLM."""
llm = TritonTensorRTLLM(model_name=_MODEL_NAME)
result = llm.invoke("I'm Pickle Rick", config=dict(tags=["foo"]))
assert isinstance(result, str)

@ -1,7 +0,0 @@
from langchain_nvidia_trt import __all__
EXPECTED_ALL = ["TritonTensorRTLLM"]
def test_all_imports() -> None:
assert sorted(EXPECTED_ALL) == sorted(__all__)

@ -1,33 +0,0 @@
"""Test TritonTensorRT Chat API wrapper."""
import sys
from io import StringIO
from unittest.mock import patch
from langchain_nvidia_trt import TritonTensorRTLLM
def test_initialization() -> None:
"""Test integration initialization."""
TritonTensorRTLLM(model_name="ensemble", server_url="http://localhost:8001")
@patch("tritonclient.grpc.service_pb2_grpc.GRPCInferenceServiceStub")
def test_default_verbose(ignore) -> None:
llm = TritonTensorRTLLM(server_url="http://localhost:8001", model_name="ensemble")
captured = StringIO()
sys.stdout = captured
llm.client.is_server_live()
sys.stdout = sys.__stdout__
assert "is_server_live" not in captured.getvalue()
@patch("tritonclient.grpc.service_pb2_grpc.GRPCInferenceServiceStub")
def test_verbose(ignore) -> None:
llm = TritonTensorRTLLM(
server_url="http://localhost:8001", model_name="ensemble", verbose_client=True
)
captured = StringIO()
sys.stdout = captured
llm.client.is_server_live()
sys.stdout = sys.__stdout__
assert "is_server_live" in captured.getvalue()