第8章:MCP扩展功能 - 详细说明与代码案例
目录
一、MCP(Model Context Protocol)简介
什么是MCP?
核心概念
工作原理
二、内置MCP工具使用
1. 文件系统操作
2. 网络请求工具
3. 代码分析工具
三、第三方MCP集成
1. GitHub集成示例
2. Docker集成示例
四、自定义MCP工具开发
1. 基础MCP工具开发
2. 天气查询MCP工具
3. 数据库MCP工具
五、工具链集成
1. Git集成工具
2. Docker Compose集成
六、数据库操作扩展
1. ORM集成MCP工具
2. Redis缓存MCP
七、API调用工具
1. REST API客户端MCP
2. 多API聚合工具
3. 身份验证与API密钥管理
部署与配置示例
MCP服务器配置
启动脚本
最佳实践建议
1. 工具设计原则
2. 安全最佳实践
3. 性能优化
一、MCP(Model Context Protocol)简介
什么是MCP?
MCP(模型上下文协议) 是Anthropic开发的一个开放协议,允许Claude与外部工具、服务和数据进行安全交互,扩展AI的能力边界。
核心概念
// MCP基本架构示例
/**
 * Minimal illustration of the registries an MCP server keeps:
 * tools, resources and prompt templates, each keyed by name.
 */
class MCPProtocol {
  constructor() {
    this.tools = new Map();      // tool registry
    this.resources = new Map();  // resource manager
    this.prompts = new Map();    // prompt template library
  }

  /**
   * Register (or overwrite) a tool definition under `name`.
   *
   * Only `description`, `parameters` and `handler` are copied from
   * `toolConfig`; any other properties on it are ignored.
   */
  registerTool(name, toolConfig) {
    const entry = {
      name: name,
      description: toolConfig.description,
      parameters: toolConfig.parameters,
      handler: toolConfig.handler,
    };
    this.tools.set(name, entry);
  }
}
工作原理
用户请求 → Claude分析 → 调用MCP工具 → 工具执行 → 结果返回 → Claude处理 → 回复用户
二、内置MCP工具使用
1. 文件系统操作
# 文件读写工具示例
class FileSystemMCP:
    """Built-in file-system MCP tools for reading and writing files."""

    # Modes advertised by the write_file schema; anything else is rejected.
    _WRITE_MODES = ("w", "a", "wb")

    @mcp_tool(
        name="read_file",
        description="读取文件内容",
        parameters={
            "path": {
                "type": "string",
                "description": "文件路径"
            },
            "encoding": {
                "type": "string",
                "default": "utf-8"
            }
        }
    )
    async def read_file(self, path: str, encoding: str = "utf-8") -> str:
        """Return the text content of ``path`` decoded with ``encoding``.

        Failures are not raised: the tool returns a human-readable error
        string instead, so the caller always receives a ``str``.
        """
        try:
            with open(path, 'r', encoding=encoding) as f:
                return f.read()
        except Exception as e:
            # Tool boundary: report the failure to the model rather than crash.
            return f"读取文件失败: {str(e)}"

    @mcp_tool(
        name="write_file",
        description="写入文件内容",
        parameters={
            "path": {"type": "string"},
            "content": {"type": "string"},
            "mode": {
                "type": "string",
                "enum": ["w", "a", "wb"],
                "default": "w"
            }
        }
    )
    async def write_file(self, path: str, content: str, mode: str = "w") -> dict:
        """Write ``content`` to ``path`` as UTF-8.

        Args:
            path: Destination file path.
            content: Text to write.
            mode: One of ``"w"`` (truncate), ``"a"`` (append) or ``"wb"``
                (truncate, binary).

        Returns:
            ``{"success": True, "path": ..., "bytes": ...}`` where ``bytes`` is
            the size of the UTF-8 encoded content, or
            ``{"success": False, "error": ...}`` on failure.
        """
        if mode not in self._WRITE_MODES:
            return {"success": False, "error": f"不支持的写入模式: {mode}"}
        try:
            encoded = content.encode("utf-8")
            if "b" in mode:
                # Binary mode accepts neither ``encoding=`` nor ``str`` data.
                with open(path, mode) as f:
                    f.write(encoded)
            else:
                with open(path, mode, encoding='utf-8') as f:
                    f.write(content)
            return {"success": True, "path": path, "bytes": len(encoded)}
        except Exception as e:
            return {"success": False, "error": str(e)}
2. 网络请求工具
// HTTP请求MCP工具
/**
 * HTTP request MCP tool.
 *
 * `fetch_url.handler` never throws: it resolves to either
 * `{ status, statusText, headers, data }` or `{ error }`.
 */
