Implementing Observability for Continuous Delivery: Layered Architecture and Key Abstractions

January 6, 2026 (updated February 4, 2026)

Abstract

This article takes a deep look at how to weave observability into a continuous delivery (CD) system in modern software engineering. We propose a clear four-layer architecture (collection, abstraction, aggregation, application) together with key abstractions (Pipeline, Stage, Metric, Span), and use them to build a complete, runnable demo project named "ObsCD". The project simulates a simplified CI/CD pipeline and shows, with concrete code, how to automatically collect the logs, metrics, and distributed traces produced while the pipeline runs and how to visualize them in Grafana, so that releases become transparent, anomalies can be located quickly, and delivery efficiency can be quantified.

1 Project Overview: ObsCD, an Observable Continuous Delivery Simulator

In today's pursuit of fast, reliable software delivery, the continuous delivery (CD) pipeline has become a core piece of engineering infrastructure. A common pain point, however, is that the pipeline itself runs as a black box. We can see the final "success" or "failure", but for questions such as "why did it fail?", "why did the build stage suddenly take so long?", or "how many new error logs did this release introduce?", we usually lack systematic data and the means to dig deeper. This is exactly where observability comes in.

Observability is more than monitoring a production system: it is the ability to understand a system's internal state from its external outputs (logs, metrics, traces). Bringing this ability to the CI/CD pipeline is what we call "pipeline observability".

This project, ObsCD (Observable Continuous Delivery), uses a lightweight, runnable project skeleton to demonstrate how to build an observability stack that is tightly coupled to the continuous delivery process. Its core design ideas are:

  1. Layered architecture: the system is split into four clear layers to keep responsibilities separated and leave room for extension.
  2. Key abstractions: core models such as Pipeline, Metric, and Span act as the bridge between the delivery process and the observability data.
  3. Plugin design: collectors and exporters are built as pluggable components so they can be swapped or extended (for example, switching from console logs to ELK, or from Prometheus to Datadog); a small interface sketch follows this list.
  4. Simulated yet realistic: the project ships a simulated CI/CD execution engine that produces near-real pipeline events and data, while integrating real observability collection and visualization tools (Prometheus, Jaeger, Grafana).
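
To make the plugin idea in item 3 concrete, here is a minimal interface sketch; the class and method names (MetricExporter, export_counter) are illustrative assumptions and do not appear in the ObsCD code listed later.

# Illustrative sketch only: a pluggable metric-exporter interface (hypothetical names).
from abc import ABC, abstractmethod
from typing import Dict

class MetricExporter(ABC):
    """Abstract exporter; concrete plugins (Prometheus, Datadog, ...) implement it."""

    @abstractmethod
    def export_counter(self, name: str, value: float, labels: Dict[str, str]) -> None:
        ...

class ConsoleMetricExporter(MetricExporter):
    """Default plugin for local development: just print the metric."""

    def export_counter(self, name: str, value: float, labels: Dict[str, str]) -> None:
        print(f"[metric] {name}{labels} += {value}")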

By running the project you can watch a simulated code commit trigger a pipeline, see per-stage durations and success rates appear on a Grafana dashboard in real time, and inspect the detailed distributed trace in Jaeger.

2 System Architecture and Key Abstractions

2.1 Four-Layer Architecture

The observability stack is built around the CI/CD pipeline and is split into four logical layers:

graph TB
  subgraph "Application Layer"
    A1["Grafana dashboards"]
    A2["Alertmanager"]
    A3["CI/CD platform UI (e.g. Jenkins, GitLab)"]
  end
  subgraph "Aggregation Layer"
    B1["Prometheus (metrics)"]
    B2["Loki (logs)"]
    B3["Jaeger (traces)"]
  end
  subgraph "Abstraction Layer"
    C1["Pipeline abstraction"]
    C2["Metric abstraction"]
    C3["Span abstraction"]
    C4["Log abstraction"]
    C1 -- "emits" --> C2
    C1 -- "creates" --> C3
    C1 -- "produces" --> C4
  end
  subgraph "Collection Layer"
    D1["CI/CD simulator (ObsCD Core)"]
    D2["Application/service instrumentation"]
  end
  D1 --> C1
  D2 --> C4
  C2 -- "push" --> B1
  C3 -- "report" --> B3
  C4 -- "send" --> B2
  B1 --> A1
  B2 --> A1
  B3 --> A1
  B1 --> A2
  • Collection layer: produces the raw observability data. In this project the core is the ObsCD pipeline simulator, which automatically writes logs, emits performance metrics, and creates tracing spans while executing each task (build, test, ...). In a real setup this layer also includes the instrumented business applications themselves.
  • Abstraction layer: the glue between the business logic (the CD pipeline) and the observability data model. It defines domain objects such as PipelineRun, Stage, and Job, and specifies how they are turned into standard metrics (e.g. pipeline_duration_seconds), traces (Spans), and structured logs. This layer guarantees that the data carries consistent, business-meaningful labels (pipeline_id, stage_name, git_branch, ...).
  • Aggregation layer: receives, stores, and indexes the high-volume data coming from the abstraction layer. We use the classic cloud-native trio: Prometheus (time-series metrics), Loki (log aggregation), and Jaeger (distributed tracing), all of which provide efficient querying.
  • Application layer: the user-facing part. Grafana acts as the unified visualization platform, querying the aggregation layer to draw dashboards covering pipeline health, efficiency, and resource consumption. Alerting rules can also be defined on the Prometheus data and routed to the right people via Alertmanager.

2.2 Core Domain Models (Key Abstractions)

  1. Pipeline & Run: a Pipeline defines a fixed sequence of stages (e.g. Build, Test, Deploy-Staging, E2E-Test, Deploy-Prod). Every code push or manual trigger creates a PipelineRun instance that carries the full context of that run (commit ID, branch, initiator) and its status.
  2. Stage & Job: a Stage contains one or more Jobs. A Job is the smallest executable unit, for example "run the unit-test suite" or "build the Docker image". Each Job execution is wrapped in a tracing Span and records its start/end times and result status.
  3. Observability primitives:
    • Metric: we use Counter, Gauge, and Histogram types for quantitative measurement, e.g. cicd_pipeline_duration_seconds (histogram) and cicd_job_total (counter, with a status label separating success from failure).
    • Span: represents a unit of work. A PipelineRun is one Trace; every Stage and Job underneath it is a Span, forming a hierarchy that shows exactly where the time is spent.
    • Structured Log: every log line is JSON with a uniform timestamp, level, and message body plus rich context fields (pipeline_run_id, job_name, ...), so it can be filtered efficiently in Loki via labels.
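
As an illustration (the field values are made up), one job log record emitted under this scheme could look like the line below; in Loki you would filter on the stream labels and parse the JSON body for fields such as run_id or job:

{"timestamp": "2026-01-06 14:30:25", "level": "INFO", "message": "Job 'unit-test' finished with status: JobStatus.SUCCESS", "app": "obscd", "component": "core", "pipeline": "main", "run_id": "a3f5c1e2", "stage": "test", "job": "unit-test", "job_status": "success"}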

3 Project Layout

obscd/
├── pyproject.toml               # Project dependencies and configuration (Poetry)
├── docker-compose.yml           # Starts the aggregation-layer components (Prometheus, Loki, Jaeger, Grafana)
├── config/
│   ├── __init__.py
│   └── observability_config.py  # Central configuration for the observability clients
├── core/
│   ├── __init__.py
│   ├── pipeline.py              # Pipeline, Stage, Job domain models
│   └── simulator.py             # Simulated pipeline execution engine
├── plugins/
│   ├── __init__.py
│   ├── metrics/
│   │   ├── __init__.py
│   │   ├── collector.py         # Metrics collector abstraction and implementation
│   │   └── prometheus_client.py # Prometheus client wrapper
│   ├── tracing/
│   │   ├── __init__.py
│   │   └── jaeger_client.py     # Jaeger client wrapper
│   └── logging/
│       ├── __init__.py
│       └── loki_logger.py       # Structured log handler
├── dashboards/                  # Grafana dashboard JSON definitions
│   └── pipeline_observability.json
└── run.py                       # Main entry point; starts the simulator

4 Core Code

File: config/observability_config.py

This file is the single entry point for all observability client configuration and uses the singleton pattern.

import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class TracingConfig:
    enabled: bool = True
    agent_host: str = "localhost"
    agent_port: int = 6831
    service_name: str = "obscd-pipeline"

@dataclass
class MetricsConfig:
    enabled: bool = True
    # A Pushgateway address would go here if push mode were used; this example lets Prometheus scrape directly
    prometheus_port: int = 8000  # port on which the simulator exposes /metrics

@dataclass
class LoggingConfig:
    enabled: bool = True
    loki_url: str = "http://localhost:3100/loki/api/v1/push"
    level: str = "INFO"
    # extra static labels attached to every log stream
    static_labels: dict = None

    def __post_init__(self):
        if self.static_labels is None:
            self.static_labels = {"app": "obscd", "component": "core"}

class ObservabilityConfig:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._load_from_env()
        return cls._instance

    def _load_from_env(self):
        self.tracing = TracingConfig(
            enabled=os.getenv("TRACING_ENABLED", "true").lower() == "true",
            agent_host=os.getenv("JAEGER_AGENT_HOST", "localhost"),
            agent_port=int(os.getenv("JAEGER_AGENT_PORT", 6831)),
            service_name=os.getenv("SERVICE_NAME", "obscd-pipeline")
        )
        self.metrics = MetricsConfig(
            enabled=os.getenv("METRICS_ENABLED", "true").lower() == "true",
            prometheus_port=int(os.getenv("METRICS_PORT", 8000))
        )
        self.logging = LoggingConfig(
            enabled=os.getenv("LOGGING_ENABLED", "true").lower() == "true",
            loki_url=os.getenv("LOKI_URL", "http://localhost:3100/loki/api/v1/push"),
            level=os.getenv("LOG_LEVEL", "INFO"),
            static_labels={
                "app": "obscd",
                "component": "core",
                "env": os.getenv("ENV", "development")
            }
        )

# Global configuration access point
CONFIG = ObservabilityConfig()
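
A quick usage sketch (assuming the module layout above): because the singleton reads the environment on first import, the settings can be tuned without touching code, for example:

# Usage sketch: override observability settings through environment variables.
import os

# must happen before config.observability_config is imported for the first time
os.environ["JAEGER_AGENT_HOST"] = "jaeger.internal"
os.environ["LOG_LEVEL"] = "DEBUG"

from config.observability_config import CONFIG

print(CONFIG.tracing.agent_host)    # -> jaeger.internal
print(CONFIG.logging.level)         # -> DEBUG
print(CONFIG.logging.static_labels) # -> {'app': 'obscd', 'component': 'core', 'env': 'development'} unless ENV is set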

File: core/pipeline.py

Defines the core domain models; this is the code form of the "key abstractions".

import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List, Dict, Any, Optional

class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class Job:
    """最小的可执行任务单元,例如‘运行单元测试'、‘构建镜像'"""
    name: str
    image: str = "alpine:latest"  # 模拟任务执行的容器镜像
    commands: List[str] = field(default_factory=list)  # 模拟执行的命令
    status: JobStatus = JobStatus.PENDING
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    logs: List[str] = field(default_factory=list)  # simulated task output logs
    metadata: Dict[str, Any] = field(default_factory=dict)  # extension metadata

    def execute(self, context: Dict):
        """模拟任务执行。真实场景中会调用K8s Job、Tekton Task等。"""
        self.status = JobStatus.RUNNING
        self.start_time = datetime.utcnow()
        # simulate execution time, typically 0.5-2 seconds
        time.sleep(max(0.5, min(2.0, len(self.commands) * 0.3)))
        # simulate a 90% success rate
        import random
        if random.random() < 0.9:
            self.status = JobStatus.SUCCESS
            self.logs.append(f"Job '{self.name}' completed successfully.")
        else:
            self.status = JobStatus.FAILED
            self.logs.append(f"Job '{self.name}' failed with simulated error.")
        self.end_time = datetime.utcnow()

@dataclass
class Stage:
    """管道阶段,包含一个或多个Job,例如‘测试阶段'"""
    name: str
    jobs: List[Job]
    run_after: List[str] = field(default_factory=list)  # names of stages this one depends on
    status: JobStatus = JobStatus.PENDING

@dataclass
class PipelineRun:
    """一次具体的管道执行实例"""
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    pipeline_name: str = "main"
    trigger: str = "git-push"  # git-push, manual, schedule
    git_ref: str = "main"
    git_commit: str = field(default_factory=lambda: f"commit_{uuid.uuid4().hex[:7]}")
    started_at: datetime = field(default_factory=datetime.utcnow)
    finished_at: Optional[datetime] = None
    status: JobStatus = JobStatus.PENDING
    stages: Dict[str, Stage] = field(default_factory=dict)  # stage name -> Stage object
    initiator: str = "system"
    labels: Dict[str, str] = field(default_factory=dict)  # key labels used to tag observability data

    def __post_init__(self):
        # initialize the observability labels
        self.labels = {
            "pipeline": self.pipeline_name,
            "trigger": self.trigger,
            "branch": self.git_ref,
            "commit": self.git_commit[:7],
            "run_id": self.id,
        }

    def get_duration_seconds(self):
        if self.finished_at and self.started_at:
            return (self.finished_at - self.started_at).total_seconds()
        return 0.0
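
A minimal usage sketch of these domain models on their own (the simulator below drives them in the same way):

# Usage sketch: assemble and run a tiny PipelineRun by hand.
from datetime import datetime

from core.pipeline import PipelineRun, Stage, Job

run = PipelineRun(pipeline_name="demo", git_ref="feature/obs")
run.stages["test"] = Stage(name="test", jobs=[Job(name="unit-test", commands=["pytest"])])

for stage in run.stages.values():
    for job in stage.jobs:
        job.execute(run.labels)   # simulated: sleeps 0.5-2s, ~90% success rate
        print(job.name, job.status.value)

run.finished_at = datetime.utcnow()
print(run.labels, run.get_duration_seconds())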

File: plugins/metrics/collector.py

Abstraction and implementation of the metrics collector. It turns PipelineRun and Job state into Prometheus-format metrics.

import time
from typing import Dict
from prometheus_client import Counter, Gauge, Histogram, REGISTRY, start_http_server
from config.observability_config import CONFIG

class MetricsCollector:
    """指标收集器,单例"""
    _instance = None

    def __init__(self):
        if MetricsCollector._instance is not None:
            raise Exception("This class is a singleton!")
        MetricsCollector._instance = self
        self._init_metrics()

    def _init_metrics(self):
        # pipeline-level metrics
        self.pipeline_duration = Histogram(
            'cicd_pipeline_duration_seconds',
            'Duration of pipeline runs in seconds',
            ['pipeline', 'trigger', 'branch', 'status'],
            buckets=(10, 30, 60, 120, 300, 600)
        )
        self.pipeline_total = Counter(
            'cicd_pipeline_total',
            'Total number of pipeline runs',
            ['pipeline', 'trigger', 'branch', 'status']
        )
        self.pipeline_active = Gauge(
            'cicd_pipeline_active',
            'Number of active pipeline runs',
            ['pipeline']
        )

        # job-level metrics
        self.job_duration = Histogram(
            'cicd_job_duration_seconds',
            'Duration of individual jobs in seconds',
            ['pipeline', 'stage', 'job', 'status'],
            buckets=(1, 2, 5, 10, 20)
        )
        self.job_total = Counter(
            'cicd_job_total',
            'Total number of job executions',
            ['pipeline', 'stage', 'job', 'status']
        )

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls()
        return cls._instance

    def observe_pipeline_start(self, pipeline_run):
        labels = {
            'pipeline': pipeline_run.pipeline_name,
            'trigger': pipeline_run.trigger,
            'branch': pipeline_run.git_ref
        }
        self.pipeline_active.labels(**labels).inc()

    def observe_pipeline_finish(self, pipeline_run):
        labels = {
            'pipeline': pipeline_run.pipeline_name,
            'trigger': pipeline_run.trigger,
            'branch': pipeline_run.git_ref,
            'status': pipeline_run.status.value
        }
        duration = pipeline_run.get_duration_seconds()
        if duration > 0:
            self.pipeline_duration.labels(**labels).observe(duration)
        self.pipeline_total.labels(**labels).inc()
        # decrement the active-run gauge
        self.pipeline_active.labels(pipeline=pipeline_run.pipeline_name).dec()

    def observe_job(self, job, stage_name, pipeline_name, status, duration):
        self.job_duration.labels(
            pipeline=pipeline_name,
            stage=stage_name,
            job=job.name,
            status=status.value
        ).observe(duration)
        self.job_total.labels(
            pipeline=pipeline_name,
            stage=stage_name,
            job=job.name,
            status=status.value
        ).inc()

    def start_metrics_server(self):
        """启动一个HTTP服务器,暴露/metrics端点供Prometheus拉取"""
        if CONFIG.metrics.enabled:
            start_http_server(CONFIG.metrics.prometheus_port)
            print(f"Metrics server started on port {CONFIG.metrics.prometheus_port}")

File: plugins/tracing/__init__.py

Integrates distributed tracing using the OpenTelemetry API (kept deliberately simple here), exporting spans to Jaeger.

import time
from typing import Dict, Optional
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

from config.observability_config import CONFIG

class TracingManager:
    _instance = None
    _tracer = None

    def __init__(self):
        if TracingManager._instance is not None:
            raise Exception("This class is a singleton!")
        TracingManager._instance = self
        self._init_tracer()

    def _init_tracer(self):
        if not CONFIG.tracing.enabled:
            return
        # set up the TracerProvider
        resource = Resource(attributes={
            SERVICE_NAME: CONFIG.tracing.service_name
        })
        trace.set_tracer_provider(TracerProvider(resource=resource))
        # configure the Jaeger exporter
        jaeger_exporter = JaegerExporter(
            agent_host_name=CONFIG.tracing.agent_host,
            agent_port=CONFIG.tracing.agent_port,
        )
        # attach the exporter to the TracerProvider
        span_processor = BatchSpanProcessor(jaeger_exporter)
        trace.get_tracer_provider().add_span_processor(span_processor)
        self._tracer = trace.get_tracer(__name__)

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls()
        return cls._instance

    def get_tracer(self):
        return self._tracer

    def start_pipeline_span(self, pipeline_run) -> Optional['Span']:
        """为整个PipelineRun创建一个根Span"""
        if not self._tracer:
            return None
        # the root span name includes the pipeline ID so it is easy to identify
        span = self._tracer.start_span(f"pipeline:{pipeline_run.pipeline_name}:{pipeline_run.id}")
        # attach the pipeline context as span attributes (tags)
        span.set_attributes({
            "pipeline.id": pipeline_run.id,
            "pipeline.name": pipeline_run.pipeline_name,
            "pipeline.trigger": pipeline_run.trigger,
            "pipeline.git.ref": pipeline_run.git_ref,
            "pipeline.git.commit": pipeline_run.git_commit[:7],
        })
        return span

    def start_stage_span(self, stage_name, parent_span: Optional['Span'] = None) -> Optional['Span']:
        if not self._tracer or not parent_span:
            return None
        # create a child span representing one stage
        context = trace.set_span_in_context(parent_span)
        span = self._tracer.start_span(f"stage:{stage_name}", context=context)
        span.set_attribute("stage.name", stage_name)
        return span

    def start_job_span(self, job_name, parent_span: Optional['Span'] = None) -> Optional['Span']:
        if not self._tracer or not parent_span:
            return None
        context = trace.set_span_in_context(parent_span)
        span = self._tracer.start_span(f"job:{job_name}", context=context)
        span.set_attribute("job.name", job_name)
        return span

# convenience functions
def get_tracer():
    return TracingManager.get_instance().get_tracer()

def start_pipeline_span(pipeline_run):
    return TracingManager.get_instance().start_pipeline_span(pipeline_run)

def start_stage_span(stage_name, parent_span):
    return TracingManager.get_instance().start_stage_span(stage_name, parent_span)

def start_job_span(job_name, parent_span):
    return TracingManager.get_instance().start_job_span(job_name, parent_span)
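
If Jaeger is not available (or you just want to eyeball the span hierarchy locally), the Jaeger exporter can be swapped for OpenTelemetry's console exporter. A debugging sketch, not part of the ObsCD wiring above:

# Debugging sketch: print spans to stdout instead of shipping them to Jaeger.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))

tracer = trace.get_tracer("obscd-debug")
with tracer.start_as_current_span("pipeline:demo"):
    with tracer.start_as_current_span("stage:build"):
        pass  # each span is printed as JSON when it ends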

File: core/simulator.py

This is the pipeline execution engine. It orchestrates the domain models and calls the observability plugins at the right moments to emit data.

import time
import random
import threading
from datetime import datetime
from queue import Queue
from typing import List, Dict
import logging

from .pipeline import PipelineRun, Stage, Job, JobStatus
from plugins.metrics.collector import MetricsCollector
from plugins.tracing import start_pipeline_span, start_stage_span, start_job_span
from config.observability_config import CONFIG

# configure the structured logger
logger = logging.getLogger("obscd.simulator")
logger.setLevel(getattr(logging, CONFIG.logging.level))
# Note: a real project would add a custom handler here that ships logs to Loki.
# To keep things simple we only print to the console, but in JSON format.
import json
class JsonFormatter(logging.Formatter):
    def format(self, record):
        log_object = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            **CONFIG.logging.static_labels  # add the static labels
        }
        if hasattr(record, 'custom_labels'):
            log_object.update(record.custom_labels)
        return json.dumps(log_object)

ch = logging.StreamHandler()
ch.setFormatter(JsonFormatter())
logger.addHandler(ch)

class PipelineSimulator:
    """模拟CI/CD管道执行,并集成观测性数据发射"""

    def __init__(self):
        self.metrics_collector = MetricsCollector.get_instance()
        self.pipeline_queue = Queue()
        self.is_running = True
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()
        logger.info("PipelineSimulator initialized")

    def submit_pipeline(self, pipeline_run: PipelineRun):
        """提交一个管道执行请求到队列"""
        self.pipeline_queue.put(pipeline_run)
        logger.info("Pipeline submitted", extra={'custom_labels': pipeline_run.labels})

    def _worker(self):
        """工作线程,持续从队列中取出并执行管道"""
        while self.is_running:
            try:
                pipeline_run = self.pipeline_queue.get(timeout=1)
                self._execute_pipeline(pipeline_run)
            except:
                continue

    def _execute_pipeline(self, pipeline_run: PipelineRun):
        """执行一个具体的PipelineRun"""
        # 1. 开始管道层面的观测
        self.metrics_collector.observe_pipeline_start(pipeline_run)
        pipeline_span = start_pipeline_span(pipeline_run)
        pipeline_run.status = JobStatus.RUNNING

        log_ctx = {'custom_labels': pipeline_run.labels}
        logger.info(f"Pipeline {pipeline_run.id} started", extra=log_ctx)

        try:
            # example pipeline definition (a fixed list of stages)
            stages_def = [
                ("clone", []),
                ("build", ["compile", "docker-build"]),
                ("test", ["unit-test", "integration-test"]),
                ("deploy-staging", ["deploy-to-staging"]),
                ("e2e-test", ["run-e2e-tests"]),
                ("deploy-prod", ["deploy-to-prod"]),
            ]

            # create the Stage and Job objects
            for stage_name, job_names in stages_def:
                jobs = [Job(name=jn) for jn in job_names]
                pipeline_run.stages[stage_name] = Stage(name=stage_name, jobs=jobs)

            # execute the stages in order
            for stage_name in pipeline_run.stages.keys():
                stage = pipeline_run.stages[stage_name]
                stage_span = start_stage_span(stage_name, pipeline_span) if pipeline_span else None

                logger.info(f"Entering stage: {stage_name}", extra={**log_ctx, 'custom_labels': {**log_ctx['custom_labels'], 'stage': stage_name}})

                stage.status = JobStatus.RUNNING
                # run every job in this stage (sequentially here for simplicity; real stages may parallelize or have dependencies)
                for job in stage.jobs:
                    job_span = start_job_span(job.name, stage_span) if stage_span else None
                    job_start = datetime.utcnow()
                    # run the task
                    job.execute(pipeline_run.labels)
                    job_end = datetime.utcnow()
                    duration = (job_end - job_start).total_seconds()

                    # record job-level observability data
                    self.metrics_collector.observe_job(job, stage_name, pipeline_run.pipeline_name, job.status, duration)
                    job_log_ctx = {**log_ctx, 'custom_labels': {**log_ctx['custom_labels'], 'stage': stage_name, 'job': job.name, 'job_status': job.status.value}}
                    logger.info(f"Job '{job.name}' finished with status: {job.status}", extra=job_log_ctx)

                    if job_span:
                        job_span.set_attribute("job.status", job.status.value)
                        job_span.set_attribute("job.duration_seconds", duration)
                        job_span.end()

                    # if a job fails, mark the stage and the pipeline as failed and stop (fail fast)
                    if job.status == JobStatus.FAILED:
                        stage.status = JobStatus.FAILED
                        pipeline_run.status = JobStatus.FAILED
                        logger.error(f"Stage '{stage_name}' failed due to job '{job.name}'", extra=job_log_ctx)
                        break  # leave the job loop for this stage

                if stage.status != JobStatus.FAILED:
                    stage.status = JobStatus.SUCCESS

                if stage_span:
                    stage_span.set_attribute("stage.status", stage.status.value)
                    stage_span.end()

                # if the stage failed, abort the remaining stages
                if pipeline_run.status == JobStatus.FAILED:
                    logger.warning(f"Pipeline {pipeline_run.id} failed at stage {stage_name}", extra=log_ctx)
                    break

            # all stages done; set the final pipeline status
            if pipeline_run.status != JobStatus.FAILED:
                pipeline_run.status = JobStatus.SUCCESS

        except Exception as e:
            pipeline_run.status = JobStatus.FAILED
            logger.exception(f"Pipeline {pipeline_run.id} execution crashed", extra=log_ctx)
        finally:
            pipeline_run.finished_at = datetime.utcnow()
            # finish pipeline-level observation
            self.metrics_collector.observe_pipeline_finish(pipeline_run)
            if pipeline_span:
                pipeline_span.set_attribute("pipeline.status", pipeline_run.status.value)
                pipeline_span.set_attribute("pipeline.duration_seconds", pipeline_run.get_duration_seconds())
                pipeline_span.end()
            final_log = {**log_ctx, 'custom_labels': {**log_ctx['custom_labels'], 'pipeline_status': pipeline_run.status.value, 'duration_seconds': pipeline_run.get_duration_seconds()}}
            logger.info(f"Pipeline {pipeline_run.id} finished with status: {pipeline_run.status}", extra=final_log)

    def stop(self):
        self.is_running = False
        self.worker_thread.join(timeout=5)

File: docker-compose.yml

Defines and starts all aggregation-layer services (Prometheus, Loki, Jaeger, Grafana), the infrastructure the project runs on.

version: '3.8'

services:
  prometheus:
    image: prom/prometheus:latest
    container_name: obscd-prometheus
    ports:
      - "9090:9090"
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--storage.tsdb.retention.time=200h'
      - '--web.enable-lifecycle'
    volumes:
      - ./config/prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - prometheus_data:/prometheus
    networks:
      - obscd-network

  loki:
    image: grafana/loki:latest
    container_name: obscd-loki
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml
    volumes:
      - loki_data:/loki
    networks:
      - obscd-network

  jaeger:
    image: jaegertracing/all-in-one:latest
    container_name: obscd-jaeger
    ports:
      - "6831:6831/udp"  # Jaeger agent Thrift compact protocol
      - "16686:16686"    # Jaeger UI
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    networks:
      - obscd-network

  grafana:
    image: grafana/grafana:latest
    container_name: obscd-grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin # default admin password; change it after the first login
    volumes:
      - grafana_data:/var/lib/grafana
      - ./dashboards:/etc/grafana/provisioning/dashboards:ro
      - ./config/grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml:ro
    networks:
      - obscd-network
    depends_on:
      - prometheus
      - loki
      - jaeger

volumes:
  prometheus_data:
  loki_data:
  grafana_data:

networks:
  obscd-network:
    driver: bridge

File: config/prometheus.yml

Prometheus configuration telling it where to scrape metrics from (our simulator).

global:
  scrape_interval: 5s # scrape every 5 seconds
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'obscd-pipeline'
    static_configs:
      - targets: ['host.docker.internal:8000'] # important: lets the Prometheus container reach the simulator running on the host
        labels:
          service: 'obscd-pipeline'
          env: 'development'

  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

File: config/grafana-datasources.yaml

Grafana data-source provisioning file; it is loaded automatically when the container starts, so nothing has to be added by hand in the UI.

apiVersion: 1

datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true
    editable: true

  - name: Loki
    type: loki
    access: proxy
    url: http://loki:3100
    editable: true

  - name: Jaeger
    type: jaeger
    access: proxy
    url: http://jaeger:16686
    editable: true

File: run.py

The project's main entry point: it starts the metrics server, initializes the simulator, and periodically submits simulated pipeline runs.

#!/usr/bin/env python3
"""
ObsCD observable continuous delivery simulator - main launch script
"""
import time
import random
import signal
import sys
from datetime import datetime

from core.pipeline import PipelineRun
from core.simulator import PipelineSimulator
from plugins.metrics.collector import MetricsCollector

def signal_handler(sig, frame):
    print("\nShutdown signal received. Stopping simulator...")
    sys.exit(0)

def main():
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    print("=" * 60)
    print("ObsCD - Observable Continuous Delivery Simulator")
    print("=" * 60)

    # 1. start the metrics endpoint (Prometheus will scrape it)
    metrics_collector = MetricsCollector.get_instance()
    metrics_collector.start_metrics_server()

    # 2. initialize and start the pipeline simulator
    simulator = PipelineSimulator()
    print("Simulator started. Submitting sample pipeline runs...")
    print("Press Ctrl+C to stop.")

    # 3. keep triggering simulated pipelines (one every 10-30 seconds)
    triggers = ["git-push", "manual", "schedule"]
    branches = ["main", "develop", "feature/auth", "hotfix/login-issue"]
    pipeline_names = ["main", "frontend-deploy", "backend-microservice"]

    run_count = 0
    try:
        while True:
            # random interval, to mimic a realistic trigger frequency
            wait_time = random.uniform(10, 30)
            time.sleep(wait_time)

            # create a new PipelineRun
            run_count += 1
            trigger = random.choice(triggers)
            branch = random.choice(branches)
            pipeline_name = random.choice(pipeline_names)

            pipeline_run = PipelineRun(
                pipeline_name=pipeline_name,
                trigger=trigger,
                git_ref=branch,
                git_commit=f"simulated_commit_{run_count:04d}",
                initiator="simulator",
                started_at=datetime.utcnow()
            )
            # hand it to the simulator for execution
            simulator.submit_pipeline(pipeline_run)
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Submitted PipelineRun: id={pipeline_run.id}, pipeline={pipeline_name}, trigger={trigger}, branch={branch}")

    except KeyboardInterrupt:
        print("\nSimulation stopped by user.")
    finally:
        simulator.stop()
        print("ObsCD simulator shutdown complete.")

if __name__ == "__main__":
    main()

5 Installing, Running, and Verifying

5.1 Prerequisites

  1. Docker & Docker Compose: to run the observability aggregation layer (Prometheus, Loki, Jaeger, Grafana).
  2. Python 3.8+: to run the simulator core; a virtual environment is recommended.
  3. Git (optional): to clone the project.

5.2 Installation and Startup

Step 1: get the project code

# Assuming you are in a working directory, create the project files here.
# Place all the code shown above according to the project layout:
# create the config/, core/, plugins/ directories and drop in the corresponding .py files,
# then create docker-compose.yml, config/prometheus.yml, and the other config files.
# From here on we assume the files exist and the current directory is `obscd/`.

Step 2: install the Python dependencies
The project uses Poetry for dependency management. If you do not have Poetry, pip works too:

# Option 1: Poetry (recommended)
cd obscd
poetry install

# Option 2: pip with a requirements.txt
# First create a requirements.txt file with the following content:
# prometheus-client>=0.17.0
# opentelemetry-api>=1.18.0
# opentelemetry-sdk>=1.18.0
# opentelemetry-exporter-jaeger>=1.18.0
# opentelemetry-instrumentation>=0.38b0
# then run:
pip install -r requirements.txt

Step 3: start the aggregation-layer services
From the project root, start all backing services with Docker Compose.

docker-compose up -d

Give it a moment, then check with docker-compose ps that every service is Up.

Step 4: start the ObsCD simulator
In a new terminal, activate your virtual environment and run the main program.

# with Poetry
cd obscd
poetry run python run.py

# with pip
cd obscd
python run.py

If everything works you should see output like the following, showing the simulator running and periodically submitting pipeline runs:

============================================================
ObsCD - Observable Continuous Delivery Simulator
============================================================
Metrics server started on port 8000
Simulator started. Submitting sample pipeline runs...
Press Ctrl+C to stop.
[14:30:25] Submitted PipelineRun: id=a3f5c1e2, pipeline=main, trigger=git-push, branch=main
[14:30:48] Submitted PipelineRun: id=b8d92a4f, pipeline=backend-microservice, trigger=manual, branch=develop
...

5.3 Verification and Observation

With the whole stack running, let's use the different UIs to confirm that observability data is flowing as expected.

1. Pipeline execution logs (console)
Watch the terminal running run.py: it keeps printing structured JSON logs enriched with labels such as pipeline, run_id, stage, and job.

2. Metrics (Prometheus)
Open http://localhost:9090 in a browser.

  • On the Graph page, enter one of our metrics, for example cicd_pipeline_duration_seconds_count or rate(cicd_job_total[5m]), and click Execute; you should see data points and a chart.
  • Under Status -> Targets there should be a target named obscd-pipeline in state UP, which means Prometheus is successfully scraping the metrics exposed by the simulator.
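
You can also look at the raw exposition format straight from the simulator process (assuming the default METRICS_PORT of 8000):

curl http://localhost:8000/metrics | grep cicd_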

3. Distributed traces (Jaeger)
Open http://localhost:16686 in a browser.

  • In the Service drop-down, select obscd-pipeline.
  • Click Find Traces. You will see a list of traces, one per PipelineRun.
  • Open any trace to expand its span tree: it clearly shows the pipeline -> stage -> job hierarchy and where the time went, and every span carries the attributes we set (git branch, status, and so on).

4. Unified dashboards (Grafana)
Open http://localhost:3000 in a browser and log in with the default credentials (admin / admin).

  • Data sources: thanks to the provisioning file (grafana-datasources.yaml), the Prometheus, Loki, and Jaeger data sources should already be configured; confirm under Configuration -> Data Sources.
  • Import the dashboard: a dashboard is predefined. Go to Dashboards -> Browse and look for "CI/CD Pipeline Observability". If it is missing, import it manually: New -> Import, then paste or upload the contents of dashboards/pipeline_observability.json.
  • Explore the dashboard: it contains several panels, for example:
    • Pipeline Overview: currently active pipelines, recent success rate, average duration, and so on.
    • Pipeline Duration Trend: how pipeline duration evolves over time.
    • Job Success Rate by Stage: success rate of each job, broken down by stage.
    • Recent Pipeline Traces: an embedded Jaeger trace list for quick drill-down.
    • Pipeline Logs: a Loki query panel for the detailed logs of a specific pipeline or job.
sequenceDiagram
  participant D as Developer (Git Push)
  participant O as ObsCD Simulator
  participant P as Prometheus
  participant J as Jaeger
  participant G as Grafana
  Note over D,O: 1. Trigger event
  D->>O: Code commit (simulated)
  activate O
  Note over O: 2. Pipeline execution and data collection
  O->>O: Create PipelineRun<br/>start root span (P)
  O->>P: Record pipeline_start metric
  loop for each Stage/Job
    O->>O: Create Stage/Job spans (S/J)<br/>run the simulated task
    O->>P: Record job_duration metric
    O->>O: Emit structured logs (local)
  end
  O->>O: Set final status
  O->>P: Record pipeline_finish metric
  O->>J: Report the complete trace (P, S, J spans)
  deactivate O
  Note over G: 3. Aggregation and visualization
  P-->>G: Serve metric data (queried)
  J-->>G: Serve trace data (queried)
  G->>G: Render dashboards:<br/>- metric charts<br/>- success rates<br/>- trace list

Figure: sequence of the observability data flow triggered by a single code commit, from the developer's action all the way to visualization in Grafana.

5.4 Shutting Down

  1. In the terminal running run.py, press Ctrl+C to stop the simulator.
  2. From the project root, stop and clean up all Docker services:
docker-compose down -v
The `-v` flag also deletes the created volumes; drop it if you want to keep the data for the next run.

6 Summary and Extensions

With ObsCD we built a small but complete "observable continuous delivery" demo environment. We applied the layered-architecture idea and used key abstractions such as PipelineRun, Metric, and Span to map business concepts cleanly onto the observability data model.

Suggestions for production-grade extensions:

  1. Real integrations: replace the simulated execution in core/simulator.py with API calls to, or event listeners on, a real CI/CD system (Tekton, Argo CD, Jenkins).
  2. Security and authentication: put HTTPS and user authentication in front of Prometheus, Loki, Grafana, and the other components.
  3. High availability and scale: in production, Prometheus and Loki are usually deployed as clusters, with long-term storage (Thanos, Cortex, S3) taken into account.
  4. Alerting: define Prometheus alerting rules (e.g. "pipeline failure rate above 10% over 5 minutes") and route them through Alertmanager to DingTalk, Slack, or PagerDuty.
  5. Richer metrics: add resource usage (CPU, memory), code-quality metrics (test coverage, security findings), and business metrics tied to each release.
  6. Chaos injection: add random faults to the simulator (network latency, unavailable dependencies) to verify that the observability stack actually surfaces such problems; a small sketch follows this list.
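
As a starting point for item 6, a hedged sketch of how fault injection could wrap the simulated jobs; the function name and probabilities are assumptions, not part of ObsCD:

# Chaos sketch (hypothetical): wrap Job.execute with injected latency and failures.
import random
import time

from core.pipeline import Job, JobStatus

def chaotic_execute(job: Job, context: dict, latency_prob: float = 0.2, fail_prob: float = 0.1) -> None:
    if random.random() < latency_prob:
        time.sleep(random.uniform(2, 5))       # simulate a slow network or dependency
    job.execute(context)
    if job.status == JobStatus.SUCCESS and random.random() < fail_prob:
        job.status = JobStatus.FAILED          # simulate an infrastructure flake
        job.logs.append(f"Job '{job.name}' failed: injected chaos fault.")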

Observability is not a pile of tools; it is an engineering culture that runs from development through operations. We hope ObsCD gives you a solid starting point and a clear blueprint for designing and implementing observability in your own continuous delivery system.