Streaming from langgraph-supervisor

I was previously using an agent and streaming worked fine. For a design decision, I switched to the langgraph-supervisor library, and now the streaming code no longer works: I get the entire text response at once instead of token-by-token chunks. Can someone help me out?

```
async for chunk in supervisor.astream(state, config=config, stream_mode="messages"):
    print(chunk[0].content)
```

I get `Can you please let me ….` {till end of sentence} in one chunk.

I am using Gradio as the UI and am trying to get the streamed tokens there.

I also get `RuntimeError: async generator raised StopAsyncIteration`. Here is my full code block:

output = ""
try:
        async for chunk in supervisor.astream(
            state, config=config, stream_mode="messages"
        ):
            msg, metadata = chunk
            current_node = metadata.get("langgraph_node")
            if current_node == "supervisor": # only want to print supervisor's response for now)
                if hasattr(msg, "content") and type(msg) is AIMessageChunk:
                    content = msg.content
                    # print(content, flush=True)  # Ensure immediate output
                    # Format as Server-Sent Events (SSE)
                    output += f"{content}"
                    print(output) # prints entire sentence
                    yield output
    except StopAsyncIteration:
        pass

Hi @mgathena,

Could you try out these examples?

Token-by-token (yield only the new token for Gradio):

```
from typing import AsyncGenerator
from langchain_core.messages import AIMessage, AIMessageChunk

async def stream_supervisor_tokens(supervisor, state, config) -> AsyncGenerator[str, None]:
    async for message, meta in supervisor.astream(state, config=config, stream_mode="messages"):
        if meta.get("langgraph_node") != "supervisor":
            continue
        if isinstance(message, AIMessageChunk):
            token = message.content or ""
            # yield only the delta token
            yield token
        elif isinstance(message, AIMessage):
            # optional: handle the final full message; often not needed for token streaming
            pass
```
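
To sanity-check this outside Gradio first, you can drain the generator from a small script. A minimal sketch, assuming `supervisor`, `state`, and `config` are already set up as in your snippet:

```
import asyncio

async def main():
    # tokens are deltas, so print them with no separator between writes
    async for token in stream_supervisor_tokens(supervisor, state, config):
        print(token, end="", flush=True)
    print()  # trailing newline once the stream ends

asyncio.run(main())
```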

Buffered (yield the accumulated text each step for Gradio’s running text experience):

```
async def stream_supervisor_buffered(supervisor, state, config) -> AsyncGenerator[str, None]:
    buffer = ""
    async for message, meta in supervisor.astream(state, config=config, stream_mode="messages"):
        if meta.get("langgraph_node") != "supervisor":
            continue
        if isinstance(message, AIMessageChunk):
            buffer += message.content or ""
            yield buffer
        elif isinstance(message, AIMessage):
            # final full message arrived; you can choose to flush or ignore it
            pass
```
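
The buffered variant matches how Gradio streaming works: each yielded value replaces the component's current contents rather than appending to them, so yielding the accumulated buffer produces the growing-text effect, whereas yielding bare tokens would leave only the latest token visible.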

Gradio integration:

```
import gradio as gr

async def chat(user_text: str):
    state = {"messages": [{"role": "user", "content": user_text}]}
    # make `chat` itself an async generator; Gradio streams each yielded value
    async for partial in stream_supervisor_buffered(supervisor, state, config):
        yield partial

demo = gr.Interface(fn=chat, inputs="text", outputs="text")
demo.launch()
```
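
One caveat on the Gradio side (general Gradio behavior, not specific to langgraph-supervisor): on Gradio 3.x you have to enable the queue with `demo.queue()` before `demo.launch()` for generator-based streaming to work; on Gradio 4+ the queue is enabled by default.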