LangChain Agents: stream_mode="messages" intermittently emits cumulative AIMessageChunk (large duplicate text mid-response)

Hey everyone,

I am currently working on a RAG agent using the create_agent() method and a retrieval tool. I have already implemented everything and tried streaming it using stream_mode="values", and that works perfectly fine. For my application, I need to stream the response token-by-token using stream_mode="messages", but I am running into a very weird issue which only happens sometimes and is therefore difficult to reproduce.

Basically, when I send a message that has a short answer and doesn't require a tool call, it works fine. But sometimes, when I send a message that requires a tool call and a longer answer, the agent's final output contains a duplicated block in the middle. It looks something like this (using numbers instead of text to make the pattern easier to see):

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142

As you can see, somewhere in the middle of the text, the output gets repeated from the beginning. My first thought was that it starts off normally by emitting AIMessageChunks and then suddenly emits the full AIMessage instead of continuing the token-by-token streaming. When I checked this by printing each token and its type, it turned out that it first emits one normal AIMessageChunk per token (up to token 95), then suddenly emits a single AIMessageChunk whose content contains all the accumulated tokens so far (tokens 1 to 95), and then goes back to normal streaming with one AIMessageChunk per token until the end of the message.
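For reference, the check I did was roughly this (the same stream call as in the full code below, just logging each chunk's type and content length instead of printing the text):

for token, metadata in rag_agent.stream(input=state, config={"configurable": {"thread_id": thread_id}}, context=context, stream_mode="messages"):
	if metadata.get("langgraph_node", "") == "model":
		content = getattr(token, "content", "")
		# Normally each chunk holds a single token; the problematic chunk suddenly holds everything streamed so far
		print(type(token).__name__, len(content) if isinstance(content, str) else content, flush=True)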

I have been trying everything to fix this, including setting the parameter stream_subgraphs=True, but nothing seems to work. Also, as I mentioned, the issue does not happen every time, so it is difficult to reproduce and to pinpoint which part of the code causes it. I am not sure if I am doing something wrong, or if this is some kind of LangChain bug.

Here is my code. The original was more complex (memory management, more middleware, etc.), but I simplified it down to the basic elements of creating an agent to try to find the issue, and it still occurs:

Package versions:

langchain==1.0.2
langchain-azure-ai==1.0.1
langchain-community==0.4.1
langchain-core==1.0.4
langchain-openai==1.0.1
langgraph==1.0.1
langgraph-checkpoint==2.1.1
Code:

import os
from dotenv import load_dotenv
from dataclasses import dataclass
from langchain.tools import tool, ToolRuntime
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent
from langchain.agents.middleware import dynamic_prompt, ModelRequest, TodoListMiddleware
from langchain_core.messages import HumanMessage, ToolMessage, AIMessage, SystemMessage, BaseMessage, AIMessageChunk
from langchain_azure_ai.chat_models import AzureAIChatCompletionsModel

from qdrant_client import models
from qdrant_client import QdrantClient
from qdrant_client.hybrid.fusion import distribution_based_score_fusion
from fastembed import SparseTextEmbedding 
from sentence_transformers import SentenceTransformer



# Initialize LLM for generation
llm = AzureAIChatCompletionsModel(
	endpoint="endpoint",
	credential="key",
	model="gpt-oss-120b",
	max_tokens=2048,
	temperature=0.0,
	verbose=False,
	client_kwargs={"logging_enable": False},
	model_kwargs={"reasoning_effort": "high"},
)

# Initialize vector DB
qdrant_client = QdrantClient(path="path/to/vectordb")

# Initialize dense and sparse embedding models
dense_embedding_model = SentenceTransformer("Alibaba-NLP/gte-modernbert-base")
sparse_embedding_model = SparseTextEmbedding("Qdrant/bm25")


@dataclass
class Context:
	user_name: str
	language: str


@tool
def retrieve(
	query: str, 
	runtime: ToolRuntime[Context],
) -> str: 
	"""
	Use this tool to retrieve the passages relevant to the given query.

	Args:
		query: a short query string describing what you are searching for

	Returns:
		str: the relevant passages
	"""
	if query == "":
		# No query -> no retrieval
		return "No query given."

	# Dense retrieval
	dense_results = qdrant_client.query_points(
		collection_name="collection",
		query=dense_embedding_model.encode_query(query, normalize_embeddings=True),
		using="dense",
		limit=3000,
		with_payload=True,
	)
	dense_results = dense_results.points

	# Copy of IDs and scores since `dense_results` gets updated after dbsf + score conversion
	dense_ids_and_scores = [(doc.id, 1 - (2 - (doc.score*2))) for doc in dense_results[:101]]

	# Sparse retrieval
	sparse_results = qdrant_client.query_points(
		collection_name="collection",
		query=models.SparseVector(**next(sparse_embedding_model.embed(query)).as_object()),
		using="sparse",
		limit=3000,
		with_payload=True,
	)
	sparse_results = sparse_results.points

	dense_and_sparse_results = [dense_results, sparse_results]

	# Hybrid search fusion
	hybrid_dbsf_results = distribution_based_score_fusion(dense_and_sparse_results, limit=3)

	# Store retrieved docs as (doc, dense_score)
	retrieved_docs = []
	for doc in hybrid_dbsf_results:
		for dense_id, dense_score in dense_ids_and_scores:
			if doc.id == dense_id:
				retrieved_docs.append((doc, dense_score))

	context = "Here are the relevant passages (only use them if they are relevant to the user's question): \n\n"
	for i, (doc, score) in enumerate(retrieved_docs):
		title = doc.payload["metadata"].get("title", "Title N/A")
		url = doc.payload["metadata"].get("url", "URL not available")
		if isinstance(url, str) and runtime.context.language != "ar": 
			url = url.replace("/en/", f"/{runtime.context.language}/", 1)

		context += "-"*100
		context += f"\nPassage {i+1}:\nPage title: {title}\nPage URL: {url}\nRelevance score: {score}\n"
		context += doc.payload["page_content"] + "\n\n\n"

	return context


@dynamic_prompt
def dynamic_context_system_prompt(request: ModelRequest) -> str:
	user_name = request.runtime.context.user_name

	system_prompt = f"""
*Big system prompt describing behaviour and tool use*
	"""

	return system_prompt


if __name__ == "__main__":
	rag_agent = create_agent(
		model=llm,
		tools=[retrieve],
		middleware=[
			dynamic_context_system_prompt, 
			TodoListMiddleware(system_prompt="", tool_description=None), 
			],
		context_schema=Context,
		checkpointer=InMemorySaver(),
	)

	# Runtime context for the agent
	context = {
		"user_name": "Adam", 
		"language": "en",
	}

	thread_id = 123

	print("\nStarting interactive demo. Type 'quit' to exit.\n")

	while True:
		user_input = input("\nUser: ").strip()
		if user_input.lower() in ("quit", "exit"):
			print("Exiting demo.")
			break

		user_msg = HumanMessage(content=user_input)
		state = {"messages": [user_msg]}

		print("\n--- STREAM OUTPUT ---")
		for token, metadata in rag_agent.stream(input=state, config={"configurable": {"thread_id": thread_id}}, context=context, stream_mode="messages"):
			node_name = metadata.get("langgraph_node", "")
			token_content = getattr(token, "content", "")
			
			if node_name == "model" and token_content:
				print(token_content, end="", flush=True)
				# print(type(token), end=" ", flush=True)

		print()  # print newline after agent response

I don’t think this is an issue with printing in the terminal, because I also have a version integrated into a FastAPI backend with a TypeScript frontend, and the same issue happens there. The weird thing is that when I had a naive RAG version of the application which did not use the agent architecture, everything worked perfectly and I never had this issue.

If anyone is able to help that would be much appreciated!

hi @adam

IMHO this is expected given how messages-mode streaming works in LangGraph and how some providers emit chunks. In messages mode, LangGraph emits:

  • Token chunks as they arrive (AIMessageChunk)
  • A final full message at the end of an LLM call
  • Any BaseMessage returned by nodes on chain end

Some providers can also send “snapshot” chunks mid-stream (the entire content-so-far). If you print every chunk verbatim, you’ll see repetition. The robust fix is to track what you have already printed and emit only the suffix you haven’t printed yet. You can also filter out final/non-delta emissions.

Try to print only the new suffix.
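For example, here is a minimal sketch reusing the stream call from your code (assuming the chunk content is a plain string, as it seems to be with your Azure model; adapt it if your provider returns content-block lists):

printed = ""  # everything already emitted to the client

for token, metadata in rag_agent.stream(input=state, config={"configurable": {"thread_id": thread_id}}, context=context, stream_mode="messages"):
    if metadata.get("langgraph_node", "") != "model":
        continue
    content = getattr(token, "content", "")
    if not isinstance(content, str) or not content:
        continue

    if content.startswith(printed):
        # Snapshot-style chunk (content-so-far): emit only the tail we haven't printed yet
        new_text = content[len(printed):]
        printed = content
    else:
        # Regular delta chunk: emit it and extend the accumulator
        new_text = content
        printed += content

    print(new_text, end="", flush=True)

The same idea carries over to your FastAPI backend: keep a per-request `printed` buffer and forward only `new_text` to the frontend.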

Thank you for the explanation Pawel!

If I understand correctly, this is then normal behaviour and the expectation is to just handle it client-side? But then my question would be: why does this issue not happen when using messages-mode streaming in a naive RAG approach without create_agent()?

Hi @adam

Have you tested both create_agent and the naive RAG with the same LLM provider?

I’ll try to reproduce the issue today, but with the OpenAI provider. Let’s see.

Hi Pawel,

Yes, I have tested it with the same LLM provider, the AzureAIChatCompletionsModel with gpt-oss-120b, for both cases.

I hope you are able to reproduce it, since it doesn’t seem to happen every time. You might have to try a few times with a few different inputs that require a tool call to see it happen.

Thank you for being willing to try to reproduce it!

hi @adam

I’ve already tried this code (essentially the one that you provided, with tiny modifications such as the middleware disabled) with OpenAI as the provider:

import os

from dotenv import load_dotenv
from dataclasses import dataclass
from langchain.tools import tool, ToolRuntime
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent
from langchain.agents.middleware import dynamic_prompt, ModelRequest, TodoListMiddleware
from langchain_core.messages import HumanMessage, ToolMessage, AIMessage, SystemMessage, BaseMessage, AIMessageChunk
from langchain_openai import ChatOpenAI
from qdrant_client import models
from qdrant_client import QdrantClient
from qdrant_client.hybrid.fusion import distribution_based_score_fusion
from fastembed import SparseTextEmbedding 
from sentence_transformers import SentenceTransformer

load_dotenv()

# Initialize LLM for generation
llm = ChatOpenAI(
    model="gpt-5",
    max_tokens=2048,
    temperature=0.0,
    verbose=False,
    reasoning={"effort": "high"}
)

# Initialize vector DB
qdrant_client = QdrantClient(path="./src/rag_agent_openai")

# Initialize dense and sparse embedding models
dense_embedding_model = SentenceTransformer("Alibaba-NLP/gte-modernbert-base")
sparse_embedding_model = SparseTextEmbedding("Qdrant/bm25")

@dataclass
class Context:
    user_name: str
    language: str

@tool
def retrieve(
    query: str,
    runtime: ToolRuntime[Context],
) -> str: 
    """
    Use this tool to retrieve the passages relevant to the given query.
    Args:
        query: a short query string describing what you are searching for
    Returns:
        str: the relevant passages
    """
    if query == "":
        # No query -> no retrieval
        return "No query given."
    # Dense retrieval
    dense_results = qdrant_client.query_points(
        collection_name="collection",
        query=dense_embedding_model.encode_query(query, normalize_embeddings=True),
        using="dense",
        limit=3000,
        with_payload=True,
    )
    dense_results = dense_results.points
    # Copy of IDs and scores since `dense_results` gets updated after dbsf + score conversion
    dense_ids_and_scores = [(doc.id, 1 - (2 - (doc.score*2))) for doc in dense_results[:101]]
    # Sparse retrieval
    sparse_results = qdrant_client.query_points(
        collection_name="collection",
        query=models.SparseVector(**next(sparse_embedding_model.embed(query)).as_object()),
        using="sparse",
        limit=3000,
        with_payload=True,
    )
    sparse_results = sparse_results.points
    dense_and_sparse_results = [dense_results, sparse_results]
    # Hybrid search fusion
    hybrid_dbsf_results = distribution_based_score_fusion(dense_and_sparse_results, limit=3)
    # Store retrieved docs as (doc, dense_score)
    retrieved_docs = []
    for doc in hybrid_dbsf_results:
        for dense_id, dense_score in dense_ids_and_scores:
            if doc.id == dense_id:
                retrieved_docs.append((doc, dense_score))
    context = "Here are the relevant passages (only use them if they are relevant to the user's question): \n\n"
    for i, (doc, score) in enumerate(retrieved_docs):
        title = doc.payload["metadata"].get("title", "Title N/A")
        url = doc.payload["metadata"].get("url", "URL not available")
        if isinstance(url, str) and runtime.context.language != "ar":
            url = url.replace("/en/", f"/{runtime.context.language}/", 1)
        context += "-"*100
        context += f"\nPassage {i+1}:\nPage title: {title}\nPage URL: {url}\nRelevance score: {score}\n"
        context += doc.payload["page_content"] + "\n\n\n"
    return context

@dynamic_prompt
def dynamic_context_system_prompt(request: ModelRequest) -> str:
    user_name = request.runtime.context.user_name
    system_prompt = f"""
*Big system prompt describing behaviour and tool use*
    """
    return system_prompt

if __name__ == "__main__":
    rag_agent = create_agent(
        model=llm,
        tools=[retrieve],
        middleware=[
            # dynamic_context_system_prompt,
            # TodoListMiddleware(system_prompt="", tool_description=None),
        ],
        context_schema=Context,
        checkpointer=InMemorySaver(),
    )
    # Runtime context for the agent
    context = {
        "user_name": "Adam",
        "language": "en",
    }
    thread_id = 123
    print("\nStarting interactive demo. Type 'quit' to exit.\n")
    while True:
        user_input = input("\nUser: ").strip()
        if user_input.lower() in ("quit", "exit"):
            print("Exiting demo.")
            break
        user_msg = HumanMessage(content=user_input)
        state = {"messages": [user_msg]}
        other_tokens = []
        print("\n--- STREAM OUTPUT ---")
        for token, metadata in rag_agent.stream(input=state, config={"configurable": {"thread_id": thread_id}}, context=context, stream_mode="messages"):
            node_name = metadata.get("langgraph_node", "")
            token_content = getattr(token, "content", "")

            if node_name == "model":
                # print(token_content, end="", flush=True)
                for content in token_content:
                    if content["type"] == "text" and "text" in content:
                        print(content["text"], end="", flush=True)
                    else:
                        other_tokens.append(content)
                # print(type(token), end=" ", flush=True)
        print()  # print newline after agent response

        if len(other_tokens):
            print("\n--- OTHER TOKENS ---\n\n")
            for other_token in other_tokens:
                print(other_token, end="\n\n\n", flush=True)

And I haven’t faced any issues so far. I might test it with Azure later on.

If you have any further info or inputs, please let me know.

Hi Pawel,

This is great to know; maybe the issue comes from Azure then. I will try to look into it further and will let you know if I find out something new.

Thank you so much for the support!