const HttpMCP = {
  tools: {
    fetch_url: {
      description: "发送HTTP请求获取数据",
      parameters: {
        url: { type: "string", required: true },
        method: {
          type: "string",
          enum: ["GET", "POST", "PUT", "DELETE", "PATCH"],
          default: "GET"
        },
        headers: { type: "object" },
        body: { type: "object" },
        timeout: { type: "number", default: 10000 }
      },
      // `timeout` defaults here as well as in the schema: without it an
      // omitted value reaches setTimeout as `undefined`, which fires at once
      // and aborts every request.
      handler: async ({ url, method = "GET", headers, body, timeout = 10000 }) => {
        const controller = new AbortController();
        const timeoutId = setTimeout(() => controller.abort(), timeout);
        try {
          const response = await fetch(url, {
            method,
            headers: {
              'Content-Type': 'application/json',
              ...headers
            },
            body: body ? JSON.stringify(body) : undefined,
            signal: controller.signal
          });

          // Read as text first so empty or non-JSON bodies still return the
          // status and headers instead of collapsing into a parse error.
          const text = await response.text();
          let data = null;
          if (text) {
            try {
              data = JSON.parse(text);
            } catch {
              data = text;
            }
          }

          return {
            status: response.status,
            statusText: response.statusText,
            headers: Object.fromEntries(response.headers.entries()),
            data
          };
        } catch (error) {
          return { error: error.message };
        } finally {
          // Clear the timer on every path, including failures, so it does not
          // linger; it also stays armed while the body is being read.
          clearTimeout(timeoutId);
        }
      }
    }
  }
};
3. 代码分析工具
# 代码分析MCP工具
import ast
import json
class CodeAnalysisMCP:
    """Static code analysis tool (structure, security and style checks)."""

    # Lines longer than this are reported by the style check.
    _MAX_LINE_LENGTH = 79

    @mcp_tool(
        name="analyze_code",
        description="分析代码结构和质量",
        parameters={
            "code": {"type": "string", "required": True},
            "language": {
                "type": "string",
                "enum": ["python", "javascript", "java", "go"],
                "default": "python"
            },
            "checks": {
                "type": "array",
                "items": {"type": "string"},
                "default": ["complexity", "security", "style"]
            }
        }
    )
    async def analyze_code(self, code: str, language: str = "python", checks: list = None) -> dict:
        """Analyze ``code`` and return a report dictionary.

        Args:
            code: Source text to analyze.
            language: Declared language; only ``"python"`` is analyzed, other
                values produce a report with an empty ``checks`` mapping.
            checks: Subset of ``"complexity"``, ``"security"``, ``"style"``;
                all three when omitted.

        Returns:
            A dict with ``language``, ``lines_of_code`` and ``checks``; an
            ``error`` key is added when the Python source does not parse.
        """
        if checks is None:
            checks = ["complexity", "security", "style"]

        analysis_results = {
            "language": language,
            "lines_of_code": len(code.split('\n')),
            "checks": {}
        }

        if language == "python":
            try:
                tree = ast.parse(code)

                if "complexity" in checks:
                    complexity = self._analyze_complexity(tree)
                    analysis_results["checks"]["complexity"] = complexity

                if "security" in checks:
                    security_issues = self._check_security_issues(tree)
                    analysis_results["checks"]["security"] = security_issues

                if "style" in checks:
                    style_issues = self._check_style_issues(code)
                    analysis_results["checks"]["style"] = style_issues
            except SyntaxError as e:
                analysis_results["error"] = f"语法错误: {str(e)}"

        return analysis_results

    def _analyze_complexity(self, tree):
        """Return complexity metrics for the parsed module.

        NOTE(review): the values are fixed placeholders and do not depend on
        ``tree``; replace with a real computation before relying on them.
        """
        return {"cyclomatic_complexity": 5, "maintainability_index": 85}

    def _check_security_issues(self, tree):
        """Return one issue per direct call to ``eval``, ``exec`` or ``compile``.

        Only calls through a bare name are detected; attribute calls such as
        ``builtins.eval(...)`` are not matched.
        """
        issues = []
        for node in ast.walk(tree):
            if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
                if node.func.id in ['eval', 'exec', 'compile']:
                    issues.append({
                        "type": "security",
                        "message": f"发现不安全函数调用: {node.func.id}",
                        "severity": "high"
                    })
        return issues

    def _check_style_issues(self, code):
        """Return basic style issues found in the raw source text.

        Reports lines longer than ``_MAX_LINE_LENGTH`` and lines with trailing
        whitespace. Line numbers in the messages are 1-based.
        """
        issues = []
        for lineno, line in enumerate(code.split('\n'), start=1):
            if len(line) > self._MAX_LINE_LENGTH:
                issues.append({
                    "type": "style",
                    "message": f"第{lineno}行超过{self._MAX_LINE_LENGTH}个字符",
                    "severity": "low"
                })
            if line != line.rstrip():
                issues.append({
                    "type": "style",
                    "message": f"第{lineno}行存在行尾空白",
                    "severity": "low"
                })
        return issues
三、第三方MCP集成
1. GitHub集成示例
// GitHub MCP集成
import { Octokit } from "@octokit/rest";
/**
 * GitHub MCP integration backed by an authenticated Octokit client.
 */
class GitHubMCP {
  private octokit: Octokit;

  /** Create the client using `token` as the Octokit `auth` option. */
  constructor(token: string) {
    this.octokit = new Octokit({ auth: token });
  }

  /**
   * Search repositories and return a trimmed summary of each hit.
   * When `language` is given it is appended to the query as a
   * `language:` qualifier. Results are requested in descending order.
   */
  @mcp_tool({
    name: "github_search_repos",
    description: "搜索GitHub仓库",
    parameters: {
      query: { type: "string", required: true },
      language: { type: "string" },
      sort: {
        type: "string",
        enum: ["stars", "forks", "updated"],
        default: "stars"
      },
      per_page: { type: "number", default: 10 }
    }
  })
  async searchRepos(params: {
    query: string;
    language?: string;
    sort?: string;
    per_page?: number;
  }) {
    const { query, language, sort = "stars", per_page = 10 } = params;
    const q = language ? `${query} language:${language}` : query;

    const { data } = await this.octokit.search.repos({
      q,
      sort,
      per_page,
      order: "desc"
    });

    const summaries = [];
    for (const repo of data.items) {
      summaries.push({
        name: repo.name,
        full_name: repo.full_name,
        description: repo.description,
        stars: repo.stargazers_count,
        forks: repo.forks_count,
        url: repo.html_url,
        language: repo.language
      });
    }
    return summaries;
  }

