Hello @mgarcia
I was able to verify and replicate your bug using the following script:
import logging
import os
import tempfile
import time
from langchain_chroma import Chroma
from langchain_classic.indexes import SQLRecordManager, index
from langchain_core.documents import Document
from langchain_core.embeddings import FakeEmbeddings
from sqlalchemy import text
# Emit timestamped INFO-level lines so each indexing round can be followed
# in the console output.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def stable_docs(n: int = 100) -> list[Document]:
    """Build ``n`` documents whose text and metadata are the same on every call.

    Every document carries the source id ``"stable_source"`` and its index as
    ``chunk_id``.
    """
    docs: list[Document] = []
    for idx in range(n):
        docs.append(
            Document(
                page_content=f"Stable document chunk {idx}. This content never changes.",
                metadata={"source": "stable_source", "chunk_id": idx},
            )
        )
    return docs
def volatile_docs(n: int = 37, run: int = 0) -> list[Document]:
    """Build ``n`` documents whose text differs from one run to the next.

    Each chunk embeds the current epoch second and the ``run`` number in its
    text, mimicking a source system that bumps a timestamp/version field so
    the content hash changes on every ingestion. All documents share the
    source id ``"volatile_source"``.
    """
    docs: list[Document] = []
    for idx in range(n):
        # The clock is read per chunk, exactly once for each document built.
        content = (
            f"Volatile chunk {idx}. "
            f"Last-modified epoch: {int(time.time())} run={run}."
        )
        docs.append(
            Document(
                page_content=content,
                metadata={"source": "volatile_source", "chunk_id": idx},
            )
        )
    return docs
def db_size(path: str) -> int:
    """Return the size in bytes of the file at ``path``."""
    file_stat = os.stat(path)
    return file_stat.st_size
def run() -> None:
    """Reproduce record-manager database growth under incremental indexing.

    Indexes a fixed set of stable documents together with a set of volatile
    documents for several rounds, logging the SQLite file size after each
    round. Afterwards it issues ``VACUUM`` and logs the file size before and
    after, along with the difference.
    """
    n_stable, n_volatile, n_rounds = 100, 37, 6

    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = os.path.join(tmpdir, "record_manager.db")

        vector_store = Chroma(
            collection_name="bug_repro",
            embedding_function=FakeEmbeddings(size=128),
        )
        record_manager = SQLRecordManager(
            namespace="bug_repro",
            db_url=f"sqlite:///{db_path}",
        )
        record_manager.create_schema()

        sizes: list[int] = []
        for round_num in range(1, n_rounds + 1):
            # Stable chunks should be skipped after round 1; volatile chunks
            # are expected to be replaced on every round.
            batch = stable_docs(n_stable) + volatile_docs(n_volatile, run=round_num)
            stats = index(
                batch,
                record_manager,
                vector_store,
                cleanup="incremental",
                source_id_key="source",
                key_encoder="blake2b",
            )

            current_size = db_size(db_path)
            sizes.append(current_size)
            added = stats["num_added"]
            deleted = stats["num_deleted"]
            logger.info(
                "Round %d — added=%d, skipped=%d, deleted=%d, net=%+d, db_size=%d bytes",
                round_num,
                added,
                stats["num_skipped"],
                deleted,
                added - deleted,
                current_size,
            )

        # Compare the file size after the first round with the last round.
        logger.info(
            "DB grew by %d bytes over %d churn rounds despite net additions = 0",
            sizes[-1] - sizes[0],
            n_rounds - 1,
        )

        # Demonstrate that VACUUM reclaims the space.
        size_before = db_size(db_path)
        with record_manager.engine.connect() as conn:
            conn.execute(text("VACUUM"))
            conn.commit()
        size_after = db_size(db_path)
        logger.info(
            "After VACUUM — before=%d bytes, after=%d bytes, reclaimed=%d bytes",
            size_before,
            size_after,
            size_before - size_after,
        )


if __name__ == "__main__":
    run()
The problem: every time you re-ingest documents with cleanup="incremental", some old records are deleted from the SQLite database and new ones are inserted. By default, SQLite does not shrink the file after deletions; it just leaves “dead space” (free pages) behind. Over many runs this churn makes the file keep growing even when the net number of records does not change.
The fix, one line: enable auto_vacuum
SQLite has a built-in setting, auto_vacuum = FULL, that tells it to release free pages automatically at every transaction commit by truncating them from the end of the file, which keeps the file compact.
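You just need to set this pragma once, right when you create the database connection, before any tables are written. Note that on a database that already contains tables, the pragma alone has no effect: the new mode only applies after a one-time VACUUM.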
def _enable_sqlite_auto_vacuum(dbapi_connection: Any, _connection_record: Any) -> None:
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA auto_vacuum = FULL")
cursor.close()
And then you can add the following during initialisation of SQLRecordManager:
if self.dialect == "sqlite":
    # Register the auto_vacuum listener on the underlying sync engine so
    # it fires on every new connection regardless of whether the engine is
    # sync or async.
    sync_engine = _engine.sync_engine if isinstance(_engine, AsyncEngine) else _engine
    # Register at most once per engine: several record managers constructed
    # on the same engine would otherwise stack duplicate listeners.
    if not event.contains(sync_engine, "connect", _enable_sqlite_auto_vacuum):
        event.listen(sync_engine, "connect", _enable_sqlite_auto_vacuum)
You can open this issue on the LangChain repo and suggest the same fix.
I tested again after implementing the above fix, and the DB size now shrinks.