The `_amake_session()` method does not allow modifying the
`self.session_factory` with
anything other than `async_sessionmaker`. This prohibits advanced uses
of `index()`.
In a RAG architecture, it is necessary to import document chunks.
To keep track of the links between chunks and documents, we can use the
`index()` API.
This API proposes to use an SQL-type record manager.
In a classic use case, using `SQLRecordManager` and a vector database,
it is impossible
to guarantee the consistency of the import. Indeed, if a crash occurs
during the import
(problem with the network, ...)
there is an inconsistency between the SQL database and the vector
database.
With the
[PR](https://github.com/langchain-ai/langchain-postgres/pull/32) we are
proposing for `langchain-postgres`,
it is now possible to guarantee the consistency of the import of chunks
into
a vector database. It's possible only if the outer session is built
with the connection.
```python
def main():
db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
engine = create_engine(db_url, echo=True)
embeddings = FakeEmbeddings()
pgvector:VectorStore = PGVector(
embeddings=embeddings,
connection=engine,
)
record_manager = SQLRecordManager(
namespace="namespace",
engine=engine,
)
record_manager.create_schema()
with engine.connect() as connection:
session_maker = scoped_session(sessionmaker(bind=connection))
# NOTE: Update session_factories
record_manager.session_factory = session_maker
pgvector.session_maker = session_maker
with connection.begin():
loader = CSVLoader(
"data/faq/faq.csv",
source_column="source",
autodetect_encoding=True,
)
result = index(
source_id_key="source",
docs_source=loader.load()[:1],
cleanup="incremental",
vector_store=pgvector,
record_manager=record_manager,
)
print(result)
```
The same thing is possible asynchronously, but a bug in
`sql_record_manager.py`
in `_amake_session()` must first be fixed.
```python
async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
"""Create a session and close it after use."""
# FIXME: REMOVE if not isinstance(self.session_factory, async_sessionmaker):~~
if not isinstance(self.engine, AsyncEngine):
raise AssertionError("This method is not supported for sync engines.")
async with self.session_factory() as session:
yield session
```
Then, it is possible to do the same thing asynchronously:
```python
async def main():
db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
engine = create_async_engine(db_url, echo=True)
embeddings = FakeEmbeddings()
pgvector:VectorStore = PGVector(
embeddings=embeddings,
connection=engine,
)
record_manager = SQLRecordManager(
namespace="namespace",
engine=engine,
async_mode=True,
)
await record_manager.acreate_schema()
async with engine.connect() as connection:
session_maker = async_scoped_session(
async_sessionmaker(bind=connection),
scopefunc=current_task)
record_manager.session_factory = session_maker
pgvector.session_maker = session_maker
async with connection.begin():
loader = CSVLoader(
"data/faq/faq.csv",
source_column="source",
autodetect_encoding=True,
)
result = await aindex(
source_id_key="source",
docs_source=loader.load()[:1],
cleanup="incremental",
vector_store=pgvector,
record_manager=record_manager,
)
print(result)
asyncio.run(main())
```
---------
Signed-off-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com>
Co-authored-by: Sean <sean@upstage.ai>
Co-authored-by: JuHyung-Son <sonju0427@gmail.com>
Co-authored-by: Erick Friis <erick@langchain.dev>
Co-authored-by: YISH <mokeyish@hotmail.com>
Co-authored-by: Bagatur <baskaryan@gmail.com>
Co-authored-by: Jason_Chen <820542443@qq.com>
Co-authored-by: Joan Fontanals <joan.fontanals.martinez@jina.ai>
Co-authored-by: Pavlo Paliychuk <pavlo.paliychuk.ca@gmail.com>
Co-authored-by: fzowl <160063452+fzowl@users.noreply.github.com>
Co-authored-by: samanhappy <samanhappy@gmail.com>
Co-authored-by: Lei Zhang <zhanglei@apache.org>
Co-authored-by: Tomaz Bratanic <bratanic.tomaz@gmail.com>
Co-authored-by: merdan <48309329+merdan-9@users.noreply.github.com>
Co-authored-by: ccurme <chester.curme@gmail.com>
Co-authored-by: Andres Algaba <andresalgaba@gmail.com>
Co-authored-by: davidefantiniIntel <115252273+davidefantiniIntel@users.noreply.github.com>
Co-authored-by: Jingpan Xiong <71321890+klaus-xiong@users.noreply.github.com>
Co-authored-by: kaka <kaka@zbyte-inc.cloud>
Co-authored-by: jingsi <jingsi@leadincloud.com>
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
Co-authored-by: Rahul Triptahi <rahul.psit.ec@gmail.com>
Co-authored-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
Co-authored-by: Shengsheng Huang <shannie.huang@gmail.com>
Co-authored-by: Michael Schock <mjschock@users.noreply.github.com>
Co-authored-by: Anish Chakraborty <anish749@users.noreply.github.com>
Co-authored-by: am-kinetica <85610855+am-kinetica@users.noreply.github.com>
Co-authored-by: Dristy Srivastava <58721149+dristysrivastava@users.noreply.github.com>
Co-authored-by: Matt <matthew.gotteiner@microsoft.com>
Co-authored-by: William FH <13333726+hinthornw@users.noreply.github.com>
- **Description:**
- Add DocumentManager class, which is a nosql record manager.
- In order to use index and aindex in
libs/langchain/langchain/indexes/_api.py, DocumentManager inherits
RecordManager.
- Also I added the MongoDB implementation of Document Manager too.
- **Dependencies:** pymongo, motor
<!-- Thank you for contributing to LangChain!
Please title your PR "<package>: <description>", where <package> is
whichever of langchain, community, core, experimental, etc. is being
modified.
Replace this entire comment with:
- **Description:** Add DocumentManager class, which is a no sql record
manager. To use index method and aindex method in indexes._api.py,
Document Manager inherits RecordManager.Add the MongoDB implementation
of Document Manager.
- **Dependencies:** pymongo, motor
Please make sure your PR is passing linting and testing before
submitting. Run `make format`, `make lint` and `make test` from the root
of the package you've modified to check this locally.
See contribution guidelines for more information on how to write/run
tests, lint, etc: https://python.langchain.com/docs/contributing/
If you're adding a new integration, please include:
1. a test for the integration, preferably unit tests that do not rely on
network access,
2. an example notebook showing its use. It lives in
`docs/docs/integrations` directory.
If no one reviews your PR within a few days, please @-mention one of
@baskaryan, @eyurtsev, @hwchase17.
-->
---------
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>