NitinBot002 commited on
Commit
4da6b12
·
verified ·
1 Parent(s): aba1e76

Create telegram_youtube_workflow.py

Browse files
Files changed (1) hide show
  1. telegram_youtube_workflow.py +154 -0
telegram_youtube_workflow.py ADDED
@@ -0,0 +1,154 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import asyncio
3
+ import json
4
+ import logging
5
+ from datetime import datetime
6
+ from pathlib import Path
7
+ from telethon import TelegramClient
8
+ from telethon.sessions import StringSession
9
+ from googleapiclient.discovery import build
10
+ from googleapiclient.errors import HttpError
11
+ from googleapiclient.http import MediaFileUpload
12
+ from google.oauth2.credentials import Credentials
13
+ import firebase_admin
14
+ from firebase_admin import credentials, firestore
15
+
16
+ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()])
17
+ logger = logging.getLogger(__name__)
18
+
19
+ class TelegramYouTubeWorkflow:
20
+ def __init__(self, config):
21
+ self.config = config
22
+ self.telegram_client = None
23
+ self.youtube_service = None
24
+ self.firestore_db = None
25
+ self.download_dir = Path(self.config.get('download_directory', 'downloads'))
26
+ self.download_dir.mkdir(exist_ok=True)
27
+ self.SCOPES = ['https://www.googleapis.com/auth/youtube.upload', 'https://www.googleapis.com/auth/youtube']
28
+ self.setup_firebase()
29
+
30
+ def setup_firebase(self):
31
+ try:
32
+ service_account_path = self.config['firebase']['service_account_key']
33
+ if not os.path.exists(service_account_path):
34
+ logger.warning(f"Firebase key not found. Proceeding without Firebase.")
35
+ return
36
+ if not firebase_admin._apps:
37
+ cred = credentials.Certificate(service_account_path)
38
+ firebase_admin.initialize_app(cred)
39
+ self.firestore_db = firestore.client()
40
+ self.collection_name = self.config['firebase'].get('collection_name', 'processed_videos')
41
+ logger.info("Firebase connection established.")
42
+ except Exception as e:
43
+ logger.error(f"Error setting up Firebase: {e}")
44
+
45
+ async def setup_telegram_client(self):
46
+ telegram_config = self.config['telegram']
47
+ session_string = telegram_config.get('session_string')
48
+ if not session_string:
49
+ raise ValueError("Telethon session string is not configured.")
50
+ self.telegram_client = TelegramClient(
51
+ StringSession(session_string),
52
+ int(telegram_config['api_id']),
53
+ telegram_config['api_hash']
54
+ )
55
+ logger.info("Telegram client configured with session string.")
56
+
57
+ def build_youtube_service(self, credentials_path='token.json'):
58
+ if not os.path.exists(credentials_path):
59
+ logger.error("YouTube credentials (token.json) not found.")
60
+ return None
61
+ creds = Credentials.from_authorized_user_file(credentials_path, self.SCOPES)
62
+ self.youtube_service = build('youtube', 'v3', credentials=creds)
63
+ logger.info("YouTube API client initialized.")
64
+ return self.youtube_service
65
+
66
+ def is_video_processed(self, channel_username, message_id):
67
+ if not self.firestore_db: return False
68
+ doc_id = f"{channel_username.replace('@', '')}_{message_id}"
69
+ return self.firestore_db.collection(self.collection_name).document(doc_id).get().exists
70
+
71
+ def mark_video_processed(self, channel_username, message_id, youtube_id=None, telegram_url=None):
72
+ if not self.firestore_db: return
73
+ doc_id = f"{channel_username.replace('@', '')}_{message_id}"
74
+ doc_data = {
75
+ 'channel_username': channel_username, 'telegram_message_id': message_id,
76
+ 'telegram_url': telegram_url or f"https://t.me/{channel_username.replace('@', '')}/{message_id}",
77
+ 'youtube_video_id': youtube_id, 'processed_at': firestore.SERVER_TIMESTAMP,
78
+ 'status': 'completed' if youtube_id else 'failed'
79
+ }
80
+ self.firestore_db.collection(self.collection_name).document(doc_id).set(doc_data)
81
+
82
+ async def get_channel_videos_batch(self, limit=10, offset_id=0):
83
+ await self.telegram_client.connect()
84
+ try:
85
+ entity = await self.telegram_client.get_entity(self.config['telegram']['channel_username'])
86
+ videos, skipped_count, total_checked, last_message_id = [], 0, 0, None
87
+ async for message in self.telegram_client.iter_messages(entity, limit=limit, offset_id=offset_id):
88
+ total_checked += 1
89
+ last_message_id = message.id
90
+ if message.video and message.video.mime_type.startswith('video/'):
91
+ if self.is_video_processed(self.config['telegram']['channel_username'], message.id):
92
+ skipped_count += 1
93
+ continue
94
+ videos.append({'id': message.id, 'message': message, 'caption': message.text or '', 'date': message.date,
95
+ 'telegram_url': f"https://t.me/{self.config['telegram']['channel_username'].replace('@', '')}/{message.id}",
96
+ 'channel_username': self.config['telegram']['channel_username']})
97
+ return {'videos': videos, 'skipped_count': skipped_count, 'total_checked': total_checked, 'last_message_id': last_message_id}
98
+ finally:
99
+ if self.telegram_client.is_connected(): await self.telegram_client.disconnect()
100
+
101
+ async def download_video(self, video_info):
102
+ try:
103
+ filepath = self.download_dir / f"video_{video_info['id']}.mp4"
104
+ await self.telegram_client.connect()
105
+ await self.telegram_client.download_media(message=video_info['message'], file=str(filepath))
106
+ logger.info(f"Downloaded: {filepath}")
107
+ return str(filepath)
108
+ except Exception as e:
109
+ logger.error(f"Error downloading video {video_info['id']}: {e}")
110
+ return None
111
+ finally:
112
+ if self.telegram_client.is_connected(): await self.telegram_client.disconnect()
113
+
114
+ def upload_to_youtube(self, video_path, video_info):
115
+ try:
116
+ video_settings, caption = self.config['video_settings'], video_info['caption']
117
+ title = f"{video_settings.get('title_prefix', '')}{caption[:90]}" or f"Video from Telegram {video_info['date'].strftime('%Y-%m-%d')}"
118
+ description = f"{video_settings.get('description_template', '')}\n\nOriginal post: {video_info['telegram_url']}\n\nCaption: {caption}"
119
+ body = {'snippet': {'title': title, 'description': description, 'tags': video_settings.get('tags', []), 'categoryId': video_settings.get('category_id', '22')},
120
+ 'status': {'privacyStatus': video_settings.get('privacy_status', 'private')}}
121
+ media = MediaFileUpload(video_path, chunksize=-1, resumable=True)
122
+ request = self.youtube_service.videos().insert(part=','.join(body.keys()), body=body, media_body=media)
123
+ response = None
124
+ while response is None:
125
+ status, response = request.next_chunk()
126
+ if status: logger.info(f"Upload progress: {int(status.progress() * 100)}%")
127
+ logger.info(f"Video uploaded! YouTube ID: {response['id']}")
128
+ return response['id']
129
+ except HttpError as e: logger.error(f"YouTube API error: {e.content}"); return None
130
+ except Exception as e: logger.error(f"Error uploading to YouTube: {e}"); return None
131
+
132
+ def cleanup_video(self, video_path):
133
+ try: os.remove(video_path); logger.info(f"Cleaned up: {video_path}")
134
+ except Exception as e: logger.error(f"Error cleaning up {video_path}: {e}")
135
+
136
+ async def process_single_batch(self, limit=5, offset_id=0):
137
+ if not self.youtube_service: return {"status": "error", "message": "YouTube service not authenticated"}
138
+ await self.setup_telegram_client()
139
+ batch_data = await self.get_channel_videos_batch(limit=limit, offset_id=offset_id)
140
+ processed_count, failed_count = 0, 0
141
+ for video_info in batch_data['videos']:
142
+ video_path = await self.download_video(video_info)
143
+ if not video_path: failed_count += 1; continue
144
+ youtube_id = self.upload_to_youtube(video_path, video_info)
145
+ self.mark_video_processed(video_info['channel_username'], video_info['id'], youtube_id, video_info['telegram_url'])
146
+ if youtube_id: processed_count += 1
147
+ else: failed_count += 1
148
+ self.cleanup_video(video_path)
149
+ await asyncio.sleep(1)
150
+ continue_prompt = batch_data['total_checked'] == limit and len(batch_data['videos']) == 0
151
+ summary = {"status": "Batch completed", "processed": processed_count, "failed": failed_count, "skipped": batch_data['skipped_count'],
152
+ "new_videos_found": len(batch_data['videos']), "continue_prompt": continue_prompt, "next_offset": batch_data['last_message_id']}
153
+ logger.info(f"Batch finished: {summary}")
154
+ return summary