向量存储
发布于 2025-12-31
- 1.理解向量存储在RAG系统中的功能和重要性。
- 2.学会创建和管理向量数据库。
- 3.掌握如何将文本转化为向量并存入数据库。
- 4.理解混合检索与重排序的实现原理。
vector_store.py是EduRAG系统的核心模块之一,封装了与Milvus向量数据库的交互逻辑。它负责将文档转化为向量存储到数据库中,并提供高效的混合检索功能。通过结合BGE-M3嵌入模型和重排序机制,该模块确保系统能够快速检索到与用户查询最相关的文档。
4.1 模块功能概述
Section titled “4.1 模块功能概述”VectorStore类提供了以下主要功能:
- 初始化与集合管理:创建或加载Milvus向量数据库集合。
- 文档向量化与存储:将分块后的文档转换为向量并存储。
- 混合检索与重排序:结合稠密和稀疏向量进行检索,并通过重排序优化结果。
以下将逐一讲解每个方法的实现细节。
导入必备工具包
Section titled “导入必备工具包”# 导入 BGE-M3 嵌入函数,用于生成文档和查询的向量表示from milvus_model.hybrid import BGEM3EmbeddingFunction# 导入 Milvus 相关类,用于操作向量数据库from pymilvus import MilvusClient, DataType, AnnSearchRequest, WeightedRanker# 导入 Document 类,用于创建文档对象from langchain.docstore.document import Document# 导入 CrossEncoder,用于重排序和 NLI 判断from sentence_transformers import CrossEncoder# 导入 hashlib 模块,用于生成唯一 ID 的哈希值import hashlibfrom base import logger, Config
conf = Config()4.2 初始化方法
Section titled “4.2 初始化方法”__init__方法初始化VectorStore类的实例,设置基本参数并调用集合创建或加载方法。
# 定义 VectorStore 类,封装向量存储和检索功能class VectorStore: # 初始化方法,设置向量存储的基本参数 def __init__(self, collection_name=conf.MILVUS_COLLECTION_NAME, host=conf.MILVUS_HOST, port=conf.MILVUS_PORT, database=conf.MILVUS_DATABASE_NAME): # 设置 Milvus 集合名称 self.collection_name = collection_name # 设置 Milvus 主机地址 self.host = host # 设置 Milvus 端口号 self.port = port # 设置 Milvus 数据库名称 self.database = database # 设置日志记录器 self.logger = logger # 初始化 BGE-Reranker 模型,用于重排序检索结果 self.reranker = CrossEncoder("./bge/bge-reranker-large") # 初始化 BGE-M3 嵌入函数,使用 CPU 设备,不启用 FP16 self.embedding_function = BGEM3EmbeddingFunction(use_fp16=False, device="cpu") # 获取稠密向量的维度 self.dense_dim = self.embedding_function.dim["dense"] # 初始化 Milvus 客户端,连接到指定主机和数据库 self.client = MilvusClient(uri=f"http://{self.host}:{self.port}", db_name=self.database) # 调用方法创建或加载 Milvus 集合 self._create_or_load_collection()- 参数设置:
- 使用
Config中的默认值初始化集合名称、主机、端口和数据库名称。
- 使用
- 模型初始化:
reranker:加载BGE-Reranker模型,用于后续重排序。embedding_function:初始化BGE-M3嵌入模型,禁用FP16,使用CPU运行。dense_dim:获取稠密向量的维度。
- 客户端连接:
- 创建
MilvusClient实例,连接到指定主机和数据库。
- 创建
- 集合管理:
- 调用
_create_or_load_collection方法,确保集合可用。
- 调用
- BGE-M3模型:提供稠密和稀疏向量生成能力。
- 灵活性:通过参数支持自定义配置。
4.3 创建或加载集合
Section titled “4.3 创建或加载集合”_create_or_load_collection方法检查并创建或加载Milvus集合,定义字段结构和索引参数。
# 定义私有方法,创建或加载 Milvus 集合def _create_or_load_collection(self): # 检查指定集合是否已存在 if not self.client.has_collection(self.collection_name): # 创建集合 Schema,禁用自动 ID,启用动态字段 schema = self.client.create_schema(auto_id=False, enable_dynamic_field=True) # 添加 ID 字段,作为主键,VARCHAR 类型,最大长度 100 schema.add_field(field_name="id", datatype=DataType.VARCHAR, is_primary=True, max_length=100) # 添加文本字段,VARCHAR 类型,最大长度 65535 schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535) # 添加稠密向量字段,FLOAT_VECTOR 类型,维度由嵌入函数指定 schema.add_field(field_name="dense_vector", datatype=DataType.FLOAT_VECTOR, dim=self.dense_dim) # 添加稀疏向量字段,SPARSE_FLOAT_VECTOR 类型 schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR) # 添加父块 ID 字段,VARCHAR 类型,最大长度 100 schema.add_field(field_name="parent_id", datatype=DataType.VARCHAR, max_length=100) # 添加父块内容字段,VARCHAR 类型,最大长度 65535 schema.add_field(field_name="parent_content", datatype=DataType.VARCHAR, max_length=65535) # 添加学科类别字段,VARCHAR 类型,最大长度 50 schema.add_field(field_name="source", datatype=DataType.VARCHAR, max_length=50) # 添加时间戳字段,VARCHAR 类型,最大长度 50 schema.add_field(field_name="timestamp", datatype=DataType.VARCHAR, max_length=50)
# 创建索引参数对象 index_params = self.client.prepare_index_params() # 为稠密向量字段添加 IVF_FLAT 索引,度量类型为内积 (IP) index_params.add_index( field_name="dense_vector", index_name="dense_index", index_type="IVF_FLAT", metric_type="IP", params={"nlist": 128} ) # 为稀疏向量字段添加 SPARSE_INVERTED_INDEX 索引,度量类型为内积 (IP) index_params.add_index( field_name="sparse_vector", index_name="sparse_index", index_type="SPARSE_INVERTED_INDEX", metric_type="IP", params={"drop_ratio_build": 0.2} )
# 创建 Milvus 集合,应用定义的 Schema 和索引参数 self.client.create_collection(collection_name=self.collection_name, schema=schema, index_params=index_params) # 记录创建集合的日志 logger.info(f"已创建集合 {self.collection_name}") # 如果集合已存在 else: # 记录加载集合的日志 logger.info(f"已加载集合 {self.collection_name}") # 将集合加载到内存,确保可立即查询 self.client.load_collection(self.collection_name)- 检查集合是否存在:
- 使用
has_collection判断是否需要创建新集合。
- 使用
- 定义Schema:
- 设置字段:包括
id(主键)、text(原文)、向量字段和元数据字段。 - 禁用自动ID,启用动态字段。
- 设置字段:包括
- 创建索引:
- 稠密向量使用
IVF_FLAT索引,稀疏向量使用SPARSE_INVERTED_INDEX。
- 稠密向量使用
- 创建并加载集合:
- 调用
create_collection创建集合,并加载到内存。
- 调用
- 字段设计:支持多种数据类型和元数据管理。
- 索引优化:平衡检索速度和精度。
4.4 添加文档
Section titled “4.4 添加文档”add_documents方法将分块后的文档转换为向量并存储到Milvus集合中。
# 定义方法,向向量存储添加文档def add_documents(self, documents): # 提取所有文档的内容列表 texts = [doc.page_content for doc in documents] # 使用 BGE-M3 嵌入函数生成文档的嵌入 embeddings = self.embedding_function(texts) # 初始化空列表,用于存储插入的数据 data = [] # 遍历每个文档,带上索引 i for i, doc in enumerate(documents): # 生成文档内容的 MD5 哈希值,作为唯一 ID text_hash = hashlib.md5(doc.page_content.encode('utf-8')).hexdigest() # 初始化稀疏向量字典 sparse_vector = {} # 获取第 i 行的稀疏向量数据 row = embeddings["sparse"].getrow(i) # 获取稀疏向量的非零值索引 indices = row.indices # 获取稀疏向量的非零值 values = row.data # 将索引和值配对,填充稀疏向量字典 for idx, value in zip(indices, values): sparse_vector[idx] = value # 创建数据字典,包含所有字段 data.append({ "id": text_hash, "text": doc.page_content, "dense_vector": embeddings["dense"][i], "sparse_vector": sparse_vector, "parent_id": doc.metadata["parent_id"], "parent_content": doc.metadata["parent_content"], "source": doc.metadata.get("source", "unknown"), "timestamp": doc.metadata.get("timestamp", "unknown") }) # 检查是否有数据需要插入 if data: # 使用 upsert 操作插入数据,覆盖重复 ID self.client.upsert(collection_name=self.collection_name, data=data) # 记录插入或更新的文档数量日志 logger.info(f"已插入或更新 {len(data)} 个文档")- 提取文本:
- 从文档对象中提取文本内容。
- 生成向量:
- 使用BGE-M3模型生成稠密和稀疏向量。
- 构造数据:
- 为每篇文档生成唯一ID(MD5哈希)。
- 将向量和元数据组织成字典。
- 存储数据:
- 使用
upsert操作插入或更新数据。
- 使用
- 唯一性:通过MD5哈希确保ID唯一。
- 稀疏向量处理:将稀疏矩阵转换为字典格式。
4.5 混合检索与重排序
Section titled “4.5 混合检索与重排序”hybrid_search_with_rerank方法实现混合检索并重排序,返回最相关文档。
# 定义方法,执行混合检索并重排序def hybrid_search_with_rerank(self, query, k=conf.RETRIEVAL_K, source_filter=None): # 使用 BGE-M3 嵌入函数生成查询的嵌入 query_embeddings = self.embedding_function([query]) # 获取查询的稠密向量 dense_query_vector = query_embeddings["dense"][0] # 初始化查询的稀疏向量字典 sparse_query_vector = {} # 获取查询稀疏向量的第 0 行数据 row = query_embeddings["sparse"].getrow(0) # 获取稀疏向量的非零值索引 indices = row.indices # 获取稀疏向量的非零值 values = row.data # 将索引和值配对,填充稀疏向量字典 for idx, value in zip(indices, values): sparse_query_vector[idx] = value
# 初始化过滤表达式,默认不过滤 filter_expr = f"source == '{source_filter}'" if source_filter else "" # 创建稠密向量搜索请求 dense_request = AnnSearchRequest( data=[dense_query_vector], anns_field="dense_vector", param={"metric_type": "IP", "params": {"nprobe": 10}}, limit=k, expr=filter_expr ) # 创建稀疏向量搜索请求 sparse_request = AnnSearchRequest( data=[sparse_query_vector], anns_field="sparse_vector", param={"metric_type": "IP", "params": {}}, limit=k, expr=filter_expr )
# 创建加权排序器,稀疏向量权重 0.7,稠密向量权重 1.0 ranker = WeightedRanker(0.7, 1.0) # 执行混合搜索,返回 Top-K 结果 results = self.client.hybrid_search( collection_name=self.collection_name, reqs=[dense_request, sparse_request], ranker=ranker, limit=k, output_fields=["text", "parent_id", "parent_content", "source", "timestamp"] )[0]
# 将搜索结果转换为 Document 对象列表 sub_chunks = [self._doc_from_hit(hit["entity"]) for hit in results] print(f'sub_chunks-->{len(sub_chunks)}') # 从子块中提取去重的父文档 parent_docs = self._get_unique_parent_docs(sub_chunks) # 如果只有1个文档,直接返回跳过重排序 if len(parent_docs) < 2: return parent_docs[:conf.CANDIDATE_M] # 如果有父文档,进行重排序 if parent_docs: # 创建查询与文档内容的配对列表 pairs = [[query, doc.page_content] for doc in parent_docs] # 使用 BGE-Reranker 计算每个配对的得分 scores = self.reranker.predict(pairs) # 根据得分从高到低排序文档 ranked_parent_docs = [doc for _, doc in sorted(zip(scores, parent_docs), reverse=True)] # 如果没有父文档,返回空列表 else: ranked_parent_docs = []
# 返回前 k 个重排序后的文档 return ranked_parent_docs[:conf.CANDIDATE_M]- 生成查询向量:
- 使用BGE-M3生成稠密和稀疏向量。
- 构造检索请求:
- 为稠密和稀疏向量分别创建
AnnSearchRequest。
- 为稠密和稀疏向量分别创建
- 混合检索:
- 使用
WeightedRanker融合结果。
- 使用
- 重排序:
- 使用
CrossEncoder重新排序父文档。
- 使用
- 混合检索:提升覆盖率和准确性。
- 重排序:确保最相关文档优先。
4.6 获取唯一父文档
Section titled “4.6 获取唯一父文档”_get_unique_parent_docs方法从子块中提取去重的父文档。
# 定义私有方法,从子块中提取去重的父文档def _get_unique_parent_docs(self, sub_chunks): # 初始化集合,用于存储已处理的父块内容(去重) parent_contents = set() # 初始化列表,用于存储唯一父文档 unique_docs = [] # 遍历所有子块 for chunk in sub_chunks: # 获取子块的父块内容,默认为子块内容 parent_content = chunk.metadata.get("parent_content", chunk.page_content) # 检查父块内容是否非空且未重复 if parent_content and parent_content not in parent_contents: # 创建新的 Document 对象,包含父块内容和元数据 unique_docs.append(Document(page_content=parent_content, metadata=chunk.metadata)) # 将父块内容添加到去重集合 parent_contents.add(parent_content) # 返回去重后的父文档列表 return unique_docs- 去重:
- 使用集合记录已处理的父内容。
- 构造文档:
- 创建包含父内容的
Document对象。
- 创建包含父内容的
- 去重逻辑:避免重复父文档。
- 元数据保留:保持完整性。
4.7 从查询结果创建文档
Section titled “4.7 从查询结果创建文档”_doc_from_hit方法将Milvus查询结果转换为Document对象。
# 定义私有方法,从 Milvus 查询结果创建 Document 对象def _doc_from_hit(self, hit): # 创建并返回 Document 对象,填充内容和元数据 return Document( page_content=hit.get("text"), metadata={ "parent_id": hit.get("parent_id"), "parent_content": hit.get("parent_content"), "source": hit.get("source"), "timestamp": hit.get("timestamp") } )- 提取内容和元数据:
- 从查询结果中获取字段。
- 创建对象:
- 构造
Document实例。
- 构造
本章节全面讲解了vector_store.py模块的每个方法:
-
初始化:设置参数和模型,为向量存储做好准备。
-
创建集合:定义结构和索引,构建高效数据库。
-
添加文档:实现文档向量化与存储。
-
混合检索:结合稠密和稀疏向量并重排序,返回最相关结果。
-
辅助方法:支持去重和文档转换。
学习者通过本章节掌握了向量存储的完整流程,为RAG系统的检索功能奠定了基础。
发布于 2025-12-31