数据湖仓架构下基于查询历史与成本的自动优化器调优策略

2900559190
2026年01月08日
更新于 2026年02月04日
22 次阅读
摘要:本文探讨了在数据湖仓(Lakehouse)架构下,如何利用查询历史与执行成本信息构建一个自适应查询优化器。我们设计并实现了一个轻量级原型系统,该系统能够监控查询执行、收集性能指标、基于历史模式与成本模型分析查询计划,并动态调整优化策略(如选择聚合算法、连接顺序、是否使用缓存等)。文章将详细阐述系统的设计思路、核心架构,并提供完整的、可运行的Python项目代码,涵盖查询历史管理、成本分析、优化决策...

摘要

本文探讨了在数据湖仓(Lakehouse)架构下,如何利用查询历史与执行成本信息构建一个自适应查询优化器。我们设计并实现了一个轻量级原型系统,该系统能够监控查询执行、收集性能指标、基于历史模式与成本模型分析查询计划,并动态调整优化策略(如选择聚合算法、连接顺序、是否使用缓存等)。文章将详细阐述系统的设计思路、核心架构,并提供完整的、可运行的Python项目代码,涵盖查询历史管理、成本分析、优化决策与执行反馈等关键模块,最终通过一个示例演示系统的自动调优过程。

1. 项目概述:自适应查询优化器原型

在现代数据湖仓架构(如Delta Lake、Apache Iceberg)中,查询优化器是性能的核心。传统的基于静态统计信息的优化器在面对动态变化的数据特征、多样的查询模式以及云原生环境的弹性资源时,往往表现不佳。本项目旨在构建一个原型系统,演示如何通过收集查询历史与真实执行成本,训练一个简单的成本模型,并利用该模型对未来查询的执行计划进行动态调整与优化。

核心设计思路:

  1. 历史记录:系统拦截或接收SQL查询,为其生成唯一的指纹(如查询结构哈希),并记录其最终执行成本(如时间、CPU周期、扫描数据量)。
  2. 成本分析:基于历史数据,为不同的查询操作(扫描、过滤、连接、聚合)建立经验性的成本估算模型。
  3. 模式匹配与优化:当新查询到来时,首先尝试在历史中寻找相似查询。若找到,则分析其历史执行计划与成本,推荐更优的配置或计划(例如,对于高频小范围过滤查询,建议使用合适的索引或Z-Order排序)。若未找到,则使用基础优化器生成计划,并在执行后收集信息反馈给历史库。
  4. 反馈循环:形成一个"执行-收集-分析-优化"的闭环,使优化器能够自适应数据与负载的变化。

2. 项目结构树

lakehouse-auto-optimizer/
├── config/
│   └── optimizer_config.yaml      # 优化器配置文件
├── core/
│   ├── __init__.py
│   ├── query_history_manager.py   # 查询历史管理
│   ├── cost_analyzer.py           # 成本分析器
│   ├── plan_optimizer.py          # 优化决策器
│   └── executor.py                # 模拟执行器
├── models/
│   ├── __init__.py
│   └── query_record.py            # 查询记录数据模型
├── utils/
│   ├── __init__.py
│   └── query_fingerprint.py       # 查询指纹生成
├── main.py                        # 主程序入口
├── requirements.txt               # 项目依赖
└── demo_queries.sql              # 示例查询文件

3. 核心代码实现

文件路径:models/query_record.py

import json
from dataclasses import dataclass, asdict, field
from datetime import datetime
from typing import Dict, List, Any, Optional

@dataclass
class QueryRecord:
    """
    查询记录数据模型,用于存储历史查询的元数据与执行指标。
    """
    query_id: str                     # 查询唯一ID (UUID或指纹)
    sql_text: str                     # 原始SQL文本
    fingerprint: str                  # 查询结构指纹
    execution_plan: Optional[Dict[str, Any]] = None  # 原始执行计划(JSON化)
    optimized_plan: Optional[Dict[str, Any]] = None  # 优化后的执行计划
    execution_stats: Dict[str, float] = field(default_factory=dict)  # 执行统计信息
    created_at: datetime = field(default_factory=datetime.now)
    tags: List[str] = field(default_factory=list)    # 标签,如"高频","大表扫描"

    # 核心执行成本指标
    @property
    def total_cost(self) -> float:
        """计算综合成本。这里简单定义为执行时间,可扩展为加权模型。"""
        return self.execution_stats.get('execution_time_ms', 0.0)

    def to_dict(self) -> Dict[str, Any]:
        """转换为字典,便于序列化存储。"""
        data = asdict(self)
        data['created_at'] = self.created_at.isoformat()
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'QueryRecord':
        """从字典反序列化。"""
        data['created_at'] = datetime.fromisoformat(data['created_at'])
        return cls(**data)

