Hi @langchain — maybe something like this?
A parent orchestrator that:
- Uses a planner agent (built with
create_agent) to decide the next action per step (research → write → finalize).
- Hands off to specialized agents (research, writer) implemented with
create_agent.
- Shares a single state schema so tools can update short-term memory and routing.
- Uses
interrupt for optional human-in-the-loop, and a checkpointer for pause/resume.
This is more of a pseudo-code sketch — I haven't tested it yet.
import operator
from typing import Annotated, Literal
from typing_extensions import NotRequired, TypedDict
from langchain.chat_models import init_chat_model
from langchain_core.tools import tool
from langchain.agents import create_agent
from langchain.agents.middleware import dynamic_prompt
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import Command, interrupt
# 1) Shared state across parent + subgraphs
class OrchestratorState(TypedDict):
    """Shared state schema used by the parent graph and every sub-agent."""

    # Conversation history, merged with LangGraph's add_messages reducer.
    messages: Annotated[list, add_messages]
    # Running task list; operator.add appends tool updates to the existing list.
    tasks: Annotated[list[str], operator.add]
    # Short-term memory of saved notes/snippets/references; also append-merged.
    artifacts: Annotated[list[str], operator.add]
    # Task currently in progress (optional; nothing in this file writes it yet).
    current_task: NotRequired[str]
    # Routing decision written by the planner's choose_next tool and read by
    # the parent graph's conditional edge.
    next_node: NotRequired[Literal["research", "write", "finalize"]]
# 2) Planner tools (LLM decides when to call)
@tool
def choose_next(action: Literal["research", "write", "finalize"]) -> Command:
    """Planner chooses next action."""
    # Write the routing decision into shared state; the parent graph's
    # conditional edge reads state["next_node"] after the planner's turn.
    return Command(update={"next_node": action})
@tool
def add_task(task: str) -> Command:
    """Append a task to the running task list."""
    # "tasks" uses operator.add as its reducer, so this one-element list is
    # appended to the existing list rather than replacing it.
    return Command(update={"tasks": [task]})
@tool
def save_artifact(content: str) -> Command:
    """Save an artifact (notes, snippet, reference) to short-term memory."""
    # Append-merged into state["artifacts"] via the operator.add reducer.
    return Command(update={"artifacts": [content]})
@tool
def human_assist(question: str) -> str:
    """Ask a human for help (interrupt)."""
    # Pause the graph here; execution resumes on the same checkpointed thread
    # when the caller invokes the graph with Command(resume=...).
    response = interrupt({"question": question})
    # Robustness: the resume payload is whatever the operator passed in.
    # Support both Command(resume={"answer": "..."}) and Command(resume="..."),
    # instead of crashing with TypeError/KeyError on a plain-string resume.
    if isinstance(response, dict):
        return response["answer"]
    return str(response)
# 3) Planner agent (system prompt via middleware)
# System prompt supplied via middleware; `request` carries the model-request
# context (unused here — the prompt is static).
@dynamic_prompt
def planner_prompt(request):
    return (
        "You are a planner that breaks a user goal into small steps and decides the next action.\n"
        "- Prefer calling choose_next('research'|'write'|'finalize') each turn.\n"
        "- Call add_task(...) to record tasks that should be completed.\n"
        "- Call save_artifact(...) to store short notes/results.\n"
        "- If information is missing, call human_assist(question) to ask for it.\n"
        "Return to 'finalize' when the result is ready."
    )
def build_planner():
    """Construct the planner agent that decides the next orchestration step."""
    # Temperature 0 for deterministic planning decisions.
    llm = init_chat_model("anthropic:claude-3-5-sonnet-latest", temperature=0)
    planner_tools = [choose_next, add_task, save_artifact, human_assist]
    return create_agent(
        model=llm,
        tools=planner_tools,
        middleware=[planner_prompt],
        state_schema=OrchestratorState,
        name="planner_agent",
    )
# 4) Research agent (simple stub tools)
@tool
def web_search(query: str) -> str:
    """Stub search; replace with a real search integration if needed."""
    # Placeholder result so the graph is runnable without external services.
    return f"[search-result] Summary for: {query}"
