hi @danielonges
How to serialize v2 StreamPart to SSE
as far as I can read it from the source code, noo, you don’t dump the entire StreamPart dict as the SSE data field.
You map the StreamPart fields to separate SSE fields: type becomes the SSE event:, data becomes the SSE data:, and ns becomes a pipe-separated suffix on the event name.
V2 StreamPart structure (from langgraph/types.py):
Every V2 chunk is a StreamPart TypedDict with type, ns, and data fields. The StreamPart is a discriminated union of 7 concrete types (lines 329-339):
| TypedDict class |
type value |
data type |
Extra fields |
Description |
ValuesStreamPart |
"values" |
OutputT (full state dict) |
interrupts: tuple[Interrupt, ...] |
Full state snapshot after each step |
UpdatesStreamPart |
"updates" |
dict[str, Any] (node name → output) |
- |
Per-node outputs; may contain __interrupt__ and __metadata__ keys |
MessagesStreamPart |
"messages" |
tuple[AnyMessage, dict[str, Any]] |
- |
LLM message chunk + metadata (langgraph_step, langgraph_node, etc.) |
CustomStreamPart |
"custom" |
Any |
- |
User-defined data emitted via StreamWriter |
CheckpointStreamPart |
"checkpoints" |
CheckpointPayload |
- |
Checkpoint state (same format as get_state()) |
TasksStreamPart |
"tasks" |
TaskPayload | TaskResultPayload |
- |
Task start/result events |
DebugStreamPart |
"debug" |
DebugPayload |
- |
Debug events (combines checkpoints + tasks info) |
All types share the common ns: tuple[str, ...] field (empty tuple for root graph, populated for subgraph events).
# Example StreamPart dicts:
# ValuesStreamPart - note the extra "interrupts" field
{"type": "values", "ns": (), "data": {"messages": [...]}, "interrupts": ()}
# UpdatesStreamPart
{"type": "updates", "ns": (), "data": {"agent": {"messages": [...]}}}
# MessagesStreamPart - data is a 2-tuple [message, metadata]
{"type": "messages", "ns": (), "data": (<AIMessageChunk>, {"langgraph_step": 1, "langgraph_node": "agent"})}
# CustomStreamPart
{"type": "custom", "ns": (), "data": {"progress": 0.5}}
How to convert to SSE:
The mapping is straightforward - this is exactly what the JS LangGraph toEventStream function does internally (langgraph-core/src/pregel/stream.ts lines 356-394):
| V2 StreamPart field |
SSE field |
Transformation |
type |
event: |
Used directly (e.g., "values", "updates", "messages") |
ns |
appended to event: |
Joined with | separator (e.g., "values|parent:id|child:id") |
data |
data: |
JSON-serialized |
EXAMPLES
FastAPI implementation:
import json
from collections.abc import AsyncIterator
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from langchain_core.messages import BaseMessage
app = FastAPI()
# Assume `graph` is your compiled deepagent/LangGraph graph
def _serialize_data(stream_type: str, data) -> str:
"""Serialize StreamPart data to JSON."""
if stream_type == "messages":
# V2 messages data is a tuple: (BaseMessage, metadata_dict)
message, metadata = data
# Serialize the message using its dict representation
return json.dumps([_message_to_dict(message), metadata])
else:
# For values, updates, custom, etc. - serialize directly
return json.dumps(data, default=_default_serializer)
def _message_to_dict(message: BaseMessage) -> dict:
"""Convert a LangChain message to a serializable dict."""
d = message.model_dump()
d["type"] = message.type # ensure "ai", "human", "tool", etc.
return d
def _default_serializer(obj):
"""Fallback serializer for non-standard types."""
if hasattr(obj, "model_dump"):
return obj.model_dump()
if hasattr(obj, "__dict__"):
return obj.__dict__
return str(obj)
async def sse_generator(graph, input_data: dict) -> AsyncIterator[str]:
"""Convert V2 StreamPart dicts to SSE wire format."""
async for chunk in graph.astream(
input_data,
stream_mode=["values", "messages", "updates"],
version="v2",
):
stream_type = chunk["type"]
ns = chunk.get("ns", ())
data = chunk["data"]
# Build event name: "type" or "type|ns_part1|ns_part2"
if ns:
event_name = f"{stream_type}|{'|'.join(ns)}"
else:
event_name = stream_type
serialized_data = _serialize_data(stream_type, data)
yield f"event: {event_name}\ndata: {serialized_data}\n\n"
@app.post("/api/stream")
async def stream_endpoint(request: Request):
body = await request.json()
input_data = body.get("input", {})
return StreamingResponse(
sse_generator(graph, input_data),
media_type="text/event-stream",
headers={"Cache-Control": "no-store"},
)
Example SSE output from the FastAPI endpoint:
event: values
data: {"messages": [{"type": "human", "content": "Hello!", "id": "abc-123"}]}
event: messages
data: [{"type": "ai", "content": "", "id": "run-456", "tool_calls": []}, {"langgraph_step": 1, "langgraph_node": "agent"}]
event: messages
data: [{"type": "ai", "content": "Hi there!", "id": "run-456", "tool_calls": []}, {"langgraph_step": 1, "langgraph_node": "agent"}]
event: updates
data: {"agent": {"messages": [{"type": "ai", "content": "Hi there!", "id": "run-456"}]}}
event: values
data: {"messages": [{"type": "human", "content": "Hello!", "id": "abc-123"}, {"type": "ai", "content": "Hi there!", "id": "run-456"}]}
Important: The messages event data is a 2-element JSON array [message, metadata], NOT just the message. The metadata dict contains fields like langgraph_step, langgraph_node, langgraph_triggers, etc. that the frontend uses for display and routing. This is the "messages-tuple" wire format.
Source: MessagesStreamPart in langgraph/types.py - data is typed as tuple[AnyMessage, dict[str, Any]].
Question 2: can injectStream differentiate between stream modes?
Yes, absolutely. The frontend StreamManager differentiates events by their SSE event: field name. Each stream mode maps to a different event type, and the StreamManager routes them to the appropriate handler.
From libs/sdk/src/ui/manager.ts lines 757-912:
for await (const { event, data } of run) {
if (event === "error") { /* handle error */ }
if (event === "metadata") options.callbacks.onMetadataEvent?.(data);
if (event === "events") options.callbacks.onLangChainEvent?.(data);
if (this.matchEventType("updates", event, data)) {
// Handles "updates" and "updates|namespace" events
options.callbacks.onUpdateEvent?.(data, { namespace, mutate });
}
if (this.matchEventType("custom", event, data)) {
options.callbacks.onCustomEvent?.(data, { namespace, mutate });
}
// ... checkpoints, tasks, debug, tools ...
if (event === "values" || event.startsWith("values|")) {
// Updates the full state snapshot
this.setStreamValues(data as StateType);
}
if (this.matchEventType("messages", event, data)) {
// Processes [message, metadata] tuples for token-by-token streaming
const [serialized, metadata] = data;
this.messages.add(serialized, metadata);
}
}
The matchEventType function (manager.ts line 690-714) matches both exact event names and namespaced variants:
private matchEventType = (expected, actual, _data) => {
return expected === actual || actual.startsWith(`${expected}|`);
};
What this means for your backend: each V2 StreamPart type is automatically routed to the correct handler based on the SSE event: field:
SSE event: |
Frontend behavior |
values |
Updates stream.values() signal with full state; stream.messages() is derived from this |
updates |
Triggers onUpdateEvent callback; tracks subagent activity |
messages |
Adds message chunks to the streaming message list (token-by-token display) |
custom |
Triggers onCustomEvent callback |
error |
Sets stream.error() and stops the stream |
So when you emit event: updates\ndata: {...}\n\n and event: messages\ndata: [...]\n\n from your FastAPI endpoint, injectStream processes each one differently and routes it to the correct state/handler. You don’t need to do anything special on the frontend - the stream mode differentiation is built into the SSE event names.
For a basic chat UI, you typically want at least these stream modes:
"values" - for the final state snapshots (used for stream.messages() after each step)
"messages" - for token-by-token streaming display during LLM generation
"updates" - for tracking which node produced what (used for subagents, metadata display)
async for chunk in graph.astream(
input_data,
stream_mode=["values", "messages", "updates"],
version="v2",
):
# Each chunk has a different "type" -- your SSE serializer maps it to the event name
...