Abstract
This article presents a complete implementation of a monitoring and self-healing system for a large language model (LLM) inference service under high concurrency. The project builds a simulated LLM inference service and instruments it with multi-dimensional observability data (metrics, logs, traces). At its core is a rule-based self-healing controller that continuously analyzes monitoring metrics and, when it detects anomalies such as latency spikes or rising error rates, automatically executes recovery actions such as circuit breaking, degradation, and service restarts, closing the observe-analyze-decide-act loop. The article provides all runnable project code, a clear architecture description, and detailed deployment and verification steps, offering a practical reference for building resilient AI services.
1. Project Overview and Design Approach
Deploying a large language model (LLM) inference service under high concurrency brings many challenges: latency fluctuations, GPU memory exhaustion, token limits, and unstable third-party APIs. Relying on manual monitoring and intervention is inefficient and cannot meet SLA requirements, so closing the observability loop and giving the system self-healing capabilities is essential.
Design goals:
- Observability: comprehensively collect service metrics (QPS, latency, error rate, token usage), logs, and distributed traces.
- Closed loop: monitoring data feeds back into the decision system in real time.
- Self-healing: the decision system automatically analyzes failure patterns and triggers remediation without human intervention.
System architecture:
We design a simple microservice system with the following core components:
- LLM Inference Service (LLM-Inference-Service): simulates high-concurrency LLM request handling and exposes a Prometheus metrics endpoint.
- Monitoring Service (Monitoring-Service): periodically scrapes the inference service's metrics, performs simple anomaly detection (e.g., threshold checks), and publishes anomaly events to a message queue.
- Self-Healing Controller (Self-Healing-Controller): subscribes to anomaly events and, based on predefined rules (e.g., "P99 latency > 3 seconds"), decides on and executes self-healing actions (e.g., "fall back to a lightweight model", "restart the service instance").
- Configuration and traffic simulator: provides system configuration and a client that generates simulated requests.
2. Project Structure
llm-observability-selfhealing/
├── config/
│   ├── __init__.py
│   └── settings.py            # Application configuration
├── core/
│   ├── __init__.py
│   ├── llm_client.py          # Simulated LLM client
│   └── metrics.py             # Metric definitions and collection
├── monitoring/
│   ├── __init__.py
│   └── collector.py           # Metric scraping and anomaly detection
├── self_healing/
│   ├── __init__.py
│   ├── actions.py             # Self-healing action executor
│   ├── controller.py          # Self-healing decision controller
│   └── rules.py               # Self-healing rule definitions
├── services/
│   ├── __init__.py
│   ├── inference_service.py   # LLM inference service entry point
│   └── monitoring_service.py  # Monitoring service entry point
├── requirements.txt
├── run_inference.py           # Start the inference service
├── run_monitoring.py          # Start the monitoring service
├── run_selfhealing.py         # Start the self-healing controller
└── simulate_traffic.py        # Traffic simulation client
3. Core Code Implementation
3.1 File: config/settings.py
import os
from typing import Dict, Any


class Settings:
    """Central application configuration."""
    # Service configuration
    INFERENCE_SERVICE_HOST = os.getenv("INFERENCE_HOST", "0.0.0.0")
    INFERENCE_SERVICE_PORT = int(os.getenv("INFERENCE_PORT", "8000"))
    MONITORING_SERVICE_HOST = os.getenv("MONITORING_HOST", "0.0.0.0")
    MONITORING_SERVICE_PORT = int(os.getenv("MONITORING_PORT", "8001"))

    # Prometheus metrics path
    METRICS_PATH = "/metrics"

    # Simulated LLM configuration
    SIMULATED_LLM_NORMAL_LATENCY_MS = 500   # Normal latency (milliseconds)
    SIMULATED_LLM_ERROR_RATE = 0.02         # Base error rate: 2%
    SIMULATED_LLM_TOKEN_LIMIT = 4096        # Token limit

    # Monitoring configuration
    MONITORING_INTERVAL_SECONDS = 5         # Scrape interval (seconds)
    # Scrape target; override INFERENCE_HOST if the monitor runs on another machine
    PROMETHEUS_TARGET_URL = f"http://{INFERENCE_SERVICE_HOST}:{INFERENCE_SERVICE_PORT}"

    # Anomaly detection thresholds
    THRESHOLDS: Dict[str, Any] = {
        "p99_latency_ms": 3000,             # P99 latency above 3 seconds
        "error_rate": 0.10,                 # Error rate above 10%
        "high_token_usage_ratio": 0.9       # Token usage above 90%
    }

    # Self-healing action configuration
    SELF_HEALING_ACTION_COOLDOWN_SECONDS = 60   # Cooldown window per action
    CIRCUIT_BREAKER_FAILURE_THRESHOLD = 5       # Failures before the breaker trips
    CIRCUIT_BREAKER_RESET_TIMEOUT_SECONDS = 30  # Time before the breaker half-opens

    # Message queue configuration (Redis is used as a simple stand-in)
    REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
    REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
    REDIS_CHANNEL_ALERTS = "alerts:llm_service"


settings = Settings()
3.2 File: core/llm_client.py
import random
import time
import logging
from typing import Optional, Dict, Any, Tuple

from config.settings import settings

logger = logging.getLogger(__name__)


class SimulatedLLMClient:
    """
    Simulated LLM API client.
    Under high concurrency it can emulate rising latency, random errors, and token limits.
    """

    def __init__(self, model_name: str = "gpt-simulated"):
        self.model_name = model_name
        self._failure_count = 0            # Used to simulate accumulating failures
        self._is_circuit_open = False
        self._circuit_opened_at: Optional[float] = None

    def generate(self, prompt: str, max_tokens: int = 100) -> Tuple[Optional[str], Dict[str, Any]]:
        """
        Simulate text generation.
        Returns: (generated text or None, metadata dict)
        """
        metadata = {
            "model": self.model_name,
            "prompt_tokens": len(prompt.split()),
            "completion_tokens": 0,
            "total_tokens": 0,
            "latency_ms": 0,
            "success": False
        }

        # 1. Check the circuit breaker
        if self._is_circuit_open:
            if time.time() - self._circuit_opened_at < settings.CIRCUIT_BREAKER_RESET_TIMEOUT_SECONDS:
                logger.warning(f"Circuit breaker OPEN for {self.model_name}. Request blocked.")
                metadata["error"] = "circuit_breaker_open"
                return None, metadata
            else:
                # Try a half-open reset
                logger.info(f"Circuit breaker transitioning to HALF-OPEN for {self.model_name}.")
                self._is_circuit_open = False

        start_time = time.time()

        # 2. Simulate exceeding the token limit
        if max_tokens > settings.SIMULATED_LLM_TOKEN_LIMIT:
            metadata["error"] = "token_limit_exceeded"
            metadata["latency_ms"] = int((time.time() - start_time) * 1000)
            self._record_failure()
            return None, metadata

        # 3. Base latency + concurrency pressure (latency grows with the failure count)
        pressure_factor = 1 + self._failure_count * 0.1
        base_latency = settings.SIMULATED_LLM_NORMAL_LATENCY_MS / 1000.0  # convert to seconds
        simulated_latency = base_latency * pressure_factor * random.uniform(0.8, 1.2)

        # 4. Simulate random errors
        if random.random() < settings.SIMULATED_LLM_ERROR_RATE * pressure_factor:
            time.sleep(simulated_latency)
            metadata["error"] = "simulated_llm_error"
            metadata["latency_ms"] = int(simulated_latency * 1000)
            self._record_failure()
            return None, metadata

        # 5. Normal response
        time.sleep(simulated_latency)
        completion_tokens = random.randint(1, max_tokens)
        metadata.update({
            "success": True,
            "completion_tokens": completion_tokens,
            "total_tokens": metadata["prompt_tokens"] + completion_tokens,
            "latency_ms": int(simulated_latency * 1000)
        })
        self._reset_failure()  # a successful request resets the failure count
        return f"Simulated response for '{prompt[:20]}...'", metadata

    def _record_failure(self):
        """Record a failure; may trip the circuit breaker."""
        self._failure_count += 1
        if self._failure_count >= settings.CIRCUIT_BREAKER_FAILURE_THRESHOLD:
            self._is_circuit_open = True
            self._circuit_opened_at = time.time()
            logger.error(f"Circuit breaker TRIPPED for {self.model_name}.")

    def _reset_failure(self):
        """Reset the failure count and close the breaker."""
        self._failure_count = 0
        self._is_circuit_open = False

    def force_degrade_latency(self, factor: float = 2.0):
        """Simulate a performance degradation (for testing): scale the shared base latency by `factor`."""
        settings.SIMULATED_LLM_NORMAL_LATENCY_MS = int(settings.SIMULATED_LLM_NORMAL_LATENCY_MS * factor)
        logger.warning(f"Simulated LLM latency degraded by factor {factor}.")
3.3 File: core/metrics.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest, REGISTRY
from typing import Dict, Any

# Metric definitions
LLM_REQUESTS_TOTAL = Counter(
    'llm_requests_total',
    'Total number of LLM requests',
    ['model', 'status']  # labels: model name, status (success, error)
)

LLM_REQUEST_DURATION_MS = Histogram(
    'llm_request_duration_milliseconds',
    'LLM request duration in milliseconds',
    ['model'],
    buckets=(50, 100, 200, 500, 1000, 2000, 5000, 10000)  # custom buckets
)

LLM_TOKENS_TOTAL = Counter(
    'llm_tokens_total',
    'Total tokens processed',
    ['model', 'type']  # type: prompt, completion
)

LLM_CIRCUIT_BREAKER_STATE = Gauge(
    'llm_circuit_breaker_state',
    'Circuit breaker state (0=closed, 1=open, 2=half-open)',
    ['model']
)

LLM_ERROR_RATE_GAUGE = Gauge(
    'llm_error_rate',
    'Current error rate (rolling window)',
    ['model']
)


class MetricsCollector:
    """Metrics collector wrapping the Prometheus metric updates."""

    @staticmethod
    def record_request(model: str, metadata: Dict[str, Any]):
        """Record the metrics for one request."""
        status = 'success' if metadata.get('success') else 'error'
        LLM_REQUESTS_TOTAL.labels(model=model, status=status).inc()

        latency = metadata.get('latency_ms', 0)
        if latency > 0:
            LLM_REQUEST_DURATION_MS.labels(model=model).observe(latency)

        if metadata.get('prompt_tokens'):
            LLM_TOKENS_TOTAL.labels(model=model, type='prompt').inc(metadata['prompt_tokens'])
        if metadata.get('completion_tokens'):
            LLM_TOKENS_TOTAL.labels(model=model, type='completion').inc(metadata['completion_tokens'])

    @staticmethod
    def update_circuit_breaker_state(model: str, state: int):
        """Update the circuit breaker state gauge (0: closed, 1: open, 2: half-open)."""
        LLM_CIRCUIT_BREAKER_STATE.labels(model=model).set(state)

    @staticmethod
    def update_error_rate(model: str, rate: float):
        """Update the error-rate gauge."""
        LLM_ERROR_RATE_GAUGE.labels(model=model).set(rate)

    @staticmethod
    def get_metrics():
        """Return metrics in the Prometheus exposition format."""
        return generate_latest(REGISTRY)
3.4 File: services/inference_service.py
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from fastapi.responses import Response, JSONResponse
import uvicorn
import logging
import time
from typing import Dict
from datetime import datetime

from config.settings import settings
from core.llm_client import SimulatedLLMClient
from core.metrics import MetricsCollector

# Initialization
app = FastAPI(title="LLM Inference Service")
logger = logging.getLogger("inference_service")
llm_client = SimulatedLLMClient(model_name="simulated-llm-v1")

# In-memory request log (simplified; use external storage in production)
recent_requests = []


@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Middleware: record request duration for lightweight internal tracing."""
    start_time = time.time()
    response = await call_next(request)
    process_time = (time.time() - start_time) * 1000
    response.headers["X-Process-Time-MS"] = str(process_time)
    # Record trace information (simplified)
    logger.debug(f"{request.method} {request.url.path} took {process_time:.2f}ms")
    return response


@app.post("/v1/completions")
async def create_completion(request: Request, background_tasks: BackgroundTasks):
    """OpenAI-compatible completion endpoint (simulated)."""
    try:
        body = await request.json()
        prompt = body.get("prompt", "Hello, world!")
        max_tokens = body.get("max_tokens", 100)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid request body: {e}")

    # Call the simulated LLM
    text, metadata = llm_client.generate(prompt, max_tokens)

    # Record metrics asynchronously (avoid blocking the response)
    background_tasks.add_task(MetricsCollector.record_request, llm_client.model_name, metadata)

    # Record the request log (simplified)
    recent_requests.append({
        "timestamp": datetime.utcnow().isoformat(),
        "prompt": prompt[:50],
        **metadata
    })
    if len(recent_requests) > 1000:  # simple rolling window
        recent_requests.pop(0)

    if metadata["success"]:
        return JSONResponse(content={
            "model": metadata["model"],
            "choices": [{"text": text}],
            "usage": {
                "prompt_tokens": metadata["prompt_tokens"],
                "completion_tokens": metadata["completion_tokens"],
                "total_tokens": metadata["total_tokens"]
            }
        })
    else:
        # Return a simulated error
        error_msg = metadata.get("error", "unknown_error")
        return JSONResponse(
            status_code=429 if error_msg == "token_limit_exceeded" else 500,
            content={"error": {"message": f"Simulated error: {error_msg}"}}
        )


@app.get(settings.METRICS_PATH)
async def get_metrics():
    """Expose the Prometheus metrics endpoint."""
    return Response(content=MetricsCollector.get_metrics(), media_type="text/plain")


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}


@app.get("/debug/requests")
async def get_recent_requests(limit: int = 10):
    """Debug endpoint: view recent requests (not a production feature)."""
    return {"recent_requests": recent_requests[-limit:]}


if __name__ == "__main__":
    uvicorn.run(
        app,
        host=settings.INFERENCE_SERVICE_HOST,
        port=settings.INFERENCE_SERVICE_PORT,
        log_level="info"
    )
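Both self_healing/actions.py and simulate_traffic.py call an /admin/degrade endpoint that the service above does not define. A minimal sketch of such an endpoint, added to services/inference_service.py above the __main__ block, could look like this (the handler name and response shape are assumptions, not part of the original listing):
# Hypothetical admin endpoint: scales the simulated base latency by "factor",
# which is what actions.py and simulate_traffic.py expect to find.
@app.post("/admin/degrade")
async def degrade_model(payload: Dict[str, float]):
    factor = payload.get("factor", 1.0)
    llm_client.force_degrade_latency(factor=factor)
    return {"status": "ok", "base_latency_ms": settings.SIMULATED_LLM_NORMAL_LATENCY_MS}
In production such an endpoint must be authenticated (see Section 6).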
3.5 File: monitoring/collector.py
import requests
import logging
import time
import json
import re
import sys
import redis
from typing import Dict, Any, Optional
from prometheus_client.parser import text_string_to_metric_families

from config.settings import settings

logger = logging.getLogger(__name__)


class MonitoringCollector:
    """Monitoring collector: scrape metrics, analyze for anomalies, publish alerts."""

    def __init__(self):
        self.prometheus_url = f"{settings.PROMETHEUS_TARGET_URL}{settings.METRICS_PATH}"
        # Connect to Redis, used here as a simple message queue
        try:
            self.redis_client = redis.Redis(
                host=settings.REDIS_HOST,
                port=settings.REDIS_PORT,
                decode_responses=True,
                socket_connect_timeout=3
            )
            self.redis_client.ping()
            logger.info("Connected to Redis for alerting.")
        except redis.ConnectionError as e:
            logger.error(f"Could not connect to Redis: {e}. Alerts will be logged only.")
            self.redis_client = None
        self._last_error_rate = 0.0

    def fetch_metrics(self) -> Optional[Dict[str, Any]]:
        """Scrape and parse Prometheus metrics from the target service."""
        try:
            resp = requests.get(self.prometheus_url, timeout=5)
            resp.raise_for_status()
            metrics_data = {}
            for family in text_string_to_metric_families(resp.text):
                for sample in family.samples:
                    # Simplified: flatten each sample into a 'name{label="value",...}' key
                    key = sample.name
                    if sample.labels:
                        key += "{" + ",".join(f'{k}="{v}"' for k, v in sample.labels.items()) + "}"
                    metrics_data[key] = sample.value
            return metrics_data
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to fetch metrics from {self.prometheus_url}: {e}")
            return None

    def analyze_metrics(self, metrics: Dict[str, Any]) -> list:
        """Analyze the metrics, detect anomalies, and return a list of alert events."""
        alerts = []
        model_label = 'model="simulated-llm-v1"'

        # 1. Latency analysis: approximate P99 from the histogram buckets.
        #    Prometheus bucket counts are cumulative, so the first bucket whose count
        #    reaches 99% of the total gives an upper bound for the P99 latency.
        latency_buckets = {}
        for key, value in metrics.items():
            if key.startswith('llm_request_duration_milliseconds_bucket{') and model_label in key:
                le_match = re.search(r'le="([^"]+)"', key)
                if le_match:
                    latency_buckets[float(le_match.group(1))] = value  # float("+Inf") is valid

        if latency_buckets:
            sorted_buckets = sorted(latency_buckets.items())
            total_requests = sorted_buckets[-1][1]  # the "+Inf" bucket holds the total count
            if total_requests > 0:
                target_count = total_requests * 0.99
                p99_latency = sorted_buckets[-1][0]  # default to the largest bucket bound
                for le, count in sorted_buckets:
                    if count >= target_count:
                        p99_latency = le
                        break
                if p99_latency > settings.THRESHOLDS["p99_latency_ms"]:
                    alerts.append({
                        "type": "high_latency",
                        "severity": "warning",
                        "metric": "p99_latency_ms",
                        "value": p99_latency,
                        "threshold": settings.THRESHOLDS["p99_latency_ms"],
                        "timestamp": time.time()
                    })

        # 2. Error-rate analysis
        success_key = 'llm_requests_total{model="simulated-llm-v1",status="success"}'
        error_key = 'llm_requests_total{model="simulated-llm-v1",status="error"}'
        success_count = metrics.get(success_key, 0)
        error_count = metrics.get(error_key, 0)
        total_count = success_count + error_count
        if total_count > 0:
            current_error_rate = error_count / total_count
            # Update the error-rate gauge (note: this lives in the monitoring process's own registry)
            from core.metrics import MetricsCollector
            MetricsCollector.update_error_rate("simulated-llm-v1", current_error_rate)
            if current_error_rate > settings.THRESHOLDS["error_rate"]:
                alerts.append({
                    "type": "high_error_rate",
                    "severity": "critical",
                    "metric": "error_rate",
                    "value": current_error_rate,
                    "threshold": settings.THRESHOLDS["error_rate"],
                    "timestamp": time.time()
                })
            self._last_error_rate = current_error_rate

        # 3. Token-usage analysis (simulated; estimated from the request count)
        if total_count > 50:  # require a minimum sample size
            avg_tokens_per_req = 150  # assumed average
            estimated_token_usage = total_count * avg_tokens_per_req
            usage_ratio = estimated_token_usage / (settings.SIMULATED_LLM_TOKEN_LIMIT * 10)  # assumed per-period budget
            if usage_ratio > settings.THRESHOLDS["high_token_usage_ratio"]:
                alerts.append({
                    "type": "high_token_usage",
                    "severity": "warning",
                    "metric": "token_usage_ratio",
                    "value": usage_ratio,
                    "threshold": settings.THRESHOLDS["high_token_usage_ratio"],
                    "timestamp": time.time()
                })

        return alerts

    def publish_alert(self, alert: Dict[str, Any]):
        """Publish an alert to the message queue (Redis Pub/Sub)."""
        alert_str = json.dumps(alert)
        logger.warning(f"Alert generated: {alert_str}")
        if self.redis_client:
            try:
                self.redis_client.publish(settings.REDIS_CHANNEL_ALERTS, alert_str)
            except redis.RedisError as e:
                logger.error(f"Failed to publish alert to Redis: {e}")

    def run_one_cycle(self):
        """Run one scrape-and-analyze cycle."""
        logger.info("Starting monitoring cycle...")  # INFO so each cycle is visible (see Section 5.3)
        metrics = self.fetch_metrics()
        if metrics:
            alerts = self.analyze_metrics(metrics)
            for alert in alerts:
                self.publish_alert(alert)
                logger.info(f"Alert published: {alert['type']} (severity: {alert['severity']})")
        else:
            logger.warning("No metrics fetched, skipping analysis.")


def main():
    """Monitoring service main loop."""
    import signal

    logging.basicConfig(level=logging.INFO)
    collector = MonitoringCollector()

    def shutdown(signum, frame):
        logger.info("Shutdown signal received. Exiting.")
        sys.exit(0)

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    logger.info(f"Monitoring service started. Target: {collector.prometheus_url}")
    while True:
        start = time.time()
        collector.run_one_cycle()
        elapsed = time.time() - start
        sleep_time = max(0, settings.MONITORING_INTERVAL_SECONDS - elapsed)
        time.sleep(sleep_time)


if __name__ == "__main__":
    main()
3.6 File: self_healing/controller.py
import json
import logging
import redis
from typing import Dict, Any, Callable
from datetime import datetime, timedelta

from config.settings import settings
from . import rules
from . import actions

logger = logging.getLogger(__name__)


class SelfHealingController:
    """
    Self-healing controller.
    Subscribes to the alert channel, matches rules, and executes the corresponding
    self-healing actions. A simple cooldown mechanism prevents action storms.
    """

    def __init__(self):
        try:
            self.redis_client = redis.Redis(
                host=settings.REDIS_HOST,
                port=settings.REDIS_PORT,
                decode_responses=True,
                socket_connect_timeout=3
            )
            self.pubsub = self.redis_client.pubsub()
            self.pubsub.subscribe(settings.REDIS_CHANNEL_ALERTS)
            logger.info(f"Subscribed to channel: {settings.REDIS_CHANNEL_ALERTS}")
        except redis.ConnectionError as e:
            logger.error(f"Could not connect to Redis: {e}. Controller cannot receive alerts.")
            self.pubsub = None

        # Map alert types to handlers
        self.rule_action_map: Dict[str, Callable] = {
            "high_latency": self._handle_high_latency,
            "high_error_rate": self._handle_high_error_rate,
            "high_token_usage": self._handle_high_token_usage,
        }
        # Cooldown registry {action_key: last_executed_time}
        self.cooldown_registry = {}
        self.action_executor = actions.ActionExecutor()

    def _is_in_cooldown(self, action_key: str) -> bool:
        """Check whether the given action is still in its cooldown window."""
        last_time = self.cooldown_registry.get(action_key)
        if not last_time:
            return False
        cooldown_end = last_time + timedelta(seconds=settings.SELF_HEALING_ACTION_COOLDOWN_SECONDS)
        return datetime.utcnow() < cooldown_end

    def _record_cooldown(self, action_key: str):
        """Record the execution time of an action, starting its cooldown."""
        self.cooldown_registry[action_key] = datetime.utcnow()

    def _handle_high_latency(self, alert: Dict[str, Any]):
        """Handle a high-latency alert."""
        # Rule 1: if the error rate is normal, it is likely a load problem; scale out or degrade.
        # Rule 2: if the error rate is also high, the service may be unhealthy; restart it.
        # Simplified here: trigger the latency degradation action directly.
        if not self._is_in_cooldown("degrade_latency"):
            logger.info("Executing action for high latency: degrade model latency.")
            self.action_executor.degrade_model_performance(factor=0.7)  # scale latency to 70% (simulated)
            self._record_cooldown("degrade_latency")
        else:
            logger.warning("Action 'degrade_latency' is in cooldown. Skipping.")

    def _handle_high_error_rate(self, alert: Dict[str, Any]):
        """Handle a high-error-rate alert."""
        # Rule 1: trip the circuit breaker.
        # Rule 2: restart the service instance.
        if not self._is_in_cooldown("trip_circuit_breaker"):
            logger.critical("Executing action for high error rate: trip circuit breaker.")
            self.action_executor.trip_circuit_breaker(service_id="inference_service_1")
            self._record_cooldown("trip_circuit_breaker")
        else:
            # The breaker action is cooling down; consider a restart instead
            if not self._is_in_cooldown("restart_service"):
                logger.critical("Circuit breaker cooldown. Attempting service restart.")
                self.action_executor.restart_service(service_id="inference_service_1")
                self._record_cooldown("restart_service")
            else:
                logger.error("Both circuit breaker and restart are in cooldown. Manual intervention may be needed.")

    def _handle_high_token_usage(self, alert: Dict[str, Any]):
        """Handle a high-token-usage alert."""
        # Rule: trigger scale-in automation or notify an operator.
        logger.warning(f"High token usage detected ({alert['value']:.2%}). Sending notification.")
        self.action_executor.send_notification(
            f"⚠️ High token usage alert: {alert['value']:.2%} of limit."
        )

    def process_alert(self, alert_data: str):
        """Process one alert message."""
        try:
            alert = json.loads(alert_data)
            alert_type = alert.get("type")
            handler = self.rule_action_map.get(alert_type)
            if handler:
                logger.info(f"Processing alert: {alert_type}")
                handler(alert)
            else:
                logger.debug(f"No handler for alert type: {alert_type}")
        except json.JSONDecodeError as e:
            logger.error(f"Failed to decode alert JSON: {e}, data: {alert_data}")
        except Exception as e:
            logger.exception(f"Unexpected error processing alert: {e}")

    def run(self):
        """Main loop: listen for alerts and process them."""
        if not self.pubsub:
            logger.error("PubSub not available. Exiting controller.")
            return
        logger.info("Self-healing controller started. Listening for alerts...")
        for message in self.pubsub.listen():
            if message['type'] == 'message':
                self.process_alert(message['data'])


def main():
    """Self-healing controller entry point."""
    logging.basicConfig(level=logging.INFO)
    controller = SelfHealingController()
    controller.run()


if __name__ == "__main__":
    main()
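The project tree lists self_healing/rules.py and the controller imports it, but the article's listings do not show its contents. A minimal sketch consistent with the controller's rule_action_map could be the following (the exact structure is an assumption):
# File: self_healing/rules.py (a sketch; the original contents are not shown in the article)
from dataclasses import dataclass

@dataclass(frozen=True)
class HealingRule:
    """Maps an alert type to a recovery action and a severity level."""
    alert_type: str
    action: str       # name of an ActionExecutor method
    severity: str     # informational; the controller decides escalation

# Default rule set mirroring the handlers registered in SelfHealingController
DEFAULT_RULES = [
    HealingRule("high_latency", "degrade_model_performance", "warning"),
    HealingRule("high_error_rate", "trip_circuit_breaker", "critical"),
    HealingRule("high_token_usage", "send_notification", "warning"),
]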
3.7 File: self_healing/actions.py
import logging
import requests
import time
import subprocess  # used only if the real restart command below is enabled

from config.settings import settings

logger = logging.getLogger(__name__)


class ActionExecutor:
    """Executes the concrete self-healing actions."""

    def __init__(self):
        self.service_base_url = f"http://{settings.INFERENCE_SERVICE_HOST}:{settings.INFERENCE_SERVICE_PORT}"

    def degrade_model_performance(self, factor: float = 0.7):
        """
        Simulate a performance degradation (e.g., switching to a faster model or reducing complexity).
        A real deployment would call a configuration-management API or a service-mesh rule.
        """
        logger.info(f"Action: Degrading model performance by factor {factor}.")
        # Adjust via a (simulated) admin endpoint
        try:
            # Note: this is an example; the service must implement the corresponding admin endpoint
            resp = requests.post(
                f"{self.service_base_url}/admin/degrade",
                json={"factor": factor},
                timeout=2
            )
            if resp.status_code == 200:
                logger.info("Model degradation action successful.")
            else:
                logger.warning(f"Model degradation action failed with status {resp.status_code}.")
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to call degrade endpoint: {e}")
            # As a fallback one could mutate the shared configuration directly (demo only;
            # not recommended in production, and it requires access to the running
            # SimulatedLLMClient instance inside the inference process).

    def trip_circuit_breaker(self, service_id: str):
        """Trip the circuit breaker to stop traffic from reaching the failing service."""
        logger.critical(f"Action: Tripping circuit breaker for service {service_id}.")
        # In a real system this would update service-mesh configuration
        # (e.g., an Istio DestinationRule) or call a circuit-breaker management API.
        # Here we only log and simulate.
        time.sleep(0.5)  # simulate API-call latency
        logger.info(f"Circuit breaker for {service_id} is now OPEN.")

    def restart_service(self, service_id: str):
        """Restart a service instance (simulated)."""
        logger.critical(f"Action: Attempting to restart service {service_id}.")
        # WARNING: automated restarts in production require great care, with health
        # checks and a rolling-restart strategy. This simulation only logs.
        # Example: restart via supervisorctl or kubectl
        # command = f"supervisorctl restart {service_id}"
        # try:
        #     subprocess.run(command, shell=True, check=True, timeout=30)
        #     logger.info(f"Service {service_id} restart command issued.")
        # except subprocess.TimeoutExpired:
        #     logger.error(f"Restart command for {service_id} timed out.")
        # except subprocess.CalledProcessError as e:
        #     logger.error(f"Restart command for {service_id} failed: {e}")
        logger.warning(f"SIMULATION: Would restart service {service_id} now.")

        # After the simulated action, wait and check health
        time.sleep(2)
        try:
            resp = requests.get(f"{self.service_base_url}/health", timeout=5)
            if resp.status_code == 200:
                logger.info(f"Service {service_id} appears healthy after restart simulation.")
            else:
                logger.error(f"Service {service_id} health check failed after restart simulation.")
        except requests.exceptions.RequestException:
            logger.error(f"Could not reach service {service_id} after restart simulation.")

    def send_notification(self, message: str):
        """Send a notification (e.g., to Slack, email, or DingTalk)."""
        logger.info(f"Action: Sending notification: {message}")
        # Integration with a third-party notification API would go here,
        # e.g., POSTing to a webhook with requests. For now we only log.
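For reference, send_notification could post a Slack-style {"text": ...} payload to an incoming webhook. The sketch below assumes a WEBHOOK_URL environment variable, which is not part of settings.py:
# Sketch: post a plain-text alert to an incoming webhook (Slack-style payload).
import os
import requests

def send_webhook_notification(message: str) -> bool:
    webhook_url = os.getenv("WEBHOOK_URL")
    if not webhook_url:
        return False  # no webhook configured; caller falls back to logging
    try:
        resp = requests.post(webhook_url, json={"text": message}, timeout=5)
        return resp.status_code < 300
    except requests.exceptions.RequestException:
        return False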
3.8 File: run_inference.py
#!/usr/bin/env python3
"""
Start the LLM inference service.
"""
import uvicorn

from config.settings import settings
from services.inference_service import app

if __name__ == "__main__":
    uvicorn.run(
        app,
        host=settings.INFERENCE_SERVICE_HOST,
        port=settings.INFERENCE_SERVICE_PORT,
        log_level="info",
        reload=False  # keep False in production
    )
3.9 File: run_monitoring.py
#!/usr/bin/env python3
"""
Start the monitoring service.
"""
from monitoring.collector import main

if __name__ == "__main__":
    main()
3.10 File: run_selfhealing.py
#!/usr/bin/env python3
"""
Start the self-healing controller.
"""
from self_healing.controller import main

if __name__ == "__main__":
    main()
3.11 File: simulate_traffic.py
#!/usr/bin/env python3
"""
Simulate high-concurrency traffic for testing the system.
"""
import aiohttp
import asyncio
import random
import time
import argparse
import threading
from typing import List


async def make_request(session: aiohttp.ClientSession, url: str, prompt: str):
    """Send a single asynchronous request."""
    payload = {"prompt": prompt, "max_tokens": random.randint(50, 200)}
    start = time.time()
    try:
        async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=30)) as resp:
            latency = (time.time() - start) * 1000
            status = resp.status
            await resp.text()  # drain the response body
            return {"latency_ms": latency, "status": status, "success": 200 <= status < 300}
    except Exception as e:
        latency = (time.time() - start) * 1000
        return {"latency_ms": latency, "status": 0, "success": False, "error": str(e)}


async def burst_traffic(url: str, requests_per_second: int, duration_seconds: int):
    """Simulate a traffic burst."""
    print(f"Starting burst: {requests_per_second} req/s for {duration_seconds}s")
    tasks = []
    prompts = [
        "Explain the concept of quantum computing.",
        "Write a short poem about the sea.",
        "Translate 'Hello, world!' to French.",
        "Summarize the key points of machine learning.",
        "Generate a list of 5 creative project ideas."
    ]
    async with aiohttp.ClientSession() as session:
        for _ in range(duration_seconds):
            start_second = time.time()
            for _ in range(requests_per_second):
                prompt = random.choice(prompts) + f" {random.randint(1, 1000)}"  # add some variety
                tasks.append(asyncio.create_task(make_request(session, url, prompt)))
            # Pace the requests issued within this second
            elapsed_in_second = time.time() - start_second
            if elapsed_in_second < 1.0:
                await asyncio.sleep(1.0 - elapsed_in_second)
        # Collect the results before the session closes
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r for r in results if not isinstance(r, Exception)]


def print_stats(results: List[dict]):
    """Print request statistics."""
    if not results:
        print("No results collected.")
        return
    total = len(results)
    success = sum(1 for r in results if r.get('success'))
    error = total - success
    latencies = [r['latency_ms'] for r in results if 'latency_ms' in r]
    avg_latency = sum(latencies) / len(latencies) if latencies else 0
    sorted_lats = sorted(latencies)
    p99 = sorted_lats[int(len(sorted_lats) * 0.99)] if sorted_lats else 0

    print("\n--- Traffic Simulation Results ---")
    print(f"Total Requests: {total}")
    print(f"Successful: {success} ({success / total * 100:.1f}%)")
    print(f"Errors: {error} ({error / total * 100:.1f}%)")
    print(f"Average Latency: {avg_latency:.2f} ms")
    print(f"P99 Latency: {p99:.2f} ms")
    print("----------------------------------")


def main():
    parser = argparse.ArgumentParser(description='Simulate traffic to LLM service.')
    parser.add_argument('--url', default='http://localhost:8000/v1/completions',
                        help='Service endpoint URL')
    parser.add_argument('--rps', type=int, default=20,
                        help='Requests per second during burst')
    parser.add_argument('--duration', type=int, default=30,
                        help='Duration of burst in seconds')
    parser.add_argument('--inject-failure', action='store_true',
                        help='Inject a simulated failure after 10s (force high latency)')
    args = parser.parse_args()

    # Start the traffic simulation
    print(f"Target URL: {args.url}")
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Optional: inject a failure (latency spike) from another thread
    if args.inject_failure:
        def inject_latency():
            time.sleep(10)
            print("\n[!] Injecting artificial latency degradation [!]")
            import requests
            try:
                # Calls the admin endpoint (the service must implement /admin/degrade)
                resp = requests.post("http://localhost:8000/admin/degrade",
                                     json={"factor": 5.0}, timeout=2)
                print(f"Injection result: {resp.status_code}")
            except Exception as e:
                print(f"Failed to inject: {e}")
        threading.Thread(target=inject_latency, daemon=True).start()

    try:
        results = loop.run_until_complete(burst_traffic(args.url, args.rps, args.duration))
        print_stats(results)
    except KeyboardInterrupt:
        print("\nSimulation interrupted.")
    finally:
        loop.close()


if __name__ == "__main__":
    main()
4. Installing Dependencies and Running
4.1 Environment preparation
Make sure Python 3.8+ and Redis are installed. Redis can be started quickly with Docker:
docker run -d -p 6379:6379 --name llm-redis redis:alpine
4.2 Install the Python dependencies
pip install -r requirements.txt
Contents of requirements.txt:
fastapi==0.104.1
uvicorn[standard]==0.24.0
prometheus-client==0.19.0
requests==2.31.0
redis==5.0.1
aiohttp==3.9.1
pydantic==2.5.0
4.3 Start the services (three separate terminals required)
Terminal 1 - start the LLM inference service:
python run_inference.py
Once it is up, open http://localhost:8000/docs for the API documentation and http://localhost:8000/metrics for the metrics.
Terminal 2 - start the monitoring service:
python run_monitoring.py
It scrapes the metrics every 5 seconds and checks for anomalies.
Terminal 3 - start the self-healing controller:
python run_selfhealing.py
It listens for alert messages on Redis and executes self-healing actions.
4.4 Simulate traffic and observe the system
Terminal 4 - simulate normal traffic:
python simulate_traffic.py --rps 15 --duration 60
Watch the log output in each terminal. The monitoring service prints its scrape logs, and the self-healing controller should stay quiet (no anomalies).
Terminal 5 - simulate failure traffic:
python simulate_traffic.py --rps 50 --duration 40 --inject-failure
--inject-failure attempts to trigger a simulated latency increase after 10 seconds. You should then observe:
- The monitoring service detects high latency or a high error rate and publishes an alert.
- The self-healing controller receives the alert, triggers the circuit-breaking or degradation action, and logs it.
- The inference service's metrics (e.g., error rate) may gradually recover as a result of the self-healing actions.
5. Testing and Verification
5.1 Basic health check
curl http://localhost:8000/health
Expected response: {"status":"healthy","timestamp":"..."}
5.2 Verify the metrics endpoint
curl http://localhost:8000/metrics | grep llm_requests_total
You should see metric lines such as llm_requests_total{model="simulated-llm-v1",status="success"} 123.
5.3 Verify the monitoring pipeline
- Watch the monitoring service terminal; it should print Starting monitoring cycle... every 5 seconds.
- Use redis-cli to subscribe to the alert channel and inspect the raw alerts:
redis-cli
127.0.0.1:6379> SUBSCRIBE alerts:llm_service
- Run the failure simulation (simulate_traffic.py --inject-failure); the Redis client should show the published JSON alerts, as in the example below.
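Based on the alert dictionaries built in monitoring/collector.py, a published payload looks roughly like this (the concrete values depend on the traffic):
{"type": "high_error_rate", "severity": "critical", "metric": "error_rate", "value": 0.18, "threshold": 0.1, "timestamp": 1714032000.0}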
5.4 Verify the self-healing actions
- In the self-healing controller terminal, watch the decision logs after an alert arrives, for example:
Processing alert: high_latency
Executing action for high latency: degrade model latency.
- (Optional) In actions.py, uncomment the simulated command inside restart_service, or add a real API call, to exercise a more complete recovery flow.
6. Extensions and Best Practices
Performance: this example uses in-memory storage and a simple polling loop, which is fine for a demo. For production, consider:
- Storing metrics in a real time-series database (e.g., Prometheus + Thanos).
- Using Apache Kafka or RabbitMQ as a reliable message queue.
- Implementing the monitoring analysis and self-healing decisions as stateless services so they can scale horizontally.
Deployment: containerize the services and manage their lifecycle with Kubernetes. Self-healing actions can follow the K8s Operator pattern and operate directly on Pod, Deployment, or VirtualService resources, as sketched below.
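For instance, restart_service could trigger a rolling restart through the official kubernetes Python client instead of only logging. The sketch below assumes a Deployment named llm-inference and requires pip install kubernetes; it is an illustration, not part of the project code:
# Sketch: rolling restart of a Deployment (what `kubectl rollout restart` does)
from datetime import datetime, timezone
from kubernetes import client, config

def rollout_restart_deployment(name: str = "llm-inference", namespace: str = "default"):
    config.load_kube_config()  # use config.load_incluster_config() inside the cluster
    apps = client.AppsV1Api()
    patch = {"spec": {"template": {"metadata": {"annotations": {
        "kubectl.kubernetes.io/restartedAt": datetime.now(timezone.utc).isoformat()
    }}}}}
    apps.patch_namespaced_deployment(name=name, namespace=namespace, body=patch)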
Rules engine: the current rules are hard-coded. For more complex scenarios, integrate a lightweight rules engine (such as durable_rules) or a machine-learning model for anomaly detection and root-cause analysis.
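As an illustration of moving beyond fixed thresholds, a rolling z-score detector over recent latency samples is a common lightweight starting point (a sketch, not part of the project code):
# Sketch: flag a sample as anomalous when it deviates from the recent window
# by more than z_threshold standard deviations.
from collections import deque
from statistics import mean, pstdev

class RollingZScoreDetector:
    def __init__(self, window: int = 60, z_threshold: float = 3.0):
        self.samples = deque(maxlen=window)
        self.z_threshold = z_threshold

    def observe(self, value: float) -> bool:
        is_anomaly = False
        if len(self.samples) >= 10:  # require a minimal baseline first
            mu = mean(self.samples)
            sigma = pstdev(self.samples) or 1e-9
            is_anomaly = abs(value - mu) / sigma > self.z_threshold
        self.samples.append(value)
        return is_anomaly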
Security: admin endpoints such as /admin/degrade must be protected with strict authentication and authorization. Every self-healing action should go through an approval workflow or a dry-run mode to prevent mistakes.
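A minimal way to protect /admin/degrade in this FastAPI service is an API-key dependency. The sketch below assumes an ADMIN_API_KEY environment variable, which is not part of the project's settings:
# Sketch: reject admin requests whose X-Admin-Key header does not match the configured key
import os
from fastapi import Depends, Header, HTTPException

async def require_admin_key(x_admin_key: str = Header(default="")):
    expected = os.getenv("ADMIN_API_KEY", "")
    if not expected or x_admin_key != expected:
        raise HTTPException(status_code=403, detail="Admin access denied")

# usage: @app.post("/admin/degrade", dependencies=[Depends(require_admin_key)])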
Deepening the observability loop: integrate distributed tracing (e.g., Jaeger) to pinpoint which component causes a latency bottleneck, and feed the outcomes of self-healing actions (success/failure) back into the monitoring system as metrics, closing the loop more completely.