  /**
   * Create an issue and return its number, URL, title and state.
   * A missing body is sent as an empty string, missing labels as `[]`.
   */
  @mcp_tool({
    name: "github_create_issue",
    description: "创建GitHub Issue",
    parameters: {
      owner: { type: "string", required: true },
      repo: { type: "string", required: true },
      title: { type: "string", required: true },
      body: { type: "string" },
      labels: { type: "array", items: { type: "string" } }
    }
  })
  async createIssue(params: {
    owner: string;
    repo: string;
    title: string;
    body?: string;
    labels?: string[];
  }) {
    const { owner, repo, title } = params;
    const body = params.body || "";
    const labels = params.labels || [];

    const { data: issue } = await this.octokit.issues.create({
      owner,
      repo,
      title,
      body,
      labels
    });

    return {
      number: issue.number,
      url: issue.html_url,
      title: issue.title,
      state: issue.state
    };
  }
}
2. Docker集成示例
# Docker MCP集成
import docker
from typing import List, Dict, Optional
class DockerMCP:
    """Docker MCP integration built on the Docker SDK client."""

    def __init__(self):
        # Client configured from the environment (DOCKER_HOST etc.).
        self.client = docker.from_env()

    @mcp_tool(
        name="docker_list_containers",
        description="列出Docker容器",
        parameters={
            "all": {"type": "boolean", "default": False},
            "filters": {"type": "object"}
        }
    )
    async def list_containers(self, all: bool = False, filters: Optional[Dict] = None) -> List[Dict]:
        """Return a summary dict for each container.

        Args:
            all: Passed through as the SDK's ``all`` flag.
            filters: Optional SDK filter mapping; ``None`` means no filters.

        Returns:
            A list of dicts with ``id``, ``name``, ``image`` (first tag, or
            ``"none"`` when the image is untagged), ``status``, ``ports`` and
            ``created``.
        """
        containers = self.client.containers.list(
            all=all,
            filters=filters or {}
        )
        return [
            {
                "id": container.short_id,
                "name": container.name,
                "image": container.image.tags[0] if container.image.tags else "none",
                "status": container.status,
                "ports": container.ports,
                "created": container.attrs["Created"]
            }
            for container in containers
        ]

    @mcp_tool(
        name="docker_run_container",
        description="运行Docker容器",
        parameters={
            "image": {"type": "string", "required": True},
            "command": {"type": "string"},
            "ports": {"type": "object"},
            "volumes": {"type": "object"},
            "environment": {"type": "object"},
            "name": {"type": "string"}
        }
    )
    async def run_container(self, image: str, **kwargs) -> Dict:
        """Start a detached container from ``image``.

        Recognized keyword arguments: ``command``, ``ports``, ``volumes``,
        ``environment`` and ``name``; others are ignored.

        Returns:
            ``{"success": True, "container_id", "name", "status"}`` on success,
            or ``{"success": False, "error": ...}`` when the SDK raises.
        """
        try:
            container = self.client.containers.run(
                image=image,
                command=kwargs.get("command"),
                ports=kwargs.get("ports", {}),
                volumes=kwargs.get("volumes", {}),
                environment=kwargs.get("environment", {}),
                name=kwargs.get("name"),
                detach=True
            )
            return {
                "success": True,
                "container_id": container.id,
                "name": container.name,
                "status": container.status
            }
        except docker.errors.DockerException as e:
            # DockerException is the SDK's base error (APIError included), so
            # every SDK failure is reported in the same result shape.
            return {
                "success": False,
                "error": str(e)
            }
四、自定义MCP工具开发
1. 基础MCP工具开发
# 自定义MCP工具框架
from typing import Any, Dict, Callable
import inspect
class MCPServer:
    """Minimal MCP server: a registry of named tools plus a dispatcher."""

    def __init__(self, name: str):
        self.name = name
        self.tools: Dict[str, Dict] = {}      # tool name -> definition dict
        self.resources: Dict[str, Any] = {}

    def tool(self, name: str = None, description: str = "", parameters: Dict = None):
        """Return a decorator that registers a function as a tool.

        Args:
            name: Tool name; defaults to the function's ``__name__``.
            description: Tool description; defaults to the function docstring.
            parameters: Explicit parameter schema. When omitted (or empty) a
                schema is inferred from the function signature.

        The decorated function is returned unchanged.
        """
        def decorator(func: Callable):
            tool_name = name or func.__name__

            # Infer a schema from the signature. *args / **kwargs are skipped:
            # they are not individual parameters a caller can be told about.
            param_schema = {}
            variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
            for param_name, param in inspect.signature(func).parameters.items():
                if param.kind in variadic:
                    continue
                param_schema[param_name] = {
                    "type": self._infer_type(param.annotation),
                    # Identity check: ``==`` would invoke the default's own
                    # __eq__, which may not return a plain bool.
                    "required": param.default is inspect.Parameter.empty
                }

            self.tools[tool_name] = {
                "name": tool_name,
                "description": description or func.__doc__,
                "parameters": parameters or param_schema,
                "handler": func
            }
            return func
        return decorator

    def _infer_type(self, annotation):
        """Map a Python annotation to a schema type name (default "string")."""
        type_map = {
            str: "string",
            int: "number",
            float: "number",
            bool: "boolean",
            list: "array",
            dict: "object"
        }
        return type_map.get(annotation, "string")

    async def handle_request(self, request: Dict) -> Dict:
        """Dispatch ``request`` to the tool it names.

        ``request`` is expected to carry ``"tool"`` and, optionally,
        ``"parameters"`` (a mapping of keyword arguments; missing or ``None``
        means no arguments).

        Returns:
            ``{"result": ...}`` on success, ``{"error": ...}`` when the tool is
            unknown or the handler raises.
        """
        tool_name = request.get("tool")
        if tool_name not in self.tools:
            return {"error": f"工具不存在: {tool_name}"}

        handler = self.tools[tool_name]["handler"]
        try:
            result = handler(**(request.get("parameters") or {}))
            # Support both ``async def`` and plain functions as handlers.
            if inspect.isawaitable(result):
                result = await result
            return {"result": result}
        except Exception as e:
            return {"error": str(e)}
