2023-02-14 05:48:41 +00:00
|
|
|
# flake8: noqa=E501
|
2022-10-28 06:21:47 +00:00
|
|
|
"""Test SQL database wrapper."""
|
|
|
|
|
2023-06-02 01:33:31 +00:00
|
|
|
from sqlalchemy import (
|
|
|
|
Column,
|
|
|
|
Integer,
|
|
|
|
MetaData,
|
|
|
|
String,
|
|
|
|
Table,
|
|
|
|
Text,
|
|
|
|
create_engine,
|
|
|
|
insert,
|
|
|
|
)
|
2022-10-28 06:21:47 +00:00
|
|
|
|
2023-06-02 01:33:31 +00:00
|
|
|
from langchain.sql_database import SQLDatabase, truncate_word
|
2022-10-28 06:21:47 +00:00
|
|
|
|
|
|
|
# Shared schema for every test in this module. Each test binds it to its own
# fresh in-memory SQLite engine via ``metadata_obj.create_all(engine)``.
metadata_obj = MetaData()

# "user" table: integer primary key, a required short name and an optional
# free-text bio.
user = Table(
    "user",
    metadata_obj,
    Column("user_id", Integer, primary_key=True),
    Column("user_name", String(16), nullable=False),
    Column("user_bio", Text, nullable=True),
)

# "company" table: integer primary key and a required location string.
company = Table(
    "company",
    metadata_obj,
    Column("company_id", Integer, primary_key=True),
    Column("company_location", String, nullable=False),
)
|
|
|
|
|
|
|
|
|
|
|
|
def test_table_info() -> None:
    """Test that table info is constructed properly for empty tables.

    Creates the module-level schema in a fresh in-memory SQLite database,
    asks ``SQLDatabase`` for its ``table_info`` and checks that it contains
    the ``CREATE TABLE`` statement plus a sample-rows comment (header only,
    because no rows were inserted) for both tables.
    """
    engine = create_engine("sqlite:///:memory:")
    metadata_obj.create_all(engine)
    db = SQLDatabase(engine)
    output = db.table_info
    # Only whitespace-separated tokens are compared below, so the indentation
    # and line breaks inside this literal are not significant.
    expected_output = """
    CREATE TABLE user (
            user_id INTEGER NOT NULL,
            user_name VARCHAR(16) NOT NULL,
            user_bio TEXT,
            PRIMARY KEY (user_id)
    )
    /*
    3 rows from user table:
    user_id user_name user_bio
    */


    CREATE TABLE company (
            company_id INTEGER NOT NULL,
            company_location VARCHAR NOT NULL,
            PRIMARY KEY (company_id)
    )
    /*
    3 rows from company table:
    company_id company_location
    */
    """

    # Compare sorted tokens rather than sorted characters: the sort keeps the
    # check independent of the order in which the tables are reported, while
    # token granularity still catches malformed pieces (a character-level sort
    # cannot tell "/*" from "*/").
    assert sorted(output.split()) == sorted(expected_output.split())
|
2022-10-28 06:21:47 +00:00
|
|
|
|
|
|
|
|
2023-02-07 02:56:18 +00:00
|
|
|
def test_table_info_w_sample_rows() -> None:
    """Test that table info lists sample rows when the table has data.

    Two users are inserted and ``SQLDatabase`` is asked for two sample rows
    per table; the ``user`` section must then show both rows while the
    ``company`` section only shows its column header.
    """
    engine = create_engine("sqlite:///:memory:")
    metadata_obj.create_all(engine)

    rows = [
        {"user_id": 13, "user_name": "Harrison", "user_bio": "bio"},
        {"user_id": 14, "user_name": "Chase", "user_bio": "bio"},
    ]
    with engine.begin() as conn:
        conn.execute(insert(user).values(rows))

    db = SQLDatabase(engine, sample_rows_in_table_info=2)

    # Whitespace inside this literal is irrelevant: only tokens are compared.
    expected_output = """
    CREATE TABLE company (
            company_id INTEGER NOT NULL,
            company_location VARCHAR NOT NULL,
            PRIMARY KEY (company_id)
    )
    /*
    2 rows from company table:
    company_id company_location
    */


    CREATE TABLE user (
            user_id INTEGER NOT NULL,
            user_name VARCHAR(16) NOT NULL,
            user_bio TEXT,
            PRIMARY KEY (user_id)
    )
    /*
    2 rows from user table:
    user_id user_name user_bio
    13 Harrison bio
    14 Chase bio
    */
    """

    # Sorting the tokens makes the check independent of table ordering.
    actual_tokens = sorted(db.table_info.split())
    expected_tokens = sorted(expected_output.split())
    assert actual_tokens == expected_tokens
|
2023-01-28 21:37:07 +00:00
|
|
|
|
|
|
|
|
2022-10-28 06:21:47 +00:00
|
|
|
def test_sql_database_run() -> None:
    """Test that a SELECT is executed and its rows come back as a string.

    A user with a long bio (the phrase repeated 24 times) is stored; the
    string returned by ``SQLDatabase.run`` is expected to contain a shortened
    bio ending in ``...``.
    """
    engine = create_engine("sqlite:///:memory:")
    metadata_obj.create_all(engine)

    stored_bio = "That is my Bio " * 24
    with engine.begin() as conn:
        conn.execute(
            insert(user).values(
                user_id=13, user_name="Harrison", user_bio=stored_bio
            )
        )

    db = SQLDatabase(engine)
    output = db.run("select user_id, user_name, user_bio from user where user_id = 13")

    # The bio is expected back cut off after 19 full repetitions plus a partial
    # one, with an ellipsis appended.
    shortened_bio = "That is my Bio " * 19 + "That is my..."
    assert output == f"[(13, 'Harrison', '{shortened_bio}')]"
|
2022-11-29 16:28:45 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_sql_database_run_update() -> None:
    """Test that a statement returning no rows yields an empty string."""
    engine = create_engine("sqlite:///:memory:")
    metadata_obj.create_all(engine)

    # Seed one user so the UPDATE below has a row to act on.
    with engine.begin() as conn:
        conn.execute(insert(user).values(user_id=13, user_name="Harrison"))

    db = SQLDatabase(engine)
    result = db.run("update user set user_name='Updated' where user_id = 13")

    assert result == ""
|
2023-06-02 01:33:31 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_truncate_word() -> None:
    """Check ``truncate_word`` against a table of keyword arguments and results."""
    text = "Hello World"
    # (keyword arguments, expected result) pairs, evaluated in order.
    cases = [
        ({"length": 5}, "He..."),
        ({"length": 0}, "Hello World"),
        ({"length": -10}, "Hello World"),
        ({"length": 5, "suffix": "!!!"}, "He!!!"),
        ({"length": 12, "suffix": "!!!"}, "Hello World"),
    ]
    for kwargs, expected in cases:
        assert truncate_word(text, **kwargs) == expected
|