File size: 10,484 Bytes
8ede856
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
"""媒体文件处理工具

提供音视频格式转换、时长获取等功能。
"""

import asyncio
import os
import subprocess
import uuid
from pathlib import Path

from astrbot import logger
from astrbot.core.utils.astrbot_path import get_astrbot_temp_path


async def get_media_duration(file_path: str) -> int | None:
    """使用ffprobe获取媒体文件时长

    Args:
        file_path: 媒体文件路径

    Returns:
        时长(毫秒),如果获取失败返回None
    """
    try:
        # 使用ffprobe获取时长
        process = await asyncio.create_subprocess_exec(
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            file_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        stdout, stderr = await process.communicate()

        if process.returncode == 0 and stdout:
            duration_seconds = float(stdout.decode().strip())
            duration_ms = int(duration_seconds * 1000)
            logger.debug(f"[Media Utils] 获取媒体时长: {duration_ms}ms")
            return duration_ms
        else:
            logger.warning(f"[Media Utils] 无法获取媒体文件时长: {file_path}")
            return None

    except FileNotFoundError:
        logger.warning(
            "[Media Utils] ffprobe未安装或不在PATH中,无法获取媒体时长。请安装ffmpeg: https://ffmpeg.org/"
        )
        return None
    except Exception as e:
        logger.warning(f"[Media Utils] 获取媒体时长时出错: {e}")
        return None


async def convert_audio_to_opus(audio_path: str, output_path: str | None = None) -> str:
    """使用ffmpeg将音频转换为opus格式

    Args:
        audio_path: 原始音频文件路径
        output_path: 输出文件路径,如果为None则自动生成

    Returns:
        转换后的opus文件路径

    Raises:
        Exception: 转换失败时抛出异常
    """
    # 如果已经是opus格式,直接返回
    if audio_path.lower().endswith(".opus"):
        return audio_path

    # 生成输出文件路径
    if output_path is None:
        temp_dir = get_astrbot_temp_path()
        os.makedirs(temp_dir, exist_ok=True)
        output_path = os.path.join(temp_dir, f"media_audio_{uuid.uuid4().hex}.opus")

    try:
        # 使用ffmpeg转换为opus格式
        # -y: 覆盖输出文件
        # -i: 输入文件
        # -acodec libopus: 使用opus编码器
        # -ac 1: 单声道
        # -ar 16000: 采样率16kHz
        process = await asyncio.create_subprocess_exec(
            "ffmpeg",
            "-y",
            "-i",
            audio_path,
            "-acodec",
            "libopus",
            "-ac",
            "1",
            "-ar",
            "16000",
            output_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        stdout, stderr = await process.communicate()

        if process.returncode != 0:
            # 清理可能已生成但无效的临时文件
            if output_path and os.path.exists(output_path):
                try:
                    os.remove(output_path)
                    logger.debug(
                        f"[Media Utils] 已清理失败的opus输出文件: {output_path}"
                    )
                except OSError as e:
                    logger.warning(f"[Media Utils] 清理失败的opus输出文件时出错: {e}")

            error_msg = stderr.decode() if stderr else "未知错误"
            logger.error(f"[Media Utils] ffmpeg转换音频失败: {error_msg}")
            raise Exception(f"ffmpeg conversion failed: {error_msg}")

        logger.debug(f"[Media Utils] 音频转换成功: {audio_path} -> {output_path}")
        return output_path

    except FileNotFoundError:
        logger.error(
            "[Media Utils] ffmpeg未安装或不在PATH中,无法转换音频格式。请安装ffmpeg: https://ffmpeg.org/"
        )
        raise Exception("ffmpeg not found")
    except Exception as e:
        logger.error(f"[Media Utils] 转换音频格式时出错: {e}")
        raise


async def convert_video_format(
    video_path: str, output_format: str = "mp4", output_path: str | None = None
) -> str:
    """使用ffmpeg转换视频格式

    Args:
        video_path: 原始视频文件路径
        output_format: 目标格式,默认mp4
        output_path: 输出文件路径,如果为None则自动生成

    Returns:
        转换后的视频文件路径

    Raises:
        Exception: 转换失败时抛出异常
    """
    # 如果已经是目标格式,直接返回
    if video_path.lower().endswith(f".{output_format}"):
        return video_path

    # 生成输出文件路径
    if output_path is None:
        temp_dir = get_astrbot_temp_path()
        os.makedirs(temp_dir, exist_ok=True)
        output_path = os.path.join(
            temp_dir,
            f"media_video_{uuid.uuid4().hex}.{output_format}",
        )

    try:
        # 使用ffmpeg转换视频格式
        process = await asyncio.create_subprocess_exec(
            "ffmpeg",
            "-y",
            "-i",
            video_path,
            "-c:v",
            "libx264",
            "-c:a",
            "aac",
            output_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        stdout, stderr = await process.communicate()

        if process.returncode != 0:
            # 清理可能已生成但无效的临时文件
            if output_path and os.path.exists(output_path):
                try:
                    os.remove(output_path)
                    logger.debug(
                        f"[Media Utils] 已清理失败的{output_format}输出文件: {output_path}"
                    )
                except OSError as e:
                    logger.warning(
                        f"[Media Utils] 清理失败的{output_format}输出文件时出错: {e}"
                    )

            error_msg = stderr.decode() if stderr else "未知错误"
            logger.error(f"[Media Utils] ffmpeg转换视频失败: {error_msg}")
            raise Exception(f"ffmpeg conversion failed: {error_msg}")

        logger.debug(f"[Media Utils] 视频转换成功: {video_path} -> {output_path}")
        return output_path

    except FileNotFoundError:
        logger.error(
            "[Media Utils] ffmpeg未安装或不在PATH中,无法转换视频格式。请安装ffmpeg: https://ffmpeg.org/"
        )
        raise Exception("ffmpeg not found")
    except Exception as e:
        logger.error(f"[Media Utils] 转换视频格式时出错: {e}")
        raise


async def convert_audio_format(
    audio_path: str,
    output_format: str = "amr",
    output_path: str | None = None,
) -> str:
    """使用ffmpeg将音频转换为指定格式。

    Args:
        audio_path: 原始音频文件路径
        output_format: 目标格式,例如 amr / ogg
        output_path: 输出文件路径,如果为None则自动生成

    Returns:
        转换后的音频文件路径
    """
    if audio_path.lower().endswith(f".{output_format}"):
        return audio_path

    if output_path is None:
        temp_dir = Path(get_astrbot_temp_path())
        temp_dir.mkdir(parents=True, exist_ok=True)
        output_path = str(temp_dir / f"media_audio_{uuid.uuid4().hex}.{output_format}")

    args = ["ffmpeg", "-y", "-i", audio_path]
    if output_format == "amr":
        args.extend(["-ac", "1", "-ar", "8000", "-ab", "12.2k"])
    elif output_format == "ogg":
        args.extend(["-acodec", "libopus", "-ac", "1", "-ar", "16000"])
    args.append(output_path)

    try:
        process = await asyncio.create_subprocess_exec(
            *args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        _, stderr = await process.communicate()
        if process.returncode != 0:
            if output_path and os.path.exists(output_path):
                try:
                    os.remove(output_path)
                except OSError as e:
                    logger.warning(f"[Media Utils] 清理失败的音频输出文件时出错: {e}")
            error_msg = stderr.decode() if stderr else "未知错误"
            raise Exception(f"ffmpeg conversion failed: {error_msg}")
        logger.debug(f"[Media Utils] 音频转换成功: {audio_path} -> {output_path}")
        return output_path
    except FileNotFoundError:
        raise Exception("ffmpeg not found")


async def convert_audio_to_amr(audio_path: str, output_path: str | None = None) -> str:
    """将音频转换为amr格式。"""
    return await convert_audio_format(
        audio_path=audio_path,
        output_format="amr",
        output_path=output_path,
    )


async def convert_audio_to_wav(audio_path: str, output_path: str | None = None) -> str:
    """将音频转换为wav格式。"""
    return await convert_audio_format(
        audio_path=audio_path,
        output_format="wav",
        output_path=output_path,
    )


async def extract_video_cover(
    video_path: str,
    output_path: str | None = None,
) -> str:
    """从视频中提取封面图(JPG)。"""
    if output_path is None:
        temp_dir = Path(get_astrbot_temp_path())
        temp_dir.mkdir(parents=True, exist_ok=True)
        output_path = str(temp_dir / f"media_cover_{uuid.uuid4().hex}.jpg")

    try:
        process = await asyncio.create_subprocess_exec(
            "ffmpeg",
            "-y",
            "-i",
            video_path,
            "-ss",
            "00:00:00",
            "-frames:v",
            "1",
            output_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        _, stderr = await process.communicate()
        if process.returncode != 0:
            if output_path and os.path.exists(output_path):
                try:
                    os.remove(output_path)
                except OSError as e:
                    logger.warning(f"[Media Utils] 清理失败的视频封面文件时出错: {e}")
            error_msg = stderr.decode() if stderr else "未知错误"
            raise Exception(f"ffmpeg extract cover failed: {error_msg}")
        return output_path
    except FileNotFoundError:
        raise Exception("ffmpeg not found")