2. 天气查询MCP工具
# 天气查询MCP工具示例
import requests
from datetime import datetime
class WeatherMCP(MCPServer):
    """Weather lookup tools backed by the OpenWeatherMap 2.5 HTTP API."""

    # Seconds to wait for the weather API before giving up.
    _REQUEST_TIMEOUT = 10

    def __init__(self, api_key: str):
        super().__init__("weather-mcp")
        self.api_key = api_key
        self.base_url = "https://api.openweathermap.org/data/2.5"

    @mcp_tool(
        name="get_current_weather",
        description="获取当前天气信息",
        parameters={
            "city": {"type": "string", "required": True},
            "country_code": {"type": "string", "default": ""},
            "units": {
                "type": "string",
                "enum": ["metric", "imperial", "standard"],
                "default": "metric"
            }
        }
    )
    async def get_current_weather(self, city: str, country_code: str = "", units: str = "metric"):
        """Return current conditions for ``city``.

        Args:
            city: City name.
            country_code: Optional country code appended as ``city,code``.
            units: Unit system passed to the API.

        Returns:
            A dict of weather fields on HTTP 200, otherwise
            ``{"error": ...}``. ``timestamp`` is an ISO-8601 string in local
            time so that the result stays JSON-serializable.
        """
        location = f"{city},{country_code}" if country_code else city
        try:
            response = requests.get(
                f"{self.base_url}/weather",
                params={
                    "q": location,
                    "appid": self.api_key,
                    "units": units,
                    "lang": "zh_cn"
                },
                timeout=self._REQUEST_TIMEOUT
            )
        except requests.RequestException as e:
            # Network failure or timeout: report it in the tool's error shape.
            return {"error": f"获取天气失败: {e}"}

        if response.status_code != 200:
            return {"error": f"获取天气失败: {response.text}"}

        data = response.json()
        return {
            "location": data["name"],
            "country": data["sys"]["country"],
            "temperature": data["main"]["temp"],
            "feels_like": data["main"]["feels_like"],
            "humidity": data["main"]["humidity"],
            "weather": data["weather"][0]["description"],
            "wind_speed": data["wind"]["speed"],
            "timestamp": datetime.fromtimestamp(data["dt"]).isoformat()
        }

    @mcp_tool(
        name="get_weather_forecast",
        description="获取天气预报",
        parameters={
            "city": {"type": "string", "required": True},
            "days": {"type": "number", "default": 5}
        }
    )
    async def get_weather_forecast(self, city: str, days: int = 5):
        """Return a forecast for ``city`` covering up to ``days`` days.

        Uses the ``/forecast`` endpoint, which serves 3-hour steps for at most
        five days, so ``days`` is clamped to the range 1..5 and converted to a
        step count (8 steps per day).

        Returns:
            ``{"location", "country", "forecast": [...]}`` on HTTP 200,
            otherwise ``{"error": ...}``.
        """
        steps = min(max(int(days), 1), 5) * 8
        try:
            response = requests.get(
                f"{self.base_url}/forecast",
                params={
                    "q": city,
                    "appid": self.api_key,
                    "units": "metric",
                    "lang": "zh_cn",
                    "cnt": steps
                },
                timeout=self._REQUEST_TIMEOUT
            )
        except requests.RequestException as e:
            return {"error": f"获取天气预报失败: {e}"}

        if response.status_code != 200:
            return {"error": f"获取天气预报失败: {response.text}"}

        data = response.json()
        return {
            "location": data["city"]["name"],
            "country": data["city"]["country"],
            "forecast": [
                {
                    "time": item["dt_txt"],
                    "temperature": item["main"]["temp"],
                    "feels_like": item["main"]["feels_like"],
                    "humidity": item["main"]["humidity"],
                    "weather": item["weather"][0]["description"],
                    "wind_speed": item["wind"]["speed"]
                }
                for item in data["list"]
            ]
        }
3. 数据库MCP工具
# 数据库MCP工具
import sqlite3
from contextlib import contextmanager
import pandas as pd
class DatabaseMCP(MCPServer):
    """SQLite database MCP tools."""

    def __init__(self, db_path: str):
        super().__init__("database-mcp")
        self.db_path = db_path

    @contextmanager
    def get_connection(self):
        """Yield a new SQLite connection and close it on exit.

        Rows are returned as ``sqlite3.Row`` so they can be converted to dicts.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return ``name`` as a double-quoted SQL identifier."""
        return '"' + name.replace('"', '""') + '"'

    @mcp_tool(
        name="execute_sql",
        description="执行SQL查询",
        parameters={
            "query": {"type": "string", "required": True},
            "parameters": {"type": "object", "default": {}},
            "fetch_results": {"type": "boolean", "default": True}
        }
    )
    async def execute_sql(self, query: str, parameters: dict = None, fetch_results: bool = True):
        """Execute one SQL statement with bound ``parameters``.

        Args:
            query: The SQL text; values must be supplied via ``parameters``.
            parameters: Bound parameters (named mapping); defaults to none.
            fetch_results: When true and the statement produces rows, return
                them; otherwise return the row count and last row id.

        Returns:
            ``{"success": True, "rowcount", "results"}`` for row-returning
            statements, ``{"success": True, "rowcount", "lastrowid"}``
            otherwise, or ``{"success": False, "error"}`` on a SQLite error.
        """
        if parameters is None:
            parameters = {}

        with self.get_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(query, parameters)
                # ``description`` is set only when the statement yields rows,
                # which also covers WITH / PRAGMA / RETURNING statements that a
                # "starts with SELECT" test would miss.
                if fetch_results and cursor.description is not None:
                    results = [dict(row) for row in cursor.fetchall()]
                    conn.commit()
                    return {
                        "success": True,
                        "rowcount": len(results),
                        "results": results
                    }

                conn.commit()
                return {
                    "success": True,
                    "rowcount": cursor.rowcount,
                    "lastrowid": cursor.lastrowid
                }
            except sqlite3.Error as e:
                return {
                    "success": False,
                    "error": str(e)
                }

    @mcp_tool(
        name="get_table_info",
        description="获取表结构信息",
        parameters={
            "table_name": {"type": "string", "required": True}
        }
    )
    async def get_table_info(self, table_name: str):
        """Return column and index metadata for ``table_name``.

        The name is quoted as an identifier before being placed in the PRAGMA
        statements, because PRAGMA arguments cannot be bound as parameters.
        """
        quoted = self._quote_identifier(table_name)
        with self.get_connection() as conn:
            cursor = conn.cursor()

            cursor.execute(f"PRAGMA table_info({quoted})")
            columns = cursor.fetchall()

            cursor.execute(f"PRAGMA index_list({quoted})")
            indexes = cursor.fetchall()

            return {
                "table_name": table_name,
                "columns": [
                    {
                        "name": col[1],
                        "type": col[2],
                        "notnull": bool(col[3]),
                        "default_value": col[4],
                        "primary_key": bool(col[5])
                    }
                    for col in columns
                ],
                "indexes": [
                    {
                        "name": idx[1],
                        "unique": bool(idx[2])
                    }
                    for idx in indexes
                ]
            }
