|
|
|
"""SQLAlchemy wrapper around a database."""
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
from typing import Any, Iterable, List, Optional
|
|
|
|
|
|
|
|
from sqlalchemy import create_engine, inspect
|
|
|
|
from sqlalchemy.engine import Engine
|
|
|
|
|
|
|
|
|
|
|
|
class SQLDatabase:
|
|
|
|
"""SQLAlchemy wrapper around a database."""
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
self,
|
|
|
|
engine: Engine,
|
|
|
|
schema: Optional[str] = None,
|
|
|
|
ignore_tables: Optional[List[str]] = None,
|
|
|
|
include_tables: Optional[List[str]] = None,
|
|
|
|
sample_rows_in_table_info: int = 0,
|
|
|
|
# TODO: deprecate.
|
|
|
|
sample_row_in_table_info: bool = False,
|
|
|
|
):
|
|
|
|
"""Create engine from database URI."""
|
|
|
|
if sample_row_in_table_info and sample_rows_in_table_info > 0:
|
|
|
|
raise ValueError(
|
|
|
|
"Only one of `sample_row_in_table_info` "
|
|
|
|
"and `sample_rows_in_table_info` should be set"
|
|
|
|
)
|
|
|
|
self._engine = engine
|
|
|
|
self._schema = schema
|
|
|
|
if include_tables and ignore_tables:
|
|
|
|
raise ValueError("Cannot specify both include_tables and ignore_tables")
|
|
|
|
|
|
|
|
self._inspector = inspect(self._engine)
|
|
|
|
self._all_tables = self._inspector.get_table_names(schema=schema)
|
|
|
|
self._include_tables = include_tables or []
|
|
|
|
if self._include_tables:
|
|
|
|
missing_tables = set(self._include_tables).difference(self._all_tables)
|
|
|
|
if missing_tables:
|
|
|
|
raise ValueError(
|
|
|
|
f"include_tables {missing_tables} not found in database"
|
|
|
|
)
|
|
|
|
self._ignore_tables = ignore_tables or []
|
|
|
|
if self._ignore_tables:
|
|
|
|
missing_tables = set(self._ignore_tables).difference(self._all_tables)
|
|
|
|
if missing_tables:
|
|
|
|
raise ValueError(
|
|
|
|
f"ignore_tables {missing_tables} not found in database"
|
|
|
|
)
|
|
|
|
self._sample_rows_in_table_info = sample_rows_in_table_info
|
|
|
|
# TODO: deprecate
|
|
|
|
if sample_row_in_table_info:
|
|
|
|
self._sample_rows_in_table_info = 1
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def from_uri(cls, database_uri: str, **kwargs: Any) -> SQLDatabase:
|
|
|
|
"""Construct a SQLAlchemy engine from URI."""
|
|
|
|
return cls(create_engine(database_uri), **kwargs)
|
|
|
|
|
|
|
|
@property
|
|
|
|
def dialect(self) -> str:
|
|
|
|
"""Return string representation of dialect to use."""
|
|
|
|
return self._engine.dialect.name
|
|
|
|
|
|
|
|
def get_table_names(self) -> Iterable[str]:
|
|
|
|
"""Get names of tables available."""
|
|
|
|
if self._include_tables:
|
|
|
|
return self._include_tables
|
|
|
|
return set(self._all_tables) - set(self._ignore_tables)
|
|
|
|
|
|
|
|
@property
|
|
|
|
def table_info(self) -> str:
|
|
|
|
"""Information about all tables in the database."""
|
|
|
|
return self.get_table_info()
|
|
|
|
|
|
|
|
def get_table_info(self, table_names: Optional[List[str]] = None) -> str:
|
|
|
|
"""Get information about specified tables.
|
|
|
|
|
|
|
|
If `sample_rows_in_table_info`, the specified number of sample rows will be
|
|
|
|
appended to each table description. This can increase performance as
|
|
|
|
demonstrated by Rajkumar et al, 2022 (https://arxiv.org/abs/2204.00498).
|
|
|
|
"""
|
|
|
|
all_table_names = self.get_table_names()
|
|
|
|
if table_names is not None:
|
|
|
|
missing_tables = set(table_names).difference(all_table_names)
|
|
|
|
if missing_tables:
|
|
|
|
raise ValueError(f"table_names {missing_tables} not found in database")
|
|
|
|
all_table_names = table_names
|
|
|
|
|
|
|
|
template = "Table '{table_name}' has columns: {columns}."
|
|
|
|
|
|
|
|
tables = []
|
|
|
|
for table_name in all_table_names:
|
|
|
|
|
|
|
|
columns = []
|
|
|
|
for column in self._inspector.get_columns(table_name, schema=self._schema):
|
|
|
|
columns.append(f"{column['name']} ({str(column['type'])})")
|
|
|
|
column_str = ", ".join(columns)
|
|
|
|
table_str = template.format(table_name=table_name, columns=column_str)
|
|
|
|
|
|
|
|
if self._sample_rows_in_table_info:
|
|
|
|
row_template = (
|
|
|
|
" Here is an example of {n_rows} rows from this table "
|
|
|
|
"(long strings are truncated):\n"
|
|
|
|
"{sample_rows}"
|
|
|
|
)
|
|
|
|
sample_rows = self.run(
|
|
|
|
f"SELECT * FROM '{table_name}' LIMIT "
|
|
|
|
f"{self._sample_rows_in_table_info}"
|
|
|
|
)
|
|
|
|
sample_rows = eval(sample_rows)
|
|
|
|
if len(sample_rows) > 0:
|
|
|
|
n_rows = len(sample_rows)
|
|
|
|
sample_rows = "\n".join(
|
|
|
|
[" ".join([str(i)[:100] for i in row]) for row in sample_rows]
|
|
|
|
)
|
|
|
|
table_str += row_template.format(
|
|
|
|
n_rows=n_rows, sample_rows=sample_rows
|
|
|
|
)
|
|
|
|
|
|
|
|
tables.append(table_str)
|
|
|
|
return "\n".join(tables)
|
|
|
|
|
|
|
|
def run(self, command: str) -> str:
|
|
|
|
"""Execute a SQL command and return a string representing the results.
|
|
|
|
|
|
|
|
If the statement returns rows, a string of the results is returned.
|
|
|
|
If the statement returns no rows, an empty string is returned.
|
|
|
|
"""
|
|
|
|
with self._engine.begin() as connection:
|
|
|
|
if self._schema is not None:
|
|
|
|
connection.exec_driver_sql(f"SET search_path TO {self._schema}")
|
|
|
|
cursor = connection.exec_driver_sql(command)
|
|
|
|
if cursor.returns_rows:
|
|
|
|
result = cursor.fetchall()
|
|
|
|
return str(result)
|
|
|
|
return ""
|