| |
| from __future__ import annotations |
|
|
| import asyncio |
| import json |
| import os |
| from telethon import TelegramClient, errors |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import asyncio |
| import json |
| import os |
| from typing import Dict, Any |
|
|
| from telethon import TelegramClient, errors |
| from huggingface_hub import HfApi, hf_hub_download |
|
|
| |
# Telegram channel to scan and where downloaded RAR files are written.
CHANNEL = "cgsvalka"
SESSION_FILE = "my_session.session"
OUTPUT_DIR = "downloads"

# Telegram API credentials; environment variables take precedence.
# NOTE(security): the fallback values below are secrets committed to source.
# Rotate the API hash and drop these defaults once TG_API_ID / TG_API_HASH are
# provided by the environment wherever this script runs.
API_ID = int(os.environ.get("TG_API_ID", "28708692"))
API_HASH = os.environ.get("TG_API_HASH", "72fa6a22c65d7a58e00f2ccb8d60841d")

# 0 means "no limit": the whole channel history is scanned.
MESSAGE_LIMIT = 0

# JSON file recording which messages were already processed. The same name is
# used locally and as the path of the state file inside the HF dataset repo.
STATE_FILE = "download_state.json"

# Hugging Face upload target; all HF uploads are skipped when HF_TOKEN is empty.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
HF_REPO_ID = "samfred2/TGFiles"
|
|
|
|
| def load_local_state() -> Dict[str, Any]: |
| if os.path.exists(STATE_FILE): |
| try: |
| with open(STATE_FILE, "r", encoding="utf-8") as f: |
| return json.load(f) |
| except Exception: |
| return {"downloaded_files": []} |
| return {"downloaded_files": []} |
|
|
|
|
| def save_local_state(state: Dict[str, Any]) -> None: |
| with open(STATE_FILE, "w", encoding="utf-8") as f: |
| json.dump(state, f, indent=2, ensure_ascii=False) |
|
|
|
|
def download_state_from_hf(token: str) -> Dict[str, Any]:
    """Fetch the state file from the HF dataset repo.

    Best effort: any failure yields the empty state ``{"downloaded_files": []}``
    so the run can still proceed from local state. Covered failures are:
    no token, file not yet uploaded, hub/network/auth error, malformed JSON,
    or an unexpected structure. Failures other than a missing token are
    printed, so they are not mistaken for "nothing downloaded yet".

    Args:
        token: Hugging Face access token; empty means "skip the remote fetch".
    """
    if not token:
        return {"downloaded_files": []}
    try:
        local_path = hf_hub_download(repo_id=HF_REPO_ID, filename=STATE_FILE, repo_type="dataset", token=token)
        with open(local_path, "r", encoding="utf-8") as f:
            state = json.load(f)
    except Exception as e:  # deliberate best-effort boundary around a remote call
        print(f"Warning: could not fetch {STATE_FILE} from HF repo {HF_REPO_ID}: {e}")
        return {"downloaded_files": []}
    if not isinstance(state, dict) or not isinstance(state.get("downloaded_files"), list):
        print(f"Warning: remote {STATE_FILE} has unexpected structure; ignoring it")
        return {"downloaded_files": []}
    return state
|
|
|
|
def upload_file_to_hf(local_path: str, path_in_repo: str, token: str) -> bool:
    """Push one local file into the HF dataset repository ``HF_REPO_ID``.

    Args:
        local_path: File on disk to upload.
        path_in_repo: Destination path inside the dataset repo.
        token: Hugging Face access token; an empty token disables uploading.

    Returns:
        True when the upload call completed; False when no token was given or
        the upload raised (the error is printed).
    """
    if token:
        try:
            HfApi().upload_file(
                path_or_fileobj=local_path,
                path_in_repo=path_in_repo,
                repo_id=HF_REPO_ID,
                repo_type="dataset",
                token=token,
            )
        except Exception as exc:
            print(f"Failed to upload {local_path} to HF: {exc}")
            return False
        return True
    return False
|
|
|
|
def upload_state_to_hf(state: Dict[str, Any], token: str) -> bool:
    """Write *state* to the local state file, then upload that file to HF.

    The local save always happens, even when *token* is empty. The file is
    stored under the same relative path (``STATE_FILE``) in the dataset repo.

    Returns:
        The result of ``upload_file_to_hf`` for the state file.
    """
    save_local_state(state)
    uploaded = upload_file_to_hf(STATE_FILE, STATE_FILE, token)
    return uploaded
|
|
|
|
def _merged_downloaded_files(*states: Dict[str, Any]) -> list:
    """Union the ``downloaded_files`` entries of *states*; first occurrence wins.

    Entries are de-duplicated on ``(message_id, filename)``. The following are
    skipped so a malformed state file cannot crash the run: entries without a
    ``message_id``, non-dict entries, and states whose ``downloaded_files`` is
    not a list. Input order is preserved (unlike iterating a set), so the
    saved state file is stable between runs.
    """
    seen = set()
    merged = []
    for state in states:
        entries = state.get("downloaded_files") if isinstance(state, dict) else None
        if not isinstance(entries, list):
            continue
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            key = (entry.get("message_id"), entry.get("filename"))
            if key[0] is None or key in seen:
                continue
            seen.add(key)
            merged.append({"message_id": key[0], "filename": key[1]})
    return merged


