This is how `interrupt` works — it pauses the graph at the interrupt point until a resume value is supplied.
I’ve prepared this script:
"""
Orchestrator + 3 worker "subagents" demo:
- Uses OpenAI via `langchain_openai.ChatOpenAI`
- Loads env vars with `python-dotenv`
- Persists state with `langgraph.checkpoint.postgres.PostgresSaver`
- Demonstrates 3 concurrent interrupts in the SAME step, resumed in ONE run via:
Command(resume={interrupt_id: resume_value, ...})
Required env vars (put in `.env` and this script will load it):
- OPENAI_API_KEY
- POSTGRES_URI
Example:
postgresql://postgres:postgres@localhost:5432/langgraph_checkpoints?sslmode=disable
Run:
python3 src/orchestrator_3_workers_batch_interrupt_postgres.py
"""
import json
import os
import uuid
from typing import Any, Annotated
import operator
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from typing_extensions import NotRequired, TypedDict
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import Command, Send, interrupt
class ParentState(TypedDict):
    """State schema for the parent (orchestrator) graph."""

    # Input topic the orchestrator fans out to the workers.
    topic: str
    # Assignments produced by `orchestrator`; each dict carries worker_id/role/task.
    worker_tasks: list[dict[str, str]]
    # Each worker contributes exactly one item; operator.add aggregates across parallel tasks.
    reviews: Annotated[list[dict[str, Any]], operator.add]
    # Final combined answer written by `finalize`.
    summary: str
class WorkerState(TypedDict):
    """State schema for one worker subgraph execution (one `Send` per worker)."""

    worker_id: str
    role: str
    task: str
    # Produced by `worker_draft`; checkpointed before the review interrupt fires.
    draft: NotRequired[str]
    # Human review decision returned by `interrupt` in `worker_interrupt_for_review`.
    decision: NotRequired[str]
    # This key is returned by `worker_emit_to_parent` and is meant to update the parent graph.
    # It MUST be present in the worker graph schema, otherwise the update may be dropped.
    reviews: NotRequired[list[dict[str, Any]]]
def _model() -> ChatOpenAI:
    """Build the chat model, honoring the OPENAI_MODEL env override (deterministic, temperature 0)."""
    model_name = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
    return ChatOpenAI(model=model_name, temperature=0)
def orchestrator(state: ParentState) -> dict[str, Any]:
    """Create 3 worker tasks for the topic.

    Asks the LLM for strict JSON and parses it via `_parse_task_list`. If the
    response cannot be parsed, falls back to a hand-written set of three
    assignments so the demo always proceeds.

    Returns:
        Partial state update with `worker_tasks` and an empty `reviews` list.
    """
    topic = state["topic"]
    llm = _model()
    prompt = (
        "You are an orchestrator. Create 3 distinct worker assignments for the topic.\n"
        "Return STRICT JSON (no markdown) as a list of exactly 3 objects, each with keys:\n"
        "- worker_id (short identifier)\n"
        "- role (one-line role description)\n"
        "- task (one-line instruction)\n"
        f"Topic: {topic!r}\n"
    )
    raw = llm.invoke(prompt).content.strip()
    tasks: list[dict[str, str]]
    try:
        tasks = _parse_task_list(raw)
    except Exception:
        print(f"Error parsing model response: {raw}")
        # Fallback if model returns non-JSON.
        tasks = [
            {
                "worker_id": "researcher",
                "role": "Find key facts and structure",
                "task": f"Draft a concise factual outline about: {topic}",
            },
            {
                "worker_id": "critic",
                "role": "Identify gaps and risks",
                "task": f"Draft potential issues, missing info, and pitfalls about: {topic}",
            },
            {
                "worker_id": "editor",
                "role": "Improve clarity and tone",
                "task": f"Draft a polished short explanation of: {topic}",
            },
        ]
    return {"worker_tasks": tasks, "reviews": []}


def _parse_task_list(raw: str) -> list[dict[str, str]]:
    """Parse the LLM response into exactly 3 task dicts; raise on any mismatch.

    Uses explicit raises instead of `assert` so validation survives `python -O`
    (asserts are stripped under optimization).
    """
    text = raw
    # Models frequently wrap JSON in markdown fences despite instructions;
    # strip a leading ``` / ```json line and a trailing ``` before parsing.
    if text.startswith("```"):
        text = text.split("\n", 1)[1] if "\n" in text else text[3:]
        stripped = text.rstrip()
        if stripped.endswith("```"):
            text = stripped[:-3]
    parsed = json.loads(text)
    if not isinstance(parsed, list) or len(parsed) != 3:
        raise ValueError("expected a JSON list of exactly 3 task objects")
    return [
        {
            "worker_id": str(x["worker_id"]),
            "role": str(x["role"]),
            "task": str(x["task"]),
        }
        for x in parsed
    ]
def assign_workers(state: ParentState) -> list[Send]:
    """Fan out into 3 parallel worker executions (same node invoked multiple times)."""
    # One Send per assignment; each carries a fresh WorkerState payload.
    return [
        Send(
            "worker",
            {
                "worker_id": assignment["worker_id"],
                "role": assignment["role"],
                "task": assignment["task"],
            },
        )
        for assignment in state["worker_tasks"]
    ]
def worker_draft(state: WorkerState) -> dict[str, Any]:
    """Generate this worker's short draft with a single LLM call."""
    messages = [
        ("system", f"You are a worker agent. Role: {state['role']}"),
        ("user", f"Task: {state['task']}\nWrite a short draft (<=120 words)."),
    ]
    response = _model().invoke(messages)
    return {"draft": response.content}
