Can the input and output of deep_agent's sub-agents be output in streaming mode?

When I use deep_agent to call sub-agents, I want to see the input and output of those sub-agents. But because sub-agents are invoked as tool calls, their inputs and outputs are not exposed in 【content】.

The input of the sub-agent is passed as the task description in 【AI_Message → tool_calls → args → description】.

The output is returned as a ToolMessage.

The effect I originally wanted was for the sub-agent output to be emitted as 【AIMessageChunk】 chunks, but that is clearly not compliant with the specification.

So I have to extract these outputs using 【values】 or other stream modes, which is not very friendly for interactive use.

hi @Huimin-station

I’ve put together the script below — maybe it helps:

"""
Demo: Streaming sub-agent input/output with Deep Agents.

Shows three approaches to observe sub-agent execution in real time:
  1. `stream_mode="updates"` + `subgraphs=True`  -- structured step-by-step output
  2. `stream_mode="messages"` + `subgraphs=True`  -- token-by-token LLM streaming
  3. Combined modes (`["updates", "messages"]`) with v2 format

Requirements:
- OPENAI_API_KEY in environment (or .env)
- POSTGRES_URI in environment (or .env), e.g.:
    postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable

Run:
  python src/deepagents_subagent_streaming_anthropic_postgres.py
"""

import os

from dotenv import load_dotenv
from langchain_core.messages import AIMessageChunk, HumanMessage
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.types import Overwrite

from deepagents import SubAgent, create_deep_agent


def _require_env(name: str) -> str:
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required env var: {name}")
    return value


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _format_ns(ns: tuple[str, ...]) -> str:
    """Format a LangGraph namespace tuple for display."""
    if not ns:
        return "root"
    return " > ".join(ns)


def _unwrap(value):
    """Return the payload inside an ``Overwrite`` wrapper, or *value* unchanged."""
    return value.value if isinstance(value, Overwrite) else value


def _content_to_str(content) -> str:
    """Normalize message content to a string.

    Content can be a str, a list of content blocks (dicts with 'text' keys),
    or None.
    """
    if not content:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and "text" in block:
                parts.append(block["text"])
        return " ".join(parts)
    return str(content)


def _separator(title: str) -> None:
    print(f"\n{'=' * 60}")
    print(f"  {title}")
    print(f"{'=' * 60}\n")


# ---------------------------------------------------------------------------
# Sub-agent definitions
# ---------------------------------------------------------------------------

# Sub-agent that gathers factual background on a topic; the main agent
# delegates to it via a tool call whose args carry the task description.
researcher: SubAgent = {
    "name": "researcher",
    "description": "Researches a topic and provides a concise summary with key findings.",
    "system_prompt": (
        "You are a researcher. When given a topic, provide a concise 2-3 paragraph "
        "summary with key facts. Be direct and factual."
    ),
}

# Sub-agent that turns research notes into a short polished article.
writer: SubAgent = {
    "name": "writer",
    "description": "Takes research notes and writes a polished short article.",
    "system_prompt": (
        "You are a writer. When given research notes or a topic, write a polished "
        "short article (3-4 paragraphs). Use clear, engaging prose."
    ),
}


# ---------------------------------------------------------------------------
# Streaming demos
# ---------------------------------------------------------------------------

def demo_stream_updates(agent, thread_id: str) -> None:
    """Stream sub-agent execution with ``stream_mode='updates'``.

    With ``subgraphs=True`` each update -- from the root graph and from every
    sub-agent subgraph -- arrives as a dict keyed by node name; a short
    preview of each node's last message is printed.
    """
    _separator("Demo 1: stream_mode='updates' + subgraphs=True")

    run_config = {"configurable": {"thread_id": thread_id}}
    inputs = {"messages": [HumanMessage(content="Research the topic of 'solar energy advantages' and then write a short article about it.")]}

    stream = agent.stream(
        inputs,
        config=run_config,
        subgraphs=True,
        stream_mode="updates",
    )
    for namespace, update in stream:
        label = _format_ns(namespace)
        for node, output in update.items():
            print(f"[{label}] node={node}")
            if not isinstance(output, dict):
                continue
            if "messages" in output:
                messages = _unwrap(output["messages"])
                if not (isinstance(messages, list) and messages):
                    continue
                tail = messages[-1]
                text = _content_to_str(getattr(tail, "content", ""))
                # Show a one-line, truncated preview of the newest message.
                snippet = text[:120].replace("\n", " ")
                suffix = "..." if len(text) > 120 else ""
                print(f"  {type(tail).__name__}: {snippet}{suffix}")
            print()


