diff --git a/libs/langchain/langchain/__init__.py b/libs/langchain/langchain/__init__.py index 04f7e4cae9..e3b87d4961 100644 --- a/libs/langchain/langchain/__init__.py +++ b/libs/langchain/langchain/__init__.py @@ -27,6 +27,12 @@ if "pydantic_v1" not in sys.modules: # and may run prior to langchain core package. sys.modules["pydantic_v1"] = pydantic_v1 +try: + _PYDANTIC_MAJOR_VERSION: int = int(metadata.version("pydantic").split(".")[0]) +except metadata.PackageNotFoundError: + _PYDANTIC_MAJOR_VERSION = 0 + + from langchain.agents import MRKLChain, ReActChain, SelfAskWithSearchChain from langchain.cache import BaseCache from langchain.chains import ( diff --git a/libs/langchain/langchain/tools/openapi/utils/api_models.py b/libs/langchain/langchain/tools/openapi/utils/api_models.py index b40dde124d..8922d0cd59 100644 --- a/libs/langchain/langchain/tools/openapi/utils/api_models.py +++ b/libs/langchain/langchain/tools/openapi/utils/api_models.py @@ -3,9 +3,9 @@ import logging from enum import Enum from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union -from openapi_schema_pydantic import MediaType, Parameter, Reference, RequestBody, Schema from pydantic_v1 import BaseModel, Field +from langchain import _PYDANTIC_MAJOR_VERSION from langchain.tools.openapi.utils.openapi_utils import HTTPVerb, OpenAPISpec logger = logging.getLogger(__name__) @@ -85,499 +85,543 @@ class APIPropertyBase(BaseModel): """The description of the property.""" -class APIProperty(APIPropertyBase): - """A model for a property in the query, path, header, or cookie params.""" - - location: APIPropertyLocation = Field(alias="location") - """The path/how it's being passed to the endpoint.""" - - @staticmethod - def _cast_schema_list_type(schema: Schema) -> Optional[Union[str, Tuple[str, ...]]]: - type_ = schema.type - if not isinstance(type_, list): - return type_ - else: - return tuple(type_) - - @staticmethod - def _get_schema_type_for_enum(parameter: Parameter, schema: Schema) -> Enum: - """Get the schema type when the parameter is an enum.""" - param_name = f"{parameter.name}Enum" - return Enum(param_name, {str(v): v for v in schema.enum}) - - @staticmethod - def _get_schema_type_for_array( - schema: Schema, - ) -> Optional[Union[str, Tuple[str, ...]]]: - items = schema.items - if isinstance(items, Schema): - schema_type = APIProperty._cast_schema_list_type(items) - elif isinstance(items, Reference): - ref_name = items.ref.split("/")[-1] - schema_type = ref_name # TODO: Add ref definitions to make his valid - else: - raise ValueError(f"Unsupported array items: {items}") - - if isinstance(schema_type, str): - # TODO: recurse - schema_type = (schema_type,) - - return schema_type - - @staticmethod - def _get_schema_type(parameter: Parameter, schema: Optional[Schema]) -> SCHEMA_TYPE: - if schema is None: - return None - schema_type: SCHEMA_TYPE = APIProperty._cast_schema_list_type(schema) - if schema_type == "array": - schema_type = APIProperty._get_schema_type_for_array(schema) - elif schema_type == "object": - # TODO: Resolve array and object types to components. - raise NotImplementedError("Objects not yet supported") - elif schema_type in PRIMITIVE_TYPES: - if schema.enum: - schema_type = APIProperty._get_schema_type_for_enum(parameter, schema) +if _PYDANTIC_MAJOR_VERSION == 1: + from openapi_schema_pydantic import ( + MediaType, + Parameter, + Reference, + RequestBody, + Schema, + ) + + class APIProperty(APIPropertyBase): + """A model for a property in the query, path, header, or cookie params.""" + + location: APIPropertyLocation = Field(alias="location") + """The path/how it's being passed to the endpoint.""" + + @staticmethod + def _cast_schema_list_type( + schema: Schema, + ) -> Optional[Union[str, Tuple[str, ...]]]: + type_ = schema.type + if not isinstance(type_, list): + return type_ else: - # Directly use the primitive type - pass - else: - raise NotImplementedError(f"Unsupported type: {schema_type}") - - return schema_type - - @staticmethod - def _validate_location(location: APIPropertyLocation, name: str) -> None: - if location not in SUPPORTED_LOCATIONS: - raise NotImplementedError( - INVALID_LOCATION_TEMPL.format(location=location, name=name) - ) - - @staticmethod - def _validate_content(content: Optional[Dict[str, MediaType]]) -> None: - if content: - raise ValueError( - "API Properties with media content not supported. " - "Media content only supported within APIRequestBodyProperty's" - ) - - @staticmethod - def _get_schema(parameter: Parameter, spec: OpenAPISpec) -> Optional[Schema]: - schema = parameter.param_schema - if isinstance(schema, Reference): - schema = spec.get_referenced_schema(schema) - elif schema is None: - return None - elif not isinstance(schema, Schema): - raise ValueError(f"Error dereferencing schema: {schema}") - - return schema - - @staticmethod - def is_supported_location(location: str) -> bool: - """Return whether the provided location is supported.""" - try: - return APIPropertyLocation.from_str(location) in SUPPORTED_LOCATIONS - except ValueError: - return False - - @classmethod - def from_parameter(cls, parameter: Parameter, spec: OpenAPISpec) -> "APIProperty": - """Instantiate from an OpenAPI Parameter.""" - location = APIPropertyLocation.from_str(parameter.param_in) - cls._validate_location( - location, - parameter.name, - ) - cls._validate_content(parameter.content) - schema = cls._get_schema(parameter, spec) - schema_type = cls._get_schema_type(parameter, schema) - default_val = schema.default if schema is not None else None - return cls( - name=parameter.name, - location=location, - default=default_val, - description=parameter.description, - required=parameter.required, - type=schema_type, - ) - - -class APIRequestBodyProperty(APIPropertyBase): - """A model for a request body property.""" - - properties: List["APIRequestBodyProperty"] = Field(alias="properties") - """The sub-properties of the property.""" - - # This is useful for handling nested property cycles. - # We can define separate types in that case. - references_used: List[str] = Field(alias="references_used") - """The references used by the property.""" - - @classmethod - def _process_object_schema( - cls, schema: Schema, spec: OpenAPISpec, references_used: List[str] - ) -> Tuple[Union[str, List[str], None], List["APIRequestBodyProperty"]]: - properties = [] - required_props = schema.required or [] - if schema.properties is None: - raise ValueError( - f"No properties found when processing object schema: {schema}" - ) - for prop_name, prop_schema in schema.properties.items(): - if isinstance(prop_schema, Reference): - ref_name = prop_schema.ref.split("/")[-1] - if ref_name not in references_used: - references_used.append(ref_name) - prop_schema = spec.get_referenced_schema(prop_schema) - else: - continue - - properties.append( - cls.from_schema( - schema=prop_schema, - name=prop_name, - required=prop_name in required_props, - spec=spec, - references_used=references_used, - ) - ) - return schema.type, properties - - @classmethod - def _process_array_schema( - cls, schema: Schema, name: str, spec: OpenAPISpec, references_used: List[str] - ) -> str: - items = schema.items - if items is not None: - if isinstance(items, Reference): + return tuple(type_) + + @staticmethod + def _get_schema_type_for_enum(parameter: Parameter, schema: Schema) -> Enum: + """Get the schema type when the parameter is an enum.""" + param_name = f"{parameter.name}Enum" + return Enum(param_name, {str(v): v for v in schema.enum}) + + @staticmethod + def _get_schema_type_for_array( + schema: Schema, + ) -> Optional[Union[str, Tuple[str, ...]]]: + items = schema.items + if isinstance(items, Schema): + schema_type = APIProperty._cast_schema_list_type(items) + elif isinstance(items, Reference): ref_name = items.ref.split("/")[-1] - if ref_name not in references_used: - references_used.append(ref_name) - items = spec.get_referenced_schema(items) + schema_type = ref_name # TODO: Add ref definitions to make his valid + else: + raise ValueError(f"Unsupported array items: {items}") + + if isinstance(schema_type, str): + # TODO: recurse + schema_type = (schema_type,) + + return schema_type + + @staticmethod + def _get_schema_type( + parameter: Parameter, schema: Optional[Schema] + ) -> SCHEMA_TYPE: + if schema is None: + return None + schema_type: SCHEMA_TYPE = APIProperty._cast_schema_list_type(schema) + if schema_type == "array": + schema_type = APIProperty._get_schema_type_for_array(schema) + elif schema_type == "object": + # TODO: Resolve array and object types to components. + raise NotImplementedError("Objects not yet supported") + elif schema_type in PRIMITIVE_TYPES: + if schema.enum: + schema_type = APIProperty._get_schema_type_for_enum( + parameter, schema + ) else: + # Directly use the primitive type pass - return f"Array<{ref_name}>" else: - pass + raise NotImplementedError(f"Unsupported type: {schema_type}") - if isinstance(items, Schema): - array_type = cls.from_schema( - schema=items, - name=f"{name}Item", - required=True, # TODO: Add required - spec=spec, - references_used=references_used, - ) - return f"Array<{array_type.type}>" + return schema_type - return "array" + @staticmethod + def _validate_location(location: APIPropertyLocation, name: str) -> None: + if location not in SUPPORTED_LOCATIONS: + raise NotImplementedError( + INVALID_LOCATION_TEMPL.format(location=location, name=name) + ) - @classmethod - def from_schema( - cls, - schema: Schema, - name: str, - required: bool, - spec: OpenAPISpec, - references_used: Optional[List[str]] = None, - ) -> "APIRequestBodyProperty": - """Recursively populate from an OpenAPI Schema.""" - if references_used is None: - references_used = [] + @staticmethod + def _validate_content(content: Optional[Dict[str, MediaType]]) -> None: + if content: + raise ValueError( + "API Properties with media content not supported. " + "Media content only supported within APIRequestBodyProperty's" + ) - schema_type = schema.type - properties: List[APIRequestBodyProperty] = [] - if schema_type == "object" and schema.properties: - schema_type, properties = cls._process_object_schema( - schema, spec, references_used + @staticmethod + def _get_schema(parameter: Parameter, spec: OpenAPISpec) -> Optional[Schema]: + schema = parameter.param_schema + if isinstance(schema, Reference): + schema = spec.get_referenced_schema(schema) + elif schema is None: + return None + elif not isinstance(schema, Schema): + raise ValueError(f"Error dereferencing schema: {schema}") + + return schema + + @staticmethod + def is_supported_location(location: str) -> bool: + """Return whether the provided location is supported.""" + try: + return APIPropertyLocation.from_str(location) in SUPPORTED_LOCATIONS + except ValueError: + return False + + @classmethod + def from_parameter( + cls, parameter: Parameter, spec: OpenAPISpec + ) -> "APIProperty": + """Instantiate from an OpenAPI Parameter.""" + location = APIPropertyLocation.from_str(parameter.param_in) + cls._validate_location( + location, + parameter.name, ) - elif schema_type == "array": - schema_type = cls._process_array_schema(schema, name, spec, references_used) - elif schema_type in PRIMITIVE_TYPES: - # Use the primitive type directly - pass - elif schema_type is None: - # No typing specified/parsed. WIll map to 'any' - pass - else: - raise ValueError(f"Unsupported type: {schema_type}") - - return cls( - name=name, - required=required, - type=schema_type, - default=schema.default, - description=schema.description, - properties=properties, - references_used=references_used, - ) - - -class APIRequestBody(BaseModel): - """A model for a request body.""" - - description: Optional[str] = Field(alias="description") - """The description of the request body.""" - - properties: List[APIRequestBodyProperty] = Field(alias="properties") - - # E.g., application/json - we only support JSON at the moment. - media_type: str = Field(alias="media_type") - """The media type of the request body.""" - - @classmethod - def _process_supported_media_type( - cls, - media_type_obj: MediaType, - spec: OpenAPISpec, - ) -> List[APIRequestBodyProperty]: - """Process the media type of the request body.""" - references_used = [] - schema = media_type_obj.media_type_schema - if isinstance(schema, Reference): - references_used.append(schema.ref.split("/")[-1]) - schema = spec.get_referenced_schema(schema) - if schema is None: - raise ValueError( - f"Could not resolve schema for media type: {media_type_obj}" + cls._validate_content(parameter.content) + schema = cls._get_schema(parameter, spec) + schema_type = cls._get_schema_type(parameter, schema) + default_val = schema.default if schema is not None else None + return cls( + name=parameter.name, + location=location, + default=default_val, + description=parameter.description, + required=parameter.required, + type=schema_type, ) - api_request_body_properties = [] - required_properties = schema.required or [] - if schema.type == "object" and schema.properties: + + class APIRequestBodyProperty(APIPropertyBase): + """A model for a request body property.""" + + properties: List["APIRequestBodyProperty"] = Field(alias="properties") + """The sub-properties of the property.""" + + # This is useful for handling nested property cycles. + # We can define separate types in that case. + references_used: List[str] = Field(alias="references_used") + """The references used by the property.""" + + @classmethod + def _process_object_schema( + cls, schema: Schema, spec: OpenAPISpec, references_used: List[str] + ) -> Tuple[Union[str, List[str], None], List["APIRequestBodyProperty"]]: + properties = [] + required_props = schema.required or [] + if schema.properties is None: + raise ValueError( + f"No properties found when processing object schema: {schema}" + ) for prop_name, prop_schema in schema.properties.items(): if isinstance(prop_schema, Reference): - prop_schema = spec.get_referenced_schema(prop_schema) - - api_request_body_properties.append( - APIRequestBodyProperty.from_schema( + ref_name = prop_schema.ref.split("/")[-1] + if ref_name not in references_used: + references_used.append(ref_name) + prop_schema = spec.get_referenced_schema(prop_schema) + else: + continue + + properties.append( + cls.from_schema( schema=prop_schema, name=prop_name, - required=prop_name in required_properties, + required=prop_name in required_props, spec=spec, + references_used=references_used, ) ) - else: - api_request_body_properties.append( - APIRequestBodyProperty( - name="body", - required=True, - type=schema.type, - default=schema.default, - description=schema.description, - properties=[], - references_used=references_used, - ) - ) - - return api_request_body_properties + return schema.type, properties + + @classmethod + def _process_array_schema( + cls, + schema: Schema, + name: str, + spec: OpenAPISpec, + references_used: List[str], + ) -> str: + items = schema.items + if items is not None: + if isinstance(items, Reference): + ref_name = items.ref.split("/")[-1] + if ref_name not in references_used: + references_used.append(ref_name) + items = spec.get_referenced_schema(items) + else: + pass + return f"Array<{ref_name}>" + else: + pass - @classmethod - def from_request_body( - cls, request_body: RequestBody, spec: OpenAPISpec - ) -> "APIRequestBody": - """Instantiate from an OpenAPI RequestBody.""" - properties = [] - for media_type, media_type_obj in request_body.content.items(): - if media_type not in _SUPPORTED_MEDIA_TYPES: - continue - api_request_body_properties = cls._process_supported_media_type( - media_type_obj, - spec, + if isinstance(items, Schema): + array_type = cls.from_schema( + schema=items, + name=f"{name}Item", + required=True, # TODO: Add required + spec=spec, + references_used=references_used, + ) + return f"Array<{array_type.type}>" + + return "array" + + @classmethod + def from_schema( + cls, + schema: Schema, + name: str, + required: bool, + spec: OpenAPISpec, + references_used: Optional[List[str]] = None, + ) -> "APIRequestBodyProperty": + """Recursively populate from an OpenAPI Schema.""" + if references_used is None: + references_used = [] + + schema_type = schema.type + properties: List[APIRequestBodyProperty] = [] + if schema_type == "object" and schema.properties: + schema_type, properties = cls._process_object_schema( + schema, spec, references_used + ) + elif schema_type == "array": + schema_type = cls._process_array_schema( + schema, name, spec, references_used + ) + elif schema_type in PRIMITIVE_TYPES: + # Use the primitive type directly + pass + elif schema_type is None: + # No typing specified/parsed. WIll map to 'any' + pass + else: + raise ValueError(f"Unsupported type: {schema_type}") + + return cls( + name=name, + required=required, + type=schema_type, + default=schema.default, + description=schema.description, + properties=properties, + references_used=references_used, ) - properties.extend(api_request_body_properties) - - return cls( - description=request_body.description, - properties=properties, - media_type=media_type, - ) - - -class APIOperation(BaseModel): - """A model for a single API operation.""" - - operation_id: str = Field(alias="operation_id") - """The unique identifier of the operation.""" - description: Optional[str] = Field(alias="description") - """The description of the operation.""" + # class APIRequestBodyProperty(APIPropertyBase): + class APIRequestBody(BaseModel): + """A model for a request body.""" - base_url: str = Field(alias="base_url") - """The base URL of the operation.""" + description: Optional[str] = Field(alias="description") + """The description of the request body.""" - path: str = Field(alias="path") - """The path of the operation.""" + properties: List[APIRequestBodyProperty] = Field(alias="properties") - method: HTTPVerb = Field(alias="method") - """The HTTP method of the operation.""" + # E.g., application/json - we only support JSON at the moment. + media_type: str = Field(alias="media_type") + """The media type of the request body.""" - properties: Sequence[APIProperty] = Field(alias="properties") - - # TODO: Add parse in used components to be able to specify what type of - # referenced object it is. - # """The properties of the operation.""" - # components: Dict[str, BaseModel] = Field(alias="components") - - request_body: Optional[APIRequestBody] = Field(alias="request_body") - """The request body of the operation.""" - - @staticmethod - def _get_properties_from_parameters( - parameters: List[Parameter], spec: OpenAPISpec - ) -> List[APIProperty]: - """Get the properties of the operation.""" - properties = [] - for param in parameters: - if APIProperty.is_supported_location(param.param_in): - properties.append(APIProperty.from_parameter(param, spec)) - elif param.required: + @classmethod + def _process_supported_media_type( + cls, + media_type_obj: MediaType, + spec: OpenAPISpec, + ) -> List[APIRequestBodyProperty]: + """Process the media type of the request body.""" + references_used = [] + schema = media_type_obj.media_type_schema + if isinstance(schema, Reference): + references_used.append(schema.ref.split("/")[-1]) + schema = spec.get_referenced_schema(schema) + if schema is None: raise ValueError( - INVALID_LOCATION_TEMPL.format( - location=param.param_in, name=param.name - ) + f"Could not resolve schema for media type: {media_type_obj}" ) + api_request_body_properties = [] + required_properties = schema.required or [] + if schema.type == "object" and schema.properties: + for prop_name, prop_schema in schema.properties.items(): + if isinstance(prop_schema, Reference): + prop_schema = spec.get_referenced_schema(prop_schema) + + api_request_body_properties.append( + APIRequestBodyProperty.from_schema( + schema=prop_schema, + name=prop_name, + required=prop_name in required_properties, + spec=spec, + ) + ) else: - logger.warning( - INVALID_LOCATION_TEMPL.format( - location=param.param_in, name=param.name + api_request_body_properties.append( + APIRequestBodyProperty( + name="body", + required=True, + type=schema.type, + default=schema.default, + description=schema.description, + properties=[], + references_used=references_used, ) - + " Ignoring optional parameter" ) - pass - return properties - @classmethod - def from_openapi_url( - cls, - spec_url: str, - path: str, - method: str, - ) -> "APIOperation": - """Create an APIOperation from an OpenAPI URL.""" - spec = OpenAPISpec.from_url(spec_url) - return cls.from_openapi_spec(spec, path, method) + return api_request_body_properties - @classmethod - def from_openapi_spec( - cls, - spec: OpenAPISpec, - path: str, - method: str, - ) -> "APIOperation": - """Create an APIOperation from an OpenAPI spec.""" - operation = spec.get_operation(path, method) - parameters = spec.get_parameters_for_operation(operation) - properties = cls._get_properties_from_parameters(parameters, spec) - operation_id = OpenAPISpec.get_cleaned_operation_id(operation, path, method) - request_body = spec.get_request_body_for_operation(operation) - api_request_body = ( - APIRequestBody.from_request_body(request_body, spec) - if request_body is not None - else None - ) - description = operation.description or operation.summary - if not description and spec.paths is not None: - description = spec.paths[path].description or spec.paths[path].summary - return cls( - operation_id=operation_id, - description=description or "", - base_url=spec.base_url, - path=path, - method=method, - properties=properties, - request_body=api_request_body, - ) - - @staticmethod - def ts_type_from_python(type_: SCHEMA_TYPE) -> str: - if type_ is None: - # TODO: Handle Nones better. These often result when - # parsing specs that are < v3 - return "any" - elif isinstance(type_, str): - return { - "str": "string", - "integer": "number", - "float": "number", - "date-time": "string", - }.get(type_, type_) - elif isinstance(type_, tuple): - return f"Array<{APIOperation.ts_type_from_python(type_[0])}>" - elif isinstance(type_, type) and issubclass(type_, Enum): - return " | ".join([f"'{e.value}'" for e in type_]) - else: - return str(type_) - - def _format_nested_properties( - self, properties: List[APIRequestBodyProperty], indent: int = 2 - ) -> str: - """Format nested properties.""" - formatted_props = [] - - for prop in properties: - prop_name = prop.name - prop_type = self.ts_type_from_python(prop.type) - prop_required = "" if prop.required else "?" - prop_desc = f"/* {prop.description} */" if prop.description else "" - - if prop.properties: - nested_props = self._format_nested_properties( - prop.properties, indent + 2 + @classmethod + def from_request_body( + cls, request_body: RequestBody, spec: OpenAPISpec + ) -> "APIRequestBody": + """Instantiate from an OpenAPI RequestBody.""" + properties = [] + for media_type, media_type_obj in request_body.content.items(): + if media_type not in _SUPPORTED_MEDIA_TYPES: + continue + api_request_body_properties = cls._process_supported_media_type( + media_type_obj, + spec, ) - prop_type = f"{{\n{nested_props}\n{' ' * indent}}}" + properties.extend(api_request_body_properties) - formatted_props.append( - f"{prop_desc}\n{' ' * indent}{prop_name}{prop_required}: {prop_type}," + return cls( + description=request_body.description, + properties=properties, + media_type=media_type, + ) + + # class APIRequestBodyProperty(APIPropertyBase): + # class APIRequestBody(BaseModel): + class APIOperation(BaseModel): + """A model for a single API operation.""" + + operation_id: str = Field(alias="operation_id") + """The unique identifier of the operation.""" + + description: Optional[str] = Field(alias="description") + """The description of the operation.""" + + base_url: str = Field(alias="base_url") + """The base URL of the operation.""" + + path: str = Field(alias="path") + """The path of the operation.""" + + method: HTTPVerb = Field(alias="method") + """The HTTP method of the operation.""" + + properties: Sequence[APIProperty] = Field(alias="properties") + + # TODO: Add parse in used components to be able to specify what type of + # referenced object it is. + # """The properties of the operation.""" + # components: Dict[str, BaseModel] = Field(alias="components") + + request_body: Optional[APIRequestBody] = Field(alias="request_body") + """The request body of the operation.""" + + @staticmethod + def _get_properties_from_parameters( + parameters: List[Parameter], spec: OpenAPISpec + ) -> List[APIProperty]: + """Get the properties of the operation.""" + properties = [] + for param in parameters: + if APIProperty.is_supported_location(param.param_in): + properties.append(APIProperty.from_parameter(param, spec)) + elif param.required: + raise ValueError( + INVALID_LOCATION_TEMPL.format( + location=param.param_in, name=param.name + ) + ) + else: + logger.warning( + INVALID_LOCATION_TEMPL.format( + location=param.param_in, name=param.name + ) + + " Ignoring optional parameter" + ) + pass + return properties + + @classmethod + def from_openapi_url( + cls, + spec_url: str, + path: str, + method: str, + ) -> "APIOperation": + """Create an APIOperation from an OpenAPI URL.""" + spec = OpenAPISpec.from_url(spec_url) + return cls.from_openapi_spec(spec, path, method) + + @classmethod + def from_openapi_spec( + cls, + spec: OpenAPISpec, + path: str, + method: str, + ) -> "APIOperation": + """Create an APIOperation from an OpenAPI spec.""" + operation = spec.get_operation(path, method) + parameters = spec.get_parameters_for_operation(operation) + properties = cls._get_properties_from_parameters(parameters, spec) + operation_id = OpenAPISpec.get_cleaned_operation_id(operation, path, method) + request_body = spec.get_request_body_for_operation(operation) + api_request_body = ( + APIRequestBody.from_request_body(request_body, spec) + if request_body is not None + else None + ) + description = operation.description or operation.summary + if not description and spec.paths is not None: + description = spec.paths[path].description or spec.paths[path].summary + return cls( + operation_id=operation_id, + description=description or "", + base_url=spec.base_url, + path=path, + method=method, + properties=properties, + request_body=api_request_body, ) - return "\n".join(formatted_props) + @staticmethod + def ts_type_from_python(type_: SCHEMA_TYPE) -> str: + if type_ is None: + # TODO: Handle Nones better. These often result when + # parsing specs that are < v3 + return "any" + elif isinstance(type_, str): + return { + "str": "string", + "integer": "number", + "float": "number", + "date-time": "string", + }.get(type_, type_) + elif isinstance(type_, tuple): + return f"Array<{APIOperation.ts_type_from_python(type_[0])}>" + elif isinstance(type_, type) and issubclass(type_, Enum): + return " | ".join([f"'{e.value}'" for e in type_]) + else: + return str(type_) + + def _format_nested_properties( + self, properties: List[APIRequestBodyProperty], indent: int = 2 + ) -> str: + """Format nested properties.""" + formatted_props = [] + + for prop in properties: + prop_name = prop.name + prop_type = self.ts_type_from_python(prop.type) + prop_required = "" if prop.required else "?" + prop_desc = f"/* {prop.description} */" if prop.description else "" + + if prop.properties: + nested_props = self._format_nested_properties( + prop.properties, indent + 2 + ) + prop_type = f"{{\n{nested_props}\n{' ' * indent}}}" - def to_typescript(self) -> str: - """Get typescript string representation of the operation.""" - operation_name = self.operation_id - params = [] + formatted_props.append( + f"{prop_desc}\n{' ' * indent}{prop_name}" + f"{prop_required}: {prop_type}," + ) - if self.request_body: - formatted_request_body_props = self._format_nested_properties( - self.request_body.properties - ) - params.append(formatted_request_body_props) - - for prop in self.properties: - prop_name = prop.name - prop_type = self.ts_type_from_python(prop.type) - prop_required = "" if prop.required else "?" - prop_desc = f"/* {prop.description} */" if prop.description else "" - params.append(f"{prop_desc}\n\t\t{prop_name}{prop_required}: {prop_type},") - - formatted_params = "\n".join(params).strip() - description_str = f"/* {self.description} */" if self.description else "" - typescript_definition = f""" -{description_str} -type {operation_name} = (_: {{ -{formatted_params} -}}) => any; -""" - return typescript_definition.strip() - - @property - def query_params(self) -> List[str]: - return [ - property.name - for property in self.properties - if property.location == APIPropertyLocation.QUERY - ] - - @property - def path_params(self) -> List[str]: - return [ - property.name - for property in self.properties - if property.location == APIPropertyLocation.PATH - ] - - @property - def body_params(self) -> List[str]: - if self.request_body is None: - return [] - return [prop.name for prop in self.request_body.properties] + return "\n".join(formatted_props) + + def to_typescript(self) -> str: + """Get typescript string representation of the operation.""" + operation_name = self.operation_id + params = [] + + if self.request_body: + formatted_request_body_props = self._format_nested_properties( + self.request_body.properties + ) + params.append(formatted_request_body_props) + + for prop in self.properties: + prop_name = prop.name + prop_type = self.ts_type_from_python(prop.type) + prop_required = "" if prop.required else "?" + prop_desc = f"/* {prop.description} */" if prop.description else "" + params.append( + f"{prop_desc}\n\t\t{prop_name}{prop_required}: {prop_type}," + ) + + formatted_params = "\n".join(params).strip() + description_str = f"/* {self.description} */" if self.description else "" + typescript_definition = f""" + {description_str} + type {operation_name} = (_: {{ + {formatted_params} + }}) => any; + """ + return typescript_definition.strip() + + @property + def query_params(self) -> List[str]: + return [ + property.name + for property in self.properties + if property.location == APIPropertyLocation.QUERY + ] + + @property + def path_params(self) -> List[str]: + return [ + property.name + for property in self.properties + if property.location == APIPropertyLocation.PATH + ] + + @property + def body_params(self) -> List[str]: + if self.request_body is None: + return [] + return [prop.name for prop in self.request_body.properties] + +else: + + class APIProperty(APIPropertyBase): # type: ignore[no-redef] + def __init__(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("Only supported for pydantic v1") + + class APIRequestBodyProperty(APIPropertyBase): # type: ignore[no-redef] + def __init__(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("Only supported for pydantic v1") + + class APIRequestBody(BaseModel): # type: ignore[no-redef] + def __init__(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("Only supported for pydantic v1") + + class APIOperation(BaseModel): # type: ignore[no-redef] + def __init__(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("Only supported for pydantic v1") diff --git a/libs/langchain/langchain/tools/openapi/utils/openapi_utils.py b/libs/langchain/langchain/tools/openapi/utils/openapi_utils.py index f37b41b9d1..eb37681926 100644 --- a/libs/langchain/langchain/tools/openapi/utils/openapi_utils.py +++ b/libs/langchain/langchain/tools/openapi/utils/openapi_utils.py @@ -1,2 +1,4 @@ """Utility functions for parsing an OpenAPI spec. Kept for backwards compat.""" -from langchain.utilities.openapi import HTTPVerb, OpenAPISpec # noqa: F401 +from langchain.utilities.openapi import HTTPVerb, OpenAPISpec + +__all__ = ["HTTPVerb", "OpenAPISpec"] diff --git a/libs/langchain/langchain/utilities/alpha_vantage.py b/libs/langchain/langchain/utilities/alpha_vantage.py index de634569a4..638174f0e9 100644 --- a/libs/langchain/langchain/utilities/alpha_vantage.py +++ b/libs/langchain/langchain/utilities/alpha_vantage.py @@ -2,9 +2,8 @@ from typing import Any, Dict, List, Optional import requests -from pydantic_v1 import Extra, root_validator +from pydantic_v1 import BaseModel, Extra, root_validator -from langchain.tools.base import BaseModel from langchain.utils import get_from_dict_or_env diff --git a/libs/langchain/langchain/utilities/openapi.py b/libs/langchain/langchain/utilities/openapi.py index 5a1f8d4a8c..2262186d44 100644 --- a/libs/langchain/langchain/utilities/openapi.py +++ b/libs/langchain/langchain/utilities/openapi.py @@ -1,4 +1,6 @@ """Utility functions for parsing an OpenAPI spec.""" +from __future__ import annotations + import copy import json import logging @@ -9,19 +11,10 @@ from typing import Dict, List, Optional, Union import requests import yaml -from openapi_schema_pydantic import ( - Components, - OpenAPI, - Operation, - Parameter, - PathItem, - Paths, - Reference, - RequestBody, - Schema, -) from pydantic_v1 import ValidationError +from langchain import _PYDANTIC_MAJOR_VERSION + logger = logging.getLogger(__name__) @@ -38,7 +31,7 @@ class HTTPVerb(str, Enum): TRACE = "trace" @classmethod - def from_str(cls, verb: str) -> "HTTPVerb": + def from_str(cls, verb: str) -> HTTPVerb: """Parse an HTTP verb.""" try: return cls(verb) @@ -46,240 +39,265 @@ class HTTPVerb(str, Enum): raise ValueError(f"Invalid HTTP verb. Valid values are {cls.__members__}") -class OpenAPISpec(OpenAPI): - """OpenAPI Model that removes misformatted parts of the spec.""" - - @property - def _paths_strict(self) -> Paths: - if not self.paths: - raise ValueError("No paths found in spec") - return self.paths - - def _get_path_strict(self, path: str) -> PathItem: - path_item = self._paths_strict.get(path) - if not path_item: - raise ValueError(f"No path found for {path}") - return path_item - - @property - def _components_strict(self) -> Components: - """Get components or err.""" - if self.components is None: - raise ValueError("No components found in spec. ") - return self.components - - @property - def _parameters_strict(self) -> Dict[str, Union[Parameter, Reference]]: - """Get parameters or err.""" - parameters = self._components_strict.parameters - if parameters is None: - raise ValueError("No parameters found in spec. ") - return parameters - - @property - def _schemas_strict(self) -> Dict[str, Schema]: - """Get the dictionary of schemas or err.""" - schemas = self._components_strict.schemas - if schemas is None: - raise ValueError("No schemas found in spec. ") - return schemas - - @property - def _request_bodies_strict(self) -> Dict[str, Union[RequestBody, Reference]]: - """Get the request body or err.""" - request_bodies = self._components_strict.requestBodies - if request_bodies is None: - raise ValueError("No request body found in spec. ") - return request_bodies - - def _get_referenced_parameter(self, ref: Reference) -> Union[Parameter, Reference]: - """Get a parameter (or nested reference) or err.""" - ref_name = ref.ref.split("/")[-1] - parameters = self._parameters_strict - if ref_name not in parameters: - raise ValueError(f"No parameter found for {ref_name}") - return parameters[ref_name] - - def _get_root_referenced_parameter(self, ref: Reference) -> Parameter: - """Get the root reference or err.""" - parameter = self._get_referenced_parameter(ref) - while isinstance(parameter, Reference): - parameter = self._get_referenced_parameter(parameter) - return parameter - - def get_referenced_schema(self, ref: Reference) -> Schema: - """Get a schema (or nested reference) or err.""" - ref_name = ref.ref.split("/")[-1] - schemas = self._schemas_strict - if ref_name not in schemas: - raise ValueError(f"No schema found for {ref_name}") - return schemas[ref_name] - - def get_schema(self, schema: Union[Reference, Schema]) -> Schema: - if isinstance(schema, Reference): - return self.get_referenced_schema(schema) - return schema - - def _get_root_referenced_schema(self, ref: Reference) -> Schema: - """Get the root reference or err.""" - schema = self.get_referenced_schema(ref) - while isinstance(schema, Reference): - schema = self.get_referenced_schema(schema) - return schema - - def _get_referenced_request_body( - self, ref: Reference - ) -> Optional[Union[Reference, RequestBody]]: - """Get a request body (or nested reference) or err.""" - ref_name = ref.ref.split("/")[-1] - request_bodies = self._request_bodies_strict - if ref_name not in request_bodies: - raise ValueError(f"No request body found for {ref_name}") - return request_bodies[ref_name] - - def _get_root_referenced_request_body( - self, ref: Reference - ) -> Optional[RequestBody]: - """Get the root request Body or err.""" - request_body = self._get_referenced_request_body(ref) - while isinstance(request_body, Reference): - request_body = self._get_referenced_request_body(request_body) - return request_body - - @staticmethod - def _alert_unsupported_spec(obj: dict) -> None: - """Alert if the spec is not supported.""" - warning_message = ( - " This may result in degraded performance." - + " Convert your OpenAPI spec to 3.1.* spec" - + " for better support." - ) - swagger_version = obj.get("swagger") - openapi_version = obj.get("openapi") - if isinstance(openapi_version, str): - if openapi_version != "3.1.0": +if _PYDANTIC_MAJOR_VERSION == 1: + from openapi_schema_pydantic import ( + Components, + OpenAPI, + Operation, + Parameter, + PathItem, + Paths, + Reference, + RequestBody, + Schema, + ) + + class OpenAPISpec(OpenAPI): + """OpenAPI Model that removes mis-formatted parts of the spec.""" + + @property + def _paths_strict(self) -> Paths: + if not self.paths: + raise ValueError("No paths found in spec") + return self.paths + + def _get_path_strict(self, path: str) -> PathItem: + path_item = self._paths_strict.get(path) + if not path_item: + raise ValueError(f"No path found for {path}") + return path_item + + @property + def _components_strict(self) -> Components: + """Get components or err.""" + if self.components is None: + raise ValueError("No components found in spec. ") + return self.components + + @property + def _parameters_strict(self) -> Dict[str, Union[Parameter, Reference]]: + """Get parameters or err.""" + parameters = self._components_strict.parameters + if parameters is None: + raise ValueError("No parameters found in spec. ") + return parameters + + @property + def _schemas_strict(self) -> Dict[str, Schema]: + """Get the dictionary of schemas or err.""" + schemas = self._components_strict.schemas + if schemas is None: + raise ValueError("No schemas found in spec. ") + return schemas + + @property + def _request_bodies_strict(self) -> Dict[str, Union[RequestBody, Reference]]: + """Get the request body or err.""" + request_bodies = self._components_strict.requestBodies + if request_bodies is None: + raise ValueError("No request body found in spec. ") + return request_bodies + + def _get_referenced_parameter( + self, ref: Reference + ) -> Union[Parameter, Reference]: + """Get a parameter (or nested reference) or err.""" + ref_name = ref.ref.split("/")[-1] + parameters = self._parameters_strict + if ref_name not in parameters: + raise ValueError(f"No parameter found for {ref_name}") + return parameters[ref_name] + + def _get_root_referenced_parameter(self, ref: Reference) -> Parameter: + """Get the root reference or err.""" + parameter = self._get_referenced_parameter(ref) + while isinstance(parameter, Reference): + parameter = self._get_referenced_parameter(parameter) + return parameter + + def get_referenced_schema(self, ref: Reference) -> Schema: + """Get a schema (or nested reference) or err.""" + ref_name = ref.ref.split("/")[-1] + schemas = self._schemas_strict + if ref_name not in schemas: + raise ValueError(f"No schema found for {ref_name}") + return schemas[ref_name] + + def get_schema(self, schema: Union[Reference, Schema]) -> Schema: + if isinstance(schema, Reference): + return self.get_referenced_schema(schema) + return schema + + def _get_root_referenced_schema(self, ref: Reference) -> Schema: + """Get the root reference or err.""" + schema = self.get_referenced_schema(ref) + while isinstance(schema, Reference): + schema = self.get_referenced_schema(schema) + return schema + + def _get_referenced_request_body( + self, ref: Reference + ) -> Optional[Union[Reference, RequestBody]]: + """Get a request body (or nested reference) or err.""" + ref_name = ref.ref.split("/")[-1] + request_bodies = self._request_bodies_strict + if ref_name not in request_bodies: + raise ValueError(f"No request body found for {ref_name}") + return request_bodies[ref_name] + + def _get_root_referenced_request_body( + self, ref: Reference + ) -> Optional[RequestBody]: + """Get the root request Body or err.""" + request_body = self._get_referenced_request_body(ref) + while isinstance(request_body, Reference): + request_body = self._get_referenced_request_body(request_body) + return request_body + + @staticmethod + def _alert_unsupported_spec(obj: dict) -> None: + """Alert if the spec is not supported.""" + warning_message = ( + " This may result in degraded performance." + + " Convert your OpenAPI spec to 3.1.* spec" + + " for better support." + ) + swagger_version = obj.get("swagger") + openapi_version = obj.get("openapi") + if isinstance(openapi_version, str): + if openapi_version != "3.1.0": + logger.warning( + f"Attempting to load an OpenAPI {openapi_version}" + f" spec. {warning_message}" + ) + else: + pass + elif isinstance(swagger_version, str): logger.warning( - f"Attempting to load an OpenAPI {openapi_version}" + f"Attempting to load a Swagger {swagger_version}" f" spec. {warning_message}" ) else: - pass - elif isinstance(swagger_version, str): - logger.warning( - f"Attempting to load a Swagger {swagger_version}" - f" spec. {warning_message}" - ) - else: - raise ValueError( - "Attempting to load an unsupported spec:" - f"\n\n{obj}\n{warning_message}" - ) - - @classmethod - def parse_obj(cls, obj: dict) -> "OpenAPISpec": - try: - cls._alert_unsupported_spec(obj) - return super().parse_obj(obj) - except ValidationError as e: - # We are handling possibly misconfigured specs and want to do a best-effort - # job to get a reasonable interface out of it. - new_obj = copy.deepcopy(obj) - for error in e.errors(): - keys = error["loc"] - item = new_obj - for key in keys[:-1]: - item = item[key] - item.pop(keys[-1], None) - return cls.parse_obj(new_obj) - - @classmethod - def from_spec_dict(cls, spec_dict: dict) -> "OpenAPISpec": - """Get an OpenAPI spec from a dict.""" - return cls.parse_obj(spec_dict) - - @classmethod - def from_text(cls, text: str) -> "OpenAPISpec": - """Get an OpenAPI spec from a text.""" - try: - spec_dict = json.loads(text) - except json.JSONDecodeError: - spec_dict = yaml.safe_load(text) - return cls.from_spec_dict(spec_dict) - - @classmethod - def from_file(cls, path: Union[str, Path]) -> "OpenAPISpec": - """Get an OpenAPI spec from a file path.""" - path_ = path if isinstance(path, Path) else Path(path) - if not path_.exists(): - raise FileNotFoundError(f"{path} does not exist") - with path_.open("r") as f: - return cls.from_text(f.read()) + raise ValueError( + "Attempting to load an unsupported spec:" + f"\n\n{obj}\n{warning_message}" + ) - @classmethod - def from_url(cls, url: str) -> "OpenAPISpec": - """Get an OpenAPI spec from a URL.""" - response = requests.get(url) - return cls.from_text(response.text) - - @property - def base_url(self) -> str: - """Get the base url.""" - return self.servers[0].url - - def get_methods_for_path(self, path: str) -> List[str]: - """Return a list of valid methods for the specified path.""" - path_item = self._get_path_strict(path) - results = [] - for method in HTTPVerb: - operation = getattr(path_item, method.value, None) - if isinstance(operation, Operation): - results.append(method.value) - return results - - def get_parameters_for_path(self, path: str) -> List[Parameter]: - path_item = self._get_path_strict(path) - parameters = [] - if not path_item.parameters: - return [] - for parameter in path_item.parameters: - if isinstance(parameter, Reference): - parameter = self._get_root_referenced_parameter(parameter) - parameters.append(parameter) - return parameters - - def get_operation(self, path: str, method: str) -> Operation: - """Get the operation object for a given path and HTTP method.""" - path_item = self._get_path_strict(path) - operation_obj = getattr(path_item, method, None) - if not isinstance(operation_obj, Operation): - raise ValueError(f"No {method} method found for {path}") - return operation_obj - - def get_parameters_for_operation(self, operation: Operation) -> List[Parameter]: - """Get the components for a given operation.""" - parameters = [] - if operation.parameters: - for parameter in operation.parameters: + @classmethod + def parse_obj(cls, obj: dict) -> OpenAPISpec: + try: + cls._alert_unsupported_spec(obj) + return super().parse_obj(obj) + except ValidationError as e: + # We are handling possibly misconfigured specs and + # want to do a best-effort job to get a reasonable interface out of it. + new_obj = copy.deepcopy(obj) + for error in e.errors(): + keys = error["loc"] + item = new_obj + for key in keys[:-1]: + item = item[key] + item.pop(keys[-1], None) + return cls.parse_obj(new_obj) + + @classmethod + def from_spec_dict(cls, spec_dict: dict) -> OpenAPISpec: + """Get an OpenAPI spec from a dict.""" + return cls.parse_obj(spec_dict) + + @classmethod + def from_text(cls, text: str) -> OpenAPISpec: + """Get an OpenAPI spec from a text.""" + try: + spec_dict = json.loads(text) + except json.JSONDecodeError: + spec_dict = yaml.safe_load(text) + return cls.from_spec_dict(spec_dict) + + @classmethod + def from_file(cls, path: Union[str, Path]) -> OpenAPISpec: + """Get an OpenAPI spec from a file path.""" + path_ = path if isinstance(path, Path) else Path(path) + if not path_.exists(): + raise FileNotFoundError(f"{path} does not exist") + with path_.open("r") as f: + return cls.from_text(f.read()) + + @classmethod + def from_url(cls, url: str) -> OpenAPISpec: + """Get an OpenAPI spec from a URL.""" + response = requests.get(url) + return cls.from_text(response.text) + + @property + def base_url(self) -> str: + """Get the base url.""" + return self.servers[0].url + + def get_methods_for_path(self, path: str) -> List[str]: + """Return a list of valid methods for the specified path.""" + path_item = self._get_path_strict(path) + results = [] + for method in HTTPVerb: + operation = getattr(path_item, method.value, None) + if isinstance(operation, Operation): + results.append(method.value) + return results + + def get_parameters_for_path(self, path: str) -> List[Parameter]: + path_item = self._get_path_strict(path) + parameters = [] + if not path_item.parameters: + return [] + for parameter in path_item.parameters: if isinstance(parameter, Reference): parameter = self._get_root_referenced_parameter(parameter) parameters.append(parameter) - return parameters - - def get_request_body_for_operation( - self, operation: Operation - ) -> Optional[RequestBody]: - """Get the request body for a given operation.""" - request_body = operation.requestBody - if isinstance(request_body, Reference): - request_body = self._get_root_referenced_request_body(request_body) - return request_body - - @staticmethod - def get_cleaned_operation_id(operation: Operation, path: str, method: str) -> str: - """Get a cleaned operation id from an operation id.""" - operation_id = operation.operationId - if operation_id is None: - # Replace all punctuation of any kind with underscore - path = re.sub(r"[^a-zA-Z0-9]", "_", path.lstrip("/")) - operation_id = f"{path}_{method}" - return operation_id.replace("-", "_").replace(".", "_").replace("/", "_") + return parameters + + def get_operation(self, path: str, method: str) -> Operation: + """Get the operation object for a given path and HTTP method.""" + path_item = self._get_path_strict(path) + operation_obj = getattr(path_item, method, None) + if not isinstance(operation_obj, Operation): + raise ValueError(f"No {method} method found for {path}") + return operation_obj + + def get_parameters_for_operation(self, operation: Operation) -> List[Parameter]: + """Get the components for a given operation.""" + parameters = [] + if operation.parameters: + for parameter in operation.parameters: + if isinstance(parameter, Reference): + parameter = self._get_root_referenced_parameter(parameter) + parameters.append(parameter) + return parameters + + def get_request_body_for_operation( + self, operation: Operation + ) -> Optional[RequestBody]: + """Get the request body for a given operation.""" + request_body = operation.requestBody + if isinstance(request_body, Reference): + request_body = self._get_root_referenced_request_body(request_body) + return request_body + + @staticmethod + def get_cleaned_operation_id( + operation: Operation, path: str, method: str + ) -> str: + """Get a cleaned operation id from an operation id.""" + operation_id = operation.operationId + if operation_id is None: + # Replace all punctuation of any kind with underscore + path = re.sub(r"[^a-zA-Z0-9]", "_", path.lstrip("/")) + operation_id = f"{path}_{method}" + return operation_id.replace("-", "_").replace(".", "_").replace("/", "_") + +else: + + class OpenAPISpec: # type: ignore[no-redef] + """Shim for pydantic version >=2""" + + def __init__(self) -> None: + raise NotImplementedError("Only supported for pydantic version 1") diff --git a/libs/langchain/tests/unit_tests/tools/openapi/test_api_models.py b/libs/langchain/tests/unit_tests/tools/openapi/test_api_models.py index 945309aafa..6da34e6224 100644 --- a/libs/langchain/tests/unit_tests/tools/openapi/test_api_models.py +++ b/libs/langchain/tests/unit_tests/tools/openapi/test_api_models.py @@ -4,6 +4,18 @@ import os from pathlib import Path from typing import Iterable, List, Tuple +import pytest + +# Keep at top of file to ensure that pydantic test can be skipped before +# pydantic v1 related imports are attempted by openapi_schema_pydantic. +from langchain import _PYDANTIC_MAJOR_VERSION + +if _PYDANTIC_MAJOR_VERSION != 1: + pytest.skip( + f"Pydantic major version {_PYDANTIC_MAJOR_VERSION} is not supported.", + allow_module_level=True, + ) + import pytest import yaml from openapi_schema_pydantic import (