MCP服务
3.5 MCP服务
Section titled “3.5 MCP服务”- 掌握天气MCP服务器
- 掌握票务MCP服务器
- 掌握订票MCP服务器
一、天气MCP服务器
Section titled “一、天气MCP服务器”mcp_weather_server天气 MCP 服务器,提供 weather_data 表的 SELECT 查询接口,返回 JSON 格式结果。
核心功能:
-
初始化 MySQL 数据库连接。
-
执行 SELECT 查询,返回 JSON 格式结果。
-
格式化日期和数值字段,确保 JSON 序列化兼容。
-
通过 FastAPI 提供 HTTP 接口,响应 MCP 工具调用。
1 格式编码
Section titled “1 格式编码”format.py中包含一个编码器方法和JSON编码器类。
目标:定义编码器方法,用于格式化单个对象;自定义 JSON 编码器,处理 MySQL 查询结果中的非标准类型。
功能:将 MySQL查询结果中的date、datetime、timedelta 和 Decimal 类型转换为 JSON 兼容的字符串或数值。
位置:SmartVoyage/utils/format.py
import jsonfrom datetime import date, datetime, timedeltafrom decimal import Decimal
def default_encoder(obj): # 定义编码器方法,用于格式化单个对象 if isinstance(obj, datetime): # 检查是否为datetime,返回带时间的格式化字符串 return obj.strftime('%Y-%m-%d %H:%M:%S') if isinstance(obj, date): # 检查是否为date,返回日期格式化字符串 return obj.strftime('%Y-%m-%d') if isinstance(obj, timedelta): # 检查是否为timedelta,转换为字符串 return str(obj) if isinstance(obj, Decimal): # 检查是否为Decimal,转换为浮点数 return float(obj) return obj # 否则返回原对象
# 定义自定义JSON编码器类,继承自json.JSONEncoder,用于处理非标准类型序列化class DateEncoder(json.JSONEncoder): def default(self, obj): # 重写default方法,处理序列化时的默认对象转换 if isinstance(obj, (date, datetime)): # 检查对象是否为date或datetime类型,对于datetime返回带时间的字符串,对于date返回日期字符串 return obj.strftime('%Y-%m-%d %H:%M:%S') if isinstance(obj, datetime) else obj.strftime('%Y-%m-%d') if isinstance(obj, timedelta): # 检查对象是否为timedelta类型,将时间差转换为字符串 return str(obj) if isinstance(obj, Decimal): # 检查对象是否为Decimal类型,将Decimal转换为浮点数以兼容JSON return float(obj) return super().default(obj) # 对于其他类型,调用父类默认方法测试:
if __name__ == '__main__': print(default_encoder(datetime(2025, 8, 11, 8, 0))) print(default_encoder(date(2025, 8, 11))) print(default_encoder(timedelta(days=1))) print(default_encoder(Decimal('123.45'))) print('*'*80)
encoder = DateEncoder() print(encoder.default(datetime(2025, 8, 11, 8, 0))) print(encoder.default(date(2025, 8, 11))) print(encoder.default(timedelta(days=1))) print(encoder.default(Decimal('123.45')))2 WeatherService类
Section titled “2 WeatherService类”目标:提供天气数据查询服务,响应代理的 SQL 请求。
功能:初始化 MySQL 连接,执行 SELECT 查询,格式化结果为 JSON。
位置:SmartVoyage/mcp_server/mcp_weather_server.py
import mysql.connectorimport jsonfrom datetime import date, datetime, timedeltafrom decimal import Decimalfrom mcp.server.fastmcp import FastMCP
from SmartVoyage.config import Configfrom SmartVoyage.create_logger import loggerfrom SmartVoyage.utils.format import DateEncoder, default_encoder
conf = Config()
# 天气服务类class WeatherService: # 定义天气服务类,封装数据库操作逻辑 def __init__(self): # 连接数据库 self.conn = mysql.connector.connect( host=conf.host, user=conf.user, password=conf.password, database=conf.database )
# 定义执行SQL查询方法,输入SQL字符串,返回JSON字符串 def execute_query(self, sql: str) -> str: try: cursor = self.conn.cursor(dictionary=True) cursor.execute(sql) results = cursor.fetchall() cursor.close() # 格式化结果 for result in results: # 遍历每个结果字典 for key, value in result.items(): if isinstance(value, (date, datetime, timedelta, Decimal)): # 检查值是否为特殊类型 result[key] = default_encoder(value) # 使用自定义编码器格式化该值 # 序列化为JSON,如果有结果返回success,否则no_data;使用DateEncoder,非ASCII不转义 return json.dumps({"status": "success", "data": results} if results else {"status": "no_data", "message": "未找到天气数据,请确认城市和日期。"}, cls=DateEncoder, ensure_ascii=False) except Exception as e: logger.error(f"天气查询错误: {str(e)}") # 返回错误JSON响应 return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False)测试:
if __name__ == "__main__": service = WeatherService() sql = "SELECT * FROM weather_data WHERE city='上海' limit 2" print(service.execute_query(sql))3 启动MCP服务器
Section titled “3 启动MCP服务器”create_weather_mcp_server()函数
目标:创建并启动天气 MCP 服务器。
功能:初始化 FastMCP,注册 query_weather 工具,启动 FastAPI 服务器,监听端口 6001。
# 创建天气MCP服务器def create_weather_mcp_server(): # 创建FastMCP实例 weather_mcp = FastMCP(name="WeatherTools", instructions="天气查询工具,基于 weather_data 表。", log_level="ERROR", host="127.0.0.1", port=8002)
# 实例化天气服务对象 service = WeatherService()
@weather_mcp.tool( name="query_weather", description="查询天气数据,输入 SQL,如 'SELECT * FROM weather_data WHERE city = \"北京\" AND fx_date = \"2025-07-30\"'" ) def query_weather(sql: str) -> str: logger.info(f"执行天气查询: {sql}") return service.execute_query(sql)
# 打印服务器信息 logger.info("=== 天气MCP服务器信息 ===") logger.info(f"名称: {weather_mcp.name}") logger.info(f"描述: {weather_mcp.instructions}")
# 运行服务器 try: print("服务器已启动,请访问 http://127.0.0.1:8002/mcp") weather_mcp.run(transport="streamable-http") # 使用 streamable-http 传输方式 except Exception as e: print(f"服务器启动失败: {e}")客户端测试:
位置:SmartVoyage/test/test_weather_mcp_server.py
import asyncioimport json
from langchain_mcp_adapters.tools import load_mcp_toolsfrom mcp import ClientSessionfrom mcp.client.streamable_http import streamablehttp_client
# 定义服务器地址server_url = "http://127.0.0.1:8002/mcp"
async def test_weather_mcp(): try: # 启动 MCP server,通过streamable建立连接 async with streamablehttp_client(server_url) as (read, write, _): # 使用读写通道创建 MCP 会话 async with ClientSession(read, write) as session: try: await session.initialize() print("会话初始化成功,可以开始调用工具。")
# 从 session 自动获取 MCP server 提供的工具列表。 tools = await load_mcp_tools(session) print(f"tools-->{tools}")
# 测试1: 查询指定日期天气 sql = "SELECT * FROM weather_data WHERE city = '北京' AND fx_date = '2025-10-28'" result = await session.call_tool("query_weather", {"sql": sql}) result_data = json.loads(result) if isinstance(result, str) else result print(f"指定日期天气结果:{result_data}")
# 测试2: 查询未来3天天气 sql_range = "SELECT * FROM weather_data WHERE city = '北京' AND fx_date BETWEEN '2025-10-28' AND '2025-10-30'" result_range = await session.call_tool("query_weather", {"sql": sql_range}) result_range_data = json.loads(result_range) if isinstance(result_range, str) else result_range print(f"天气范围查询结果:{result_range_data}") except Exception as e: print(f"天气 MCP 测试出错:{str(e)}") except Exception as e: print(f"连接或会话初始化时发生错误: {e}") print("请确认服务端脚本已启动并运行在 http://127.0.0.1:8002/mcp")
if __name__ == "__main__": asyncio.run(test_weather_mcp())二、票务MCP服务器
Section titled “二、票务MCP服务器”mcp_ticket_server.py:票务 MCP 服务器,提供 train_tickets、flight_tickets 和 concert_tickets 表的 SELECT 查询接口,返回 JSON 格式结果。
核心功能:
- 初始化 MySQL 数据库连接。
- 执行 SELECT 查询,返回 JSON 格式结果。
- 格式化日期和数值字段,确保 JSON 序列化兼容。
- 通过 FastAPI 提供 HTTP 接口,响应 MCP 工具调用。
1 TicketService类
Section titled “1 TicketService类”目标:提供票务数据查询服务,响应代理的 SQL 请求。
功能:初始化 MySQL 连接,执行 SELECT 查询,格式化结果为 JSON。
位置:SmartVoyage/mcp_server/mcp_ticket_server.py
import mysql.connectorimport jsonfrom datetime import date, datetime, timedeltafrom decimal import Decimalfrom mcp.server.fastmcp import FastMCP
from SmartVoyage.config import Configfrom SmartVoyage.create_logger import loggerfrom SmartVoyage.utils.format import DateEncoder, default_encoder
conf = Config()
# 票务服务类class TicketService: # 定义票务服务类,封装数据库操作逻辑 def __init__(self): # 初始化方法,建立数据库连接 # 连接数据库 self.conn = mysql.connector.connect( host=conf.host, user=conf.user, password=conf.password, database=conf.database )
# 定义执行SQL查询方法,输入SQL字符串,返回JSON字符串 def execute_query(self, sql: str) -> str: try: cursor = self.conn.cursor(dictionary=True) cursor.execute(sql) results = cursor.fetchall() cursor.close() # 格式化结果 for result in results: # 遍历每个结果字典 for key, value in result.items(): if isinstance(value, (date, datetime, timedelta, Decimal)): # 检查值是否为特殊类型 result[key] = default_encoder(value) # 使用自定义编码器格式化该值 # 序列化为JSON,如果有结果返回success,否则no_data;使用DateEncoder,非ASCII不转义 return json.dumps({"status": "success", "data": results} if results else {"status": "no_data", "message": "未找到票务数据,请确认查询条件。"}, cls=DateEncoder, ensure_ascii=False) except Exception as e: logger.error(f"票务查询错误: {str(e)}") # 返回错误JSON响应 return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False)测试:
if __name__ == "__main__": service = TicketService() sql = "SELECT * FROM flight_tickets WHERE departure_city = '上海' AND arrival_city = '北京' AND DATE(departure_time) = '2025-10-28' AND cabin_type = '公务舱'" print(service.execute_query(sql))2 启动MCP 服务器
Section titled “2 启动MCP 服务器”create_ticket_mcp_server()函数
目标:创建并启动票务 MCP 服务器。
功能:初始化 FastMCP,注册 query_tickets 工具,启动 FastAPI 服务器,监听端口 6002。
# 创建票务MCP服务器def create_ticket_mcp_server(): # 创建FastMCP实例 ticket_mcp = FastMCP(name="TicketTools", instructions="票务查询工具,基于 train_tickets, flight_tickets, concert_tickets 表。只支持查询。", log_level="ERROR", host="127.0.0.1", port=8001)
# 实例化票务服务对象 service = TicketService()
@ticket_mcp.tool( name="query_tickets", description="查询票务数据,输入 SQL,如 'SELECT * FROM train_tickets WHERE departure_city = \"北京\" AND arrival_city = \"上海\"'" ) def query_tickets(sql: str) -> str: logger.info(f"执行票务查询: {sql}") return service.execute_query(sql)
# 打印服务器信息 logger.info("=== 票务MCP服务器信息 ===") logger.info(f"名称: {ticket_mcp.name}") logger.info(f"描述: {ticket_mcp.instructions}")
# 运行服务器 try: print("服务器已启动,请访问 http://127.0.0.1:8001/mcp") ticket_mcp.run(transport="streamable-http") # 使用 streamable-http 传输方式 except Exception as e: print(f"服务器启动失败: {e}")客户端测试:
位置:SmartVoyage/test/test_ticket_mcp_server.py
import asyncioimport json
from langchain_mcp_adapters.tools import load_mcp_toolsfrom mcp import ClientSessionfrom mcp.client.streamable_http import streamablehttp_client
# 定义服务器地址server_url = "http://127.0.0.1:8001/mcp"
async def test_ticket_mcp(): try: # 启动 MCP server,通过streamable建立连接 async with streamablehttp_client(server_url) as (read, write, _): # 使用读写通道创建 MCP 会话 async with ClientSession(read, write) as session: try: await session.initialize() print("会话初始化成功,可以开始调用工具。")
# 从 session 自动获取 MCP server 提供的工具列表。 tools = await load_mcp_tools(session) print(f"tools-->{tools}")
# 调用远程工具 # 测试1: 查询机票 sql_flights = "SELECT * FROM flight_tickets WHERE departure_city = '上海' AND arrival_city = '北京' AND DATE(departure_time) = '2025-10-28' AND cabin_type = '公务舱'" result_flights = await session.call_tool("query_tickets", {"sql": sql_flights}) result_flights_data = json.loads(result_flights) if isinstance(result_flights, str) else result_flights print(f"机票查询结果:{result_flights_data}")
# 测试2: 查询火车票 sql_trains = "SELECT * FROM train_tickets WHERE departure_city = '北京' AND arrival_city = '上海' AND DATE(departure_time) = '2025-10-22' AND seat_type = '二等座'" result_trains = await session.call_tool("query_tickets", {"sql": sql_trains}) result_trains_data = json.loads(result_trains) if isinstance(result_trains, str) else result_trains print(f"火车票查询结果:{result_trains_data}")
# 测试3: 查询演唱会票 sql_concerts = "SELECT * FROM concert_tickets WHERE city = '北京' AND artist = '刀郎' AND DATE(start_time) = '2025-10-31' AND ticket_type = '看台'" result_concerts = await session.call_tool("query_tickets", {"sql": sql_concerts}) result_concerts_data = json.loads(result_concerts) if isinstance(result_concerts, str) else result_concerts print(f"演唱会票查询结果:{result_concerts_data}") except Exception as e: print(f"票务 MCP 测试出错:{str(e)}") except Exception as e: print(f"连接或会话初始化时发生错误: {e}") print("请确认服务端脚本已启动并运行在 http://127.0.0.1:8001/mcp")
if __name__ == "__main__": asyncio.run(test_ticket_mcp())三、订票MCP服务器
Section titled “三、订票MCP服务器”mcp_order_server.py:订票 MCP 服务器,通过调用API完成火车票、飞机票和演唱会票的预定。
核心功能:
- 火车票预定、飞机票预定、演出票预定。
- 通过 FastAPI 提供 HTTP 接口,响应 MCP 工具调用。
位置:SmartVoyage/mcp_server/mcp_order_server.py
from mcp.server.fastmcp import FastMCP
from SmartVoyage.config import Configfrom SmartVoyage.create_logger import logger
conf = Config()
# 创建FastMCP实例order_mcp = FastMCP(name="OrderTools", instructions="票务预定工具,通过调用API完成火车票、飞机票和演唱会票的预定。", log_level="ERROR", host="127.0.0.1", port=8003)
@order_mcp.tool( name="order_train", description="根据时间、车次、座位类型、数量预定火车票")def order_train(departure_date: str, train_number: str, seat_type: str, number: int) -> str: ''' Args: departure_date (str): 出发日期,如 '2025-10-30' train_number (str): 火车车次,如 'G346' seat_type (str): 座位类型,如 '二等座' number (int): 订购张数 ''' logger.info(f"正在订购火车票: {departure_date}, {train_number}, {seat_type}, {number}") logger.info(f"恭喜,火车票预定成功!") return "恭喜,火车票预定成功!"
@order_mcp.tool( name="order_flight", description="根据时间、班次、座位类型、数量预定飞机票")def order_flight(departure_date: str, flight_number: str, seat_type: str, number: int) -> str: ''' Args: departure_date (str): 出发日期,如 '2025-10-30' flight_number (str): 飞机班次,如 'CA6557' seat_type (str): 座位类型,如 '经济舱' number (int): 订购张数 ''' logger.info(f"正在订购飞机票: {departure_date}, {flight_number}, {seat_type}, {number}") logger.info(f"恭喜,飞机票预定成功!") return "恭喜,飞机票预定成功!"
@order_mcp.tool( name="order_concert", description="根据时间、明星、场地、座位类型、数量预定演出票")def order_concert(start_date: str, aritist: str, venue: str, seat_type: str, number: int) -> str: ''' Args: start_date (str): 开始日期,如 '2025-10-30' aritist (str): 明星,如 '刀郎' venue (str): 场地,如 '上海体育馆' seat_type (str): 座位类型,如 '看台' number (int): 订购张数 ''' logger.info(f"正在订购演出票: {start_date}, {aritist}, {venue}, {seat_type}, {number}") logger.info(f"恭喜,演出票预定成功!") return "恭喜,演出票预定成功!"
# 创建票务预定MCP服务器def create_order_mcp_server(): # 打印服务器信息 logger.info("=== 票务预定MCP服务器信息 ===") logger.info(f"名称: {order_mcp.name}") logger.info(f"描述: {order_mcp.instructions}")
# 运行服务器 try: print("服务器已启动,请访问 http://127.0.0.1:8003/mcp") order_mcp.run(transport="streamable-http") # 使用 streamable-http 传输方式 except Exception as e: print(f"服务器启动失败: {e}")
if __name__ == "__main__": # 调用创建服务器函数 create_order_mcp_server()客户端测试:
位置:SmartVoyage/test/test_order_mcp_server.py
import asyncioimport json
from langchain.agents import create_tool_calling_agent, AgentExecutorfrom langchain_core.prompts import ChatPromptTemplatefrom langchain_mcp_adapters.tools import load_mcp_toolsfrom langchain_openai import ChatOpenAIfrom mcp import ClientSessionfrom mcp.client.streamable_http import streamablehttp_client
from SmartVoyage.config import Configfrom SmartVoyage.create_logger import logger
conf = Config()
# 初始化LLMllm = ChatOpenAI( model=conf.model_name, base_url=conf.base_url, api_key=conf.api_key, temperature=0.1)
async def order_tickets(query): try: # 启动 MCP server,通过streamable建立连接 async with streamablehttp_client("http://127.0.0.1:8003/mcp") as (read, write, _): # 使用读写通道创建 MCP 会话 async with ClientSession(read, write) as session: try: await session.initialize()
# 从 session 自动获取 MCP server 提供的工具列表。 tools = await load_mcp_tools(session) # print(f"tools-->{tools}")
# 创建 agent 的提示模板 prompt = ChatPromptTemplate.from_messages([ ("system", "你是一个票务预定助手,能够调用工具来完成火车票、飞机票或演出票的预定。你需要仔细分析工具需要的参数,然后从用户提供的信息中提取信息。如果用户提供的信息不足以提取到调用工具所有必要参数,则向用户追问,以获取该信息。不能自己编撰参数。"), ("human", "{input}"), ("placeholder", "{agent_scratchpad}"), ])
# 构建工具调用代理 agent = create_tool_calling_agent(llm, tools, prompt)
# 创建代理执行器 agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# 代理调用 response = await agent_executor.ainvoke({"input": query})
return response['output'] except Exception as e: logger.info(f"票务 MCP 测试出错:{str(e)}") return f"票务 MCP 查询出错:{str(e)}" except Exception as e: logger.error(f"连接或会话初始化时发生错误: {e}") return "连接或会话初始化时发生错误"
if __name__ == "__main__": while True: query = input("请输入查询:") if query == "exit": break print(asyncio.run(order_tickets(query)))本节主要描述了smartVoyage项目用到的所有MCP服务。