Harrison/self hosted runhouse (#1154)

Co-authored-by: Donny Greenberg <dongreenberg2@gmail.com>
Co-authored-by: John Dagdelen <jdagdelen@users.noreply.github.com>
Co-authored-by: Harrison Chase <harrisonchase@Harrisons-MBP.attlocal.net>
Co-authored-by: Andrew White <white.d.andrew@gmail.com>
Co-authored-by: Peng Qu <82029664+pengqu123@users.noreply.github.com>
Co-authored-by: Matt Robinson <mthw.wm.robinson@gmail.com>
Co-authored-by: jeff <tangj1122@gmail.com>
Co-authored-by: Harrison Chase <harrisonchase@Harrisons-MacBook-Pro.local>
Co-authored-by: zanderchase <zander@unfold.ag>
Co-authored-by: Charles Frye <cfrye59@gmail.com>
Co-authored-by: zanderchase <zanderchase@gmail.com>
Co-authored-by: Shahriar Tajbakhsh <sh.tajbakhsh@gmail.com>
Co-authored-by: Stefan Keselj <skeselj@princeton.edu>
Co-authored-by: Francisco Ingham <fpingham@gmail.com>
Co-authored-by: Dhruv Anand <105786647+dhruv-anand-aintech@users.noreply.github.com>
Co-authored-by: cragwolfe <cragcw@gmail.com>
Co-authored-by: Anton Troynikov <atroyn@users.noreply.github.com>
Co-authored-by: William FH <13333726+hinthornw@users.noreply.github.com>
Co-authored-by: Oliver Klingefjord <oliver@klingefjord.com>
Co-authored-by: blob42 <contact@blob42.xyz>
Co-authored-by: blob42 <spike@w530>
Co-authored-by: Enrico Shippole <henryshippole@gmail.com>
Co-authored-by: Ibis Prevedello <ibiscp@gmail.com>
Co-authored-by: jped <jonathanped@gmail.com>
Co-authored-by: Justin Torre <justintorre75@gmail.com>
Co-authored-by: Ivan Vendrov <ivan@anthropic.com>
Co-authored-by: Sasmitha Manathunga <70096033+mmz-001@users.noreply.github.com>
Co-authored-by: Ankush Gola <9536492+agola11@users.noreply.github.com>
Co-authored-by: Matt Robinson <mrobinson@unstructuredai.io>
Co-authored-by: Jeff Huber <jeffchuber@gmail.com>
Co-authored-by: Akshay <64036106+akshayvkt@users.noreply.github.com>
Co-authored-by: Andrew Huang <jhuang16888@gmail.com>
Co-authored-by: rogerserper <124558887+rogerserper@users.noreply.github.com>
Co-authored-by: seanaedmiston <seane999@gmail.com>
Co-authored-by: Hasegawa Yuya <52068175+Hase-U@users.noreply.github.com>
Co-authored-by: Ivan Vendrov <ivendrov@gmail.com>
Co-authored-by: Chen Wu (吴尘) <henrychenwu@cmu.edu>
Co-authored-by: Dennis Antela Martinez <dennis.antela@gmail.com>
Co-authored-by: Maxime Vidal <max.vidal@hotmail.fr>
Co-authored-by: Rishabh Raizada <110235735+rishabh-ti@users.noreply.github.com>
searx-api
Harrison Chase 1 year ago committed by GitHub
parent af8f5c1a49
commit 9d6d8f85da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,31 @@
# Runhouse
This page covers how to use the [Runhouse](https://github.com/run-house/runhouse) ecosystem within LangChain.
It is broken into three parts: installation and setup, LLMs, and Embeddings.
## Installation and Setup
- Install the Python SDK with `pip install runhouse`
- If you'd like to use on-demand cluster, check your cloud credentials with `sky check`
## Self-hosted LLMs
For a basic self-hosted LLM, you can use the `SelfHostedHuggingFaceLLM` class. For more
custom LLMs, you can use the `SelfHostedPipeline` parent class.
```python
from langchain.llms import SelfHostedPipeline, SelfHostedHuggingFaceLLM
```
For a more detailed walkthrough of the Self-hosted LLMs, see [this notebook](../modules/llms/integrations/self_hosted_examples.ipynb)
## Self-hosted Embeddings
There are several ways to use self-hosted embeddings with LangChain via Runhouse.
For a basic self-hosted embedding from a Hugging Face Transformers model, you can use
the `SelfHostedEmbedding` class.
```python
from langchain.llms import SelfHostedPipeline, SelfHostedHuggingFaceLLM
```
For a more detailed walkthrough of the Self-hosted Embeddings, see [this notebook](../modules/utils/combine_docs_examples/embeddings.ipynb)
##

@ -27,6 +27,8 @@ The examples here are all "how-to" guides for how to integrate with various LLM
`Anthropic <./integrations/anthropic_example.html>`_: Covers how to use Anthropic models with Langchain.
`Self-Hosted Models (via Runhouse) <./integrations/self_hosted_examples.html>`_: Covers how to run models on existing or on-demand remote compute with Langchain.
.. toctree::
:maxdepth: 1

@ -0,0 +1,296 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "9597802c",
"metadata": {},
"source": [
"# Self-Hosted Models via Runhouse\n",
"This example goes over how to use LangChain and [Runhouse](https://github.com/run-house/runhouse) to interact with models hosted on your own GPU, or on-demand GPUs on AWS, GCP, AWS, or Lambda.\n",
"\n",
"For more information, see [Runhouse](https://github.com/run-house/runhouse) or the [Runhouse docs](https://runhouse-docs.readthedocs-hosted.com/en/latest/)."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6fb585dd",
"metadata": {},
"outputs": [],
"source": [
"from langchain.llms import SelfHostedPipeline, SelfHostedHuggingFaceLLM\n",
"from langchain import PromptTemplate, LLMChain\n",
"import runhouse as rh"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "06d6866e",
"metadata": {},
"outputs": [],
"source": [
"# For an on-demand A100 with GCP, Azure, or Lambda\n",
"gpu = rh.cluster(name=\"rh-a10x\", instance_type=\"A100:1\", use_spot=False)\n",
"\n",
"# For an on-demand A10G with AWS (no single A100s on AWS)\n",
"# gpu = rh.cluster(name='rh-a10x', instance_type='g5.2xlarge', provider='aws')\n",
"\n",
"# For an existing cluster\n",
"# gpu = rh.cluster(ips=['<ip of the cluster>'], \n",
"# ssh_creds={'ssh_user': '...', 'ssh_private_key':'<path_to_key>'},\n",
"# name='rh-a10x')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "035dea0f",
"metadata": {},
"outputs": [],
"source": [
"template = \"\"\"Question: {question}\n",
"\n",
"Answer: Let's think step by step.\"\"\"\n",
"\n",
"prompt = PromptTemplate(template=template, input_variables=[\"question\"])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3f3458d9",
"metadata": {},
"outputs": [],
"source": [
"llm = SelfHostedHuggingFaceLLM(model_id=\"gpt2\", hardware=gpu, model_reqs=[\"pip:./\", \"transformers\", \"torch\"])"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "a641dbd9",
"metadata": {},
"outputs": [],
"source": [
"llm_chain = LLMChain(prompt=prompt, llm=llm)"
]
},
{
"cell_type": "code",
"execution_count": 31,
"id": "6fb6fdb2",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO | 2023-02-17 05:42:23,537 | Running _generate_text via gRPC\n",
"INFO | 2023-02-17 05:42:24,016 | Time to send message: 0.48 seconds\n"
]
},
{
"data": {
"text/plain": [
"\"\\n\\nLet's say we're talking sports teams who won the Super Bowl in the year Justin Beiber\""
]
},
"execution_count": 31,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"question = \"What NFL team won the Super Bowl in the year Justin Beiber was born?\"\n",
"\n",
"llm_chain.run(question)"
]
},
{
"cell_type": "markdown",
"id": "c88709cd",
"metadata": {},
"source": [
"You can also load more custom models through the SelfHostedHuggingFaceLLM interface:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "22820c5a",
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"llm = SelfHostedHuggingFaceLLM(\n",
" model_id=\"google/flan-t5-small\",\n",
" task=\"text2text-generation\",\n",
" hardware=gpu,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 39,
"id": "1528e70f",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO | 2023-02-17 05:54:21,681 | Running _generate_text via gRPC\n",
"INFO | 2023-02-17 05:54:21,937 | Time to send message: 0.25 seconds\n"
]
},
{
"data": {
"text/plain": [
"'berlin'"
]
},
"execution_count": 39,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"llm(\"What is the capital of Germany?\")"
]
},
{
"cell_type": "markdown",
"id": "7a0c3746",
"metadata": {},
"source": [
"Using a custom load function, we can load a custom pipeline directly on the remote hardware:"
]
},
{
"cell_type": "code",
"execution_count": 34,
"id": "893eb1d3",
"metadata": {},
"outputs": [],
"source": [
"def load_pipeline():\n",
" from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline # Need to be inside the fn in notebooks\n",
" model_id = \"gpt2\"\n",
" tokenizer = AutoTokenizer.from_pretrained(model_id)\n",
" model = AutoModelForCausalLM.from_pretrained(model_id)\n",
" pipe = pipeline(\n",
" \"text-generation\", model=model, tokenizer=tokenizer, max_new_tokens=10\n",
" )\n",
" return pipe\n",
"\n",
"def inference_fn(pipeline, prompt, stop = None):\n",
" return pipeline(prompt)[0][\"generated_text\"][len(prompt):]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "087d50dc",
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"llm = SelfHostedHuggingFaceLLM(model_load_fn=load_pipeline, hardware=gpu, inference_fn=inference_fn)"
]
},
{
"cell_type": "code",
"execution_count": 36,
"id": "feb8da8e",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO | 2023-02-17 05:42:59,219 | Running _generate_text via gRPC\n",
"INFO | 2023-02-17 05:42:59,522 | Time to send message: 0.3 seconds\n"
]
},
{
"data": {
"text/plain": [
"'john w. bush'"
]
},
"execution_count": 36,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"llm(\"Who is the current US president?\")"
]
},
{
"cell_type": "markdown",
"id": "af08575f",
"metadata": {},
"source": [
"You can send your pipeline directly over the wire to your model, but this will only work for small models (<2 Gb), and will be pretty slow:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d23023b9",
"metadata": {},
"outputs": [],
"source": [
"pipeline = load_pipeline()\n",
"llm = SelfHostedPipeline.from_pipeline(\n",
" pipeline=pipeline, hardware=gpu, model_reqs=model_reqs\n",
")"
]
},
{
"cell_type": "markdown",
"id": "fcb447a1",
"metadata": {},
"source": [
"Instead, we can also send it to the hardware's filesystem, which will be much faster."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7206b7d6",
"metadata": {},
"outputs": [],
"source": [
"rh.blob(pickle.dumps(pipeline), path=\"models/pipeline.pkl\").save().to(gpu, path=\"models\")\n",
"\n",
"llm = SelfHostedPipeline.from_pipeline(pipeline=\"models/pipeline.pkl\", hardware=gpu)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.15"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

@ -313,13 +313,156 @@
"query_result = embeddings.embed_query(text)"
]
},
{
"cell_type": "markdown",
"id": "eec4efda",
"metadata": {},
"source": [
"## Self Hosted Embeddings\n",
"Let's load the SelfHostedEmbeddings, SelfHostedHuggingFaceEmbeddings, and SelfHostedHuggingFaceInstructEmbeddings classes."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d338722a",
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"from langchain.embeddings import (\n",
" SelfHostedEmbeddings, \n",
" SelfHostedHuggingFaceEmbeddings, \n",
" SelfHostedHuggingFaceInstructEmbeddings\n",
")\n",
"import runhouse as rh"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "146559e8",
"metadata": {},
"outputs": [],
"source": [
"# For an on-demand A100 with GCP, Azure, or Lambda\n",
"gpu = rh.cluster(name=\"rh-a10x\", instance_type=\"A100:1\", use_spot=False)\n",
"\n",
"# For an on-demand A10G with AWS (no single A100s on AWS)\n",
"# gpu = rh.cluster(name='rh-a10x', instance_type='g5.2xlarge', provider='aws')\n",
"\n",
"# For an existing cluster\n",
"# gpu = rh.cluster(ips=['<ip of the cluster>'], \n",
"# ssh_creds={'ssh_user': '...', 'ssh_private_key':'<path_to_key>'},\n",
"# name='my-cluster')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1230f7df",
"metadata": {},
"outputs": [],
"source": [
"embeddings = SelfHostedHuggingFaceEmbeddings(hardware=gpu)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "2684e928",
"metadata": {},
"outputs": [],
"source": [
"text = \"This is a test document.\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1dc5e606",
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"query_result = embeddings.embed_query(text)"
]
},
{
"cell_type": "markdown",
"id": "cef9cc54",
"metadata": {},
"source": [
"And similarly for SelfHostedHuggingFaceInstructEmbeddings:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a961cdb5",
"id": "81a17ca3",
"metadata": {},
"outputs": [],
"source": []
"source": [
"embeddings = SelfHostedHuggingFaceInstructEmbeddings(hardware=gpu)"
]
},
{
"cell_type": "markdown",
"id": "5a33d1c8",
"metadata": {},
"source": [
"Now let's load an embedding model with a custom load function:"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "c4af5679",
"metadata": {},
"outputs": [],
"source": [
"def get_pipeline():\n",
" from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline # Must be inside the function in notebooks\n",
" model_id = \"facebook/bart-base\"\n",
" tokenizer = AutoTokenizer.from_pretrained(model_id)\n",
" model = AutoModelForCausalLM.from_pretrained(model_id)\n",
" return pipeline(\"feature-extraction\", model=model, tokenizer=tokenizer)\n",
"\n",
"def inference_fn(pipeline, prompt):\n",
" # Return last hidden state of the model\n",
" if isinstance(prompt, list):\n",
" return [emb[0][-1] for emb in pipeline(prompt)] \n",
" return pipeline(prompt)[0][-1]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8654334b",
"metadata": {},
"outputs": [],
"source": [
"embeddings = SelfHostedEmbeddings(\n",
" model_load_fn=get_pipeline, \n",
" hardware=gpu,\n",
" model_reqs=[\"./\", \"torch\", \"transformers\"],\n",
" inference_fn=inference_fn\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fc1bfd0f",
"metadata": {
"scrolled": false
},
"outputs": [],
"source": [
"query_result = embeddings.embed_query(text)"
]
}
],
"metadata": {
@ -338,7 +481,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.1"
"version": "3.10.9"
},
"vscode": {
"interpreter": {

@ -9,6 +9,11 @@ from langchain.embeddings.huggingface import (
)
from langchain.embeddings.huggingface_hub import HuggingFaceHubEmbeddings
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.embeddings.self_hosted import SelfHostedEmbeddings
from langchain.embeddings.self_hosted_hugging_face import (
SelfHostedHuggingFaceEmbeddings,
SelfHostedHuggingFaceInstructEmbeddings,
)
from langchain.embeddings.tensorflow_hub import TensorflowHubEmbeddings
logger = logging.getLogger(__name__)
@ -20,6 +25,9 @@ __all__ = [
"HuggingFaceHubEmbeddings",
"TensorflowHubEmbeddings",
"HuggingFaceInstructEmbeddings",
"SelfHostedEmbeddings",
"SelfHostedHuggingFaceEmbeddings",
"SelfHostedHuggingFaceInstructEmbeddings",
]

@ -0,0 +1,103 @@
"""Running custom embedding models on self-hosted remote hardware."""
from typing import Any, Callable, List
from pydantic import BaseModel, Extra
from langchain.embeddings.base import Embeddings
from langchain.llms import SelfHostedPipeline
def _embed_documents(pipeline: Any, *args: Any, **kwargs: Any) -> List[List[float]]:
"""Inference function to send to the remote hardware.
Accepts a sentence_transformer model_id and
returns a list of embeddings for each document in the batch.
"""
return pipeline(*args, **kwargs)
class SelfHostedEmbeddings(SelfHostedPipeline, Embeddings, BaseModel):
"""Runs custom embedding models on self-hosted remote hardware.
Supported hardware includes auto-launched instances on AWS, GCP, Azure,
and Lambda, as well as servers specified
by IP address and SSH credentials (such as on-prem, or another
cloud like Paperspace, Coreweave, etc.).
To use, you should have the ``runhouse`` python package installed.
Example using a model load function:
.. code-block:: python
from langchain.embeddings import SelfHostedEmbeddings
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
import runhouse as rh
gpu = rh.cluster(name="rh-a10x", instance_type="A100:1")
def get_pipeline():
model_id = "facebook/bart-large"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(model_id)
return pipeline("feature-extraction", model=model, tokenizer=tokenizer)
embeddings = SelfHostedEmbeddings(
model_load_fn=get_pipeline,
hardware=gpu
model_reqs=["./", "torch", "transformers"],
)
Example passing in a pipeline path:
.. code-block:: python
from langchain.embeddings import SelfHostedHFEmbeddings
import runhouse as rh
from transformers import pipeline
gpu = rh.cluster(name="rh-a10x", instance_type="A100:1")
pipeline = pipeline(model="bert-base-uncased", task="feature-extraction")
rh.blob(pickle.dumps(pipeline),
path="models/pipeline.pkl").save().to(gpu, path="models")
embeddings = SelfHostedHFEmbeddings.from_pipeline(
pipeline="models/pipeline.pkl",
hardware=gpu,
model_reqs=["./", "torch", "transformers"],
)
"""
inference_fn: Callable = _embed_documents
"""Inference function to extract the embeddings on the remote hardware."""
inference_kwargs: Any = None
"""Any kwargs to pass to the model's inference function."""
class Config:
"""Configuration for this pydantic object."""
extra = Extra.forbid
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Compute doc embeddings using a HuggingFace transformer model.
Args:
texts: The list of texts to embed.s
Returns:
List of embeddings, one for each text.
"""
texts = list(map(lambda x: x.replace("\n", " "), texts))
embeddings = self.client(self.pipeline_ref, texts)
if not isinstance(embeddings, list):
return embeddings.tolist()
return embeddings
def embed_query(self, text: str) -> List[float]:
"""Compute query embeddings using a HuggingFace transformer model.
Args:
text: The text to embed.
Returns:
Embeddings for the text.
"""
text = text.replace("\n", " ")
embeddings = self.client(self.pipeline_ref, text)
if not isinstance(embeddings, list):
return embeddings.tolist()
return embeddings

@ -0,0 +1,171 @@
"""Wrapper around HuggingFace embedding models for self-hosted remote hardware."""
import importlib
import logging
from typing import Any, Callable, List, Optional
from pydantic import BaseModel
from langchain.embeddings.self_hosted import SelfHostedEmbeddings
DEFAULT_MODEL_NAME = "sentence-transformers/all-mpnet-base-v2"
DEFAULT_INSTRUCT_MODEL = "hkunlp/instructor-large"
DEFAULT_EMBED_INSTRUCTION = "Represent the document for retrieval: "
DEFAULT_QUERY_INSTRUCTION = (
"Represent the question for retrieving supporting documents: "
)
logger = logging.getLogger(__name__)
def _embed_documents(client: Any, *args: Any, **kwargs: Any) -> List[List[float]]:
"""Inference function to send to the remote hardware.
Accepts a sentence_transformer model_id and
returns a list of embeddings for each document in the batch.
"""
return client.encode(*args, **kwargs)
def load_embedding_model(model_id: str, instruct: bool = False, device: int = 0) -> Any:
"""Load the embedding model."""
if not instruct:
import sentence_transformers
client = sentence_transformers.SentenceTransformer(model_id)
else:
from InstructorEmbedding import INSTRUCTOR
client = INSTRUCTOR(model_id)
if importlib.util.find_spec("torch") is not None:
import torch
cuda_device_count = torch.cuda.device_count()
if device < -1 or (device >= cuda_device_count):
raise ValueError(
f"Got device=={device}, "
f"device is required to be within [-1, {cuda_device_count})"
)
if device < 0 and cuda_device_count > 0:
logger.warning(
"Device has %d GPUs available. "
"Provide device={deviceId} to `from_model_id` to use available"
"GPUs for execution. deviceId is -1 for CPU and "
"can be a positive integer associated with CUDA device id.",
cuda_device_count,
)
client = client.to(device)
return client
class SelfHostedHuggingFaceEmbeddings(SelfHostedEmbeddings, BaseModel):
"""Runs sentence_transformers embedding models on self-hosted remote hardware.
Supported hardware includes auto-launched instances on AWS, GCP, Azure,
and Lambda, as well as servers specified
by IP address and SSH credentials (such as on-prem, or another cloud
like Paperspace, Coreweave, etc.).
To use, you should have the ``runhouse`` python package installed.
Example:
.. code-block:: python
from langchain.embeddings import SelfHostedHuggingFaceEmbeddings
import runhouse as rh
model_name = "sentence-transformers/all-mpnet-base-v2"
gpu = rh.cluster(name="rh-a10x", instance_type="A100:1")
hf = SelfHostedHuggingFaceEmbeddings(model_name=model_name, hardware=gpu)
"""
client: Any #: :meta private:
model_id: str = DEFAULT_MODEL_NAME
"""Model name to use."""
model_reqs: List[str] = ["./", "sentence_transformers", "torch"]
"""Requirements to install on hardware to inference the model."""
hardware: Any
"""Remote hardware to send the inference function to."""
model_load_fn: Callable = load_embedding_model
"""Function to load the model remotely on the server."""
load_fn_kwargs: Optional[dict] = None
"""Key word arguments to pass to the model load function."""
inference_fn: Callable = _embed_documents
"""Inference function to extract the embeddings."""
def __init__(self, **kwargs: Any):
"""Initialize the remote inference function."""
load_fn_kwargs = kwargs.pop("load_fn_kwargs", {})
load_fn_kwargs["model_id"] = load_fn_kwargs.get("model_id", DEFAULT_MODEL_NAME)
load_fn_kwargs["instruct"] = load_fn_kwargs.get("instruct", False)
load_fn_kwargs["device"] = load_fn_kwargs.get("device", 0)
super().__init__(load_fn_kwargs=load_fn_kwargs, **kwargs)
class SelfHostedHuggingFaceInstructEmbeddings(SelfHostedHuggingFaceEmbeddings):
"""Runs InstructorEmbedding embedding models on self-hosted remote hardware.
Supported hardware includes auto-launched instances on AWS, GCP, Azure,
and Lambda, as well as servers specified
by IP address and SSH credentials (such as on-prem, or another
cloud like Paperspace, Coreweave, etc.).
To use, you should have the ``runhouse`` python package installed.
Example:
.. code-block:: python
from langchain.embeddings import SelfHostedHuggingFaceInstructEmbeddings
import runhouse as rh
model_name = "hkunlp/instructor-large"
gpu = rh.cluster(name='rh-a10x', instance_type='A100:1')
hf = SelfHostedHuggingFaceInstructEmbeddings(
model_name=model_name, hardware=gpu)
"""
model_id: str = DEFAULT_INSTRUCT_MODEL
"""Model name to use."""
embed_instruction: str = DEFAULT_EMBED_INSTRUCTION
"""Instruction to use for embedding documents."""
query_instruction: str = DEFAULT_QUERY_INSTRUCTION
"""Instruction to use for embedding query."""
model_reqs: List[str] = ["./", "InstructorEmbedding", "torch"]
"""Requirements to install on hardware to inference the model."""
def __init__(self, **kwargs: Any):
"""Initialize the remote inference function."""
load_fn_kwargs = kwargs.pop("load_fn_kwargs", {})
load_fn_kwargs["model_id"] = load_fn_kwargs.get(
"model_id", DEFAULT_INSTRUCT_MODEL
)
load_fn_kwargs["instruct"] = load_fn_kwargs.get("instruct", True)
load_fn_kwargs["device"] = load_fn_kwargs.get("device", 0)
super().__init__(load_fn_kwargs=load_fn_kwargs, **kwargs)
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Compute doc embeddings using a HuggingFace instruct model.
Args:
texts: The list of texts to embed.
Returns:
List of embeddings, one for each text.
"""
instruction_pairs = []
for text in texts:
instruction_pairs.append([self.embed_instruction, text])
embeddings = self.client(self.pipeline_ref, instruction_pairs)
return embeddings.tolist()
def embed_query(self, text: str) -> List[float]:
"""Compute query embeddings using a HuggingFace instruct model.
Args:
text: The text to embed.
Returns:
Embeddings for the text.
"""
instruction_pair = [self.query_instruction, text]
embedding = self.client(self.pipeline_ref, [instruction_pair])[0]
return embedding.tolist()

@ -14,6 +14,8 @@ from langchain.llms.nlpcloud import NLPCloud
from langchain.llms.openai import AzureOpenAI, OpenAI
from langchain.llms.petals import Petals
from langchain.llms.promptlayer_openai import PromptLayerOpenAI
from langchain.llms.self_hosted import SelfHostedPipeline
from langchain.llms.self_hosted_hugging_face import SelfHostedHuggingFaceLLM
__all__ = [
"Anthropic",
@ -28,6 +30,8 @@ __all__ = [
"HuggingFacePipeline",
"AI21",
"AzureOpenAI",
"SelfHostedPipeline",
"SelfHostedHuggingFaceLLM",
"PromptLayerOpenAI",
]
@ -44,4 +48,6 @@ type_to_cls_dict: Dict[str, Type[BaseLLM]] = {
"petals": Petals,
"huggingface_pipeline": HuggingFacePipeline,
"azure": AzureOpenAI,
"self_hosted": SelfHostedPipeline,
"self_hosted_hugging_face": SelfHostedHuggingFaceLLM,
}

@ -0,0 +1,212 @@
"""Run model inference on self-hosted remote hardware."""
import importlib.util
import logging
import pickle
from typing import Any, Callable, List, Mapping, Optional
from pydantic import BaseModel, Extra
from langchain.llms.base import LLM
from langchain.llms.utils import enforce_stop_tokens
logger = logging.getLogger()
def _generate_text(
pipeline: Any,
prompt: str,
*args: Any,
stop: Optional[List[str]] = None,
**kwargs: Any,
) -> str:
"""Inference function to send to the remote hardware.
Accepts a pipeline callable (or, more likely,
a key pointing to the model on the cluster's object store)
and returns text predictions for each document
in the batch.
"""
text = pipeline(prompt, *args, **kwargs)
if stop is not None:
text = enforce_stop_tokens(text, stop)
return text
def _send_pipeline_to_device(pipeline: Any, device: int) -> Any:
"""Send a pipeline to a device on the cluster."""
if isinstance(pipeline, str):
with open(pipeline, "rb") as f:
pipeline = pickle.load(f)
if importlib.util.find_spec("torch") is not None:
import torch
cuda_device_count = torch.cuda.device_count()
if device < -1 or (device >= cuda_device_count):
raise ValueError(
f"Got device=={device}, "
f"device is required to be within [-1, {cuda_device_count})"
)
if device < 0 and cuda_device_count > 0:
logger.warning(
"Device has %d GPUs available. "
"Provide device={deviceId} to `from_model_id` to use available"
"GPUs for execution. deviceId is -1 for CPU and "
"can be a positive integer associated with CUDA device id.",
cuda_device_count,
)
pipeline.device = torch.device(device)
pipeline.model = pipeline.model.to(pipeline.device)
return pipeline
class SelfHostedPipeline(LLM, BaseModel):
"""Run model inference on self-hosted remote hardware.
Supported hardware includes auto-launched instances on AWS, GCP, Azure,
and Lambda, as well as servers specified
by IP address and SSH credentials (such as on-prem, or another
cloud like Paperspace, Coreweave, etc.).
To use, you should have the ``runhouse`` python package installed.
Example for custom pipeline and inference functions:
.. code-block:: python
from langchain.llms import SelfHostedPipeline
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
import runhouse as rh
def load_pipeline():
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
return pipeline(
"text-generation", model=model, tokenizer=tokenizer,
max_new_tokens=10
)
def inference_fn(pipeline, prompt, stop = None):
return pipeline(prompt)[0]["generated_text"]
gpu = rh.cluster(name="rh-a10x", instance_type="A100:1")
llm = SelfHostedPipeline(
model_load_fn=load_pipeline,
hardware=gpu,
model_reqs=model_reqs, inference_fn=inference_fn
)
Example for <2GB model (can be serialized and sent directly to the server):
.. code-block:: python
from langchain.llms import SelfHostedPipeline
import runhouse as rh
gpu = rh.cluster(name="rh-a10x", instance_type="A100:1")
my_model = ...
llm = SelfHostedPipeline.from_pipeline(
pipeline=my_model,
hardware=gpu,
model_reqs=["./", "torch", "transformers"],
)
Example passing model path for larger models:
.. code-block:: python
from langchain.llms import SelfHostedPipeline
import runhouse as rh
import pickle
from transformers import pipeline
generator = pipeline(model="gpt2")
rh.blob(pickle.dumps(generator), path="models/pipeline.pkl"
).save().to(gpu, path="models")
llm = SelfHostedPipeline.from_pipeline(
pipeline="models/pipeline.pkl",
hardware=gpu,
model_reqs=["./", "torch", "transformers"],
)
"""
pipeline_ref: Any #: :meta private:
client: Any #: :meta private:
inference_fn: Callable = _generate_text #: :meta private:
"""Inference function to send to the remote hardware."""
hardware: Any
"""Remote hardware to send the inference function to."""
model_load_fn: Callable
"""Function to load the model remotely on the server."""
load_fn_kwargs: Optional[dict] = None
"""Key word arguments to pass to the model load function."""
model_reqs: List[str] = ["./", "torch"]
"""Requirements to install on hardware to inference the model."""
class Config:
"""Configuration for this pydantic object."""
extra = Extra.forbid
def __init__(self, **kwargs: Any):
"""Init the pipeline with an auxiliary function.
The load function must be in global scope to be imported
and run on the server, i.e. in a module and not a REPL or closure.
Then, initialize the remote inference function.
"""
super().__init__(**kwargs)
try:
import runhouse as rh
except ImportError:
raise ValueError(
"Could not import runhouse python package. "
"Please install it with `pip install runhouse`."
)
remote_load_fn = rh.function(fn=self.model_load_fn).to(
self.hardware, reqs=self.model_reqs
)
_load_fn_kwargs = self.load_fn_kwargs or {}
self.pipeline_ref = remote_load_fn.remote(**_load_fn_kwargs)
self.client = rh.function(fn=self.inference_fn).to(
self.hardware, reqs=self.model_reqs
)
@classmethod
def from_pipeline(
cls,
pipeline: Any,
hardware: Any,
model_reqs: Optional[List[str]] = None,
device: int = 0,
**kwargs: Any,
) -> LLM:
"""Init the SelfHostedPipeline from a pipeline object or string."""
if not isinstance(pipeline, str):
logger.warning(
"Serializing pipeline to send to remote hardware. "
"Note, it can be quite slow"
"to serialize and send large models with each execution. "
"Consider sending the pipeline"
"to the cluster and passing the path to the pipeline instead."
)
load_fn_kwargs = {"pipeline": pipeline, "device": device}
return cls(
load_fn_kwargs=load_fn_kwargs,
model_load_fn=_send_pipeline_to_device,
hardware=hardware,
model_reqs=["transformers", "torch"] + (model_reqs or []),
**kwargs,
)
@property
def _identifying_params(self) -> Mapping[str, Any]:
"""Get the identifying parameters."""
return {
**{"hardware": self.hardware},
}
@property
def _llm_type(self) -> str:
return "self_hosted_llm"
def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
return self.client(pipeline=self.pipeline_ref, prompt=prompt, stop=stop)

@ -0,0 +1,202 @@
"""Wrapper around HuggingFace Pipeline API to run on self-hosted remote hardware."""
import importlib.util
import logging
from typing import Any, Callable, List, Mapping, Optional
from pydantic import BaseModel, Extra
from langchain.llms.self_hosted import SelfHostedPipeline
from langchain.llms.utils import enforce_stop_tokens
DEFAULT_MODEL_ID = "gpt2"
DEFAULT_TASK = "text-generation"
VALID_TASKS = ("text2text-generation", "text-generation")
logger = logging.getLogger()
def _generate_text(
pipeline: Any,
prompt: str,
*args: Any,
stop: Optional[List[str]] = None,
**kwargs: Any,
) -> str:
"""Inference function to send to the remote hardware.
Accepts a Hugging Face pipeline (or more likely,
a key pointing to such a pipeline on the cluster's object store)
and returns generated text.
"""
response = pipeline(prompt, *args, **kwargs)
if pipeline.task == "text-generation":
# Text generation return includes the starter text.
text = response[0]["generated_text"][len(prompt) :]
elif pipeline.task == "text2text-generation":
text = response[0]["generated_text"]
else:
raise ValueError(
f"Got invalid task {pipeline.task}, "
f"currently only {VALID_TASKS} are supported"
)
if stop is not None:
text = enforce_stop_tokens(text, stop)
return text
def _load_transformer(
model_id: str = DEFAULT_MODEL_ID,
task: str = DEFAULT_TASK,
device: int = 0,
model_kwargs: Optional[dict] = None,
) -> Any:
"""Inference function to send to the remote hardware.
Accepts a huggingface model_id and returns a pipeline for the task.
"""
from transformers import AutoModelForCausalLM, AutoModelForSeq2SeqLM, AutoTokenizer
from transformers import pipeline as hf_pipeline
_model_kwargs = model_kwargs or {}
tokenizer = AutoTokenizer.from_pretrained(model_id, **_model_kwargs)
try:
if task == "text-generation":
model = AutoModelForCausalLM.from_pretrained(model_id, **_model_kwargs)
elif task == "text2text-generation":
model = AutoModelForSeq2SeqLM.from_pretrained(model_id, **_model_kwargs)
else:
raise ValueError(
f"Got invalid task {task}, "
f"currently only {VALID_TASKS} are supported"
)
except ImportError as e:
raise ValueError(
f"Could not load the {task} model due to missing dependencies."
) from e
if importlib.util.find_spec("torch") is not None:
import torch
cuda_device_count = torch.cuda.device_count()
if device < -1 or (device >= cuda_device_count):
raise ValueError(
f"Got device=={device}, "
f"device is required to be within [-1, {cuda_device_count})"
)
if device < 0 and cuda_device_count > 0:
logger.warning(
"Device has %d GPUs available. "
"Provide device={deviceId} to `from_model_id` to use available"
"GPUs for execution. deviceId is -1 for CPU and "
"can be a positive integer associated with CUDA device id.",
cuda_device_count,
)
pipeline = hf_pipeline(
task=task,
model=model,
tokenizer=tokenizer,
device=device,
model_kwargs=_model_kwargs,
)
if pipeline.task not in VALID_TASKS:
raise ValueError(
f"Got invalid task {pipeline.task}, "
f"currently only {VALID_TASKS} are supported"
)
return pipeline
class SelfHostedHuggingFaceLLM(SelfHostedPipeline, BaseModel):
"""Wrapper around HuggingFace Pipeline API to run on self-hosted remote hardware.
Supported hardware includes auto-launched instances on AWS, GCP, Azure,
and Lambda, as well as servers specified
by IP address and SSH credentials (such as on-prem, or another cloud
like Paperspace, Coreweave, etc.).
To use, you should have the ``runhouse`` python package installed.
Only supports `text-generation` and `text2text-generation` for now.
Example using from_model_id:
.. code-block:: python
from langchain.llms import SelfHostedHuggingFaceLLM
import runhouse as rh
gpu = rh.cluster(name="rh-a10x", instance_type="A100:1")
hf = SelfHostedHuggingFaceLLM(
model_id="google/flan-t5-large", task="text2text-generation",
hardware=gpu
)
Example passing fn that generates a pipeline (bc the pipeline is not serializable):
.. code-block:: python
from langchain.llms import SelfHostedHuggingFaceLLM
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
import runhouse as rh
def get_pipeline():
model_id = "gpt2"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(model_id)
pipe = pipeline(
"text-generation", model=model, tokenizer=tokenizer
)
return pipe
hf = SelfHostedHuggingFaceLLM(
model_load_fn=get_pipeline, model_id="gpt2", hardware=gpu)
"""
model_id: str = DEFAULT_MODEL_ID
"""Hugging Face model_id to load the model."""
task: str = DEFAULT_TASK
"""Hugging Face task (either "text-generation" or "text2text-generation")."""
device: int = 0
"""Device to use for inference. -1 for CPU, 0 for GPU, 1 for second GPU, etc."""
model_kwargs: Optional[dict] = None
"""Key word arguments to pass to the model."""
hardware: Any
"""Remote hardware to send the inference function to."""
model_reqs: List[str] = ["./", "transformers", "torch"]
"""Requirements to install on hardware to inference the model."""
model_load_fn: Callable = _load_transformer
"""Function to load the model remotely on the server."""
inference_fn: Callable = _generate_text #: :meta private:
"""Inference function to send to the remote hardware."""
class Config:
"""Configuration for this pydantic object."""
extra = Extra.forbid
def __init__(self, **kwargs: Any):
"""Construct the pipeline remotely using an auxiliary function.
The load function needs to be importable to be imported
and run on the server, i.e. in a module and not a REPL or closure.
Then, initialize the remote inference function.
"""
load_fn_kwargs = {
"model_id": kwargs.get("model_id", DEFAULT_MODEL_ID),
"task": kwargs.get("task", DEFAULT_TASK),
"device": kwargs.get("device", 0),
"model_kwargs": kwargs.get("model_kwargs", None),
}
super().__init__(load_fn_kwargs=load_fn_kwargs, **kwargs)
@property
def _identifying_params(self) -> Mapping[str, Any]:
"""Get the identifying parameters."""
return {
**{"model_id": self.model_id},
**{"model_kwargs": self.model_kwargs},
}
@property
def _llm_type(self) -> str:
return "selfhosted_huggingface_pipeline"
def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
return self.client(pipeline=self.pipeline_ref, prompt=prompt, stop=stop)

@ -0,0 +1,96 @@
"""Test self-hosted embeddings."""
from typing import Any
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from langchain.embeddings import (
SelfHostedEmbeddings,
SelfHostedHuggingFaceEmbeddings,
SelfHostedHuggingFaceInstructEmbeddings,
)
def get_remote_instance() -> Any:
"""Get remote instance for testing."""
import runhouse as rh
gpu = rh.cluster(name="rh-a10x", instance_type="A100:1", use_spot=False)
gpu.install_packages(["pip:./"])
return gpu
def test_self_hosted_huggingface_embedding_documents() -> None:
"""Test self-hosted huggingface embeddings."""
documents = ["foo bar"]
gpu = get_remote_instance()
embedding = SelfHostedHuggingFaceEmbeddings(hardware=gpu)
output = embedding.embed_documents(documents)
assert len(output) == 1
assert len(output[0]) == 768
def test_self_hosted_huggingface_embedding_query() -> None:
"""Test self-hosted huggingface embeddings."""
document = "foo bar"
gpu = get_remote_instance()
embedding = SelfHostedHuggingFaceEmbeddings(hardware=gpu)
output = embedding.embed_query(document)
assert len(output) == 768
def test_self_hosted_huggingface_instructor_embedding_documents() -> None:
"""Test self-hosted huggingface instruct embeddings."""
documents = ["foo bar"]
gpu = get_remote_instance()
embedding = SelfHostedHuggingFaceInstructEmbeddings(hardware=gpu)
output = embedding.embed_documents(documents)
assert len(output) == 1
assert len(output[0]) == 768
def test_self_hosted_huggingface_instructor_embedding_query() -> None:
"""Test self-hosted huggingface instruct embeddings."""
query = "foo bar"
gpu = get_remote_instance()
embedding = SelfHostedHuggingFaceInstructEmbeddings(hardware=gpu)
output = embedding.embed_query(query)
assert len(output) == 768
def get_pipeline() -> Any:
"""Get pipeline for testing."""
model_id = "facebook/bart-base"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(model_id)
return pipeline("feature-extraction", model=model, tokenizer=tokenizer)
def inference_fn(pipeline: Any, prompt: str) -> Any:
"""Inference function for testing."""
# Return last hidden state of the model
if isinstance(prompt, list):
return [emb[0][-1] for emb in pipeline(prompt)]
return pipeline(prompt)[0][-1]
def test_self_hosted_embedding_documents() -> None:
"""Test self-hosted huggingface instruct embeddings."""
documents = ["foo bar"] * 2
gpu = get_remote_instance()
embedding = SelfHostedEmbeddings(
model_load_fn=get_pipeline, hardware=gpu, inference_fn=inference_fn
)
output = embedding.embed_documents(documents)
assert len(output) == 2
assert len(output[0]) == 50265
def test_self_hosted_embedding_query() -> None:
"""Test self-hosted custom embeddings."""
query = "foo bar"
gpu = get_remote_instance()
embedding = SelfHostedEmbeddings(
model_load_fn=get_pipeline, hardware=gpu, inference_fn=inference_fn
)
output = embedding.embed_query(query)
assert len(output) == 50265

@ -0,0 +1,105 @@
"""Test Self-hosted LLMs."""
import pickle
from typing import Any, List, Optional
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from langchain.llms import SelfHostedHuggingFaceLLM, SelfHostedPipeline
model_reqs = ["pip:./", "transformers", "torch"]
def get_remote_instance() -> Any:
"""Get remote instance for testing."""
import runhouse as rh
return rh.cluster(name="rh-a10x", instance_type="A100:1", use_spot=False)
def test_self_hosted_huggingface_pipeline_text_generation() -> None:
"""Test valid call to self-hosted HuggingFace text generation model."""
gpu = get_remote_instance()
llm = SelfHostedHuggingFaceLLM(
model_id="gpt2",
task="text-generation",
model_kwargs={"n_positions": 1024},
hardware=gpu,
model_reqs=model_reqs,
)
output = llm("Say foo:") # type: ignore
assert isinstance(output, str)
def test_self_hosted_huggingface_pipeline_text2text_generation() -> None:
"""Test valid call to self-hosted HuggingFace text2text generation model."""
gpu = get_remote_instance()
llm = SelfHostedHuggingFaceLLM(
model_id="google/flan-t5-small",
task="text2text-generation",
hardware=gpu,
model_reqs=model_reqs,
)
output = llm("Say foo:") # type: ignore
assert isinstance(output, str)
def load_pipeline() -> Any:
"""Load pipeline for testing."""
model_id = "gpt2"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(model_id)
pipe = pipeline(
"text-generation", model=model, tokenizer=tokenizer, max_new_tokens=10
)
return pipe
def inference_fn(pipeline: Any, prompt: str, stop: Optional[List[str]] = None) -> str:
"""Inference function for testing."""
return pipeline(prompt)[0]["generated_text"]
def test_init_with_local_pipeline() -> None:
"""Test initialization with a self-hosted HF pipeline."""
gpu = get_remote_instance()
pipeline = load_pipeline()
llm = SelfHostedPipeline.from_pipeline(
pipeline=pipeline,
hardware=gpu,
model_reqs=model_reqs,
inference_fn=inference_fn,
)
output = llm("Say foo:") # type: ignore
assert isinstance(output, str)
def test_init_with_pipeline_path() -> None:
"""Test initialization with a self-hosted HF pipeline."""
gpu = get_remote_instance()
pipeline = load_pipeline()
import runhouse as rh
rh.blob(pickle.dumps(pipeline), path="models/pipeline.pkl").save().to(
gpu, path="models"
)
llm = SelfHostedPipeline.from_pipeline(
pipeline="models/pipeline.pkl",
hardware=gpu,
model_reqs=model_reqs,
inference_fn=inference_fn,
)
output = llm("Say foo:") # type: ignore
assert isinstance(output, str)
def test_init_with_pipeline_fn() -> None:
"""Test initialization with a self-hosted HF pipeline."""
gpu = get_remote_instance()
llm = SelfHostedPipeline(
model_load_fn=load_pipeline,
hardware=gpu,
model_reqs=model_reqs,
inference_fn=inference_fn,
)
output = llm("Say foo:") # type: ignore
assert isinstance(output, str)
Loading…
Cancel
Save