I was previously using an agent and streaming worked fine. For unrelated reasons, I switched to the langgraph-supervisor library, and now the streaming code no longer works: I get the entire text response at once instead of individual tokens. Can someone help me out?
```
async for chunk in supervisor.astream(state, config=config, stream_mode="messages"):
    print(chunk[0].content)
# prints 'Can you please let me … {till end of sentence}' in one go
```
I am using Gradio as the UI and trying to display the streamed tokens there.
I also get a `RuntimeError: async generator raised StopAsyncIteration`. Here is my full code block:
output = ""
try:
async for chunk in supervisor.astream(
state, config=config, stream_mode="messages"
):
msg, metadata = chunk
current_node = metadata.get("langgraph_node")
if current_node == "supervisor": # only want to print supervisor's response for now)
if hasattr(msg, "content") and type(msg) is AIMessageChunk:
content = msg.content
# print(content, flush=True) # Ensure immediate output
# Format as Server-Sent Events (SSE)
output += f"{content}"
print(output) # prints entire sentence
yield output
except StopAsyncIteration:
pass
Hi @mgathena, could you try out these examples?
Token-by-token (yield only the new token for Gradio):
```
from typing import AsyncGenerator

from langchain_core.messages import AIMessage, AIMessageChunk


async def stream_supervisor_tokens(supervisor, state, config) -> AsyncGenerator[str, None]:
    async for message, meta in supervisor.astream(state, config=config, stream_mode="messages"):
        if meta.get("langgraph_node") != "supervisor":
            continue
        if isinstance(message, AIMessageChunk):
            token = message.content or ""
            # yield only the delta token
            yield token
        elif isinstance(message, AIMessage):
            # optional: handle the final full message; often not needed for token streaming
            pass
```
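For a quick sanity check outside the UI, a minimal driver like this (assuming `supervisor`, `state`, and `config` are already constructed as in your snippet) should print tokens incrementally:

```
import asyncio


async def main():
    # if streaming works, tokens appear one by one rather than as a single string
    async for token in stream_supervisor_tokens(supervisor, state, config):
        print(token, end="", flush=True)
    print()

asyncio.run(main())
```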
Buffered (yield the accumulated text each step for Gradio’s running text experience):
```
async def stream_supervisor_buffered(supervisor, state, config) -> AsyncGenerator[str, None]:
    buffer = ""
    async for message, meta in supervisor.astream(state, config=config, stream_mode="messages"):
        if meta.get("langgraph_node") != "supervisor":
            continue
        if isinstance(message, AIMessageChunk):
            buffer += message.content or ""
            yield buffer
        elif isinstance(message, AIMessage):
            # final full message arrived; you can choose to flush or ignore it
            pass
```
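The buffered variant exists because each `yield` in Gradio replaces the output component's entire value; yielding the accumulated buffer gives the growing-text effect, while the token-by-token variant suits sinks that append deltas themselves (SSE, terminal printing).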
Gradio integration:
```
import gradio as gr


async def chat(user_text: str):
    state = {"messages": [{"role": "user", "content": user_text}]}
    # Gradio only streams when fn is itself an (async) generator function,
    # so re-yield from the helper instead of returning the generator object
    async for text in stream_supervisor_buffered(supervisor, state, config):
        yield text


demo = gr.Interface(fn=chat, inputs="text", outputs="text")
demo.launch()
```
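If you later want a chat-style UI, the same generator should plug into `gr.ChatInterface` as well; a minimal sketch (the `chat_fn` wrapper is mine, reusing your `supervisor` and `config` objects):

```
import gradio as gr


async def chat_fn(message, history):  # history is unused in this sketch
    state = {"messages": [{"role": "user", "content": message}]}
    # yielding partial strings renders as a streaming reply in ChatInterface
    async for text in stream_supervisor_buffered(supervisor, state, config):
        yield text

gr.ChatInterface(fn=chat_fn).launch()
```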