oss_uploader.py
7.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
"""
阿里云OSS文件上传模块
"""
import os
import uuid
import logging
from datetime import datetime, timedelta
import oss2
from app.core.config import settings
logger = logging.getLogger(__name__)
class OSSUploader:
    """Uploader for Alibaba Cloud OSS (Object Storage Service).

    Reads the access key pair, endpoint and bucket name from ``settings``,
    wraps an ``oss2.Bucket`` and offers helpers that upload a local file or
    an in-memory payload and report the resulting object URL.
    """

    def __init__(self):
        """Create the OSS client from the values in ``settings``.

        Raises:
            ValueError: If any of OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET,
                OSS_ENDPOINT or OSS_BUCKET_NAME is missing or empty.
        """
        self.access_key_id = settings.OSS_ACCESS_KEY_ID
        self.access_key_secret = settings.OSS_ACCESS_KEY_SECRET
        self.endpoint = settings.OSS_ENDPOINT
        self.bucket_name = settings.OSS_BUCKET_NAME

        if not all([
            self.access_key_id,
            self.access_key_secret,
            self.endpoint,
            self.bucket_name,
        ]):
            raise ValueError("OSS配置不完整,请检查 .env 中的 OSS_ACCESS_KEY_ID/OSS_ACCESS_KEY_SECRET/OSS_ENDPOINT/OSS_BUCKET_NAME")

        # Only non-secret values are logged.
        logger.info(
            "OSS配置: endpoint=%s, bucket=%s",
            self.endpoint,
            self.bucket_name,
        )

        # Credential object shared by every request made through the bucket.
        self.auth = oss2.Auth(self.access_key_id, self.access_key_secret)
        # The configured endpoint is used as-is. Prefer the public endpoint:
        # an internal endpoint tends to fail outside the Alibaba Cloud network.
        self.bucket = oss2.Bucket(self.auth, self.endpoint, self.bucket_name)

    def _build_object_url(self, oss_object_name):
        """Return the virtual-hosted-style URL of an object in the bucket.

        The configured endpoint may be a bare host
        (``oss-cn-hangzhou.aliyuncs.com``) or carry a scheme
        (``https://oss-cn-hangzhou.aliyuncs.com``). A scheme that is present
        is kept; otherwise ``https`` is assumed. Trailing slashes are ignored.

        Args:
            oss_object_name: Full object key inside the bucket.

        Returns:
            str: ``<scheme>://<bucket>.<endpoint-host>/<object key>``.
        """
        endpoint = self.endpoint.strip().rstrip('/')
        scheme, separator, host = endpoint.partition('://')
        if not separator:
            # No scheme in the configured endpoint: the whole value is the host.
            scheme, host = 'https', endpoint
        return f"{scheme}://{self.bucket_name}.{host}/{oss_object_name}"

    def upload_file(self, local_file_path, oss_object_name=None):
        """Upload a local file to OSS under ``temp_ai/<YYYYMMDD>/``.

        Args:
            local_file_path: Path of the local file to upload.
            oss_object_name: Object name to use inside the dated folder. When
                omitted or empty, a random UUID plus the local file's
                extension is used.

        Returns:
            tuple: ``(True, url)`` on success, ``(False, error_message)`` when
            the file does not exist or the upload fails. No exception is
            propagated to the caller.
        """
        try:
            if not os.path.exists(local_file_path):
                logger.error("本地文件不存在: %s", local_file_path)
                return False, "本地文件不存在"

            if not oss_object_name:
                # No name supplied: generate a collision-free one that keeps
                # the original extension.
                _, ext = os.path.splitext(local_file_path)
                oss_object_name = f"{uuid.uuid4()}{ext}"

            # NOTE(review): the dated prefix is applied to caller-supplied
            # names as well; confirm this matches the intended layout.
            date = datetime.now().strftime("%Y%m%d")
            oss_object_name = f"temp_ai/{date}/{oss_object_name}"

            self.bucket.put_object_from_file(oss_object_name, local_file_path)

            file_url = self._build_object_url(oss_object_name)
            logger.info("文件上传成功: %s -> %s", local_file_path, file_url)
            return True, file_url
        except Exception as e:
            # Best-effort API: report the failure through the return value.
            logger.error("文件上传失败: %s, 错误: %s", local_file_path, e)
            return False, str(e)

    def upload_data(self, data, oss_object_name):
        """Upload an in-memory payload to OSS.

        Args:
            data: Payload to upload (``str``, ``bytes`` or anything else
                accepted by ``bucket.put_object``).
            oss_object_name: Full object key; used verbatim, no prefix added.

        Returns:
            dict: On success ``{"success": True, "oss_object_name", "file_url",
            "etag", "size"}`` where ``size`` is the payload length in bytes
            (UTF-8 encoded length for ``str``, ``0`` for other payload types).
            On failure ``{"success": False, "error": <message>}``.
        """
        try:
            result = self.bucket.put_object(oss_object_name, data)

            if isinstance(data, str):
                # Text is sent as UTF-8, so report bytes rather than characters.
                size = len(data.encode('utf-8'))
            elif isinstance(data, (bytes, bytearray)):
                size = len(data)
            else:
                size = 0

            return {
                "success": True,
                "oss_object_name": oss_object_name,
                "file_url": self._build_object_url(oss_object_name),
                "etag": result.etag,
                "size": size,
            }
        except Exception as e:
            logger.error("数据上传失败: %s, 错误: %s", oss_object_name, e)
            return {"success": False, "error": str(e)}
