系统API接口开发及webui应用
发布于 2025-12-31
基于 FastAPI 的 FAQ 与知识库查询接口
Section titled “基于 FastAPI 的 FAQ 与知识库查询接口”- 理解如何基于 FastAPI 构建高效的问答系统 API 接口。
- 掌握 WebSocket 流式输出和 HTTP 接口的实现,优化用户交互体验。
- 学习前后端交互、前端静态文件服务以及日常问候处理的集成。
app.py 是对 new_main.py 的进一步优化,基于 FastAPI 框架构建了 RESTful API 和 WebSocket 接口,集成了 MySQL FAQ 和 RAG 系统,支持流式输出、对话历史管理和日常问候处理。同时,通过静态文件服务支持前端页面交互,适用于教育场景下的实时问答系统。
1 查询流程图
Section titled “1 查询流程图”以下是基于 FastAPI 的问答系统查询流程图,展示从前端请求到后端响应的处理逻辑。

2 流程说明
Section titled “2 流程说明”-
请求接收:前端通过 HTTP POST (
/api/query) 或 WebSocket (/api/stream) 发送查询请求,包含查询内容 (query)、学科过滤 (source_filter) 和会话 ID (session_id)。 -
日常问候处理:检查查询是否为日常问候(如“你好”),若匹配则返回模板化回复。
-
BM25 搜索:使用 BM25 算法搜索 MySQL 知识库,设置相似度阈值 0.85。
-
答案判断:
- 若找到可靠答案(相似度 > 0.85),通过 HTTP 一次性返回或 WebSocket 流式返回。
- 若无可靠答案且需要 RAG,HTTP 接口提示使用 WebSocket,WebSocket 接口流式返回 RAG 答案。
- 若无需 RAG,返回“未找到答案”。
-
历史更新:将查询和答案存入 MySQL 的
conversations表,保留最近 5 轮对话。 -
响应输出:HTTP 返回 JSON 格式响应,WebSocket 流式发送 JSON 数据(包含
start、token、end或error类型)。
3 代码介绍
Section titled “3 代码介绍”以下是 app.py 的完整代码,包含逐行注释,详细解析功能与实现逻辑。
3.1 导入必备的工具包
Section titled “3.1 导入必备的工具包”# 导入 FastAPI 相关模块,用于构建 API 和 WebSocketfrom fastapi import FastAPI, WebSocket, HTTPException, Query, Depends# 导入 FastAPI 响应类型,用于流式响应和文件服务from fastapi.responses import StreamingResponse, FileResponse# 导入 CORS 中间件,支持跨域请求from fastapi.middleware.cors import CORSMiddleware# 导入静态文件服务模块from fastapi.staticfiles import StaticFiles# 导入 WebSocket 断开异常from starlette.websockets import WebSocketDisconnect# 导入系统操作模块,用于文件目录管理import os# 导入 Pydantic 模型,用于请求验证from pydantic import BaseModel# 导入异步事件循环模块import asyncio# 导入 JSON 处理模块import json# 导入 UUID 模块,生成唯一会话 IDimport uuid# 导入类型注解模块from typing import Optional, List, Dict, Any# 导入时间模块,记录处理时间import time# 导入正则表达式模块,用于匹配日常问候import re# 导入优化后的问答系统from new_main import IntegratedQASystem3.2 创建 FastAPI 应用并配置
Section titled “3.2 创建 FastAPI 应用并配置”- 创建 FastAPI 实例并配置 CORS 和静态文件服务。
- 功能:初始化 FastAPI 应用,允许跨域请求,挂载静态文件目录。
- 关键点:CORS 配置在生产环境中需限制域名,静态文件服务支持前端页面。
# 创建 FastAPI 应用实例,设置标题和描述app = FastAPI(title="问答系统API", description="集成MySQL和RAG的智能问答系统")
# 配置 CORS 中间件,允许跨域请求app.add_middleware( CORSMiddleware, allow_origins=["*"], # 允许所有来源(生产环境需限制) allow_credentials=True, # 允许凭证 allow_methods=["*"], # 允许所有 HTTP 方法 allow_headers=["*"], # 允许所有头部)
# 创建静态文件目录(如果不存在)os.makedirs("static", exist_ok=True)
# 创建全局问答系统实例qa_system = IntegratedQASystem()3.3 定义日常问候模式
Section titled “3.3 定义日常问候模式”- 定义问候语正则匹配模式和回复。
- 功能:快速响应常见问候,提升用户体验。
- 关键点:使用正则表达式匹配,支持大小写无关匹配。
# 定义日常问候用语模式和回复GREETING_PATTERNS = [ { "pattern": r"^(你好|您好|hi|hello)", # 匹配问候语 "response": "你好!我是智能助手,专注于为学生答疑解惑,很高兴为你服务!" }, { "pattern": r"^(你是谁|您是谁|你叫什么|你的名字|who are you)", # 匹配身份询问 "response": "我是智能助手,你的学习伙伴,致力于提供 IT 教育相关的解答!" }, { "pattern": r"^(在吗|在不在|有人吗)", # 匹配在线确认 "response": "我在!我是智能助手,随时为你解答问题!" }, { "pattern": r"^(干嘛呢|你在干嘛|做什么)", # 匹配状态询问 "response": "我正在待命,随时为你解答 IT 学习相关的问题!有什么我可以帮你的?" }]3.4 定义请求和响应模型
Section titled “3.4 定义请求和响应模型”- 使用 Pydantic 定义请求和响应数据结构。
- 功能:规范 API 输入和输出格式,确保数据验证。
- 关键点:支持可选字段,方便扩展。
# 定义查询请求模型class QueryRequest(BaseModel): query: str # 查询内容,必填 source_filter: Optional[str] = None # 学科过滤,可选 session_id: Optional[str] = None # 会话 ID,可选
# 定义查询响应模型class QueryResponse(BaseModel): answer: str # 答案内容 is_streaming: bool # 是否流式响应 session_id: str # 会话 ID processing_time: float # 处理时间3.5 挂载静态文件服务
Section titled “3.5 挂载静态文件服务”- 挂载静态文件目录并设置根路径。
- 功能:服务前端 HTML 文件,支持交互页面。
- 关键点:根路径重定向到
index.html。
# 挂载静态文件目录,服务前端文件app.mount("/static", StaticFiles(directory="static"), name="static")
# 根路径重定向到 index.html@app.get("/")async def read_root(): return FileResponse("static/index.html")3.6 创建新会话
Section titled “3.6 创建新会话”- (
/api/create_session) 接口 - 功能:生成新的唯一会话 ID。
- 关键点:使用 UUID 确保唯一性。
# 创建新会话接口@app.post("/api/create_session")async def create_session(): session_id = str(uuid.uuid4()) # 生成唯一会话 ID return {"session_id": session_id} # 返回会话 ID3.7 查询历史消息
Section titled “3.7 查询历史消息”- (
/api/history/{session_id}) 接口 - 功能:获取指定会话的历史记录。
- 关键点:异常处理确保接口健壮性。
# 查询历史消息接口@app.get("/api/history/{session_id}")async def get_history(session_id: str): try: # 获取指定会话的历史记录 history = qa_system.get_session_history(session_id) # 返回会话 ID 和历史记录 return {"session_id": session_id, "history": history} except Exception as e: # 抛出 HTTP 异常,包含错误信息 raise HTTPException(status_code=500, detail=f"获取历史记录失败: {str(e)}")3.8 清除历史消息
Section titled “3.8 清除历史消息”- (
/api/history/{session_id}) 接口 - 功能:删除指定会话的所有历史记录。
- 关键点:返回操作状态,异常处理确保健壮性。
# 清除历史消息接口@app.delete("/api/history/{session_id}")async def clear_history(session_id: str): # 清除指定会话的历史记录 success = qa_system.clear_session_history(session_id) if success: # 返回成功状态 return {"status": "success", "message": "历史记录已清除"} else: # 抛出 HTTP 异常 raise HTTPException(status_code=500, detail="清除历史记录失败")3.9 检查日常问候
Section titled “3.9 检查日常问候”- (
check_greeting) 函数 - 功能:检查查询是否匹配日常问候,返回模板化回复。
- 关键点:正则匹配支持大小写无关,简化交互逻辑。
# 检查是否为日常问候用语并返回模板回复def check_greeting(query: str) -> Optional[str]: query_text = query.strip() # 去除首尾空格 for pattern_info in GREETING_PATTERNS: # 使用正则匹配,忽略大小写 if re.match(pattern_info["pattern"], query_text, re.IGNORECASE): return pattern_info["response"] # 返回匹配的回复 return None # 无匹配返回 None3.10 非流式查询接口
Section titled “3.10 非流式查询接口”- (
/api/query) 接口 - 功能:处理非流式查询,支持 MySQL 答案和日常问候,提示使用 WebSocket 获取 RAG 流式答案。
- 关键点:返回 JSON 格式响应,记录处理时间。
# 非流式查询接口@app.post("/api/query")async def query(request: QueryRequest): start_time = time.time() # 记录开始时间 # 使用请求中的 session_id 或生成新 ID session_id = request.session_id or str(uuid.uuid4()) # 检查是否为日常问候 greeting_response = check_greeting(request.query) if greeting_response: # 返回问候回复 return { "answer": greeting_response, "is_streaming": False, "session_id": session_id, "processing_time": time.time() - start_time } # 执行 BM25 搜索 answer, need_rag = qa_system.bm25_search.search(request.query, threshold=0.85) if need_rag: # 需要 RAG,提示使用 WebSocket return { "answer": "请使用WebSocket接口获取流式响应", "is_streaming": True, "session_id": session_id, "processing_time": time.time() - start_time } # 返回 MySQL 答案 return { "answer": answer, "is_streaming": False, "session_id": session_id, "processing_time": time.time() - start_time }3.11 流式查询 WebSocket 接口
Section titled “3.11 流式查询 WebSocket 接口”- (
/api/stream) 接口 - 功能:通过 WebSocket 提供流式查询,支持 RAG 答案、日常问候和错误处理。
- 关键点:发送
start、token、end或error类型的 JSON 消息,异步处理确保实时性。
# 流式查询 WebSocket 接口@app.websocket("/api/stream")async def websocket_endpoint(websocket: WebSocket): await websocket.accept() # 接受 WebSocket 连接 try: while True: # 接收客户端消息 data = await websocket.receive_text() request_data = json.loads(data) # 解析 JSON 数据 # 获取查询参数 query = request_data.get("query") source_filter = request_data.get("source_filter") session_id = request_data.get("session_id", str(uuid.uuid4())) start_time = time.time() # 记录开始时间 # 发送开始标志 if websocket.client_state == websocket.client_state.CONNECTED: await websocket.send_json({ "type": "start", "session_id": session_id }) # 检查是否为日常问候 greeting_response = check_greeting(query) if greeting_response: if websocket.client_state == websocket.client_state.CONNECTED: # 发送问候回复 await websocket.send_json({ "type": "token", "token": greeting_response, "session_id": session_id }) # 发送结束标志 await websocket.send_json({ "type": "end", "session_id": session_id, "is_complete": True, "processing_time": time.time() - start_time }) break # 调用问答系统,流式处理查询 collected_answer = "" for token, is_complete in qa_system.query(query, source_filter=source_filter, session_id=session_id): collected_answer += token # 累积答案 if is_complete and not collected_answer: if websocket.client_state == websocket.client_state.CONNECTED: # 发送结束标志 await websocket.send_json({ "type": "end", "session_id": session_id, "is_complete": True, "processing_time": time.time() - start_time }) break if token and websocket.client_state == websocket.client_state.CONNECTED: # 发送 token 数据 await websocket.send_json({ "type": "token", "token": token, "session_id": session_id }) if is_complete: if websocket.client_state == websocket.client_state.CONNECTED: # 发送结束标志 await websocket.send_json({ "type": "end", "session_id": session_id, "is_complete": True, "processing_time": time.time() - start_time }) break await asyncio.sleep(0.01) # 控制流式输出的速度 except WebSocketDisconnect as e: # 记录 WebSocket 断开信息 print(f"WebSocket disconnected: code={e.code}, reason={e.reason}") except Exception as e: # 记录错误信息 print(f"WebSocket error: {str(e)}") if websocket.client_state == websocket.client_state.CONNECTED: # 发送错误消息 await websocket.send_json({ "type": "error", "error": str(e) }) finally: try: if websocket.client_state == websocket.client_state.CONNECTED: # 关闭 WebSocket 连接 await websocket.close() except Exception as e: # 记录关闭连接时的错误 print(f"Error closing WebSocket: {str(e)}")3.12 健康检查接口
Section titled “3.12 健康检查接口”- (
/health) 接口 - 功能:提供健康检查端点,确认服务状态。
- 关键点:简单可靠,适合监控系统可用性。
# 健康检查接口@app.get("/health")async def health_check(): return {"status": "healthy"} # 返回健康状态3.13 获取学科类别
Section titled “3.13 获取学科类别”- (
/api/sources) 接口 - 功能:返回系统支持的学科类别。
- 关键点:便于前端动态展示过滤选项。
# 获取有效学科类别接口@app.get("/api/sources")async def get_sources(): return {"sources": qa_system.config.VALID_SOURCES} # 返回学科类别列表3.14 启动 FastAPI 服务
Section titled “3.14 启动 FastAPI 服务”- 主程序入口
- 功能:使用 Uvicorn 运行 FastAPI 应用。
- 关键点:配置主机和端口,支持生产环境部署。
# 主程序入口if __name__ == "__main__": import uvicorn # 运行 FastAPI 应用,监听 0.0.0.0:8000 uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=False)本章展示了基于 FastAPI 的 app.py,通过 RESTful API 和 WebSocket 接口实现了高效的问答系统。系统集成了 MySQL FAQ 和 RAG 的查询功能,支持流式输出、对话历史管理和日常问候处理,结合静态文件服务实现了前后端交互,配备健壮的错误处理和日志记录,适合教育场景下的实时问答需求。
发布于 2025-12-31