Saga模式在智能体工作流编排中的容错与一致性保障

2900559190
2026年04月05日
更新于 2026年04月06日
3 次阅读
摘要:本文深入探讨了Saga模式在分布式智能体工作流编排中解决数据一致性问题的核心机制。通过设计一个完整的、可运行的轻量级工作流引擎项目,我们将演示如何利用Saga的编排(Choreography)模式,将长流程事务分解为一系列可补偿的本地事务,通过正向执行与反向补偿来保障最终一致性。项目核心包括工作流定义与状态管理、基于事件驱动的Saga协调器、以及模拟的智能体服务。文章不仅提供了完整的项目代码(包含...

摘要

本文深入探讨了Saga模式在分布式智能体工作流编排中解决数据一致性问题的核心机制。通过设计一个完整的、可运行的轻量级工作流引擎项目,我们将演示如何利用Saga的编排(Choreography)模式,将长流程事务分解为一系列可补偿的本地事务,通过正向执行与反向补偿来保障最终一致性。项目核心包括工作流定义与状态管理、基于事件驱动的Saga协调器、以及模拟的智能体服务。文章不仅提供了完整的项目代码(包含状态机、持久化层及补偿逻辑),还通过序列图和流程图清晰展现了事务执行与回滚的完整生命周期,为构建高可靠、可容错的智能体协同系统提供了实践蓝图。

1. 项目概述与设计

在微服务与智能体(Agent)架构盛行的今天,一个复杂的业务目标往往需要多个智能体协同完成。例如,一个"智能内容创作"工作流可能涉及:研究Agent搜集资料、写作Agent生成初稿、审核Agent进行质检、发布Agent上传到平台。这些步骤分布在不同的服务中,构成一个分布式事务。传统的ACID事务在此场景下力不从心。

Saga模式通过将长事务(Long Running Transaction, LRT)分解为一系列本地事务,并为每个本地事务提供一个对应的补偿事务(Compensating Transaction)来解决这个问题。执行过程中,如果某个步骤失败,系统会按照逆序执行已成功步骤的补偿操作,使系统状态回滚到事务开始前的一致状态,从而实现最终一致性

本项目实现一个基于Saga编排(Choreography)模式的轻量级智能体工作流引擎。核心设计如下:

  1. 工作流定义:使用YAML或JSON定义包含多个智能体任务的流程,并指定每个任务的补偿操作。
  2. 工作流引擎:驱动工作流按定义执行,管理整个Saga的生命周期(运行、暂停、失败、完成)。
  3. Saga协调器:监听每个本地任务执行完成或失败的事件,并决定触发下一个任务或发起补偿回滚。
  4. 智能体服务模拟:提供几个模拟的智能体服务端点,用于演示成功、失败及补偿操作。
  5. 状态持久化:将工作流执行状态与Saga日志持久化,确保引擎重启后能恢复。

本示例将模拟一个"文档处理工作流":验证 -> 分析 -> 生成报告。其中分析步骤有一定概率失败,触发对验证步骤的补偿。

2. 项目结构树

saga-agent-workflow/
├── config/
│   └── workflow_def.yaml      # 工作流定义文件
├── src/
│   ├── __init__.py
│   ├── models.py              # 数据模型(Workflow, Task, SagaLog)
│   ├── workflow_engine.py     # 工作流引擎核心(状态机)
│   ├── saga_coordinator.py    # Saga协调器(事件处理,补偿逻辑)
│   ├── agent_services.py      # 模拟的智能体服务
│   └── persistence.py         # 状态持久化层(内存/文件模拟)
├── tests/
│   └── test_workflow.py       # 单元测试
├── main.py                    # 主程序入口
├── requirements.txt           # Python依赖
└── README.md                  # 项目说明(此处按指令不展示内容)

3. 核心代码实现

文件路径:config/workflow_def.yaml

name: "DocumentProcessingWorkflow"
description: "一个模拟的文档处理智能体工作流"
tasks:

  - id: "validate_task"
    name: "文档验证"
    agent_service: "validation_agent"
    compensate_service: "compensate_validation"
    parameters:
      doc_id: "{{workflow_context.doc_id}}"
    next_on_success: "analyze_task"
    next_on_failure: null # 失败直接触发Saga回滚

  - id: "analyze_task"
    name: "内容分析"
    agent_service: "analysis_agent"
    compensate_service: "compensate_analysis"
    parameters:
      doc_id: "{{workflow_context.doc_id}}"
      mode: "deep"
    next_on_success: "generate_report_task"
    next_on_failure: null

  - id: "generate_report_task"
    name: "生成报告"
    agent_service: "report_agent"
    compensate_service: "compensate_report" # 最后一个任务,补偿通常为空操作或清理
    parameters:
      doc_id: "{{workflow_context.doc_id}}"
      format: "pdf"
    next_on_success: null # 工作流结束
    next_on_failure: null

文件路径:src/models.py

from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Any, Dict, List, Optional
from datetime import datetime
import uuid

class TaskStatus(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    COMPENSATED = "COMPENSATED"
    COMPENSATION_FAILED = "COMPENSATION_FAILED"

class WorkflowStatus(Enum):
    CREATED = "CREATED"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    COMPENSATING = "COMPENSATING"
    COMPENSATED = "COMPENSATED"

@dataclass
class Task:
    """工作流中的一个任务定义"""
    id: str
    name: str
    agent_service: str          # 调用的智能体服务名
    compensate_service: str     # 补偿操作服务名
    parameters: Dict[str, Any]
    next_on_success: Optional[str]
    next_on_failure: Optional[str]

@dataclass
class TaskInstance:
    """任务的一次执行实例"""
    task_id: str
    status: TaskStatus = TaskStatus.PENDING
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

    def to_dict(self):
        return {**asdict(self), 'status': self.status.value}

@dataclass
class Workflow:
    """工作流定义与实例的聚合"""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    status: WorkflowStatus = WorkflowStatus.CREATED
    created_at: datetime = field(default_factory=datetime.now)
    context: Dict[str, Any] = field(default_factory=dict)  # 工作流全局上下文,如doc_id
    tasks: Dict[str, Task] = field(default_factory=dict)   # 任务定义字典 key=task_id
    task_instances: Dict[str, TaskInstance] = field(default_factory=dict) # 执行实例
    current_task_id: Optional[str] = None
    saga_log: List[Dict] = field(default_factory=list)     # Saga执行日志

    def get_task_order(self) -> List[str]:
        """获取任务执行顺序列表(简易拓扑排序,仅支持线性)"""
        order = []
        current = None
        # 找到起点(没有前序任务)
        for task_id, task in self.tasks.items():
            if not any(t.next_on_success == task_id or t.next_on_failure == task_id for t in self.tasks.values()):
                current = task_id
                break
        while current:
            order.append(current)
            current = self.tasks[current].next_on_success
        return order

文件路径:src/saga_coordinator.py

import logging
from typing import Dict, Any
from .models import Workflow, WorkflowStatus, TaskStatus, TaskInstance
from .agent_services import call_agent_service

logger = logging.getLogger(__name__)

class SagaCoordinator:
    """
    Saga协调器(编排模式)。
    监听任务事件,并决定推进工作流或发起补偿。
    """
    def __init__(self, workflow_engine):
        self.workflow_engine = workflow_engine
        self.persistence = workflow_engine.persistence

    def handle_task_success(self, workflow_id: str, task_id: str, result: Dict[str, Any]):
        """处理任务成功事件"""
        workflow = self.persistence.load_workflow(workflow_id)
        if not workflow:
            logger.error(f"Workflow {workflow_id} not found.")
            return

        # 1. 更新任务实例状态
        instance = workflow.task_instances[task_id]
        instance.status = TaskStatus.SUCCEEDED
        instance.result = result
        # 更新上下文(可选,可将结果注入后续任务参数)
        workflow.context.update(result.get('context_updates', {}))

        # 2. 记录Saga日志
        workflow.saga_log.append({
            'event': 'TASK_SUCCEEDED',
            'task_id': task_id,
            'timestamp': workflow.created_at.isoformat(),
            'result': result
        })

        # 3. 决定下一步:执行下一个任务 or 完成工作流
        task_def = workflow.tasks[task_id]
        next_task_id = task_def.next_on_success

        if next_task_id:
            workflow.current_task_id = next_task_id
            workflow.status = WorkflowStatus.RUNNING
            self.persistence.save_workflow(workflow)
            # 触发引擎执行下一个任务
            self.workflow_engine.execute_task(workflow_id, next_task_id)
        else:
            # 没有下一个任务,工作流成功完成
            workflow.status = WorkflowStatus.COMPLETED
            workflow.current_task_id = None
            self.persistence.save_workflow(workflow)
            logger.info(f"Workflow {workflow_id} completed successfully.")

    def handle_task_failure(self, workflow_id: str, task_id: str, error: str):
        """处理任务失败事件,触发Saga补偿流程"""
        workflow = self.persistence.load_workflow(workflow_id)
        if not workflow:
            return

        # 1. 更新失败任务状态
        instance = workflow.task_instances[task_id]
        instance.status = TaskStatus.FAILED
        instance.error = error

        # 2. 记录Saga日志
        workflow.saga_log.append({
            'event': 'TASK_FAILED',
            'task_id': task_id,
            'timestamp': workflow.created_at.isoformat(),
            'error': error
        })

        # 3. 改变工作流状态为补偿中
        workflow.status = WorkflowStatus.COMPENSATING
        workflow.current_task_id = None
        self.persistence.save_workflow(workflow)

        logger.warning(f"Task {task_id} failed in workflow {workflow_id}. Initiating Saga compensation.")
        # 4. 开始补偿流程(逆序补偿已成功的任务)
        self._trigger_compensation(workflow)

    def _trigger_compensation(self, workflow: Workflow):
        """按逆序触发已成功任务的补偿操作"""
        task_order = workflow.get_task_order()
        # 找到失败任务的位置
        try:
            failed_index = task_order.index(workflow.task_instances[task_order[-1]].task_id) # 简化:找最后一个状态非PENDING的任务
        except ValueError:
            failed_index = len(task_order)

        # 获取需要补偿的任务ID列表(在失败任务之前且已成功的)
        tasks_to_compensate = []
        for i in range(failed_index - 1, -1, -1):
            task_id = task_order[i]
            instance = workflow.task_instances.get(task_id)
            if instance and instance.status == TaskStatus.SUCCEEDED:
                tasks_to_compensate.append(task_id)

        if not tasks_to_compensate:
            workflow.status = WorkflowStatus.FAILED
            self.persistence.save_workflow(workflow)
            logger.info(f"No tasks to compensate. Workflow {workflow.id} marked as FAILED.")
            return

        # 异步执行补偿链(这里简化为顺序同步执行)
        self._execute_compensation_chain(workflow.id, tasks_to_compensate, 0)

    def _execute_compensation_chain(self, workflow_id: str, task_ids: list, index: int):
        """递归执行补偿链"""
        if index >= len(task_ids):
            # 所有补偿完成
            workflow = self.persistence.load_workflow(workflow_id)
            if workflow:
                workflow.status = WorkflowStatus.COMPENSATED
                self.persistence.save_workflow(workflow)
                logger.info(f"Workflow {workflow_id} fully compensated.")
            return

        task_id = task_ids[index]
        workflow = self.persistence.load_workflow(workflow_id)
        if not workflow:
            return

        task_def = workflow.tasks[task_id]
        instance = workflow.task_instances[task_id]

        logger.info(f"Compensating task {task_id}...")
        # 调用补偿服务
        try:
            # 构建补偿参数,通常需要原任务的参数或结果
            comp_params = {**instance.result} if instance.result else {}
            comp_result = call_agent_service(task_def.compensate_service, comp_params)
            instance.status = TaskStatus.COMPENSATED
            workflow.saga_log.append({
                'event': 'COMPENSATION_SUCCEEDED',
                'task_id': task_id,
                'timestamp': workflow.created_at.isoformat()
            })
            self.persistence.save_workflow(workflow)
            # 补偿成功,继续下一个补偿
            self._execute_compensation_chain(workflow_id, task_ids, index + 1)
        except Exception as e:
            logger.error(f"Compensation failed for task {task_id}: {e}")
            instance.status = TaskStatus.COMPENSATION_FAILED
            workflow.saga_log.append({
                'event': 'COMPENSATION_FAILED',
                'task_id': task_id,
                'error': str(e)
            })
            # 补偿失败!这是Saga模式的一个难点。可采取:1. 重试 2. 人工介入 3. 标记为需人工修复
            workflow.status = WorkflowStatus.FAILED # 标记为最终失败
            self.persistence.save_workflow(workflow)
            # 实际生产中,此处应进入告警或死信队列

文件路径:src/workflow_engine.py

import logging
import yaml
from typing import Dict, Any
from .models import Workflow, WorkflowStatus, Task, TaskStatus, TaskInstance
from .saga_coordinator import SagaCoordinator
from .agent_services import call_agent_service
from .persistence import PersistenceLayer

logger = logging.getLogger(__name__)

class WorkflowEngine:
    """工作流引擎,负责驱动工作流状态机"""
    def __init__(self, persistence: PersistenceLayer):
        self.persistence = persistence
        self.saga_coordinator = SagaCoordinator(self)

    def create_workflow(self, definition_path: str, initial_context: Dict[str, Any]) -> Workflow:
        """从YAML定义文件创建工作流实例"""
        with open(definition_path, 'r') as f:
            def_data = yaml.safe_load(f)

        workflow = Workflow()
        workflow.name = def_data['name']
        workflow.description = def_data.get('description', '')
        workflow.context = initial_context

        # 解析任务定义
        tasks_dict = {}
        for task_def in def_data['tasks']:
            task = Task(**task_def)
            tasks_dict[task.id] = task
            # 初始化任务实例
            workflow.task_instances[task.id] = TaskInstance(task_id=task.id)
        workflow.tasks = tasks_dict

        # 设置第一个任务为当前任务
        task_order = workflow.get_task_order()
        if task_order:
            workflow.current_task_id = task_order[0]
        workflow.status = WorkflowStatus.CREATED
        self.persistence.save_workflow(workflow)
        logger.info(f"Created workflow {workflow.id}")
        return workflow

    def start_workflow(self, workflow_id: str):
        """启动工作流执行"""
        workflow = self.persistence.load_workflow(workflow_id)
        if not workflow or workflow.status != WorkflowStatus.CREATED:
            raise ValueError(f"Cannot start workflow {workflow_id}")

        workflow.status = WorkflowStatus.RUNNING
        self.persistence.save_workflow(workflow)
        logger.info(f"Started workflow {workflow_id}")

        # 开始执行第一个任务
        if workflow.current_task_id:
            self.execute_task(workflow_id, workflow.current_task_id)
        else:
            workflow.status = WorkflowStatus.COMPLETED
            self.persistence.save_workflow(workflow)

    def execute_task(self, workflow_id: str, task_id: str):
        """执行指定任务"""
        workflow = self.persistence.load_workflow(workflow_id)
        if not workflow or workflow.status not in [WorkflowStatus.RUNNING, WorkflowStatus.COMPENSATING]:
            return

        task_def = workflow.tasks.get(task_id)
        instance = workflow.task_instances.get(task_id)
        if not task_def or not instance or instance.status != TaskStatus.PENDING:
            return

        # 更新状态
        instance.status = TaskStatus.RUNNING
        instance.start_time = workflow.created_at
        self.persistence.save_workflow(workflow)

        logger.info(f"Executing task {task_id} in workflow {workflow_id}")
        # 渲染任务参数(简单的模板替换,如 {{workflow_context.doc_id}})
        params = self._render_parameters(task_def.parameters, workflow.context)

        try:
            # 调用智能体服务
            result = call_agent_service(task_def.agent_service, params)
            # 成功事件由协调器处理
            self.saga_coordinator.handle_task_success(workflow_id, task_id, result)
        except Exception as e:
            logger.error(f"Task {task_id} execution failed: {e}")
            # 失败事件由协调器处理
            self.saga_coordinator.handle_task_failure(workflow_id, task_id, str(e))

    def _render_parameters(self, params: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """简单参数渲染,支持 {{workflow_context.key}} 语法"""
        import json
        param_str = json.dumps(params)
        for key, value in context.items():
            placeholder = f'{{{{workflow_context.{key}}}}}'
            if placeholder in param_str:
                param_str = param_str.replace(placeholder, str(value))
        return json.loads(param_str)

    def get_workflow_status(self, workflow_id: str) -> Optional[Workflow]:
        """查询工作流状态"""
        return self.persistence.load_workflow(workflow_id)

文件路径:src/agent_services.py

"""
模拟的智能体服务。
每个服务是一个函数,模拟成功、失败及补偿操作。
"""
import random
import time
from typing import Dict, Any

# 配置失败概率
FAILURE_RATE = 0.3  # 30% 概率失败

def validation_agent(params: Dict[str, Any]) -> Dict[str, Any]:
    """模拟文档验证智能体"""
    doc_id = params.get('doc_id', 'unknown')
    print(f"[Validation Agent] Validating document {doc_id}...")
    time.sleep(0.5)  # 模拟处理时间
    # 模拟一个简单的验证逻辑
    if not doc_id or len(doc_id) < 3:
        raise ValueError("Invalid document ID")
    print(f"[Validation Agent] Document {doc_id} validated successfully.")
    return {
        "valid": True,
        "message": "Document is valid and ready for analysis.",
        "context_updates": {"validation_result": "passed"}
    }

def compensate_validation(params: Dict[str, Any]) -> Dict[str, Any]:
    """补偿验证操作:例如,删除生成的验证元数据或解锁文档"""
    doc_id = params.get('doc_id', 'unknown')
    print(f"[Compensation][Validation] Reverting validation effects for doc {doc_id}...")
    time.sleep(0.3)
    print(f"[Compensation][Validation] Compensation for validation completed.")
    return {"status": "compensated", "operation": "cleanup_validation_metadata"}

def analysis_agent(params: Dict[str, Any]) -> Dict[str, Any]:
    """模拟内容分析智能体,有概率失败"""
    doc_id = params.get('doc_id', 'unknown')
    mode = params.get('mode', 'standard')
    print(f"[Analysis Agent] Analyzing document {doc_id} in {mode} mode...")
    time.sleep(1.0)

    # 模拟随机失败
    if random.random() < FAILURE_RATE:
        raise RuntimeError(f"Analysis service internal error for doc {doc_id}. Service unavailable.")

    print(f"[Analysis Agent] Analysis for document {doc_id} completed.")
    return {
        "analysis_summary": "Document contains 5 key topics.",
        "sentiment": "neutral",
        "context_updates": {"analysis_done": True}
    }

def compensate_analysis(params: Dict[str, Any]) -> Dict[str, Any]:
    """补偿分析操作:例如,删除分析结果缓存"""
    doc_id = params.get('doc_id', 'unknown')
    print(f"[Compensation][Analysis] Reverting analysis results for doc {doc_id}...")
    time.sleep(0.4)
    print(f"[Compensation][Analysis] Compensation for analysis completed.")
    return {"status": "compensated", "operation": "delete_analysis_cache"}

def report_agent(params: Dict[str, Any]) -> Dict[str, Any]:
    """模拟报告生成智能体"""
    doc_id = params.get('doc_id', 'unknown')
    fmt = params.get('format', 'pdf')
    print(f"[Report Agent] Generating {fmt} report for document {doc_id}...")
    time.sleep(0.8)
    print(f"[Report Agent] Report generated successfully for document {doc_id}.")
    return {
        "report_url": f"/reports/{doc_id}_final.{fmt}",
        "size_kb": 1024,
        "context_updates": {"report_generated": True}
    }

def compensate_report(params: Dict[str, Any]) -> Dict[str, Any]:
    """补偿报告生成操作:删除生成的文件"""
    doc_id = params.get('doc_id', 'unknown')
    print(f"[Compensation][Report] Deleting generated report for doc {doc_id}...")
    time.sleep(0.2)
    print(f"[Compensation][Report] Compensation for report generation completed.")
    return {"status": "compensated", "operation": "delete_report_file"}

# 服务注册表
SERVICE_REGISTRY = {
    'validation_agent': validation_agent,
    'analysis_agent': analysis_agent,
    'report_agent': report_agent,
    'compensate_validation': compensate_validation,
    'compensate_analysis': compensate_analysis,
    'compensate_report': compensate_report,
}

def call_agent_service(service_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """调用智能体服务的统一入口"""
    service_func = SERVICE_REGISTRY.get(service_name)
    if not service_func:
        raise ValueError(f"Unknown service: {service_name}")
    return service_func(params)

文件路径:src/persistence.py

"""
状态持久化层(简化版:使用内存和文件备份)。
生产环境应替换为数据库(如PostgreSQL, MongoDB)。
"""
import json
import os
from typing import Dict, Optional
from .models import Workflow, WorkflowStatus, TaskStatus

class PersistenceLayer:
    def __init__(self, storage_dir='workflow_states'):
        self.storage_dir = storage_dir
        self.in_memory_cache: Dict[str, Workflow] = {}
        os.makedirs(storage_dir, exist_ok=True)

    def save_workflow(self, workflow: Workflow):
        """保存工作流状态到内存和文件"""
        self.in_memory_cache[workflow.id] = workflow
        filepath = os.path.join(self.storage_dir, f"{workflow.id}.json")
        with open(filepath, 'w') as f:
            # 手动序列化,处理枚举和datetime
            data = {
                'id': workflow.id,
                'name': workflow.name,
                'status': workflow.status.value,
                'created_at': workflow.created_at.isoformat(),
                'context': workflow.context,
                'current_task_id': workflow.current_task_id,
                'tasks': {tid: {**task.__dict__} for tid, task in workflow.tasks.items()},
                'task_instances': {tid: inst.to_dict() for tid, inst in workflow.task_instances.items()},
                'saga_log': workflow.saga_log
            }
            json.dump(data, f, indent=2)

    def load_workflow(self, workflow_id: str) -> Optional[Workflow]:
        """从内存或文件加载工作流"""
        # 先从内存缓存查找
        if workflow_id in self.in_memory_cache:
            return self.in_memory_cache[workflow_id]

        # 从文件加载
        filepath = os.path.join(self.storage_dir, f"{workflow_id}.json")
        if not os.path.exists(filepath):
            return None

        with open(filepath, 'r') as f:
            data = json.load(f)

        from .models import Workflow, Task, TaskInstance
        from datetime import datetime

        # 重建任务定义
        tasks = {}
        for tid, task_data in data['tasks'].items():
            # 注意:next_on_success/failure 可能是 null (None)
            tasks[tid] = Task(
                id=task_data['id'],
                name=task_data['name'],
                agent_service=task_data['agent_service'],
                compensate_service=task_data['compensate_service'],
                parameters=task_data['parameters'],
                next_on_success=task_data['next_on_success'],
                next_on_failure=task_data['next_on_failure']
            )

        # 重建任务实例
        task_instances = {}
        for tid, inst_data in data.get('task_instances', {}).items():
            instance = TaskInstance(task_id=tid)
            instance.status = TaskStatus(inst_data['status'])
            instance.result = inst_data.get('result')
            instance.error = inst_data.get('error')
            # 简化处理时间字段
            task_instances[tid] = instance

        workflow = Workflow(
            id=data['id'],
            name=data['name'],
            status=WorkflowStatus(data['status']),
            created_at=datetime.fromisoformat(data['created_at']),
            context=data['context'],
            current_task_id=data['current_task_id'],
            tasks=tasks,
            task_instances=task_instances,
            saga_log=data.get('saga_log', [])
        )
        # 放入缓存
        self.in_memory_cache[workflow_id] = workflow
        return workflow

    def list_workflows(self) -> list:
        """列出所有工作流ID"""
        files = [f for f in os.listdir(self.storage_dir) if f.endswith('.json')]
        return [f[:-5] for f in files]  # 移除 .json 后缀

文件路径:main.py

#!/usr/bin/env python3
"""
主程序入口:演示工作流的创建、执行以及Saga补偿场景。
"""
import logging
import sys
from src.persistence import PersistenceLayer
from src.workflow_engine import WorkflowEngine

# 配置日志
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def run_success_scenario():
    """成功场景演示"""
    print("\n" + "="*50)
    print("SCENARIO 1: Successful Workflow Execution")
    print("="*50)
    persistence = PersistenceLayer()
    engine = WorkflowEngine(persistence)

    # 创建工作流实例
    initial_context = {"doc_id": "doc_12345"}
    workflow = engine.create_workflow('config/workflow_def.yaml', initial_context)

    # 启动工作流(异步执行,这里简化为同步等待)
    engine.start_workflow(workflow.id)

    # 轮询检查状态(在实际异步系统中,状态更新通过事件回调)
    import time
    for _ in range(30):  # 最多等待30秒
        wf = engine.get_workflow_status(workflow.id)
        if wf.status in [WorkflowStatus.COMPLETED, WorkflowStatus.FAILED, WorkflowStatus.COMPENSATED]:
            print(f"\nFinal Workflow Status: {wf.status}")
            print(f"Saga Log Length: {len(wf.saga_log)}")
            break
        time.sleep(0.5)
    else:
        print("Workflow execution timeout.")

def run_failure_and_compensation_scenario():
    """失败与补偿场景演示(通过提高失败率)"""
    print("\n" + "="*50)
    print("SCENARIO 2: Failure and Saga Compensation")
    print("="*50)
    # 临时提高分析服务的失败率
    from src.agent_services import FAILURE_RATE
    import src.agent_services
    src.agent_services.FAILURE_RATE = 0.8  # 80% 失败率

    persistence = PersistenceLayer()
    engine = WorkflowEngine(persistence)

    initial_context = {"doc_id": "doc_67890"}
    workflow = engine.create_workflow('config/workflow_def.yaml', initial_context)
    engine.start_workflow(workflow.id)

    import time
    for _ in range(30):
        wf = engine.get_workflow_status(workflow.id)
        if wf.status in [WorkflowStatus.COMPLETED, WorkflowStatus.FAILED, WorkflowStatus.COMPENSATED]:
            print(f"\nFinal Workflow Status: {wf.status}")
            print("Task Instance States:")
            for tid, inst in wf.task_instances.items():
                print(f"  - {tid}: {inst.status}")
            print(f"Saga Log Events:")
            for entry in wf.saga_log[-5:]:  # 打印最后5条日志
                print(f"  - {entry.get('event')} (task: {entry.get('task_id')})")
            break
        time.sleep(0.5)

    # 恢复失败率
    src.agent_services.FAILURE_RATE = FAILURE_RATE

if __name__ == '__main__':
    # 运行两个演示场景
    run_success_scenario()
    run_failure_and_compensation_scenario()

文件路径:requirements.txt

PyYAML>=6.0

4. 安装依赖与运行步骤

  1. 环境准备:确保系统已安装Python 3.7或更高版本。
  2. 克隆/创建项目目录
mkdir saga-agent-workflow && cd saga-agent-workflow
    # 将上述所有文件按照项目结构树放置到对应目录中。
  1. 安装依赖
pip install -r requirements.txt
  1. 运行演示程序
python main.py
你将看到两个场景的输出。场景1有较大概率成功完成所有三个任务。场景2设置了高失败率,`analysis_agent`很可能失败,从而触发对前一个任务`validate_task`的补偿操作,最终工作流状态为`COMPENSATED`。

5. 测试与验证

文件路径:tests/test_workflow.py

import pytest
import os
import tempfile
from src.persistence import PersistenceLayer
from src.models import Workflow, Task, WorkflowStatus, TaskStatus
from src.workflow_engine import WorkflowEngine

def test_workflow_creation():
    """测试工作流创建与基本属性"""
    with tempfile.TemporaryDirectory() as tmpdir:
        persistence = PersistenceLayer(storage_dir=tmpdir)
        engine = WorkflowEngine(persistence)

        # 需要一个临时的定义文件
        def_content = """
        name: "TestWorkflow"
        tasks:

          - id: "task1"
            name: "Task One"
            agent_service: "validation_agent"
            compensate_service: "compensate_validation"
            parameters: {}
            next_on_success: "task2"
            next_on_failure: null

          - id: "task2"
            name: "Task Two"
            agent_service: "report_agent"
            compensate_service: "compensate_report"
            parameters: {}
            next_on_success: null
            next_on_failure: null
        """
        def_path = os.path.join(tmpdir, 'def.yaml')
        with open(def_path, 'w') as f:
            f.write(def_content)

        workflow = engine.create_workflow(def_path, {"test": "data"})
        assert workflow.id is not None
        assert workflow.name == "TestWorkflow"
        assert len(workflow.tasks) == 2
        assert workflow.current_task_id == "task1"
        assert workflow.status == WorkflowStatus.CREATED

        # 验证持久化
        loaded = persistence.load_workflow(workflow.id)
        assert loaded.id == workflow.id

def test_task_order_calculation():
    """测试线性任务顺序计算"""
    task1 = Task(id="t1", name="1", agent_service="a1", compensate_service="c1",
                 parameters={}, next_on_success="t2", next_on_failure=None)
    task2 = Task(id="t2", name="2", agent_service="a2", compensate_service="c2",
                 parameters={}, next_on_success="t3", next_on_failure=None)
    task3 = Task(id="t3", name="3", agent_service="a3", compensate_service="c3",
                 parameters={}, next_on_success=None, next_on_failure=None)

    workflow = Workflow()
    workflow.tasks = {"t1": task1, "t2": task2, "t3": task3}
    order = workflow.get_task_order()
    assert order == ["t1", "t2", "t3"]

if __name__ == '__main__':
    # 简单运行测试
    test_workflow_creation()
    test_task_order_calculation()
    print("All basic tests passed.")

运行单元测试:

python -m pytest tests/test_workflow.py -v

6. 核心机制图示

6.1 Saga执行与补偿序列图

下面的序列图展示了一个包含三个任务的工作流,其中第二个任务失败,触发对第一个任务的补偿。

sequenceDiagram participant Client participant Engine participant Coordinator participant Agent1 as Validate Agent participant Agent2 as Analyze Agent participant Comp1 as Compensate Validate Client->>Engine: 启动工作流 Engine->>Coordinator: 记录开始 Engine->>Agent1: 调用 validate_task Agent1-->>Engine: 成功 Engine->>Coordinator: 报告成功 Coordinator->>Engine: 推进到 analyze_task Engine->>Agent2: 调用 analyze_task Agent2-->>Engine: 失败! Engine->>Coordinator: 报告失败 Coordinator->>Coordinator: 决定补偿 (逆序) Coordinator->>Engine: 开始补偿流程 Engine->>Comp1: 调用 compensate_validation Comp1-->>Engine: 补偿成功 Engine->>Coordinator: 报告补偿完成 Coordinator->>Client: 工作流状态: COMPENSATED

6.2 工作流引擎状态机流程图

工作流引擎的核心是一个状态机,驱动整个Saga的生命周期。

graph LR A[CREATED] -->|start_workflow| B[RUNNING] B -->|所有任务成功| C[COMPLETED] B -->|某个任务失败| D[COMPENSATING] D -->|所有补偿成功| E[COMPENSATED] D -->|任何补偿失败| F[FAILED] B -->|手动暂停| G[PAUSED] G -->|恢复| B style A fill:#e1f5fe style C fill:#e8f5e8 style E fill:#fff3e0 style F fill:#ffebee

7. 扩展说明与最佳实践

  1. 持久化层增强:本项目使用文件模拟持久化,生产环境应替换为数据库。建议使用具有事务能力的数据库(如PostgreSQL)来存储工作流状态和Saga日志,确保持久化操作本身的原子性。
  2. 通信机制:示例中的服务调用是同步的。在分布式系统中,应使用异步消息队列(如RabbitMQ、Kafka)进行解耦,使引擎、协调器和智能体服务之间通过事件驱动,提高系统的可伸缩性和可靠性。
  3. 超时与重试:智能体服务调用可能因网络或临时故障失败。应实现指数退避的重试机制,并为每个任务设置超时,避免工作流无限期挂起。
  4. 补偿事务的幂等性:补偿操作可能被多次调用(例如,协调器崩溃后重启)。必须确保补偿服务是幂等的,即多次执行产生的结果与一次执行相同。
  5. 可视化与监控:记录详细的Saga日志对于调试和监控至关重要。可以将其导出到ELK栈或时序数据库,并构建仪表盘来跟踪工作流的成功/失败率、补偿次数等关键指标。
  6. 模式选择:本项目实现了编排(Choreography)模式,适合中等复杂度、服务间耦合度低的场景。对于非常复杂或需要集中控制的流程,可以考虑协调(Orchestration)模式,即由一个中心协调器显式地命令每个参与者执行操作和补偿。

通过这个项目,我们展示了Saga模式如何为分布式智能体工作流提供强大的容错与最终一致性保障。虽然实现一个生产级的系统需要考虑更多边界情况,但本文提供的核心框架与代码已勾勒出关键路径,可作为深入开发的坚实起点。