Hi @Petar,
What about this? Let me know whether it makes sense to you.
How to reduce double agent calls (LangGraph) and lower latency
TL;DR
- Avoid ReAct loops for RAG. Build an explicit graph: router (optional) → retrieve → synth. That yields a single LLM call in the common path.
- Short‑circuit with control flow (conditional edges or Command.goto) so you never do a second LLM call.
- Add node-level caching (LangGraph `CachePolicy`) + an LLM cache (LangChain cache) + a retrieval cache (TTL keyed by query + filters). Consider an embedding cache on the ingestion side.
- Run cheap steps in parallel (e.g., retrieval + insights) and defer/merge before a single synth call. Stream only the synth tokens to the UI.
Why you see two LLM calls
create_react_agent(...) follows a tool-calling loop: agent → tool → agent. For RAG, that means (1) a planning/tool-selection LLM call and (2) a final synthesis call. If you already know you want retrieval, you can remove the first LLM call by controlling the graph yourself.
Relevant docs: Graph API (custom control flow, caching), prebuilt agents (ReAct), streaming, persistence.
- Graph API: Add node caching, conditional edges, parallelism, Command-based routing
- Prebuilt agent quickstart: ReAct adds an extra LLM step
- Streaming: stream tokens from a single synth call
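For contrast, here is a minimal sketch of the prebuilt shape that costs the two calls (it assumes your existing `qdrant_pdf_form_retriever` tool; the model name is just a placeholder):

```python
# For contrast only: the prebuilt ReAct loop, i.e., two LLM calls per RAG turn.
from langchain.chat_models import init_chat_model
from langgraph.prebuilt import create_react_agent

react_agent = create_react_agent(
    init_chat_model("openai:gpt-4.1-mini"),  # LLM call 1: decide whether/how to call the tool
    tools=[qdrant_pdf_form_retriever],       # tool runs (no LLM call)
)
# LLM call 2 happens when the agent node runs again to write the final answer.
```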
Recommended graph shape (single LLM call)
- router (optional): small/fast model or heuristic deciding whether to use RAG
- retrieve: call your `qdrant_pdf_form_retriever` tool; update state with `{retrieved_documents, messages: [ToolMessage(...)]}`
- synth: one LLM call that uses the retrieved context and must output citations including `page_label`
```python
# pip install -U langgraph "langchain[openai]"  # or the provider you use
from typing_extensions import TypedDict, Annotated
from typing import List, Dict, Any, Literal
import operator

from langchain.chat_models import init_chat_model
from langchain_core.messages import AnyMessage, HumanMessage, SystemMessage, ToolMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import CachePolicy
from langgraph.cache.memory import InMemoryCache  # or langgraph.cache.sqlite.SqliteCache


# ----- State -----
class State(TypedDict):
    messages: Annotated[List[AnyMessage], add_messages]
    query: str
    retrieved_documents: Annotated[List[Dict[str, Any]], operator.add]
    should_rag: bool


# ----- Optional router (cheap) -----
# Heuristic example: do RAG unless the query is very short/simple.
# Replace with a small model if you need semantic routing.
def router(state: State):
    q = state.get("query", "").strip()
    do_rag = not (len(q) < 6 or q.lower() in {"hi", "hello", "thanks"})
    return {"should_rag": do_rag}


# ----- Retrieval -----
# If you already have an @tool for qdrant, you can call it directly here.
# This node should populate retrieved_documents and (optionally) a ToolMessage.
def retrieve(state: State):
    query = state["query"]
    # Example: YOUR existing tool invocation (pseudo)
    # results = await qdrant_pdf_form_retriever(query=query, ...)
    # Here we simulate one doc with the fields you guaranteed.
    results = [
        {
            "text": "...doc text...",
            "score": 0.82,
            "chunk_id": 12,
            "page": 4,
            "page_label": "4",
        }
    ]
    return {
        "retrieved_documents": results,
        # Optional: a short summary message for transparency. The tool_call_id is
        # synthetic; these messages are never replayed to the model in this graph.
        "messages": [
            ToolMessage(
                content=f"Retrieved {len(results)} passages for query: {query}",
                tool_call_id="qdrant_pdf_form_retriever",
            )
        ],
    }


# ----- Synthesis (single LLM call) -----
MODEL = init_chat_model("openai:gpt-4.1-mini", temperature=0)  # or your default

def synth(state: State):
    docs = state.get("retrieved_documents", [])
    content_blocks = []
    for d in docs:
        content_blocks.append(
            f"[page_label={d.get('page_label')}] {d.get('text', '')}"
        )
    system = SystemMessage(
        content=(
            "You are a RAG assistant. Answer strictly from the provided context. "
            "Always include page_label next to any quote or citation."
        )
    )
    user = HumanMessage(content=state["query"])
    context = HumanMessage(content="\n\n".join(content_blocks) or "No context")
    answer = MODEL.invoke([system, context, user])
    return {"messages": [answer]}


# ----- Build graph -----
b = StateGraph(State)
b.add_node("router", router)
b.add_node("retrieve", retrieve, cache_policy=CachePolicy(ttl=120))
# Cache synth only if your prompts + docs are stable; otherwise prefer a retrieval-only cache.
b.add_node("synth", synth, cache_policy=CachePolicy(ttl=60))

b.add_edge(START, "router")

# Conditional: if the router says no RAG, go straight to synth with empty context.
def route_after_router(state: State) -> Literal["retrieve", "synth"]:
    return "retrieve" if state.get("should_rag", True) else "synth"

b.add_conditional_edges("router", route_after_router, ["retrieve", "synth"])
b.add_edge("retrieve", "synth")
b.add_edge("synth", END)

graph = b.compile(cache=InMemoryCache())  # swap to SqliteCache() for persistence


# ----- Usage -----
inputs = {
    "messages": [
        ("user", "Find the filing deadlines and cite the page labels."),
    ],
    "query": "Find the filing deadlines and cite the page labels.",
}
result = graph.invoke(inputs)
# The last message is your answer, produced by a single LLM call.
```
Notes
- The graph never loops agent→tool→agent. Retrieval is a normal node; synthesis is a single LLM call.
- `CachePolicy` adds per-node caching with a TTL; `compile(cache=...)` is what enables node caching.
- Keep `messages` if you stream progress to the UI; otherwise you can drop the message history.
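If you do stream, here is a minimal sketch that surfaces only the synth tokens (it reuses the `graph` and `inputs` from above):

```python
# Stream only the tokens produced by the "synth" node to the UI.
for token, metadata in graph.stream(inputs, stream_mode="messages"):
    if metadata.get("langgraph_node") == "synth":
        print(token.content, end="", flush=True)
```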
Short-circuiting and control flow
- If retrieval is not needed, route directly to `synth` (as above).
- You can also short-circuit with `Command` from a node, e.g., return `Command(update=..., goto="synth")` to skip downstream nodes; see the sketch after this list.
- Set a low recursion limit if you ever introduce loops; for this linear graph it isn't a concern.
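A minimal sketch of the `Command` variant, reusing the `State` above; the router updates state and picks the next node in one step, so no conditional edge is needed:

```python
from typing import Literal
from langgraph.types import Command

def router_cmd(state: State) -> Command[Literal["retrieve", "synth"]]:
    # The return annotation tells LangGraph which nodes this one may jump to.
    q = state.get("query", "").strip()
    if len(q) < 6 or q.lower() in {"hi", "hello", "thanks"}:
        return Command(update={"should_rag": False}, goto="synth")
    return Command(update={"should_rag": True}, goto="retrieve")
```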
Parallelism and “cancel the loser”
- You can fan out cheap steps (e.g., `retrieve` and `get_document_insights`) in parallel and then fan in before `synth`. Use a deferred aggregator, or route to `synth` once the required state is present; see the sketch after this list.
- LangGraph executes parallel branches per superstep. There isn't preemptive cancellation of sibling nodes at the framework level; instead, design the aggregator to ignore late/optional results or use timeouts inside the nodes. See parallel execution and `defer=True` for fan-in control.
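A sketch of the fan-out/fan-in wiring as a variant of the build section above; it assumes a hypothetical `get_document_insights`-style node, an extra `insights` key in `State`, a plain `router -> retrieve` edge instead of the conditional one, and a langgraph version that supports `defer=` on `add_node`:

```python
# Hypothetical second branch; replace the body with your real get_document_insights call.
def insights(state: State):
    return {"insights": ["...summary stats, form metadata, etc..."]}

b.add_node("insights", insights)
b.add_node("synth", synth, defer=True)  # synth waits until every fan-in branch has finished

b.add_edge("router", "retrieve")  # fan out: both edges fire in the same superstep
b.add_edge("router", "insights")
b.add_edge("retrieve", "synth")   # fan in
b.add_edge("insights", "synth")
```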
Caching on LangGraph Platform
Consider three layers:
- LLM response cache (LangChain): set a global cache so identical prompts+inputs return instantly across processes.
- Retrieval cache (LangGraph node cache): use `CachePolicy(ttl=...)` on the retrieve node and compile with `cache=...`. The key includes the node input and config; add your filters (user_id, form_id, embedding_dim) to the state so they're part of the cache key.
- Embedding cache: prefer caching at ingestion or use deterministic IDs to avoid recomputing; for on-the-fly embeddings, wrap your embedder with a small KV cache (e.g., Redis) keyed by text hash.
On Platform, `InMemoryCache` is per-process. Use `SqliteCache` for simple persistence, or connect LangChain's LLM cache to a shared store (e.g., Redis); a sketch of the LLM and embedding layers follows below. Retrieval results are excellent candidates for node-level TTL caches; keep LLM caches shorter and scoped, since prompts change often.
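A minimal sketch of the LLM-cache and embedding-cache layers (Redis and the local file store are just example backends; the retrieval layer is the `CachePolicy` already shown in the graph):

```python
from langchain_core.globals import set_llm_cache
from langchain_community.cache import RedisCache
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from langchain_openai import OpenAIEmbeddings
import redis

# Layer 1: LLM response cache, shared across processes via Redis.
set_llm_cache(RedisCache(redis_=redis.Redis(host="localhost", port=6379)))

# Layer 3: embedding cache keyed by text hash, namespaced by the model name.
underlying = OpenAIEmbeddings(model="text-embedding-3-small")
store = LocalFileStore("./embedding_cache/")
cached_embedder = CacheBackedEmbeddings.from_bytes_store(
    underlying, store, namespace=underlying.model
)
```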
Should you rewrite the query first?
- If retrieval quality is sensitive to messy queries, use a conditional normalizer:
  - Default: skip the rewrite and retrieve immediately.
  - If the top-1 score is below a threshold or there are no results, run a fast rewrite (small model) and retry retrieval.
- This keeps the common path at one LLM call while preserving quality when needed.
```python
from langchain.chat_models import init_chat_model

FAST = init_chat_model("openai:gpt-4o-mini", temperature=0)

def maybe_rewrite(state: State):
    # Only rewrite when needed (e.g., low recall was detected previously).
    q = state["query"]
    rewritten = FAST.invoke([
        ("system", "Rewrite the query to maximize vector recall; keep meaning."),
        ("user", q),
    ])
    return {"query": rewritten.content}
```
Wire this into the graph after you detect low recall, not on the happy path.
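One way to wire it (a sketch: the 0.5 threshold, the `rewritten` flag, and replacing the fixed `retrieve -> synth` edge with a conditional one are all assumptions to adapt):

```python
def route_after_retrieve(state: State) -> Literal["maybe_rewrite", "synth"]:
    # Fall back to a rewrite only once, and only when recall looks poor.
    docs = state.get("retrieved_documents", [])
    top_score = max((d.get("score", 0.0) for d in docs), default=0.0)
    if top_score < 0.5 and not state.get("rewritten", False):  # hypothetical threshold + flag
        return "maybe_rewrite"
    return "synth"

b.add_node("maybe_rewrite", maybe_rewrite)
# Replaces the plain b.add_edge("retrieve", "synth") from the happy-path graph.
b.add_conditional_edges("retrieve", route_after_retrieve, ["maybe_rewrite", "synth"])
b.add_edge("maybe_rewrite", "retrieve")
# maybe_rewrite should also return {"rewritten": True} so the fallback fires at most once.
```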