Parallel astream() on the same compiled graph leaks messages between streams

Hi,

I ran into the issue below. The script may need to be run several times to hit the bug.
Is this documented anywhere? Is this expected behavior?

Thanks!

“”"Minimal reproduction of LangGraph concurrent astream() token-leak bug.

Bug:
When the same compiled graph (and therefore the same underlying chat
model instance) is streamed concurrently from two asyncio tasks with
stream_mode="messages", token chunks emitted by the model during one
task’s call can surface inside the other task’s astream() iterator.

Root cause (short version):
stream_mode="messages" installs a streaming callback handler through
LangChain’s callback-manager contextvar. The contextvar is per-asyncio-
Task in theory, but the compiled graph holds bound references to the
shared model’s configuration, and the BaseChatModel’s streaming path can
pick up the “wrong” callback manager when two tasks interleave inside
_astream. The result: token T produced by the model during Task B’s
invocation gets delivered to Task A’s queue.


Expected output when the bug reproduces:
Task A collected chunks that include text meant for Task B (or vice
versa). The script asserts isolation at the end; it raises
AssertionError when the bug is present.
“”"

from future import annotations

import asyncio
from typing import Any

from langchain_core.language_models.fake_chat_models import GenericFakeChatModel
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.graph import END, START, MessagesState, StateGraph

def build_shared_graph(responses: list[str]) -> Any:
    """Build ONE compiled graph whose LLM node replays ``responses`` in order.

    Using ``GenericFakeChatModel`` keeps the repro hermetic -- no network,
    no API key -- while still going through LangChain's ``BaseChatModel``
    streaming machinery.

    Args:
        responses: Reply texts, handed out one per model call, in call order.

    Returns:
        The compiled graph: START -> "chat" -> END.

    Note:
        The response iterator is created once and shared by every invocation
        of the returned graph. Concurrent callers therefore race on which
        response each one consumes, independently of any callback routing.
        Responses are not cycled: once ``responses`` is exhausted, further
        model calls presumably fail (``next()`` on an exhausted iterator).
    """
    # Created ONCE: every call made through the returned graph pulls from it.
    messages_iter = iter(AIMessage(content=r) for r in responses)
    model = GenericFakeChatModel(messages=messages_iter)

    async def call_model(state: MessagesState) -> dict:
        # Note: no explicit config plumbing. This mirrors what agent
        # middleware does internally -- the model is called under whatever
        # RunnableConfig is current in the contextvar.
        reply = await model.ainvoke(state["messages"])
        return {"messages": [reply]}

    builder: StateGraph = StateGraph(MessagesState)
    builder.add_node("chat", call_model)
    builder.add_edge(START, "chat")
    builder.add_edge("chat", END)
    return builder.compile()

async def run_stream(graph: Any, label: str, prompt: str, out: list[str]) -> None:
    """Drive ``graph.astream`` with ``stream_mode="messages"``.

    Collects the text of every streamed message chunk into ``out``. Each
    concurrent caller passes its own ``out`` list -- if isolation held, each
    list would contain only the content produced for *its own* prompt.

    Args:
        graph: Compiled graph to stream (shared between callers).
        label: Caller label; also used as the ``thread_id``.
        prompt: Content of the single ``HumanMessage`` sent.
        out: Accumulator the text pieces are appended to (mutated in place).
    """
    async for chunk, _metadata in graph.astream(
        {"messages": [HumanMessage(content=prompt)]},
        {"configurable": {"thread_id": label}},
        stream_mode="messages",
    ):
        content = chunk.content
        if isinstance(content, str):
            text = content
        else:
            # A list of content blocks: keep only their text. Stringifying the
            # whole list would drag dict reprs (ids, block types) into the
            # A/B leak check and could trip it falsely.
            text = "".join(
                block if isinstance(block, str) else str(block.get("text", ""))
                for block in content
                if isinstance(block, (str, dict))
            )
        if text:
            out.append(text)
            # Yield to the event loop to maximise interleaving between tasks.
            await asyncio.sleep(0)
    print(f"[{label}] collected: {''.join(out)!r}")

