# Example with langchain-mcp-adapters (see module docstring below).
"""
Demo: MCP tools + langchain-mcp-adapters + create_agent + return_direct + PostgresSaver
What this script shows:
- How to expose local Python functions as MCP tools (via FastMCP) over stdio.
- How to load those MCP tools into LangChain/LangGraph using the official adapter:
https://github.com/langchain-ai/langchain-mcp-adapters
- How to *modify the loaded MCP tools* to behave like return-direct tools by setting
`tool.return_direct = True` (LangChain uses `return_direct`, not `return_directly`).
- How to run `create_agent` with an OpenAI model and a PostgreSQL checkpointer
(AsyncPostgresSaver) so runs are persisted by `thread_id`.
Prereqs (typical):
pip install -U python-dotenv mcp langchain-mcp-adapters langgraph-checkpoint-postgres "langchain[openai]"
Environment (.env):
OPENAI_API_KEY=...
POSTGRES_URI=postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable
Run:
python src/mcp_tools_return_direct_create_agent_openai_postgres_demo.py
Run only the MCP server (stdio):
python src/mcp_tools_return_direct_create_agent_openai_postgres_demo.py --mcp-server
"""
from __future__ import annotations
import argparse
import asyncio
import os
import sys
import uuid
from typing import Iterable, Sequence
from dotenv import load_dotenv
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"--mcp-server",
action="store_true",
help="Run the local FastMCP server over stdio (used by the client demo).",
)
parser.add_argument(
"--rows",
type=int,
default=50,
help="How many rows to request from the 'big_report' tool in the client demo.",
)
return parser.parse_args()
def _set_return_direct(tools: Sequence[object], tool_names: set[str]) -> list[object]:
"""
Mutate/copy tools to set `return_direct=True` for selected tool names.
`create_agent` checks `tool.return_direct` when deciding whether it can route
tool execution directly to END (skipping the next model step).
"""
updated: list[object] = []
for t in tools:
name = getattr(t, "name", None)
if isinstance(name, str) and name in tool_names:
# Prefer in-place assignment (often works for BaseTool).
try:
setattr(t, "return_direct", True)
updated.append(t)
continue
except Exception:
pass
# Fallback for pydantic-ish tools that are “frozen”:
# BaseTool is typically a pydantic model and supports model_copy(update=...).
model_copy = getattr(t, "model_copy", None)
if callable(model_copy):
updated.append(model_copy(update={"return_direct": True}))
continue
# Last resort: keep it unchanged (and rely on routing/interrupts instead).
updated.append(t)
else:
updated.append(t)
return updated
async def run_demo_client(*, rows: int) -> None:
    """
    Client-side demo:
      - starts a local MCP server (this same file) via stdio
      - loads MCP tools via MultiServerMCPClient/get_tools()
      - sets return_direct=True on a chosen tool
      - runs create_agent with AsyncPostgresSaver

    Args:
        rows: Row count embedded in the user prompt; the model is expected to
            forward it to the 'big_report' MCP tool.

    Raises:
        RuntimeError: If POSTGRES_URI is missing from the environment/.env.
    """
    load_dotenv()
    POSTGRES_URI = os.environ.get("POSTGRES_URI")
    if not POSTGRES_URI:
        raise RuntimeError(
            "Missing POSTGRES_URI. Add it to your .env (see file header)."
        )
    # --- MCP tools via official adapter (Python) ------------------------------
    # Source: https://github.com/langchain-ai/langchain-mcp-adapters (README)
    # Imported lazily so running `--mcp-server` mode doesn't require this package.
    from langchain_mcp_adapters.client import MultiServerMCPClient
    # Spawn this very file as a stdio MCP server subprocess (handled in main()).
    servers: dict[str, dict] = {
        "demo": {
            "transport": "stdio",
            "command": sys.executable,
            "args": [os.path.abspath(__file__), "--mcp-server"],
        }
    }
    client = MultiServerMCPClient(servers)
    tools = await client.get_tools()
    print("Loaded MCP tools:")
    for t in tools:
        print(f"- {getattr(t, 'name', '<unknown>')} (return_direct={getattr(t, 'return_direct', False)})")
    # --- Modify MCP tools: set return_direct=True ----------------------------
    # IMPORTANT: LangChain's tool flag is `return_direct` (not return_directly).
    # With create_agent, tools marked return_direct can short-circuit to END.
    tools = _set_return_direct(tools, {"big_report"})
    print("\nAfter setting return_direct on selected tools:")
    for t in tools:
        if getattr(t, "name", None) == "big_report":
            print(f"- {t.name}: return_direct={getattr(t, 'return_direct', False)}")
    # --- Checkpointer (Postgres) --------------------------------------------
    # Docs: https://docs.langchain.com/oss/python/langgraph/add-memory
    from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
    async with AsyncPostgresSaver.from_conn_string(POSTGRES_URI) as checkpointer:
        # Create the schema on first run (safe to call repeatedly in most setups).
        await checkpointer.setup()
        # --- Agent -----------------------------------------------------------
        from langchain.agents import create_agent
        system_prompt = (
            "You are a helpful assistant.\n"
            "When the user asks for a report, call the 'big_report' tool.\n"
            "The 'big_report' tool is configured to return_direct, so after the tool executes, "
            "the run should end immediately (no extra commentary).\n"
        )
        agent = create_agent(
            model="openai:gpt-4.1",
            tools=tools,
            system_prompt=system_prompt,
            checkpointer=checkpointer,
        )
        # Fresh thread_id each run; reuse a thread_id to resume persisted state.
        thread_id = str(uuid.uuid4())
        config = {"configurable": {"thread_id": thread_id}}
        user_msg = (
            f"Generate a report with {rows} rows. "
            "Return the report contents as-is."
        )
        result = await agent.ainvoke(
            {"messages": [{"role": "user", "content": user_msg}]},
            config=config,
        )
    # --- What you get back ---------------------------------------------------
    # create_agent returns a dict with a 'messages' list.
    messages = result["messages"]
    print("\nFinal messages (last 3):")
    for m in messages[-3:]:
        # pretty_print() exists on LangChain messages; fall back to repr.
        pretty = getattr(m, "pretty_print", None)
        if callable(pretty):
            pretty()
        else:
            print(repr(m))
    # If return_direct worked, the last message is commonly a ToolMessage from big_report.
    # NOTE(review): the import path varies across langchain versions (older
    # releases expose it from langchain_core.messages) — hence the guard.
    try:
        from langchain.messages import ToolMessage
    except Exception:
        ToolMessage = None  # type: ignore
    if ToolMessage is not None:
        tool_messages = [m for m in messages if isinstance(m, ToolMessage)]
        if tool_messages:
            last_tool = tool_messages[-1]
            print("\nLast ToolMessage summary:")
            print(f"- name: {last_tool.name}")
            print(f"- content length: {len(str(last_tool.content))}")
            if getattr(last_tool, "artifact", None):
                print(f"- artifact keys: {list(last_tool.artifact.keys())}")
