Skip to content

Day 5:MCP 服务化——让你的审查能力被任何 IDE 调用

昨天的问题

前四天的代码审查能力很完善了——读代码、写报告、并发审、缓存优化。但跟

项目二一样,它们只在你自己的终端里跑。 天的问题前四天的代码审查能力很完善了——读代码、写报告、并发审、缓存优化。但跟项目二一样,它们只在你自己的终端里跑。

IDE 能不能调你的审查服务?GitHub CI 能不能在每次 PR 时自动跑?你的同事能不能在不装 Python 环境的情况下用?

答案又是 MCP。

今天做什么

把前四天的审查能力包装成 MCP Server。三个工具:

  1. review_file:审查一个文件
  2. review_project:审查一个目录下所有 Python 文件
  3. review_snippet:审查一段代码片段(不需要保存为文件就能审)

你需要安装

bash
pip install mcp

⚠️ 注意:MCP Python SDK 仍在快速迭代。本书代码基于 0.x 版本 API。如果安装后报 ImportError,请尝试 pip install mcp==0.9.4 锁定版本。

📁 代码目录

code_assistant/
├── sample_code.py
├── day1.py
├── day2.py
├── day3.py
├── day4.py
├── code_reviewer_server.py     ← 新增!MCP审查服务,70行
└── test_module/
    ├── calc.py
    ├── utils.py
    └── helpers.py

代码

python
# code_reviewer_server.py — MCP 版本的代码审查服务

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import asyncio
import os
from pathlib import Path

# ===== 审查逻辑(前四天的精华)=====
def review_code(code: str) -> list[str]:
    """
    核心审查函数——对一段 Python 代码做快速规则检查。
    返回问题描述列表。
    """
    issues = []
    
    # 1. 文档字符串检查
    for line in code.split("\n"):
        line_stripped = line.strip()
        if line_stripped.startswith("def "):
            func_name = line_stripped.split("(")[0].replace("def ", "").strip()
            issues.append(f"🟡 [{func_name}] 建议添加文档字符串(docstring)")
    
    # 2. 除零风险
    if any(w in code for w in ["/ 0", "/0"]):
        issues.append("🔴 [安全] 存在除零风险,建议添加参数校验")
    
    # 3. 文件未关闭
    if "open(" in code and "with " not in code:
        issues.append("🔴 [安全] open() 未使用 with 语句,可能导致文件句柄泄漏")
    
    # 4. 裸 except
    if "except:" in code:
        issues.append("🟡 [规范] 不建议使用裸 except,请指定异常类型")
    
    # 5. 可变默认参数
    if "def " in code and ("=[]" in code or "={}" in code):
        issues.append("🔴 [陷阱] 使用可变对象作为默认参数([] 或 {}),会导致意外的状态共享")
    
    return issues

# ===== 创建 MCP Server =====
server = Server("code-reviewer")

@server.list_tools()
async def list_tools():
    """注册审查工具"""
    return [
        Tool(
            name="review_snippet",
            description="审查一段 Python 代码片段,返回代码质量问题列表",
            inputSchema={
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "待审查的 Python 代码"
                    }
                },
                "required": ["code"]
            }
        ),
        Tool(
            name="review_file",
            description="审查一个 Python 文件,返回代码质量报告",
            inputSchema={
                "type": "object",
                "properties": {
                    "filepath": {
                        "type": "string",
                        "description": "Python 文件的绝对路径"
                    }
                },
                "required": ["filepath"]
            }
        ),
        Tool(
            name="review_project",
            description="审查一个目录下所有 Python 文件,返回汇总报告",
            inputSchema={
                "type": "object",
                "properties": {
                    "project_dir": {
                        "type": "string",
                        "description": "项目目录的绝对路径"
                    }
                },
                "required": ["project_dir"]
            }
        ),
    ]