文件路径:utils/query_fingerprint.py

import hashlib
import sqlparse
from sqlparse.sql import TokenList, Token
from sqlparse.tokens import Keyword, DML

def generate_query_fingerprint(sql_text: str) -> str:
    """
    生成查询的结构化指纹。
    通过规范化SQL(去除字面量、别名,统一大小写)后计算哈希值,
    使得结构相似的查询具有相同指纹。
    """
    try:
        # 1. 解析SQL
        parsed = sqlparse.parse(sql_text)[0]
        # 2. 简单规范化:转为大写,移除多余空格
        normalized_tokens = []
        for token in parsed.flatten():
            # 保留关键字、标识符、运算符等结构token,移除字符串、数字等字面量
            if token.ttype is None or token.ttype in (Keyword, DML):
                normalized_tokens.append(token.value.upper())
            elif token.is_group:
                # 递归处理子组,这里简化处理
                pass
        normalized_sql = " ".join(normalized_tokens)
        # 3. 进一步简化:移除 BETWEEN, IN 等后面的列表,WHERE后的具体值等(示例简化版)
        # 这里使用一个简单的正则替换来移除数字和单引号字符串(生产环境需更健壮)
        import re
        cleaned_sql = re.sub(r'\b\d+\b', '?', normalized_sql)         # 替换数字
        cleaned_sql = re.sub(r"'[^']*'", '?', cleaned_sql)           # 替换单引号字符串
        cleaned_sql = re.sub(r'"[^"]*"', '?', cleaned_sql)           # 替换双引号字符串
        # 4. 计算MD5哈希作为指纹
        fingerprint = hashlib.md5(cleaned_sql.encode('utf-8')).hexdigest()
        return fingerprint
    except Exception as e:
        # 如果解析失败,回退到整个SQL文本的哈希
        return hashlib.md5(sql_text.encode('utf-8')).hexdigest()

# 示例使用
if __name__ == "__main__":
    sql1 = "SELECT id, name FROM users WHERE age > 25 AND department = 'Sales' LIMIT 10"
    sql2 = "SELECT id, name FROM users WHERE age > 30 AND department = 'Engineering' LIMIT 5"
    fp1 = generate_query_fingerprint(sql1)
    fp2 = generate_query_fingerprint(sql2)
    print(f"SQL1指纹: {fp1}")
    print(f"SQL2指纹: {fp2}")
    print(f"结构相似? {fp1 == fp2}")  # 应该为 True,因为结构相同

文件路径:core/query_history_manager.py

import json
import os
from typing import Dict, List, Optional, Tuple
from models.query_record import QueryRecord

class QueryHistoryManager:
    """
    查询历史管理器。负责查询记录的存储、检索与维护。
    使用简单的JSON文件作为后端存储,生产环境可替换为数据库。
    """
    def __init__(self, storage_path: str = "data/query_history.json"):
        self.storage_path = storage_path
        self._history: Dict[str, QueryRecord] = {}  # query_id -> QueryRecord
        self._load_history()

    def _load_history(self):
        """从文件加载历史记录。"""
        if os.path.exists(self.storage_path):
            try:
                with open(self.storage_path, 'r') as f:
                    data_list = json.load(f)
                    for data in data_list:
                        record = QueryRecord.from_dict(data)
                        self._history[record.query_id] = record
                print(f"已加载 {len(self._history)} 条历史查询记录。")
            except (json.JSONDecodeError, KeyError) as e:
                print(f"加载历史记录失败,将重新创建: {e}")
                self._history = {}

    def _save_history(self):
        """保存历史记录到文件。"""
        os.makedirs(os.path.dirname(self.storage_path), exist_ok=True)
        data_list = [record.to_dict() for record in self._history.values()]
        with open(self.storage_path, 'w') as f:
            json.dump(data_list, f, indent=2)

    def add_record(self, record: QueryRecord):
        """添加新的查询记录。"""
        self._history[record.query_id] = record
        self._save_history()

    def find_by_fingerprint(self, fingerprint: str) -> List[QueryRecord]:
        """根据查询指纹查找所有历史记录。"""
        return [r for r in self._history.values() if r.fingerprint == fingerprint]

    def find_similar_by_sql(self, sql_text: str, threshold: float = 0.8) -> List[Tuple[QueryRecord, float]]:
        """
        通过简单的文本相似度查找相似查询(示例方法)。
        生产环境可使用更高级的NLP或结构相似度算法。
        """
        from difflib import SequenceMatcher
        similar = []
        for record in self._history.values():
            ratio = SequenceMatcher(None, sql_text, record.sql_text).ratio()
            if ratio > threshold:
                similar.append((record, ratio))
        # 按相似度降序排序
        similar.sort(key=lambda x: x[1], reverse=True)
        return similar

    def get_all_records(self) -> List[QueryRecord]:
        """获取所有历史记录。"""
        return list(self._history.values())

    def clear_history(self):
        """清空历史记录(用于测试)。"""
        self._history = {}
        self._save_history()

