Commit e012e8a0 e012e8a0fd23e9451421a813ae1025baea5b0826 by chenjing

提交配置文件

1 parent 75e44936
......@@ -15,6 +15,8 @@ pipeline {
IP_HOST_TEST = 'ai-test.hikoon.com'
IP_HOST_UAT = 'ai-uat.hikoon.com'
IP_HOST_PROD = 'api.hikoon.com'
// 企微Webhook URL - 包含完整的webhook地址和key
WECOM_WEBHOOK_URL = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=77f8f0d5-243a-442a-8946-bc713be7bf65'
}
stages {
......@@ -101,10 +103,28 @@ pipeline {
success {
echo "[SUCCESS] [${DEPLOY_ENV}环境] 所有测试用例运行成功!"
script {
sh '''
. venv/bin/activate
export WECOM_WEBHOOK_URL="${WECOM_WEBHOOK_URL}"
export BUILD_STATUS=success
python3 send_wecom_notification.py
'''
}
}
failure {
echo "[FAILURE] [${DEPLOY_ENV}环境] 有测试用例执行失败!"
script {
sh '''
. venv/bin/activate
export WECOM_WEBHOOK_URL="${WECOM_WEBHOOK_URL}"
export BUILD_STATUS=failure
python3 send_wecom_notification.py
'''
}
}
}
}
......
#!/usr/bin/env python3
"""
企微群通知脚本
用于在Jenkins流水线完成后发送通知到企微群
"""
import requests
import json
import sys
from datetime import datetime
import os
def send_wecom_notification(webhook_url, env_name, build_status, build_number, job_name, jenkins_url):
"""
发送企微群通知
Args:
webhook_url: 企微群webhook URL
env_name: 运行环境 (test/uat/prod)
build_status: 构建状态 ('success' 或 'failure')
build_number: Jenkins构建号
job_name: Jenkins任务名称
jenkins_url: Jenkins服务器地址
"""
# 构建报告链接
report_url = f"{jenkins_url}/job/{job_name}/{build_number}/artifact/reports/{env_name}/"
# 根据状态构建消息
if build_status == 'success':
status_emoji = "✅"
status_text = "运行成功"
status_color = "info"
result_text = "✅ 全部通过"
else:
status_emoji = "❌"
status_text = "运行失败"
status_color = "warning"
result_text = "❌ 存在失败用例"
# 获取当前时间
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 构建Markdown格式的消息
message_content = f"""<font color="{status_color}">{status_emoji} API自动化测试 - {status_text}</font>
**运行环境**:{env_name}
**运行结果**:{result_text}
**构建号**:{build_number}
**报告地址**:[点击查看报告]({report_url})
执行时间:{current_time}"""
# 构建企微消息体
payload = {
"msgtype": "markdown",
"markdown": {
"content": message_content
}
}
try:
# 发送POST请求
response = requests.post(
webhook_url,
json=payload,
timeout=10,
headers={'Content-Type': 'application/json'}
)
# 检查响应状态
response.raise_for_status()
# 解析返回结果
result = response.json()
if result.get('errcode') == 0:
print(f"✅ 企微通知发送成功: {result.get('errmsg', 'OK')}")
return True
else:
print(f"❌ 企微通知发送失败: {result.get('errmsg', '未知错误')} (errcode: {result.get('errcode')})")
return False
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {str(e)}")
return False
except json.JSONDecodeError as e:
print(f"❌ 解析响应失败: {str(e)}")
return False
except Exception as e:
print(f"❌ 发送通知异常: {str(e)}")
return False
if __name__ == "__main__":
# 从环境变量获取参数
webhook_url = os.getenv('WECOM_WEBHOOK_URL')
env_name = os.getenv('DEPLOY_ENV', 'test')
build_status = os.getenv('BUILD_STATUS', 'success')
build_number = os.getenv('BUILD_NUMBER', 'unknown')
job_name = os.getenv('JOB_NAME', 'api_test')
jenkins_url = os.getenv('JENKINS_URL', '').rstrip('/') or 'http://localhost:8080'
# 验证必需参数
if not webhook_url:
print("❌ 错误: 未设置 WECOM_WEBHOOK_URL 环境变量")
sys.exit(1)
# 发送通知
success = send_wecom_notification(
webhook_url=webhook_url,
env_name=env_name,
build_status=build_status,
build_number=build_number,
job_name=job_name,
jenkins_url=jenkins_url
)
# 返回退出码
sys.exit(0 if success else 1)