astrbbbb / astrbot /core /utils /media_utils.py
qa1145's picture
Upload 1245 files
8ede856 verified
"""媒体文件处理工具
提供音视频格式转换、时长获取等功能。
"""
import asyncio
import os
import subprocess
import uuid
from pathlib import Path
from astrbot import logger
from astrbot.core.utils.astrbot_path import get_astrbot_temp_path
async def get_media_duration(file_path: str) -> int | None:
"""使用ffprobe获取媒体文件时长
Args:
file_path: 媒体文件路径
Returns:
时长(毫秒),如果获取失败返回None
"""
try:
# 使用ffprobe获取时长
process = await asyncio.create_subprocess_exec(
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
file_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0 and stdout:
duration_seconds = float(stdout.decode().strip())
duration_ms = int(duration_seconds * 1000)
logger.debug(f"[Media Utils] 获取媒体时长: {duration_ms}ms")
return duration_ms
else:
logger.warning(f"[Media Utils] 无法获取媒体文件时长: {file_path}")
return None
except FileNotFoundError:
logger.warning(
"[Media Utils] ffprobe未安装或不在PATH中,无法获取媒体时长。请安装ffmpeg: https://ffmpeg.org/"
)
return None
except Exception as e:
logger.warning(f"[Media Utils] 获取媒体时长时出错: {e}")
return None
async def convert_audio_to_opus(audio_path: str, output_path: str | None = None) -> str:
"""使用ffmpeg将音频转换为opus格式
Args:
audio_path: 原始音频文件路径
output_path: 输出文件路径,如果为None则自动生成
Returns:
转换后的opus文件路径
Raises:
Exception: 转换失败时抛出异常
"""
# 如果已经是opus格式,直接返回
if audio_path.lower().endswith(".opus"):
return audio_path
# 生成输出文件路径
if output_path is None:
temp_dir = get_astrbot_temp_path()
os.makedirs(temp_dir, exist_ok=True)
output_path = os.path.join(temp_dir, f"media_audio_{uuid.uuid4().hex}.opus")
try:
# 使用ffmpeg转换为opus格式
# -y: 覆盖输出文件
# -i: 输入文件
# -acodec libopus: 使用opus编码器
# -ac 1: 单声道
# -ar 16000: 采样率16kHz
process = await asyncio.create_subprocess_exec(
"ffmpeg",
"-y",
"-i",
audio_path,
"-acodec",
"libopus",
"-ac",
"1",
"-ar",
"16000",
output_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
# 清理可能已生成但无效的临时文件
if output_path and os.path.exists(output_path):
try:
os.remove(output_path)
logger.debug(
f"[Media Utils] 已清理失败的opus输出文件: {output_path}"
)
except OSError as e:
logger.warning(f"[Media Utils] 清理失败的opus输出文件时出错: {e}")
error_msg = stderr.decode() if stderr else "未知错误"
logger.error(f"[Media Utils] ffmpeg转换音频失败: {error_msg}")
raise Exception(f"ffmpeg conversion failed: {error_msg}")
logger.debug(f"[Media Utils] 音频转换成功: {audio_path} -> {output_path}")
return output_path
except FileNotFoundError:
logger.error(
"[Media Utils] ffmpeg未安装或不在PATH中,无法转换音频格式。请安装ffmpeg: https://ffmpeg.org/"
)
raise Exception("ffmpeg not found")
except Exception as e:
logger.error(f"[Media Utils] 转换音频格式时出错: {e}")
raise
async def convert_video_format(
video_path: str, output_format: str = "mp4", output_path: str | None = None
) -> str:
"""使用ffmpeg转换视频格式
Args:
video_path: 原始视频文件路径
output_format: 目标格式,默认mp4
output_path: 输出文件路径,如果为None则自动生成
Returns:
转换后的视频文件路径
Raises:
Exception: 转换失败时抛出异常
"""
# 如果已经是目标格式,直接返回
if video_path.lower().endswith(f".{output_format}"):
return video_path
# 生成输出文件路径
if output_path is None:
temp_dir = get_astrbot_temp_path()
os.makedirs(temp_dir, exist_ok=True)
output_path = os.path.join(
temp_dir,
f"media_video_{uuid.uuid4().hex}.{output_format}",
)
try:
# 使用ffmpeg转换视频格式
process = await asyncio.create_subprocess_exec(
"ffmpeg",
"-y",
"-i",
video_path,
"-c:v",
"libx264",
"-c:a",
"aac",
output_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
# 清理可能已生成但无效的临时文件
if output_path and os.path.exists(output_path):
try:
os.remove(output_path)
logger.debug(
f"[Media Utils] 已清理失败的{output_format}输出文件: {output_path}"
)
except OSError as e:
logger.warning(
f"[Media Utils] 清理失败的{output_format}输出文件时出错: {e}"
)
error_msg = stderr.decode() if stderr else "未知错误"
logger.error(f"[Media Utils] ffmpeg转换视频失败: {error_msg}")
raise Exception(f"ffmpeg conversion failed: {error_msg}")
logger.debug(f"[Media Utils] 视频转换成功: {video_path} -> {output_path}")
return output_path
except FileNotFoundError:
logger.error(
"[Media Utils] ffmpeg未安装或不在PATH中,无法转换视频格式。请安装ffmpeg: https://ffmpeg.org/"
)
raise Exception("ffmpeg not found")
except Exception as e:
logger.error(f"[Media Utils] 转换视频格式时出错: {e}")
raise
async def convert_audio_format(
audio_path: str,
output_format: str = "amr",
output_path: str | None = None,
) -> str:
"""使用ffmpeg将音频转换为指定格式。
Args:
audio_path: 原始音频文件路径
output_format: 目标格式,例如 amr / ogg
output_path: 输出文件路径,如果为None则自动生成
Returns:
转换后的音频文件路径
"""
if audio_path.lower().endswith(f".{output_format}"):
return audio_path
if output_path is None:
temp_dir = Path(get_astrbot_temp_path())
temp_dir.mkdir(parents=True, exist_ok=True)
output_path = str(temp_dir / f"media_audio_{uuid.uuid4().hex}.{output_format}")
args = ["ffmpeg", "-y", "-i", audio_path]
if output_format == "amr":
args.extend(["-ac", "1", "-ar", "8000", "-ab", "12.2k"])
elif output_format == "ogg":
args.extend(["-acodec", "libopus", "-ac", "1", "-ar", "16000"])
args.append(output_path)
try:
process = await asyncio.create_subprocess_exec(
*args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
_, stderr = await process.communicate()
if process.returncode != 0:
if output_path and os.path.exists(output_path):
try:
os.remove(output_path)
except OSError as e:
logger.warning(f"[Media Utils] 清理失败的音频输出文件时出错: {e}")
error_msg = stderr.decode() if stderr else "未知错误"
raise Exception(f"ffmpeg conversion failed: {error_msg}")
logger.debug(f"[Media Utils] 音频转换成功: {audio_path} -> {output_path}")
return output_path
except FileNotFoundError:
raise Exception("ffmpeg not found")
async def convert_audio_to_amr(audio_path: str, output_path: str | None = None) -> str:
"""将音频转换为amr格式。"""
return await convert_audio_format(
audio_path=audio_path,
output_format="amr",
output_path=output_path,
)
async def convert_audio_to_wav(audio_path: str, output_path: str | None = None) -> str:
"""将音频转换为wav格式。"""
return await convert_audio_format(
audio_path=audio_path,
output_format="wav",
output_path=output_path,
)
async def extract_video_cover(
video_path: str,
output_path: str | None = None,
) -> str:
"""从视频中提取封面图(JPG)。"""
if output_path is None:
temp_dir = Path(get_astrbot_temp_path())
temp_dir.mkdir(parents=True, exist_ok=True)
output_path = str(temp_dir / f"media_cover_{uuid.uuid4().hex}.jpg")
try:
process = await asyncio.create_subprocess_exec(
"ffmpeg",
"-y",
"-i",
video_path,
"-ss",
"00:00:00",
"-frames:v",
"1",
output_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
_, stderr = await process.communicate()
if process.returncode != 0:
if output_path and os.path.exists(output_path):
try:
os.remove(output_path)
except OSError as e:
logger.warning(f"[Media Utils] 清理失败的视频封面文件时出错: {e}")
error_msg = stderr.decode() if stderr else "未知错误"
raise Exception(f"ffmpeg extract cover failed: {error_msg}")
return output_path
except FileNotFoundError:
raise Exception("ffmpeg not found")