File size: 9,760 Bytes
4e37375
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
"""OSS云存储服务模块

提供阿里云OSS文件上传、下载和管理功能。
"""

import os
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional, Tuple
import asyncio
import aiohttp
import oss2
from oss2.exceptions import OssError

from ..core.config import get_config
from ..utils.logger import get_task_logger


class OSSService:
    """Service wrapper around a single Alibaba Cloud OSS bucket.

    Provides upload (single and batch), URL generation, deletion, cleanup of
    old temporary objects and a few informational helpers. All bucket access
    goes through ``self.bucket`` (an ``oss2.Bucket``), and all settings are
    read from ``self.oss_config``.
    """

    def __init__(self):
        """Create the OSS bucket client from the application configuration."""
        self.config = get_config()
        self.oss_config = self.config.oss

        # Build the authenticated bucket client used by every other method.
        auth = oss2.Auth(
            self.oss_config.access_key_id,
            self.oss_config.access_key_secret,
        )
        self.bucket = oss2.Bucket(
            auth,
            self.oss_config.endpoint,
            self.oss_config.bucket_name,
        )

        self.logger = get_task_logger(logger_name="transcript_service.oss")

    def _generate_object_key(self, filename: str, task_id: str) -> str:
        """Build the OSS object key for an uploaded file.

        The key has the form
        ``<temp_prefix>/<YYYY>/<MM>/<DD>/<timestamp>_<task_id>_<8 hex chars><ext>``
        where ``<ext>`` is the suffix of ``filename`` (including the dot).

        Args:
            filename: Original file name; only its extension is kept.
            task_id: Task identifier embedded in the key.

        Returns:
            The generated object key.
        """
        now = datetime.now()
        date_path = now.strftime("%Y/%m/%d")
        timestamp = now.strftime("%Y%m%d_%H%M%S")

        # Only the extension of the original name is reused; the random
        # suffix keeps keys distinct for uploads within the same second.
        file_ext = Path(filename).suffix
        safe_filename = f"{timestamp}_{task_id}_{uuid.uuid4().hex[:8]}{file_ext}"

        return f"{self.oss_config.temp_prefix}/{date_path}/{safe_filename}"

    def _upload_file_blocking(self, file_path: Path, task_id: str) -> Tuple[bool, str, Optional[str]]:
        """Upload one file synchronously (blocking network I/O).

        The object is uploaded first and then switched to public-read. If
        only the ACL change fails, the already uploaded object is exposed
        through a signed URL instead of reporting the upload as failed.

        Args:
            file_path: Local path of the file to upload.
            task_id: Task identifier used when building the object key.

        Returns:
            ``(success, url_or_error_message, object_key_or_None)``.
        """
        try:
            self.logger.info(f"开始上传文件到OSS: {file_path.name}")

            object_key = self._generate_object_key(file_path.name, task_id)

            # An upload failure propagates to the outer handlers below.
            self.bucket.put_object_from_file(object_key, str(file_path))

            try:
                self.bucket.put_object_acl(object_key, oss2.OBJECT_ACL_PUBLIC_READ)
            except OssError as acl_err:
                # The object exists but could not be made public; fall back
                # to a time-limited signed URL.
                self.logger.warning(f"ACL设置失败,使用签名URL: {acl_err}")
                url = self._generate_signed_url(object_key)
            else:
                url = self._generate_public_url(object_key)

            self.logger.info(f"文件上传成功: {object_key}, URL: {url}")
            return True, url, object_key

        except OssError as e:
            error_msg = f"OSS错误: {str(e)}"
            self.logger.error(error_msg)
            return False, error_msg, None
        except Exception as e:
            error_msg = f"上传文件时发生未知错误: {str(e)}"
            self.logger.exception(error_msg)
            return False, error_msg, None

    async def upload_file(self, file_path: Path, task_id: str) -> Tuple[bool, str, Optional[str]]:
        """Upload a file to OSS without blocking the event loop.

        The blocking SDK calls run in the loop's default executor.

        Args:
            file_path: Local path of the file to upload.
            task_id: Task identifier used when building the object key.

        Returns:
            ``(success, public_or_signed_url_or_error_message, object_key)``;
            ``object_key`` is ``None`` on failure.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._upload_file_blocking, file_path, task_id
        )

    async def upload_multiple_files(self, file_paths: List[Path], task_id: str) -> List[Tuple[str, bool, str, Optional[str]]]:
        """Upload several files to OSS concurrently.

        Args:
            file_paths: Local paths of the files to upload.
            task_id: Task identifier shared by all uploads.

        Returns:
            One ``(file_name, success, url_or_error_message, object_key)``
            tuple per input path, in the same order as ``file_paths``.
        """
        paths = list(file_paths)
        names = [path.name for path in paths]

        # gather() schedules all uploads at once and keeps input order.
        outcomes = await asyncio.gather(
            *(self._upload_single_file_async(path, task_id) for path in paths)
        )

        return [
            (name, success, url_or_error, object_key)
            for name, (success, url_or_error, object_key) in zip(names, outcomes)
        ]

    async def _upload_single_file_async(self, file_path: Path, task_id: str) -> Tuple[bool, str, Optional[str]]:
        """Upload a single file; kept as a thin alias of ``upload_file``."""
        return await self.upload_file(file_path, task_id)

    def _generate_public_url(self, object_key: str) -> str:
        """Build the unsigned public URL of an object.

        The URL has the form ``https://<bucket_name>.<endpoint>/<object_key>``.
        Any ``http://`` / ``https://`` prefix and trailing slashes on the
        configured endpoint are removed first.

        Args:
            object_key: OSS object key.

        Returns:
            The public URL.
        """
        endpoint = self.oss_config.endpoint
        for scheme in ('http://', 'https://'):
            if endpoint.startswith(scheme):
                endpoint = endpoint[len(scheme):]
                break
        # A trailing slash on the endpoint would yield "//" before the key.
        endpoint = endpoint.rstrip('/')

        url = f"https://{self.oss_config.bucket_name}.{endpoint}/{object_key}"

        # Logged to help debug malformed URLs.
        self.logger.debug(f"生成公网URL: {url}")

        return url

    def _generate_signed_url(self, object_key: str) -> str:
        """Build a time-limited signed GET URL (fallback for private objects).

        Args:
            object_key: OSS object key.

        Returns:
            A signed URL valid for ``oss_config.url_expire_hours`` hours.
        """
        # oss2's sign_url() takes the validity period in seconds relative to
        # now, not an absolute Unix timestamp.
        expires_in = int(
            timedelta(hours=self.oss_config.url_expire_hours).total_seconds()
        )
        return self.bucket.sign_url('GET', object_key, expires_in)

    def delete_file(self, object_key: str) -> bool:
        """Delete one object from the bucket.

        Args:
            object_key: OSS object key.

        Returns:
            ``True`` if the delete call succeeded, ``False`` if it raised.
        """
        try:
            self.bucket.delete_object(object_key)
            self.logger.info(f"文件删除成功: {object_key}")
            return True
        except OssError as e:
            self.logger.error(f"删除文件失败: {object_key}, 错误: {str(e)}")
            return False
        except Exception as e:
            self.logger.exception(f"删除文件时发生未知错误: {object_key}, 错误: {str(e)}")
            return False

    def cleanup_old_files(self, days: Optional[int] = None) -> int:
        """Delete temporary objects older than a retention period.

        Only objects whose key starts with ``oss_config.temp_prefix`` are
        considered.

        Args:
            days: Retention in days. ``None`` (or ``0``) falls back to
                ``oss_config.auto_cleanup_days``.

        Returns:
            Number of objects deleted, including a partial count if listing
            fails midway.
        """
        # NOTE: `or` keeps the historical behaviour where 0 means "use the
        # configured default" rather than "delete everything".
        cleanup_days = days or self.oss_config.auto_cleanup_days
        cutoff_date = datetime.now() - timedelta(days=cleanup_days)
        cutoff_ts = cutoff_date.timestamp()

        deleted_count = 0
        prefix = self.oss_config.temp_prefix

        try:
            for obj in oss2.ObjectIterator(self.bucket, prefix=prefix):
                # oss2 reports last_modified as an int Unix timestamp; a
                # datetime is tolerated in case a caller substitutes one.
                modified = obj.last_modified
                if isinstance(modified, datetime):
                    modified = modified.timestamp()

                if modified < cutoff_ts and self.delete_file(obj.key):
                    deleted_count += 1

            self.logger.info(f"清理完成,删除了 {deleted_count} 个过期文件")
            return deleted_count

        except Exception as e:
            self.logger.exception(f"清理过期文件时发生错误: {str(e)}")
            return deleted_count

    def get_file_info(self, object_key: str) -> Optional[dict]:
        """Fetch metadata of one object via a HEAD request.

        Args:
            object_key: OSS object key.

        Returns:
            A dict with ``size``, ``last_modified``, ``etag`` and
            ``content_type``, or ``None`` if the request raised ``OssError``.
        """
        try:
            info = self.bucket.head_object(object_key)
            return {
                'size': info.content_length,
                'last_modified': info.last_modified,
                'etag': info.etag,
                'content_type': info.content_type,
            }
        except OssError as e:
            self.logger.error(f"获取文件信息失败: {object_key}, 错误: {str(e)}")
            return None

    def check_bucket_exists(self) -> bool:
        """Report whether the configured bucket exists.

        Returns:
            The result of ``bucket_exists()``, or ``False`` if the check
            itself raised.
        """
        try:
            return self.bucket.bucket_exists()
        except Exception as e:
            self.logger.error(f"检查存储桶失败: {str(e)}")
            return False

    def get_bucket_info(self) -> Optional[dict]:
        """Fetch basic information about the configured bucket.

        Returns:
            A dict with ``name``, ``location``, ``creation_date`` and
            ``storage_class``, or ``None`` if the request raised.
        """
        try:
            info = self.bucket.get_bucket_info()
            return {
                'name': info.name,
                'location': info.location,
                'creation_date': info.creation_date,
                'storage_class': info.storage_class,
            }
        except Exception as e:
            self.logger.error(f"获取存储桶信息失败: {str(e)}")
            return None


# Shared module-level OSSService instance, constructed once when this module is imported.
oss_service = OSSService()


def get_oss_service() -> OSSService:
    """Return the shared OSS service instance.

    Returns:
        The ``OSSService`` object bound to the module-level name
        ``oss_service``.
    """
    shared_instance = oss_service
    return shared_instance