Streaming from sub-agents in the subagents pattern

Hello,

I’m trying to set up a multi-agent system based on the subagents pattern (a main agent calling sub-agents as tools).
Although it seems to work, I can’t get the sub-agents to stream events (messages and updates) to the main agent’s (orchestrator) event stream. I can see the orchestrator’s steps as expected, but none from the sub-agents. Is there a way to achieve this with this pattern? Thanks!

Most of the code is from the deep agents course.

This is the orchestrator setup:

# Tools
sub_agent_tools = [think_tool, get_table_names, get_table_schema, execute_query]
built_in_tools = [ls, read_file, write_file, write_todos, read_todos, think_tool]

threat_research_sub_agent = {
    "name": "threat-research-agent",
    "description": "Delegate threat research to the sub-agent threat researcher. Only give this researcher one threat at a time.",
    "prompt": THREAT_RESEARCHER_AGENT_INSTRUCTIONS_v2.format(date=get_today_str()),
    "tools": ["think_tool", "get_table_names", "get_table_schema", "execute_query"],
}

# Create task tool to delegate tasks to sub-agents
task_tool = _create_task_tool(
    sub_agent_tools, [threat_research_sub_agent], model, DeepAgentState
)

delegation_tools = [task_tool]
orchestrator_tools = built_in_tools + delegation_tools

ORCHESTRATOR_INSTRUCTIONS = ORCHESTRATOR_INSTRUCTIONS_v2.format(
    max_concurrent_threat_research_units=max_concurrent_threat_research_units,
    max_threat_researcher_iterations=max_threat_researcher_iterations,
    date=get_today_str(),
)

INSTRUCTIONS = (
    "# TODO MANAGEMENT\n"
    + TODO_USAGE_INSTRUCTIONS
    + "\n\n"
    + "=" * 80
    + "\n\n"
    + "# FILE SYSTEM USAGE\n"
    + FILE_USAGE_INSTRUCTIONS
    + "\n\n"
    + "=" * 80
    + "\n\n"
    + "# SUB-AGENT DELEGATION\n"
    + ORCHESTRATOR_INSTRUCTIONS
)

agent = create_agent(
    model, orchestrator_tools, system_prompt=INSTRUCTIONS, state_schema=DeepAgentState, name="orchestrator"
)

This is the sub-agent tool:

def _create_task_tool(tools, subagents: list[SubAgent], model, state_schema):
    """Create a task delegation tool that enables context isolation through sub-agents.

    This function implements the core pattern for spawning specialized sub-agents with
    isolated contexts, preventing context clash and confusion in complex multi-step tasks.

    Args:
        tools: List of available tools that can be assigned to sub-agents
        subagents: List of specialized sub-agent configurations
        model: The language model to use for all agents
        state_schema: The state schema (typically DeepAgentState)

    Returns:
        A 'task' tool that can delegate work to specialized sub-agents
    """
    # Create agent registry
    agents = {}

    # Build tool name mapping for selective tool assignment
    tools_by_name = {}
    for tool_ in tools:
        if not isinstance(tool_, BaseTool):
            tool_ = tool(tool_)
        tools_by_name[tool_.name] = tool_

    # Create specialized sub-agents based on configurations
    for _agent in subagents:
        if "tools" in _agent:
            # Use specific tools if specified
            _tools = [tools_by_name[t] for t in _agent["tools"]]
        else:
            # Default to all tools
            _tools = tools
        agents[_agent["name"]] = create_agent(   # updated 1.0
            model, system_prompt=_agent["prompt"], tools=_tools, state_schema=state_schema, name=_agent["name"]
        )

    # Generate description of available sub-agents for the tool description
    # Join into one string so the description shows a bullet list, not a Python list repr
    other_agents_string = "\n".join(
        f"- {_agent['name']}: {_agent['description']}" for _agent in subagents
    )

    @tool(description=TASK_DESCRIPTION_PREFIX.format(other_agents=other_agents_string))
    def task(
        description: str,
        subagent_type: str,
        state: Annotated[DeepAgentState, InjectedState],
        tool_call_id: Annotated[str, InjectedToolCallId],
    ):
        """Delegate a task to a specialized sub-agent with isolated context.

        This creates a fresh context for the sub-agent containing only the task description,
        preventing context pollution from the parent agent's conversation history.
        """
        # Validate requested agent type exists
        if subagent_type not in agents:
            return f"Error: invoked agent of type {subagent_type}, the only allowed types are {[f'`{k}`' for k in agents]}"

        # Get the requested sub-agent
        sub_agent = agents[subagent_type]

        # Create isolated context with only the task description
        # This is the key to context isolation - no parent history
        state["messages"] = [{"role": "user", "content": description}]

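        # Execute the sub-agent in isolation
        # With a list of stream modes, each item is a (mode, chunk) tuple;
        # "values" chunks carry the full state, so the last one is the final state
        result = None
        for mode, chunk in sub_agent.stream(
            state,
            stream_mode=["messages", "updates", "values"],
        ):
            if mode == "values":
                result = chunk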

        # Use the final result
        if result is None:
            return "Error: sub-agent produced no output"

        # Return results to parent agent via Command state update
        return Command(
            update={
                "files": result.get("files", {}),  # Merge any file changes
                "messages": [
                    # Sub-agent result becomes a ToolMessage in parent context
                    ToolMessage(
                        result["messages"][-1].content, tool_call_id=tool_call_id
                    )
                ],
            }
        )

    return task

And this is the invocation:

config = {"configurable": {"thread_id": "1"}}

for _, chunk_type, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "asdf"}]},
    config=config,
    subgraphs=True,
    stream_mode=["messages", "updates"],
):
    breakpoint()

Hi @kzoltan

What about this:

from langgraph.config import get_stream_writer

@tool
def task(...):
    writer = get_stream_writer()
    for mode, chunk in sub_agent.stream(
        sub_state,
        stream_mode=["messages", "updates"],
    ):
        writer({"subagent": subagent_type, "mode": mode, "chunk": chunk})
    return Command(...)

and

# With subgraphs=True, each item is a (namespace, mode, chunk) tuple
for namespace, mode, chunk in agent.stream(..., stream_mode=["messages", "updates", "custom"], subgraphs=True):
    ...
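
To make the two ends above concrete, here is a minimal, dependency-free sketch. It is not LangGraph code: forward_and_collect and label_event are hypothetical helpers, the streams are hand-written stand-ins, and it assumes the sub-agent is streamed with an extra "values" mode so the tool can pick up the final state for its Command.

```python
from typing import Any, Callable, Iterable, Optional


def forward_and_collect(
    stream: Iterable[tuple[str, Any]],
    writer: Callable[[dict], None],
    subagent_type: str,
) -> Optional[dict]:
    """Forward (mode, chunk) items to `writer` and return the last "values" chunk.

    In the real tool, `stream` would be sub_agent.stream(...) and `writer`
    would come from get_stream_writer(); both are assumptions here.
    """
    final_state = None
    for mode, chunk in stream:
        if mode == "values":
            # Full state snapshot: keep it for the result, do not forward it.
            final_state = chunk
            continue
        writer({"subagent": subagent_type, "mode": mode, "chunk": chunk})
    return final_state


def label_event(namespace: tuple, mode: str, chunk: Any) -> str:
    """Say whether a streamed item was forwarded from a sub-agent."""
    if mode == "custom" and isinstance(chunk, dict) and "subagent" in chunk:
        return f"{chunk['subagent']}/{chunk['mode']}"
    return f"orchestrator/{mode}"


# Hand-written stand-in for the sub-agent stream, not real LangGraph output.
fake_sub_agent_stream = [
    ("messages", ("Look", {})),
    ("updates", {"model": {"messages": ["Looking up tables"]}}),
    ("values", {"messages": ["task", "Looking up tables"], "files": {}}),
    ("updates", {"model": {"messages": ["done"]}}),
    ("values", {"messages": ["task", "Looking up tables", "done"], "files": {"report.md": "..."}}),
]

forwarded: list[dict] = []
final_state = forward_and_collect(
    fake_sub_agent_stream, forwarded.append, "threat-research-agent"
)

# Stand-in for what the orchestrator loop would see with subgraphs=True.
orchestrator_items = [((), "updates", {"model": {}})]
orchestrator_items += [((), "custom", event) for event in forwarded]
labels = [label_event(ns, mode, chunk) for ns, mode, chunk in orchestrator_items]

print(final_state["messages"][-1])
print(labels)
```

The "values" chunks are deliberately not forwarded, so the orchestrator stream is not flooded with full state snapshots; that is a design choice, not something the pattern requires.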

Or use astream_events (see the Streaming API page in the LangChain docs).

One issue:

Your tool mutates the parent state in-place:

state["messages"] = [{"role": "user", "content": description}]

IMHO, this does more than prepare input for the sub-agent: it overwrites the orchestrator’s own conversation history in place.
Better to build a separate input for the sub-agent:

sub_state = dict(state)          # shallow copy is usually enough for top-level keys
sub_state["messages"] = [{"role": "user", "content": description}]

# With a list of stream modes, each item is a (mode, chunk) tuple
for mode, chunk in sub_agent.stream(sub_state, stream_mode=["messages", "updates"]):
    ...
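
A quick way to see the difference, using plain dicts in place of the real state. build_sub_state is a hypothetical helper name; the "messages" and "files" keys are the ones used in the tool above.

```python
def build_sub_state(state: dict, description: str) -> dict:
    """Return a new input dict for the sub-agent without touching `state`."""
    sub_state = dict(state)  # copies top-level keys only
    sub_state["messages"] = [{"role": "user", "content": description}]
    return sub_state


parent_state = {
    "messages": [{"role": "user", "content": "original request"}],
    "files": {"notes.md": "draft"},
}
sub_state = build_sub_state(parent_state, "Research one threat")

# The parent history is untouched; the sub-agent sees only the task description.
print(parent_state["messages"][0]["content"])
print(sub_state["messages"][0]["content"])

# Caveat: nested values such as "files" are shared between the two dicts.
print(sub_state["files"] is parent_state["files"])
```

If the sub-agent could mutate nested values such as "files" in place, copy.deepcopy(state) would be the safer starting point.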

Oh, I did not think about that. It is so simple and works great! :slight_smile:
Thank you!

:slight_smile: If it helps, could you do me a huge favor and mark the post as Solved by picking the answer, so that others can make use of it later? :slight_smile: