面向容量规划的微服务架构分层与关键抽象设计

2900559190
2026年03月13日
更新于 2026年03月14日
4 次阅读
摘要:本文介绍了一个面向容量规划的微服务架构原型系统。该系统通过设计**服务(Service)**、**实例(Instance)**、**指标(Metric)** 和**规划(Plan)** 等核心抽象,实现了对微服务资源模型的标准刻画。系统采用三层架构(API、核心、数据),并提供了一个基于历史指标进行简单负载预测与伸缩建议的核心算法。文中提供了完整的项目代码,包括应用入口、数据模型、核心算法及配置,...

摘要

本文介绍了一个面向容量规划的微服务架构原型系统。该系统通过设计服务(Service)实例(Instance)指标(Metric)规划(Plan) 等核心抽象,实现了对微服务资源模型的标准刻画。系统采用三层架构(API、核心、数据),并提供了一个基于历史指标进行简单负载预测与伸缩建议的核心算法。文中提供了完整的项目代码,包括应用入口、数据模型、核心算法及配置,总代码量控制在1500行以内,并配以清晰的架构图与流程图。读者可参照步骤安装运行,从而理解如何将容量规划的关键概念转化为可运行的软件设计。

1. 项目概述:容量规划系统的核心设计

在微服务架构中,容量规划旨在确保每个服务拥有恰到好处的计算资源,既能满足性能与可用性要求(SLA),又能避免资源浪费以优化成本。传统单点监控告警无法解决这一系统性难题。我们需要一个更高维度的抽象层,对服务的资源供需关系进行建模、分析与决策。

本项目的目标是构建一个轻量级的容量规划系统原型。它不追求替换成熟的监控(如Prometheus)或编排系统(如Kubernetes),而是作为它们之上的"决策大脑"。其核心设计思路如下:

  1. 分层架构:清晰分离关注点,提升可维护性与可扩展性。

    • API层:提供RESTful接口,用于接收指标、触发规划、查询状态。
    • 核心层:包含核心抽象模型、规划算法与业务流程。
    • 数据层:负责抽象化后的模型存储与查询(本项目使用内存存储简化实现)。
  2. 关键抽象:定义容量规划领域内的核心实体。

    • Service(服务):微服务的逻辑单元,包含其资源规格要求。
    • Instance(实例):服务的具体运行副本,拥有独立的资源使用指标。
    • Metric(指标):实例在特定时间点的性能度量(如CPU使用率、QPS)。
    • Plan(规划):系统生成的扩容或缩容建议,描述了目标状态。
  3. 核心算法:实现一个简单的容量规划逻辑。算法周期性分析历史指标,预测未来负载,并与当前资源配置对比,产生扩缩容建议。

graph TD subgraph "外部系统" A[监控系统] -->|推送指标| B(容量规划系统 API) C[编排系统] -->|查询/执行规划| B end subgraph "容量规划系统分层" B --> D[API 层] D --> E[核心层] E --> F[数据层] end subgraph "核心层组件" E --> G[模型抽象] E --> H[规划引擎] E --> I[预测器] end subgraph "数据层" F --> J[内存存储] end G --> H I --> H H --> K{生成规划} K -->|扩容/缩容| L[Plan] L --> D

上图展示了系统的整体架构与数据流。外部监控系统将指标推送至本系统的API层,经核心层的模型与算法处理,最终生成规划建议,可供外部编排系统消费。

2. 项目结构树

capacity-planner/
├── app.py                      # FastAPI应用主入口
├── core/
│   ├── __init__.py
│   ├── models.py               # 核心数据模型定义(Service, Instance, Metric, Plan)
│   ├── algorithm.py            # 容量规划核心算法
│   ├── storage.py              # 数据存储抽象与内存实现
│   └── config.py               # 应用配置
├── api/
│   ├── __init__.py
│   ├── endpoints.py            # RESTful API路由定义
│   └── dependencies.py         # FastAPI依赖注入(如获取存储实例)
├── requirements.txt            # Python依赖列表
└── .env.example                # 环境变量示例文件

3. 核心代码实现

文件路径:core/models.py

此文件定义了系统的领域模型,是所有业务逻辑的基石。

"""
容量规划核心模型定义。
"""
from datetime import datetime
from enum import Enum
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field, validator

class ServiceStatus(str, Enum):
    """服务状态枚举"""
    HEALTHY = "healthy"
    WARNING = "warning"
    CRITICAL = "critical"

class PlanAction(str, Enum):
    """规划动作枚举"""
    SCALE_OUT = "scale_out"
    SCALE_IN = "scale_in"
    DO_NOTHING = "do_nothing"

class Service(BaseModel):
    """服务抽象"""
    id: str = Field(..., description="服务唯一标识")
    name: str = Field(..., description="服务名称")
    min_instances: int = Field(1, ge=1, description="最小实例数")
    max_instances: int = Field(10, ge=1, description="最大实例数")
    target_cpu_utilization: float = Field(0.65, ge=0.1, le=1.0, description="目标CPU利用率")
    current_instances: int = Field(1, description="当前运行实例数")
    status: ServiceStatus = Field(ServiceStatus.HEALTHY, description="服务状态")
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        use_enum_values = True

    @validator('max_instances')
    def max_instances_greater_than_min(cls, v, values):
        if 'min_instances' in values and v < values['min_instances']:
            raise ValueError('max_instances must be >= min_instances')
        return v

class Instance(BaseModel):
    """服务实例抽象"""
    id: str = Field(..., description="实例唯一标识")
    service_id: str = Field(..., description="所属服务ID")
    host: str = Field(..., description="运行主机")
    port: int = Field(..., description="服务端口")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="实例元数据")
    created_at: datetime = Field(default_factory=datetime.utcnow)
    is_active: bool = Field(True, description="实例是否活跃")

class Metric(BaseModel):
    """性能指标抽象"""
    id: Optional[str] = Field(None, description="指标记录ID(由存储生成)")
    instance_id: str = Field(..., description="对应的实例ID")
    service_id: str = Field(..., description="对应的服务ID")
    cpu_utilization: float = Field(..., ge=0.0, le=1.0, description="CPU使用率")
    memory_utilization: float = Field(..., ge=0.0, le=1.0, description="内存使用率")
    requests_per_second: float = Field(0.0, ge=0.0, description="每秒请求数")
    timestamp: datetime = Field(default_factory=datetime.utcnow, description="指标采集时间")

class Plan(BaseModel):
    """容量规划建议"""
    id: str = Field(..., description="规划唯一标识")
    service_id: str = Field(..., description="目标服务ID")
    action: PlanAction = Field(..., description="规划动作")
    reason: str = Field(..., description="规划原因")
    details: Dict[str, Any] = Field(default_factory=dict, description="规划详情,如目标实例数")
    generated_at: datetime = Field(default_factory=datetime.utcnow, description="规划生成时间")
    is_executed: bool = Field(False, description="是否已执行")

文件路径:core/storage.py

定义了一个简单的存储抽象层,并使用内存实现。在生产环境中,可替换为数据库实现。

"""
数据存储抽象层与内存实现。
"""
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
from uuid import uuid4
from datetime import datetime, timedelta
from .models import Service, Instance, Metric, Plan

class Storage(ABC):
    """存储抽象基类"""

    @abstractmethod
    def add_service(self, service: Service) -> Service:
        pass

    @abstractmethod
    def get_service(self, service_id: str) -> Optional[Service]:
        pass

    @abstractmethod
    def update_service(self, service: Service) -> Service:
        pass

    @abstractmethod
    def list_services(self) -> List[Service]:
        pass

    @abstractmethod
    def add_instance(self, instance: Instance) -> Instance:
        pass

    @abstractmethod
    def get_instances_by_service(self, service_id: str, active_only: bool = True) -> List[Instance]:
        pass

    @abstractmethod
    def add_metric(self, metric: Metric) -> Metric:
        pass

    @abstractmethod
    def get_metrics(self, service_id: str, start_time: datetime, end_time: datetime) -> List[Metric]:
        pass

    @abstractmethod
    def add_plan(self, plan: Plan) -> Plan:
        pass

    @abstractmethod
    def get_pending_plans(self, service_id: Optional[str] = None) -> List[Plan]:
        pass

    @abstractmethod
    def mark_plan_executed(self, plan_id: str):
        pass


