# api_test.py - Excel-driven HTTP API test runner.
import requests
import openpyxl
import json
from datetime import datetime
import time
import os
import re
import importlib.util
from jsonpath_ng import parse
import ast

def get_deploy_env():
    """Return the current deployment environment.

    The value is read from the ``DEPLOY_ENV`` environment variable; when the
    variable is not set, ``'test'`` is returned.
    """
    return os.environ.get('DEPLOY_ENV', 'test')

def should_run_in_env(env_scope, current_env):
    """Decide whether a case should be executed in the current environment.

    ``env_scope`` formats:
    - empty, ``None``, ``'None'`` or ``'all'`` (any letter case) -> run everywhere
    - ``'test'`` -> test environment only
    - ``'uat'`` -> UAT environment only
    - ``'prod'`` -> production environment only
    - ``'test,uat'`` -> test and UAT environments
    - ``'test,prod'`` -> test and production environments

    Args:
        env_scope: Scope value taken from the case; usually a string, but any
            value is accepted and converted with ``str()``.
        current_env: Name of the environment the run is executing in.

    Returns:
        True when the case should run, False otherwise.
    """
    # Falsy values (None, '', 0, ...) mean "no restriction".
    if not env_scope:
        return True
    # Coerce before calling string methods so a non-string scope (for example
    # a numeric spreadsheet cell) is compared instead of raising.
    scope = str(env_scope).strip()
    if scope == '' or scope.lower() in ('none', 'all'):
        return True
    allowed_envs = [name.strip() for name in scope.split(',')]
    return current_env in allowed_envs

def load_env_vars():
    """Load substitution variables from the current working directory.

    ``env_config.py`` takes precedence: when it exists it is executed as a
    module and every attribute whose name does not start with ``__`` is
    returned. Otherwise, when ``env.txt`` exists, each line containing ``=``
    is split on the first ``=`` into a stripped key and a stripped value.

    Returns:
        A dict of variable names to values; empty when neither file exists.
    """
    config_module_path = 'env_config.py'
    plain_text_path = 'env.txt'

    if os.path.exists(config_module_path):
        module_spec = importlib.util.spec_from_file_location('env_config', config_module_path)
        module = importlib.util.module_from_spec(module_spec)
        module_spec.loader.exec_module(module)
        return {
            name: getattr(module, name)
            for name in dir(module)
            if not name.startswith('__')
        }

    if not os.path.exists(plain_text_path):
        return {}

    loaded = {}
    with open(plain_text_path, encoding='utf-8') as handle:
        for raw_line in handle:
            if '=' not in raw_line:
                continue
            # Only the first '=' separates key from value; later ones belong to the value.
            key, _, value = raw_line.strip().partition('=')
            loaded[key.strip()] = value.strip()
    return loaded

def replace_vars(s, env_vars):
    """Expand ``{name}`` placeholders in *s* using *env_vars*.

    Non-string input is returned unchanged. Placeholders whose name is not in
    *env_vars* are left as they are. Substituted values may themselves contain
    placeholders, so expansion is repeated until the text stops changing, for
    at most five passes (which also bounds self-referencing variables).
    """
    if not isinstance(s, str):
        return s

    def substitute(found):
        name = found.group(1)
        return str(env_vars.get(name, found.group(0)))

    placeholder = re.compile(r'\{(\w+)\}')
    text = s
    passes_left = 5
    while passes_left > 0:
        expanded = placeholder.sub(substitute, text)
        if expanded == text:
            break
        text = expanded
        passes_left -= 1
    return text

# Timeout, in seconds, applied to every HTTP request sent by run_api_test.
_REQUEST_TIMEOUT_SECONDS = 60

# HTTP methods run_api_test can send; anything else is reported as a failure.
_SUPPORTED_METHODS = ('GET', 'POST', 'PUT', 'DELETE')


def _in_range(actual, bounds):
    """Return True when *bounds* is a ``[min, max]`` list that contains *actual*."""
    if not isinstance(bounds, list) or len(bounds) != 2:
        return False
    low, high = bounds
    return low <= actual <= high