def worker_interrupt_for_review(state: WorkerState) -> dict[str, Any]:
    """
    Interrupts for human review.
    Key point: `draft` is produced in a prior step, so it is already checkpointed.
    This node can be resumed without re-running the LLM call.
    """
    # Surface the worker's identity and draft to the reviewer, plus an instruction.
    payload: dict[str, Any] = {
        key: state[key] for key in ("worker_id", "role", "task", "draft")
    }
    payload["request"] = "Review the draft. Reply with 'approve' or 'revise: <notes>'."
    return {"decision": interrupt(payload)}
def worker_emit_to_parent(state: WorkerState) -> dict[str, Any]:
    """Return a single aggregated item into the parent state's `reviews` list."""
    fields = ("worker_id", "role", "task", "draft", "decision")
    review = {name: state[name] for name in fields}
    return {"reviews": [review]}
def finalize(state: ParentState) -> dict[str, Any]:
    """Combine all worker reviews into one short summary via a final LLM call."""
    reviews_json = json.dumps(state["reviews"], ensure_ascii=False)
    messages = [
        ("system", "You are the orchestrator. Produce a final combined summary."),
        (
            "user",
            "Combine these worker drafts and their decisions into one short answer.\n"
            f"Topic: {state['topic']}\n"
            f"Worker reviews: {reviews_json}\n",
        ),
    ]
    response = _model().invoke(messages)
    return {"summary": response.content}
def build_app(checkpointer: PostgresSaver):
    """Compile the worker subgraph and wire it into the checkpointed parent graph."""
    # Worker subgraph: draft -> review (interrupt) -> emit.
    worker = StateGraph(WorkerState)
    worker.add_node("draft", worker_draft)
    worker.add_node("review", worker_interrupt_for_review)
    worker.add_node("emit", worker_emit_to_parent)
    for src, dst in ((START, "draft"), ("draft", "review"), ("review", "emit"), ("emit", END)):
        worker.add_edge(src, dst)

    # Parent graph: orchestrator fans out via Send; workers converge on finalize.
    parent = StateGraph(ParentState)
    parent.add_node("orchestrator", orchestrator)
    parent.add_node("worker", worker.compile())
    parent.add_node("finalize", finalize)
    parent.add_edge(START, "orchestrator")
    parent.add_conditional_edges("orchestrator", assign_workers, ["worker"])
    parent.add_edge("worker", "finalize")
    parent.add_edge("finalize", END)
    return parent.compile(checkpointer=checkpointer)
def main() -> None:
    """Run the demo: pause on all worker interrupts, then resume them all in one call.

    Requires OPENAI_API_KEY and POSTGRES_URI (loaded from .env). Optional env
    overrides: OPENAI_MODEL, TOPIC, THREAD_ID.

    Raises:
        RuntimeError: if POSTGRES_URI or OPENAI_API_KEY is missing.
    """
    load_dotenv()
    postgres_uri = os.getenv("POSTGRES_URI")
    if not postgres_uri:
        raise RuntimeError(
            "POSTGRES_URI is required (see script docstring for an example)."
        )
    if not os.getenv("OPENAI_API_KEY"):
        raise RuntimeError("OPENAI_API_KEY is required.")
    topic = os.getenv("TOPIC", "LangGraph human review batching pattern")
    # A stable THREAD_ID lets a later process resume this exact checkpointed run;
    # otherwise each invocation gets a fresh thread.
    thread_id = os.getenv("THREAD_ID", str(uuid.uuid4()))
    config = {"configurable": {"thread_id": thread_id}}
    with PostgresSaver.from_conn_string(postgres_uri) as saver:
        # Must be called at least once per fresh database.
        saver.setup()
        app = build_app(saver)
        # 1) Run until we hit interrupts (all 3 workers should interrupt in the same step).
        # NOTE(review): stream_mode="values" looks redundant for invoke() — confirm against langgraph docs.
        values = app.invoke({"topic": topic, "reviews": [], "summary": ""}, config, stream_mode="values")
        # Pending interrupts live on the checkpointed thread state, not the return value.
        state = app.get_state(config)
        interrupts = list(state.interrupts)
        print("\n=== First run (paused) ===")
        print(f"thread_id={thread_id}")
        print(f"interrupts={len(interrupts)}")
        for i in interrupts:
            payload = i.value
            print(f"- interrupt_id={i.id} worker_id={payload.get('worker_id')} task={payload.get('task')}")
        if not interrupts:
            # Nothing paused: the graph ran to completion on the first invoke.
            print("\nNo interrupts; graph output:")
            print(values)
            return
        # 2) Consolidated "human review" step (this is where you'd dedupe/merge).
        # Here we auto-approve each worker, but you can replace this with real UI logic.
        resume_map: dict[str, str] = {
            i.id: f"approve (auto) for worker={i.value.get('worker_id')}"
            for i in interrupts
        }
        # 3) Resume ALL workers in ONE run.
        final = app.invoke(Command(resume=resume_map), config)
        print("\n=== Second run (resumed once) ===")
        print("reviews:")
        for r in final["reviews"]:
            print(f"- {r['worker_id']}: {r['decision']}")
        print("\nsummary:")
        print(final["summary"])
# Script entry point guard: run the demo only when executed directly.
if __name__ == "__main__":
    main()