`anyio.CancelScope(shield=True)` not working inside a LangGraph node

Hi! I’m trying to catch the CancelledError that FastAPI raises when the client disconnects, so that I can perform some asynchronous cleanup. However, wrapping my cleanup logic in anyio.CancelScope(shield=True) doesn’t work when it runs inside a LangGraph node: the first await inside the shielded scope is still cancelled immediately (see the output below).

Is there a recommended way to temporarily shield a task from a CancelledError within LangGraph? Thanks in advance!

Example Code

import anyio
import pprint
from typing import TypedDict
from fastapi import FastAPI
from fastapi.responses import StreamingResponse, HTMLResponse
import uvicorn

from langgraph.graph import StateGraph, START, END
from langchain_core.callbacks.manager import adispatch_custom_event
from langchain_core.runnables import RunnableConfig


# State carried through the graph: only the per-request session identifier.
GraphState = TypedDict("GraphState", {"session_id": str})


async def node(state: GraphState, config: RunnableConfig):
    """Emit up to 100 "my_custom_chunk" custom events, one per second.

    Reproduction of the reported problem: after a client disconnect, the
    ``await`` inside the shielded ``CancelScope`` in ``finally`` is still
    cancelled (the output shows the "Exception await (Inside LangGraph)" line).

    Args:
        state: Graph state; not read by this node.
        config: Runnable config, forwarded to ``adispatch_custom_event``.
    """
    try:
        for i in range(100):
            # Each event is picked up by sse_generator's astream_events loop.
            await adispatch_custom_event(
                name="my_custom_chunk", data={"text": f"Node {i}"}, config=config
            )
            await anyio.sleep(1)
    except anyio.get_cancelled_exc_class():
        print("Detected Cancel (Inside LangGraph)")
        # Re-raise so the cancellation keeps propagating.
        raise
    finally:
        print("Finalize (Inside LangGraph)")

        # NOTE(review): per the output, this shield does NOT protect the await.
        # Presumably LangGraph runs the node in its own asyncio task, and the
        # cancellation reaches it through plain asyncio task cancellation
        # propagated from the request task. An anyio shield entered in this task
        # cannot block that. Confirm against LangGraph's runner.
        with anyio.CancelScope(shield=True):
            try:
                await anyio.sleep(1)
                print("Finalize complete (Inside LangGraph)")
            except BaseException as e:
                # Catch everything (including cancellation) so it is printed
                # rather than raised out of the `finally` block.
                print(f"Exception await (Inside LangGraph):\n{pprint.pformat(e)}")


# Single-node graph: START -> stream_generator -> END.
workflow = StateGraph(GraphState)  # ty: ignore[invalid-argument-type]
workflow.add_node("stream_generator", node)
for _edge in ((START, "stream_generator"), ("stream_generator", END)):
    workflow.add_edge(*_edge)
app_graph = workflow.compile()


async def sse_generator(session_id: str):
    """Run the graph and yield each custom chunk as a Server-Sent Events frame.

    Args:
        session_id: Short id for this request; placed in the initial graph state.

    Yields:
        One SSE ``data:`` frame per ``my_custom_chunk`` custom event.
    """
    print(f"Sesstion ID: {session_id}")
    initial_state = {"session_id": session_id}

    try:
        async for event in app_graph.astream_events(initial_state, version="v2"):
            # Forward only our custom chunks; every other event is ignored.
            match event:
                case {
                    "event": "on_custom_event",
                    "name": "my_custom_chunk",
                    "data": {"text": chunk_text},
                }:
                    yield f"data: {chunk_text}\n\n"
    except anyio.get_cancelled_exc_class():
        print("Detected Cancel (Outside LangGraph)")
        # Re-raise so the cancellation keeps propagating.
        raise

    finally:
        # Unlike the shield inside the node, this one holds: the output shows
        # "Finalize Completed (Outside Graph)" after the cancellation.
        with anyio.CancelScope(shield=True):
            print("Finalize (Outside LangGraph)")
            try:
                await anyio.sleep(1.0)
                print("Finalize Completed (Outside Graph)")
            except BaseException as e:
                # Catch everything so a failure is printed, not raised from `finally`.
                print(f"Exception await (Ouside LangGraph):\n{pprint.pformat(e)}")


app = FastAPI()


