# CXL设备测试终极脚本(本地远程控制服务器,进行一系列重启、加载驱动、查询信息的操作)
#!/usr/bin/env python3
"""
CXL设备测试脚本 - 终极简单版
直接模拟手动操作
"""
import argparse
import csv
import json
import os
import shlex
import sys
import time
from datetime import datetime

import paramiko
class CXLSimpleTest:
    """Run repeated reboot / driver-load / query cycles on a remote host over SSH.

    Each loop optionally power-cycles the host with ``ipmitool``, waits for SSH
    to come back, records ``lspci | grep -i cxl`` and ``cxl list`` output, tries
    to load the driver through an ``insmod_drv`` script, and stores the raw
    outputs plus one CSV summary row per loop in a timestamped directory.
    """

    def __init__(self, host, username, password, total_loops=20, auto_yes=False, no_reboot=False, preferred_insmod_path=None, wait_before_insmod=25):
        """Store the configuration and create the result directory.

        Args:
            host: Host name or address passed to the SSH client.
            username: SSH user name.
            password: SSH password.
            total_loops: Number of test loops ``run_test`` executes.
            auto_yes: Skip the interactive confirmation in ``run_test``.
            no_reboot: Skip the power cycle at the start of every loop.
            preferred_insmod_path: Remote driver-load script to try ahead of
                the built-in candidates, or ``None``.
            wait_before_insmod: Seconds to sleep after the host is reachable
                and before any command is issued.
        """
        self.host = host
        self.username = username
        self.password = password
        self.total_loops = total_loops
        self.ssh_client = None
        self.auto_yes = auto_yes
        self.no_reboot = no_reboot
        self.preferred_insmod_path = preferred_insmod_path
        self.wait_before_insmod = wait_before_insmod
        # One result directory per run, named after the start time.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.result_dir = f"cxl_simple_{timestamp}"
        os.makedirs(self.result_dir, exist_ok=True)
        # Summary CSV shared by all loops.
        self.csv_file = os.path.join(self.result_dir, "results.csv")
        print(f"🎯 CXL设备测试 - 终极简单版")
        print(f"📁 结果目录: {self.result_dir}")
        print(f"📊 CSV文件: {self.csv_file}")

    # ------------------------------------------------------------------ #
    # SSH plumbing
    # ------------------------------------------------------------------ #
    def connect_ssh(self):
        """Open a new SSH connection; return ``True`` on success.

        On failure the half-initialised client is discarded so that
        ``self.ssh_client`` is never a truthy-but-unconnected object.
        """
        try:
            self.ssh_client = paramiko.SSHClient()
            self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_client.connect(
                hostname=self.host,
                username=self.username,
                password=self.password,
                timeout=30
            )
            return True
        except Exception as e:
            print(f"❌ SSH连接失败: {e}")
            self._close_ssh()
            return False

    def _close_ssh(self):
        """Close and forget the current SSH client, if any (best effort)."""
        client, self.ssh_client = self.ssh_client, None
        if client is not None:
            try:
                client.close()
            except Exception:
                # Closing a dead transport may fail; nothing useful to do.
                pass

    def execute_cmd(self, command, timeout=30):
        """Run ``command`` through ``/bin/bash -lc`` on the remote host.

        Args:
            command: Shell command line to execute.
            timeout: Timeout in seconds handed to ``exec_command``.

        Returns:
            ``(stdout, stderr)`` as stripped text. Any failure is reported as
            ``("", "命令执行失败: ...")`` instead of raising.
        """
        if self.ssh_client is None:
            return "", "命令执行失败: SSH未连接"
        try:
            # shlex.quote yields a single-quoted word, so the command reaches
            # the inner bash unmodified ($, backticks and non-ASCII included).
            shell_cmd = f"/bin/bash -lc {shlex.quote(command)}"
            stdin, stdout, stderr = self.ssh_client.exec_command(shell_cmd, timeout=timeout)
            output = stdout.read().decode('utf-8', errors='ignore').strip()
            error = stderr.read().decode('utf-8', errors='ignore').strip()
            return output, error
        except Exception as e:
            return "", f"命令执行失败: {e}"

    # ------------------------------------------------------------------ #
    # Small pure helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _quote_path(path):
        """Quote a remote path for the shell while keeping a leading ``~/`` expandable."""
        if path.startswith("~/"):
            return "~/" + shlex.quote(path[2:])
        return shlex.quote(path)

    @staticmethod
    def _has_cxl_devices(output):
        """Return ``True`` when ``cxl list`` output describes at least one object."""
        text = (output or "").strip()
        return bool(text) and text != "[]" and "no matching devices" not in text.lower()

    @staticmethod
    def _summarize_cxl_json(cxl_output):
        """Count memdevs/regions in ``cxl list`` JSON output.

        Accepts the grouped form (``[{"memdevs": [...]}, {"regions": [...]}]``),
        a flat array of objects carrying a ``memdev`` / ``region`` key, and a
        single bare object (see cxl-list(1) for the output shapes).

        Returns:
            ``(memdev_count, region_count, total_pmem_bytes)``.
            ``total_pmem_bytes`` is ``None`` when a ``pmem_size`` is not numeric
            (e.g. human-readable sizes), because no byte total can be computed.

        Raises:
            ValueError: ``cxl_output`` is not valid JSON.
        """
        data = json.loads(cxl_output)
        if isinstance(data, dict):
            data = [data]
        elif not isinstance(data, list):
            data = []
        memdevs = []
        region_count = 0
        for item in data:
            if not isinstance(item, dict):
                continue
            if isinstance(item.get('memdevs'), list):
                memdevs.extend(item['memdevs'])
            elif 'memdev' in item:
                memdevs.append(item)
            if isinstance(item.get('regions'), list):
                # Accumulate: several groups may each carry a "regions" list.
                region_count += len(item['regions'])
            elif 'region' in item:
                region_count += 1
        sizes = [dev.get('pmem_size', 0) if isinstance(dev, dict) else 0 for dev in memdevs]
        all_numeric = all(
            isinstance(size, (int, float)) and not isinstance(size, bool)
            for size in sizes
        )
        total_pmem = sum(sizes) if all_numeric else None
        return len(memdevs), region_count, total_pmem

    # ------------------------------------------------------------------ #
    # Driver loading
    # ------------------------------------------------------------------ #
    def discover_insmod_paths(self):
        """Return candidate driver-load script paths on the remote host.

        The list starts with well-known locations (not checked for existence)
        followed by the first ``find`` hit per search root; duplicates are
        removed while keeping the first occurrence.
        """
        paths = [
            "/home/uniscmkit/script/insmod_drv.sh",
            "/home/uniscmkit/script/insmod_drv",
            "/root/tcy/aliscm/insmod_drv",
            "/root/tcy/aliscm/insmod_drv.sh",
            "/root/insmod_drv",
            "/root/insmod_drv.sh",
        ]
        for base in ["/home", "/root", "/usr/local", "/opt"]:
            for script_name in ("insmod_drv", "insmod_drv.sh"):
                found, _ = self.execute_cmd(f"find {base} -maxdepth 4 -type f -name {script_name} 2>/dev/null | head -n 1", 20)
                if found.strip():
                    paths.append(found.strip())
        return list(dict.fromkeys(p for p in paths if p))

    def _probe_driver_state(self):
        """Let udev settle, then return ``(cxl_list_output, lsmod_output)``."""
        self.execute_cmd("udevadm settle || true", 10)
        cxl_out, _ = self.execute_cmd("cxl list -u || cxl list || true", 25)
        lsmod_out, _ = self.execute_cmd("lsmod | grep -i cxl || true", 10)
        return cxl_out, lsmod_out

    def find_and_run_insmod(self, max_tries=6, wait_sec=5):
        """Try every known way to load the driver until one appears to work.

        After each candidate command the host is probed; the attempt counts as
        successful when the command output contains ``has been insmod``, when
        ``lsmod`` lists a cxl module, or when ``cxl list`` reports devices.
        If all rounds fail, ``modprobe`` of the in-tree cxl modules is tried.

        Args:
            max_tries: Number of rounds over the whole candidate list.
            wait_sec: Seconds to sleep between rounds.

        Returns:
            ``(ok, attempts_log, cxl_list_output, lsmod_output)``;
            ``cxl_list_output`` is ``""`` when ``ok`` is ``False``.
        """
        candidates = [
            "insmod_drv",
            "./insmod_drv",
            "/root/insmod_drv",
            "cd /root && ./insmod_drv",
            "cd /root/tcy/aliscm && ./insmod_drv",
            "cd /home/uniscmkit/script && ./insmod_drv",
            "/home/uniscmkit/script/insmod_drv.sh",
            "cd /home/uniscmkit/script && ./insmod_drv.sh",
        ]
        if self.preferred_insmod_path:
            q = self._quote_path(self.preferred_insmod_path)
            candidates = [
                f"bash {q}",
                f"sh {q}",
                f'cd "$(dirname {q})" && "./$(basename {q})"',
                f"chmod +x {q}; {q}",
            ] + candidates
        out, _ = self.execute_cmd("which insmod_drv || true", 10)
        if out.strip():
            candidates = [self._quote_path(out.strip())] + candidates
        for p in self.discover_insmod_paths():
            q = self._quote_path(p)
            candidates = [f"bash {q}", f"sh {q}", f"chmod +x {q}; {q}"] + candidates

        attempts_log = []
        for _ in range(max_tries):
            for cmd in candidates:
                o, e = self.execute_cmd(cmd, 40)
                attempts_log.append(f"$ {cmd}\n{o}\n{e}\n")
                after_cxl, lsmod_out = self._probe_driver_state()
                cond_log = ("has been insmod" in (o or "")) or ("has been insmod" in (e or ""))
                cond_lsmod = bool(lsmod_out.strip())
                if cond_log or cond_lsmod or self._has_cxl_devices(after_cxl):
                    return True, "\n".join(attempts_log), after_cxl, lsmod_out
            time.sleep(wait_sec)

        # Fallback: load the in-tree cxl modules directly.
        for module in ("cxl_core", "cxl_acpi", "cxl_mem"):
            self.execute_cmd(f"modprobe {module} || true", 15)
        after_cxl, lsmod_out = self._probe_driver_state()
        if lsmod_out.strip() or self._has_cxl_devices(after_cxl):
            return True, "\n".join(attempts_log) + "\n[modprobe fallback]", after_cxl, lsmod_out
        return False, "\n".join(attempts_log), "", lsmod_out

    def wait_cxl_ready(self, max_wait=180, interval=5):
        """Poll ``cxl list`` until it reports devices or ``max_wait`` seconds pass.

        Returns:
            ``(True, output)`` as soon as devices are listed, otherwise
            ``(False, last_output)`` after the deadline.
        """
        deadline = time.monotonic() + max_wait
        last_out = ""
        while time.monotonic() < deadline:
            out, _ = self.execute_cmd("cxl list || true", 30)
            if self._has_cxl_devices(out):
                return True, out
            last_out = out
            time.sleep(interval)
        return False, last_out

    # ------------------------------------------------------------------ #
    # One test loop
    # ------------------------------------------------------------------ #
    def _reboot_and_reconnect(self, results):
        """Power-cycle the host and wait for SSH; record the outcome in ``results``.

        Returns ``True`` when the host answered again, ``False`` otherwise
        (``results['error']`` is then set).
        """
        print(f"1. 发送重启命令...", end="")
        # A previous loop may have ended without a connection; reconnect so a
        # single timeout does not make every following loop fail.
        if not self.ssh_client and not self.connect_ssh():
            results['error'] = "SSH连接失败"
            return False
        _, reboot_error = self.execute_cmd("ipmitool power cycle", 15)
        # NOTE(review): "command not found" on stderr is tolerated here as in
        # the original; confirm this is meant to ignore login-profile noise.
        if reboot_error and "command not found" not in reboot_error.lower():
            print(f" ❌ {reboot_error[:50]}")
            results['error'] = f"重启命令失败: {reboot_error[:50]}"
            return False
        print(" ✅")

        print(f"2. 等待系统重启...", end="")
        self._close_ssh()
        reboot_start = time.time()
        time.sleep(30)
        max_wait = 180
        for _ in range(max_wait // 10):
            if self.connect_ssh():
                test_output = self.execute_cmd("echo 'READY'", 5)[0]
                if "READY" in test_output:
                    reboot_time = int(time.time() - reboot_start)
                    print(f" ✅ ({reboot_time}秒)")
                    results['reboot_success'] = True
                    results['reboot_time'] = reboot_time
                    return True
                self._close_ssh()
            print(".", end="", flush=True)
            time.sleep(10)
        print(f" ❌ (超时)")
        results['error'] = "重启超时"
        return False

    def run_single_loop(self, loop_num):
        """Execute one loop: (reboot,) settle, lspci, load driver, cxl list.

        Args:
            loop_num: 1-based loop index, used for console output only.

        Returns:
            A result dict for ``save_results``. ``error`` is non-empty when
            the loop aborted; ``end_time``, ``insmod_log`` and ``lsmod`` are
            only present when the loop ran to the end.
        """
        print(f" {'='*60}")
        print(f"🔄 第 {loop_num} 次循环")
        print(f"⏰ 开始时间: {datetime.now().strftime('%H:%M:%S')}")
        print(f"{'='*60}")
        loop_start = datetime.now()
        results = {
            'loop': loop_num,
            'start_time': loop_start.strftime('%Y-%m-%d %H:%M:%S'),
            'reboot_success': False,
            'reboot_time': 0,
            'lspci_output': "",
            'cxl_output': "",
            'cxl_output_before': "",
            'memdev_count': 0,
            'region_count': 0,
            'error': ""
        }
        try:
            # Steps 1/2: reboot, or just make sure we are connected.
            if self.no_reboot:
                print("1. 跳过重启 ✅")
                if not self.ssh_client and not self.connect_ssh():
                    results['error'] = "SSH连接失败"
                    return results
            elif not self._reboot_and_reconnect(results):
                return results

            # Step 3: give the freshly booted system time to settle.
            print(f"3. 等待系统稳定...", end="")
            time.sleep(self.wait_before_insmod)
            print(" ✅")

            # Step 4: lspci before the driver is loaded.
            print(f"4. 执行lspci | grep -i cxl...", end="")
            lspci_output, _ = self.execute_cmd("lspci | grep -i cxl", 20)
            if lspci_output:
                devices = lspci_output.splitlines()  # one PCI device per line
                print(f" ✅ ({len(devices)}个设备)")
                results['lspci_output'] = lspci_output
                for i, device in enumerate(devices[:3], 1):
                    print(f" {i}. {device}")
                if len(devices) > 3:
                    print(f" ... 还有{len(devices)-3}个")
            else:
                print(f" ❌ (无输出)")
            pre_cxl, _ = self.execute_cmd("cxl list -u || cxl list || true", 30)
            results['cxl_output_before'] = pre_cxl or ""

            # Step 5: load the driver and verify it took effect.
            print(f"5. 加载驱动并验证...")
            ok, insmod_log, after_cxl, lsmod_out = self.find_and_run_insmod()
            time.sleep(5)
            if not ok:
                print(" 驱动未就绪,继续等待设备出现...")
                _, after_cxl = self.wait_cxl_ready(max_wait=180, interval=5)

            # Step 6: evaluate the cxl list output captured above.
            print(f"6. 执行cxl list...", end="")
            cxl_output = after_cxl
            if self._has_cxl_devices(cxl_output):
                print(f" ✅")
                results['cxl_output'] = cxl_output
                try:
                    memdev_count, region_count, total_pmem = self._summarize_cxl_json(cxl_output)
                except ValueError:
                    # Output is not JSON; the raw text is still saved.
                    pass
                else:
                    results['memdev_count'] = memdev_count
                    results['region_count'] = region_count
                    if memdev_count:
                        print(f" 内存设备: {memdev_count}个")
                        if total_pmem is not None:
                            print(f" 总PMEM: {total_pmem / (1024**3):.1f} GB")
            else:
                print(f" ❌ (无有效输出)")

            results['insmod_log'] = insmod_log or ""
            results['lsmod'] = lsmod_out
            results['end_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print(f" ✅ 第 {loop_num} 次循环完成")
        except Exception as e:
            # Loop boundary: record the failure and let the next loop run.
            print(f" ❌ 循环执行出错: {e}")
            results['error'] = str(e)
        return results

    # ------------------------------------------------------------------ #
    # Persistence
    # ------------------------------------------------------------------ #
    def save_results(self, loop_num, results):
        """Write per-loop output files and append one row to the CSV summary.

        Args:
            loop_num: Loop index used in the ``loopNN_*`` file names.
            results: Dict produced by ``run_single_loop``.
        """
        prefix = f"loop{loop_num:02d}"

        # Raw lspci output.
        if results['lspci_output']:
            lspci_file = os.path.join(self.result_dir, f"{prefix}_lspci.txt")
            with open(lspci_file, 'w', encoding='utf-8') as f:
                f.write(f"循环 #{loop_num} - lspci | grep -i cxl\n")
                f.write(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write("="*60 + "\n")
                f.write(results['lspci_output'] + "\n")

        # Raw cxl list output, plus a pretty-printed copy when it is JSON.
        if results['cxl_output']:
            cxl_file = os.path.join(self.result_dir, f"{prefix}_cxl_list.txt")
            with open(cxl_file, 'w', encoding='utf-8') as f:
                f.write(f"循环 #{loop_num} - cxl list\n")
                f.write(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write("="*60 + "\n")
                f.write(results['cxl_output'] + "\n")
            try:
                data = json.loads(results['cxl_output'])
            except ValueError:
                data = None  # not JSON: only the raw text file is kept
            if data is not None:
                json_file = os.path.join(self.result_dir, f"{prefix}_cxl_parsed.json")
                with open(json_file, 'w', encoding='utf-8') as f:
                    json.dump(data, f, indent=2)

        # Driver log, lsmod snapshot and a filtered dmesg tail.
        driver_log_file = os.path.join(self.result_dir, f"{prefix}_driver.log")
        dmesg_out, _ = self.execute_cmd("dmesg -T | egrep -i 'cxl|pci|acpi' | tail -n 200 || true", 30)
        with open(driver_log_file, 'w', encoding='utf-8') as f:
            f.write(results.get('insmod_log', ''))
            f.write("\n--- lsmod | grep -i cxl ---\n")
            f.write(results.get('lsmod', ''))
            f.write("\n--- dmesg (CXL/PCI) ---\n")
            f.write(dmesg_out or '')

        # CSV summary; the header is written only when the file is created.
        file_exists = os.path.exists(self.csv_file)
        with open(self.csv_file, 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            if not file_exists:
                writer.writerow([
                    '循环', '开始时间', '结束时间', '重启成功', '重启耗时(秒)',
                    'CXL设备数', 'cxl list成功', 'cxl list(前)非空', 'memdev数', 'region数', 'lspci输出文件', 'cxl list输出文件', '驱动日志文件', '错误信息'
                ])
            # Number of lspci lines that matched "cxl".
            lspci_count = len(results['lspci_output'].splitlines()) if results['lspci_output'] else 0
            cxl_success = '是' if self._has_cxl_devices(results['cxl_output']) else '否'
            cxl_before_nonempty = '是' if self._has_cxl_devices(results.get('cxl_output_before')) else '否'
            writer.writerow([
                loop_num,
                results['start_time'],
                results.get('end_time', ''),
                '是' if results['reboot_success'] else '否',
                results['reboot_time'],
                lspci_count,
                cxl_success,
                cxl_before_nonempty,
                results.get('memdev_count', 0),
                results.get('region_count', 0),
                f"{prefix}_lspci.txt" if results['lspci_output'] else '',
                f"{prefix}_cxl_list.txt" if results['cxl_output'] else '',
                os.path.basename(driver_log_file),
                results.get('error', '')
            ])
        print(f"📝 结果已保存")

    # ------------------------------------------------------------------ #
    # Whole run
    # ------------------------------------------------------------------ #
    def run_test(self):
        """Run all loops, saving results after each one, then print a summary.

        Asks for confirmation unless ``auto_yes`` is set. The SSH connection
        is closed on every exit path once it has been opened.
        """
        print("="*60)
        print("🎯 CXL设备测试 - 终极简单版")
        print("="*60)
        print(f"目标主机: {self.host}")
        print(f"用户名: {self.username}")
        print(f"循环次数: {self.total_loops}")
        print("="*60)
        print("简单直接的流程:")
        print(" 1. ipmitool power cycle")
        print(" 2. 等待重启完成")
        print(" 3. cd /root/tcy/aliscm && ./insmod_drv")
        print(" 4. lspci | grep -i cxl")
        print(" 5. cxl list")
        print("="*60)
        if not self.auto_yes:
            confirm = input(f"⚠️ 确认执行 {self.total_loops} 次测试吗?(y/N): ")
            if confirm.lower() != 'y':
                print("测试取消")
                return
        print(" 开始测试...")
        # Verify the host is reachable before starting the loops.
        print(f"测试初始连接...", end="")
        if not self.connect_ssh():
            print(" ❌")
            return
        print(" ✅")
        try:
            all_results = []
            for loop in range(1, self.total_loops + 1):
                results = self.run_single_loop(loop)
                self.save_results(loop, results)
                all_results.append(results)
                if loop < self.total_loops:
                    wait = 20
                    print(f" ⏸️ 等待{wait}秒后继续...")
                    time.sleep(wait)
            # Final statistics.
            successful = sum(1 for r in all_results if r['reboot_success'])
            has_cxl_output = sum(1 for r in all_results if r['cxl_output'])
            print(f" {'='*60}")
            print("📊 测试完成统计")
            print(f"{'='*60}")
            print(f"总循环次数: {len(all_results)}")
            print(f"重启成功: {successful}")
            print(f"cxl list有输出: {has_cxl_output}")
            print(f" 📁 所有结果保存在: {self.result_dir}")
            print(f"📊 CSV文件: {self.csv_file}")
            print(f" 输出文件:")
            print(f" loopXX_lspci.txt - lspci输出")
            print(f" loopXX_cxl_list.txt - cxl list输出")
            print(f" loopXX_cxl_parsed.json - 解析后的JSON")
            print(f"{'='*60}")
        finally:
            self._close_ssh()
def main():
    """Parse command-line / environment options and run the probe or the test.

    Options fall back to the environment variables ``SSH_HOST``, ``SSH_USER``,
    ``SSH_PASS``, ``LOOPS``, ``INSMOD_PATH`` and ``WAIT_BEFORE_INSMOD``. When
    no loop count is given, it is asked for interactively (default 1).
    With ``--probe`` only a few discovery commands are run on the remote host.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", default=os.environ.get("SSH_HOST", "192.168.32.122"))
    parser.add_argument("--user", default=os.environ.get("SSH_USER", "root"))
    # NOTE(review): a default password is hard-coded here; prefer supplying it
    # via --pass or SSH_PASS and removing the literal.
    parser.add_argument("--pass", dest="password", default=os.environ.get("SSH_PASS", "312312"))
    _env_loops = os.environ.get("LOOPS")
    parser.add_argument("--loops", type=int, default=int(_env_loops) if _env_loops else None)
    parser.add_argument("--yes", action="store_true")
    parser.add_argument("--no-reboot", action="store_true")
    parser.add_argument("--insmod", dest="preferred_insmod_path", default=os.environ.get("INSMOD_PATH", None))
    parser.add_argument("--wait", dest="wait_before_insmod", type=int, default=int(os.environ.get("WAIT_BEFORE_INSMOD", "25")))
    parser.add_argument("--probe", action="store_true")
    args = parser.parse_args()

    print("🎯 CXL设备测试 - 终极简单版")
    print("-"*50)
    print("直接模拟手动操作:")
    print(" cd /root/tcy/aliscm && ./insmod_drv")
    print("-"*50)
    print(f" 配置:")
    print(f" 主机: {args.host}")
    print(f" 用户: {args.user}")
    print(f" 循环: {args.loops if args.loops is not None else '未设置(将询问)'}次")
    print(f" insmod命令: cd /root/tcy/aliscm && ./insmod_drv")
    print("-"*50)

    # Ask for the loop count when neither --loops nor LOOPS provided one.
    loops = args.loops
    while loops is None:
        try:
            val = input("请输入循环次数(默认1): ").strip()
        except EOFError:
            # stdin is closed (non-interactive run): treat it like an empty
            # answer instead of re-prompting forever.
            print()
            loops = 1
            break
        if val == "":
            loops = 1
            break
        try:
            v = int(val)
        except ValueError:
            print("请输入有效数字")
            continue
        if v > 0:
            loops = v
        else:
            print("请输入正整数")

    tester = CXLSimpleTest(
        args.host,
        args.user,
        args.password,
        total_loops=loops,
        auto_yes=args.yes,
        no_reboot=args.no_reboot,
        preferred_insmod_path=args.preferred_insmod_path,
        wait_before_insmod=args.wait_before_insmod,
    )

    if args.probe:
        # Environment discovery only: no reboot, no driver load.
        print("开始探测环境...")
        if not tester.connect_ssh():
            print("SSH连接失败")
            return
        out, _ = tester.execute_cmd("which insmod_drv || true", 10)
        print(f"which insmod_drv: {out.strip()}")
        out, _ = tester.execute_cmd("find / -type f -name insmod_drv 2>/dev/null | head -n 10", 60)
        print("find insmod_drv:")
        print(out.strip())
        out, _ = tester.execute_cmd("which cxl || true", 10)
        print(f"which cxl: {out.strip()}")
        out, _ = tester.execute_cmd("ls /home/uniscmkit/script 2>/dev/null | head -n 20", 20)
        print("ls /home/uniscmkit/script:")
        print(out.strip())
        return

    tester.run_test()
if __name__ == "__main__":
    # NOTE: the module-level `import paramiko` runs before this check, so the
    # hint below is only reachable if that import is ever made lazy.
    try:
        import paramiko
    except ImportError:
        print("请先安装: pip install paramiko")
        sys.exit(1)
    try:
        main()
    except KeyboardInterrupt:
        print(" 测试被用户中断")
        sys.exit(130)  # conventional exit status for termination by Ctrl-C
    except Exception as e:
        # Top-level boundary: report the error and signal failure to callers.
        print(f" 程序错误: {e}")
        sys.exit(1)







