Update Tracing Walkthrough (#4760)

Add client methods to read / list runs and sessions.

Update walkthrough to:
- Let the user create a dataset from the runs without going to the UI
- Use the new CLI command to start the server

Improve the error message when `docker` isn't found
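
For reference, a rough sketch of the new flow (the actual walkthrough code lives in the notebook diff below, which is suppressed for size; `create_example` and the environment-based client configuration are assumed here rather than shown in this diff):

```python
from langchain.client import LangChainPlusClient

client = LangChainPlusClient()  # endpoint / API key read from the environment (assumed)

# Look up the tracing session and the runs captured in it.
session = client.read_session(session_name="default")
runs = client.list_runs(session_id=str(session.id), run_type="chain", error=False)

# Create a dataset from those runs without going to the UI.
dataset = client.create_dataset(
    "walkthrough-dataset", description="Chain runs captured during the tracing walkthrough"
)
for run in runs:
    # create_example is assumed to exist on the client with this signature.
    client.create_example(inputs=run.inputs, outputs=run.outputs, dataset_id=dataset.id)
```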
Zander Chase committed via GitHub
commit bee136efa4 (parent fc0a3c8500)

@@ -56,7 +56,11 @@ class BaseTracer(BaseCallbackHandler, ABC):
                 raise TracerException(
                     f"Parent run with UUID {run.parent_run_id} not found."
                 )
-            if run.child_execution_order > parent_run.child_execution_order:
+            if (
+                run.child_execution_order is not None
+                and parent_run.child_execution_order is not None
+                and run.child_execution_order > parent_run.child_execution_order
+            ):
                 parent_run.child_execution_order = run.child_execution_order
         self.run_map.pop(str(run.id))
@@ -68,6 +72,10 @@ class BaseTracer(BaseCallbackHandler, ABC):
         parent_run = self.run_map.get(parent_run_id)
         if parent_run is None:
             raise TracerException(f"Parent run with UUID {parent_run_id} not found.")
+        if parent_run.child_execution_order is None:
+            raise TracerException(
+                f"Parent run with UUID {parent_run_id} has no child execution order."
+            )
         return parent_run.child_execution_order + 1

@@ -108,7 +108,7 @@ class RunBase(BaseModel):
     extra: dict
     error: Optional[str]
     execution_order: int
-    child_execution_order: int
+    child_execution_order: Optional[int]
     serialized: dict
     inputs: dict
     outputs: Optional[dict]

@@ -5,6 +5,7 @@ import shutil
 import subprocess
 from contextlib import contextmanager
 from pathlib import Path
+from subprocess import CalledProcessError
 from typing import Generator, List, Optional
 
 import requests
@@ -19,10 +20,29 @@ _DIR = Path(__file__).parent
 def get_docker_compose_command() -> List[str]:
-    if shutil.which("docker-compose") is None:
+    """Get the correct docker compose command for this system."""
+    try:
+        subprocess.check_call(
+            ["docker", "compose", "--version"],
+            stdout=subprocess.DEVNULL,
+            stderr=subprocess.DEVNULL,
+        )
+        return ["docker", "compose"]
-    else:
-        return ["docker-compose"]
+    except (CalledProcessError, FileNotFoundError):
+        try:
+            subprocess.check_call(
+                ["docker-compose", "--version"],
+                stdout=subprocess.DEVNULL,
+                stderr=subprocess.DEVNULL,
+            )
+            return ["docker-compose"]
+        except (CalledProcessError, FileNotFoundError):
+            raise ValueError(
+                "Neither 'docker compose' nor 'docker-compose'"
+                " commands are available. Please install the Docker"
+                " server following the instructions for your operating"
+                " system at https://docs.docker.com/engine/install/"
+            )
 
 
 def get_ngrok_url(auth_token: Optional[str]) -> str:
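
The new helper is also what langchain/server.py (last hunk below) now uses; a minimal sketch of calling it directly, with an illustrative compose file path:

    import subprocess
    from pathlib import Path

    from langchain.cli.main import get_docker_compose_command

    compose_file = Path("docker-compose.yaml")  # illustrative path
    cmd = get_docker_compose_command()  # ["docker", "compose"] or ["docker-compose"]; raises ValueError if neither works
    subprocess.run([*cmd, "-f", str(compose_file), "up"])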
@@ -85,6 +105,12 @@ class ServerCommand:
         )
         self.ngrok_path = Path(__file__).absolute().parent / "docker-compose.ngrok.yaml"
 
+    def _open_browser(self, url: str) -> None:
+        try:
+            subprocess.run(["open", url])
+        except FileNotFoundError:
+            pass
+
     def _start_local(self) -> None:
         command = [
             *self.docker_compose_command,
@@ -107,7 +133,7 @@ class ServerCommand:
         )
         logger.info("\tLANGCHAIN_TRACING_V2=true")
-        subprocess.run(["open", "http://localhost"])
+        self._open_browser("http://localhost")
 
     def _start_and_expose(self, auth_token: Optional[str]) -> None:
         with create_ngrok_config(auth_token=auth_token):
@@ -138,7 +164,8 @@ class ServerCommand:
         )
         logger.info("\tLANGCHAIN_TRACING_V2=true")
         logger.info(f"\tLANGCHAIN_ENDPOINT={ngrok_url}")
-        subprocess.run(["open", "http://localhost"])
+        self._open_browser("http://0.0.0.0:4040")
+        self._open_browser("http://localhost")
 
     def start(self, *, expose: bool = False, auth_token: Optional[str] = None) -> None:
         """Run the LangChainPlus server locally.

@@ -27,9 +27,16 @@ from requests import Response
 from langchain.base_language import BaseLanguageModel
 from langchain.callbacks.tracers.langchain import LangChainTracer
 from langchain.callbacks.tracers.schemas import Run, TracerSession
 from langchain.chains.base import Chain
 from langchain.chat_models.base import BaseChatModel
-from langchain.client.models import Dataset, DatasetCreate, Example, ExampleCreate
+from langchain.client.models import (
+    Dataset,
+    DatasetCreate,
+    Example,
+    ExampleCreate,
+    ListRunsQueryParams,
+)
 from langchain.llms.base import BaseLLM
 from langchain.schema import ChatResult, LLMResult, messages_from_dict
 from langchain.utils import raise_for_status_with_text, xor_args
@@ -192,6 +199,71 @@ class LangChainPlusClient(BaseSettings):
             raise ValueError(f"Dataset {file_name} already exists")
         return Dataset(**result)
 
+    def read_run(self, run_id: str) -> Run:
+        """Read a run from the LangChain+ API."""
+        response = self._get(f"/runs/{run_id}")
+        raise_for_status_with_text(response)
+        return Run(**response.json())
+
+    def list_runs(
+        self,
+        *,
+        session_id: Optional[str] = None,
+        session_name: Optional[str] = None,
+        run_type: Optional[str] = None,
+        **kwargs: Any,
+    ) -> List[Run]:
+        """List runs from the LangChain+ API."""
+        if session_name is not None:
+            if session_id is not None:
+                raise ValueError("Only one of session_id or session_name may be given")
+            session_id = self.read_session(session_name=session_name).id
+        query_params = ListRunsQueryParams(
+            session_id=session_id, run_type=run_type, **kwargs
+        )
+        filtered_params = {
+            k: v for k, v in query_params.dict().items() if v is not None
+        }
+        response = self._get("/runs", params=filtered_params)
+        raise_for_status_with_text(response)
+        return [Run(**run) for run in response.json()]
+
+    @xor_args(("session_id", "session_name"))
+    def read_session(
+        self, *, session_id: Optional[str] = None, session_name: Optional[str] = None
+    ) -> TracerSession:
+        """Read a session from the LangChain+ API."""
+        path = "/sessions"
+        params: Dict[str, Any] = {"limit": 1, "tenant_id": self.tenant_id}
+        if session_id is not None:
+            path += f"/{session_id}"
+        elif session_name is not None:
+            params["name"] = session_name
+        else:
+            raise ValueError("Must provide session_name or session_id")
+        response = self._get(
+            path,
+            params=params,
+        )
+        raise_for_status_with_text(response)
+        result = response.json()
+        if isinstance(result, list):
+            if len(result) == 0:
+                raise ValueError(f"Session {session_name} not found")
+            return TracerSession(**result[0])
+        return TracerSession(**response.json())
+
+    def list_sessions(self) -> List[TracerSession]:
+        """List sessions from the LangChain+ API."""
+        response = self._get("/sessions")
+        raise_for_status_with_text(response)
+        return [TracerSession(**session) for session in response.json()]
+
     def create_dataset(self, dataset_name: str, description: str) -> Dataset:
         """Create a dataset in the LangChain+ API."""
         dataset = DatasetCreate(
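
A short sketch of how the new read/list methods compose; extra keyword arguments to list_runs are forwarded to ListRunsQueryParams (defined in the models hunk below), so any of its fields can be used as filters. The session name and filter values here are illustrative:

    from langchain.client import LangChainPlusClient

    client = LangChainPlusClient()
    session = client.read_session(session_name="default")
    runs = client.list_runs(session_id=str(session.id), run_type="llm", error=False, limit=10)
    first = client.read_run(str(runs[0].id))
    all_sessions = client.list_sessions()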

@@ -2,9 +2,9 @@ from datetime import datetime
 from typing import Any, Dict, List, Optional
 from uuid import UUID
 
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, root_validator
 
-from langchain.callbacks.tracers.schemas import Run
+from langchain.callbacks.tracers.schemas import Run, RunTypeEnum
 
 
 class ExampleBase(BaseModel):
@@ -52,3 +52,48 @@ class Dataset(DatasetBase):
     id: UUID
     created_at: datetime
     modified_at: Optional[datetime] = Field(default=None)
+
+
+class ListRunsQueryParams(BaseModel):
+    """Query params for GET /runs endpoint."""
+
+    class Config:
+        extra = "forbid"
+
+    id: Optional[List[UUID]]
+    """Filter runs by id."""
+    parent_run: Optional[UUID]
+    """Filter runs by parent run."""
+    run_type: Optional[RunTypeEnum]
+    """Filter runs by type."""
+    session: Optional[UUID] = Field(default=None, alias="session_id")
+    """Only return runs within a session."""
+    reference_example: Optional[UUID]
+    """Only return runs that reference the specified dataset example."""
+    execution_order: Optional[int]
+    """Filter runs by execution order."""
+    error: Optional[bool]
+    """Whether to return only runs that errored."""
+    offset: Optional[int]
+    """The offset of the first run to return."""
+    limit: Optional[int]
+    """The maximum number of runs to return."""
+    start_time: Optional[datetime] = Field(
+        default=None,
+        alias="start_before",
+        description="Query Runs that started <= this time",
+    )
+    end_time: Optional[datetime] = Field(
+        default=None,
+        alias="end_after",
+        description="Query Runs that ended >= this time",
+    )
+
+    @root_validator
+    def validate_time_range(cls, values: Dict[str, Any]) -> Dict[str, Any]:
+        """Validate that start_time <= end_time."""
+        start_time = values.get("start_time")
+        end_time = values.get("end_time")
+        if start_time and end_time and start_time > end_time:
+            raise ValueError("start_time must be <= end_time")
+        return values
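
A small sketch of the pydantic v1 behaviour this model relies on: fields declared with an alias are populated by alias (session_id populates session, start_before populates start_time), and unset filters are dropped by the caller. Values here are illustrative:

    from datetime import datetime
    from uuid import uuid4

    from langchain.client.models import ListRunsQueryParams

    params = ListRunsQueryParams(
        session_id=uuid4(),              # illustrative session UUID; populates the `session` field
        run_type="chain",
        limit=10,
        start_before=datetime.utcnow(),  # populates `start_time`
    )
    # Drop unset filters, as LangChainPlusClient.list_runs does:
    query = {k: v for k, v in params.dict().items() if v is not None}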

(Notebook file diff suppressed because it is too large.)

@@ -1,18 +1,15 @@
 """Script to run langchain-server locally using docker-compose."""
-import shutil
 import subprocess
 from pathlib import Path
 
+from langchain.cli.main import get_docker_compose_command
+
 
 def main() -> None:
     """Run the langchain server locally."""
     p = Path(__file__).absolute().parent / "docker-compose.yaml"
-    if shutil.which("docker-compose") is None:
-        docker_compose_command = ["docker", "compose"]
-    else:
-        docker_compose_command = ["docker-compose"]
+    docker_compose_command = get_docker_compose_command()
     subprocess.run([*docker_compose_command, "-f", str(p), "pull"])
     subprocess.run([*docker_compose_command, "-f", str(p), "up"])
