hi @rajmsn @Galsor
take a look at this example (it is runnable as-is and reproduces the issue end to end):
"""
Repro: duplicate user-facing responses on multi-agent handoff with Command.goto.
Goal
----
Show the common "handoff duplication" UX bug:
- orchestrator produces a user-facing message ("I'll help you with that...")
- specialist ALSO produces a user-facing message ("I'll help you with that...")
and the user sees both.
This file also includes a simple `--fix` flag which demonstrates the most common
practical mitigation: **don't render tool-call turns as user-visible messages**.
In LangGraph/LangChain handoffs, the orchestrator's "handoff" message is usually
an AI tool-call turn (often with empty content). Many UIs accidentally render it.
This script uses:
- OpenAI provider (langchain-openai / ChatOpenAI)
- dotenv (.env)
- PostgresSaver (langgraph-checkpoint-postgres) for checkpointing
Env vars (.env)
--------------
OPENAI_API_KEY=...
POSTGRES_URI=postgresql://user:password@localhost:5432/langgraph_checkpoints
OPENAI_MODEL=gpt-4o-mini # optional
Run
---
python3 -m src.repro_duplicate_handoff_goto_openai_postgres
Fixed (hide tool-call turns from the "UI" stream):
python3 -m src.repro_duplicate_handoff_goto_openai_postgres --fix
"""
from __future__ import annotations
import argparse
import os
import uuid
from typing import Annotated, TypedDict
try:
    # Optional at runtime (still recommended): pip install python-dotenv
    from dotenv import load_dotenv  # type: ignore
except ModuleNotFoundError:  # pragma: no cover
    # Fallback no-op so the script still runs without python-dotenv installed.
    def load_dotenv() -> None:
        return None

try:
    from langchain.agents import create_agent
    from langchain.tools import ToolRuntime, tool
    from langchain_core.messages import AIMessage, ToolMessage
    from langchain_openai import ChatOpenAI
    from langgraph.checkpoint.postgres import PostgresSaver
    from langgraph.graph import END, START, StateGraph
    from langgraph.graph.message import add_messages
    from langgraph.types import Command
except ModuleNotFoundError as e:  # pragma: no cover
    # Re-raise with actionable install instructions instead of a bare ImportError.
    raise RuntimeError(
        "Missing Python dependencies. Install the minimal set with:\n\n"
        "pip3 install -U langchain langgraph langchain-openai "
        "langgraph-checkpoint-postgres python-dotenv 'psycopg[binary,pool]'\n\n"
        "Or install from this repo:\n\n"
        "pip3 install -r requirements.txt"
    ) from e

# Load .env so OPENAI_API_KEY / POSTGRES_URI / OPENAI_MODEL are available below.
load_dotenv()
class State(TypedDict):
    """Shared parent-graph state: a message list merged via `add_messages`."""

    # `add_messages` appends new messages and replaces existing ones by id,
    # which is what lets the handoff tool overwrite the orchestrator's turn.
    messages: Annotated[list, add_messages]
@tool
def handoff_to_specialist(runtime: ToolRuntime) -> Command:
    """
    Handoff tool returning a Command.goto aimed at the parent graph.

    IMPORTANT: This tool *intentionally* forces the "duplicate response" issue by
    overwriting the orchestrator's tool-call AIMessage content to be:
        "I'll help you with that..."
    That makes the orchestrator's tool-call turn user-visible, while the
    specialist is also prompted to open with the exact same text.
    """
    history = list(runtime.state["messages"])

    # Locate the most recent AI message (the one that invoked this tool).
    invoking_ai: AIMessage | None = next(
        (m for m in reversed(history) if isinstance(m, AIMessage)), None
    )

    if invoking_ai is None or not getattr(invoking_ai, "tool_calls", None):
        # Fallback: still hand off, but don't try to overwrite anything.
        return Command(goto="specialist", graph=Command.PARENT)

    # Rewrite the tool-calling AIMessage (same id) so it becomes user-visible.
    # The `add_messages` reducer will replace the stored message by id.
    rewritten = AIMessage(
        id=invoking_ai.id,
        content="I'll help you with that...",
        tool_calls=invoking_ai.tool_calls,
        name=invoking_ai.name,
    )

    # Mark the tool call as "completed" in the shared history with a ToolMessage.
    call_id = runtime.tool_call_id
    if call_id is None:
        # Extremely defensive: ToolRuntime should provide it.
        call_id = "missing-tool-call-id"
    completion = ToolMessage(
        content="handoff_to_specialist: transferred control to specialist",
        tool_call_id=call_id,
        name="handoff_to_specialist",
    )

    # Ship the *entire* orchestrator history (with the rewritten tool-call
    # message and its ToolMessage) up to the parent graph. This matches the
    # official handoff pattern so the parent state stays consistent.
    return Command(
        goto="specialist",
        graph=Command.PARENT,
        update={"messages": [*history, rewritten, completion]},
    )
