Profiling End-to-End Performance and Locating Bottlenecks in Message Queues at 100K TPS


Abstract

This article builds a simulated high-throughput message processing system to dissect the potential performance bottlenecks along the end-to-end message queue path at 100K TPS (transactions per second). We implement a complete, runnable project consisting of a producer, an in-memory queue, consumers, and a distributed-tracing component, plus a profiling framework that collects and analyzes per-stage timing metrics. Using the trace data and key performance indicator (KPI) analysis, the article demonstrates how to locate performance hot spots across the full path, from message publishing, serialization, and queue storage to deserialization and consumption, and provides a general bottleneck-localization methodology with working code.

1. Project Overview: A Performance Probe for a High-TPS Messaging System

In high-concurrency microservice architectures, the message queue is the core component for decoupling services and smoothing traffic peaks. Once a system reaches 100K TPS, even small inefficiencies get amplified into latency spikes and throughput loss. This project builds a "performance probe" style system: it simulates real business message production and consumption and injects deep observability, so we can literally see where data slows down as it flows through the pipeline.

Core Design Ideas

  1. In-memory queue simulation: to avoid the complexity of a concrete broker such as Kafka or RocketMQ, we simulate a message queue with Python's asyncio.Queue and focus on performance logic that generalizes across brokers.
  2. End-to-end tracing: every message carries a unique trace ID, and we record timestamps and context at each key hop: produce, enqueue, dequeue, and consume.
  3. Configurable load: producer rate, consumer worker count, message size, and more can be tuned through the config file to simulate different pressure scenarios.
  4. Real-time analysis and visualization: trace data is aggregated in memory and reported as percentile latencies (P50, P90, P99) and throughput time series, shown on the console and via Mermaid sequence diagrams.
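
Before the full project, here is a minimal, self-contained sketch of the probing idea (hypothetical names, not part of the project code): stamp a message just before each hop through an asyncio.Queue and derive per-stage latencies from the timestamp differences.

import asyncio
import time

async def demo():
    queue: asyncio.Queue = asyncio.Queue()

    async def produce():
        msg = {"trace_id": "t-1"}
        msg["t_enqueued_ns"] = time.time_ns()  # stamp just before the hop
        await queue.put(msg)

    async def consume():
        msg = await queue.get()
        t_dequeued_ns = time.time_ns()
        await asyncio.sleep(0.01)              # stand-in for business logic
        t_done_ns = time.time_ns()
        print(f"queue wait: {(t_dequeued_ns - msg['t_enqueued_ns']) / 1e6:.3f}ms, "
              f"processing: {(t_done_ns - t_dequeued_ns) / 1e6:.3f}ms")

    await asyncio.gather(produce(), consume())

asyncio.run(demo())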

2. Project Structure

high_tps_mq_profiler/
├── config.yaml              # configuration file
├── main.py                  # program entry point
├── core/
│   ├── __init__.py
│   ├── models.py           # data models (Message, trace Span)
│   ├── producer.py         # async message producer
│   ├── queue.py            # simulated in-memory message queue
│   ├── consumer.py         # async message consumer
│   └── tracer.py           # distributed tracer
├── profiling/
│   ├── __init__.py
│   ├── analyzer.py         # performance-data analyzer
│   └── reporter.py         # report generator (console, diagrams)
└── utils/
    ├── __init__.py
    └── helpers.py          # helpers (e.g. random payload generation)

3. Core Implementation

File: config.yaml

# Global system settings
system:
  total_messages: 100000  # total messages for this run
  message_size_bytes: 1024  # payload size per message (bytes)

# Producer settings
producer:
  rate_limit_tps: 120000  # target producer TPS (used for rate limiting)
  batch_size: 50          # batch send size (simulated optimization)

# Consumer settings
consumer:
  worker_count: 4         # number of concurrent consumer workers
  # distribution of the simulated processing time (milliseconds)
  process_time_ms:
    mean: 10
    stddev: 3

# Tracing and profiling
profiling:
  span_buffer_size: 10000 # number of trace spans buffered in memory
  report_interval_sec: 2  # console report interval (seconds)
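
A quick sanity check on these defaults: since each consumer worker coroutine handles one message at a time, 4 workers at a 10ms mean can sustain only about 400 msg/s, far below the 120,000 TPS producer target, so the queue will back up under this configuration. A minimal sketch (assuming config.yaml sits in the working directory and PyYAML is installed) that loads the file and prints the implied capacity:

import yaml

with open("config.yaml", "r") as f:
    cfg = yaml.safe_load(f)

workers = cfg["consumer"]["worker_count"]
mean_ms = cfg["consumer"]["process_time_ms"]["mean"]
# Each worker coroutine processes one message at a time, so theoretical
# capacity is worker_count * (1000 / mean_ms) messages per second.
capacity_tps = workers * 1000.0 / mean_ms
print(f"theoretical consumer capacity ≈ {capacity_tps:.0f} msg/s "
      f"vs producer target {cfg['producer']['rate_limit_tps']} TPS")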

File: core/models.py

import time
import uuid
from dataclasses import dataclass, field, asdict
from typing import Dict, Any, Optional
from enum import Enum

class SpanType(str, Enum):
    PRODUCE = "PRODUCE"
    ENQUEUE = "ENQUEUE"
    DEQUEUE = "DEQUEUE"
    CONSUME = "CONSUME"

@dataclass
class Message:
    """Message entity."""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    payload: bytes = b""
    headers: Dict[str, str] = field(default_factory=dict)
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def to_dict(self) -> Dict[str, Any]:
        return {"id": self.id, "payload_size": len(self.payload), "trace_id": self.trace_id}

@dataclass
class Span:
    """One span in a trace."""
    trace_id: str
    span_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    parent_span_id: Optional[str] = None
    type: SpanType = SpanType.PRODUCE
    operation: str = ""  # concrete operation, e.g. "serialize", "network_send"
    start_time_ns: int = field(default_factory=lambda: time.time_ns())
    duration_ns: Optional[int] = None
    tags: Dict[str, Any] = field(default_factory=dict)

    def finish(self):
        """Finish the span and record its duration."""
        self.duration_ns = time.time_ns() - self.start_time_ns

    def to_dict(self):
        data = asdict(self)
        data['type'] = self.type.value
        return data
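
A quick illustrative exercise of these models (assuming the package layout from the structure tree, run from the project root):

import time
from core.models import Message, Span, SpanType

msg = Message(payload=b"x" * 1024)
span = Span(trace_id=msg.trace_id, type=SpanType.PRODUCE, operation="serialize")
time.sleep(0.001)       # stand-in for real work
span.finish()           # duration_ns = now - start_time_ns
print(msg.to_dict())    # compact view: id, payload_size, trace_id
print(span.to_dict())   # JSON-friendly dict; SpanType rendered as a string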

File: core/tracer.py

import asyncio
from typing import List, Dict, Any
from .models import Span, SpanType

class Tracer:
    """Minimal distributed tracer: collects and buffers span data in memory."""
    def __init__(self, buffer_size: int = 10000):
        self.buffer_size = buffer_size
        self._spans: List[Span] = []
        self._lock = asyncio.Lock()

    async def record_span(self, span: Span):
        """Record a span. When the buffer is full, drop the oldest span."""
        async with self._lock:
            if len(self._spans) >= self.buffer_size:
                self._spans.pop(0)  # naive drop policy; production systems should spill to disk or ship spans over the network
            self._spans.append(span)

    async def get_spans_by_type(self, span_type: SpanType) -> List[Span]:
        """Return a copied list of spans of the given type."""
        async with self._lock:
            return [s for s in self._spans if s.type == span_type]

    async def get_all_spans(self) -> List[Span]:
        """Return a copy of all buffered spans."""
        async with self._lock:
            return self._spans.copy()

    def clear(self):
        """Clear the buffer (for tests)."""
        self._spans.clear()
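
One implementation note: list.pop(0) is O(n), so at hundreds of thousands of spans per second the buffer itself can become a measurable hotspot. A drop-in alternative, sketched below under the same interface (illustrative only), is collections.deque with maxlen, which evicts the oldest entry in O(1):

import asyncio
from collections import deque
from typing import Deque, List
from core.models import Span

class DequeTracer:
    """Variant of Tracer with O(1) eviction (illustrative sketch)."""
    def __init__(self, buffer_size: int = 10000):
        # maxlen makes append() silently discard the oldest entry
        self._spans: Deque[Span] = deque(maxlen=buffer_size)
        self._lock = asyncio.Lock()

    async def record_span(self, span: Span) -> None:
        async with self._lock:
            self._spans.append(span)

    async def get_all_spans(self) -> List[Span]:
        async with self._lock:
            return list(self._spans)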

File: core/queue.py

import asyncio
import time
from typing import Optional
from .models import Message, Span, SpanType
from .tracer import Tracer

class AsyncMemoryQueue:
    """Simulated async in-memory message queue with tracing hooks."""
    def __init__(self, tracer: Tracer, maxsize: int = 100000):
        self._queue = asyncio.Queue(maxsize=maxsize)
        self.tracer = tracer
        self.metrics = {"enqueued": 0, "dequeued": 0}

    async def put(self, message: Message, enqueue_span: Span):
        """Enqueue a message and record its ENQUEUE span."""
        # The enqueue itself is an in-memory operation, but we still time it
        enqueue_span.operation = "memory_enqueue"
        await self._queue.put(message)
        enqueue_span.finish()
        await self.tracer.record_span(enqueue_span)
        self.metrics["enqueued"] += 1

    async def get(self) -> Optional[Message]:
        """Dequeue a message and record a DEQUEUE span."""
        try:
            # NOTE: the span starts before the wait, so its duration also
            # captures up to 1ms spent blocked on an empty queue.
            dequeue_span = Span(trace_id="", type=SpanType.DEQUEUE, operation="memory_dequeue")
            message = await asyncio.wait_for(self._queue.get(), timeout=0.001)
            dequeue_span.finish()
            dequeue_span.trace_id = message.trace_id
            await self.tracer.record_span(dequeue_span)
            self.metrics["dequeued"] += 1
            return message
        except asyncio.TimeoutError:
            return None

    def qsize(self) -> int:
        return self._queue.qsize()
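
A small smoke test for the queue (illustrative only; run it from the project root so the core package resolves):

import asyncio
from core.models import Message, Span, SpanType
from core.queue import AsyncMemoryQueue
from core.tracer import Tracer

async def smoke():
    tracer = Tracer(buffer_size=16)
    q = AsyncMemoryQueue(tracer=tracer, maxsize=8)
    msg = Message(payload=b"hello")
    await q.put(msg, Span(trace_id=msg.trace_id, type=SpanType.ENQUEUE))
    out = await q.get()
    assert out is not None and out.id == msg.id
    print("spans recorded:", len(await tracer.get_all_spans()))  # -> 2

asyncio.run(smoke())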

File: core/producer.py

import asyncio
import time
from typing import List
from .models import Message, Span, SpanType
from .tracer import Tracer
from .queue import AsyncMemoryQueue

class Producer:
    def __init__(self, queue: AsyncMemoryQueue, tracer: Tracer, rate_limit_tps: int, batch_size: int):
        self.queue = queue
        self.tracer = tracer
        self.rate_limit_tps = rate_limit_tps
        self.batch_size = batch_size
        self._stop_event = asyncio.Event()

    async def _produce_one(self, message: Message):
        """Produce one message: create the trace -> serialize -> enqueue."""
        # 1. Create the produce span (simulated serialization)
        produce_span = Span(trace_id=message.trace_id, type=SpanType.PRODUCE, operation="serialize")
        # Simulated serialization cost
        await asyncio.sleep(0.0001)
        produce_span.finish()
        await self.tracer.record_span(produce_span)

        # 2. Create the enqueue span
        enqueue_span = Span(
            trace_id=message.trace_id,
            type=SpanType.ENQUEUE,
            operation="send_to_queue",
            parent_span_id=produce_span.span_id
        )
        # 3. Hand the message to the queue
        await self.queue.put(message, enqueue_span)

    async def run(self, total_messages: int, message_size: int):
        """Main producer loop: rate-limited, batched sends."""
        messages_produced = 0
        interval = 1.0 / self.rate_limit_tps if self.rate_limit_tps > 0 else 0

        while messages_produced < total_messages and not self._stop_event.is_set():
            batch_start = time.time()
            # Build one batch of messages
            batch = [
                Message(payload=b"x" * message_size)
                for _ in range(min(self.batch_size, total_messages - messages_produced))
            ]
            # Send the batch concurrently
            tasks = [self._produce_one(msg) for msg in batch]
            await asyncio.gather(*tasks, return_exceptions=True)

            messages_produced += len(batch)
            # Rate control: sleep off whatever remains of this batch's time budget
            batch_elapsed = time.time() - batch_start
            target_time = len(batch) * interval
            if target_time > batch_elapsed:
                await asyncio.sleep(target_time - batch_elapsed)

        print(f"[Producer] Finished. Produced {messages_produced} messages.")

    def stop(self):
        self._stop_event.set()
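
A caveat on the rate limiter: with batch_size=50 and a 120,000 TPS target, the per-batch time budget is about 0.42ms, below the practical resolution of asyncio.sleep on most platforms, so the achieved rate ends up governed by event-loop and tracing overhead rather than by the limiter. A throwaway sketch (hypothetical, not part of the project) for measuring the raw enqueue rate your event loop can sustain:

import asyncio
import time

async def measure(n: int = 50_000) -> None:
    q: asyncio.Queue = asyncio.Queue()
    t0 = time.perf_counter()
    for _ in range(n):
        await q.put(b"x")  # the cheapest possible "send"
    elapsed = time.perf_counter() - t0
    print(f"~{n / elapsed:,.0f} enqueues/s upper bound on this machine")

asyncio.run(measure())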

File: core/consumer.py

import asyncio
import random
import time
from .models import Span, SpanType
from .tracer import Tracer
from .queue import AsyncMemoryQueue

class Consumer:
    def __init__(self, consumer_id: int, queue: AsyncMemoryQueue, tracer: Tracer, process_time_mean_ms: float, process_time_stddev_ms: float):
        self.id = consumer_id
        self.queue = queue
        self.tracer = tracer
        self.process_time_mean_ns = process_time_mean_ms * 1_000_000
        self.process_time_stddev_ns = process_time_stddev_ms * 1_000_000
        self._stop_event = asyncio.Event()
        self.messages_consumed = 0

    def _simulate_work(self) -> int:
        """Simulate processing time, drawn from a normal distribution."""
        t = random.gauss(self.process_time_mean_ns, self.process_time_stddev_ns)
        return max(int(t), 1000)  # floor at 1 microsecond (1000ns)

    async def run(self):
        """Main consumer loop: pull -> process -> record the CONSUME span."""
        while not self._stop_event.is_set():
            message = await self.queue.get()
            if message is None:
                await asyncio.sleep(0.001)  # queue empty; back off briefly
                continue

            # Create the consume span
            consume_span = Span(
                trace_id=message.trace_id,
                type=SpanType.CONSUME,
                operation="business_logic"
            )
            # Simulate the processing cost
            work_ns = self._simulate_work()
            await asyncio.sleep(work_ns / 1_000_000_000)
            consume_span.finish()
            consume_span.tags = {"work_simulated_ns": work_ns}
            await self.tracer.record_span(consume_span)

            self.messages_consumed += 1

    def stop(self):
        self._stop_event.set()
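
Because the simulated work is an await asyncio.sleep, the workers here are I/O-bound and scale nearly linearly inside one event loop. Real consume logic that is CPU-bound would block the loop and serialize every worker; the usual remedy, sketched below with a hypothetical cpu_bound_handler, is to offload it to an executor:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_handler(payload: bytes) -> int:
    # placeholder for real deserialization + business logic
    return sum(payload) % 251

async def consume_offloaded(payload: bytes, pool: ProcessPoolExecutor) -> int:
    loop = asyncio.get_running_loop()
    # run_in_executor keeps the other consumer coroutines responsive
    return await loop.run_in_executor(pool, cpu_bound_handler, payload)

async def demo():
    with ProcessPoolExecutor(max_workers=2) as pool:
        results = await asyncio.gather(
            *(consume_offloaded(b"x" * 1024, pool) for _ in range(8))
        )
        print(results)

if __name__ == "__main__":
    asyncio.run(demo())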

File: profiling/analyzer.py

from typing import List, Dict, Any, Tuple
import numpy as np
from core.models import Span, SpanType

class PerformanceAnalyzer:
    """Performance-data analyzer: computes the key metrics."""
    @staticmethod
    def calculate_latency_percentiles(spans: List[Span]) -> Dict[str, float]:
        """Latency percentiles (P50, P90, P95, P99) in milliseconds."""
        durations_ms = [s.duration_ns / 1_000_000 for s in spans if s.duration_ns]
        if not durations_ms:
            return {}
        percentiles = [50, 90, 95, 99]
        results = {}
        for p in percentiles:
            results[f'p{p}'] = float(np.percentile(durations_ms, p))
        results['avg'] = float(np.mean(durations_ms))
        results['max'] = float(np.max(durations_ms))
        return results

    @staticmethod
    def analyze_end_to_end_latency(spans_by_trace: Dict[str, List[Span]]) -> List[Dict[str, Any]]:
        """End-to-end latency: for each trace with at least a PRODUCE and a
        CONSUME span, sum the durations of all recorded stages."""
        e2e_results = []
        for trace_id, span_list in spans_by_trace.items():
            spans_by_type = {s.type: s for s in span_list if s.duration_ns}
            if all(t in spans_by_type for t in [SpanType.PRODUCE, SpanType.CONSUME]):
                total_ns = sum(s.duration_ns for s in spans_by_type.values())
                e2e_results.append({
                    'trace_id': trace_id,
                    'latency_ms': total_ns / 1_000_000,
                    'span_count': len(span_list)
                })
        return e2e_results

    @staticmethod
    def group_spans_by_trace(spans: List[Span]) -> Dict[str, List[Span]]:
        """Group spans by trace_id."""
        grouped = {}
        for span in spans:
            grouped.setdefault(span.trace_id, []).append(span)
        return grouped
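
An illustrative check of the analyzer against hand-made synthetic spans (the durations below are fabricated for the example, not measured):

from core.models import Span, SpanType
from profiling.analyzer import PerformanceAnalyzer

spans = []
for i in range(100):
    s = Span(trace_id=f"t-{i}", type=SpanType.CONSUME, operation="business_logic")
    s.duration_ns = (10 + i % 5) * 1_000_000  # synthetic 10-14ms durations
    spans.append(s)

print(PerformanceAnalyzer.calculate_latency_percentiles(spans))
# -> {'p50': 12.0, 'p90': 14.0, 'p95': 14.0, 'p99': 14.0, 'avg': 12.0, 'max': 14.0}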

File: profiling/reporter.py

import asyncio
import time
from typing import Dict, Any, List
from core.tracer import Tracer
from core.queue import AsyncMemoryQueue
from core.models import SpanType
from .analyzer import PerformanceAnalyzer

class ProfilingReporter:
    """Report generator: periodic console reports plus diagram data."""
    def __init__(self, tracer: Tracer, queue: AsyncMemoryQueue, report_interval_sec: int):
        self.tracer = tracer
        self.queue = queue
        self.interval = report_interval_sec
        self._last_report_time = time.time()
        self._last_enqueued = 0
        self._last_dequeued = 0

    async def run_console_reporter(self):
        """Run the periodic console-report loop."""
        while True:
            await asyncio.sleep(self.interval)
            await self._print_report()

    async def _print_report(self):
        """Build and print one performance report."""
        all_spans = await self.tracer.get_all_spans()
        analyzer = PerformanceAnalyzer()

        # Per-stage latency percentiles
        metrics = {}
        for span_type in SpanType:
            spans = await self.tracer.get_spans_by_type(span_type)
            stats = analyzer.calculate_latency_percentiles(spans)
            if stats:
                metrics[span_type.value] = stats

        # Throughput since the last report
        current_time = time.time()
        time_delta = current_time - self._last_report_time
        current_enqueued = self.queue.metrics["enqueued"]
        current_dequeued = self.queue.metrics["dequeued"]
        enqueue_tps = (current_enqueued - self._last_enqueued) / time_delta if time_delta > 0 else 0
        dequeue_tps = (current_dequeued - self._last_dequeued) / time_delta if time_delta > 0 else 0

        print(f"\n--- Performance Report [{time.strftime('%H:%M:%S')}] ---")
        print(f"Queue Size: {self.queue.qsize()} | Total Spans Traced: {len(all_spans)}")
        print(f"Throughput - Enqueue: {enqueue_tps:.0f} msg/s, Dequeue: {dequeue_tps:.0f} msg/s")
        for stage, lat in metrics.items():
            print(f"  {stage:10} | Avg: {lat['avg']:.3f}ms, P50: {lat['p50']:.3f}ms, P99: {lat['p99']:.3f}ms")

        # Roll the counters forward
        self._last_report_time = current_time
        self._last_enqueued = current_enqueued
        self._last_dequeued = current_dequeued

    def generate_mermaid_sequence_diagram(self, trace_id: str, spans: List[dict]) -> str:
        """Render a Mermaid sequence diagram for one trace."""
        # Simplification: spans is assumed to be a time-ordered list of dicts
        mermaid_lines = ["sequenceDiagram", "    participant P as Producer", "    participant Q as MemoryQueue", "    participant C as Consumer"]
        for span in spans:
            op = span.get('operation', '')
            dur_ms = span.get('duration_ms', 0)
            if span['type'] == 'PRODUCE':
                mermaid_lines.append(f"    P->>Q: {op} ({dur_ms:.2f}ms)")
            elif span['type'] == 'ENQUEUE':
                # ENQUEUE happens inside the queue; render it as a Note
                mermaid_lines.append(f"    Note over Q: {op} ({dur_ms:.2f}ms)")
            elif span['type'] == 'DEQUEUE':
                mermaid_lines.append(f"    Q->>C: dequeue ({dur_ms:.2f}ms)")
            elif span['type'] == 'CONSUME':
                mermaid_lines.append(f"    C-->>C: {op} ({dur_ms:.2f}ms)")
        return "\n".join(mermaid_lines)
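
To see what the diagram generator emits, a small illustrative driver with hand-written span dicts (run from the project root):

from core.queue import AsyncMemoryQueue
from core.tracer import Tracer
from profiling.reporter import ProfilingReporter

tracer = Tracer()
reporter = ProfilingReporter(tracer, AsyncMemoryQueue(tracer), report_interval_sec=2)
fake_spans = [
    {"type": "PRODUCE", "operation": "serialize", "duration_ms": 0.12},
    {"type": "ENQUEUE", "operation": "memory_enqueue", "duration_ms": 0.03},
    {"type": "DEQUEUE", "operation": "memory_dequeue", "duration_ms": 0.48},
    {"type": "CONSUME", "operation": "business_logic", "duration_ms": 10.21},
]
print(reporter.generate_mermaid_sequence_diagram("trace-demo", fake_spans))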

File: main.py

import asyncio
import signal

import numpy as np
import yaml

from core.queue import AsyncMemoryQueue
from core.tracer import Tracer
from core.producer import Producer
from core.consumer import Consumer
from core.models import SpanType
from profiling.reporter import ProfilingReporter

class HighTPSProfiler:
    """Main program: wires up all components and coordinates the run."""
    def __init__(self, config_path: str = "config.yaml"):
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)
        # Core components
        self.tracer = Tracer(buffer_size=self.config['profiling']['span_buffer_size'])
        self.queue = AsyncMemoryQueue(tracer=self.tracer)
        # Producer
        self.producer = Producer(
            queue=self.queue,
            tracer=self.tracer,
            rate_limit_tps=self.config['producer']['rate_limit_tps'],
            batch_size=self.config['producer']['batch_size']
        )
        # Consumer pool
        self.consumers = [
            Consumer(
                consumer_id=i,
                queue=self.queue,
                tracer=self.tracer,
                process_time_mean_ms=self.config['consumer']['process_time_ms']['mean'],
                process_time_stddev_ms=self.config['consumer']['process_time_ms']['stddev']
            )
            for i in range(self.config['consumer']['worker_count'])
        ]
        # Reporter
        self.reporter = ProfilingReporter(
            tracer=self.tracer,
            queue=self.queue,
            report_interval_sec=self.config['profiling']['report_interval_sec']
        )
        self._tasks = []

    async def run(self):
        """Run the end-to-end profiling workflow."""
        print("🚀 Starting High TPS Message Queue Profiler...")
        print(f"   Target: {self.config['system']['total_messages']} messages @ ~{self.config['producer']['rate_limit_tps']} TPS")
        print(f"   Consumers: {self.config['consumer']['worker_count']} workers")

        # Start the consumers
        consumer_tasks = [asyncio.create_task(cons.run()) for cons in self.consumers]
        self._tasks.extend(consumer_tasks)
        # Start the console reporter
        reporter_task = asyncio.create_task(self.reporter.run_console_reporter())
        self._tasks.append(reporter_task)

        # Run the producer (blocks until the configured number of messages is produced)
        try:
            await self.producer.run(
                total_messages=self.config['system']['total_messages'],
                message_size=self.config['system']['message_size_bytes']
            )
        except asyncio.CancelledError:
            print("\nProducer interrupted.")

        # After the producer finishes, wait for the queue to drain
        print("\n🔄 Producer finished. Draining queue...")
        while self.queue.qsize() > 0:
            await asyncio.sleep(0.5)
        print("✅ Queue drained.")

        # Stop the consumers and the reporter
        for cons in self.consumers:
            cons.stop()
        if consumer_tasks:
            await asyncio.gather(*consumer_tasks, return_exceptions=True)
        reporter_task.cancel()

        # Produce the final analysis report
        await self._generate_final_report()

    async def _generate_final_report(self):
        """Produce the final profiling report, including Mermaid diagrams."""
        from profiling.analyzer import PerformanceAnalyzer
        analyzer = PerformanceAnalyzer()
        all_spans = await self.tracer.get_all_spans()
        grouped = analyzer.group_spans_by_trace(all_spans)

        # 1. Key per-stage metrics
        print("\n" + "="*60)
        print("FINAL PERFORMANCE ANALYSIS")
        print("="*60)
        for span_type in SpanType:
            spans = await self.tracer.get_spans_by_type(span_type)
            lat = analyzer.calculate_latency_percentiles(spans)
            if lat:
                print(f"\n[{span_type.value}] Latency (ms):")
                print(f"  Avg:{lat['avg']:7.2f} | P50:{lat['p50']:7.2f} | P90:{lat['p90']:7.2f} | P99:{lat['p99']:7.2f} | Max:{lat['max']:7.2f}")

        # 2. End-to-end latency analysis
        e2e_list = analyzer.analyze_end_to_end_latency(grouped)
        if e2e_list:
            e2e_latencies = [item['latency_ms'] for item in e2e_list]
            print(f"\n[End-to-End] Complete Traces: {len(e2e_list)}")
            print(f"  Avg Latency: {np.mean(e2e_latencies):.2f}ms, P99: {np.percentile(e2e_latencies, 99):.2f}ms")

        # 3. Render an example Mermaid sequence diagram
        print("\n" + "="*60)
        print("EXAMPLE TRACE (Mermaid Sequence Diagram):")
        print("="*60)
        if e2e_list:
            example_trace_id = e2e_list[0]['trace_id']
            example_spans = grouped.get(example_trace_id, [])
            # Convert spans to dicts and derive millisecond durations
            span_dicts = []
            for s in example_spans:
                sd = s.to_dict()
                sd['duration_ms'] = s.duration_ns / 1_000_000 if s.duration_ns else 0
                span_dicts.append(sd)
            # Sort by start time
            span_dicts.sort(key=lambda x: x['start_time_ns'])
            mermaid_code = self.reporter.generate_mermaid_sequence_diagram(example_trace_id, span_dicts[:8])  # cap at the first few spans
            print(f"\n```mermaid\n{mermaid_code}\n```")

        # 4. Generate the bottleneck-analysis Mermaid flowchart
        print("\n" + "="*60)
        print("BOTTLENECK ANALYSIS FLOWCHART:")
        print("="*60)
        await self._generate_bottleneck_flowchart()

    async def _generate_bottleneck_flowchart(self):
        """Print a Mermaid flowchart of the bottleneck-localization strategy."""
        bottleneck_chart = """
```mermaid
graph TD
A[100K-TPS performance problem] --> B{End-to-end latency high?};
B -- yes --> C[Enable distributed tracing];
B -- no --> D[Throughput below target?];
D -- yes --> E[Check resource utilization];

C --> F[Analyze per-stage P99 latency];
F --> G{Which stage has the highest P99?};

G -->|PRODUCE| H[Bottleneck: serialization / network];
G -->|ENQUEUE| I[Bottleneck: queue storage / persistence];
G -->|DEQUEUE| J[Bottleneck: queue scheduling / lock contention];
G -->|CONSUME| K[Bottleneck: business logic / DB / CPU];

H --> L[Fix: binary protocol<br/>tune batch size];
I --> M[Fix: check disk IO<br/>tune flush policy];
J --> N[Fix: more partitions/shards<br/>lock-free structures];
K --> O[Fix: async processing<br/>caching / indexes];

L & M & N & O --> P[Re-run the load test];
P --> Q{Targets met?};
Q -- yes --> R[✅ Bottleneck resolved];
Q -- no --> S[Back to step C for deeper analysis];
```
"""
        print(bottleneck_chart)

    async def shutdown(self):
        """Graceful shutdown."""
        print("\nShutting down...")
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)

def main():
    profiler = HighTPSProfiler()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Signal handling (note: add_signal_handler is unavailable on Windows event loops)
    def signal_handler():
        print("\nReceived interrupt signal.")
        loop.create_task(profiler.shutdown())

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, signal_handler)

    try:
        loop.run_until_complete(profiler.run())
    finally:
        loop.close()
        print("Profiler stopped.")

if __name__ == "__main__":
    main()

4. Installing Dependencies and Running

Requirements: Python 3.8+

  1. Create the project directory and copy in the code
     mkdir high_tps_mq_profiler && cd high_tps_mq_profiler
     # Create the files from the structure tree above and paste in the code.
  2. Create the dependency file requirements.txt
     pyyaml>=6.0
     numpy>=1.21.0
  3. Install the dependencies
     pip install -r requirements.txt
  4. Run the profiler
     python main.py

The program runs according to config.yaml. You will see live throughput reports and per-stage latency metrics on the console; once the configured number of messages has been processed, the program stops automatically and prints the final analysis report, including a Mermaid sequence diagram for an example trace and the bottleneck-localization flowchart.

5. Testing and Validation

  1. Basic functional check: run the command above and confirm the console output looks sane, that 100,000 messages are produced as configured, and that the queue is eventually drained.

  2. Load-test variations: change the parameters in config.yaml and observe how the system's behavior shifts (the capacity arithmetic for each scenario is worked through in the sketch after this list).

    • Scenario A (producer overload, queue backlog): set producer.rate_limit_tps to 200000, consumer.worker_count to 1, and consumer.process_time_ms.mean to 50. Expected: the queue grows continuously until maxsize backpressure throttles the producer; CONSUME P99 may also drift above the configured 50ms, since the busy event loop delays coroutine wakeups.
    • Scenario B (consumer bottleneck): set producer.rate_limit_tps to 50000, consumer.worker_count to 1, and consumer.process_time_ms.mean to 100. Expected: the producer gets throttled by queue backpressure, consumption caps at roughly 10 msg/s, and the CONSUME stage dominates the per-stage latency breakdown.
    • Scenario C (near balance): tune the parameters so the production rate sits slightly above total consumption capacity. With 4 consumers at a 10ms mean, theoretical capacity is 4 × (1000 / 10) ≈ 400 msg/s (see the sketch after this list), so a producer rate around 500 TPS fits. Expected: the queue grows slowly and per-stage latencies stay comparatively even, with P99 most likely surfacing in the enqueue or dequeue stage.
  3. Data check: in the final "FINAL PERFORMANCE ANALYSIS" output, verify that Total Spans Traced is roughly total messages × 4 (one span per key stage). Note this only holds if profiling.span_buffer_size is raised to at least that product; with the default 10,000-span buffer the tracer keeps only the most recent spans. Also verify that end-to-end latency roughly equals the sum of the per-stage latencies.

  4. Diagram check: confirm that the Mermaid code blocks in the final output render as a sequence diagram and a flowchart in a Mermaid-capable Markdown renderer (e.g. GitHub wikis and many blog platforms).
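
For the capacity arithmetic behind these scenarios, a worked check under the model that each worker coroutine handles one message at a time:

# capacity = worker_count * 1000 / mean_process_ms  (messages per second)
def consumer_capacity_tps(worker_count: int, mean_process_ms: float) -> float:
    return worker_count * 1000.0 / mean_process_ms

print(consumer_capacity_tps(1, 50))   # Scenario A: ~20 msg/s
print(consumer_capacity_tps(1, 100))  # Scenario B: ~10 msg/s
print(consumer_capacity_tps(4, 10))   # Scenario C: ~400 msg/s
# Reaching 100,000 msg/s at a 10ms mean would need on the order of 1,000 workers.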

Running this project and turning its knobs gives you a direct feel for how pressure propagates between components at high TPS, and for how trace data pinpoints the exact stage responsible for a performance regression. The code is a base framework: it extends readily to a real Kafka client, or to open-source tracing backends such as Jaeger or Zipkin.