文件路径:core/cost_analyzer.py

from typing import Dict, List, Optional
from models.query_record import QueryRecord
import statistics

class CostAnalyzer:
    """
    成本分析器。分析历史查询记录,构建操作成本模型并提供成本预测。
    """
    def __init__(self):
        # 存储操作类型的基础成本(从历史中学习得到)
        self.op_cost_model: Dict[str, Dict[str, float]] = {
            "TableScan": {"base_cost": 10.0, "cost_per_mb": 0.5},
            "Filter": {"base_cost": 2.0, "cost_per_row": 0.001},
            "HashAggregate": {"base_cost": 15.0, "cost_per_group": 0.1},
            "Sort": {"base_cost": 20.0, "cost_per_row": 0.005},
        }
        self.history_samples: List[QueryRecord] = []

    def update_model(self, records: List[QueryRecord]):
        """
        使用历史记录更新成本模型(简化版)。
        这里仅演示:计算每种操作的平均选择率等。
        """
        if not records:
            return
        # 示例:分析Filter操作的选择率
        filter_times = []
        for record in records:
            # 假设我们从execution_plan中提取Filter操作的行数信息(这里模拟)
            if "Filter" in str(record.execution_plan):
                # 模拟:根据执行时间估算选择率的影响
                filter_times.append(record.execution_stats.get('filter_time_ms', 0))
        if filter_times:
            avg_filter_time = statistics.mean(filter_times)
            # 简单更新模型:假设平均过滤时间反映了成本
            self.op_cost_model["Filter"]["base_cost"] = avg_filter_time * 0.8  # 调整因子

        self.history_samples.extend(records)
        print(f"成本模型已更新,基于 {len(self.history_samples)} 个样本。")

    def estimate_plan_cost(self, execution_plan: Dict) -> float:
        """
        给定一个执行计划(JSON结构),估算其总成本。
        这里实现一个非常简化的递归估算。
        """
        total_cost = 0.0
        plan_type = execution_plan.get("Node Type", "Unknown")
        plan_rows = execution_plan.get("Plan Rows", 1000)
        plan_width = execution_plan.get("Plan Width", 100)  # 假设字节宽度

        # 获取当前节点的成本
        node_cost = self._estimate_node_cost(plan_type, plan_rows, plan_width)
        total_cost += node_cost

        # 递归估算子节点成本
        if "Plans" in execution_plan:
            for subplan in execution_plan["Plans"]:
                total_cost += self.estimate_plan_cost(subplan)

        return total_cost

    def _estimate_node_cost(self, node_type: str, rows: float, width: int) -> float:
        """估算单个节点的成本。"""
        model = self.op_cost_model.get(node_type, {"base_cost": 5.0})
        cost = model.get("base_cost", 5.0)
        # 根据行数调整成本(如果模型中有相关参数)
        if "cost_per_row" in model:
            cost += model["cost_per_row"] * rows
        if "cost_per_mb" in model and width > 0:
            data_mb = (rows * width) / (1024 * 1024)
            cost += model["cost_per_mb"] * data_mb
        return cost

    def recommend_optimization(self, record: QueryRecord) -> Dict[str, any]:
        """
        基于历史成本分析,为查询推荐优化策略。
        """
        recommendations = []
        # 策略1:如果查询是高频的且执行时间长,考虑建议创建索引/Z-Order
        similar_records = [r for r in self.history_samples if r.fingerprint == record.fingerprint]
        if len(similar_records) > 5:
            avg_cost = statistics.mean([r.total_cost for r in similar_records])
            if avg_cost > 1000:  # 阈值,表示执行慢
                recommendations.append({
                    "type": "INDEX/Z-ORDER",
                    "reason": f"高频查询(执行{len(similar_records)}次),平均成本{avg_cost:.2f}ms过高",
                    "suggestion": "考虑在WHERE条件涉及的列上创建索引或Z-Order排序。"
                })

        # 策略2:如果发现某个Filter操作选择率很低但成本高,建议统计信息更新或谓词下推检查
        # ... (简化,省略详细实现)

        return {
            "query_id": record.query_id,
            "fingerprint": record.fingerprint,
            "recommendations": recommendations
        }
