oss_service.py 42.3 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261
"""
阿里云OSS服务
提供文件上传、下载、删除等功能
"""
import oss2
from typing import Optional, Dict, Any, BinaryIO
from datetime import datetime, timedelta
from pathlib import Path
import hashlib
import mimetypes
import logging
import time
import json
from functools import wraps

from aliyunsdkcore.client import AcsClient
from aliyunsdksts import AssumeRoleRequest

from app.core.config import settings
from app.core.exceptions import (
    BusinessException,
    ValidationException,
    ExternalServiceException,
    NotFoundException
)

logger = logging.getLogger(__name__)


def retry_on_connection_error(max_retries: int = 3, delay: float = 1.0):
    """
    重试装饰器 - 在连接错误时重试

    Args:
        max_retries: 最大重试次数
        delay: 重试延迟(秒)
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except (oss2.exceptions.RequestError,
                        ConnectionError,
                        TimeoutError,
                        Exception) as e:
                    # 只重试连接相关的错误
                    error_str = str(e)
                    if ('Connection' in error_str or
                        'timeout' in error_str.lower() or
                        'closed' in error_str.lower() or
                        'RemoteDisconnected' in error_str):
                        last_exception = e
                        if attempt < max_retries - 1:
                            wait_time = delay * (2 ** attempt)  # 指数退避
                            logger.warning(
                                f"[{func.__name__}] 连接错误,{wait_time}秒后重试 "
                                f"({attempt + 1}/{max_retries}): {error_str}"
                            )
                            time.sleep(wait_time)
                            continue
                        else:
                            logger.error(f"[{func.__name__}] 重试次数已用尽: {error_str}")
                            raise
                    else:
                        # 非连接错误直接抛出
                        raise

            if last_exception:
                raise last_exception
        return wrapper
    return decorator


class OSSService:
    """阿里云OSS服务类"""

    def __init__(self):
        """初始化OSS客户端"""
        if not all([
            settings.OSS_ACCESS_KEY_ID,
            settings.OSS_ACCESS_KEY_SECRET,
            settings.OSS_ENDPOINT,
            settings.OSS_BUCKET_NAME
        ]):
            raise BusinessException("OSS配置不完整,请检查环境变量")

        # 创建认证对象
        auth = oss2.Auth(
            settings.OSS_ACCESS_KEY_ID,
            settings.OSS_ACCESS_KEY_SECRET
        )

        # 确定Endpoint
        endpoint = settings.OSS_ENDPOINT
        if settings.OSS_INTERNAL_ENDPOINT and settings.OSS_REGION:
            endpoint = f"oss-{settings.OSS_REGION}-internal.aliyuncs.com"

        # 创建Bucket对象
        self.bucket = oss2.Bucket(
            auth,
            endpoint,
            settings.OSS_BUCKET_NAME
        )

        # 设置超时参数(在 bucket 对象上设置)
        self.bucket.timeout = (
            settings.OSS_CONNECTION_TIMEOUT,
            settings.OSS_READ_TIMEOUT
        )

        # 配置重试参数
        self.max_retries = settings.OSS_REQUEST_RETRY_TIMES
        self.retry_delay = settings.OSS_RETRY_DELAY

        logger.info(
            f"OSS服务已初始化: endpoint={endpoint}, "
            f"timeout={settings.OSS_CONNECTION_TIMEOUT}/{settings.OSS_READ_TIMEOUT}s, "
            f"retries={self.max_retries}"
        )

    @staticmethod
    def _validate_file_extension(filename: str) -> bool:
        """验证文件扩展名"""
        ext = Path(filename).suffix.lower()
        if not ext:
            return False
        return ext in [e.lower() for e in settings.OSS_ALLOWED_EXTENSIONS]

    @staticmethod
    def _validate_file_size(file_size: int) -> bool:
        """验证文件大小"""
        return 0 < file_size <= settings.OSS_MAX_FILE_SIZE

    @staticmethod
    def _generate_object_key(
        filename: str,
        user_id: int,
        prefix: Optional[str] = None
    ) -> str:
        """
        生成OSS对象键(路径)
        格式: {prefix}/{user_id}/{date}/{hash}_{filename}
        """
        # 获取文件扩展名
        ext = Path(filename).suffix.lower()
        name = Path(filename).stem

        # 生成时间戳目录
        now = datetime.now()
        date_path = now.strftime("%Y%m%d")

        # 生成唯一标识(时间戳 + 4位随机数)
        timestamp = now.strftime("%H%M%S%f")
        unique_id = hashlib.md5(f"{user_id}{timestamp}{name}".encode()).hexdigest()[:8]

        # 组合文件名
        new_filename = f"{unique_id}_{name}{ext}"

        # 确定前缀
        base_prefix = prefix or settings.OSS_UPLOAD_PATH_PREFIX

        # 组合完整路径(加上全局前缀)
        if settings.OSS_GLOBAL_PREFIX:
            return f"{settings.OSS_GLOBAL_PREFIX}/{base_prefix}/{user_id}/{date_path}/{new_filename}"
        return f"{base_prefix}/{user_id}/{date_path}/{new_filename}"

    @staticmethod
    def _generate_object_key_simple(
        filename: str,
        entity_type: str
    ) -> str:
        """
        生成OSS对象键(路径)- 简化版本,用于角色/场景图片转存
        格式: {entity_type}/{date}/{filename}
        """
        # 获取文件扩展名
        ext = Path(filename).suffix.lower()
        name = Path(filename).stem

        # 生成时间戳目录
        now = datetime.now()
        date_path = now.strftime("%Y%m%d")

        # 生成唯一标识(时间戳 + 随机数)
        timestamp = now.strftime("%H%M%S%f")
        unique_id = hashlib.md5(f"{entity_type}{timestamp}{name}".encode()).hexdigest()[:8]

        # 组合文件名
        new_filename = f"{unique_id}{ext}"

        # 组合完整路径:类型/日期/文件名.后缀
        if settings.OSS_GLOBAL_PREFIX:
            return f"{settings.OSS_GLOBAL_PREFIX}/{entity_type}/{date_path}/{new_filename}"
        return f"{entity_type}/{date_path}/{new_filename}"

    @staticmethod
    def _get_content_type(filename: str) -> str:
        """获取文件MIME类型"""
        content_type, _ = mimetypes.guess_type(filename)
        return content_type or 'application/octet-stream'

    @staticmethod
    def _get_extension_from_mime(mime_type: str) -> str:
        """
        根据MIME类型获取文件扩展名

        参数:
            mime_type: MIME类型(如 image/jpeg, video/mp4)

        返回:
            文件扩展名(包含点号,如 .jpg, .mp4)
        """
        # MIME类型到扩展名的映射
        mime_to_ext = {
            # 图片
            'image/jpeg': '.jpg',
            'image/jpg': '.jpg',
            'image/png': '.png',
            'image/gif': '.gif',
            'image/webp': '.webp',
            'image/svg+xml': '.svg',
            'image/bmp': '.bmp',
            'image/x-icon': '.ico',
            # 视频
            'video/mp4': '.mp4',
            'video/mpeg': '.mpeg',
            'video/webm': '.webm',
            'video/quicktime': '.mov',
            'video/x-msvideo': '.avi',
            'video/x-matroska': '.mkv',
            # 音频
            'audio/mpeg': '.mp3',
            'audio/wav': '.wav',
            'audio/ogg': '.ogg',
            'audio/webm': '.weba',
            # 其他
            'application/pdf': '.pdf',
            'text/plain': '.txt',
            'application/json': '.json',
        }

        # 处理带参数的 MIME 类型(如 image/jpeg; charset=utf-8)
        base_mime = mime_type.split(';')[0].strip().lower()

        return mime_to_ext.get(base_mime, '')

    def upload_file(
        self,
        file_data: BinaryIO,
        filename: str,
        user_id: int,
        prefix: Optional[str] = None,
        validate_extension: bool = True
    ) -> Dict[str, Any]:
        """
        直接上传文件到OSS(适合小文件)

        参数:
            file_data: 文件二进制流
            filename: 原始文件名
            user_id: 用户ID
            prefix: 路径前缀(可选)
            validate_extension: 是否验证文件扩展名

        返回:
            上传结果信息
        """
        # 使用重试装饰器包装实际上传操作
        @retry_on_connection_error(
            max_retries=self.max_retries,
            delay=self.retry_delay
        )
        def _do_upload():
            # 验证文件扩展名
            if validate_extension and not self._validate_file_extension(filename):
                raise ValidationException(
                    f"不支持的文件类型,允许的类型: {', '.join(settings.OSS_ALLOWED_EXTENSIONS)}"
                )

            # 生成OSS对象键
            object_key = self._generate_object_key(filename, user_id, prefix)

            # 获取Content-Type
            content_type = self._get_content_type(filename)

            # 上传文件
            result = self.bucket.put_object(
                object_key,
                file_data,
                headers={'Content-Type': content_type}
            )

            # 构建文件URL
            file_url = self._build_file_url(object_key)

            return {
                "object_key": object_key,
                "filename": filename,
                "url": file_url,
                "content_type": content_type,
                "etag": result.etag,
                "uploaded_at": datetime.now().isoformat()
            }

        try:
            return _do_upload()
        except oss2.exceptions.OssError as e:
            logger.error(f"OSS上传失败: {e.message}")
            raise ExternalServiceException(f"OSS上传失败: {e.message}")
        except ValidationException:
            raise
        except Exception as e:
            logger.error(f"文件上传失败: {str(e)}", exc_info=True)
            raise BusinessException(f"文件上传失败: {str(e)}")

    def upload_file_with_size(
        self,
        file_data: BinaryIO,
        filename: str,
        file_size: int,
        user_id: int,
        prefix: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        根据文件大小智能选择上传方式(带文件大小验证)

        参数:
            file_data: 文件二进制流
            filename: 原始文件名
            file_size: 文件大小(字节)
            user_id: 用户ID
            prefix: 路径前缀(可选)

        返回:
            上传结果信息
        """
        # 验证文件大小
        if not self._validate_file_size(file_size):
            raise ValidationException(
                f"文件大小超出限制,最大允许 {settings.OSS_MAX_FILE_SIZE / 1024 / 1024:.2f}MB"
            )

        # 根据文件大小选择上传方式
        if file_size > settings.OSS_MULTIPART_THRESHOLD:
            return self.multipart_upload(file_data, filename, file_size, user_id, prefix)
        else:
            result = self.upload_file(file_data, filename, user_id, prefix)
            result["size"] = file_size
            return result

    def multipart_upload(
        self,
        file_data: BinaryIO,
        filename: str,
        file_size: int,
        user_id: int,
        prefix: Optional[str] = None,
        part_size: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        分片上传文件到OSS(适合大文件)

        参数:
            file_data: 文件二进制流
            filename: 原始文件名
            file_size: 文件大小(字节)
            user_id: 用户ID
            prefix: 路径前缀(可选)

        返回:
            上传结果信息
        """
        upload_id = None
        object_key = None
        effective_part_size = part_size or settings.OSS_PART_SIZE

        @retry_on_connection_error(
            max_retries=self.max_retries,
            delay=self.retry_delay
        )
        def _do_multipart_upload():
            nonlocal upload_id, object_key

            # 验证文件扩展名
            if not self._validate_file_extension(filename):
                raise ValidationException(
                    f"不支持的文件类型,允许的类型: {', '.join(settings.OSS_ALLOWED_EXTENSIONS)}"
                )

            # 生成OSS对象键
            object_key = self._generate_object_key(filename, user_id, prefix)

            # 获取Content-Type
            content_type = self._get_content_type(filename)

            # 初始化分片上传
            upload_id = self.bucket.init_multipart_upload(
                object_key,
                headers={'Content-Type': content_type}
            ).upload_id

            logger.info(f"[multipart_upload] 开始分片上传: {object_key}, upload_id={upload_id}")

            # 计算分片数量
            part_size = effective_part_size
            part_count = (file_size + part_size - 1) // part_size

            # 上传所有分片
            parts = []
            for part_number in range(1, part_count + 1):
                # 读取分片数据
                offset = (part_number - 1) * part_size
                size = min(part_size, file_size - offset)
                file_data.seek(offset)
                part_data = file_data.read(size)

                # 上传分片(每个分片也使用重试)
                @retry_on_connection_error(
                    max_retries=self.max_retries,
                    delay=self.retry_delay
                )
                def _upload_part():
                    result = self.bucket.upload_part(
                        object_key,
                        upload_id,
                        part_number,
                        part_data
                    )
                    logger.debug(f"分片 {part_number}/{part_count} 上传成功")
                    return result

                result = _upload_part()
                parts.append(oss2.models.PartInfo(part_number, result.etag))

            # 完成分片上传
            @retry_on_connection_error(
                max_retries=self.max_retries,
                delay=self.retry_delay
            )
            def _complete_upload():
                return self.bucket.complete_multipart_upload(
                    object_key,
                    upload_id,
                    parts
                )

            result = _complete_upload()

            # 构建文件URL
            file_url = self._build_file_url(object_key)

            return {
                "object_key": object_key,
                "filename": filename,
                "url": file_url,
                "size": file_size,
                "content_type": content_type,
                "etag": result.etag,
                "upload_id": upload_id,
                "part_count": part_count,
                "uploaded_at": datetime.now().isoformat()
            }

        try:
            return _do_multipart_upload()
        except oss2.exceptions.OssError as e:
            logger.error(f"OSS分片上传失败: {e.message}")
            # 上传失败,尝试取消分片上传
            if upload_id and object_key:
                try:
                    self.bucket.abort_multipart_upload(
                        object_key,
                        upload_id
                    )
                    logger.info(f"已取消分片上传: {object_key}, upload_id={upload_id}")
                except Exception as cancel_error:
                    logger.warning(f"取消分片上传失败: {cancel_error}")
            raise ExternalServiceException(f"OSS分片上传失败: {e.message}")
        except (ValidationException, ExternalServiceException):
            raise
        except Exception as e:
            logger.error(f"分片上传失败: {str(e)}", exc_info=True)
            # 上传失败,尝试取消分片上传
            if upload_id and object_key:
                try:
                    self.bucket.abort_multipart_upload(
                        object_key,
                        upload_id
                    )
                except:
                    pass
            raise BusinessException(f"分片上传失败: {str(e)}")

    def generate_presigned_url(
        self,
        filename: str,
        user_id: int,
        expires: Optional[int] = None,
        prefix: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        生成预签名上传URL(前端直传)

        参数:
            filename: 文件名
            user_id: 用户ID
            expires: 过期时间(秒),默认使用配置值
            prefix: 路径前缀(可选)

        返回:
            预签名URL信息
        """
        try:
            # 验证文件扩展名
            if not self._validate_file_extension(filename):
                raise ValidationException(
                    f"不支持的文件类型,允许的类型: {', '.join(settings.OSS_ALLOWED_EXTENSIONS)}"
                )

            # 生成OSS对象键
            object_key = self._generate_object_key(filename, user_id, prefix)

            # 设置过期时间
            expires = expires or settings.OSS_SIGNED_URL_EXPIRE

            # 生成预签名URL
            signed_url = self.bucket.sign_url(
                'PUT',
                object_key,
                expires,
                headers={'Content-Type': self._get_content_type(filename)}
            )

            # 构建文件URL(上传后的访问URL)
            file_url = self._build_file_url(object_key)

            return {
                "upload_url": signed_url,
                "object_key": object_key,
                "file_url": file_url,
                "expires_in": expires,
                "expires_at": (datetime.now() + timedelta(seconds=expires)).isoformat(),
                "method": "PUT",
                "headers": {
                    "Content-Type": self._get_content_type(filename)
                }
            }

        except oss2.exceptions.OssError as e:
            raise ExternalServiceException(f"生成签名URL失败: {e.message}")
        except ValidationException:
            raise
        except Exception as e:
            raise BusinessException(f"生成签名URL失败: {str(e)}")

    def generate_multipart_presigned_urls(
        self,
        filename: str,
        file_size: int,
        user_id: int,
        expires: Optional[int] = None,
        prefix: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        生成分片上传的预签名URL(前端分片直传)

        参数:
            filename: 文件名
            file_size: 文件大小(字节)
            user_id: 用户ID
            expires: 过期时间(秒)
            prefix: 路径前缀(可选)

        返回:
            分片上传信息和预签名URL列表
        """
        try:
            # 验证文件大小
            if not self._validate_file_size(file_size):
                raise ValidationException(
                    f"文件大小超出限制,最大允许 {settings.OSS_MAX_FILE_SIZE / 1024 / 1024:.2f}MB"
                )

            # 验证文件扩展名
            if not self._validate_file_extension(filename):
                raise ValidationException(
                    f"不支持的文件类型,允许的类型: {', '.join(settings.OSS_ALLOWED_EXTENSIONS)}"
                )

            # 生成OSS对象键
            object_key = self._generate_object_key(filename, user_id, prefix)

            # 初始化分片上传
            upload_id = self.bucket.init_multipart_upload(
                object_key,
                headers={'Content-Type': self._get_content_type(filename)}
            ).upload_id

            # 计算分片数量
            part_size = settings.OSS_PART_SIZE
            part_count = (file_size + part_size - 1) // part_size

            # 设置过期时间
            expires = expires or settings.OSS_SIGNED_URL_EXPIRE

            # 为每个分片生成预签名URL
            part_urls = []
            for part_number in range(1, part_count + 1):
                params = {
                    'uploadId': upload_id,
                    'partNumber': str(part_number)
                }
                signed_url = self.bucket.sign_url(
                    'PUT',
                    object_key,
                    expires,
                    params=params
                )
                part_urls.append({
                    "part_number": part_number,
                    "upload_url": signed_url
                })

            # 构建文件URL(完成后的访问URL)
            file_url = self._build_file_url(object_key)

            return {
                "upload_id": upload_id,
                "object_key": object_key,
                "file_url": file_url,
                "part_size": part_size,
                "part_count": part_count,
                "part_urls": part_urls,
                "expires_in": expires,
                "expires_at": (datetime.now() + timedelta(seconds=expires)).isoformat()
            }

        except oss2.exceptions.OssError as e:
            raise ExternalServiceException(f"初始化分片上传失败: {e.message}")
        except (ValidationException, ExternalServiceException):
            raise
        except Exception as e:
            raise BusinessException(f"初始化分片上传失败: {str(e)}")

    def complete_multipart_upload_by_client(
        self,
        object_key: str,
        upload_id: str,
        parts: list
    ) -> Dict[str, Any]:
        """
        完成客户端分片上传

        参数:
            object_key: OSS对象键
            upload_id: 上传ID
            parts: 分片信息列表 [{"part_number": 1, "etag": "xxx"}, ...]

        返回:
            完成结果
        """
        try:
            # 构建分片信息
            part_info_list = [
                oss2.models.PartInfo(part["part_number"], part["etag"])
                for part in parts
            ]

            # 完成分片上传
            result = self.bucket.complete_multipart_upload(
                object_key,
                upload_id,
                part_info_list
            )

            # 构建文件URL
            file_url = self._build_file_url(object_key)

            return {
                "object_key": object_key,
                "url": file_url,
                "etag": result.etag,
                "completed_at": datetime.now().isoformat()
            }

        except oss2.exceptions.OssError as e:
            raise ExternalServiceException(f"完成分片上传失败: {e.message}")
        except Exception as e:
            raise BusinessException(f"完成分片上传失败: {str(e)}")

    def abort_multipart_upload(
        self,
        object_key: str,
        upload_id: str
    ) -> bool:
        """
        取消分片上传

        参数:
            object_key: OSS对象键
            upload_id: 上传ID

        返回:
            是否成功
        """
        @retry_on_connection_error(
            max_retries=self.max_retries,
            delay=self.retry_delay
        )
        def _do_abort():
            self.bucket.abort_multipart_upload(
                object_key,
                upload_id
            )

        try:
            _do_abort()
            return True
        except oss2.exceptions.OssError as e:
            raise ExternalServiceException(f"取消分片上传失败: {e.message}")
        except Exception as e:
            raise BusinessException(f"取消分片上传失败: {str(e)}")

    def delete_file(self, object_key: str) -> bool:
        """
        删除OSS文件

        参数:
            object_key: OSS对象键

        返回:
            是否成功
        """
        try:
            self.bucket.delete_object(object_key)
            return True
        except oss2.exceptions.OssError as e:
            raise ExternalServiceException(f"删除文件失败: {e.message}")
        except Exception as e:
            raise BusinessException(f"删除文件失败: {str(e)}")

    def delete_files_batch(self, object_keys: list) -> Dict[str, Any]:
        """
        批量删除OSS文件

        参数:
            object_keys: OSS对象键列表

        返回:
            删除结果
        """
        try:
            result = self.bucket.batch_delete_objects(object_keys)
            return {
                "deleted_count": len(result.deleted_keys),
                "deleted_keys": result.deleted_keys
            }
        except oss2.exceptions.OssError as e:
            raise ExternalServiceException(f"批量删除文件失败: {e.message}")
        except Exception as e:
            raise BusinessException(f"批量删除文件失败: {str(e)}")

    def upload_from_url(
        self,
        url: str,
        entity_type: str,
        filename: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        从URL下载文件并上传到OSS(用于转存外部生成的图片或视频)

        参数:
            url: 文件URL(图片或视频)
            entity_type: 实体类型(character/scene),用于构建存储路径
            filename: 自定义文件名(可选),如果未提供则从URL提取或根据Content-Type推断

        返回:
            上传结果信息
        """
        import requests
        from urllib.parse import urlparse

        @retry_on_connection_error(
            max_retries=self.max_retries,
            delay=self.retry_delay
        )
        def _do_upload():
            # 下载文件
            try:
                response = requests.get(url, timeout=30, stream=True)
                response.raise_for_status()
            except Exception as e:
                logger.error(f"从URL下载文件失败: {url}, error: {e}")
                raise ExternalServiceException(f"从URL下载文件失败: {str(e)}")

            # 获取文件内容和Content-Type
            file_data = response.content
            response_content_type = response.headers.get('Content-Type', '')

            # 确定文件名
            final_filename = filename
            if not final_filename:
                # 尝试从URL提取文件名
                parsed_url = urlparse(url)
                path = parsed_url.path
                final_filename = Path(path).name

                # 如果URL中没有文件名或没有扩展名,根据Content-Type推断
                if not final_filename or '.' not in final_filename:
                    ext = self._get_extension_from_mime(response_content_type)
                    if ext:
                        # 根据扩展名确定文件类型前缀
                        type_prefix = 'video' if ext.startswith('.mp') or ext in ('.webm', '.mov', '.avi', '.mkv', '.mpeg') else 'file'
                        final_filename = f"{type_prefix}_{int(datetime.now().timestamp())}{ext}"
                    else:
                        # 如果无法推断,使用通用名称
                        final_filename = f"file_{int(datetime.now().timestamp())}.bin"

            # 生成OSS对象键
            object_key = self._generate_object_key_simple(final_filename, entity_type)

            # 从文件名获取Content-Type(优先使用响应头的Content-Type)
            content_type = response_content_type or self._get_content_type(final_filename)

            # 上传文件
            result = self.bucket.put_object(
                object_key,
                file_data,
                headers={'Content-Type': content_type}
            )

            # 构建文件URL
            file_url = self._build_file_url(object_key)

            logger.info(f"文件转存成功: {url} -> {file_url}")

            return {
                "object_key": object_key,
                "filename": final_filename,
                "url": file_url,
                "content_type": content_type,
                "size": len(file_data),
                "etag": result.etag,
                "uploaded_at": datetime.now().isoformat()
            }

        try:
            return _do_upload()
        except oss2.exceptions.OssError as e:
            logger.error(f"OSS转存失败: {e.message}")
            raise ExternalServiceException(f"OSS转存失败: {e.message}")
        except (ExternalServiceException,):
            raise
        except Exception as e:
            logger.error(f"文件转存失败: {str(e)}", exc_info=True)
            raise BusinessException(f"文件转存失败: {str(e)}")

    def upload_from_base64(
        self,
        base64_data_url: str,
        entity_type: str,
        filename: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        从 Base64 data URL 解码并上传到 OSS

        参数:
            base64_data_url: Base64 data URL (格式: data:image/png;base64,...)
            entity_type: 实体类型(image/character/scene),用于构建存储路径
            filename: 自定义文件名(可选),如果未提供则自动生成

        返回:
            上传结果信息
        """
        import base64
        import re

        @retry_on_connection_error(
            max_retries=self.max_retries,
            delay=self.retry_delay
        )
        def _do_upload():
            # 解析 Base64 data URL
            # 格式: data:image/png;base64,iVBORw0KGgo...
            if not base64_data_url.startswith('data:'):
                raise ValueError(f"无效的 Base64 data URL 格式")

            # 提取 MIME 类型和 Base64 数据
            match = re.match(r'data:([^;]+);base64,(.+)', base64_data_url)
            if not match:
                raise ValueError(f"无法解析 Base64 data URL")

            mime_type = match.group(1)  # 例如: image/png
            base64_data = match.group(2)

            # 根据 MIME 类型确定文件扩展名
            ext_map = {
                'image/png': '.png',
                'image/jpeg': '.jpg',
                'image/jpg': '.jpg',
                'image/gif': '.gif',
                'image/webp': '.webp',
            }
            ext = ext_map.get(mime_type.lower(), '.png')

            # 确定文件名
            if filename:
                final_filename = filename
                # 确保文件名有正确的扩展名
                if not final_filename.endswith(ext):
                    final_filename = Path(filename).stem + ext
            else:
                # 生成默认文件名
                final_filename = f"image_{int(datetime.now().timestamp())}{ext}"

            # 解码 Base64 数据
            try:
                file_data = base64.b64decode(base64_data)
            except Exception as e:
                logger.error(f"Base64 解码失败: {e}")
                raise ExternalServiceException(f"Base64 解码失败: {str(e)}")

            # 生成 OSS 对象键
            object_key = self._generate_object_key_simple(final_filename, entity_type)

            # 上传文件
            result = self.bucket.put_object(
                object_key,
                file_data,
                headers={'Content-Type': mime_type}
            )

            # 构建文件 URL
            file_url = self._build_file_url(object_key)

            logger.info(f"Base64 图片上传成功: size={len(file_data)} -> {file_url}")

            return {
                "object_key": object_key,
                "filename": final_filename,
                "url": file_url,
                "content_type": mime_type,
                "size": len(file_data),
                "etag": result.etag,
                "uploaded_at": datetime.now().isoformat()
            }

        try:
            return _do_upload()
        except oss2.exceptions.OssError as e:
            logger.error(f"OSS 上传失败: {e.message}")
            raise ExternalServiceException(f"OSS 上传失败: {e.message}")
        except (ValueError, ExternalServiceException):
            raise
        except Exception as e:
            logger.error(f"Base64 图片上传失败: {str(e)}", exc_info=True)
            raise BusinessException(f"Base64 图片上传失败: {str(e)}")

    def get_file_info(self, object_key: str) -> Dict[str, Any]:
        """
        获取文件信息

        参数:
            object_key: OSS对象键

        返回:
            文件信息
        """
        try:
            meta = self.bucket.get_object_meta(object_key)
            return {
                "object_key": object_key,
                "size": meta.headers.get('Content-Length'),
                "content_type": meta.headers.get('Content-Type'),
                "etag": meta.headers.get('ETag'),
                "last_modified": meta.headers.get('Last-Modified')
            }
        except oss2.exceptions.NoSuchKey:
            raise NotFoundException(f"文件不存在: {object_key}")
        except oss2.exceptions.OssError as e:
            raise ExternalServiceException(f"获取文件信息失败: {e.message}")
        except Exception as e:
            raise BusinessException(f"获取文件信息失败: {str(e)}")

    def file_exists(self, object_key: str) -> bool:
        """
        检查文件是否存在

        参数:
            object_key: OSS对象键

        返回:
            是否存在
        """
        try:
            return self.bucket.object_exists(object_key)
        except Exception:
            return False

    def generate_download_url(
        self,
        object_key: str,
        expires: Optional[int] = None,
        filename: Optional[str] = None
    ) -> str:
        """
        生成文件下载URL(临时访问)

        参数:
            object_key: OSS对象键
            expires: 过期时间(秒)
            filename: 下载时的文件名(可选)

        返回:
            下载URL
        """
        try:
            expires = expires or settings.OSS_SIGNED_URL_EXPIRE

            params = {}
            if filename:
                params['response-content-disposition'] = f'attachment; filename="{filename}"'

            return self.bucket.sign_url(
                'GET',
                object_key,
                expires,
                params=params
            )
        except oss2.exceptions.OssError as e:
            raise ExternalServiceException(f"生成下载URL失败: {e.message}")
        except Exception as e:
            raise BusinessException(f"生成下载URL失败: {str(e)}")

    def _build_file_url(self, object_key: str) -> str:
        """
        构建文件访问URL

        参数:
            object_key: OSS对象键

        返回:
            文件URL
        """
        # 如果配置了CDN域名,使用CDN域名
        if settings.OSS_CDN_DOMAIN:
            # 移除可能存在的 http:// 或 https:// 前��
            cdn_domain = settings.OSS_CDN_DOMAIN
            if cdn_domain.startswith("http://"):
                cdn_domain = cdn_domain[7:]
            elif cdn_domain.startswith("https://"):
                cdn_domain = cdn_domain[8:]

            # 移除域名开头的 /
            if cdn_domain.startswith("/"):
                cdn_domain = cdn_domain[1:]

            protocol = "https" if settings.OSS_USE_HTTPS else "http"
            return f"{protocol}://{cdn_domain}/{object_key}"

        # 否则使用OSS域名
        # 移除 endpoint 中可能存在的协议前缀
        endpoint = settings.OSS_ENDPOINT
        if endpoint.startswith("http://"):
            endpoint = endpoint[7:]
        elif endpoint.startswith("https://"):
            endpoint = endpoint[8:]

        protocol = "https" if settings.OSS_USE_HTTPS else "http"
        return f"{protocol}://{settings.OSS_BUCKET_NAME}.{endpoint}/{object_key}"

    def generate_sts_credentials(
        self,
        user_id: int,
        duration_seconds: Optional[int] = None,
        policy: Optional[str] = None,
        role_session_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        生成OSS STS临时凭证(用于前端直传)

        参数:
            user_id: 用户ID(用于构建会话名称)
            duration_seconds: 凭证有效期(秒),默认使用配置值
            policy: 自定义策略(可选,JSON字符串)
            role_session_name: 角色会话名称(可选)

        返回:
            STS临时凭证信息,包括:
            - access_key_id: 临时AccessKey ID
            - access_key_secret: 临时AccessKey Secret
            - security_token: 安全令牌
            - expiration: 过期时间(UTC)
            - region: 区域
            - bucket: Bucket名称
            - endpoint: OSS端点
            - upload_path_prefix: 上传路径前缀
        """
        if not settings.OSS_STS_ROLE_ARN:
            raise BusinessException(
                "STS Role ARN未配置,请检查环境变量 OSS_STS_ROLE_ARN"
            )

        if not all([
            settings.OSS_ACCESS_KEY_ID,
            settings.OSS_ACCESS_KEY_SECRET,
            settings.OSS_ENDPOINT
        ]):
            raise BusinessException(
                "OSS配置不完整,请检查环境变量"
            )

        try:
            # 确定区域ID
            # AcsClient 的第三个参数是 region_id,不是 endpoint
            # 格式: cn-hangzhou, cn-beijing 等
            if settings.OSS_REGION:
                region = settings.OSS_REGION
            else:
                # 从OSS端点提取区域
                # 格式: oss-cn-hangzhou.aliyuncs.com -> cn-hangzhou
                import re
                endpoint = settings.OSS_ENDPOINT
                if not endpoint:
                    raise BusinessException("OSS_ENDPOINT 未配置")
                match = re.search(r'oss-(\w+)-(\w+)\.aliyuncs\.com', endpoint)
                if match:
                    region = f"{match.group(1)}-{match.group(2)}"
                else:
                    raise BusinessException(
                        "无法从OSS端点确定区域,请配置 OSS_REGION"
                    )

            # 创建STS客户端(使用正确的 region_id 参数)
            client = AcsClient(
                settings.OSS_ACCESS_KEY_ID,
                settings.OSS_ACCESS_KEY_SECRET,
                region
            )

            # 创建AssumeRole请求
            request = AssumeRoleRequest.AssumeRoleRequest()
            request.set_RoleArn(settings.OSS_STS_ROLE_ARN)
            request.set_RoleSessionName(
                role_session_name or f"aimv-frontend-upload-user-{user_id}"
            )
            request.set_DurationSeconds(
                duration_seconds or settings.OSS_STS_DURATION_SECONDS
            )

            # 设置策略(如果提供了自定义策略)
            if policy:
                request.set_Policy(policy)
            elif settings.OSS_STS_POLICY:
                request.set_Policy(settings.OSS_STS_POLICY)

            # 发送请求
            response = client.do_action_with_exception(request)
            result = json.loads(response.decode('utf-8'))

            # 提取凭证信息
            credentials = result['Credentials']

            # 将过期时间转换为北京时间(UTC+8)
            from app.schemas.common import convert_datetime_to_beijing
            expiration_utc = datetime.strptime(credentials['Expiration'], '%Y-%m-%dT%H:%M:%SZ')
            expiration_str = convert_datetime_to_beijing(expiration_utc)

            # 构建上传路径前缀
            upload_path_prefix = f"{settings.OSS_UPLOAD_PATH_PREFIX}/{user_id}"
            if settings.OSS_GLOBAL_PREFIX:
                upload_path_prefix = f"{settings.OSS_GLOBAL_PREFIX}/{upload_path_prefix}"

            return {
                "access_key_id": credentials['AccessKeyId'],
                "access_key_secret": credentials['AccessKeySecret'],
                "security_token": credentials['SecurityToken'],
                "expiration": expiration_str,
                "region": settings.OSS_REGION,
                "bucket": settings.OSS_BUCKET_NAME,
                "endpoint": settings.OSS_ENDPOINT,
                "cdn_domain": settings.OSS_CDN_DOMAIN,
                "upload_path_prefix": upload_path_prefix,
            }

        except Exception as e:
            logger.error(f"生成STS临时凭证失败: {str(e)}", exc_info=True)
            raise ExternalServiceException(
                f"生成STS临时凭证失败: {str(e)}"
            )

    def generate_user_upload_policy(self, user_id: int) -> str:
        """
        为用户生成上传策略(限制只能上传到指定用户目录)

        参数:
            user_id: 用户ID

        返回:
            策略JSON字符串
        """
        # 构建用户专属路径
        upload_path_prefix = f"{settings.OSS_UPLOAD_PATH_PREFIX}/{user_id}"
        if settings.OSS_GLOBAL_PREFIX:
            upload_path_prefix = f"{settings.OSS_GLOBAL_PREFIX}/{upload_path_prefix}"

        # 构建策略
        policy = {
            "Version": "1",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "oss:PutObject",
                        "oss:InitiateMultipartUpload",
                        "oss:UploadPart",
                        "oss:CompleteMultipartUpload",
                        "oss:AbortMultipartUpload"
                    ],
                    "Resource": [
                        f"acs:oss:*:*:{settings.OSS_BUCKET_NAME}/{upload_path_prefix}/*"
                    ]
                },
                {
                    "Effect": "Allow",
                    "Action": [
                        "oss:ListObjects"
                    ],
                    "Resource": [
                        f"acs:oss:*:*:{settings.OSS_BUCKET_NAME}",
                        f"acs:oss:*:*:{settings.OSS_BUCKET_NAME}/{upload_path_prefix}"
                    ],
                    "Condition": {
                        "StringLike": {
                            "oss:prefix": [f"{upload_path_prefix}/*", f"{upload_path_prefix}"]
                        }
                    }
                }
            ]
        }

        return json.dumps(policy)


# 创建全局OSS服务实例(延迟初始化)
_oss_service_instance = None


def get_oss_service() -> OSSService:
    """获取OSS服务实例(单例模式)"""
    global _oss_service_instance
    if _oss_service_instance is None:
        _oss_service_instance = OSSService()
    return _oss_service_instance


# 全局OSS服务实例
oss_service = get_oss_service()