async def main() -> None:
    """Stream the shared graph from two concurrent tasks and check isolation.

    Prints what each task collected, then fails if either task's text
    contains the other task's marker letter.

    Raises:
        AssertionError: If task A's text contains "B" or task B's contains "A".
    """
    # Two clearly distinguishable responses so we can tell which task's
    # stream a chunk belongs to by eyeballing the text alone.
    response_a = "AAAA-AAAA-AAAA-AAAA-AAAA"
    response_b = "BBBB-BBBB-BBBB-BBBB-BBBB"

    # One shared compiled graph (same instance for both tasks) -- the
    # condition under which the symptom shows up. The model's internal
    # iterator yields response_a first, then response_b, and the two
    # concurrent astreams race to consume them; if task B reaches the model
    # first it gets response_a, which by itself trips the checks below.
    graph = build_shared_graph([response_a, response_b])

    collected_a: list[str] = []
    collected_b: list[str] = []

    await asyncio.gather(
        run_stream(graph, "A", "say AAAA", collected_a),
        run_stream(graph, "B", "say BBBB", collected_b),
    )

    text_a = "".join(collected_a)
    text_b = "".join(collected_b)

    print()
    print(f"Task A final text: {text_a!r}")
    print(f"Task B final text: {text_b!r}")
    print()

    # Isolation checks: fail if a task received any of the *other* task's
    # characters. (A task missing its own characters is not detected here.)
    a_has_b_chunks = "B" in text_a
    b_has_a_chunks = "A" in text_b

    if a_has_b_chunks or b_has_a_chunks:
        print("LEAK DETECTED:")
        if a_has_b_chunks:
            print("  - Task A received B-chunks (should only contain A)")
        if b_has_a_chunks:
            print("  - Task B received A-chunks (should only contain B)")
        raise AssertionError("concurrent astream() leaked tokens across tasks")

    print("No leak observed in this run.")

# Run the repro only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())

Hi @kzoltan,

Two independent bugs are causing this.

After tracing through the source code of both LangGraph and LangChain-core, there are two distinct issues that can cause token chunks from one astream() call to appear in another’s iterator.


Bug 1 - Shared mutable state in GenericFakeChatModel

The reproduction uses GenericFakeChatModel, which stores responses as a plain Iterator on the instance:

# langchain_core/language_models/fake_chat_models.py
class GenericFakeChatModel(BaseChatModel):
    messages: Iterator[AIMessage | str]

    def _generate(self, messages, stop, run_manager, **kwargs):
        message = next(self.messages)   # ← advances the SHARED iterator

When the same compiled graph (and therefore the same model instance) is used for two concurrent astream() calls, both calls race on next(self.messages). One call may consume the message that was supposed to go to the other. This is a shared-state race condition, not a streaming-routing bug, but it produces the same observable symptom.

This applies to any model that stores per-request state at the instance level.


Bug 2 - inheritable_handlers list aliasing through on_chain_start

This is the deeper cause of the actual routing leak (tokens going to the wrong AsyncQueue).

How the messages stream is wired (in langgraph/pregel/main.py):

stream = AsyncQueue()
stream_put = partial(aioloop.call_soon_threadsafe, stream.put_nowait)

config = ensure_config(self.config, config)
callback_manager = get_async_callback_manager_for_config(config)
run_manager = await callback_manager.on_chain_start(None, input, ...)

if "messages" in stream_modes:
    run_manager.inheritable_handlers.append(
        StreamMessagesHandler(stream_put, subgraphs, parent_ns=...)  # ← bound to THIS call's queue
    )

This looks isolated: each call gets its own StreamMessagesHandler. But on_chain_start in langchain-core (langchain_core/callbacks/manager.py) returns:

return AsyncCallbackManagerForChainRun(
    run_id=run_id,
    handlers=self.handlers,
    inheritable_handlers=self.inheritable_handlers,  # ← NO COPY - same list object!
    ...
)