graph TD subgraph "输入" A[新SQL查询] --> B[查询指纹生成] end subgraph "核心优化循环" B --> C{历史匹配} C -- 找到相似历史 --> D[检索历史执行计划与成本] C -- 无匹配 --> E[基础优化器生成计划] D --> F[成本分析器评估历史计划] F --> G[优化决策器生成建议/新计划] E --> H[模拟执行器运行计划] end subgraph "反馈与学习" H --> I[收集执行统计信息] I --> J[更新查询历史记录] J --> K[成本分析器更新模型] K --> C end subgraph "输出" G --> L[优化后的执行计划] H --> M[查询结果] I --> N[性能报告] end style A fill:#e1f5e1 style L fill:#f0f8ff style M fill:#f0f8ff style N fill:#f0f8ff

文件路径:core/plan_optimizer.py

import copy
from typing import Dict, Any, Optional, List
from models.query_record import QueryRecord
from core.cost_analyzer import CostAnalyzer

class PlanOptimizer:
    """
    优化决策器。整合历史信息与成本模型,生成优化后的执行计划或优化建议。
    """
    def __init__(self, history_manager, cost_analyzer: CostAnalyzer):
        self.history_manager = history_manager
        self.cost_analyzer = cost_analyzer
        # 优化规则库
        self.optimization_rules = [
            self._rule_use_hash_aggregate,
            self._rule_push_down_filter,
            self._rule_avoid_cartesian_join,
        ]

    def optimize(self, sql_text: str, original_plan: Dict[str, Any]) -> Dict[str, Any]:
        """
        主优化函数。输入原始SQL和基础执行计划,输出优化后的计划。
        """
        optimized_plan = copy.deepcopy(original_plan)
        fingerprint = self.history_manager.find_by_fingerprint(sql_text)  # 这里应使用工具生成,简化调用

        # 1. 应用基于规则的优化
        for rule in self.optimization_rules:
            optimized_plan = rule(optimized_plan, sql_text)

        # 2. 应用基于成本的优化(如果历史中有类似查询)
        similar_records = self.history_manager.find_similar_by_sql(sql_text, threshold=0.7)
        if similar_records:
            best_record = min(similar_records, key=lambda x: x[0].total_cost)[0]
            # 如果历史最优计划与当前规则优化后不同,且成本更低,考虑采用历史计划结构
            hist_plan_cost = best_record.total_cost
            current_estimated_cost = self.cost_analyzer.estimate_plan_cost(optimized_plan)
            if hist_plan_cost < current_estimated_cost * 0.9:  # 历史成本低10%以上
                print(f"基于历史记录 {best_record.query_id},采用更优的计划结构。")
                # 注意:这里直接替换计划可能不安全,生产环境需做兼容性检查
                # optimized_plan = copy.deepcopy(best_record.optimized_plan or best_record.execution_plan)

        # 3. 生成优化建议报告
        recommendations = self.cost_analyzer.recommend_optimization(
            QueryRecord(query_id="temp", sql_text=sql_text, fingerprint="")
        )

        return {
            "optimized_plan": optimized_plan,
            "estimated_cost_saving": self.cost_analyzer.estimate_plan_cost(original_plan) -
                                     self.cost_analyzer.estimate_plan_cost(optimized_plan),
            "recommendations": recommendations.get("recommendations", [])
        }

    def _rule_use_hash_aggregate(self, plan: Dict, sql_text: str) -> Dict:
        """规则:如果GROUP BY列基数高,且历史表明HashAggregate更快,则选用HashAggregate。"""
        # 简化实现:检测到Aggregate节点且没有显式指定算法时,判断SQL特征
        if plan.get("Node Type") == "Aggregate" and "Hash" not in plan.get("Strategy", ""):
            if "GROUP BY" in sql_text.upper():
                # 模拟判断:如果GROUP BY列数少,可能适合HashAggregate
                plan["Strategy"] = "Hash"  # 修改计划策略
                plan["Node Type"] = "HashAggregate"
        return plan

    def _rule_push_down_filter(self, plan: Dict, sql_text: str) -> Dict:
        """规则:尽可能将过滤条件下推。"""
        # 这是一个简化的演示。实际需要递归遍历计划树。
        if plan.get("Node Type") == "Join":
            # 检查是否有Filter可以在Join前执行
            pass
        return plan

    def _rule_avoid_cartesian_join(self, plan: Dict, sql_text: str) -> Dict:
        """规则:警告或避免笛卡尔积。"""
        if plan.get("Node Type") == "Join" and plan.get("Join Type") == "Nested Loop":
            if not plan.get("Join Condition"):
                print("警告:检测到可能产生笛卡尔积的Nested Loop Join。")
                # 可以考虑改为Hash Join或添加警告
        return plan

文件路径:core/executor.py

import time
import random
from typing import Dict, Any
from models.query_record import QueryRecord

class SimulatedExecutor:
    """
    模拟执行器。在实际系统中,这里会连接Spark/Trino/Presto等引擎。
    本模拟器根据计划节点类型和假定的数据量,模拟执行并生成性能指标。
    """
    def __init__(self):
        self.base_scan_speed = 100  # MB/ms (模拟)

    def execute_plan(self, plan: Dict[str, Any], query_id: str) -> Dict[str, float]:
        """
        模拟执行计划,返回收集到的性能指标字典。
        """
        stats = {}
        total_time = 0.0

        # 模拟递归执行计划并累加时间
        def simulate_node(node: Dict, parent_rows: int = 0) -> float:
            node_type = node.get("Node Type", "Unknown")
            estimated_rows = node.get("Plan Rows", 1000)
            node_time = 0.0

            if node_type == "Seq Scan":
                # 模拟扫描时间,与数据量有关
                node_time = estimated_rows * 0.01 + random.uniform(0, 5)
                stats['scan_time_ms'] = stats.get('scan_time_ms', 0) + node_time
            elif node_type == "HashAggregate":
                node_time = estimated_rows * 0.005 + 10
                stats['agg_time_ms'] = stats.get('agg_time_ms', 0) + node_time
            elif node_type == "Hash Join":
                node_time = (parent_rows + estimated_rows) * 0.002 + 15
                stats['join_time_ms'] = stats.get('join_time_ms', 0) + node_time
            elif node_type == "Filter":
                node_time = estimated_rows * 0.001 + 2
                stats['filter_time_ms'] = stats.get('filter_time_ms', 0) + node_time
            elif node_type == "Sort":
                node_time = estimated_rows * 0.007 * (estimated_rows / 10000)  # N log N 模拟
                stats['sort_time_ms'] = stats.get('sort_time_ms', 0) + node_time
            else:
                node_time = estimated_rows * 0.001

            # 递归处理子节点
            child_time = 0.0
            if "Plans" in node:
                for child in node["Plans"]:
                    child_time += simulate_node(child, estimated_rows)
            return node_time + child_time

        total_time = simulate_node(plan)
        stats['execution_time_ms'] = total_time
        stats['cpu_cost'] = total_time * 0.8  # 模拟CPU成本
        stats['io_cost'] = total_time * 0.2   # 模拟IO成本
        return stats

    def execute_and_collect(self, sql_text: str, plan: Dict, query_id: str) -> QueryRecord:
        """
        执行查询并收集完整记录。
        """
        # 1. 模拟执行,收集统计信息
        execution_stats = self.execute_plan(plan, query_id)
        # 2. 模拟实际执行时间(加入随机性)
        time.sleep(execution_stats['execution_time_ms'] / 1000.0)

        # 3. 创建查询记录
        from utils.query_fingerprint import generate_query_fingerprint
        record = QueryRecord(
            query_id=query_id,
            sql_text=sql_text,
            fingerprint=generate_query_fingerprint(sql_text),
            execution_plan=plan,
            execution_stats=execution_stats,
            tags=["simulated_execution"]
        )
        return record

