结合大模型与 MCP 实现自动化工具调用的实践经验

📘 开发记录 · 16 天前 · 30 人浏览
结合大模型与 MCP 实现自动化工具调用的实践经验

最近一直在研究如何将大模型(LLM)与 MCP 协议结合,用于业务处理自动化。
在查阅了大量资料后发现,大多数实现方案主要依赖于已有工具,比如 clientcherrystudio 来实现 MCP 客户端功能,而 MCP Server 通常则通过 JSON 配置文件来进行配置。

因为我这段时间一直在开发基于大模型的 RAG 知识库系统(目前已基本完成),刚好客户提出业务需求,希望通过大模型调用数据库进行数据处理。这种需求本质上与之前使用的 function call 十分类似。

虽然网上已有一些可以跑通的示例代码,但多数都是基于命令行交互的形式进行大模型对话。我结合项目的实际需求,对部分代码进行了优化,最终实现了通过 API 接口 的方式调用 MCP Server,大幅提升了与大模型集成的灵活性。


核心架构拆解

整个实现主要分为两个部分:

1. MCP Server 工具创建(server.py)

这一部分负责定义工具的注册与具体实现逻辑。

from mcp.server.fastmcp import FastMCP
import pymysql


# 初始化 FastMCP
mcp = FastMCP("FileWriter")

@mcp.tool()
def interact_with_mysql_safe(query: str, confirm: bool = False) -> str:
    """
    使用此工具与 MySQL 数据库交互(带确认机制)如果是查询语句可以直接执行不需要用户确认。
    
    参数:
      query: 要执行的 SQL 查询(例如 "INSERT INTO users (name) VALUES ('Alice')")
      confirm: 是否确认执行。对于 INSERT、UPDATE、DELETE 等操作,必须显式设置为 True 才会执行。
    
    返回:
      查询结果或等待确认的提示。
    """
    import re

    try:
        operation = query.strip().split()[0].upper()
        modifying = operation in ("INSERT", "UPDATE", "DELETE")

        if modifying and not confirm:
            return f"检测到可能修改数据的操作 [{operation}],请确认是否执行。请将 confirm 设置为 True 来确认。"

        cursor.execute(query)

        if operation == "SELECT":
            result = cursor.fetchall()
            return f"查询结果:{result}"
        else:
            conn.commit()
            return f"{operation} 操作成功,影响行数:{cursor.rowcount}"

    except Exception as e:
        return f"MySQL 执行失败:{e}"

@mcp.tool()
def list_tables() -> str:
    """
    获取当前数据库中的所有表名,适用于用户不确定可用表名时的探索操作。
    
    返回:
      包含所有表名的列表字符串,用户可据此选择后续查询目标。
    """
    try:
        cursor.execute("SHOW TABLES;")
        tables = cursor.fetchall()
        # 提取表名列表
        table_names = [table[0] for table in tables]
        return f"数据库中的所有表:{table_names}"
    except Exception as e:
        return f"MySQL 查询失败:{e}"

if __name__ == "__main__":
    conn = pymysql.connect(
        host="localhost",
        user="root",
        password="root",
        database="aqsc"
    )
    cursor = conn.cursor()
    mcp.run(transport='stdio')  # 默认使用 stdio 传输

2. MCP Client 启动 API 服务,连接大模型

客户端负责连接 MCP Server,并获取其注册的工具集合。
每次与大模型对话时,将这些工具作为 tools 参数传给大模型。
大模型会根据对话内容自动判断是否需要调用工具,并在确认调用后执行对应操作,随后将结果加工优化后返回给前端页面。

如果业务流程中需要多轮调用,建议实现对话上下文的记录,以帮助大模型更好理解上下文内容,从而提升响应的准确性与连贯性。


大模型支持 MCP 的关键配置

要实现大模型自动调用 MCP 工具,有几个关键点必须注意:

  • 模型本身需要支持 MCP Tool Call(如 OpenAI 或某些兼容模型)
  • 启动模型服务时必须开启工具自动调用支持

启动参数配置:

python3 -m vllm.entrypoints.openai.api_server \
  --model /path/to/model \
  --enable-tools \
  --enable-auto-tool-choice \
  --tool-call-parser openai

或者在部署时,添加以下两个参数也可以:

  • --enable-auto-tool-choice
  • --tool-call-parser openai

MCP Client 启动(client.py)

这个模块用于与 MCP Server 建立连接,并将工具注册到大模型中。
同时它提供了一个 API 接口,让前端通过 HTTP 请求的方式进行对话。

import os
import sys
import asyncio
from typing import Optional, List, Dict
from contextlib import AsyncExitStack
import json
import traceback

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from openai import OpenAI
from dotenv import load_dotenv
from fastapi.responses import StreamingResponse

load_dotenv()