So run_manager.inheritable_handlers IS callback_manager.inheritable_handlers. The append() call mutates the parent’s list, not a copy.

When does this cause a cross-stream leak?

When two concurrent astream() calls share the same callback_manager instance. This happens if:

  1. The same config dict containing a pre-existing AsyncCallbackManager is passed to both calls - get_async_callback_manager_for_config returns the manager as-is (no copy):

    # langgraph/_internal/_config.py
    if isinstance(callbacks, AsyncCallbackManager):
        manager = callbacks   # ← returned directly, no copy
    

    Also note: ensure_config’s inner loop for user-provided config does NOT copy callbacks:

    else:
        empty[k] = v   # ← callbacks from user config go through unchanged
    
  2. Running on Python < 3.11 where AsyncBackgroundExecutor.submit() cannot pass context=copy_context() to asyncio.create_task (_executor.py), so var_child_runnable_config context isolation between node tasks is unreliable.

The resulting contamination chain:

callback_manager (shared between both astream() calls)
    .inheritable_handlers = [existing_handlers...]

Call A: run_manager_A.inheritable_handlers IS the same list
        → append(StreamMessagesHandler_A)
        → list: [..., StreamMessagesHandler_A]

Call B: run_manager_B.inheritable_handlers IS the SAME list (already poisoned!)
        → append(StreamMessagesHandler_B)
        → list: [..., StreamMessagesHandler_A, StreamMessagesHandler_B]

Node tasks from Call B now inherit BOTH handlers.
B's LLM fires on_chat_model_start on BOTH StreamMessagesHandler_A and _B.
B's tokens get emitted by both → stream_A receives B's chunks.

Fixes

1. Always create a fresh config with a unique thread_id per call - never share the config dict

import uuid

async def handle_request(user_input: str):
    config = {"configurable": {"thread_id": str(uuid.uuid4())}}
    async for chunk in graph.astream(
        {"messages": [{"role": "user", "content": user_input}]},
        config,
        stream_mode="messages",
    ):
        yield chunk

2. Never share model instances that have per-request mutable state

# BAD for concurrent use - shared mutable iterator
model = GenericFakeChatModel(messages=iter(["A", "B"]))

# GOOD - stateless model, safe to share a single compiled graph
from langchain_anthropic import ChatAnthropic
model = ChatAnthropic(model="claude-sonnet-4-6")
graph = build_graph(model).compile()  # compile once, call many times safely

For testing with GenericFakeChatModel, create a separate instance (and thus a separate compiled graph) per logical request.

3. Always forward config: RunnableConfig into async model calls inside nodes (especially on Python < 3.11)

from langchain_core.runnables import RunnableConfig

async def agent_node(state: State, config: RunnableConfig) -> dict:
    # Explicitly forward config so the model picks up the correct callback chain
    response = await model.ainvoke(state["messages"], config=config)
    return {"messages": [response]}

The LangGraph streaming docs explicitly warn: on Python < 3.11 you must pass RunnableConfig explicitly into async LLM calls for streaming to work correctly.

4. Use create_agent for standard agent patterns

The prebuilt create_agent handles all the config propagation correctly and is designed for concurrent use out of the box.

5. Use the LangGraph Server for production concurrent workloads

The LangGraph Server provides per-request isolation at the infrastructure level - the recommended path for production.


Summary table

| Symptom | Root cause | Fix |
|---|---|---|
| Wrong response content (AAAA/BBBB swapped) | `GenericFakeChatModel.messages` is a shared `Iterator` across all calls to the same instance | Separate model instance per request |
| Tokens routed to wrong stream | `on_chain_start` shares `inheritable_handlers` by reference; a shared `callback_manager` causes both `StreamMessagesHandler`s to appear in each other's chains | Never pass the same config dict (with a callback manager) to concurrent `astream()` calls; use a fresh config with a unique `thread_id` per call |
| Silent token loss / wrong callbacks on Python < 3.11 | `asyncio.create_task(context=...)` not available; `var_child_runnable_config` not propagated into node tasks | Explicitly pass `config: RunnableConfig` into every async model call inside nodes |

