hi @Huimin-station
I’ve created this script, maybe it could help somehow:
"""
Demo: Streaming sub-agent input/output with Deep Agents.
Shows three approaches to observe sub-agent execution in real time:
1. `stream_mode="updates"` + `subgraphs=True` -- structured step-by-step output
2. `stream_mode="messages"` + `subgraphs=True` -- token-by-token LLM streaming
3. Combined modes (`["updates", "messages"]`) with v2 format
Requirements:
- OPENAI_API_KEY in environment (or .env)
- POSTGRES_URI in environment (or .env), e.g.:
postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable
Run:
python src/deepagents_subagent_streaming_anthropic_postgres.py
"""
import os
from dotenv import load_dotenv
from langchain_core.messages import AIMessageChunk, HumanMessage
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.types import Overwrite
from deepagents import SubAgent, create_deep_agent
def _require_env(name: str) -> str:
value = os.getenv(name)
if not value:
raise RuntimeError(f"Missing required env var: {name}")
return value
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _format_ns(ns: tuple[str, ...]) -> str:
"""Format a LangGraph namespace tuple for display."""
if not ns:
return "root"
return " > ".join(ns)
def _unwrap(value):
"""Unwrap an Overwrite wrapper if present, otherwise return as-is."""
if isinstance(value, Overwrite):
return value.value
return value
def _content_to_str(content) -> str:
"""Normalize message content to a string.
Content can be a str, a list of content blocks (dicts with 'text' keys),
or None.
"""
if not content:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for block in content:
if isinstance(block, str):
parts.append(block)
elif isinstance(block, dict) and "text" in block:
parts.append(block["text"])
return " ".join(parts)
return str(content)
def _separator(title: str) -> None:
print(f"\n{'=' * 60}")
print(f" {title}")
print(f"{'=' * 60}\n")
# ---------------------------------------------------------------------------
# Sub-agent definitions
# ---------------------------------------------------------------------------
researcher: SubAgent = {
"name": "researcher",
"description": "Researches a topic and provides a concise summary with key findings.",
"system_prompt": (
"You are a researcher. When given a topic, provide a concise 2-3 paragraph "
"summary with key facts. Be direct and factual."
),
}
writer: SubAgent = {
"name": "writer",
"description": "Takes research notes and writes a polished short article.",
"system_prompt": (
"You are a writer. When given research notes or a topic, write a polished "
"short article (3-4 paragraphs). Use clear, engaging prose."
),
}
# ---------------------------------------------------------------------------
# Streaming demos
# ---------------------------------------------------------------------------
def demo_stream_updates(agent, thread_id: str) -> None:
"""Stream sub-agent execution with `stream_mode='updates'`.
Each node update from both the main agent and sub-agents is emitted
as a structured dict keyed by node name.
"""
_separator("Demo 1: stream_mode='updates' + subgraphs=True")
config = {"configurable": {"thread_id": thread_id}}
inputs = {"messages": [HumanMessage(content="Research the topic of 'solar energy advantages' and then write a short article about it.")]}
for namespace, chunk in agent.stream(
inputs,
config=config,
subgraphs=True,
stream_mode="updates",
):
ns_label = _format_ns(namespace)
for node_name, node_output in chunk.items():
print(f"[{ns_label}] node={node_name}")
if not isinstance(node_output, dict):
continue
if "messages" in node_output:
msgs = _unwrap(node_output["messages"])
if not isinstance(msgs, list) or not msgs:
continue
last_msg = msgs[-1]
content = _content_to_str(getattr(last_msg, "content", ""))
msg_type = type(last_msg).__name__
preview = content[:120].replace("\n", " ")
print(f" {msg_type}: {preview}{'...' if len(content) > 120 else ''}")
print()
def demo_stream_messages(agent, thread_id: str) -> None:
"""Stream LLM tokens from sub-agents with `stream_mode='messages'`.
Each chunk is an `AIMessageChunk` emitted token-by-token,
giving real-time visibility into what the sub-agent's LLM is generating.
"""
_separator("Demo 2: stream_mode='messages' + subgraphs=True")
config = {"configurable": {"thread_id": thread_id}}
inputs = {"messages": [HumanMessage(content="Research the topic of 'wind energy' briefly.")]}
current_ns = None
for namespace, payload in agent.stream(
inputs,
config=config,
subgraphs=True,
stream_mode="messages",
):
if not isinstance(payload, tuple) or len(payload) != 2:
continue
msg_chunk, metadata = payload
if not isinstance(msg_chunk, AIMessageChunk):
continue
content = _content_to_str(msg_chunk.content)
if not content:
continue
# Print a header when we switch between graphs
ns_label = _format_ns(namespace)
node = metadata.get("langgraph_node", "?")
if (namespace, node) != current_ns:
current_ns = (namespace, node)
print(f"\n--- [{ns_label}] node={node} ---")
print(content, end="", flush=True)
print() # final newline
def demo_stream_combined_v2(agent, thread_id: str) -> None:
"""Stream multiple modes simultaneously using v2 format.
Combines `updates` and `messages` in a single stream.
Each chunk is a typed `StreamPart` dict with `type`, `ns`, and `data` fields.
"""
_separator("Demo 3: Combined modes with version='v2'")
config = {"configurable": {"thread_id": thread_id}}
inputs = {"messages": [HumanMessage(content="Research 'hydroelectric power' in one paragraph.")]}
current_ns = None
for chunk in agent.stream(
inputs,
config=config,
subgraphs=True,
stream_mode=["updates", "messages"],
version="v2",
):
chunk_type = chunk["type"]
ns = chunk["ns"]
data = chunk["data"]
ns_label = _format_ns(ns)
if chunk_type == "updates":
if isinstance(data, dict):
for node_name in data:
print(f"[{ns_label}] UPDATE node={node_name}")
else:
print(f"[{ns_label}] UPDATE: {data}")
elif chunk_type == "messages":
msg_chunk, metadata = data
if not isinstance(msg_chunk, AIMessageChunk):
continue
content = _content_to_str(msg_chunk.content)
if not content:
continue
node = metadata.get("langgraph_node", "?")
if (ns, node) != current_ns:
current_ns = (ns, node)
print(f"\n--- [{ns_label}] TOKEN STREAM node={node} ---")
print(content, end="", flush=True)
print()
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
load_dotenv()
# Disable LangSmith tracing if no valid API key is configured
if not os.getenv("LANGSMITH_API_KEY"):
os.environ.setdefault("LANGCHAIN_TRACING_V2", "false")
_require_env("OPENAI_API_KEY")
postgres_url = os.getenv("POSTGRES_URI") or os.getenv("DATABASE_URL")
if not postgres_url:
raise RuntimeError("Missing required env var: POSTGRES_URI (or DATABASE_URL)")
with PostgresSaver.from_conn_string(postgres_url) as checkpointer:
checkpointer.setup()
agent = create_deep_agent(
model="openai:gpt-5.4",
system_prompt=(
"You are a coordinating agent. When asked to research and write, "
"delegate research to the 'researcher' sub-agent, then delegate "
"writing to the 'writer' sub-agent. Do not do the work yourself."
),
subagents=[researcher, writer],
checkpointer=checkpointer,
)
demo_stream_updates(agent, thread_id="demo-updates-1")
demo_stream_messages(agent, thread_id="demo-messages-1")
demo_stream_combined_v2(agent, thread_id="demo-combined-v2-1")
if __name__ == "__main__":
main()