From 9d6d8f85daf85e17c57d552077a5a9140dd06973 Mon Sep 17 00:00:00 2001 From: Harrison Chase Date: Sun, 19 Feb 2023 09:53:45 -0800 Subject: [PATCH] Harrison/self hosted runhouse (#1154) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Donny Greenberg Co-authored-by: John Dagdelen Co-authored-by: Harrison Chase Co-authored-by: Andrew White Co-authored-by: Peng Qu <82029664+pengqu123@users.noreply.github.com> Co-authored-by: Matt Robinson Co-authored-by: jeff Co-authored-by: Harrison Chase Co-authored-by: zanderchase Co-authored-by: Charles Frye Co-authored-by: zanderchase Co-authored-by: Shahriar Tajbakhsh Co-authored-by: Stefan Keselj Co-authored-by: Francisco Ingham Co-authored-by: Dhruv Anand <105786647+dhruv-anand-aintech@users.noreply.github.com> Co-authored-by: cragwolfe Co-authored-by: Anton Troynikov Co-authored-by: William FH <13333726+hinthornw@users.noreply.github.com> Co-authored-by: Oliver Klingefjord Co-authored-by: blob42 Co-authored-by: blob42 Co-authored-by: Enrico Shippole Co-authored-by: Ibis Prevedello Co-authored-by: jped Co-authored-by: Justin Torre Co-authored-by: Ivan Vendrov Co-authored-by: Sasmitha Manathunga <70096033+mmz-001@users.noreply.github.com> Co-authored-by: Ankush Gola <9536492+agola11@users.noreply.github.com> Co-authored-by: Matt Robinson Co-authored-by: Jeff Huber Co-authored-by: Akshay <64036106+akshayvkt@users.noreply.github.com> Co-authored-by: Andrew Huang Co-authored-by: rogerserper <124558887+rogerserper@users.noreply.github.com> Co-authored-by: seanaedmiston Co-authored-by: Hasegawa Yuya <52068175+Hase-U@users.noreply.github.com> Co-authored-by: Ivan Vendrov Co-authored-by: Chen Wu (吴尘) Co-authored-by: Dennis Antela Martinez Co-authored-by: Maxime Vidal Co-authored-by: Rishabh Raizada <110235735+rishabh-ti@users.noreply.github.com> --- docs/ecosystem/runhouse.md | 31 ++ docs/modules/llms/integrations.rst | 2 + .../integrations/self_hosted_examples.ipynb | 296 ++++++++++++++++++ .../combine_docs_examples/embeddings.ipynb | 149 ++++++++- langchain/embeddings/__init__.py | 8 + langchain/embeddings/self_hosted.py | 103 ++++++ .../embeddings/self_hosted_hugging_face.py | 171 ++++++++++ langchain/llms/__init__.py | 6 + langchain/llms/self_hosted.py | 212 +++++++++++++ langchain/llms/self_hosted_hugging_face.py | 202 ++++++++++++ .../embeddings/test_self_hosted.py | 96 ++++++ .../llms/test_self_hosted_llm.py | 105 +++++++ 12 files changed, 1378 insertions(+), 3 deletions(-) create mode 100644 docs/ecosystem/runhouse.md create mode 100644 docs/modules/llms/integrations/self_hosted_examples.ipynb create mode 100644 langchain/embeddings/self_hosted.py create mode 100644 langchain/embeddings/self_hosted_hugging_face.py create mode 100644 langchain/llms/self_hosted.py create mode 100644 langchain/llms/self_hosted_hugging_face.py create mode 100644 tests/integration_tests/embeddings/test_self_hosted.py create mode 100644 tests/integration_tests/llms/test_self_hosted_llm.py diff --git a/docs/ecosystem/runhouse.md b/docs/ecosystem/runhouse.md new file mode 100644 index 00000000..28731cfe --- /dev/null +++ b/docs/ecosystem/runhouse.md @@ -0,0 +1,31 @@ +# Runhouse + +This page covers how to use the [Runhouse](https://github.com/run-house/runhouse) ecosystem within LangChain. +It is broken into three parts: installation and setup, LLMs, and Embeddings. 
+
+## Installation and Setup
+- Install the Python SDK with `pip install runhouse`
+- If you'd like to use on-demand clusters, check your cloud credentials with `sky check`
+
+## Self-hosted LLMs
+For a basic self-hosted LLM, you can use the `SelfHostedHuggingFaceLLM` class. For more
+custom LLMs, you can use the `SelfHostedPipeline` parent class.
+
+```python
+from langchain.llms import SelfHostedPipeline, SelfHostedHuggingFaceLLM
+```
+
+For a more detailed walkthrough of self-hosted LLMs, see [this notebook](../modules/llms/integrations/self_hosted_examples.ipynb).
+
+## Self-hosted Embeddings
+There are several ways to use self-hosted embeddings with LangChain via Runhouse.
+
+For a basic self-hosted embedding from a Hugging Face Transformers model, you can use
+the `SelfHostedHuggingFaceEmbeddings` class. For more custom embedding models, you can
+use the `SelfHostedEmbeddings` parent class.
+```python
+from langchain.embeddings import (
+    SelfHostedEmbeddings,
+    SelfHostedHuggingFaceEmbeddings,
+    SelfHostedHuggingFaceInstructEmbeddings,
+)
+```
+
+For a more detailed walkthrough of self-hosted embeddings, see [this notebook](../modules/utils/combine_docs_examples/embeddings.ipynb).
\ No newline at end of file
diff --git a/docs/modules/llms/integrations.rst b/docs/modules/llms/integrations.rst
index 36d22605..faf3bf87 100644
--- a/docs/modules/llms/integrations.rst
+++ b/docs/modules/llms/integrations.rst
@@ -27,6 +27,8 @@ The examples here are all "how-to" guides for how to integrate with various LLM
 
 `Anthropic <./integrations/anthropic_example.html>`_: Covers how to use Anthropic models with Langchain.
 
+`Self-Hosted Models (via Runhouse) <./integrations/self_hosted_examples.html>`_: Covers how to run models on existing or on-demand remote compute with Langchain.
+
 .. toctree::
    :maxdepth: 1
 
diff --git a/docs/modules/llms/integrations/self_hosted_examples.ipynb b/docs/modules/llms/integrations/self_hosted_examples.ipynb
new file mode 100644
index 00000000..b1174611
--- /dev/null
+++ b/docs/modules/llms/integrations/self_hosted_examples.ipynb
@@ -0,0 +1,296 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "9597802c",
+   "metadata": {},
+   "source": [
+    "# Self-Hosted Models via Runhouse\n",
+    "This example goes over how to use LangChain and [Runhouse](https://github.com/run-house/runhouse) to interact with models hosted on your own GPU, or on-demand GPUs on AWS, GCP, Azure, or Lambda.\n",
+    "\n",
+    "For more information, see [Runhouse](https://github.com/run-house/runhouse) or the [Runhouse docs](https://runhouse-docs.readthedocs-hosted.com/en/latest/)."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6fb585dd", + "metadata": {}, + "outputs": [], + "source": [ + "from langchain.llms import SelfHostedPipeline, SelfHostedHuggingFaceLLM\n", + "from langchain import PromptTemplate, LLMChain\n", + "import runhouse as rh" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "06d6866e", + "metadata": {}, + "outputs": [], + "source": [ + "# For an on-demand A100 with GCP, Azure, or Lambda\n", + "gpu = rh.cluster(name=\"rh-a10x\", instance_type=\"A100:1\", use_spot=False)\n", + "\n", + "# For an on-demand A10G with AWS (no single A100s on AWS)\n", + "# gpu = rh.cluster(name='rh-a10x', instance_type='g5.2xlarge', provider='aws')\n", + "\n", + "# For an existing cluster\n", + "# gpu = rh.cluster(ips=[''], \n", + "# ssh_creds={'ssh_user': '...', 'ssh_private_key':''},\n", + "# name='rh-a10x')" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "035dea0f", + "metadata": {}, + "outputs": [], + "source": [ + "template = \"\"\"Question: {question}\n", + "\n", + "Answer: Let's think step by step.\"\"\"\n", + "\n", + "prompt = PromptTemplate(template=template, input_variables=[\"question\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3f3458d9", + "metadata": {}, + "outputs": [], + "source": [ + "llm = SelfHostedHuggingFaceLLM(model_id=\"gpt2\", hardware=gpu, model_reqs=[\"pip:./\", \"transformers\", \"torch\"])" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "a641dbd9", + "metadata": {}, + "outputs": [], + "source": [ + "llm_chain = LLMChain(prompt=prompt, llm=llm)" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "id": "6fb6fdb2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "INFO | 2023-02-17 05:42:23,537 | Running _generate_text via gRPC\n", + "INFO | 2023-02-17 05:42:24,016 | Time to send message: 0.48 seconds\n" + ] + }, + { + "data": { + "text/plain": [ + "\"\\n\\nLet's say we're talking sports teams who won the Super Bowl in the year Justin Beiber\"" + ] + }, + "execution_count": 31, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "question = \"What NFL team won the Super Bowl in the year Justin Beiber was born?\"\n", + "\n", + "llm_chain.run(question)" + ] + }, + { + "cell_type": "markdown", + "id": "c88709cd", + "metadata": {}, + "source": [ + "You can also load more custom models through the SelfHostedHuggingFaceLLM interface:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "22820c5a", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "llm = SelfHostedHuggingFaceLLM(\n", + " model_id=\"google/flan-t5-small\",\n", + " task=\"text2text-generation\",\n", + " hardware=gpu,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 39, + "id": "1528e70f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "INFO | 2023-02-17 05:54:21,681 | Running _generate_text via gRPC\n", + "INFO | 2023-02-17 05:54:21,937 | Time to send message: 0.25 seconds\n" + ] + }, + { + "data": { + "text/plain": [ + "'berlin'" + ] + }, + "execution_count": 39, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "llm(\"What is the capital of Germany?\")" + ] + }, + { + "cell_type": "markdown", + "id": "7a0c3746", + "metadata": {}, + "source": [ + "Using a custom load function, we can load a custom pipeline directly on the remote hardware:" + ] + }, + { 
+ "cell_type": "code", + "execution_count": 34, + "id": "893eb1d3", + "metadata": {}, + "outputs": [], + "source": [ + "def load_pipeline():\n", + " from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline # Need to be inside the fn in notebooks\n", + " model_id = \"gpt2\"\n", + " tokenizer = AutoTokenizer.from_pretrained(model_id)\n", + " model = AutoModelForCausalLM.from_pretrained(model_id)\n", + " pipe = pipeline(\n", + " \"text-generation\", model=model, tokenizer=tokenizer, max_new_tokens=10\n", + " )\n", + " return pipe\n", + "\n", + "def inference_fn(pipeline, prompt, stop = None):\n", + " return pipeline(prompt)[0][\"generated_text\"][len(prompt):]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "087d50dc", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "llm = SelfHostedHuggingFaceLLM(model_load_fn=load_pipeline, hardware=gpu, inference_fn=inference_fn)" + ] + }, + { + "cell_type": "code", + "execution_count": 36, + "id": "feb8da8e", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "INFO | 2023-02-17 05:42:59,219 | Running _generate_text via gRPC\n", + "INFO | 2023-02-17 05:42:59,522 | Time to send message: 0.3 seconds\n" + ] + }, + { + "data": { + "text/plain": [ + "'john w. bush'" + ] + }, + "execution_count": 36, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "llm(\"Who is the current US president?\")" + ] + }, + { + "cell_type": "markdown", + "id": "af08575f", + "metadata": {}, + "source": [ + "You can send your pipeline directly over the wire to your model, but this will only work for small models (<2 Gb), and will be pretty slow:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d23023b9", + "metadata": {}, + "outputs": [], + "source": [ + "pipeline = load_pipeline()\n", + "llm = SelfHostedPipeline.from_pipeline(\n", + " pipeline=pipeline, hardware=gpu, model_reqs=model_reqs\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "fcb447a1", + "metadata": {}, + "source": [ + "Instead, we can also send it to the hardware's filesystem, which will be much faster." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7206b7d6", + "metadata": {}, + "outputs": [], + "source": [ + "rh.blob(pickle.dumps(pipeline), path=\"models/pipeline.pkl\").save().to(gpu, path=\"models\")\n", + "\n", + "llm = SelfHostedPipeline.from_pipeline(pipeline=\"models/pipeline.pkl\", hardware=gpu)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.15" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docs/modules/utils/combine_docs_examples/embeddings.ipynb b/docs/modules/utils/combine_docs_examples/embeddings.ipynb index 64917e51..bac1a35c 100644 --- a/docs/modules/utils/combine_docs_examples/embeddings.ipynb +++ b/docs/modules/utils/combine_docs_examples/embeddings.ipynb @@ -313,13 +313,156 @@ "query_result = embeddings.embed_query(text)" ] }, + { + "cell_type": "markdown", + "id": "eec4efda", + "metadata": {}, + "source": [ + "## Self Hosted Embeddings\n", + "Let's load the SelfHostedEmbeddings, SelfHostedHuggingFaceEmbeddings, and SelfHostedHuggingFaceInstructEmbeddings classes." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d338722a", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "from langchain.embeddings import (\n", + " SelfHostedEmbeddings, \n", + " SelfHostedHuggingFaceEmbeddings, \n", + " SelfHostedHuggingFaceInstructEmbeddings\n", + ")\n", + "import runhouse as rh" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "146559e8", + "metadata": {}, + "outputs": [], + "source": [ + "# For an on-demand A100 with GCP, Azure, or Lambda\n", + "gpu = rh.cluster(name=\"rh-a10x\", instance_type=\"A100:1\", use_spot=False)\n", + "\n", + "# For an on-demand A10G with AWS (no single A100s on AWS)\n", + "# gpu = rh.cluster(name='rh-a10x', instance_type='g5.2xlarge', provider='aws')\n", + "\n", + "# For an existing cluster\n", + "# gpu = rh.cluster(ips=[''], \n", + "# ssh_creds={'ssh_user': '...', 'ssh_private_key':''},\n", + "# name='my-cluster')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1230f7df", + "metadata": {}, + "outputs": [], + "source": [ + "embeddings = SelfHostedHuggingFaceEmbeddings(hardware=gpu)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "2684e928", + "metadata": {}, + "outputs": [], + "source": [ + "text = \"This is a test document.\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1dc5e606", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "query_result = embeddings.embed_query(text)" + ] + }, + { + "cell_type": "markdown", + "id": "cef9cc54", + "metadata": {}, + "source": [ + "And similarly for SelfHostedHuggingFaceInstructEmbeddings:" + ] + }, { "cell_type": "code", "execution_count": null, - "id": "a961cdb5", + "id": "81a17ca3", "metadata": {}, "outputs": [], - "source": [] + "source": [ + "embeddings = SelfHostedHuggingFaceInstructEmbeddings(hardware=gpu)" + ] + }, + { + "cell_type": "markdown", + "id": "5a33d1c8", + "metadata": {}, + "source": [ + "Now let's load an embedding model with a custom load function:" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "c4af5679", + "metadata": {}, + "outputs": [], 
+ "source": [ + "def get_pipeline():\n", + " from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline # Must be inside the function in notebooks\n", + " model_id = \"facebook/bart-base\"\n", + " tokenizer = AutoTokenizer.from_pretrained(model_id)\n", + " model = AutoModelForCausalLM.from_pretrained(model_id)\n", + " return pipeline(\"feature-extraction\", model=model, tokenizer=tokenizer)\n", + "\n", + "def inference_fn(pipeline, prompt):\n", + " # Return last hidden state of the model\n", + " if isinstance(prompt, list):\n", + " return [emb[0][-1] for emb in pipeline(prompt)] \n", + " return pipeline(prompt)[0][-1]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8654334b", + "metadata": {}, + "outputs": [], + "source": [ + "embeddings = SelfHostedEmbeddings(\n", + " model_load_fn=get_pipeline, \n", + " hardware=gpu,\n", + " model_reqs=[\"./\", \"torch\", \"transformers\"],\n", + " inference_fn=inference_fn\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fc1bfd0f", + "metadata": { + "scrolled": false + }, + "outputs": [], + "source": [ + "query_result = embeddings.embed_query(text)" + ] } ], "metadata": { @@ -338,7 +481,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.1" + "version": "3.10.9" }, "vscode": { "interpreter": { diff --git a/langchain/embeddings/__init__.py b/langchain/embeddings/__init__.py index ee981a64..403616c5 100644 --- a/langchain/embeddings/__init__.py +++ b/langchain/embeddings/__init__.py @@ -9,6 +9,11 @@ from langchain.embeddings.huggingface import ( ) from langchain.embeddings.huggingface_hub import HuggingFaceHubEmbeddings from langchain.embeddings.openai import OpenAIEmbeddings +from langchain.embeddings.self_hosted import SelfHostedEmbeddings +from langchain.embeddings.self_hosted_hugging_face import ( + SelfHostedHuggingFaceEmbeddings, + SelfHostedHuggingFaceInstructEmbeddings, +) from langchain.embeddings.tensorflow_hub import TensorflowHubEmbeddings logger = logging.getLogger(__name__) @@ -20,6 +25,9 @@ __all__ = [ "HuggingFaceHubEmbeddings", "TensorflowHubEmbeddings", "HuggingFaceInstructEmbeddings", + "SelfHostedEmbeddings", + "SelfHostedHuggingFaceEmbeddings", + "SelfHostedHuggingFaceInstructEmbeddings", ] diff --git a/langchain/embeddings/self_hosted.py b/langchain/embeddings/self_hosted.py new file mode 100644 index 00000000..7e05617e --- /dev/null +++ b/langchain/embeddings/self_hosted.py @@ -0,0 +1,103 @@ +"""Running custom embedding models on self-hosted remote hardware.""" +from typing import Any, Callable, List + +from pydantic import BaseModel, Extra + +from langchain.embeddings.base import Embeddings +from langchain.llms import SelfHostedPipeline + + +def _embed_documents(pipeline: Any, *args: Any, **kwargs: Any) -> List[List[float]]: + """Inference function to send to the remote hardware. + + Accepts a sentence_transformer model_id and + returns a list of embeddings for each document in the batch. + """ + return pipeline(*args, **kwargs) + + +class SelfHostedEmbeddings(SelfHostedPipeline, Embeddings, BaseModel): + """Runs custom embedding models on self-hosted remote hardware. + + Supported hardware includes auto-launched instances on AWS, GCP, Azure, + and Lambda, as well as servers specified + by IP address and SSH credentials (such as on-prem, or another + cloud like Paperspace, Coreweave, etc.). + + To use, you should have the ``runhouse`` python package installed. + + Example using a model load function: + .. 
code-block:: python + + from langchain.embeddings import SelfHostedEmbeddings + from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline + import runhouse as rh + + gpu = rh.cluster(name="rh-a10x", instance_type="A100:1") + def get_pipeline(): + model_id = "facebook/bart-large" + tokenizer = AutoTokenizer.from_pretrained(model_id) + model = AutoModelForCausalLM.from_pretrained(model_id) + return pipeline("feature-extraction", model=model, tokenizer=tokenizer) + embeddings = SelfHostedEmbeddings( + model_load_fn=get_pipeline, + hardware=gpu + model_reqs=["./", "torch", "transformers"], + ) + Example passing in a pipeline path: + .. code-block:: python + + from langchain.embeddings import SelfHostedHFEmbeddings + import runhouse as rh + from transformers import pipeline + + gpu = rh.cluster(name="rh-a10x", instance_type="A100:1") + pipeline = pipeline(model="bert-base-uncased", task="feature-extraction") + rh.blob(pickle.dumps(pipeline), + path="models/pipeline.pkl").save().to(gpu, path="models") + embeddings = SelfHostedHFEmbeddings.from_pipeline( + pipeline="models/pipeline.pkl", + hardware=gpu, + model_reqs=["./", "torch", "transformers"], + ) + """ + + inference_fn: Callable = _embed_documents + """Inference function to extract the embeddings on the remote hardware.""" + inference_kwargs: Any = None + """Any kwargs to pass to the model's inference function.""" + + class Config: + """Configuration for this pydantic object.""" + + extra = Extra.forbid + + def embed_documents(self, texts: List[str]) -> List[List[float]]: + """Compute doc embeddings using a HuggingFace transformer model. + + Args: + texts: The list of texts to embed.s + + Returns: + List of embeddings, one for each text. + """ + texts = list(map(lambda x: x.replace("\n", " "), texts)) + embeddings = self.client(self.pipeline_ref, texts) + if not isinstance(embeddings, list): + return embeddings.tolist() + return embeddings + + def embed_query(self, text: str) -> List[float]: + """Compute query embeddings using a HuggingFace transformer model. + + Args: + text: The text to embed. + + Returns: + Embeddings for the text. + """ + text = text.replace("\n", " ") + embeddings = self.client(self.pipeline_ref, text) + if not isinstance(embeddings, list): + return embeddings.tolist() + return embeddings diff --git a/langchain/embeddings/self_hosted_hugging_face.py b/langchain/embeddings/self_hosted_hugging_face.py new file mode 100644 index 00000000..7675d1e4 --- /dev/null +++ b/langchain/embeddings/self_hosted_hugging_face.py @@ -0,0 +1,171 @@ +"""Wrapper around HuggingFace embedding models for self-hosted remote hardware.""" +import importlib +import logging +from typing import Any, Callable, List, Optional + +from pydantic import BaseModel + +from langchain.embeddings.self_hosted import SelfHostedEmbeddings + +DEFAULT_MODEL_NAME = "sentence-transformers/all-mpnet-base-v2" +DEFAULT_INSTRUCT_MODEL = "hkunlp/instructor-large" +DEFAULT_EMBED_INSTRUCTION = "Represent the document for retrieval: " +DEFAULT_QUERY_INSTRUCTION = ( + "Represent the question for retrieving supporting documents: " +) + +logger = logging.getLogger(__name__) + + +def _embed_documents(client: Any, *args: Any, **kwargs: Any) -> List[List[float]]: + """Inference function to send to the remote hardware. + + Accepts a sentence_transformer model_id and + returns a list of embeddings for each document in the batch. 
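+
+    In practice, ``client`` is the embedding model loaded on the remote
+    hardware by ``load_embedding_model``; the remaining arguments are
+    forwarded to its ``encode`` method.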
+ """ + return client.encode(*args, **kwargs) + + +def load_embedding_model(model_id: str, instruct: bool = False, device: int = 0) -> Any: + """Load the embedding model.""" + if not instruct: + import sentence_transformers + + client = sentence_transformers.SentenceTransformer(model_id) + else: + from InstructorEmbedding import INSTRUCTOR + + client = INSTRUCTOR(model_id) + + if importlib.util.find_spec("torch") is not None: + import torch + + cuda_device_count = torch.cuda.device_count() + if device < -1 or (device >= cuda_device_count): + raise ValueError( + f"Got device=={device}, " + f"device is required to be within [-1, {cuda_device_count})" + ) + if device < 0 and cuda_device_count > 0: + logger.warning( + "Device has %d GPUs available. " + "Provide device={deviceId} to `from_model_id` to use available" + "GPUs for execution. deviceId is -1 for CPU and " + "can be a positive integer associated with CUDA device id.", + cuda_device_count, + ) + + client = client.to(device) + return client + + +class SelfHostedHuggingFaceEmbeddings(SelfHostedEmbeddings, BaseModel): + """Runs sentence_transformers embedding models on self-hosted remote hardware. + + Supported hardware includes auto-launched instances on AWS, GCP, Azure, + and Lambda, as well as servers specified + by IP address and SSH credentials (such as on-prem, or another cloud + like Paperspace, Coreweave, etc.). + + To use, you should have the ``runhouse`` python package installed. + + Example: + .. code-block:: python + + from langchain.embeddings import SelfHostedHuggingFaceEmbeddings + import runhouse as rh + model_name = "sentence-transformers/all-mpnet-base-v2" + gpu = rh.cluster(name="rh-a10x", instance_type="A100:1") + hf = SelfHostedHuggingFaceEmbeddings(model_name=model_name, hardware=gpu) + """ + + client: Any #: :meta private: + model_id: str = DEFAULT_MODEL_NAME + """Model name to use.""" + model_reqs: List[str] = ["./", "sentence_transformers", "torch"] + """Requirements to install on hardware to inference the model.""" + hardware: Any + """Remote hardware to send the inference function to.""" + model_load_fn: Callable = load_embedding_model + """Function to load the model remotely on the server.""" + load_fn_kwargs: Optional[dict] = None + """Key word arguments to pass to the model load function.""" + inference_fn: Callable = _embed_documents + """Inference function to extract the embeddings.""" + + def __init__(self, **kwargs: Any): + """Initialize the remote inference function.""" + load_fn_kwargs = kwargs.pop("load_fn_kwargs", {}) + load_fn_kwargs["model_id"] = load_fn_kwargs.get("model_id", DEFAULT_MODEL_NAME) + load_fn_kwargs["instruct"] = load_fn_kwargs.get("instruct", False) + load_fn_kwargs["device"] = load_fn_kwargs.get("device", 0) + super().__init__(load_fn_kwargs=load_fn_kwargs, **kwargs) + + +class SelfHostedHuggingFaceInstructEmbeddings(SelfHostedHuggingFaceEmbeddings): + """Runs InstructorEmbedding embedding models on self-hosted remote hardware. + + Supported hardware includes auto-launched instances on AWS, GCP, Azure, + and Lambda, as well as servers specified + by IP address and SSH credentials (such as on-prem, or another + cloud like Paperspace, Coreweave, etc.). + + To use, you should have the ``runhouse`` python package installed. + + Example: + .. 
code-block:: python + + from langchain.embeddings import SelfHostedHuggingFaceInstructEmbeddings + import runhouse as rh + model_name = "hkunlp/instructor-large" + gpu = rh.cluster(name='rh-a10x', instance_type='A100:1') + hf = SelfHostedHuggingFaceInstructEmbeddings( + model_name=model_name, hardware=gpu) + """ + + model_id: str = DEFAULT_INSTRUCT_MODEL + """Model name to use.""" + embed_instruction: str = DEFAULT_EMBED_INSTRUCTION + """Instruction to use for embedding documents.""" + query_instruction: str = DEFAULT_QUERY_INSTRUCTION + """Instruction to use for embedding query.""" + model_reqs: List[str] = ["./", "InstructorEmbedding", "torch"] + """Requirements to install on hardware to inference the model.""" + + def __init__(self, **kwargs: Any): + """Initialize the remote inference function.""" + load_fn_kwargs = kwargs.pop("load_fn_kwargs", {}) + load_fn_kwargs["model_id"] = load_fn_kwargs.get( + "model_id", DEFAULT_INSTRUCT_MODEL + ) + load_fn_kwargs["instruct"] = load_fn_kwargs.get("instruct", True) + load_fn_kwargs["device"] = load_fn_kwargs.get("device", 0) + super().__init__(load_fn_kwargs=load_fn_kwargs, **kwargs) + + def embed_documents(self, texts: List[str]) -> List[List[float]]: + """Compute doc embeddings using a HuggingFace instruct model. + + Args: + texts: The list of texts to embed. + + Returns: + List of embeddings, one for each text. + """ + instruction_pairs = [] + for text in texts: + instruction_pairs.append([self.embed_instruction, text]) + embeddings = self.client(self.pipeline_ref, instruction_pairs) + return embeddings.tolist() + + def embed_query(self, text: str) -> List[float]: + """Compute query embeddings using a HuggingFace instruct model. + + Args: + text: The text to embed. + + Returns: + Embeddings for the text. 
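+
+        The query is paired with ``query_instruction`` before being sent to
+        the remote model, mirroring how ``embed_documents`` pairs each text
+        with ``embed_instruction``.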
+ """ + instruction_pair = [self.query_instruction, text] + embedding = self.client(self.pipeline_ref, [instruction_pair])[0] + return embedding.tolist() diff --git a/langchain/llms/__init__.py b/langchain/llms/__init__.py index 5dd8924f..6ede7c3a 100644 --- a/langchain/llms/__init__.py +++ b/langchain/llms/__init__.py @@ -14,6 +14,8 @@ from langchain.llms.nlpcloud import NLPCloud from langchain.llms.openai import AzureOpenAI, OpenAI from langchain.llms.petals import Petals from langchain.llms.promptlayer_openai import PromptLayerOpenAI +from langchain.llms.self_hosted import SelfHostedPipeline +from langchain.llms.self_hosted_hugging_face import SelfHostedHuggingFaceLLM __all__ = [ "Anthropic", @@ -28,6 +30,8 @@ __all__ = [ "HuggingFacePipeline", "AI21", "AzureOpenAI", + "SelfHostedPipeline", + "SelfHostedHuggingFaceLLM", "PromptLayerOpenAI", ] @@ -44,4 +48,6 @@ type_to_cls_dict: Dict[str, Type[BaseLLM]] = { "petals": Petals, "huggingface_pipeline": HuggingFacePipeline, "azure": AzureOpenAI, + "self_hosted": SelfHostedPipeline, + "self_hosted_hugging_face": SelfHostedHuggingFaceLLM, } diff --git a/langchain/llms/self_hosted.py b/langchain/llms/self_hosted.py new file mode 100644 index 00000000..3054329f --- /dev/null +++ b/langchain/llms/self_hosted.py @@ -0,0 +1,212 @@ +"""Run model inference on self-hosted remote hardware.""" +import importlib.util +import logging +import pickle +from typing import Any, Callable, List, Mapping, Optional + +from pydantic import BaseModel, Extra + +from langchain.llms.base import LLM +from langchain.llms.utils import enforce_stop_tokens + +logger = logging.getLogger() + + +def _generate_text( + pipeline: Any, + prompt: str, + *args: Any, + stop: Optional[List[str]] = None, + **kwargs: Any, +) -> str: + """Inference function to send to the remote hardware. + + Accepts a pipeline callable (or, more likely, + a key pointing to the model on the cluster's object store) + and returns text predictions for each document + in the batch. + """ + text = pipeline(prompt, *args, **kwargs) + if stop is not None: + text = enforce_stop_tokens(text, stop) + return text + + +def _send_pipeline_to_device(pipeline: Any, device: int) -> Any: + """Send a pipeline to a device on the cluster.""" + if isinstance(pipeline, str): + with open(pipeline, "rb") as f: + pipeline = pickle.load(f) + + if importlib.util.find_spec("torch") is not None: + import torch + + cuda_device_count = torch.cuda.device_count() + if device < -1 or (device >= cuda_device_count): + raise ValueError( + f"Got device=={device}, " + f"device is required to be within [-1, {cuda_device_count})" + ) + if device < 0 and cuda_device_count > 0: + logger.warning( + "Device has %d GPUs available. " + "Provide device={deviceId} to `from_model_id` to use available" + "GPUs for execution. deviceId is -1 for CPU and " + "can be a positive integer associated with CUDA device id.", + cuda_device_count, + ) + + pipeline.device = torch.device(device) + pipeline.model = pipeline.model.to(pipeline.device) + return pipeline + + +class SelfHostedPipeline(LLM, BaseModel): + """Run model inference on self-hosted remote hardware. + + Supported hardware includes auto-launched instances on AWS, GCP, Azure, + and Lambda, as well as servers specified + by IP address and SSH credentials (such as on-prem, or another + cloud like Paperspace, Coreweave, etc.). + + To use, you should have the ``runhouse`` python package installed. + + Example for custom pipeline and inference functions: + .. 
code-block:: python + + from langchain.llms import SelfHostedPipeline + from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline + import runhouse as rh + + def load_pipeline(): + tokenizer = AutoTokenizer.from_pretrained("gpt2") + model = AutoModelForCausalLM.from_pretrained("gpt2") + return pipeline( + "text-generation", model=model, tokenizer=tokenizer, + max_new_tokens=10 + ) + def inference_fn(pipeline, prompt, stop = None): + return pipeline(prompt)[0]["generated_text"] + + gpu = rh.cluster(name="rh-a10x", instance_type="A100:1") + llm = SelfHostedPipeline( + model_load_fn=load_pipeline, + hardware=gpu, + model_reqs=model_reqs, inference_fn=inference_fn + ) + Example for <2GB model (can be serialized and sent directly to the server): + .. code-block:: python + + from langchain.llms import SelfHostedPipeline + import runhouse as rh + gpu = rh.cluster(name="rh-a10x", instance_type="A100:1") + my_model = ... + llm = SelfHostedPipeline.from_pipeline( + pipeline=my_model, + hardware=gpu, + model_reqs=["./", "torch", "transformers"], + ) + Example passing model path for larger models: + .. code-block:: python + + from langchain.llms import SelfHostedPipeline + import runhouse as rh + import pickle + from transformers import pipeline + + generator = pipeline(model="gpt2") + rh.blob(pickle.dumps(generator), path="models/pipeline.pkl" + ).save().to(gpu, path="models") + llm = SelfHostedPipeline.from_pipeline( + pipeline="models/pipeline.pkl", + hardware=gpu, + model_reqs=["./", "torch", "transformers"], + ) + """ + + pipeline_ref: Any #: :meta private: + client: Any #: :meta private: + inference_fn: Callable = _generate_text #: :meta private: + """Inference function to send to the remote hardware.""" + hardware: Any + """Remote hardware to send the inference function to.""" + model_load_fn: Callable + """Function to load the model remotely on the server.""" + load_fn_kwargs: Optional[dict] = None + """Key word arguments to pass to the model load function.""" + model_reqs: List[str] = ["./", "torch"] + """Requirements to install on hardware to inference the model.""" + + class Config: + """Configuration for this pydantic object.""" + + extra = Extra.forbid + + def __init__(self, **kwargs: Any): + """Init the pipeline with an auxiliary function. + + The load function must be in global scope to be imported + and run on the server, i.e. in a module and not a REPL or closure. + Then, initialize the remote inference function. + """ + super().__init__(**kwargs) + try: + import runhouse as rh + + except ImportError: + raise ValueError( + "Could not import runhouse python package. " + "Please install it with `pip install runhouse`." + ) + + remote_load_fn = rh.function(fn=self.model_load_fn).to( + self.hardware, reqs=self.model_reqs + ) + _load_fn_kwargs = self.load_fn_kwargs or {} + self.pipeline_ref = remote_load_fn.remote(**_load_fn_kwargs) + + self.client = rh.function(fn=self.inference_fn).to( + self.hardware, reqs=self.model_reqs + ) + + @classmethod + def from_pipeline( + cls, + pipeline: Any, + hardware: Any, + model_reqs: Optional[List[str]] = None, + device: int = 0, + **kwargs: Any, + ) -> LLM: + """Init the SelfHostedPipeline from a pipeline object or string.""" + if not isinstance(pipeline, str): + logger.warning( + "Serializing pipeline to send to remote hardware. " + "Note, it can be quite slow" + "to serialize and send large models with each execution. " + "Consider sending the pipeline" + "to the cluster and passing the path to the pipeline instead." 
+ ) + + load_fn_kwargs = {"pipeline": pipeline, "device": device} + return cls( + load_fn_kwargs=load_fn_kwargs, + model_load_fn=_send_pipeline_to_device, + hardware=hardware, + model_reqs=["transformers", "torch"] + (model_reqs or []), + **kwargs, + ) + + @property + def _identifying_params(self) -> Mapping[str, Any]: + """Get the identifying parameters.""" + return { + **{"hardware": self.hardware}, + } + + @property + def _llm_type(self) -> str: + return "self_hosted_llm" + + def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str: + return self.client(pipeline=self.pipeline_ref, prompt=prompt, stop=stop) diff --git a/langchain/llms/self_hosted_hugging_face.py b/langchain/llms/self_hosted_hugging_face.py new file mode 100644 index 00000000..9415b6ca --- /dev/null +++ b/langchain/llms/self_hosted_hugging_face.py @@ -0,0 +1,202 @@ +"""Wrapper around HuggingFace Pipeline API to run on self-hosted remote hardware.""" +import importlib.util +import logging +from typing import Any, Callable, List, Mapping, Optional + +from pydantic import BaseModel, Extra + +from langchain.llms.self_hosted import SelfHostedPipeline +from langchain.llms.utils import enforce_stop_tokens + +DEFAULT_MODEL_ID = "gpt2" +DEFAULT_TASK = "text-generation" +VALID_TASKS = ("text2text-generation", "text-generation") + +logger = logging.getLogger() + + +def _generate_text( + pipeline: Any, + prompt: str, + *args: Any, + stop: Optional[List[str]] = None, + **kwargs: Any, +) -> str: + """Inference function to send to the remote hardware. + + Accepts a Hugging Face pipeline (or more likely, + a key pointing to such a pipeline on the cluster's object store) + and returns generated text. + """ + response = pipeline(prompt, *args, **kwargs) + if pipeline.task == "text-generation": + # Text generation return includes the starter text. + text = response[0]["generated_text"][len(prompt) :] + elif pipeline.task == "text2text-generation": + text = response[0]["generated_text"] + else: + raise ValueError( + f"Got invalid task {pipeline.task}, " + f"currently only {VALID_TASKS} are supported" + ) + if stop is not None: + text = enforce_stop_tokens(text, stop) + return text + + +def _load_transformer( + model_id: str = DEFAULT_MODEL_ID, + task: str = DEFAULT_TASK, + device: int = 0, + model_kwargs: Optional[dict] = None, +) -> Any: + """Inference function to send to the remote hardware. + + Accepts a huggingface model_id and returns a pipeline for the task. + """ + from transformers import AutoModelForCausalLM, AutoModelForSeq2SeqLM, AutoTokenizer + from transformers import pipeline as hf_pipeline + + _model_kwargs = model_kwargs or {} + tokenizer = AutoTokenizer.from_pretrained(model_id, **_model_kwargs) + + try: + if task == "text-generation": + model = AutoModelForCausalLM.from_pretrained(model_id, **_model_kwargs) + elif task == "text2text-generation": + model = AutoModelForSeq2SeqLM.from_pretrained(model_id, **_model_kwargs) + else: + raise ValueError( + f"Got invalid task {task}, " + f"currently only {VALID_TASKS} are supported" + ) + except ImportError as e: + raise ValueError( + f"Could not load the {task} model due to missing dependencies." 
+ ) from e + + if importlib.util.find_spec("torch") is not None: + import torch + + cuda_device_count = torch.cuda.device_count() + if device < -1 or (device >= cuda_device_count): + raise ValueError( + f"Got device=={device}, " + f"device is required to be within [-1, {cuda_device_count})" + ) + if device < 0 and cuda_device_count > 0: + logger.warning( + "Device has %d GPUs available. " + "Provide device={deviceId} to `from_model_id` to use available" + "GPUs for execution. deviceId is -1 for CPU and " + "can be a positive integer associated with CUDA device id.", + cuda_device_count, + ) + + pipeline = hf_pipeline( + task=task, + model=model, + tokenizer=tokenizer, + device=device, + model_kwargs=_model_kwargs, + ) + if pipeline.task not in VALID_TASKS: + raise ValueError( + f"Got invalid task {pipeline.task}, " + f"currently only {VALID_TASKS} are supported" + ) + return pipeline + + +class SelfHostedHuggingFaceLLM(SelfHostedPipeline, BaseModel): + """Wrapper around HuggingFace Pipeline API to run on self-hosted remote hardware. + + Supported hardware includes auto-launched instances on AWS, GCP, Azure, + and Lambda, as well as servers specified + by IP address and SSH credentials (such as on-prem, or another cloud + like Paperspace, Coreweave, etc.). + + To use, you should have the ``runhouse`` python package installed. + + Only supports `text-generation` and `text2text-generation` for now. + + Example using from_model_id: + .. code-block:: python + + from langchain.llms import SelfHostedHuggingFaceLLM + import runhouse as rh + gpu = rh.cluster(name="rh-a10x", instance_type="A100:1") + hf = SelfHostedHuggingFaceLLM( + model_id="google/flan-t5-large", task="text2text-generation", + hardware=gpu + ) + Example passing fn that generates a pipeline (bc the pipeline is not serializable): + .. code-block:: python + + from langchain.llms import SelfHostedHuggingFaceLLM + from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline + import runhouse as rh + + def get_pipeline(): + model_id = "gpt2" + tokenizer = AutoTokenizer.from_pretrained(model_id) + model = AutoModelForCausalLM.from_pretrained(model_id) + pipe = pipeline( + "text-generation", model=model, tokenizer=tokenizer + ) + return pipe + hf = SelfHostedHuggingFaceLLM( + model_load_fn=get_pipeline, model_id="gpt2", hardware=gpu) + """ + + model_id: str = DEFAULT_MODEL_ID + """Hugging Face model_id to load the model.""" + task: str = DEFAULT_TASK + """Hugging Face task (either "text-generation" or "text2text-generation").""" + device: int = 0 + """Device to use for inference. -1 for CPU, 0 for GPU, 1 for second GPU, etc.""" + model_kwargs: Optional[dict] = None + """Key word arguments to pass to the model.""" + hardware: Any + """Remote hardware to send the inference function to.""" + model_reqs: List[str] = ["./", "transformers", "torch"] + """Requirements to install on hardware to inference the model.""" + model_load_fn: Callable = _load_transformer + """Function to load the model remotely on the server.""" + inference_fn: Callable = _generate_text #: :meta private: + """Inference function to send to the remote hardware.""" + + class Config: + """Configuration for this pydantic object.""" + + extra = Extra.forbid + + def __init__(self, **kwargs: Any): + """Construct the pipeline remotely using an auxiliary function. + + The load function needs to be importable to be imported + and run on the server, i.e. in a module and not a REPL or closure. + Then, initialize the remote inference function. 
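+
+        ``model_id``, ``task``, ``device``, and ``model_kwargs`` are collected
+        into ``load_fn_kwargs`` and passed to ``_load_transformer`` when it
+        runs on the remote hardware.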
+ """ + load_fn_kwargs = { + "model_id": kwargs.get("model_id", DEFAULT_MODEL_ID), + "task": kwargs.get("task", DEFAULT_TASK), + "device": kwargs.get("device", 0), + "model_kwargs": kwargs.get("model_kwargs", None), + } + super().__init__(load_fn_kwargs=load_fn_kwargs, **kwargs) + + @property + def _identifying_params(self) -> Mapping[str, Any]: + """Get the identifying parameters.""" + return { + **{"model_id": self.model_id}, + **{"model_kwargs": self.model_kwargs}, + } + + @property + def _llm_type(self) -> str: + return "selfhosted_huggingface_pipeline" + + def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str: + return self.client(pipeline=self.pipeline_ref, prompt=prompt, stop=stop) diff --git a/tests/integration_tests/embeddings/test_self_hosted.py b/tests/integration_tests/embeddings/test_self_hosted.py new file mode 100644 index 00000000..055f7343 --- /dev/null +++ b/tests/integration_tests/embeddings/test_self_hosted.py @@ -0,0 +1,96 @@ +"""Test self-hosted embeddings.""" +from typing import Any + +from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline + +from langchain.embeddings import ( + SelfHostedEmbeddings, + SelfHostedHuggingFaceEmbeddings, + SelfHostedHuggingFaceInstructEmbeddings, +) + + +def get_remote_instance() -> Any: + """Get remote instance for testing.""" + import runhouse as rh + + gpu = rh.cluster(name="rh-a10x", instance_type="A100:1", use_spot=False) + gpu.install_packages(["pip:./"]) + return gpu + + +def test_self_hosted_huggingface_embedding_documents() -> None: + """Test self-hosted huggingface embeddings.""" + documents = ["foo bar"] + gpu = get_remote_instance() + embedding = SelfHostedHuggingFaceEmbeddings(hardware=gpu) + output = embedding.embed_documents(documents) + assert len(output) == 1 + assert len(output[0]) == 768 + + +def test_self_hosted_huggingface_embedding_query() -> None: + """Test self-hosted huggingface embeddings.""" + document = "foo bar" + gpu = get_remote_instance() + embedding = SelfHostedHuggingFaceEmbeddings(hardware=gpu) + output = embedding.embed_query(document) + assert len(output) == 768 + + +def test_self_hosted_huggingface_instructor_embedding_documents() -> None: + """Test self-hosted huggingface instruct embeddings.""" + documents = ["foo bar"] + gpu = get_remote_instance() + embedding = SelfHostedHuggingFaceInstructEmbeddings(hardware=gpu) + output = embedding.embed_documents(documents) + assert len(output) == 1 + assert len(output[0]) == 768 + + +def test_self_hosted_huggingface_instructor_embedding_query() -> None: + """Test self-hosted huggingface instruct embeddings.""" + query = "foo bar" + gpu = get_remote_instance() + embedding = SelfHostedHuggingFaceInstructEmbeddings(hardware=gpu) + output = embedding.embed_query(query) + assert len(output) == 768 + + +def get_pipeline() -> Any: + """Get pipeline for testing.""" + model_id = "facebook/bart-base" + tokenizer = AutoTokenizer.from_pretrained(model_id) + model = AutoModelForCausalLM.from_pretrained(model_id) + return pipeline("feature-extraction", model=model, tokenizer=tokenizer) + + +def inference_fn(pipeline: Any, prompt: str) -> Any: + """Inference function for testing.""" + # Return last hidden state of the model + if isinstance(prompt, list): + return [emb[0][-1] for emb in pipeline(prompt)] + return pipeline(prompt)[0][-1] + + +def test_self_hosted_embedding_documents() -> None: + """Test self-hosted huggingface instruct embeddings.""" + documents = ["foo bar"] * 2 + gpu = get_remote_instance() + embedding = 
SelfHostedEmbeddings( + model_load_fn=get_pipeline, hardware=gpu, inference_fn=inference_fn + ) + output = embedding.embed_documents(documents) + assert len(output) == 2 + assert len(output[0]) == 50265 + + +def test_self_hosted_embedding_query() -> None: + """Test self-hosted custom embeddings.""" + query = "foo bar" + gpu = get_remote_instance() + embedding = SelfHostedEmbeddings( + model_load_fn=get_pipeline, hardware=gpu, inference_fn=inference_fn + ) + output = embedding.embed_query(query) + assert len(output) == 50265 diff --git a/tests/integration_tests/llms/test_self_hosted_llm.py b/tests/integration_tests/llms/test_self_hosted_llm.py new file mode 100644 index 00000000..0dc753ab --- /dev/null +++ b/tests/integration_tests/llms/test_self_hosted_llm.py @@ -0,0 +1,105 @@ +"""Test Self-hosted LLMs.""" +import pickle +from typing import Any, List, Optional + +from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline + +from langchain.llms import SelfHostedHuggingFaceLLM, SelfHostedPipeline + +model_reqs = ["pip:./", "transformers", "torch"] + + +def get_remote_instance() -> Any: + """Get remote instance for testing.""" + import runhouse as rh + + return rh.cluster(name="rh-a10x", instance_type="A100:1", use_spot=False) + + +def test_self_hosted_huggingface_pipeline_text_generation() -> None: + """Test valid call to self-hosted HuggingFace text generation model.""" + gpu = get_remote_instance() + llm = SelfHostedHuggingFaceLLM( + model_id="gpt2", + task="text-generation", + model_kwargs={"n_positions": 1024}, + hardware=gpu, + model_reqs=model_reqs, + ) + output = llm("Say foo:") # type: ignore + assert isinstance(output, str) + + +def test_self_hosted_huggingface_pipeline_text2text_generation() -> None: + """Test valid call to self-hosted HuggingFace text2text generation model.""" + gpu = get_remote_instance() + llm = SelfHostedHuggingFaceLLM( + model_id="google/flan-t5-small", + task="text2text-generation", + hardware=gpu, + model_reqs=model_reqs, + ) + output = llm("Say foo:") # type: ignore + assert isinstance(output, str) + + +def load_pipeline() -> Any: + """Load pipeline for testing.""" + model_id = "gpt2" + tokenizer = AutoTokenizer.from_pretrained(model_id) + model = AutoModelForCausalLM.from_pretrained(model_id) + pipe = pipeline( + "text-generation", model=model, tokenizer=tokenizer, max_new_tokens=10 + ) + return pipe + + +def inference_fn(pipeline: Any, prompt: str, stop: Optional[List[str]] = None) -> str: + """Inference function for testing.""" + return pipeline(prompt)[0]["generated_text"] + + +def test_init_with_local_pipeline() -> None: + """Test initialization with a self-hosted HF pipeline.""" + gpu = get_remote_instance() + pipeline = load_pipeline() + llm = SelfHostedPipeline.from_pipeline( + pipeline=pipeline, + hardware=gpu, + model_reqs=model_reqs, + inference_fn=inference_fn, + ) + output = llm("Say foo:") # type: ignore + assert isinstance(output, str) + + +def test_init_with_pipeline_path() -> None: + """Test initialization with a self-hosted HF pipeline.""" + gpu = get_remote_instance() + pipeline = load_pipeline() + import runhouse as rh + + rh.blob(pickle.dumps(pipeline), path="models/pipeline.pkl").save().to( + gpu, path="models" + ) + llm = SelfHostedPipeline.from_pipeline( + pipeline="models/pipeline.pkl", + hardware=gpu, + model_reqs=model_reqs, + inference_fn=inference_fn, + ) + output = llm("Say foo:") # type: ignore + assert isinstance(output, str) + + +def test_init_with_pipeline_fn() -> None: + """Test initialization with a 
self-hosted HF pipeline.""" + gpu = get_remote_instance() + llm = SelfHostedPipeline( + model_load_fn=load_pipeline, + hardware=gpu, + model_reqs=model_reqs, + inference_fn=inference_fn, + ) + output = llm("Say foo:") # type: ignore + assert isinstance(output, str)
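
Putting the pieces together, a self-hosted model behaves like any other LangChain LLM once this patch is applied. The snippet below is a minimal sketch distilled from the notebook added above; the cluster name, instance type, and model requirements are illustrative, and it assumes `runhouse` is installed with working cloud credentials (or an existing cluster reachable over SSH).

```python
import runhouse as rh

from langchain import LLMChain, PromptTemplate
from langchain.llms import SelfHostedHuggingFaceLLM

# Launch (or attach to) a GPU cluster; the name and instance type are illustrative.
gpu = rh.cluster(name="rh-a10x", instance_type="A100:1", use_spot=False)

# Load a small Hugging Face model on the cluster and expose it as a LangChain LLM.
llm = SelfHostedHuggingFaceLLM(
    model_id="gpt2",
    hardware=gpu,
    model_reqs=["pip:./", "transformers", "torch"],
)

# Use it in a chain exactly like any hosted LLM.
prompt = PromptTemplate(
    template="Question: {question}\n\nAnswer: Let's think step by step.",
    input_variables=["question"],
)
llm_chain = LLMChain(prompt=prompt, llm=llm)
print(llm_chain.run("What NFL team won the Super Bowl in the year Justin Bieber was born?"))
```

The self-hosted embeddings classes follow the same pattern, taking the same `hardware` and `model_reqs` arguments.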