feat: add option to ignore or restrict to SQL tables (#151)

`SQLDatabase.__init__` now accepts two optional, mutually exclusive arguments:
1. `ignore_tables`: a list of tables to exclude from consideration
2. `include_tables`: a list of tables to restrict consideration to
harrison/save_metadatas
Nicholas Larus-Stone 2 years ago committed by GitHub
parent d2f9288be6
commit ca4b10bb74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,4 +1,6 @@
"""SQLAlchemy wrapper around a database."""
from typing import Any, Iterable, List, Optional
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine import Engine
@ -6,29 +8,57 @@ from sqlalchemy.engine import Engine
class SQLDatabase:
"""SQLAlchemy wrapper around a database."""
# NOTE(review): the one-line signature directly below looks like the pre-change
# line of this diff; the multi-line signature after it appears to supersede it.
def __init__(self, engine: Engine):
def __init__(
self,
engine: Engine,
ignore_tables: Optional[List[str]] = None,
include_tables: Optional[List[str]] = None,
):
"""Wrap an existing SQLAlchemy engine and validate the table filters.

Args:
engine: SQLAlchemy engine to wrap.
ignore_tables: Table names to leave out; ``None`` or an empty list
means nothing is ignored.
include_tables: Table names to restrict to; ``None`` or an empty list
means no restriction.

Raises:
ValueError: If both filters are given, or if either filter names a
table that the inspector does not report.
"""
self._engine = engine
# The two filters are mutually exclusive; reject the combination up front.
if include_tables and ignore_tables:
raise ValueError("Cannot specify both include_tables and ignore_tables")
# Table names are read from the inspector once, at construction time.
self._inspector = inspect(self._engine)
self._all_tables = self._inspector.get_table_names()
# Normalise None to an empty list so the truthiness checks below are uniform.
self._include_tables = include_tables or []
if self._include_tables:
# Any requested name the inspector did not report is treated as an error.
missing_tables = set(self._include_tables).difference(self._all_tables)
if missing_tables:
raise ValueError(
f"include_tables {missing_tables} not found in database"
)
self._ignore_tables = ignore_tables or []
if self._ignore_tables:
# Same existence check as for include_tables, with its own message.
missing_tables = set(self._ignore_tables).difference(self._all_tables)
if missing_tables:
raise ValueError(
f"ignore_tables {missing_tables} not found in database"
)
# NOTE(review): this block shows two signatures and two return statements; the
# first of each pair looks like the pre-change line of the diff, the second
# (with **kwargs) like its replacement.
@classmethod
def from_uri(cls, database_uri: str) -> "SQLDatabase":
def from_uri(cls, database_uri: str, **kwargs: Any) -> "SQLDatabase":
"""Build an instance of this class from a database URI.

An engine is created from ``database_uri`` and passed to the constructor;
in the ``**kwargs`` variant, extra keyword arguments are forwarded as well.
"""
return cls(create_engine(database_uri))
return cls(create_engine(database_uri), **kwargs)
@property
def dialect(self) -> str:
    """Name of the SQL dialect used by the wrapped engine."""
    engine_dialect = self._engine.dialect
    return engine_dialect.name
def _get_table_names(self) -> Iterable[str]:
    """Return the names of the tables this wrapper exposes.

    When ``include_tables`` is non-empty, exactly those names are returned in
    the order they were supplied. Otherwise every name in ``_all_tables`` is
    returned, in its stored order, except the ones in ``ignore_tables``.

    Returns:
        A new list of table names in a deterministic order.
    """
    if self._include_tables:
        # Hand out a copy so callers cannot mutate the stored filter.
        return list(self._include_tables)
    # Filter rather than subtract sets: iteration order of a set of strings
    # can differ between interpreter runs, which made the result order (and
    # anything rendered from it) unstable.
    ignored = set(self._ignore_tables)
    return [name for name in self._all_tables if name not in ignored]
@property
def table_info(self) -> str:
"""Information about all tables in the database."""
template = "The '{table_name}' table has columns: {columns}."
template = "Table '{table_name}' has columns: {columns}."
tables = []
inspector = inspect(self._engine)
for table_name in inspector.get_table_names():
for table_name in self._get_table_names():
columns = []
for column in inspector.get_columns(table_name):
for column in self._inspector.get_columns(table_name):
columns.append(f"{column['name']} ({str(column['type'])})")
column_str = ", ".join(columns)
table_str = template.format(table_name=table_name, columns=column_str)

@ -28,11 +28,11 @@ def test_table_info() -> None:
db = SQLDatabase(engine)
output = db.table_info
expected_output = (
"The 'company' table has columns: company_id (INTEGER), "
"company_location (VARCHAR).\n"
"The 'user' table has columns: user_id (INTEGER), user_name (VARCHAR(16))."
"Table 'company' has columns: company_id (INTEGER), "
"company_location (VARCHAR).",
"Table 'user' has columns: user_id (INTEGER), user_name (VARCHAR(16)).",
)
assert output == expected_output
assert sorted(output.split("\n")) == sorted(expected_output)
def test_sql_database_run() -> None:

Loading…
Cancel
Save