def run_demo_mcp_server() -> None:
    """
    Serve two demo tools over stdio via FastMCP:
      - big_report: emits a multi-line CSV-style payload (simulated large output)
      - add: trivial integer addition

    Blocks until the stdio transport closes. Synchronous on purpose: FastMCP
    spins up its own event loop.
    """
    from mcp.server.fastmcp import FastMCP

    mcp = FastMCP("DemoMCP")

    @mcp.tool()
    def add(a: int, b: int) -> int:
        """Add two integers."""
        return a + b

    @mcp.tool()
    def big_report(rows: int = 50) -> str:
        """Return a multi-line report. This simulates a large tool output."""
        # Clamp to [1, 500] so a client cannot request an unbounded payload.
        clamped = min(max(rows, 1), 500)
        data_rows = (f"{i},{i*i}" for i in range(1, clamped + 1))
        return "\n".join(["id,value", *data_rows])

    mcp.run(transport="stdio")
def main() -> None:
    """Entry point: dispatch to the MCP server or the async client demo."""
    args = _parse_args()
    # `FastMCP.run()` starts its own AnyIO event loop; nesting it inside
    # `asyncio.run(...)` raises "RuntimeError: Already running asyncio in
    # this thread", so the server path must stay fully synchronous.
    if args.mcp_server:
        run_demo_mcp_server()
    else:
        asyncio.run(run_demo_client(rows=args.rows))
# The file doubles as the MCP server (--mcp-server) and the client demo.
if __name__ == "__main__":
    main()