Hi! I’m trying to catch a CancelledError triggered by FastAPI so I can perform some asynchronous cleanup. However, wrapping my cleanup logic in anyio.CancelScope(shield=True) doesn’t seem to work when executed inside a LangGraph node; the cancellation still propagates immediately.
Is there a recommended way to temporarily shield a task from a CancelledError within LangGraph? Thanks in advance!
Example Code
import anyio
import pprint
from typing import TypedDict
from fastapi import FastAPI
from fastapi.responses import StreamingResponse, HTMLResponse
import uvicorn
from langgraph.graph import StateGraph, START, END
from langchain_core.callbacks.manager import adispatch_custom_event
from langchain_core.runnables import RunnableConfig
class GraphState(TypedDict):
session_id: str
async def node(state: GraphState, config: RunnableConfig):
try:
for i in range(100):
await adispatch_custom_event(
name="my_custom_chunk", data={"text": f"Node {i}"}, config=config
)
await anyio.sleep(1)
except anyio.get_cancelled_exc_class():
print("Detected Cancel (Inside LangGraph)")
raise
finally:
print("Finalize (Inside LangGraph)")
with anyio.CancelScope(shield=True):
try:
await anyio.sleep(1)
print("Finalize complete (Inside LangGraph)")
except BaseException as e:
print(f"Exception await (Inside LangGraph):\n{pprint.pformat(e)}")
workflow = StateGraph(GraphState) # ty: ignore[invalid-argument-type]
workflow.add_node("stream_generator", node)
workflow.add_edge(START, "stream_generator")
workflow.add_edge("stream_generator", END)
app_graph = workflow.compile()
async def sse_generator(session_id: str):
print(f"Sesstion ID: {session_id}")
initial_state = {"session_id": session_id}
try:
async for event in app_graph.astream_events(initial_state, version="v2"):
match event:
case {
"event": "on_custom_event",
"name": "my_custom_chunk",
"data": {"text": chunk_text},
}:
yield f"data: {chunk_text}\n\n"
except anyio.get_cancelled_exc_class():
print("Detected Cancel (Outside LangGraph)")
raise
finally:
with anyio.CancelScope(shield=True):
print("Finalize (Outside LangGraph)")
try:
await anyio.sleep(1.0)
print("Finalize Completed (Outside Graph)")
except BaseException as e:
print(f"Exception await (Ouside LangGraph):\n{pprint.pformat(e)}")
app = FastAPI()
@app.get("/stream")
async def stream_endpoint():
import uuid
session_id = str(uuid.uuid4())[:6]
return StreamingResponse(sse_generator(session_id), media_type="text/event-stream")
@app.get("/")
async def index():
html = """
<!DOCTYPE html>
<html>
<body>
<button onclick="startStream()">Start</button>
<button onclick="stopStream()">Stop</button>
<div id="output"></div>
<script>
let eventSource;
function startStream() {
if (eventSource) return;
document.getElementById('output').innerHTML = "";
eventSource = new EventSource('/stream');
eventSource.onmessage = function(e) {
document.getElementById('output').innerHTML += e.data + " ";
};
}
function stopStream() {
if (eventSource) {
eventSource.close();
eventSource = null;
document.getElementById('output').innerHTML += "<br><br><b style='color:red;'>[Stopped]</b>";
}
}
</script>
</body>
</html>
"""
return HTMLResponse(html)
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8003)
Output
Sesstion ID: a8b7ee
Detected Cancel (Inside LangGraph)
Finalize (Inside LangGraph)
Exception await (Inside LangGraph):
CancelledError("Cancelled via cancel scope 7ffb00d61700 by <Task pending name='Task-7' coro=<RequestResponseCycle.run_asgi() running at /var/home/.../.venv/lib/python3.12/site-packages/uvicorn/protocols/http/h11_impl.py:410> cb=[set.discard()]>")
Detected Cancel (Outside LangGraph)
Finalize (Outside LangGraph)
Finalize Completed (Outside Graph)