Description
I am planning to integrate the LangGraph PostgreSQL checkpointer into a FastAPI application where endpoints execute LangGraph agents.
The application will handle concurrent API requests, each associated with its own LangGraph thread_id. I want to ensure correctness while avoiding unnecessary serialization or performance bottlenecks.
Current Setup
I plan to create a global connection pool and a global checkpointer (saver) during FastAPI startup:
```python
from psycopg_pool import AsyncConnectionPool
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

DB_URI = "postgresql://postgres:postgres@localhost:5442/postgres?sslmode=disable"

pool = AsyncConnectionPool(
    conninfo=DB_URI,
    min_size=4,
    max_size=20,
    kwargs={"autocommit": True},
)
checkpointer = AsyncPostgresSaver(pool)
await checkpointer.setup()
```
The LangGraph graph is compiled once and reused across requests:
```python
graph = workflow.compile(checkpointer=checkpointer)
```
Graphs are executed using streaming:
```python
async for chunk in graph.astream(state, config, stream_mode="updates", subgraphs=True):
    _, state_chunk = chunk
    stage, message = list(state_chunk.items())[0]
    print("stage:", stage, ", message:", message)
Internal Lock in PostgresSaver
While inspecting PostgresSaver and AsyncPostgresSaver, I noticed that both implementations acquire an internal lock inside the _cursor context manager.
```python
@asynccontextmanager
async def _cursor(
    self, *, pipeline: bool = False
) -> AsyncIterator[AsyncCursor[DictRow]]:
    """
    ...
    ...
    """
    async with self.lock, _ainternal.get_connection(self.conn) as conn:
```
This appears to serialize all checkpoint read/write operations per saver instance, even when:
- A shared PostgreSQL connection pool is used
- PostgreSQL itself can handle parallel queries
This suggests that a single global saver may become a throughput bottleneck under concurrent FastAPI requests. Do I understand correctly that concurrent requests will be serialized?
Questions
- Is it safe and recommended to create a single global saver and share it across FastAPI requests when using a connection pool?
- Does the internal saver lock intentionally serialize all checkpoint I/O operations per saver instance?
- For high-concurrency FastAPI applications, what is the recommended pattern?
  - A single global saver?
  - One saver per request (with graph compilation per request)?
  - Process-level scaling (multiple Uvicorn/Gunicorn workers)?
- I noticed that RedisSaver does not appear to use a similar lock. Is Redis considered the preferred backend for high-concurrency async web applications?
I would prefer not to initialize a new checkpointer or recompile the graph per FastAPI request unless that is the recommended approach. While graph compilation is relatively fast, it still adds overhead and complexity.
At the same time, I want to avoid unnecessary serialization of concurrent requests.
Documentation
In the documentation (Memory - Docs by LangChain), I see an example usage pattern that appears to initialize a new checkpointer and recompile the graph for each FastAPI request:
```python
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

DB_URI = "postgresql://postgres:postgres@localhost:5442/postgres?sslmode=disable"

async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)
```
Thank you for taking the time to respond.