import asyncio
import os
import dotenv
from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, StateBackend, StoreBackend
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.postgres import PostgresStore
from model.llm.deepseek.deepseek_llm import deepseek
dotenv.load_dotenv()
async def deep_agent_run():
async with PostgresStore.from_conn_string(os.getenv("DATABASE_URL")) as store:
async with PostgresSaver.from_conn_string(os.getenv("DATABASE_URL")) as checkpointer:
await store.setup()
await checkpointer.setup()
deep_agent = create_deep_agent(
model=deepseek,
checkpointer=checkpointer,
store=store ,
backend=lambda rt: CompositeBackend(
default=StateBackend(rt),
routes={"/memory/memory_data/": StoreBackend(rt)}
)
)
return deep_agent
if __name__ == '__main__':
asyncio.run(deep_agent_run())
D:\Python\python.exe C:\Users\Huimin\Desktop\AI工程化\Huiminclaw\agents\main_deep_agent\deep_agent.py
Traceback (most recent call last):
File “C:\Users\Huimin\Desktop\AI工程化\Huiminclaw\agents\main_deep_agent\deep_agent.py”, line 38, in
asyncio.run(deep_agent_run())
File “D:\Python\Lib\asyncio\runners.py”, line 194, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File “D:\Python\Lib\asyncio\runners.py”, line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File “D:\Python\Lib\asyncio\base_events.py”, line 687, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File “C:\Users\Huimin\Desktop\AI工程化\Huiminclaw\agents\main_deep_agent\deep_agent.py”, line 16, in deep_agent_run
async with PostgresStore.from_conn_string(os.getenv(“DATABASE_URL”)) as store:
TypeError: ‘_GeneratorContextManager’ object does not support the asynchronous context manager protocol
进程已结束,退出代码为 1
hi @Huimin-station
make this chage - swap to the async variants:
- PostgresStore → AsyncPostgresStore (from langgraph.store.postgres)
- PostgresSaver → AsyncPostgresSaver (from langgraph.checkpoint.postgres.aio)
Since I need to return an agent, I cannot utilize the with statement, as it would terminate the connection upon exiting the block. Consequently, I have adopted this alternative approach, yet it appears that an error persists.
The previous translation is unsatisfactory; my point is that I am unable to employ the word “with.”
why you skipped async with? It should be wrapped in it.
Or you have to use a workaround:
for async
_store_ctx = AsyncPostgresStore.from_conn_string(DB_URI)
STORE: PostgresStore = _store_ctx.__enter__() # type: ignore[assignment]
STORE.setup()
_checkpointer_ctx = AsyncPostgresStore.from_conn_string(DB_URI)
CHECKPOINTER: PostgresSaver = _checkpointer_ctx.__enter__()
CHECKPOINTER.setup()
or sync
_store_ctx = PostgresStore.from_conn_string(DB_URI)
STORE: PostgresStore = _store_ctx.__enter__() # type: ignore[assignment]
STORE.setup()
_checkpointer_ctx = PostgresStore.from_conn_string(DB_URI)
CHECKPOINTER: PostgresSaver = _checkpointer_ctx.__enter__()
CHECKPOINTER.setup()
import asyncio
import os
import selectors
import dotenv
from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, StateBackend, StoreBackend
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.store.postgres import PostgresStore, AsyncPostgresStore
from model.llm.deepseek.deepseek_llm import deepseek
dotenv.load_dotenv()
def setup_windows_event_loop():
"""修复 Windows 下 psycopg 异步兼容问题"""
if os.name == "nt": # 仅对 Windows 生效
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
async def deep_agent_run():
async with AsyncPostgresStore.from_conn_string(os.getenv("DATABASE_URL")) as store:
async with AsyncPostgresSaver.from_conn_string(os.getenv("DATABASE_URL")) as checkpointer:
await store.setup()
await checkpointer.setup()
deep_agent = create_deep_agent(
model=deepseek,
checkpointer=checkpointer,
store=store ,
backend=lambda rt: CompositeBackend(
default=StateBackend(rt),
routes={"/memory/memory_data/": StoreBackend(rt)}
)
)
return deep_agent
async def main():
agent =await deep_agent_run()
config = {"configurable":{"thread_id":"1"}}
while True:
user = input("User: ")
async for chunk in agent.astream({"messages":user},config):
print(chunk)
if __name__ == '__main__':
setup_windows_event_loop()
asyncio.run(main())
I want to use this function as an agent factory, but the following error is reported when calling it externally:
D:\Python\python.exe C:\Users\Huimin\Desktop\AI工程化\Huiminclaw\agents\main_deep_agent\deep_agent.py
User: 你好
Traceback (most recent call last):
File “C:\Users\Huimin\Desktop\AI工程化\Huiminclaw\agents\main_deep_agent\deep_agent.py”, line 48, in
asyncio.run(main())
File “D:\Python\Lib\asyncio\runners.py”, line 194, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File “D:\Python\Lib\asyncio\runners.py”, line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File “D:\Python\Lib\asyncio\base_events.py”, line 687, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File “C:\Users\Huimin\Desktop\AI工程化\Huiminclaw\agents\main_deep_agent\deep_agent.py”, line 43, in main
async for chunk in agent.astream({“messages”:user},config):
File “D:\Python\Lib\site-packages\langgraph\pregel\main.py”, line 3026, in astream
async with AsyncPregelLoop(
File “D:\Python\Lib\site-packages\langgraph\pregel_loop.py”, line 1361, in aenter
saved = await self.checkpointer.aget_tuple(self.checkpoint_config)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File “D:\Python\Lib\site-packages\langgraph\checkpoint\postgres\aio.py”, line 197, in aget_tuple
async with self._cursor() as cur:
File “D:\Python\Lib\contextlib.py”, line 210, in aenter
return await anext(self.gen)
^^^^^^^^^^^^^^^^^^^^^
File “D:\Python\Lib\site-packages\langgraph\checkpoint\postgres\aio.py”, line 391, in _cursor
async with conn.cursor(binary=True, row_factory=dict_row) as cur:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File “D:\Python\Lib\site-packages\psycopg\connection_async.py”, line 261, in cursor
self._check_connection_ok()
File “D:\Python\Lib\site-packages\psycopg_connection_base.py”, line 532, in _check_connection_ok
raise e.OperationalError(“the connection is closed”)
psycopg.OperationalError: the connection is closed
进程已结束,退出代码为 1
it’s because the connections get closed after create_agent_with_postgres is execuded.
try this:
import asyncio
import os
from langgraph.store.postgres import AsyncPostgresStore
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from deepagents import create_deep_agent
async def main():
async with AsyncPostgresStore.from_conn_string(os.getenv("DATABASE_URL")) as store:
async with AsyncPostgresSaver.from_conn_string(os.getenv("DATABASE_URL")) as checkpointer:
await store.setup()
await checkpointer.setup()
agent = create_deep_agent(
model="anthropic:claude-sonnet-4-6",
checkpointer=checkpointer,
store=store,
# ... other arguments
)
# Use the agent INSIDE the async with block
result = await agent.ainvoke(
{"messages": [{"role": "user", "content": "Hello!"}]},
config={"configurable": {"thread_id": "1"}},
)
print(result)
asyncio.run(main())
or this @asynccontextmanager:
from contextlib import asynccontextmanager
@asynccontextmanager
async def create_agent_with_postgres(db_url: str):
"""Create a Deep Agent with Postgres backends, keeping connections alive."""
async with AsyncPostgresStore.from_conn_string(db_url) as store:
async with AsyncPostgresSaver.from_conn_string(db_url) as checkpointer:
await store.setup()
await checkpointer.setup()
agent = create_deep_agent(
model="anthropic:claude-sonnet-4-6",
checkpointer=checkpointer,
store=store,
)
yield agent # Connections stay open until caller exits
async def main():
async with create_agent_with_postgres(os.getenv("DATABASE_URL")) as agent:
# Connections are alive here
result = await agent.ainvoke(
{"messages": [{"role": "user", "content": "Hello!"}]},
config={"configurable": {"thread_id": "1"}},
)
print(result)
# You can invoke multiple times while connections stay open
result2 = await agent.ainvoke(
{"messages": [{"role": "user", "content": "Follow up"}]},
config={"configurable": {"thread_id": "1"}},
)
asyncio.run(main())