How to fix asynchronous context manager errors in PostgresStore

import asyncio
import os

import dotenv
from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, StateBackend, StoreBackend
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.postgres import PostgresStore

from model.llm.deepseek.deepseek_llm import deepseek

dotenv.load_dotenv()


async def deep_agent_run():
    async with PostgresStore.from_conn_string(os.getenv("DATABASE_URL")) as store:
        async with PostgresSaver.from_conn_string(os.getenv("DATABASE_URL")) as checkpointer:
            await store.setup()
            await checkpointer.setup()

            deep_agent = create_deep_agent(
                model=deepseek,
                checkpointer=checkpointer,
                store=store ,
                backend=lambda rt: CompositeBackend(
                default=StateBackend(rt),
                routes={"/memory/memory_data/": StoreBackend(rt)}
         )
    )

    return deep_agent


if __name__ == '__main__':
    asyncio.run(deep_agent_run())


D:\Python\python.exe C:\Users\Huimin\Desktop\AI工程化\Huiminclaw\agents\main_deep_agent\deep_agent.py
Traceback (most recent call last):
File “C:\Users\Huimin\Desktop\AI工程化\Huiminclaw\agents\main_deep_agent\deep_agent.py”, line 38, in
asyncio.run(deep_agent_run())
File “D:\Python\Lib\asyncio\runners.py”, line 194, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File “D:\Python\Lib\asyncio\runners.py”, line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File “D:\Python\Lib\asyncio\base_events.py”, line 687, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File “C:\Users\Huimin\Desktop\AI工程化\Huiminclaw\agents\main_deep_agent\deep_agent.py”, line 16, in deep_agent_run
async with PostgresStore.from_conn_string(os.getenv(“DATABASE_URL”)) as store:
TypeError: ‘_GeneratorContextManager’ object does not support the asynchronous context manager protocol

进程已结束,退出代码为 1

hi @Huimin-station

make this chage - swap to the async variants:

  • PostgresStore → AsyncPostgresStore (from langgraph.store.postgres)
  • PostgresSaver → AsyncPostgresSaver (from langgraph.checkpoint.postgres.aio)

Since I need to return an agent, I cannot utilize the with statement, as it would terminate the connection upon exiting the block. Consequently, I have adopted this alternative approach, yet it appears that an error persists.

The previous translation is unsatisfactory; my point is that I am unable to employ the word “with.”

why you skipped async with? It should be wrapped in it.

Or you have to use a workaround:

for async

_store_ctx = AsyncPostgresStore.from_conn_string(DB_URI)
STORE: PostgresStore = _store_ctx.__enter__()  # type: ignore[assignment]
STORE.setup() 

_checkpointer_ctx = AsyncPostgresStore.from_conn_string(DB_URI)
CHECKPOINTER: PostgresSaver = _checkpointer_ctx.__enter__()
CHECKPOINTER.setup()

or sync

_store_ctx = PostgresStore.from_conn_string(DB_URI)
STORE: PostgresStore = _store_ctx.__enter__()  # type: ignore[assignment]
STORE.setup() 

_checkpointer_ctx = PostgresStore.from_conn_string(DB_URI)
CHECKPOINTER: PostgresSaver = _checkpointer_ctx.__enter__()
CHECKPOINTER.setup()
import asyncio
import os
import selectors

import dotenv
from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, StateBackend, StoreBackend
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.store.postgres import PostgresStore, AsyncPostgresStore

from model.llm.deepseek.deepseek_llm import deepseek

dotenv.load_dotenv()
def setup_windows_event_loop():
    """修复 Windows 下 psycopg 异步兼容问题"""
    if os.name == "nt":  # 仅对 Windows 生效
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

async def deep_agent_run():
    async with  AsyncPostgresStore.from_conn_string(os.getenv("DATABASE_URL")) as store:
        async with AsyncPostgresSaver.from_conn_string(os.getenv("DATABASE_URL")) as checkpointer:
            await store.setup()
            await checkpointer.setup()

            deep_agent = create_deep_agent(
                model=deepseek,
                checkpointer=checkpointer,
                store=store ,
                backend=lambda rt: CompositeBackend(
                default=StateBackend(rt),
                routes={"/memory/memory_data/": StoreBackend(rt)}
         )
    )

    return deep_agent


