import gradio as gr
import ffmpeg
from funasr import AutoModel
from moviepy.editor import VideoFileClip
import os
import subprocess
import base64
from PIL import Image
import io
import uuid
import shutil
import glob
from openai import OpenAI

# Initialize the funasr speech-recognition model: paraformer-zh with voice
# activity detection (fsmn-vad) and punctuation restoration (ct-punc-c).
model = AutoModel(model="paraformer-zh", vad_model="fsmn-vad", punc_model="ct-punc-c")

# SECURITY: the original code hard-coded an OpenAI API key here. That key is
# leaked (it was committed in plain text) and must be revoked. The client now
# reads the key from the OPENAI_API_KEY environment variable instead.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# Paths of every temporary file created while processing, so they can all be
# removed in one sweep by clear_temp_files().
temp_files = []


def clear_directory(dir_path):
    """Delete dir_path (if it exists) and recreate it empty."""
    if os.path.exists(dir_path):
        shutil.rmtree(dir_path)
    os.makedirs(dir_path, exist_ok=True)


def segment_video(video_path, segment_length):
    """Split video_path into consecutive clips of segment_length seconds.

    Each segment is re-encoded to its own mp4 file via ffmpeg. Returns the
    list of segment file paths; every path is also registered in temp_files
    for later cleanup. The final segment may be shorter than segment_length.
    """
    segment_paths = []
    with VideoFileClip(video_path) as video:
        total_duration = int(video.duration)
        for start in range(0, total_duration, segment_length):
            end = min(start + segment_length, total_duration)
            segment_path = f"segment_{uuid.uuid4()}.mp4"
            ffmpeg.input(video_path, ss=start, to=end).output(segment_path).run()
            segment_paths.append(segment_path)
            temp_files.append(segment_path)
    return segment_paths


def extract_audio(segment_path):
    """Extract the audio track of segment_path into a new .wav file.

    Returns the wav path, which is also registered in temp_files.
    """
    audio_path = f"extracted_audio_{uuid.uuid4()}.wav"
    ffmpeg.input(segment_path).output(audio_path).run()
    temp_files.append(audio_path)
    return audio_path


def audio_to_text_with_funasr(audio_path):
    """Transcribe audio_path with the funasr model.

    Joins the per-utterance texts with spaces; falls back to a fixed
    "unrecognizable" message (Chinese, user-facing) when nothing is returned.
    """
    res = model.generate(input=audio_path, batch_size_s=300, hotword='魔搭')
    if isinstance(res, list) and len(res) > 0:
        text = " ".join([item.get('text', '') for item in res])
    else:
        text = '无法识别音频内容'
    return text


def process_text_with_openai(text):
    """Send a transcript to GPT-4 for key-point extraction and sentiment scoring.

    The system prompt (Chinese) instructs the model to return only a terse
    summary of key information plus a 1-5 sentiment score. Returns the
    stripped model reply.
    """
    response = client.chat.completions.create(
        model='gpt-4-0125-preview',
        messages=[
            {"role": "system", "content": "作为一名综合语言处理专家,您的任务是对课堂录音的文本信息进行分析。关键信息提取: 识别并提取重概念和关键点。情感分析: 分析文本情绪,判断文本所传达的情绪状态,估计分值(1-5).我会直接给到你需要处理的文本,你会直接针对文本进行处理,然后返回结果,只输出关键信息和情感分析的直接结果。输出的结果要十分精简。"},
            {"role": "user", "content": text},
        ],
        temperature=0.1
    )
    processed_text = response.choices[0].message.content.strip()
    print(processed_text)
    return processed_text


def resize_and_encode_image_to_base64(image_path, output_size=(512, 512)):
    """Downscale an image to fit within output_size and return it as base64 JPEG."""
    with Image.open(image_path) as img:
        # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
        img.thumbnail(output_size, Image.LANCZOS)
        # JPEG cannot encode alpha/palette modes (RGBA, P, ...) — normalize
        # to RGB first or img.save(..., format='JPEG') raises OSError.
        if img.mode != "RGB":
            img = img.convert("RGB")
        img_byte_arr = io.BytesIO()
        img.save(img_byte_arr, format='JPEG')  # save as JPEG
        encoded_string = base64.b64encode(img_byte_arr.getvalue()).decode('utf-8')
    return encoded_string


