send_wecom_notification.py
3.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python3
"""
企微群通知脚本
用于在Jenkins流水线完成后发送通知到企微群
"""
import requests
import json
import sys
from datetime import datetime
import os
def send_wecom_notification(webhook_url, env_name, build_status, build_number, job_name, jenkins_url):
"""
发送企微群通知
Args:
webhook_url: 企微群webhook URL
env_name: 运行环境 (test/uat/prod)
build_status: 构建状态 ('success' 或 'failure')
build_number: Jenkins构建号
job_name: Jenkins任务名称
jenkins_url: Jenkins服务器地址
"""
# 构建报告链接
report_url = f"{jenkins_url}/job/{job_name}/{build_number}/artifact/reports/{env_name}/"
# 根据状态构建消息
if build_status == 'success':
status_emoji = "✅"
status_text = "运行成功"
status_color = "info"
result_text = "✅ 全部通过"
else:
status_emoji = "❌"
status_text = "运行失败"
status_color = "warning"
result_text = "❌ 存在失败用例"
# 获取当前时间
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 构建Markdown格式的消息
message_content = f"""<font color="{status_color}">{status_emoji} API自动化测试 - {status_text}</font>
**运行环境**:{env_name}
**运行结果**:{result_text}
**构建号**:{build_number}
**报告地址**:[点击查看报告]({report_url})
执行时间:{current_time}"""
# 构建企微消息体
payload = {
"msgtype": "markdown",
"markdown": {
"content": message_content
}
}
try:
# 发送POST请求
response = requests.post(
webhook_url,
json=payload,
timeout=10,
headers={'Content-Type': 'application/json'}
)
# 检查响应状态
response.raise_for_status()
# 解析返回结果
result = response.json()
if result.get('errcode') == 0:
print(f"✅ 企微通知发送成功: {result.get('errmsg', 'OK')}")
return True
else:
print(f"❌ 企微通知发送失败: {result.get('errmsg', '未知错误')} (errcode: {result.get('errcode')})")
return False
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {str(e)}")
return False
except json.JSONDecodeError as e:
print(f"❌ 解析响应失败: {str(e)}")
return False
except Exception as e:
print(f"❌ 发送通知异常: {str(e)}")
return False
if __name__ == "__main__":
# 从环境变量获取参数
webhook_url = os.getenv('WECOM_WEBHOOK_URL')
env_name = os.getenv('DEPLOY_ENV', 'test')
build_status = os.getenv('BUILD_STATUS', 'success')
build_number = os.getenv('BUILD_NUMBER', 'unknown')
job_name = os.getenv('JOB_NAME', 'api_test')
jenkins_url = os.getenv('JENKINS_URL', '').rstrip('/') or 'http://localhost:8080'
# 验证必需参数
if not webhook_url:
print("❌ 错误: 未设置 WECOM_WEBHOOK_URL 环境变量")
sys.exit(1)
# 发送通知
success = send_wecom_notification(
webhook_url=webhook_url,
env_name=env_name,
build_status=build_status,
build_number=build_number,
job_name=job_name,
jenkins_url=jenkins_url
)
# 返回退出码
sys.exit(0 if success else 1)