文件路径:config/optimizer_config.yaml

optimizer:
  # 历史记录配置
  history:
    storage_path: "data/query_history.json"
    max_records: 10000  # 最大保存记录数,防止无限增长

  # 成本模型初始值 (单位:抽象成本单位)
  default_op_costs:
    TableScan:
      base_cost: 10.0
      cost_per_mb: 0.5
    Filter:
      base_cost: 2.0
      cost_per_row: 0.001
    HashAggregate:
      base_cost: 15.0
      cost_per_group: 0.1
    Sort:
      base_cost: 20.0
      cost_per_row: 0.005
    HashJoin:
      base_cost: 25.0
      cost_per_row: 0.002

  # 优化器行为
  behavior:
    enable_history_based_opt: true
    enable_cost_based_opt: true
    similarity_threshold: 0.7  # SQL文本相似度阈值
    min_samples_for_learning: 10  # 开始学习成本模型所需的最小样本数

  # 模拟执行器配置
  simulator:
    base_scan_speed_mb_per_ms: 100
    randomness_factor: 0.2  # 执行时间随机波动因子

文件路径:main.py

import uuid
import yaml
import json
from pathlib import Path
from core.query_history_manager import QueryHistoryManager
from core.cost_analyzer import CostAnalyzer
from core.plan_optimizer import PlanOptimizer
from core.executor import SimulatedExecutor
from utils.query_fingerprint import generate_query_fingerprint

