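Abstract
This article explores how to apply distributed tracing to software supply chain security by designing and implementing a lightweight, all-in-one prototype called "Supply Chain Security Tracer". The system adds distributed tracing to microservices, automatically captures runtime call chains, and correlates them with static Software Bills of Materials (SBOMs). This enables end-to-end analysis and risk visualization, from dynamic API calls down to vulnerable static dependencies. The article covers the core architecture, the key code (trace-context propagation, SBOM parsing and aggregation, and the security analysis engine), and the engineering trade-offs around performance and data consistency. It provides a complete, runnable Python project with simulated microservices, a data collector, and an analysis UI, and uses concrete examples and Mermaid diagrams to show how data flows and how risks are located.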
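1 Project Overview: The "Supply Chain Security Tracer" Prototype
The "Supply Chain Security Tracer" project is a proof-of-concept system that shows how distributed tracing can strengthen software supply chain security. The core idea: when a user issues a request (such as placing an order), the system not only records the path the request takes through the microservices (the dynamic trace) but also automatically correlates it with the SBOMs (static inventories) of those services and their dependencies. When a security event occurs, this makes it possible to quickly identify the affected APIs, services, and exact vulnerable dependency versions.
Design approach:
- Merging tracing and SBOMs: Tags such as `service.version` and `dependencies.snapshot` are added to conventional trace spans. Each service generates or loads its SBOM at startup and, when handling its first traced request, attaches an SBOM summary to the trace context that is propagated downstream.
- Centralized analysis and storage: The collector receives trace data and SBOM information and stores them in correlated form. The analysis engine scans the aggregated "service-dependency" graph for vulnerabilities, either periodically or in real time.
- Risk visualization and traceability: A UI supports queries by vulnerability ID, service name, or dependency package name, and shows the affected paths as call-chain diagrams.
The project simulates a simplified e-commerce scenario with three services: frontend (the gateway), inventory (the inventory service), and order (the order service).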
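To make the "service-dependency" correlation concrete, the sketch below joins span records with per-service SBOMs and lists which operations in each trace ran inside a service that ships a given component. The record shapes and the `affected_operations` name are illustrative assumptions, and matching is by component name only; the prototype's analyzer also compares versions.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

# Hypothetical, simplified records: each span names the service (and version) that
# produced it; each SBOM maps (service, version) to the component names it ships.
Span = Dict[str, str]


def affected_operations(
    spans: List[Span],
    sboms: Dict[Tuple[str, str], Set[str]],
    component: str,
) -> Dict[str, List[str]]:
    """Return {trace_id: ["<service> <operation>", ...]} for spans whose service ships `component`."""
    hits: Dict[str, List[str]] = defaultdict(list)
    for span in spans:
        key = (span["service_name"], span["service_version"])
        if component in sboms.get(key, set()):
            hits[span["trace_id"]].append(f'{span["service_name"]} {span["operation"]}')
    return dict(hits)


spans = [
    {"trace_id": "t1", "service_name": "frontend", "service_version": "v1.2.0", "operation": "POST /checkout"},
    {"trace_id": "t1", "service_name": "inventory", "service_version": "v1.1.5", "operation": "POST /deduct"},
    {"trace_id": "t1", "service_name": "order", "service_version": "v1.3.0", "operation": "POST /create"},
]
sboms = {
    ("frontend", "v1.2.0"): {"flask", "requests", "vulnerable-lib"},
    ("inventory", "v1.1.5"): {"sqlalchemy"},
    ("order", "v1.3.0"): {"flask", "vulnerable-lib"},
}
result = affected_operations(spans, sboms, "vulnerable-lib")
print(result)  # {'t1': ['frontend POST /checkout', 'order POST /create']}
```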
2 Project Structure
```
supply-chain-trace/
├── README.md            # Project description (not listed below)
├── pyproject.toml       # Dependencies and project configuration
├── run.py               # Main launcher script
├── services/            # Simulated microservices
│   ├── common.py        # Shared utilities (tracing, SBOM handling)
│   ├── frontend.py
│   ├── inventory.py
│   └── order.py
├── collector/           # Trace and SBOM collector
│   └── server.py
├── analyzer/            # Security analysis engine
│   ├── engine.py
│   └── models.py
├── ui/                  # Simple web UI
│   └── app.py
├── data/                # Data storage (SQLite)
│   └── init_db.py
└── config.yaml          # Global configuration file
```
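3 Core Code Implementation
File path: pyproject.toml
```toml
[project]
name = "supply-chain-trace"
version = "0.1.0"
dependencies = [
    "flask>=3.0.0",
    "requests>=2.31.0",
    "pyyaml>=6.0",
    "sqlalchemy>=2.0.0",
    "python-json-logger>=2.0.0",
    "typer>=0.9.0",
]

[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

# List the packages explicitly: setuptools' automatic flat-layout discovery
# refuses to build when it finds several top-level packages.
[tool.setuptools]
packages = ["services", "collector", "analyzer", "ui"]
```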
File path: config.yaml
```yaml
services:
  frontend: &base_service
    host: "localhost"
    port: 8000
    name: "frontend"
    version: "v1.2.0"
  inventory:
    <<: *base_service
    port: 8001
    name: "inventory"
    version: "v1.1.5"
  order:
    <<: *base_service
    port: 8002
    name: "order"
    version: "v1.3.0"
collector:
  host: "localhost"
  port: 8888
analyzer:
  scan_interval_seconds: 60
database:
  url: "sqlite:///./data/trace.db"
logging:
  level: "INFO"
```
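Because `inventory` and `order` inherit from `frontend` through a YAML anchor and merge key (`<<: *base_service`), each service entry in the loaded `CONFIG` contains the base keys, with the service's own keys overriding them. The plain-dict sketch below mirrors that result and the URL construction used in frontend.py; `service_url` is a hypothetical helper, not part of the project.

```python
from typing import Dict

# Plain-dict equivalent of config.yaml after PyYAML resolves the `<<` merge keys:
# each service starts from the frontend mapping, then its own keys override.
base_service = {"host": "localhost", "port": 8000, "name": "frontend", "version": "v1.2.0"}
services: Dict[str, Dict] = {
    "frontend": base_service,
    "inventory": {**base_service, "port": 8001, "name": "inventory", "version": "v1.1.5"},
    "order": {**base_service, "port": 8002, "name": "order", "version": "v1.3.0"},
}


def service_url(name: str) -> str:
    """Build a base URL the same way frontend.py builds INVENTORY_URL and ORDER_URL."""
    svc = services[name]
    return f"http://{svc['host']}:{svc['port']}"


print(service_url("inventory"))  # http://localhost:8001
```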
File path: services/common.py
```python
import json
import logging
import uuid
from dataclasses import asdict, dataclass, field
from typing import Dict, List, Mapping, Optional

import requests
import yaml

# Load configuration (relative path: run every component from the project root)
with open('./config.yaml', 'r', encoding='utf-8') as f:
    CONFIG = yaml.safe_load(f)

COLLECTOR_URL = f"http://{CONFIG['collector']['host']}:{CONFIG['collector']['port']}/ingest"
logger = logging.getLogger(__name__)


def _new_span_id() -> str:
    return uuid.uuid4().hex[:8]


@dataclass
class SBOMComponent:
    """Simulated SBOM component (a dependency)."""
    type: str = "library"
    name: str = ""
    version: str = ""
    purl: str = ""  # Package URL


@dataclass
class SBOM:
    """Simplified SBOM representation."""
    service_name: str
    service_version: str
    components: List[SBOMComponent] = field(default_factory=list)

    def to_dict(self):
        return asdict(self)


class ServiceSBOM:
    """Manages per-service SBOMs (built once per service version, then cached)."""
    _cache: Dict[str, SBOM] = {}

    @classmethod
    def get_for_service(cls, service_name: str, service_version: str) -> SBOM:
        key = f"{service_name}:{service_version}"
        if key not in cls._cache:
            # Simulation: build a static SBOM from the service name and version.
            # A real system would load it from a file (e.g. bom.json) or a registry.
            sbom = SBOM(service_name=service_name, service_version=service_version)
            if service_name == "frontend":
                sbom.components = [
                    SBOMComponent(name="flask", version="2.3.0", purl="pkg:pypi/flask@2.3.0"),
                    SBOMComponent(name="requests", version="2.31.0", purl="pkg:pypi/requests@2.31.0"),
                    SBOMComponent(name="vulnerable-lib", version="1.0.0", purl="pkg:pypi/vulnerable-lib@1.0.0"),  # simulated vulnerable library
                ]
            elif service_name == "inventory":
                sbom.components = [
                    SBOMComponent(name="sqlalchemy", version="2.0.0", purl="pkg:pypi/sqlalchemy@2.0.0"),
                ]
            elif service_name == "order":
                sbom.components = [
                    SBOMComponent(name="flask", version="2.3.0", purl="pkg:pypi/flask@2.3.0"),
                    SBOMComponent(name="vulnerable-lib", version="1.0.0", purl="pkg:pypi/vulnerable-lib@1.0.0"),
                ]
            cls._cache[key] = sbom
        return cls._cache[key]


@dataclass
class TraceContext:
    """Distributed tracing context carrying the trace ID and an SBOM summary."""
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    span_id: str = field(default_factory=_new_span_id)
    parent_span_id: Optional[str] = None
    service_name: str = ""
    service_version: str = ""
    sbom_snapshot: Optional[Dict] = None  # SBOM summary from the upstream service

    def to_headers(self) -> Dict[str, str]:
        """Inject the context into HTTP headers for propagation."""
        return {
            "X-Trace-ID": self.trace_id,
            "X-Span-ID": self.span_id,
            "X-Parent-Span-ID": self.parent_span_id or "",
            "X-Service-Name": self.service_name,
            "X-Service-Version": self.service_version,
            "X-SBOM-Snapshot": json.dumps(self.sbom_snapshot) if self.sbom_snapshot else "",
        }

    @classmethod
    def from_headers(cls, headers: Mapping[str, str]) -> Optional["TraceContext"]:
        """Extract the incoming context and start this service's own span.

        The caller's X-Span-ID becomes parent_span_id and a fresh span_id is
        generated, so each service reports a distinct span in the same trace.
        """
        trace_id = headers.get("X-Trace-ID")
        if not trace_id:
            return None
        sbom_snapshot_str = headers.get("X-SBOM-Snapshot", "")
        return cls(
            trace_id=trace_id,
            parent_span_id=headers.get("X-Span-ID") or None,
            service_name=headers.get("X-Service-Name", ""),
            service_version=headers.get("X-Service-Version", ""),
            sbom_snapshot=json.loads(sbom_snapshot_str) if sbom_snapshot_str else None,
        )


def report_trace_and_sbom(span_data: Dict, sbom: SBOM):
    """Send a span and the service's full SBOM to the collector (best effort)."""
    payload = {
        "span": span_data,
        "sbom": sbom.to_dict(),
    }
    try:
        resp = requests.post(COLLECTOR_URL, json=payload, timeout=1)
        resp.raise_for_status()
    except requests.RequestException as e:
        logger.warning(f"Failed to report to collector: {e}")


def emit_span(operation: str, context: TraceContext, start_time: float, duration: float,
              tags: Optional[Dict] = None):
    """Build a span for the current context and report it."""
    span_data = {
        "trace_id": context.trace_id,
        "span_id": context.span_id,
        "parent_span_id": context.parent_span_id,
        "operation": operation,
        "service_name": context.service_name,
        "service_version": context.service_version,
        "start_time": start_time,
        "duration": duration,
        "tags": tags or {},
    }
    # Attach this service's full SBOM
    sbom = ServiceSBOM.get_for_service(context.service_name, context.service_version)
    report_trace_and_sbom(span_data, sbom)
```
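The propagation rule implemented by `from_headers` (the caller's span ID becomes the parent, and every hop mints its own span ID) can be checked in isolation. This sketch uses a trimmed, hypothetical `Ctx` class and `server_span_from_headers` function rather than importing `services.common`, because that module reads config.yaml at import time.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, Optional


def _new_span_id() -> str:
    return uuid.uuid4().hex[:8]


@dataclass
class Ctx:
    """Minimal stand-in for TraceContext: only the fields needed for propagation."""
    trace_id: str
    span_id: str = field(default_factory=_new_span_id)
    parent_span_id: Optional[str] = None

    def to_headers(self) -> Dict[str, str]:
        return {"X-Trace-ID": self.trace_id, "X-Span-ID": self.span_id}


def server_span_from_headers(headers: Dict[str, str]) -> Ctx:
    """Start the receiving service's span: keep the trace ID, parent it to the caller's span."""
    trace_id = headers.get("X-Trace-ID")
    if not trace_id:
        return Ctx(trace_id=uuid.uuid4().hex)  # no incoming context: this request starts a new trace
    return Ctx(trace_id=trace_id, parent_span_id=headers.get("X-Span-ID") or None)


root = server_span_from_headers({})                   # frontend: no incoming trace headers
child = server_span_from_headers(root.to_headers())   # inventory, called by frontend
print(child.trace_id == root.trace_id, child.parent_span_id == root.span_id)  # True True
```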
File path: services/frontend.py
```python
import logging
import time

import requests
from flask import Flask, jsonify, request

from services.common import CONFIG, TraceContext, emit_span

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)

SERVICE_CONFIG = CONFIG['services']['frontend']
INVENTORY_URL = f"http://{CONFIG['services']['inventory']['host']}:{CONFIG['services']['inventory']['port']}"
ORDER_URL = f"http://{CONFIG['services']['order']['host']}:{CONFIG['services']['order']['port']}"


@app.route('/checkout', methods=['POST'])
def checkout():
    start_time = time.time()
    # 1. Continue an incoming trace (from_headers already starts a child span of the caller),
    #    or start a new trace whose root span is this request
    context = TraceContext.from_headers(request.headers) or TraceContext()
    context.service_name = SERVICE_CONFIG['name']
    context.service_version = SERVICE_CONFIG['version']
    # 2. Call the inventory service; its span becomes a child of context.span_id
    try:
        resp = requests.post(f"{INVENTORY_URL}/deduct", headers=context.to_headers(), timeout=2)
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        return jsonify({"error": f"Inventory call failed: {e}"}), 500
    # 3. Call the order service
    try:
        resp = requests.post(f"{ORDER_URL}/create", headers=context.to_headers(), timeout=2)
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        return jsonify({"error": f"Order call failed: {e}"}), 500
    duration = time.time() - start_time
    # 4. Report the frontend's own span
    emit_span("POST /checkout", context, start_time, duration,
              tags={"http.method": "POST", "http.route": "/checkout"})
    return jsonify({"status": "order_created", "trace_id": context.trace_id}), 200


if __name__ == '__main__':
    app.run(host=SERVICE_CONFIG['host'], port=SERVICE_CONFIG['port'], debug=False)
```
File path: services/inventory.py
```python
import time

from flask import Flask, jsonify, request

from services.common import CONFIG, TraceContext, emit_span

app = Flask(__name__)
SERVICE_CONFIG = CONFIG['services']['inventory']


@app.route('/deduct', methods=['POST'])
def deduct():
    start_time = time.time()
    # Child span of the caller if a trace context arrived, otherwise a new trace
    context = TraceContext.from_headers(request.headers) or TraceContext()
    context.service_name = SERVICE_CONFIG['name']
    context.service_version = SERVICE_CONFIG['version']
    # Simulated processing
    time.sleep(0.05)
    duration = time.time() - start_time
    emit_span("POST /deduct", context, start_time, duration, tags={"http.method": "POST"})
    return jsonify({"status": "inventory_deducted"}), 200


if __name__ == '__main__':
    app.run(host=SERVICE_CONFIG['host'], port=SERVICE_CONFIG['port'], debug=False)
```
File path: services/order.py
```python
import time

from flask import Flask, jsonify, request

from services.common import CONFIG, TraceContext, emit_span

app = Flask(__name__)
SERVICE_CONFIG = CONFIG['services']['order']


@app.route('/create', methods=['POST'])
def create():
    start_time = time.time()
    # Child span of the caller if a trace context arrived, otherwise a new trace
    context = TraceContext.from_headers(request.headers) or TraceContext()
    context.service_name = SERVICE_CONFIG['name']
    context.service_version = SERVICE_CONFIG['version']
    # Simulated processing
    time.sleep(0.1)
    duration = time.time() - start_time
    emit_span("POST /create", context, start_time, duration, tags={"http.method": "POST"})
    return jsonify({"status": "order_created"}), 200


if __name__ == '__main__':
    app.run(host=SERVICE_CONFIG['host'], port=SERVICE_CONFIG['port'], debug=False)
```
File path: collector/server.py
```python
import json as json_lib
import logging
import sqlite3

from flask import Flask, jsonify, request

from services.common import CONFIG

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
DB_PATH = "./data/trace.db"


def init_db():
    """Create the database tables if they do not exist."""
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    # Trace spans
    c.execute('''
        CREATE TABLE IF NOT EXISTS spans (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            trace_id TEXT NOT NULL,
            span_id TEXT NOT NULL,
            parent_span_id TEXT,
            operation TEXT,
            service_name TEXT NOT NULL,
            service_version TEXT NOT NULL,
            start_time REAL,
            duration REAL,
            tags TEXT, -- JSON string
            received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    # SBOM components, one row per (service, service version, component, component version)
    c.execute('''
        CREATE TABLE IF NOT EXISTS sboms (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            service_name TEXT NOT NULL,
            service_version TEXT NOT NULL,
            component_name TEXT NOT NULL,
            component_version TEXT NOT NULL,
            component_purl TEXT,
            UNIQUE(service_name, service_version, component_name, component_version)
        )
    ''')
    # Vulnerability alerts. Spans and SBOM rows have no dedicated link table: they are
    # joined through (service_name, service_version), i.e. each span maps to the latest
    # SBOM view of its service. Real systems may need a richer mapping.
    c.execute('''
        CREATE TABLE IF NOT EXISTS vulnerability_alerts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            vuln_id TEXT NOT NULL,
            component_name TEXT NOT NULL,
            affected_version TEXT NOT NULL,
            severity TEXT,
            trace_id TEXT, -- the affected trace
            span_id TEXT,
            detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    conn.commit()
    conn.close()


@app.route('/ingest', methods=['POST'])
def ingest():
    data = request.get_json(silent=True) or {}
    span_data = data.get('span')
    sbom_data = data.get('sbom')
    if not span_data or not sbom_data:
        return jsonify({"error": "Missing span or sbom data"}), 400
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    try:
        # 1. Insert the span
        c.execute('''
            INSERT INTO spans (trace_id, span_id, parent_span_id, operation, service_name, service_version, start_time, duration, tags)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            span_data['trace_id'], span_data['span_id'], span_data.get('parent_span_id'),
            span_data['operation'], span_data['service_name'], span_data['service_version'],
            span_data['start_time'], span_data['duration'], json_lib.dumps(span_data.get('tags', {}))
        ))
        # 2. Insert or replace SBOM components (simplified; real systems need SBOM versioning)
        for comp in sbom_data['components']:
            c.execute('''
                INSERT OR REPLACE INTO sboms (service_name, service_version, component_name, component_version, component_purl)
                VALUES (?, ?, ?, ?, ?)
            ''', (
                sbom_data['service_name'], sbom_data['service_version'],
                comp['name'], comp['version'], comp.get('purl')
            ))
        conn.commit()
        logging.info(f"Ingested span {span_data['span_id']} for trace {span_data['trace_id']}")
    except Exception as e:
        conn.rollback()
        logging.error(f"Failed to ingest data: {e}")
        return jsonify({"error": str(e)}), 500
    finally:
        conn.close()
    return jsonify({"status": "ok"}), 200


@app.route('/trace/<trace_id>', methods=['GET'])
def get_trace(trace_id):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('SELECT * FROM spans WHERE trace_id = ? ORDER BY start_time', (trace_id,))
    columns = [description[0] for description in c.description]
    rows = c.fetchall()
    conn.close()
    traces = [dict(zip(columns, row)) for row in rows]
    for t in traces:
        if t['tags']:
            t['tags'] = json_lib.loads(t['tags'])
    return jsonify(traces)


if __name__ == '__main__':
    init_db()
    app.run(host=CONFIG['collector']['host'], port=CONFIG['collector']['port'], debug=False)
```
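The analyzer links each alert to "the most recent span" of a service, which is a simplification. Because the collector stores spans and SBOM rows keyed by the same `(service_name, service_version)` pair, a join can instead return every span that ran in a service version shipping a given component. The sketch below runs such a query against an in-memory SQLite database holding trimmed copies of the two tables; the sample rows are made up for illustration and the query is not part of the prototype.

```python
import sqlite3

# In-memory copies of the two collector tables (only the columns the query needs).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE spans (trace_id TEXT, span_id TEXT, operation TEXT,
                    service_name TEXT, service_version TEXT);
CREATE TABLE sboms (service_name TEXT, service_version TEXT,
                    component_name TEXT, component_version TEXT);
INSERT INTO spans VALUES
  ('t1', 'a1', 'POST /checkout', 'frontend',  'v1.2.0'),
  ('t1', 'b2', 'POST /deduct',   'inventory', 'v1.1.5'),
  ('t1', 'c3', 'POST /create',   'order',     'v1.3.0');
INSERT INTO sboms VALUES
  ('frontend',  'v1.2.0', 'vulnerable-lib', '1.0.0'),
  ('inventory', 'v1.1.5', 'sqlalchemy',     '2.0.0'),
  ('order',     'v1.3.0', 'vulnerable-lib', '1.0.0');
""")

# Spans (and therefore API operations) that ran inside a service version shipping the component
rows = conn.execute("""
    SELECT s.trace_id, s.span_id, s.service_name, s.operation
    FROM spans AS s
    JOIN sboms AS b
      ON b.service_name = s.service_name AND b.service_version = s.service_version
    WHERE b.component_name = ? AND b.component_version = ?
    ORDER BY s.span_id
""", ("vulnerable-lib", "1.0.0")).fetchall()
conn.close()
print(rows)  # [('t1', 'a1', 'frontend', 'POST /checkout'), ('t1', 'c3', 'order', 'POST /create')]
```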
File path: analyzer/models.py
```python
from dataclasses import dataclass
from typing import List


@dataclass
class Vulnerability:
    """Simulated vulnerability record."""
    id: str  # e.g. CVE-2024-12345
    component_name: str
    affected_version_range: str  # simplified range such as "<=1.0.0" or "<2.0.0"
    severity: str  # CRITICAL, HIGH, MEDIUM, LOW


# A simulated vulnerability database
VULNERABILITY_DB: List[Vulnerability] = [
    Vulnerability(id="CVE-SIM-2024-001", component_name="vulnerable-lib", affected_version_range="<=1.0.0", severity="HIGH"),
    Vulnerability(id="CVE-SIM-2024-002", component_name="flask", affected_version_range="<2.0.0", severity="MEDIUM"),
]
```
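The scan compares every SBOM row with every advisory. With a larger vulnerability database, grouping advisories by component name first lets each SBOM row do a single dictionary lookup. A sketch, assuming the `Vulnerability` shape above (repeated so the block runs on its own); `index_by_component` and `VULN_INDEX` are hypothetical names.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Vulnerability:
    # Same shape as analyzer/models.py, repeated so the sketch is self-contained
    id: str
    component_name: str
    affected_version_range: str
    severity: str


VULNERABILITY_DB: List[Vulnerability] = [
    Vulnerability("CVE-SIM-2024-001", "vulnerable-lib", "<=1.0.0", "HIGH"),
    Vulnerability("CVE-SIM-2024-002", "flask", "<2.0.0", "MEDIUM"),
]


def index_by_component(db: List[Vulnerability]) -> Dict[str, List[Vulnerability]]:
    """Group advisories by the component they affect."""
    index: Dict[str, List[Vulnerability]] = defaultdict(list)
    for vuln in db:
        index[vuln.component_name].append(vuln)
    return dict(index)


VULN_INDEX = index_by_component(VULNERABILITY_DB)
print([v.id for v in VULN_INDEX.get("flask", [])])  # ['CVE-SIM-2024-002']
```

In `scan_for_vulnerabilities`, the inner loop could then iterate over `VULN_INDEX.get(comp_name, [])` instead of the whole database.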
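File path: analyzer/engine.py
```python
import logging
import sqlite3
import time
from typing import Tuple

from analyzer.models import VULNERABILITY_DB

logging.basicConfig(level=logging.INFO)
DB_PATH = "./data/trace.db"


def _version_tuple(version: str) -> Tuple[int, ...]:
    """'1.10.0' -> (1, 10, 0), dropping trailing zeros so '1.0' equals '1.0.0'.

    Numeric dotted versions only; production code should use a PEP 440 / semver parser.
    """
    parts = [int(p) for p in version.strip().split(".")]
    while len(parts) > 1 and parts[-1] == 0:
        parts.pop()
    return tuple(parts)


def is_version_affected(version: str, version_range: str) -> bool:
    """Evaluate a single upper-bound range such as '<=1.0.0' or '<2.0.0'."""
    if version_range.startswith("<="):
        return _version_tuple(version) <= _version_tuple(version_range[2:])
    if version_range.startswith("<"):
        return _version_tuple(version) < _version_tuple(version_range[1:])
    return _version_tuple(version) == _version_tuple(version_range)  # bare version: exact match


def scan_for_vulnerabilities():
    """Match the SBOM components stored in the database against the vulnerability DB."""
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    alerts_created = 0
    try:
        # All distinct service/component/version combinations
        c.execute('''
            SELECT DISTINCT service_name, service_version, component_name, component_version
            FROM sboms
        ''')
        components = c.fetchall()
        for service_name, service_version, comp_name, comp_version in components:
            for vuln in VULNERABILITY_DB:
                if vuln.component_name != comp_name:
                    continue
                try:
                    if not is_version_affected(comp_version, vuln.affected_version_range):
                        continue
                except ValueError:
                    logging.warning(f"Cannot compare version {comp_name}:{comp_version}; skipped")
                    continue
                # Link the alert to the most recent span of this service version
                # (simplification: any recent span, not one proven to exercise the component)
                c.execute('''
                    SELECT trace_id, span_id FROM spans
                    WHERE service_name = ? AND service_version = ?
                    ORDER BY start_time DESC LIMIT 1
                ''', (service_name, service_version))
                recent_trace = c.fetchone()
                trace_id, span_id = recent_trace if recent_trace else (None, None)
                # Skip if this vulnerability was already reported for the same span
                c.execute('''
                    SELECT 1 FROM vulnerability_alerts
                    WHERE vuln_id = ? AND component_name = ? AND affected_version = ? AND span_id IS ?
                ''', (vuln.id, comp_name, comp_version, span_id))
                if c.fetchone():
                    continue
                logging.warning(f"Vulnerability found! service={service_name}:{service_version}, "
                                f"component={comp_name}:{comp_version}, vuln={vuln.id}")
                c.execute('''
                    INSERT INTO vulnerability_alerts (vuln_id, component_name, affected_version, severity, trace_id, span_id)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (vuln.id, comp_name, comp_version, vuln.severity, trace_id, span_id))
                alerts_created += 1
        conn.commit()
        logging.info(f"Vulnerability scan finished; {alerts_created} new alert(s) created.")
    except Exception as e:
        logging.error(f"Error during scan: {e}")
        conn.rollback()
    finally:
        conn.close()


def run_periodic_scan(interval_seconds: int = 60):
    """Run the scan periodically (forever)."""
    while True:
        scan_for_vulnerabilities()
        time.sleep(interval_seconds)


if __name__ == '__main__':
    # Single run
    scan_for_vulnerabilities()
```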
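`run_periodic_scan` loops forever with `time.sleep`, so it stops only when the process exits. If the analyzer needs a clean shutdown, one option is to wait on a `threading.Event` instead of sleeping. `run_periodic` below is a hypothetical generalization, not part of the prototype.

```python
import threading
from typing import Callable, List


def run_periodic(task: Callable[[], None], interval_seconds: float, stop: threading.Event) -> int:
    """Run `task` now and then every `interval_seconds` until `stop` is set; return the run count.

    Event.wait() returns True as soon as the event is set, so shutdown does not
    have to wait out a full interval the way time.sleep() would.
    """
    runs = 0
    while not stop.is_set():
        task()
        runs += 1
        if stop.wait(interval_seconds):
            break
    return runs


stop = threading.Event()
calls: List[int] = []


def fake_scan() -> None:
    calls.append(1)
    if len(calls) == 3:
        stop.set()  # simulate a shutdown request arriving during the third scan


result = run_periodic(fake_scan, 0.01, stop)
print(result)  # 3
```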
File path: ui/app.py
```python
import json as json_lib
import sqlite3

from flask import Flask, jsonify, request

app = Flask(__name__)
DB_PATH = "./data/trace.db"

# Raw string, served as-is: JavaScript escapes such as \n reach the browser unchanged.
HTML_TEMPLATE = r'''
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Supply Chain Security Tracer</title>
<style>
body { font-family: sans-serif; margin: 2em; }
.section { margin-bottom: 2em; padding: 1em; border: 1px solid #ccc; border-radius: 5px; }
input, button { margin: 0.5em; padding: 0.5em; }
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
.vuln-high { background-color: #ffcccc; }
.trace-graph { border: 1px solid #000; padding: 1em; min-height: 200px; }
</style>
<script src="https://cdn.jsdelivr.net/npm/mermaid@10/dist/mermaid.min.js"></script>
<script>mermaid.initialize({ startOnLoad: false });</script>
</head>
<body>
<h1>Supply Chain Security Tracer</h1>
<div class="section">
<h3>1. Look up by Trace ID</h3>
<form onsubmit="fetchTrace(); return false;">
<input type="text" id="traceId" placeholder="Enter a Trace ID" size="40">
<button type="submit">Look up trace</button>
</form>
<div id="traceResult"></div>
</div>
<div class="section">
<h3>2. Vulnerability alerts</h3>
<button onclick="fetchAlerts()">Refresh alerts</button>
<div id="alertsResult"></div>
</div>
<script>
function fetchTrace() {
  const traceId = document.getElementById('traceId').value.trim();
  const target = document.getElementById('traceResult');
  fetch(`/api/trace/${encodeURIComponent(traceId)}`)
    .then(r => r.json())
    .then(data => {
      let html = `<h4>Trace details (Trace ID: ${traceId})</h4>`;
      if (data.length === 0) {
        target.innerHTML = html + `<p>No trace data found.</p>`;
        return;
      }
      // Mermaid sequence diagram: one self-message per span
      let mermaidCode = 'sequenceDiagram\n';
      data.forEach(span => {
        mermaidCode += `  ${span.service_name}->>${span.service_name}: ${span.operation}\n`;
      });
      html += `<div class="mermaid">${mermaidCode}</div>`;
      // Span details table
      html += `<table><tr><th>Service</th><th>Operation</th><th>Duration (ms)</th><th>Start time</th></tr>`;
      data.forEach(span => {
        html += `<tr><td>${span.service_name}:${span.service_version}</td><td>${span.operation}</td><td>${(span.duration * 1000).toFixed(2)}</td><td>${new Date(span.start_time * 1000).toISOString()}</td></tr>`;
      });
      html += `</table>`;
      // Alerts linked to this trace (simplified lookup)
      fetch(`/api/alerts?trace_id=${encodeURIComponent(traceId)}`)
        .then(r => r.json())
        .then(alerts => {
          if (alerts.length > 0) {
            html += `<h5 style="color:red;">⚠️ Vulnerability alerts detected on this trace</h5><ul>`;
            alerts.forEach(a => {
              html += `<li><strong>${a.vuln_id}</strong> affects component ${a.component_name}@${a.affected_version} (severity: ${a.severity})</li>`;
            });
            html += `</ul>`;
          }
          target.innerHTML = html;
          // Render the diagram just inserted (Mermaid v10 API)
          mermaid.run({ nodes: target.querySelectorAll('.mermaid') });
        });
    });
}
function fetchAlerts() {
  fetch('/api/alerts')
    .then(r => r.json())
    .then(data => {
      let html = `<h4>Vulnerability alerts</h4>`;
      if (data.length === 0) {
        html += `<p>No alerts yet.</p>`;
      } else {
        html += `<table><tr><th>Vulnerability ID</th><th>Component</th><th>Affected version</th><th>Severity</th><th>Linked Trace ID</th><th>Detected at</th></tr>`;
        data.forEach(a => {
          const rowClass = a.severity === 'HIGH' ? 'class="vuln-high"' : '';
          const traceLink = a.trace_id
            ? `<a href="#" onclick="document.getElementById('traceId').value='${a.trace_id}'; fetchTrace(); return false;">${a.trace_id.substring(0, 8)}...</a>`
            : 'N/A';
          html += `<tr ${rowClass}><td>${a.vuln_id}</td><td>${a.component_name}</td><td>${a.affected_version}</td><td>${a.severity}</td><td>${traceLink}</td><td>${a.detected_at}</td></tr>`;
        });
        html += `</table>`;
      }
      document.getElementById('alertsResult').innerHTML = html;
    });
}
</script>
</body>
</html>
'''


@app.route('/')
def index():
    # The page has no template variables, so it is returned directly (no Jinja rendering)
    return HTML_TEMPLATE


@app.route('/api/trace/<trace_id>')
def api_get_trace(trace_id):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('SELECT * FROM spans WHERE trace_id = ? ORDER BY start_time', (trace_id,))
    columns = [description[0] for description in c.description]
    rows = c.fetchall()
    conn.close()
    traces = [dict(zip(columns, row)) for row in rows]
    for t in traces:
        if t['tags']:
            t['tags'] = json_lib.loads(t['tags'])
    return jsonify(traces)


@app.route('/api/alerts')
def api_get_alerts():
    trace_id = request.args.get('trace_id')
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    if trace_id:
        c.execute('SELECT * FROM vulnerability_alerts WHERE trace_id = ? ORDER BY detected_at DESC', (trace_id,))
    else:
        c.execute('SELECT * FROM vulnerability_alerts ORDER BY detected_at DESC LIMIT 50')
    columns = [description[0] for description in c.description]
    rows = c.fetchall()
    conn.close()
    alerts = [dict(zip(columns, row)) for row in rows]
    return jsonify(alerts)


if __name__ == '__main__':
    app.run(host="localhost", port=8050, debug=False)
```
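The page draws each span as a message from a service to itself, so the diagram shows which services ran but not who called whom. Because every span carries `parent_span_id`, the caller can be recovered. A Python sketch of that mapping, assuming span dictionaries shaped like the rows returned by `/api/trace/<trace_id>`; `spans_to_mermaid` is a hypothetical helper whose logic could be ported to the page's JavaScript or served from the API.

```python
from typing import Dict, List


def spans_to_mermaid(spans: List[Dict]) -> str:
    """Render spans as a Mermaid sequenceDiagram with arrows from the parent span's
    service to the child span's service; spans without a known parent come from 'Client'."""
    service_of = {s["span_id"]: s["service_name"] for s in spans}
    lines = ["sequenceDiagram"]
    for span in sorted(spans, key=lambda s: s["start_time"]):
        caller = service_of.get(span.get("parent_span_id"), "Client")
        lines.append(f"    {caller}->>{span['service_name']}: {span['operation']}")
    return "\n".join(lines)


spans = [
    {"span_id": "a1", "parent_span_id": None, "service_name": "frontend", "operation": "POST /checkout", "start_time": 1.0},
    {"span_id": "b2", "parent_span_id": "a1", "service_name": "inventory", "operation": "POST /deduct", "start_time": 1.1},
    {"span_id": "c3", "parent_span_id": "a1", "service_name": "order", "operation": "POST /create", "start_time": 1.2},
]
print(spans_to_mermaid(spans))
```

With correctly linked spans this prints `Client->>frontend`, `frontend->>inventory`, and `frontend->>order` messages, which matches the call flow of a checkout.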
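File path: data/init_db.py
```python
#!/usr/bin/env python3
import sqlite3

DB_PATH = "./data/trace.db"

# Manual setup helper: this only creates the (empty) database file.
# The tables are created by the collector (init_db in collector/server.py) at startup.
conn = sqlite3.connect(DB_PATH)
print("Database file is ready at", DB_PATH)
conn.close()
```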
File path: run.py
```python
#!/usr/bin/env python3
import os
import subprocess
import sys
import threading
import time
from typing import List

import typer

# Make project modules importable when launched as `python run.py` from the project root
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

app = typer.Typer()
processes: List[subprocess.Popen] = []


@app.callback()
def main():
    """Supply Chain Security Tracer launcher."""
    # An explicit callback keeps Typer in multi-command mode, so `python run.py start` works.


def _pump_output(proc: subprocess.Popen, name: str):
    """Forward a child's output with a name prefix. Draining the pipe keeps the child from blocking."""
    for line in iter(proc.stdout.readline, ''):
        typer.echo(f"  [{name}]: {line.rstrip()}")


def start_service(module: str, name: str):
    """Start a component as `python -m <module>` in the current (project root) directory."""
    env = os.environ.copy()
    env["PYTHONUNBUFFERED"] = "1"  # flush child output line by line
    cmd = [sys.executable, "-m", module]
    proc = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    processes.append(proc)
    typer.echo(f"Started {name} (PID: {proc.pid})")
    threading.Thread(target=_pump_output, args=(proc, name), daemon=True).start()
    time.sleep(0.7)  # give the service a moment to bind its port


@app.command()
def start():
    """Start all components."""
    typer.echo("Starting the Supply Chain Security Tracer...")
    # 0. Make sure the data directory exists
    os.makedirs("./data", exist_ok=True)
    # 1. Start the collector (it creates the database tables)
    start_service("collector.server", "Collector")
    # 2. Start the backends first (order, inventory), then frontend, which calls them
    start_service("services.order", "Order Service")
    start_service("services.inventory", "Inventory Service")
    start_service("services.frontend", "Frontend Service")
    # 3. Start the analysis engine in a background thread of this process
    from analyzer.engine import run_periodic_scan
    from services.common import CONFIG
    scan_interval = CONFIG['analyzer']['scan_interval_seconds']
    analyzer_thread = threading.Thread(target=run_periodic_scan, args=(scan_interval,), daemon=True)
    analyzer_thread.start()
    typer.echo(f"Analysis engine started in a background thread (scan interval: {scan_interval} s).")
    # 4. Start the UI
    start_service("ui.app", "Web UI")
    typer.echo("\nAll services started!")
    typer.echo("-" * 50)
    typer.echo("Frontend service:   http://localhost:8000")
    typer.echo("Web UI:             http://localhost:8050")
    typer.echo("Collector endpoint: http://localhost:8888/ingest")
    typer.echo("-" * 50)
    typer.echo("\nPress Ctrl+C to stop all services.")
    # Wait until interrupted
    try:
        for proc in processes:
            proc.wait()
    except KeyboardInterrupt:
        typer.echo("\nShutting down services...")
        for proc in processes:
            proc.terminate()
        for proc in processes:
            proc.wait()
        typer.echo("All services stopped.")


if __name__ == "__main__":
    app()
```
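`start_service` waits a fixed 0.7 s after launching each process, which can be too short on a slow machine. A sketch of a readiness check that polls the TCP port instead; `wait_for_port` is a hypothetical helper, and the launcher would call it after `Popen` with each component's host and port from config.yaml.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0, interval: float = 0.1) -> bool:
    """Poll until a TCP connection to host:port succeeds; return False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)  # not listening yet (e.g. connection refused): retry
    return False


# Usage: a listening socket on an OS-assigned port stands in for a started service
server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen()
ready = wait_for_port("127.0.0.1", server.getsockname()[1], timeout=2)
server.close()
print(ready)  # True
```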
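4 Installing Dependencies and Running
- Prerequisites: Python 3.8+ and pip.
- Create the project directory: place all the files above according to the structure tree.
- Install dependencies:
```bash
cd supply-chain-trace
pip install .
# Or install the core dependencies directly
# pip install flask requests pyyaml sqlalchemy python-json-logger typer
```
- Run the system (always from the project root, because config.yaml and data/trace.db are resolved relative to the working directory):
```bash
# Option 1: use the launcher script (recommended)
python run.py start
# Option 2: start each component manually (one terminal each)
# Terminal 1 - collector
python -m collector.server
# Terminal 2 - order service
python -m services.order
# Terminal 3 - inventory service
python -m services.inventory
# Terminal 4 - frontend service
python -m services.frontend
# Terminal 5 - analysis engine (single scan)
python -m analyzer.engine
# Terminal 6 - web UI
python -m ui.app
```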
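5 Testing and Verification
- Trigger a call chain: send a POST request to the frontend's /checkout endpoint with curl (or any HTTP client).
```bash
curl -X POST http://localhost:8000/checkout
```
  The response contains a `trace_id`.
- View the trace in the Web UI:
  - Open http://localhost:8050 in a browser.
  - Paste the `trace_id` from the previous step into the "Look up by Trace ID" box and submit.
  - The page shows the request's sequence diagram and span details.

The call sequence behind one checkout:
```mermaid
sequenceDiagram
    participant User
    participant Frontend
    participant Inventory
    participant Order
    participant Collector
    User->>Frontend: POST /checkout
    Frontend->>Inventory: POST /deduct (with TraceContext)
    Inventory->>Collector: Report span + SBOM
    Inventory-->>Frontend: Response
    Frontend->>Order: POST /create (with TraceContext)
    Order->>Collector: Report span + SBOM
    Order-->>Frontend: Response
    Frontend->>Collector: Report span + SBOM
    Frontend-->>User: Return trace_id
```
- View vulnerability alerts:
  - Click "Refresh alerts" in the Web UI.
  - The frontend and order services are pre-seeded with `vulnerable-lib@1.0.0`, and the vulnerability database (analyzer/models.py) contains CVE-SIM-2024-001, which affects versions <=1.0.0. After the analyzer's next scan (or after running `python -m analyzer.engine` when starting components manually), you should see at least one HIGH severity alert.
  - Each alert is linked to the `trace_id` of the most recent span from the affected service; click the link to open that trace.
- Verify data aggregation: you can also query the collector's API directly to inspect the raw data.
```bash
# Fetch all spans of a given trace
curl http://localhost:8888/trace/<your_trace_id>
```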
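To confirm that propagation produced a well-formed trace, the JSON returned by the collector can be checked: a trace started by one request should have exactly one root span, no duplicate span IDs, and no span whose parent is missing from the trace. `check_span_tree` is a hypothetical helper that takes the parsed JSON list (for example, `json.loads` of the curl output).

```python
from typing import Dict, List


def check_span_tree(spans: List[Dict]) -> Dict[str, List[str]]:
    """Report duplicate span IDs, orphan spans (parent not in this trace), and root spans."""
    ids = [s["span_id"] for s in spans]
    known = set(ids)
    duplicates = sorted({i for i in ids if ids.count(i) > 1})
    roots = [s["span_id"] for s in spans if not s.get("parent_span_id")]
    orphans = [s["span_id"] for s in spans
               if s.get("parent_span_id") and s["parent_span_id"] not in known]
    return {"duplicates": duplicates, "orphans": orphans, "roots": roots}


healthy = [
    {"span_id": "a1", "parent_span_id": None},
    {"span_id": "b2", "parent_span_id": "a1"},
    {"span_id": "c3", "parent_span_id": "a1"},
]
# Symptom of services reusing the caller's span ID, with a parent that was never reported
broken = [
    {"span_id": "a1", "parent_span_id": "z9"},
    {"span_id": "a1", "parent_span_id": "z9"},
]
print(check_span_tree(healthy))  # {'duplicates': [], 'orphans': [], 'roots': ['a1']}
print(check_span_tree(broken))   # {'duplicates': ['a1'], 'orphans': ['a1', 'a1'], 'roots': []}
```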
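6 Architecture Design and Trade-off Analysis
6.1 Core Architecture Diagram
At the heart of the system, the collector correlates dynamic trace data with static SBOMs, and the analysis engine turns that correlation into security insights.
```mermaid
graph TD
    A[Client request] --> B[Frontend Service];
    B -- Inject TraceContext/SBOM snapshot --> C[Inventory Service];
    B -- Inject TraceContext/SBOM snapshot --> D[Order Service];
    B --> E[Collector];
    C --> E;
    D --> E;
    E --> F[(Correlated storage)];
    F --> G[Analyzer Engine];
    H[Vulnerability intelligence] --> G;
    G --> I[Security alerts];
    F --> J[Web UI];
    I --> J;
```
6.2 Key Trade-offs
- Data granularity vs. performance: Passing the full SBOM in every span would add significant overhead. This design propagates only a summary snapshot (service name and version) between services and joins it with the full SBOM on the collector side, a compromise between bandwidth and functionality. (In this prototype, each span report sent to the collector still carries the full SBOM; a production system would upload it once per service version.)
- SBOM consistency: A service instance's SBOM can change with every deployment. This prototype uses a static model in which each service's SBOM is loaded once and cached. Production environments need an SBOM registry and may need to carry a hash of the SBOM content in spans to verify consistency.
- Analysis latency: Periodic (batch) scanning instead of real-time analysis leaves a delay between a vulnerability's disclosure and the alert for the exposed services. A message-driven real-time trigger can be added for critical vulnerabilities.
- Storage and query: SQLite keeps the demo simple. Production needs a scalable trace storage backend (for example Jaeger or Grafana Tempo) combined with a graph database for dependency analysis.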
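The consistency check mentioned above can be sketched as an order-independent digest over the SBOM's components: a service would send the digest alongside its spans, and the collector would compare it with the digest of the SBOM it has stored. The `sbom_digest` helper and the idea of propagating its value (for instance in a hypothetical `X-SBOM-Digest` header) are assumptions, not part of the prototype.

```python
import hashlib
import json
from typing import Dict, List


def sbom_digest(components: List[Dict[str, str]]) -> str:
    """SHA-256 over the sorted (name, version, purl) triples, so component order does not matter."""
    canonical = sorted((c["name"], c["version"], c.get("purl", "")) for c in components)
    payload = json.dumps(canonical, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(payload.encode("utf-8")).hexdigest()


components = [
    {"name": "flask", "version": "2.3.0", "purl": "pkg:pypi/flask@2.3.0"},
    {"name": "vulnerable-lib", "version": "1.0.0", "purl": "pkg:pypi/vulnerable-lib@1.0.0"},
]
same_set = list(reversed(components))
upgraded = [components[0], dict(components[1], version="1.0.1", purl="pkg:pypi/vulnerable-lib@1.0.1")]
print(sbom_digest(components) == sbom_digest(same_set))  # True
print(sbom_digest(components) == sbom_digest(upgraded))  # False
```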
6.3 SBOM Flow and Correlation Logic
The diagram below shows how SBOM information flows during request handling and how it is finally correlated and analyzed.
```mermaid
graph LR
    S1[Frontend SBOM] --> TC1[TraceContext];
    S2[Inventory SBOM] --> TC2[TraceContext];
    S3[Order SBOM] --> TC3[TraceContext];
    TC1 --> R1[Request 1: /checkout];
    TC2 --> R1;
    TC3 --> R1;
    R1 --> C[Collector];
    C --> DB[(Database:<br/>Spans + SBOMs)];
    DB --> AE[Analysis engine];
    VDB[Vulnerability database] --> AE;
    AE --> ALERT[Alert: CVE-SIM-2024-001<br/>Affected services: frontend, order];
    ALERT --> UI[Visualization UI];
```
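7 Extensions and Production Considerations
- Richer SBOM management: adopt the SPDX or CycloneDX standards and have CI/CD pipelines publish SBOMs to a registry automatically.
- Deeper dependency-graph analysis: use the dependency relationships recorded in the SBOM to build component dependency graphs across and within services, enabling precise impact analysis.
- Performance and scalability: replace the collector with the OpenTelemetry Collector, use Jaeger or Tempo as the backend, and use Apache Spark or Flink for stream processing in the analysis layer.
- Security and compliance: redact potentially sensitive information in trace data, enforce access control on SBOM storage, and generate compliance reports.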
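As a starting point for the CycloneDX integration, the sketch below reads the top-level `components` array of a CycloneDX JSON BOM into the same name/version/purl shape as `SBOMComponent`, which is what `ServiceSBOM.get_for_service` could load instead of its hard-coded lists. It ignores nested components and the `dependencies` section; `components_from_cyclonedx` is a hypothetical name.

```python
import json
from typing import Dict, List


def components_from_cyclonedx(bom_json: str) -> List[Dict[str, str]]:
    """Extract name/version/purl from the top-level components of a CycloneDX JSON BOM."""
    bom = json.loads(bom_json)
    if bom.get("bomFormat") != "CycloneDX":
        raise ValueError("not a CycloneDX JSON document")
    return [
        {"name": c.get("name", ""), "version": c.get("version", ""), "purl": c.get("purl", "")}
        for c in bom.get("components", [])
    ]


sample = json.dumps({
    "bomFormat": "CycloneDX",
    "specVersion": "1.5",
    "components": [
        {"type": "library", "name": "flask", "version": "2.3.0", "purl": "pkg:pypi/flask@2.3.0"},
        {"type": "library", "name": "vulnerable-lib", "version": "1.0.0", "purl": "pkg:pypi/vulnerable-lib@1.0.0"},
    ],
})
print(components_from_cyclonedx(sample)[0])  # {'name': 'flask', 'version': '2.3.0', 'purl': 'pkg:pypi/flask@2.3.0'}
```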
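This project demonstrates that combining distributed tracing with supply chain security is both feasible and promising, and offers a practical reference for building secure cloud-native architectures with deep observability.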