Mastra.ai MCP Agent
•Devin
AgentMastra
Mastra.ai Agent 实现 MCP Client 完整教程
Mastra.ai Agent 实现 MCP Client 完整教程
📋 目录
概述
本教程将教你如何在 Mastra.ai 框架中实现一个功能完整的 MCP Client,使你的 Agent 能够:
- ✅ 连接多个 MCP 服务器(类似 Claude Code CLI / Codex)
- ✅ 动态管理和查看挂载的 MCP 服务器
- ✅ 发送请求到 MCP 服务器并处理响应
- ✅ 支持 stdio 和 HTTP(SSE) 两种传输协议
- ✅ 实时监控 MCP 服务器状态
- ✅ 处理工具调用、资源访问和提示模板
什么是 MCP?
Model Context Protocol (MCP) 是由 Anthropic 提出的标准协议,用于 AI Agent 与外部工具和服务的通信。它类似于 "AI 工具的 USB-C 接口"。
Mastra.ai 的 MCP 支持
Mastra.ai 通过 @mastra/mcp 包提供了完整的 MCP 支持:
- MCPClient - 连接到外部 MCP 服务器,获取其工具
- MCPServer - 将 Mastra 工具暴露为 MCP 服务器
核心概念
MCP 传输方式
| 传输方式 | 适用场景 | 示例 |
|---|---|---|
| Stdio | 本地进程(npx, CLI 工具) | npx wikipedia-mcp |
| SSE (Server-Sent Events) | 远程 HTTP 服务器(旧版) | https://mcp.run/profile-url |
| Streamable HTTP | 远程 HTTP 服务器(新版) | https://api.example.com/mcp |
MCPClient 核心方法
class MCPClient { // 连接管理 connect(): Promise<void> disconnect(): Promise<void> // 工具获取 getTools(): Promise<Record<string, Tool>> // 静态获取,用于 Agent 定义 getToolsets(): Promise<Record<string, Toolset>> // 动态获取,用于运行时 // 资源管理 resources.list(): Promise<Record<string, Resource[]>> resources.read(serverName: string, uri: string): Promise<ResourceContent> // 提示模板 prompts.list(): Promise<Record<string, Prompt[]>> prompts.get(serverName: string, name: string): Promise<PromptData> // 服务器管理(自定义扩展) getServerNames(): string[] getServerStatus(serverName: string): ServerStatus }
架构设计
整体架构
┌─────────────────────────────────────────────────────────┐
│ Mastra Agent │
│ ┌──────────────────────────────────────────────┐ │
│ │ Agent with MCPClient │ │
│ │ • instructions │ │
│ │ • model (OpenAI/Anthropic/etc) │ │
│ │ • tools (from MCP servers) │ │
│ └──────────────────────────────────────────────┘ │
└───────────────────────┬─────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ MCPClient (管理层) │
│ │
│ ┌────────────────────────────────────────────┐ │
│ │ Server Registry │ │
│ │ • wikipedia → StdioTransport │ │
│ │ • weather → StreamableHTTPTransport │ │
│ │ • github → SSETransport(fallback) │ │
│ └────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────┐ │
│ │ Tool Aggregator │ │
│ │ • Namespace tools by server │ │
│ │ • wikipedia_search │ │
│ │ • weather_get_forecast │ │
│ │ • github_create_issue │ │
│ └────────────────────────────────────────────┘ │
└───────────────────────┬─────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ MCP Server Connections │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Server 1 │ │ Server 2 │ │ Server 3 │ │
│ │ (Stdio) │ │ (HTTP) │ │ (SSE) │ │
│ │ │ │ │ │ │ │
│ │ ┌────────┐ │ │ ┌────────┐ │ │ ┌────────┐ │ │
│ │ │ Tools │ │ │ │ Tools │ │ │ │ Tools │ │ │
│ │ └────────┘ │ │ └────────┘ │ │ └────────┘ │ │
│ │ ┌────────┐ │ │ ┌────────┐ │ │ ┌────────┐ │ │
│ │ │Resource│ │ │ │Resource│ │ │ │Resource│ │ │
│ │ └────────┘ │ │ └────────┘ │ │ └────────┘ │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────┘
类图
┌─────────────────────────────────────┐
│ MCPClientManager │
├─────────────────────────────────────┤
│ - mcpClient: MCPClient │
│ - serverConfigs: ServerConfig[] │
│ - connectedServers: Set<string> │
├─────────────────────────────────────┤
│ + initialize() │
│ + listServers() │
│ + getServerStatus(name) │
│ + addServer(config) │
│ + removeServer(name) │
│ + notifyServer(name, request) │
│ + receiveResponse(name) │
└─────────────────────────────────────┘
↓ uses
┌─────────────────────────────────────┐
│ MCPClient │
│ (@mastra/mcp) │
├─────────────────────────────────────┤
│ - servers: Map<string, Server> │
│ - timeout: number │
├─────────────────────────────────────┤
│ + connect() │
│ + disconnect() │
│ + getTools() │
│ + getToolsets() │
│ + resources.* │
│ + prompts.* │
└─────────────────────────────────────┘
↓ manages
┌─────────────────────────────────────┐
│ ServerConnection │
├─────────────────────────────────────┤
│ - name: string │
│ - transport: Transport │
│ - status: ConnectionStatus │
│ - tools: Tool[] │
├─────────────────────────────────────┤
│ + send(request) │
│ + receive() │
│ + getStatus() │
└─────────────────────────────────────┘
环境准备
1. 安装依赖
# 创建新项目 mkdir mastra-mcp-agent cd mastra-mcp-agent npm init -y # 安装 Mastra 核心包 npm install @mastra/core @mastra/mcp # 安装 AI SDK npm install @ai-sdk/openai # 或者使用 Anthropic # npm install @ai-sdk/anthropic # 安装 TypeScript npm install -D typescript @types/node tsx # 初始化 TypeScript npx tsc --init
2. 配置环境变量
# .env OPENAI_API_KEY=sk-xxx # 如果使用远程 MCP 服务器 SMITHERY_API_KEY=xxx MCP_RUN_SSE_URL=https://mcp.run/sse/xxx
3. 项目结构
mastra-mcp-agent/
├── src/
│ ├── mcp/
│ │ ├── client.ts # MCPClient 配置
│ │ ├── manager.ts # MCP 管理器
│ │ └── types.ts # 类型定义
│ ├── agents/
│ │ └── mcp-agent.ts # Agent 定义
│ ├── tools/
│ │ └── custom-tools.ts # 自定义工具
│ └── index.ts # 入口文件
├── .env
├── package.json
└── tsconfig.json
基础实现
第一步 MCPClient 配置
// src/mcp/client.ts import { MCPClient } from '@mastra/mcp'; export interface ServerConfig { name: string; type: 'stdio' | 'http' | 'sse'; config: { command?: string; args?: string[]; env?: Record<string, string>; url?: URL; requestInit?: RequestInit; }; timeout?: number; enabled?: boolean; } /** * 创建 MCPClient 实例 */ export function createMCPClient(configs: ServerConfig[]) { // 过滤启用的服务器 const enabledServers = configs.filter((c) => c.enabled !== false); // 构建服务器配置 const servers: Record<string, any> = {}; for (const config of enabledServers) { if (config.type === 'stdio') { servers[config.name] = { command: config.config.command!, args: config.config.args || [], env: config.config.env, timeout: config.timeout, // 可选:添加日志处理器 log: (logMessage: any) => { console.log(`[${config.name}][${logMessage.level}] ${logMessage.message}`); }, }; } else { servers[config.name] = { url: config.config.url!, requestInit: config.config.requestInit, timeout: config.timeout, log: (logMessage: any) => { console.log(`[${config.name}][${logMessage.level}] ${logMessage.message}`); }, }; } } return new MCPClient({ id: 'main-mcp-client', servers, timeout: 60000, // 全局超时 60 秒 }); } /** * 预定义的服务器配置 */ export const defaultServerConfigs: ServerConfig[] = [ { name: 'wikipedia', type: 'stdio', config: { command: 'npx', args: ['-y', '@modelcontextprotocol/server-wikipedia'], }, enabled: true, }, { name: 'filesystem', type: 'stdio', config: { command: 'npx', args: ['-y', '@modelcontextprotocol/server-filesystem', process.cwd()], }, enabled: true, }, { name: 'weather', type: 'http', config: { url: new URL( `https://server.smithery.ai/@smithery-ai/national-weather-service/mcp?api_key=${process.env.SMITHERY_API_KEY}`, ), }, enabled: !!process.env.SMITHERY_API_KEY, }, { name: 'github', type: 'stdio', config: { command: 'npx', args: ['-y', '@modelcontextprotocol/server-github'], env: { GITHUB_TOKEN: process.env.GITHUB_TOKEN || '', }, }, enabled: !!process.env.GITHUB_TOKEN, }, ];
第二步 MCP 管理器
// src/mcp/manager.ts import { MCPClient } from '@mastra/mcp'; import { ServerConfig } from './client'; export interface ServerStatus { name: string; connected: boolean; toolCount: number; resourceCount: number; lastError?: string; metadata?: { transport: 'stdio' | 'http' | 'sse'; uptime?: number; }; } export class MCPClientManager { private mcpClient: MCPClient; private serverConfigs: Map<string, ServerConfig>; private connectionStatus: Map<string, boolean>; private serverTools: Map<string, any[]>; private serverResources: Map<string, any[]>; constructor(configs: ServerConfig[]) { this.serverConfigs = new Map(configs.map((c) => [c.name, c])); this.connectionStatus = new Map(); this.serverTools = new Map(); this.serverResources = new Map(); // 构建服务器配置对象 const servers: Record<string, any> = {}; for (const config of configs.filter((c) => c.enabled !== false)) { servers[config.name] = this.buildServerConfig(config); } this.mcpClient = new MCPClient({ id: 'managed-mcp-client', servers, timeout: 60000, }); } /** * 构建单个服务器配置 */ private buildServerConfig(config: ServerConfig) { const baseConfig: any = { timeout: config.timeout || 60000, log: (logMessage: any) => { console.log(`[${config.name}][${logMessage.level}] ${logMessage.message}`); }, }; if (config.type === 'stdio') { return { ...baseConfig, command: config.config.command!, args: config.config.args || [], env: config.config.env, }; } else { return { ...baseConfig, url: config.config.url!, requestInit: config.config.requestInit, }; } } /** * 初始化:连接所有服务器并加载工具 */ async initialize(): Promise<void> { console.log('🚀 Initializing MCP Client Manager...'); try { // 注意:MCPClient 在调用 getTools() 时会自动连接 // 所以我们直接获取工具即可 const allTools = await this.mcpClient.getTools(); // 按服务器分组工具 for (const [toolName, tool] of Object.entries(allTools)) { const serverName = toolName.split('_')[0]; // 假设格式为 serverName_toolName if (!this.serverTools.has(serverName)) { this.serverTools.set(serverName, []); } this.serverTools.get(serverName)!.push({ name: toolName, tool }); this.connectionStatus.set(serverName, true); } // 加载资源 try { const allResources = await this.mcpClient.resources.list(); for (const [serverName, resources] of Object.entries(allResources)) { this.serverResources.set(serverName, resources); } } catch (error) { console.warn('⚠️ Some servers do not support resources:', error); } console.log('✅ MCP Client Manager initialized successfully'); this.printServerSummary(); } catch (error) { console.error('❌ Failed to initialize MCP Client Manager:', error); throw error; } } /** * 获取所有已连接的服务器名称 */ getConnectedServers(): string[] { return Array.from(this.connectionStatus.entries()) .filter(([_, connected]) => connected) .map(([name]) => name); } /** * 获取服务器状态 */ getServerStatus(serverName: string): ServerStatus | null { const config = this.serverConfigs.get(serverName); if (!config) { return null; } const connected = this.connectionStatus.get(serverName) || false; const tools = this.serverTools.get(serverName) || []; const resources = this.serverResources.get(serverName) || []; return { name: serverName, connected, toolCount: tools.length, resourceCount: resources.length, metadata: { transport: config.type, }, }; } /** * 列出所有服务器及其状态 */ listServers(): ServerStatus[] { return Array.from(this.serverConfigs.keys()) .map((name) => this.getServerStatus(name)) .filter((status): status is ServerStatus => status !== null); } /** * 获取所有工具(用于 Agent) */ async getTools() { return await this.mcpClient.getTools(); } /** * 获取工具集(用于动态使用) */ async getToolsets() { return await this.mcpClient.getToolsets(); } /** * 通知特定服务器(发送请求) * 注意:MCP 协议中工具调用是通过 AI SDK 自动处理的 * 这个方法主要用于直接调用资源或提示模板 */ async notifyServer( serverName: string, request: { type: 'resource' | 'prompt'; data: any; }, ): Promise<any> { if (request.type === 'resource') { return await this.mcpClient.resources.read(serverName, request.data.uri); } else if (request.type === 'prompt') { return await this.mcpClient.prompts.get({ serverName, name: request.data.name, args: request.data.args, }); } throw new Error(`Unknown request type: ${request.type}`); } /** * 断开连接 */ async disconnect(): Promise<void> { console.log('🔌 Disconnecting from all MCP servers...'); await this.mcpClient.disconnect(); this.connectionStatus.clear(); this.serverTools.clear(); this.serverResources.clear(); console.log('✅ Disconnected successfully'); } /** * 打印服务器摘要 */ private printServerSummary(): void { console.log('\n📊 MCP Servers Summary:'); console.log('━'.repeat(60)); for (const status of this.listServers()) { const icon = status.connected ? '✅' : '❌'; console.log(`${icon} ${status.name}`); console.log(` Transport: ${status.metadata?.transport}`); console.log(` Tools: ${status.toolCount}`); console.log(` Resources: ${status.resourceCount}`); console.log(''); } console.log('━'.repeat(60)); } }
第三步 MCP 的 Agent
// src/agents/mcp-agent.ts import { Agent } from '@mastra/core/agent'; import { openai } from '@ai-sdk/openai'; import { MCPClientManager } from '../mcp/manager'; import { defaultServerConfigs } from '../mcp/client'; export class MCPAgent { private manager: MCPClientManager; private agent: Agent | null = null; constructor() { this.manager = new MCPClientManager(defaultServerConfigs); } /** * 初始化 Agent */ async initialize(): Promise<void> { console.log('🤖 Initializing MCP Agent...'); // 初始化 MCP 管理器 await this.manager.initialize(); // 获取所有工具 const tools = await this.manager.getTools(); // 创建 Agent this.agent = new Agent({ name: 'MCP-Powered Assistant', instructions: `You are a helpful AI assistant with access to multiple external tools via MCP. Available capabilities: - Wikipedia search and information retrieval - File system operations (read/write files in current directory) - Weather information (if configured) - GitHub operations (if configured) When using tools: 1. Always explain what tool you're going to use and why 2. Parse the tool results carefully 3. Present information in a clear, user-friendly format 4. If a tool call fails, explain the issue and try an alternative approach`, model: openai('gpt-4o-mini'), tools, // 传入所有 MCP 工具 }); console.log('✅ MCP Agent initialized successfully\n'); } /** * 生成响应 */ async generate(prompt: string): Promise<string> { if (!this.agent) { throw new Error('Agent not initialized. Call initialize() first.'); } console.log(`\n💬 User: ${prompt}\n`); const response = await this.agent.generate(prompt); console.log(`🤖 Agent: ${response.text}\n`); return response.text; } /** * 流式响应 */ async stream(prompt: string): Promise<void> { if (!this.agent) { throw new Error('Agent not initialized. Call initialize() first.'); } console.log(`\n💬 User: ${prompt}\n`); console.log('🤖 Agent: '); const stream = await this.agent.stream(prompt); for await (const chunk of stream.textStream) { process.stdout.write(chunk); } console.log('\n'); } /** * 列出所有 MCP 服务器 */ listServers(): void { const servers = this.manager.listServers(); console.log('\n🔌 Connected MCP Servers:'); console.log('━'.repeat(60)); for (const server of servers) { const status = server.connected ? '🟢 Online' : '🔴 Offline'; console.log(`\n${server.name} - ${status}`); console.log(` Transport: ${server.metadata?.transport}`); console.log(` Tools: ${server.toolCount}`); console.log(` Resources: ${server.resourceCount}`); } console.log('\n' + '━'.repeat(60) + '\n'); } /** * 获取服务器状态 */ getServerStatus(serverName: string): void { const status = this.manager.getServerStatus(serverName); if (!status) { console.log(`❌ Server "${serverName}" not found`); return; } console.log(`\n📊 Server Status: ${serverName}`); console.log('━'.repeat(60)); console.log(`Status: ${status.connected ? '🟢 Connected' : '🔴 Disconnected'}`); console.log(`Transport: ${status.metadata?.transport}`); console.log(`Tools: ${status.toolCount}`); console.log(`Resources: ${status.resourceCount}`); if (status.lastError) { console.log(`Last Error: ${status.lastError}`); } console.log('━'.repeat(60) + '\n'); } /** * 断开连接 */ async disconnect(): Promise<void> { await this.manager.disconnect(); } }
第四步
// src/index.ts import { MCPAgent } from './agents/mcp-agent'; import * as readline from 'readline'; async function main() { // 创建并初始化 Agent const agent = new MCPAgent(); await agent.initialize(); // 显示已连接的服务器 agent.listServers(); // 创建交互式命令行 const rl = readline.createInterface({ input: process.stdin, output: process.stdout, }); console.log('🎯 MCP Agent Ready! Type your question or command:'); console.log(' - Type your question to chat'); console.log(' - /servers - List all MCP servers'); console.log(' - /status <name> - Get server status'); console.log(' - /exit - Quit\n'); const ask = () => { rl.question('You: ', async (input) => { const trimmed = input.trim(); if (!trimmed) { ask(); return; } // 处理命令 if (trimmed === '/exit') { console.log('\n👋 Goodbye!'); await agent.disconnect(); rl.close(); process.exit(0); } if (trimmed === '/servers') { agent.listServers(); ask(); return; } if (trimmed.startsWith('/status ')) { const serverName = trimmed.substring(8); agent.getServerStatus(serverName); ask(); return; } // 处理用户问题 try { await agent.stream(trimmed); } catch (error) { console.error('❌ Error:', error); } ask(); }); }; ask(); } // 运行 main().catch(console.error);
高级功能
动态工具集(Multi-tenant 支持)
// src/agents/dynamic-mcp-agent.ts import { Agent } from '@mastra/core/agent'; import { MCPClient } from '@mastra/mcp'; import { openai } from '@ai-sdk/openai'; export class DynamicMCPAgent { private baseAgent: Agent; constructor() { // 创建不带工具的基础 Agent this.baseAgent = new Agent({ name: 'Dynamic MCP Agent', instructions: 'You are a helpful assistant.', model: openai('gpt-4o-mini'), // 不在这里设置 tools }); } /** * 为特定用户处理请求 */ async handleUserRequest( userPrompt: string, userApiKey: string, userConfig: Record<string, any>, ): Promise<string> { // 为这个用户创建专属的 MCP Client const userMcp = new MCPClient({ servers: { weather: { url: new URL('https://weather-api.example.com/mcp'), requestInit: { headers: { Authorization: `Bearer ${userApiKey}`, 'X-User-Id': userConfig.userId, }, }, }, userDocs: { command: 'npx', args: ['-y', 'user-docs-mcp'], env: { USER_ID: userConfig.userId, API_KEY: userApiKey, }, }, }, }); try { // 动态获取工具集 const toolsets = await userMcp.getToolsets(); // 使用工具集生成响应 const response = await this.baseAgent.generate(userPrompt, { toolsets, // 动态传入工具 }); return response.text; } finally { // 清理连接 await userMcp.disconnect(); } } }
监听资源更新
// src/mcp/resource-monitor.ts import { MCPClient } from '@mastra/mcp'; export class ResourceMonitor { private mcpClient: MCPClient; constructor(mcpClient: MCPClient) { this.mcpClient = mcpClient; } /** * 订阅资源更新 */ async subscribeToResource(serverName: string, uri: string): Promise<void> { // 订阅资源 await this.mcpClient.resources.subscribe(serverName, uri); // 设置更新处理器 await this.mcpClient.resources.onUpdated(serverName, async (params) => { console.log(`📢 Resource updated: ${params.uri}`); // 读取更新后的内容 const content = await this.mcpClient.resources.read(serverName, params.uri); console.log('New content:', content.contents[0]); }); console.log(`✅ Subscribed to ${uri} on ${serverName}`); } /** * 监听资源列表变化 */ async monitorResourceList(serverName: string): Promise<void> { await this.mcpClient.resources.onListChanged(serverName, async () => { console.log(`📢 Resource list changed on ${serverName}`); // 重新获取资源列表 const resources = await this.mcpClient.resources.list(); console.log(`New resources on ${serverName}:`, resources[serverName]); }); console.log(`✅ Monitoring resource list on ${serverName}`); } }
使用提示模板
// src/mcp/prompt-manager.ts import { MCPClient } from '@mastra/mcp'; export class PromptManager { private mcpClient: MCPClient; constructor(mcpClient: MCPClient) { this.mcpClient = mcpClient; } /** * 列出所有可用的提示模板 */ async listPrompts(): Promise<void> { const promptsByServer = await this.mcpClient.prompts.list(); console.log('\n📝 Available Prompts:'); console.log('━'.repeat(60)); for (const [serverName, prompts] of Object.entries(promptsByServer)) { console.log(`\n${serverName}:`); for (const prompt of prompts) { console.log(` - ${prompt.name}: ${prompt.description || 'No description'}`); } } console.log('\n' + '━'.repeat(60) + '\n'); } /** * 使用提示模板 */ async usePrompt( serverName: string, promptName: string, args?: Record<string, any>, ): Promise<any> { const { prompt, messages } = await this.mcpClient.prompts.get({ serverName, name: promptName, args, }); console.log(`\n📝 Using prompt: ${prompt.name}`); console.log('Messages:'); for (const message of messages) { console.log(` ${message.role}: ${JSON.stringify(message.content)}`); } return { prompt, messages }; } }
MCP 服务器管理
CLI 工具实现
// src/cli/mcp-manager-cli.ts import { Command } from 'commander'; import { MCPClientManager } from '../mcp/manager'; import { ServerConfig } from '../mcp/client'; import * as fs from 'fs/promises'; import * as path from 'path'; const CONFIG_FILE = path.join(process.cwd(), '.mcp-servers.json'); export class MCPManagerCLI { private program: Command; constructor() { this.program = new Command(); this.setupCommands(); } private setupCommands(): void { this.program.name('mcp-manager').description('MCP Server Manager CLI').version('1.0.0'); // 列出服务器 this.program .command('list') .description('List all MCP servers') .action(async () => { await this.listServers(); }); // 添加服务器 this.program .command('add') .description('Add a new MCP server') .requiredOption('-n, --name <name>', 'Server name') .requiredOption('-t, --type <type>', 'Server type (stdio|http)') .option('-c, --command <command>', 'Command (for stdio)') .option('-a, --args <args...>', 'Arguments (for stdio)') .option('-u, --url <url>', 'URL (for http)') .action(async (options) => { await this.addServer(options); }); // 移除服务器 this.program .command('remove') .description('Remove an MCP server') .requiredOption('-n, --name <name>', 'Server name') .action(async (options) => { await this.removeServer(options.name); }); // 测试服务器 this.program .command('test') .description('Test MCP server connection') .requiredOption('-n, --name <name>', 'Server name') .action(async (options) => { await this.testServer(options.name); }); // 显示服务器详情 this.program .command('info') .description('Show server details') .requiredOption('-n, --name <name>', 'Server name') .action(async (options) => { await this.showServerInfo(options.name); }); } async run(args: string[]): Promise<void> { await this.program.parseAsync(args); } private async loadConfig(): Promise<ServerConfig[]> { try { const data = await fs.readFile(CONFIG_FILE, 'utf-8'); return JSON.parse(data); } catch (error) { return []; } } private async saveConfig(configs: ServerConfig[]): Promise<void> { await fs.writeFile(CONFIG_FILE, JSON.stringify(configs, null, 2)); } private async listServers(): Promise<void> { const configs = await this.loadConfig(); if (configs.length === 0) { console.log('No MCP servers configured.'); return; } console.log('\n📋 Configured MCP Servers:'); console.log('━'.repeat(60)); for (const config of configs) { const status = config.enabled !== false ? '✅' : '❌'; console.log(`\n${status} ${config.name}`); console.log(` Type: ${config.type}`); if (config.type === 'stdio') { console.log(` Command: ${config.config.command} ${config.config.args?.join(' ') || ''}`); } else { console.log(` URL: ${config.config.url}`); } } console.log('\n' + '━'.repeat(60) + '\n'); } private async addServer(options: any): Promise<void> { const configs = await this.loadConfig(); // 检查是否已存在 if (configs.some((c) => c.name === options.name)) { console.error(`❌ Server "${options.name}" already exists`); return; } const config: ServerConfig = { name: options.name, type: options.type, config: {}, enabled: true, }; if (options.type === 'stdio') { if (!options.command) { console.error('❌ --command is required for stdio servers'); return; } config.config.command = options.command; config.config.args = options.args || []; } else { if (!options.url) { console.error('❌ --url is required for http servers'); return; } config.config.url = new URL(options.url); } configs.push(config); await this.saveConfig(configs); console.log(`✅ Server "${options.name}" added successfully`); } private async removeServer(name: string): Promise<void> { const configs = await this.loadConfig(); const filtered = configs.filter((c) => c.name !== name); if (filtered.length === configs.length) { console.error(`❌ Server "${name}" not found`); return; } await this.saveConfig(filtered); console.log(`✅ Server "${name}" removed successfully`); } private async testServer(name: string): Promise<void> { const configs = await this.loadConfig(); const config = configs.find((c) => c.name === name); if (!config) { console.error(`❌ Server "${name}" not found`); return; } console.log(`🧪 Testing server "${name}"...`); try { const manager = new MCPClientManager([config]); await manager.initialize(); const status = manager.getServerStatus(name); if (status?.connected) { console.log(`✅ Server "${name}" is working correctly`); console.log(` Tools: ${status.toolCount}`); console.log(` Resources: ${status.resourceCount}`); } else { console.error(`❌ Server "${name}" failed to connect`); } await manager.disconnect(); } catch (error) { console.error(`❌ Error testing server:`, error); } } private async showServerInfo(name: string): Promise<void> { const configs = await this.loadConfig(); const config = configs.find((c) => c.name === name); if (!config) { console.error(`❌ Server "${name}" not found`); return; } console.log(`\n📊 Server Info: ${name}`); console.log('━'.repeat(60)); console.log(`Type: ${config.type}`); console.log(`Enabled: ${config.enabled !== false ? 'Yes' : 'No'}`); if (config.type === 'stdio') { console.log(`Command: ${config.config.command}`); console.log(`Args: ${config.config.args?.join(' ') || 'None'}`); if (config.config.env) { console.log(`Env: ${Object.keys(config.config.env).join(', ')}`); } } else { console.log(`URL: ${config.config.url}`); } console.log('━'.repeat(60) + '\n'); } } // 运行 CLI if (require.main === module) { const cli = new MCPManagerCLI(); cli.run(process.argv).catch(console.error); }
实战案例
案例 1
// examples/multi-tool-assistant.ts import { MCPAgent } from '../src/agents/mcp-agent'; async function demo() { const agent = new MCPAgent(); await agent.initialize(); // 示例 1:Wikipedia 搜索 await agent.generate('Tell me about the Model Context Protocol'); // 示例 2:文件操作 await agent.generate('Create a file called test.txt with "Hello MCP!"'); // 示例 3:天气查询(如果配置了) await agent.generate('What is the weather like in San Francisco?'); await agent.disconnect(); } demo();
案例 2
// examples/workflow-automation.ts import { MCPAgent } from '../src/agents/mcp-agent'; async function automatedWorkflow() { const agent = new MCPAgent(); await agent.initialize(); console.log('🚀 Starting automated workflow...\n'); // Step 1: 研究主题 console.log('📚 Step 1: Research'); const research = await agent.generate( 'Search Wikipedia for "TypeScript programming language" and give me a brief summary', ); // Step 2: 保存到文件 console.log('💾 Step 2: Save to file'); await agent.generate( `Create a file called typescript-research.md with the following content:\n\n${research}`, ); // Step 3: 验证 console.log('✅ Step 3: Verify'); await agent.generate('Read the file typescript-research.md and confirm it was saved correctly'); console.log('\n✅ Workflow completed!'); await agent.disconnect(); } automatedWorkflow();
流程图
MCP Client 初始化流程
┌─────────────────────────────────────────┐
│ Start Initialization │
└──────────────────┬──────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Load Server Configurations │
│ • Read from config file │
│ • Parse environment variables │
│ • Validate configuration │
└──────────────────┬──────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Create MCPClient Instance │
│ • Build server map │
│ • Set global timeout │
│ • Configure logging │
└──────────────────┬──────────────────────┘
↓
┌────────┴────────┐
│ For each server │
└────────┬────────┘
↓
┌─────────────────────────┐
│ Determine Transport │
│ • Stdio (command+args) │
│ • HTTP (url) │
└──────────┬──────────────┘
↓
┌───────┴────────┐
│ │
┌───▼───┐ ┌─────▼─────┐
│ Stdio │ │ HTTP │
└───┬───┘ └─────┬─────┘
│ │
↓ ↓
┌──────────┐ ┌───────────┐
│ Spawn │ │ Try │
│ Process │ │ Streamable│
└────┬─────┘ │ HTTP │
│ └─────┬─────┘
│ ↓
│ ┌───────────┐
│ │ Success? │
│ └─────┬─────┘
│ │
│ ┌──────┴──────┐
│ ↓ ↓
│ [Yes] [No]
│ │ ↓
│ │ ┌──────────┐
│ │ │ Fallback │
│ │ │ to SSE │
│ │ └────┬─────┘
│ │ │
└─────────┴───────────┘
↓
┌─────────────────────────────────────────┐
│ Connection Established │
│ • Send initialize request │
│ • Exchange capabilities │
│ • Set up message handlers │
└──────────────────┬──────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Fetch Server Capabilities │
│ • List tools │
│ • List resources │
│ • List prompts │
└──────────────────┬──────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Transform to Mastra Format │
│ • Namespace tools by server │
│ • Register tool schemas │
│ • Store server metadata │
└──────────────────┬──────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Initialization Complete │
│ ✅ All servers connected │
│ ✅ Tools available │
│ ✅ Ready to use │
└─────────────────────────────────────────┘
工具调用流程
┌─────────────────────────────────────────┐
│ User sends prompt │
│ "Search Wikipedia for TypeScript" │
└──────────────────┬──────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Agent receives prompt │
│ • Add to conversation history │
│ • Pass to LLM with tool definitions │
└──────────────────┬──────────────────────┘
↓
┌─────────────────────────────────────────┐
│ LLM generates response │
│ Decision: Use tool or respond? │
└──────────────────┬──────────────────────┘
↓
┌────────┴────────┐
│ Tool needed? │
└────────┬─────────┘
│
┌─────────┴─────────┐
↓ ↓
[Yes] [No]
│ ↓
↓ ┌─────────────────┐
┌──────────────────┐│ Return text │
│ Generate ││ response │
│ tool_call │└─────────────────┘
│ { │
│ name: "wiki...",
│ args: {...} │
│ } │
└────────┬─────────┘
↓
┌─────────────────────────────────────────┐
│ Mastra Agent Executor │
│ • Identify tool by name │
│ • Parse arguments │
│ • Extract server name from namespace │
└──────────────────┬──────────────────────┘
↓
┌─────────────────────────────────────────┐
│ MCPClient routes to server │
│ Tool: "wikipedia_search" │
│ → Server: "wikipedia" │
└──────────────────┬──────────────────────┘
↓
┌────────┴────────┐
│ Transport type? │
└────────┬─────────┘
│
┌─────────┴─────────┐
↓ ↓
┌─────────┐ ┌─────────┐
│ Stdio │ │ HTTP │
└────┬────┘ └────┬────┘
│ │
↓ ↓
┌──────────────────┐ ┌──────────────────┐
│ Send via stdin │ │ POST /mcp/tools │
│ JSON-RPC request │ │ JSON-RPC request │
└────────┬─────────┘ └─────────┬────────┘
│ │
↓ ↓
┌──────────────────┐ ┌──────────────────┐
│ Read from stdout │ │ Parse HTTP resp │
│ Parse response │ │ Extract result │
└────────┬─────────┘ └─────────┬────────┘
│ │
└──────────┬───────────┘
↓
┌─────────────────────────────────────────┐
│ Tool Result Received │
│ { │
│ content: "TypeScript is...", │
│ isError: false │
│ } │
└──────────────────┬──────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Transform to Message Format │
│ • Add tool_result message │
│ • Append to conversation │
└──────────────────┬──────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Send back to LLM │
│ • LLM processes tool result │
│ • Generates final response │
└──────────────────┬──────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Return to User │
│ "TypeScript is a typed superset of │
│ JavaScript that compiles to plain │
│ JavaScript..." │
└─────────────────────────────────────────┘
MCP 服务器通信协议
Client (Mastra Agent) Server (MCP Server)
│ │
│ 1. Initialize Connection │
├──────────────────────────────────>│
│ { │
│ "jsonrpc": "2.0", │
│ "method": "initialize", │
│ "params": { │
│ "protocolVersion": "...", │
│ "capabilities": {...} │
│ } │
│ } │
│ │
│ 2. Initialize OK │
│<──────────────────────────────────┤
│ { │
│ "result": { │
│ "protocolVersion": "...", │
│ "capabilities": {...}, │
│ "serverInfo": {...} │
│ } │
│ } │
│ │
│ 3. List Tools │
├──────────────────────────────────>│
│ { │
│ "method": "tools/list" │
│ } │
│ │
│ 4. Tools List │
│<──────────────────────────────────┤
│ { │
│ "result": { │
│ "tools": [ │
│ { │
│ "name": "search", │
│ "description": "...", │
│ "inputSchema": {...} │
│ } │
│ ] │
│ } │
│ } │
│ │
│ 5. Call Tool │
├──────────────────────────────────>│
│ { │
│ "method": "tools/call", │
│ "params": { │
│ "name": "search", │
│ "arguments": { │
│ "query": "TypeScript" │
│ } │
│ } │
│ } │
│ │
│ 6. Tool Result │
│<──────────────────────────────────┤
│ { │
│ "result": { │
│ "content": [ │
│ { │
│ "type": "text", │
│ "text": "Result..." │
│ } │
│ ] │
│ } │
│ } │
│ │
│ 7. Disconnect │
├──────────────────────────────────>│
│ (close connection) │
│ │
最佳实践
1. 错误处理
class RobustMCPAgent { async safeToolCall(toolName: string, args: any): Promise<any> { const maxRetries = 3; let lastError: Error | null = null; for (let i = 0; i < maxRetries; i++) { try { return await this.agent.generate(`Use ${toolName} with ${JSON.stringify(args)}`); } catch (error) { lastError = error as Error; console.warn(`Attempt ${i + 1} failed:`, error); // 指数退避 await new Promise((resolve) => setTimeout(resolve, Math.pow(2, i) * 1000)); } } throw new Error(`Failed after ${maxRetries} attempts: ${lastError?.message}`); } }
2. 性能优化
class OptimizedMCPClient { private toolCache = new Map<string, any>(); private cacheExpiry = 5 * 60 * 1000; // 5 分钟 async getToolsCached(): Promise<Record<string, any>> { const cacheKey = 'all-tools'; const cached = this.toolCache.get(cacheKey); if (cached && Date.now() - cached.timestamp < this.cacheExpiry) { return cached.tools; } const tools = await this.mcpClient.getTools(); this.toolCache.set(cacheKey, { tools, timestamp: Date.now() }); return tools; } }
3. 日志和监控
class MonitoredMCPClient { private metrics = { toolCalls: new Map<string, number>(), errors: new Map<string, number>(), latencies: new Map<string, number[]>(), }; async callToolWithMetrics(toolName: string, args: any): Promise<any> { const startTime = Date.now(); try { const result = await this.callTool(toolName, args); // 记录成功调用 this.metrics.toolCalls.set(toolName, (this.metrics.toolCalls.get(toolName) || 0) + 1); // 记录延迟 const latency = Date.now() - startTime; if (!this.metrics.latencies.has(toolName)) { this.metrics.latencies.set(toolName, []); } this.metrics.latencies.get(toolName)!.push(latency); return result; } catch (error) { // 记录错误 this.metrics.errors.set(toolName, (this.metrics.errors.get(toolName) || 0) + 1); throw error; } } getMetrics() { return { toolCalls: Object.fromEntries(this.metrics.toolCalls), errors: Object.fromEntries(this.metrics.errors), avgLatencies: Object.fromEntries( Array.from(this.metrics.latencies.entries()).map(([tool, latencies]) => [ tool, latencies.reduce((a, b) => a + b, 0) / latencies.length, ]), ), }; } }
4. 安全性
class SecureMCPClient { private allowedTools = new Set<string>([ 'wikipedia_search', 'filesystem_read', // 白名单其他安全工具 ]); async validateAndCallTool(toolName: string, args: any): Promise<any> { // 验证工具名称 if (!this.allowedTools.has(toolName)) { throw new Error(`Tool ${toolName} is not allowed`); } // 验证参数 this.validateArgs(toolName, args); // 限制速率 await this.rateLimitCheck(toolName); // 执行调用 return await this.callTool(toolName, args); } private validateArgs(toolName: string, args: any): void { // 根据工具类型验证参数 if (toolName.startsWith('filesystem_')) { // 确保路径不包含 '..' if (args.path && args.path.includes('..')) { throw new Error('Path traversal detected'); } } } private async rateLimitCheck(toolName: string): Promise<void> { // 实现速率限制逻辑 } }
5. 测试
// tests/mcp-client.test.ts import { describe, it, expect, beforeAll, afterAll } from 'vitest'; import { MCPAgent } from '../src/agents/mcp-agent'; describe('MCP Agent', () => { let agent: MCPAgent; beforeAll(async () => { agent = new MCPAgent(); await agent.initialize(); }); afterAll(async () => { await agent.disconnect(); }); it('should list connected servers', () => { const servers = agent.listServers(); expect(servers.length).toBeGreaterThan(0); }); it('should handle Wikipedia search', async () => { const response = await agent.generate('Search for "TypeScript" on Wikipedia'); expect(response).toContain('TypeScript'); }); it('should handle file operations', async () => { await agent.generate('Create a test file with "Hello World"'); const content = await agent.generate('Read the test file'); expect(content).toContain('Hello World'); }); });
总结
关键要点
- MCPClient 是核心 - 通过
@mastra/mcp包管理所有 MCP 服务器连接 - 两种工具获取方式:
getTools()- 静态获取,用于 Agent 定义getToolsets()- 动态获取,用于运行时(multi-tenant)
- 支持多种传输 - Stdio、HTTP(Streamable)、SSE(legacy)
- 工具命名空间 - 自动以服务器名作为前缀,避免冲突
- 丰富的 API - 工具、资源、提示模板、日志
完整项目 Checklist
- ✅ 安装
@mastra/core和@mastra/mcp - ✅ 配置环境变量
- ✅ 创建 ServerConfig 配置
- ✅ 实现 MCPClientManager
- ✅ 创建集成 MCP 的 Agent
- ✅ 添加服务器管理命令
- ✅ 实现错误处理和重试
- ✅ 添加日志和监控
- ✅ 编写测试用例
运行项目
# 安装依赖 npm install # 运行主程序 npm run dev # 运行 CLI 工具 npm run mcp-manager list npm run mcp-manager add -n weather -t http -u https://api.weather.com/mcp # 运行测试 npm test
文档版本: 1.0
最后更新: 2025-11-13
作者: Claude (基于 Mastra.ai 官方文档整理)