def describe_image_with_openai(base64_image):
    """Score a classroom frame (emotion + interaction, 1-5) via GPT-4 Vision.

    The prompt (Chinese) asks the model to return only the scores. Returns
    the stripped model reply.
    """
    response = client.chat.completions.create(
        model="gpt-4-vision-preview",
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "你是一位专业的课堂状态分析员。你会按照以下步骤完成任务。首先,面部分析识别情绪:1.识别图片中的面部表情。2.将面部表情与相应的基本情绪关联起来。3.注意面部表情的细微差别,这些可能指示更复杂的情绪状态。其次,身体姿态分析互动关系:1.观察并报告图像中个体的身体语言和姿态。2.根据姿态推断互动的性质(例如,协作、对抗)。3.考虑个体之间的距离和方向,以了解互动关系。最后,反馈:直接给出对于图片中情绪状态和互动关系的评分(1-5的范围),不需要给到分析过程。你返回给我的结果只是评分,其他信息都不需要"},
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
                ]
            }
        ],
        max_tokens=300
    )
    description = response.choices[0].message.content.strip()
    print(description)
    return description


def video_to_images(segment_path, interval, start_time):
    """Sample one frame every `interval` seconds and score each via GPT-4 Vision.

    start_time is the segment's offset (seconds) within the full video, used
    to build absolute "Xm Ys" timestamps. Returns a list of
    (base64_image, timestamp, description) tuples. Frame files are registered
    in temp_files.
    """
    images_descriptions = []
    frames_dir = os.path.join("frames", str(uuid.uuid4()))
    os.makedirs(frames_dir, exist_ok=True)
    # Close the clip deterministically — the original leaked the reader.
    with VideoFileClip(segment_path) as clip:
        for i in range(0, int(clip.duration), interval):
            img_path = os.path.join(frames_dir, f"frame_at_{i}.jpg")
            clip.save_frame(img_path, t=i)
            base64_image = resize_and_encode_image_to_base64(img_path)
            description = describe_image_with_openai(base64_image)
            minutes = (start_time + i) // 60
            seconds = (start_time + i) % 60
            timestamp = f"{minutes}分{seconds}秒"
            images_descriptions.append((base64_image, timestamp, description))
            temp_files.append(img_path)
    return images_descriptions


def download_video(video_url):
    """Download/transcode video_url to a local mp4 (x264, crf 28, ultrafast).

    Uses subprocess with an argument list (no shell) so the URL cannot be
    interpreted by a shell. Raises CalledProcessError on ffmpeg failure.
    Returns the local path, which is also registered in temp_files.
    """
    video_path = f"downloaded_video_{uuid.uuid4()}.mp4"
    subprocess.run([
        "ffmpeg", "-y", "-i", video_url,
        "-vcodec", "libx264", "-crf", "28", "-preset", "ultrafast",
        video_path
    ], check=True)
    temp_files.append(video_path)
    return video_path


def clear_temp_files():
    """Remove every file registered in temp_files, then empty the registry."""
    for file_path in temp_files:
        if os.path.exists(file_path):
            os.remove(file_path)
    temp_files.clear()
process_video(video_url, segment_minutes, image_interval_seconds): clear_directory("frames") clear_directory("segments") video_path = download_video(video_url) segment_length = segment_minutes * 60 segments = segment_video(video_path, segment_length) html_results = [] for i, segment_path in enumerate(segments): start_time = i * segment_length end_time = min((i + 1) * segment_length, int(VideoFileClip(video_path).duration)) audio_path = extract_audio(segment_path) text = audio_to_text_with_funasr(audio_path) processed_text = process_text_with_openai(text) images_descriptions = video_to_images(segment_path, image_interval_seconds, start_time) title = f"第 {start_time//60} 分钟到 {end_time//60} 分钟的内容" images_html = ''.join([f'
{image[1]} - {image[2]}
{processed_text}