class InMemoryStorage(Storage):
    """内存存储实现(用于演示)"""

    def __init__(self):
        self._services: Dict[str, Service] = {}
        self._instances: Dict[str, Instance] = {}
        self._metrics: List[Metric] = []
        self._plans: Dict[str, Plan] = {}

    def add_service(self, service: Service) -> Service:
        service.id = service.id or f"svc-{uuid4().hex[:8]}"
        service.updated_at = datetime.utcnow()
        self._services[service.id] = service
        return service

    def get_service(self, service_id: str) -> Optional[Service]:
        return self._services.get(service_id)

    def update_service(self, service: Service) -> Service:
        if service.id not in self._services:
            raise KeyError(f"Service {service.id} not found")
        service.updated_at = datetime.utcnow()
        self._services[service.id] = service
        return service

    def list_services(self) -> List[Service]:
        return list(self._services.values())

    def add_instance(self, instance: Instance) -> Instance:
        instance.id = instance.id or f"ins-{uuid4().hex[:8]}"
        self._instances[instance.id] = instance
        return instance

    def get_instances_by_service(self, service_id: str, active_only: bool = True) -> List[Instance]:
        instances = [i for i in self._instances.values() if i.service_id == service_id]
        if active_only:
            instances = [i for i in instances if i.is_active]
        return instances

    def add_metric(self, metric: Metric) -> Metric:
        metric.id = f"met-{uuid4().hex[:8]}"
        self._metrics.append(metric)
        return metric

    def get_metrics(self, service_id: str, start_time: datetime, end_time: datetime) -> List[Metric]:
        # 简单的时间过滤,实际项目需考虑索引
        filtered = [
            m for m in self._metrics
            if m.service_id == service_id and start_time <= m.timestamp <= end_time
        ]
        return sorted(filtered, key=lambda x: x.timestamp)

    def add_plan(self, plan: Plan) -> Plan:
        plan.id = plan.id or f"plan-{uuid4().hex[:8]}"
        self._plans[plan.id] = plan
        return plan

    def get_pending_plans(self, service_id: Optional[str] = None) -> List[Plan]:
        plans = [p for p in self._plans.values() if not p.is_executed]
        if service_id:
            plans = [p for p in plans if p.service_id == service_id]
        return sorted(plans, key=lambda x: x.generated_at)

    def mark_plan_executed(self, plan_id: str):
        if plan_id in self._plans:
            self._plans[plan_id].is_executed = True

文件路径:core/algorithm.py

实现了容量规划的核心逻辑。本示例采用一个简单的基于移动平均的预测算法。

"""
容量规划核心算法。
"""
from datetime import datetime, timedelta
from typing import List, Tuple, Optional
from .models import Service, Metric, Plan, PlanAction
from .storage import Storage

