Model Context Protocol (MCP) 完整教程
•Devin
AIMCP
Model Context Protocol (MCP) 完整教程
Model Context Protocol (MCP) 完整教程
本教程基于 MCP 官方规范,涵盖核心概念、架构设计、实现指南和最佳实践
目录
1. MCP 简介
1.1 什么是 MCP?
Model Context Protocol (MCP) 是由 Anthropic 在 2024 年 11 月发布的开放标准协议,旨在标准化 AI 应用与外部数据源、工具和系统之间的连接方式。
1.2 为什么需要 MCP?
问题背景:M×N 集成困境
在 MCP 之前,如果你有:
- M 个 AI 应用(Claude、ChatGPT、IDE 助手等)
- N 个工具/系统(GitHub、Slack、数据库等)
你需要构建 M×N 个不同的集成,导致:
- ❌ 重复开发工作
- ❌ 不一致的实现
- ❌ 维护成本高
- ❌ 难以扩展
MCP 的解决方案:M+N 模式
Preparing diagram...
通过 MCP:
- ✅ 工具开发者:只需构建 N 个 MCP Server
- ✅ 应用开发者:只需构建 M 个 MCP Client
- ✅ 总集成数:M + N(而非 M×N)
1.3 MCP 的核心特性
| 特性 | 说明 |
|---|---|
| 🌐 开放标准 | 详细的规范文档,任何人都可以实现 |
| 🔌 通用连接器 | 类似 AI 系统的 USB-C 接口 |
| 🔒 安全可控 | 支持 OAuth 2.1、权限管理 |
| 🔄 双向通信 | 基于 JSON-RPC 2.0 的消息传递 |
| 📦 模块化设计 | 可选择性实现不同功能 |
| 🚀 生态系统 | 丰富的 SDK 和预构建服务器 |
1.4 MCP 类比
可以将 MCP 理解为:
- USB-C 端口:统一的连接标准
- HTTP 协议:用于 Web 的标准通信协议
- LSP (Language Server Protocol):IDE 与编程语言的标准接口
2. 核心概念
2.1 三大核心术语
Preparing diagram...
Host(宿主)
- 用户直接交互的应用程序
- 协调整个系统,管理 LLM 交互
- 示例:Claude Desktop、Cursor IDE、Windsurf
Client(客户端)
- 存在于 Host 内部的连接管理器
- 与单个 Server 保持 1 关系
- 负责会话管理、错误处理、重连机制
Server(服务器)
- 通过标准化 API 暴露功能的外部程序
- 提供 Tools、Resources 和 Prompts
- 可以是本地进程或远程 HTTP 服务
2.2 三大核心能力
Preparing diagram...
Tools(工具)- 模型控制
AI 模型决定何时调用的可执行函数,通常有副作用。
示例:
{ "name": "send_email", "description": "发送邮件给指定收件人", "inputSchema": { "type": "object", "properties": { "to": { "type": "string" }, "subject": { "type": "string" }, "body": { "type": "string" } }, "required": ["to", "subject", "body"] } }
Resources(资源)- 应用控制
应用提供给 AI 的只读数据源,类似 REST API 的 GET 端点。
示例:
{ "uri": "file:///project/README.md", "name": "项目文档", "description": "项目的 README 文件", "mimeType": "text/markdown" }
Prompts(提示模板)- 用户控制
用户可以调用的预定义提示模板,封装最佳实践。
示例:
{ "name": "code_review", "description": "对代码进行详细审查", "arguments": [ { "name": "code", "description": "需要审查的代码", "required": true } ] }
3. 系统架构
3.1 整体架构图
Preparing diagram...
3.2 Client-Host-Server 架构
Preparing diagram...
3.3 分层架构
Preparing diagram...
4. 核心组件详解
4.1 Host(宿主应用)
职责:
- 用户交互界面:处理用户输入和展示结果
- LLM 集成:管理与 AI 模型的通信
- Client 管理:初始化和协调多个 Client
- 安全边界:维护明确的安全边界和权限控制
常见 Host 示例:
- Claude Desktop:Anthropic 的桌面应用
- Cursor IDE:AI 驱动的代码编辑器
- Windsurf:智能开发环境
- IBM BeeAI:企业级 AI 助手
4.2 Client(客户端)
职责:
- 连接管理:维护与 Server 的 1 连接
- 消息转换:将 MCP 消息转换为 JSON-RPC 格式
- 会话管理:处理中断、超时、重连
- 能力发现:查询和缓存 Server 能力
Client 生命周期:
Preparing diagram...
4.3 Server(服务器)
职责:
- 能力暴露:声明支持的 Tools、Resources、Prompts
- 请求处理:执行工具调用、返回资源数据
- 状态管理:维护会话状态(如果需要)
- 安全验证:实现认证和授权逻辑
Server 类型:
| 类型 | 传输方式 | 使用场景 | 示例 |
|---|---|---|---|
| 本地 Server | stdio | 本地工具、文件系统 | Git 操作、文件读写 |
| 远程 Server | HTTP | 云服务、外部 API | GitHub API、Slack |
| 混合 Server | 两者都支持 | 灵活部署 | 数据库服务器 |
5. 协议工作流程
5.1 完整通信流程
Preparing diagram...
5.2 初始化流程详解
Preparing diagram...
5.3 Tool 调用流程
Preparing diagram...
5.4 Resource 读取流程
Preparing diagram...
6. 传输层
6.1 两种传输方式对比
Preparing diagram...
| 特性 | stdio | Streamable HTTP |
|---|---|---|
| 适用场景 | 本地工具、CLI | 远程服务、Web API |
| 连接方式 | 进程间通信 | HTTP 网络请求 |
| 安全性 | 进程隔离 | OAuth 2.1、TLS |
| 会话管理 | 无需会话 | Session ID |
| 断线恢复 | N/A | Last-Event-ID |
| 性能 | 极快 | 依赖网络 |
| 实现复杂度 | 简单 | 中等 |
6.2 Streamable HTTP 工作原理
Preparing diagram...
6.3 完整的 HTTP 流程
Preparing diagram...
7. 能力协商机制
7.1 能力协商流程
Preparing diagram...
7.2 常见能力列表
Server 端能力
interface ServerCapabilities { // 工具相关 tools?: { listChanged?: boolean; // 支持工具列表变更通知 }; // 资源相关 resources?: { subscribe?: boolean; // 支持资源订阅 listChanged?: boolean; // 支持资源列表变更通知 }; // 提示模板相关 prompts?: { listChanged?: boolean; // 支持提示模板列表变更通知 }; // 日志记录 logging?: {}; // 实验性功能 experimental?: { [key: string]: any; }; }
Client 端能力
interface ClientCapabilities { // 根目录(工作空间)管理 roots?: { listChanged?: boolean; // 支持根目录列表变更通知 }; // 采样(让 Server 请求 LLM) sampling?: {}; // 实验性功能 experimental?: { [key: string]: any; }; }
7.3 能力检查决策树
Preparing diagram...
8. 实现指南
8.1 快速开始:创建一个 MCP Server
环境准备
# Node.js/TypeScript npm install @modelcontextprotocol/sdk # Python pip install mcp
最小化 Server 实现(TypeScript)
import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; import { CallToolRequestSchema, ListToolsRequestSchema } from '@modelcontextprotocol/sdk/types.js'; // 1. 创建 Server 实例 const server = new Server( { name: 'example-server', version: '1.0.0', }, { capabilities: { tools: {}, }, }, ); // 2. 注册工具列表处理器 server.setRequestHandler(ListToolsRequestSchema, async () => { return { tools: [ { name: 'get_weather', description: '获取指定城市的天气信息', inputSchema: { type: 'object', properties: { city: { type: 'string', description: '城市名称', }, }, required: ['city'], }, }, ], }; }); // 3. 注册工具调用处理器 server.setRequestHandler(CallToolRequestSchema, async (request) => { if (request.params.name === 'get_weather') { const city = request.params.arguments?.city as string; // 模拟 API 调用 return { content: [ { type: 'text', text: `${city} 的天气:晴天,25°C`, }, ], }; } throw new Error(`Unknown tool: ${request.params.name}`); }); // 4. 启动 Server async function main() { const transport = new StdioServerTransport(); await server.connect(transport); console.error('MCP Server 已启动'); } main().catch((error) => { console.error('Server 错误:', error); process.exit(1); });
配置文件(Claude Desktop)
{ "mcpServers": { "weather": { "command": "node", "args": ["/path/to/weather-server/build/index.js"] } } }
8.2 创建一个 MCP Client
import { Client } from '@modelcontextprotocol/sdk/client/index.js'; import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js'; async function main() { // 1. 创建 Client 实例 const client = new Client( { name: 'example-client', version: '1.0.0', }, { capabilities: {}, }, ); // 2. 创建传输层 const transport = new StdioClientTransport({ command: 'node', args: ['/path/to/server/build/index.js'], }); // 3. 连接到 Server await client.connect(transport); // 4. 列出可用工具 const tools = await client.listTools(); console.log('可用工具:', tools); // 5. 调用工具 const result = await client.callTool({ name: 'get_weather', arguments: { city: '北京', }, }); console.log('结果:', result); // 6. 关闭连接 await client.close(); } main();
8.3 实现 Resource Server
server.setRequestHandler(ListResourcesRequestSchema, async () => { return { resources: [ { uri: 'file:///project/README.md', name: '项目文档', description: '项目的 README 文件', mimeType: 'text/markdown', }, ], }; }); server.setRequestHandler(ReadResourceRequestSchema, async (request) => { const uri = request.params.uri; if (uri === 'file:///project/README.md') { const content = await fs.readFile('/project/README.md', 'utf-8'); return { contents: [ { uri: uri, mimeType: 'text/markdown', text: content, }, ], }; } throw new Error(`Resource not found: ${uri}`); });
8.4 实现 Streamable HTTP Server
import express from 'express'; import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; const app = express(); app.use(express.json()); const server = new Server( { name: 'http-server', version: '1.0.0', }, { capabilities: { tools: {}, }, }, ); // MCP 端点 const MCP_ENDPOINT = '/mcp'; // POST 请求处理 app.post(MCP_ENDPOINT, async (req, res) => { const transport = new StreamableHTTPServerTransport({ sessionId: req.headers['mcp-session-id'] as string, }); await transport.handlePostRequest(req, res, server); }); // GET 请求处理(用于 SSE 恢复) app.get(MCP_ENDPOINT, async (req, res) => { const transport = new StreamableHTTPServerTransport({ sessionId: req.headers['mcp-session-id'] as string, }); await transport.handleGetRequest(req, res, server); }); app.listen(3000, () => { console.log('MCP Server 运行在 http://localhost:3000'); });
9. 最佳实践
9.1 Server 端最佳实践
✅ DO(推荐做法)
// 1. 明确的工具描述 { name: "send_email", description: "向指定收件人发送电子邮件。需要有效的邮箱地址。", inputSchema: { type: "object", properties: { to: { type: "string", description: "收件人邮箱地址(必须是有效格式)", pattern: "^[^@]+@[^@]+\\.[^@]+$" }, subject: { type: "string", description: "邮件主题(最多 100 个字符)", maxLength: 100 }, body: { type: "string", description: "邮件正文(支持 HTML)" } }, required: ["to", "subject", "body"] } } // 2. 详细的错误处理 try { const result = await performAction(); return { content: [{ type: "text", text: JSON.stringify(result) }] }; } catch (error) { return { content: [{ type: "text", text: `错误: ${error.message}` }], isError: true }; } // 3. 资源使用 URI 标准 { uri: "file:///absolute/path/to/file.txt", name: "配置文件", mimeType: "text/plain" }
❌ DON'T(避免的做法)
// 1. 模糊的工具描述 { name: "do_thing", description: "做一些事情" // ❌ 太模糊 } // 2. 吞掉错误 try { await performAction(); } catch (error) { // ❌ 不要静默失败 return { success: true }; } // 3. 相对路径 { uri: "./file.txt", // ❌ 应使用绝对路径 name: "文件" }
9.2 Client 端最佳实践
连接管理
class MCPClientManager { private client: Client; private reconnectAttempts = 0; private maxReconnectAttempts = 5; async connect() { try { await this.client.connect(this.transport); this.reconnectAttempts = 0; } catch (error) { await this.handleConnectionError(error); } } private async handleConnectionError(error: Error) { if (this.reconnectAttempts < this.maxReconnectAttempts) { this.reconnectAttempts++; const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000); console.log(`重连中... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`); await new Promise((resolve) => setTimeout(resolve, delay)); await this.connect(); } else { throw new Error('无法连接到 MCP Server'); } } }
请求超时
async function callToolWithTimeout(client: Client, toolName: string, args: any, timeoutMs = 30000) { return Promise.race([ client.callTool({ name: toolName, arguments: args }), new Promise((_, reject) => setTimeout(() => reject(new Error('请求超时')), timeoutMs)), ]); }
9.3 安全最佳实践
Server 端
// 1. 验证输入 function validateInput(input: any, schema: any): boolean { // 使用 JSON Schema 验证 const ajv = new Ajv(); const validate = ajv.compile(schema); return validate(input); } // 2. 限制资源访问 const ALLOWED_PATHS = ['/safe/directory']; function isPathAllowed(path: string): boolean { return ALLOWED_PATHS.some((allowed) => path.startsWith(allowed)); } // 3. 实现速率限制 const rateLimiter = new Map<string, number>(); function checkRateLimit(clientId: string): boolean { const now = Date.now(); const lastRequest = rateLimiter.get(clientId) || 0; if (now - lastRequest < 1000) { return false; // 限制每秒一个请求 } rateLimiter.set(clientId, now); return true; }
HTTP Server 安全
import helmet from 'helmet'; import cors from 'cors'; app.use(helmet()); app.use( cors({ origin: process.env.ALLOWED_ORIGINS?.split(','), credentials: true, }), ); // OAuth 2.1 认证 app.use('/mcp', async (req, res, next) => { const token = req.headers.authorization?.replace('Bearer ', ''); if (!token) { return res.status(401).json({ error: '未授权' }); } try { const user = await verifyToken(token); req.user = user; next(); } catch (error) { return res.status(401).json({ error: '无效的令牌' }); } });
9.4 性能优化
缓存策略
class ResourceCache { private cache = new Map<string, { data: any; expires: number }>(); async get(uri: string, fetcher: () => Promise<any>, ttl = 60000) { const cached = this.cache.get(uri); if (cached && cached.expires > Date.now()) { return cached.data; } const data = await fetcher(); this.cache.set(uri, { data, expires: Date.now() + ttl, }); return data; } invalidate(uri: string) { this.cache.delete(uri); } }
批量请求
// Server 支持批量请求 server.setRequestHandler('tools/callBatch', async (request) => { const calls = request.params.calls as Array<{ name: string; arguments: any; }>; const results = await Promise.all(calls.map((call) => executeTool(call.name, call.arguments))); return { results }; });
10. 实战示例
10.1 示例 1:文件系统 Server
完整的文件系统操作 MCP Server:
import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; import * as fs from 'fs/promises'; import * as path from 'path'; const SAFE_ROOT = '/safe/directory'; const server = new Server( { name: 'filesystem-server', version: '1.0.0', }, { capabilities: { tools: {}, resources: { subscribe: true }, }, }, ); // 工具:读取文件 server.setRequestHandler(ListToolsRequestSchema, async () => { return { tools: [ { name: 'read_file', description: '读取文件内容', inputSchema: { type: 'object', properties: { path: { type: 'string', description: '文件路径' }, }, required: ['path'], }, }, { name: 'write_file', description: '写入文件内容', inputSchema: { type: 'object', properties: { path: { type: 'string', description: '文件路径' }, content: { type: 'string', description: '文件内容' }, }, required: ['path', 'content'], }, }, { name: 'list_directory', description: '列出目录内容', inputSchema: { type: 'object', properties: { path: { type: 'string', description: '目录路径' }, }, required: ['path'], }, }, ], }; }); server.setRequestHandler(CallToolRequestSchema, async (request) => { const { name, arguments: args } = request.params; // 安全检查 function validatePath(p: string): string { const absolute = path.resolve(SAFE_ROOT, p); if (!absolute.startsWith(SAFE_ROOT)) { throw new Error('路径不在安全范围内'); } return absolute; } switch (name) { case 'read_file': { const filePath = validatePath(args.path as string); const content = await fs.readFile(filePath, 'utf-8'); return { content: [{ type: 'text', text: content }], }; } case 'write_file': { const filePath = validatePath(args.path as string); await fs.writeFile(filePath, args.content as string, 'utf-8'); return { content: [{ type: 'text', text: '文件写入成功' }], }; } case 'list_directory': { const dirPath = validatePath(args.path as string); const entries = await fs.readdir(dirPath, { withFileTypes: true }); const list = entries.map((entry) => ({ name: entry.name, type: entry.isDirectory() ? 'directory' : 'file', })); return { content: [{ type: 'text', text: JSON.stringify(list, null, 2) }], }; } default: throw new Error(`未知工具: ${name}`); } }); // 资源:文件内容 server.setRequestHandler(ListResourcesRequestSchema, async () => { const files = await fs.readdir(SAFE_ROOT); return { resources: files.map((file) => ({ uri: `file:///${path.join(SAFE_ROOT, file)}`, name: file, mimeType: 'text/plain', })), }; }); server.setRequestHandler(ReadResourceRequestSchema, async (request) => { const uri = request.params.uri; const filePath = uri.replace('file:///', ''); const content = await fs.readFile(filePath, 'utf-8'); return { contents: [ { uri: uri, mimeType: 'text/plain', text: content, }, ], }; }); // 启动 const transport = new StdioServerTransport(); server.connect(transport);
10.2 示例 2:数据库查询 Server
import { Pool } from 'pg'; const pool = new Pool({ host: 'localhost', database: 'mydb', user: 'user', password: 'password', }); server.setRequestHandler(ListToolsRequestSchema, async () => { return { tools: [ { name: 'query_database', description: '执行 SQL 查询(仅支持 SELECT)', inputSchema: { type: 'object', properties: { query: { type: 'string', description: 'SQL 查询语句' }, }, required: ['query'], }, }, ], }; }); server.setRequestHandler(CallToolRequestSchema, async (request) => { if (request.params.name === 'query_database') { const query = request.params.arguments?.query as string; // 安全检查:仅允许 SELECT if (!query.trim().toUpperCase().startsWith('SELECT')) { throw new Error('仅支持 SELECT 查询'); } try { const result = await pool.query(query); return { content: [ { type: 'text', text: JSON.stringify(result.rows, null, 2), }, ], }; } catch (error) { return { content: [ { type: 'text', text: `查询错误: ${error.message}`, }, ], isError: true, }; } } throw new Error(`未知工具: ${request.params.name}`); });
10.3 示例 3:GitHub API Server
import { Octokit } from '@octokit/rest'; const octokit = new Octokit({ auth: process.env.GITHUB_TOKEN, }); server.setRequestHandler(ListToolsRequestSchema, async () => { return { tools: [ { name: 'list_repositories', description: '列出用户的所有仓库', inputSchema: { type: 'object', properties: { username: { type: 'string', description: 'GitHub 用户名' }, }, required: ['username'], }, }, { name: 'get_issues', description: '获取仓库的 Issues', inputSchema: { type: 'object', properties: { owner: { type: 'string', description: '仓库所有者' }, repo: { type: 'string', description: '仓库名称' }, state: { type: 'string', enum: ['open', 'closed', 'all'], description: 'Issue 状态', }, }, required: ['owner', 'repo'], }, }, { name: 'create_issue', description: '创建新的 Issue', inputSchema: { type: 'object', properties: { owner: { type: 'string' }, repo: { type: 'string' }, title: { type: 'string' }, body: { type: 'string' }, }, required: ['owner', 'repo', 'title'], }, }, ], }; }); server.setRequestHandler(CallToolRequestSchema, async (request) => { const { name, arguments: args } = request.params; switch (name) { case 'list_repositories': { const { data } = await octokit.repos.listForUser({ username: args.username as string, }); return { content: [ { type: 'text', text: JSON.stringify( data.map((repo) => ({ name: repo.name, description: repo.description, stars: repo.stargazers_count, })), null, 2, ), }, ], }; } case 'get_issues': { const { data } = await octokit.issues.listForRepo({ owner: args.owner as string, repo: args.repo as string, state: (args.state as any) || 'open', }); return { content: [ { type: 'text', text: JSON.stringify( data.map((issue) => ({ number: issue.number, title: issue.title, state: issue.state, created_at: issue.created_at, })), null, 2, ), }, ], }; } case 'create_issue': { const { data } = await octokit.issues.create({ owner: args.owner as string, repo: args.repo as string, title: args.title as string, body: args.body as string, }); return { content: [ { type: 'text', text: `Issue 创建成功: #${data.number} - ${data.html_url}`, }, ], }; } default: throw new Error(`未知工具: ${name}`); } });
10.4 测试你的 MCP Server
使用官方的 MCP Inspector 工具:
# 安装 MCP Inspector npm install -g @modelcontextprotocol/inspector # 启动 Inspector mcp-inspector node path/to/your/server/index.js # 浏览器打开 # http://localhost:5173
MCP Inspector 提供:
- 🔍 查看工具、资源、提示模板列表
- 🧪 交互式测试工具调用
- 📊 查看请求/响应日志
- 🐛 调试 JSON-RPC 消息
总结
MCP 的核心价值
- 标准化:统一的协议,减少重复开发
- 模块化:清晰的组件边界,易于扩展
- 安全性:内置的安全机制和最佳实践
- 生态系统:丰富的工具和社区支持
下一步行动
- ✅ 阅读官方文档
- ✅ 尝试预构建的 MCP Servers
- ✅ 构建你的第一个 MCP Server
- ✅ 加入 MCP 社区
参考资源
- 📘 MCP 规范
- 📦 TypeScript SDK
- 🐍 Python SDK
- 💬 Discord 社区
- 🎓 官方教程
本教程由 Claude 基于 MCP 官方文档整理,最后更新:2025-11