@server.call_tool()
async def call_tool(name: str, args: dict):
    """处理工具调用"""
    
    # --- 工具1:审查代码片段 ---
    if name == "review_snippet":
        code = args.get("code", "")
        if not code.strip():
            return [TextContent(type="text", text="❌ 代码片段为空,请提供有效的 Python 代码")]
        
        issues = review_code(code)
        
        if not issues:
            report = "✅ 未发现明显问题,代码质量不错!"
        else:
            lines = [f"🔍 代码审查报告(共 {len(issues)} 个问题)", ""]
            for i, issue in enumerate(issues, 1):
                lines.append(f"{i}. {issue}")
            report = "\n".join(lines)
        
        return [TextContent(type="text", text=report)]
    
    # --- 工具2:审查文件 ---
    elif name == "review_file":
        filepath = args.get("filepath", "")
        
        if not os.path.exists(filepath):
            return [TextContent(type="text", text=f"❌ 文件不存在: {filepath}")]
        
        if not filepath.endswith(".py"):
            return [TextContent(type="text", text="⚠️ 目前只支持 Python (.py) 文件审查")]
        
        with open(filepath, "r", encoding="utf-8") as f:
            code = f.read()
        
        issues = review_code(code)
        filename = os.path.basename(filepath)
        score = max(30, 100 - len(issues) * 10)
        
        lines = [
            f"📄 文件审查: {filename}",
            f"   总行数: {len(code.splitlines())}",
            f"   评分: {score}/100",
            f"   问题数: {len(issues)}",
            f"",
        ]
        
        for i, issue in enumerate(issues, 1):
            lines.append(f"{i}. {issue}")
        
        return [TextContent(type="text", text="\n".join(lines))]
    
    # --- 工具3:审查项目 ---
    elif name == "review_project":
        project_dir = args.get("project_dir", "")
        
        if not os.path.isdir(project_dir):
            return [TextContent(type="text", text=f"❌ 目录不存在: {project_dir}")]
        
        py_files = list(Path(project_dir).glob("*.py"))
        
        if not py_files:
            return [TextContent(type="text", text=f"ℹ️ 目录 {project_dir} 中未找到 Python 文件")]
        
        lines = [
            f"📊 项目审查报告",
            f"   目录: {project_dir}",
            f"   文件数: {len(py_files)}",
            f"",
        ]
        
        total_issues = 0
        total_score = 0
        
        for filepath in py_files:
            with open(filepath, "r", encoding="utf-8") as f:
                code = f.read()
            
            issues = review_code(code)
            score = max(30, 100 - len(issues) * 10)
            total_issues += len(issues)
            total_score += score
            
            lines.append(f"   📄 {filepath.name}: {score}分 ({len(issues)}个问题)")
        
        avg_score = total_score // len(py_files) if py_files else 0
        lines.extend([
            f"",
            f"   平均分: {avg_score}/100",
            f"   总问题数: {total_issues}",
        ])
        
        if avg_score >= 80:
            lines.append(f"   ✅ 整体质量良好")
        elif avg_score >= 60:
            lines.append(f"   ⚠️ 建议修复 high 级别问题")
        else:
            lines.append(f"   🔴 存在较多问题,建议重点审查")
        
        return [TextContent(type="text", text="\n".join(lines))]
    
    return [TextContent(type="text", text="未知工具")]

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())

if __name__ == "__main__":
    asyncio.run(main())

测试

MCP 客户端配置:

json
{
  "mcpServers": {
    "code-reviewer": {
      "command": "python",
      "args": ["code_reviewer_server.py"],
      "cwd": "/path/to/code_assistant/"
    }
  }
}

如果你的 IDE 或工具支持 MCP,配置完后就能看到三个审查工具——直接审查你当前打开的文件,或者一次性审查整个项目。

你学到了什么

三个 MCP Server,同一个模式。 项目一的客服工具、项目二的文档分析工具、项目三的代码审查工具——全是用 @server.list_tools() 注册工具列表、@server.call_tool() 实现工具逻辑。唯一的区别是工具名字和实现了什么。你现在"肌肉记忆"了 MCP Server 的结构。

审查标准是你的 Prompt。 今天的审查规则(除零、文件关闭、裸 except、可变默认参数)是硬编码在 review_code() 里的。在真实 LLM 驱动的审查中,这些规则变成你 Prompt 里的自然语言指令。核心不变:你定义标准,Agent 执行审查。

review_snippet 是"零成本试用入口"。 不需要保存文件、不需要改项目结构——直接把代码粘贴过来,秒出结果。产品思维:降低使用门槛,让用户先感受到价值,再引导到文件和项目审查。

明天的预告

最后一天。用 Streamlit 做一个代码审查面板——左边是代码编辑器,右边是审查结果。上传项目、选中文件、一键审查。最后做项目的完整总结。


Day 5 完成。你的代码审查能力现在是 MCP 标准服务——可以被 Cursor、Claude、任何 MCP 客户端调用了。