async def main():
    agent =await deep_agent_run()
    config = {"configurable":{"thread_id":"1"}}
    while True:
        user = input("User: ")
        async for chunk in agent.astream({"messages":user},config):
            print(chunk)

if __name__ == '__main__':
    setup_windows_event_loop()
    asyncio.run(main())

I want to use this function as an agent factory, but the following error is reported when calling it externally:

D:\Python\python.exe C:\Users\Huimin\Desktop\AI工程化\Huiminclaw\agents\main_deep_agent\deep_agent.py
User: 你好
Traceback (most recent call last):
File “C:\Users\Huimin\Desktop\AI工程化\Huiminclaw\agents\main_deep_agent\deep_agent.py”, line 48, in
asyncio.run(main())
File “D:\Python\Lib\asyncio\runners.py”, line 194, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File “D:\Python\Lib\asyncio\runners.py”, line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File “D:\Python\Lib\asyncio\base_events.py”, line 687, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File “C:\Users\Huimin\Desktop\AI工程化\Huiminclaw\agents\main_deep_agent\deep_agent.py”, line 43, in main
async for chunk in agent.astream({“messages”:user},config):
File “D:\Python\Lib\site-packages\langgraph\pregel\main.py”, line 3026, in astream
async with AsyncPregelLoop(
File “D:\Python\Lib\site-packages\langgraph\pregel_loop.py”, line 1361, in aenter
saved = await self.checkpointer.aget_tuple(self.checkpoint_config)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File “D:\Python\Lib\site-packages\langgraph\checkpoint\postgres\aio.py”, line 197, in aget_tuple
async with self._cursor() as cur:
File “D:\Python\Lib\contextlib.py”, line 210, in aenter
return await anext(self.gen)
^^^^^^^^^^^^^^^^^^^^^
File “D:\Python\Lib\site-packages\langgraph\checkpoint\postgres\aio.py”, line 391, in _cursor
async with conn.cursor(binary=True, row_factory=dict_row) as cur:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File “D:\Python\Lib\site-packages\psycopg\connection_async.py”, line 261, in cursor
self._check_connection_ok()
File “D:\Python\Lib\site-packages\psycopg_connection_base.py”, line 532, in _check_connection_ok
raise e.OperationalError(“the connection is closed”)
psycopg.OperationalError: the connection is closed

进程已结束,退出代码为 1

it’s because the connections get closed after create_agent_with_postgres is execuded.

try this:

import asyncio
import os
from langgraph.store.postgres import AsyncPostgresStore
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from deepagents import create_deep_agent

async def main():
    async with AsyncPostgresStore.from_conn_string(os.getenv("DATABASE_URL")) as store:
        async with AsyncPostgresSaver.from_conn_string(os.getenv("DATABASE_URL")) as checkpointer:
            await store.setup()
            await checkpointer.setup()

            agent = create_deep_agent(
                model="anthropic:claude-sonnet-4-6",
                checkpointer=checkpointer,
                store=store,
                # ... other arguments
            )

            # Use the agent INSIDE the async with block
            result = await agent.ainvoke(
                {"messages": [{"role": "user", "content": "Hello!"}]},
                config={"configurable": {"thread_id": "1"}},
            )
            print(result)

asyncio.run(main())

or this @asynccontextmanager:

from contextlib import asynccontextmanager

@asynccontextmanager
async def create_agent_with_postgres(db_url: str):
    """Create a Deep Agent with Postgres backends, keeping connections alive."""
    async with AsyncPostgresStore.from_conn_string(db_url) as store:
        async with AsyncPostgresSaver.from_conn_string(db_url) as checkpointer:
            await store.setup()
            await checkpointer.setup()

            agent = create_deep_agent(
                model="anthropic:claude-sonnet-4-6",
                checkpointer=checkpointer,
                store=store,
            )
            yield agent  # Connections stay open until caller exits

async def main():
    async with create_agent_with_postgres(os.getenv("DATABASE_URL")) as agent:
        # Connections are alive here
        result = await agent.ainvoke(
            {"messages": [{"role": "user", "content": "Hello!"}]},
            config={"configurable": {"thread_id": "1"}},
        )
        print(result)

        # You can invoke multiple times while connections stay open
        result2 = await agent.ainvoke(
            {"messages": [{"role": "user", "content": "Follow up"}]},
            config={"configurable": {"thread_id": "1"}},
        )

asyncio.run(main())