How to stream from graph with ReactAgent?

I’m trying to stream from a graph that contains React agents equipped with tools. I followed the documentation and requested multiple stream modes: stream_mode=["messages", "updates", "custom"]. For some reason, nothing gets streamed from “messages”; I’m only able to capture events of type “updates”. Here is my implementation:

async for stream_mode, chunk in self.graph.astream(
    initial_state.model_dump(),
    config=config,
    stream_mode=["messages", "updates", "custom"],
):
    if stream_mode == "messages":
        # chunk is a tuple (message_chunk, metadata) for messages mode
        message_chunk, metadata = chunk
        logger.info(f"Messages stream event - message_chunk: {message_chunk}, metadata: {metadata}")

        # Start a content block if not already started
        if not content_started:
            current_content_block_id = str(uuid.uuid4())
            content_start_event = ContentBlockStartEvent(
                block_id=current_content_block_id,
                content_type="text",
            )
            yield self._format_sse_event(content_start_event)
            content_started = True

        # Stream text content tokens
        if hasattr(message_chunk, "content") and message_chunk.content:
            content_delta_event = ContentBlockDeltaEvent(
                block_id=current_content_block_id,
                delta=message_chunk.content,
            )
            yield self._format_sse_event(content_delta_event)

            # Save content to database
            if initial_state.current_message_id:
                await self._save_streaming_text_content(
                    message_id=initial_state.current_message_id,
                    text_chunk=message_chunk.content,
                )

    # Handle graph node completions and responses
    elif stream_mode == "updates":
        logger.info(f"Node completed with output keys: {chunk}")

    # Handle custom streaming data from tools/nodes
    elif stream_mode == "custom":
        logger.debug(f"Custom stream event: {chunk}")

Graph:

class Graph:
    def __init__(self) -> None:
        self.graph_builder = StateGraph(ChatState)
        self._setup_nodes()
        self._setup_edges()

        memory = MemorySaver()
        self.graph = self.graph_builder.compile(checkpointer=memory)

    def _setup_nodes(self) -> None:
        """Set up the nodes for the graph."""
        from project.agents.x_agent.agent import x_agent
        from project.agents.a_agent.agent import a_agent
        from project.agents.y_agent.agent import y_agent

        self.graph_builder.add_node("x_agent", x_agent)
        self.graph_builder.add_node("a_agent", a_agent)
        self.graph_builder.add_node("y_agent", y_agent)

    def _setup_edges(self) -> None:
        """Set up the edges for the graph according to the new workflow."""
        # Start with supervisor for routing
        self.graph_builder.add_edge(START, "a_agent")

        self.graph_builder.add_conditional_edges(
            "a_agent",
            self._route_to_agent,
            {
                "other": "y_agent",
                "general": "x_agent",
                "error": END,
            },
        )

        # Both specialized agents go to END
        self.graph_builder.add_edge("y_agent", END)
        self.graph_builder.add_edge("x_agent", END)

    def _route_to_agent(self, state: ChatState) -> str:
        """Route to the appropriate agent based on supervisor decision."""
        if state.errors:
            return "error"

        if state.selected_agent == "y_agent":
            return "y_agent"

        if state.selected_agent == "x_agent":
            return "x_agent"

        return "x_agent"


def get_graph():  # noqa: ANN201
    """Get the graph instance."""
    return Graph().graph

x_agent and y_agent are defined the same way:

from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langsmith import traceable
from pydantic import SecretStr

MODEL_NAME = "x-model"


@traceable(
    run_type="llm",
    metadata={
        "agent": "x_agent",
        "ls_model_name": MODEL_NAME,
        "ls_provider": "x-provider",
    },
)
async def x_agent(state: ChatState, config: RunnableConfig) -> ChatState:
    try:
        mcp_tools = await get_tools(state.id)

        llm = ChatOpenAI(
            model=MODEL_NAME,
            base_url=settings.BASE_LLM_ENDPOINT,
            api_key=SecretStr(settings.BASE_LLM_API_KEY),
            streaming=True,  # Enable streaming
            temperature=0.1,
        )

        agent = create_react_agent(
            model=llm,
            tools=mcp_tools,
        )

        messages = [SystemMessage(content=system_prompt)]

        for hist_msg in state.conversation_history:
            if hist_msg["role"] == "user":
                messages.append(HumanMessage(content=hist_msg["content"]))
            elif hist_msg["role"] == "assistant":
                ai_msg = AIMessage(content=hist_msg.get("content", ""))

                if "tool_calls" in hist_msg:
                    ai_msg.tool_calls = hist_msg["tool_calls"]

                messages.append(ai_msg)

        current_user_message = user_prompt.format(user_question=state.user_prompt)
        messages.append(HumanMessage(content=current_user_message))

        llm_response = await agent.ainvoke({"messages": messages}, config=config)

        return state.model_copy(
            update={
                "agent_response": llm_response,
                "agent_data": {
                    "agent_type": "x_agent",
                    "tools_available": len(mcp_tools),
                    "streaming_enabled": True,
                },
            },
        )

    except (ValueError, ConnectionError, RuntimeError) as e:
        error_message = f"x_agent failed with error: {e}"
        logger.error(error_message, exc_info=True)

        return state.model_copy(update={"errors": [*state.errors, error_message]})

When I run this I only get two SSE events streamed:

data: {"type": "message_start", "timestamp": "2025-09-15T10:21:08.691839", "message_id": "uuid-x", "conversation_id": "uuid-x"}

data: {"type": "message_stop", "timestamp": "2025-09-15T10:21:14.778332", "message_id": "uuid-x", "stop_reason": "complete", "usage": {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}}

In my terminal I get:

2025-09-15 10:21:14.777 | INFO     | _stream_graph_execution:349 - Node completed with output keys: 
{
    "state": {
        "user_prompt": "hello!",
        "conversation_history": [{"role": "user", "content": "hello!"}],
        "next_task": None,
        "selected_agent": "x_agent",
        "agent_response": {
            "messages": [
                SystemMessage(
                    ...(system_prompt),
                ),
                HumanMessage(
                    content="hello!",
                    additional_kwargs={},
                    response_metadata={},
                    id="uuid_1",
                ),
                HumanMessage(
                    content='\nAnalyze this question',
                    additional_kwargs={},
                    response_metadata={},
                    id="uuid_2",
                ),
                AIMessage(
                    content="Hello! how can I help you?",
                    additional_kwargs={},
                    response_metadata={
                        "finish_reason": "stop",
                        "model_name": "x-model",
                        "system_fingerprint": "xxxx",
                    },
                    id="uuid_3",
                    usage_metadata={
                        "input_tokens": 6741,
                        "output_tokens": 127,
                        "total_tokens": 6868,
                        "input_token_details": {"audio": 0, "cache_read": 0},
                        "output_token_details": {"audio": 0, "reasoning": 0},
                    },
                ),
            ]
        },
        "agent_data": {
            "agent_type": "x_agent",
            "tools_available": 10,
            "streaming_enabled": True,
        },
        "response_content": "",
        "tool_calls": [],
        "citations": [],
        "token_usage": None,
        "errors": [],
        "created_at": datetime.datetime(
            2025, 9, 15, 17, 21, 9, 156410, tzinfo=datetime.timezone.utc
        ),
        "updated_at": None,
    }
}

What am I doing wrong? Is this related to langchain-ai/langgraph issue #5249, “When using create_react_agent with tools, executing astream with stream_mode="messages" does not produce streaming output”?

My langgraph and Python versions are:

    "langgraph>=0.6.7",

Python 3.13.5

Hey @jerson-censys! Are you streaming with subgraphs=True?


Hi @jerson-censys,

Have you tried this configuration?

async for stream_mode, chunk in self.graph.astream(
    initial_state.model_dump(),
    config=config,
    stream_mode=["messages", "updates", "custom"],
    subgraphs=True,  # this is the key
):
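One caveat worth verifying against your version: when subgraphs=True is combined with a list of stream modes, astream yields (namespace, mode, chunk) triples rather than (mode, chunk) pairs, so the loop has to unpack three values. A minimal sketch of what I mean:

async for namespace, stream_mode, chunk in self.graph.astream(
    initial_state.model_dump(),
    config=config,
    stream_mode=["messages", "updates", "custom"],
    subgraphs=True,
):
    # namespace is a tuple of parent node identifiers; it is empty for the top-level graph
    ...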

I would suggest you first try a minimal example with stream_mode="messages" and confirm that it works end-to-end.

If that minimal example works, the issue is in your agent configuration, or in how you handle multiple modes and process the (stream_mode, chunk) tuples.

Here is what I tried, and it works:

from langgraph.prebuilt import create_react_agent


def get_weather(city: str) -> str:
    """Get the weather in a given city."""
    return f"The weather in {city} is sunny"


agent = create_react_agent(
    model="openai:gpt-4o-mini",
    tools=[get_weather],
    prompt="You are a helpful assistant that can answer questions and help with tasks.",
)

inputs = {"messages": [{"role": "user", "content": "what is the weather in sf"}]}

async for stream_mode, chunk in agent.astream(inputs, stream_mode=["messages", "updates"]):
    if stream_mode == "messages":
        message_chunk, metadata = chunk
        print(f"Message Chunk: {message_chunk}")
    elif stream_mode == "updates":
        print(f"Update Chunk: {chunk}")

Thanks guys, I tried subgraphs=True and it works now. I also changed the way I return the result from the LLM call inside the node.
Instead of returning:

# Inside the node:
result = await agent.ainvoke({"messages": messages}, config)

return state.model_copy(
    update={
        "messages": result,
    },
)

I changed it to:

result = await agent.ainvoke({"messages": messages}, config)

return state.model_copy(
    update={
        "messages": result["messages"],
    },
)

That, plus subgraphs=True, fixed it. agent.ainvoke returns a dict with a "messages" key, so returning the whole dict was nesting the agent’s output under the state field instead of updating it with the message list.
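One more tip for anyone who lands here: with several agents in the graph, the namespace part of the tuple tells you which node’s subgraph produced the tokens. In my runs the entries look like "node_name:task_id" (treat that format as an assumption and check it against your version), so you can filter roughly like this:

async for namespace, stream_mode, chunk in self.graph.astream(
    initial_state.model_dump(),
    config=config,
    stream_mode=["messages", "updates", "custom"],
    subgraphs=True,
):
    if stream_mode == "messages":
        message_chunk, metadata = chunk
        # only forward tokens coming from the x_agent subgraph
        if any(ns.startswith("x_agent") for ns in namespace):
            yield message_chunk.content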