def demo_stream_messages(agent, thread_id: str) -> None:
    """Stream LLM tokens from sub-agents with ``stream_mode='messages'``.

    Tokens arrive as ``AIMessageChunk`` instances paired with metadata,
    giving real-time visibility into what each (sub-)agent LLM is generating.
    """
    _separator("Demo 2: stream_mode='messages' + subgraphs=True")

    run_config = {"configurable": {"thread_id": thread_id}}
    inputs = {"messages": [HumanMessage(content="Research the topic of 'wind energy' briefly.")]}

    last_seen = None
    stream = agent.stream(
        inputs,
        config=run_config,
        subgraphs=True,
        stream_mode="messages",
    )
    for namespace, payload in stream:
        # `messages` mode yields (chunk, metadata) pairs; skip anything else.
        if not (isinstance(payload, tuple) and len(payload) == 2):
            continue
        chunk, meta = payload
        if not isinstance(chunk, AIMessageChunk):
            continue
        text = _content_to_str(chunk.content)
        if not text:
            continue

        # Announce whenever the stream switches to a different graph/node.
        node = meta.get("langgraph_node", "?")
        key = (namespace, node)
        if key != last_seen:
            last_seen = key
            print(f"\n--- [{_format_ns(namespace)}] node={node} ---")

        print(text, end="", flush=True)

    print()  # final newline


def demo_stream_combined_v2(agent, thread_id: str) -> None:
    """Stream ``updates`` and ``messages`` together using the v2 chunk format.

    Each v2 chunk is a dict with `type` (which stream mode produced it),
    `ns` (namespace tuple), and `data` (the mode-specific payload).
    """
    _separator("Demo 3: Combined modes with version='v2'")

    run_config = {"configurable": {"thread_id": thread_id}}
    inputs = {"messages": [HumanMessage(content="Research 'hydroelectric power' in one paragraph.")]}

    last_seen = None
    stream = agent.stream(
        inputs,
        config=run_config,
        subgraphs=True,
        stream_mode=["updates", "messages"],
        version="v2",
    )
    for part in stream:
        kind = part["type"]
        ns = part["ns"]
        data = part["data"]
        label = _format_ns(ns)

        if kind == "updates":
            if isinstance(data, dict):
                for node in data:
                    print(f"[{label}] UPDATE node={node}")
            else:
                print(f"[{label}] UPDATE: {data}")

        elif kind == "messages":
            chunk, meta = data
            if not isinstance(chunk, AIMessageChunk):
                continue
            text = _content_to_str(chunk.content)
            if not text:
                continue

            # Header when token output moves to a different graph/node.
            node = meta.get("langgraph_node", "?")
            if (ns, node) != last_seen:
                last_seen = (ns, node)
                print(f"\n--- [{label}] TOKEN STREAM node={node} ---")
            print(text, end="", flush=True)

    print()


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main() -> None:
    """Build the deep agent with a Postgres checkpointer and run all three demos.

    Raises:
        RuntimeError: if OPENAI_API_KEY or a Postgres connection string is missing.
    """
    load_dotenv()

    # Disable LangSmith tracing if no valid API key is configured
    if not os.getenv("LANGSMITH_API_KEY"):
        os.environ.setdefault("LANGCHAIN_TRACING_V2", "false")

    _require_env("OPENAI_API_KEY")
    # Accept either env var name; empty strings count as missing.
    postgres_url = os.getenv("POSTGRES_URI") or os.getenv("DATABASE_URL")
    if not postgres_url:
        raise RuntimeError("Missing required env var: POSTGRES_URI (or DATABASE_URL)")

    with PostgresSaver.from_conn_string(postgres_url) as checkpointer:
        checkpointer.setup()

        coordinator_prompt = (
            "You are a coordinating agent. When asked to research and write, "
            "delegate research to the 'researcher' sub-agent, then delegate "
            "writing to the 'writer' sub-agent. Do not do the work yourself."
        )
        agent = create_deep_agent(
            model="openai:gpt-5.4",
            system_prompt=coordinator_prompt,
            subagents=[researcher, writer],
            checkpointer=checkpointer,
        )

        # Separate thread ids keep each demo's checkpoint history independent.
        demo_stream_updates(agent, thread_id="demo-updates-1")
        demo_stream_messages(agent, thread_id="demo-messages-1")
        demo_stream_combined_v2(agent, thread_id="demo-combined-v2-1")


# Allow running this module directly as a script.
if __name__ == "__main__":
    main()