# Maps each supported ``$`` operator to a predicate ``(actual, value) -> truthy``.
_OPERATOR_CHECKS = {
    '$ne': lambda actual, value: not (actual == value),
    '$eq': lambda actual, value: not (actual != value),
    '$gt': lambda actual, value: actual > value,
    '$gte': lambda actual, value: actual >= value,
    '$lt': lambda actual, value: actual < value,
    '$lte': lambda actual, value: actual <= value,
    '$in': lambda actual, value: actual in value,
    '$nin': lambda actual, value: actual not in value,
    '$regex': lambda actual, value: re.search(value, str(actual)),
    '$contains': lambda actual, value: value in str(actual),
    '$not_contains': lambda actual, value: value not in str(actual),
    '$range': _in_range,
}


def _check_operator(expected, actual):
    """Return True when *actual* satisfies every ``$`` operator in *expected*.

    *expected* must be a dict with at least one key starting with ``$``; keys
    without that prefix are ignored. Unknown operators, type mismatches and
    invalid regular expressions all count as "not satisfied".
    """
    if not isinstance(expected, dict):
        return False
    operators = {op: value for op, value in expected.items() if op.startswith('$')}
    if not operators:
        return False
    for op, value in operators.items():
        check = _OPERATOR_CHECKS.get(op)
        if check is None:
            # Unknown operators are treated as a failed condition.
            return False
        try:
            if not check(actual, value):
                return False
        except (TypeError, ValueError, re.error):
            # Incomparable types or a malformed pattern: the condition cannot hold.
            return False
    return True


def _check_partial(expected, actual):
    """Return True when *actual* matches the (partial) expectation *expected*.

    - ``"__NOT_NULL__"`` matches anything except ``None``, ``""``, ``[]``, ``{}``.
    - ``["__NOT_NULL__"]`` matches any non-empty list.
    - Other lists must have the same length and match element by element.
    - A dict containing ``$`` operators is evaluated by ``_check_operator``.
    - Any other dict only constrains the keys it names (extra keys in
      *actual* are allowed).
    - Everything else is compared with ``==``.
    """
    if expected == "__NOT_NULL__":
        return actual not in [None, "", [], {}]
    if isinstance(expected, list):
        if len(expected) == 1 and expected[0] == "__NOT_NULL__":
            return isinstance(actual, list) and len(actual) > 0
        if not isinstance(actual, list) or len(expected) != len(actual):
            return False
        return all(_check_partial(e, a) for e, a in zip(expected, actual))
    if isinstance(expected, dict):
        if any(key.startswith('$') for key in expected):
            return _check_operator(expected, actual)
        if isinstance(actual, dict):
            return all(
                key in actual and _check_partial(value, actual[key])
                for key, value in expected.items()
            )
    return expected == actual


def _decode_response_body(resp):
    """Return the response body as parsed JSON, a Python literal, or raw text."""
    try:
        return resp.json()
    except Exception:
        # Best effort: some services answer with Python-literal-looking text.
        try:
            return ast.literal_eval(resp.text)
        except Exception:
            return resp.text


def _parse_body_text(text):
    """Parse *text* as JSON, then as a Python literal; return None if both fail."""
    try:
        return json.loads(text)
    except Exception:
        try:
            return ast.literal_eval(text)
        except Exception:
            return None


def _apply_extract_expr(value, expr):
    """Apply an optional ``split('sep')[i]`` or ``regex(pattern)`` step to *value*.

    The value is returned unchanged when *expr* is empty, is not one of the
    two supported forms, or the regex does not match.
    """
    if not expr:
        return value
    if expr.startswith('split('):
        spec = re.match(r"split\(['\"](.*?)['\"]\)\[(\d+)\]", expr)
        if spec:
            sep, idx = spec.group(1), int(spec.group(2))
            text = str(value)
            return text.split(sep)[idx] if sep in text else text
    elif expr.startswith('regex('):
        spec = re.match(r"regex\((.*)\)", expr)
        if spec:
            found = re.search(spec.group(1), str(value))
            if found:
                # Prefer the first capture group when the pattern defines one.
                return found.group(1) if found.groups() else found.group(0)
    return value