def _rar_filename(message: Any) -> str | None:
    """Return the local file name to use if *message* carries a RAR, else None.

    A file counts as RAR when its name ends in ``.rar``. If it has no name,
    it counts as RAR when its MIME type contains "rar". The Telegram-supplied
    name comes from an untrusted channel, so it is reduced to its last path
    component; a name like ``../../x.rar`` cannot escape OUTPUT_DIR.
    """
    file = message.file
    if not file:
        return None
    name = getattr(file, "name", "") or ""
    if name:
        if not name.lower().endswith(".rar"):
            return None
        safe_name = os.path.basename(name.replace("\\", "/"))
        return f"{message.id}_{safe_name}"
    mime_type = (getattr(file, "mime_type", "") or "").lower()
    if "rar" not in mime_type:
        return None
    return f"{message.id}.rar"


async def _download_with_flood_retry(client: Any, message: Any, out_path: str, max_attempts: int = 3) -> Any:
    """Download *message*'s media to *out_path*, retrying after FloodWait errors.

    Returns whatever ``client.download_media`` returns. Re-raises the last
    FloodWaitError after *max_attempts* tries; other exceptions propagate
    immediately.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await client.download_media(message, file=out_path)
        except errors.FloodWaitError as fw:
            if attempt == max_attempts:
                raise
            wait = int(fw.seconds) if fw.seconds else 60
            print(f"Hit FloodWait: sleeping {wait}s")
            await asyncio.sleep(wait + 1)
    return None  # only reached when max_attempts < 1


async def download_channel() -> int:
    """Mirror RAR attachments from CHANNEL into OUTPUT_DIR (and the HF repo).

    Message ids already processed are the union of the remote HF state file
    (when HF_TOKEN is set) and the local state file; those messages are
    skipped. Each new RAR is downloaded and, when HF_TOKEN is set, uploaded to
    ``files/<name>`` in HF_REPO_ID. Only then is it recorded in the state,
    which is saved locally (and uploaded) after every file so an interrupted
    run can resume.

    Returns:
        0 on completion, 1 if the channel cannot be resolved, 130 if the run
        was interrupted (Ctrl-C / task cancellation).
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    remote_state = download_state_from_hf(HF_TOKEN) if HF_TOKEN else {"downloaded_files": []}
    local_state = load_local_state()
    state: Dict[str, Any] = {"downloaded_files": _merged_downloaded_files(remote_state, local_state)}
    downloaded_ids = {entry["message_id"] for entry in state["downloaded_files"]}

    loop = asyncio.get_running_loop()
    client = TelegramClient(SESSION_FILE, API_ID, API_HASH)

    async with client:
        try:
            entity = await client.get_entity(CHANNEL)
        except Exception as e:
            print(f"Failed to resolve channel '{CHANNEL}': {e}")
            return 1

        print(f"Starting download from: {entity.title if hasattr(entity, 'title') else CHANNEL}")

        count = downloaded = skipped = not_rar = 0
        interrupted = False
        try:
            async for message in client.iter_messages(entity, limit=MESSAGE_LIMIT or None):
                count += 1

                if message.id in downloaded_ids:
                    skipped += 1
                    continue
                if not message.media:
                    continue

                suggested = _rar_filename(message)
                if suggested is None:
                    not_rar += 1
                    continue
                out_path = os.path.join(OUTPUT_DIR, suggested)

                try:
                    print(f"[{count}] downloading -> {suggested}")
                    saved = await _download_with_flood_retry(client, message, out_path)
                    if not saved:
                        # Telethon's download_media returns None when nothing was saved.
                        print(f"Error while downloading message {message.id}: nothing was saved")
                        continue
                    downloaded += 1

                    if HF_TOKEN:
                        # HF uploads are blocking HTTP calls; run them in a worker
                        # thread so the event loop driving Telethon keeps running.
                        ok = await loop.run_in_executor(
                            None, upload_file_to_hf, out_path, f"files/{suggested}", HF_TOKEN
                        )
                        if not ok:
                            print(f"Warning: failed to upload {out_path} to HF repo {HF_REPO_ID}")
                            # Leave the message unrecorded so the next run retries it,
                            # instead of permanently skipping a file missing from HF.
                            continue

                    state["downloaded_files"].append({"message_id": message.id, "filename": suggested})
                    downloaded_ids.add(message.id)
                    save_local_state(state)
                    if HF_TOKEN:
                        # The state was just saved on this thread; only the upload is offloaded.
                        await loop.run_in_executor(None, upload_file_to_hf, STATE_FILE, STATE_FILE, HF_TOKEN)

                    await asyncio.sleep(0.2)
                except Exception as e:
                    print(f"Error while downloading message {message.id}: {e}")
        except (KeyboardInterrupt, asyncio.CancelledError):
            # Under asyncio.run() on Python 3.11+, Ctrl-C cancels this task, so it
            # arrives as CancelledError rather than KeyboardInterrupt. This is the
            # top-level task, so handling it here and returning is a clean shutdown.
            interrupted = True
            print("Interrupted by user; saving final state.")
            save_local_state(state)
            if HF_TOKEN:
                upload_state_to_hf(state, HF_TOKEN)

        print("\nFinal Statistics:")
        print(f"Messages scanned: {count}")
        print(f"RAR files downloaded: {downloaded}")
        print(f"Already downloaded (skipped): {skipped}")
        print(f"Non-RAR files skipped: {not_rar}")
        print(f"\nDownload state saved to: {STATE_FILE}")
        return 130 if interrupted else 0
|
|
|
|
if __name__ == "__main__":
    try:
        # Use download_channel()'s integer return value as the process exit
        # status instead of discarding it (e.g. 1 when the channel lookup fails).
        raise SystemExit(asyncio.run(download_channel()))
    except KeyboardInterrupt:
        # Conventional exit status for SIGINT; avoids a traceback on Ctrl-C.
        raise SystemExit(130)