class CapacityPlanner:
    """容量规划器"""

    def __init__(self, storage: Storage):
        self.storage = storage

    def analyze_and_plan(self, service_id: str) -> Optional[Plan]:
        """
        分析指定服务的指标并生成规划建议。
        核心算法步骤:

        1. 获取服务配置与当前实例信息。
        2. 获取最近一段时间的历史指标。
        3. 预测下一时间段的负载(如CPU使用率)。
        4. 将预测负载与目标阈值、实例数进行比较。
        5. 生成扩容、缩容或无操作建议。
        """
        service = self.storage.get_service(service_id)
        if not service:
            return None

        # 1. 获取当前活跃实例
        current_instances = self.storage.get_instances_by_service(service_id, active_only=True)
        current_instance_count = len(current_instances)
        if current_instance_count == 0:
            # 没有活跃实例,可能是新服务或全部宕机,建议扩容到最小实例数
            if service.min_instances > 0:
                return self._create_plan(
                    service_id,
                    PlanAction.SCALE_OUT,
                    reason="No active instances found.",
                    target_instances=service.min_instances
                )
            return None

        # 2. 获取历史指标(例如过去30分钟)
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(minutes=30)
        historical_metrics = self.storage.get_metrics(service_id, start_time, end_time)

        if not historical_metrics:
            # 无历史数据,无法分析,建议保持现状
            return None

        # 3. 简单预测:使用最近N个指标的平均值作为预测值 (N=5)
        recent_metrics = historical_metrics[-5:] if len(historical_metrics) >= 5 else historical_metrics
        avg_cpu = sum(m.cpu_utilization for m in recent_metrics) / len(recent_metrics)
        avg_rps = sum(m.requests_per_second for m in recent_metrics) / len(recent_metrics)

        # 4. 基于预测的CPU利用率进行决策
        projected_cpu_per_instance = avg_cpu  # 假设负载均匀分布

        # 计算满足目标利用率所需的理想实例数
        # 所需实例数 = ceil(预测总负载 / (单个实例容量 * 目标利用率))
        # 简化:假设单个实例容量为1 (100% CPU),总负载为 `avg_cpu * current_instance_count`
        total_projected_load = projected_cpu_per_instance * current_instance_count
        desired_instance_count = self._calculate_desired_instances(
            total_projected_load, service.target_cpu_utilization
        )

        # 约束在最小/最大实例数之间
        desired_instance_count = max(service.min_instances, min(service.max_instances, desired_instance_count))

        # 5. 生成规划
        if desired_instance_count > current_instance_count:
            return self._create_plan(
                service_id,
                PlanAction.SCALE_OUT,
                reason=f"Projected CPU load ({projected_cpu_per_instance:.2%}) per instance exceeds "
                       f"target ({service.target_cpu_utilization:.0%}). "
                       f"Avg RPS: {avg_rps:.2f}",
                target_instances=desired_instance_count
            )
        elif desired_instance_count < current_instance_count and current_instance_count > service.min_instances:
            # 只有当当前实例数大于最小值时才考虑缩容
            return self._create_plan(
                service_id,
                PlanAction.SCALE_IN,
                reason=f"Projected CPU load ({projected_cpu_per_instance:.2%}) per instance is below "
                       f"target ({service.target_cpu_utilization:.0%}). Opportunity to save resources. "
                       f"Avg RPS: {avg_rps:.2f}",
                target_instances=desired_instance_count
            )
        else:
            # 无需操作
            return self._create_plan(
                service_id,
                PlanAction.DO_NOTHING,
                reason=f"Current configuration is optimal. "
                       f"Projected CPU: {projected_cpu_per_instance:.2%}, "
                       f"Target: {service.target_cpu_utilization:.0%}, "
                       f"Instances: {current_instance_count}",
                target_instances=current_instance_count
            )

    def _calculate_desired_instances(self, total_load: float, target_utilization: float) -> int:
        """计算达到目标利用率所需的实例数(向上取整)"""
        if target_utilization <= 0:
            return 1
        import math
        return math.ceil(total_load / target_utilization)

    def _create_plan(self,
                     service_id: str,
                     action: PlanAction,
                     reason: str,
                     target_instances: int) -> Plan:
        """创建规划对象"""
        from uuid import uuid4
        return Plan(
            id=f"plan-{uuid4().hex[:8]}",
            service_id=service_id,
            action=action,
            reason=reason,
            details={
                "target_instance_count": target_instances,
                "generated_by": "simple_capacity_planner_v1"
            }
        )

文件路径:core/config.py

管理应用配置,支持从环境变量加载。

"""
应用配置。
"""
import os
from pydantic import BaseSettings

class Settings(BaseSettings):
    """应用设置"""
    app_name: str = "Capacity Planning Service"
    app_version: str = "1.0.0"
    debug: bool = False

    # API配置
    api_prefix: str = "/api/v1"
    host: str = "0.0.0.0"
    port: int = 8000

    # 规划器配置
    planner_interval_seconds: int = 60  # 自动规划任务执行间隔

    class Config:
        env_file = ".env"

settings = Settings()

文件路径:api/dependencies.py

定义FastAPI的依赖项,用于在整个应用范围内共享单例对象(如存储、规划器)。

"""
FastAPI 依赖项。
"""
from core.storage import InMemoryStorage
from core.algorithm import CapacityPlanner

# 创建全局单例(简单起见,实际生产环境可能需要更复杂的管理)
_storage = InMemoryStorage()
_planner = CapacityPlanner(_storage)

def get_storage():
    """获取存储实例依赖"""
    return _storage

def get_planner():
    """获取容量规划器实例依赖"""
    return _planner

文件路径:api/endpoints.py

定义所有的RESTful API端点。这是系统与外界交互的主要入口。

"""
RESTful API 端点定义。
"""
from datetime import datetime, timedelta
from typing import List
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from core.models import Service, Instance, Metric, Plan
from core.algorithm import CapacityPlanner
from .dependencies import get_storage, get_planner
from core.storage import Storage

router = APIRouter()

