send_wecom_notification.py
3.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python3
"""
企微群通知脚本
用于在Jenkins流水线完成后发送通知到企微群
"""
import requests
import json
import sys
import re
from datetime import datetime
import os
def send_wecom_notification(webhook_url, env_name, build_status, build_url):
"""
发送企微群通知
Args:
webhook_url: 企微群webhook URL
env_name: 运行环境 (test/uat/prod)
build_status: 构建状态 ('success' 或 'failure')
build_url: Jenkins完整的构建URL (格式: http://jenkins/job/xxx/123/)
"""
# 构建报告链接 - BUILD_URL通常以/结尾,直接拼接
report_url = f"{build_url}artifact/reports/{env_name}/"
# 从BUILD_URL中提取构建号 (格式: http://jenkins/.../123/)
match = re.search(r'/(\d+)/?$', build_url.rstrip('/'))
build_number = match.group(1) if match else 'unknown'
# 根据状态构建消息
if build_status == 'success':
status_emoji = "✅"
status_text = "运行成功"
status_color = "info"
result_text = "✅ 全部通过"
else:
status_emoji = "❌"
status_text = "运行失败"
status_color = "warning"
result_text = "❌ 存在失败用例"
# 获取当前时间
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 构建Markdown格式的消息
message_content = f"""<font color="{status_color}">{status_emoji} API自动化测试 - {status_text}</font>
**运行环境**:{env_name}
**运行结果**:{result_text}
**构建号**:{build_number}
**报告地址**:[点击查看报告]({report_url})
执行时间:{current_time}"""
# 构建企微消息体
payload = {
"msgtype": "markdown",
"markdown": {
"content": message_content
}
}
try:
# 发送POST请求
response = requests.post(
webhook_url,
json=payload,
timeout=10,
headers={'Content-Type': 'application/json'}
)
# 检查响应状态
response.raise_for_status()
# 解析返回结果
result = response.json()
if result.get('errcode') == 0:
print(f"✅ 企微通知发送成功: {result.get('errmsg', 'OK')}")
return True
else:
print(f"❌ 企微通知发送失败: {result.get('errmsg', '未知错误')} (errcode: {result.get('errcode')})")
return False
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {str(e)}")
return False
except json.JSONDecodeError as e:
print(f"❌ 解析响应失败: {str(e)}")
return False
except Exception as e:
print(f"❌ 发送通知异常: {str(e)}")
return False
if __name__ == "__main__":
# 从环境变量获取参数
webhook_url = os.getenv('WECOM_WEBHOOK_URL')
env_name = os.getenv('DEPLOY_ENV', 'test')
build_status = os.getenv('BUILD_STATUS', 'success')
build_url = os.getenv('BUILD_URL')
# 验证必需参数
if not webhook_url:
print("❌ 错误: 未设置 WECOM_WEBHOOK_URL 环境变量")
sys.exit(1)
if not build_url:
print("❌ 错误: 未设置 BUILD_URL 环境变量")
sys.exit(1)
# 发送通知
success = send_wecom_notification(
webhook_url=webhook_url,
env_name=env_name,
build_status=build_status,
build_url=build_url
)
# 返回退出码
sys.exit(0 if success else 1)