Hey Pawel,

Thanks for the analysis; it put me on the right track. I was able to work around it. The setup here is a bit complex, so instead of trying to explain it, I asked my agent to write a minimal repro script:

"""Minimal reproduction of the cached-Pregel concurrent-astream metadata leak.

Bug:
  ``Pregel.astream()`` internally calls ``ensure_config(self.config, config)``
  which merges the per-call ``configurable.thread_id`` / ``user_id`` into the
  Pregel instance's own ``self.config["metadata"]`` **in place**. When the same
  compiled graph instance is reused across concurrent callers (the common
  pattern when graphs are cached), the first caller's ``thread_id`` stamp
  persists on the shared Pregel and every subsequent concurrent ``astream``
  emits chunks whose ``metadata["thread_id"]`` points at the wrong caller.

What this script does:
  - Builds ONE compiled object (either a raw LangGraph ``StateGraph`` or a
    LangChain ``create_agent`` agent; both compile to a cached Pregel).
  - Fires two concurrent ``astream()`` calls with distinct ``thread_id``s.
  - Records ``metadata["thread_id"]`` for every chunk each caller receives.
  - Fails loudly if any caller saw a chunk whose metadata does NOT carry its
    own thread_id -- that is the leak.

Why a real ChatAnthropic (not GenericFakeChatModel):
  ``GenericFakeChatModel`` has per-request mutable state (a shared
  ``messages`` iterator). Using it here would confound the Pregel-metadata
  leak with a separate model-level race and make the reproduction ambiguous.
  ``ChatAnthropic`` holds no per-request mutable state, so any cross-caller
  contamination we observe is cleanly attributable to the Pregel bug.

How to run:
  export ANTHROPIC_API_KEY=...
  export ANTHROPIC_BASE_URL=...
  uv run python repro_concurrent_stream_leak.py [--mode graph|agent] [--loop] [--fix]

Pass ``--fix`` to apply the in-project mitigation (reset the cached
Pregel's ``self.config`` to its pristine post-compile state before each
``astream``) and confirm the leak disappears.
"""

from __future__ import annotations

import argparse
import asyncio
import os
from typing import Any

from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langgraph.graph import END, START, MessagesState, StateGraph
from pydantic import SecretStr


def _make_model() -> ChatAnthropic:
    """Create the ChatAnthropic client that the repro streams from.

    Reads ``ANTHROPIC_API_KEY`` (required -- a missing variable raises
    ``KeyError``) and the optional ``ANTHROPIC_BASE_URL`` from the
    environment. The key is passed both as the SDK credential and as an
    ``api-key`` request header; streaming is enabled, with no client-side
    timeout and no stop sequences.
    """
    key = os.environ["ANTHROPIC_API_KEY"]
    settings: dict[str, Any] = {
        "model_name": "anthropic.claude-haiku-4-5-20251001-v1:0",
        "api_key": SecretStr(key),
        "base_url": os.environ.get("ANTHROPIC_BASE_URL"),
        "default_headers": {"api-key": key},
        "streaming": True,
        "timeout": None,
        "stop": None,
    }
    return ChatAnthropic(**settings)


def build_langgraph_graph() -> Any:
    """Compile a one-node ("chat") StateGraph and attach agent-style metadata.

    The leak needs the compiled graph's own ``config`` to already carry a
    ``metadata`` dict. Per the repro author's reading of
    ``langgraph._internal._config.ensure_config``, an existing dict is
    updated in place, whereas without one a fresh ``ChainMap`` is used per
    call and nothing is shared. In production ``create_agent(name=...)``
    produces that shape via ``lc_agent_name``; ``with_config`` mimics it.
    """
    llm = _make_model()

    async def call_model(state: MessagesState) -> dict:
        response = await llm.ainvoke(state["messages"])
        return {"messages": [response]}

    workflow: StateGraph = StateGraph(MessagesState)
    workflow.add_node("chat", call_model)
    for source, target in ((START, "chat"), ("chat", END)):
        workflow.add_edge(source, target)

    compiled = workflow.compile()
    # Resulting config shape, as described by the repro author:
    # {"metadata": {"lc_agent_name": "ReproGraph"}, "configurable": {}}
    return compiled.with_config({"metadata": {"lc_agent_name": "ReproGraph"}})


