Abstract
This article takes an in-depth look at how the Saga pattern addresses data consistency in distributed agent workflow orchestration. By building a complete, runnable, lightweight workflow engine project, we demonstrate how the Saga choreography pattern decomposes a long-running process into a series of compensable local transactions and guarantees eventual consistency through forward execution and reverse compensation. The core of the project consists of workflow definition and state management, an event-driven Saga coordinator, and simulated agent services. In addition to the full project code (state machine, persistence layer, and compensation logic), the article uses a sequence diagram and a flowchart to show the complete lifecycle of transaction execution and rollback, providing a practical blueprint for building highly reliable, fault-tolerant multi-agent systems.
1. Project Overview and Design
In today's world of microservices and agent architectures, a complex business goal often requires multiple agents working together. For example, an "intelligent content creation" workflow might involve a research agent gathering material, a writing agent producing a first draft, a review agent performing quality checks, and a publishing agent uploading the result to a platform. These steps are spread across different services and together form a distributed transaction, a scenario in which traditional ACID transactions fall short.
The Saga pattern addresses this by decomposing the long-running transaction (LRT) into a series of local transactions and pairing each local transaction with a compensating transaction. If a step fails during execution, the system executes the compensations of the already-completed steps in reverse order, rolling the system back to a consistent state equivalent to the one before the transaction began, and thereby achieving eventual consistency.
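As a minimal, self-contained sketch of this idea (independent of the project code below, with hypothetical step names), forward steps run in order and, on failure, the compensations of the already-completed steps run in reverse:
from typing import Callable, List, Tuple

def run_saga(steps: List[Tuple[Callable[[], None], Callable[[], None]]]) -> bool:
    """Execute (action, compensation) pairs in order; on failure, compensate in reverse order."""
    completed = []  # compensations of the steps that already succeeded
    for action, compensate in steps:
        try:
            action()
            completed.append(compensate)
        except Exception:
            for comp in reversed(completed):  # reverse-order compensation
                comp()
            return False
    return True

# Hypothetical usage: each pair is a local transaction and its compensating transaction
# run_saga([(reserve_credit, release_credit), (ship_order, cancel_shipment)])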
This project implements a lightweight agent workflow engine based on the Saga choreography pattern. The core design is as follows:
- Workflow definition: a YAML or JSON file defines a flow of multiple agent tasks and specifies the compensation operation for each task.
- Workflow engine: drives the workflow through its defined steps and manages the Saga lifecycle (running, paused, failed, completed).
- Saga coordinator: listens for the completion or failure event of each local task and decides whether to trigger the next task or start a compensating rollback.
- Simulated agent services: a few mock agent service endpoints demonstrate success, failure, and compensation operations.
- State persistence: workflow execution state and the Saga log are persisted so the engine can recover after a restart.
The example simulates a "document processing workflow": validate -> analyze -> generate report. The analysis step fails with a certain probability, which triggers compensation of the validation step.
2. Project Structure
saga-agent-workflow/
├── config/
│ └── workflow_def.yaml # Workflow definition file
├── src/
│ ├── __init__.py
│ ├── models.py # Data models (Workflow, Task, SagaLog)
│ ├── workflow_engine.py # Workflow engine core (state machine)
│ ├── saga_coordinator.py # Saga coordinator (event handling, compensation logic)
│ ├── agent_services.py # Simulated agent services
│ └── persistence.py # State persistence layer (in-memory/file mock)
├── tests/
│ └── test_workflow.py # Unit tests
├── main.py # Main entry point
├── requirements.txt # Python dependencies
└── README.md # Project README (contents omitted)
3. Core Code Implementation
File: config/workflow_def.yaml
name: "DocumentProcessingWorkflow"
description: "一个模拟的文档处理智能体工作流"
tasks:
- id: "validate_task"
name: "文档验证"
agent_service: "validation_agent"
compensate_service: "compensate_validation"
parameters:
doc_id: "{{workflow_context.doc_id}}"
next_on_success: "analyze_task"
next_on_failure: null # 失败直接触发Saga回滚
- id: "analyze_task"
name: "内容分析"
agent_service: "analysis_agent"
compensate_service: "compensate_analysis"
parameters:
doc_id: "{{workflow_context.doc_id}}"
mode: "deep"
next_on_success: "generate_report_task"
next_on_failure: null
- id: "generate_report_task"
name: "生成报告"
agent_service: "report_agent"
compensate_service: "compensate_report" # 最后一个任务,补偿通常为空操作或清理
parameters:
doc_id: "{{workflow_context.doc_id}}"
format: "pdf"
next_on_success: null # 工作流结束
next_on_failure: null
File: src/models.py
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Any, Dict, List, Optional
from datetime import datetime
import uuid
class TaskStatus(Enum):
PENDING = "PENDING"
RUNNING = "RUNNING"
SUCCEEDED = "SUCCEEDED"
FAILED = "FAILED"
COMPENSATED = "COMPENSATED"
COMPENSATION_FAILED = "COMPENSATION_FAILED"
class WorkflowStatus(Enum):
CREATED = "CREATED"
RUNNING = "RUNNING"
PAUSED = "PAUSED"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
COMPENSATING = "COMPENSATING"
COMPENSATED = "COMPENSATED"
@dataclass
class Task:
"""工作流中的一个任务定义"""
id: str
name: str
    agent_service: str  # name of the agent service to call
    compensate_service: str  # name of the compensation service
parameters: Dict[str, Any]
next_on_success: Optional[str]
next_on_failure: Optional[str]
@dataclass
class TaskInstance:
"""任务的一次执行实例"""
task_id: str
status: TaskStatus = TaskStatus.PENDING
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
    def to_dict(self):
        data = {**asdict(self), 'status': self.status.value}
        # Convert datetime fields to ISO strings so the dict is JSON-serializable
        return {k: (v.isoformat() if isinstance(v, datetime) else v) for k, v in data.items()}
@dataclass
class Workflow:
"""工作流定义与实例的聚合"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
name: str = ""
description: str = ""
status: WorkflowStatus = WorkflowStatus.CREATED
created_at: datetime = field(default_factory=datetime.now)
    context: Dict[str, Any] = field(default_factory=dict)  # global workflow context, e.g. doc_id
    tasks: Dict[str, Task] = field(default_factory=dict)  # task definitions keyed by task_id
    task_instances: Dict[str, TaskInstance] = field(default_factory=dict)  # execution instances
    current_task_id: Optional[str] = None
    saga_log: List[Dict] = field(default_factory=list)  # Saga execution log
def get_task_order(self) -> List[str]:
"""获取任务执行顺序列表(简易拓扑排序,仅支持线性)"""
order = []
current = None
        # Find the start task (the one with no predecessor)
for task_id, task in self.tasks.items():
if not any(t.next_on_success == task_id or t.next_on_failure == task_id for t in self.tasks.values()):
current = task_id
break
while current:
order.append(current)
current = self.tasks[current].next_on_success
return order
File: src/saga_coordinator.py
import logging
from datetime import datetime
from typing import Dict, Any
from .models import Workflow, WorkflowStatus, TaskStatus, TaskInstance
from .agent_services import call_agent_service
logger = logging.getLogger(__name__)
class SagaCoordinator:
"""
Saga协调器(编排模式)。
监听任务事件,并决定推进工作流或发起补偿。
"""
def __init__(self, workflow_engine):
self.workflow_engine = workflow_engine
self.persistence = workflow_engine.persistence
def handle_task_success(self, workflow_id: str, task_id: str, result: Dict[str, Any]):
"""处理任务成功事件"""
workflow = self.persistence.load_workflow(workflow_id)
if not workflow:
logger.error(f"Workflow {workflow_id} not found.")
return
        # 1. Update the task instance state
instance = workflow.task_instances[task_id]
instance.status = TaskStatus.SUCCEEDED
instance.result = result
        # Update the context (optional: results can be injected into later task parameters)
workflow.context.update(result.get('context_updates', {}))
        # 2. Record a Saga log entry
workflow.saga_log.append({
'event': 'TASK_SUCCEEDED',
'task_id': task_id,
            'timestamp': datetime.now().isoformat(),
'result': result
})
        # 3. Decide the next step: run the next task or complete the workflow
task_def = workflow.tasks[task_id]
next_task_id = task_def.next_on_success
if next_task_id:
workflow.current_task_id = next_task_id
workflow.status = WorkflowStatus.RUNNING
self.persistence.save_workflow(workflow)
            # Ask the engine to run the next task
self.workflow_engine.execute_task(workflow_id, next_task_id)
else:
            # No next task: the workflow completed successfully
workflow.status = WorkflowStatus.COMPLETED
workflow.current_task_id = None
self.persistence.save_workflow(workflow)
logger.info(f"Workflow {workflow_id} completed successfully.")
def handle_task_failure(self, workflow_id: str, task_id: str, error: str):
"""处理任务失败事件,触发Saga补偿流程"""
workflow = self.persistence.load_workflow(workflow_id)
if not workflow:
return
        # 1. Update the failed task's state
instance = workflow.task_instances[task_id]
instance.status = TaskStatus.FAILED
instance.error = error
        # 2. Record a Saga log entry
workflow.saga_log.append({
'event': 'TASK_FAILED',
'task_id': task_id,
            'timestamp': datetime.now().isoformat(),
'error': error
})
        # 3. Mark the workflow as compensating
workflow.status = WorkflowStatus.COMPENSATING
workflow.current_task_id = None
self.persistence.save_workflow(workflow)
logger.warning(f"Task {task_id} failed in workflow {workflow_id}. Initiating Saga compensation.")
        # 4. Start compensation (compensate the succeeded tasks in reverse order)
self._trigger_compensation(workflow)
def _trigger_compensation(self, workflow: Workflow):
"""按逆序触发已成功任务的补偿操作"""
task_order = workflow.get_task_order()
        # Locate the failed task in the execution order
        failed_index = len(task_order)
        for i, tid in enumerate(task_order):
            inst = workflow.task_instances.get(tid)
            if inst and inst.status == TaskStatus.FAILED:
                failed_index = i
                break
        # Collect the task IDs to compensate (succeeded tasks before the failed one)
tasks_to_compensate = []
for i in range(failed_index - 1, -1, -1):
task_id = task_order[i]
instance = workflow.task_instances.get(task_id)
if instance and instance.status == TaskStatus.SUCCEEDED:
tasks_to_compensate.append(task_id)
if not tasks_to_compensate:
workflow.status = WorkflowStatus.FAILED
self.persistence.save_workflow(workflow)
logger.info(f"No tasks to compensate. Workflow {workflow.id} marked as FAILED.")
return
        # Execute the compensation chain (simplified here to sequential, synchronous execution)
self._execute_compensation_chain(workflow.id, tasks_to_compensate, 0)
def _execute_compensation_chain(self, workflow_id: str, task_ids: list, index: int):
"""递归执行补偿链"""
if index >= len(task_ids):
            # All compensations are done
workflow = self.persistence.load_workflow(workflow_id)
if workflow:
workflow.status = WorkflowStatus.COMPENSATED
self.persistence.save_workflow(workflow)
logger.info(f"Workflow {workflow_id} fully compensated.")
return
task_id = task_ids[index]
workflow = self.persistence.load_workflow(workflow_id)
if not workflow:
return
task_def = workflow.tasks[task_id]
instance = workflow.task_instances[task_id]
logger.info(f"Compensating task {task_id}...")
        # Call the compensation service
        try:
            # Build compensation parameters: the rendered original task parameters plus the task's result
            comp_params = self.workflow_engine._render_parameters(task_def.parameters, workflow.context)
            comp_params.update(instance.result or {})
comp_result = call_agent_service(task_def.compensate_service, comp_params)
instance.status = TaskStatus.COMPENSATED
workflow.saga_log.append({
'event': 'COMPENSATION_SUCCEEDED',
'task_id': task_id,
                'timestamp': datetime.now().isoformat()
})
self.persistence.save_workflow(workflow)
            # Compensation succeeded; continue with the next one
self._execute_compensation_chain(workflow_id, task_ids, index + 1)
except Exception as e:
logger.error(f"Compensation failed for task {task_id}: {e}")
instance.status = TaskStatus.COMPENSATION_FAILED
workflow.saga_log.append({
'event': 'COMPENSATION_FAILED',
'task_id': task_id,
'error': str(e)
})
            # Compensation failed! This is one of the hard parts of the Saga pattern. Options: 1. retry  2. manual intervention  3. flag for manual repair
            workflow.status = WorkflowStatus.FAILED  # mark as terminally failed
            self.persistence.save_workflow(workflow)
            # In production this should raise an alert or push the case to a dead-letter queue
File: src/workflow_engine.py
import logging
import yaml
from datetime import datetime
from typing import Dict, Any, Optional
from .models import Workflow, WorkflowStatus, Task, TaskStatus, TaskInstance
from .saga_coordinator import SagaCoordinator
from .agent_services import call_agent_service
from .persistence import PersistenceLayer
logger = logging.getLogger(__name__)
class WorkflowEngine:
"""工作流引擎,负责驱动工作流状态机"""
def __init__(self, persistence: PersistenceLayer):
self.persistence = persistence
self.saga_coordinator = SagaCoordinator(self)
def create_workflow(self, definition_path: str, initial_context: Dict[str, Any]) -> Workflow:
"""从YAML定义文件创建工作流实例"""
with open(definition_path, 'r') as f:
def_data = yaml.safe_load(f)
workflow = Workflow()
workflow.name = def_data['name']
workflow.description = def_data.get('description', '')
workflow.context = initial_context
        # Parse the task definitions
tasks_dict = {}
for task_def in def_data['tasks']:
task = Task(**task_def)
tasks_dict[task.id] = task
            # Initialize the task's execution instance
workflow.task_instances[task.id] = TaskInstance(task_id=task.id)
workflow.tasks = tasks_dict
        # Set the first task as the current task
task_order = workflow.get_task_order()
if task_order:
workflow.current_task_id = task_order[0]
workflow.status = WorkflowStatus.CREATED
self.persistence.save_workflow(workflow)
logger.info(f"Created workflow {workflow.id}")
return workflow
def start_workflow(self, workflow_id: str):
"""启动工作流执行"""
workflow = self.persistence.load_workflow(workflow_id)
if not workflow or workflow.status != WorkflowStatus.CREATED:
raise ValueError(f"Cannot start workflow {workflow_id}")
workflow.status = WorkflowStatus.RUNNING
self.persistence.save_workflow(workflow)
logger.info(f"Started workflow {workflow_id}")
        # Run the first task
if workflow.current_task_id:
self.execute_task(workflow_id, workflow.current_task_id)
else:
workflow.status = WorkflowStatus.COMPLETED
self.persistence.save_workflow(workflow)
def execute_task(self, workflow_id: str, task_id: str):
"""执行指定任务"""
workflow = self.persistence.load_workflow(workflow_id)
if not workflow or workflow.status not in [WorkflowStatus.RUNNING, WorkflowStatus.COMPENSATING]:
return
task_def = workflow.tasks.get(task_id)
instance = workflow.task_instances.get(task_id)
if not task_def or not instance or instance.status != TaskStatus.PENDING:
return
        # Update state
        instance.status = TaskStatus.RUNNING
        instance.start_time = datetime.now()
self.persistence.save_workflow(workflow)
logger.info(f"Executing task {task_id} in workflow {workflow_id}")
        # Render the task parameters (simple template substitution, e.g. {{workflow_context.doc_id}})
params = self._render_parameters(task_def.parameters, workflow.context)
try:
            # Call the agent service
            result = call_agent_service(task_def.agent_service, params)
            # The success event is handled by the coordinator
self.saga_coordinator.handle_task_success(workflow_id, task_id, result)
except Exception as e:
logger.error(f"Task {task_id} execution failed: {e}")
            # The failure event is handled by the coordinator
self.saga_coordinator.handle_task_failure(workflow_id, task_id, str(e))
def _render_parameters(self, params: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""简单参数渲染,支持 {{workflow_context.key}} 语法"""
import json
param_str = json.dumps(params)
for key, value in context.items():
placeholder = f'{{{{workflow_context.{key}}}}}'
if placeholder in param_str:
param_str = param_str.replace(placeholder, str(value))
return json.loads(param_str)
def get_workflow_status(self, workflow_id: str) -> Optional[Workflow]:
"""查询工作流状态"""
return self.persistence.load_workflow(workflow_id)
File: src/agent_services.py
"""
模拟的智能体服务。
每个服务是一个函数,模拟成功、失败及补偿操作。
"""
import random
import time
from typing import Dict, Any
# Failure probability configuration
FAILURE_RATE = 0.3  # 30% chance of failure
def validation_agent(params: Dict[str, Any]) -> Dict[str, Any]:
"""模拟文档验证智能体"""
doc_id = params.get('doc_id', 'unknown')
print(f"[Validation Agent] Validating document {doc_id}...")
    time.sleep(0.5)  # simulate processing time
    # Simulate a simple validation rule
if not doc_id or len(doc_id) < 3:
raise ValueError("Invalid document ID")
print(f"[Validation Agent] Document {doc_id} validated successfully.")
return {
"valid": True,
"message": "Document is valid and ready for analysis.",
"context_updates": {"validation_result": "passed"}
}
def compensate_validation(params: Dict[str, Any]) -> Dict[str, Any]:
"""补偿验证操作:例如,删除生成的验证元数据或解锁文档"""
doc_id = params.get('doc_id', 'unknown')
print(f"[Compensation][Validation] Reverting validation effects for doc {doc_id}...")
time.sleep(0.3)
print(f"[Compensation][Validation] Compensation for validation completed.")
return {"status": "compensated", "operation": "cleanup_validation_metadata"}
def analysis_agent(params: Dict[str, Any]) -> Dict[str, Any]:
"""模拟内容分析智能体,有概率失败"""
doc_id = params.get('doc_id', 'unknown')
mode = params.get('mode', 'standard')
print(f"[Analysis Agent] Analyzing document {doc_id} in {mode} mode...")
time.sleep(1.0)
    # Simulate a random failure
if random.random() < FAILURE_RATE:
raise RuntimeError(f"Analysis service internal error for doc {doc_id}. Service unavailable.")
print(f"[Analysis Agent] Analysis for document {doc_id} completed.")
return {
"analysis_summary": "Document contains 5 key topics.",
"sentiment": "neutral",
"context_updates": {"analysis_done": True}
}
def compensate_analysis(params: Dict[str, Any]) -> Dict[str, Any]:
"""补偿分析操作:例如,删除分析结果缓存"""
doc_id = params.get('doc_id', 'unknown')
print(f"[Compensation][Analysis] Reverting analysis results for doc {doc_id}...")
time.sleep(0.4)
print(f"[Compensation][Analysis] Compensation for analysis completed.")
return {"status": "compensated", "operation": "delete_analysis_cache"}
def report_agent(params: Dict[str, Any]) -> Dict[str, Any]:
"""模拟报告生成智能体"""
doc_id = params.get('doc_id', 'unknown')
fmt = params.get('format', 'pdf')
print(f"[Report Agent] Generating {fmt} report for document {doc_id}...")
time.sleep(0.8)
print(f"[Report Agent] Report generated successfully for document {doc_id}.")
return {
"report_url": f"/reports/{doc_id}_final.{fmt}",
"size_kb": 1024,
"context_updates": {"report_generated": True}
}
def compensate_report(params: Dict[str, Any]) -> Dict[str, Any]:
"""补偿报告生成操作:删除生成的文件"""
doc_id = params.get('doc_id', 'unknown')
print(f"[Compensation][Report] Deleting generated report for doc {doc_id}...")
time.sleep(0.2)
print(f"[Compensation][Report] Compensation for report generation completed.")
return {"status": "compensated", "operation": "delete_report_file"}
# Service registry
SERVICE_REGISTRY = {
'validation_agent': validation_agent,
'analysis_agent': analysis_agent,
'report_agent': report_agent,
'compensate_validation': compensate_validation,
'compensate_analysis': compensate_analysis,
'compensate_report': compensate_report,
}
def call_agent_service(service_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""调用智能体服务的统一入口"""
service_func = SERVICE_REGISTRY.get(service_name)
if not service_func:
raise ValueError(f"Unknown service: {service_name}")
return service_func(params)
File: src/persistence.py
"""
状态持久化层(简化版:使用内存和文件备份)。
生产环境应替换为数据库(如PostgreSQL, MongoDB)。
"""
import json
import os
from typing import Dict, Optional
from .models import Workflow, WorkflowStatus, TaskStatus
class PersistenceLayer:
def __init__(self, storage_dir='workflow_states'):
self.storage_dir = storage_dir
self.in_memory_cache: Dict[str, Workflow] = {}
os.makedirs(storage_dir, exist_ok=True)
def save_workflow(self, workflow: Workflow):
"""保存工作流状态到内存和文件"""
self.in_memory_cache[workflow.id] = workflow
filepath = os.path.join(self.storage_dir, f"{workflow.id}.json")
with open(filepath, 'w') as f:
            # Manual serialization to handle enums and datetimes
data = {
'id': workflow.id,
'name': workflow.name,
'status': workflow.status.value,
'created_at': workflow.created_at.isoformat(),
'context': workflow.context,
'current_task_id': workflow.current_task_id,
'tasks': {tid: {**task.__dict__} for tid, task in workflow.tasks.items()},
'task_instances': {tid: inst.to_dict() for tid, inst in workflow.task_instances.items()},
'saga_log': workflow.saga_log
}
json.dump(data, f, indent=2)
def load_workflow(self, workflow_id: str) -> Optional[Workflow]:
"""从内存或文件加载工作流"""
# 先从内存缓存查找
if workflow_id in self.in_memory_cache:
return self.in_memory_cache[workflow_id]
        # Load from the file
filepath = os.path.join(self.storage_dir, f"{workflow_id}.json")
if not os.path.exists(filepath):
return None
with open(filepath, 'r') as f:
data = json.load(f)
from .models import Workflow, Task, TaskInstance
from datetime import datetime
        # Rebuild the task definitions
tasks = {}
for tid, task_data in data['tasks'].items():
            # Note: next_on_success/next_on_failure may be null (None)
tasks[tid] = Task(
id=task_data['id'],
name=task_data['name'],
agent_service=task_data['agent_service'],
compensate_service=task_data['compensate_service'],
parameters=task_data['parameters'],
next_on_success=task_data['next_on_success'],
next_on_failure=task_data['next_on_failure']
)
        # Rebuild the task instances
task_instances = {}
for tid, inst_data in data.get('task_instances', {}).items():
instance = TaskInstance(task_id=tid)
instance.status = TaskStatus(inst_data['status'])
instance.result = inst_data.get('result')
instance.error = inst_data.get('error')
            # Time fields are omitted here for simplicity
task_instances[tid] = instance
workflow = Workflow(
id=data['id'],
name=data['name'],
status=WorkflowStatus(data['status']),
created_at=datetime.fromisoformat(data['created_at']),
context=data['context'],
current_task_id=data['current_task_id'],
tasks=tasks,
task_instances=task_instances,
saga_log=data.get('saga_log', [])
)
        # Put it into the cache
self.in_memory_cache[workflow_id] = workflow
return workflow
def list_workflows(self) -> list:
"""列出所有工作流ID"""
files = [f for f in os.listdir(self.storage_dir) if f.endswith('.json')]
        return [f[:-5] for f in files]  # strip the .json suffix
File: main.py
#!/usr/bin/env python3
"""
主程序入口:演示工作流的创建、执行以及Saga补偿场景。
"""
import logging
import sys
from src.persistence import PersistenceLayer
from src.workflow_engine import WorkflowEngine
from src.models import WorkflowStatus
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def run_success_scenario():
"""成功场景演示"""
print("\n" + "="*50)
print("SCENARIO 1: Successful Workflow Execution")
print("="*50)
persistence = PersistenceLayer()
engine = WorkflowEngine(persistence)
    # Create a workflow instance
initial_context = {"doc_id": "doc_12345"}
workflow = engine.create_workflow('config/workflow_def.yaml', initial_context)
    # Start the workflow (asynchronous in a real system; simplified here to synchronous execution)
engine.start_workflow(workflow.id)
    # Poll for the status (in a real asynchronous system, updates would arrive via event callbacks)
import time
    for _ in range(30):  # wait up to ~15 seconds
wf = engine.get_workflow_status(workflow.id)
if wf.status in [WorkflowStatus.COMPLETED, WorkflowStatus.FAILED, WorkflowStatus.COMPENSATED]:
print(f"\nFinal Workflow Status: {wf.status}")
print(f"Saga Log Length: {len(wf.saga_log)}")
break
time.sleep(0.5)
else:
print("Workflow execution timeout.")
def run_failure_and_compensation_scenario():
"""失败与补偿场景演示(通过提高失败率)"""
print("\n" + "="*50)
print("SCENARIO 2: Failure and Saga Compensation")
print("="*50)
    # Temporarily raise the analysis service's failure rate
from src.agent_services import FAILURE_RATE
import src.agent_services
    src.agent_services.FAILURE_RATE = 0.8  # 80% failure rate
persistence = PersistenceLayer()
engine = WorkflowEngine(persistence)
initial_context = {"doc_id": "doc_67890"}
workflow = engine.create_workflow('config/workflow_def.yaml', initial_context)
engine.start_workflow(workflow.id)
import time
for _ in range(30):
wf = engine.get_workflow_status(workflow.id)
if wf.status in [WorkflowStatus.COMPLETED, WorkflowStatus.FAILED, WorkflowStatus.COMPENSATED]:
print(f"\nFinal Workflow Status: {wf.status}")
print("Task Instance States:")
for tid, inst in wf.task_instances.items():
print(f" - {tid}: {inst.status}")
print(f"Saga Log Events:")
            for entry in wf.saga_log[-5:]:  # print the last 5 log entries
print(f" - {entry.get('event')} (task: {entry.get('task_id')})")
break
time.sleep(0.5)
    # Restore the original failure rate
src.agent_services.FAILURE_RATE = FAILURE_RATE
if __name__ == '__main__':
    # Run both demo scenarios
run_success_scenario()
run_failure_and_compensation_scenario()
File: requirements.txt
PyYAML>=6.0
4. Installing Dependencies and Running the Demo
- Environment: make sure Python 3.7 or later is installed.
- Create the project directory:
mkdir saga-agent-workflow && cd saga-agent-workflow
# Place all of the files above into the corresponding directories according to the project structure tree.
- Install dependencies:
pip install -r requirements.txt
- Run the demo:
python main.py
You will see output for both scenarios. Scenario 1 will most likely complete all three tasks successfully. Scenario 2 uses a high failure rate, so `analysis_agent` will very likely fail, triggering compensation of the preceding `validate_task` and leaving the workflow in the `COMPENSATED` state.
5. Testing and Verification
File: tests/test_workflow.py
import pytest
import os
import tempfile
from src.persistence import PersistenceLayer
from src.models import Workflow, Task, WorkflowStatus, TaskStatus
from src.workflow_engine import WorkflowEngine
def test_workflow_creation():
"""测试工作流创建与基本属性"""
with tempfile.TemporaryDirectory() as tmpdir:
persistence = PersistenceLayer(storage_dir=tmpdir)
engine = WorkflowEngine(persistence)
        # A temporary definition file is needed
def_content = """
name: "TestWorkflow"
tasks:
- id: "task1"
name: "Task One"
agent_service: "validation_agent"
compensate_service: "compensate_validation"
parameters: {}
next_on_success: "task2"
next_on_failure: null
- id: "task2"
name: "Task Two"
agent_service: "report_agent"
compensate_service: "compensate_report"
parameters: {}
next_on_success: null
next_on_failure: null
"""
def_path = os.path.join(tmpdir, 'def.yaml')
with open(def_path, 'w') as f:
f.write(def_content)
workflow = engine.create_workflow(def_path, {"test": "data"})
assert workflow.id is not None
assert workflow.name == "TestWorkflow"
assert len(workflow.tasks) == 2
assert workflow.current_task_id == "task1"
assert workflow.status == WorkflowStatus.CREATED
        # Verify persistence
loaded = persistence.load_workflow(workflow.id)
assert loaded.id == workflow.id
def test_task_order_calculation():
"""测试线性任务顺序计算"""
task1 = Task(id="t1", name="1", agent_service="a1", compensate_service="c1",
parameters={}, next_on_success="t2", next_on_failure=None)
task2 = Task(id="t2", name="2", agent_service="a2", compensate_service="c2",
parameters={}, next_on_success="t3", next_on_failure=None)
task3 = Task(id="t3", name="3", agent_service="a3", compensate_service="c3",
parameters={}, next_on_success=None, next_on_failure=None)
workflow = Workflow()
workflow.tasks = {"t1": task1, "t2": task2, "t3": task3}
order = workflow.get_task_order()
assert order == ["t1", "t2", "t3"]
if __name__ == '__main__':
    # Run the tests directly
test_workflow_creation()
test_task_order_calculation()
print("All basic tests passed.")
Run the unit tests:
python -m pytest tests/test_workflow.py -v
6. Diagrams of the Core Mechanisms
6.1 Saga Execution and Compensation Sequence Diagram
The sequence diagram below shows a workflow of three tasks in which the second task fails, triggering compensation of the first task.
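Rendered as a Mermaid sketch (using the task and service names from the demo project), the interaction looks roughly like this:
sequenceDiagram
    participant Engine as WorkflowEngine
    participant Coord as SagaCoordinator
    participant Validate as validation_agent
    participant Analyze as analysis_agent
    Engine->>Validate: execute validate_task
    Validate-->>Engine: success
    Engine->>Coord: handle_task_success(validate_task)
    Coord->>Engine: execute_task(analyze_task)
    Engine->>Analyze: execute analyze_task
    Analyze-->>Engine: exception (failure)
    Engine->>Coord: handle_task_failure(analyze_task)
    Note over Coord: workflow status = COMPENSATING
    Coord->>Validate: compensate_validation
    Validate-->>Coord: compensated
    Note over Coord: workflow status = COMPENSATED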
6.2 Workflow Engine State Machine
At its core, the workflow engine is a state machine that drives the entire Saga lifecycle.
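A Mermaid sketch of the states and transitions implemented by the engine and coordinator above (the PAUSED state defined in the model is not exercised by the demo):
stateDiagram-v2
    [*] --> CREATED
    CREATED --> RUNNING: start_workflow
    RUNNING --> COMPLETED: all tasks succeeded
    RUNNING --> COMPENSATING: a task failed
    COMPENSATING --> COMPENSATED: all compensations succeeded
    COMPENSATING --> FAILED: nothing to compensate / compensation failed
    COMPLETED --> [*]
    COMPENSATED --> [*]
    FAILED --> [*]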
7. Extensions and Best Practices
- Persistence layer: this project simulates persistence with in-memory objects and files; production systems should use a database. A database with transactional guarantees (such as PostgreSQL) is recommended for storing workflow state and the Saga log, so that the persistence operations themselves are atomic.
- Communication: the service calls in this example are synchronous. In a distributed system, an asynchronous message queue (such as RabbitMQ or Kafka) should decouple the engine, the coordinator, and the agent services through events, improving scalability and reliability (see the event-bus sketch after this list).
- Timeouts and retries: agent service calls can fail due to network problems or transient faults. Implement retries with exponential backoff and set a timeout for each task so that the workflow cannot hang indefinitely (see the retry sketch after this list).
- Idempotent compensations: a compensation may be invoked more than once (for example, after the coordinator crashes and restarts). Compensation services must therefore be idempotent: executing them several times must produce the same result as executing them once (see the idempotency sketch after this list).
- Observability and monitoring: a detailed Saga log is essential for debugging and monitoring. Export it to an ELK stack or a time-series database and build dashboards that track success/failure rates, compensation counts, and other key metrics.
- Pattern choice: this project implements the choreography pattern, which suits moderately complex flows with low coupling between services. For very complex flows, or flows that need centralized control, consider the orchestration pattern, in which a central coordinator explicitly instructs each participant to perform its operation or its compensation.
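A minimal sketch of such event-driven decoupling, using an in-process queue as a stand-in for a real message broker (the event field names here are assumptions, not part of the project code above):
import queue
import threading
from typing import Any, Callable, Dict

event_bus: "queue.Queue[Dict[str, Any]]" = queue.Queue()  # stand-in for RabbitMQ/Kafka

def publish(event: Dict[str, Any]) -> None:
    """The engine publishes task events instead of calling the coordinator directly."""
    event_bus.put(event)

def coordinator_loop(on_success: Callable, on_failure: Callable) -> None:
    """Consume task events and dispatch them to the coordinator's handlers."""
    while True:
        event = event_bus.get()
        if event["type"] == "TASK_SUCCEEDED":
            on_success(event["workflow_id"], event["task_id"], event["result"])
        elif event["type"] == "TASK_FAILED":
            on_failure(event["workflow_id"], event["task_id"], event["error"])

# Example wiring (assuming coordinator is a SagaCoordinator instance):
# threading.Thread(target=coordinator_loop,
#                  args=(coordinator.handle_task_success, coordinator.handle_task_failure),
#                  daemon=True).start()
# publish({"type": "TASK_SUCCEEDED", "workflow_id": wf.id, "task_id": "validate_task", "result": {}})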
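A minimal sketch of retry with exponential backoff around a service call (a hypothetical helper, not part of the project code above; enforcing a hard per-call timeout would additionally require running the call in a thread or an async task):
import random
import time
from typing import Any, Callable, Dict

def call_with_retry(call: Callable[[], Dict[str, Any]],
                    max_attempts: int = 3,
                    base_delay: float = 0.5) -> Dict[str, Any]:
    """Retry a service call with exponential backoff plus a little jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except Exception:
            if attempt == max_attempts:
                raise  # give up and let the coordinator trigger compensation
            # Backoff: 0.5s, 1s, 2s, ... plus jitter to avoid thundering herds
            time.sleep(base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.1))

# Hypothetical usage inside WorkflowEngine.execute_task:
# result = call_with_retry(lambda: call_agent_service(task_def.agent_service, params))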
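And a minimal sketch of one way to make a compensation idempotent, by remembering which (workflow, task) pairs have already been compensated; the in-memory set is an assumption, and a real system would persist this marker alongside the Saga log:
from typing import Any, Dict, Set, Tuple

_compensated: Set[Tuple[str, str]] = set()  # (workflow_id, task_id) pairs already compensated

def compensate_validation_idempotent(workflow_id: str, task_id: str,
                                     params: Dict[str, Any]) -> Dict[str, Any]:
    """Perform the cleanup at most once for a given workflow/task pair."""
    key = (workflow_id, task_id)
    if key in _compensated:
        return {"status": "already_compensated"}
    # ... perform the actual cleanup (delete validation metadata, unlock the document, etc.) ...
    _compensated.add(key)
    return {"status": "compensated"}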
This project shows how the Saga pattern can give distributed agent workflows strong fault tolerance and eventual-consistency guarantees. Building a production-grade system requires handling many more edge cases, but the core framework and code presented here cover the critical path and offer a solid starting point for further development.