Using the useStream frontend API with custom FastAPI backend

Hi all,

My team and I are building our first LangChain application using the deepagents framework (0.5.0a2), and we are using FastAPI as our backend in order to integrate the deepagent with our custom business logic. We currently expose the deepagent’s streaming API using a FastAPI SSE route.

Is it possible for our frontend to use the useStream API to consume this custom streaming endpoint? We are building our frontend in Angular. From the documentation, I understand that the useStream API only works with a LangGraph server.

Alternatively, I would appreciate any suggestions on the best way to render streaming responses on the frontend. Thanks in advance!

hi @danielonges

Yes, useStream / injectStream absolutely supports custom backends - including a custom FastAPI server. You are not locked into using a LangGraph Platform server. The SDK provides a dedicated “custom transport” mode specifically for this use case.

Btw, since you’re using Angular, the function is called injectStream (following Angular’s inject naming convention). useStream is still available as a deprecated alias.

Make sure you’re on the latest versions of the LangChain packages.

Source: @langchain/angular README

Mode A: LangGraph Platform (default) - requires assistantId + apiUrl, connects to a LangGraph-managed server.

stream = injectStream({
  assistantId: "agent",
  apiUrl: "http://localhost:2024",
});

Mode B: Custom Transport - uses the transport option to connect to any backend that returns SSE in the correct format. This is what you need.

import { injectStream, FetchStreamTransport } from "@langchain/angular";

stream = injectStream({
  transport: new FetchStreamTransport({
    apiUrl: "https://your-fastapi-server.com/api/stream",
  }),
});

Docs

Source: libs/sdk/src/ui/transport.ts - FetchStreamTransport implementation.

injectStream + custom UseStreamTransport is also possible for total control over request/response - I would not recommend it though :slight_smile:

Hey @pawel-twardziak,

Thank you for the speedy reply! I just wanted to clarify one more thing. You mentioned that Mode B uses the transport option to connect to any backend that returns SSE in the correct format – if I am using the V2 streaming API (Streaming - Docs by LangChain), do I just serialize the entire chunk and return it as part of the SSE “data” field to the frontend? Furthermore, is the injectStream API able to pick up on the different modes of streaming, e.g. between “updates” and “messages”?

hi @danielonges

How to serialize v2 StreamPart to SSE

As far as I can tell from the source code, no - you don’t dump the entire StreamPart dict into the SSE data field.
You map the StreamPart fields to separate SSE fields: type becomes the SSE event:, data becomes the SSE data:, and ns becomes a pipe-separated suffix on the event name.

V2 StreamPart structure (from langgraph/types.py):

Every V2 chunk is a StreamPart TypedDict with type, ns, and data fields. The StreamPart is a discriminated union of 7 concrete types (lines 329-339):

| TypedDict class | type value | data type | Extra fields | Description |
|---|---|---|---|---|
| ValuesStreamPart | "values" | OutputT (full state dict) | interrupts: tuple[Interrupt, ...] | Full state snapshot after each step |
| UpdatesStreamPart | "updates" | dict[str, Any] (node name → output) | - | Per-node outputs; may contain __interrupt__ and __metadata__ keys |
| MessagesStreamPart | "messages" | tuple[AnyMessage, dict[str, Any]] | - | LLM message chunk + metadata (langgraph_step, langgraph_node, etc.) |
| CustomStreamPart | "custom" | Any | - | User-defined data emitted via StreamWriter |
| CheckpointStreamPart | "checkpoints" | CheckpointPayload | - | Checkpoint state (same format as get_state()) |
| TasksStreamPart | "tasks" | TaskPayload \| TaskResultPayload | - | Task start/result events |
| DebugStreamPart | "debug" | DebugPayload | - | Debug events (combines checkpoints + tasks info) |

All types share the common ns: tuple[str, ...] field (empty tuple for root graph, populated for subgraph events).

# Example StreamPart dicts:

# ValuesStreamPart - note the extra "interrupts" field
{"type": "values", "ns": (), "data": {"messages": [...]}, "interrupts": ()}

# UpdatesStreamPart
{"type": "updates", "ns": (), "data": {"agent": {"messages": [...]}}}

# MessagesStreamPart - data is a 2-tuple [message, metadata]
{"type": "messages", "ns": (), "data": (<AIMessageChunk>, {"langgraph_step": 1, "langgraph_node": "agent"})}

# CustomStreamPart
{"type": "custom", "ns": (), "data": {"progress": 0.5}}

How to convert to SSE:

The mapping is straightforward - this is exactly what the JS LangGraph toEventStream function does internally (langgraph-core/src/pregel/stream.ts lines 356-394):

| V2 StreamPart field | SSE field | Transformation |
|---|---|---|
| type | event: | Used directly (e.g., "values", "updates", "messages") |
| ns | appended to event: | Joined with \| separator (e.g., "values\|parent:id\|child:id") |
| data | data: | JSON-serialized |
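If it helps, here’s that mapping as a tiny TypeScript helper. This is a sketch, not the actual toEventStream code - the StreamPart type below is just an assumed shape mirroring the Python TypedDict:

```typescript
// Assumed shape, mirroring the Python StreamPart TypedDict.
type StreamPart = { type: string; ns: string[]; data: unknown };

function toSseFrame(part: StreamPart): string {
  // Event name: the stream type, plus any namespace segments joined by "|".
  const event =
    part.ns.length > 0 ? `${part.type}|${part.ns.join("|")}` : part.type;
  return `event: ${event}\ndata: ${JSON.stringify(part.data)}\n\n`;
}
```

For example, `toSseFrame({ type: "updates", ns: [], data: { agent: {} } })` yields `event: updates\ndata: {"agent":{}}\n\n`.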

EXAMPLES

FastAPI implementation:

import json
from collections.abc import AsyncIterator
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from langchain_core.messages import BaseMessage

app = FastAPI()

# Assume `graph` is your compiled deepagent/LangGraph graph

def _serialize_data(stream_type: str, data) -> str:
    """Serialize StreamPart data to JSON."""
    if stream_type == "messages":
        # V2 messages data is a tuple: (BaseMessage, metadata_dict)
        message, metadata = data
        # Serialize the message using its dict representation
        return json.dumps([_message_to_dict(message), metadata], default=_default_serializer)
    else:
        # For values, updates, custom, etc. - serialize directly
        return json.dumps(data, default=_default_serializer)


def _message_to_dict(message: BaseMessage) -> dict:
    """Convert a LangChain message to a serializable dict."""
    d = message.model_dump()
    d["type"] = message.type  # ensure "ai", "human", "tool", etc.
    return d


def _default_serializer(obj):
    """Fallback serializer for non-standard types."""
    if hasattr(obj, "model_dump"):
        return obj.model_dump()
    if hasattr(obj, "__dict__"):
        return obj.__dict__
    return str(obj)


async def sse_generator(graph, input_data: dict) -> AsyncIterator[str]:
    """Convert V2 StreamPart dicts to SSE wire format."""
    async for chunk in graph.astream(
        input_data,
        stream_mode=["values", "messages", "updates"],
        version="v2",
    ):
        stream_type = chunk["type"]
        ns = chunk.get("ns", ())
        data = chunk["data"]

        # Build event name: "type" or "type|ns_part1|ns_part2"
        if ns:
            event_name = f"{stream_type}|{'|'.join(ns)}"
        else:
            event_name = stream_type

        serialized_data = _serialize_data(stream_type, data)
        yield f"event: {event_name}\ndata: {serialized_data}\n\n"


@app.post("/api/stream")
async def stream_endpoint(request: Request):
    body = await request.json()
    input_data = body.get("input", {})

    return StreamingResponse(
        sse_generator(graph, input_data),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-store"},
    )

Example SSE output from the FastAPI endpoint:

event: values
data: {"messages": [{"type": "human", "content": "Hello!", "id": "abc-123"}]}

event: messages
data: [{"type": "ai", "content": "", "id": "run-456", "tool_calls": []}, {"langgraph_step": 1, "langgraph_node": "agent"}]

event: messages
data: [{"type": "ai", "content": "Hi there!", "id": "run-456", "tool_calls": []}, {"langgraph_step": 1, "langgraph_node": "agent"}]

event: updates
data: {"agent": {"messages": [{"type": "ai", "content": "Hi there!", "id": "run-456"}]}}

event: values
data: {"messages": [{"type": "human", "content": "Hello!", "id": "abc-123"}, {"type": "ai", "content": "Hi there!", "id": "run-456"}]}

Important: The messages event data is a 2-element JSON array [message, metadata], NOT just the message. The metadata dict contains fields like langgraph_step, langgraph_node, langgraph_triggers, etc. that the frontend uses for display and routing. This is the "messages-tuple" wire format.

Source: MessagesStreamPart in langgraph/types.py - data is typed as tuple[AnyMessage, dict[str, Any]].
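On the client side, decoding one messages payload is just a JSON.parse into that 2-tuple. A sketch - the message/metadata shapes below are illustrative, not the full schema:

```typescript
// Illustrative shapes only - the real message schema has more fields.
type ChatMessage = { type: string; content: string; id: string };
type MessageMetadata = Record<string, unknown>;

// A "messages" SSE data payload: a 2-element array [message, metadata].
const payload =
  '[{"type": "ai", "content": "Hi there!", "id": "run-456"}, {"langgraph_step": 1, "langgraph_node": "agent"}]';

const [message, metadata] = JSON.parse(payload) as [ChatMessage, MessageMetadata];

console.log(message.content); // "Hi there!"
console.log(metadata["langgraph_node"]); // "agent"
```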

Question 2: can injectStream differentiate between stream modes?

Yes, absolutely. The frontend StreamManager differentiates events by their SSE event: field name. Each stream mode maps to a different event type, and the StreamManager routes them to the appropriate handler.

From libs/sdk/src/ui/manager.ts lines 757-912:

for await (const { event, data } of run) {
  if (event === "error") { /* handle error */ }
  if (event === "metadata") options.callbacks.onMetadataEvent?.(data);
  if (event === "events") options.callbacks.onLangChainEvent?.(data);

  if (this.matchEventType("updates", event, data)) {
    // Handles "updates" and "updates|namespace" events
    options.callbacks.onUpdateEvent?.(data, { namespace, mutate });
  }

  if (this.matchEventType("custom", event, data)) {
    options.callbacks.onCustomEvent?.(data, { namespace, mutate });
  }

  // ... checkpoints, tasks, debug, tools ...

  if (event === "values" || event.startsWith("values|")) {
    // Updates the full state snapshot
    this.setStreamValues(data as StateType);
  }

  if (this.matchEventType("messages", event, data)) {
    // Processes [message, metadata] tuples for token-by-token streaming
    const [serialized, metadata] = data;
    this.messages.add(serialized, metadata);
  }
}

The matchEventType function (manager.ts line 690-714) matches both exact event names and namespaced variants:

private matchEventType = (expected, actual, _data) => {
  return expected === actual || actual.startsWith(`${expected}|`);
};

What this means for your backend: each V2 StreamPart type is automatically routed to the correct handler based on the SSE event: field:

| SSE event: | Frontend behavior |
|---|---|
| values | Updates stream.values() signal with full state; stream.messages() is derived from this |
| updates | Triggers onUpdateEvent callback; tracks subagent activity |
| messages | Adds message chunks to the streaming message list (token-by-token display) |
| custom | Triggers onCustomEvent callback |
| error | Sets stream.error() and stops the stream |

So when you emit event: updates\ndata: {...}\n\n and event: messages\ndata: [...]\n\n from your FastAPI endpoint, injectStream processes each one differently and routes it to the correct state/handler. You don’t need to do anything special on the frontend - the stream mode differentiation is built into the SSE event names.
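For intuition: this routing works because the transport first splits the raw SSE stream into { event, data } records. Below is a simplified sketch of that step - this is NOT the actual FetchStreamTransport code, just an assumed minimal version that requires a complete buffer and naively concatenates repeated data: lines (the SSE spec joins them with newlines, and real transports also handle partial chunks, comments, id: and retry: fields):

```typescript
// Minimal SSE frame parser (sketch): split a complete SSE text buffer
// into { event, data } records, JSON-decoding each data payload.
function parseSseBuffer(buffer: string): { event: string; data: unknown }[] {
  return buffer
    .split("\n\n") // frames are separated by a blank line
    .filter((frame) => frame.trim().length > 0)
    .map((frame) => {
      let event = "message"; // SSE default event name
      let data = "";
      for (const line of frame.split("\n")) {
        if (line.startsWith("event: ")) event = line.slice("event: ".length);
        else if (line.startsWith("data: ")) data += line.slice("data: ".length);
      }
      return { event, data: JSON.parse(data) };
    });
}
```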

For a basic chat UI, you typically want at least these stream modes:

  • "values" - for the final state snapshots (used for stream.messages() after each step)
  • "messages" - for token-by-token streaming display during LLM generation
  • "updates" - for tracking which node produced what (used for subagents, metadata display)
Enable all three in a single astream call:

async for chunk in graph.astream(
    input_data,
    stream_mode=["values", "messages", "updates"],
    version="v2",
):
    # Each chunk has a different "type" -- your SSE serializer maps it to the event name
    ...

Hey @pawel-twardziak,

Thank you for the extremely detailed explanation along with the code samples - they were really helpful in understanding how to implement the whole streaming endpoint.

Another question: am I able to use the injectStream API to fetch chat history onto the UI? Or must I retrieve chat history using get_state_history (Persistence - Docs by LangChain), and then map the result to my UI components individually?