def get_bucket():
    """Build an ``oss2.Bucket`` from the OSS values in ``settings``.

    Returns:
        The ``oss2.Bucket`` for the configured endpoint and bucket name.

    Raises:
        ValueError: If any of the four OSS settings is missing or empty.
    """
    key_id = settings.OSS_ACCESS_KEY_ID
    key_secret = settings.OSS_ACCESS_KEY_SECRET
    endpoint = settings.OSS_ENDPOINT
    bucket_name = settings.OSS_BUCKET_NAME

    # Every value must be present before a client can be created.
    if not (key_id and key_secret and endpoint and bucket_name):
        raise ValueError("OSS配置不完整,请检查 .env 中的 OSS_ACCESS_KEY_ID/OSS_ACCESS_KEY_SECRET/OSS_ENDPOINT/OSS_BUCKET_NAME")

    credentials = oss2.Auth(key_id, key_secret)
    return oss2.Bucket(credentials, endpoint, bucket_name)
def clean_expire_file():
    """Delete expired date-named folders under ``temp_ai/``.

    Lists the first level below ``temp_ai/`` and, for every folder whose name
    parses as ``YYYYMMDD`` and is strictly older than yesterday, removes all
    objects under it via ``delete_objects_by_prefix``. Plain files and folders
    with non-date names are left alone. Errors raised while listing or
    cleaning are printed with a traceback and not re-raised; a failure in
    ``get_bucket`` is not caught here.
    """
    print(f"\n[{datetime.now()}] 开始执行每日清理任务...")
    root_prefix = 'temp_ai/'
    bucket = get_bucket()

    # Folders dated before this day are considered expired.
    cutoff = (datetime.now() - timedelta(days=1)).date()
    print(f"保留阈值: {cutoff} (即 {cutoff} 之前的数据将被删除)")

    try:
        # delimiter='/' restricts the listing to the first level under the root.
        listing = oss2.ObjectIterator(bucket, prefix=root_prefix, delimiter='/')
        for entry in listing:
            if hasattr(entry, 'prefix'):
                # Entry exposing a ``prefix`` attribute: treated as a folder.
                entry_path, looks_like_dir = entry.prefix, True
            elif hasattr(entry, 'key'):
                # Entry exposing a ``key``: a folder only if it ends with '/'.
                entry_path = entry.key
                looks_like_dir = entry_path.endswith('/')
            else:
                entry_path, looks_like_dir = "", False

            # Skip loose files and the root folder itself.
            if not looks_like_dir or entry_path == root_prefix:
                continue

            # Last path segment once the surrounding slashes are removed,
            # e.g. 'temp_ai/20251229/' -> '20251229'.
            folder_name = entry_path.strip('/').rsplit('/', 1)[-1]
            try:
                folder_day = datetime.strptime(folder_name, "%Y%m%d").date()
                if folder_day < cutoff:
                    print(f"[删除] 发现过期目录: {entry_path}")
                    # Removes everything under the prefix, including an
                    # explicit folder object if one exists.
                    delete_objects_by_prefix(bucket, entry_path)
            except ValueError:
                print(f"[跳过] 非日期命名目录: {entry_path}")
    except Exception as e:
        import traceback
        print(f"[严重错误] 任务执行失败: {e}")
        traceback.print_exc()
def delete_objects_by_prefix(bucket, prefix):
    """Delete every object whose key starts with ``prefix``.

    Keys are collected while listing and removed in batches of up to 1000
    per ``batch_delete_objects`` call. Any exception raised while listing or
    deleting is printed and swallowed, so the caller can continue.

    Args:
        bucket: Bucket object to list and delete from.
        prefix: Key prefix (folder path) to clear.
    """
    print(f" -> 正在清理目录: {prefix} ...")
    pending_keys = []
    try:
        for item in oss2.ObjectIterator(bucket, prefix=prefix):
            pending_keys.append(item.key)
            if len(pending_keys) < 1000:
                continue
            # Batch is full: flush it and start a new one.
            bucket.batch_delete_objects(pending_keys)
            pending_keys = []

        # Flush the final, partially filled batch.
        if pending_keys:
            bucket.batch_delete_objects(pending_keys)
        print(f" -> 目录 {prefix} 清理完毕。")
    except Exception as e:
        print(f" [错误] 删除过程出错: {e}")
# Shared module-level uploader instance. It is constructed at import time, so
# the OSSUploader constructor runs as soon as this module is imported.
oss_uploader = OSSUploader()


if __name__ == '__main__':
    # Manual smoke test: upload a sample file and print the result tuple.
    print(oss_uploader.upload_file('想-dj-片段.mp3'))