五、工具链集成
1. Git集成工具
# Git MCP集成
import subprocess
import os
from typing import List, Dict, Optional
class GitMCP(MCPServer):
    """Git version-control MCP tools (thin wrappers around the git CLI)."""

    def __init__(self):
        super().__init__("git-mcp")

    def _run_git_command(self, args: List[str], cwd: Optional[str] = None) -> Dict:
        """Run ``git <args>`` in ``cwd`` and capture its output.

        The command is passed as an argument list (no shell). Returns a dict
        with ``success``, ``stdout``, ``stderr`` and ``returncode``; if the
        process cannot be started, ``{"success": False, "error": ...}``.
        """
        try:
            result = subprocess.run(
                ["git"] + args,
                cwd=cwd or os.getcwd(),
                capture_output=True,
                text=True,
                encoding="utf-8"
            )
            return {
                "success": result.returncode == 0,
                "stdout": result.stdout.strip(),
                "stderr": result.stderr.strip(),
                "returncode": result.returncode
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    @mcp_tool(
        name="git_status",
        description="查看Git仓库状态",
        parameters={
            "path": {"type": "string", "default": "."}
        }
    )
    async def git_status(self, path: str = ".") -> Dict:
        """Return ``git status --porcelain`` for the repository at ``path``."""
        return self._run_git_command(["status", "--porcelain"], cwd=path)

    @mcp_tool(
        name="git_commit",
        description="提交更改",
        parameters={
            "message": {"type": "string", "required": True},
            "path": {"type": "string", "default": "."},
            "add_all": {"type": "boolean", "default": False}
        }
    )
    async def git_commit(self, message: str, path: str = ".", add_all: bool = False) -> Dict:
        """Commit with ``message``, optionally staging everything first.

        Returns:
            ``{"operations": [(label, result), ...]}`` listing each git command
            that was run. If staging fails, the commit is not attempted.
        """
        results = []

        if add_all:
            add_result = self._run_git_command(["add", "-A"], cwd=path)
            results.append(("git add", add_result))
            if not add_result["success"]:
                # Do not commit a partially staged tree.
                return {"operations": results}

        commit_result = self._run_git_command(
            ["commit", "-m", message],
            cwd=path
        )
        results.append(("git commit", commit_result))

        return {"operations": results}

    @mcp_tool(
        name="git_branch",
        description="分支操作",
        parameters={
            "operation": {
                "type": "string",
                "enum": ["list", "create", "delete", "checkout"],
                "default": "list"
            },
            "branch_name": {"type": "string"},
            "path": {"type": "string", "default": "."}
        }
    )
    async def git_branch(self, operation: str = "list", branch_name: str = None, path: str = ".") -> Dict:
        """List, create, check out or delete a branch.

        ``list`` adds a ``branches`` key (one entry per non-empty output line)
        to the command result. The other operations require ``branch_name``.
        Unknown operations, a missing name, or a name starting with ``-``
        (which git would parse as an option) yield an error result.
        """
        if operation == "list":
            result = self._run_git_command(["branch", "-a"], cwd=path)
            if result["success"]:
                branches = result["stdout"].split('\n')
                result["branches"] = [b.strip() for b in branches if b.strip()]
            return result

        commands = {
            "create": ["checkout", "-b"],
            "checkout": ["checkout"],
            "delete": ["branch", "-d"],
        }
        if operation in commands and branch_name and not branch_name.startswith("-"):
            return self._run_git_command(commands[operation] + [branch_name], cwd=path)

        return {"success": False, "error": "无效的操作或缺少参数"}
2. Docker Compose集成
# docker-compose.yml - MCP集成配置
version: '3.8'

services:
  # MCP server built from the local Dockerfile; talks to db and redis.
  mcp-server:
    build: .
    ports:
      - "8080:8080"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://user:pass@db:5432/mcp
    volumes:
      - ./tools:/app/tools
      - ./data:/app/data
    depends_on:
      - db
      - redis

  # PostgreSQL; credentials must match DATABASE_URL above.
  db:
    image: postgres:15
    environment:
      POSTGRES_DB: mcp
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  # Reverse proxy in front of the MCP server.
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - mcp-server

volumes:
  postgres_data:
六、数据库操作扩展
1. ORM集成MCP工具
# SQLAlchemy ORM MCP集成
from sqlalchemy import create_engine, MetaData, Table, select, insert, update, delete
from sqlalchemy.orm import sessionmaker
from typing import List, Dict, Any
class ORMMCP(MCPServer):
    """Database MCP tools built on SQLAlchemy table reflection."""

    def __init__(self, database_url: str):
        super().__init__("orm-mcp")
        self.engine = create_engine(database_url)
        self.metadata = MetaData()
        self.Session = sessionmaker(bind=self.engine)

        # Load the existing schema so tables can be looked up by name.
        self.metadata.reflect(bind=self.engine)

    def _get_total_count(self, session, table) -> int:
        """Return the number of rows in ``table`` (filters are not applied)."""
        from sqlalchemy import func  # local: not part of the file-level import

        return session.execute(select(func.count()).select_from(table)).scalar_one()

    @mcp_tool(
        name="query_table",
        description="查询数据库表",
        parameters={
            "table_name": {"type": "string", "required": True},
            "filters": {"type": "object"},
            "limit": {"type": "number", "default": 100},
            "offset": {"type": "number", "default": 0},
            "order_by": {"type": "string"}
        }
    )
    async def query_table(self, table_name: str, filters: Dict = None,
                          limit: int = 100, offset: int = 0, order_by: str = None) -> Dict:
        """Query rows from a reflected table.

        Args:
            table_name: Name of a reflected table.
            filters: ``{column: value}`` equality filters; unknown columns are
                ignored.
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.
            order_by: Column name; prefix with ``-`` for descending order.
                Unknown columns are ignored.

        Returns:
            ``{"count", "total", "data"}`` where ``total`` is the unfiltered
            table row count, or ``{"error": ...}`` for an unknown table.
        """
        if table_name not in self.metadata.tables:
            return {"error": f"表不存在: {table_name}"}

        table = self.metadata.tables[table_name]

        with self.Session() as session:
            query = select(table)

            if filters:
                for column, value in filters.items():
                    if column in table.columns:
                        query = query.where(table.columns[column] == value)

            if order_by:
                descending = order_by.startswith('-')
                # Strip only the leading marker; hyphens inside a column name
                # must be preserved.
                column_name = order_by[1:] if descending else order_by
                order_column = table.columns.get(column_name)
                # Columns do not support truth testing, so compare with None.
                if order_column is not None:
                    if descending:
                        query = query.order_by(order_column.desc())
                    else:
                        query = query.order_by(order_column.asc())

            query = query.limit(limit).offset(offset)

            result = session.execute(query)

            # Index rows through the mapping view so column names that clash
            # with tuple attributes (e.g. "count", "index") still resolve.
            rows = [
                {column.name: row._mapping[column] for column in table.columns}
                for row in result
            ]

            return {
                "count": len(rows),
                "total": self._get_total_count(session, table),
                "data": rows
            }

    @mcp_tool(
        name="insert_record",
        description="插入记录",
        parameters={
            "table_name": {"type": "string", "required": True},
            "data": {"type": "object", "required": True}
        }
    )
    async def insert_record(self, table_name: str, data: Dict) -> Dict:
        """Insert one row built from ``data`` into ``table_name``.

        Returns:
            ``{"success": True, "inserted_id", "rowcount"}`` after commit,
            ``{"success": False, "error"}`` after rollback on failure, or
            ``{"error": ...}`` for an unknown table.
        """
        if table_name not in self.metadata.tables:
            return {"error": f"表不存在: {table_name}"}

        table = self.metadata.tables[table_name]

        with self.Session() as session:
            try:
                stmt = insert(table).values(**data)
                result = session.execute(stmt)
                session.commit()

                return {
                    "success": True,
                    "inserted_id": result.inserted_primary_key[0] if result.inserted_primary_key else None,
                    "rowcount": result.rowcount
                }
            except Exception as e:
                session.rollback()
                return {"success": False, "error": str(e)}
2. Redis缓存MCP
# Redis MCP工具
import redis
import json
from typing import Union, List, Dict
from datetime import timedelta
class RedisMCP(MCPServer):
    """Redis cache MCP tools."""

    def __init__(self, host: str = "localhost", port: int = 6379,
                 db: int = 0, password: str = None):
        super().__init__("redis-mcp")
        # decode_responses=True makes the client return str instead of bytes.
        self.client = redis.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=True
        )

    @mcp_tool(
        name="redis_set",
        description="设置Redis键值",
        parameters={
            "key": {"type": "string", "required": True},
            "value": {"type": "any", "required": True},
            "expire_seconds": {"type": "number"},
            "expire_minutes": {"type": "number"},
            "expire_hours": {"type": "number"}
        }
    )
    async def redis_set(self, key: str, value: Union[str, Dict, List],
                        **expire_params) -> Dict:
        """Store ``value`` under ``key``, optionally with an expiry.

        Dicts and lists are stored as JSON; anything else via ``str()``.
        The first non-zero of ``expire_seconds``, ``expire_minutes`` and
        ``expire_hours`` (in that order) sets the time to live; the resulting
        number of seconds is truncated to an integer.

        Returns:
            ``{"success": True, "key": key}`` or
            ``{"success": False, "error": ...}``.
        """
        try:
            if isinstance(value, (dict, list)):
                value_str = json.dumps(value)
            else:
                value_str = str(value)

            expire_seconds = 0
            if expire_params.get("expire_seconds"):
                expire_seconds = expire_params["expire_seconds"]
            elif expire_params.get("expire_minutes"):
                expire_seconds = expire_params["expire_minutes"] * 60
            elif expire_params.get("expire_hours"):
                expire_seconds = expire_params["expire_hours"] * 3600

            # JSON numbers may arrive as floats; SETEX needs whole seconds.
            expire_seconds = int(expire_seconds)

            if expire_seconds > 0:
                self.client.setex(key, expire_seconds, value_str)
            else:
                self.client.set(key, value_str)

            return {"success": True, "key": key}
        except Exception as e:
            return {"success": False, "error": str(e)}

    @mcp_tool(
        name="redis_get",
        description="获取Redis键值",
        parameters={
            "key": {"type": "string", "required": True},
            "parse_json": {"type": "boolean", "default": True}
        }
    )
    async def redis_get(self, key: str, parse_json: bool = True) -> Union[Dict, str, None]:
        """Return the value stored under ``key``.

        Returns ``None`` when the key is absent. With ``parse_json`` the value
        is decoded as JSON when possible and returned raw otherwise. Client
        errors are reported as ``{"error": ...}``.
        """
        try:
            value = self.client.get(key)
            if value is None or not parse_json:
                return value
            try:
                return json.loads(value)
            except json.JSONDecodeError:
                return value
        except Exception as e:
            return {"error": str(e)}
