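This column focuses on the core technologies behind AI Agent systems, including:
- Agent architecture design - the evolution from ReAct to AutoGPT
- Multi-agent collaboration - coordination mechanisms for distributed agent systems
- RAG - retrieval-augmented generation in practice
- LangChain - best practices for building complex agent applications
- Memory systems - designing an agent's short-term and long-term memory
- Knowledge graphs - using structured knowledge in agents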
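Want to build with AI without writing code? Dify, Coze, and FastGPT are three low-code platforms that let you assemble AI applications by drag and drop. How do they differ, and which one suits you best? This article answers both questions.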
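What is RAG

In one sentence: look things up first, then answer.

An LLM that answers directly tends to make things up. RAG has it first find relevant content in your knowledge base, then answer based on that content.

```
user question → search knowledge base → find relevant documents → feed to LLM → generate answer
```

Minimal implementation

```python
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI

question = "What is RAG?"

# 1. Split the documents into chunks and store them in a vector database.
#    load_and_split_documents is a helper you write yourself; it is not a LangChain function.
docs = load_and_split_documents("./docs")
vectorstore = Chroma.from_documents(docs, OpenAIEmbeddings())

# 2. Retrieve the three most relevant chunks
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
relevant_docs = retriever.get_relevant_documents(question)

# 3. Generate the answer from the retrieved text (not from the Document reprs)
context = "\n\n".join(doc.page_content for doc in relevant_docs)
llm = ChatOpenAI()
answer = llm.invoke(f"""
Answer the question using the content below:
{context}

Question: {question}
""")
```

That's all there is to it. About 20 lines of code are enough to get it running. ...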
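The retrieve-then-generate flow above can also be sketched without any external service. The block below is only an illustration under simplifying assumptions: `embed` is a toy bag-of-words counter standing in for a real embedding model, and `retrieve`, `build_prompt`, and `DOCS` are hypothetical names that do not come from LangChain.

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    """Toy 'embedding': a bag-of-words term count (stand-in for a real model)."""
    return Counter(re.findall(r"\w+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse term-count vectors."""
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(
        sum(v * v for v in b.values())
    )
    return dot / norm if norm else 0.0


def retrieve(query: str, docs: list, k: int = 2) -> list:
    """Return the k documents most similar to the query."""
    q = embed(query)
    return sorted(docs, key=lambda d: cosine(q, embed(d)), reverse=True)[:k]


def build_prompt(query: str, context: list) -> str:
    """Assemble the prompt that would be sent to the LLM."""
    joined = "\n".join(f"- {c}" for c in context)
    return (
        "Answer using only the context below.\n\n"
        f"Context:\n{joined}\n\nQuestion: {query}"
    )


DOCS = [
    "RAG retrieves relevant documents before the model generates an answer.",
    "Chroma is a vector database for storing embeddings.",
    "Bananas are rich in potassium.",
]

top = retrieve("What does RAG retrieve?", DOCS, k=1)
prompt = build_prompt("What does RAG retrieve?", top)
```

In a real pipeline, `embed` would call an embedding model and the final `prompt` would be passed to the LLM. The structure (embed, rank, assemble context, ask) stays the same.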
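Opening: the AI assistant's "capability crisis"

Scenario 1: you ask Claude

You: "Look up last month's sales figures in the company database for me."
Claude: "Sorry, I can't access the database directly..."

Scenario 2: you ask ChatGPT

You: "Read report.pdf on my desktop and summarize it."
ChatGPT: "I can't access your local files..."

So here is the question: these AIs are clearly very capable, so why can't they do something as basic as reading a file or querying a database?

The answer: they are not lacking intelligence, they are lacking tools. A brilliant chef in a kitchen with no knives, pans, or stove cannot cook a good meal either.

Chapter 1: What is the MCP protocol?

1.1 The one-sentence explanation

MCP (Model Context Protocol) = a "USB standard" for AI models.

Just as USB lets any device connect to a computer, MCP lets any tool connect to an AI.

1.2 The world before MCP

Every AI application had to implement its own tool integration:

```python
import psycopg2
from sqlalchemy import create_engine, text


# Developer A's implementation
class ClaudeWithDatabase:
    def query_db(self, sql):
        # Hand-written database connection logic
        conn = psycopg2.connect(...)
        cursor = conn.cursor()
        # Hand-written SQL execution logic
        cursor.execute(sql)
        # Hand-written result formatting
        return format_results(...)


# Developer B's implementation (completely different)
class GPTWithDatabase:
    def db_query(self, query):
        # The same thing has to be implemented all over again
        engine = create_engine(...)
        # A completely different interface
        with engine.connect() as conn:
            return conn.execute(text(query)).fetchall()
```

Problems: ...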
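To make the "one standard interface for every tool" idea concrete, here is a minimal sketch of a tool server that any client could talk to in the same way. It is not the MCP SDK and not a full implementation of the specification: only the method names `tools/list` and `tools/call` mirror MCP, while `ToolServer`, its `tool` decorator, and the simplified request and response shapes are assumptions made for illustration.

```python
import json
from typing import Any, Callable, Dict


class ToolServer:
    """Toy tool server: every tool is discovered and invoked the same way."""

    def __init__(self) -> None:
        self._tools: Dict[str, Dict[str, Any]] = {}

    def tool(self, name: str, description: str) -> Callable:
        """Decorator that registers a Python function as a named tool."""
        def register(fn: Callable) -> Callable:
            self._tools[name] = {"description": description, "fn": fn}
            return fn
        return register

    def handle(self, raw_request: str) -> str:
        """Dispatch one JSON request and return a JSON response."""
        req = json.loads(raw_request)
        method, params = req["method"], req.get("params", {})
        if method == "tools/list":
            result = [
                {"name": n, "description": t["description"]}
                for n, t in self._tools.items()
            ]
        elif method == "tools/call":
            tool = self._tools.get(params.get("name"))
            if tool is None:
                return json.dumps({"id": req["id"], "error": "unknown tool"})
            result = tool["fn"](**params.get("arguments", {}))
        else:
            return json.dumps({"id": req["id"], "error": "unknown method"})
        return json.dumps({"id": req["id"], "result": result})


server = ToolServer()


@server.tool("add", "Add two integers")
def add(a: int, b: int) -> int:
    return a + b


listing = json.loads(server.handle(json.dumps({"id": 1, "method": "tools/list"})))
call = json.loads(server.handle(json.dumps({
    "id": 2,
    "method": "tools/call",
    "params": {"name": "add", "arguments": {"a": 2, "b": 3}},
})))
```

The point of the design is that the client never needs to know how `add` is implemented. It only needs the two generic operations, which is what lets one tool serve many different AI applications.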
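8:00 AM - Time to start! Another day at the grind

While you are still debating whether to stay in bed for five more minutes, your AI Agent has already started working.

```python
from queue import PriorityQueue


# The Agent's morning routine
class MorningRoutine:
    def __init__(self):
        self.tasks = []
        self.priority_queue = PriorityQueue()

    async def start_day(self):
        """Start a new day."""
        # The helper methods called below are not shown in this excerpt.
        # 1. Check email and filter out what matters
        urgent_emails = await self.check_emails()

        # 2. Look at the calendar and prepare for today's meetings
        meetings = await self.prepare_meetings()

        # 3. Scan Slack/DingTalk for new messages
        notifications = await self.scan_channels()

        # 4. Generate today's to-do list
        return self.create_daily_plan(
            urgent_emails,
            meetings,
            notifications
        )
```

A real-world scenario:

Xiao Wang, a product manager at a tech company, receives an average of 80 emails every morning. Since he started using an AI Agent, the Agent automatically: ...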
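The three checks in the morning routine do not depend on each other, so one possible refinement is to run them concurrently. The sketch below assumes stub coroutines with made-up return values. `check_emails`, `prepare_meetings`, and `scan_channels` here are stand-ins, not the article's real implementations.

```python
import asyncio
from typing import Dict, List


async def check_emails() -> List[str]:
    await asyncio.sleep(0)  # stands in for a call to a mail API
    return ["Contract renewal due today"]


async def prepare_meetings() -> List[str]:
    await asyncio.sleep(0)  # stands in for a call to a calendar API
    return ["10:00 product review"]


async def scan_channels() -> List[str]:
    await asyncio.sleep(0)  # stands in for a call to a chat API
    return ["@you: build is failing on main"]


async def start_day() -> Dict[str, List[str]]:
    """Run the three independent checks concurrently and merge the results."""
    emails, meetings, notifications = await asyncio.gather(
        check_emails(), prepare_meetings(), scan_channels()
    )
    return {
        "urgent_emails": emails,
        "meetings": meetings,
        "notifications": notifications,
    }


plan = asyncio.run(start_day())
```

`asyncio.gather` returns results in the order the coroutines were passed, so the unpacking stays stable no matter which call finishes first.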
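The Agent's core loop

At its core, an Agent does this:

```
perceive → think → act → get feedback → think again...
```

In code:

```python
done = False
while not done:
    # 1. Understand what the user wants
    intent = understand(user_input)

    # 2. Work out how to do it
    plan = think(intent, memory)

    # 3. Carry it out
    result = act(plan, tools)

    # 4. Check whether the result is right
    if verify(result):
        done = True
    else:
        memory.add(result)  # Remember the failure and improve next time
```

Three key modules

1. Memory system

The difference between an Agent and a plain LLM call: an Agent remembers things.

```python
class Memory:
    def __init__(self):
        # Instance attributes, so separate Memory objects do not share state
        self.short_term = []  # History of the current conversation
        self.long_term = {}   # Knowledge that persists across conversations

    def remember(self, key, value):
        self.long_term[key] = value

    def recall(self, query):
        # search is a lookup helper that is not shown in this excerpt
        return search(self.long_term, query)
```

In practice: ...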
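The loop above relies on helpers that are not defined in the excerpt. Here is a small self-contained version that can actually be run, under heavy simplifications: the "thinking" step just picks the first tool that has not failed yet, and the tools are two toy functions. `TOOLS`, `think`, and `run_agent` are hypothetical names chosen for this sketch.

```python
from typing import Callable, Dict, List, Optional

# Toy tools: each takes the user's request and returns an answer or None.
TOOLS: Dict[str, Callable[[str], Optional[str]]] = {
    "calendar": lambda q: None,  # cannot handle arithmetic
    "calculator": lambda q: str(sum(int(x) for x in q.split("+"))),
}


def think(intent: str, failed: List[str]) -> Optional[str]:
    """Plan: pick the first tool that has not already failed."""
    for name in TOOLS:
        if name not in failed:
            return name
    return None


def run_agent(user_input: str, max_steps: int = 5) -> Dict[str, object]:
    failed: List[str] = []  # memory of what did not work
    trace: List[str] = []   # which tools were tried, in order
    for _ in range(max_steps):
        tool = think(user_input, failed)
        if tool is None:
            break                          # nothing left to try
        result = TOOLS[tool](user_input)   # act
        trace.append(tool)
        if result is not None:             # verify
            return {"answer": result, "trace": trace}
        failed.append(tool)                # remember the failure
    return {"answer": None, "trace": trace}


outcome = run_agent("2+3")
```

The `max_steps` bound is worth keeping in any real agent loop: without it, a task that can never be verified would spin forever.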
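Opening: a remarkable conversation

One day in 2025, a conversation between you and an AI:

You: [uploads a photo of the fridge]
You: "Take a look and tell me what I can cook."
AI: "I can see eggs, tomatoes, green peppers, rice... I'd suggest scrambled eggs with tomato over rice! Here are the steps..."
You: "Wait, I don't eat spicy food."
AI: "Got it. Let's swap the green peppers for cucumber and make scrambled eggs with cucumber..."

This is not science fiction. It is what 2025 looks like.

The AI can not only "see" what is in your fridge, it can also follow the context, make suggestions, and adjust the plan to your preferences.

That is the appeal of multimodal AI.

Chapter 1: What is multimodal AI?

1.1 From "a single sense" to "all the senses"

Traditional AI (single-modal):

```python
# Illustrative pseudocode: GPT3 is a placeholder class, not a real SDK
# Handles text only
text_ai = GPT3()

response = text_ai.chat("What's the weather like today?")
# ✅ Can answer

response = text_ai.chat("[image: view from the window]")
# ❌ Cannot understand the image
```

Multimodal AI:

```python
# Illustrative pseudocode: GPT4V is a placeholder class, not a real SDK
# Handles text, images, audio, and video
multimodal_ai = GPT4V()

# Text ✅
response = multimodal_ai.chat("What's the weather like today?")

# Image ✅
response = multimodal_ai.chat(
    text="What is this?",
    image="photo.jpg"
)

# Audio ✅
response = multimodal_ai.chat(
    text="What style of music is this?",
    audio="music.mp3"
)

# Video ✅
response = multimodal_ai.chat(
    text="What is the person in the video doing?",
    video="video.mp4"
)
```

1.2 What is the "modality" in multimodal?

Modality = the form in which information is represented ...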
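One way to picture "modality" in code is a message made of typed parts, where each part declares which form of information it carries. This is only a sketch of the idea and not any vendor's message format: `Part`, `Message`, and the convention that media payloads are file paths are all assumptions.

```python
from dataclasses import dataclass, field
from typing import List, Literal

Modality = Literal["text", "image", "audio", "video"]


@dataclass
class Part:
    modality: Modality
    payload: str  # the text itself, or a file path for media (hypothetical convention)


@dataclass
class Message:
    parts: List[Part] = field(default_factory=list)

    def modalities(self) -> List[str]:
        """Distinct modalities in this message, in order of first appearance."""
        seen: List[str] = []
        for part in self.parts:
            if part.modality not in seen:
                seen.append(part.modality)
        return seen


msg = Message([
    Part("text", "What can I cook with this?"),
    Part("image", "fridge.jpg"),
])
```

A single-modal model would only accept messages whose `modalities()` is `["text"]`. A multimodal model accepts mixed parts in one request, as in the fridge example.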
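Introduction

In 2025, LangGraph officially released version 1.0 and became a go-to framework for building production-grade AI Agents. As a core component of the LangChain ecosystem, LangGraph provides graph-based orchestration, supporting loops, branching, backtracking, and dynamic decision-making in agents. More importantly, it ships with enterprise-grade features such as durable execution, checkpointing, and human-in-the-loop intervention. This article explores LangGraph's concepts, how it works, where it applies, and practical techniques.

Knowledge Graphs and LangChain Graph Basics

What is a knowledge graph?

A knowledge graph is a structured data model for representing the relations between entities. It organizes information as a graph, in which:

- Nodes: represent entities or concepts
- Edges: represent relations between entities

```mermaid
graph LR
    A["Alan Turing"] -->|"invented"| B["Turing machine"]
    A -->|"born in"| C["United Kingdom"]
    A -->|"known as"| D["Father of computer science"]
    B -->|"is a"| E["Theoretical model of computation"]
```

Definition and value of LangChain Graph

LangChain Graph is the collection of modules in the LangChain framework that focus on building, storing, and querying knowledge graphs. It combines the natural-language capabilities of LLMs with the structured representation of graph databases in order to:

- Automatically extract entities and relations from text
- Build and maintain knowledge graphs
- Run complex queries and reasoning over the graph structure
- Improve the contextual understanding and answer quality of LLM applications

LangChain Graph architecture

The overall architecture of LangChain Graph can be understood from the following diagram:

```mermaid
flowchart TB
    subgraph "Input layer"
        A["Text documents"] --> B["Web content"]
        C["Structured data"] --> D["User queries"]
    end
    subgraph "Processing layer"
        E["Entity extraction EntityExtractor"]
        F["Relation extraction RelationExtractor"]
        G["Knowledge graph construction KnowledgeGraphCreator"]
    end
    subgraph "Storage layer"
        H["Graph database Neo4j/NetworkX"]
        I["Vector storage VectorStores"]
    end
    subgraph "Application layer"
        J["Graph query GraphQuery"]
        K["Graph reasoning GraphReasoning"]
        L["QA system GraphQAChain"]
    end
    A --> E
    B --> E
    C --> F
    D --> F
    E --> G
    F --> G
    G --> H
    G --> I
    H --> J
    H --> K
    I --> L
```

Core components in detail

1. Entity and relation extractors

These components identify entities in text and the relations between them: ...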
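As a rough illustration of the node-and-edge model, the Alan Turing example can be stored as (subject, relation, object) triples and queried with plain Python. This sketch deliberately avoids any graph library. `TripleStore` and its methods are hypothetical names, not part of LangChain, Neo4j, or NetworkX.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


class TripleStore:
    """Minimal directed knowledge graph stored as outgoing (relation, object) edges."""

    def __init__(self) -> None:
        self._out: Dict[str, List[Tuple[str, str]]] = defaultdict(list)

    def add(self, subject: str, relation: str, obj: str) -> None:
        self._out[subject].append((relation, obj))

    def neighbors(self, subject: str, relation: Optional[str] = None) -> List[str]:
        """Objects linked from subject, optionally filtered by relation type."""
        return [
            o for r, o in self._out.get(subject, [])
            if relation is None or r == relation
        ]

    def path_exists(self, start: str, goal: str, max_hops: int = 3) -> bool:
        """Breadth-first search along edge direction, up to max_hops edges."""
        frontier, seen = [start], {start}
        for _ in range(max_hops):
            next_frontier = []
            for node in frontier:
                for _, obj in self._out.get(node, []):
                    if obj == goal:
                        return True
                    if obj not in seen:
                        seen.add(obj)
                        next_frontier.append(obj)
            frontier = next_frontier
        return False


kg = TripleStore()
kg.add("Alan Turing", "invented", "Turing machine")
kg.add("Alan Turing", "born_in", "United Kingdom")
kg.add("Alan Turing", "known_as", "Father of computer science")
kg.add("Turing machine", "is_a", "Theoretical model of computation")

inventions = kg.neighbors("Alan Turing", "invented")
linked = kg.path_exists("Alan Turing", "Theoretical model of computation")
```

The two-hop query at the end is the kind of question a flat document search struggles with and a graph answers directly: Turing is connected to "theoretical model of computation" only through the Turing machine.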
引言 记忆是智能的基石。一个没有记忆的Agent就像得了健忘症的助手,无法积累经验、学习模式或维持上下文。本文深入探讨如何为AI Agent构建高效的记忆系统。 1. 记忆系统架构 1.1 记忆类型分层 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 from enum import Enum from typing import Any, List, Dict, Optional import time class MemoryType(Enum): SENSORY = "sensory" # 感官记忆(<1秒) WORKING = "working" # 工作记忆(秒级) SHORT_TERM = "short_term" # 短期记忆(分钟级) LONG_TERM = "long_term" # 长期记忆(永久) EPISODIC = "episodic" # 情景记忆 SEMANTIC = "semantic" # 语义记忆 PROCEDURAL = "procedural" # 程序记忆 class MemorySystem: def __init__(self): self.sensory_buffer = SensoryMemory(capacity=10, ttl=1) self.working_memory = WorkingMemory(capacity=7) self.short_term_memory = ShortTermMemory(capacity=100, ttl=300) self.long_term_memory = LongTermMemory() self.episodic_memory = EpisodicMemory() self.semantic_memory = SemanticMemory() self.procedural_memory = ProceduralMemory() def process_input(self, input_data: Any): """处理输入信息的记忆流程""" # 1. 感官记忆 self.sensory_buffer.store(input_data) # 2. 注意力机制筛选 if self.is_important(input_data): # 3. 进入工作记忆 self.working_memory.add(input_data) # 4. 编码到短期记忆 encoded = self.encode_memory(input_data) self.short_term_memory.store(encoded) # 5. 
巩固到长期记忆 if self.should_consolidate(encoded): self.consolidate_to_long_term(encoded) 1.2 工作记忆实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 class WorkingMemory: """Miller's Magic Number: 7±2""" def __init__(self, capacity: int = 7): self.capacity = capacity self.buffer = [] self.attention_weights = [] def add(self, item: Any): """添加项目到工作记忆""" if len(self.buffer) >= self.capacity: # 移除注意力权重最低的项 min_idx = self.attention_weights.index(min(self.attention_weights)) self.buffer.pop(min_idx) self.attention_weights.pop(min_idx) self.buffer.append(item) self.attention_weights.append(1.0) def update_attention(self, idx: int, weight_delta: float): """更新注意力权重""" if 0 <= idx < len(self.attention_weights): self.attention_weights[idx] += weight_delta # 归一化 total = sum(self.attention_weights) self.attention_weights = [w/total for w in self.attention_weights] def get_context(self) -> List[Any]: """获取当前工作记忆上下文""" # 按注意力权重排序 sorted_items = sorted( zip(self.buffer, self.attention_weights), key=lambda x: x[1], reverse=True ) return [item for item, _ in sorted_items] 2. 
长期记忆管理 2.1 记忆编码与存储 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 import hashlib import numpy as np from dataclasses import dataclass from datetime import datetime @dataclass class Memory: id: str content: Any embedding: np.ndarray timestamp: float access_count: int = 0 importance: float = 0.5 last_accessed: float = None metadata: Dict = None class LongTermMemory: def __init__(self, vector_dim: int = 768): self.memories = {} self.embeddings = [] self.index = None # FAISS索引 self.vector_dim = vector_dim self._init_index() def _init_index(self): """初始化向量索引""" import faiss self.index = faiss.IndexFlatL2(self.vector_dim) def store(self, content: Any, importance: float = 0.5): """存储记忆""" # 生成唯一ID memory_id = self._generate_id(content) # 生成嵌入向量 embedding = self._encode_content(content) # 创建记忆对象 memory = Memory( id=memory_id, content=content, embedding=embedding, timestamp=time.time(), importance=importance, metadata=self._extract_metadata(content) ) # 存储 self.memories[memory_id] = memory self.index.add(np.array([embedding])) return memory_id def retrieve(self, query: Any, k: int = 5) -> List[Memory]: """检索相关记忆""" query_embedding = self._encode_content(query) # 向量检索 distances, indices = self.index.search( np.array([query_embedding]), k ) # 获取记忆对象 retrieved = [] for idx in indices[0]: if idx < len(self.memories): memory_id = list(self.memories.keys())[idx] memory = self.memories[memory_id] # 更新访问信息 memory.access_count += 1 memory.last_accessed = time.time() retrieved.append(memory) # 按相关性和重要性重排 return self._rerank_memories(retrieved, query) def _rerank_memories(self, memories: List[Memory], query: Any) -> List[Memory]: """重排记忆(考虑时间衰减、重要性等)""" current_time = time.time() scored_memories = [] for memory in memories: # 时间衰减因子 
time_decay = np.exp(-(current_time - memory.timestamp) / 86400) # 日衰减 # 访问频率因子 access_factor = np.log1p(memory.access_count) / 10 # 综合得分 score = ( 0.4 * memory.importance + 0.3 * time_decay + 0.3 * access_factor ) scored_memories.append((memory, score)) # 按得分排序 scored_memories.sort(key=lambda x: x[1], reverse=True) return [memory for memory, _ in scored_memories] 2.2 记忆巩固机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 class MemoryConsolidation: def __init__(self, consolidation_threshold: float = 0.7): self.threshold = consolidation_threshold self.consolidation_queue = [] def evaluate_for_consolidation(self, memory: Memory) -> bool: """评估是否需要巩固到长期记忆""" # 重要性评分 importance_score = memory.importance # 重复接触评分 repetition_score = min(1.0, memory.access_count / 5) # 情感强度评分 emotion_score = self._evaluate_emotional_intensity(memory.content) # 新颖性评分 novelty_score = self._evaluate_novelty(memory.content) # 综合评分 consolidation_score = ( 0.3 * importance_score + 0.2 * repetition_score + 0.3 * emotion_score + 0.2 * novelty_score ) return consolidation_score >= self.threshold def consolidate(self, short_term_memories: List[Memory]) -> List[Memory]: """巩固短期记忆到长期记忆""" consolidated = [] for memory in short_term_memories: if self.evaluate_for_consolidation(memory): # 增强记忆编码 enhanced_memory = self._enhance_memory(memory) # 创建关联连接 self._create_associations(enhanced_memory, consolidated) consolidated.append(enhanced_memory) return consolidated def _enhance_memory(self, memory: Memory) -> Memory: """增强记忆编码(添加更多细节和关联)""" # 提取关键概念 concepts = self._extract_concepts(memory.content) # 生成记忆摘要 summary = self._generate_summary(memory.content) # 更新元数据 memory.metadata.update({ "concepts": concepts, "summary": summary, "consolidation_time": time.time() }) return memory 3. 
情景记忆系统 3.1 情景记忆结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 @dataclass class Episode: id: str start_time: float end_time: float events: List[Dict] context: Dict outcome: Any emotional_valence: float # -1到1,负面到正面 class EpisodicMemory: def __init__(self): self.episodes = {} self.current_episode = None self.episode_index = {} # 用于快速检索 def start_episode(self, context: Dict): """开始新情景""" episode_id = f"ep_{int(time.time() * 1000)}" self.current_episode = Episode( id=episode_id, start_time=time.time(), end_time=None, events=[], context=context, outcome=None, emotional_valence=0 ) return episode_id def add_event(self, event: Dict): """向当前情景添加事件""" if self.current_episode: event["timestamp"] = time.time() self.current_episode.events.append(event) # 更新情感效价 if "emotion" in event: self._update_emotional_valence(event["emotion"]) def end_episode(self, outcome: Any): """结束当前情景""" if self.current_episode: self.current_episode.end_time = time.time() self.current_episode.outcome = outcome # 存储情景 self.episodes[self.current_episode.id] = self.current_episode # 建立索引 self._index_episode(self.current_episode) # 重置当前情景 self.current_episode = None def recall_similar_episodes(self, query_context: Dict, k: int = 3) -> List[Episode]: """回忆相似情景""" similar_episodes = [] for episode in self.episodes.values(): similarity = self._calculate_context_similarity( query_context, episode.context ) similar_episodes.append((episode, similarity)) # 排序并返回top-k similar_episodes.sort(key=lambda x: x[1], reverse=True) return [ep for ep, _ in similar_episodes[:k]] def extract_patterns(self) -> Dict: """从情景中提取行为模式""" patterns = { "successful_patterns": [], "failure_patterns": [], "emotional_triggers": [] } for episode in self.episodes.values(): # 分析成功模式 if 
self._is_successful_outcome(episode.outcome): pattern = self._extract_action_sequence(episode) patterns["successful_patterns"].append(pattern) # 分析失败模式 elif self._is_failure_outcome(episode.outcome): pattern = self._extract_action_sequence(episode) patterns["failure_patterns"].append(pattern) # 分析情感触发器 if abs(episode.emotional_valence) > 0.5: trigger = self._identify_emotional_trigger(episode) patterns["emotional_triggers"].append(trigger) return patterns 3.2 情景压缩与摘要 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 class EpisodeCompression: def __init__(self, compression_ratio: float = 0.3): self.compression_ratio = compression_ratio def compress_episode(self, episode: Episode) -> Dict: """压缩情景为摘要""" # 识别关键事件 key_events = self._identify_key_events(episode.events) # 提取转折点 turning_points = self._find_turning_points(episode.events) # 生成叙事摘要 narrative = self._generate_narrative( key_events, turning_points, episode.outcome ) compressed = { "id": episode.id, "duration": episode.end_time - episode.start_time, "key_events": key_events, "turning_points": turning_points, "narrative": narrative, "outcome": episode.outcome, "emotional_arc": self._extract_emotional_arc(episode) } return compressed def _identify_key_events(self, events: List[Dict]) -> List[Dict]: """识别关键事件""" if len(events) <= 5: return events # 计算事件重要性 event_scores = [] for event in events: score = self._calculate_event_importance(event) event_scores.append((event, score)) # 选择top事件 event_scores.sort(key=lambda x: x[1], reverse=True) num_key_events = max(3, int(len(events) * self.compression_ratio)) key_events = [event for event, _ in event_scores[:num_key_events]] # 保持时间顺序 key_events.sort(key=lambda x: x["timestamp"]) return key_events 4. 
语义记忆网络 4.1 概念网络构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 import networkx as nx class SemanticMemory: def __init__(self): self.concept_graph = nx.DiGraph() self.concept_embeddings = {} self.relation_types = [ "is_a", "part_of", "has_property", "causes", "prevents", "related_to" ] def add_concept(self, concept: str, properties: Dict = None): """添加概念节点""" if concept not in self.concept_graph: self.concept_graph.add_node( concept, properties=properties or {}, activation=0.0, last_activated=None ) # 生成概念嵌入 self.concept_embeddings[concept] = self._embed_concept(concept) def add_relation(self, concept1: str, relation: str, concept2: str, strength: float = 1.0): """添加概念关系""" self.add_concept(concept1) self.add_concept(concept2) self.concept_graph.add_edge( concept1, concept2, relation=relation, strength=strength, created_at=time.time() ) def activate_concept(self, concept: str, activation: float = 1.0): """激活概念(扩散激活)""" if concept not in self.concept_graph: return # 设置初始激活 self.concept_graph.nodes[concept]["activation"] = activation self.concept_graph.nodes[concept]["last_activated"] = time.time() # 扩散激活 self._spread_activation(concept, activation, decay=0.5, depth=3) def _spread_activation(self, source: str, activation: float, decay: float, depth: int): """扩散激活算法""" if depth <= 0 or activation < 0.1: return # 激活相邻节点 for neighbor in self.concept_graph.neighbors(source): edge_data = self.concept_graph[source][neighbor] spread_activation = activation * edge_data["strength"] * decay current_activation = self.concept_graph.nodes[neighbor].get("activation", 0) new_activation = current_activation + spread_activation self.concept_graph.nodes[neighbor]["activation"] = min(1.0, new_activation) # 递归扩散 self._spread_activation(neighbor, spread_activation, decay, depth - 1) def 
query_concepts(self, query: str, k: int = 5) -> List[str]: """查询相关概念""" # 激活查询相关概念 query_concepts = self._extract_concepts_from_text(query) for concept in query_concepts: self.activate_concept(concept) # 获取激活度最高的概念 activated_concepts = [ (node, data["activation"]) for node, data in self.concept_graph.nodes(data=True) if data["activation"] > 0 ] activated_concepts.sort(key=lambda x: x[1], reverse=True) return [concept for concept, _ in activated_concepts[:k]] 4.2 知识推理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 class SemanticReasoning: def __init__(self, semantic_memory: SemanticMemory): self.semantic_memory = semantic_memory def infer_relations(self, concept1: str, concept2: str) -> List[Dict]: """推理两个概念之间的关系""" inferences = [] # 直接关系 if self.semantic_memory.concept_graph.has_edge(concept1, concept2): edge_data = self.semantic_memory.concept_graph[concept1][concept2] inferences.append({ "type": "direct", "relation": edge_data["relation"], "confidence": edge_data["strength"] }) # 传递关系 try: paths = list(nx.all_simple_paths( self.semantic_memory.concept_graph, concept1, concept2, cutoff=3 )) for path in paths: if len(path) > 2: inference = self._analyze_path(path) inferences.append(inference) except nx.NetworkXNoPath: pass # 类比推理 analogies = self._find_analogies(concept1, concept2) inferences.extend(analogies) return inferences def _find_analogies(self, concept1: str, concept2: str) -> List[Dict]: """查找类比关系""" analogies = [] # 获取concept1的关系模式 patterns1 = self._get_relation_patterns(concept1) # 查找相似模式 for node in self.semantic_memory.concept_graph.nodes(): if node != concept1: patterns = self._get_relation_patterns(node) similarity = self._pattern_similarity(patterns1, patterns) if similarity > 0.7: analogies.append({ "type": "analogy", "base": concept1, "target": node, "mapped_to": concept2, "confidence": similarity }) return analogies 5. 
程序记忆系统 5.1 技能学习 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 @dataclass class Skill: name: str steps: List[Dict] preconditions: List[str] postconditions: List[str] success_rate: float = 0.0 execution_count: int = 0 class ProceduralMemory: def __init__(self): self.skills = {} self.skill_hierarchy = nx.DiGraph() self.execution_history = [] def learn_skill(self, demonstration: List[Dict]) -> Skill: """从演示中学习技能""" # 提取动作序列 action_sequence = self._extract_actions(demonstration) # 识别前置和后置条件 preconditions = self._identify_preconditions(demonstration[0]) postconditions = self._identify_postconditions(demonstration[-1]) # 创建技能 skill = Skill( name=self._generate_skill_name(action_sequence), steps=action_sequence, preconditions=preconditions, postconditions=postconditions ) # 存储技能 self.skills[skill.name] = skill # 更新技能层次 self._update_skill_hierarchy(skill) return skill def execute_skill(self, skill_name: str, context: Dict) -> Dict: """执行技能""" if skill_name not in self.skills: return {"success": False, "error": "Skill not found"} skill = self.skills[skill_name] # 检查前置条件 if not self._check_preconditions(skill.preconditions, context): return {"success": False, "error": "Preconditions not met"} # 执行步骤 result = {"success": True, "steps_executed": []} for step in skill.steps: step_result = self._execute_step(step, context) result["steps_executed"].append(step_result) if not step_result["success"]: result["success"] = False result["error"] = f"Failed at step: {step}" break # 更新技能统计 skill.execution_count += 1 if result["success"]: skill.success_rate = ( (skill.success_rate * (skill.execution_count - 1) + 1) / skill.execution_count ) else: skill.success_rate = ( (skill.success_rate * (skill.execution_count - 1)) / skill.execution_count ) # 记录执行历史 
self.execution_history.append({ "skill": skill_name, "context": context, "result": result, "timestamp": time.time() }) return result def compose_skills(self, goal: str) -> List[str]: """组合技能以达成目标""" # 查找能达成目标的技能序列 relevant_skills = self._find_relevant_skills(goal) # 规划技能执行顺序 skill_plan = self._plan_skill_sequence(relevant_skills, goal) return skill_plan 5.2 技能优化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 class SkillOptimizer: def __init__(self, procedural_memory: ProceduralMemory): self.procedural_memory = procedural_memory def optimize_skill(self, skill_name: str): """优化技能执行""" skill = self.procedural_memory.skills.get(skill_name) if not skill: return # 分析执行历史 history = [ h for h in self.procedural_memory.execution_history if h["skill"] == skill_name ] # 识别失败模式 failure_patterns = self._identify_failure_patterns(history) # 优化步骤 optimized_steps = self._optimize_steps( skill.steps, failure_patterns ) # 创建优化版本 optimized_skill = Skill( name=f"{skill_name}_optimized", steps=optimized_steps, preconditions=skill.preconditions, postconditions=skill.postconditions ) self.procedural_memory.skills[optimized_skill.name] = optimized_skill return optimized_skill def _identify_failure_patterns(self, history: List[Dict]) -> List[Dict]: """识别失败模式""" failures = [h for h in history if not h["result"]["success"]] patterns = [] for failure in failures: failed_step = failure["result"].get("error", "") context = failure["context"] pattern = { "step": failed_step, "context_conditions": self._extract_conditions(context), "frequency": 1 } # 合并相似模式 merged = False for existing_pattern in patterns: if self._patterns_similar(pattern, existing_pattern): existing_pattern["frequency"] += 1 merged = True break if not merged: patterns.append(pattern) return patterns 6. 
记忆检索优化 6.1 上下文感知检索 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 class ContextAwareRetrieval: def __init__(self, memory_system: MemorySystem): self.memory_system = memory_system self.context_window = 10 # 考虑最近10个交互 def retrieve(self, query: str, context: List[Dict]) -> List[Memory]: """上下文感知的记忆检索""" # 1. 提取上下文特征 context_features = self._extract_context_features(context) # 2. 扩展查询 expanded_query = self._expand_query_with_context(query, context_features) # 3. 多源检索 candidates = [] # 从长期记忆检索 ltm_results = self.memory_system.long_term_memory.retrieve( expanded_query, k=10 ) candidates.extend(ltm_results) # 从情景记忆检索 episodes = self.memory_system.episodic_memory.recall_similar_episodes( context_features, k=3 ) for episode in episodes: candidates.extend(self._extract_memories_from_episode(episode)) # 从语义记忆检索 concepts = self.memory_system.semantic_memory.query_concepts( query, k=5 ) for concept in concepts: candidates.extend(self._get_concept_memories(concept)) # 4. 
重排和去重 unique_memories = self._deduplicate_memories(candidates) ranked_memories = self._rank_by_relevance( unique_memories, query, context_features ) return ranked_memories[:5] def _rank_by_relevance(self, memories: List[Memory], query: str, context: Dict) -> List[Memory]: """按相关性排序记忆""" scored_memories = [] for memory in memories: # 查询相关性 query_relevance = self._calculate_similarity( memory.content, query ) # 上下文相关性 context_relevance = self._calculate_context_relevance( memory, context ) # 时间相关性 time_relevance = self._calculate_time_relevance(memory) # 综合评分 score = ( 0.4 * query_relevance + 0.3 * context_relevance + 0.2 * time_relevance + 0.1 * memory.importance ) scored_memories.append((memory, score)) scored_memories.sort(key=lambda x: x[1], reverse=True) return [memory for memory, _ in scored_memories] 6.2 记忆链构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 class MemoryChain: def __init__(self): self.chain_graph = nx.DiGraph() def build_memory_chain(self, seed_memory: Memory, memory_pool: List[Memory], max_length: int = 5) -> List[Memory]: """构建记忆链""" chain = [seed_memory] current = seed_memory while len(chain) < max_length: # 找到最相关的下一个记忆 next_memory = self._find_next_memory( current, memory_pool, chain ) if next_memory is None: break chain.append(next_memory) current = next_memory return chain def _find_next_memory(self, current: Memory, candidates: List[Memory], chain: List[Memory]) -> Optional[Memory]: """找到链中的下一个记忆""" best_memory = None best_score = -1 for candidate in candidates: if candidate in chain: continue # 计算连接强度 connection_strength = self._calculate_connection_strength( current, candidate ) # 计算多样性奖励 diversity_bonus = self._calculate_diversity_bonus( candidate, chain ) score = connection_strength + 0.2 * diversity_bonus if score > best_score: best_score = score best_memory = candidate return best_memory if best_score > 0.3 else None 7. 
记忆更新与遗忘 7.1 自适应遗忘 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 class AdaptiveForgetting: def __init__(self, base_decay_rate: float = 0.01): self.base_decay_rate = base_decay_rate def update_memories(self, memories: Dict[str, Memory]): """更新记忆(包括遗忘)""" current_time = time.time() to_forget = [] for memory_id, memory in memories.items(): # 计算遗忘曲线 time_since_access = current_time - (memory.last_accessed or memory.timestamp) time_since_creation = current_time - memory.timestamp # Ebbinghaus遗忘曲线 retention = self._calculate_retention( time_since_access, memory.access_count, memory.importance ) # 更新记忆强度 memory.strength = retention # 标记需要遗忘的记忆 if retention < 0.1 and time_since_creation > 86400: # 24小时 to_forget.append(memory_id) # 执行遗忘 for memory_id in to_forget: self._forget_memory(memories, memory_id) def _calculate_retention(self, time_elapsed: float, access_count: int, importance: float) -> float: """计算记忆保持率""" # 基础遗忘率 base_retention = np.exp(-self.base_decay_rate * time_elapsed / 3600) # 重复强化因子 repetition_factor = 1 + np.log1p(access_count) * 0.1 # 重要性调节 importance_factor = 1 + importance * 0.5 # 最终保持率 retention = min(1.0, base_retention * repetition_factor * importance_factor) return retention def _forget_memory(self, memories: Dict, memory_id: str): """遗忘记忆(不是删除,而是转为痕迹)""" memory = memories[memory_id] # 保留痕迹 trace = { "id": memory_id, "summary": self._create_summary(memory.content), "timestamp": memory.timestamp, "importance": memory.importance * 0.1 } # 存储痕迹(可以用于后续的重建) self._store_trace(trace) # 从活跃记忆中移除 del memories[memory_id] 8. 
记忆系统集成 8.1 统一记忆接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 class UnifiedMemoryInterface: def __init__(self): self.memory_system = MemorySystem() self.retrieval = ContextAwareRetrieval(self.memory_system) self.forgetting = AdaptiveForgetting() async def remember(self, content: Any, memory_type: MemoryType = None): """记住信息""" # 自动判断记忆类型 if memory_type is None: memory_type = self._infer_memory_type(content) if memory_type == MemoryType.EPISODIC: self.memory_system.episodic_memory.add_event(content) elif memory_type == MemoryType.SEMANTIC: concepts = self._extract_concepts(content) for concept in concepts: self.memory_system.semantic_memory.add_concept(concept) elif memory_type == MemoryType.PROCEDURAL: if self._is_skill_demonstration(content): self.memory_system.procedural_memory.learn_skill(content) else: # 默认存储到长期记忆 self.memory_system.long_term_memory.store(content) async def recall(self, query: str, context: List[Dict] = None) -> List[Any]: """回忆信息""" # 并行从各种记忆类型检索 tasks = [ self._recall_from_ltm(query), self._recall_from_episodic(query, context), self._recall_from_semantic(query), self._recall_from_procedural(query) ] results = await asyncio.gather(*tasks) # 合并和排序结果 all_memories = [] for result in results: all_memories.extend(result) # 去重和排序 unique_memories = self._deduplicate(all_memories) ranked_memories = self._rank_memories(unique_memories, query) return ranked_memories[:10] def reflect(self) -> Dict: """反思和总结记忆""" reflection = { "patterns": self.memory_system.episodic_memory.extract_patterns(), "important_concepts": self._get_important_concepts(), "skill_improvements": self._suggest_skill_improvements(), "memory_statistics": self._get_memory_stats() } return reflection 9. 
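Best Practices

- Tiered storage: place memories in tiers according to access frequency and importance
- Smart forgetting: model the human forgetting curve
- Association building: automatically build links between memories
- Context awareness: take the current context into account during retrieval
- Continuous learning: keep tuning the memory system from interactions
- Compression strategy: compress and summarize memories at regular intervals

Conclusion

A good Agent memory system must do more than store and retrieve information. It also needs to learn, associate, forget, and summarize. By modeling the multi-layered structure and processing mechanisms of human memory, we can build AI Agents that are smarter and that genuinely "remember". ...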
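The "smart forgetting" practice can be made concrete with the retention formula from the adaptive-forgetting section: exponential decay over time, boosted by repeated access and by importance, capped at 1.0. The sketch below restates that formula as a standalone function. For simplicity it takes elapsed time in hours rather than seconds, and `should_forget` is a hypothetical helper that applies the same 0.1 retention and 24-hour age thresholds.

```python
import math


def retention(hours_since_access: float, access_count: int,
              importance: float, base_decay_rate: float = 0.01) -> float:
    """Fraction of a memory retained, in [0, 1]."""
    base = math.exp(-base_decay_rate * hours_since_access)  # forgetting curve
    repetition = 1 + 0.1 * math.log1p(access_count)         # rehearsal slows forgetting
    importance_factor = 1 + 0.5 * importance                # important memories last longer
    return min(1.0, base * repetition * importance_factor)


def should_forget(hours_since_access: float, access_count: int,
                  importance: float, age_hours: float) -> bool:
    """Forget only weak memories that are also more than 24 hours old."""
    return retention(hours_since_access, access_count, importance) < 0.1 and age_hours > 24


fresh = retention(0, 0, 0.0)
stale = retention(100, 0, 0.0)
rehearsed = retention(100, 10, 0.0)
```

With the default decay rate, an unimportant memory that has never been accessed drops to about 37% retention after 100 hours, and ten accesses raise that by roughly a quarter. Tuning `base_decay_rate` is the main lever for how aggressive the forgetting is.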
Introduction

A single Agent is powerful, but it often falls short on complex tasks. A multi-agent system divides the work and collaborates, so it can handle more complex problems and achieve a "1+1>2" effect. This article looks at the design principles, collaboration mechanisms, and practical examples of multi-agent systems.

1. Multi-Agent System Architecture

```mermaid
graph TB
    subgraph "Multi-agent system topologies"
        subgraph "Hierarchical"
            M[Manager Agent]
            M --> A1[Executor Agent 1]
            M --> A2[Executor Agent 2]
            M --> A3[Executor Agent 3]
        end
        subgraph "Peer-to-peer network"
            P1[Agent1] <--> P2[Agent2]
            P2 <--> P3[Agent3]
            P3 <--> P1
        end
        subgraph "Blackboard pattern"
            BB[Shared blackboard]
            B1[Agent1] --> BB
            B2[Agent2] --> BB
            B3[Agent3] --> BB
            BB --> B1
            BB --> B2
            BB --> B3
        end
    end
    style M fill:#ffeb3b,stroke:#f57c00,stroke-width:2px
    style BB fill:#e1f5fe,stroke:#01579b,stroke-width:2px
```

1.1 System topologies

```python
from enum import Enum
from typing import List, Dict, Any


class TopologyType(Enum):
    HIERARCHICAL = "hierarchical"  # Hierarchical
    PEER_TO_PEER = "p2p"           # Peer-to-peer network
    BLACKBOARD = "blackboard"      # Blackboard pattern
    PIPELINE = "pipeline"          # Pipeline
    HYBRID = "hybrid"              # Hybrid


class MultiAgentSystem:
    def __init__(self, topology: TopologyType):
        self.topology = topology
        self.agents = {}
        self.communication_channels = {}
        self.shared_memory = {}

    def add_agent(self, agent_id: str, agent: Any):
        """Add an Agent to the system."""
        self.agents[agent_id] = agent
        self.setup_communication(agent_id)

    def setup_communication(self, agent_id: str):
        """Set up communication channels."""
        # The _setup_* helpers are not shown in this excerpt.
        if self.topology == TopologyType.HIERARCHICAL:
            self._setup_hierarchical_comm(agent_id)
        elif self.topology == TopologyType.PEER_TO_PEER:
            self._setup_p2p_comm(agent_id)
```

1.2 Agent role definitions

```python
import time
from enum import Enum
from typing import Dict, List


class AgentRole(Enum):
    COORDINATOR = "coordinator"  # Coordinator
    EXECUTOR = "executor"        # Executor
    VALIDATOR = "validator"      # Validator
    MONITOR = "monitor"          # Monitor
    SPECIALIST = "specialist"    # Specialist


class AgentState(Enum):
    # Assumed definition: the original uses AgentState.IDLE without defining it
    IDLE = "idle"
    BUSY = "busy"


class BaseAgent:
    def __init__(self, agent_id: str, role: AgentRole, capabilities: List[str],
                 communication_channel=None):
        self.agent_id = agent_id
        self.role = role
        self.capabilities = capabilities
        self.message_queue = []
        self.state = AgentState.IDLE
        # Assumed to be injected; the original uses it without initializing it
        self.communication_channel = communication_channel

    async def receive_message(self, message: Dict):
        """Receive a message."""
        self.message_queue.append(message)
        # process_message is implemented by subclasses (not shown here)
        await self.process_message(message)

    async def send_message(self, recipient: str, content: Dict):
        """Send a message."""
        message = {
            "sender": self.agent_id,
            "recipient": recipient,
            "content": content,
            "timestamp": time.time()
        }
        await self.communication_channel.send(message)
```

2. Communication Protocol Design

2.1 Message format

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Message:
    sender_id: str
    recipient_id: str
    message_type: str  # REQUEST, RESPONSE, BROADCAST, etc.
    content: Any
    conversation_id: str
    timestamp: float
    priority: int = 0
    requires_response: bool = False


class MessageProtocol:
    """Message protocol definition."""

    # Message types
    REQUEST = "REQUEST"
    RESPONSE = "RESPONSE"
    BROADCAST = "BROADCAST"
    SUBSCRIBE = "SUBSCRIBE"
    UNSUBSCRIBE = "UNSUBSCRIBE"
    HEARTBEAT = "HEARTBEAT"

    # Content formats
    @staticmethod
    def create_task_request(task: str, requirements: Dict) -> Dict:
        return {
            "type": MessageProtocol.REQUEST,
            "task": task,
            "requirements": requirements,
            "deadline": None
        }

    @staticmethod
    def create_capability_announcement(capabilities: List[str]) -> Dict:
        return {
            "type": MessageProtocol.BROADCAST,
            "capabilities": capabilities,
            "availability": True
        }
```

2.2 Communication middleware

```python
import asyncio
from collections import defaultdict


class CommunicationMiddleware:
    def __init__(self):
        self.subscribers = defaultdict(list)
        self.message_buffer = asyncio.Queue()
        self.routing_table = {}

    async def publish(self, topic: str, message: Message):
        """Publish a message to a topic."""
        subscribers = self.subscribers.get(topic, [])

        tasks = []
        for subscriber in subscribers:
            task = asyncio.create_task(
                subscriber.receive_message(message)
            )
            tasks.append(task)

        await asyncio.gather(*tasks)

    def subscribe(self, topic: str, agent: BaseAgent):
        """Subscribe to a topic."""
        self.subscribers[topic].append(agent)

    async def route_message(self, message: Message):
        """Route a message."""
        if message.recipient_id == "BROADCAST":
            # broadcast is not shown in this excerpt
            await self.broadcast(message)
        else:
            recipient = self.routing_table.get(message.recipient_id)
            if recipient:
                await recipient.receive_message(message)
```

3. Task Allocation and Coordination

3.1 Task decomposition strategy

```python
import asyncio
import json
from typing import Dict, List


class TaskDecomposer:
    def __init__(self, llm):
        self.llm = llm

    def decompose_task(self, task: str) -> List[Dict]:
        """Decompose a complex task into subtasks."""
        prompt = f"""
        Decompose the following task into independent subtasks:
        Task: {task}

        Requirements:
        1. Each subtask should be independently executable
        2. Mark the capabilities each subtask requires
        3. Mark the dependencies between subtasks

        Output in JSON format:
        {{
            "subtasks": [
                {{
                    "id": "task_1",
                    "description": "...",
                    "required_capabilities": [...],
                    "dependencies": [],
                    "estimated_time": 0
                }}
            ]
        }}
        """

        # Assumes llm.invoke returns the raw JSON string
        response = self.llm.invoke(prompt)
        return json.loads(response)["subtasks"]


class TaskCoordinator:
    def __init__(self, agents: Dict[str, BaseAgent]):
        self.agents = agents
        self.task_queue = asyncio.Queue()
        self.task_assignments = {}

    async def assign_task(self, task: Dict):
        """Assign a task to a suitable Agent."""
        # Find Agents with the required capabilities
        capable_agents = self.find_capable_agents(
            task["required_capabilities"]
        )

        if not capable_agents:
            return None

        # Pick the best Agent (taking load balancing into account);
        # select_optimal_agent is not shown in this excerpt
        selected_agent = self.select_optimal_agent(capable_agents)

        # Assign the task
        self.task_assignments[task["id"]] = selected_agent.agent_id
        await selected_agent.receive_message(
            MessageProtocol.create_task_request(
                task["description"],
                task.get("requirements", {})
            )
        )

        return selected_agent.agent_id

    def find_capable_agents(self, required_capabilities: List[str]):
        """Find Agents that have every required capability."""
        capable = []
        for agent in self.agents.values():
            if all(cap in agent.capabilities for cap in required_capabilities):
                capable.append(agent)
        return capable
```

3.2 Negotiation mechanism

```mermaid
sequenceDiagram
    participant C as Coordinator Agent
    participant A1 as Agent1
    participant A2 as Agent2
    participant A3 as Agent3
    C->>A1: Task announcement
    C->>A2: Task announcement
    C->>A3: Task announcement
    Note over A1,A3: Evaluate ability to do the task
    A1-->>C: Bid (cost: 10, time: 5min)
    A2-->>C: Bid (cost: 8, time: 7min)
    A3-->>C: No bid
    Note over C: Evaluate bids
    C->>A2: Award contract
    A2->>C: Accept contract
    Note over A2: Execute task
    A2->>C: Task completion report
    C->>A2: Confirm completion
```

```python
import asyncio
import time
from typing import Dict, Optional


class ContractNetProtocol:
    """Contract Net Protocol implementation."""

    def __init__(self, coordinator: BaseAgent):
        self.coordinator = coordinator
        self.bids = {}

    async def announce_task(self, task: Dict):
        """Publish a task announcement."""
        announcement = {
            "type": "TASK_ANNOUNCEMENT",
            "task": task,
            "deadline": time.time() + 10  # 10-second bidding window
        }

        # Broadcast the task
        await self.coordinator.send_message(
            "BROADCAST",
            announcement
        )

        # Wait for bids
        await asyncio.sleep(10)

        # Pick the winner; award_contract is not shown in this excerpt
        winner = self.select_winner()
        if winner:
            await self.award_contract(winner, task)

    async def submit_bid(self, agent_id: str, bid: Dict):
        """Submit a bid."""
        self.bids[agent_id] = {
            "cost": bid.get("cost", float('inf')),
            "time": bid.get("estimated_time", float('inf')),
            "confidence": bid.get("confidence", 0)
        }

    def select_winner(self) -> Optional[str]:
        """Select the winning Agent."""
        if not self.bids:
            return None

        # Weighted score (weights can be customized)
        best_agent = None
        best_score = float('-inf')

        for agent_id, bid in self.bids.items():
            score = (
                bid["confidence"] * 0.5 -
                bid["cost"] * 0.3 -
                bid["time"] * 0.2
            )

            if score > best_score:
                best_score = score
                best_agent = agent_id

        return best_agent
```

4.
知识共享机制 4.1 黑板系统 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class BlackboardSystem: """黑板系统实现""" def __init__(self): self.blackboard = {} self.knowledge_sources = [] self.controller = None self.locks = {} async def write(self, key: str, value: Any, agent_id: str): """写入知识""" async with self.get_lock(key): self.blackboard[key] = { "value": value, "author": agent_id, "timestamp": time.time(), "version": self.get_version(key) + 1 } # 通知订阅者 await self.notify_subscribers(key, value) async def read(self, key: str) -> Any: """读取知识""" entry = self.blackboard.get(key) return entry["value"] if entry else None def subscribe(self, pattern: str, callback): """订阅知识更新""" self.knowledge_sources.append({ "pattern": pattern, "callback": callback }) async def notify_subscribers(self, key: str, value: Any): """通知订阅者""" for source in self.knowledge_sources: if self.match_pattern(key, source["pattern"]): await source["callback"](key, value) 4.2 知识图谱共享 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 from typing import Tuple import networkx as nx class SharedKnowledgeGraph: def __init__(self): self.graph = nx.DiGraph() self.embeddings = {} # 节点嵌入 def add_knowledge(self, subject: str, predicate: str, object: str, agent_id: str, confidence: float = 1.0): """添加知识三元组""" # 添加节点 if subject not in self.graph: self.graph.add_node(subject, type="entity") if object not in self.graph: self.graph.add_node(object, type="entity") # 添加边 self.graph.add_edge( subject, object, predicate=predicate, contributor=agent_id, confidence=confidence, timestamp=time.time() ) def query(self, query_type: str, **kwargs) -> List: """查询知识""" if query_type == "neighbors": node = kwargs.get("node") return list(self.graph.neighbors(node)) elif query_type == "path": source = kwargs.get("source") target = kwargs.get("target") try: path 
= nx.shortest_path(self.graph, source, target) return path except nx.NetworkXNoPath: return [] elif query_type == "subgraph": nodes = kwargs.get("nodes", []) return self.graph.subgraph(nodes) def merge_knowledge(self, other_graph: 'SharedKnowledgeGraph'): """合并其他Agent的知识""" for edge in other_graph.graph.edges(data=True): source, target, data = edge existing_edge = self.graph.get_edge_data(source, target) if existing_edge: # 更新置信度(加权平均) new_confidence = ( existing_edge["confidence"] + data["confidence"] ) / 2 self.graph[source][target]["confidence"] = new_confidence else: self.add_knowledge( source, data["predicate"], target, data["contributor"], data["confidence"] ) 5. 冲突解决机制 5.1 投票机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 class VotingMechanism: def __init__(self, voting_type: str = "majority"): self.voting_type = voting_type self.votes = {} async def collect_votes(self, issue: str, options: List[str], voters: List[BaseAgent]): """收集投票""" self.votes[issue] = defaultdict(list) # 并行收集投票 tasks = [] for voter in voters: task = asyncio.create_task( self.get_vote(voter, issue, options) ) tasks.append(task) votes = await asyncio.gather(*tasks) # 统计投票 for voter, vote in zip(voters, votes): self.votes[issue][vote].append(voter.agent_id) return self.determine_winner(issue) async def get_vote(self, voter: BaseAgent, issue: str, options: List[str]) -> str: """获取单个Agent的投票""" vote_request = { "type": "VOTE_REQUEST", "issue": issue, "options": options } response = await voter.process_vote_request(vote_request) return response["vote"] def determine_winner(self, issue: str) -> str: """确定获胜选项""" vote_counts = { option: len(voters) for option, voters in self.votes[issue].items() } if self.voting_type == "majority": return max(vote_counts, key=vote_counts.get) elif self.voting_type == "unanimous": if len(vote_counts) == 1: return list(vote_counts.keys())[0] return None 5.2 协商与妥协 1 
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 class NegotiationProtocol: def __init__(self): self.negotiation_history = [] async def negotiate(self, agents: List[BaseAgent], issue: Dict): """多轮协商""" max_rounds = 5 current_round = 0 while current_round < max_rounds: proposals = await self.collect_proposals(agents, issue) # 评估提案 evaluations = await self.evaluate_proposals( agents, proposals ) # 检查是否达成共识 consensus = self.check_consensus(evaluations) if consensus: return consensus # 生成反提案 issue = self.generate_counter_proposal(evaluations) current_round += 1 # 未达成共识,使用仲裁 return await self.arbitrate(agents, issue) async def collect_proposals(self, agents: List[BaseAgent], issue: Dict) -> List[Dict]: """收集提案""" proposals = [] for agent in agents: proposal = await agent.generate_proposal(issue) proposals.append({ "agent_id": agent.agent_id, "proposal": proposal }) return proposals 6. 实际应用案例 6.1 软件开发团队 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 class SoftwareDevelopmentTeam: def __init__(self): self.product_manager = self.create_pm_agent() self.architect = self.create_architect_agent() self.developers = self.create_developer_agents(3) self.qa_engineer = self.create_qa_agent() self.devops = self.create_devops_agent() def create_pm_agent(self): return Agent( agent_id="pm_001", role=AgentRole.COORDINATOR, capabilities=["requirement_analysis", "planning"], llm=ChatOpenAI(model="gpt-4") ) def create_architect_agent(self): return Agent( agent_id="architect_001", role=AgentRole.SPECIALIST, capabilities=["system_design", "tech_selection"], llm=ChatOpenAI(model="gpt-4") ) async def develop_feature(self, feature_request: str): """开发新功能的完整流程""" # 1. PM分析需求 requirements = await self.product_manager.analyze_requirements( feature_request ) # 2. 架构师设计系统 design = await self.architect.create_design(requirements) # 3. 
分配开发任务 tasks = self.decompose_development_tasks(design) development_results = await self.parallel_development( tasks, self.developers ) # 4. QA测试 test_results = await self.qa_engineer.test_feature( development_results ) # 5. DevOps部署 if test_results["passed"]: deployment = await self.devops.deploy(development_results) return deployment else: # 返回开发阶段修复bug return await self.fix_bugs(test_results["issues"]) 6.2 研究分析团队 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 class ResearchTeam: def __init__(self): self.lead_researcher = Agent( "lead_001", AgentRole.COORDINATOR, ["research_planning", "synthesis"] ) self.data_collectors = [ Agent(f"collector_{i}", AgentRole.EXECUTOR, ["web_search", "data_extraction"]) for i in range(3) ] self.analysts = [ Agent(f"analyst_{i}", AgentRole.SPECIALIST, ["data_analysis", "visualization"]) for i in range(2) ] self.fact_checker = Agent( "checker_001", AgentRole.VALIDATOR, ["fact_checking", "source_verification"] ) async def conduct_research(self, topic: str): """执行研究项目""" # 1. 制定研究计划 research_plan = await self.lead_researcher.create_plan(topic) # 2. 并行数据收集 data_collection_tasks = [] for collector in self.data_collectors: task = asyncio.create_task( collector.collect_data(research_plan["queries"]) ) data_collection_tasks.append(task) raw_data = await asyncio.gather(*data_collection_tasks) # 3. 事实核查 verified_data = await self.fact_checker.verify_data(raw_data) # 4. 数据分析 analysis_results = [] for analyst in self.analysts: result = await analyst.analyze(verified_data) analysis_results.append(result) # 5. 综合报告 final_report = await self.lead_researcher.synthesize( analysis_results ) return final_report 7. 
性能优化 7.1 负载均衡 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 class LoadBalancer: def __init__(self, agents: List[BaseAgent]): self.agents = agents self.agent_loads = {agent.agent_id: 0 for agent in agents} self.agent_performance = { agent.agent_id: {"avg_time": 0, "success_rate": 1.0} for agent in agents } def select_agent(self, task: Dict) -> BaseAgent: """选择负载最低的Agent""" # 计算综合得分 scores = {} for agent in self.agents: if self.is_capable(agent, task): load_score = 1 / (1 + self.agent_loads[agent.agent_id]) perf_score = self.agent_performance[agent.agent_id]["success_rate"] scores[agent.agent_id] = load_score * perf_score if not scores: return None # 选择得分最高的Agent best_agent_id = max(scores, key=scores.get) selected_agent = next( a for a in self.agents if a.agent_id == best_agent_id ) # 更新负载 self.agent_loads[best_agent_id] += 1 return selected_agent def update_performance(self, agent_id: str, execution_time: float, success: bool): """更新Agent性能指标""" perf = self.agent_performance[agent_id] # 更新平均执行时间(指数移动平均) alpha = 0.3 perf["avg_time"] = ( alpha * execution_time + (1 - alpha) * perf["avg_time"] ) # 更新成功率 perf["success_rate"] = ( perf["success_rate"] * 0.9 + (1.0 if success else 0.0) * 0.1 ) # 更新负载 self.agent_loads[agent_id] = max(0, self.agent_loads[agent_id] - 1) 7.2 缓存与共享 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 class SharedCache: def __init__(self, max_size: int = 1000): self.cache = {} self.access_count = defaultdict(int) self.max_size = max_size async def get(self, key: str) -> Any: """获取缓存""" if key in self.cache: self.access_count[key] += 1 return self.cache[key]["value"] return None async def set(self, key: str, value: Any, ttl: int = 3600): """设置缓存""" if len(self.cache) >= self.max_size: # LFU淘汰策略 self.evict_least_frequent() self.cache[key] = { "value": value, "expire_at": time.time() + ttl, "set_by": 
None # 可以记录是哪个Agent设置的 } def evict_least_frequent(self): """淘汰最少使用的缓存""" if not self.cache: return least_used = min( self.cache.keys(), key=lambda k: self.access_count.get(k, 0) ) del self.cache[least_used] del self.access_count[least_used] 8. 监控与调试 8.1 系统监控 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 class SystemMonitor: def __init__(self): self.metrics = { "message_count": 0, "task_completed": 0, "task_failed": 0, "avg_response_time": 0, "agent_utilization": {} } self.event_log = [] def log_event(self, event_type: str, details: Dict): """记录事件""" event = { "timestamp": time.time(), "type": event_type, "details": details } self.event_log.append(event) # 更新指标 self.update_metrics(event) def update_metrics(self, event: Dict): """更新系统指标""" if event["type"] == "MESSAGE_SENT": self.metrics["message_count"] += 1 elif event["type"] == "TASK_COMPLETED": self.metrics["task_completed"] += 1 elif event["type"] == "TASK_FAILED": self.metrics["task_failed"] += 1 def generate_report(self) -> Dict: """生成监控报告""" return { "metrics": self.metrics, "health_status": self.check_health(), "bottlenecks": self.identify_bottlenecks(), "recommendations": self.generate_recommendations() } def check_health(self) -> str: """检查系统健康状态""" success_rate = ( self.metrics["task_completed"] / max(1, self.metrics["task_completed"] + self.metrics["task_failed"]) ) if success_rate > 0.95: return "HEALTHY" elif success_rate > 0.8: return "DEGRADED" else: return "CRITICAL" 8.2 调试工具 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class Debugger: def __init__(self, system: MultiAgentSystem): self.system = system self.breakpoints = set() self.watch_list = {} def set_breakpoint(self, agent_id: str, event_type: str): """设置断点""" self.breakpoints.add((agent_id, event_type)) async def trace_execution(self, duration: int = 60): """追踪执行过程""" trace_data = [] 
start_time = time.time() while time.time() - start_time < duration: for agent_id, agent in self.system.agents.items(): state = { "agent_id": agent_id, "state": agent.state, "queue_size": len(agent.message_queue), "timestamp": time.time() } trace_data.append(state) await asyncio.sleep(1) return self.analyze_trace(trace_data) def analyze_trace(self, trace_data: List[Dict]): """分析追踪数据""" analysis = { "deadlocks": self.detect_deadlocks(trace_data), "performance_issues": self.detect_performance_issues(trace_data), "communication_patterns": self.analyze_communication(trace_data) } return analysis 9. 最佳实践 明确的角色定义:每个Agent应有清晰的职责边界 高效的通信协议:减少不必要的消息传递 容错机制:处理Agent失败的情况 可扩展性设计:支持动态添加/移除Agent 监控和日志:全面的系统监控 测试策略:包括单元测试和集成测试 结论 多Agent系统通过协作能够解决单个Agent无法处理的复杂问题。关键在于设计合理的架构、高效的通信机制和智能的协调策略。 ...
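The fault-tolerance practice listed above (handling Agent failures) is the one item the article does not illustrate in code. A minimal sketch of one way to do it, assuming each Agent can be reduced to an async callable that takes a task string: try the candidates in order and move to the next one on any error or timeout. The names `run_with_failover`, `AgentFn`, `flaky_agent`, `slow_agent`, and `healthy_agent` are hypothetical and are not part of the article's classes.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional

# Hypothetical simplification: an "agent" is any async callable
# that takes a task description and returns a result string.
AgentFn = Callable[[str], Awaitable[str]]


async def run_with_failover(task: str, agents: List[AgentFn],
                            timeout: float = 1.0) -> Optional[str]:
    """Try each agent in order; return the first successful result, or None."""
    for agent in agents:
        try:
            # wait_for cancels the call and raises a timeout error if it runs too long
            return await asyncio.wait_for(agent(task), timeout=timeout)
        except Exception:
            # A crash or a timeout: fall through to the next candidate
            continue
    return None


async def flaky_agent(task: str) -> str:
    raise RuntimeError("agent crashed")


async def slow_agent(task: str) -> str:
    await asyncio.sleep(10)  # far longer than the timeout used below
    return "too late"


async def healthy_agent(task: str) -> str:
    return f"done: {task}"


if __name__ == "__main__":
    result = asyncio.run(run_with_failover(
        "summarize", [flaky_agent, slow_agent, healthy_agent], timeout=0.05
    ))
    print(result)
```

In a real system the candidate order would likely come from the load balancer's scores rather than a fixed list, and each failure would be reported to the monitor; both are left out here to keep the sketch short.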