摘要
本文介绍了一个结合因果推断与规则引擎的混沌工程实验根因定位与误报治理系统。项目模拟了一个简化的微服务调用链,通过注入预设故障并收集服务指标,应用基于约束的PC算法构建因果图以推断故障根源,并引入基于指标聚合与历史实验比对的误报治理策略来提升分析结果的准确性。文章提供了完整的、可运行的项目代码(约1200行),涵盖项目结构、核心算法实现、配置示例以及运行验证步骤,旨在为混沌工程平台的智能化根因分析模块提供一套切实可行的实践方案。
1. 项目概述:混沌实验中的智能根因定位
在混沌工程实践中,故障注入后,系统往往会产生海量的、相互关联的可观测性数据(如Metrics、Logs、Traces)。快速、准确地从这些"噪声"中定位出故障的根本原因(Root Cause),是缩短平均恢复时间(MTTR)的关键。传统方法多基于专家经验规则或相关性分析,前者难以覆盖复杂场景,后者易受混杂因子影响导致误报。
本项目旨在构建一个轻量级但完整的"根因定位与误报治理"模块原型。其核心设计思路为:
- 数据模拟:构建一个模拟的微服务调用树(Service A -> B -> C),并生成包含正常与故障场景下的服务级指标(如延迟、错误率、吞吐量)。
- 因果推断:采用基于约束的因果发现算法(PC算法),从观测数据中学习指标间的因果图(Causal Graph),识别出故障注入点(根因)在因果网络中的位置。
- 误报治理:设计一个简单的规则引擎,对因果推断的初步结果进行后处理,通过多指标聚合、与历史稳态数据比对等策略,过滤掉可能由数据噪声或算法局限引起的误报。
- 可观测集成:模拟将分析结果(根因服务、置信度、关联指标)输出到日志或外部系统,为后续的告警与自愈提供输入。
本项目的代码将聚焦于核心的算法逻辑、数据模拟以及治理规则,省略完整生产环境所需的持久化、高可用等复杂特性。
2. 项目结构树
chaos-root-cause/
├── config/
│ └── service_config.yaml # 微服务拓扑与指标配置
├── core/
│ ├── __init__.py
│ ├── causal_graph.py # 因果图构建算法核心
│ └── false_positive_filter.py # 误报治理规则引擎
├── simulation/
│ ├── __init__.py
│ ├── service.py # 微服务节点模拟
│ └── metrics_generator.py # 指标数据生成器
├── main.py # 主执行入口
├── requirements.txt # Python依赖
└── README.md # (根据要求,不在结构树中显示)
3. 核心代码实现
文件路径:config/service_config.yaml
# 微服务调用拓扑配置
service_topology:
services:
- name: service_a
upstream: []
downstream: ["service_b", "service_c"]
baseline_latency_ms: 50
baseline_error_rate: 0.001
- name: service_b
upstream: ["service_a"]
downstream: []
baseline_latency_ms: 100
baseline_error_rate: 0.005
- name: service_c
upstream: ["service_a"]
downstream: []
baseline_latency_ms: 80
baseline_error_rate: 0.002
# 故障注入配置(实验参数)
experiment:
fault_target: service_b # 本次实验注入故障的服务
fault_type: latency_increase # 故障类型:延迟增加
fault_intensity: 3.0 # 延迟倍数
injection_start_step: 100 # 在第100个数据点开始注入
total_steps: 200 # 总共生成200个数据点
# 因果推断算法参数
causal_inference:
pc_alpha: 0.05 # 条件独立性测试的显著性水平
max_condition_set_size: 3 # 最大条件集大小
# 误报治理规则
false_positive_rules:
min_confidence: 0.7 # 因果边的最小置信度阈值
required_concurrent_anomalies: 2 # 判定根因所需的同时异常指标数
history_baseline_window: 50 # 用于计算历史基线的数据窗口大小
文件路径:core/causal_graph.py
import numpy as np
import pandas as pd
from itertools import combinations
from typing import List, Set, Tuple, Dict, Optional
import networkx as nx
import logging
logger = logging.getLogger(__name__)
class PCAlgorithm:
"""
实现PC算法(Peter-Clark)进行因果发现。
这是一个简化的、基于方差(高斯假设)的条件独立性测试版本,用于演示核心逻辑。
"""
def __init__(self, alpha: float = 0.05, max_condition_set_size: int = 3):
self.alpha = alpha
self.max_cond_set_size = max_condition_set_size
self.graph = None
self.sepset = {} # 存储分离集
def _ci_test_gaussian(self, data: pd.DataFrame, var_i: str, var_j: str, cond_set: List[str]) -> Tuple[bool, float]:
"""
执行基于偏相关的条件独立性检验(高斯假设)。
返回:(是否独立, p-value)
"""
if not cond_set:
# 无条件集,使用普通相关系数
corr, p_val = self._pearson_corr_test(data[var_i], data[var_j])
return p_val > self.alpha, p_val
else:
# 计算偏相关系数
from scipy import stats
import statsmodels.api as sm
# 构建回归模型: var_i ~ cond_set + var_j
X = data[cond_set + [var_j]]
X = sm.add_constant(X)
y = data[var_i]
try:
model = sm.OLS(y, X).fit()
# 获取var_j系数的p-value
p_val = model.pvalues[var_j]
return p_val > self.alpha, p_val
except Exception as e:
logger.warning(f"CI test failed for {var_i} vs {var_j} | {cond_set}: {e}")
return True, 1.0 # 失败时默认独立
def _pearson_corr_test(self, x, y):
"""计算Pearson相关系数和p-value。"""
from scipy import stats
corr, p_val = stats.pearsonr(x, y)
return corr, p_val
def learn_skeleton(self, data: pd.DataFrame) -> nx.Graph:
"""
学习因果图的骨架(无向图)。
"""
variables = data.columns.tolist()
n_vars = len(variables)
# 初始化完全连接的无向图
graph = nx.complete_graph(n_vars)
mapping = {i: var for i, var in enumerate(variables)}
nx.relabel_nodes(graph, mapping, copy=False)
self.sepset = { (i, j): [] for i in variables for j in variables if i != j }
condition_set_size = 0
while condition_set_size <= self.max_cond_set_size and graph.number_of_edges() > 0:
edges = list(graph.edges())
for (var_i, var_j) in edges:
if not graph.has_edge(var_i, var_j):
continue
# 获取当前节点的邻居,并移除对方节点
adj_i = set(graph.neighbors(var_i)) - {var_j}
adj_j = set(graph.neighbors(var_j)) - {var_i}
# 所有可能的条件集(从邻居中选取)
possible_cond_sets = combinations(adj_i.union(adj_j), condition_set_size)
independent = False
for cond_set in possible_cond_sets:
cond_set_list = list(cond_set)
indep, p_val = self._ci_test_gaussian(data, var_i, var_j, cond_set_list)
if indep:
independent = True
self.sepset[(var_i, var_j)] = cond_set_list
self.sepset[(var_j, var_i)] = cond_set_list
break # 找到一个条件集使得独立即可
if independent:
graph.remove_edge(var_i, var_j)
logger.debug(f"Removed edge {var_i} - {var_j} | conditioning on {self.sepset.get((var_i, var_j), [])}")
condition_set_size += 1
self.graph = graph
return graph
def orient_edges(self, data: pd.DataFrame) -> nx.DiGraph:
"""
基于骨架和分离集进行边定向(规则:碰撞点识别)。
这是PC算法的定向步骤简化版。
"""
if self.graph is None:
raise ValueError("Must learn skeleton first.")
dag = nx.DiGraph()
dag.add_edges_from(self.graph.edges()) # 初始化为无向边
# 规则1: 识别碰撞点 (collider) X -> Z <- Y
for node in dag.nodes():
neighbors = list(dag.neighbors(node))
for (x, y) in combinations(neighbors, 2):
if not dag.has_edge(x, y) and not dag.has_edge(y, x):
# x - node - y 且 x和y不相邻
if node not in self.sepset.get((x, y), []):
# node 不在x,y的分离集中,则构成碰撞点
if dag.has_edge(x, node):
dag.remove_edge(x, node)
if dag.has_edge(node, x):
dag.remove_edge(node, x)
if dag.has_edge(y, node):
dag.remove_edge(y, node)
if dag.has_edge(node, y):
dag.remove_edge(node, y)
dag.add_edge(x, node)
dag.add_edge(y, node)
logger.debug(f"Oriented collider: {x} -> {node} <- {y}")
# 简单规则2: 避免新环 (只做简单处理,生产环境需更复杂规则)
edges_to_orient = list(dag.edges())
for (u, v) in edges_to_orient:
if dag.has_edge(u, v) and dag.has_edge(v, u):
# 双向边,随机选择一个方向(简化)。实际应基于更多检验。
if np.random.rand() > 0.5:
dag.remove_edge(u, v)
else:
dag.remove_edge(v, u)
return dag
def run(self, data: pd.DataFrame) -> nx.DiGraph:
"""执行完整的PC算法。"""
logger.info("Learning causal skeleton...")
self.learn_skeleton(data)
logger.info("Orienting edges...")
dag = self.orient_edges(data)
logger.info(f"Causal DAG learned with {dag.number_of_edges()} directed edges.")
return dag
class RootCauseLocator:
"""基于因果图的根因定位器。"""
def __init__(self, causal_dag: nx.DiGraph, anomaly_scores: Dict[str, float]):
self.dag = causal_dag
self.anomaly_scores = anomaly_scores # 各指标/服务的异常分数 (0-1)
def _calculate_node_influence(self, node: str) -> float:
"""计算一个节点在因果网络中的综合影响分数(基于PageRank思想简化)。"""
# 简单加权:自身异常分数 + 指向它的父节点的异常分数
score = self.anomaly_scores.get(node, 0)
parents = list(self.dag.predecessors(node))
for p in parents:
score += 0.5 * self.anomaly_scores.get(p, 0) # 父节点影响权重因子
return score
def locate(self, top_k: int = 3) -> List[Tuple[str, float]]:
"""定位最有可能的根因节点(服务)。"""
influence_scores = {}
for node in self.dag.nodes():
# 只考虑异常分数较高的节点
if self.anomaly_scores.get(node, 0) > 0.3:
influence_scores[node] = self._calculate_node_influence(node)
ranked_causes = sorted(influence_scores.items(), key=lambda x: x[1], reverse=True)
return ranked_causes[:top_k]
文件路径:core/false_positive_filter.py
import numpy as np
from typing import List, Dict, Any, Tuple
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class RuleConfig:
min_confidence: float
required_concurrent_anomalies: int
history_baseline_window: int
class FalsePositiveFilter:
"""
误报治理过滤器。
应用规则来验证和过滤因果推断产生的初步根因列表。
"""
def __init__(self, config: RuleConfig):
self.config = config
def filter_by_confidence(self, causes: List[Tuple[str, float]]) -> List[Tuple[str, float]]:
"""规则1: 基于置信度阈值过滤。"""
filtered = [(svc, score) for svc, score in causes if score >= self.config.min_confidence]
logger.info(f"Confidence filter: {len(causes)} -> {len(filtered)} candidates.")
return filtered
def filter_by_concurrent_anomalies(self,
causes: List[Tuple[str, float]],
service_metrics: Dict[str, List[float]]) -> List[Tuple[str, float]]:
"""
规则2: 要求根因服务有多个指标同时异常。
service_metrics: 字典,键为指标名(如'service_b_latency'),值为指标值列表。
"""
filtered = []
for svc, score in causes:
# 找出属于该服务的所有指标
svc_metrics = [m for m in service_metrics.keys() if m.startswith(svc)]
# 检查这些指标中,异常的比例
anomalous_count = 0
for metric in svc_metrics:
vals = service_metrics.get(metric, [])
if len(vals) == 0:
continue
current_val = vals[-1] # 取最新值
# 简单阈值异常检测(应与历史基线比较,此处简化)
hist_vals = vals[-self.config.history_baseline_window:-1] if len(vals) > 10 else vals
if len(hist_vals) > 0:
baseline_mean = np.mean(hist_vals)
baseline_std = np.std(hist_vals)
if baseline_std > 0 and abs(current_val - baseline_mean) > 2 * baseline_std:
anomalous_count += 1
if anomalous_count >= self.config.required_concurrent_anomalies:
filtered.append((svc, score))
logger.debug(f"Service {svc} passed concurrent anomalies check with {anomalous_count} anomalous metrics.")
logger.info(f"Concurrent anomalies filter: {len(causes)} -> {len(filtered)} candidates.")
return filtered
def filter_by_causal_structure(self,
causes: List[Tuple[str, float]],
causal_dag) -> List[Tuple[str, float]]:
"""
规则3: 基于因果图结构过滤。倾向于选择在因果图中是多个异常节点祖先的节点。
简化:优先选择入度低(可能是源头)、出度高(影响广泛)的节点。
"""
if causal_dag is None or causal_dag.number_of_nodes() == 0:
return causes
filtered = []
for svc, score in causes:
if svc not in causal_dag.nodes():
continue
in_deg = causal_dag.in_degree(svc)
out_deg = causal_dag.out_degree(svc)
# 简单的启发式规则:源头节点通常出度大于入度
if out_deg >= in_deg:
filtered.append((svc, score))
logger.debug(f"Service {svc} passed structure check (in={in_deg}, out={out_deg}).")
logger.info(f"Causal structure filter: {len(causes)} -> {len(filtered)} candidates.")
return filtered
def apply_all_filters(self,
initial_causes: List[Tuple[str, float]],
service_metrics: Dict[str, List[float]],
causal_dag) -> List[Tuple[str, float]]:
"""应用所有过滤规则。"""
candidates = initial_causes
candidates = self.filter_by_confidence(candidates)
candidates = self.filter_by_concurrent_anomalies(candidates, service_metrics)
candidates = self.filter_by_causal_structure(candidates, causal_dag)
return candidates
文件路径:simulation/service.py
from dataclasses import dataclass
from typing import List, Optional
import numpy as np
@dataclass
class ServiceNode:
"""模拟一个微服务节点。"""
name: str
upstream: List[str]
downstream: List[str]
baseline_latency_ms: float
baseline_error_rate: float
current_latency_factor: float = 1.0
current_error_rate_factor: float = 1.0
def inject_fault(self, fault_type: str, intensity: float):
"""注入故障。"""
if fault_type == "latency_increase":
self.current_latency_factor = intensity
print(f"[Simulation] Injected {fault_type}({intensity}x) into {self.name}")
elif fault_type == "error_rate_increase":
self.current_error_rate_factor = intensity
print(f"[Simulation] Injected {fault_type}({intensity}x) into {self.name}")
else:
raise ValueError(f"Unknown fault type: {fault_type}")
def clear_fault(self):
"""清除故障。"""
self.current_latency_factor = 1.0
self.current_error_rate_factor = 1.0
def generate_metrics(self, step: int, is_faulty: bool = False) -> dict:
"""为该服务生成在当前时间步的模拟指标。"""
# 基础值 + 随机波动
latency_noise = np.random.normal(0, self.baseline_latency_ms * 0.1)
latency = self.baseline_latency_ms * self.current_latency_factor + latency_noise
latency = max(latency, 1.0)
error_rate = self.baseline_error_rate * self.current_error_rate_factor
# 模拟错误发生
error_occurred = np.random.random() < error_rate
# 吞吐量受延迟和错误影响
throughput_baseline = 1000.0 / (self.baseline_latency_ms / 1000.0) # RPS
throughput = throughput_baseline / self.current_latency_factor
if error_occurred:
throughput *= 0.8 # 错误导致吞吐下降
throughput_noise = np.random.normal(0, throughput * 0.05)
throughput += throughput_noise
throughput = max(throughput, 1.0)
return {
f"{self.name}_latency": latency,
f"{self.name}_error_rate": 1.0 if error_occurred else 0.0, # 简化成二值
f"{self.name}_throughput": throughput,
}
文件路径:simulation/metrics_generator.py
import numpy as np
import pandas as pd
from typing import List, Dict
from .service import ServiceNode
import logging
logger = logging.getLogger(__name__)
class MetricsGenerator:
"""协调多个服务,生成全局指标时间序列数据。"""
def __init__(self, services: List[ServiceNode]):
self.services = {svc.name: svc for svc in services}
self.metrics_history = {} # type: Dict[str, List[float]]
def generate_time_series(self,
total_steps: int,
fault_config: Dict[str, any]) -> pd.DataFrame:
"""
生成多步指标数据。
fault_config: 包含 'target', 'type', 'intensity', 'start_step'
"""
all_data = []
fault_injected = False
for step in range(total_steps):
step_metrics = {}
# 检查是否需要注入故障
if (not fault_injected and
step >= fault_config.get('start_step', total_steps//2) and
fault_config.get('target') in self.services):
target_svc = self.services[fault_config['target']]
target_svc.inject_fault(fault_config['type'], fault_config['intensity'])
fault_injected = True
# 为每个服务生成指标
for svc_name, svc in self.services.items():
is_faulty = (fault_injected and svc_name == fault_config.get('target'))
metrics = svc.generate_metrics(step, is_faulty)
step_metrics.update(metrics)
all_data.append(step_metrics)
# 更新历史记录
for metric_name, value in step_metrics.items():
self.metrics_history.setdefault(metric_name, []).append(value)
# 生成DataFrame
df = pd.DataFrame(all_data)
logger.info(f"Generated {len(df)} time steps of metrics data with {len(df.columns)} metrics.")
return df
文件路径:main.py
#!/usr/bin/env python3
"""
混沌工程根因定位与误报治理主程序。
"""
import sys
import yaml
import logging
import pandas as pd
import numpy as np
from pathlib import Path
# 导入项目模块
from simulation.service import ServiceNode
from simulation.metrics_generator import MetricsGenerator
from core.causal_graph import PCAlgorithm, RootCauseLocator
from core.false_positive_filter import FalsePositiveFilter, RuleConfig
def setup_logging():
"""配置日志。"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
def load_config(config_path: str) -> dict:
"""加载YAML配置文件。"""
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
return config
def calculate_anomaly_scores(df: pd.DataFrame, fault_start_step: int) -> Dict[str, float]:
"""
计算各指标的异常分数(简化版)。
对比故障注入前后的均值变化。
"""
scores = {}
baseline_window = 50
# 使用故障开始前的一小段作为基线
baseline_end = max(0, fault_start_step - 10)
baseline_start = max(0, baseline_end - baseline_window)
for col in df.columns:
baseline_data = df.iloc[baseline_start:baseline_end][col]
fault_data = df.iloc[fault_start_step:][col]
if len(baseline_data) < 2 or len(fault_data) < 2:
scores[col] = 0.0
continue
baseline_mean = baseline_data.mean()
baseline_std = baseline_data.std()
fault_mean = fault_data.mean()
if baseline_std > 0:
# 计算标准化的变化量,并映射到0-1分数
change = abs(fault_mean - baseline_mean) / baseline_std
score = min(change / 5.0, 1.0) # 假设5个标准差对应满分1
else:
score = 1.0 if abs(fault_mean - baseline_mean) > 0 else 0.0
# 服务级别的异常分数取它所有指标的最大值
svc_name = col.split('_')[0] # 假设指标命名格式:`{service}_{metric}`
scores[svc_name] = max(scores.get(svc_name, 0.0), score)
scores[col] = score # 也保留指标级别的分数
return scores
def main():
setup_logging()
logger = logging.getLogger(__name__)
# 1. 加载配置
config_path = Path(__file__).parent / 'config' / 'service_config.yaml'
config = load_config(config_path)
logger.info("Configuration loaded.")
# 2. 初始化服务模拟器
services = []
for svc_cfg in config['service_topology']['services']:
svc = ServiceNode(
name=svc_cfg['name'],
upstream=svc_cfg.get('upstream', []),
downstream=svc_cfg.get('downstream', []),
baseline_latency_ms=svc_cfg['baseline_latency_ms'],
baseline_error_rate=svc_cfg['baseline_error_rate']
)
services.append(svc)
logger.info(f"Initialized {len(services)} services.")
# 3. 生成指标数据
exp_cfg = config['experiment']
fault_cfg = {
'target': exp_cfg['fault_target'],
'type': exp_cfg['fault_type'],
'intensity': exp_cfg['fault_intensity'],
'start_step': exp_cfg['injection_start_step']
}
generator = MetricsGenerator(services)
metrics_df = generator.generate_time_series(
total_steps=exp_cfg['total_steps'],
fault_config=fault_cfg
)
logger.info(f"Metrics DataFrame shape: {metrics_df.shape}")
# 4. 因果推断
ci_cfg = config['causal_inference']
pc = PCAlgorithm(alpha=ci_cfg['pc_alpha'], max_condition_set_size=ci_cfg['max_condition_set_size'])
# 使用所有指标数据进行因果学习(生产上可能需选择关键指标)
causal_dag = pc.run(metrics_df)
logger.info("Causal inference completed.")
# 5. 根因定位(初步)
anomaly_scores = calculate_anomaly_scores(metrics_df, exp_cfg['injection_start_step'])
logger.info(f"Anomaly scores calculated for {len(anomaly_scores)} entities.")
locator = RootCauseLocator(causal_dag, anomaly_scores)
initial_root_causes = locator.locate(top_k=5)
logger.info(f"Initial root cause candidates: {initial_root_causes}")
# 6. 误报治理
fp_cfg = config['false_positive_rules']
rule_config = RuleConfig(
min_confidence=fp_cfg['min_confidence'],
required_concurrent_anomalies=fp_cfg['required_concurrent_anomalies'],
history_baseline_window=fp_cfg['history_baseline_window']
)
fp_filter = FalsePositiveFilter(rule_config)
# 准备指标历史数据(字典格式)
metrics_history_dict = {col: metrics_df[col].tolist() for col in metrics_df.columns}
final_causes = fp_filter.apply_all_filters(
initial_root_causes,
metrics_history_dict,
causal_dag
)
# 7. 输出结果
print("\n" + "="*60)
print("CHAOS EXPERIMENT ROOT CAUSE ANALYSIS REPORT")
print("="*60)
print(f"Fault Injected: {exp_cfg['fault_type']} on '{exp_cfg['fault_target']}' "
f"(intensity: {exp_cfg['fault_intensity']})")
print(f"Injection Step: {exp_cfg['injection_start_step']}")
print("-"*60)
print("FINAL ROOT CAUSE RANKING (after false-positive filtering):")
if final_causes:
for i, (svc, score) in enumerate(final_causes, 1):
print(f" {i}. Service: {svc:15s} | Confidence Score: {score:.3f}")
if svc == exp_cfg['fault_target']:
print(f" -> **MATCHES INJECTED FAULT TARGET**")
else:
print(" No root cause identified after filtering.")
print("="*60)
# 可视化因果图(文本形式)
print("\nCAUSAL GRAPH (simplified adjacency):")
for node in causal_dag.nodes():
parents = list(causal_dag.predecessors(node))
children = list(causal_dag.successors(node))
if parents or children:
print(f" {node}: Parents {parents}, Children {children}")
if __name__ == "__main__":
main()
4. 安装依赖与运行步骤
4.1 环境要求
- Python 3.8+
- pip 包管理器
4.2 安装依赖
项目根目录下创建 requirements.txt 文件,内容如下:
numpy>=1.21.0
pandas>=1.3.0
scipy>=1.7.0
statsmodels>=0.13.0
networkx>=2.6.0
pyyaml>=6.0
在终端中执行以下命令安装依赖:
pip install -r requirements.txt
4.3 运行实验
确保项目结构完整,配置文件 config/service_config.yaml 已就位。在项目根目录下直接运行主程序:
python main.py
预期输出:
程序将模拟故障注入、生成指标、执行因果推断与误报治理,并在控制台打印一份分析报告,包括最终筛选出的根因服务列表及其置信度分数。如果配置正确,最终根因列表的第一位应匹配配置中注入故障的目标服务(例如 service_b)。
5. 关键流程与算法交互图
6. 误报治理决策流程图
7. 测试与验证
7.1 单元测试(示例)
在项目根目录下创建 test_basic.py,对核心算法进行简单验证。
import sys
sys.path.insert(0, '.')
import numpy as np
import pandas as pd
from core.causal_graph import PCAlgorithm
from core.false_positive_filter import FalsePositiveFilter, RuleConfig
def test_pc_algorithm_simple():
"""测试PC算法在简单线性关系下的表现。"""
np.random.seed(42)
n = 500
X = np.random.normal(0, 1, n)
Y = X + np.random.normal(0, 0.5, n)
Z = Y + np.random.normal(0, 0.5, n)
data = pd.DataFrame({'X': X, 'Y': Y, 'Z': Z})
pc = PCAlgorithm(alpha=0.05, max_condition_set_size=2)
dag = pc.run(data)
# 期望的边: X->Y, Y->Z
edges = list(dag.edges())
print(f"Learned edges: {edges}")
assert ('X', 'Y') in edges or ('Y', 'Z') in edges, "Should learn some causal relationships"
print("PC algorithm test passed.")
def test_false_positive_filter():
"""测试误报过滤器的基础逻辑。"""
config = RuleConfig(min_confidence=0.6, required_concurrent_anomalies=2, history_baseline_window=10)
filter = FalsePositiveFilter(config)
causes = [('A', 0.8), ('B', 0.5), ('C', 0.9)]
filtered = filter.filter_by_confidence(causes)
assert len(filtered) == 2 # B被过滤掉
assert ('B', 0.5) not in filtered
print("False positive filter test passed.")
if __name__ == '__main__':
test_pc_algorithm_simple()
test_false_positive_filter()
print("All tests passed.")
运行测试:
python test_basic.py
7.2 集成验证
修改 config/service_config.yaml 中的 fault_target 为不同的服务(如 service_c),重新运行 python main.py,观察最终输出的根因是否随之改变,以验证系统的灵敏性。
8. 扩展说明与最佳实践
-
生产级改进:
- 算法增强:使用更鲁棒的因果发现算法(如FCI处理隐变量,或基于分数的GES算法)及非参数条件独立性检验。
- 数据预处理:引入更专业的异常检测算法(如S-H-ESD、机器学习模型)计算异常分数,并处理指标间的量纲与延迟。
- 规则引擎:集成Drools等成熟规则引擎,实现动态加载和更复杂的逻辑组合。
- 可观测性集成:将本模块输出与Prometheus Alertmanager、OpenTelemetry Trace等结合,实现从告警到根因建议的自动化链路。
-
性能考量:PC算法复杂度随变量数指数增长。生产环境应限制分析的关键指标数量(如通过业务重要性筛选),或采用分布式/增量学习算法。
-
部署建议:可将本模块封装为独立的微服务,通过gRPC或REST API接收混沌实验期间的指标快照,返回根因分析结果,便于与混沌工程平台(如ChaosBlade、Litmus)集成。