def _is_user_visible_ai_message(message: AIMessage, *, hide_tool_calls: bool) -> bool:
content = (message.content or "").strip()
if not content:
return False
if hide_tool_calls and getattr(message, "tool_calls", None):
return False
return True
def _pretty_print_updates(stream, *, fix: bool) -> None:
    """Render streamed graph updates the way a chat UI typically would.

    Prints each AI message with non-empty content at most once (deduplicated
    by message id). When `fix` is True, AI tool-call turns are suppressed —
    the common mitigation for duplicated handoff messages.
    """
    seen_ids: set[str] = set()

    for chunk in stream:
        namespace: tuple[str, ...] | None = None
        update = chunk

        # With `subgraphs=True`, chunks arrive as (namespace, update) pairs.
        if isinstance(chunk, tuple) and len(chunk) == 2 and isinstance(chunk[1], dict):
            ns_raw, update = chunk
            namespace = (
                tuple(str(x) for x in ns_raw)
                if isinstance(ns_raw, tuple)
                else (str(ns_raw),)
            )

        if not isinstance(update, dict):
            continue

        for node_name, node_update in update.items():
            if not isinstance(node_update, dict):
                continue

            # Only show AI messages with non-empty content (what users typically see).
            for msg in node_update.get("messages") or []:
                if not isinstance(msg, AIMessage):
                    continue
                msg_id = getattr(msg, "id", None)
                if isinstance(msg_id, str) and msg_id in seen_ids:
                    continue
                if not _is_user_visible_ai_message(msg, hide_tool_calls=fix):
                    continue
                if isinstance(msg_id, str):
                    seen_ids.add(msg_id)
                label = "/".join([*namespace, node_name]) if namespace else node_name
                print(f"[{label}] AI: {msg.content}")
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
"Show duplicate user-visible responses during a Command.goto handoff, "
"and optionally demonstrate a fix."
)
)
parser.add_argument(
"--fix",
action="store_true",
help=(
"Hide AI tool-call turns from the streamed output (common UI mitigation "
"to prevent duplicate handoff messages)."
),
)
parser.add_argument(
"--model",
default=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
help="OpenAI model name (defaults to env OPENAI_MODEL or gpt-4o-mini).",
)
parser.add_argument(
"--thread-id",
default=os.getenv("THREAD_ID"),
help="LangGraph thread id (defaults to env THREAD_ID or a random id).",
)
parser.add_argument(
"--user-message",
default="Please have the specialist handle this request.",
help="User message to send into the graph.",
)
parser.add_argument(
"--subgraphs",
action="store_true",
help=(
"If set, stream updates from nested subgraphs too. This changes the "
"shape of streamed chunks to (namespace, data)."
),
)
return parser.parse_args()
def main() -> None:
    """Run the duplicate-handoff repro end to end.

    Builds an orchestrator agent that always hands off to a specialist agent,
    wires both into a parent StateGraph checkpointed in Postgres, streams one
    user turn, then prints the final user-visible messages from the persisted
    state.

    Raises:
        RuntimeError: if POSTGRES_URI is not configured.
    """
    args = _parse_args()

    # Postgres checkpointing is required; fail fast with setup instructions.
    db_uri = os.getenv("POSTGRES_URI")
    if not db_uri:
        raise RuntimeError(
            "Missing POSTGRES_URI.\n\n"
            "Put it in .env, e.g.\n"
            "POSTGRES_URI=postgresql://user:password@localhost:5432/langgraph_checkpoints"
        )

    # temperature=0 keeps the repro output as reproducible as the model allows.
    model = ChatOpenAI(model=args.model, temperature=0)

    # Build two agents using LangChain v1 `create_agent`.
    orchestrator = create_agent(
        model=model,
        tools=[handoff_to_specialist],
        system_prompt=(
            "You are the orchestrator.\n"
            "You MUST delegate the user's request to the specialist by calling "
            "`handoff_to_specialist` immediately.\n"
            "Do not answer the user yourself.\n"
        ),
        name="orchestrator",
    )
    # The specialist is prompted to open with the exact sentence the handoff
    # tool injects into the orchestrator's turn — that's the duplication.
    specialist = create_agent(
        model=model,
        tools=[],
        system_prompt=(
            "You are the specialist.\n"
            "Start your response with exactly: \"I'll help you with that...\"\n"
            "Then provide 1-2 short helpful sentences.\n"
        ),
        name="specialist",
    )

    with PostgresSaver.from_conn_string(db_uri) as checkpointer:
        # Create the checkpoint tables if they don't exist yet.
        checkpointer.setup()

        # Parent graph: orchestrator runs first; the handoff tool's Command.goto
        # jumps to "specialist" at runtime (no explicit orchestrator->specialist
        # edge is needed for the handoff path).
        builder = StateGraph(State)
        builder.add_node("orchestrator", orchestrator)
        builder.add_node("specialist", specialist)
        builder.add_edge(START, "orchestrator")
        # Default path if no handoff occurs.
        builder.add_edge("orchestrator", END)
        builder.add_edge("specialist", END)
        graph = builder.compile(checkpointer=checkpointer)

        # A fresh thread id per run unless the caller pins one (CLI or env).
        thread_id = args.thread_id or f"repro-dup-goto-{uuid.uuid4().hex[:8]}"
        config = {"configurable": {"thread_id": thread_id}}
        user_input = {
            "messages": [
                {
                    "role": "user",
                    "content": args.user_message,
                }
            ]
        }

        print("\n--- Streaming (what a UI might show) ---")
        print(f"(thread_id={thread_id}, fix={args.fix}, subgraphs={args.subgraphs})\n")
        _pretty_print_updates(
            graph.stream(
                user_input,
                config=config,
                stream_mode="updates",
                subgraphs=args.subgraphs,
            ),
            fix=args.fix,
        )

        print("\n--- Final user-visible messages ---\n")
        # IMPORTANT: don't run the graph again. `stream()` already executed the run and
        # persisted the final checkpoint. Calling `invoke()` here would append another
        # full turn and make it look like there are "extra duplicates".
        final_state = graph.get_state(config).values
        # Same dedup-by-id + visibility filtering as the streaming pass above.
        printed_message_ids: set[str] = set()
        for m in final_state["messages"]:
            if not isinstance(m, AIMessage):
                continue
            msg_id = getattr(m, "id", None)
            if isinstance(msg_id, str) and msg_id in printed_message_ids:
                continue
            if not _is_user_visible_ai_message(m, hide_tool_calls=args.fix):
                continue
            if isinstance(msg_id, str):
                printed_message_ids.add(msg_id)
            print(f"- {m.name or 'assistant'}: {m.content}")
        print(
            "\nIf you see two identical lines starting with \"I'll help you with that...\", "
            "the repro succeeded. If you ran with --fix, you should only see ONE line."
        )
# Entry point guard: allows `python3 -m ...` execution without running on import.
if __name__ == "__main__":
    main()