Add schema property to sql database utility class (#448) (#462)

Co-authored-by: Harrison Chase <hw.chase.17@gmail.com>

Signed-off-by: Diwank Singh Tomer <diwank.singh@gmail.com>
Co-authored-by: Nuno Campos <nuno@boringbits.io>
Co-authored-by: Diwank Singh Tomer <diwank.singh@gmail.com>
harrison/callback-updates
Harrison Chase 1 year ago committed by GitHub
parent 451665cfdf
commit 95157d0aad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -13,16 +13,18 @@ class SQLDatabase:
def __init__(
self,
engine: Engine,
schema: Optional[str] = None,
ignore_tables: Optional[List[str]] = None,
include_tables: Optional[List[str]] = None,
):
"""Create engine from database URI."""
self._engine = engine
self._schema = schema
if include_tables and ignore_tables:
raise ValueError("Cannot specify both include_tables and ignore_tables")
self._inspector = inspect(self._engine)
self._all_tables = self._inspector.get_table_names()
self._all_tables = self._inspector.get_table_names(schema=schema)
self._include_tables = include_tables or []
if self._include_tables:
missing_tables = set(self._include_tables).difference(self._all_tables)
@ -60,7 +62,7 @@ class SQLDatabase:
tables = []
for table_name in self._get_table_names():
columns = []
for column in self._inspector.get_columns(table_name):
for column in self._inspector.get_columns(table_name, schema=self._schema):
columns.append(f"{column['name']} ({str(column['type'])})")
column_str = ", ".join(columns)
table_str = template.format(table_name=table_name, columns=column_str)
@ -74,6 +76,8 @@ class SQLDatabase:
If the statement returns no rows, an empty string is returned.
"""
with self._engine.connect() as connection:
if self._schema is not None:
connection.exec_driver_sql(f"SET search_path TO {self._schema}")
cursor = connection.exec_driver_sql(command)
if cursor.returns_rows:
result = cursor.fetchall()

3874
poetry.lock generated

File diff suppressed because it is too large Load Diff

@ -33,6 +33,7 @@ weaviate-client = {version = "^3", optional = true}
pytest = "^7.2.0"
pytest-cov = "^4.0.0"
pytest-dotenv = "^0.5.2"
duckdb-engine = "^0.6.6"
pytest-watcher = "^0.2.6"
[tool.poetry.group.lint.dependencies]

@ -0,0 +1,66 @@
"""Test SQL database wrapper with schema support.
Using DuckDB as SQLite does not support schemas.
"""
from sqlalchemy import (
Column,
Integer,
MetaData,
Sequence,
String,
Table,
create_engine,
event,
insert,
schema,
)
from langchain.sql_database import SQLDatabase
# Shared metadata for every test in this module.
metadata_obj = MetaData()

# Emit CREATE SCHEMA for both schemas before the metadata's tables are
# created, because the tables below are declared inside those schemas.
event.listen(metadata_obj, "before_create", schema.CreateSchema("schema_a"))
event.listen(metadata_obj, "before_create", schema.CreateSchema("schema_b"))

# Table that lives in ``schema_a``; the tests below point SQLDatabase at it.
user = Table(
    "user",
    metadata_obj,
    Column("user_id", Integer, Sequence("user_id_seq"), primary_key=True),
    Column("user_name", String, nullable=False),
    schema="schema_a",
)

# Table that lives in ``schema_b``; it must stay invisible when the wrapper
# is scoped to ``schema_a``.
company = Table(
    "company",
    metadata_obj,
    Column("company_id", Integer, Sequence("company_id_seq"), primary_key=True),
    Column("company_location", String, nullable=False),
    schema="schema_b",
)
def test_table_info() -> None:
    """Check that ``table_info`` describes only the tables of ``schema_a``."""
    duck_engine = create_engine("duckdb:///:memory:")
    metadata_obj.create_all(duck_engine)

    database = SQLDatabase(duck_engine, schema="schema_a")
    actual_lines = sorted(database.table_info.split("\n"))

    # Only ``user`` is expected: ``company`` is declared in ``schema_b``.
    expected_lines = sorted(
        ["Table 'user' has columns: user_id (INTEGER), user_name (VARCHAR)."]
    )
    assert actual_lines == expected_lines
def test_sql_database_run() -> None:
    """Test that commands run against the configured schema and format results.

    A row is inserted into ``schema_a.user``; the query then refers to the
    table by its unqualified name, so it only succeeds if ``SQLDatabase``
    resolves names against the schema it was constructed with.
    """
    engine = create_engine("duckdb:///:memory:")
    metadata_obj.create_all(engine)
    stmt = insert(user).values(user_id=13, user_name="Harrison")
    # ``engine.begin()`` commits when the block exits successfully. A plain
    # ``engine.connect()`` relies on legacy autocommit, which SQLAlchemy 2.0
    # removed; there the INSERT would be rolled back on close and the query
    # below would return no rows.
    with engine.begin() as conn:
        conn.execute(stmt)
    db = SQLDatabase(engine, schema="schema_a")
    command = 'select user_name from "user" where user_id = 13'
    output = db.run(command)
    expected_output = "[('Harrison',)]"
    assert output == expected_output
Loading…
Cancel
Save