Hello,
I’m trying to set up a multi-agent system based on the sub-agents pattern (a main agent calling sub-agents as tools).
Although it seems to work, I can’t get the sub-agents to stream their events (messages and updates) into the main agent’s (the orchestrator’s) event stream. I can see the orchestrator’s steps as expected, but none of the sub-agents’ steps. Is there a way to achieve this with this pattern? Thanks!
Most of the code is from the deep agents course.
This is the orchestrator setup:
# Tool pools: the first is handed to the sub-agent factory, the second is
# exposed to the orchestrator directly.
sub_agent_tools = [
    think_tool,
    get_table_names,
    get_table_schema,
    execute_query,
]
built_in_tools = [
    ls,
    read_file,
    write_file,
    write_todos,
    read_todos,
    think_tool,
]

# Sub-agent configuration. "tools" holds tool *names*; the prompt is rendered
# with today's date at definition time.
threat_research_sub_agent = dict(
    name="threat-research-agent",
    description="Delegate threat research to the sub-agent threat researcher. Only give this researcher one threat at a time.",
    prompt=THREAT_RESEARCHER_AGENT_INSTRUCTIONS_v2.format(date=get_today_str()),
    tools=["think_tool", "get_table_names", "get_table_schema", "execute_query"],
)

# Wrap the sub-agent behind a single delegation tool and append it to the
# orchestrator's own tools.
task_tool = _create_task_tool(
    sub_agent_tools,
    [threat_research_sub_agent],
    model,
    DeepAgentState,
)
delegation_tools = [task_tool]
orchestrator_tools = [*built_in_tools, *delegation_tools]

# Render the orchestrator-specific instructions with the run limits and date.
ORCHESTRATOR_INSTRUCTIONS = ORCHESTRATOR_INSTRUCTIONS_v2.format(
    max_concurrent_threat_research_units=max_concurrent_threat_research_units,
    max_threat_researcher_iterations=max_threat_researcher_iterations,
    date=get_today_str(),
)

# Full system prompt: three titled sections separated by an 80-character rule.
INSTRUCTIONS = ("\n\n" + "=" * 80 + "\n\n").join(
    [
        "# TODO MANAGEMENT\n" + TODO_USAGE_INSTRUCTIONS,
        "# FILE SYSTEM USAGE\n" + FILE_USAGE_INSTRUCTIONS,
        "# SUB-AGENT DELEGATION\n" + ORCHESTRATOR_INSTRUCTIONS,
    ]
)

agent = create_agent(
    model,
    orchestrator_tools,
    system_prompt=INSTRUCTIONS,
    state_schema=DeepAgentState,
    name="orchestrator",
)
This is the sub-agent tool:
def _create_task_tool(tools, subagents: list[SubAgent], model, state_schema):
    """Create a task delegation tool that enables context isolation through sub-agents.

    This function implements the core pattern for spawning specialized sub-agents with
    isolated contexts, preventing context clash and confusion in complex multi-step tasks.

    Args:
        tools: List of available tools that can be assigned to sub-agents
        subagents: List of specialized sub-agent configurations
        model: The language model to use for all agents
        state_schema: The state schema (typically DeepAgentState)

    Returns:
        A 'task' tool that can delegate work to specialized sub-agents

    Raises:
        KeyError: If a sub-agent configuration names a tool that is not in ``tools``.
    """
    # Build tool name mapping for selective tool assignment
    tools_by_name = {}
    for tool_ in tools:
        if not isinstance(tool_, BaseTool):
            tool_ = tool(tool_)
        tools_by_name[tool_.name] = tool_

    # Create specialized sub-agents based on configurations (the agent registry)
    agents = {}
    for _agent in subagents:
        if "tools" in _agent:
            # Use specific tools if specified
            _tools = [tools_by_name[t] for t in _agent["tools"]]
        else:
            # Default to all tools
            _tools = tools
        agents[_agent["name"]] = create_agent(  # updated 1.0
            model,
            system_prompt=_agent["prompt"],
            tools=_tools,
            state_schema=state_schema,
            name=_agent["name"],
        )

    # Generate description of available sub-agents for the tool description
    other_agents_string = [
        f"- {_agent['name']}: {_agent['description']}" for _agent in subagents
    ]

    @tool(description=TASK_DESCRIPTION_PREFIX.format(other_agents=other_agents_string))
    def task(
        description: str,
        subagent_type: str,
        state: Annotated[DeepAgentState, InjectedState],
        tool_call_id: Annotated[str, InjectedToolCallId],
    ):
        """Delegate a task to a specialized sub-agent with isolated context.

        This creates a fresh context for the sub-agent containing only the task description,
        preventing context pollution from the parent agent's conversation history.
        """
        # Validate requested agent type exists
        if subagent_type not in agents:
            return f"Error: invoked agent of type {subagent_type}, the only allowed types are {[f'`{k}`' for k in agents]}"

        # Get the requested sub-agent
        sub_agent = agents[subagent_type]

        # Create isolated context with only the task description.
        # Work on a shallow copy: the injected state object may be shared with the
        # parent (and with other concurrently running tool calls), so overwriting
        # its "messages" key in place could leak into them.
        sub_state = dict(state)
        sub_state["messages"] = [{"role": "user", "content": description}]

        # Execute the sub-agent in isolation.
        # With a list of stream modes every chunk is a (mode, payload) tuple, and
        # only "values" payloads carry the full state. Keep the last one as the
        # final result; "messages"/"updates" chunks are consumed but not used here.
        final_state = None
        for mode, payload in sub_agent.stream(
            sub_state,
            stream_mode=["messages", "updates", "values"],
        ):
            if mode == "values":
                final_state = payload

        if final_state is None:
            return "Error: sub-agent produced no output"

        # Return results to parent agent via Command state update
        return Command(
            update={
                "files": final_state.get("files", {}),  # Merge any file changes
                "messages": [
                    # Sub-agent result becomes a ToolMessage in parent context
                    ToolMessage(
                        final_state["messages"][-1].content, tool_call_id=tool_call_id
                    )
                ],
            }
        )

    return task
And this is the invocation:
# Run the orchestrator on a fixed thread id and pause in the debugger on every
# streamed item. Each item is unpacked as a 3-tuple whose first element is ignored.
config = dict(configurable=dict(thread_id="1"))
user_turn = {"role": "user", "content": "asdf"}
event_stream = agent.stream(
    {"messages": [user_turn]},
    config=config,
    subgraphs=True,
    stream_mode=["messages", "updates"],
)
for _, chunk_type, chunk in event_stream:
    breakpoint()