def _extract_variables(extract_vars, body, env_vars):
    """Store values taken from *body* into *env_vars*.

    *extract_vars* holds ``name=jsonpath`` entries separated by ``;`` or
    newlines; a jsonpath may be followed by ``|split('sep')[i]`` or
    ``|regex(pattern)``. Only the first jsonpath match is used, and entries
    without a match leave *env_vars* untouched. Errors propagate to the caller.
    """
    if isinstance(body, str):
        body = _parse_body_text(body)
    if not body:
        return
    for entry in extract_vars.replace('\n', ';').split(';'):
        if '=' not in entry:
            continue
        var, path_expr = entry.split('=', 1)
        var = var.strip()
        path_expr = path_expr.strip()
        if not var or not path_expr:
            continue
        if '|' in path_expr:
            json_path, expr = (part.strip() for part in path_expr.split('|', 1))
        else:
            json_path, expr = path_expr, None
        matches = parse(json_path).find(body)
        if matches:
            env_vars[var] = _apply_extract_expr(matches[0].value, expr)


def run_api_test(cases):
    """Execute every runnable case and return one result dict per executed case.

    Each case is a dict. The keys read are ``is_run`` (``'1'`` skips the
    case), ``env_scope``, ``case_id``, ``api_name``, ``ip_host`` + ``path``
    (preferred) or ``url``, ``method``, ``params``, ``headers``,
    ``expected_code``, ``expected_msg``, ``expected_response`` and
    ``extract_vars``. ``{name}`` placeholders are expanded with the variables
    from ``load_env_vars()``; variables extracted from a response are added to
    that same mapping and are therefore visible to later cases.

    A case passes when the status code equals ``expected_code`` and, if
    ``expected_response`` is given, the response matches it partially.
    ``msg_pass`` is recorded in the result but does not affect the verdict.

    Args:
        cases: Iterable of case dicts, e.g. as produced from a spreadsheet.

    Returns:
        A list of result dicts (skipped cases produce no entry). ``test_result``
        is ``"通过"``, ``"失败"`` or ``"异常"``.
    """
    env_vars = load_env_vars()  # loaded once and shared by all cases
    current_env = get_deploy_env()
    results = []
    for case in cases:
        # is_run == '1' means "skip"; any other value runs the case.
        is_run = str(case.get('is_run', '0')).strip()
        if is_run == '1':
            continue

        env_scope = str(case.get('env_scope', '')).strip()
        if not should_run_in_env(env_scope, current_env):
            print(f"⏭️  跳过用例 case_id: {case.get('case_id')},原因:不在{current_env}环境执行范围内 (env_scope: {env_scope})")
            continue

        # Build the URL, preferring ip_host + path over a full url.
        ip_host = replace_vars(case.get('ip_host', ''), env_vars)
        path = replace_vars(case.get('path', ''), env_vars)
        url = ''
        url_error = ''
        if ip_host and path:
            ip_host, path = str(ip_host), str(path)
            if not ip_host.startswith(('http://', 'https://')):
                ip_host = 'https://' + ip_host
            url = ip_host.rstrip('/') + '/' + path.lstrip('/')
        elif case.get('url', ''):
            url = str(replace_vars(case.get('url', ''), env_vars))
        else:
            url_error = 'ip_host和path或url参数缺失'
        # Make sure the final URL always carries a scheme.
        if url and not url.startswith(('http://', 'https://')):
            url = 'https://' + url.lstrip('/')

        # An empty spreadsheet cell yields None, so coerce before upper().
        method = str(case.get('method') or '').strip().upper()
        params_str = replace_vars(case.get('params', ''), env_vars)
        headers_str = replace_vars(case.get('headers', ''), env_vars)
        raw_expected_code = case.get('expected_code', 0)
        code_error = ''
        try:
            expected_code = int(raw_expected_code or 0)
        except (TypeError, ValueError, OverflowError):
            # Report the bad cell on this case instead of aborting the whole run.
            expected_code = raw_expected_code
            code_error = f"expected_code字段不是合法整数: {raw_expected_code}"
        expected_msg = case.get('expected_msg', '')
        expected_response = replace_vars(case.get('expected_response', ''), env_vars)
        api_name = case.get('api_name', '')

        # Start from a "failed" result so every early exit below is reportable.
        result = {
            "case_id": case.get('case_id', ''),
            "api_name": api_name,
            "is_run": is_run,
            "url": url,
            "method": method or '参数不完整',
            "params": params_str,
            "headers": headers_str,
            "expected_code": expected_code,
            "expected_msg": expected_msg,
            "expected_response": expected_response,
            "actual_code": 'None',
            "actual_response": 'None',
            "code_pass": False,
            "msg_pass": False,
            "response_pass": False,
            "test_result": "失败",
            "error": url_error or "url或method参数缺失",
            "duration_ms": None
        }

        # Debug output of the key fields of each case.
        print(f"case_id: {case.get('case_id')}, is_run: {is_run}, url: {url}, method: {method}")
        print('原始url:', case.get('url'), '替换后url:', url)

        # Incomplete parameters: record the failure without sending a request.
        if url_error or not url or not method:
            print(f"参数校验未通过,case_id: {case.get('case_id')}, url: {url}, method: {method}, url_error: {url_error}")
            results.append(result)
            continue
        if code_error:
            result["error"] = code_error
            results.append(result)
            continue

        try:
            params = json.loads(params_str) if params_str else {}
        except (TypeError, ValueError) as e:
            result["error"] = f"params字段不是合法JSON: {e}"
            results.append(result)
            continue
        try:
            headers = json.loads(headers_str) if headers_str else {}
        except (TypeError, ValueError) as e:
            result["error"] = f"headers字段不是合法JSON: {e}"
            results.append(result)
            continue

        if method not in _SUPPORTED_METHODS:
            result["error"] = f"不支持的请求方法: {method}"
            results.append(result)
            continue

        try:
            started = time.perf_counter()
            if method == 'GET':
                # GET carries the parameters in the query string.
                resp = requests.request(
                    method, url, params=params, headers=headers,
                    timeout=_REQUEST_TIMEOUT_SECONDS,
                )
            else:
                # POST/PUT/DELETE send the parameters as a JSON body.
                resp = requests.request(
                    method, url, json=params, headers=headers,
                    timeout=_REQUEST_TIMEOUT_SECONDS,
                )
            result["duration_ms"] = int((time.perf_counter() - started) * 1000)

            resp_json = _decode_response_body(resp)
            result["actual_code"] = resp.status_code
            result["actual_response"] = str(resp_json)
            result["code_pass"] = resp.status_code == expected_code
            result["msg_pass"] = (str(expected_msg) in str(resp_json)) if expected_msg else None

            # Extra diagnostics that are appended to the error of a failed case.
            detail_msgs = []

            # Partial comparison of the response against the expectation.
            if expected_response:
                try:
                    expected_json = json.loads(expected_response)
                    # Already-decoded bodies (dict, list, number, ...) are used
                    # as they are; only text still needs to be parsed.
                    actual_json = json.loads(resp_json) if isinstance(resp_json, str) else resp_json
                    result["response_pass"] = _check_partial(expected_json, actual_json)
                except Exception as e:
                    # Any failure while parsing or comparing counts as a mismatch.
                    result["response_pass"] = False
                    detail_msgs.append(f"返回内容或期望内容不是合法JSON: {e}")
            else:
                result["response_pass"] = None

            # Extract variables from the response for use by later cases.
            extract_vars = case.get('extract_vars', '')
            if extract_vars and result['actual_response']:
                try:
                    _extract_variables(extract_vars, resp_json, env_vars)
                except Exception as e:
                    # Extraction is best effort and never changes the verdict,
                    # but it must not fail silently either.
                    print(f"提取变量失败,case_id: {case.get('case_id')}: {e}")
                    detail_msgs.append(f'提取变量失败: {e}')

            # Overall verdict and error message.
            error_msgs = []
            if not result["code_pass"]:
                error_msgs.append(f"实际返回码与期望不符(expected:{expected_code}, actual:{result['actual_code']})")
            if expected_response and result["response_pass"] is False:
                error_msgs.append("实际返回内容与期望内容不符")
            if result["code_pass"] and result["response_pass"] is not False:
                result["test_result"] = "通过"
                result["error"] = ""  # a passing case carries no error text
            else:
                result["test_result"] = "失败"
                result["error"] = " | ".join(error_msgs + detail_msgs)
            results.append(result)
        except requests.Timeout:
            result["error"] = "执行超时"
            result["test_result"] = "失败"
            result["duration_ms"] = _REQUEST_TIMEOUT_SECONDS * 1000
            results.append(result)
            continue
        except Exception as e:
            # Boundary handler: one broken case must not stop the remaining cases.
            result["error"] = str(e)
            result["test_result"] = "异常"
            results.append(result)
            continue
    return results

