A Complete Agentic AI Application: Building a Production-Grade Multi-Agent Customer Service System
•Devin
Agentic AI
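Build a complete multi-Agent customer service system from scratch: a production-grade implementation covering multi-channel input, intelligent routing, a RAG knowledge base, Agent collaboration, memory management, and performance monitoring.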
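1. Project Overview
1.1 Project Goals
Build a production-grade intelligent customer service system that provides:
- ✅ Multi-channel intake (Webhook, Email, Chat)
- ✅ Intent recognition and routing
- ✅ RAG knowledge base (Supabase)
- ✅ Multi-Agent collaboration
- ✅ Conversation memory management
- ✅ Performance monitoring and logging
1.2 System Features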
1.3 Tech Stack
```javascript
const techStack = {
  platform: 'n8n',
  ai: {
    llm: 'OpenAI GPT-4',
    embedding: 'text-embedding-3-small'
  },
  database: {
    vector: 'Supabase (pgvector)',
    cache: 'Redis',
    storage: 'PostgreSQL'
  },
  monitoring: {
    metrics: 'Prometheus',
    logging: 'Winston',
    tracing: 'Sentry'
  }
};
```
2. System Architecture
2.1 Overall Architecture
2.2 Data Flow
3. Multi-Channel Input
3.1 Webhook Input
Webhook node configuration:
```json
{
  "name": "Webhook Input",
  "type": "n8n-nodes-base.webhook",
  "parameters": {
    "path": "customer-support",
    "httpMethod": "POST",
    "responseMode": "responseNode",
    "authentication": "headerAuth"
  }
}
```
Data normalization:
```javascript
// Map the webhook payload onto the shared message shape
const webhookData = {
  channel: 'webhook',
  userId: $json.body.userId,
  message: $json.body.message,
  metadata: {
    source: 'api',
    timestamp: new Date().toISOString(),
    sessionId: $json.body.sessionId
  }
};
```
3.2 Email Input
Email Trigger node:
```json
{
  "name": "Email Input",
  "type": "n8n-nodes-base.emailReadImap",
  "parameters": {
    "mailbox": "INBOX",
    "format": "simple",
    "options": {
      "allowUnauthorizedCerts": false
    }
  }
}
```
Email data normalization:
```javascript
// extractUserId is a helper you need to define; it is not built into n8n
const emailData = {
  channel: 'email',
  userId: extractUserId($json.from),
  message: $json.text,
  metadata: {
    source: 'email',
    subject: $json.subject,
    from: $json.from,
    timestamp: $json.date
  }
};
```
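The email normalization above calls `extractUserId` without defining it. A minimal sketch follows, assuming the user ID is simply the lowercased sender address taken from a `From` header string. That mapping is an assumption; a real system would more likely look the address up in a customer table.

```javascript
// Hypothetical helper: derive a user ID from an email "From" header.
// Assumption: the lowercased address itself serves as the user ID.
function extractUserId(from) {
  if (typeof from !== 'string') return null;
  // Prefer the address inside angle brackets: "Jane Doe <jane@example.com>".
  const bracketed = from.match(/<([^<>\s]+@[^<>\s]+)>/);
  const candidate = bracketed ? bracketed[1] : from.trim();
  // Loose sanity check only; this is not a full RFC 5322 parser.
  if (!/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(candidate)) return null;
  return candidate.toLowerCase();
}
```

Returning `null` for unparseable headers lets the caller decide whether to drop the message or route it to a manual queue.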
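3.3 Unified Data Format
```typescript
interface UnifiedMessage {
  channel: 'webhook' | 'email' | 'chat';
  userId: string;
  message: string;
  metadata: {
    source: string;
    timestamp: string;
    sessionId?: string;
    [key: string]: any;
  };
}

// Function node - data normalization
// detectChannel, extractUserId, extractMessage, and extractMetadata are
// helpers you need to provide; they are not defined in this article.
function normalizeInput(input: any): UnifiedMessage {
  const channel = input.channel || detectChannel(input);

  return {
    channel,
    userId: extractUserId(input),
    message: extractMessage(input),
    metadata: {
      // Spread first so channel-specific fields cannot overwrite source/timestamp
      ...extractMetadata(input),
      source: channel,
      timestamp: new Date().toISOString()
    }
  };
}
```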
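`normalizeInput` relies on `detectChannel` and `extractMessage`, which the article leaves undefined. The sketch below shows one way they might look. The field names it inspects (`body`, `from`, `subject`, `text`, `message`) are assumptions based on the webhook and email payloads in sections 3.1 and 3.2.

```javascript
// Hypothetical helpers for normalizeInput; the inspected field names are assumptions.
function detectChannel(input) {
  // Webhook payloads in section 3.1 carry their data under `body`.
  if (input && typeof input.body === 'object' && input.body !== null) return 'webhook';
  // Email items in section 3.2 carry `from` and `subject`.
  if (input && typeof input.from === 'string' && 'subject' in input) return 'email';
  // Anything else is treated as chat.
  return 'chat';
}

function extractMessage(input) {
  const raw = input?.body?.message ?? input?.text ?? input?.message ?? '';
  // Collapse runs of whitespace so downstream prompts stay compact.
  return String(raw).replace(/\s+/g, ' ').trim();
}
```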
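4. Intelligent Routing
4.1 Intent Recognition
```javascript
// Code node - intent recognition
// Note: require() of external packages in a Code node only works on self-hosted
// n8n instances that allow it (for example via NODE_FUNCTION_ALLOW_EXTERNAL).
const OpenAI = require('openai');

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

const message = $('Normalize Input').item.json.message;

const response = await openai.chat.completions.create({
  model: 'gpt-4-turbo-preview',
  messages: [
    {
      role: 'system',
      content: `You are an intent classifier. Analyze the user message and return exactly one of:
- general_inquiry: general question
- technical_support: technical support
- billing: billing issue
- complaint: complaint
- escalation: needs a human agent

Return only the category name and nothing else.`
    },
    { role: 'user', content: message }
  ],
  temperature: 0.3
});

const intent = response.choices[0].message.content.trim();

// Compute priority (calculatePriority is a helper you need to define)
const priority = calculatePriority(intent, message);

return [{
  json: {
    ...$input.item.json,
    intent,
    priority,
    classification: {
      intent,
      confidence: 0.9, // hard-coded placeholder; the model does not return a confidence score
      timestamp: new Date().toISOString()
    }
  }
}];
```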
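The intent code above trusts the model to return an exact label and calls an undefined `calculatePriority`. A minimal sketch of both pieces follows. The priority scale (1 is most urgent, 4 is least urgent), the per-intent base values, and the urgency keywords are all assumptions chosen for illustration.

```javascript
const INTENTS = ['general_inquiry', 'technical_support', 'billing', 'complaint', 'escalation'];

// Map raw model output onto a known label; fall back when it is unrecognized.
function normalizeIntent(raw, fallback = 'general_inquiry') {
  const cleaned = String(raw ?? '').trim().toLowerCase().replace(/[^a-z_]/g, '');
  return INTENTS.includes(cleaned) ? cleaned : fallback;
}

// Hypothetical scoring: 1 = most urgent, 4 = least urgent.
const BASE_PRIORITY = {
  escalation: 1,
  complaint: 2,
  technical_support: 3,
  billing: 3,
  general_inquiry: 4
};
const URGENT_PATTERN = /urgent|immediately|asap|outage|refund/i;

function calculatePriority(intent, message) {
  const base = BASE_PRIORITY[intent] ?? 4;
  // Urgent wording bumps the request up one level, never above 1.
  const bump = URGENT_PATTERN.test(String(message ?? '')) ? 1 : 0;
  return Math.max(1, base - bump);
}
```

With a lower number meaning higher urgency, the value can be used directly as a Redis sorted-set score, where the lowest score ranks first.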
4.2 Agent Routing Rules
```javascript
// Agent routing - this lookup runs as Code node logic;
// a Switch node can then branch on assignedAgent
const routingRules = {
  general_inquiry: 'Customer Service Agent',
  technical_support: 'Technical Support Agent',
  billing: 'Customer Service Agent',
  complaint: 'Escalation Agent',
  escalation: 'Escalation Agent'
};

const intent = $json.intent;
// Unknown intents fall back to the customer service agent
const selectedAgent = routingRules[intent] || 'Customer Service Agent';

return [{
  json: {
    ...$json,
    assignedAgent: selectedAgent,
    routingDecision: {
      intent,
      agent: selectedAgent,
      timestamp: new Date().toISOString()
    }
  }
}];
```
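4.3 Priority Queue
```javascript
// Function node - priority management
const Redis = require('ioredis');
const redis = new Redis(process.env.REDIS_URL);

const priority = $json.priority;
// sessionId is optional in the unified format; make sure it is set before queueing
const messageId = String($json.metadata.sessionId);

// Add to the priority queue. The sorted-set member is the message ID itself,
// so it can be looked up with ZRANK; the payload lives in a separate hash.
await redis.zadd('support_queue', priority, messageId);
await redis.hset(
  'support_queue:payload',
  messageId,
  JSON.stringify({
    messageId,
    userId: $json.userId,
    intent: $json.intent,
    timestamp: Date.now()
  })
);

// Get the queue position (zero-based rank, lowest score first).
// This assumes a lower priority number means more urgent; use zrevrank otherwise.
const position = await redis.zrank('support_queue', messageId);

return [{
  json: {
    ...$json,
    queueInfo: {
      position: position + 1,
      priority,
      estimatedWait: calculateWaitTime(position) // helper you need to define
    }
  }
}];
```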
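The queue code calls `calculateWaitTime(position)` with the zero-based rank but does not define it. One rough way to estimate it is sketched below, assuming a fixed average handling time and a fixed number of parallel workers. Both defaults are illustrative, not measured values.

```javascript
// Hypothetical estimate: items ahead in the queue are drained `workers` at a time.
// Returns seconds, or null when the position is not a valid zero-based rank.
function calculateWaitTime(position, { avgHandleSeconds = 90, workers = 3 } = {}) {
  if (!Number.isInteger(position) || position < 0) return null;
  const rounds = Math.floor(position / workers);
  return rounds * avgHandleSeconds;
}
```

In practice the average handling time would come from the metrics collected in section 8 rather than a constant.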
5. RAG Knowledge Base Integration
5.1 Supabase Configuration
```sql
-- Required extensions: pgvector for the vector type, uuid-ossp for uuid_generate_v4()
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- Create the knowledge base table
CREATE TABLE knowledge_base (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  title TEXT NOT NULL,
  content TEXT NOT NULL,
  category TEXT,
  embedding vector(1536),
  metadata JSONB DEFAULT '{}',
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
  updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Enable RLS. With no policies defined, only roles that bypass RLS
-- (such as the service role) can read or write rows.
ALTER TABLE knowledge_base ENABLE ROW LEVEL SECURITY;

-- Create the vector index
CREATE INDEX idx_kb_embedding ON knowledge_base
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- Create the search function (<=> is pgvector's cosine distance operator)
CREATE OR REPLACE FUNCTION search_knowledge_base(
  query_embedding vector(1536),
  match_threshold float DEFAULT 0.7,
  match_count int DEFAULT 5
)
RETURNS TABLE (
  id uuid,
  title text,
  content text,
  category text,
  similarity float
)
LANGUAGE sql STABLE
AS $$
  SELECT
    kb.id,
    kb.title,
    kb.content,
    kb.category,
    1 - (kb.embedding <=> query_embedding) AS similarity
  FROM knowledge_base kb
  WHERE 1 - (kb.embedding <=> query_embedding) > match_threshold
  ORDER BY kb.embedding <=> query_embedding
  LIMIT match_count;
$$;
```
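The search function scores rows with `1 - (embedding <=> query_embedding)`, which is cosine similarity because `<=>` returns cosine distance. The sketch below reproduces that score in plain JavaScript so the `match_threshold` of 0.7 is easier to reason about. Returning 0 for a zero-length vector is a choice made for this sketch, not pgvector's behavior.

```javascript
// Cosine similarity: dot(a, b) / (|a| * |b|), in the range [-1, 1].
function cosineSimilarity(a, b) {
  if (a.length !== b.length || a.length === 0) {
    throw new Error('Vectors must be non-empty and of equal length');
  }
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  // Sketch-level choice: treat a zero vector as matching nothing.
  if (normA === 0 || normB === 0) return 0;
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Mirrors the WHERE clause: keep a document only above the threshold.
function isMatch(docEmbedding, queryEmbedding, threshold = 0.7) {
  return cosineSimilarity(docEmbedding, queryEmbedding) > threshold;
}
```

For example, two vectors 45 degrees apart score about 0.707 and pass the default threshold only narrowly, while orthogonal vectors score 0 and are filtered out.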
5.2 RAG Retrieval
```javascript
// Code node - RAG retrieval
const { createClient } = require('@supabase/supabase-js');
const OpenAI = require('openai');

const supabase = createClient(
  process.env.SUPABASE_URL,
  process.env.SUPABASE_KEY
);
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

const userMessage = $('Router').item.json.message;

// 1. Generate the query embedding
const embeddingResponse = await openai.embeddings.create({
  model: 'text-embedding-3-small',
  input: userMessage
});
const queryEmbedding = embeddingResponse.data[0].embedding;

// 2. Vector search
const { data: documents, error } = await supabase.rpc('search_knowledge_base', {
  query_embedding: queryEmbedding,
  match_threshold: 0.7,
  match_count: 3
});

if (error) throw error;

// 3. Build the context (an empty list when nothing passes the threshold)
const context = (documents || []).map(doc => ({
  title: doc.title,
  content: doc.content,
  similarity: doc.similarity
}));

return [{
  json: {
    ...$input.item.json,
    ragContext: {
      documents: context,
      query: userMessage,
      retrievedAt: new Date().toISOString()
    }
  }
}];
```
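5.3 Knowledge Base Management
```javascript
// Assumes the `openai` and `supabase` clients from section 5.2 are in scope
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Add a document to the knowledge base
async function addToKnowledgeBase(document) {
  // Generate the embedding
  const embeddingResponse = await openai.embeddings.create({
    model: 'text-embedding-3-small',
    input: document.content
  });
  const embedding = embeddingResponse.data[0].embedding;

  // Insert into the database; .select() makes supabase-js return the new row
  const { data, error } = await supabase
    .from('knowledge_base')
    .insert({
      title: document.title,
      content: document.content,
      category: document.category,
      embedding: embedding,
      metadata: document.metadata
    })
    .select();

  if (error) throw error;
  return data;
}

// Batch import
async function batchImport(documents) {
  for (const doc of documents) {
    await addToKnowledgeBase(doc);
    await sleep(100); // pause between requests to stay under rate limits
  }
}
```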
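A fixed pause between requests reduces rate-limit errors during batch import but does not recover from them. A minimal retry-with-exponential-backoff wrapper is sketched below. The `status === 429` check is an assumption about the shape of the thrown error, and `withRetry` is a hypothetical helper name.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Retry an async operation with exponential backoff.
// Assumption: rate-limit errors expose an HTTP status of 429 on `err.status`.
async function withRetry(fn, options = {}) {
  const {
    retries = 3,
    baseDelayMs = 200,
    isRetryable = (err) => err?.status === 429
  } = options;

  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      // Give up once retries are exhausted or the error is not retryable.
      if (attempt >= retries || !isRetryable(err)) throw err;
      await sleep(baseDelayMs * 2 ** attempt); // 200 ms, 400 ms, 800 ms, ...
    }
  }
}
```

Inside `batchImport`, the call could then become `await withRetry(() => addToKnowledgeBase(doc))`.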
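6. Multi-Agent Collaboration
6.1 Customer Service Agent
```javascript
// Code node - customer service Agent
// Assumes an initialized `openai` client (see section 4.1) is in scope
const customerServiceAgent = async (input) => {
  const { message, ragContext, conversationHistory = [] } = input;

  const systemPrompt = `You are a professional customer service representative.

Knowledge base content:
${ragContext.documents.map(d => `- ${d.title}: ${d.content}`).join('\n')}

Conversation history:
${conversationHistory.slice(-5).map(m => `${m.role}: ${m.content}`).join('\n')}

Responsibilities:
- Answer general questions
- Provide product information
- Handle simple account issues
- If the issue is complex, suggest a transfer to technical support

Stay friendly, professional, and concise.`;

  const response = await openai.chat.completions.create({
    model: 'gpt-4-turbo-preview',
    messages: [
      { role: 'system', content: systemPrompt },
      { role: 'user', content: message }
    ],
    tools: [
      {
        type: 'function',
        function: {
          name: 'escalate_to_technical',
          description: 'Escalate the issue to technical support',
          parameters: {
            type: 'object',
            properties: {
              reason: { type: 'string' }
            }
          }
        }
      }
    ]
  });

  // The returned message may contain text, tool_calls, or both
  return response.choices[0].message;
};
```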
6.2 Technical Support Agent
```javascript
// Code node - technical support Agent
// Assumes an initialized `openai` client (see section 4.1) is in scope
const technicalSupportAgent = async (input) => {
  const { message, ragContext, userInfo } = input;

  // Note: run_diagnostic is listed in the prompt but has no definition in `tools`
  // below; add a definition for it or drop it from the prompt.
  const systemPrompt = `You are a technical support specialist.

User information:
- Product version: ${userInfo.productVersion}
- Operating system: ${userInfo.os}
- Recent errors: ${userInfo.recentErrors}

Technical documentation:
${ragContext.documents.map(d => d.content).join('\n\n')}

Responsibilities:
- Diagnose technical problems
- Provide solutions
- Collect error logs
- Create technical tickets

Available tools:
- check_system_status: check system status
- create_ticket: create a technical ticket
- run_diagnostic: run diagnostics`;

  const tools = [
    {
      type: 'function',
      function: {
        name: 'check_system_status',
        description: 'Check the status of systems and services',
        parameters: {
          type: 'object',
          properties: {
            service: { type: 'string' }
          }
        }
      }
    },
    {
      type: 'function',
      function: {
        name: 'create_ticket',
        description: 'Create a technical support ticket',
        parameters: {
          type: 'object',
          properties: {
            title: { type: 'string' },
            description: { type: 'string' },
            severity: { type: 'string', enum: ['low', 'medium', 'high', 'critical'] }
          },
          required: ['title', 'description', 'severity']
        }
      }
    }
  ];

  const response = await openai.chat.completions.create({
    model: 'gpt-4-turbo-preview',
    messages: [
      { role: 'system', content: systemPrompt },
      { role: 'user', content: message }
    ],
    tools
  });

  return response.choices[0].message;
};
```
6.3 Escalation Agent
```javascript
// Code node - escalation Agent
// Assumes an initialized `openai` client (see section 4.1) is in scope
const escalationAgent = async (input) => {
  const { message, conversationHistory = [], sentiment } = input;

  const systemPrompt = `You are an escalation specialist.

Conversation history:
${conversationHistory.map(m => `${m.role}: ${m.content}`).join('\n')}

Sentiment analysis: ${sentiment}

Responsibilities:
- Assess whether a human agent is needed
- Calm the customer
- Collect complete information
- Create an escalation ticket

Escalation criteria:
- The customer's sentiment is negative
- The issue is highly complex
- A refund or compensation is involved
- The issue has gone unresolved multiple times`;

  const response = await openai.chat.completions.create({
    model: 'gpt-4-turbo-preview',
    messages: [
      { role: 'system', content: systemPrompt },
      { role: 'user', content: message }
    ],
    tools: [
      {
        type: 'function',
        function: {
          name: 'create_escalation',
          description: 'Create a ticket for human intervention',
          parameters: {
            type: 'object',
            properties: {
              priority: { type: 'string', enum: ['normal', 'high', 'urgent'] },
              summary: { type: 'string' },
              customerMood: { type: 'string' }
            },
            required: ['priority', 'summary']
          }
        }
      }
    ]
  });

  return response.choices[0].message;
};
```
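All three agents return `response.choices[0].message`, which may carry `tool_calls` instead of a final answer, but the article does not show how those calls get executed. The sketch below dispatches each call to a local handler and builds the `role: 'tool'` messages to send back in a follow-up completion. The handler bodies and their return values are placeholders invented for illustration.

```javascript
// Hypothetical local implementations keyed by tool name; the bodies are placeholders.
const toolHandlers = {
  check_system_status: async ({ service }) => ({ service, status: 'unknown' }),
  create_ticket: async ({ title, severity }) => ({ created: true, title, severity })
};

// Run every tool call on an assistant message and return tool result messages.
async function runToolCalls(message, handlers = toolHandlers) {
  const results = [];
  for (const call of message.tool_calls ?? []) {
    let content;
    try {
      const handler = handlers[call.function.name];
      if (!handler) throw new Error(`Unknown tool: ${call.function.name}`);
      // Arguments arrive as a JSON string and may be malformed.
      const args = JSON.parse(call.function.arguments || '{}');
      content = JSON.stringify(await handler(args));
    } catch (err) {
      // Report the failure to the model instead of aborting the whole turn.
      content = JSON.stringify({ error: err.message });
    }
    results.push({ role: 'tool', tool_call_id: call.id, content });
  }
  return results;
}
```

The returned messages would be appended after the assistant message in a second `chat.completions.create` call so the model can produce the customer-facing reply.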
6.4 Agent Collaboration Flow
7. Memory Management
7.1 Conversation History Storage
```sql
-- Conversation history table
CREATE TABLE conversations (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  user_id TEXT NOT NULL,
  session_id TEXT NOT NULL,
  role TEXT NOT NULL CHECK (role IN ('user', 'assistant', 'system')),
  content TEXT NOT NULL,
  metadata JSONB DEFAULT '{}',
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_conversations_session ON conversations(session_id, created_at);
CREATE INDEX idx_conversations_user ON conversations(user_id, created_at DESC);
```
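7.2 Loading Memory
```javascript
// Function node - load conversation history
const { createClient } = require('@supabase/supabase-js');
const supabase = createClient(process.env.SUPABASE_URL, process.env.SUPABASE_KEY);

const userId = $json.userId;
const sessionId = $json.metadata.sessionId;

// Load the 20 most recent messages: fetch newest first, then restore
// chronological order (ascending order with a limit would return the oldest 20)
const { data: recent, error: historyError } = await supabase
  .from('conversations')
  .select('*')
  .eq('session_id', sessionId)
  .order('created_at', { ascending: false })
  .limit(20);

if (historyError) throw historyError;
const history = (recent || []).reverse();

// Load the user profile; maybeSingle() returns null instead of an error
// when the user has no profile yet
const { data: profile } = await supabase
  .from('user_profiles')
  .select('*')
  .eq('user_id', userId)
  .maybeSingle();

return [{
  json: {
    ...$json,
    conversationHistory: history,
    userProfile: profile || {},
    memoryLoaded: true
  }
}];
```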
7.3 User Profile
```typescript
interface UserProfile {
  userId: string;
  preferences: {
    language: string;
    communicationStyle: string;
    topics: string[];
  };
  history: {
    totalInteractions: number;
    lastInteraction: string;
    commonIssues: string[];
  };
  sentiment: {
    overall: 'positive' | 'neutral' | 'negative';
    recentTrend: number;
  };
  metadata: {
    tier: 'free' | 'pro' | 'enterprise';
    accountAge: number;
    [key: string]: any;
  };
}

// Update the user profile
// loadProfile, analyzeSentiment, calculateTrend, and saveProfile are helpers
// you need to provide; they are not defined in this article.
async function updateUserProfile(userId: string, interaction: any) {
  const profile = await loadProfile(userId);

  profile.history.totalInteractions++;
  profile.history.lastInteraction = new Date().toISOString();

  // Update the sentiment trend
  const sentiment = await analyzeSentiment(interaction.message);
  profile.sentiment.recentTrend = calculateTrend(profile.sentiment, sentiment);

  await saveProfile(userId, profile);
}
```
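`updateUserProfile` depends on `calculateTrend`, which the article does not define. One plausible reading is an exponential moving average over per-message sentiment scores, sketched below. The label-to-score mapping, the smoothing factor of 0.3, and the 0.2 neutral band are all assumptions, as is the idea that `analyzeSentiment` returns either a label or a number in [-1, 1].

```javascript
// Assumed mapping from sentiment labels to numeric scores.
const SENTIMENT_SCORE = { positive: 1, neutral: 0, negative: -1 };

// Exponential moving average: new = alpha * score + (1 - alpha) * previous.
function calculateTrend(current, sentiment, alpha = 0.3) {
  const score = typeof sentiment === 'number'
    ? sentiment
    : (SENTIMENT_SCORE[sentiment] ?? 0);
  const clamped = Math.max(-1, Math.min(1, score));
  const previous = Number.isFinite(current?.recentTrend) ? current.recentTrend : 0;
  return alpha * clamped + (1 - alpha) * previous;
}

// Derive the coarse `overall` label from the trend, with a neutral band around 0.
function overallFromTrend(trend, band = 0.2) {
  if (trend > band) return 'positive';
  if (trend < -band) return 'negative';
  return 'neutral';
}
```

A larger `alpha` makes the trend react faster to the latest message; a smaller one smooths out single outbursts.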
7.4 Saving Memory
```javascript
// Function node - save the conversation
// Assumes the `supabase` client from section 7.2 is in scope
const saveConversation = async (data) => {
  const { userId, sessionId, userMessage, assistantMessage } = data;

  // Save the user message
  const { error: userError } = await supabase.from('conversations').insert({
    user_id: userId,
    session_id: sessionId,
    role: 'user',
    content: userMessage,
    metadata: {
      timestamp: new Date().toISOString(),
      channel: data.channel
    }
  });
  if (userError) throw userError;

  // Save the assistant reply
  const { error: assistantError } = await supabase.from('conversations').insert({
    user_id: userId,
    session_id: sessionId,
    role: 'assistant',
    content: assistantMessage,
    metadata: {
      agent: data.assignedAgent,
      intent: data.intent,
      timestamp: new Date().toISOString()
    }
  });
  if (assistantError) throw assistantError;

  // Update the user profile
  await updateUserProfile(userId, {
    message: userMessage,
    response: assistantMessage,
    intent: data.intent
  });
};
```
8. Performance Monitoring
8.1 Metrics Collection
```javascript
// Function node - collect metrics
const collectMetrics = (executionData) => {
  const metrics = {
    // Performance metrics (durations in milliseconds)
    performance: {
      totalDuration: Date.now() - executionData.startTime,
      ragRetrievalTime: executionData.ragTime,
      agentProcessingTime: executionData.agentTime,
      databaseTime: executionData.dbTime
    },

    // Usage metrics
    usage: {
      tokensUsed: executionData.tokensUsed,
      documentsRetrieved: executionData.documentsCount,
      agentType: executionData.assignedAgent
    },

    // Quality metrics
    quality: {
      intent: executionData.intent,
      confidence: executionData.confidence,
      userSatisfaction: executionData.satisfaction
    },

    // Business metrics
    business: {
      channel: executionData.channel,
      resolved: executionData.resolved,
      escalated: executionData.escalated
    }
  };

  return metrics;
};
```
8.2 Logging
```javascript
// Winston logging configuration
const winston = require('winston');

const logger = winston.createLogger({
  level: 'info',
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.json()
  ),
  transports: [
    new winston.transports.File({ filename: 'error.log', level: 'error' }),
    new winston.transports.File({ filename: 'combined.log' })
  ]
});

// Structured logging (`data` is the execution payload of the current run)
logger.info('Agent execution completed', {
  userId: data.userId,
  sessionId: data.sessionId,
  agent: data.assignedAgent,
  intent: data.intent,
  duration: data.duration,
  tokensUsed: data.tokensUsed,
  timestamp: new Date().toISOString()
});
```
8.3 Error Tracking
```javascript
// Sentry integration
const Sentry = require('@sentry/node');

Sentry.init({
  dsn: process.env.SENTRY_DSN,
  environment: process.env.NODE_ENV,
  tracesSampleRate: 1.0 // traces every request; consider a lower rate in production
});

// Error capture
try {
  const result = await processRequest(data);
  return result;
} catch (error) {
  Sentry.captureException(error, {
    tags: {
      workflow: $workflow.name,
      node: $node.name,
      userId: data.userId
    },
    extra: {
      input: data, // may contain customer data; redact before sending if required
      timestamp: new Date().toISOString()
    }
  });
  throw error;
}
```
8.4 Real-Time Monitoring Dashboard
```javascript
// Prometheus metrics export
const prometheus = require('prom-client');

const requestCounter = new prometheus.Counter({
  name: 'agent_requests_total',
  help: 'Total number of agent requests',
  labelNames: ['agent', 'intent', 'status']
});

const requestDuration = new prometheus.Histogram({
  name: 'agent_request_duration_seconds',
  help: 'Duration of agent requests',
  labelNames: ['agent'],
  buckets: [0.1, 0.5, 1, 2, 5, 10]
});

// Record metrics (`duration` must be in seconds to match the buckets)
requestCounter.inc({
  agent: 'customer_service',
  intent: 'general',
  status: 'success'
});

requestDuration.observe({ agent: 'customer_service' }, duration);
```
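The histogram above expects `duration` in seconds, but the snippet does not show where that value comes from. A small timing wrapper is sketched below, assuming each agent call can be wrapped in an async function. `timeSeconds` is a hypothetical helper name, not part of prom-client.

```javascript
// Measure how long an async operation takes, in seconds, without losing its outcome.
async function timeSeconds(fn) {
  const start = process.hrtime.bigint();
  const elapsed = () => Number(process.hrtime.bigint() - start) / 1e9;
  try {
    const result = await fn();
    return { ok: true, result, seconds: elapsed() };
  } catch (error) {
    // Failed calls are timed too, so error latency shows up in the histogram.
    return { ok: false, error, seconds: elapsed() };
  }
}
```

The returned `seconds` can be passed to `requestDuration.observe(...)`, and `ok` can select the `status` label for `requestCounter.inc(...)`.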
9. Complete Implementation
9.1 Main Workflow
The full workflow is long, so only the core structure is shown here:
```javascript
// Main workflow structure (node names grouped by stage)
const mainWorkflow = {
  // 1. Input layer
  inputs: [
    'Webhook Input',
    'Email Input',
    'Chat Input'
  ],

  // 2. Normalization
  normalize: 'Normalize Input',

  // 3. Routing
  routing: [
    'Intent Recognition',
    'Agent Router',
    'Priority Queue'
  ],

  // 4. Context loading
  context: [
    'Load Conversation History',
    'Load User Profile',
    'RAG Retrieval'
  ],

  // 5. Agent execution
  agents: [
    'Customer Service Agent',
    'Technical Support Agent',
    'Escalation Agent'
  ],

  // 6. Post-processing
  postProcess: [
    'Save Conversation',
    'Update Profile',
    'Collect Metrics'
  ],

  // 7. Response
  response: 'Format and Send Response'
};
```
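9.2 Deployment Configuration
```yaml
# docker-compose.yml
version: '3.8'

services:
  n8n:
    image: n8nio/n8n
    ports:
      - "5678:5678"
    environment:
      # Newer n8n releases (1.0+) use built-in user management and may ignore
      # the N8N_BASIC_AUTH_* variables; check the docs for your version.
      - N8N_BASIC_AUTH_ACTIVE=true
      - N8N_BASIC_AUTH_USER=admin
      - N8N_BASIC_AUTH_PASSWORD=${N8N_PASSWORD}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - SUPABASE_URL=${SUPABASE_URL}
      - SUPABASE_KEY=${SUPABASE_KEY}
      - REDIS_URL=redis://redis:6379
      # Point n8n at the postgres service below; without these it uses SQLite
      - DB_TYPE=postgresdb
      - DB_POSTGRESDB_HOST=postgres
      - DB_POSTGRESDB_DATABASE=n8n
      - DB_POSTGRESDB_USER=n8n
      - DB_POSTGRESDB_PASSWORD=${POSTGRES_PASSWORD}
    volumes:
      - n8n_data:/home/node/.n8n
    depends_on:
      - redis
      - postgres

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_DB=n8n
      - POSTGRES_USER=n8n
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  n8n_data:
  postgres_data:
```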
10. Testing and Optimization
10.1 Test Scenarios
```javascript
const assert = require('node:assert');

// Test cases
const testScenarios = [
  {
    name: 'General inquiry',
    input: {
      message: 'What are your business hours?',
      userId: 'test_user_1'
    },
    expected: {
      intent: 'general_inquiry',
      agent: 'Customer Service Agent',
      resolved: true
    }
  },
  {
    name: 'Technical issue',
    input: {
      message: 'My app keeps crashing with error code 500',
      userId: 'test_user_2'
    },
    expected: {
      intent: 'technical_support',
      agent: 'Technical Support Agent',
      toolCalls: ['check_system_status', 'create_ticket']
    }
  },
  {
    name: 'Complaint escalation',
    input: {
      message: 'I have been waiting for three days. This is terrible!',
      userId: 'test_user_3'
    },
    expected: {
      intent: 'complaint',
      agent: 'Escalation Agent',
      escalated: true
    }
  }
];

// Run the tests. callWorkflow is a helper you need to provide,
// for example one that POSTs the input to the webhook from section 3.1.
async function runTests() {
  for (const scenario of testScenarios) {
    const result = await callWorkflow(scenario.input);
    assert.strictEqual(result.intent, scenario.expected.intent);
    console.log(`✓ ${scenario.name} passed`);
  }
}
```
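The runner above stops at the first failed assertion and checks only the intent. A variant that checks both `intent` and `agent` and reports every mismatch is sketched below. It takes `callWorkflow` as a parameter so it can be exercised with a stub; the stub shown in the usage note is an assumption, not the real workflow.

```javascript
// Run all scenarios and collect mismatches instead of stopping at the first one.
async function runScenarios(scenarios, callWorkflow) {
  const failures = [];
  for (const scenario of scenarios) {
    try {
      const result = await callWorkflow(scenario.input);
      for (const key of ['intent', 'agent']) {
        if (key in scenario.expected && result[key] !== scenario.expected[key]) {
          failures.push(
            `${scenario.name}: expected ${key}=${scenario.expected[key]}, got ${result[key]}`
          );
        }
      }
    } catch (err) {
      failures.push(`${scenario.name}: threw ${err.message}`);
    }
  }
  return { total: scenarios.length, failures };
}
```

For a dry run, a stub such as `async () => ({ intent: 'general_inquiry', agent: 'Customer Service Agent' })` can stand in for `callWorkflow`; any scenario expecting a different intent or agent then shows up in `failures`.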
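10.2 Performance Optimization
```javascript
// Optimization strategies
const optimizations = {
  // 1. Caching (time to live per cache)
  caching: {
    ragResults: '5 minutes',
    userProfiles: '10 minutes',
    knowledgeBase: '1 hour'
  },

  // 2. Batching
  batching: {
    embeddings: 'generate in batches',
    dbWrites: 'write in batches'
  },

  // 3. Parallel processing
  parallel: {
    ragRetrieval: 'retrieve in parallel',
    profileLoading: 'load in parallel'
  },

  // 4. Connection pooling
  pooling: {
    database: '10 connections',
    redis: '5 connections'
  }
};
```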
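The caching strategy lists time-to-live values without showing a mechanism. A minimal in-memory TTL cache is sketched below to illustrate the idea. It is a single-process stand-in; with several n8n workers the same pattern would more likely sit on Redis. `TtlCache` is a hypothetical class name.

```javascript
// Minimal in-memory cache with a fixed time to live per entry.
class TtlCache {
  constructor(ttlMs, now = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now; // injectable clock, which makes expiry testable
    this.entries = new Map();
  }

  get(key) {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (this.now() >= entry.expiresAt) {
      this.entries.delete(key); // expired: evict lazily on read
      return undefined;
    }
    return entry.value;
  }

  set(key, value) {
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
  }

  // Return the cached value, or call `loader` and cache its result.
  async getOrLoad(key, loader) {
    const cached = this.get(key);
    if (cached !== undefined) return cached;
    const value = await loader();
    this.set(key, value);
    return value;
  }
}
```

For the 5-minute RAG cache, this could look like `new TtlCache(5 * 60 * 1000)` with the normalized user message as the key and the retrieval call as the loader.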
10.3 Monitoring Targets
```javascript
// Performance benchmarks
const performanceBenchmarks = {
  responseTime: {
    p50: '< 1s',
    p95: '< 3s',
    p99: '< 5s'
  },
  throughput: {
    target: '100 req/s',
    peak: '500 req/s'
  },
  availability: {
    target: '99.9%',
    downtime: '< 43min/month'
  },
  accuracy: {
    intentRecognition: '> 95%',
    ragRelevance: '> 90%'
  }
};
```
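The benchmark table states percentile latency targets and pairs 99.9% availability with roughly 43 minutes of downtime per month. The sketch below shows how both numbers can be computed from raw data. It uses the nearest-rank percentile definition and a 30-day month, which are choices made for this example; monitoring systems such as Prometheus estimate percentiles from histogram buckets instead.

```javascript
// Nearest-rank percentile: the smallest sample such that at least p% of samples are <= it.
function percentile(values, p) {
  if (values.length === 0) throw new Error('No samples');
  const sorted = [...values].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(0, rank - 1)];
}

// Downtime budget in minutes for a given availability target over `days` days.
function allowedDowntimeMinutes(availabilityPercent, days = 30) {
  return days * 24 * 60 * (1 - availabilityPercent / 100);
}
```

For a 30-day month, `allowedDowntimeMinutes(99.9)` is 43,200 minutes times 0.001, or about 43.2 minutes, which matches the `< 43min/month` figure above.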
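Summary
Key Outcomes
In this project we built a production-grade multi-Agent customer service system that includes:
- ✅ Multi-channel input - unified handling of Webhook, Email, and Chat
- ✅ Intelligent routing - intent recognition and automatic Agent assignment
- ✅ RAG knowledge base - vector retrieval with Supabase
- ✅ Multi-Agent collaboration - a three-tier setup of customer service, technical support, and escalation
- ✅ Memory management - conversation history and user profiles
- ✅ Performance monitoring - metrics, logging, and error tracking
Technical Highlights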
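Next Steps
- Add WebSocket support for real-time conversations
- Integrate more AI models (Claude, Gemini)
- Implement an Agent self-learning mechanism
- Build an admin console
- Add multi-language support
Congratulations on completing this week's material! 🎉
You have now worked through the full Agentic AI development flow, from fundamentals to production. Keep exploring and build more capable AI systems!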