"""Retrieval-augmented QA chain backed by an OpenSearch vector index.

Importing this module reads its configuration from environment variables,
builds an OpenSearch-backed retriever, and exposes ``chain``: a runnable
that takes a question string, retrieves context documents, fills the prompt,
calls the chat model, and returns the model output parsed to a string.
"""

import os

from langchain_community.chat_models import ChatOpenAI
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores.opensearch_vector_search import (
    OpenSearchVectorSearch,
)
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# --- Configuration (environment variables, with fallbacks where given) ------
# OPENAI_API_KEY has no fallback: os.getenv returns None when it is unset.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENSEARCH_URL = os.getenv("OPENSEARCH_URL", "https://localhost:9200")
OPENSEARCH_USERNAME = os.getenv("OPENSEARCH_USERNAME", "admin")
OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD", "admin")
OPENSEARCH_INDEX_NAME = os.getenv("OPENSEARCH_INDEX_NAME", "langchain-test")

# --- Vector store and retriever ---------------------------------------------
embedding_function = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)

# NOTE(review): verify_certs=False is hard-coded here; presumably meant for a
# local/self-signed OpenSearch instance -- confirm before using elsewhere.
vector_store = OpenSearchVectorSearch(
    opensearch_url=OPENSEARCH_URL,
    http_auth=(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD),
    index_name=OPENSEARCH_INDEX_NAME,
    embedding_function=embedding_function,
    verify_certs=False,
)
retriever = vector_store.as_retriever()


def format_docs(docs):
    """Return the ``page_content`` of every item in *docs*, blank-line separated."""
    contents = (doc.page_content for doc in docs)
    return "\n\n".join(contents)


# --- RAG prompt ---------------------------------------------------------------
# The {context} and {question} placeholders match the keys of the mapping
# produced by the first step of ``chain`` below.
template = (
    "Answer the question based only on the following context: {context} "
    "Question: {question} "
)
prompt = ChatPromptTemplate.from_template(template)

# --- RAG chain ------------------------------------------------------------------
model = ChatOpenAI(openai_api_key=OPENAI_API_KEY)

# Step 1 fans the incoming question out into two keys: "context" (retrieved
# documents rendered by format_docs) and "question" (the input passed through
# unchanged). The result is piped through the prompt, the chat model, and a
# string output parser, in that order.
chain = (
    RunnableParallel(
        {
            "context": retriever | format_docs,
            "question": RunnablePassthrough(),
        }
    )
    | prompt
    | model
    | StrOutputParser()
)


# --- Input typing -----------------------------------------------------------------
class Question(BaseModel):
    """Input schema for ``chain``: a model whose ``__root__`` field is a ``str``."""

    __root__: str


chain = chain.with_types(input_type=Question)