class MCPClient:
    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.openai = OpenAI(api_key="EMPTY", base_url="xxxx")
        self.model = "qwen2.5-32B"

    def get_response(self, messages: list, tools: list):
        response = self.openai.chat.completions.create(
            model=self.model,
            max_tokens=30000,
            messages=messages,
            tools=tools
        )
        return response
    def get_response_stream(self, messages: list, tools: list):
        return self.openai.chat.completions.create(
            model=self.model,
            max_tokens=30000,
            messages=messages,
            tools=tools,
            stream=True  # 开启流式响应
        )
    async def get_tools(self):
        response = await self.session.list_tools()
        return [{
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema
            }
        } for tool in response.tools]

    async def connect_to_server(self, server_script_path: str):
        command = "python" if server_script_path.endswith(".py") else "node"
        if not server_script_path.endswith((".py", ".js")):
            raise ValueError("服务器脚本必须是 .py 或 .js 文件")

        server_params = StdioServerParameters(command=command, args=[server_script_path])
        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
        await self.session.initialize()

        tools = await self.get_tools()
        print("连接到服务器,工具列表:", [t['function']['name'] for t in tools])

    def truncate_messages(self, messages, max_rounds=10):
        system_msgs = [m for m in messages if m["role"] == "system"]
        others = [m for m in messages if m["role"] != "system"]
        return system_msgs + others[-max_rounds * 3:]

    async def process_query(self, query: str) -> str:
        messages = [{"role": "user", "content": query}]
        available_tools = await self.get_tools()
        print("可用工具:", [t["function"]["name"] for t in available_tools])

        while True:
            response = self.get_response(self.truncate_messages(messages), available_tools)
            message = response.choices[0].message

            if message.tool_calls:
                for tool_call in message.tool_calls:
                    tool_name = tool_call.function.name
                    tool_args = json.loads(tool_call.function.arguments)

                    print(f"[调用工具 {tool_name} 参数: {json.dumps(tool_args, ensure_ascii=False)}]")

                    try:
                        result = await self.session.call_tool(tool_name, tool_args)

                        if hasattr(result, "content"):
                            content = result.content
                            if isinstance(content, list):
                                content = "\n".join(str(c) for c in content)
                            else:
                                content = str(content)
                        else:
                            content = str(result)

                        messages.append({
                            "role": "assistant",
                            "content": f"[调用工具 {tool_name} 参数: {json.dumps(tool_args, ensure_ascii=False)}]"
                        })
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "name": tool_name,
                            "content": content
                        })
                    except Exception as e:
                        err_msg = f"工具调用失败: {str(e)}"
                        print(err_msg)
                        print(traceback.format_exc())
                        messages.append({
                            "role": "assistant",
                            "content": err_msg
                        })
                        return err_msg
            else:
                return message.content or "(空响应)"

    async def cleanup(self):
        await self.exit_stack.aclose()


class QueryRequest(BaseModel):
    query: str


# FastAPI 实例
app = FastAPI()

client = MCPClient()

@app.on_event("startup")
async def startup():
    server_script_path = "server.py"  # 替换为实际路径
    await client.connect_to_server(server_script_path)


@app.post("/query")
async def query(request: QueryRequest):
    query = request.query
    try:
        response = await client.process_query(query)
        return {"response": response}
    except Exception as e:
        print(f"错误: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail="处理查询时出错")

@app.post("/query_stream")
async def query_stream(request: QueryRequest):
    async def stream_generator():
        messages = [{"role": "user", "content": request.query}]
        available_tools = await client.get_tools()
        stream = client.get_response_stream(client.truncate_messages(messages), available_tools)

        full_response = ""
        tool_calls = []

        try:
            for chunk in stream:
                delta = chunk.choices[0].delta
                if hasattr(delta, "content") and delta.content:
                    full_response += delta.content
                    yield delta.content

                # 工具调用信息(如果是函数调用流)
                if hasattr(delta, "tool_calls") and delta.tool_calls:
                    tool_calls.extend(delta.tool_calls)

            # 如果模型流中包含 tool_calls,执行工具
            for tool_call in tool_calls:
                tool_name = tool_call.function.name
                if hasattr(tool_call.function, "arguments") and tool_call.function.arguments:
                    try:
                        tool_args = json.loads(tool_call.function.arguments)
                    except Exception as e:
                        print("解析 tool_call.arguments 失败", e)
                        continue
                else:
                    continue  # 等待下一轮 token 补全参数
                #tool_args = json.loads(tool_call.function.arguments)
                yield f"\n[调用工具 {tool_name} 参数: {json.dumps(tool_args, ensure_ascii=False)}]\n"

                try:
                    result = await client.session.call_tool(tool_name, tool_args)
                    if hasattr(result, "content"):
                        content = result.content
                        if isinstance(content, list):
                            content = "\n".join(str(c) for c in content)
                        else:
                            content = str(content)
                    else:
                        content = str(result)
                    yield f"{content}\n"
                except Exception as e:
                    err_msg = f"工具调用失败: {str(e)}"
                    yield f"\n{err_msg}\n"

        except Exception as e:
            yield f"\n发生错误:{str(e)}\n"
            print(traceback.format_exc())

    return StreamingResponse(stream_generator(), media_type="text/plain")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8001)

接口调用示例

部署完成后,只需启动 client.py,便可通过如下方式向大模型发起对话请求:

curl -X POST http://localhost:8000/query \
  -H "Content-Type: application/json" \
  -d '{"query": "帮我查询用户表有多少条数据"}'

大模型收到请求后会自动判断是否需要调用 MCP 工具,并执行对应逻辑。
如果查询任务较复杂,可能会经历多轮对话,但整个过程是自动化的,极大提升了效率和体验。


以上就是我将大模型与 MCP 协议结合的完整实践过程。希望对你在实现自动化工具调用、大模型应用落地时有所帮助!

Under CC BY NC-SA License.
Powered by Typecho | Theme by Jasmine
您是第 23835 位访客