# ---------- Service 相关接口 ----------
@router.post("/services", response_model=Service, tags=["services"])
async def create_service(service: Service, storage: Storage = Depends(get_storage)):
    """注册一个新的微服务"""
    existing = storage.get_service(service.id)
    if existing:
        raise HTTPException(status_code=409, detail=f"Service with ID {service.id} already exists.")
    return storage.add_service(service)

@router.get("/services", response_model=List[Service], tags=["services"])
async def list_services(storage: Storage = Depends(get_storage)):
    """获取所有已注册服务"""
    return storage.list_services()

@router.get("/services/{service_id}", response_model=Service, tags=["services"])
async def get_service(service_id: str, storage: Storage = Depends(get_storage)):
    """获取指定服务详情"""
    service = storage.get_service(service_id)
    if not service:
        raise HTTPException(status_code=404, detail=f"Service {service_id} not found.")
    return service

# ---------- Instance 相关接口 ----------
@router.post("/instances", response_model=Instance, tags=["instances"])
async def register_instance(instance: Instance, storage: Storage = Depends(get_storage)):
    """注册一个服务实例(通常由服务启动时调用)"""
    # 检查对应服务是否存在
    service = storage.get_service(instance.service_id)
    if not service:
        raise HTTPException(status_code=404, detail=f"Service {instance.service_id} not found.")
    return storage.add_instance(instance)

@router.get("/services/{service_id}/instances", response_model=List[Instance], tags=["instances"])
async def list_service_instances(service_id: str,
                                 active_only: bool = True,
                                 storage: Storage = Depends(get_storage)):
    """获取指定服务的所有实例"""
    return storage.get_instances_by_service(service_id, active_only=active_only)

# ---------- Metric 相关接口 ----------
@router.post("/metrics", response_model=Metric, tags=["metrics"])
async def submit_metric(metric: Metric, storage: Storage = Depends(get_storage)):
    """提交性能指标数据(通常由监控Agent周期性推送)"""
    # 可选:验证对应的实例是否存在
    return storage.add_metric(metric)

@router.get("/services/{service_id}/metrics", response_model=List[Metric], tags=["metrics"])
async def get_service_metrics(service_id: str,
                              minutes: int = 30,
                              storage: Storage = Depends(get_storage)):
    """获取指定服务的历史指标"""
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(minutes=minutes)
    return storage.get_metrics(service_id, start_time, end_time)

# ---------- Plan 相关接口 ----------
@router.post("/services/{service_id}/plan", response_model=Plan, tags=["plan"])
async def trigger_planning(service_id: str,
                           background_tasks: BackgroundTasks,
                           planner: CapacityPlanner = Depends(get_planner)):
    """手动触发对指定服务的容量规划分析"""
    plan = planner.analyze_and_plan(service_id)
    if not plan:
        raise HTTPException(status_code=404, detail=f"No actionable plan generated for service {service_id}.")
    # 注意:这里直接返回规划,实际应由后台任务保存到存储
    return plan

@router.get("/plans/pending", response_model=List[Plan], tags=["plan"])
async def get_pending_plans(service_id: str = None, storage: Storage = Depends(get_storage)):
    """获取所有(或指定服务的)待执行规划"""
    return storage.get_pending_plans(service_id)

@router.post("/plans/{plan_id}/execute", response_model=Plan, tags=["plan"])
async def execute_plan(plan_id: str, storage: Storage = Depends(get_storage)):
    """标记一个规划为已执行(通常由外部编排系统调用)"""
    # 简化:只标记为已执行。实际场景会触发真正的扩缩容操作并更新服务实例数。
    storage.mark_plan_executed(plan_id)
    # 返回更新后的规划(需要从存储重新获取,这里简化)
    # 实际应实现一个 get_plan_by_id 方法
    all_plans = storage.get_pending_plans() + [p for p in storage._plans.values() if p.is_executed]
    for p in all_plans:
        if p.id == plan_id:
            return p
    raise HTTPException(status_code=404, detail=f"Plan {plan_id} not found.")

文件路径:app.py

这是FastAPI应用的入口文件,负责组装所有组件并启动服务。

"""
容量规划系统主应用入口。
"""
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from core.config import settings
from api.endpoints import router as api_router
from core.storage import InMemoryStorage
from core.algorithm import CapacityPlanner

# 配置日志
logging.basicConfig(level=logging.INFO if not settings.debug else logging.DEBUG)
logger = logging.getLogger(__name__)

