Hello @whatCanIsay321
Great question. This is a real gap that trips up many people building streaming LangGraph agents. Here’s what’s happening and how to address both scenarios.
Why the partial output is lost
LangGraph writes a checkpoint at the boundary of each super-step (after a node fully completes and its outputs pass through reducers). When your frontend disconnects mid-stream, the model node never returns, so:
- The node’s output never flows through the messages reducer
- The in-progress super-step checkpoint is never committed
- The next run on that thread sees the last complete checkpoint, which is the state from before the cancelled LLM call
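The commit rule in the list above can be illustrated with a toy model. This is not LangGraph's implementation: `ToyCheckpointer`, `fake_llm_node`, and `run_step` are hypothetical names, and the only assumption carried over is that state is saved after the node returns.

```python
import asyncio


class ToyCheckpointer:
    """Hypothetical stand-in for a checkpointer: holds only committed state."""

    def __init__(self):
        self.saved = {"messages": ["user: tell me a story"]}

    def commit(self, message):
        self.saved = {"messages": self.saved["messages"] + [message]}


async def fake_llm_node(tokens, seen):
    """Pretend model node: emits tokens one by one, then returns the full text."""
    for token in tokens:
        await asyncio.sleep(0)  # suspension point where a cancellation can land
        seen.append(token)
    return "".join(seen)


async def run_step(checkpointer, tokens, seen):
    # The commit happens only after the node returns, mirroring the
    # "checkpoint at the super-step boundary" rule described above.
    result = await fake_llm_node(tokens, seen)
    checkpointer.commit("ai: " + result)


async def demo():
    checkpointer = ToyCheckpointer()
    seen = []
    task = asyncio.create_task(
        run_step(checkpointer, ["Once ", "upon ", "a ", "time"], seen)
    )
    while len(seen) < 2:  # let a couple of tokens through, then disconnect
        await asyncio.sleep(0)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return checkpointer.saved, seen


if __name__ == "__main__":
    saved, seen = asyncio.run(demo())
    print(saved)  # only the user message was committed
    print(seen)   # tokens were produced, but never saved
```

The tokens in `seen` exist only in memory, which is the situation the recovery code has to handle.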
For the first scenario (preserving the partial output), the key is to intercept streamed tokens on the backend, accumulate them, and, after cancellation, manually write the partial content back into the thread using update_state.
from langchain_core.messages import AIMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import InMemorySaver
import asyncio

checkpointer = InMemorySaver()
graph = ...  # your compiled graph with checkpointer

async def stream_with_cancel_recovery(thread_id: str, user_input: str):
    config = {"configurable": {"thread_id": thread_id}}
    partial_chunks = []
    try:
        async for event in graph.astream(
            {"messages": [{"role": "user", "content": user_input}]},
            config,
            stream_mode="messages",  # gives you token-level chunks
        ):
            # event is (message_chunk, metadata) in "messages" mode
            chunk, metadata = event
            partial_chunks.append(chunk)
            yield chunk  # stream to frontend
    except (asyncio.CancelledError, GeneratorExit):
        # Reconstruct whatever was generated so far
        if partial_chunks:
            # Assumes string content; chunks with list-style content are skipped
            partial_content = "".join(
                c.content for c in partial_chunks
                if isinstance(getattr(c, "content", None), str)
            )
            partial_ai_message = AIMessage(
                content=partial_content,
                additional_kwargs={"cancelled": True},
            )
            # Write the partial message into the thread state manually.
            # This creates a new checkpoint so the next run sees the partial output.
            # With an async checkpointer, use `await graph.aupdate_state(...)` instead.
            graph.update_state(
                config,
                {"messages": [partial_ai_message]},
                as_node="llm",  # must match your model node's name ("llm" here)
            )
        raise  # re-raise so the cancellation still propagates to the caller
update_state creates a new checkpoint with the partial content as the node’s output, so when the user later asks “please continue the story,” the thread history contains the partial text and the model can resume from it.
Note: Use stream_mode="messages" (not "updates") to get individual token chunks as they arrive. This gives you the finest granularity to reconstruct what was generated.
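When reconstructing the text, two details are worth guarding against: chunks from other nodes arriving in the same stream, and chunk content that is a list of blocks rather than a plain string. The helper below is a minimal sketch under assumptions: `FakeChunk`, `chunk_text`, and `collect_text` are hypothetical names, the metadata is assumed to carry a `langgraph_node` key, and non-string content is assumed to be a list of strings or dicts shaped like `{"type": "text", "text": ...}`.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeChunk:
    """Hypothetical stand-in for a streamed message chunk."""
    content: Any


def chunk_text(chunk) -> str:
    """Return the plain text carried by one chunk, or "" if there is none."""
    content = getattr(chunk, "content", "")
    if isinstance(content, str):
        return content
    parts = []
    if isinstance(content, list):
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and block.get("type") == "text":
                parts.append(block.get("text", ""))
    return "".join(parts)


def collect_text(events, node_name="llm"):
    """Join the text of (chunk, metadata) pairs emitted by one node."""
    return "".join(
        chunk_text(chunk)
        for chunk, metadata in events
        if metadata.get("langgraph_node") == node_name
    )


if __name__ == "__main__":
    events = [
        (FakeChunk("Once "), {"langgraph_node": "llm"}),
        (FakeChunk([{"type": "text", "text": "upon "}]), {"langgraph_node": "llm"}),
        (FakeChunk("tool output"), {"langgraph_node": "tools"}),
    ]
    print(collect_text(events))
```

To use it, you would store the full `(chunk, metadata)` pairs during streaming instead of only the chunks, then pass the result of `collect_text` as the content of the partial AIMessage.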
For the second scenario (continuing on a separate branch), LangGraph’s time-travel support makes this straightforward. Every update_state call creates a new checkpoint, and you can fork from any checkpoint by passing its checkpoint_id in the config:
async def stream_as_branch(thread_id: str, user_input: str):
    config = {"configurable": {"thread_id": thread_id}}
    partial_chunks = []
    try:
        async for chunk, metadata in graph.astream(
            {"messages": [{"role": "user", "content": user_input}]},
            config,
            stream_mode="messages",
        ):
            partial_chunks.append(chunk)
            yield chunk
    except (asyncio.CancelledError, GeneratorExit):
        if partial_chunks:
            # Assumes string content; chunks with list-style content are skipped
            partial_content = "".join(
                c.content for c in partial_chunks
                if isinstance(getattr(c, "content", None), str)
            )
            partial_ai_message = AIMessage(content=partial_content)
            # update_state returns the config of the NEW checkpoint it created
            fork_config = graph.update_state(
                config,
                {"messages": [partial_ai_message]},
                as_node="llm",
            )
            # fork_config["configurable"]["checkpoint_id"] is the branch point.
            fork_checkpoint_id = fork_config["configurable"]["checkpoint_id"]
            print(f"Branch checkpoint: {fork_checkpoint_id}")
            # A generator that is being closed can no longer yield, so persist
            # this ID on the server (for example, keyed by thread_id). The frontend
            # can then fetch it and pass it as checkpoint_id in future requests
            # to continue from this specific branch.
        raise  # re-raise so the cancellation still propagates to the caller
To continue from the branch later:
branch_config = {
    "configurable": {
        "thread_id": thread_id,
        "checkpoint_id": fork_checkpoint_id,  # resume from the cancelled point
    }
}
async for chunk, _ in graph.astream(
    {"messages": [{"role": "user", "content": "please continue the story"}]},
    branch_config,
    stream_mode="messages",
):
    ...
Summary
| Goal | Mechanism |
| --- | --- |
| Preserve partial output | Accumulate streamed tokens, call update_state on cancel |
| Resume seamlessly | Thread history now includes the partial message |
| Separate branch | Store the checkpoint_id returned by update_state, pass it as checkpoint_id in the next run’s config |
The core insight is that LangGraph only commits state at super-step boundaries, after a node has completed, so it will never automatically save a half-generated message. You have to catch the cancellation on the backend and write the partial state yourself before the cancelled coroutine fully unwinds.
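The catch-save-re-raise shape can be tried without LangGraph at all. In this stdlib-only sketch, `fake_token_stream` stands in for `graph.astream(..., stream_mode="messages")` and appending to the `saved` list stands in for update_state; both are hypothetical substitutes chosen for illustration.

```python
import asyncio


async def fake_token_stream():
    """Hypothetical stand-in for a token-level stream from the graph."""
    for token in ["Once ", "upon ", "a ", "time"]:
        await asyncio.sleep(0)  # suspension point where a cancellation can land
        yield token


async def stream_and_save(saved: list):
    """Relay tokens; on cancellation, persist the partial text, then re-raise."""
    partial = []
    try:
        async for token in fake_token_stream():
            partial.append(token)
            yield token
    except (asyncio.CancelledError, GeneratorExit):
        if partial:
            saved.append("".join(partial))  # stand-in for update_state
        raise  # propagate so the surrounding task really stops


async def consume(saved, received):
    """Plays the role of the request handler feeding the frontend."""
    async for token in stream_and_save(saved):
        received.append(token)


async def demo():
    saved, received = [], []
    task = asyncio.create_task(consume(saved, received))
    while len(received) < 2:  # simulate a client that disconnects mid-stream
        await asyncio.sleep(0)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return saved, received


if __name__ == "__main__":
    saved, received = asyncio.run(demo())
    print(saved, received)
```

Re-raising matters here: swallowing `CancelledError` would leave the task looking as if it had finished normally, and swallowing `GeneratorExit` and then yielding again raises a `RuntimeError`.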
This answer draws on LangGraph’s persistence docs, specifically the super-step checkpoint model and update_state, and on langchain-core’s AsyncChatModelStream.aclose, which handles the CancelledError path at the LLM layer. The stream_mode="messages" trick is the key to getting per-token granularity without waiting for a full node completion.