langchain/templates/rag-opensearch/rag_opensearch/chain.py
2024-01-03 13:28:05 -08:00

63 lines
1.7 KiB
Python

import os
from langchain_community.chat_models import ChatOpenAI
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores.opensearch_vector_search import (
OpenSearchVectorSearch,
)
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
# Connection settings, each taken from the process environment.
# OPENAI_API_KEY has no fallback and is None when the variable is unset;
# the OpenSearch settings fall back to the defaults given here.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
OPENSEARCH_URL = os.environ.get("OPENSEARCH_URL", "https://localhost:9200")
OPENSEARCH_USERNAME = os.environ.get("OPENSEARCH_USERNAME", "admin")
OPENSEARCH_PASSWORD = os.environ.get("OPENSEARCH_PASSWORD", "admin")
OPENSEARCH_INDEX_NAME = os.environ.get("OPENSEARCH_INDEX_NAME", "langchain-test")
# Whether to verify the OpenSearch server's TLS certificate. Verification was
# previously hard-coded off and could not be enabled without editing this file;
# it still defaults to off (so existing setups keep working) but can now be
# turned on by setting OPENSEARCH_VERIFY_CERTS to "1", "true", "yes" or "on".
OPENSEARCH_VERIFY_CERTS = os.getenv(
    "OPENSEARCH_VERIFY_CERTS", "false"
).strip().lower() in {"1", "true", "yes", "on"}

# Embedding model used by the vector store.
embedding_function = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)

# Vector store backed by the configured OpenSearch index, using basic auth.
vector_store = OpenSearchVectorSearch(
    opensearch_url=OPENSEARCH_URL,
    http_auth=(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD),
    index_name=OPENSEARCH_INDEX_NAME,
    embedding_function=embedding_function,
    verify_certs=OPENSEARCH_VERIFY_CERTS,
)

# Retriever view over the vector store; it feeds the "context" branch of the chain.
retriever = vector_store.as_retriever()
def format_docs(docs) -> str:
    """Join the text of the given documents into one context string.

    Args:
        docs: Iterable of objects exposing a ``page_content`` attribute.

    Returns:
        The ``page_content`` values in input order, separated by a blank
        line; an empty string when ``docs`` is empty.
    """
    return "\n\n".join(doc.page_content for doc in docs)
# RAG prompt
# Instructs the model to answer using only the supplied context. The
# {context} and {question} placeholders correspond to the "context" and
# "question" keys of the mapping that the chain pipes into this prompt.
template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
# Build the chat prompt object from the template string above.
prompt = ChatPromptTemplate.from_template(template)
# RAG
# Chat model that generates the answer.
model = ChatOpenAI(openai_api_key=OPENAI_API_KEY)

# Pipeline: the input is fanned out into the two prompt variables --
# "context" (retriever results rendered by format_docs) and "question"
# (the input passed through unchanged) -- then flows through the prompt,
# the model, and a parser that returns the reply as a plain string.
chain = (
    RunnableParallel(
        context=retriever | format_docs,
        question=RunnablePassthrough(),
    )
    | prompt
    | model
    | StrOutputParser()
)
# Add typing for input
# Custom-root model: the chain input is a bare string (the question itself)
# rather than an object with named fields. Comments are used instead of a
# class docstring so the model's generated schema is left unchanged.
class Question(BaseModel):
    __root__: str


# Rebind the module-level ``chain`` to a copy that declares Question as its input type.
chain = chain.with_types(input_type=Question)