class LakehouseAutoOptimizer:
    """
    自适应优化器主类,整合所有组件。
    """
    def __init__(self, config_path: str = "config/optimizer_config.yaml"):
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)

        self.history_manager = QueryHistoryManager(
            self.config['optimizer']['history']['storage_path']
        )
        self.cost_analyzer = CostAnalyzer()
        # 使用历史数据初始化成本分析器
        self.cost_analyzer.update_model(self.history_manager.get_all_records())
        self.optimizer = PlanOptimizer(self.history_manager, self.cost_analyzer)
        self.executor = SimulatedExecutor()

    def process_query(self, sql_text: str) -> Dict[str, Any]:
        """
        处理单个查询的完整流程:优化 -> 执行 -> 记录。
        """
        query_id = str(uuid.uuid4())
        print(f"\n{'='*60}")
        print(f"处理查询: {query_id}")
        print(f"SQL: {sql_text[:100]}...")

        # 1. 生成查询指纹
        fingerprint = generate_query_fingerprint(sql_text)
        print(f"查询指纹: {fingerprint}")

        # 2. 查找历史相似查询
        similar_history = self.history_manager.find_by_fingerprint(fingerprint)
        if similar_history:
            print(f"找到 {len(similar_history)} 条历史记录。")
            # 可以快速推荐已知的好计划
            best_historical = min(similar_history, key=lambda r: r.total_cost)
            print(f"历史最佳执行成本: {best_historical.total_cost:.2f} ms")

        # 3. 生成基础执行计划 (这里模拟一个计划生成)
        # 在实际系统中,这里会调用Catalyst/Calcite等生成初始逻辑计划。
        base_plan = self._generate_base_plan(sql_text)

        # 4. 优化执行计划
        optimization_result = self.optimizer.optimize(sql_text, base_plan)
        optimized_plan = optimization_result['optimized_plan']

        # 5. 执行优化后的计划(或历史最佳计划)
        final_plan = optimized_plan  # 可以选择使用历史计划
        record = self.executor.execute_and_collect(sql_text, final_plan, query_id)
        record.optimized_plan = optimized_plan

        # 6. 保存记录并更新模型
        self.history_manager.add_record(record)
        # 定期更新模型,这里简单每次更新
        self.cost_analyzer.update_model([record])

        # 7. 准备返回结果
        result = {
            "query_id": query_id,
            "fingerprint": fingerprint,
            "original_estimated_cost": self.cost_analyzer.estimate_plan_cost(base_plan),
            "optimized_estimated_cost": self.cost_analyzer.estimate_plan_cost(optimized_plan),
            "actual_execution_cost": record.total_cost,
            "recommendations": optimization_result['recommendations'],
            "execution_stats": record.execution_stats,
        }
        print(f"执行完成。实际成本: {record.total_cost:.2f} ms")
        if optimization_result['recommendations']:
            print("优化建议:")
            for rec in optimization_result['recommendations']:
                print(f"  - [{rec['type']}] {rec['suggestion']} (原因: {rec['reason']})")
        return result

    def _generate_base_plan(self, sql_text: str) -> Dict:
        """
        模拟查询引擎生成基础执行计划。
        根据SQL的关键字生成一个简单的计划树。
        """
        sql_upper = sql_text.upper()
        plan = {"Node Type": "Result"}
        child = None

        if "GROUP BY" in sql_upper and "SUM" in sql_upper or "COUNT" in sql_upper:
            child = {"Node Type": "HashAggregate", "Plan Rows": 500, "Plan Width": 50}
        elif "JOIN" in sql_upper:
            child = {"Node Type": "Hash Join", "Plan Rows": 10000, "Plan Width": 200}
        else:
            child = {"Node Type": "Seq Scan", "Plan Rows": 100000, "Plan Width": 100}

        if "WHERE" in sql_upper:
            filter_node = {"Node Type": "Filter", "Plan Rows": child["Plan Rows"] * 0.3, "Plan Width": child["Plan Width"]}
            filter_node["Plans"] = [child]
            child = filter_node

        if "ORDER BY" in sql_upper:
            sort_node = {"Node Type": "Sort", "Plan Rows": child["Plan Rows"], "Plan Width": child["Plan Width"]}
            sort_node["Plans"] = [child]
            child = sort_node

        plan["Plans"] = [child]
        return plan

    def run_demo_from_file(self, query_file: str = "demo_queries.sql"):
        """
        从文件读取SQL语句并批量运行演示。
        """
        if not Path(query_file).exists():
            # 创建一些示例查询
            sample_queries = [
                "SELECT customer_id, SUM(amount) FROM sales WHERE year = 2023 GROUP BY customer_id ORDER BY SUM(amount) DESC",
                "SELECT * FROM users u JOIN orders o ON u.id = o.user_id WHERE u.country = 'US' AND o.status = 'SHIPPED'",
                "SELECT product_id, COUNT(*) FROM orders WHERE order_date > '2024-01-01' GROUP BY product_id HAVING COUNT(*) > 10",
                "SELECT customer_id, SUM(amount) FROM sales WHERE year = 2024 GROUP BY customer_id ORDER BY SUM(amount) DESC", # 与第一条结构相同
                "SELECT id, name, salary FROM employees WHERE department = 'Engineering' AND salary > 100000 ORDER BY name",
            ]
            with open(query_file, 'w') as f:
                f.write("\n".join(sample_queries))
            print(f"已创建示例查询文件: {query_file}")

        with open(query_file, 'r') as f:
            queries = [line.strip() for line in f if line.strip() and not line.strip().startswith('--')]

        results = []
        for i, sql in enumerate(queries):
            print(f"\n>>> 执行演示查询 {i+1}/{len(queries)}")
            result = self.process_query(sql)
            results.append(result)

        # 打印摘要报告
        print(f"\n{'='*60}")
        print("演示完成!摘要报告:")
        total_original_est = sum(r['original_estimated_cost'] for r in results)
        total_optimized_est = sum(r['optimized_estimated_cost'] for r in results)
        total_actual = sum(r['actual_execution_cost'] for r in results)
        print(f"总估算成本 (原始): {total_original_est:.2f}")
        print(f"总估算成本 (优化后): {total_optimized_est:.2f}")
        print(f"总实际执行成本: {total_actual:.2f}")
        if total_original_est > 0:
            saving = (total_original_est - total_optimized_est) / total_original_est * 100
            print(f"估算成本节省: {saving:.2f}%")

        return results

if __name__ == "__main__":
    optimizer = LakehouseAutoOptimizer()
    optimizer.run_demo_from_file()

文件路径:requirements.txt

PyYAML>=6.0
sqlparse>=0.4.0
sequenceDiagram participant Client participant Main participant HistoryManager participant CostAnalyzer participant PlanOptimizer participant Executor Client->>Main: 提交SQL查询 Main->>Main: 生成查询指纹 Main->>HistoryManager: 查找相似历史记录 HistoryManager-->>Main: 返回历史记录列表 Main->>PlanOptimizer: 请求优化(SQL,基础计划) PlanOptimizer->>CostAnalyzer: 估算基础计划成本 CostAnalyzer-->>PlanOptimizer: 返回估算成本 PlanOptimizer->>PlanOptimizer: 应用优化规则 alt 存在相似历史且成本更低 PlanOptimizer->>PlanOptimizer: 调整计划为历史更优结构 end PlanOptimizer-->>Main: 返回优化后的计划与建议 Main->>Executor: 执行优化后的计划 Executor->>Executor: 模拟执行,收集指标 Executor-->>Main: 返回执行结果与统计信息 Main->>HistoryManager: 保存本次查询完整记录 HistoryManager->>CostAnalyzer: 通知新记录可用 CostAnalyzer->>CostAnalyzer: 更新内部成本模型 Main-->>Client: 返回查询结果、成本报告与优化建议