# Static system prompt for the research sub-agent (`request` is unused).
@dynamic_prompt
def research_prompt(request):
    return (
        "You are a research assistant. Use available tools, produce concise notes.\n"
        "When done, summarize findings for the planner."
    )
def build_research_agent():
    """Construct the research sub-agent (search plus note-taking)."""
    llm = init_chat_model("anthropic:claude-3-5-sonnet-latest", temperature=0)
    return create_agent(
        model=llm,
        # save_artifact lets the researcher write notes back into shared state.
        tools=[web_search, save_artifact],
        middleware=[research_prompt],
        state_schema=OrchestratorState,
        name="research_agent",
    )
# 5) Writer agent (simple stub tools)
@tool
def draft_section(topic: str) -> str:
    """Draft a short section for the final output."""
    # Stub drafting logic; swap in a real generation step if needed.
    return f"[draft] A concise section about {topic}."
# Static system prompt for the writer sub-agent (`request` is unused).
@dynamic_prompt
def writer_prompt(request):
    return (
        "You are a writing assistant. Draft clean, concise content using tasks, artifacts, and messages.\n"
        "Produce short sections and avoid redundancy."
    )
def build_writer_agent():
    """Construct the writer sub-agent (drafting plus note-taking)."""
    llm = init_chat_model("anthropic:claude-3-5-sonnet-latest", temperature=0)
    writer_tools = [draft_section, save_artifact]
    return create_agent(
        model=llm,
        tools=writer_tools,
        middleware=[writer_prompt],
        state_schema=OrchestratorState,
        name="writer_agent",
    )
# 6) Parent orchestrator graph wiring
def build_plan_tasks_and_execute():
    """Wire planner + sub-agents into a parent StateGraph and compile it."""
    planner = build_planner()
    research = build_research_agent()
    writer = build_writer_agent()

    graph = StateGraph(OrchestratorState)
    graph.add_node("planner", planner)
    graph.add_node("research_agent", research)
    graph.add_node("writer_agent", writer)

    # Map the planner's choose_next decision to a destination node.
    destinations = {"research": "research_agent", "write": "writer_agent"}

    def route_from_planner(state: OrchestratorState):
        # Anything other than research/write (e.g. "finalize", or unset)
        # ends the run, same as the original if/if/END chain.
        return destinations.get(state.get("next_node"), END)

    graph.add_edge(START, "planner")
    graph.add_conditional_edges(
        "planner", route_from_planner, ["research_agent", "writer_agent", END]
    )
    # Each sub-agent always hands control back to the planner.
    graph.add_edge("research_agent", "planner")
    graph.add_edge("writer_agent", "planner")

    # In-memory checkpointer enables pause/resume across interrupts.
    return graph.compile(checkpointer=MemorySaver())
if __name__ == "__main__":
    app = build_plan_tasks_and_execute()
    run_config = {"configurable": {"thread_id": "demo-1"}}
    initial_state = {
        "messages": [
            {
                "role": "user",
                "content": "Goal: Produce a brief 3-bullet summary on 'edge computing for retail'. Start with research, then write.",
            }
        ],
        # Optional: seed the reducer-backed lists explicitly.
        "tasks": [],
        "artifacts": [],
    }
    final_state = app.invoke(initial_state, run_config)
    # Inspect final messages / artifacts
    print("Artifacts:", final_state.get("artifacts", []))
    print("Next node:", final_state.get("next_node"))
- The planner agent can:
- Decide dynamically among research, write, or finalize using the
choose_next tool.
- Append tasks and artifacts into shared short-term memory via
add_task and save_artifact.
- Trigger human-in-the-loop with
human_assist (execution pauses and resumes with a checkpointer).
- The parent orchestrator:
- Routes from
planner to research_agent or writer_agent based on state["next_node"].
- Loops back to
planner after each sub-agent turn until finalize is chosen.
- All agents share the same
OrchestratorState so tools can update the same memory.
- The entire flow can be resumed after an interrupt using
Command(resume=...).