langchain/templates/rag-opensearch/rag_opensearch/chain.py

61 lines
1.6 KiB
Python
Raw Normal View History

import os
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from langchain.prompts import ChatPromptTemplate
from langchain.pydantic_v1 import BaseModel
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnableParallel, RunnablePassthrough
from langchain.vectorstores.opensearch_vector_search import OpenSearchVectorSearch
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENSEARCH_URL = os.getenv("OPENSEARCH_URL", "https://localhost:9200")
OPENSEARCH_USERNAME = os.getenv("OPENSEARCH_USERNAME", "admin")
OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD", "admin")
OPENSEARCH_INDEX_NAME = os.getenv("OPENSEARCH_INDEX_NAME", "langchain-test")
embedding_function = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)
vector_store = OpenSearchVectorSearch(
opensearch_url=OPENSEARCH_URL,
http_auth=(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD),
index_name=OPENSEARCH_INDEX_NAME,
embedding_function=embedding_function,
verify_certs=False,
)
retriever = vector_store.as_retriever()
def format_docs(docs):
return "\n\n".join([d.page_content for d in docs])
# RAG prompt
template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
# RAG
model = ChatOpenAI(openai_api_key=OPENAI_API_KEY)
chain = (
RunnableParallel(
{"context": retriever | format_docs, "question": RunnablePassthrough()}
)
| prompt
| model
| StrOutputParser()
)
# Add typing for input
class Question(BaseModel):
__root__: str
chain = chain.with_types(input_type=Question)