4. 安装依赖与运行步骤

  1. 环境准备:确保已安装Python 3.8或更高版本。

  2. 克隆/创建项目目录

mkdir lakehouse-auto-optimizer
    cd lakehouse-auto-optimizer
  1. 创建虚拟环境(推荐)
python -m venv venv
    # 激活虚拟环境
    # Linux/Mac:
    source venv/bin/activate
    # Windows:
    # venv\Scripts\activate
  1. 安装依赖
pip install -r requirements.txt
  1. 运行演示程序
python main.py

首次运行会创建 demo_queries.sql 文件和 data/query_history.json 文件。程序将依次执行文件中的示例SQL,模拟优化与执行过程,并在控制台输出详细的日志,包括查询指纹、成本对比和优化建议。执行完毕后,会生成一个简单的摘要报告。

5. 测试与验证

本项目包含一个自验证的演示流程。为了进一步验证优化效果,可以:

  1. 观察历史学习:多次运行 python main.py。由于历史记录被保存,系统在第二次及以后运行时,会对结构相似的查询(如示例中的两个 SUM 查询)识别出来,并可能应用历史学到的优化策略。观察控制台输出的"找到 X 条历史记录"和"历史最佳执行成本"日志。

  2. 手动添加复杂查询进行测试:编辑 demo_queries.sql 文件,加入更复杂或特定的SQL语句,重新运行主程序,观察优化建议的变化。

  3. 检查历史数据文件:查看生成的 data/query_history.json 文件,可以直观看到系统记录的所有查询的详细信息,包括指纹、执行计划和成本指标。

  4. 单元测试(示例):可以创建一个简单的测试文件 test_optimizer.py 来验证核心组件功能。

# test_optimizer.py (简略示例)
    import sys
    sys.path.append('.')
    from utils.query_fingerprint import generate_query_fingerprint

    def test_fingerprint():
        sql1 = "SELECT a FROM t WHERE b = 1"
        sql2 = "SELECT a FROM t WHERE b = 999"
        fp1 = generate_query_fingerprint(sql1)
        fp2 = generate_query_fingerprint(sql2)
        assert fp1 == fp2, "结构相同的查询应具有相同指纹"
        print("指纹生成测试通过。")

    if __name__ == "__main__":
        test_fingerprint()
运行测试:`python test_optimizer.py`

6. 扩展说明与最佳实践

生产环境部署考虑:

  1. 可扩展存储:将 QueryHistoryManager 的后端从JSON文件替换为数据库(如PostgreSQL)或键值存储(如Redis),以支持高并发和大数据量。
  2. 集成真实引擎SimulatedExecutor 应替换为与真实查询引擎(如Apache Spark SQL、Trino、Flink)的交互模块,通过其提供的Listener或Hook接口收集真实的执行计划与指标。
  3. 增强成本模型CostAnalyzer 中的成本模型应基于更丰富的统计信息(数据倾斜、集群资源状态、网络开销)进行机器学习训练,以得到更准确的预测。
  4. 安全与隔离:在多租户环境中,需要按租户或项目隔离查询历史与模型。
  5. 渐进式优化:优化策略应采取渐进、可回滚的方式应用到生产查询,例如先在小部分查询流量上进行A/B测试,验证有效后再全量推广。

性能考量:

  • 指纹生成与历史匹配应高效,避免成为查询链路的瓶颈。可以考虑使用布隆过滤器进行快速筛选,或对历史记录建立索引。
  • 成本模型的学习和更新可以异步进行,不影响实时查询路径。

通过此原型系统,我们展示了构建一个具备自我学习与调整能力的智能查询优化器的可行路径。在实际的Lakehouse环境中,将此理念与现有优化器深度结合,有望显著降低查询延迟与资源消耗,实现真正的"自适应"数据平台。