| """媒体文件处理工具 |
| |
| 提供音视频格式转换、时长获取等功能。 |
| """ |
|
|
| import asyncio |
| import os |
| import subprocess |
| import uuid |
| from pathlib import Path |
|
|
| from astrbot import logger |
| from astrbot.core.utils.astrbot_path import get_astrbot_temp_path |
|
|
|
|
async def get_media_duration(file_path: str) -> int | None:
    """Return the duration of a media file in milliseconds, using ffprobe.

    Args:
        file_path: Path of the media file to probe.

    Returns:
        The duration in milliseconds, or None when ffprobe is not available,
        exits with a non-zero status, prints nothing, or prints a value that
        cannot be parsed as a number.
    """
    try:
        # Ask ffprobe for the container-level duration only, printed as a
        # bare number (no section wrappers, no "duration=" key).
        process = await asyncio.create_subprocess_exec(
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            file_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, _ = await process.communicate()

        if process.returncode == 0 and stdout:
            duration_seconds = float(stdout.decode().strip())
            # Round instead of truncating: binary floats turn e.g.
            # 1.001 * 1000 into 1000.9999999999999, which int() would
            # report as 1000 ms instead of 1001 ms.
            duration_ms = round(duration_seconds * 1000)
            logger.debug(f"[Media Utils] 获取媒体时长: {duration_ms}ms")
            return duration_ms

        logger.warning(f"[Media Utils] 无法获取媒体文件时长: {file_path}")
        return None
    except FileNotFoundError:
        # Raised when the ffprobe executable itself cannot be found.
        logger.warning(
            "[Media Utils] ffprobe未安装或不在PATH中,无法获取媒体时长。请安装ffmpeg: https://ffmpeg.org/"
        )
        return None
    except Exception as e:
        # Best-effort helper: unparsable output ("N/A", nan, inf) and any
        # other failure is reported as "duration unknown" rather than raised.
        logger.warning(f"[Media Utils] 获取媒体时长时出错: {e}")
        return None
|
|
|
|
| async def convert_audio_to_opus(audio_path: str, output_path: str | None = None) -> str: |
| """使用ffmpeg将音频转换为opus格式 |
| |
| Args: |
| audio_path: 原始音频文件路径 |
| output_path: 输出文件路径,如果为None则自动生成 |
| |
| Returns: |
| 转换后的opus文件路径 |
| |
| Raises: |
| Exception: 转换失败时抛出异常 |
| """ |
| |
| if audio_path.lower().endswith(".opus"): |
| return audio_path |
|
|
| |
| if output_path is None: |
| temp_dir = get_astrbot_temp_path() |
| os.makedirs(temp_dir, exist_ok=True) |
| output_path = os.path.join(temp_dir, f"media_audio_{uuid.uuid4().hex}.opus") |
|
|
| try: |
| |
| |
| |
| |
| |
| |
| process = await asyncio.create_subprocess_exec( |
| "ffmpeg", |
| "-y", |
| "-i", |
| audio_path, |
| "-acodec", |
| "libopus", |
| "-ac", |
| "1", |
| "-ar", |
| "16000", |
| output_path, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| ) |
|
|
| stdout, stderr = await process.communicate() |
|
|
| if process.returncode != 0: |
| |
| if output_path and os.path.exists(output_path): |
| try: |
| os.remove(output_path) |
| logger.debug( |
| f"[Media Utils] 已清理失败的opus输出文件: {output_path}" |
| ) |
| except OSError as e: |
| logger.warning(f"[Media Utils] 清理失败的opus输出文件时出错: {e}") |
|
|
| error_msg = stderr.decode() if stderr else "未知错误" |
| logger.error(f"[Media Utils] ffmpeg转换音频失败: {error_msg}") |
| raise Exception(f"ffmpeg conversion failed: {error_msg}") |
|
|
| logger.debug(f"[Media Utils] 音频转换成功: {audio_path} -> {output_path}") |
| return output_path |
|
|
| except FileNotFoundError: |
| logger.error( |
| "[Media Utils] ffmpeg未安装或不在PATH中,无法转换音频格式。请安装ffmpeg: https://ffmpeg.org/" |
| ) |
| raise Exception("ffmpeg not found") |
| except Exception as e: |
| logger.error(f"[Media Utils] 转换音频格式时出错: {e}") |
| raise |
|
|
|
|
| async def convert_video_format( |
| video_path: str, output_format: str = "mp4", output_path: str | None = None |
| ) -> str: |
| """使用ffmpeg转换视频格式 |
| |
| Args: |
| video_path: 原始视频文件路径 |
| output_format: 目标格式,默认mp4 |
| output_path: 输出文件路径,如果为None则自动生成 |
| |
| Returns: |
| 转换后的视频文件路径 |
| |
| Raises: |
| Exception: 转换失败时抛出异常 |
| """ |
| |
| if video_path.lower().endswith(f".{output_format}"): |
| return video_path |
|
|
| |
| if output_path is None: |
| temp_dir = get_astrbot_temp_path() |
| os.makedirs(temp_dir, exist_ok=True) |
| output_path = os.path.join( |
| temp_dir, |
| f"media_video_{uuid.uuid4().hex}.{output_format}", |
| ) |
|
|
| try: |
| |
| process = await asyncio.create_subprocess_exec( |
| "ffmpeg", |
| "-y", |
| "-i", |
| video_path, |
| "-c:v", |
| "libx264", |
| "-c:a", |
| "aac", |
| output_path, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| ) |
|
|
| stdout, stderr = await process.communicate() |
|
|
| if process.returncode != 0: |
| |
| if output_path and os.path.exists(output_path): |
| try: |
| os.remove(output_path) |
| logger.debug( |
| f"[Media Utils] 已清理失败的{output_format}输出文件: {output_path}" |
| ) |
| except OSError as e: |
| logger.warning( |
| f"[Media Utils] 清理失败的{output_format}输出文件时出错: {e}" |
| ) |
|
|
| error_msg = stderr.decode() if stderr else "未知错误" |
| logger.error(f"[Media Utils] ffmpeg转换视频失败: {error_msg}") |
| raise Exception(f"ffmpeg conversion failed: {error_msg}") |
|
|
| logger.debug(f"[Media Utils] 视频转换成功: {video_path} -> {output_path}") |
| return output_path |
|
|
| except FileNotFoundError: |
| logger.error( |
| "[Media Utils] ffmpeg未安装或不在PATH中,无法转换视频格式。请安装ffmpeg: https://ffmpeg.org/" |
| ) |
| raise Exception("ffmpeg not found") |
| except Exception as e: |
| logger.error(f"[Media Utils] 转换视频格式时出错: {e}") |
| raise |
|
|
|
|
| async def convert_audio_format( |
| audio_path: str, |
| output_format: str = "amr", |
| output_path: str | None = None, |
| ) -> str: |
| """使用ffmpeg将音频转换为指定格式。 |
| |
| Args: |
| audio_path: 原始音频文件路径 |
| output_format: 目标格式,例如 amr / ogg |
| output_path: 输出文件路径,如果为None则自动生成 |
| |
| Returns: |
| 转换后的音频文件路径 |
| """ |
| if audio_path.lower().endswith(f".{output_format}"): |
| return audio_path |
|
|
| if output_path is None: |
| temp_dir = Path(get_astrbot_temp_path()) |
| temp_dir.mkdir(parents=True, exist_ok=True) |
| output_path = str(temp_dir / f"media_audio_{uuid.uuid4().hex}.{output_format}") |
|
|
| args = ["ffmpeg", "-y", "-i", audio_path] |
| if output_format == "amr": |
| args.extend(["-ac", "1", "-ar", "8000", "-ab", "12.2k"]) |
| elif output_format == "ogg": |
| args.extend(["-acodec", "libopus", "-ac", "1", "-ar", "16000"]) |
| args.append(output_path) |
|
|
| try: |
| process = await asyncio.create_subprocess_exec( |
| *args, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| ) |
| _, stderr = await process.communicate() |
| if process.returncode != 0: |
| if output_path and os.path.exists(output_path): |
| try: |
| os.remove(output_path) |
| except OSError as e: |
| logger.warning(f"[Media Utils] 清理失败的音频输出文件时出错: {e}") |
| error_msg = stderr.decode() if stderr else "未知错误" |
| raise Exception(f"ffmpeg conversion failed: {error_msg}") |
| logger.debug(f"[Media Utils] 音频转换成功: {audio_path} -> {output_path}") |
| return output_path |
| except FileNotFoundError: |
| raise Exception("ffmpeg not found") |
|
|
|
|
async def convert_audio_to_amr(audio_path: str, output_path: str | None = None) -> str:
    """Convert an audio file to AMR.

    Thin wrapper that calls convert_audio_format with the "amr" format.

    Args:
        audio_path: Path of the source audio file.
        output_path: Destination path, forwarded unchanged (may be None).

    Returns:
        The path returned by convert_audio_format.
    """
    converted_path = await convert_audio_format(audio_path, "amr", output_path)
    return converted_path
|
|
|
|
async def convert_audio_to_wav(audio_path: str, output_path: str | None = None) -> str:
    """Convert an audio file to WAV.

    Thin wrapper that calls convert_audio_format with the "wav" format.

    Args:
        audio_path: Path of the source audio file.
        output_path: Destination path, forwarded unchanged (may be None).

    Returns:
        The path returned by convert_audio_format.
    """
    converted_path = await convert_audio_format(audio_path, "wav", output_path)
    return converted_path
|
|
|
|
async def extract_video_cover(
    video_path: str,
    output_path: str | None = None,
) -> str:
    """Extract a cover image from a video with ffmpeg.

    ffmpeg is asked to seek to 00:00:00 and write a single video frame. When
    output_path is not given, the image is written to a uniquely named ".jpg"
    file.

    Args:
        video_path: Path of the source video file.
        output_path: Destination image path. When None, a file is placed in
            the directory returned by get_astrbot_temp_path().

    Returns:
        Path of the extracted cover image.

    Raises:
        Exception: If ffmpeg is not found or exits with a non-zero status.
    """
    if output_path is None:
        temp_dir = Path(get_astrbot_temp_path())
        temp_dir.mkdir(parents=True, exist_ok=True)
        output_path = str(temp_dir / f"media_cover_{uuid.uuid4().hex}.jpg")

    try:
        process = await asyncio.create_subprocess_exec(
            "ffmpeg",
            "-y",  # overwrite the output file without prompting
            "-i",
            video_path,
            "-ss",
            "00:00:00",
            "-frames:v",
            "1",
            output_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        _, stderr = await process.communicate()
    except FileNotFoundError as err:
        raise Exception("ffmpeg not found") from err

    if process.returncode != 0:
        # ffmpeg may leave a partial image behind; remove it so callers never
        # pick up a broken cover.
        if os.path.exists(output_path):
            try:
                os.remove(output_path)
            except OSError as e:
                logger.warning(f"[Media Utils] 清理失败的视频封面文件时出错: {e}")
        # ffmpeg's stderr is not guaranteed to be UTF-8; decode leniently so
        # the real error is never replaced by a UnicodeDecodeError.
        error_msg = stderr.decode(errors="replace") if stderr else "未知错误"
        raise Exception(f"ffmpeg extract cover failed: {error_msg}")

    return output_path
|
|