Duplicate node execution when using conditionals

I have the following graph:

import operator
import random
from typing import Annotated
from langgraph.graph import START, StateGraph
from langgraph.types import Send
import pydantic


class State(pydantic.BaseModel):
    value: int = 0
    fanout_results: Annotated[list[int], operator.add] = pydantic.Field(
        default_factory=list
    )
    result: int = 0


class FanoutInput(pydantic.BaseModel):
    value: int


def step1(state: State):
    print("step1")
    return {"value": random.randint(1, 4)}


def prepare_static_step2(state: State):
    print("prepare_static_step2")
    return {}


def static_step2(state: State):
    print("static_step2")
    return {"fanout_results": [state.value + 5]}


def dynamic_step2(input: FanoutInput):
    print("dynamic_step2", input.value)
    return {
        "fanout_results": [input.value * 2],
    }


def fanin(state: State):
    print("fanin", state.fanout_results)
    return {"result": sum(state.fanout_results)}


# If true, `fanin` is called twice!
needs_preparation = True

graph = StateGraph(State)

graph.add_node("step1", step1)
if needs_preparation:
    graph.add_node("prepare_static_step2", prepare_static_step2)
graph.add_node("static_step2", static_step2)
graph.add_node("dynamic_step2", dynamic_step2)
graph.add_node("fanin", fanin)

graph.add_edge(START, "step1")

# unconditionally execute static_step2
if needs_preparation:
    graph.add_edge("step1", "prepare_static_step2")
    graph.add_edge("prepare_static_step2", "static_step2")
else:
    graph.add_edge("step1", "static_step2")

# fan out to dynamic_step2
graph.add_conditional_edges(
    "step1",
    lambda state: [
        Send("dynamic_step2", FanoutInput(value=i)) for i in range(state.value)
    ],
    ["dynamic_step2"],
)

# Intention: execute fanin after both static_step2 and all dynamic_step2 executions complete
graph.add_edge("static_step2", "fanin")
graph.add_edge("dynamic_step2", "fanin")

g = graph.compile()
print(g.get_graph().draw_mermaid())
g.invoke(State())

If needs_preparation=True the fanin node will be executed twice. I kinda understand why it is happening but I can’t find a workaround to make sure that it is only called after all the dynamic steps and the static one is done. Can you help me?

Ok, found the solution myself :wink:

When I change

graph.add_edge("static_step2", "fanin")
graph.add_edge("dynamic_step2", "fanin")
# to
graph.add_edge(["static_step2", "dynamic_step2"], "fanin")

I get the desired behavior.

1 Like