Best Pattern for “Batch” Human Review with Multiple LangGraph Workers?

Hello,

I’m trying to build a more advanced human-in-the-loop workflow using the orchestrator-worker agent pattern.

My goal is to reduce back-and-forth by consolidating all worker interruptions into a single human review step (e.g., deduplicating/merging the payloads), then resuming all workers in one go without losing their in-progress state.

Problem

Using interrupt in this setup isn’t straightforward:

  • If each worker calls interrupt directly, I can’t consolidate/deduplicate the interruption payloads across workers.

  • If workers instead return their interruption payloads to the orchestrator, which then calls interrupt only once, the workers are treated as finished, and LangGraph loses the worker state it would need to resume them from where they left off.

Current workaround

Right now, the best approach I’ve found is:

  • each worker returns an object that contains both:

    • the interruption payload,

    • a snapshot of the worker state

  • after human review, the orchestrator re-triggers each worker and re-injects:

    • the saved state snapshot, and

    • the human feedback

This works, but it feels suboptimal because it duplicates state snapshotting logic and bloats the orchestrator state by storing snapshots for all workers. It also requires extra logic in each worker to distinguish a resume from a first run.

Question

Has anyone implemented a pattern like this in LangGraph (single consolidated interrupt + resuming multiple workers while preserving their state)? If so, what approach/pattern did you use?

Thanks in advance,
Louis

hi @Louis

I think LangGraph already supports batch human review across multiple workers without manual state snapshots:

  • Let each worker call interrupt() (with a checkpointer enabled).
  • When workers run concurrently, you’ll receive multiple pending interrupts at once.
  • Do one consolidated human review, then resume all workers in one call via Command(resume={interrupt_id: response, …}).
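The resume mapping in the last step is just `{interrupt_id: response}`. Here is a minimal sketch of building it, with a plain dataclass standing in for LangGraph's Interrupt objects so the snippet runs without a graph:

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeInterrupt:
    # Stand-in for langgraph.types.Interrupt: `id` is the resume key,
    # `value` is the payload the worker passed to interrupt().
    id: str
    value: dict[str, Any]


pending = [
    FakeInterrupt("int-a", {"worker_id": "researcher", "draft": "..."}),
    FakeInterrupt("int-b", {"worker_id": "critic", "draft": "..."}),
]

# One consolidated review yields one response per pending interrupt ...
resume_map = {i.id: f"approve {i.value['worker_id']}" for i in pending}
# ... which would then resume every worker in a single call:
#   app.invoke(Command(resume=resume_map), config)
```

In a real graph you would read the pending interrupts from `app.get_state(config).interrupts` instead of building them by hand.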

Have you tried it? What does your graph actually look like?

Hello @pawel-twardziak, and thanks for your quick answer!

That is indeed one of the patterns I had in mind. But I see one limitation: when I say I want to consolidate payloads, I mean applying arbitrary logic, or even calling LLMs, to pre-process the required feedback (in my case, workers surface missing evidence needed to answer the problem, so multiple workers can surface the same missing evidence). But when using interrupt, execution stops entirely, so no logic beyond the worker logic can run between the interrupts and the human review.

Another solution would be to use a second, separate software component/agent that is in charge of collecting interrupts and doing the consolidation.

This is how interrupt works - it stops the graph :slight_smile:

I’ve prepared this script:

"""
Orchestrator + 3 worker "subagents" demo:

- Uses OpenAI via `langchain_openai.ChatOpenAI`
- Loads env vars with `python-dotenv`
- Persists state with `langgraph.checkpoint.postgres.PostgresSaver`
- Demonstrates 3 concurrent interrupts in the SAME step, resumed in ONE run via:
    Command(resume={interrupt_id: resume_value, ...})

Required env vars (put in `.env` and this script will load it):
- OPENAI_API_KEY
- POSTGRES_URI
    Example:
    postgresql://postgres:postgres@localhost:5432/langgraph_checkpoints?sslmode=disable

Run:
  python3 src/orchestrator_3_workers_batch_interrupt_postgres.py
"""

import json
import os
import uuid
from typing import Any, Annotated
import operator

from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from typing_extensions import NotRequired, TypedDict

from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import START, END, StateGraph
from langgraph.types import Command, Send, interrupt


class ParentState(TypedDict):
    topic: str
    worker_tasks: list[dict[str, str]]
    # Each worker contributes exactly one item; operator.add aggregates across parallel tasks.
    reviews: Annotated[list[dict[str, Any]], operator.add]
    summary: str


class WorkerState(TypedDict):
    worker_id: str
    role: str
    task: str
    draft: NotRequired[str]
    decision: NotRequired[str]
    # This key is returned by `worker_emit_to_parent` and is meant to update the parent graph.
    # It MUST be present in the worker graph schema, otherwise the update may be dropped.
    reviews: NotRequired[list[dict[str, Any]]]


def _model() -> ChatOpenAI:
    return ChatOpenAI(model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"), temperature=0)


def orchestrator(state: ParentState) -> dict[str, Any]:
    """Creates 3 worker tasks for the topic."""
    topic = state["topic"]
    llm = _model()

    prompt = (
        "You are an orchestrator. Create 3 distinct worker assignments for the topic.\n"
        "Return STRICT JSON (no markdown) as a list of exactly 3 objects, each with keys:\n"
        "- worker_id (short identifier)\n"
        "- role (one-line role description)\n"
        "- task (one-line instruction)\n"
        f"Topic: {topic!r}\n"
    )

    raw = llm.invoke(prompt).content.strip()
    tasks: list[dict[str, str]]
    try:
        parsed = json.loads(raw)
        assert isinstance(parsed, list) and len(parsed) == 3
        tasks = [
            {
                "worker_id": str(x["worker_id"]),
                "role": str(x["role"]),
                "task": str(x["task"]),
            }
            for x in parsed
        ]
    except Exception:
        print(f"Error parsing model response: {raw}")
        # Fallback if model returns non-JSON.
        tasks = [
            {
                "worker_id": "researcher",
                "role": "Find key facts and structure",
                "task": f"Draft a concise factual outline about: {topic}",
            },
            {
                "worker_id": "critic",
                "role": "Identify gaps and risks",
                "task": f"Draft potential issues, missing info, and pitfalls about: {topic}",
            },
            {
                "worker_id": "editor",
                "role": "Improve clarity and tone",
                "task": f"Draft a polished short explanation of: {topic}",
            },
        ]

    return {"worker_tasks": tasks, "reviews": []}


def assign_workers(state: ParentState) -> list[Send]:
    """Fan out into 3 parallel worker executions (same node invoked multiple times)."""
    sends: list[Send] = []
    for t in state["worker_tasks"]:
        sends.append(
            Send(
                "worker",
                {
                    "worker_id": t["worker_id"],
                    "role": t["role"],
                    "task": t["task"],
                },
            )
        )
    return sends


def worker_draft(state: WorkerState) -> dict[str, Any]:
    llm = _model()
    content = llm.invoke(
        [
            ("system", f"You are a worker agent. Role: {state['role']}"),
            ("user", f"Task: {state['task']}\nWrite a short draft (<=120 words)."),
        ]
    ).content
    return {"draft": content}


def worker_interrupt_for_review(state: WorkerState) -> dict[str, Any]:
    """
    Interrupts for human review.

    Key point: `draft` is produced in a prior step, so it is already checkpointed.
    This node can be resumed without re-running the LLM call.
    """
    decision = interrupt(
        {
            "worker_id": state["worker_id"],
            "role": state["role"],
            "task": state["task"],
            "draft": state["draft"],
            "request": "Review the draft. Reply with 'approve' or 'revise: <notes>'.",
        }
    )
    return {"decision": decision}


def worker_emit_to_parent(state: WorkerState) -> dict[str, Any]:
    """Return a single aggregated item into the parent state's `reviews` list."""
    return {
        "reviews": [
            {
                "worker_id": state["worker_id"],
                "role": state["role"],
                "task": state["task"],
                "draft": state["draft"],
                "decision": state["decision"],
            }
        ]
    }


def finalize(state: ParentState) -> dict[str, Any]:
    llm = _model()
    summary = llm.invoke(
        [
            ("system", "You are the orchestrator. Produce a final combined summary."),
            (
                "user",
                "Combine these worker drafts and their decisions into one short answer.\n"
                f"Topic: {state['topic']}\n"
                f"Worker reviews: {json.dumps(state['reviews'], ensure_ascii=False)}\n",
            ),
        ]
    ).content
    return {"summary": summary}


def build_app(checkpointer: PostgresSaver):
    worker_builder = StateGraph(WorkerState)
    worker_builder.add_node("draft", worker_draft)
    worker_builder.add_node("review", worker_interrupt_for_review)
    worker_builder.add_node("emit", worker_emit_to_parent)
    worker_builder.add_edge(START, "draft")
    worker_builder.add_edge("draft", "review")
    worker_builder.add_edge("review", "emit")
    worker_builder.add_edge("emit", END)
    worker_graph = worker_builder.compile()

    parent_builder = StateGraph(ParentState)
    parent_builder.add_node("orchestrator", orchestrator)
    parent_builder.add_node("worker", worker_graph)
    parent_builder.add_node("finalize", finalize)

    parent_builder.add_edge(START, "orchestrator")
    parent_builder.add_conditional_edges("orchestrator", assign_workers, ["worker"])
    parent_builder.add_edge("worker", "finalize")
    parent_builder.add_edge("finalize", END)

    return parent_builder.compile(checkpointer=checkpointer)


def main() -> None:
    load_dotenv()

    postgres_uri = os.getenv("POSTGRES_URI")
    if not postgres_uri:
        raise RuntimeError(
            "POSTGRES_URI is required (see script docstring for an example)."
        )

    if not os.getenv("OPENAI_API_KEY"):
        raise RuntimeError("OPENAI_API_KEY is required.")

    topic = os.getenv("TOPIC", "LangGraph human review batching pattern")
    thread_id = os.getenv("THREAD_ID", str(uuid.uuid4()))
    config = {"configurable": {"thread_id": thread_id}}

    with PostgresSaver.from_conn_string(postgres_uri) as saver:
        # Must be called at least once per fresh database.
        saver.setup()

        app = build_app(saver)

        # 1) Run until we hit interrupts (all 3 workers should interrupt in the same step).
        values = app.invoke({"topic": topic, "reviews": [], "summary": ""}, config)
        state = app.get_state(config)
        interrupts = list(state.interrupts)

        print("\n=== First run (paused) ===")
        print(f"thread_id={thread_id}")
        print(f"interrupts={len(interrupts)}")
        for i in interrupts:
            payload = i.value
            print(f"- interrupt_id={i.id} worker_id={payload.get('worker_id')} task={payload.get('task')}")

        if not interrupts:
            print("\nNo interrupts; graph output:")
            print(values)
            return

        # 2) Consolidated "human review" step (this is where you'd dedupe/merge).
        # Here we auto-approve each worker, but you can replace this with real UI logic.
        resume_map: dict[str, str] = {
            i.id: f"approve (auto) for worker={i.value.get('worker_id')}"
            for i in interrupts
        }

        # 3) Resume ALL workers in ONE run.
        final = app.invoke(Command(resume=resume_map), config)

        print("\n=== Second run (resumed once) ===")
        print("reviews:")
        for r in final["reviews"]:
            print(f"- {r['worker_id']}: {r['decision']}")
        print("\nsummary:")
        print(final["summary"])


if __name__ == "__main__":
    main()

Thanks for your help @pawel-twardziak! In case it's of interest to future readers: in the end we went with interrupts within workers, and handled consolidation with a side-car service that collects the interruption dictionaries, runs the consolidation logic, collects human feedback, and rebuilds the proper resume dictionary for the main orchestrator/worker agent.
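For anyone curious, here is a minimal sketch of that consolidation step, with a plain dataclass standing in for LangGraph's Interrupt objects (the `missing_evidence` key and the helper names are illustrative assumptions, not part of any API):

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class PendingInterrupt:
    # Stand-in for a LangGraph Interrupt: `id` is the resume key,
    # `value` is the payload the worker passed to interrupt().
    id: str
    value: dict[str, Any]


def consolidate(
    interrupts: list[PendingInterrupt], key: str = "missing_evidence"
) -> dict[str, list[str]]:
    """Group interrupt ids by the payload field we deduplicate on,
    so each unique missing item is shown to the human only once."""
    groups: dict[str, list[str]] = {}
    for itr in interrupts:
        groups.setdefault(itr.value[key], []).append(itr.id)
    return groups


def build_resume_map(
    groups: dict[str, list[str]], answers: dict[str, str]
) -> dict[str, str]:
    """Fan each single human answer back out to every interrupt that asked
    for it, producing the mapping expected by Command(resume=...)."""
    return {
        interrupt_id: answers[item]
        for item, ids in groups.items()
        for interrupt_id in ids
    }
```

With three pending interrupts where two workers ask for the same evidence, the human answers only two questions, and `build_resume_map` expands those answers into the three-entry dictionary you pass to `Command(resume=...)`.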

PS: It seems the fact that you can interrupt parallel subgraphs is not documented here. Also not documented, but something I tested: if an orchestrator agent triggers X parallel tasks but you set maximum concurrency to Y, where Y < X, LangGraph will run all X parallel tasks before surfacing any interrupts raised within them.
