import os

# An empty CURL_CA_BUNDLE disables SSL certificate verification for
# curl-based downloads (a workaround for proxy / self-signed-cert setups).
os.environ['CURL_CA_BUNDLE'] = ''

import re

import numpy as np
import torch
import torch.nn.functional as F
import torchvision.transforms as transforms
import openai
import ffmpeg
from simplet5 import SimpleT5
from .tag2text import tag2text_caption
from .utils import *
from .load_internvideo import *
from .grit_model import DenseCaptioning
from .lang import SimpleLanguageModel
from scipy.io.wavfile import write as write_wav
from bark import SAMPLE_RATE, generate_audio
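# Note: LoadVideo, gen_new_name, the prompts decorator, load_intern_action,
# transform_action and kinetics_classnames are assumed to be provided by the
# wildcard imports from .utils and .load_internvideo above.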


class VideoCaption:
    def __init__(self, device):
        self.device = device
        self.image_size = 384

        self.video_path = None
        self.result = None
        self.tags = None
        # ImageNet normalization, matching the Tag2Text training setup.
        self.normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                              std=[0.229, 0.224, 0.225])
        self.transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((self.image_size, self.image_size)),
            transforms.ToTensor(),
            self.normalize,
        ])
        self.model = tag2text_caption(pretrained="model_zoo/tag2text_swin_14m.pth",
                                      image_size=self.image_size,
                                      vit='swin_b').eval().to(device)
        self.load_video = LoadVideo()
        print("[INFO] initialized Caption model successfully!")
    def framewise_details(self, inputs):
        """Caption the video second by second and merge runs of identical
        captions into 'Second <start> - <end>' ranges."""
        video_path = inputs.strip()
        caption = self.inference(video_path)
        frame_caption = ""
        prev_caption = ""
        start_time = 0
        end_time = 0
        for i, current in enumerate(caption):
            current_caption = f"{current}."
            if current_caption == prev_caption:
                # Same caption as the previous second: extend the range.
                end_time = i + 1
            else:
                # Caption changed: flush the previous range, start a new one.
                if prev_caption:
                    frame_caption += f"Second {start_time} - {end_time}: {prev_caption}\n"
                start_time = i + 1
                end_time = i + 1
                prev_caption = current_caption
        if prev_caption:
            frame_caption += f"Second {start_time} - {end_time}: {prev_caption}\n"
        total_dur = end_time
        frame_caption += f"| Total Duration: {total_dur} seconds.\n"

        print(frame_caption)

        self.video_path = video_path
        return frame_caption
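    # Illustrative output of framewise_details (captions depend on the video):
    #   Second 1 - 3: a man is riding a surfboard on a wave.
    #   Second 4 - 6: a man is swimming in the ocean.
    #   | Total Duration: 6 seconds.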
    @prompts(name="Video Caption",
             description="useful when you want to generate a description for a video. "
                         "like: generate a description or caption for this video. "
                         "The input to this tool should be a string, "
                         "representing the video_path")
    def inference(self, inputs):
        video_path = inputs.strip()
        data = self.load_video(video_path)

        tmp = []
        for img in data:
            tmp.append(self.transform(img).to(self.device).unsqueeze(0))

        image = torch.cat(tmp).to(self.device)

        # With tag_input=None, Tag2Text predicts the tags itself.
        input_tag_list = None
        with torch.no_grad():
            caption, tags = self.model.generate(image,
                                                tag_input=input_tag_list,
                                                max_length=50,
                                                return_tag_predict=True)

        # Free the frame tensors before returning.
        del data, image, tmp
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
        self.result = caption
        self.tags = tags

        return caption
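# Illustrative usage of VideoCaption (the video path is a placeholder):
#   captioner = VideoCaption('cuda:0')
#   per_second_caption = captioner.framewise_details('./tmp_files/demo.mp4')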


class Summarization:
    def __init__(self, device):
        self.device = device
        self.model = SimpleT5()
        # Load a flan-t5-large checkpoint fine-tuned for summarization,
        # then move it to the target device manually.
        self.model.load_model(
            "t5", "./model_zoo/flan-t5-large-finetuned-openai-summarize_from_feedback", use_gpu=False)
        self.model.model = self.model.model.to(self.device)
        self.model.device = device

        print("[INFO] initialized Summarization model successfully!")

    @prompts(name="Video Summarization",
             description="useful when you want to summarize video content for an input video. "
                         "like: summarize this video. "
                         "The input to this tool should be a string, "
                         "representing the video_path")
    def inference(self, inputs):
        # NOTE: despite the description above, the input is treated as the
        # caption text to summarize, not as a file path.
        caption = inputs.strip()
        sum_res = self.model.predict(caption)
        return sum_res


class ActionRecognition:
    def __init__(self, device):
        self.device = device
        self.video_path = None

        self.model = load_intern_action(device)
        self.transform = transform_action()
        self.toPIL = transforms.ToPILImage()
        self.load_video = LoadVideo()
        print("[INFO] initialized InternVideo model successfully!")

    @prompts(name="Action Recognition",
             description="useful when you want to recognize the action category in this video. "
                         "like: recognize the action or classify this video. "
                         "The input to this tool should be a string, "
                         "representing the video_path")
    def inference(self, inputs):
        video_path = inputs.strip()

        data = self.load_video(video_path)

        # Sample 8 evenly spaced frames, as expected by the action model.
        action_index = np.linspace(0, len(data) - 1, 8).astype(int)
        tmp_pred = []
        for i, img in enumerate(data):
            if i in action_index:
                tmp_pred.append(self.toPIL(img))
        action_tensor = self.transform(tmp_pred)
        # Reshape (T*C, H, W) -> (1, C, T, H, W) for the video model.
        TC, H, W = action_tensor.shape
        action_tensor = action_tensor.reshape(1, TC // 3, 3, H, W).permute(0, 2, 1, 3, 4).to(self.device)
        with torch.no_grad():
            prediction = self.model(action_tensor)
            prediction = F.softmax(prediction, dim=1).flatten()
            prediction = kinetics_classnames[str(int(prediction.argmax()))]

        return prediction


class DenseCaption:
    def __init__(self, device):
        self.device = device
        self.model = DenseCaptioning(device)
        self.model.initialize_model()

        self.load_video = LoadVideo()
        print("[INFO] initialized DenseCaption model successfully!")

    @prompts(name="Video Dense Caption",
             description="useful when you want to generate a dense caption for a video. "
                         "like: generate a dense caption or description for this video. "
                         "The input to this tool should be a string, "
                         "representing the video_path")
    def inference(self, inputs):
        video_path = inputs.strip()

        data = self.load_video(video_path)
        dense_caption = []
        # Caption every 5th frame; [..., ::-1] flips the channel order (BGR -> RGB).
        dense_index = np.arange(0, len(data) - 1, 5)
        original_images = data[dense_index, :, :, ::-1]
        with torch.no_grad():
            for original_image in original_images:
                dense_caption.append(self.model.run_caption_tensor(original_image))
        dense_caption = ' '.join([f"Second {i+1} : {j}.\n" for i, j in zip(dense_index, dense_caption)])

        return dense_caption


class GenerateTikTokVideo:
    template_model = True

    def __init__(self, ActionRecognition, VideoCaption, DenseCaption):
        self.ActionRecognition = ActionRecognition
        self.VideoCaption = VideoCaption
        self.DenseCaption = DenseCaption
        # Created lazily on first use (see inference below).
        self.SimpleLanguageModel = None

    @prompts(name="Generate TikTok Video",
             description="useful when you want to generate a video with TikTok style based on a prompt. "
                         "like: cut this video to a TikTok video based on prompt. "
                         "The input to this tool should be a comma separated string of two, "
                         "representing the video_path and prompt")
    def inference(self, inputs):
        video_path = inputs.split(',')[0].strip()
        text = ', '.join(inputs.split(',')[1:])
        if self.SimpleLanguageModel is None:
            self.SimpleLanguageModel = SimpleLanguageModel()
        # Collect action, dense-caption, caption, tag and per-second
        # descriptions of the video as context for the language model.
        action_classes = self.ActionRecognition.inference(video_path)
        print(f'action_classes = {action_classes}')
        dense_caption = self.DenseCaption.inference(video_path)
        print(f'dense_caption = {dense_caption}')
        caption = self.VideoCaption.inference(video_path)
        caption = '. '.join(caption)
        print(f'caption = {caption}')
        tags = self.VideoCaption.tags
        print(f'tags = {tags}')
        framewise_caption = self.VideoCaption.framewise_details(video_path)
        print(f'framewise_caption = {framewise_caption}')
        video_prompt = f"""The tags for this video are: {action_classes}, {','.join(tags)};
        The temporal description of the video is: {framewise_caption}
        The dense caption of the video is: {dense_caption}"""
        timestamp = self.run_text_with_time(video_prompt, text)
        print(f'timestamp = {timestamp}')
        if not timestamp:
            return 'Error! Please try it again.'
        start_time, end_time = min(timestamp), max(timestamp)
        print(f'start_time, end_time = {start_time}, {end_time}')
        video_duration = end_time - start_time + 1

        # Chinese prompt, roughly: "Forget the previous answer template and answer
        # in Chinese. Call a man in the video 小帅, a woman 小美, and 大聪明 if the
        # gender is unclear. Generate video marketing copy that starts with
        # '注意看,这个人叫' ('Look closely, this person is called')."
        prompt = "忘记之前的回答模板,请使用中文回答这个问题。视频里如果出现男生就叫小帅,出现女生就叫小美,如果不确定性别,就叫大聪明。请以’注意看,这个人叫’为开头生成一段视频营销文案。"
        texts = self.run_text_with_tiktok(video_prompt, prompt).strip()
        texts += '。'
        print(f"before polishing: {texts}")
        print('*' * 40)
        # Ask GPT-3.5 to polish the copy (in Chinese): remove repetition and
        # keep the required opening phrase.
        texts = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user",
                       "content": f"使用中文回答这个问题,请润色下面的句子,去除重复的片段,并且仍以’注意看,这个人叫’为开头:{texts}"}],
        ).choices[0].message['content']
        print(f"after polishing: {texts}")
        clipped_video_path = gen_new_name(video_path, 'tmp', 'mp4')
        wav_file = clipped_video_path.replace('.mp4', '.wav')
        audio_path = self.gen_audio(texts, wav_file)
        audio_duration = int(float(ffmpeg.probe(audio_path)['streams'][0]['duration'])) + 1
        # Cut video_duration seconds starting at start_time, re-encoding to H.264.
        os.system(f"ffmpeg -y -v quiet -ss {start_time} -t {video_duration} -i {video_path} "
                  f"-c:v libx264 -c:a copy -movflags +faststart {clipped_video_path}")

        new_video_path = gen_new_name(video_path, 'GenerateTikTokVideo', 'mp4')
        if video_duration < audio_duration:
            # The narration outlasts the clip: loop the clip via the ffmpeg
            # concat demuxer until it covers the audio, then mux them together.
            video_concat = os.path.join(os.path.dirname(clipped_video_path), 'concat.info')
            video_concat = gen_new_name(video_concat, '', 'info')
            with open(video_concat, 'w') as f:
                for _ in range(audio_duration // video_duration + 1):
                    f.write(f"file '{os.path.basename(clipped_video_path)}'\n")
            tmp_path = gen_new_name(video_path, 'tmp', 'mp4')
            os.system(f"ffmpeg -y -f concat -i {video_concat} {tmp_path}")
            print(f"ffmpeg -y -i {tmp_path} -i {wav_file} {new_video_path}")
            os.system(f"ffmpeg -y -i {tmp_path} -i {wav_file} {new_video_path}")
        else:
            print(f"ffmpeg -y -i {clipped_video_path} -i {wav_file} {new_video_path}")
            os.system(f"ffmpeg -y -i {clipped_video_path} -i {wav_file} {new_video_path}")
        if not os.path.exists(new_video_path):
            return 'Error! Please try it again.'

        return (new_video_path, )
    def run_text_with_time(self, video_caption, text):
        # Ask the language model for the time range that matches the text;
        # the answer must end with "[start time: end time]".
        prompt = ("Only in this conversation, "
                  "You must find the text-related start time "
                  "and end time based on video caption. Your answer "
                  "must end with the format {answer} [start time: end time].")
        response = self.SimpleLanguageModel(f"Video content: {video_caption}. Text: {text.strip()}." + prompt)

        # Parse the last two integers in the response as start/end seconds.
        pattern = r"\d+"
        try:
            matches = re.findall(pattern, response)
            start_idx, end_idx = matches[-2:]
            start_idx, end_idx = int(start_idx), int(end_idx)
        except ValueError:
            # Fewer than two numbers in the response: no usable timestamp.
            return None

        print(f"\nProcessed run_text_with_time, Input text: {text}\n")
        return (start_idx, end_idx)
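    # A well-formed response is expected to end with something like
    # "... [12: 18]" (illustrative), which parses to the tuple (12, 18).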
    def run_text_with_tiktok(self, video_content, prompt):
        # Combine the video description with the marketing-copy prompt.
        inputs = f"Video description: {video_content}. {prompt}"
        response = self.SimpleLanguageModel(inputs)
        response = response.replace("\\", "/")

        print(f"\nProcessed run_text_with_tiktok, Input text: {prompt}\n, Response: {response}")
        return response
    def gen_audio(self, text, save_path):
        # Synthesize narration for the marketing copy with bark, save as WAV.
        audio_array = generate_audio(text)
        write_wav(save_path, SAMPLE_RATE, audio_array)
        return save_path
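    # Note: bark renders audio at SAMPLE_RATE (24 kHz) and, at the time of
    # writing, generates roughly 13 seconds of speech per call, so very long
    # texts may be cut short.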


if __name__ == '__main__':
    # Smoke test: build the full tool chain and cut a TikTok-style clip
    # from a local sample video.
    video_path = './tmp_files/f4236666.mp4'
    device = 'cuda:0'

    model = GenerateTikTokVideo(ActionRecognition(device),
                                VideoCaption(device),
                                DenseCaption(device))
    # The Chinese prompt means "help me cut out the most exciting clip".
    out = model.inference(video_path + ",帮我剪辑出最精彩的片段")
    print(out)