# Beam

Adds a Beam API wrapper to deploy and make subsequent calls to an
instance of the gpt2 LLM in a cloud deployment. Requires installation of
the Beam library and registration of a Beam Client ID and Client Secret.
Additional calls can then be made through the instance of the large
language model in your code or by calling the Beam API directly.

---------

Co-authored-by: Dev 2049 <dev.dev2049@gmail.com>

@@ -0,0 +1,92 @@
# Beam
This page covers how to use Beam within LangChain.
It is broken into two parts: installation and setup, and then references to specific Beam wrappers.
## Installation and Setup
- [Create an account](https://www.beam.cloud/)
- Install the Beam CLI with `curl https://raw.githubusercontent.com/slai-labs/get-beam/main/get-beam.sh -sSfL | sh`
- Register API keys with `beam configure`
- Set the `BEAM_CLIENT_ID` and `BEAM_CLIENT_SECRET` environment variables (see the sketch below this list)
- Install the Beam SDK with `pip install beam-sdk`
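If you prefer to set the credentials from Python rather than the shell, here is a minimal sketch (the placeholder values are yours to fill in):
```python
import os

# Replace the placeholders with the keys from your Beam dashboard.
os.environ["BEAM_CLIENT_ID"] = "<your-beam-client-id>"
os.environ["BEAM_CLIENT_SECRET"] = "<your-beam-client-secret>"
```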
## Wrappers
### LLM
There exists a Beam LLM wrapper, which you can access with
```python
from langchain.llms.beam import Beam
```
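The wrapper reads your credentials from the `BEAM_CLIENT_ID` and `BEAM_CLIENT_SECRET` environment variables; as a sketch of the alternative, they can also be supplied explicitly as fields (placeholder values shown):
```python
from langchain.llms.beam import Beam

# Optional: these fields fall back to the BEAM_CLIENT_ID /
# BEAM_CLIENT_SECRET environment variables when omitted.
llm = Beam(
    model_name="gpt2",
    name="langchain-gpt2",
    beam_client_id="<your-beam-client-id>",
    beam_client_secret="<your-beam-client-secret>",
)
```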
## Define your Beam app
This is the environment you'll be developing against once you start the app.
It's also used to define the maximum response length from the model.
```python
llm = Beam(model_name="gpt2",
           name="langchain-gpt2-test",
           cpu=8,
           memory="32Gi",
           gpu="A10G",
           python_version="python3.8",
           python_packages=[
               "diffusers[torch]>=0.10",
               "transformers",
               "torch",
               "pillow",
               "accelerate",
               "safetensors",
               "xformers",
           ],
           max_length="50",
           verbose=False)
```
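Note that `max_length` is a string: the generated REST endpoint declares both `prompt` and `max_length` as string inputs, and the deployed handler casts `max_length` back to an integer before generation.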
## Deploy your Beam app
Once defined, you can deploy your Beam app by calling your model's `_deploy()` method.
```python
llm._deploy()
```
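Under the hood, `_deploy()` writes an `app.py` (the app definition) and a `run.py` (the GPT2 handler) into your working directory, runs `beam deploy app.py`, and returns the ID of the deployed app.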
## Call your Beam app
Once a Beam model is deployed, it can be called by calling your model's `_call()` method.
This returns the GPT2 text response to your prompt.
```python
response = llm._call("Running machine learning on a remote GPU")
```
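Because the deployment is exposed as a REST endpoint, you can also skip the wrapper and call the app over HTTP. Here is a minimal sketch that mirrors what `_call()` does internally; the app ID and credentials are placeholders:
```python
import base64
import json

import requests

# Placeholders: use the ID returned by llm._deploy() and your own credentials.
app_id = "<your-app-id>"
client_id = "<your-beam-client-id>"
client_secret = "<your-beam-client-secret>"

# Beam uses HTTP Basic auth with "<client_id>:<client_secret>" base64-encoded.
token = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()

response = requests.post(
    f"https://apps.beam.cloud/{app_id}",
    headers={
        "Authorization": "Basic " + token,
        "Content-Type": "application/json",
    },
    data=json.dumps({
        "prompt": "Running machine learning on a remote GPU",
        "max_length": "50",  # sent as a string, matching the wrapper
    }),
)
print(response.json()["text"])
```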
A complete example script that deploys the model and calls it:
```python
from langchain.llms.beam import Beam

llm = Beam(model_name="gpt2",
           name="langchain-gpt2-test",
           cpu=8,
           memory="32Gi",
           gpu="A10G",
           python_version="python3.8",
           python_packages=[
               "diffusers[torch]>=0.10",
               "transformers",
               "torch",
               "pillow",
               "accelerate",
               "safetensors",
               "xformers",
           ],
           max_length="50",
           verbose=False)

llm._deploy()

response = llm._call("Running machine learning on a remote GPU")
print(response)
```
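Once deployed, the model also works anywhere a standard LangChain LLM is accepted, for example inside a chain. A minimal sketch, assuming `llm` was defined and deployed as above (the prompt template is illustrative):
```python
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

prompt = PromptTemplate(
    input_variables=["topic"],
    template="Write a sentence about {topic}.",
)
chain = LLMChain(llm=llm, prompt=prompt)

# Chains go through the standard LLM call path rather than _call() directly.
print(chain.run("machine learning on remote GPUs"))
```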

@@ -0,0 +1,159 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "J-yvaDTmTTza"
},
"source": [
"# Beam integration for langchain\n",
"\n",
"Calls the Beam API wrapper to deploy and make subsequent calls to an instance of the gpt2 LLM in a cloud deployment. Requires installation of the Beam library and registration of Beam Client ID and Client Secret. By calling the wrapper an instance of the model is created and run, with returned text relating to the prompt. Additional calls can then be made by directly calling the Beam API.\n",
"\n",
"[Create an account](https://www.beam.cloud/), if you don't have one already. Grab your API keys from the [dashboard](https://www.beam.cloud/dashboard/settings/api-keys)."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "CfTmesWtTfTS"
},
"source": [
"Install the Beam CLI"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "G_tCCurqR7Ik"
},
"outputs": [],
"source": [
"!curl https://raw.githubusercontent.com/slai-labs/get-beam/main/get-beam.sh -sSfL | sh"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "jJkcNqOdThQ7"
},
"source": [
"Register API Keys and set your beam client id and secret environment variables:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "7gQd6fszSEaH"
},
"outputs": [],
"source": [
"import os\n",
"import subprocess\n",
"\n",
"beam_client_id = \"<Your beam client id>\"\n",
"beam_client_secret = \"<Your beam client secret>\"\n",
"\n",
"# Set the environment variables\n",
"os.environ['BEAM_CLIENT_ID'] = beam_client_id\n",
"os.environ['BEAM_CLIENT_SECRET'] = beam_client_secret\n",
"\n",
"# Run the beam configure command\n",
"!beam configure --clientId={beam_client_id} --clientSecret={beam_client_secret}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "c20rkK18TrK2"
},
"source": [
"Install the Beam SDK:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "CH2Vop6ISNIf"
},
"outputs": [],
"source": [
"!pip install beam-sdk"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "XflOsp3bTwl1"
},
"source": [
"**Deploy and call Beam directly from langchain!**\n",
"\n",
"Note that a cold start might take a couple of minutes to return the response, but subsequent calls will be faster!"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "KmaHxUqbSVnh"
},
"outputs": [],
"source": [
"from langchain.llms.beam import Beam\n",
"\n",
"llm = Beam(model_name=\"gpt2\",\n",
" name=\"langchain-gpt2-test\",\n",
" cpu=8,\n",
" memory=\"32Gi\",\n",
" gpu=\"A10G\",\n",
" python_version=\"python3.8\",\n",
" python_packages=[\n",
" \"diffusers[torch]>=0.10\",\n",
" \"transformers\",\n",
" \"torch\",\n",
" \"pillow\",\n",
" \"accelerate\",\n",
" \"safetensors\",\n",
" \"xformers\",],\n",
" max_length=\"50\",\n",
" verbose=False)\n",
"\n",
"llm._deploy()\n",
"\n",
"response = llm._call(\"Running machine learning on a remote GPU\")\n",
"\n",
"print(response)"
]
}
],
"metadata": {
"colab": {
"private_outputs": true,
"provenance": []
},
"gpuClass": "standard",
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.3"
}
},
"nbformat": 4,
"nbformat_minor": 1
}

@@ -7,6 +7,7 @@ from langchain.llms.anthropic import Anthropic
from langchain.llms.anyscale import Anyscale
from langchain.llms.bananadev import Banana
from langchain.llms.base import BaseLLM
from langchain.llms.beam import Beam
from langchain.llms.cerebriumai import CerebriumAI
from langchain.llms.cohere import Cohere
from langchain.llms.deepinfra import DeepInfra
@@ -43,6 +44,7 @@ __all__ = [
"AlephAlpha",
"Anyscale",
"Banana",
"Beam",
"CerebriumAI",
"Cohere",
"DeepInfra",
@@ -85,6 +87,7 @@ type_to_cls_dict: Dict[str, Type[BaseLLM]] = {
"anthropic": Anthropic,
"anyscale": Anyscale,
"bananadev": Banana,
"beam": Beam,
"cerebriumai": CerebriumAI,
"cohere": Cohere,
"deepinfra": DeepInfra,

@@ -0,0 +1,268 @@
"""Wrapper around Beam API."""
import base64
import json
import logging
import subprocess
import textwrap
import time
from typing import Any, Dict, List, Mapping, Optional
import requests
from pydantic import Extra, Field, root_validator
from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.llms.base import LLM
from langchain.utils import get_from_dict_or_env
logger = logging.getLogger(__name__)
DEFAULT_NUM_TRIES = 10
DEFAULT_SLEEP_TIME = 4


class Beam(LLM):
    """Wrapper around Beam API for gpt2 large language model.

    To use, you should have the ``beam-sdk`` python package installed,
    and the environment variable ``BEAM_CLIENT_ID`` set with your client id
    and ``BEAM_CLIENT_SECRET`` set with your client secret. Information on how
    to get these is available here: https://docs.beam.cloud/account/api-keys.

    The wrapper can then be called as follows, where the name, cpu, memory, gpu,
    python version, and python packages can be updated accordingly. Once deployed,
    the instance can be called:

        llm = Beam(model_name="gpt2",
                   name="langchain-gpt2",
                   cpu=8,
                   memory="32Gi",
                   gpu="A10G",
                   python_version="python3.8",
                   python_packages=[
                       "diffusers[torch]>=0.10",
                       "transformers",
                       "torch",
                       "pillow",
                       "accelerate",
                       "safetensors",
                       "xformers",
                   ],
                   max_length=50)
        llm._deploy()
        call_result = llm._call(input)
    """

    model_name: str = ""
    name: str = ""
    cpu: str = ""
    memory: str = ""
    gpu: str = ""
    python_version: str = ""
    python_packages: List[str] = []
    max_length: str = ""
    url: str = ""
    """model endpoint to use"""

    model_kwargs: Dict[str, Any] = Field(default_factory=dict)
    """Holds any model parameters valid for `create` call not
    explicitly specified."""

    beam_client_id: str = ""
    beam_client_secret: str = ""
    app_id: Optional[str] = None

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    @root_validator(pre=True)
    def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Build extra kwargs from additional params that were passed in."""
        all_required_field_names = {field.alias for field in cls.__fields__.values()}

        extra = values.get("model_kwargs", {})
        for field_name in list(values):
            if field_name not in all_required_field_names:
                if field_name in extra:
                    raise ValueError(f"Found {field_name} supplied twice.")
                logger.warning(
                    f"""{field_name} was transferred to model_kwargs.
                    Please confirm that {field_name} is what you intended."""
                )
                extra[field_name] = values.pop(field_name)
        values["model_kwargs"] = extra
        return values

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that api key and python package exist in environment."""
        beam_client_id = get_from_dict_or_env(
            values, "beam_client_id", "BEAM_CLIENT_ID"
        )
        beam_client_secret = get_from_dict_or_env(
            values, "beam_client_secret", "BEAM_CLIENT_SECRET"
        )
        values["beam_client_id"] = beam_client_id
        values["beam_client_secret"] = beam_client_secret
        return values

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {
            "model_name": self.model_name,
            "name": self.name,
            "cpu": self.cpu,
            "memory": self.memory,
            "gpu": self.gpu,
            "python_version": self.python_version,
            "python_packages": self.python_packages,
            "max_length": self.max_length,
            "model_kwargs": self.model_kwargs,
        }

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "beam"

    def app_creation(self) -> None:
        """Creates a Python file which will contain your Beam app definition."""
        script = textwrap.dedent(
            """\
            import beam

            # The environment your code will run on
            app = beam.App(
                name="{name}",
                cpu={cpu},
                memory="{memory}",
                gpu="{gpu}",
                python_version="{python_version}",
                python_packages={python_packages},
            )

            app.Trigger.RestAPI(
                inputs={{"prompt": beam.Types.String(), "max_length": beam.Types.String()}},
                outputs={{"text": beam.Types.String()}},
                handler="run.py:beam_langchain",
            )
            """
        )

        script_name = "app.py"
        with open(script_name, "w") as file:
            file.write(
                script.format(
                    name=self.name,
                    cpu=self.cpu,
                    memory=self.memory,
                    gpu=self.gpu,
                    python_version=self.python_version,
                    python_packages=self.python_packages,
                )
            )

    def run_creation(self) -> None:
        """Creates a Python file which will be deployed on Beam."""
        script = textwrap.dedent(
            """
            from transformers import GPT2LMHeadModel, GPT2Tokenizer

            model_name = "{model_name}"

            def beam_langchain(**inputs):
                prompt = inputs["prompt"]
                length = inputs["max_length"]

                tokenizer = GPT2Tokenizer.from_pretrained(model_name)
                model = GPT2LMHeadModel.from_pretrained(model_name)
                encodedPrompt = tokenizer.encode(prompt, return_tensors='pt')
                outputs = model.generate(encodedPrompt, max_length=int(length),
                                         do_sample=True, pad_token_id=tokenizer.eos_token_id)
                output = tokenizer.decode(outputs[0], skip_special_tokens=True)
                print(output)
                return {{"text": output}}
            """
        )

        script_name = "run.py"
        with open(script_name, "w") as file:
            file.write(script.format(model_name=self.model_name))

    def _deploy(self) -> str:
        """Deploy the Beam app and return its app ID."""
        try:
            import beam  # type: ignore

            if beam.__path__ == "":
                raise ImportError
        except ImportError:
            raise ImportError(
                "Could not import beam python package. "
                "Please install it with `curl "
                "https://raw.githubusercontent.com/slai-labs"
                "/get-beam/main/get-beam.sh -sSfL | sh`."
            )
        self.app_creation()
        self.run_creation()

        process = subprocess.run(
            "beam deploy app.py", shell=True, capture_output=True, text=True
        )

        if process.returncode == 0:
            output = process.stdout
            logger.info(output)
            lines = output.split("\n")

            for line in lines:
                if line.startswith(" i Send requests to: https://apps.beam.cloud/"):
                    self.app_id = line.split("/")[-1]
                    self.url = line.split(":")[1].strip()
                    return self.app_id

            raise ValueError(
                f"""Failed to retrieve the appID from the deployment output.
                Deployment output: {output}"""
            )
        else:
            raise ValueError(f"Deployment failed. Error: {process.stderr}")

    @property
    def authorization(self) -> str:
        if self.beam_client_id:
            credential_str = self.beam_client_id + ":" + self.beam_client_secret
        else:
            credential_str = self.beam_client_secret
        return base64.b64encode(credential_str.encode()).decode()

    def _call(
        self,
        prompt: str,
        stop: Optional[list] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
    ) -> str:
        """Call to Beam."""
        url = "https://apps.beam.cloud/" + self.app_id if self.app_id else self.url
        payload = {"prompt": prompt, "max_length": self.max_length}
        headers = {
            "Accept": "*/*",
            "Accept-Encoding": "gzip, deflate",
            "Authorization": "Basic " + self.authorization,
            "Connection": "keep-alive",
            "Content-Type": "application/json",
        }

        for _ in range(DEFAULT_NUM_TRIES):
            request = requests.post(url, headers=headers, data=json.dumps(payload))
            if request.status_code == 200:
                return request.json()["text"]
            time.sleep(DEFAULT_SLEEP_TIME)

        logger.warning("Unable to successfully call model.")
        return ""

@@ -0,0 +1,29 @@
"""Test Beam API wrapper."""
from langchain.llms.beam import Beam
def test_beam_call() -> None:
"""Test valid call to Beam."""
llm = Beam(
model_name="gpt2",
name="langchain-gpt2",
cpu=8,
memory="32Gi",
gpu="A10G",
python_version="python3.8",
python_packages=[
"diffusers[torch]>=0.10",
"transformers",
"torch",
"pillow",
"accelerate",
"safetensors",
"xformers",
],
max_length="5",
)
llm._deploy()
output = llm._call("Your prompt goes here")
assert isinstance(output, str)