def write_report_to_excel(results, report_path):
    """Write the test results to an Excel workbook at *report_path*.

    The sheet is titled "测试报告" and has one styled header row followed by
    one row per result. The "duration" column is filled from the result's
    ``duration`` key, falling back to ``duration_ms``. Rows whose
    ``test_result`` is "失败" or "异常" are highlighted.

    Args:
        results: List of result dicts.
        report_path: Destination path of the ``.xlsx`` file.
    """
    from openpyxl.styles import Font, Alignment, PatternFill

    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "测试报告"
    headers = [
        "case_id", "api_name", "is_run", "method", "url", "params", "headers", "expected_code", "expected_msg", "expected_response",
        "actual_code", "actual_response", "code_pass", "msg_pass", "response_pass", "test_result", "duration", "error"
    ]
    ws.append(headers)
    column_numbers = range(1, len(headers) + 1)

    # Style objects are built once and shared by all cells that use them.
    header_font = Font(bold=True, color="FFFFFF")
    header_fill = PatternFill("solid", fgColor="2D3E50")
    header_alignment = Alignment(horizontal="center", vertical="center")
    body_alignment = Alignment(wrap_text=True, vertical="center")
    failure_fill = PatternFill("solid", fgColor="FFF0F0")

    for col in column_numbers:
        cell = ws.cell(row=1, column=col)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = header_alignment

    for row_number, r in enumerate(results, 2):
        row = []
        for h in headers:
            if h == "duration":
                # Results store the timing under "duration_ms"; without this
                # fallback the duration column would always be empty.
                value = r.get("duration", r.get("duration_ms"))
                row.append("" if value is None else value)
            else:
                row.append(r.get(h, ""))
        ws.append(row)
        # Wrap long cell content.
        for col in column_numbers:
            ws.cell(row=row_number, column=col).alignment = body_alignment
        # Highlight failed and errored cases, matching the HTML report.
        if r.get('test_result') in ('失败', '异常'):
            for col in column_numbers:
                ws.cell(row=row_number, column=col).fill = failure_fill
    wb.save(report_path)
    print(f"测试报告已生成:{report_path}")

