(7-3-03)基于MCP实现的金融投资Agent(3)金融MCP服务器

7.3.5 金融MCP服务器
文件server.py是本项目的核心服务器实现,基于模型上下文协议(MCP)搭建了一个金融数据服务平台。它整合了多个工具函数,提供了全面的金融数据获取与分析能力,包括市场动态(涨幅榜、跌幅榜等)、CNN 恐惧与贪婪指数、加密货币情绪指数、谷歌趋势数据、个股详细信息(财务指标、新闻、分析师推荐等)、期权数据、价格历史、财务报表、机构持股、收益历史、内幕交易等,还支持在特定依赖(如 TA-Lib、Playwright)存在时提供技术指标计算和收益日历功能。这些工具通过 MCP 协议暴露,可被客户端(如大型语言模型)调用以获取专业金融分析支持。
(1)下面代码的功能是设置日志记录器和基本配置,创建一个名为“Investor-Agent”的FastMCP实例,该实例声明了其依赖的Python库,以便于在MCP环境中正确初始化和运行。
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stderr)]
)
mcp = FastMCP("Investor-Agent", dependencies=["yfinance", "httpx", "pandas", "pytrends", "beautifulsoup4"])
FearGreedIndicator = Literal[
"fear_and_greed",
"fear_and_greed_historical",
"put_call_options",
"market_volatility_vix",
"market_volatility_vix_50",
"junk_bond_demand",
"safe_haven_demand"
]
(2)下面代码的功能是定义一个名为get_market_movers的异步函数,该函数通过调用yahoo_finance_utils模块中的get_market_movers_data函数,获取市场动态数据,如涨幅最大、跌幅最大或最活跃的股票,并支持不同的市场交易时段。
@mcp.tool()
async def get_market_movers(
category: Literal["gainers", "losers", "most-active"] = "most-active",
count: int = 25,
market_session: Literal["regular", "pre-market", "after-hours"] = "regular"
) -> dict:
"""
获取市场动态:涨幅榜、跌幅榜或最活跃股票。
对“最活跃”类别支持不同的市场交易时段。
参数:
category: 要获取的动态类型
count: 要返回的股票数量(最多 100 只)
market_session: 市场交易时段(仅适用于“最活跃”类别)
返回:
包含市场动态数据的字典
"""
return await yahoo_finance_utils.get_market_movers_data(category, count, market_session)
(3)下面代码的功能是定义一个名为get_cnn_fear_greed_index的异步函数,用于获取CNN恐惧与贪婪指数的历史数据,支持指定返回指标和时间范围。
@mcp.tool()
async def get_cnn_fear_greed_index(
days: int = 0,
indicators: list[FearGreedIndicator] | None = None
) -> dict:
"""最多 30 天的历史数据。"""
data = await fetch_fng_data()
if not data:
raise RuntimeError("无法获取 CNN 恐惧与贪婪指数数据")
if indicators:
invalid_keys = set(indicators) - set(data.keys())
if invalid_keys:
raise ValueError(f"无效指标:{list(invalid_keys)}。可用指标:{list(data.keys())}")
data = {k: v for k, v in data.items() if k in indicators}
# 当 days = 0 时排除 fear_and_greed_historical
if days == 0:
data = {k: v for k, v in data.items() if k != "fear_and_greed_historical"}
# 根据 days 参数处理历史数据
max_days = min(days, 30) if days > 0 else 0
for key, value in data.items():
if isinstance(value, dict) and "data" in value:
if days == 0:
data[key] = {k: v for k, v in value.items() if k != "data"}
elif len(value["data"]) > max_days:
data[key] = {**value, "data": value["data"][:max_days]}
return data
(4)下面代码的功能是定义一个名为get_crypto_fear_greed_index的异步函数,用于获取加密货币恐惧与贪婪指数的历史数据。
@mcp.tool()
async def get_crypto_fear_greed_index(days: int = 7) -> dict:
"""获取加密货币恐惧与贪婪指数的历史数据。"""
async with yahoo_finance_utils.create_cached_async_client() as client:
response = await client.get("https://api.alternative.me/fng/ ", params={"limit": days})
response.raise_for_status()
return response.json()["data"]
(5)下面代码的功能是定义一个名为get_google_trends的函数,用于获取指定关键词在Google趋势上的相对搜索热度。
@mcp.tool()
def get_google_trends(
keywords: list[str],
period_days: int = 7
) -> dict:
"""获取指定关键词的谷歌趋势相对搜索热度。"""
from pytrends.request import TrendReq
logger.info(f"获取 {period_days} 天的谷歌趋势数据")
pytrends = TrendReq(hl='en-US', tz=360)
pytrends.build_payload(keywords, timeframe=f'now {period_days}-d')
data = pytrends.interest_over_time()
if data.empty:
raise ValueError("谷歌趋势未返回数据")
return data[keywords].mean().to_dict()
(6)下面代码的功能是定义一个名为get_ticker_data的函数,用于获取股票的基本信息,包括核心财务指标、即将到来的收益日期和股息日期、最新新闻文章、最新分析师推荐以及近期分析师评级变动。
@mcp.tool()
def get_ticker_data(
ticker: str,
max_news: int = 5,
max_recommendations: int = 5,
max_upgrades: int = 5
) -> dict:
"""
返回:
- info: 核心财务指标(市盈率、利润率、增长率、负债率、市值、每股收益等)
- calendar: 即将到来的收益日期和股息日期
- news: 最新新闻文章(标题、日期、来源)
- recommendations: 最新分析师推荐(买入/卖出/持有评级)
- upgrades_downgrades: 近期分析师评级变动
"""
info = yfinance_utils.get_ticker_info(ticker)
if not info:
raise ValueError(f"{ticker} 没有可用信息")
# 只保留关键字段
essential_fields = {
'symbol', 'longName', 'currentPrice', 'marketCap', 'volume', 'trailingPE',
'forwardPE', 'dividendYield', 'beta', 'eps', 'totalRevenue', 'totalDebt',
'profitMargins', 'operatingMargins', 'returnOnEquity', 'returnOnAssets',
'revenueGrowth', 'earningsGrowth', 'bookValue', 'priceToBook',
'enterpriseValue', 'pegRatio', 'trailingEps', 'forwardEps'
}
filtered_info = {k: v for k, v in info.items() if k in essential_fields}
data = {"info": filtered_info}
if calendar := yfinance_utils.get_calendar(ticker):
data["calendar"] = calendar
if news := yfinance_utils.get_news(ticker, limit=max_news):
data["news"] = news
recommendations = yfinance_utils.get_analyst_data(ticker, "recommendations", max_recommendations)
if recommendations is not None and not recommendations.empty:
data["recommendations"] = recommendations.to_dict('split')
upgrades = yfinance_utils.get_analyst_data(ticker, "upgrades", max_upgrades)
if upgrades is not None and not upgrades.empty:
data["upgrades_downgrades"] = upgrades.to_dict('split')
return data
(7)下面代码的功能是定义一个名为get_options的函数,用于获取特定股票的期权数据,支持过滤特定日期范围、行权价范围和期权类型。
@mcp.tool()
def get_options(
ticker_symbol: str,
num_options: int = 10,
start_date: str | None = None,
end_date: str | None = None,
strike_lower: float | None = None,
strike_upper: float | None = None,
option_type: Literal["C", "P"] | None = None,
) -> dict:
"""获取期权数据。日期格式:YYYY-MM-DD。类型:C=认购期权,P=认沽期权。"""
df, error = yfinance_utils.get_filtered_options(
ticker_symbol, start_date, end_date, strike_lower, strike_upper, option_type
)
if error:
raise ValueError(error)
return df.head(num_options).to_dict('split')
(8)下面代码的功能是定义一个名为get_price_history的函数,用于获取特定股票的历史价格数据(OHLCV),支持不同时间周期的数据获取。
@mcp.tool()
def get_price_history(
ticker: str,
period: Literal["1d", "5d", "1mo", "3mo", "6mo", "1y", "2y", "5y", "10y", "ytd", "max"] = "1mo"
) -> dict:
"""获取历史 OHLCV 数据:≤1 年周期为日间隔,≥2 年周期为月间隔。"""
interval = "1mo" if period in ["2y", "5y", "10y", "max"] else "1d"
history = yfinance_utils.get_price_history(ticker, period, interval)
if history is None or history.empty:
raise ValueError(f"{ticker} 没有找到历史数据")
return history.to_dict('split')
(9)下面代码的功能是定义一个名为get_financial_statements的函数,用于获取特定股票的财务报表数据,包括损益表、资产负债表和现金流量表,并支持选择报表类型和频率。
@mcp.tool()
def get_financial_statements(
ticker: str,
statement_type: Literal["income", "balance", "cash"] = "income",
frequency: Literal["quarterly", "annual"] = "quarterly",
max_periods: int = 8
) -> dict:
data = yfinance_utils.get_financial_statements(ticker, statement_type, frequency)
if data is None or data.empty:
raise ValueError(f"{ticker} 没有找到 {statement_type} 报表数据")
if len(data.columns) > max_periods:
data = data.iloc[:, :max_periods]
return data.to_dict('split')
(10)下面代码的功能是定义一个名为get_institutional_holders的函数,用于获取特定股票的主要机构和共同基金持股情况。
@mcp.tool()
def get_institutional_holders(ticker: str, top_n: int = 20) -> dict:
"""获取主要机构和共同基金持股情况。"""
inst_holders, fund_holders = yfinance_utils.get_institutional_holders(ticker, top_n)
if (inst_holders is None or inst_holders.empty) and (fund_holders is None or fund_holders.empty):
raise ValueError(f"{ticker} 没有找到机构持股数据")
return {
key: data.to_dict('split')
for key, data in [
("institutional_holders", inst_holders),
("mutual_fund_holders", fund_holders)
]
if data is not None and not data.empty
}
(11)下面代码的功能是定义一个名为get_earnings_history的函数,用于获取特定股票的收益历史数据。
@mcp.tool()
def get_earnings_history(ticker: str, max_entries: int = 8) -> dict:
earnings_history = yfinance_utils.get_earnings_history(ticker, limit=max_entries)
if earnings_history is None or earnings_history.empty:
raise ValueError(f"{ticker} 没有找到收益历史数据")
return earnings_history.to_dict('split')
(12)下面代码的功能是定义一个名为get_insider_trades的函数,用于获取特定股票的内部交易数据。此外,如果Playwright库可用,还会注册一个名为get_earnings_calendar的异步函数,用于获取指定日期范围内的收益日历;如果TA-Lib库可用,还会注册一个名为calculate_technical_indicator的函数,用于计算技术指标。
@mcp.tool()
def get_insider_trades(ticker: str, max_trades: int = 20) -> dict:
trades = yfinance_utils.get_insider_trades(ticker, limit=max_trades)
if trades is None or trades.empty:
raise ValueError(f"{ticker} 没有找到内幕交易数据")
return trades.to_dict('split')
# 仅当 Playwright 可用时注册收益日历工具
if _playwright_available:
@mcp.tool()
async def get_earnings_calendar(
start: str | None = None,
end: str | None = None,
limit: int = 100
) -> dict:
"""获取指定日期范围内的收益日历。"""
return await yahoo_finance_utils.get_earnings_calendar_data(start, end, limit)
# 仅当 TA-Lib 可用时注册技术指标工具
if _ta_available:
@mcp.tool()
def calculate_technical_indicator(
ticker: str,
indicator: Literal["SMA", "EMA", "RSI", "MACD", "BBANDS"],
period: Literal["1mo", "3mo", "6mo", "1y", "2y", "5y"] = "1y",
timeperiod: int = 14, # SMA、EMA、RSI 的默认时间周期
fastperiod: int = 12, # MACD 快速 EMA 的默认值
slowperiod: int = 26, # MACD 慢速 EMA 的默认值
signalperiod: int = 9, # MACD 信号线的默认值
nbdev: int = 2, # BBANDS 的默认标准差
matype: int = 0, # BBANDS 的默认移动平均线类型(0=SMA)
num_results: int = 50 # 要返回的最近结果数量
) -> dict:
"""计算技术指标,包含适当的日期对齐和结果限制。"""
import numpy as np
history = yfinance_utils.get_price_history(ticker, period=period, interval="1d")
if history is None or history.empty or 'Close' not in history.columns:
raise ValueError(f"{ticker} 没有找到有效的历史数据")
close_prices = history['Close'].values
min_required = {
"SMA": timeperiod, "EMA": timeperiod * 2, "RSI": timeperiod + 1,
"MACD": slowperiod + signalperiod, "BBANDS": timeperiod
}.get(indicator, timeperiod)
if len(close_prices) < min_required:
raise ValueError(f"{indicator} 数据不足({len(close_prices)} 个点,需要 {min_required} 个)")
# 使用映射计算指标
indicator_funcs = {
"SMA": lambda: {"sma": talib.SMA(close_prices, timeperiod=timeperiod)},
"EMA": lambda: {"ema": talib.EMA(close_prices, timeperiod=timeperiod)},
"RSI": lambda: {"rsi": talib.RSI(close_prices, timeperiod=timeperiod)},
"MACD": lambda: dict(zip(["macd", "signal", "histogram"],
talib.MACD(close_prices, fastperiod=fastperiod, slowperiod=slowperiod, signalperiod=signalperiod))),
"BBANDS": lambda: dict(zip(["upper_band", "middle_band", "lower_band"],
talib.BBANDS(close_prices, timeperiod=timeperiod, nbdevup=nbdev, nbdevdn=nbdev, matype=matype)))
}
result = indicator_funcs[indicator]()
# 限制结果并准备数据
dates = history.index.strftime('%Y-%m-%d').tolist()
start_idx = max(0, len(dates) - num_results) if num_results > 0 else 0
return [
{
"date": dates[i],
"price": history.iloc[i].to_dict(),
"indicators": {
key: None if np.isnan(val := values[i]) else float(val)
for key, values in result.items()
}
}
for i in range(start_idx, len(dates))
]











