langchain/templates/rag-self-query/rag_self_query/chain.py
Erick Friis ed789be8f4
docs, templates: update schema imports to core (#17885)
- chat models, messages
- documents
- agentaction/finish
- baseretriever,document
- stroutputparser
- more messages
- basemessage
- format_document
- baseoutputparser

---------

Co-authored-by: Bagatur <baskaryan@gmail.com>
2024-02-22 15:58:44 -08:00

102 lines
3.1 KiB
Python

import os
from operator import itemgetter
from typing import List, Tuple
from langchain.retrievers import SelfQueryRetriever
from langchain_community.chat_models import ChatOpenAI
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import format_document
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from .prompts import CONDENSE_QUESTION_PROMPT, DOCUMENT_PROMPT, LLM_CONTEXT_PROMPT
# Elasticsearch connection settings, read once from the environment when the
# module is imported. Settings without a fallback resolve to None when unset.
ELASTIC_CLOUD_ID = os.environ.get("ELASTIC_CLOUD_ID")
ELASTIC_USERNAME = os.environ.get("ELASTIC_USERNAME", "elastic")
ELASTIC_PASSWORD = os.environ.get("ELASTIC_PASSWORD")
ES_URL = os.environ.get("ES_URL", "http://localhost:9200")
ELASTIC_INDEX_NAME = os.environ.get(
    "ELASTIC_INDEX_NAME", "workspace-search-example"
)
# Keyword arguments for the Elasticsearch store: cloud credentials when the
# cloud id, username, and password are all set (non-empty); otherwise fall
# back to the plain URL.
es_connection_details = (
    {
        "es_cloud_id": ELASTIC_CLOUD_ID,
        "es_user": ELASTIC_USERNAME,
        "es_password": ELASTIC_PASSWORD,
    }
    if all((ELASTIC_CLOUD_ID, ELASTIC_USERNAME, ELASTIC_PASSWORD))
    else {"es_url": ES_URL}
)
# Vector store over the configured index, created at import time with OpenAI
# embeddings and whichever connection settings were selected above.
vecstore = ElasticsearchStore(
    index_name=ELASTIC_INDEX_NAME,
    embedding=OpenAIEmbeddings(),
    **es_connection_details,
)
# Natural-language summary of what the indexed documents contain; passed to
# the self-query retriever alongside the metadata schema below.
document_contents = "The purpose and specifications of a workplace policy."

# Metadata attributes described to the self-query retriever. Each row is
# (name, type, description) and is expanded into a dict with those keys.
metadata_field_info = [
    {"name": field_name, "type": field_type, "description": field_description}
    for field_name, field_type, field_description in (
        ("name", "string", "Name of the workplace policy."),
        (
            "created_on",
            "date",
            "The date the policy was created in ISO 8601 date format (YYYY-MM-DD).",  # noqa: E501
        ),
        (
            "updated_at",
            "date",
            "The date the policy was last updated in ISO 8601 date format (YYYY-MM-DD).",  # noqa: E501
        ),
        (
            "location",
            "string",
            "Where the policy text is stored. The only valid values are ['github', 'sharepoint'].",  # noqa: E501
        ),
    )
]
# Chat model (temperature 0) shared by the retriever, the question-condensing
# step, and the final answer step.
llm = ChatOpenAI(temperature=0)

# Retriever built from the LLM, the vector store, and the document and
# metadata descriptions declared above.
retriever = SelfQueryRetriever.from_llm(
    llm=llm,
    vectorstore=vecstore,
    document_contents=document_contents,
    metadata_field_info=metadata_field_info,
)
def _combine_documents(docs: List) -> str:
    """Render each document with ``DOCUMENT_PROMPT`` and join the results.

    Args:
        docs: Documents to format.

    Returns:
        The formatted documents separated by a blank line; an empty string
        when ``docs`` is empty.
    """
    rendered = [format_document(doc, prompt=DOCUMENT_PROMPT) for doc in docs]
    return "\n\n".join(rendered)
def _format_chat_history(chat_history: List[Tuple]) -> str:
    """Render prior conversation turns as a plain-text transcript.

    Args:
        chat_history: ``(human, ai)`` message pairs. ``None`` is tolerated
            and treated like an empty history.

    Returns:
        For each pair, a ``Human: ...`` line followed by an
        ``Assistant: ...`` line, all newline-separated; an empty string when
        there is no history.
    """
    # Guard first: iterating ``None`` would raise TypeError.
    if not chat_history:
        return ""
    return "\n".join(
        f"Human: {human}\nAssistant: {ai}" for human, ai in chat_history
    )
# Input schema for the chain; it is attached to ``chain`` via ``with_types``.
class InputType(BaseModel):
    # The user's current question.
    question: str
    # Earlier conversation turns as (human, ai) string pairs; defaults to a
    # new empty list when omitted.
    chat_history: List[Tuple[str, str]] = Field(default_factory=list)
# Condenses the incoming question plus prior turns into one standalone
# question string: build the prompt inputs, fill CONDENSE_QUESTION_PROMPT,
# call the LLM, and parse the reply to plain text.
standalone_question = (
    {
        "question": itemgetter("question"),
        # ``chat_history`` is optional in InputType, but nothing in this
        # pipeline fills in that default, so a caller may omit the key (or
        # pass None). Read it defensively instead of raising KeyError.
        "chat_history": lambda x: _format_chat_history(
            x.get("chat_history") or []
        ),
    }
    | CONDENSE_QUESTION_PROMPT
    | llm
    | StrOutputParser()
)
def route_question(input):
    """Choose the runnable that should handle ``input``.

    Args:
        input: Mapping of chain inputs; only ``chat_history`` is inspected.

    Returns:
        ``standalone_question`` when ``chat_history`` is present and
        non-empty, otherwise a new ``RunnablePassthrough``.

    NOTE(review): no code visible in this file references this function;
    ``chain`` pipes through ``standalone_question`` directly.
    """
    has_history = bool(input.get("chat_history"))
    return standalone_question if has_history else RunnablePassthrough()
# Fans the same input out to two branches: ``context`` retrieves documents
# and formats them into one string, ``question`` forwards the input as-is.
_context = RunnableParallel(
    {
        "context": retriever | _combine_documents,
        "question": RunnablePassthrough(),
    }
)
# End-to-end pipeline: condense the question, gather context, fill the
# answer prompt, call the LLM, and return its text. ``InputType`` is attached
# as the declared input schema.
# NOTE(review): the condensing step always runs, even with an empty history;
# ``route_question`` exists in this file but is not wired in here.
chain = (
    standalone_question
    | _context
    | LLM_CONTEXT_PROMPT
    | llm
    | StrOutputParser()
).with_types(input_type=InputType)