Architecture Design and Trade-offs of Distributed Tracing in Supply Chain Security

2900559190
January 6, 2026
Updated February 4, 2026

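Abstract

This article explores how distributed tracing can be applied in depth to software supply chain security, and designs and implements a lightweight, all-in-one prototype called "Supply Chain Security Tracker" (供应链安全追踪者). The system instruments microservices with distributed tracing, automatically captures runtime call chains, and correlates them with static software bills of materials (SBOMs), so that analysis and risk visualization run end to end from a dynamic API call down to a vulnerable static dependency. The article covers the core architecture, the key code (trace context propagation, SBOM parsing and aggregation, and the security analysis engine), and the engineering trade-offs around performance and data consistency. It provides a complete, runnable Python project of roughly 1,500 lines, comprising simulated microservices, a data collector, and an analysis UI, and uses concrete examples and Mermaid diagrams to show how data flows and how risks are located.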

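1 Project Overview: the "Supply Chain Security Tracker" Prototype

The "Supply Chain Security Tracker" project is a proof of concept that demonstrates how distributed tracing can strengthen software supply chain security. The core idea: when a user issues a request (such as placing an order), the system records the path of microservices the request passes through (the dynamic trace) and also automatically correlates and analyzes the SBOMs (the static inventory) of those services and their dependency libraries. When a security event occurs, the affected APIs, services, and specific vulnerable dependency versions can then be located quickly.

Design ideas:

  1. Fusing traces with SBOMs: Tags such as service.version and dependencies.snapshot are injected into conventional trace spans. Each service generates or loads its SBOM at startup and, when it handles its first traced request, attaches an SBOM summary to the trace context that is passed downstream.
  2. Centralized analysis and storage: The collector receives trace data and SBOM information and stores them in correlated form. The analysis engine scans the aggregated "service-dependency" graph for vulnerabilities, either periodically or in real time.
  3. Risk visualization and tracing: A UI supports queries by vulnerability ID, service name, or dependency package name, and displays the affected paths as a call chain diagram.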
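As a minimal sketch of the first design idea, the two tags could be built as follows. It assumes that the dependencies.snapshot value is a SHA-256 digest over the sorted package URLs; the article does not fix the format of that tag, so the digest and the helper names sbom_digest and build_span_tags are illustrative assumptions.

```python
import hashlib
import json
from typing import Dict, List


def sbom_digest(purls: List[str]) -> str:
    """Return a stable SHA-256 digest over a set of package URLs.

    Assumption: the snapshot is a hash of the sorted, de-duplicated purls,
    so the same dependency set always yields the same value.
    """
    canonical = json.dumps(sorted(set(purls)), separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def build_span_tags(service_version: str, purls: List[str]) -> Dict[str, str]:
    """Build the two extra span tags named in the design notes."""
    return {
        "service.version": service_version,
        "dependencies.snapshot": sbom_digest(purls),
    }


if __name__ == "__main__":
    tags = build_span_tags(
        "v1.2.0",
        ["pkg:pypi/flask@2.3.0", "pkg:pypi/requests@2.31.0"],
    )
    print(tags)
```

Because the purls are sorted before hashing, two instances with the same dependency set produce the same tag regardless of the order in which their SBOMs list components.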

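This project simulates a simplified e-commerce scenario with three services: frontend (the front-end gateway), inventory (the inventory service), and order (the order service).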

2 Project Structure

supply-chain-trace/
├── README.md                 # Project description (omitted from this article)
├── pyproject.toml            # Project dependencies and configuration
├── run.py                    # Main launcher script
├── services/                 # Simulated microservices
│   ├── common.py             # Shared utilities (tracing, SBOM handling)
│   ├── frontend.py
│   ├── inventory.py
│   └── order.py
├── collector/                # Trace and SBOM collector
│   └── server.py
├── analyzer/                 # Security analysis engine
│   ├── engine.py
│   └── models.py
├── ui/                       # Simple web UI
│   └── app.py
├── data/                     # Data storage (SQLite)
│   └── init_db.py
└── config.yaml               # Global configuration file

3 Core Code Implementation

File path: pyproject.toml

[project]
name = "supply-chain-trace"
version = "0.1.0"
dependencies = [
    "flask>=3.0.0",
    "requests>=2.31.0",
    "pyyaml>=6.0",
    "sqlalchemy>=2.0.0",
    "python-json-logger>=2.0.0",
    "typer>=0.9.0",
]

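[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.setuptools]
# List packages explicitly; setuptools auto-discovery rejects a flat layout with several top-level packages
packages = ["services", "collector", "analyzer", "ui"]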

File path: config.yaml

services:
  frontend: &base_service
    host: "localhost"
    port: 8000
    name: "frontend"
    version: "v1.2.0"
  inventory:
    <<: *base_service
    port: 8001
    name: "inventory"
    version: "v1.1.5"
  order:
    <<: *base_service
    port: 8002
    name: "order"
    version: "v1.3.0"
collector:
  host: "localhost"
  port: 8888
analyzer:
  scan_interval_seconds: 60
database:
  url: "sqlite:///./data/trace.db"
logging:
  level: "INFO"

File path: services/common.py

import json
import logging
import uuid
from dataclasses import asdict, dataclass, field
from typing import Dict, List, Optional
import requests
import yaml

# Load configuration (the path is relative to the working directory, so run from the project root)
with open('./config.yaml', 'r') as f:
    CONFIG = yaml.safe_load(f)
COLLECTOR_URL = f"http://{CONFIG['collector']['host']}:{CONFIG['collector']['port']}/ingest"

logger = logging.getLogger(__name__)

@dataclass
class SBOMComponent:
    """Simulated SBOM component (a dependency)"""
    type: str = "library"
    name: str = ""
    version: str = ""
    purl: str = ""  # Package URL

@dataclass
class SBOM:
    """Simplified SBOM representation"""
    service_name: str
    service_version: str
    components: List[SBOMComponent] = field(default_factory=list)

    def to_dict(self):
        return asdict(self)

class ServiceSBOM:
    """Manages the SBOM of each service"""
    _cache: Dict[str, SBOM] = {}

    @classmethod
    def get_for_service(cls, service_name: str, service_version: str) -> SBOM:
        key = f"{service_name}:{service_version}"
        if key not in cls._cache:
            # Simulation: build a static SBOM from the service name and version
            # A real deployment would load it from a file (such as bom.json) or from a registry
            sbom = SBOM(service_name=service_name, service_version=service_version)
            if service_name == "frontend":
                sbom.components = [
                    SBOMComponent(name="flask", version="2.3.0", purl="pkg:pypi/flask@2.3.0"),
                    SBOMComponent(name="requests", version="2.31.0", purl="pkg:pypi/requests@2.31.0"),
                    SBOMComponent(name="vulnerable-lib", version="1.0.0", purl="pkg:pypi/vulnerable-lib@1.0.0"),  # simulated vulnerable library
                ]
            elif service_name == "inventory":
                sbom.components = [
                    SBOMComponent(name="sqlalchemy", version="2.0.0", purl="pkg:pypi/sqlalchemy@2.0.0"),
                ]
            elif service_name == "order":
                sbom.components = [
                    SBOMComponent(name="flask", version="2.3.0", purl="pkg:pypi/flask@2.3.0"),
                    SBOMComponent(name="vulnerable-lib", version="1.0.0", purl="pkg:pypi/vulnerable-lib@1.0.0"),
                ]
            cls._cache[key] = sbom
        return cls._cache[key]

@dataclass
class TraceContext:
    """Distributed trace context carrying the trace IDs and an SBOM summary"""
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    span_id: str = field(default_factory=lambda: str(uuid.uuid4().hex[:8]))
    parent_span_id: Optional[str] = None
    service_name: str = ""
    service_version: str = ""
    sbom_snapshot: Optional[Dict] = None  # SBOM summary of the upstream service

    def to_headers(self) -> Dict[str, str]:
        """Inject the context into HTTP headers for propagation"""
        return {
            "X-Trace-ID": self.trace_id,
            "X-Span-ID": self.span_id,
            "X-Parent-Span-ID": self.parent_span_id or "",
            "X-Service-Name": self.service_name,
            "X-Service-Version": self.service_version,
            "X-SBOM-Snapshot": json.dumps(self.sbom_snapshot) if self.sbom_snapshot else "",
        }

    @classmethod
    def from_headers(cls, headers: Dict[str, str]) -> Optional["TraceContext"]:
        """Extract the context from HTTP headers; return None if no trace ID is present"""
        trace_id = headers.get("X-Trace-ID")
        if not trace_id:
            return None
        sbom_snapshot_str = headers.get("X-SBOM-Snapshot", "")
        return cls(
            trace_id=trace_id,
            span_id=headers.get("X-Span-ID", str(uuid.uuid4().hex[:8])),
            parent_span_id=headers.get("X-Parent-Span-ID") or None,
            service_name=headers.get("X-Service-Name", ""),
            service_version=headers.get("X-Service-Version", ""),
            sbom_snapshot=json.loads(sbom_snapshot_str) if sbom_snapshot_str else None,
        )

def report_trace_and_sbom(span_data: Dict, sbom: SBOM):
    """Report a trace span together with the full SBOM to the collector"""
    payload = {
        "span": span_data,
        "sbom": sbom.to_dict(),
    }
    try:
        resp = requests.post(COLLECTOR_URL, json=payload, timeout=1)
        resp.raise_for_status()
    except Exception as e:
        logger.warning(f"Failed to report to collector: {e}")

def emit_span(operation: str, context: TraceContext, start_time: float, duration: float, tags: Optional[Dict] = None):
    """Build a span and report it"""
    span_data = {
        "trace_id": context.trace_id,
        "span_id": context.span_id,
        "parent_span_id": context.parent_span_id,
        "operation": operation,
        "service_name": context.service_name,
        "service_version": context.service_version,
        "start_time": start_time,
        "duration": duration,
        "tags": tags or {},
    }
    # Look up this service's full SBOM
    sbom = ServiceSBOM.get_for_service(context.service_name, context.service_version)
    report_trace_and_sbom(span_data, sbom)
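
A common tracing convention is that each service opens its own span and records the caller's span as its parent, which is what lets a backend rebuild the call tree. The sketch below shows that hand-off without any HTTP, using the same header names as TraceContext. Ctx and child_from_headers are hypothetical names introduced only for this illustration and are not part of the project.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, Optional


def _new_span_id() -> str:
    """Generate a short random span ID (8 hex characters)."""
    return uuid.uuid4().hex[:8]


@dataclass
class Ctx:
    """Stripped-down stand-in for TraceContext (hypothetical, for illustration)."""
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    span_id: str = field(default_factory=_new_span_id)
    parent_span_id: Optional[str] = None

    def to_headers(self) -> Dict[str, str]:
        """Serialize the context into the outgoing request headers."""
        return {
            "X-Trace-ID": self.trace_id,
            "X-Span-ID": self.span_id,
            "X-Parent-Span-ID": self.parent_span_id or "",
        }


def child_from_headers(headers: Dict[str, str]) -> Ctx:
    """Open a span for an incoming request.

    If the caller sent trace headers, stay in its trace and use the caller's
    span as the parent; otherwise start a new trace with a root span.
    """
    trace_id = headers.get("X-Trace-ID")
    if not trace_id:
        return Ctx()
    return Ctx(trace_id=trace_id, parent_span_id=headers.get("X-Span-ID") or None)


if __name__ == "__main__":
    root = child_from_headers({})                    # e.g. frontend, no incoming context
    child = child_from_headers(root.to_headers())    # e.g. inventory, called by frontend
    print(root.span_id, "->", child.span_id)
```

With this convention every span in a trace has a distinct span_id, and following parent_span_id links from any span leads back to the root.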

File path: services/frontend.py

import time
from flask import Flask, request, jsonify
import requests
import logging
from services.common import TraceContext, emit_span, CONFIG

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
SERVICE_CONFIG = CONFIG['services']['frontend']
INVENTORY_URL = f"http://{CONFIG['services']['inventory']['host']}:{CONFIG['services']['inventory']['port']}"
ORDER_URL = f"http://{CONFIG['services']['order']['host']}:{CONFIG['services']['order']['port']}"

@app.route('/checkout', methods=['POST'])
def checkout():
    start_time = time.time()
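    # 1. Extract the caller's trace context, if any, and open a span for this request
    incoming = TraceContext.from_headers(request.headers)
    if incoming:
        # Continue the caller's trace; the caller's span becomes our parent
        context = TraceContext(trace_id=incoming.trace_id, parent_span_id=incoming.span_id)
    else:
        # 2. No incoming context: start a new trace with a root span (no parent)
        context = TraceContext()
    context.service_name = SERVICE_CONFIG['name']
    context.service_version = SERVICE_CONFIG['version']

    # 3. Call the inventory service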
    inventory_headers = context.to_headers()
    try:
        resp = requests.post(f"{INVENTORY_URL}/deduct", headers=inventory_headers, timeout=2)
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        return jsonify({"error": f"Inventory call failed: {e}"}), 500

    # 4. Call the order service
    order_headers = context.to_headers()
    try:
        resp = requests.post(f"{ORDER_URL}/create", headers=order_headers, timeout=2)
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        return jsonify({"error": f"Order call failed: {e}"}), 500

    duration = time.time() - start_time
    # 5. Report the frontend's own span
    emit_span("POST /checkout", context, start_time, duration, tags={"http.method": "POST", "http.route": "/checkout"})
    return jsonify({"status": "order_created", "trace_id": context.trace_id}), 200

if __name__ == '__main__':
    app.run(host=SERVICE_CONFIG['host'], port=SERVICE_CONFIG['port'], debug=False)

File path: services/inventory.py

import time
from flask import Flask, request, jsonify
from services.common import TraceContext, emit_span, CONFIG

app = Flask(__name__)
SERVICE_CONFIG = CONFIG['services']['inventory']

@app.route('/deduct', methods=['POST'])
def deduct():
    start_time = time.time()
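    incoming = TraceContext.from_headers(request.headers)
    if incoming:
        # Open a child span: same trace, parent is the caller's span
        context = TraceContext(trace_id=incoming.trace_id, parent_span_id=incoming.span_id)
    else:
        context = TraceContext()
    context.service_name = SERVICE_CONFIG['name']
    context.service_version = SERVICE_CONFIG['version']
    # Simulate processing work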
    time.sleep(0.05)
    duration = time.time() - start_time
    emit_span("POST /deduct", context, start_time, duration, tags={"http.method": "POST"})
    return jsonify({"status": "inventory_deducted"}), 200

if __name__ == '__main__':
    app.run(host=SERVICE_CONFIG['host'], port=SERVICE_CONFIG['port'], debug=False)

File path: services/order.py

import time
from flask import Flask, request, jsonify
from services.common import TraceContext, emit_span, CONFIG

app = Flask(__name__)
SERVICE_CONFIG = CONFIG['services']['order']

@app.route('/create', methods=['POST'])
def create():
    start_time = time.time()
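    incoming = TraceContext.from_headers(request.headers)
    if incoming:
        # Open a child span: same trace, parent is the caller's span
        context = TraceContext(trace_id=incoming.trace_id, parent_span_id=incoming.span_id)
    else:
        context = TraceContext()
    context.service_name = SERVICE_CONFIG['name']
    context.service_version = SERVICE_CONFIG['version']
    # Simulate processing work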
    time.sleep(0.1)
    duration = time.time() - start_time
    emit_span("POST /create", context, start_time, duration, tags={"http.method": "POST"})
    return jsonify({"status": "order_created"}), 200

if __name__ == '__main__':
    app.run(host=SERVICE_CONFIG['host'], port=SERVICE_CONFIG['port'], debug=False)

File path: collector/server.py

from flask import Flask, request, jsonify
import sqlite3
import threading
import logging
from datetime import datetime
import json as json_lib
from services.common import CONFIG  # shared configuration (collector host and port)

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
DB_PATH = "./data/trace.db"
lock = threading.Lock()

def init_db():
    """Initialize the database tables"""
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    # Trace spans
    c.execute('''
        CREATE TABLE IF NOT EXISTS spans (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            trace_id TEXT NOT NULL,
            span_id TEXT NOT NULL,
            parent_span_id TEXT,
            operation TEXT,
            service_name TEXT NOT NULL,
            service_version TEXT NOT NULL,
            start_time REAL,
            duration REAL,
            tags TEXT, -- JSON string
            received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    # SBOM components, one row per service version and component version
    c.execute('''
        CREATE TABLE IF NOT EXISTS sboms (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            service_name TEXT NOT NULL,
            service_version TEXT NOT NULL,
            component_name TEXT NOT NULL,
            component_version TEXT NOT NULL,
            component_purl TEXT,
            UNIQUE(service_name, service_version, component_name, component_version)
        )
    ''')
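    # Vulnerability alerts, each linked to one affected trace and span
    # (simplified: a span is matched to the latest SBOM view of its service; real systems may need a richer association)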
    c.execute('''
        CREATE TABLE IF NOT EXISTS vulnerability_alerts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            vuln_id TEXT NOT NULL,
            component_name TEXT NOT NULL,
            affected_version TEXT NOT NULL,
            severity TEXT,
            trace_id TEXT, -- linked to the affected trace
            span_id TEXT,
            detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    conn.commit()
    conn.close()

@app.route('/ingest', methods=['POST'])
def ingest():
    data = request.json
    span_data = data.get('span')
    sbom_data = data.get('sbom')
    if not span_data or not sbom_data:
        return jsonify({"error": "Missing span or sbom data"}), 400

    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    try:
        # 1. Insert the span
        c.execute('''
            INSERT INTO spans (trace_id, span_id, parent_span_id, operation, service_name, service_version, start_time, duration, tags)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            span_data['trace_id'], span_data['span_id'], span_data.get('parent_span_id'),
            span_data['operation'], span_data['service_name'], span_data['service_version'],
            span_data['start_time'], span_data['duration'], json_lib.dumps(span_data.get('tags', {}))
        ))
        # 2. Insert or update SBOM components (simplified; real systems may need version management)
        for comp in sbom_data['components']:
            c.execute('''
                INSERT OR REPLACE INTO sboms (service_name, service_version, component_name, component_version, component_purl)
                VALUES (?, ?, ?, ?, ?)
            ''', (
                sbom_data['service_name'], sbom_data['service_version'],
                comp['name'], comp['version'], comp['purl']
            ))
        conn.commit()
        logging.info(f"Ingested span {span_data['span_id']} for trace {span_data['trace_id']}")
    except Exception as e:
        conn.rollback()
        logging.error(f"Failed to ingest data: {e}")
        return jsonify({"error": str(e)}), 500
    finally:
        conn.close()
    return jsonify({"status": "ok"}), 200

@app.route('/trace/<trace_id>', methods=['GET'])
def get_trace(trace_id):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('SELECT * FROM spans WHERE trace_id = ? ORDER BY start_time', (trace_id,))
    rows = c.fetchall()
    conn.close()
    columns = [description[0] for description in c.description]
    traces = [dict(zip(columns, row)) for row in rows]
    for t in traces:
        if t['tags']:
            t['tags'] = json_lib.loads(t['tags'])
    return jsonify(traces)

if __name__ == '__main__':
    init_db()
    app.run(host=CONFIG['collector']['host'], port=CONFIG['collector']['port'], debug=False)

File path: analyzer/models.py

from dataclasses import dataclass
from typing import List

@dataclass
class Vulnerability:
    """Simulated vulnerability record"""
    id: str  # e.g. CVE-2024-12345
    component_name: str
    affected_version_range: str  # Simplified version range, e.g. "<=1.0.0"
    severity: str  # CRITICAL, HIGH, MEDIUM, LOW

# A simulated vulnerability database
VULNERABILITY_DB: List[Vulnerability] = [
    Vulnerability(id="CVE-SIM-2024-001", component_name="vulnerable-lib", affected_version_range="<=1.0.0", severity="HIGH"),
    Vulnerability(id="CVE-SIM-2024-002", component_name="flask", affected_version_range="<2.0.0", severity="MEDIUM"),
]

File path: analyzer/engine.py

import sqlite3
import logging
import time
from typing import List
from analyzer.models import VULNERABILITY_DB, Vulnerability

logging.basicConfig(level=logging.INFO)
DB_PATH = "./data/trace.db"

def scan_for_vulnerabilities():
    """Scan the SBOM components in the database and match them against the vulnerability list"""
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    alerts_created = 0
    try:
        # Fetch every distinct service/component/version combination
        c.execute('''
            SELECT DISTINCT service_name, service_version, component_name, component_version
            FROM sboms
        ''')
        components = c.fetchall()
        for service_name, service_version, comp_name, comp_version in components:
            for vuln in VULNERABILITY_DB:
                if vuln.component_name != comp_name:
                    continue
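                # Simplified version matching: compare dotted numeric versions as integer tuples.
                # "<=" includes the boundary version and "<" excludes it. Production code should
                # use a dedicated version-parsing library instead.
                rng = vuln.affected_version_range
                try:
                    installed = tuple(int(p) for p in comp_version.split("."))
                    if rng.startswith("<="):
                        is_affected = installed <= tuple(int(p) for p in rng[2:].split("."))
                    elif rng.startswith("<"):
                        is_affected = installed < tuple(int(p) for p in rng[1:].split("."))
                    else:
                        is_affected = False
                except ValueError:
                    is_affected = False  # non-numeric version: skip rather than guess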
                if is_affected:
                    logging.warning(f"发现漏洞!服务={service_name}:{service_version}, 组件={comp_name}:{comp_version}, 漏洞={vuln.id}")
                    # Find a recent trace that used this service (simplified: the latest span from the service)
                    c.execute('''
                        SELECT trace_id, span_id FROM spans
                        WHERE service_name = ? AND service_version = ?
                        ORDER BY start_time DESC LIMIT 1
                    ''', (service_name, service_version))
                    recent_trace = c.fetchone()
                    trace_id, span_id = recent_trace if recent_trace else (None, None)
                    # Insert the alert
                    c.execute('''
                        INSERT INTO vulnerability_alerts (vuln_id, component_name, affected_version, severity, trace_id, span_id)
                        VALUES (?, ?, ?, ?, ?, ?)
                    ''', (vuln.id, comp_name, comp_version, vuln.severity, trace_id, span_id))
                    alerts_created += 1
        conn.commit()
        logging.info(f"漏洞扫描完成,创建了 {alerts_created} 条新告警。")
    except Exception as e:
        logging.error(f"扫描过程中出错: {e}")
        conn.rollback()
    finally:
        conn.close()

def run_periodic_scan(interval_seconds: int = 60):
    """Run the scan periodically"""
    while True:
        scan_for_vulnerabilities()
        time.sleep(interval_seconds)

if __name__ == '__main__':
    # Single run
    scan_for_vulnerabilities()
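
The version check in the scan is deliberately simplified. As one possible direction, the sketch below evaluates range expressions such as "<=1.0.0" or ">=1.0, <2.0.0" against dotted numeric versions using only the standard library. The names parse_version and in_range are hypothetical, and the approach assumes purely numeric versions; pre-release tags, epochs, and other real-world version schemes need a dedicated library.

```python
import re
from typing import Tuple

# Comparison operators supported by this sketch.
_OPS = {
    "<=": lambda a, b: a <= b,
    ">=": lambda a, b: a >= b,
    "==": lambda a, b: a == b,
    "<": lambda a, b: a < b,
    ">": lambda a, b: a > b,
}


def parse_version(text: str) -> Tuple[int, ...]:
    """Parse a dotted numeric version such as '2.3.0' into a tuple of ints.

    Trailing zero segments are dropped so that '1.0' and '1.0.0' compare equal.
    """
    if not re.fullmatch(r"\d+(\.\d+)*", text):
        raise ValueError(f"unsupported version format: {text!r}")
    parts = [int(part) for part in text.split(".")]
    while len(parts) > 1 and parts[-1] == 0:
        parts.pop()
    return tuple(parts)


def in_range(version: str, spec: str) -> bool:
    """Return True if `version` satisfies every comma-separated clause in `spec`."""
    installed = parse_version(version)
    for clause in spec.split(","):
        match = re.fullmatch(r"(<=|>=|==|<|>)\s*(\S+)", clause.strip())
        if not match:
            raise ValueError(f"unsupported range clause: {clause!r}")
        op, bound = match.groups()
        if not _OPS[op](installed, parse_version(bound)):
            return False
    return True


if __name__ == "__main__":
    print(in_range("1.0.0", "<=1.0.0"))   # vulnerable-lib entry from the simulated database
    print(in_range("2.3.0", "<2.0.0"))    # flask entry from the simulated database
```

Under these rules the simulated vulnerable-lib 1.0.0 falls inside "<=1.0.0", while flask 2.3.0 falls outside "<2.0.0".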

File path: ui/app.py

from flask import Flask, render_template_string, request, jsonify
import sqlite3
import json as json_lib

app = Flask(__name__)
DB_PATH = "./data/trace.db"

HTML_TEMPLATE = '''
<!DOCTYPE html>
<html>
<head>
    <title>供应链安全追踪者</title>
    <style>
        body { font-family: sans-serif; margin: 2em; }
        .section { margin-bottom: 2em; padding: 1em; border: 1px solid #ccc; border-radius: 5px; }
        input, button { margin: 0.5em; padding: 0.5em; }
        table { border-collapse: collapse; width: 100%; }
        th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
        th { background-color: #f2f2f2; }
        .vuln-high { background-color: #ffcccc; }
        .trace-graph { border: 1px solid #000; padding: 1em; min-height: 200px; }
    </style>
    <script src="https://cdn.jsdelivr.net/npm/mermaid@10/dist/mermaid.min.js"></script>
    <script>mermaid.initialize({ startOnLoad: true });</script>
</head>
<body>
    <h1>供应链安全追踪者</h1>
    <div class="section">
        <h3>1. 通过Trace ID查询</h3>
        <form onsubmit="fetchTrace(); return false;">
            <input type="text" id="traceId" placeholder="输入 Trace ID" size="40">
            <button type="submit">查询链路</button>
        </form>
        <div id="traceResult"></div>
    </div>
    <div class="section">
        <h3>2. 查看漏洞告警</h3>
        <button onclick="fetchAlerts()">刷新告警列表</button>
        <div id="alertsResult"></div>
    </div>
    <script>
        function fetchTrace() {
            const traceId = document.getElementById('traceId').value;
            fetch(`/api/trace/${traceId}`)
                .then(r => r.json())
                .then(data => {
                    let html = `<h4>链路详情 (Trace ID: ${traceId})</h4>`;
                    if(data.length === 0) {
                        html += `<p>未找到追踪数据。</p>`;
                    } else {
                        // Build a Mermaid sequence diagram
                        let mermaidCode = `sequenceDiagram\\n`;
                        data.forEach(span => {
                            mermaidCode += `    ${span.service_name}->>${span.service_name}: ${span.operation}\\n`;
                        });
                        html += `<div class="mermaid">${mermaidCode}</div>`;
                        // Table of span details
                        html += `<table><tr><th>服务</th><th>操作</th><th>耗时(ms)</th><th>时间戳</th></tr>`;
                        data.forEach(span => {
                            html += `<tr><td>${span.service_name}:${span.service_version}</td><td>${span.operation}</td><td>${(span.duration*1000).toFixed(2)}</td><td>${new Date(span.start_time*1000).toISOString()}</td></tr>`;
                        });
                        html += `</table>`;
                        // Associated vulnerabilities (simplified query)
                        fetch(`/api/alerts?trace_id=${traceId}`)
                            .then(r => r.json())
                            .then(alerts => {
                                if(alerts.length > 0) {
                                    html += `<h5 style="color:red;">⚠️ 在此链路上检测到漏洞告警</h5><ul>`;
                                    alerts.forEach(a => {
                                        html += `<li><strong>${a.vuln_id}</strong> 影响组件 ${a.component_name}@${a.affected_version} (严重性: ${a.severity})</li>`;
                                    });
                                    html += `</ul>`;
                                }
                                document.getElementById('traceResult').innerHTML = html;
                                mermaid.contentLoaded(); // Re-render the Mermaid diagram
                            });
                    }
                });
        }
        function fetchAlerts() {
            fetch('/api/alerts')
                .then(r => r.json())
                .then(data => {
                    let html = `<h4>漏洞告警</h4>`;
                    if(data.length === 0) {
                        html += `<p>暂无告警。</p>`;
                    } else {
                        html += `<table><tr><th>漏洞ID</th><th>组件</th><th>受影响版本</th><th>严重性</th><th>关联Trace ID</th><th>检测时间</th></tr>`;
                        data.forEach(a => {
                            const rowClass = a.severity === 'HIGH' ? 'class="vuln-high"' : '';
                            const traceLink = a.trace_id ? `<a href="#" onclick="document.getElementById('traceId').value='${a.trace_id}'; fetchTrace();">${a.trace_id.substring(0,8)}...</a>` : 'N/A';
                            html += `<tr ${rowClass}><td>${a.vuln_id}</td><td>${a.component_name}</td><td>${a.affected_version}</td><td>${a.severity}</td><td>${traceLink}</td><td>${a.detected_at}</td></tr>`;
                        });
                        html += `</table>`;
                    }
                    document.getElementById('alertsResult').innerHTML = html;
                });
        }
    </script>
</body>
</html>
'''

@app.route('/')
def index():
    return render_template_string(HTML_TEMPLATE)

@app.route('/api/trace/<trace_id>')
def api_get_trace(trace_id):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('SELECT * FROM spans WHERE trace_id = ? ORDER BY start_time', (trace_id,))
    rows = c.fetchall()
    conn.close()
    columns = [description[0] for description in c.description]
    traces = [dict(zip(columns, row)) for row in rows]
    for t in traces:
        if t['tags']:
            t['tags'] = json_lib.loads(t['tags'])
    return jsonify(traces)

@app.route('/api/alerts')
def api_get_alerts():
    trace_id = request.args.get('trace_id')
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    if trace_id:
        c.execute('SELECT * FROM vulnerability_alerts WHERE trace_id = ? ORDER BY detected_at DESC', (trace_id,))
    else:
        c.execute('SELECT * FROM vulnerability_alerts ORDER BY detected_at DESC LIMIT 50')
    rows = c.fetchall()
    conn.close()
    columns = [description[0] for description in c.description]
    alerts = [dict(zip(columns, row)) for row in rows]
    return jsonify(alerts)

if __name__ == '__main__':
    app.run(host="localhost", port=8050, debug=False)

File path: data/init_db.py

#!/usr/bin/env python3
import sqlite3
DB_PATH = "./data/trace.db"
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# Use this file to initialize or reset the database manually. The tables themselves are created by the collector.
print("数据库文件已准备在", DB_PATH)
conn.close()

File path: run.py

#!/usr/bin/env python3
import subprocess
import sys
import time
import os
import signal
from typing import List
import typer

app = typer.Typer()
processes: List[subprocess.Popen] = []

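def start_service(module: str, name: str):
    """Start one service as a child process"""
    env = os.environ.copy()
    # `python -m` puts the current directory on the import path, so run this script from the project root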
    cmd = [sys.executable, "-m", module]
    proc = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    processes.append(proc)
    typer.echo(f"启动 {name} (PID: {proc.pid})")
    time.sleep(0.7)  # Give the service a moment to start
    # Print the first relevant line of startup output
    for line in iter(proc.stdout.readline, ''):
        if "Running on" in line or "ingest" in line or "scan" in line:
            typer.echo(f"  [{name}]: {line.strip()}")
            break

@app.command()
def start():
    """Start all components"""
    typer.echo("正在启动供应链安全追踪者系统...")
    # 0. Make sure the data directory exists
    os.makedirs("./data", exist_ok=True)
    # 1. Start the collector
    start_service("collector.server", "Collector")
    # 2. Start the backend services: the callees (order, inventory) first, then the frontend that calls them
    start_service("services.order", "Order Service")
    start_service("services.inventory", "Inventory Service")
    start_service("services.frontend", "Frontend Service")
    # 3. Start the analysis engine (as a background thread)
    import threading
    from analyzer.engine import run_periodic_scan
    import config as cfg
    scan_interval = cfg.CONFIG['analyzer']['scan_interval_seconds']
    analyzer_thread = threading.Thread(target=run_periodic_scan, args=(scan_interval,), daemon=True)
    analyzer_thread.start()
    typer.echo(f"分析引擎已在后台线程启动,扫描间隔 {scan_interval} 秒。")
    # 4. Start the UI
    start_service("ui.app", "Web UI")
    typer.echo("\n所有服务已启动!")
    typer.echo("-" * 50)
    typer.echo("前端服务:     http://localhost:8000")
    typer.echo("Web 界面:     http://localhost:8050")
    typer.echo("收集器端点:   http://localhost:8888/ingest")
    typer.echo("-" * 50)
    typer.echo("\n按 Ctrl+C 停止所有服务。")
    # Wait for the child processes; Ctrl+C triggers shutdown
    try:
        for proc in processes:
            proc.wait()
    except KeyboardInterrupt:
        typer.echo("\n正在关闭服务...")
        for proc in processes:
            proc.terminate()
        for proc in processes:
            proc.wait()
        typer.echo("所有服务已停止。")

if __name__ == "__main__":
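    # Add this script's directory to the Python path so modules can be imported
    sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
    # Load config.yaml and register it as an in-memory module named "config"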
    import yaml
    with open('./config.yaml', 'r') as f:
        config_module = type('config', (), {'CONFIG': yaml.safe_load(f)})()
    sys.modules['config'] = config_module
    app()

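4 Installation and Running

  1. Prepare the environment: make sure Python 3.8+ and pip are installed.
  2. Clone or create the project directory: place all of the files above according to the structure tree.
  3. Install dependencies:
    cd supply-chain-trace
    pip install .
    # Or install the core dependencies directly
    # pip install flask requests pyyaml sqlalchemy python-json-logger typer
  4. Run the system:
    # Option 1: use the launcher script (recommended)
    python run.py start

    # Option 2: start each component manually (one terminal each)
    # Create the data directory first (run.py does this automatically)
    mkdir -p data
    # Terminal 1: collector
    python -m collector.server
    # Terminal 2: order service
    python -m services.order
    # Terminal 3: inventory service
    python -m services.inventory
    # Terminal 4: frontend service
    python -m services.frontend
    # Terminal 5: analysis engine (single scan)
    python -m analyzer.engine
    # Terminal 6: web UI
    python -m ui.app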

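5 Testing and Verification

  1. Trigger a call chain: use curl (or any HTTP client that can send a POST request) to call the frontend service's /checkout endpoint.
    curl -X POST http://localhost:8000/checkout
    The response contains a `trace_id`.
  2. View the trace in the web UI:
    • Open http://localhost:8050 in a browser.
    • Paste the trace_id from the previous step into the "通过Trace ID查询" (query by Trace ID) box and click the query button.
    • The page shows the request's call sequence diagram and span details.
    The request is expected to flow as follows:
    sequenceDiagram
        participant User
        participant Frontend
        participant Inventory
        participant Order
        participant Collector
        User->>Frontend: POST /checkout
        Frontend->>Inventory: POST /deduct (carries TraceContext)
        Inventory->>Collector: report Span+SBOM
        Inventory-->>Frontend: response
        Frontend->>Order: POST /create (carries TraceContext)
        Order->>Collector: report Span+SBOM
        Order-->>Frontend: response
        Frontend->>Collector: report Span+SBOM
        Frontend-->>User: return trace_id
  3. View vulnerability alerts:
    • Click "刷新告警列表" (refresh alert list) in the web UI.
    • Because vulnerable-lib@1.0.0 is pre-seeded in the frontend and order services, and the vulnerability database (analyzer/models.py) contains CVE-SIM-2024-001 affecting versions <=1.0.0, at least one high-severity alert should appear. When started through run.py, the analyzer scans every 60 seconds by default (scan_interval_seconds in config.yaml), so a new alert can take up to a minute to show.
    • Each alert is linked to the trace_id of the most recent call chain through the affected service; click the link to view that trace.
  4. Verify data aggregation: you can also inspect the raw data directly through the collector's API.
    # Fetch all spans of a given trace
    curl http://localhost:8888/trace/<your trace_id>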
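
To check that the spans returned for a trace really form a call tree, the parent and child links can be rendered as an indented outline. The sketch below is an illustration only: render_trace is a hypothetical helper, and SAMPLE_SPANS is made-up data shaped like the rows of the spans table, not real collector output.

```python
from collections import defaultdict
from typing import Dict, List, Optional

# Hypothetical sample shaped like the span rows stored by the collector.
SAMPLE_SPANS: List[Dict] = [
    {"span_id": "a1", "parent_span_id": None, "service_name": "frontend",
     "operation": "POST /checkout", "start_time": 1.0},
    {"span_id": "b2", "parent_span_id": "a1", "service_name": "inventory",
     "operation": "POST /deduct", "start_time": 1.1},
    {"span_id": "c3", "parent_span_id": "a1", "service_name": "order",
     "operation": "POST /create", "start_time": 1.2},
]


def render_trace(spans: List[Dict]) -> List[str]:
    """Return indented lines showing the parent/child structure of one trace."""
    known_ids = {span["span_id"] for span in spans}
    children: Dict[Optional[str], List[Dict]] = defaultdict(list)
    for span in spans:
        parent = span.get("parent_span_id")
        # Spans whose parent was never reported are treated as roots.
        children[parent if parent in known_ids else None].append(span)

    lines: List[str] = []

    def walk(parent_id: Optional[str], depth: int) -> None:
        for span in sorted(children[parent_id], key=lambda s: s["start_time"]):
            lines.append(f"{'  ' * depth}{span['service_name']}: {span['operation']}")
            walk(span["span_id"], depth + 1)

    walk(None, 0)
    return lines


if __name__ == "__main__":
    print("\n".join(render_trace(SAMPLE_SPANS)))
```

If every span of a trace appears at depth 0, the services are not recording parent span IDs, which is worth fixing before relying on the trace for impact analysis.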

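6 Architecture Design and Trade-off Analysis

6.1 Core Architecture Diagram

The core of the system is that the collector correlates dynamic trace data with static SBOMs, and the analysis engine turns that correlated data into security insight.

graph TD
    A[Client request] --> B[Frontend Service];
    B -- inject TraceContext/SBOM snapshot --> C[Inventory Service];
    B -- inject TraceContext/SBOM snapshot --> D[Order Service];
    B --> E[Collector];
    C --> E;
    D --> E;
    E --> F[(Correlated storage)];
    F --> G[Analyzer Engine];
    H[Vulnerability intelligence feed] --> G;
    G --> I[Generate security alerts];
    F --> J[Web UI];
    I --> J;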

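6.2 Key Trade-offs

  1. Data granularity versus performance: Passing a full SBOM along with every span adds significant overhead. Between services, this design propagates only a summary snapshot (service name and version) and lets the collector correlate it with the full SBOM, a compromise between bandwidth and functionality. Note that the prototype still sends the full SBOM to the collector with every span report; caching by service name and version would remove that cost.
  2. SBOM consistency: The SBOM of a service instance can change with each deployment. The prototype uses a static "load at service startup" model. A production environment needs an SBOM registry, and may need each span to carry a hash of the SBOM content so that consistency can be verified.
  3. Analysis timeliness: The analyzer scans periodically (batch) rather than in real time, so there is a delay between vulnerability disclosure and detection of the exposure. For critical vulnerabilities, a message-based real-time trigger can be added.
  4. Storage and querying: SQLite keeps the demo simple. A production environment needs a scalable trace store (for example a Jaeger storage backend such as Elasticsearch or Cassandra) combined with a graph database for dependency analysis.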
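
One way to reduce the reporting overhead mentioned in the first trade-off is for each service to attach its full SBOM only the first time it reports a given service name and version, and to send just a reference afterwards. The sketch below is a hedged illustration: SBOMReporter and the sbom_ref field are assumptions, and the collector's /ingest endpoint as shown in this article would have to be extended to accept payloads without an sbom field.

```python
from typing import Callable, Dict, Set


class SBOMReporter:
    """Attach the full SBOM only the first time a service:version is reported.

    Not thread-safe; a real service would guard `_sent` with a lock.
    """

    def __init__(self, send: Callable[[Dict], None]) -> None:
        self._send = send            # transport, e.g. a function that POSTs to the collector
        self._sent: Set[str] = set()

    def report(self, span: Dict, sbom: Dict) -> None:
        key = f"{sbom['service_name']}:{sbom['service_version']}"
        payload: Dict = {"span": span, "sbom_ref": key}
        include_full = key not in self._sent
        if include_full:
            payload["sbom"] = sbom   # full component list, sent once per key
        self._send(payload)
        if include_full:
            # Mark as sent only after the transport returned without raising,
            # so a failed first report is retried with the full SBOM.
            self._sent.add(key)


if __name__ == "__main__":
    outbox = []
    reporter = SBOMReporter(outbox.append)
    sbom = {"service_name": "order", "service_version": "v1.3.0", "components": []}
    reporter.report({"span_id": "a"}, sbom)
    reporter.report({"span_id": "b"}, sbom)
    print([sorted(p.keys()) for p in outbox])
```

The same key could also carry a content hash instead of the version string, which would address the consistency concern in the second trade-off at the same time.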

6.3 SBOM Flow and Correlation Logic

The diagram below shows how SBOM information flows during request handling and how it is finally correlated for analysis.

graph LR
    S1[Frontend SBOM] --> TC1[TraceContext];
    S2[Inventory SBOM] --> TC2[TraceContext];
    S3[Order SBOM] --> TC3[TraceContext];
    TC1 --> R1[Request 1: /checkout];
    TC2 --> R1;
    TC3 --> R1;
    R1 --> C[Collector];
    C --> DB[(Database:<br/>Spans + SBOMs)];
    DB --> AE[Analysis engine];
    VDB[Vulnerability database] --> AE;
    AE --> ALERT[Alert: CVE-SIM-2024-001<br/>Affected services: frontend, order];
    ALERT --> UI[Visualization UI];
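
The correlation step can be expressed as a join between spans and SBOM rows on service name and version. The sketch below runs against an in-memory SQLite database with a reduced version of the two tables and made-up rows; affected_spans and demo_connection are hypothetical helpers for illustration, not part of the project.

```python
import sqlite3
from typing import List, Tuple

# Reduced schema: only the columns needed for the join.
SCHEMA = """
CREATE TABLE spans (trace_id TEXT, span_id TEXT, operation TEXT,
                    service_name TEXT, service_version TEXT);
CREATE TABLE sboms (service_name TEXT, service_version TEXT,
                    component_name TEXT, component_version TEXT);
"""


def demo_connection() -> sqlite3.Connection:
    """Create an in-memory database with sample rows (illustrative data)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany("INSERT INTO spans VALUES (?, ?, ?, ?, ?)", [
        ("t1", "a1", "POST /checkout", "frontend", "v1.2.0"),
        ("t1", "b2", "POST /deduct", "inventory", "v1.1.5"),
        ("t1", "c3", "POST /create", "order", "v1.3.0"),
    ])
    conn.executemany("INSERT INTO sboms VALUES (?, ?, ?, ?)", [
        ("frontend", "v1.2.0", "flask", "2.3.0"),
        ("frontend", "v1.2.0", "vulnerable-lib", "1.0.0"),
        ("inventory", "v1.1.5", "sqlalchemy", "2.0.0"),
        ("order", "v1.3.0", "vulnerable-lib", "1.0.0"),
    ])
    return conn


def affected_spans(conn: sqlite3.Connection, component: str,
                   version: str) -> List[Tuple[str, str, str]]:
    """List (trace_id, service_name, operation) for spans whose service ships the component."""
    return conn.execute(
        """
        SELECT DISTINCT s.trace_id, s.service_name, s.operation
        FROM spans AS s
        JOIN sboms AS b
          ON b.service_name = s.service_name
         AND b.service_version = s.service_version
        WHERE b.component_name = ? AND b.component_version = ?
        ORDER BY s.trace_id, s.service_name
        """,
        (component, version),
    ).fetchall()


if __name__ == "__main__":
    for row in affected_spans(demo_connection(), "vulnerable-lib", "1.0.0"):
        print(row)
```

Unlike linking an alert to a single recent span, a join of this kind returns every recorded operation that passed through a service shipping the vulnerable component.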

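7 Extensions and Production Considerations

  • Stronger SBOM management: adopt the SPDX or CycloneDX standard and publish SBOMs to a registry automatically from the CI/CD pipeline.
  • Richer dependency graph analysis: use the dependency relationships recorded in SBOMs to build a component dependency graph across and within services, so that the impact scope can be determined precisely.
  • Performance and scalability: replace the collector with the OpenTelemetry Collector, use Jaeger or Tempo as the backend, and use Apache Spark or Flink for stream processing in the analysis layer.
  • Security and compliance: mask potentially sensitive information in trace data, enforce access control on SBOM storage, and generate compliance reports.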
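
As a starting point for the CycloneDX integration mentioned above, a CycloneDX JSON document can be reduced to the same fields that SBOMComponent uses. The sketch below relies only on the standard bomFormat and components fields of the CycloneDX JSON format; components_from_cyclonedx is a hypothetical helper, SAMPLE_BOM is made-up data, and only top-level components are read (nested components and the dependency graph are ignored).

```python
import json
from typing import Dict, List

# Minimal made-up CycloneDX document for illustration.
SAMPLE_BOM = """
{
  "bomFormat": "CycloneDX",
  "specVersion": "1.5",
  "version": 1,
  "components": [
    {"type": "library", "name": "flask", "version": "2.3.0",
     "purl": "pkg:pypi/flask@2.3.0"},
    {"type": "library", "name": "vulnerable-lib", "version": "1.0.0",
     "purl": "pkg:pypi/vulnerable-lib@1.0.0"}
  ]
}
"""


def components_from_cyclonedx(document: str) -> List[Dict[str, str]]:
    """Extract type, name, version, and purl of each top-level component."""
    bom = json.loads(document)
    if bom.get("bomFormat") != "CycloneDX":
        raise ValueError("not a CycloneDX JSON document")
    result: List[Dict[str, str]] = []
    for comp in bom.get("components", []):
        result.append({
            "type": comp.get("type", "library"),
            "name": comp["name"],
            "version": comp.get("version", ""),   # version may be absent
            "purl": comp.get("purl", ""),         # purl is optional
        })
    return result


if __name__ == "__main__":
    for component in components_from_cyclonedx(SAMPLE_BOM):
        print(component["name"], component["version"], component["purl"])
```

Each resulting dictionary has the same keys as the SBOMComponent dataclass, so a loader built this way could replace the hard-coded component lists in ServiceSBOM.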

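This project shows that combining distributed tracing with supply chain security is feasible and promising, and it offers a practical reference for building secure cloud-native architectures with deep observability.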