# oss_uploader.py 7.35 KB
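"""
阿里云OSS文件上传模块 (Alibaba Cloud OSS upload module).

Provides ``OSSUploader`` for uploading local files or in-memory data, plus the
daily ``clean_expire_file`` task that deletes expired ``temp_ai/<YYYYMMDD>/`` folders.
"""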
import logging
import os
import uuid
from datetime import datetime, timedelta
from urllib.parse import quote

import oss2

from app.core.config import settings

# Module-level logger; OSSUploader reports configuration and upload results through it.
logger = logging.getLogger(__name__)


class OSSUploader:
    """Thin wrapper around an ``oss2.Bucket`` configured from ``settings``.

    Uploads local files (``upload_file``) or in-memory data (``upload_data``)
    and returns a virtual-hosted-style URL ``<scheme>://<bucket>.<host>/<key>``
    for the stored object. NOTE(review): the URL is only directly readable if
    the bucket/object ACL allows public read — confirm the bucket ACL.
    """

    # Root folder used by upload_file; objects land in TEMP_PREFIX + "<YYYYMMDD>/".
    TEMP_PREFIX = "temp_ai/"

    def __init__(self):
        """Read the OSS settings and create the authenticated bucket client.

        Raises:
            ValueError: if any of OSS_ACCESS_KEY_ID / OSS_ACCESS_KEY_SECRET /
                OSS_ENDPOINT / OSS_BUCKET_NAME is empty or unset.
        """
        self.access_key_id = settings.OSS_ACCESS_KEY_ID
        self.access_key_secret = settings.OSS_ACCESS_KEY_SECRET
        self.endpoint = settings.OSS_ENDPOINT
        self.bucket_name = settings.OSS_BUCKET_NAME

        if not all([
            self.access_key_id,
            self.access_key_secret,
            self.endpoint,
            self.bucket_name,
        ]):
            raise ValueError("OSS配置不完整,请检查 .env 中的 OSS_ACCESS_KEY_ID/OSS_ACCESS_KEY_SECRET/OSS_ENDPOINT/OSS_BUCKET_NAME")

        logger.info(
            "OSS配置: endpoint=%s, bucket=%s",
            self.endpoint,
            self.bucket_name,
        )
        # Credentials object used to sign every request.
        self.auth = oss2.Auth(self.access_key_id, self.access_key_secret)

        # The public endpoint is used as configured: an "internal" endpoint
        # tends to fail when running outside Alibaba Cloud's internal network.
        self.bucket = oss2.Bucket(self.auth, self.endpoint, self.bucket_name)

    def _build_url(self, oss_object_name):
        """Return the virtual-hosted-style URL of ``oss_object_name``.

        ``self.endpoint`` may be configured with or without a scheme
        (e.g. ``oss-cn-hangzhou.aliyuncs.com`` or ``https://oss-...``); the
        scheme defaults to ``https``. The key is percent-encoded (``/`` kept),
        so spaces, ``#``, ``?`` and non-ASCII characters still give a valid URL.
        """
        scheme, sep, host = self.endpoint.partition("://")
        if not sep:
            scheme, host = "https", self.endpoint
        host = host.strip("/")
        return f"{scheme}://{self.bucket_name}.{host}/{quote(oss_object_name, safe='/')}"

    def upload_file(self, local_file_path, oss_object_name=None):
        """Upload a local file into the dated temp folder.

        The object key is always ``TEMP_PREFIX + "<YYYYMMDD>/" + name`` (local
        date), so uploads are later removed by ``clean_expire_file``.

        Args:
            local_file_path: Path of the local file to upload.
            oss_object_name: Object name relative to the dated folder. When
                omitted, ``<uuid4><original extension>`` is generated.

        Returns:
            tuple: ``(True, url)`` on success, ``(False, error_message)`` on
            failure. This method does not raise.
        """
        try:
            # isfile (not exists): a directory cannot be uploaded as an object.
            if not os.path.isfile(local_file_path):
                logger.error("本地文件不存在: %s", local_file_path)
                return False, "本地文件不存在"

            # No explicit name: random UUID keeps keys unique, extension kept for content-type.
            if not oss_object_name:
                _, ext = os.path.splitext(local_file_path)
                oss_object_name = f"{uuid.uuid4()}{ext}"

            date = datetime.now().strftime("%Y%m%d")
            oss_object_name = f"{self.TEMP_PREFIX}{date}/{oss_object_name}"

            self.bucket.put_object_from_file(oss_object_name, local_file_path)

            file_url = self._build_url(oss_object_name)
            logger.info("文件上传成功: %s -> %s", local_file_path, file_url)
            return True, file_url

        except Exception as e:  # API boundary: report the failure instead of raising
            logger.exception("文件上传失败: %s, 错误: %s", local_file_path, e)
            return False, str(e)

    def upload_data(self, data, oss_object_name):
        """Upload in-memory data to ``oss_object_name`` (key used as-is, no prefix).

        Args:
            data: ``str`` (uploaded UTF-8 encoded), ``bytes``/``bytearray``, or
                any other payload ``oss2.Bucket.put_object`` accepts.
            oss_object_name: Full OSS object key.

        Returns:
            dict: on success ``{"success": True, "oss_object_name", "file_url",
            "etag", "size"}`` where ``size`` is the byte length (0 when the
            payload is not a bytes-like value); on failure
            ``{"success": False, "error": str}``. This method does not raise.
        """
        try:
            # Encode explicitly so the reported size is bytes, not characters.
            payload = data.encode("utf-8") if isinstance(data, str) else data
            result = self.bucket.put_object(oss_object_name, payload)

            return {
                "success": True,
                "oss_object_name": oss_object_name,
                "file_url": self._build_url(oss_object_name),
                "etag": result.etag,
                "size": len(payload) if isinstance(payload, (bytes, bytearray)) else 0,
            }

        except Exception as e:  # API boundary: report the failure instead of raising
            logger.exception("数据上传失败: %s", oss_object_name)
            return {"success": False, "error": str(e)}


def get_bucket():
    """Create an ``oss2.Bucket`` client from the OSS settings.

    Returns:
        oss2.Bucket: client bound to OSS_ENDPOINT / OSS_BUCKET_NAME.

    Raises:
        ValueError: if any of the four OSS settings is empty or unset.
    """
    oss_config = (
        settings.OSS_ACCESS_KEY_ID,
        settings.OSS_ACCESS_KEY_SECRET,
        settings.OSS_ENDPOINT,
        settings.OSS_BUCKET_NAME,
    )
    if any(not value for value in oss_config):
        raise ValueError("OSS配置不完整,请检查 .env 中的 OSS_ACCESS_KEY_ID/OSS_ACCESS_KEY_SECRET/OSS_ENDPOINT/OSS_BUCKET_NAME")

    key_id, key_secret, endpoint, bucket_name = oss_config
    credentials = oss2.Auth(key_id, key_secret)
    return oss2.Bucket(credentials, endpoint, bucket_name)


def clean_expire_file(root_prefix='temp_ai/', keep_days=1):
    """Daily cleanup task: delete expired ``<root_prefix><YYYYMMDD>/`` folders.

    Lists the first level below ``root_prefix`` (``/`` delimiter) and deletes
    every folder whose name is an 8-digit ``%Y%m%d`` date strictly earlier
    than ``today - keep_days`` (local time). With the defaults, today's and
    yesterday's folders are kept. Loose files directly under ``root_prefix``
    and non-date folders are left untouched.

    Progress is printed to stdout. A listing failure is printed with its
    traceback instead of being raised, so a scheduler calling this keeps running.

    Args:
        root_prefix: Folder to scan (a trailing ``/`` is added if missing).
            Defaults to the folder ``OSSUploader.upload_file`` writes into.
        keep_days: Days before today whose folders are still kept.

    Returns:
        list[str]: folder prefixes for which deletion was attempted.

    Raises:
        ValueError: if ``root_prefix`` is empty (would scan the whole
            bucket), or from ``get_bucket`` when the OSS config is incomplete.
    """
    if not root_prefix:
        raise ValueError("root_prefix must not be empty")
    if not root_prefix.endswith('/'):
        root_prefix += '/'

    print(f"\n[{datetime.now()}] 开始执行每日清理任务...")
    bucket = get_bucket()

    # 1. Cut-off date: folders strictly older than this are deleted.
    threshold_date = (datetime.now() - timedelta(days=keep_days)).date()
    print(f"保留阈值: {threshold_date} (即 {threshold_date} 之前的数据将被删除)")

    expired_prefixes = []
    # 2. Walk the immediate sub-folders of root_prefix.
    try:
        for obj in oss2.ObjectIterator(bucket, prefix=root_prefix, delimiter='/'):
            # oss2 yields SimplifiedObjectInfo for real objects AND for common
            # prefixes (whose .key is the prefix). Both common prefixes and
            # explicit "folder" objects end with the '/' delimiter.
            path = getattr(obj, 'prefix', None) or getattr(obj, 'key', '')
            if not path.endswith('/'):
                # Loose file directly under root_prefix: not a dated folder.
                continue
            if path == root_prefix:
                # Explicit folder object for root_prefix itself.
                continue

            # 'temp_ai/20251229/' -> '20251229'
            folder_name = path.rstrip('/').rsplit('/', 1)[-1]

            # Require exactly 8 digits: strptime alone would also accept
            # ambiguous names like '2025121'.
            folder_date = None
            if len(folder_name) == 8 and folder_name.isdigit():
                try:
                    folder_date = datetime.strptime(folder_name, "%Y%m%d").date()
                except ValueError:  # e.g. '20251399'
                    folder_date = None
            if folder_date is None:
                print(f"[跳过] 非日期命名目录: {path}")
                continue

            if folder_date < threshold_date:
                print(f"[删除] 发现过期目录: {path}")
                # Deletes every key under the prefix, including an explicit
                # folder object whose key is exactly `path`.
                delete_objects_by_prefix(bucket, path)
                expired_prefixes.append(path)

    except Exception as e:  # task boundary: report and keep the scheduler alive
        import traceback
        print(f"[严重错误] 任务执行失败: {e}")
        traceback.print_exc()

    return expired_prefixes


def delete_objects_by_prefix(bucket, prefix, batch_size=1000):
    """Delete every object whose key starts with ``prefix`` (best effort).

    Keys are listed with ``oss2.ObjectIterator`` without a delimiter, so the
    whole "sub-tree" under the prefix is covered. They are removed with
    ``bucket.batch_delete_objects`` in batches of at most ``batch_size`` keys
    (1000 is the per-request maximum of OSS multi-object delete).

    Listing/deletion errors are printed and swallowed so a cleanup sweep can
    continue with other prefixes; batches already deleted stay deleted.

    Args:
        bucket: ``oss2.Bucket`` to delete from.
        prefix: Non-empty key prefix, e.g. ``'temp_ai/20251229/'``.
        batch_size: Keys per delete request, clamped to the range 1..1000.

    Returns:
        int: number of keys submitted for deletion before finishing or failing.

    Raises:
        ValueError: if ``prefix`` is empty — it would match, and delete,
            every object in the bucket.
    """
    if not prefix:
        raise ValueError("prefix must not be empty: refusing to delete the whole bucket")
    batch_size = max(1, min(int(batch_size), 1000))

    print(f"    -> 正在清理目录: {prefix} ...")
    batch = []
    submitted = 0
    try:
        for obj in oss2.ObjectIterator(bucket, prefix=prefix):
            batch.append(obj.key)
            if len(batch) >= batch_size:
                bucket.batch_delete_objects(batch)
                submitted += len(batch)
                batch = []

        # Flush the final partial batch (never call with an empty list).
        if batch:
            bucket.batch_delete_objects(batch)
            submitted += len(batch)
        print(f"    -> 目录 {prefix} 清理完毕。")
    except Exception as e:  # best effort: report and let the caller continue
        print(f"    [错误] 删除过程出错: {e}")
    return submitted


# Shared uploader instance for importers. Built at import time, so importing
# this module raises ValueError when the OSS settings are incomplete.
oss_uploader = OSSUploader()

if __name__ == '__main__':
    # Manual smoke test: upload a sample file and show (success, url_or_error).
    upload_outcome = oss_uploader.upload_file('想-dj-片段.mp3')
    print(upload_outcome)