import asyncio
import json
from pathlib import Path
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os
import time
load_dotenv()
def load_mcp_config(config_path: str = "mcp_server_settings.json") -> dict:
"""加载 MCP 服务器配置文件"""
config_file = Path(config_path)
if not config_file.exists():
raise FileNotFoundError(f"配置文件 {config_path} 不存在")
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
return config.get("mcpServers", {})
async def main():
try:
# 加载 MCP 配置
print("正在加载 MCP 配置...")
mcp_servers = load_mcp_config()
print(f"✓ 成功加载 {len(mcp_servers)} 个 MCP 服务器配置")
# 转换配置格式以适配 MultiServerMCPClient
client_config = {}
for server_name, server_config in mcp_servers.items():
server_type = server_config.pop("type")
if server_type == "streamable_http":
client_config[server_name] = {
"url": server_config["url"],
"transport": "streamable_http"
}
elif server_type == "stdio":
client_config[server_name] = {
"command": server_config["command"],
"args": server_config.get("args", []),
"transport": "stdio"
}
elif server_type == "sse":
client_config[server_name] = {
"url": server_config["url"],
"transport": "sse"
}
# 获取 MCP 服务器提供的工具
print("正在连接 MCP 服务器...")
client = MultiServerMCPClient(client_config)
tools = await client.get_tools()
print(f"成功获取 {len(tools)} 个工具:")
for tool in tools:
print(f" - {tool.name}: {tool.description}")
# 使用 DeepSeek 模型
# 使用 ChatOpenAI 模型
print("\n正在初始化模型...")
model = ChatOpenAI(
model="deepseek-chat",
api_key=os.getenv("DEEPSEEK_API_KEY"),
base_url="https://api.deepseek.com",
temperature=0,
timeout=15,
request_timeout=15,
max_retries=2 # 添加重试机制
)
print("✓ 模型初始化成功")
print("\n正在创建 agent...")
model_with_tools = model.bind_tools(tools, parallel_tool_calls=True)
# 创建 ReAct agent
agent = create_react_agent(
model=model_with_tools,
tools=tools,
debug=False # 启用调试模式
)
# 交互式循环
print("Agent 已启动。输入 'quit', 'exit' 或 'q' 退出。")
while True:
try:
user_input = input("\nUser: ")
if user_input.lower() in ["quit", "exit", "q"]:
print("再见!")
break
# 调用 agent
# 使用 astream 替代 ainvoke,可以看到中间步骤
print("\n[执行开始]")
start_time = time.time()
messages = []
step_count = 0
try:
async with asyncio.timeout(90.0):
async for event in agent.astream(
{"messages": [{"role": "user", "content": user_input}]},
stream_mode="updates"
):
step_count += 1
# 只处理 agent 节点的更新
if "agent" in event and "messages" in event["agent"]:
messages = event["agent"]["messages"]
last_msg = messages[-1]
# 简化日志输出
if hasattr(last_msg, 'tool_calls') and last_msg.tool_calls:
print(f"[步骤 {step_count}] 调用 {len(last_msg.tool_calls)} 个工具")
except asyncio.TimeoutError:
print(f"\n[超时] 执行超过 90 秒,共完成 {step_count} 个步骤")
continue
if messages:
elapsed = time.time() - start_time
print(f"\n[完成] 总耗时: {elapsed:.1f}s")
print(f"Assistant: {messages[-1].content}")
except KeyboardInterrupt:
print("\n再见!")
break
except Exception as e:
print(f"\n[错误] {e}")
import traceback
traceback.print_exc()
except Exception as e:
print(f"\n[初始化错误] {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(main())
My code is shown above. I’m using Amap’s MCP (Multi-Channel Programming) to plan my trip, which requires 23 calls. However, when I use Cherry Studio to call Amap’s MCP, it responds instantly. Are there any optimization solutions for my code?