七、API调用工具
1. REST API客户端MCP
# REST API MCP工具
import aiohttp
import asyncio
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
@dataclass
class APIEndpoint:
    """Configuration of a single REST API endpoint."""

    name: str                                    # registry key of the endpoint
    path: str                                    # path appended to the client's base URL
    method: str = "GET"                          # HTTP method
    description: str = ""                        # human-readable description
    parameters: Optional[Dict[str, Any]] = None  # parameter schema; None when unspecified
    headers: Optional[Dict[str, str]] = None     # extra request headers; None when unspecified
    requires_auth: bool = False                  # whether calls need the API key
class RESTAPIMCP(MCPServer):
    """REST API client that exposes registered endpoints as MCP tools."""

    # Methods whose arguments are sent as a JSON body; all other methods send
    # them as query-string parameters.
    _BODY_METHODS = ("POST", "PUT", "PATCH")

    def __init__(self, base_url: str, api_key: str = None):
        super().__init__("rest-api-mcp")
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.endpoints: Dict[str, APIEndpoint] = {}
        # Created lazily on the first call so it is bound to the running loop.
        self.session: Optional[aiohttp.ClientSession] = None

    def register_endpoint(self, endpoint: APIEndpoint):
        """Register ``endpoint`` and expose it as the tool ``api_<name>``."""
        self.endpoints[endpoint.name] = endpoint

        tool_params = {
            "name": f"api_{endpoint.name}",
            "description": endpoint.description or f"调用{endpoint.name} API",
            "parameters": endpoint.parameters or {}
        }

        async def api_call(**kwargs):
            return await self._call_endpoint(endpoint.name, **kwargs)

        self.tool(**tool_params)(api_call)

    async def _call_endpoint(self, endpoint_name: str, **kwargs) -> Dict:
        """Call a registered endpoint with ``kwargs`` as its arguments.

        Returns:
            ``{"success": True, "status", "data"}`` for any 2xx response
            (``data`` is the decoded JSON, or the raw text when the body is not
            JSON), ``{"success": False, "status", "error"}`` for other
            statuses, ``{"success": False, "error"}`` when the request itself
            fails, or ``{"error": ...}`` for an unregistered endpoint.
        """
        if endpoint_name not in self.endpoints:
            return {"error": f"端点未注册: {endpoint_name}"}

        endpoint = self.endpoints[endpoint_name]

        if self.session is None:
            self.session = aiohttp.ClientSession()

        try:
            url = f"{self.base_url}{endpoint.path}"
            # Copy: adding Authorization must not leak into the shared
            # endpoint configuration.
            headers = dict(endpoint.headers or {})

            if endpoint.requires_auth and self.api_key:
                headers["Authorization"] = f"Bearer {self.api_key}"

            sends_body = endpoint.method in self._BODY_METHODS

            async with self.session.request(
                method=endpoint.method,
                url=url,
                headers=headers,
                json=kwargs if sends_body else None,
                params=None if sends_body else kwargs,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                if 200 <= response.status < 300:
                    try:
                        data = await response.json()
                    except (aiohttp.ContentTypeError, ValueError):
                        # Empty or non-JSON success body (e.g. 204).
                        data = await response.text()
                    return {
                        "success": True,
                        "status": response.status,
                        "data": data
                    }

                error_text = await response.text()
                return {
                    "success": False,
                    "status": response.status,
                    "error": error_text
                }
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def close(self):
        """Close the HTTP session, if one was opened."""
        if self.session:
            await self.session.close()
            # Allow a later call to open a fresh session.
            self.session = None
2. 多API聚合工具
# API聚合MCP工具
import concurrent.futures
from typing import Dict, List, Callable
class APIAggregatorMCP(MCPServer):
    """Runs several registered API callables in parallel and merges results."""

    def __init__(self):
        super().__init__("api-aggregator-mcp")
        # api name -> synchronous callable invoked with the call's params
        self.apis: Dict[str, Callable] = {}

    @mcp_tool(
        name="aggregate_apis",
        description="并行调用多个API并聚合结果",
        parameters={
            "api_calls": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "api_name": {"type": "string", "required": True},
                        "params": {"type": "object"}
                    }
                },
                "required": True
            },
            "timeout_seconds": {"type": "number", "default": 10}
        }
    )
    async def aggregate_apis(self, api_calls: List[Dict], timeout_seconds: int = 10) -> Dict:
        """Call every requested API in a worker thread and collect the results.

        Args:
            api_calls: Items of the form ``{"api_name": ..., "params": {...}}``.
            timeout_seconds: Overall time budget for all calls together.

        Returns:
            ``{"total_calls", "successful_calls", "results"}`` where
            ``results`` maps each API name to its return value or to
            ``{"error": ...}`` (unregistered name, raised exception, or
            timeout). Results are keyed by name, so repeated names overwrite
            each other.
        """
        import asyncio
        import functools

        results = {}
        loop = asyncio.get_running_loop()
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
        try:
            task_to_api = {}
            for api_call in api_calls:
                api_name = api_call["api_name"]
                params = api_call.get("params") or {}

                if api_name not in self.apis:
                    results[api_name] = {"error": f"API未注册: {api_name}"}
                    continue

                call = functools.partial(self.apis[api_name], **params)
                task_to_api[loop.run_in_executor(executor, call)] = api_name

            if task_to_api:
                # Await without blocking the event loop; calls still running
                # after the budget are reported as timed out.
                done, pending = await asyncio.wait(task_to_api, timeout=timeout_seconds)

                for task in done:
                    api_name = task_to_api[task]
                    try:
                        results[api_name] = task.result()
                    except Exception as e:
                        results[api_name] = {"error": str(e)}

                for task in pending:
                    task.cancel()
                    results[task_to_api[task]] = {"error": "调用超时"}
        finally:
            # Do not wait for stragglers; that would defeat the timeout.
            executor.shutdown(wait=False)

        def failed(result) -> bool:
            return isinstance(result, dict) and "error" in result

        return {
            "total_calls": len(api_calls),
            "successful_calls": sum(1 for r in results.values() if not failed(r)),
            "results": results
        }