# Stylesheet embedded in the HTML report.
_REPORT_STYLE = (
    '<style>\n'
    'body{font-family:Segoe UI,Arial,sans-serif;background:#f8f9fa;}\n'
    'h2{text-align:center;margin-top:24px;}\n'
    '.summary{text-align:center;margin-bottom:18px;font-size:17px;}\n'
    'table{border-collapse:collapse;margin:auto;background:#fff;box-shadow:0 2px 8px #eee;width:98vw;table-layout:fixed;}\n'
    'th,td{border:1px solid #ccc;padding:6px 8px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap;font-size:13px;position:relative;}\n'
    'th:nth-child(1),td:nth-child(1){width:1.5vw;min-width:24px;}\n'
    'th:nth-child(7),td:nth-child(7){width:2vw;min-width:32px;}\n'
    'th:nth-child(9),td:nth-child(9){width:2vw;min-width:32px;}\n'
    'th:nth-child(11),td:nth-child(11){width:2vw;min-width:32px;}\n'
    'th:nth-child(8),td:nth-child(8){width:15vw;min-width:180px;}\n'
    'th:nth-child(10),td:nth-child(10){width:15vw;min-width:180px;}\n'
    'th:nth-child(2),td:nth-child(2){width:8vw;min-width:90px;}\n'
    'td.api-name-cell,td.url-cell,td.headers-cell,td.error-cell{white-space:normal !important;word-break:break-all;overflow:auto;max-height:8em;}\n'
    'td.scroll-cell > div, td.url-cell > div, td.headers-cell > div, td.api-name-cell > div {max-height:8em;overflow:auto;white-space:pre-wrap;word-break:break-all;}\n'
    'td.scroll-cell{max-height:8em;min-width:120px;max-width:600px;overflow:auto;white-space:pre-wrap;word-break:break-all;position:relative;}\n'
    'button.copy-btn{margin-left:6px;padding:2px 10px;font-size:12px;cursor:pointer;background:#409eff;border:none;border-radius:3px;color:#fff;transition:background 0.2s;}\n'
    'button.copy-btn:hover{background:#66b1ff;}\n'
    'tr.failrow{background:#ffeaea !important;}\n'
    '.resizer{position:absolute;right:0;top:0;width:6px;height:100%;cursor:col-resize;user-select:none;z-index:2;}\n'
    '.duration-warn{color:#e74c3c;}\n'
    '.duration-strong{color:#e74c3c;font-weight:bold;}\n'
    '.filter-bar{margin:10px 0 0 0;text-align:left;width:98vw;max-width:1800px;}\n'
    '.filter-btn{display:inline-block;margin:0 8px 8px 0;padding:4px 18px;font-size:13px;border:1px solid #409eff;background:#fff;color:#409eff;border-radius:4px;cursor:pointer;transition:all 0.2s;outline:none;}\n'
    '.filter-btn.active,.filter-btn:hover{background:#409eff;color:#fff;}\n'
    '.back-to-top-btn{position:fixed;right:32px;bottom:32px;z-index:9999;width:48px;height:48px;border-radius:50%;background:rgba(200,200,200,0.45);color:#333;display:flex;align-items:center;justify-content:center;font-size:22px;box-shadow:0 2px 8px #ccc;cursor:pointer;transition:background 0.2s,border 0.2s;border:1px solid #e0e0e0;opacity:0.85;}\n'
    '.back-to-top-btn:hover{background:rgba(180,180,180,0.7);}\n'
    '</style>'
)