def build_langchain_agent() -> Any:
    """Compile a ``create_agent`` agent named "ReproAgent" with one dummy tool.

    Passing ``name=`` is what (per the repro author) gives the compiled
    agent's own config ``{"metadata": {"lc_agent_name": name},
    "configurable": {}}`` -- the same precondition ``build_langgraph_graph``
    sets up by hand, reached through the production entry point instead.
    """

    # The tool's function name, parameter and docstring form its schema and
    # description, so they are intentionally left exactly as they are.
    @tool
    def noop(arg: str = "") -> str:
        """No-op tool; present only so create_agent has something to bind."""
        return arg if arg else "ok"

    toolset = [noop]
    llm = _make_model()
    return create_agent(model=llm, tools=toolset, name="ReproAgent")


async def run_stream(
    graph: Any,
    thread_id: str,
    prompt: str,
    seen_thread_ids: list[str],
    apply_fix: bool = False,
) -> None:
    """Stream one conversation in "messages" mode, logging each chunk's thread.

    The ``metadata["thread_id"]`` of every chunk is stringified and appended
    to ``seen_thread_ids``. Any entry that differs from ``thread_id`` means
    this caller was handed a chunk stamped for another concurrent caller.

    Args:
        graph: The shared compiled graph or agent.
        thread_id: This caller's id, sent as ``configurable.thread_id``.
        prompt: Content of the single ``HumanMessage`` that is sent.
        seen_thread_ids: Output list, mutated in place.
        apply_fix: When true, call ``_reset_pristine_config(graph)`` first --
            the mitigation the author deployed in their own project
            (``servicekit/graph_manager.py::_stream_single_run`` and
            ``agent_core/tools/task_tool.py::_stream_sub_agent`` there).
    """
    if apply_fix:
        # No ``await`` sits between this reset and the start of ``astream``.
        # NOTE(review): the mitigation appears to rely on astream merging
        # ``self.config`` before its first suspension point -- confirm.
        _reset_pristine_config(graph)

    request = {"messages": [HumanMessage(content=prompt)]}
    call_config = {"configurable": {"thread_id": thread_id}}
    async for _chunk, chunk_meta in graph.astream(
        request, call_config, stream_mode="messages"
    ):
        seen_thread_ids.append(str(chunk_meta.get("thread_id")))
        # A zero-length sleep hands control back to the loop so the two
        # concurrent drivers interleave as much as possible.
        await asyncio.sleep(0)

    distinct_ids = sorted(set(seen_thread_ids))
    chunk_count = len(seen_thread_ids)
    print(f"[{thread_id}] {chunk_count} chunks, unique meta thread_ids seen: {distinct_ids}")


def _reset_pristine_config(agent: Any) -> None:
    """Restore a cached graph's own config to its post-compile shape.

    Mitigation used by the repro's ``--fix`` flag. Per the repro author,
    ``Pregel.astream`` runs ``ensure_config(self.config, config)``, which
    writes the call's ``configurable`` values (``thread_id``, ``user_id``,
    ...) into ``self.config["metadata"]`` in place. On a cached graph the
    first caller's values therefore stick around for later callers.

    Effect: ``agent.config["metadata"]`` is replaced by a NEW dict holding
    only ``lc_agent_name``. That value is the previous one, or ``agent.name``
    when it is missing or falsy. ``agent.config["configurable"]`` becomes an
    empty dict. Any other metadata/configurable entries are discarded; other
    top-level config keys are untouched. It is a no-op unless
    ``agent.config`` exists and is a ``dict``.

    It must run right before EVERY ``astream`` call: each call re-pollutes
    the config, so a single reset at compile time is not enough.
    """
    config = getattr(agent, "config", None)
    if not isinstance(config, dict):
        return
    old_metadata = config.get("metadata") or {}
    agent_name = old_metadata.get("lc_agent_name") or agent.name
    config["metadata"] = {"lc_agent_name": agent_name}
    config["configurable"] = {}


