langchain/langchain/sql_database.py
Jon Luo 5bf8772f26
add option to use user-defined SQL table info (#1347)
Currently, table information is gathered through SQLAlchemy as complete
table DDL and a user-selected number of sample rows from each table.
This PR adds the option to use user-defined table information instead of
automatically collecting it. This will use the provided table
information and fall back to the automatic gathering for tables that the
user didn't provide information for.

Off the top of my head, there are a few cases where this can be quite
useful:
- The first n rows of a table are uninformative, or very similar to one
another. In this case, hand-crafting example rows for a table such that
they provide the good, diverse information can be very helpful. Another
approach we can think about later is getting a random sample of n rows
instead of the first n rows, but there are some performance
considerations that need to be taken there. Even so, hand-crafting the
sample rows is useful and can guarantee the model sees informative data.
- The user doesn't want every column to be available to the model. This
is not an elegant way to fulfill this specific need since the user would
have to provide the table definition instead of a simple list of columns
to include or ignore, but it does work for this purpose.
- For the developers, this makes it a lot easier to compare/benchmark
the performance of different prompting structures for providing table
information in the prompt.

These are cases I've run into myself (particularly cases 1 and 3) and
I've found these changes useful. Personally, I keep custom table info
for a few tables in a yaml file for versioning and easy loading.

Definitely open to other opinions/approaches though!
2023-02-28 18:58:04 -08:00

190 lines
7.3 KiB
Python

"""SQLAlchemy wrapper around a database."""
from __future__ import annotations
from typing import Any, Iterable, List, Optional
from sqlalchemy import MetaData, create_engine, inspect, select
from sqlalchemy.engine import Engine
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.schema import CreateTable
class SQLDatabase:
"""SQLAlchemy wrapper around a database."""
def __init__(
self,
engine: Engine,
schema: Optional[str] = None,
metadata: Optional[MetaData] = None,
ignore_tables: Optional[List[str]] = None,
include_tables: Optional[List[str]] = None,
sample_rows_in_table_info: int = 3,
custom_table_info: Optional[dict] = None,
):
"""Create engine from database URI."""
self._engine = engine
self._schema = schema
if include_tables and ignore_tables:
raise ValueError("Cannot specify both include_tables and ignore_tables")
self._inspector = inspect(self._engine)
self._all_tables = set(self._inspector.get_table_names(schema=schema))
self._include_tables = set(include_tables) if include_tables else set()
if self._include_tables:
missing_tables = self._include_tables - self._all_tables
if missing_tables:
raise ValueError(
f"include_tables {missing_tables} not found in database"
)
self._ignore_tables = set(ignore_tables) if ignore_tables else set()
if self._ignore_tables:
missing_tables = self._ignore_tables - self._all_tables
if missing_tables:
raise ValueError(
f"ignore_tables {missing_tables} not found in database"
)
if not isinstance(sample_rows_in_table_info, int):
raise TypeError("sample_rows_in_table_info must be an integer")
self._sample_rows_in_table_info = sample_rows_in_table_info
self._custom_table_info = custom_table_info
if self._custom_table_info:
if not isinstance(self._custom_table_info, dict):
raise TypeError(
"table_info must be a dictionary with table names as keys and the "
"desired table info as values"
)
# only keep the tables that are also present in the database
intersection = set(self._custom_table_info).intersection(self._all_tables)
self._custom_table_info = dict(
(table, self._custom_table_info[table])
for table in self._custom_table_info
if table in intersection
)
self._metadata = metadata or MetaData()
self._metadata.reflect(bind=self._engine)
@classmethod
def from_uri(cls, database_uri: str, **kwargs: Any) -> SQLDatabase:
"""Construct a SQLAlchemy engine from URI."""
return cls(create_engine(database_uri), **kwargs)
@property
def dialect(self) -> str:
"""Return string representation of dialect to use."""
return self._engine.dialect.name
def get_table_names(self) -> Iterable[str]:
"""Get names of tables available."""
if self._include_tables:
return self._include_tables
return self._all_tables - self._ignore_tables
@property
def table_info(self) -> str:
"""Information about all tables in the database."""
return self.get_table_info()
def get_table_info(self, table_names: Optional[List[str]] = None) -> str:
"""Get information about specified tables.
Follows best practices as specified in: Rajkumar et al, 2022
(https://arxiv.org/abs/2204.00498)
If `sample_rows_in_table_info`, the specified number of sample rows will be
appended to each table description. This can increase performance as
demonstrated in the paper.
"""
all_table_names = self.get_table_names()
if table_names is not None:
missing_tables = set(table_names).difference(all_table_names)
if missing_tables:
raise ValueError(f"table_names {missing_tables} not found in database")
all_table_names = table_names
meta_tables = [
tbl
for tbl in self._metadata.sorted_tables
if tbl.name in set(all_table_names)
and not (self.dialect == "sqlite" and tbl.name.startswith("sqlite_"))
]
tables = []
for table in meta_tables:
if self._custom_table_info and table.name in self._custom_table_info:
tables.append(self._custom_table_info[table.name])
continue
# add create table command
create_table = str(CreateTable(table).compile(self._engine))
if self._sample_rows_in_table_info:
# build the select command
command = select(table).limit(self._sample_rows_in_table_info)
# save the command in string format
select_star = (
f"SELECT * FROM '{table.name}' LIMIT "
f"{self._sample_rows_in_table_info}"
)
# save the columns in string format
columns_str = " ".join([col.name for col in table.columns])
try:
# get the sample rows
with self._engine.connect() as connection:
sample_rows = connection.execute(command)
# shorten values in the sample rows
sample_rows = list(
map(lambda ls: [str(i)[:100] for i in ls], sample_rows)
)
# save the sample rows in string format
sample_rows_str = "\n".join([" ".join(row) for row in sample_rows])
# in some dialects when there are no rows in the table a
# 'ProgrammingError' is returned
except ProgrammingError:
sample_rows_str = ""
# build final info for table
tables.append(
create_table
+ select_star
+ ";\n"
+ columns_str
+ "\n"
+ sample_rows_str
)
else:
tables.append(create_table)
final_str = "\n\n".join(tables)
return final_str
def run(self, command: str, fetch: str = "all") -> str:
"""Execute a SQL command and return a string representing the results.
If the statement returns rows, a string of the results is returned.
If the statement returns no rows, an empty string is returned.
"""
with self._engine.begin() as connection:
if self._schema is not None:
connection.exec_driver_sql(f"SET search_path TO {self._schema}")
cursor = connection.exec_driver_sql(command)
if cursor.returns_rows:
if fetch == "all":
result = cursor.fetchall()
elif fetch == "one":
result = cursor.fetchone()[0]
else:
raise ValueError("Fetch parameter must be either 'one' or 'all'")
return str(result)
return ""