# Script embedded in the HTML report. window.onload is assigned exactly once:
# a second assignment would replace the handler and makeResizable would never run.
_REPORT_SCRIPT = (
    '<script>\n'
    'function makeResizable(tableId){\n'
    '  var table = document.getElementById(tableId);\n'
    '  if(!table) return;\n'
    '  var ths = table.querySelectorAll("th");\n'
    '  ths.forEach(function(th,i){\n'
    '    var resizer = document.createElement("div");\n'
    '    resizer.className = "resizer";\n'
    '    th.appendChild(resizer);\n'
    '    resizer.addEventListener("mousedown", function(e){\n'
    '      var startX = e.pageX;\n'
    '      var startWidth = th.offsetWidth;\n'
    '      function onMove(e2){\n'
    '        var newWidth = startWidth + (e2.pageX - startX);\n'
    '        th.style.width = newWidth+"px";\n'
    '      }\n'
    '      function onUp(){\n'
    '        document.removeEventListener("mousemove",onMove);\n'
    '        document.removeEventListener("mouseup",onUp);\n'
    '      }\n'
    '      document.addEventListener("mousemove",onMove);\n'
    '      document.addEventListener("mouseup",onUp);\n'
    '    });\n'
    '  });\n'
    '}\n'
    'window.onload=function(){makeResizable("report-table");filterTable("all");};\n'
    'function copyText(id){var t=document.getElementById(id).getAttribute("data-full");navigator.clipboard.writeText(t);alert("已复制全部内容");}\n'
    'function filterTable(type){\n'
    '  var trs=document.querySelectorAll("#report-table tbody tr");\n'
    '  for(var i=0;i<trs.length;i++){\n'
    '    var tr=trs[i];\n'
    '    var fail=tr.getAttribute("data-fail");\n'
    '    var dur=Number(tr.getAttribute("data-duration"));\n'
    '    if(type=="all"){tr.style.display="";}\n'
    '    else if(type=="fail"){tr.style.display=fail=="1"?"":"none";}\n'
    '    else if(type=="d1"){tr.style.display=dur>1000?"":"none";}\n'
    '    else if(type=="d3"){tr.style.display=dur>3000?"":"none";}\n'
    '  }\n'
    '  var btns=document.querySelectorAll(".filter-btn");\n'
    '  for(var j=0;j<btns.length;j++){btns[j].classList.remove("active");}\n'
    '  document.getElementById("btn-"+type).classList.add("active");\n'
    '}\n'
    'function scrollToTop(){window.scrollTo({top:0,behavior:"smooth"});}\n'
    '</script>'
)


def write_report_to_html(results, html_path):
    """Write the test results as a standalone HTML report to *html_path*.

    The page shows a summary (totals, overall duration, number of requests
    slower than 1 s and 3 s), filter buttons, and one table row per result.
    Results with ``test_result`` "失败" or "异常" count as failed. All values
    taken from the results are HTML-escaped before being written, because they
    contain text returned by the services under test.

    Args:
        results: List of result dicts.
        html_path: Destination path of the ``.html`` file (written as UTF-8).
    """
    from html import escape

    def cell_text(record, key):
        """Return the HTML-escaped text of ``record[key]`` ('' when absent)."""
        return escape(str(record.get(key, "")))

    headers = [
        "序号", "接口名称", "method", "url", "headers", "params", "expected_code", "expected_response", "actual_code", "actual_response", "duration", "error"
    ]
    failed_states = ('失败', '异常')
    total = len(results)
    passed = sum(1 for r in results if r.get('test_result') == '通过')
    failed = sum(1 for r in results if r.get('test_result') in failed_states)
    timings = [float(r.get('duration_ms') or 0) for r in results if r.get('duration_ms') is not None]
    total_seconds = sum(timings) / 1000
    if total_seconds < 60:
        duration_str = f"{total_seconds:.2f}秒"
    else:
        duration_str = f"{total_seconds/60:.2f}分钟"
    over_1s_count = sum(1 for ms in timings if ms > 1000)
    over_3s_count = sum(1 for ms in timings if ms > 3000)
    task_result = "执行成功" if failed == 0 else "执行失败"
    task_color = "green" if failed == 0 else "red"

    parts = [
        '<html><head><meta charset="utf-8"><title>接口自动化测试报告</title>',
        _REPORT_STYLE,
        _REPORT_SCRIPT,
        '</head><body>',
        '<h2 style="text-align:center;margin-top:24px;margin-bottom:10px;font-size:2.2em;">接口自动化测试报告</h2>',
        f'<div class="summary"><b style="color:{task_color}">{task_result}</b>,共执行用例:<b>{total}</b>,成功:<b style="color:green">{passed}</b>,失败:<b style="color:red">{failed}</b></div>',
        f'<div class="summary">执行用时:<b>{duration_str}</b>,响应请求大于1S的接口有:<b style="color:red">{over_1s_count}</b>,大于3S的接口有:<b style="color:red">{over_3s_count}</b></div>',
        '<div class="filter-bar"><button class="filter-btn active" id="btn-all" onclick="filterTable(\'all\')">全部</button><button class="filter-btn" id="btn-fail" onclick="filterTable(\'fail\')">失败用例</button><button class="filter-btn" id="btn-d1" onclick="filterTable(\'d1\')">请求大于1S</button><button class="filter-btn" id="btn-d3" onclick="filterTable(\'d3\')">请求大于3S</button></div>',
        '<table id="report-table">'
    ]
    parts.append('<thead>')
    parts.append('<tr>' + ''.join(f'<th>{h}</th>' for h in headers) + '</tr>')
    parts.append('</thead>')
    parts.append('<tbody>')
    for idx, r in enumerate(results, 1):
        is_fail = r.get('test_result') in failed_states
        tr_class = 'failrow' if is_fail else ''
        duration_val = r.get('duration', r.get('duration_ms', 0))
        try:
            duration_val = int(duration_val)
        except (TypeError, ValueError, OverflowError):
            # Missing or non-numeric timing (e.g. the request was never sent).
            duration_val = 0
        data_fail = '1' if is_fail else '0'
        duration_class = "duration-strong" if duration_val > 3000 else ("duration-warn" if duration_val > 1000 else "")
        row = [
            f'<td>{idx}</td>',
            f'<td class="api-name-cell"><div>{cell_text(r, "api_name")}</div></td>',
            f'<td>{cell_text(r, "method")}</td>',
            f'<td class="url-cell"><div>{cell_text(r, "url")}</div></td>',
            f'<td class="headers-cell"><div>{cell_text(r, "headers")}</div></td>',
            f'<td class="scroll-cell"><div>{cell_text(r, "params")}</div></td>',
            f'<td>{cell_text(r, "expected_code")}</td>',
            f'<td class="scroll-cell"><div>{cell_text(r, "expected_response")}</div></td>',
            f'<td>{cell_text(r, "actual_code")}</td>',
            f'<td class="scroll-cell"><div>{cell_text(r, "actual_response")}</div></td>',
            f'<td class="{duration_class}">{duration_val}</td>',
            f'<td class="error-cell"><div>{cell_text(r, "error")}</div></td>',
        ]
        parts.append(f'<tr class="{tr_class}" data-fail="{data_fail}" data-duration="{duration_val}">' + ''.join(row) + '</tr>')
    parts.append('</tbody>')
    parts.append('</table>')
    parts.append('<div class="back-to-top-btn" onclick="scrollToTop()" title="回到顶部">↑</div>')
    parts.append('</body></html>')
    with open(html_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(parts))
    print(f"HTML测试报告已生成:{html_path}")

def read_cases_from_excel(file_path, sheet_name='Sheet1'):
    """Read test cases from a worksheet of an Excel workbook.

    The first row supplies the column names; every following row becomes a
    dict mapping column name to cell value. Rows in which every cell is empty
    (``None`` or a blank string) are skipped, so blank rows in the sheet do
    not turn into cases with no data.

    Args:
        file_path: Path of the ``.xlsx`` workbook.
        sheet_name: Name of the worksheet to read.

    Returns:
        A list of case dicts in sheet order.
    """
    wb = openpyxl.load_workbook(file_path)
    ws = wb[sheet_name]
    headers = [cell.value for cell in ws[1]]
    cases = []
    for row in ws.iter_rows(min_row=2, values_only=True):
        is_blank = all(
            value is None or (isinstance(value, str) and not value.strip())
            for value in row
        )
        if is_blank:
            continue
        cases.append(dict(zip(headers, row)))
    return cases

if __name__ == "__main__":
    import sys

    # Remove reports left over from earlier runs (best effort).
    report_prefix = 'api_test_report_'
    for entry in os.listdir('.'):
        if entry.startswith(report_prefix) and entry.endswith(('.xlsx', '.html')):
            try:
                os.remove(entry)
            except Exception:
                pass

    # Every api_cases*.xlsx workbook in the working directory is a test suite.
    case_files = [
        entry for entry in os.listdir('.')
        if entry.startswith('api_cases') and entry.endswith('.xlsx')
    ]
    any_failed = False

    for case_file in case_files:
        env_name = get_deploy_env()
        suite_name = case_file.replace('.xlsx', '')
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        xlsx_report = f"api_test_report_{suite_name}_{env_name}_{stamp}.xlsx"
        html_report = xlsx_report.replace('.xlsx', '.html')

        suite_cases = read_cases_from_excel(case_file)
        print(f'读取到的用例({case_file}):', suite_cases)
        suite_results = run_api_test(suite_cases)
        print(f'最终results({case_file}):', suite_results)
        write_report_to_excel(suite_results, xlsx_report)
        write_report_to_html(suite_results, html_report)

        # Count failed and errored cases of this suite.
        failed_count = len([
            item for item in suite_results
            if item.get('test_result') in ['失败', '异常']
        ])
        if failed_count > 0:
            any_failed = True
            print(f"\n⚠️ {case_file}: 有 {failed_count} 个用例执行失败")

    # The exit code tells the CI job (Jenkins) whether the run succeeded.
    if any_failed:
        print("\n❌ 有测试用例执行失败,请查看报告详情!")
        sys.exit(1)
    print("\n✅ 所有测试用例运行成功!")
    sys.exit(0)