Hi everyone,
I’m a bit confused about how parent state is propagated into a compiled subgraph when using the Send API with a node strategy.
Example scenario:
- I have a parent graph whose state includes `document_id`.
- I also have a compiled subgraph for page-level processing, whose state includes `page_id` and also expects `document_id`.
- I use the Send API from the parent graph to parallelize page processing by sending `page_id` through `Send.arg`. The page node in the subgraph has both state fields (`page_id`, `document_id`).
My confusion:
When the subgraph is triggered via Send:
- Will the subgraph automatically receive the parent's `document_id` state?
- Or do I need to explicitly include `document_id` in the Send args along with `page_id`?
(`document_id` should be read-only for all the subgraphs, but passing it via `Send.arg` causes `INVALID_CONCURRENT_GRAPH_UPDATE`.)
I'm basically trying to understand how shared/inherited state is handled when a compiled subgraph is triggered via Send, not just invoked directly as a node. I do know that calling the subgraph directly inside a node with the right parameters works best, but in my use case I'm restricted to a node strategy + the Send API.
Another solution would be to use different state keys for the parent and the subgraphs (e.g., `document_id`, `parent_document_id`), but I have deep nesting (parent, children, grandchildren, etc.), so this would eventually lead to naming inconsistencies.
Any clarification on how state propagation works in this setup would be greatly appreciated.
Here is an example for reference:
```python
from typing import Literal

from langgraph.graph import END, START, StateGraph
from langgraph.types import Command, Send

# .. (AgentState, PageState, DocumentNode, ResourcesDir, page_graph defined elsewhere)


def init(state: AgentState) -> Command[Literal["orchestrate", "__end__"]]:
    """
    Initialize the document orchestrator.
    """
    try:
        # 1. Get the document ID from the state
        document_id = state.get("document_id", None)
        doc_node = DocumentNode(root=(ResourcesDir.documents / document_id).resolve())
        pages_ids = list(doc_node.pages.keys())
        # 2. All set, go to the orchestration node
        return Command(
            goto="orchestrate",
            update={
                "pages_ids": pages_ids,
            },
        )
    except Exception:
        pass
    return Command(goto=END)


def orchestrate(state: AgentState) -> AgentState:
    """
    Pseudo node to delegate tasks to the page refiner
    """
    document_id = state.get("document_id", None)
    pages_ids = state.get("pages_ids", [])
    documents_stats = state.get("documents_stats", {})
    for page_id in pages_ids:
        documents_stats[document_id]["pages"][page_id] = PageState(
            status="IN_PROGRESS",
            groups={},
            orphans={},
        )
    return {
        "documents_stats": documents_stats
    }


def delegate_tasks(state: AgentState):
    """
    Delegate and spawn page nodes for each page in the document.
    """
    # 1. Get the pages' IDs from the state
    pages_ids = state.get("pages_ids", [])
    # 2. Use the Send API to delegate one task per page.
    #    Build a fresh args dict per page: reusing and mutating a single dict
    #    would make every Send share the last page_id.
    page_tasks = []
    for page_id in pages_ids:
        page_args = {
            # "document_id": state.get("document_id", None),  # -> omitting this does not propagate the state, whereas including it raises INVALID_CONCURRENT_GRAPH_UPDATE
            "page_id": page_id,
        }
        page_tasks.append(Send("page_node", page_args))
    return page_tasks


def aggregate(state: AgentState) -> AgentState:
    return state


# etc ..
page_node = page_graph.compile(name="PageRefiner", checkpointer=False)
# etc ..
graph = (
    StateGraph(AgentState)
    .add_node("__init__", init)
    .add_node("orchestrate", orchestrate)
    .add_node("page_node", page_node)
    .add_node("aggregate", aggregate)
    .add_edge(START, "__init__")
    .add_conditional_edges("orchestrate", delegate_tasks)
    .add_edge("page_node", "aggregate")
    .add_edge("aggregate", END)
)
```
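One thing worth double-checking in `delegate_tasks` as originally posted: the loop mutates a single `page_args` dict and hands that same object to every `Send`. If `Send` keeps a reference to the arg rather than a copy (which, as far as I can tell, it does), every task ends up seeing the last `page_id`. The stdlib-only sketch below reproduces the effect with a hypothetical `FakeSend` stand-in (not LangGraph's class) and shows the fix of building a fresh dict per page.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeSend:
    """Hypothetical stand-in for langgraph.types.Send: stores the arg by reference."""
    node: str
    arg: Any


def delegate_shared(pages_ids: list[str]) -> list[FakeSend]:
    # Buggy pattern: one dict is reused and mutated for every task.
    page_args = {"page_id": None}
    tasks = []
    for page_id in pages_ids:
        page_args["page_id"] = page_id
        tasks.append(FakeSend("page_node", page_args))
    return tasks


def delegate_fresh(document_id: str, pages_ids: list[str]) -> list[FakeSend]:
    # Fixed pattern: a new dict per task, so each branch keeps its own page_id.
    return [
        FakeSend("page_node", {"document_id": document_id, "page_id": page_id})
        for page_id in pages_ids
    ]


shared = delegate_shared(["p1", "p2", "p3"])
print([t.arg["page_id"] for t in shared])  # ['p3', 'p3', 'p3']

fresh = delegate_fresh("doc-1", ["p1", "p2", "p3"])
print([t.arg["page_id"] for t in fresh])  # ['p1', 'p2', 'p3']
```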
Hi @pawel-twardziak, could you please check this out?
Hi @codeonym,
sure, lemme see whether I can help…
Ok, I think I get what is happening here.
`Send("page_node", page_args)` is the correct way to pass state from a parent graph to a subgraph via Send: the args you pass become the input the subgraph receives.
- you have to add `document_id` to the args (it's commented out for now)
- the `INVALID_CONCURRENT_GRAPH_UPDATE` error happens because you fan out to parallel branches in one step and then fan in, with multiple node/subgraph outputs all trying to set `document_id` in the parent graph state (each subgraph run returns `document_id` as part of its output, even though the value is unchanged). That is not allowed unless the `document_id` field is annotated with a reducer that handles multiple values within one super-step.
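A rough, stdlib-only model of that fan-in rule may help. This is a simplification for illustration, not LangGraph's channel implementation, and `apply_step` and `ConcurrentUpdateError` are hypothetical names: within one super-step, a plain key accepts at most one write, while a key with a reducer folds all the writes together.

```python
import operator
from functools import reduce
from typing import Any, Callable, Optional


class ConcurrentUpdateError(Exception):
    """Hypothetical stand-in for the INVALID_CONCURRENT_GRAPH_UPDATE error."""


def apply_step(
    state: dict[str, Any],
    writes: list[dict[str, Any]],
    reducers: Optional[dict[str, Callable[[Any, Any], Any]]] = None,
) -> dict[str, Any]:
    """Apply the writes of all parallel branches that finished in one super-step."""
    reducers = reducers or {}
    # Group written values by key, in branch order.
    by_key: dict[str, list[Any]] = {}
    for update in writes:
        for key, value in update.items():
            by_key.setdefault(key, []).append(value)

    new_state = dict(state)
    for key, values in by_key.items():
        if key in reducers:
            # A reducer folds every write into the current value.
            new_state[key] = reduce(reducers[key], values, state.get(key))
        elif len(values) > 1:
            # A plain key accepts one write per step, even if all values are equal.
            raise ConcurrentUpdateError(f"key {key!r} received {len(values)} writes in one step")
        else:
            new_state[key] = values[0]
    return new_state


parent = {"document_id": "doc-1", "done": []}
reducers = {"done": operator.add}

# Each page branch echoes document_id back to the parent: rejected.
echoing = [{"document_id": "doc-1", "done": [p]} for p in ("p1", "p2", "p3")]
try:
    apply_step(parent, echoing, reducers)
except ConcurrentUpdateError as err:
    print(err)  # key 'document_id' received 3 writes in one step

# Branches that only write the reducer key are merged cleanly.
clean = [{"done": [p]} for p in ("p1", "p2", "p3")]
print(apply_step(parent, clean, reducers))
# {'document_id': 'doc-1', 'done': ['p1', 'p2', 'p3']}
```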
Given that:
- no, a subgraph will not automatically receive the parent's state when using the Send API
- yes, you need to pass `document_id` in the arguments
How to fix it?
Currently you have multiple parallel branches in the same super-step that overwrite the same key, `document_id`.
You can either a) add a reducer on that key to handle concurrent writes (though I don't think that makes sense in this case), or b) redesign the state so that it handles the flow correctly.
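For option a), a reducer on a read-only key could simply accept repeated writes of the same value and reject anything else. LangGraph reducers are declared with `typing.Annotated[type, fn]`, where `fn(current, update)` returns the merged value; `keep_if_equal` below is a hypothetical reducer of that shape, exercised with `functools.reduce` so the sketch runs without LangGraph installed.

```python
from functools import reduce
from typing import Annotated, Optional, TypedDict


def keep_if_equal(current: Optional[str], update: Optional[str]) -> Optional[str]:
    """Hypothetical reducer for a read-only key: tolerate duplicate writes, reject changes."""
    if not current:
        # Nothing set yet (None or ""), so take the incoming value.
        return update
    if not update or update == current:
        # Same value echoed back by a parallel branch (or no value): keep it.
        return current
    raise ValueError(f"read-only key changed from {current!r} to {update!r}")


class AgentState(TypedDict, total=False):
    # In a LangGraph state schema, the second Annotated argument is the reducer.
    document_id: Annotated[str, keep_if_equal]
    page_id: str


# Three parallel page branches echo the same document_id: folded into a single value.
print(reduce(keep_if_equal, ["doc-1", "doc-1", "doc-1"], "doc-1"))  # doc-1
```

This keeps the conflict check (a branch that tries to change `document_id` still fails loudly), but it only papers over the echoing; it does not stop the subgraphs from writing the key.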
You could also invoke the subgraph from inside a node (rather than making the subgraph a node itself) → Subgraphs - Docs by LangChain, e.g.:
```python
import asyncio


async def call_pages_no_send(state: ParentState) -> ParentState:
    doc_id = state["document_id"]
    pages = state["pages"]
    # Prepare inputs for the child subgraph
    inputs = [{"document_id": doc_id, "page_id": p} for p in pages]
    # Fan out concurrently inside this single node
    # Each call returns e.g. {"page_results": {page_id: "..."}}
    results = await asyncio.gather(*(page_graph.ainvoke(inp) for inp in inputs))
    # Aggregate into one parent update to avoid concurrent writes
    merged: dict[str, str] = {}
    for out in results:
        merged.update(out.get("page_results", {}))
    return {"page_results": merged}
```
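To try the fan-out above in isolation, here is a runnable variant where `page_graph` is replaced by a hypothetical `StubPageGraph` that only mimics the `ainvoke(input) -> dict` call a compiled graph provides. The `asyncio.Semaphore` cap (`max_concurrency`) is my own addition, not part of the suggestion above; it just limits how many page runs are in flight at once.

```python
import asyncio
from typing import Any


class StubPageGraph:
    """Hypothetical stand-in for a compiled page subgraph; only ainvoke() is mimicked."""

    async def ainvoke(self, inp: dict[str, Any]) -> dict[str, Any]:
        await asyncio.sleep(0)  # pretend to do async work
        return {"page_results": {inp["page_id"]: f"refined {inp['document_id']}/{inp['page_id']}"}}


page_graph = StubPageGraph()


async def call_pages_no_send(state: dict[str, Any], max_concurrency: int = 4) -> dict[str, Any]:
    doc_id = state["document_id"]
    # Every child input carries the read-only document_id explicitly.
    inputs = [{"document_id": doc_id, "page_id": p} for p in state["pages"]]
    # Assumption: a semaphore bounds how many page runs execute at once.
    sem = asyncio.Semaphore(max_concurrency)

    async def run_one(inp: dict[str, Any]) -> dict[str, Any]:
        async with sem:
            return await page_graph.ainvoke(inp)

    results = await asyncio.gather(*(run_one(inp) for inp in inputs))
    # One merged update for the parent, so no key is written concurrently.
    merged: dict[str, str] = {}
    for out in results:
        merged.update(out.get("page_results", {}))
    return {"page_results": merged}


print(asyncio.run(call_pages_no_send({"document_id": "doc-1", "pages": ["p1", "p2"]})))
# {'page_results': {'p1': 'refined doc-1/p1', 'p2': 'refined doc-1/p2'}}
```

Because the parent only ever sees the single merged `page_results` update, `document_id` never leaves the parent state and no reducer is needed for it.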
Another option, if `document_id` is constant and should not participate in state updates at all, is to pass it via the runtime context and read it inside nodes through `runtime.context`. This avoids state collisions entirely:
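```python
from dataclasses import dataclass

from langgraph.graph import StateGraph
from langgraph.runtime import Runtime


@dataclass
class ContextSchema:
    document_id: str


graph = StateGraph(State, context_schema=ContextSchema)
...


def node_with_runtime(state: State, runtime: Runtime[ContextSchema]):
    print("In node: ", runtime.context.document_id)

# The context is supplied at invocation time, e.g.:
# graph.compile().invoke(inputs, context={"document_id": "doc-1"})
```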
Hi @pawel-twardziak, I've decided to invoke the subgraph inside a wrapper node. Here is my current implementation:
```python
def delegate_tasks(state: AgentState):
    """
    Delegate tasks to `page_node` for each page in the document.
    """
    # 1. Get the pages' IDs from the state
    pages_ids = state.get("pages_ids", [])
    print(f" PAGE IDS: {pages_ids}")
    # 2. Use the Send API to delegate tasks to the page nodes
    page_tasks = []
    for page_id in pages_ids:
        page_args = PageRefinerArgs(
            pr_document_id=state.get("document_id", ""),  # <-- added the pr_ prefix to avoid collisions with the graph's state
            pr_page_id=page_id,
        )
        page_tasks.append(Send("page_node", page_args))
    return page_tasks


def page_node(state: PageRefinerArgs):
    """
    Invoke the page refiner agent.
    """
    # 1. Get the args from the state
    document_id = state.get("pr_document_id", "")
    page_id = state.get("pr_page_id", "")
    try:
        # 2. Check the args' validity
        if not (document_id and page_id):
            raise ValueError("PageRefiner requires document_id and page_id.")
        # 3. Invoke the page refiner agent
        for chunk in workflow.stream(
            {
                "document_id": document_id,
                "page_id": page_id,
            },
            stream_mode="values",
        ):
            # 4. Yield the refiner's state
            page_state = PageState(
                groups=chunk.get("groups", {}),
                orphans=chunk.get("orphans", {}),
                status=chunk.get("status", "FAILED"),
            )
            yield {
                "pages": {
                    page_id: page_state
                }
            }
    except Exception:
        # 5. Oops, something went wrong. End the agent.
        yield {
            "pages": {
                page_id: PageState(status="FAILED", groups={}, orphans={})
            }
        }
```
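One thing to watch with this wrapper: every parallel `page_node` branch spawned via `Send` writes to the same `pages` key in the parent state, so unless `AgentState` (not shown) already has a reducer on `pages`, the same `INVALID_CONCURRENT_GRAPH_UPDATE` may come back. A hypothetical dict-merging reducer is sketched below with a minimal `PageState` stand-in (fields taken from the snippet; the `"DONE"` status string is made up for the example), using only the stdlib.

```python
from dataclasses import dataclass, field
from functools import reduce
from typing import Annotated, Optional, TypedDict


@dataclass
class PageState:
    """Minimal stand-in for the PageState used above (fields taken from the snippet)."""
    status: str
    groups: dict = field(default_factory=dict)
    orphans: dict = field(default_factory=dict)


def merge_pages(
    current: Optional[dict[str, PageState]],
    update: Optional[dict[str, PageState]],
) -> dict[str, PageState]:
    """Hypothetical reducer: merge per-page entries; a later write for the same page_id wins."""
    merged = dict(current or {})
    merged.update(update or {})
    return merged


class AgentState(TypedDict, total=False):
    document_id: str
    # With a reducer, several page branches can write "pages" in the same super-step.
    pages: Annotated[dict[str, PageState], merge_pages]


# Updates as they might arrive from three page_node branches in one step.
writes = [
    {"p1": PageState(status="IN_PROGRESS")},
    {"p2": PageState(status="FAILED")},
    {"p1": PageState(status="DONE", groups={"g1": {}})},
]
pages = reduce(merge_pages, writes, {})
print({pid: p.status for pid, p in pages.items()})  # {'p1': 'DONE', 'p2': 'FAILED'}
```

Keying the merge by `page_id` means each branch only touches its own entry, so the order in which parallel branches finish does not matter unless two branches report on the same page.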