Hi Langgraph team,
I ran into something I’m not sure about regarding task cancellations in StateGraph.
I was wondering if you could help me understand whether this is intended behavior, or if there’s a recommended way to handle cancellations properly. Any guidance or suggestions would be greatly appreciated! ![]()
Description
When using StateGraph with astream to build a state machine, if a stage catches an asyncio.CancelledError and returns a Command (e.g., to transition to another node), the state machine does not continue executing that Command.
In other words, when the outer task is cancelled, the astream generator immediately raises CancelledError, preventing the next stage from running. It is unclear whether this is intended behavior or a potential issue.
Minimal Reproducible Example
import asyncio
from asyncio import CancelledError
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import Command
from pydantic import BaseModel, Field
class GraphState(BaseModel):
sleep_seconds: int = Field(..., description="sleep seconds", gt=0)
result: str | None = Field(None, description="result")
async def work_stage(state: GraphState, writer):
writer("# run stage: work")
try:
writer(f"---> sleep for total {state.sleep_seconds} seconds")
for i in range(state.sleep_seconds):
writer(f"---> sleeping: {i + 1}")
await asyncio.sleep(1)
writer(f"---> awake")
except CancelledError:
print("................ ️cancellation detected ................")
writer("❗️cancellation detected, next goto cleanup stage")
return Command(goto="cleanup_stage")
return Command(goto=END, update={"result": "done"})
async def cleanup_stage(state: GraphState, writer):
writer("# run stage: cleanup")
return Command(goto=END, update={"result": "cleanup"})
graph_builder = StateGraph(GraphState)
graph_builder.add_node("work_stage", work_stage)
graph_builder.add_node("cleanup_stage", cleanup_stage)
graph_builder.add_edge(START, "work_stage")
graph = graph_builder.compile()
async def main():
async def _task():
async for chunk_type, chunk in graph.astream(
input={'sleep_seconds': 10},
stream_mode=['custom'],
):
print(chunk)
task_future = asyncio.create_task(_task())
await asyncio.sleep(3.5)
print("................ now cancel ................")
task_future.cancel()
await task_future
asyncio.run(main())
Actual Behavior
-
The
except CancelledErrorblock inwork_stageis triggered, and the warning log is printed. -
However, the state machine does not execute the returned
Command. -
Ultimately, the task raises
CancelledError. -
Console prints:
# run stage: work
---> sleep for total 10 seconds
---> sleeping: 1
---> sleeping: 2
---> sleeping: 3
---> sleeping: 4
................ now cancel ................
................ ️cancellation detected ................
Traceback (most recent call last):
File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/pydevd.py", line 1570, in _exec
pydev_imports.execfile(file, globals, locals) # execute the script
~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^
File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/zhengyun/PycharmProjects/bit-Agent-server4p/demo.py", line 60, in <module>
asyncio.run(main())
~~~~~~~~~~~^^^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py", line 194, in run
return runner.run(main)
~~~~~~~~~~^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py", line 720, in run_until_complete
return future.result()
~~~~~~~~~~~~~^^
File "/Users/zhengyun/PycharmProjects/bit-Agent-server4p/demo.py", line 57, in main
await task_future
File "/Users/zhengyun/PycharmProjects/bit-Agent-server4p/demo.py", line 45, in _task
async for chunk_type, chunk in graph.astream(
...<3 lines>...
print(chunk)
File "/Users/zhengyun/Library/Caches/pypoetry/virtualenvs/bit-agent-server4p-pg9iQrBf-py3.13/lib/python3.13/site-packages/langgraph/pregel/__init__.py", line 2655, in astream
async for _ in runner.atick(
...<7 lines>...
yield o
File "/Users/zhengyun/Library/Caches/pypoetry/virtualenvs/bit-agent-server4p-pg9iQrBf-py3.13/lib/python3.13/site-packages/langgraph/pregel/runner.py", line 368, in atick
done, inflight = await asyncio.wait(
^^^^^^^^^^^^^^^^^^^
...<3 lines>...
)
^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/tasks.py", line 451, in wait
return await _wait(fs, timeout, return_when, loop)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/tasks.py", line 537, in _wait
await waiter
asyncio.exceptions.CancelledError
Expected Behavior
-
When a stage catches
CancelledErrorand returns aCommand, the state machine should execute that Command rather than being immediately interrupted by the asyncio cancellation. -
This would allow “safe exit” or cleanup logic to run even when the task is cancelled.
Additional Information
-
Python version: 3.13
-
Langgraph version: 0.4.10
-
Environment: macOS + PyCharm