基于Mysql数据库实现问答系统
基于MySQL的FQA问答系统实现
Section titled “基于MySQL的FQA问答系统实现”- 理解FQA问答系统的整体流程。
- 掌握如何整合MySQL、Redis和BM25算法构建QA问答系统。
1 FQA系统概述
Section titled “1 FQA系统概述”本系统从MySQL数据库检索问答对,使用BM25算法计算相似度,并通过Softmax归一化将得分转换为概率值,阈值0.85判断答案可靠性。Redis仅缓存高可靠性结果(相似度>0.85且有答案)。若MySQL无可靠答案,则调用RAG系统检索。
1.1 系统流程
Section titled “1.1 系统流程”- 数据存储:MySQL存储FQA高频问答对数据。
- 问题检索:BM25计算相似度,Softmax归一化后判断阈值0.85。
- 缓存管理:Redis仅存储相似度>0.85且有答案的数据。
- 答案返回:
- 若MySQL返回可靠答案,直接返回。
- 否则,调用RAG系统检索。
1.2 项目结构
Section titled “1.2 项目结构”integrated_qa_system/├── config.ini # 配置文件,包含所有模块的配置├── base/│ ├── config.py # 配置管理,加载 config.ini│ ├── logger.py # 日志设置├── mysql_qa/│ ├── data/│ │ ├── JP学科知识问答.csv # FQA数据集│ ├── db/│ │ ├── mysql_client.py # MySQL 数据库操作│ ├── cache/│ │ ├── redis_client.py # Redis 缓存操作│ ├── retrieval/│ │ ├── bm25_search.py # BM25 搜索│ ├── utils/│ │ ├── preprocess.py # 文本预处理│ ├── main.py # MySQL 系统独立入口,支持查询├── requirements.txt # 依赖文件└── logs/ └── app.log # 日志文件2 代码实现
Section titled “2 代码实现”配置文件 (config.ini)
Section titled “配置文件 (config.ini)”# MySQL 配置[mysql]host = localhostuser = rootpassword = 123456database = subjects_kg
# Redis 配置[redis]host = localhostport = 6379password = 1234db = 0
# 日志配置[logger]log_file = /path/to/your/logs/app.log2.1 配置管理
Section titled “2.1 配置管理”config.py文件定义了Config类,用于集中管理系统中的所有配置参数。这些参数包括数据库连接信息、模型选择、分块策略、API设置等。通过集中管理配置,系统可以方便地调整参数、适配不同环境,并支持通过环境变量进行灵活配置。
# 导入配置解析库import configparser# 导入路径操作库import os
class Config: # 初始化配置,加载 config.ini 文件 def __init__(self, config_file='../config.ini'): # 创建配置解析器 self.config = configparser.ConfigParser() # 读取配置文件 self.config.read(config_file)
# MySQL 配置 # MySQL 主机地址 self.MYSQL_HOST = self.config.get('mysql', 'host', fallback='localhost') # MySQL 用户名 self.MYSQL_USER = self.config.get('mysql', 'user', fallback='root') # MySQL 密码 self.MYSQL_PASSWORD = self.config.get('mysql', 'password', fallback='123456') # MySQL 数据库名 self.MYSQL_DATABASE = self.config.get('mysql', 'database', fallback='subjects_kg')
# Redis 配置 # Redis 主机地址 self.REDIS_HOST = self.config.get('redis', 'host', fallback='localhost') # Redis 端口 self.REDIS_PORT = self.config.getint('redis', 'port', fallback=6379) # Redis 密码 self.REDIS_PASSWORD = self.config.get('redis', 'password', fallback='1234') # Redis 数据库编号 self.REDIS_DB = self.config.getint('redis', 'db', fallback=0) # 日志文件路径 self.LOG_FILE = self.config.get('logger', 'log_file', fallback='logs/app.log')
if __name__ == '__main__': conf = Config() print(conf.CHILD_CHUNK_SIZE)- 默认值:每个参数设有默认值,确保未配置环境变量时系统仍可运行。
- 参数分类:按功能分类(如数据库、模型、分块等),便于管理和维护。
2.2 日志记录
Section titled “2.2 日志记录”logger.py文件定义了setup_logging函数,用于配置系统的日志记录器。日志记录器将运行信息、警告和错误输出到文件和控制台,便于开发、调试和运维人员监控系统状态。
# 导入日志库import logging# 导入路径操作库import os# 导入配置类from config import Config
def setup_logging(log_file=Config().LOG_FILE): # 创建日志目录 os.makedirs(os.path.dirname(log_file), exist_ok=True) # 获取日志器 logger = logging.getLogger("EduRAG") # 设置日志级别 logger.setLevel(logging.INFO) # 避免重复添加处理器 if not logger.handlers: # 创建文件处理器 file_handler = logging.FileHandler(log_file, encoding='utf-8') # 设置文件处理器级别 file_handler.setLevel(logging.INFO) # 创建控制台处理器 console_handler = logging.StreamHandler() # 设置控制台处理器级别 console_handler.setLevel(logging.INFO) # 设置日志格式 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') # 为文件处理器设置格式 file_handler.setFormatter(formatter) # 为控制台处理器设置格式 console_handler.setFormatter(formatter) # 添加文件处理器 logger.addHandler(file_handler) # 添加控制台处理器 logger.addHandler(console_handler) # 返回日志器 return logger
# 初始化日志器logger = setup_logging()- 日志级别:默认设为
INFO,记录关键运行信息。 - 双重输出:同时输出到文件和控制台,便于实时监控和后续分析。
- 格式化:日志包含时间戳、名称、级别和内容,便于问题定位。
2.3 MySQL操作模块
Section titled “2.3 MySQL操作模块”mysql_client.py是一个用于与 MySQL 交互的模块。模块通过读取配置文件连接数据库,支持创建表、从 CSV 文件插入数据、查询问题和答案,以及安全关闭连接。所有操作均通过日志记录,便于调试和监控系统状态。
# 导入 MySQL 连接库import pymysql# 导入pandasimport pandas as pd# 导入配置和日志from base import Config, logger
class MySQLClient: def __init__(self): # 初始化日志 self.logger = logger try: # 连接 MySQL 数据库 self.connection = pymysql.connect( host=Config().MYSQL_HOST, user=Config().MYSQL_USER, password=Config().MYSQL_PASSWORD, database=Config().MYSQL_DATABASE ) # 创建游标 self.cursor = self.connection.cursor() # 记录连接成功 self.logger.info("MySQL 连接成功") except pymysql.MySQLError as e: # 记录连接失败 self.logger.error(f"MySQL 连接失败: {e}") raise
def create_table(self): create_table_query = ''' CREATE TABLE IF NOT EXISTS jpkb ( id INT AUTO_INCREMENT PRIMARY KEY, subject_name VARCHAR(20), question VARCHAR(1000), answer VARCHAR(1000)) ''' try: self.cursor.execute(create_table_query) self.connection.commit() self.logger.info("表创建成功") except pymysql.MySQLError as e: self.logger.error(f"表创建失败: {e}") raise
def insert_data(self, csv_path): try: data = pd.read_csv(csv_path) for _, row in data.iterrows(): insert_query = "INSERT INTO jpkb (subject_name, question, answer) VALUES (%s, %s, %s)" self.cursor.execute(insert_query, (row['学科名称'], row['问题'], row['答案'])) self.connection.commit() self.logger.info("数据插入成功") except Exception as e: self.logger.error(f"数据插入失败: {e}") self.connection.rollback() raise
def fetch_questions(self): # 获取所有问题 try: # 执行查询 self.cursor.execute("SELECT question FROM jpkb") # 获取结果 results = self.cursor.fetchall() # 记录获取成功 self.logger.info("成功获取问题") # 返回结果 return results except pymysql.MySQLError as e: # 记录查询失败 self.logger.error(f"查询失败: {e}") # 返回空列表 return []
def fetch_answer(self, question): # 获取指定问题的答案 try: # 执行查询 self.cursor.execute("SELECT answer FROM jpkb WHERE question=%s", (question,)) # 获取结果 result = self.cursor.fetchone() # 返回答案或 None return result[0] if result else None except pymysql.MySQLError as e: # 记录答案获取失败 self.logger.error(f"答案获取失败: {e}") # 返回 None return None
def close(self): # 关闭数据库连接 try: # 关闭连接 self.connection.close() # 记录关闭成功 self.logger.info("MySQL 连接已关闭") except pymysql.MySQLError as e: # 记录关闭失败 self.logger.error(f"关闭连接失败: {e}")if __name__ == '__main__': mysql_client = MySQLClient() mysql_client.create_table() mysql_client.insert_data('../data/JP学科知识问答.csv')- 数据库连接:通过 config.ini 配置文件读取 MySQL 参数,使用 pymysql 建立连接。
- 表管理:创建 jpkb 表,包含字段 id(自增主键)、subject_name(学科名称)、question(问题)、answer(答案),使用 IF NOT EXISTS 避免重复创建。
- 异常处理:每个方法均捕获异常,记录错误日志并根据需要回滚事务或抛出异常。
2.4 Redis 缓存操作模块
Section titled “2.4 Redis 缓存操作模块”redis_client.py该模块用于与 Redis 数据库交互。模块通过配置文件连接 Redis,支持键值对存储与查询(使用 JSON 序列化)、答案缓存查询,并记录操作日志,便于调试和监控。
# 导入 Redis 客户端import redis# 导入 JSON 处理import json# 导入配置和日志from base import Config, logger
class RedisClient: def __init__(self): # 初始化日志 self.logger = logger try: # 连接 Redis self.client = redis.StrictRedis( host=Config().REDIS_HOST, port=Config().REDIS_PORT, password=Config().REDIS_PASSWORD, db=Config().REDIS_DB, decode_responses=True ) # 记录连接成功 self.logger.info("Redis 连接成功") except redis.RedisError as e: # 记录连接失败 self.logger.error(f"Redis 连接失败: {e}") raise
def set_data(self, key, value): # 存储数据到 Redis try: # 存储 JSON 数据 self.client.set(key, json.dumps(value)) # 记录存储成功 self.logger.info(f"存储数据到 Redis: {key}") except redis.RedisError as e: # 记录存储失败 self.logger.error(f"Redis 存储失败: {e}")
def get_data(self, key): # 从 Redis 获取数据 try: # 获取数据 data = self.client.get(key) # 返回解析后的 JSON 数据或 None return json.loads(data) if data else None except redis.RedisError as e: # 记录获取失败 self.logger.error(f"Redis 获取失败: {e}") # 返回 None return None
def get_answer(self, query): # 获取查询的缓存答案 try: # 从 Redis 获取答案 answer = self.client.get(f"answer:{query}") if answer: # 记录获取成功 self.logger.info(f"从 Redis 获取答案: {query}") # 返回答案 return answer # 返回 None return None except redis.RedisError as e: # 记录查询失败 self.logger.error(f"Redis 查询失败: {e}") # 返回 None return Noneif __name__ == '__main__': redcli = RedisClient() print(redcli)- Redis 连接:通过 config.ini 读取 Redis 配置,使用 redis.StrictRedis 建立连接。
- 数据操作:
- set_data:将键值对(值序列化为 JSON)存储到 Redis。
- get_data:根据键获取值并反序列化 JSON。
- get_answer:查询以 answer:{query} 格式存储的答案缓存。
2.5 文本预处理模块
Section titled “2.5 文本预处理模块”preprocess.py是一个基于 jieba 分词库实现文本预处理的模块。该模块将输入文本转换为小写并进行分词,返回分词结果,支持日志记录以监控处理状态。
# 导入分词库import jieba# 导入日志from base import logger
def preprocess_text(text): # 预处理文本 logger.info("开始预处理文本") try: # 分词并转换为小写 return jieba.lcut(text.lower()) except AttributeError as e: # 记录预处理失败 logger.error(f"文本预处理失败: {e}") # 返回空列表 return []- 文本处理:使用 jieba.lcut 对输入文本进行中文分词,并将文本转换为小写以规范化。
2.6 BM25+Softmax检索模块
Section titled “2.6 BM25+Softmax检索模块”bm25_search.py 是一个基于 BM25 算法和 Softmax 归一化的文本检索模块,用于从问题库中检索与查询最匹配的答案。模块结合 Redis 缓存和 MySQL 数据库,支持问题加载、分词、BM25 评分、Softmax 归一化,并记录操作日志。
# 导入 BM25 算法from rank_bm25 import BM25Okapi# 导入数值计算库import numpy as np# 导入文本预处理from utils.preprocess import preprocess_text# 导入日志from base import logger
class BM25Search: def __init__(self, redis_client, mysql_client): # 初始化日志 self.logger = logger # 初始化 Redis 客户端 self.redis_client = redis_client # 初始化 MySQL 客户端 self.mysql_client = mysql_client # 初始化 BM25 模型 self.bm25 = None # 初始化问题列表 self.questions = None # 初始化原始问题 self.original_questions = None # 加载数据 self._load_data()
def _load_data(self): # 加载数据 original_key = "qa_original_questions" tokenized_key = "qa_tokenized_questions" # 从 Redis 获取原始问题 self.original_questions = self.redis_client.get_data(original_key) # 从 Redis 获取分词问题 tokenized_questions = self.redis_client.get_data(tokenized_key) # 如果 Redis 中没有数据,从 MySQL 加载 if not self.original_questions or not tokenized_questions: # 从 MySQL 获取问题 self.original_questions = self.mysql_client.fetch_questions() if not self.original_questions: # 记录无问题警告 self.logger.warning("未加载到问题") return # 分词问题 tokenized_questions = [preprocess_text(q[0]) for q in self.original_questions] # 存储原始问题到 Redis self.redis_client.set_data(original_key, [(q[0]) for q in self.original_questions]) # 存储分词问题到 Redis self.redis_client.set_data(tokenized_key, tokenized_questions) # 设置问题列表 self.questions = tokenized_questions # 初始化 BM25 模型 self.bm25 = BM25Okapi(self.questions) # 记录 BM25 初始化成功 self.logger.info("BM25 模型初始化完成")
def _softmax(self, scores): # 计算 Softmax 分数 exp_scores = np.exp(scores - np.max(scores)) # 返回归一化分数 return exp_scores / exp_scores.sum()
def search(self, query, threshold=0.85): # 搜索查询 if not query or not isinstance(query, str): # 记录无效查询 self.logger.error("无效查询") # 返回 None 和 False return None, False # 检查 Redis 缓存 cached_answer = self.redis_client.get_answer(query) if cached_answer: # 返回缓存答案 return cached_answer, False try: # 分词查询 query_tokens = preprocess_text(query) # 计算 BM25 分数 scores = self.bm25.get_scores(query_tokens) # 计算 Softmax 分数 softmax_scores = self._softmax(scores) # 获取最高分索引 best_idx = softmax_scores.argmax() # 获取最高分 best_score = softmax_scores[best_idx] # 检查是否超过阈值 if best_score >= threshold: # 获取原始问题 original_question = self.original_questions[best_idx] # 获取答案 answer = self.mysql_client.fetch_answer(original_question) if answer: # 缓存答案 self.redis_client.set_data(f"answer:{query}", answer) # 记录搜索成功 self.logger.info(f"搜索成功,Softmax 相似度: {best_score:.3f}") # 返回答案和 False return answer, False # 记录无可靠答案 self.logger.info(f"未找到可靠答案,最高 Softmax 相似度: {best_score:.3f}") # 返回 None 和 True return None, True except Exception as e: # 记录搜索失败 self.logger.error(f"搜索失败: {e}") # 返回 None 和 True return None, True- 数据加载:优先从 Redis 获取问题和分词数据,若无则从 MySQL 加载并分词后缓存到 Redis。
- BM25 检索:使用 BM25Okapi 计算查询与问题库的相似度,结合 Softmax 归一化评分。
- 答案查询:通过 Redis 缓存答案,若无缓存则从 MySQL 获取并缓存,阈值(默认 0.85)控制答案可靠性。
3 主程序 (main.py)
Section titled “3 主程序 (main.py)”# 导入 MySQL 客户端from db.mysql_client import MySQLClient# 导入 Redis 客户端from cache.redis_client import RedisClient# 导入 BM25 搜索from retrieval.bm25_search import BM25Search# 导入日志from base import logger# 导入时间库import time
class MySQLQASystem: def __init__(self): # 初始化日志 self.logger = logger # 初始化 MySQL 客户端 self.mysql_client = MySQLClient() # 初始化 Redis 客户端 self.redis_client = RedisClient() # 初始化 BM25 搜索 self.bm25_search = BM25Search(self.redis_client, self.mysql_client)
def query(self, query): # 查询 MySQL 系统 start_time = time.time() # 记录查询信息 self.logger.info(f"处理查询: '{query}'") # 执行 BM25 搜索 answer, _ = self.bm25_search.search(query, threshold=0.85) if answer: # 记录 MySQL 答案 self.logger.info(f"MySQL 答案: {answer}") else: # 记录无答案 self.logger.info("SQL中未找到答案, 需要调用RAG系统") # 设置默认答案 answer = "SQL未找到答案" # 计算处理时间 processing_time = time.time() - start_time # 记录处理时间 self.logger.info(f"查询处理耗时 {processing_time:.2f}秒") # 返回答案 return answer
def main(): # 初始化 MySQL 系统 mysql_system = MySQLQASystem() try: # 打印欢迎信息 print("\n欢迎使用 MySQL 问答系统!") print("输入查询进行问答,输入 'exit' 退出。") while True: # 获取用户输入 query = input("\n输入查询: ").strip() if query.lower() == "exit": # 记录退出日志 logger.info("退出 MySQL 系统") # 打印退出信息 print("再见!") break # 执行查询 answer = mysql_system.query(query) # 打印答案 print(f"\n答案: {answer}") except Exception as e: # 记录系统错误 logger.error(f"系统错误: {e}") # 打印错误信息 print(f"发生错误: {e}") finally: # 关闭 MySQL 连接 mysql_system.mysql_client.close()
if __name__ == "__main__": # 运行主程序 main()示例运行结果
Section titled “示例运行结果”假设MySQL中有数据: - 问题:“特殊符号如何切割”,答案:“使用split函数” - 问题:“如何处理字符串”,答案:“使用字符串方法”
查询:“特殊符号的切割”
2025-04-01 10:00:00,123 - INFO - MySQL连接成功2025-04-01 10:00:00,125 - INFO - Redis连接成功2025-04-01 10:00:00,126 - INFO - BM25模型初始化完成欢迎使用 MySQL 问答系统!输入查询进行问答,输入 'exit' 退出。2025-04-01 10:00:00,127 - INFO - 检索成功,Softmax相似度: 0.8922025-04-01 10:00:00,128 - INFO - 数据存入Redis: answer:特殊符号的切割2025-04-01 10:00:00,129 - INFO - MySQL答案: 使用split函数2025-04-01 10:00:00,130 - INFO - MySQL连接已关闭本章整合Mysql和Redis功能,实现了基于余弦相似度问答的QA系统:
- 流程:MySQL存储数据,Redis缓存优化,TF-IDF和余弦相似度匹配问题。
- 工程化:模块化设计、配置文件、日志记录。