I’m trying to stream a graph that contains React agents equipped with tools. I followed the documentation and specified that I want different types of stream mode: stream_mode=["messages", "updates", "custom"]. For some reason, nothing gets streamed from “messages”. I’m only able to capture events of type “Updates”. Here is my implementation:
async for stream_mode, chunk in self.graph.astream(
initial_state.model_dump(),
config=config,
stream_mode=["messages", "updates", "custom"],
):
if stream_mode == "messages":
# chunk is a tuple (message_chunk, metadata) for messages mode
message_chunk, metadata = chunk
logger.info(f"Messages stream event - message_chunk: {message_chunk}, metadata: {metadata}")
# Start content block if not already started
if not content_started:
current_content_block_id = str(uuid.uuid4())
content_start_event = ContentBlockStartEvent(
block_id=current_content_block_id,
content_type="text",
)
yield self._format_sse_event(content_start_event)
content_started = True
# Stream text content tokens
if hasattr(message_chunk, "content") and message_chunk.content:
content_delta_event = ContentBlockDeltaEvent(
block_id=current_content_block_id,
delta=message_chunk.content,
)
yield self._format_sse_event(content_delta_event)
# Save content to database
if initial_state.current_message_id:
await self._save_streaming_text_content(
message_id=initial_state.current_message_id,
text_chunk=message_chunk.content,
)
# Handle graph node completions and responses
elif stream_mode == "updates":
logger.info(f"Node completed with output keys: {chunk}")
# Handle custom streaming data from tools/nodes
elif stream_mode == "custom":
logger.debug(f"Custom stream event: {chunk}")
Graph:
class Graph:
def __init__(self) -> None:
self.graph_builder = StateGraph(ChatState)
self._setup_nodes()
self._setup_edges()
memory = MemorySaver()
self.graph = self.graph_builder.compile(checkpointer=memory)
def _setup_nodes(self) -> None:
"""Set up the nodes for the graph."""
from project.agents.x_agent.agent import x_agent
from project.agents.a_agent.agent import a_agent
from project.agents.y_agent.agent import y_agent
self.graph_builder.add_node("x_agent", x_agent)
self.graph_builder.add_node("a_agent", a_agent)
self.graph_builder.add_node("y_agent", y_agent)
def _setup_edges(self) -> None:
"""Set up the edges for the graph according to the new workflow."""
# Start with supervisor for routing
self.graph_builder.add_edge(START, "a_agent")
self.graph_builder.add_conditional_edges(
"a_agent",
self._route_to_agent,
{
"other": "y_agent",
"general": "x_agent",
"error": END,
},
)
# Both specialized agents go to END
self.graph_builder.add_edge("y_agent", END)
self.graph_builder.add_edge("x_agent", END)
def _route_to_agent(self, state: ChatState) -> str:
"""Route to the appropriate agent based on supervisor decision."""
if state.errors:
return "error"
if state.selected_agent == "y_agent":
return "y_agent"
if state.selected_agent == "x_agent":
return "x_agent"
return "x_agent"
def get_graph(): # noqa: ANN201
"""Get the graph instance."""
return Graph().graph
x_agent and y_agent are defined the same way:
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
MODEL_NAME = "x-model"
@traceable(
run_type="llm",
metadata={
"agent": "x_agent",
"ls_model_name": MODEL_NAME,
"ls_provider": "x-provider",
},
)
async def x_agent(state: ChatState, config: RunnableConfig) -> ChatState:
try:
mcp_tools = await get_tools(state.id)
llm = ChatOpenAI(
model=MODEL_NAME,
base_url=settings.BASE_LLM_ENDPOINT,
api_key=SecretStr(settings.BASE_LLM_API_KEY),
streaming=True, # Enable streaming
temperature=0.1,
)
agent = create_react_agent(
model=llm,
tools=mcp_tools,
)
messages = []
messages.append(SystemMessage(content=system_prompt))
for hist_msg in state.conversation_history:
if hist_msg["role"] == "user":
messages.append(HumanMessage(content=hist_msg["content"]))
elif hist_msg["role"] == "assistant":
from langchain_core.messages import AIMessage
ai_msg = AIMessage(content=hist_msg.get("content", ""))
if "tool_calls" in hist_msg:
ai_msg.tool_calls = hist_msg["tool_calls"]
messages.append(ai_msg)
current_user_message = user_prompt.format(user_question=state.user_prompt)
messages.append(HumanMessage(content=current_user_message))
llm_response = await agent.ainvoke({"messages": messages}, config=config)
return state.model_copy(
update={
"agent_response": llm_response,
"agent_data": {
"agent_type": "x_agent",
"tools_available": len(mcp_tools),
"streaming_enabled": True,
},
},
)
except (ValueError, ConnectionError, RuntimeError) as e:
error_message = f"x_agent failed with error: {e}"
logger.error(error_message, exc_info=True)
return state.model_copy(update={"errors": [*state.errors, error_message]})
When I run this I only get two SSE events streamed:
data: {"type": "message_start", "timestamp": "2025-09-15T10:21:08.691839", "message_id": "uuid-x", "conversation_id": "uuid-x"}
data: {"type": "message_stop", "timestamp": "2025-09-15T10:21:14.778332", "message_id": "uuid-x", "stop_reason": "complete", "usage": {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}}
In my terminal I get:
2025-09-15 10:21:14.777 | INFO | _stream_graph_execution:349 - Node completed with output keys:
{
"state": {
"user_prompt": "hello!",
"conversation_history": [{"role": "user", "content": "hello!"}],
"next_task": None,
"selected_agent": "x_agent",
"agent_response": {
"messages": [
SystemMessage(
...(system_prompt),
),
HumanMessage(
content="hello!",
additional_kwargs={},
response_metadata={},
id="uuid_1",
),
HumanMessage(
content='\nAnalyze this question',
additional_kwargs={},
response_metadata={},
id="uuid_2",
),
AIMessage(
content="Hello! how can I help you?",
additional_kwargs={},
response_metadata={
"finish_reason": "stop",
"model_name": "x-model",
"system_fingerprint": "xxxx",
},
id="uuid_3",
usage_metadata={
"input_tokens": 6741,
"output_tokens": 127,
"total_tokens": 6868,
"input_token_details": {"audio": 0, "cache_read": 0},
"output_token_details": {"audio": 0, "reasoning": 0},
},
),
]
},
"agent_data": {
"agent_type": "x_agent",
"tools_available": 10,
"streaming_enabled": True,
},
"response_content": "",
"tool_calls": [],
"citations": [],
"token_usage": None,
"errors": [],
"created_at": datetime.datetime(
2025, 9, 15, 17, 21, 9, 156410, tzinfo=datetime.timezone.utc
),
"updated_at": None,
}
}
What am I doing wrong? Is this related to this issue: When using create_react_agent with tools, executing astream with stream_mode="messages" does not produce streaming output. · Issue #5249 · langchain-ai/langgraph · GitHub
My langgraph and python version are:
"langgraph>=0.6.7",
python 3.13.5