@app.get("/stream")
async def stream_endpoint():
    """Open a new SSE stream tagged with a short random session id."""
    import uuid

    # The first six hex digits of a random UUID are enough to tell demo
    # sessions apart in the logs.
    short_id = uuid.uuid4().hex[:6]
    body = sse_generator(short_id)
    return StreamingResponse(body, media_type="text/event-stream")


@app.get("/")
async def index():
    """Serve a minimal page whose Start/Stop buttons open and close the SSE stream."""
    # "Stop" closes the EventSource, disconnecting the client. Presumably this
    # disconnect is what cancels sse_generator on the server; confirm.
    html = """
    <!DOCTYPE html>
    <html>
        <body>
            <button onclick="startStream()">Start</button>
            <button onclick="stopStream()">Stop</button>
            <div id="output"></div>
            <script>
                let eventSource;
                function startStream() {
                    if (eventSource) return;
                    document.getElementById('output').innerHTML = "";
                    eventSource = new EventSource('/stream');
                    eventSource.onmessage = function(e) {
                        document.getElementById('output').innerHTML += e.data + " ";
                    };
                }
                function stopStream() {
                    if (eventSource) {
                        eventSource.close();
                        eventSource = null;
                        document.getElementById('output').innerHTML += "<br><br><b style='color:red;'>[Stopped]</b>";
                    }
                }
            </script>
        </body>
    </html>
    """
    return HTMLResponse(html)


if __name__ == "__main__":
    # Serve the demo locally; open http://127.0.0.1:8003/ in a browser.
    uvicorn.run(app, port=8003, host="127.0.0.1")

Output

Sesstion ID: a8b7ee
Detected Cancel (Inside LangGraph)
Finalize (Inside LangGraph)
Exception await (Inside LangGraph):
CancelledError("Cancelled via cancel scope 7ffb00d61700 by <Task pending name='Task-7' coro=<RequestResponseCycle.run_asgi() running at /var/home/.../.venv/lib/python3.12/site-packages/uvicorn/protocols/http/h11_impl.py:410> cb=[set.discard()]>")
Detected Cancel (Outside LangGraph)
Finalize (Outside LangGraph)
Finalize Completed (Outside Graph)

Hello @jlzhjp,
Could you try the following? I made these changes:

| Part | Original | Fixed |
|---|---|---|
| Node `finally` | `anyio.CancelScope(shield=True)` + `await anyio.sleep(1)` | Spawn `_node_cleanup(done_event)` as a new asyncio task (`create_task`), without awaiting it |
| Cleanup function | Inlined in `finally` | Moved to `_node_cleanup(done_event)`, a separate coroutine run as a fresh task |
| Coordination | None | An `asyncio.Event` stored in `cleanup_done_registry` |
| `sse_generator` `finally` | `anyio.CancelScope(shield=True)` + `await anyio.sleep(1)` | Same shield, but now `await cleanup_event.wait()` instead of sleeping blindly |
import asyncio
import pprint
import uuid

import anyio
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, StreamingResponse
from langchain_core.callbacks.manager import adispatch_custom_event
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, START, StateGraph
from typing import TypedDict
import uvicorn


# Maps a session id to the asyncio.Event that sse_generator waits on.
# The node's detached cleanup task sets that event once its work is done.
cleanup_done_registry: dict[str, asyncio.Event] = dict()


# State carried through the graph: only the per-request session identifier.
GraphState = TypedDict("GraphState", {"session_id": str})


async def _node_cleanup(done_event: asyncio.Event) -> None:
    """
    Runs as a brand-new asyncio.Task — inherits zero cancellation state
    from the node task that spawned it. Signals done_event when finished.
    """
    await asyncio.sleep(1)      # I asssumed it simulate real async cleanup work
    print("Finalize complete (Inside LangGraph)")
    done_event.set()


# Strong references to detached cleanup tasks. The event loop keeps only weak
# references to tasks, so an unreferenced task may be garbage-collected before
# it finishes. Each task removes itself when done.
_background_cleanup_tasks: set[asyncio.Task[None]] = set()


