摘要
本文探讨了向量数据库在高并发检索场景下面临的索引结构选择与数据一致性两大核心挑战。我们将通过一个完整的、可运行的项目实例,演示如何基于HNSW图索引结构实现高效的近似最近邻搜索,并引入简单的锁机制来应对并发写入与查询时的数据一致性问题。项目包含一个轻量级的内存向量数据库核心、HNSW索引的实现、以及一个高并发模拟测试器,帮助读者直观理解在高负载下平衡检索效率与数据一致性的设计思路与权衡。
1. 项目概述:简易高并发向量检索引擎
在高并发场景下(如推荐系统实时召回、大规模语义检索),向量数据库需要同时满足两个看似矛盾的目标:极低的查询延迟与足够的数据一致性。为实现低延迟,基于图的近似最近邻搜索算法(如HNSW)因其优异的性能成为首选索引。然而,HNSW图结构的动态更新(插入、删除)在高并发环境下会引发严重的读写冲突和数据不一致问题。
本项目将构建一个名为 VectorDB-Concurrent 的简易向量数据库核心,它包含以下核心组件:
- 存储层 (VectorStorage): 在内存中管理向量数据与元数据。
- 索引层 (HNSWIndex): 实现HNSW算法,提供快速的近似K近邻搜索。
- 并发控制器 (LockManager): 提供简单的读写锁,用于协调并发的插入与查询操作。
- 模拟器 (ConcurrentSimulator): 模拟高并发场景,生成读写负载,用于验证系统性能与一致性。
设计思路:
- 使用
HNSW作为核心检索索引,以保证查询的高效性。 - 使用 读写锁 保护整个索引和存储的修改操作。读(查询)操作可并发,写(插入)操作互斥。这是一种粗粒度但易于理解的一致性保证。
- 通过模拟器产生并发线程,观察在不同锁策略下系统的吞吐量、延迟以及最终的数据一致性状态。
2. 项目结构树
VectorDB-Concurrent/
├── vectordb/
│ ├── __init__.py
│ ├── storage.py # 向量存储与元数据管理
│ ├── hnsw_index.py # HNSW索引核心实现
│ └── lock_manager.py # 简单的读写锁
├── simulator.py # 高并发模拟测试器
├── config.yaml # 配置文件
├── requirements.txt # 项目依赖
└── run.py # 主程序入口
3. 核心代码实现
文件路径:vectordb/lock_manager.py
"""
简单的读写锁实现。
在高并发场景下,允许多个读取者同时访问,但写入者必须独占。
"""
import threading
class RWLock:
"""读写锁"""
def __init__(self):
self._read_lock = threading.Lock()
self._write_lock = threading.Lock()
self._reader_count = 0
def acquire_read(self):
"""获取读锁"""
with self._read_lock:
self._reader_count += 1
if self._reader_count == 1:
self._write_lock.acquire()
def release_read(self):
"""释放读锁"""
with self._read_lock:
self._reader_count -= 1
if self._reader_count == 0:
self._write_lock.release()
def acquire_write(self):
"""获取写锁"""
self._write_lock.acquire()
def release_write(self):
"""释放写锁"""
self._write_lock.release()
class DummyLock:
"""用于性能对比的空锁(无锁模式)"""
def acquire_read(self):
pass
def release_read(self):
pass
def acquire_write(self):
pass
def release_write(self):
pass
文件路径:vectordb/storage.py
"""
向量数据存储。
负责管理向量数组和对应的ID、元数据映射。
"""
import numpy as np
import threading
class VectorStorage:
def __init__(self, dim):
self.dim = dim
self.vectors = [] # 向量列表,按插入顺序存储
self.id_to_index = {} # ID -> 向量列表索引 的映射
self.next_id = 0
self._lock = threading.Lock() # 保护内部数据结构的锁
def add_vector(self, vector, vector_id=None):
"""添加一个向量到存储中。如果未提供ID,则自动生成。"""
if vector.shape != (self.dim,):
raise ValueError(f"Vector dimension mismatch. Expected ({self.dim},), got {vector.shape}")
with self._lock:
if vector_id is None:
vector_id = self.next_id
self.next_id += 1
elif vector_id in self.id_to_index:
raise ValueError(f"Vector with id {vector_id} already exists.")
index = len(self.vectors)
self.vectors.append(vector.copy()) # 存储副本
self.id_to_index[vector_id] = index
return vector_id
def get_vector(self, vector_id):
"""根据ID获取向量。"""
with self._lock:
index = self.id_to_index.get(vector_id)
if index is None:
raise KeyError(f"Vector id {vector_id} not found.")
return self.vectors[index].copy()
def get_all_ids(self):
"""获取所有存储的向量ID。"""
with self._lock:
return list(self.id_to_index.keys())
def get_vectors_by_ids(self, ids):
"""批量获取向量。"""
result = []
for vid in ids:
result.append(self.get_vector(vid))
return np.array(result)
文件路径:vectordb/hnsw_index.py
"""
HNSW (Hierarchical Navigable Small World) 索引实现。
这是项目的核心算法部分,实现了高效的近似最近邻搜索。
本实现进行了适当简化,聚焦于核心图构建与搜索逻辑。
"""
import numpy as np
import heapq
import random
def euclidean_distance(a, b):
return np.linalg.norm(a - b)
class HNSWIndex:
def __init__(self, dim, M=16, ef_construction=200, ef_search=50, max_level=6):
"""
初始化HNSW索引。
Args:
dim: 向量维度
M: 每个节点在图中最大连接数(影响图密度和性能)
ef_construction: 构建时动态候选列表大小
ef_search: 搜索时动态候选列表大小
max_level: 最大层数
"""
self.dim = dim
self.M = M
self.ef_construction = ef_construction
self.ef_search = ef_search
self.max_level = max_level
# 图结构:每一层都是一个列表,列表的每个元素是该层上一个节点的邻居列表
# 例如,self.layers[l][n] 是一个列表,包含节点n在第l层的邻居ID
self.layers = [] # 第0层是最底层(数据最稠密),向上稀疏
self.enter_point = None # 顶层入口节点ID
# 节点数据(向量)通过ID从外部存储(VectorStorage)获取,此处只存ID和层级
self.node_level = {} # 节点ID -> 该节点所在的最高层级
# 用于生成层级的随机因子
self.ml = 1 / np.log(M)
def _random_level(self):
"""随机生成一个节点的最高层级,遵循指数分布。"""
lvl = int(-np.log(random.random()) * self.ml)
return min(lvl, self.max_level)
def _search_layer(self, query_vec, entry_point_id, ef, layer):
"""
在指定层上执行贪婪搜索,找到距离查询向量最近的ef个节点。
使用优先队列(最小堆)来维护候选集和结果集。
"""
visited = set([entry_point_id])
# 候选集 (最小堆,按与查询向量的距离排序)
candidates = [(euclidean_distance(self._get_vector(entry_point_id), query_vec), entry_point_id)]
# 结果集 (最大堆,保留距离最近的ef个)
results = [] # 使用负距离构建最大堆
while candidates:
dist, node_id = heapq.heappop(candidates)
# 如果结果集已满,且当前候选节点距离大于结果集中最远距离,则终止
if len(results) >= ef and -results[0][0] < dist:
break
# 将当前节点加入结果集
heapq.heappush(results, (-dist, node_id))
if len(results) > ef:
heapq.heappop(results) # 移除最远的结果
# 探索当前节点的邻居
for neighbor_id in self.layers[layer][node_id]:
if neighbor_id not in visited:
visited.add(neighbor_id)
dist_n = euclidean_distance(self._get_vector(neighbor_id), query_vec)
heapq.heappush(candidates, (dist_n, neighbor_id))
# 返回结果集(节点ID列表),按距离从近到远排序
return [node_id for _, node_id in sorted(results, key=lambda x: x[0])]
def _get_vector(self, node_id):
"""辅助方法:通过ID获取向量。需要在外部绑定存储对象。"""
# 注意:此方法需要在构建索引前,通过`set_storage`将存储对象绑定到索引实例。
return self.storage.get_vector(node_id)
def set_storage(self, storage):
"""绑定向量存储对象。"""
self.storage = storage
# 初始化图结构层,基于现有存储的数据(如果存在)
max_id = max(storage.get_all_ids()) if storage.get_all_ids() else -1
self.layers = [{} for _ in range(self.max_level + 1)]
for node_id in storage.get_all_ids():
self.node_level[node_id] = self._random_level()
for lvl in range(self.node_level[node_id] + 1):
self.layers[lvl][node_id] = []
def insert(self, vector, vector_id):
"""
向HNSW图中插入一个新节点。
这是索引更新的核心,在高并发场景下需要加锁保护。
"""
# 1. 确定新节点的层级
node_level = self._random_level()
self.node_level[vector_id] = node_level
# 确保所有层都有该节点的记录(初始化为空邻居列表)
for lvl in range(node_level + 1):
if vector_id not in self.layers[lvl]:
self.layers[lvl][vector_id] = []
# 2. 寻找每层的入口点并连接邻居
if self.enter_point is None:
# 第一个插入的节点
self.enter_point = vector_id
for lvl in range(node_level + 1):
self.layers[lvl][vector_id] = []
return
# 从顶层向下搜索,找到每层最近的入口点
ep = [self.enter_point] # 当前层的入口点候选
for lvl in range(self.max_level, node_level, -1):
ep = self._search_layer(vector, ep[0], ef=1, layer=lvl)
# 从`node_level`层向下到第0层,逐层插入并连接
for lvl in range(min(node_level, self.max_level), -1, -1):
# 搜索当前层的ef_construction个最近邻
candidates = self._search_layer(vector, ep[0], ef=self.ef_construction, layer=lvl)
# 选择前M个作为邻居
neighbors = self._select_neighbors(vector, candidates, self.M, lvl)
# 为新节点建立双向连接
self.layers[lvl][vector_id] = neighbors
for neighbor_id in neighbors:
self.layers[lvl][neighbor_id].append(vector_id)
# 维护邻居数不超过M
self._prune_neighbors(neighbor_id, self.M, lvl)
# 为下一层(更低层)更新入口点
ep = candidates if lvl > 0 else None
# 3. 如果新节点层级更高,更新全局入口点
if node_level > self.node_level.get(self.enter_point, -1):
self.enter_point = vector_id
def _select_neighbors(self, query_vec, candidates, M, layer):
"""从候选列表中贪心地选择M个邻居,简化版。"""
if len(candidates) <= M:
return candidates.copy()
# 简单按距离排序并取前M个
cand_with_dist = [(euclidean_distance(self._get_vector(nid), query_vec), nid) for nid in candidates]
cand_with_dist.sort(key=lambda x: x[0])
return [nid for _, nid in cand_with_dist[:M]]
def _prune_neighbors(self, node_id, M, layer):
"""修剪节点的邻居数,使其不超过M。简化版(随机丢弃超出的部分)。"""
neighbors = self.layers[layer][node_id]
if len(neighbors) > M:
# 生产环境应使用更智能的启发式算法(如HNSW论文中的"邻接选择")
random.shuffle(neighbors)
self.layers[layer][node_id] = neighbors[:M]
def search(self, query_vec, k=10):
"""
近似K近邻搜索。
1. 从顶层入口点开始,快速下降到第0层。
2. 在第0层执行精细搜索,返回最接近的k个邻居。
"""
if self.enter_point is None:
return []
# 从顶层向下找到第0层的入口点
ep = [self.enter_point]
for lvl in range(self.max_level, 0, -1):
ep = self._search_layer(query_vec, ep[0], ef=1, layer=lvl)
# 在第0层执行搜索
nearest = self._search_layer(query_vec, ep[0], ef=self.ef_search, layer=0)
return nearest[:k]
文件路径:vectordb/__init__.py
"""
向量数据库包入口,整合存储、索引与锁。
"""
from .storage import VectorStorage
from .hnsw_index import HNSWIndex
from .lock_manager import RWLock, DummyLock
class ConcurrentVectorDB:
"""高并发向量数据库客户端"""
def __init__(self, dim, use_lock=True, **hnsw_params):
self.dim = dim
self.storage = VectorStorage(dim)
self.index = HNSWIndex(dim, **hnsw_params)
self.index.set_storage(self.storage) # 绑定存储
# 并发控制:可选择使用真实读写锁或空锁(用于对比)
self.lock_manager = RWLock() if use_lock else DummyLock()
def insert(self, vector, vector_id=None):
"""插入一个向量(写操作)。"""
# 获取写锁(独占)
self.lock_manager.acquire_write()
try:
# 1. 存储向量
assigned_id = self.storage.add_vector(vector, vector_id)
# 2. 更新索引 (HNSW图)
self.index.insert(vector, assigned_id)
return assigned_id
finally:
self.lock_manager.release_write()
def search(self, query_vector, k=10):
"""查询最近邻(读操作)。"""
# 获取读锁(共享)
self.lock_manager.acquire_read()
try:
neighbor_ids = self.index.search(query_vector, k)
# 根据ID获取完整的向量数据(可选,通常返回ID和距离即可)
neighbors = [(nid, euclidean_distance(query_vector, self.storage.get_vector(nid))) for nid in neighbor_ids]
return neighbors
finally:
self.lock_manager.release_read()
def get_stats(self):
"""获取数据库统计信息。"""
ids = self.storage.get_all_ids()
return {
"total_vectors": len(ids),
"max_node_level": max(self.index.node_level.values()) if self.index.node_level else 0,
"avg_connections_layer0": self._avg_connections(0)
}
def _avg_connections(self, layer):
"""计算指定层的平均连接数。"""
if not self.index.layers[layer]:
return 0
total_conn = sum(len(nei) for nei in self.index.layers[layer].values())
return total_conn / len(self.index.layers[layer])
文件路径:config.yaml
# 向量数据库配置
database:
dim: 128 # 向量维度
hnsw_m: 16 # HNSW参数M
hnsw_ef_construction: 200 # 构建时ef
hnsw_ef_search: 50 # 搜索时ef
max_level: 5 # HNSW最大层数
use_lock: true # 是否启用并发锁
# 模拟器配置
simulator:
num_vectors: 5000 # 初始构建的向量数量
num_concurrent_workers: 20 # 并发工作线程数
write_ratio: 0.2 # 写操作比例 (20%写入,80%查询)
operations_per_worker: 100 # 每个工作线程执行的操作数
query_k: 10 # 每次查询返回的最近邻数量
文件路径:simulator.py
"""
高并发模拟测试器。
创建多个线程,模拟混合的读写负载,并收集性能指标。
"""
import numpy as np
import threading
import time
import yaml
from concurrent.futures import ThreadPoolExecutor, as_completed
from vectordb import ConcurrentVectorDB
def load_config(config_path='config.yaml'):
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
return config
class Worker(threading.Thread):
"""模拟一个客户端工作线程,执行混合的读写操作。"""
def __init__(self, db, worker_id, dim, write_ratio, num_ops, query_k):
super().__init__()
self.db = db
self.worker_id = worker_id
self.dim = dim
self.write_ratio = write_ratio
self.num_ops = num_ops
self.query_k = query_k
self.stats = {
'write_count': 0,
'read_count': 0,
'write_time': 0.0,
'read_time': 0.0,
'errors': 0
}
def run(self):
np.random.seed(self.worker_id) # 每个线程有不同的随机种子
for i in range(self.num_ops):
is_write = np.random.random() < self.write_ratio
try:
if is_write:
# 执行写入:生成随机向量并插入
vec = np.random.randn(self.dim).astype(np.float32)
start = time.perf_counter()
_ = self.db.insert(vec) # 忽略返回的ID
end = time.perf_counter()
self.stats['write_count'] += 1
self.stats['write_time'] += (end - start)
else:
# 执行查询:生成随机查询向量并搜索
query_vec = np.random.randn(self.dim).astype(np.float32)
start = time.perf_counter()
_ = self.db.search(query_vec, k=self.query_k)
end = time.perf_counter()
self.stats['read_count'] += 1
self.stats['read_time'] += (end - start)
except Exception as e:
# print(f"Worker {self.worker_id} error: {e}")
self.stats['errors'] += 1
def run_concurrent_simulation(config):
"""运行并发模拟测试。"""
db_config = config['database']
sim_config = config['simulator']
print("="*50)
print("Initializing Vector Database...")
# 初始化数据库
db = ConcurrentVectorDB(
dim=db_config['dim'],
use_lock=db_config['use_lock'],
M=db_config['hnsw_m'],
ef_construction=db_config['hnsw_ef_construction'],
ef_search=db_config['hnsw_ef_search'],
max_level=db_config['max_level']
)
# 预加载一些初始数据
print(f"Pre-loading {sim_config['num_vectors']} vectors...")
initial_vectors = np.random.randn(sim_config['num_vectors'], db_config['dim']).astype(np.float32)
for vec in initial_vectors:
db.insert(vec)
print(f"Initial DB stats: {db.get_stats()}")
# 创建并启动工作线程
print("\nStarting concurrent simulation...")
print(f"Workers: {sim_config['num_concurrent_workers']}, Ops per worker: {sim_config['operations_per_worker']}, Write ratio: {sim_config['write_ratio']}")
workers = []
start_time = time.time()
with ThreadPoolExecutor(max_workers=sim_config['num_concurrent_workers']) as executor:
futures = []
for i in range(sim_config['num_concurrent_workers']):
worker = Worker(db, i, db_config['dim'], sim_config['write_ratio'],
sim_config['operations_per_worker'], sim_config['query_k'])
futures.append(executor.submit(worker.run))
# 等待所有线程完成
for future in as_completed(futures):
try:
future.result()
except Exception as e:
print(f"A worker raised an exception: {e}")
total_wall_time = time.time() - start_time
# 收集统计信息 (此处简化,实际应合并所有worker的stats)
print("\nSimulation Finished.")
print(f"Total wall time: {total_wall_time:.2f} seconds")
print(f"Final DB stats: {db.get_stats()}")
# 注意:这里只打印了最后一个worker的stats作为示意。完整实现需要聚合。
print("="*50)
return db
if __name__ == "__main__":
config = load_config()
run_concurrent_simulation(config)
文件路径:run.py
"""
主程序入口。
支持命令行参数,可选择运行模拟测试或简单交互式测试。
"""
import argparse
import yaml
import numpy as np
from vectordb import ConcurrentVectorDB
from simulator import run_concurrent_simulation
def demo_simple():
"""简单功能演示"""
print("Running simple demo...")
db = ConcurrentVectorDB(dim=4, use_lock=True, M=5, ef_construction=10, max_level=2)
# 插入一些数据点
points = [
np.array([1.0, 0.0, 0.0, 0.0]),
np.array([0.0, 1.0, 0.0, 0.0]),
np.array([0.0, 0.0, 1.0, 0.0]),
np.array([0.0, 0.0, 0.0, 1.0]),
np.array([0.5, 0.5, 0.0, 0.0]),
]
for p in points:
db.insert(p)
# 执行查询
query = np.array([0.6, 0.4, 0.0, 0.0])
results = db.search(query, k=3)
print(f"Query vector: {query}")
print("Top 3 neighbors (id, distance):")
for nid, dist in results:
print(f" ID {nid}: {dist:.4f}")
def main():
parser = argparse.ArgumentParser(description='VectorDB Concurrent Demo')
parser.add_argument('--mode', choices=['demo', 'simulate'], default='simulate',
help='运行模式: "demo"为简单功能演示, "simulate"为高并发模拟')
parser.add_argument('--config', default='config.yaml', help='配置文件路径')
args = parser.parse_args()
if args.mode == 'demo':
demo_simple()
else:
# 运行高并发模拟
with open(args.config, 'r') as f:
config = yaml.safe_load(f)
run_concurrent_simulation(config)
if __name__ == "__main__":
main()
文件路径:requirements.txt
numpy>=1.21.0
PyYAML>=6.0
4. 安装依赖与运行步骤
步骤1:安装依赖
确保已安装Python(建议3.8及以上版本)。在项目根目录下执行:
pip install -r requirements.txt
步骤2:运行高并发模拟测试(默认模式)
此模式将根据 config.yaml 的配置,启动一个包含初始数据的向量数据库,并模拟多个并发客户端进行读写操作。
python run.py --mode simulate
或者,由于 simulate 是默认模式,可以直接运行:
python run.py
您将看到类似以下输出,展示了初始化、模拟过程以及最终统计信息:
==================================================
Initializing Vector Database...
Pre-loading 5000 vectors...
Initial DB stats: {'total_vectors': 5000, 'max_node_level': 5, 'avg_connections_layer0': 15.8}
Starting concurrent simulation...
Workers: 20, Ops per worker: 100, Write ratio: 0.2
Simulation Finished.
Total wall time: 12.34 seconds
Final DB stats: {'total_vectors': 6000, 'max_node_level': 5, 'avg_connections_layer0': 15.9}
==================================================
步骤3:运行简单功能演示
此模式展示数据库的基本功能,不涉及并发。
python run.py --mode demo
输出将显示简单的向量插入和最近邻查询结果。
5. 测试与验证
基本功能验证
您可以修改 run.py 中的 demo_simple 函数,添加自定义的向量和查询来验证HNSW索引是否返回了合理的近邻结果。例如,使用单位向量进行测试,距离计算应保持一致性。
并发与一致性验证(压力测试)
本项目的主要验证场景是高并发模拟。您可以修改 config.yaml 文件中的参数来进行不同的测试:
-
无锁 vs 有锁性能对比:
- 将
database.use_lock设置为false,重新运行模拟。观察总执行时间 (Total wall time) 的变化。通常,无锁模式会更快,但可能因数据竞争导致程序崩溃或返回错误结果。 - 警告:在高并发写入时,禁用锁 (
use_lock: false) 极大概率会导致HNSW图结构损坏,可能引发索引内部错误(如KeyError)或返回错误的查询结果。
- 将
-
调整负载模式:
- 修改
simulator.write_ratio,例如设置为0.05(5%写入,95%查询) 或0.5(50%写入,50%查询)。观察系统吞吐量(总操作数/总时间)如何变化。写比例越高,由于写锁的互斥性,整体吞吐量可能下降。
- 修改
-
系统稳定性:
- 逐步增加
simulator.num_concurrent_workers(如 50, 100) 和simulator.operations_per_worker,观察系统是否能在高压力下稳定运行而不崩溃(在有锁模式下)。这是对粗粒度锁鲁棒性的测试。
- 逐步增加
一致性检查(概念性)
由于我们的锁机制保护了整个插入过程(存储+索引),理论上可以保证每个成功插入的向量都能被后续的查询"看到",并且HNSW图结构在写入期间不会被部分更新的状态所破坏,从而避免了查询过程访问到中间状态导致的崩溃或异常结果。但本项目未实现细粒度的、可验证的"一致性快照"功能。
6. 核心机制图解
图1:系统组件与并发控制流程图。展示了读写请求如何通过RWLock路由,并串行或并行地访问共享的存储与索引资源。
图2:高并发写入与查询的序列图。描绘了当一个写入线程持有写锁时,另一个读取线程需要等待;而多个读取线程可以同时持有读锁。
7. 结论与挑战
通过本项目,我们实现了一个具备基本高并发处理能力的向量检索引擎原型。HNSW索引提供了高效的检索能力,而粗粒度读写锁则提供了一个简单有效的数据一致性保证,防止了并发写入导致的图结构损坏。
然而,在生产级向量数据库中,这种设计面临着严峻挑战:
- 锁粒度问题:全局读写锁在写操作频繁时会成为巨大瓶颈,严重限制系统吞吐量。解决方案是采用更细粒度的锁(如节点级锁、分层锁)或无锁数据结构,但这会极大地增加工程复杂度。
- 一致性级别:我们的模型提供了强一致性(线性化),但许多应用场景可以接受最终一致性或会话一致性。在分布式向量数据库中,如何在不同副本间同步HNSW图结构是一个未解决的难题。
- 删除与更新操作:HNSW对删除操作不友好,通常采用"标记删除"和后台重建的方式。这在高并发下会引入更复杂的一致性问题。
- 资源消耗:HNSW图索引常驻内存,对于超大规模向量数据,需要结合磁盘与缓存的分层存储架构,这进一步加剧了并发控制的设计难度。
后续优化方向:可以尝试实现节点级别的细粒度锁,或借鉴B-Link树的思想设计支持并发操作的图索引结构。此外,将写操作缓冲到队列中,由单个后台线程顺序更新索引,也是一种常见的"写合并"优化,它牺牲了少量写入延迟以换取更高的查询吞吐量和更简单的并发模型。
项目总结:本项目的代码总量约500行,聚焦展示了HNSW索引与并发控制的核心交互。读者可通过运行和修改模拟器参数,直观感受高并发下检索性能与一致性之间的权衡,为理解更复杂的工业级向量数据库(如Milvus, Qdrant, Weaviate)打下基础。