async def main(mode: str, apply_fix: bool = False) -> None:
    """Run one concurrent-stream trial against a freshly built graph.

    Args:
        mode: ``"graph"`` for the plain StateGraph, ``"agent"`` for the
            ``create_agent`` variant.
        apply_fix: Forwarded to both ``run_stream`` calls.

    Raises:
        ValueError: For any other ``mode``.
        AssertionError: When either caller received a chunk whose
            ``metadata["thread_id"]`` is not its own.
    """
    if mode not in ("graph", "agent"):
        raise ValueError(f"unknown mode: {mode!r}")
    graph = build_langgraph_graph() if mode == "graph" else build_langchain_agent()

    thread_a, thread_b = "thread-AAAA", "thread-BBBB"
    prompt_a = "Count from 1 to 20 slowly, one number per line."
    prompt_b = "List the planets of the solar system, one per line."
    seen_a: list[str] = []
    seen_b: list[str] = []

    # Both drivers share the very same compiled object -- the cached-Pregel
    # reuse the leak depends on.
    await asyncio.gather(
        run_stream(graph, thread_a, prompt_a, seen_a, apply_fix=apply_fix),
        run_stream(graph, thread_b, prompt_b, seen_b, apply_fix=apply_fix),
    )

    callers = ((thread_a, seen_a), (thread_b, seen_b))
    foreign = {tid: [s for s in seen if s != tid] for tid, seen in callers}

    print()
    for tid, seen in callers:
        print(f"Caller {tid}: {len(foreign[tid])}/{len(seen)} chunks mis-stamped")

    if any(foreign.values()):
        print()
        print("LEAK DETECTED: chunk metadata.thread_id does not match caller.")
        for tid, stray in foreign.items():
            if stray:
                print(f"  {tid} saw foreign thread_ids: {sorted(set(stray))}")
        raise AssertionError("Pregel.astream() leaked thread_id across concurrent callers")

    print("No leak observed in this run.")


def run() -> None:
    """CLI entry point: load ``.env``, parse the flags and run the repro.

    With ``--loop`` the trial is repeated until one iteration raises, e.g.
    the ``AssertionError`` from ``main`` when the leak is observed. This is
    useful because the race is timing-sensitive.
    """
    load_dotenv(".env")
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's line breaks and indentation in --help;
        # the default formatter re-wraps it into one dense paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--mode",
        choices=("graph", "agent"),
        default="graph",
        help="Which compiled-Pregel variant to stress (default: graph).",
    )
    parser.add_argument(
        "--loop",
        action="store_true",
        help="Run forever; the leak is timing-sensitive.",
    )
    parser.add_argument(
        "--fix",
        action="store_true",
        help=(
            "Apply the in-project mitigation (reset cached Pregel "
            "self.config before each astream) and verify the leak is gone."
        ),
    )
    args = parser.parse_args()

    print("-" * 80)
    print(f"mode={args.mode}  loop={args.loop}  fix={args.fix}")
    print("-" * 80)
    # Each iteration gets its own event loop via asyncio.run.
    while True:
        asyncio.run(main(args.mode, apply_fix=args.fix))
        if not args.loop:
            break


if __name__ == "__main__":
    run()

EDIT: added fix to the script

@pawel-twardziak I'm not sure whether this is a bug or just a “trap”, but it might be worth looking into at some point (see the code above).
If not, just ignore it :wink: