Parallel Nodes: how to manage failures or exceptions

Hi,
I’m trying to build my custom agent on LangGraph. Currently I’m building a graph using parallel agents and I would like to know if there is a smart strategy to cancel the execution of one Node if the other node raises an exception.

Thanks in advance for any kind of feedback

hi @carmine-sacco

LangGraph actually has built-in machinery for handling failures in parallel nodes, plus several strategies you can layer on top.

1. LangGraph already cancels sibling nodes on failure (default behavior)

When nodes run in parallel (same superstep), the runner uses `concurrent.futures.wait()` (sync) or `asyncio.wait()` (async) with `return_when=FIRST_COMPLETED`.
After each task completes, an internal function `_should_stop_others()` checks whether any completed task raised a non-interrupt exception. If so, the runner breaks out of the execution loop immediately.

Then _panic_or_proceed() kicks in:

  • it iterates over all futures
  • if it finds an exception, it cancels all still-inflight tasks
  • it re-raises the first exception

The relevant logic lives in `_panic_or_proceed()` in LangGraph's `_runner.py`.

So if node A and node B run in parallel and node A throws an exception, LangGraph will cancel node B and re-raise node A’s exception.

Important nuance: `GraphInterrupt` (used for human-in-the-loop) is not treated as a failure. Only real exceptions trigger cancellation. Source: `_runner.py`
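To make the default behavior concrete, here is a simplified, standalone sketch of the fail-fast pattern using plain `concurrent.futures` — an illustration of the technique, not the actual LangGraph source:

```python
import concurrent.futures
import time


def flaky():
    # Simulates a node that fails quickly
    raise RuntimeError("node A failed")


def slow():
    # Simulates a longer-running sibling node
    time.sleep(0.5)
    return "node B done"


def run_parallel(*fns):
    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures = {pool.submit(fn) for fn in fns}
        done, pending = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED
        )
        # _panic_or_proceed-style check: if any finished task failed,
        # try to cancel everything still pending and re-raise
        for fut in done:
            exc = fut.exception()
            if exc is not None:
                for p in pending:
                    p.cancel()  # only works if the task hasn't started yet
                raise exc
        return [fut.result() for fut in done]


try:
    run_parallel(flaky, slow)
except RuntimeError as e:
    print(f"graph raised: {e}")  # graph raised: node A failed
```

Note that even in this sketch, if `slow` has already started on its worker thread, `p.cancel()` cannot stop it — a point that matters later in this thread.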

2. RetryPolicy to make nodes resilient before failing

Before a node failure propagates and cancels siblings, you can add retry policies to give transient errors a chance to recover. This is configured per-node via add_node().

from langgraph.graph import StateGraph
from langgraph.types import RetryPolicy

builder = StateGraph(MyState)  # MyState: your state schema (e.g. a TypedDict)

# Default retry: 3 attempts, exponential backoff, jitter
builder.add_node(my_flaky_node, retry_policy=RetryPolicy())

# Custom retry: 5 attempts, only on specific exceptions
builder.add_node(
    my_api_node,
    retry_policy=RetryPolicy(
        max_attempts=5,
        initial_interval=1.0,
        backoff_factor=2.0,
        max_interval=30.0,
        jitter=True,
        retry_on=[ConnectionError, TimeoutError],
    ),
)

RetryPolicy fields:

| Parameter | Default | Description |
|---|---|---|
| `initial_interval` | `0.5` | Seconds before the first retry |
| `backoff_factor` | `2.0` | Multiplier applied to each subsequent interval |
| `max_interval` | `128.0` | Maximum seconds between retries |
| `max_attempts` | `3` | Total attempts (including the first) |
| `jitter` | `True` | Add random jitter to avoid a thundering herd |
| `retry_on` | `default_retry_on` | Exception class(es) or a callable returning `bool` |

Source: RetryPolicy in langgraph.types and _retry.py
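As a rough illustration of how these fields interact (my approximation of the documented behavior, not the actual `_retry.py` implementation), the wait before retry *n* is roughly `initial_interval * backoff_factor**(n - 1)`, capped at `max_interval`:

```python
import random


def backoff_schedule(max_attempts=3, initial_interval=0.5,
                     backoff_factor=2.0, max_interval=128.0, jitter=False):
    """Approximate the waits a retry policy would sleep between attempts."""
    waits = []
    interval = initial_interval
    for _ in range(max_attempts - 1):  # no wait after the final attempt
        wait = min(interval, max_interval)
        if jitter:
            wait += random.uniform(0, 1)  # spread out simultaneous retries
        waits.append(wait)
        interval *= backoff_factor
    return waits


print(backoff_schedule())                                  # [0.5, 1.0]
print(backoff_schedule(max_attempts=5, max_interval=3.0))  # [0.5, 1.0, 2.0, 3.0]
```

So with the defaults, a node gets three attempts total, waiting about 0.5s and then 1s between them (plus jitter) before the failure finally propagates and cancels siblings.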

3. Catch exceptions inside the node (graceful degradation)

If you want a parallel branch to fail gracefully instead of crashing the entire graph, wrap the node logic in a try/except and write a fallback result to the state:

from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
import operator


class MyState(TypedDict):
    query: str
    results: Annotated[list[dict], operator.add]
    errors: Annotated[list[str], operator.add]


def node_a(state: MyState) -> dict:
    try:
        result = call_external_api_a(state["query"])
        return {"results": [{"source": "A", "data": result}]}
    except Exception as e:
        # Graceful degradation: record the error, don't crash
        return {
            "results": [],
            "errors": [f"Node A failed: {e}"],
        }


def node_b(state: MyState) -> dict:
    try:
        result = call_external_api_b(state["query"])
        return {"results": [{"source": "B", "data": result}]}
    except Exception as e:
        return {
            "results": [],
            "errors": [f"Node B failed: {e}"],
        }


def aggregate(state: MyState) -> dict:
    if not state["results"] and state.get("errors"):
        # All branches failed
        raise RuntimeError(f"All parallel nodes failed: {state['errors']}")
    # Process whatever results we got
    return {"results": state["results"]}


builder = StateGraph(MyState)
builder.add_node(node_a)
builder.add_node(node_b)
builder.add_node(aggregate)

# Fan-out: START -> node_a, node_b (parallel)
builder.add_edge(START, "node_a")
builder.add_edge(START, "node_b")

# Fan-in: node_a, node_b -> aggregate
builder.add_edge("node_a", "aggregate")
builder.add_edge("node_b", "aggregate")
builder.add_edge("aggregate", END)

graph = builder.compile()

This pattern gives you partial results even when some branches fail. The aggregate node can then decide what to do (proceed with partial data, raise an error if all failed, etc.).

Note: the Annotated[list[...], operator.add] reducer is critical here - it merges lists from all parallel branches into a single list in the state.
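Conceptually, each state channel that has a reducer merges a node's update via `reducer(existing, update)` instead of overwriting it. A minimal standalone sketch of that idea (not LangGraph internals):

```python
import operator


def apply_update(state, update, reducers):
    """Merge a node's partial update into the state, channel by channel."""
    merged = dict(state)
    for key, value in update.items():
        reducer = reducers.get(key)
        if reducer and key in merged:
            merged[key] = reducer(merged[key], value)  # e.g. list concatenation
        else:
            merged[key] = value  # plain channels are simply overwritten
    return merged


state = {"query": "q", "results": []}
reducers = {"results": operator.add}

# Updates from two parallel branches accumulate instead of clobbering each other
state = apply_update(state, {"results": [{"source": "A"}]}, reducers)
state = apply_update(state, {"results": [{"source": "B"}]}, reducers)
print(state["results"])  # [{'source': 'A'}, {'source': 'B'}]
```

Without the reducer, whichever branch wrote last would win, and the other branch's results (or recorded errors) would be lost.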

4. Dynamic fan-out with send + error handling

For map-reduce workflows where you dynamically spawn parallel tasks using Send, the same patterns apply:

import operator
from typing import Annotated, TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.types import RetryPolicy, Send


class OverallState(TypedDict):
    subjects: list[str]
    jokes: Annotated[list[str], operator.add]
    errors: Annotated[list[str], operator.add]


def continue_to_jokes(state: OverallState):
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]


def generate_joke(state: dict) -> dict:
    try:
        joke = call_llm(f"Tell me a joke about {state['subject']}")
        return {"jokes": [joke]}
    except Exception as e:
        return {"jokes": [], "errors": [f"Failed for {state['subject']}: {e}"]}


builder = StateGraph(OverallState)
builder.add_node(
    generate_joke,
    retry_policy=RetryPolicy(max_attempts=2),
)
builder.add_conditional_edges(START, continue_to_jokes)
builder.add_edge("generate_joke", END)

graph = builder.compile()

Source: Send class in langgraph.types

5. Summary

| Strategy | When to use | Behavior |
|---|---|---|
| Default (do nothing) | You want fail-fast | One node fails → siblings cancelled → exception re-raised |
| `RetryPolicy` | Transient errors (APIs, network) | Retry N times with backoff before failing |
| `try`/`except` in node | Partial results are acceptable | Node catches its own error, writes a fallback to state |
| Retry + `try`/`except` | Maximum resilience | Retry first, then fall back gracefully if all retries are exhausted |

Hi Pawel,

Thanks a lot for your response. I tried the default approach and unfortunately the other node ran its tasks even though Node A raised an exception. That was the reason I opened this topic. I would like to understand if there are other ways to control execution in Node B, something similar to `asyncio.current_task().cancel()`.
Currently I’m using the try/except approach, but it doesn’t stop the execution of node B, which is an auxiliary node in my solution.
Is it possible there is a bug in the current implementation, such that the default option doesn’t interrupt the execution of the other node?

Hi @carmine-sacco

Yeah, you are actually right. The mechanism works as described, but there is an important limitation: Python’s threading. The graph stops waiting for Node B and re-raises Node A’s exception, but Node B’s thread silently runs to completion in the background, because `concurrent.futures.Future.cancel()` can only cancel tasks that haven’t started yet. This happens when you use the sync API (`.invoke()`).
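You can see this limitation with plain `concurrent.futures`, independent of LangGraph: a future can only be cancelled while it is still queued, never once its thread has started running:

```python
import concurrent.futures
import threading
import time

started = threading.Event()


def running_task():
    started.set()     # signal that the thread has actually started
    time.sleep(0.2)
    return "finished anyway"


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(running_task)
    queued = pool.submit(lambda: "never ran")  # waits behind running_task
    started.wait()                  # make sure the first task has begun
    print(running.cancel())         # False: already running, cannot be cancelled
    print(queued.cancel())          # True: still queued, cancellation succeeds
    print(running.result())         # the thread runs to completion regardless
```

This is exactly what happens to Node B under `.invoke()`: its future is already running, so the runner's cancellation attempt is a no-op and the thread finishes in the background.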

Could you try using ainvoke?

The async runner uses asyncio tasks instead of threads. When `Task.cancel()` is called on an async task, it raises `CancelledError` inside the coroutine at the next `await` point, which actually interrupts execution.
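A standalone sketch of why this works (plain `asyncio`, not LangGraph code): cancelling a sibling task injects `CancelledError` at its next `await` point, so the long-running branch really stops:

```python
import asyncio


async def node_a():
    await asyncio.sleep(0.05)
    raise RuntimeError("node A failed")


async def node_b():
    try:
        await asyncio.sleep(10)   # cancelled long before this finishes
        return "node B done"
    except asyncio.CancelledError:
        print("node B cancelled")
        raise                     # always re-raise after cleanup


async def main():
    tasks = [asyncio.create_task(node_a()), asyncio.create_task(node_b())]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for t in done:
        if t.exception() is not None:
            for p in pending:
                p.cancel()        # raises CancelledError inside node_b
            await asyncio.gather(*pending, return_exceptions=True)
            raise t.exception()


try:
    asyncio.run(main())
except RuntimeError as e:
    print(f"graph raised: {e}")   # graph raised: node A failed
```

Unlike the threaded case, `node_b` never reaches its return value: it is interrupted within milliseconds of node A's failure, which is the behavior you were after.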