3. 身份验证与API密钥管理
# 安全API调用MCP
import hashlib
import hmac
from datetime import datetime
import base64
class SecureAPIMCP(MCPServer):
    """MCP tool for calling APIs that require HMAC-signed requests."""

    # Methods for which ``data`` is signed and sent as the request body.
    _BODY_METHODS = ("POST", "PUT", "PATCH")

    def __init__(self):
        super().__init__("secure-api-mcp")
        # In-memory store: key id -> {"key": ..., "secret": ...}
        self.api_keys = {}

    def _generate_signature(self, secret: str, message: str) -> str:
        """Return the base64-encoded HMAC-SHA256 of ``message`` under ``secret``."""
        digest = hmac.new(
            secret.encode(),
            message.encode(),
            hashlib.sha256
        ).digest()
        return base64.b64encode(digest).decode()

    async def _handle_response(self, response) -> Dict:
        """Convert an aiohttp response into the tool's result dict.

        2xx responses give ``{"success": True, "status", "data"}`` (decoded
        JSON, raw text when the body is not JSON, ``None`` when empty); other
        statuses give ``{"success": False, "status", "error"}``.
        """
        text = await response.text()
        if 200 <= response.status < 300:
            try:
                data = json.loads(text) if text else None
            except ValueError:
                data = text
            return {"success": True, "status": response.status, "data": data}
        return {"success": False, "status": response.status, "error": text}

    @mcp_tool(
        name="call_secure_api",
        description="调用需要签名的安全API",
        parameters={
            "url": {"type": "string", "required": True},
            "method": {"type": "string", "default": "GET"},
            "data": {"type": "object"},
            "api_key_id": {"type": "string", "required": True}
        }
    )
    async def call_secure_api(self, url: str, method: str = "GET",
                              data: Dict = None, api_key_id: str = None) -> Dict:
        """Send a signed request to ``url``.

        The signed message is ``method``, ``url`` and a UTC timestamp joined
        by newlines, followed by the JSON body (keys sorted) for
        POST/PUT/PATCH requests that carry ``data``. The body is transmitted
        byte-for-byte as it was signed; ``data`` is not sent for other methods.

        Returns:
            The result dict from ``_handle_response``, ``{"error": ...}`` for
            an unknown key id, or ``{"success": False, "error"}`` when the
            request fails.
        """
        if api_key_id not in self.api_keys:
            return {"error": "无效的API密钥ID"}

        api_key = self.api_keys[api_key_id]

        # Naive UTC timestamp in ISO format (no offset suffix).
        timestamp = datetime.utcnow().isoformat()

        message = f"{method}\n{url}\n{timestamp}"
        body = None
        if data and method in self._BODY_METHODS:
            # Serialize once and reuse the same text for signing and sending,
            # so the receiver can verify the signature over what it received.
            body = json.dumps(data, sort_keys=True)
            message += f"\n{body}"

        signature = self._generate_signature(api_key["secret"], message)

        headers = {
            "X-API-Key": api_key["key"],
            "X-Timestamp": timestamp,
            "X-Signature": signature,
            "Content-Type": "application/json"
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.request(
                    method=method,
                    url=url,
                    headers=headers,
                    data=body
                ) as response:
                    return await self._handle_response(response)
        except aiohttp.ClientError as e:
            return {"success": False, "error": str(e)}
部署与配置示例
MCP服务器配置
# mcp-config.yaml
version: "1.0"
name: "claude-code-mcp-server"

# Tool modules to load; `config` is passed to the module when present.
tools:
  - name: "file_operations"
    module: "tools.file_mcp"
    enabled: true
    config:
      root_path: "/workspace"

  - name: "git_integration"
    module: "tools.git_mcp"
    enabled: true

  - name: "database_client"
    module: "tools.database_mcp"
    enabled: true
    config:
      database_url: "postgresql://user:pass@localhost:5432/claude"

  - name: "weather_service"
    module: "tools.weather_mcp"
    enabled: true
    config:
      api_key: "${WEATHER_API_KEY}"

security:
  # Secrets are referenced by environment variable name, never stored inline.
  api_keys:
    - name: "openweathermap"
      env_var: "WEATHER_API_KEY"
    - name: "github"
      env_var: "GITHUB_TOKEN"

  allowed_domains:
    - "api.openweathermap.org"
    - "api.github.com"

logging:
  level: "INFO"
  file: "/var/log/mcp-server.log"
启动脚本
#!/bin/bash
# start-mcp-server.sh

# Set environment variables (replace the placeholders with real credentials).
export WEATHER_API_KEY="your_api_key_here"
export GITHUB_TOKEN="your_github_token_here"

# Start the MCP server. The trailing backslashes keep all options on one
# command; without them each --option line would run as a separate command.
python -m mcp.server \
  --config mcp-config.yaml \
  --host 0.0.0.0 \
  --port 8080 \
  --log-level INFO
最佳实践建议
1. 工具设计原则
- 单一职责:每个工具只做一件事
- 错误处理:提供清晰的错误信息和恢复建议
- 文档完善:为每个工具提供详细的使用说明
- 性能考虑:避免长时间阻塞的操作
2. 安全最佳实践
# 安全工具示例
class SafeMCPTool:
    """Base class that gates tool operations by an allow-list and a timeout.

    Subclasses implement ``_execute_operation``; callers use ``safe_execute``.
    """

    def __init__(self):
        self.max_execution_time = 30  # seconds
        self.allowed_operations = ["read", "list"]

    async def safe_execute(self, operation, *args, **kwargs):
        """Run ``operation`` if it is allowed, bounded by the time limit.

        Args:
            operation: Operation name; must be in ``allowed_operations``.
            *args, **kwargs: Forwarded to ``_execute_operation``.

        Returns:
            Whatever ``_execute_operation`` returns.

        Raises:
            PermissionError: ``operation`` is not in the allow-list.
            TimeoutError: The operation exceeded ``max_execution_time``.
        """
        if operation not in self.allowed_operations:
            raise PermissionError(f"不允许的操作: {operation}")

        try:
            return await asyncio.wait_for(
                self._execute_operation(operation, *args, **kwargs),
                timeout=self.max_execution_time
            )
        except asyncio.TimeoutError as exc:
            # Keep the original timeout as the cause for debugging.
            raise TimeoutError("操作超时") from exc

    async def _execute_operation(self, operation, *args, **kwargs):
        """Perform ``operation``; must be overridden by subclasses."""
        raise NotImplementedError(
            f"{type(self).__name__} must implement _execute_operation"
        )
3. 性能优化
- 使用连接池管理数据库连接
- 实现缓存机制减少重复请求
- 批量处理操作以提高效率
- 异步执行长时间运行的任务
通过MCP扩展功能,Claude Code能够与各种外部系统和服务进行交互,大大扩展了其应用场景和能力边界。开发者可以根据具体需求创建自定义工具,构建出强大的AI辅助开发环境。








