community[patch]: Support SerDe transform functions in Databricks LLM (#16752)
**Description:** Databricks LLM does not support SerDe the
transform_input_fn and transform_output_fn. After saving and loading,
the LLM will be broken. This PR serialize these functions into a hex
string using pickle, and saving the hex string in the yaml file. Using
pickle to serialize a function can be flaky, but this is a simple
workaround that unblocks many use cases. If more sophisticated SerDe is
needed, we can improve it later.
Test:
Added a simple unit test.
I did manual test on Databricks and it works well.
The saved yaml looks like:
```
llm:
_type: databricks
cluster_driver_port: null
cluster_id: null
databricks_uri: databricks
endpoint_name: databricks-mixtral-8x7b-instruct
extra_params: {}
host: e2-dogfood.staging.cloud.databricks.com
max_tokens: null
model_kwargs: null
n: 1
stop: null
task: null
temperature: 0.0
transform_input_fn: 80049520000000000000008c085f5f6d61696e5f5f948c0f7472616e73666f726d5f696e7075749493942e
transform_output_fn: null
```
@baskaryan
```python
from langchain_community.embeddings import DatabricksEmbeddings
from langchain_community.llms import Databricks
from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
import mlflow
embeddings = DatabricksEmbeddings(endpoint="databricks-bge-large-en")
def transform_input(**request):
request["messages"] = [
{
"role": "user",
"content": request["prompt"]
}
]
del request["prompt"]
return request
llm = Databricks(endpoint_name="databricks-mixtral-8x7b-instruct", transform_input_fn=transform_input)
persist_dir = "faiss_databricks_embedding"
# Create the vector db, persist the db to a local fs folder
loader = TextLoader("state_of_the_union.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
db = FAISS.from_documents(docs, embeddings)
db.save_local(persist_dir)
def load_retriever(persist_directory):
embeddings = DatabricksEmbeddings(endpoint="databricks-bge-large-en")
vectorstore = FAISS.load_local(persist_directory, embeddings)
return vectorstore.as_retriever()
retriever = load_retriever(persist_dir)
retrievalQA = RetrievalQA.from_llm(llm=llm, retriever=retriever)
with mlflow.start_run() as run:
logged_model = mlflow.langchain.log_model(
retrievalQA,
artifact_path="retrieval_qa",
loader_fn=load_retriever,
persist_dir=persist_dir,
)
# Load the retrievalQA chain
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(loaded_model.predict([{"query": "What did the president say about Ketanji Brown Jackson"}]))
```
2024-02-08 21:09:50 +00:00
|
|
|
"""test Databricks LLM"""
|
|
|
|
from typing import Any, Dict
|
|
|
|
|
2024-03-06 02:04:45 +00:00
|
|
|
import pytest
|
community[patch]: Support SerDe transform functions in Databricks LLM (#16752)
**Description:** Databricks LLM does not support SerDe the
transform_input_fn and transform_output_fn. After saving and loading,
the LLM will be broken. This PR serialize these functions into a hex
string using pickle, and saving the hex string in the yaml file. Using
pickle to serialize a function can be flaky, but this is a simple
workaround that unblocks many use cases. If more sophisticated SerDe is
needed, we can improve it later.
Test:
Added a simple unit test.
I did manual test on Databricks and it works well.
The saved yaml looks like:
```
llm:
_type: databricks
cluster_driver_port: null
cluster_id: null
databricks_uri: databricks
endpoint_name: databricks-mixtral-8x7b-instruct
extra_params: {}
host: e2-dogfood.staging.cloud.databricks.com
max_tokens: null
model_kwargs: null
n: 1
stop: null
task: null
temperature: 0.0
transform_input_fn: 80049520000000000000008c085f5f6d61696e5f5f948c0f7472616e73666f726d5f696e7075749493942e
transform_output_fn: null
```
@baskaryan
```python
from langchain_community.embeddings import DatabricksEmbeddings
from langchain_community.llms import Databricks
from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
import mlflow
embeddings = DatabricksEmbeddings(endpoint="databricks-bge-large-en")
def transform_input(**request):
request["messages"] = [
{
"role": "user",
"content": request["prompt"]
}
]
del request["prompt"]
return request
llm = Databricks(endpoint_name="databricks-mixtral-8x7b-instruct", transform_input_fn=transform_input)
persist_dir = "faiss_databricks_embedding"
# Create the vector db, persist the db to a local fs folder
loader = TextLoader("state_of_the_union.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
db = FAISS.from_documents(docs, embeddings)
db.save_local(persist_dir)
def load_retriever(persist_directory):
embeddings = DatabricksEmbeddings(endpoint="databricks-bge-large-en")
vectorstore = FAISS.load_local(persist_directory, embeddings)
return vectorstore.as_retriever()
retriever = load_retriever(persist_dir)
retrievalQA = RetrievalQA.from_llm(llm=llm, retriever=retriever)
with mlflow.start_run() as run:
logged_model = mlflow.langchain.log_model(
retrievalQA,
artifact_path="retrieval_qa",
loader_fn=load_retriever,
persist_dir=persist_dir,
)
# Load the retrievalQA chain
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(loaded_model.predict([{"query": "What did the president say about Ketanji Brown Jackson"}]))
```
2024-02-08 21:09:50 +00:00
|
|
|
from pytest import MonkeyPatch
|
|
|
|
|
2024-03-06 02:04:45 +00:00
|
|
|
from langchain_community.llms.databricks import (
|
|
|
|
Databricks,
|
|
|
|
_load_pickled_fn_from_hex_string,
|
|
|
|
)
|
community[patch]: Support SerDe transform functions in Databricks LLM (#16752)
**Description:** Databricks LLM does not support SerDe the
transform_input_fn and transform_output_fn. After saving and loading,
the LLM will be broken. This PR serialize these functions into a hex
string using pickle, and saving the hex string in the yaml file. Using
pickle to serialize a function can be flaky, but this is a simple
workaround that unblocks many use cases. If more sophisticated SerDe is
needed, we can improve it later.
Test:
Added a simple unit test.
I did manual test on Databricks and it works well.
The saved yaml looks like:
```
llm:
_type: databricks
cluster_driver_port: null
cluster_id: null
databricks_uri: databricks
endpoint_name: databricks-mixtral-8x7b-instruct
extra_params: {}
host: e2-dogfood.staging.cloud.databricks.com
max_tokens: null
model_kwargs: null
n: 1
stop: null
task: null
temperature: 0.0
transform_input_fn: 80049520000000000000008c085f5f6d61696e5f5f948c0f7472616e73666f726d5f696e7075749493942e
transform_output_fn: null
```
@baskaryan
```python
from langchain_community.embeddings import DatabricksEmbeddings
from langchain_community.llms import Databricks
from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
import mlflow
embeddings = DatabricksEmbeddings(endpoint="databricks-bge-large-en")
def transform_input(**request):
request["messages"] = [
{
"role": "user",
"content": request["prompt"]
}
]
del request["prompt"]
return request
llm = Databricks(endpoint_name="databricks-mixtral-8x7b-instruct", transform_input_fn=transform_input)
persist_dir = "faiss_databricks_embedding"
# Create the vector db, persist the db to a local fs folder
loader = TextLoader("state_of_the_union.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
db = FAISS.from_documents(docs, embeddings)
db.save_local(persist_dir)
def load_retriever(persist_directory):
embeddings = DatabricksEmbeddings(endpoint="databricks-bge-large-en")
vectorstore = FAISS.load_local(persist_directory, embeddings)
return vectorstore.as_retriever()
retriever = load_retriever(persist_dir)
retrievalQA = RetrievalQA.from_llm(llm=llm, retriever=retriever)
with mlflow.start_run() as run:
logged_model = mlflow.langchain.log_model(
retrievalQA,
artifact_path="retrieval_qa",
loader_fn=load_retriever,
persist_dir=persist_dir,
)
# Load the retrievalQA chain
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(loaded_model.predict([{"query": "What did the president say about Ketanji Brown Jackson"}]))
```
2024-02-08 21:09:50 +00:00
|
|
|
|
|
|
|
|
|
|
|
class MockDatabricksServingEndpointClient:
|
|
|
|
def __init__(
|
|
|
|
self,
|
|
|
|
host: str,
|
|
|
|
api_token: str,
|
|
|
|
endpoint_name: str,
|
|
|
|
databricks_uri: str,
|
|
|
|
task: str,
|
|
|
|
):
|
|
|
|
self.host = host
|
|
|
|
self.api_token = api_token
|
|
|
|
self.endpoint_name = endpoint_name
|
|
|
|
self.databricks_uri = databricks_uri
|
|
|
|
self.task = task
|
|
|
|
|
|
|
|
|
|
|
|
def transform_input(**request: Any) -> Dict[str, Any]:
|
|
|
|
request["messages"] = [{"role": "user", "content": request["prompt"]}]
|
|
|
|
del request["prompt"]
|
|
|
|
return request
|
|
|
|
|
|
|
|
|
2024-03-06 02:04:45 +00:00
|
|
|
@pytest.mark.requires("cloudpickle")
|
community[patch]: Support SerDe transform functions in Databricks LLM (#16752)
**Description:** Databricks LLM does not support SerDe the
transform_input_fn and transform_output_fn. After saving and loading,
the LLM will be broken. This PR serialize these functions into a hex
string using pickle, and saving the hex string in the yaml file. Using
pickle to serialize a function can be flaky, but this is a simple
workaround that unblocks many use cases. If more sophisticated SerDe is
needed, we can improve it later.
Test:
Added a simple unit test.
I did manual test on Databricks and it works well.
The saved yaml looks like:
```
llm:
_type: databricks
cluster_driver_port: null
cluster_id: null
databricks_uri: databricks
endpoint_name: databricks-mixtral-8x7b-instruct
extra_params: {}
host: e2-dogfood.staging.cloud.databricks.com
max_tokens: null
model_kwargs: null
n: 1
stop: null
task: null
temperature: 0.0
transform_input_fn: 80049520000000000000008c085f5f6d61696e5f5f948c0f7472616e73666f726d5f696e7075749493942e
transform_output_fn: null
```
@baskaryan
```python
from langchain_community.embeddings import DatabricksEmbeddings
from langchain_community.llms import Databricks
from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
import mlflow
embeddings = DatabricksEmbeddings(endpoint="databricks-bge-large-en")
def transform_input(**request):
request["messages"] = [
{
"role": "user",
"content": request["prompt"]
}
]
del request["prompt"]
return request
llm = Databricks(endpoint_name="databricks-mixtral-8x7b-instruct", transform_input_fn=transform_input)
persist_dir = "faiss_databricks_embedding"
# Create the vector db, persist the db to a local fs folder
loader = TextLoader("state_of_the_union.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
db = FAISS.from_documents(docs, embeddings)
db.save_local(persist_dir)
def load_retriever(persist_directory):
embeddings = DatabricksEmbeddings(endpoint="databricks-bge-large-en")
vectorstore = FAISS.load_local(persist_directory, embeddings)
return vectorstore.as_retriever()
retriever = load_retriever(persist_dir)
retrievalQA = RetrievalQA.from_llm(llm=llm, retriever=retriever)
with mlflow.start_run() as run:
logged_model = mlflow.langchain.log_model(
retrievalQA,
artifact_path="retrieval_qa",
loader_fn=load_retriever,
persist_dir=persist_dir,
)
# Load the retrievalQA chain
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(loaded_model.predict([{"query": "What did the president say about Ketanji Brown Jackson"}]))
```
2024-02-08 21:09:50 +00:00
|
|
|
def test_serde_transform_input_fn(monkeypatch: MonkeyPatch) -> None:
|
2024-03-06 02:04:45 +00:00
|
|
|
import cloudpickle
|
|
|
|
|
community[patch]: Support SerDe transform functions in Databricks LLM (#16752)
**Description:** Databricks LLM does not support SerDe the
transform_input_fn and transform_output_fn. After saving and loading,
the LLM will be broken. This PR serialize these functions into a hex
string using pickle, and saving the hex string in the yaml file. Using
pickle to serialize a function can be flaky, but this is a simple
workaround that unblocks many use cases. If more sophisticated SerDe is
needed, we can improve it later.
Test:
Added a simple unit test.
I did manual test on Databricks and it works well.
The saved yaml looks like:
```
llm:
_type: databricks
cluster_driver_port: null
cluster_id: null
databricks_uri: databricks
endpoint_name: databricks-mixtral-8x7b-instruct
extra_params: {}
host: e2-dogfood.staging.cloud.databricks.com
max_tokens: null
model_kwargs: null
n: 1
stop: null
task: null
temperature: 0.0
transform_input_fn: 80049520000000000000008c085f5f6d61696e5f5f948c0f7472616e73666f726d5f696e7075749493942e
transform_output_fn: null
```
@baskaryan
```python
from langchain_community.embeddings import DatabricksEmbeddings
from langchain_community.llms import Databricks
from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
import mlflow
embeddings = DatabricksEmbeddings(endpoint="databricks-bge-large-en")
def transform_input(**request):
request["messages"] = [
{
"role": "user",
"content": request["prompt"]
}
]
del request["prompt"]
return request
llm = Databricks(endpoint_name="databricks-mixtral-8x7b-instruct", transform_input_fn=transform_input)
persist_dir = "faiss_databricks_embedding"
# Create the vector db, persist the db to a local fs folder
loader = TextLoader("state_of_the_union.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
db = FAISS.from_documents(docs, embeddings)
db.save_local(persist_dir)
def load_retriever(persist_directory):
embeddings = DatabricksEmbeddings(endpoint="databricks-bge-large-en")
vectorstore = FAISS.load_local(persist_directory, embeddings)
return vectorstore.as_retriever()
retriever = load_retriever(persist_dir)
retrievalQA = RetrievalQA.from_llm(llm=llm, retriever=retriever)
with mlflow.start_run() as run:
logged_model = mlflow.langchain.log_model(
retrievalQA,
artifact_path="retrieval_qa",
loader_fn=load_retriever,
persist_dir=persist_dir,
)
# Load the retrievalQA chain
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(loaded_model.predict([{"query": "What did the president say about Ketanji Brown Jackson"}]))
```
2024-02-08 21:09:50 +00:00
|
|
|
monkeypatch.setattr(
|
|
|
|
"langchain_community.llms.databricks._DatabricksServingEndpointClient",
|
|
|
|
MockDatabricksServingEndpointClient,
|
|
|
|
)
|
|
|
|
monkeypatch.setenv("DATABRICKS_HOST", "my-default-host")
|
|
|
|
monkeypatch.setenv("DATABRICKS_TOKEN", "my-default-token")
|
|
|
|
|
|
|
|
llm = Databricks(
|
|
|
|
endpoint_name="databricks-mixtral-8x7b-instruct",
|
|
|
|
transform_input_fn=transform_input,
|
|
|
|
)
|
|
|
|
params = llm._default_params
|
2024-03-06 02:04:45 +00:00
|
|
|
pickled_string = cloudpickle.dumps(transform_input).hex()
|
community[patch]: Support SerDe transform functions in Databricks LLM (#16752)
**Description:** Databricks LLM does not support SerDe the
transform_input_fn and transform_output_fn. After saving and loading,
the LLM will be broken. This PR serialize these functions into a hex
string using pickle, and saving the hex string in the yaml file. Using
pickle to serialize a function can be flaky, but this is a simple
workaround that unblocks many use cases. If more sophisticated SerDe is
needed, we can improve it later.
Test:
Added a simple unit test.
I did manual test on Databricks and it works well.
The saved yaml looks like:
```
llm:
_type: databricks
cluster_driver_port: null
cluster_id: null
databricks_uri: databricks
endpoint_name: databricks-mixtral-8x7b-instruct
extra_params: {}
host: e2-dogfood.staging.cloud.databricks.com
max_tokens: null
model_kwargs: null
n: 1
stop: null
task: null
temperature: 0.0
transform_input_fn: 80049520000000000000008c085f5f6d61696e5f5f948c0f7472616e73666f726d5f696e7075749493942e
transform_output_fn: null
```
@baskaryan
```python
from langchain_community.embeddings import DatabricksEmbeddings
from langchain_community.llms import Databricks
from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
import mlflow
embeddings = DatabricksEmbeddings(endpoint="databricks-bge-large-en")
def transform_input(**request):
request["messages"] = [
{
"role": "user",
"content": request["prompt"]
}
]
del request["prompt"]
return request
llm = Databricks(endpoint_name="databricks-mixtral-8x7b-instruct", transform_input_fn=transform_input)
persist_dir = "faiss_databricks_embedding"
# Create the vector db, persist the db to a local fs folder
loader = TextLoader("state_of_the_union.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
db = FAISS.from_documents(docs, embeddings)
db.save_local(persist_dir)
def load_retriever(persist_directory):
embeddings = DatabricksEmbeddings(endpoint="databricks-bge-large-en")
vectorstore = FAISS.load_local(persist_directory, embeddings)
return vectorstore.as_retriever()
retriever = load_retriever(persist_dir)
retrievalQA = RetrievalQA.from_llm(llm=llm, retriever=retriever)
with mlflow.start_run() as run:
logged_model = mlflow.langchain.log_model(
retrievalQA,
artifact_path="retrieval_qa",
loader_fn=load_retriever,
persist_dir=persist_dir,
)
# Load the retrievalQA chain
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(loaded_model.predict([{"query": "What did the president say about Ketanji Brown Jackson"}]))
```
2024-02-08 21:09:50 +00:00
|
|
|
assert params["transform_input_fn"] == pickled_string
|
2024-03-06 02:04:45 +00:00
|
|
|
|
|
|
|
request = {"prompt": "What is the meaning of life?"}
|
|
|
|
fn = _load_pickled_fn_from_hex_string(params["transform_input_fn"])
|
|
|
|
assert fn(**request) == transform_input(**request)
|