async def node(state: GraphState, config: RunnableConfig):
    """Stream up to 100 "my_custom_chunk" events, then hand cleanup to a detached task.

    Args:
        state: Graph state; ``state["session_id"]`` selects the completion event
            that ``sse_generator`` registered in ``cleanup_done_registry``.
        config: Runnable config, forwarded to ``adispatch_custom_event``.
    """
    try:
        for i in range(100):
            await adispatch_custom_event(
                name="my_custom_chunk", data={"text": f"Node {i}"}, config=config
            )
            await anyio.sleep(1)

    except anyio.get_cancelled_exc_class():
        print("Detected Cancel (Inside LangGraph)")
        raise

    finally:
        print("Finalize (Inside LangGraph)")
        # An await here would be cancelled again: an anyio shield does not hold
        # inside a LangGraph node. So spawn the cleanup as an independent task
        # and do NOT await it. sse_generator's shielded finally waits for it
        # via the registered asyncio.Event.
        session_id = state["session_id"]
        done_event = cleanup_done_registry.get(session_id)
        if done_event is not None:
            task = asyncio.create_task(
                _node_cleanup(done_event), name=f"cleanup-{session_id}"
            )
            _background_cleanup_tasks.add(task)
            task.add_done_callback(_background_cleanup_tasks.discard)


# Single-node graph: START -> stream_generator -> END.
workflow = StateGraph(GraphState)
workflow.add_node("stream_generator", node)
for _edge in ((START, "stream_generator"), ("stream_generator", END)):
    workflow.add_edge(*_edge)
app_graph = workflow.compile()


async def sse_generator(session_id: str):
    print(f"Session ID: {session_id}")
    cleanup_event = asyncio.Event()
    cleanup_done_registry[session_id] = cleanup_event

    try:
        async for event in app_graph.astream_events(
            {"session_id": session_id}, version="v2"
        ):
            match event:
                case {
                    "event": "on_custom_event",
                    "name": "my_custom_chunk",
                    "data": {"text": chunk_text},
                }:
                    yield f"data: {chunk_text}\n\n"

    except anyio.get_cancelled_exc_class():
        print("Detected Cancel (Outside LangGraph)")
        raise

    finally:
        cleanup_done_registry.pop(session_id, None)

        # anyio.CancelScope(shield=True) works here because sse_generator runs
        # in the original anyio-managed FastAPI request task.
        with anyio.CancelScope(shield=True):
            print("Finalize (Outside LangGraph)")
            try:
                with anyio.fail_after(5.0):
                    await cleanup_event.wait()   # wait for node cleanup task
                print("Finalize complete (Outside LangGraph) — node cleanup confirmed")
            except TimeoutError:
                print("Finalize (Outside LangGraph) — node cleanup timed out")
            except BaseException as e:
                print(f"Exception in outer finalize:\n{pprint.pformat(e)}")


app = FastAPI()


@app.get("/stream")
async def stream_endpoint():
    """Open a new SSE stream tagged with a short random session id."""
    # The first six hex digits of a random UUID identify the demo session.
    short_id = uuid.uuid4().hex[:6]
    body = sse_generator(short_id)
    return StreamingResponse(body, media_type="text/event-stream")


@app.get("/")
async def index():
    """Serve a minimal page whose Start/Stop buttons open and close the SSE stream."""
    # "Stop" closes the EventSource, disconnecting the client. Presumably this
    # disconnect is what cancels sse_generator on the server; confirm.
    html = """
    <!DOCTYPE html>
    <html>
        <body>
            <button onclick="startStream()">Start</button>
            <button onclick="stopStream()">Stop</button>
            <div id="output"></div>
            <script>
                let eventSource;
                function startStream() {
                    if (eventSource) return;
                    document.getElementById('output').innerHTML = "";
                    eventSource = new EventSource('/stream');
                    eventSource.onmessage = function(e) {
                        document.getElementById('output').innerHTML += e.data + " ";
                    };
                }
                function stopStream() {
                    if (eventSource) {
                        eventSource.close();
                        eventSource = null;
                        document.getElementById('output').innerHTML +=
                            "<br><br><b style='color:red;'>[Stopped]</b>";
                    }
                }
            </script>
        </body>
    </html>
    """
    return HTMLResponse(html)


if __name__ == "__main__":
    # Serve the demo locally; open http://127.0.0.1:8003/ in a browser.
    uvicorn.run(app, port=8003, host="127.0.0.1")

I tested it, and the cleanup now runs to completion, as the following output shows:

Session ID: 5a8b09
Detected Cancel (Inside LangGraph)
Finalize (Inside LangGraph)
Detected Cancel (Outside LangGraph)
Finalize (Outside LangGraph)
Finalize complete (Inside LangGraph)
Finalize complete (Outside LangGraph) — node cleanup confirmed 

This confirms two things. The cleanup coroutine now runs to completion instead of being cancelled immediately. The outer `finally` block also waits for that cleanup to finish before it completes.