摘要
本文通过构建一个模拟的高吞吐消息处理系统,深入剖析在十万级TPS(每秒事务处理量)场景下,消息队列端到端链路中的潜在性能瓶颈。我们将实现一个包含生产者、内存队列、消费者及分布式追踪组件的完整可运行项目,并设计一套性能剖析框架,用于收集、分析各环节的耗时指标。通过可视化的追踪数据与关键性能指标(KPI)分析,文章将演示如何定位从消息发布、序列化、队列存储、反序列化到消费处理的完整链路上的性能热点,并提供通用的瓶颈定位方法论与实践代码。
1. 项目概述:高TPS消息系统性能探针
在高并发微服务架构中,消息队列是解耦与削峰填谷的核心组件。当系统TPS达到十万级别时,任何细微的低效都会被放大,导致延迟飙升、吞吐下降。本项目旨在构建一个"性能探针"式系统,它模拟真实业务的消息生产与消费,并注入强大的可观测性,使我们能清晰地"看见"数据在流动过程中于何处减速。
核心设计思路:
- 内存队列模拟:为避免引入Kafka/RocketMQ等具体中间件的复杂性,我们使用Python的asyncio.Queue模拟一个内存消息队列,聚焦于通用性能逻辑。
- 端到端追踪:为每条消息生成唯一追踪ID,并在生产、入队、出队、消费等关键节点打点记录时间戳与上下文。
- 可配置负载:通过命令行参数或配置文件,轻松调节生产者速率、消费者数量、消息大小等,以模拟不同压力场景。
- 实时分析与可视化:收集的追踪数据在内存中聚合,并输出关键百分位延迟(P50, P90, P99)、吞吐量时间序列等指标,通过控制台与Mermaid时序图直观展示。
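在进入完整实现之前,先用一个极简的示意脚本说明"端到端追踪"的打点思路(仅为演示概念的假设性代码,与后文的项目实现相互独立):每条消息携带唯一trace_id,在各关键节点记录纳秒级时间戳,相邻打点之差即为该阶段耗时。

import time
import uuid

# 极简打点示意:用 (阶段名, 纳秒时间戳) 记录一条消息依次经过的节点
trace_id = str(uuid.uuid4())
checkpoints = []
for stage in ("produce", "enqueue", "dequeue", "consume"):
    checkpoints.append((stage, time.time_ns()))
    time.sleep(0.001)  # 用sleep模拟该阶段的耗时
checkpoints.append(("done", time.time_ns()))

# 相邻打点之差即为各阶段耗时(换算为毫秒)
for (stage, t1), (_, t2) in zip(checkpoints, checkpoints[1:]):
    print(f"{trace_id[:8]} {stage:8s} {(t2 - t1) / 1e6:.2f} ms")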
2. 项目结构树
high_tps_mq_profiler/
├── config.yaml # 配置文件
├── main.py # 主程序入口
├── core/
│ ├── __init__.py
│ ├── models.py # 数据模型(消息、追踪Span)
│ ├── producer.py # 异步消息生产者
│ ├── queue.py # 模拟内存消息队列
│ ├── consumer.py # 异步消息消费者
│ └── tracer.py # 分布式追踪器
├── profiling/
│ ├── __init__.py
│ ├── analyzer.py # 性能数据分析器
│ └── reporter.py # 报告生成器(控制台、图表)
└── utils/
├── __init__.py
└── helpers.py # 辅助函数(如生成随机负载)
3. 核心代码实现
文件路径:config.yaml
# 系统全局配置
system:
total_messages: 100000 # 本次测试总消息数
message_size_bytes: 1024 # 每条消息的负载大小(字节)
# 生产者配置
producer:
rate_limit_tps: 120000 # 生产者目标TPS(用于限流)
batch_size: 50 # 批量发送大小(模拟优化)
# 消费者配置
consumer:
worker_count: 4 # 消费者并发worker数量
# 模拟消费逻辑的处理时间分布(毫秒)
process_time_ms:
mean: 10
stddev: 3
# 追踪与性能分析
profiling:
span_buffer_size: 10000 # 内存中缓存的追踪Span数量
report_interval_sec: 2 # 控制台报告间隔(秒)
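在动手压测之前,可以先用一个简单的估算脚本检查配置是否均衡(假设性的辅助脚本,非项目文件):每个消费者worker在事件循环中串行处理消息,单worker理论吞吐约为1000 ÷ mean毫秒,总消费能力再乘以worker_count,与生产者目标TPS对比即可预判队列是否会积压。

import yaml

with open("config.yaml") as f:
    cfg = yaml.safe_load(f)

produce_tps = cfg["producer"]["rate_limit_tps"]
workers = cfg["consumer"]["worker_count"]
mean_ms = cfg["consumer"]["process_time_ms"]["mean"]

# 每个worker串行处理:单worker吞吐约为 1000 / mean_ms 条/秒
consume_tps = workers * 1000 / mean_ms
print(f"生产目标: {produce_tps} TPS, 理论消费能力: {consume_tps:.0f} TPS")
if produce_tps > consume_tps:
    print("提示: 消费能力不足, 队列将持续积压")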
文件路径:core/models.py
import time
import uuid
from dataclasses import dataclass, field, asdict
from typing import Dict, Any, Optional
from enum import Enum
class SpanType(str, Enum):
PRODUCE = "PRODUCE"
ENQUEUE = "ENQUEUE"
DEQUEUE = "DEQUEUE"
CONSUME = "CONSUME"
@dataclass
class Message:
"""消息实体"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
payload: bytes = b""
headers: Dict[str, str] = field(default_factory=dict)
trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
def to_dict(self) -> Dict[str, Any]:
return {"id": self.id, "payload_size": len(self.payload), "trace_id": self.trace_id}
@dataclass
class Span:
"""追踪链路中的一个跨度"""
trace_id: str
span_id: str = field(default_factory=lambda: str(uuid.uuid4()))
parent_span_id: Optional[str] = None
type: SpanType = SpanType.PRODUCE
operation: str = "" # 具体操作,如 "serialize", "network_send"
start_time_ns: int = field(default_factory=lambda: time.time_ns())
duration_ns: Optional[int] = None
tags: Dict[str, Any] = field(default_factory=dict)
def finish(self):
"""结束当前Span,记录持续时间"""
self.duration_ns = time.time_ns() - self.start_time_ns
def to_dict(self):
data = asdict(self)
data['type'] = self.type.value
return data
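下面是Message与Span的一个最小使用示例(演示用代码,非项目文件),展示"创建-打点-结束-导出"的基本流程:

import time
from core.models import Message, Span, SpanType

msg = Message(payload=b"x" * 1024)

# 以消息的trace_id为关联键,记录一次"序列化"操作的耗时
span = Span(trace_id=msg.trace_id, type=SpanType.PRODUCE, operation="serialize")
time.sleep(0.002)  # 模拟序列化耗时
span.finish()

print(msg.to_dict())                   # {'id': ..., 'payload_size': 1024, 'trace_id': ...}
print(span.to_dict()["duration_ns"])   # 约 2_000_000 纳秒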
文件路径:core/tracer.py
import asyncio
from typing import List, Dict, Any
from .models import Span, SpanType
class Tracer:
"""简易分布式追踪器,收集并缓冲Span数据"""
def __init__(self, buffer_size: int = 10000):
self.buffer_size = buffer_size
self._spans: List[Span] = []
self._lock = asyncio.Lock()
async def record_span(self, span: Span):
"""异步记录一个Span。当缓冲区满时,丢弃最旧的Span。"""
async with self._lock:
if len(self._spans) >= self.buffer_size:
self._spans.pop(0) # 简单丢弃策略,生产环境应改用磁盘或网络发送
self._spans.append(span)
async def get_spans_by_type(self, span_type: SpanType) -> List[Span]:
"""按类型获取Span拷贝列表"""
async with self._lock:
return [s for s in self._spans if s.type == span_type]
async def get_all_spans(self) -> List[Span]:
"""获取所有Span的拷贝"""
async with self._lock:
return self._spans.copy()
def clear(self):
"""清空缓冲区(用于测试)"""
self._spans.clear()
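注意record_span在缓冲区满时使用list.pop(0)丢弃最旧数据,该操作是O(n)的,在十万级TPS下本身就可能成为瓶颈。一个可选的改进思路(示意代码,仅给出关键方法)是改用collections.deque并设置maxlen,让淘汰操作变为O(1):

import asyncio
from collections import deque
from typing import List

from core.models import Span

class DequeTracer:
    """与Tracer接口兼容的变体:deque(maxlen=N)自动淘汰最旧Span,避免list.pop(0)的O(n)开销"""
    def __init__(self, buffer_size: int = 10000):
        self._spans = deque(maxlen=buffer_size)
        self._lock = asyncio.Lock()

    async def record_span(self, span: Span):
        async with self._lock:
            self._spans.append(span)  # 超过maxlen时deque自动丢弃最左侧元素

    async def get_all_spans(self) -> List[Span]:
        async with self._lock:
            return list(self._spans)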
文件路径:core/queue.py
import asyncio
import time
from typing import Optional
from .models import Message, Span, SpanType
from .tracer import Tracer
class AsyncMemoryQueue:
"""模拟的异步内存消息队列,注入追踪点"""
def __init__(self, tracer: Tracer, maxsize: int = 100000):
self._queue = asyncio.Queue(maxsize=maxsize)
self.tracer = tracer
self.metrics = {"enqueued": 0, "dequeued": 0}
async def put(self, message: Message, enqueue_span: Span):
"""消息入队,并记录入队Span"""
# 模拟入队操作(内存操作,但记录耗时)
enqueue_span.operation = "memory_enqueue"
await self._queue.put(message)
enqueue_span.finish()
await self.tracer.record_span(enqueue_span)
self.metrics["enqueued"] += 1
async def get(self) -> Optional[Message]:
"""消息出队,并记录出队Span"""
try:
dequeue_span = Span(trace_id="", type=SpanType.DEQUEUE, operation="memory_dequeue")
message = await asyncio.wait_for(self._queue.get(), timeout=0.001)
dequeue_span.finish()
dequeue_span.trace_id = message.trace_id
await self.tracer.record_span(dequeue_span)
self.metrics["dequeued"] += 1
return message
except asyncio.TimeoutError:
return None
def qsize(self) -> int:
return self._queue.qsize()
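可以用下面的小脚本(演示用,非项目文件)快速验证队列与追踪器的配合是否正常:

import asyncio
from core.models import Message, Span, SpanType
from core.tracer import Tracer
from core.queue import AsyncMemoryQueue

async def smoke_test():
    tracer = Tracer(buffer_size=100)
    queue = AsyncMemoryQueue(tracer=tracer, maxsize=10)

    msg = Message(payload=b"hello")
    await queue.put(msg, Span(trace_id=msg.trace_id, type=SpanType.ENQUEUE))

    out = await queue.get()
    assert out is not None and out.id == msg.id
    print("spans:", len(await tracer.get_all_spans()), "metrics:", queue.metrics)

asyncio.run(smoke_test())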
文件路径:core/producer.py
import asyncio
import time
from typing import List
from .models import Message, Span, SpanType
from .tracer import Tracer
from .queue import AsyncMemoryQueue
class Producer:
def __init__(self, queue: AsyncMemoryQueue, tracer: Tracer, rate_limit_tps: int, batch_size: int):
self.queue = queue
self.tracer = tracer
self.rate_limit_tps = rate_limit_tps
self.batch_size = batch_size
self._stop_event = asyncio.Event()
async def _produce_one(self, message: Message):
"""生产单条消息的核心流程:创建追踪链路 -> 序列化 -> 发送入队"""
# 1. 创建生产Span (模拟序列化)
produce_span = Span(trace_id=message.trace_id, type=SpanType.PRODUCE, operation="serialize")
# 模拟序列化耗时
await asyncio.sleep(0.0001)
produce_span.finish()
await self.tracer.record_span(produce_span)
# 2. 创建入队Span
enqueue_span = Span(
trace_id=message.trace_id,
type=SpanType.ENQUEUE,
operation="send_to_queue",
parent_span_id=produce_span.span_id
)
# 3. 调用队列入队
await self.queue.put(message, enqueue_span)
async def run(self, total_messages: int, message_size: int):
"""生产者主循环,控制速率并批量发送"""
messages_produced = 0
interval = 1.0 / self.rate_limit_tps if self.rate_limit_tps > 0 else 0
while messages_produced < total_messages and not self._stop_event.is_set():
batch_start = time.time()
# 创建一批消息
batch = [
Message(payload=b"x" * message_size)
for _ in range(min(self.batch_size, total_messages - messages_produced))
]
# 并发发送该批消息
tasks = [self._produce_one(msg) for msg in batch]
await asyncio.gather(*tasks, return_exceptions=True)
messages_produced += len(batch)
# 精确速率控制
batch_elapsed = time.time() - batch_start
target_time = len(batch) * interval
if target_time > batch_elapsed:
await asyncio.sleep(target_time - batch_elapsed)
print(f"[Producer] Finished. Produced {messages_produced} messages.")
def stop(self):
self._stop_event.set()
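基于"每批sleep补偿"的限速实现简单,但在目标TPS很高时容易受事件循环调度抖动影响而偏离目标速率。一个常见的替代思路是令牌桶限速,下面是一个极简示意(假设性代码,非项目正式实现),在Producer.run中可用 await bucket.acquire(len(batch)) 替换手工的sleep补偿:

import asyncio
import time
from typing import Optional

class TokenBucket:
    """极简令牌桶:按rate_tps速率补充令牌,acquire(n)在令牌不足时异步等待"""
    def __init__(self, rate_tps: float, capacity: Optional[float] = None):
        self.rate = rate_tps
        self.capacity = capacity if capacity is not None else rate_tps
        self.tokens = self.capacity
        self.last = time.monotonic()

    async def acquire(self, n: int = 1):
        while True:
            now = time.monotonic()
            self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= n:
                self.tokens -= n
                return
            # 按令牌缺口估算需等待的时间,避免忙等
            await asyncio.sleep((n - self.tokens) / self.rate)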
文件路径:core/consumer.py
import asyncio
import random
import time
from .models import Span, SpanType
from .tracer import Tracer
from .queue import AsyncMemoryQueue
class Consumer:
def __init__(self, consumer_id: int, queue: AsyncMemoryQueue, tracer: Tracer, process_time_mean_ms: float, process_time_stddev_ms: float):
self.id = consumer_id
self.queue = queue
self.tracer = tracer
self.process_time_mean_ns = process_time_mean_ms * 1_000_000
self.process_time_stddev_ns = process_time_stddev_ms * 1_000_000
self._stop_event = asyncio.Event()
self.messages_consumed = 0
def _simulate_work(self) -> int:
"""模拟消费逻辑的处理时间,服从正态分布"""
t = random.gauss(self.process_time_mean_ns, self.process_time_stddev_ns)
return max(int(t), 1000) # 至少1微秒
async def run(self):
"""消费者主循环:从队列拉取消息 -> 处理 -> 记录消费Span"""
while not self._stop_event.is_set():
message = await self.queue.get()
if message is None:
await asyncio.sleep(0.001) # 队列空,短暂休眠
continue
# 创建消费Span
consume_span = Span(
trace_id=message.trace_id,
type=SpanType.CONSUME,
operation="business_logic"
)
# 模拟处理耗时
work_ns = self._simulate_work()
await asyncio.sleep(work_ns / 1_000_000_000)
consume_span.finish()
consume_span.tags = {"work_simulated_ns": work_ns}
await self.tracer.record_span(consume_span)
self.messages_consumed += 1
def stop(self):
self._stop_event.set()
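这里用asyncio.sleep模拟的是I/O型耗时(等待期间事件循环可以继续调度其他worker)。如果真实业务是CPU密集型,协程并不能带来并行度,需要把计算交给线程/进程池,否则单个消息会阻塞整个事件循环。下面是一个示意写法(heavy_compute为假设的CPU密集函数):

import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_compute(payload: bytes) -> int:
    # 假设性的CPU密集处理,例如校验、解析或聚合
    return sum(payload)

async def consume_cpu_bound(payload: bytes, pool: ProcessPoolExecutor) -> int:
    loop = asyncio.get_running_loop()
    # 把CPU密集的工作交给进程池执行,避免阻塞事件循环
    return await loop.run_in_executor(pool, heavy_compute, payload)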
文件路径:profiling/analyzer.py
from typing import List, Dict, Any, Tuple
import numpy as np
from core.models import Span, SpanType
class PerformanceAnalyzer:
"""性能数据分析器,计算关键指标"""
@staticmethod
def calculate_latency_percentiles(spans: List[Span]) -> Dict[str, float]:
"""计算延迟百分位数(P50, P90, P95, P99),单位:毫秒"""
if not spans:
return {}
durations_ms = [s.duration_ns / 1_000_000 for s in spans if s.duration_ns]
percentiles = [50, 90, 95, 99]
results = {}
for p in percentiles:
results[f'p{p}'] = float(np.percentile(durations_ms, p))
results['avg'] = float(np.mean(durations_ms))
results['max'] = float(np.max(durations_ms))
return results
@staticmethod
def analyze_end_to_end_latency(spans_by_trace: Dict[str, List[Span]]) -> List[Dict[str, Any]]:
"""分析端到端延迟:找出一个Trace中 PRODUCE->ENQUEUE->DEQUEUE->CONSUME 的总耗时"""
e2e_results = []
for trace_id, span_list in spans_by_trace.items():
spans_by_type = {s.type: s for s in span_list if s.duration_ns}
if all(t in spans_by_type for t in [SpanType.PRODUCE, SpanType.CONSUME]):
total_ns = sum(s.duration_ns for s in spans_by_type.values())
e2e_results.append({
'trace_id': trace_id,
'latency_ms': total_ns / 1_000_000,
'span_count': len(span_list)
})
return e2e_results
@staticmethod
def group_spans_by_trace(spans: List[Span]) -> Dict[str, List[Span]]:
"""按trace_id对Span进行分组"""
grouped = {}
for span in spans:
grouped.setdefault(span.trace_id, []).append(span)
return grouped
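可以用一组人造Span快速验证百分位计算是否符合预期(演示用代码):

from core.models import Span, SpanType
from profiling.analyzer import PerformanceAnalyzer

# 构造100个CONSUME Span,耗时从1ms到100ms
spans = []
for i in range(1, 101):
    s = Span(trace_id=f"t{i}", type=SpanType.CONSUME)
    s.duration_ns = i * 1_000_000
    spans.append(s)

stats = PerformanceAnalyzer.calculate_latency_percentiles(spans)
print(stats)  # 预期 p50≈50.5ms, p99≈99.01ms, avg=50.5ms, max=100ms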
文件路径:profiling/reporter.py
import asyncio
import time
from typing import Dict, Any, List
from core.tracer import Tracer
from core.queue import AsyncMemoryQueue
from core.models import SpanType
from .analyzer import PerformanceAnalyzer
class ProfilingReporter:
"""性能报告生成器,定期输出控制台报告和图表数据"""
def __init__(self, tracer: Tracer, queue: AsyncMemoryQueue, report_interval_sec: int):
self.tracer = tracer
self.queue = queue
self.interval = report_interval_sec
self._last_report_time = time.time()
self._last_enqueued = 0
self._last_dequeued = 0
async def run_console_reporter(self):
"""运行控制台实时报告循环"""
while True:
await asyncio.sleep(self.interval)
await self._print_report()
async def _print_report(self):
"""生成并打印一次性能报告"""
all_spans = await self.tracer.get_all_spans()
analyzer = PerformanceAnalyzer()
grouped_spans = analyzer.group_spans_by_trace(all_spans)
# 计算各阶段延迟
metrics = {}
for span_type in SpanType:
spans = await self.tracer.get_spans_by_type(span_type)
if spans:
metrics[span_type.value] = analyzer.calculate_latency_percentiles(spans)
# 计算吞吐量
current_time = time.time()
time_delta = current_time - self._last_report_time
current_enqueued = self.queue.metrics["enqueued"]
current_dequeued = self.queue.metrics["dequeued"]
enqueue_tps = (current_enqueued - self._last_enqueued) / time_delta if time_delta > 0 else 0
dequeue_tps = (current_dequeued - self._last_dequeued) / time_delta if time_delta > 0 else 0
print(f"\n--- Performance Report [{time.strftime('%H:%M:%S')}] ---")
print(f"Queue Size: {self.queue.qsize()} | Total Spans Traced: {len(all_spans)}")
print(f"Throughput - Enqueue: {enqueue_tps:.0f} msg/s, Dequeue: {dequeue_tps:.0f} msg/s")
for stage, lat in metrics.items():
print(f" {stage:10} | Avg: {lat['avg']:.3f}ms, P50: {lat['p50']:.3f}ms, P99: {lat['p99']:.3f}ms")
# 更新上次记录
self._last_report_time = current_time
self._last_enqueued = current_enqueued
self._last_dequeued = current_dequeued
def generate_mermaid_sequence_diagram(self, trace_id: str, spans: List[dict]) -> str:
"""为特定Trace生成Mermaid序列图"""
# 简化:假设spans是按时间排序的字典列表
mermaid_lines = ["sequenceDiagram", " participant P as Producer", " participant Q as MemoryQueue", " participant C as Consumer"]
for span in spans:
op = span.get('operation', '')
dur_ms = span.get('duration_ms', 0)
if span['type'] == 'PRODUCE':
mermaid_lines.append(f" P->>Q: {op} ({dur_ms:.2f}ms)")
elif span['type'] == 'ENQUEUE':
# ENQUEUE在队列内部,用Note表示
mermaid_lines.append(f" Note over Q: {op} ({dur_ms:.2f}ms)")
elif span['type'] == 'DEQUEUE':
mermaid_lines.append(f" Q->>C: dequeue ({dur_ms:.2f}ms)")
elif span['type'] == 'CONSUME':
mermaid_lines.append(f" C-->>C: {op} ({dur_ms:.2f}ms)")
return "\n".join(mermaid_lines)
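generate_mermaid_sequence_diagram也可以脱离完整流程单独调用,下面用一组虚构的演示数据展示其输出(示例数值为编造,仅用于说明格式):

from core.tracer import Tracer
from core.queue import AsyncMemoryQueue
from profiling.reporter import ProfilingReporter

tracer = Tracer()
reporter = ProfilingReporter(tracer=tracer, queue=AsyncMemoryQueue(tracer=tracer), report_interval_sec=2)

demo_spans = [
    {"type": "PRODUCE", "operation": "serialize", "duration_ms": 0.12},
    {"type": "ENQUEUE", "operation": "memory_enqueue", "duration_ms": 0.05},
    {"type": "DEQUEUE", "operation": "memory_dequeue", "duration_ms": 0.30},
    {"type": "CONSUME", "operation": "business_logic", "duration_ms": 10.80},
]
print(reporter.generate_mermaid_sequence_diagram("demo-trace", demo_spans))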
文件路径:main.py
import asyncio
import yaml
import signal
import numpy as np
from core.models import SpanType
from core.queue import AsyncMemoryQueue
from core.tracer import Tracer
from core.producer import Producer
from core.consumer import Consumer
from profiling.reporter import ProfilingReporter
class HighTPSProfiler:
"""主程序,组装所有组件并协调运行"""
def __init__(self, config_path: str = "config.yaml"):
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
# 初始化核心组件
self.tracer = Tracer(buffer_size=self.config['profiling']['span_buffer_size'])
self.queue = AsyncMemoryQueue(tracer=self.tracer)
# 初始化生产者
self.producer = Producer(
queue=self.queue,
tracer=self.tracer,
rate_limit_tps=self.config['producer']['rate_limit_tps'],
batch_size=self.config['producer']['batch_size']
)
# 初始化消费者池
self.consumers = [
Consumer(
consumer_id=i,
queue=self.queue,
tracer=self.tracer,
process_time_mean_ms=self.config['consumer']['process_time_ms']['mean'],
process_time_stddev_ms=self.config['consumer']['process_time_ms']['stddev']
)
for i in range(self.config['consumer']['worker_count'])
]
# 初始化报告器
self.reporter = ProfilingReporter(
tracer=self.tracer,
queue=self.queue,
report_interval_sec=self.config['profiling']['report_interval_sec']
)
self._tasks = []
async def run(self):
"""运行性能剖析主流程"""
print("🚀 Starting High TPS Message Queue Profiler...")
print(f" Target: {self.config['system']['total_messages']} messages @ ~{self.config['producer']['rate_limit_tps']} TPS")
print(f" Consumers: {self.config['consumer']['worker_count']} workers")
# 启动消费者
consumer_tasks = [asyncio.create_task(cons.run()) for cons in self.consumers]
self._tasks.extend(consumer_tasks)
# 启动控制台报告器
reporter_task = asyncio.create_task(self.reporter.run_console_reporter())
self._tasks.append(reporter_task)
# 运行生产者(会阻塞直到生产完指定数量消息)
try:
await self.producer.run(
total_messages=self.config['system']['total_messages'],
message_size=self.config['system']['message_size_bytes']
)
except asyncio.CancelledError:
print("\nProducer interrupted.")
# 生产者完成后,等待队列清空
print("\n🔄 Producer finished. Draining queue...")
while self.queue.qsize() > 0:
await asyncio.sleep(0.5)
print("✅ Queue drained.")
# 停止消费者和报告器
for cons in self.consumers:
cons.stop()
if consumer_tasks:
await asyncio.gather(*consumer_tasks, return_exceptions=True)
reporter_task.cancel()
# 生成最终分析报告
await self._generate_final_report()
async def _generate_final_report(self):
"""生成最终性能剖析报告,包含Mermaid图表"""
from profiling.analyzer import PerformanceAnalyzer
analyzer = PerformanceAnalyzer()
all_spans = await self.tracer.get_all_spans()
grouped = analyzer.group_spans_by_trace(all_spans)
# 1. 输出关键指标
print("\n" + "="*60)
print("FINAL PERFORMANCE ANALYSIS")
print("="*60)
for span_type in SpanType:
spans = await self.tracer.get_spans_by_type(span_type)
if spans:
lat = analyzer.calculate_latency_percentiles(spans)
print(f"\n[{span_type.value}] Latency (ms):")
print(f" Avg:{lat['avg']:7.2f} | P50:{lat['p50']:7.2f} | P90:{lat['p90']:7.2f} | P99:{lat['p99']:7.2f} | Max:{lat['max']:7.2f}")
# 2. 端到端延迟分析
e2e_list = analyzer.analyze_end_to_end_latency(grouped)
if e2e_list:
e2e_latencies = [item['latency_ms'] for item in e2e_list]
print(f"\n[End-to-End] Complete Traces: {len(e2e_list)}")
print(f" Avg Latency: {np.mean(e2e_latencies):.2f}ms, P99: {np.percentile(e2e_latencies, 99):.2f}ms")
# 3. 生成一个Mermaid序列图(示例)
print("\n" + "="*60)
print("EXAMPLE TRACE (Mermaid Sequence Diagram):")
print("="*60)
if e2e_list:
example_trace_id = e2e_list[0]['trace_id']
example_spans = grouped.get(example_trace_id, [])
# 转换为字典列表并计算毫秒时间
span_dicts = []
for s in example_spans:
sd = s.to_dict()
sd['duration_ms'] = s.duration_ns / 1_000_000 if s.duration_ns else 0
span_dicts.append(sd)
# 按开始时间排序
span_dicts.sort(key=lambda x: x['start_time_ns'])
mermaid_code = self.reporter.generate_mermaid_sequence_diagram(example_trace_id, span_dicts[:8]) # 限制前几个span
print(f"\n
mermaid\n{mermaid_code}\n
# 4. 生成瓶颈分析Mermaid流程图
print("\n" + "="*60)
print("BOTTLENECK ANALYSIS FLOWCHART:")
print("="*60)
await self._generate_bottleneck_flowchart()
async def _generate_bottleneck_flowchart(self):
"""生成瓶颈定位策略的Mermaid流程图"""
bottleneck_chart = """
```mermaid
graph TD
A[十万级TPS性能问题] --> B{端到端延迟高?};
B -- 是 --> C[启用分布式追踪];
B -- 否 --> D[吞吐未达标?];
D -- 是 --> E[检查资源利用率];
C --> F[分析各阶段延迟P99];
F --> G{哪个阶段P99最高?};
G -->|生产/PRODUCE| H[瓶颈: 序列化/网络];
G -->|入队/ENQUEUE| I[瓶颈: 队列存储/持久化];
G -->|出队/DEQUEUE| J[瓶颈: 队列调度/锁竞争];
G -->|消费/CONSUME| K[瓶颈: 业务逻辑/DB/CPU];
H --> L[优化: 换用二进制协议<br/>调整批处理大小];
I --> M[优化: 检查磁盘IO<br/>调整刷盘策略];
J --> N[优化: 增加分区/分片<br/>无锁数据结构];
K --> O[优化: 异步化处理<br/>缓存/索引];
L & M & N & O --> P[重新压测验证];
P --> Q{指标是否达标?};
Q -- 是 --> R[✅ 瓶颈解除];
Q -- 否 --> S[回到步骤C深入分析];
"""
print(bottleneck_chart)
async def shutdown(self):
"""优雅关闭"""
print("\nShutting down...")
for task in self._tasks:
task.cancel()
await asyncio.gather(*self._tasks, return_exceptions=True)
def main():
profiler = HighTPSProfiler()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# 信号处理
def signal_handler():
print("\nReceived interrupt signal.")
loop.create_task(profiler.shutdown())
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, signal_handler)
try:
loop.run_until_complete(profiler.run())
finally:
loop.close()
print("Profiler stopped.")
if __name__ == "__main__":
main()
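结构树中的utils/helpers.py未在正文给出,核心流程也不依赖它;以下是与其注释("生成随机负载")相符的一个最小示意实现,供需要更真实负载分布的读者参考(假设性代码):

文件路径:utils/helpers.py(示意实现)
import random
import string

def random_payload(size_bytes: int) -> bytes:
    """生成指定大小的随机字节负载,可用于代替固定的 b"x" * size"""
    return bytes(random.getrandbits(8) for _ in range(size_bytes))

def random_headers(count: int = 3) -> dict:
    """生成若干随机消息头,模拟真实业务的元数据"""
    return {
        "".join(random.choices(string.ascii_lowercase, k=6)):
            "".join(random.choices(string.ascii_letters, k=12))
        for _ in range(count)
    }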
4. 安装依赖与运行步骤
环境要求:Python 3.8+
- 创建项目目录并复制代码:
mkdir high_tps_mq_profiler && cd high_tps_mq_profiler
# 将上述所有代码文件按结构树创建并复制进去。
- 创建依赖文件requirements.txt:
pyyaml>=6.0
numpy>=1.21.0
- 安装依赖:
pip install -r requirements.txt
- 运行性能剖析:
python main.py
程序将根据config.yaml的配置开始运行。您将在控制台看到实时的吞吐量报告和各阶段延迟指标。程序在完成指定数量的消息处理后,会自动停止并输出最终分析报告,包含示例追踪的Mermaid序列图和瓶颈定位流程图。
5. 测试与验证步骤
- 基础功能验证:运行上述命令,观察控制台输出是否正常,是否按配置生产了10万条消息,队列最终是否被清空。
- 压力测试调整:修改config.yaml中的参数,观察系统行为变化。
  - 场景A(生产过快、消息积压):将producer.rate_limit_tps设置为200000,consumer.worker_count设置为1,consumer.process_time_ms.mean设置为50。预期现象:队列持续增长,消费延迟(CONSUME阶段)的P99显著升高。
  - 场景B(消费者瓶颈):将producer.rate_limit_tps设置为50000,consumer.worker_count设置为1,consumer.process_time_ms.mean设置为100。预期现象:消费TPS远低于生产TPS,队列大量积压,CONSUME阶段耗时占主导。
  - 场景C(均衡):调整参数使生产速率略高于消费总能力。注意每个worker在事件循环中串行处理,理论消费能力约为 worker_count × 1000 ÷ mean_ms(TPS),例如平均处理时间10ms时,约需1000个worker才能接近10万TPS的消费能力。预期现象:队列缓慢增长,各阶段延迟相对均衡,P99延迟可能出现在入队或出队环节。
- 数据验证:程序最终输出的"FINAL PERFORMANCE ANALYSIS"中,检查Total Spans Traced数量是否大致等于总消息数 × 4(PRODUCE、ENQUEUE、DEQUEUE、CONSUME四个关键阶段;注意需将profiling.span_buffer_size调大到不小于该值,否则缓冲区会丢弃最旧的Span);验证端到端延迟是否大致等于各阶段延迟之和。
- 图表验证:检查最终输出的Mermaid图表代码块是否能被支持Mermaid的Markdown渲染器(如GitHub Wiki、某些博客平台)正确渲染为序列图和流程图。
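除了人工观察,也可以为核心组件补充少量自动化测试。下面是一个假设性的pytest片段(需额外安装pytest),验证追踪缓冲区的淘汰策略与百分位计算:

文件路径:tests/test_core.py(示意)
import asyncio

from core.models import Span
from core.tracer import Tracer
from profiling.analyzer import PerformanceAnalyzer

def test_tracer_buffer_evicts_oldest():
    async def scenario():
        tracer = Tracer(buffer_size=5)
        for i in range(8):
            await tracer.record_span(Span(trace_id=f"t{i}"))
        return await tracer.get_all_spans()

    spans = asyncio.run(scenario())
    assert len(spans) == 5
    assert spans[0].trace_id == "t3"  # 最旧的3个Span已被丢弃

def test_percentiles_on_known_distribution():
    spans = [Span(trace_id=str(i)) for i in range(10)]
    for i, s in enumerate(spans, start=1):
        s.duration_ns = i * 1_000_000  # 1ms到10ms
    stats = PerformanceAnalyzer.calculate_latency_percentiles(spans)
    assert abs(stats["avg"] - 5.5) < 1e-6
    assert stats["max"] == 10.0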
通过运行此项目并调整参数,您可以直观地理解在高TPS下,压力如何在不同组件间传递,以及如何利用追踪数据快速定位到导致性能劣化的具体环节。项目代码作为一个基础框架,可以轻松扩展以集成真实的Kafka客户端,或对接Jaeger、Zipkin等开源追踪系统。