# 全局存储与规划器实例
storage = None
planner = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    应用生命周期管理。
    启动时:初始化全局组件。
    关闭时:清理资源(本示例无)。
    """
    global storage, planner
    # 启动
    logger.info(f"Starting {settings.app_name} v{settings.app_version}")
    storage = InMemoryStorage()
    planner = CapacityPlanner(storage)
    logger.info("Global storage and planner initialized.")

    # 创建一些示例数据用于演示
    from core.models import Service, Instance, Metric
    from datetime import datetime, timedelta
    import random
    try:
        demo_svc = Service(id="demo-frontend", name="Frontend Web Service", min_instances=2, max_instances=10)
        storage.add_service(demo_svc)
        for i in range(3):
            ins = Instance(id=f"demo-ins-{i}", service_id="demo-frontend", host=f"host-{i}.example.com", port=8080)
            storage.add_instance(ins)
        # 生成一些过去30分钟的模拟指标
        now = datetime.utcnow()
        for i in range(30):
            for ins in storage.get_instances_by_service("demo-frontend"):
                m_time = now - timedelta(minutes=(30 - i))
                metric = Metric(
                    instance_id=ins.id,
                    service_id="demo-frontend",
                    cpu_utilization=random.uniform(0.3, 0.8),  # 模拟30%-80%的CPU
                    memory_utilization=random.uniform(0.4, 0.6),
                    requests_per_second=random.uniform(50, 200),
                    timestamp=m_time
                )
                storage.add_metric(metric)
        logger.info("Demo data created for service 'demo-frontend'.")
    except Exception as e:
        logger.warning(f"Could not create demo data: {e}")

    yield
    # 关闭
    logger.info("Shutting down capacity planning service.")

# 创建FastAPI应用实例
app = FastAPI(
    title=settings.app_name,
    version=settings.app_version,
    debug=settings.debug,
    lifespan=lifespan,
    openapi_url=f"{settings.api_prefix}/openapi.json",
    docs_url="/docs",
    redoc_url="/redoc"
)

# 添加CORS中间件(便于前端调试)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 生产环境应限制
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 挂载API路由
app.include_router(api_router, prefix=settings.api_prefix)

@app.get("/", tags=["root"])
async def root():
    """根端点,返回服务信息"""
    return {
        "service": settings.app_name,
        "version": settings.app_version,
        "docs": "/docs",
        "api_base": settings.api_prefix
    }

@app.get("/health", tags=["health"])
async def health_check():
    """健康检查端点"""
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "app:app",
        host=settings.host,
        port=settings.port,
        reload=settings.debug,
        log_level="info"
    )

文件路径:requirements.txt

项目的Python依赖。

fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
pydantic-settings==2.1.0

文件路径:.env.example

环境变量示例文件。

# 应用配置
DEBUG=False
APP_NAME="Capacity Planner Demo"
APP_VERSION="1.0.0"

# 服务器配置
HOST="0.0.0.0"
PORT=8000
API_PREFIX="/api/v1"

# 规划器配置
PLANNER_INTERVAL_SECONDS=60

4. 安装依赖与运行步骤

第一步:准备环境

确保已安装Python 3.8+。

# 克隆或创建项目目录
mkdir capacity-planner-demo && cd capacity-planner-demo

第二步:安装依赖

将上述所有代码文件按项目结构树放置到对应目录中。

# 创建并激活虚拟环境(可选但推荐)
python -m venv venv
# Linux/Mac:
source venv/bin/activate
# Windows:
# venv\Scripts\activate

# 安装依赖包
pip install -r requirements.txt

第三步:运行应用

直接运行主应用文件。

python app.py

控制台将输出类似以下信息:

INFO:     Started server process [xxxx]
INFO:     Waiting for application startup.
INFO:     Global storage and planner initialized.
INFO:     Demo data created for service 'demo-frontend'.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)

现在,容量规划服务已在本地的 http://localhost:8000 启动。

5. 测试与验证步骤

5.1 通过API文档交互

打开浏览器,访问 http://localhost:8000/docs,你将看到自动生成的Swagger UI界面,可以在此直接测试所有API。

5.2 使用cURL进行基础测试

  1. 查看已注册的示例服务
curl -X GET "http://localhost:8000/api/v1/services" -H "accept: application/json"
应返回包含`demo-frontend`服务的列表。
  1. 获取该服务的实例列表
curl -X GET "http://localhost:8000/api/v1/services/demo-frontend/instances" -H "accept: application/json"
  1. 手动触发一次容量规划
curl -X POST "http://localhost:8000/api/v1/services/demo-frontend/plan" -H "accept: application/json"
系统将基于模拟的历史指标(CPU 30%-80%)和预设的目标CPU利用率(65%),计算并返回一个规划建议。根据随机生成的指标,结果可能是`scale_out`、`scale_in`或`do_nothing`。
  1. 查看待执行的规划
curl -X GET "http://localhost:8000/api/v1/plans/pending" -H "accept: application/json"

5.3 模拟监控指标上报

使用以下cURL命令模拟监控系统上报一条新的高负载指标,这可能导致下次规划倾向于扩容。

curl -X POST "http://localhost:8000/api/v1/metrics" \
  -H "Content-Type: application/json" \
  -d '{
    "instance_id": "demo-ins-0",
    "service_id": "demo-frontend",
    "cpu_utilization": 0.95,
    "memory_utilization": 0.5,
    "requests_per_second": 500
  }'

提交后,再次触发规划(步骤3),观察规划建议的变化。

sequenceDiagram participant A as 监控Agent participant B as API /metrics participant C as Storage participant D as Scheduler/Manual participant E as Planner participant F as API /plan participant G as API /plans/pending participant H as 编排系统 Note over A,D: 周期性/事件驱动流程 A->>B: POST 指标数据 B->>C: storage.add_metric() C-->>B: 确认 B-->>A: 201 Created Note over D,E: 规划生成流程 D->>F: POST /services/{id}/plan F->>E: planner.analyze_and_plan() E->>C: 获取服务、实例、历史指标 C-->>E: 返回数据 E->>E: 执行预测与决策算法 E->>C: 存储生成的Plan C-->>E: 确认 E-->>F: 返回Plan F-->>D: 返回Plan Note over H,G: 规划消费与执行流程 H->>G: GET /plans/pending G->>C: storage.get_pending_plans() C-->>G: 返回Plan列表 G-->>H: 返回Plan列表 H->>H: 执行扩缩容动作 H->>B: POST /plans/{id}/execute (标记为已执行)

上图展示了系统核心的时序流程:指标上报、规划生成、规划消费与执行。

6. 总结与扩展方向

本项目实现了一个具备核心抽象的、面向容量规划的微服务系统原型。通过运行此项目,你可以直观理解Service、Instance、Metric、Plan等抽象如何协作,以及一个简单的基于阈值与预测的规划算法如何工作。

6.1 性能与生产级考虑

  • 存储InMemoryStorage 仅用于演示。生产环境应替换为持久化数据库(如PostgreSQL、MongoDB),并需设计合适的索引以高效查询时间序列指标。
  • 算法:示例算法非常基础。实际容量规划需考虑:
    • 更复杂的预测模型:如ARIMA、LSTM、Prophet等,用于预测QPS、CPU、内存等多维度指标。
    • 多维约束:同时考虑CPU、内存、I/O、网络带宽以及实例启动时间。
    • 成本模型:结合云厂商的实例定价,做出成本最优的决策。
    • 季节性:区分工作日/周末、促销活动等不同流量模式。
  • 可扩展性:API层应增加认证/授权、速率限制。核心算法可设计为插件化,方便替换或组合不同策略。
  • 可靠性:规划器的自动任务应有分布式锁,避免多个节点重复执行。规划执行应具备幂等性和补偿机制。

6.2 与现有生态集成

  • 监控数据源:可编写适配器,从Prometheus、Datadog、New Relic等系统拉取或接收指标,而非仅通过API推送。
  • 执行器:规划的执行不应只标记为executed。应通过调用Kubernetes API(进行HPA更新或Deployment副本数修改)、Terraform或云厂商的SDK(如AWS Auto Scaling Group)来实际改变资源配置。

6.3 最佳实践

  • 逐步应用:先对非关键服务进行"只观察"模式的容量规划,验证算法准确性后再应用于生产。
  • 人工审核:重要服务的自动规划可先进入"待审批"状态,由运维人员确认后再执行。
  • 持续调优:定期回顾规划的准确性与效果,调整服务的目标阈值(如target_cpu_utilization)和算法参数。

通过扩展和完善此原型系统,你可以构建一个真正赋能于微服务架构,实现高可用性与成本效益平衡的智能容量规划平台。