Hey everyone,
I am currently building a RAG agent using create_agent() and a retrieval tool. I have everything implemented, and streaming with stream_mode="values" works perfectly fine. For my application, however, I need token-by-token streaming with stream_mode="messages", and there I am running into a very strange error that only happens sometimes and is therefore difficult to reproduce.
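For context, this is roughly how I consume the two modes (a simplified sketch, not my exact code; inputs and config here just stand for the state dict and config dict used in the full script further down):

# stream_mode="values": each update is the full graph state, so I just print the latest message
for state in rag_agent.stream(input=inputs, config=config, context=context, stream_mode="values"):
    state["messages"][-1].pretty_print()

# stream_mode="messages": each update is a (message_chunk, metadata) tuple
for chunk, metadata in rag_agent.stream(input=inputs, config=config, context=context, stream_mode="messages"):
    print(getattr(chunk, "content", ""), end="", flush=True)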
Basically, when I send a message that has a short answer and doesn't require a tool call, it works fine. But sometimes, when I send a message that requires a tool call and a longer answer, the agent's final output contains a strange duplication in the middle. It looks something like this (I will use numbers instead of text to illustrate the issue):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
As you can see, somewhere in the middle of the text the output simply repeats from the beginning. My first thought was that the agent starts off normally by emitting AIMessageChunks and then suddenly emits the full AIMessage instead of continuing the token-by-token stream. When I checked this by printing each token and its type, it turned out that the agent first emits one AIMessageChunk per token (up to token 95 in the example above), then suddenly emits a single AIMessageChunk whose content is all of the accumulated tokens so far (tokens 1 to 95), and then goes back to normal streaming with one AIMessageChunk per token until the end of the message.
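For reference, the check was just a variation of my streaming loop along these lines (simplified; config is shorthand for the same config dict as in the full script below), logging each chunk's type and content length so the cumulative chunk stands out:

for chunk, metadata in rag_agent.stream(input=state, config=config, context=context, stream_mode="messages"):
    content = getattr(chunk, "content", "")
    if metadata.get("langgraph_node") == "model" and content:
        # Normal chunks hold a single token; the anomalous one holds everything streamed so far
        print(f"{type(chunk).__name__} | {len(content):>5} chars | {content!r}")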
I have tried everything I can think of to fix this, including setting the parameter stream_subgraphs=True, but nothing seems to work. As mentioned, the issue does not happen every time, so it is hard to reproduce and to pinpoint which part of the code causes it. I am not sure whether I am doing something wrong or whether this is some kind of LangChain bug.
Here is my code. The real version is more complex (memory management, additional middleware, etc.), but I simplified it down to the basic elements of creating an agent to try to find the issue, and it still occurs:
Package versions:
langchain==1.0.2
langchain-azure-ai==1.0.1
langchain-community==0.4.1
langchain-core==1.0.4
langchain-openai==1.0.1
langgraph==1.0.1
langgraph-checkpoint==2.1.1
import os
from dotenv import load_dotenv
from dataclasses import dataclass
from langchain.tools import tool, ToolRuntime
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent
from langchain.agents.middleware import dynamic_prompt, ModelRequest, TodoListMiddleware
from langchain_core.messages import HumanMessage, ToolMessage, AIMessage, SystemMessage, BaseMessage, AIMessageChunk
from langchain_azure_ai.chat_models import AzureAIChatCompletionsModel
from qdrant_client import models
from qdrant_client import QdrantClient
from qdrant_client.hybrid.fusion import distribution_based_score_fusion
from fastembed import SparseTextEmbedding
from sentence_transformers import SentenceTransformer
# Initialize LLM for generation
llm = AzureAIChatCompletionsModel(
    endpoint="endpoint",
    credential="key",
    model="gpt-oss-120b",
    max_tokens=2048,
    temperature=0.0,
    verbose=False,
    client_kwargs={"logging_enable": False},
    model_kwargs={"reasoning_effort": "high"},
)
# Initialize vector DB
qdrant_client = QdrantClient(path="path/to/vectordb")
# Initialize dense and sparse embedding models
dense_embedding_model = SentenceTransformer("Alibaba-NLP/gte-modernbert-base")
sparse_embedding_model = SparseTextEmbedding("Qdrant/bm25")
@dataclass
class Context:
    user_name: str
    language: str
@tool
def retrieve(
    query: str,
    runtime: ToolRuntime[Context],
) -> str:
    """
    Use this tool to retrieve the passages relevant to the given query.

    Args:
        query: a short query string describing what you are searching for

    Returns:
        str: the relevant passages
    """
    if query == "":
        # No query -> no retrieval
        return "No query given."

    # Dense retrieval
    dense_results = qdrant_client.query_points(
        collection_name="collection",
        query=dense_embedding_model.encode_query(query, normalize_embeddings=True),
        using="dense",
        limit=3000,
        with_payload=True,
    )
    dense_results = dense_results.points
    # Copy of IDs and scores, since `dense_results` gets updated after DBSF + score conversion
    dense_ids_and_scores = [(doc.id, 1 - (2 - (doc.score*2))) for doc in dense_results[:101]]

    # Sparse retrieval
    sparse_results = qdrant_client.query_points(
        collection_name="collection",
        query=models.SparseVector(**next(sparse_embedding_model.embed(query)).as_object()),
        using="sparse",
        limit=3000,
        with_payload=True,
    )
    sparse_results = sparse_results.points

    dense_and_sparse_results = [dense_results, sparse_results]
    # Hybrid search fusion
    hybrid_dbsf_results = distribution_based_score_fusion(dense_and_sparse_results, limit=3)

    # Store retrieved docs as (doc, dense_score)
    retrieved_docs = []
    for doc in hybrid_dbsf_results:
        for dense_id, dense_score in dense_ids_and_scores:
            if doc.id == dense_id:
                retrieved_docs.append((doc, dense_score))

    context = "Here are the relevant passages (only use them if they are relevant to the user's question): \n\n"
    for i, (doc, score) in enumerate(retrieved_docs):
        title = doc.payload["metadata"].get("title", "Title N/A")
        url = doc.payload["metadata"].get("url", "URL not available")
        if isinstance(url, str) and runtime.context.language != "ar":
            url = url.replace("/en/", f"/{runtime.context.language}/", 1)
        context += "-"*100
        context += f"\nPassage {i+1}:\nPage title: {title}\nPage URL: {url}\nRelevance score: {score}\n"
        context += doc.payload["page_content"] + "\n\n\n"
    return context
@dynamic_prompt
def dynamic_context_system_prompt(request: ModelRequest) -> str:
    user_name = request.runtime.context.user_name
    system_prompt = f"""
    *Big system prompt describing behaviour and tool use*
    """
    return system_prompt
if __name__ == "__main__":
    rag_agent = create_agent(
        model=llm,
        tools=[retrieve],
        middleware=[
            dynamic_context_system_prompt,
            TodoListMiddleware(system_prompt="", tool_description=None),
        ],
        context_schema=Context,
        checkpointer=InMemorySaver(),
    )

    # Runtime context for the agent
    context = {
        "user_name": "Adam",
        "language": "en",
    }
    thread_id = 123

    print("\nStarting interactive demo. Type 'quit' to exit.\n")
    while True:
        user_input = input("\nUser: ").strip()
        if user_input.lower() in ("quit", "exit"):
            print("Exiting demo.")
            break

        user_msg = HumanMessage(content=user_input)
        state = {"messages": [user_msg]}

        print("\n--- STREAM OUTPUT ---")
        for token, metadata in rag_agent.stream(
            input=state,
            config={"configurable": {"thread_id": thread_id}},
            context=context,
            stream_mode="messages",
        ):
            node_name = metadata.get("langgraph_node", "")
            token_content = getattr(token, "content", "")
            if node_name == "model" and token_content:
                print(token_content, end="", flush=True)
                # print(type(token), end=" ", flush=True)
        print()  # print newline after agent response
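The only workaround I can think of is a client-side guard in the streaming loop that drops a chunk whose content is exactly the text already emitted for the current response, something like this sketch (a drop-in for the for loop inside while True above; it only hides the symptom, and it assumes only the final answer produces content on the "model" node):

        emitted = ""  # text already printed for the current response
        for token, metadata in rag_agent.stream(
            input=state,
            config={"configurable": {"thread_id": thread_id}},
            context=context,
            stream_mode="messages",
        ):
            node_name = metadata.get("langgraph_node", "")
            token_content = getattr(token, "content", "")
            if node_name == "model" and token_content:
                # Skip the anomalous cumulative chunk: its content repeats
                # everything already emitted for this response
                if token_content == emitted:
                    continue
                emitted += token_content
                print(token_content, end="", flush=True)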
I don't think this is an issue with how the terminal prints the output, because I also have a version integrated into a FastAPI backend with a TypeScript frontend, and the same issue happens there. The weird thing is that the earlier naive RAG version of the application, which did not use the agent architecture, worked perfectly and never showed this issue.
If anyone is able to help that would be much appreciated!