Hi,
I ran into the issue below. The script may need to be run several times before the bug reproduces.
Is this documented anywhere, and is it expected behavior?
Thanks!
"""Minimal reproduction of LangGraph concurrent astream() token-leak bug.

Bug:
    When the same compiled graph (and therefore the same underlying chat
    model instance) is streamed concurrently from two asyncio tasks with
    stream_mode="messages", token chunks emitted by the model during one
    task's call can surface inside the other task's astream() iterator.

Root cause (short version):
    stream_mode="messages" installs a streaming callback handler through
    LangChain's callback-manager contextvar. The contextvar is per-asyncio-
    Task in theory, but the compiled graph holds bound references to the
    shared model's configuration, and the BaseChatModel's streaming path can
    pick up the "wrong" callback manager when two tasks interleave inside
    _astream. The result: token T produced by the model during Task B's
    invocation gets delivered to Task A's queue.

Expected output when the bug reproduces:
    Task A collected chunks that include text meant for Task B (or vice
    versa). The script asserts isolation at the end; it raises
    AssertionError when the bug is present.
"""
from __future__ import annotations

import asyncio
from typing import Any

from langchain_core.language_models.fake_chat_models import GenericFakeChatModel
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.graph import END, START, MessagesState, StateGraph
def build_shared_graph(responses: list[str]) -> Any:
    """Build ONE compiled graph whose LLM node cycles through ``responses``.

    Using ``GenericFakeChatModel`` keeps the repro hermetic (no network,
    no API key) while still going through LangChain's ``BaseChatModel``
    streaming machinery.

    Args:
        responses: Canned reply texts. They are handed out in the order the
            model is *called*, not matched to any prompt: every caller of
            the returned graph shares the same single-pass iterator.

    Returns:
        The compiled single-node graph (``START -> "chat" -> END``).
    """
    # One single-pass iterator shared by every invocation of the graph.
    # A generator expression is already an iterator, so no ``iter()`` wrap
    # is needed.
    #
    # NOTE(review): ``GenericFakeChatModel`` chunks streamed content on
    # whitespace rather than per character, so a response containing no
    # whitespace is emitted as a single chunk — confirm against the
    # installed langchain-core version.
    messages_iter = (AIMessage(content=r) for r in responses)
    model = GenericFakeChatModel(messages=messages_iter)

    async def call_model(state: MessagesState) -> dict[str, Any]:
        # Note: no explicit config plumbing. This mirrors what agent
        # middleware does internally — the model is called under whatever
        # RunnableConfig is current in the contextvar.
        reply = await model.ainvoke(state["messages"])
        return {"messages": [reply]}

    builder: StateGraph = StateGraph(MessagesState)
    builder.add_node("chat", call_model)
    builder.add_edge(START, "chat")
    builder.add_edge("chat", END)
    return builder.compile()
async def run_stream(graph: Any, label: str, prompt: str, out: list[str]) -> None:
    """Drive ``graph.astream`` with ``stream_mode="messages"``.

    Appends the text of every streamed chunk to ``out``. Each concurrent
    caller passes its own ``out`` list — if isolation holds, each list
    contains only content produced for that caller's own run.

    Args:
        graph: Object exposing an ``astream`` async iterator that yields
            ``(chunk, metadata)`` pairs.
        label: Used as the ``thread_id`` and as the log-line prefix.
        prompt: Text of the single human message sent as input.
        out: Caller-owned list, mutated in place with the collected text.
    """
    async for chunk, _metadata in graph.astream(
        {"messages": [HumanMessage(content=prompt)]},
        {"configurable": {"thread_id": label}},
        stream_mode="messages",
    ):
        content = chunk.content
        if isinstance(content, str):
            text = content
        else:
            # Content may also be a list of blocks (plain strings or dicts).
            # Join their text instead of calling ``str()`` on the list, which
            # would record a Python repr rather than the streamed text.
            text = "".join(
                part if isinstance(part, str) else str(part.get("text", ""))
                for part in content
                if isinstance(part, (str, dict))
            )
        if text:
            out.append(text)
        # Yield to the event loop after every chunk to maximise
        # interleaving between the concurrent tasks.
        await asyncio.sleep(0)

    print(f"[{label}] collected: {''.join(out)!r}")
async def main() -> None:
    """Stream one shared graph from two concurrent tasks and check isolation.

    Raises:
        AssertionError: If a task's stream contains text belonging to the
            other task's response, other than a clean whole-response swap
            (see the classification comments below).
    """
    # Two clearly distinguishable responses so we can tell which task's
    # stream a chunk belongs to by eyeballing the text alone.
    response_a = "AAAA-AAAA-AAAA-AAAA-AAAA"
    response_b = "BBBB-BBBB-BBBB-BBBB-BBBB"

    # One shared compiled graph (same instance for both tasks) — this is
    # exactly the condition under which the bug manifests in production.
    # The model's internal iterator yields response_a first, then
    # response_b; the two concurrent astreams will race to consume them.
    graph = build_shared_graph([response_a, response_b])

    collected_a: list[str] = []
    collected_b: list[str] = []
    await asyncio.gather(
        run_stream(graph, "A", "say AAAA", collected_a),
        run_stream(graph, "B", "say BBBB", collected_b),
    )

    text_a = "".join(collected_a)
    text_b = "".join(collected_b)

    print()
    print(f"Task A final text: {text_a!r}")
    print(f"Task B final text: {text_b!r}")
    print()

    a_has_a_chunks = "A" in text_a
    a_has_b_chunks = "B" in text_a
    b_has_a_chunks = "A" in text_b
    b_has_b_chunks = "B" in text_b

    # The responses come from ONE shared iterator, so which task receives
    # which response depends only on which task reaches the model first —
    # not on its prompt or label. If Task B wins that race, A cleanly gets
    # the B text and B cleanly gets the A text. That is a property of this
    # fixture, not a stream leak, so it must not trip the assertion.
    swapped_cleanly = (
        a_has_b_chunks
        and not a_has_a_chunks
        and b_has_a_chunks
        and not b_has_b_chunks
    )
    if swapped_cleanly:
        print(
            "Responses were swapped between tasks (shared response iterator "
            "was consumed in the opposite order); no token leak in this run."
        )
        return

    # Anything else that puts the other task's characters into a stream is
    # real cross-contamination between the two astream() iterators.
    if a_has_b_chunks or b_has_a_chunks:
        print("LEAK DETECTED:")
        if a_has_b_chunks:
            print(" - Task A received B-chunks (should only contain A)")
        if b_has_a_chunks:
            print(" - Task B received A-chunks (should only contain B)")
        raise AssertionError("concurrent astream() leaked tokens across tasks")

    print("No leak observed in this run.")
# Run the reproduction only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())