Spaces:
Running
Running
| import os | |
| import sqlite3 | |
| import json | |
| import csv | |
| import hashlib | |
| import subprocess | |
| from datetime import datetime | |
| from huggingface_hub import HfApi, hf_hub_download | |
# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------

# Hugging Face access token, taken from the environment (None when unset).
HF_TOKEN = os.getenv("HF_TOKEN")

# Destination repository on the Hugging Face Hub and the file names used there.
REPO_TYPE = "dataset"
REPO_ID = "boffire/libretranslate-en-kab-suggestions"
DEST_JSON_PATH_IN_REPO = "suggestions.json"
DEST_CSV_PATH_IN_REPO = "suggestions.csv"

# Local files the export is written to before it is uploaded.
JSON_OUTPUT_PATH = "/tmp/suggestions.json"
CSV_OUTPUT_PATH = "/tmp/suggestions.csv"

# Marker files that remember the checksum of the last uploaded export.
CHECKSUM_FILE_JSON = "/tmp/.last_suggestions_json_checksum"
CHECKSUM_FILE_CSV = "/tmp/.last_suggestions_csv_checksum"
# Candidate locations of suggestions.db, probed in this order.  The first
# entry is the symlink start.sh creates when it locates the database; when
# none of the candidates exists the caller falls back to a dynamic search.
possible_paths = [
    f"{directory}/suggestions.db"
    for directory in (
        "/app/db",  # symlink created by start.sh
        "/app/.local/share/db",  # when HOME=/app
        "/app",
        "/root/.local/share/db",
        "/home/libretranslate/.local/share/db",
    )
]
def find_db(search_paths=None, search_root="/", timeout=15):
    """Locate the ``suggestions.db`` file.

    The known locations are probed first, in order.  If none of them exists,
    the directory tree below ``search_root`` is walked until a file named
    ``suggestions.db`` is found or the time budget is used up.

    Args:
        search_paths: Candidate paths to probe first.  Defaults to the
            module-level ``possible_paths`` list.
        search_root: Directory where the fallback search starts.
        timeout: Time budget, in seconds, for the fallback search.

    Returns:
        The path of the first match, or ``None`` when nothing was found.
    """
    import time  # local import: only the fallback search needs it

    print(f"Running in CWD: {os.getcwd()}")

    # 1. Try known paths
    candidates = possible_paths if search_paths is None else search_paths
    for path in candidates:
        if os.path.exists(path):
            print(f"Found suggestions.db at {path}")
            return path

    # 2. Dynamic fallback: search the filesystem below search_root.
    # The walk is done in-process instead of spawning `find`: without a shell
    # a "2>/dev/null" argument is handed to find as a literal operand (which
    # makes it fail), and a subprocess timeout would throw away matches that
    # were already printed.  os.walk skips unreadable directories by default
    # and lets us return as soon as the first match shows up.
    print("Known paths missed — running dynamic search...")
    deadline = time.monotonic() + timeout
    try:
        for dirpath, _dirnames, filenames in os.walk(search_root):
            if "suggestions.db" in filenames:
                found = os.path.join(dirpath, "suggestions.db")
                print(f"Found suggestions.db via search at {found}")
                return found
            if time.monotonic() > deadline:
                print(f"Dynamic search failed: timed out after {timeout} seconds")
                break
    except OSError as e:
        # Best effort: a failing search must not abort the caller.
        print(f"Dynamic search failed: {e}")

    print("suggestions.db not found anywhere.")
    return None
def extract_suggestions(db_path):
    """Read every row of the ``suggestions`` table into a list of dicts.

    Each returned dict has the keys ``id``, ``source_text``,
    ``suggested_text``, ``source_lang``, ``target_lang`` and ``timestamp``.

    Args:
        db_path: Path of the SQLite database to read.

    Returns:
        A list with one dict per row; an empty list when SQLite reports an
        error (the error is printed, not raised).
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            rows = conn.execute(
                "SELECT q, s, source, target FROM suggestions"
            ).fetchall()
        finally:
            # Close the connection even when the query itself fails.
            conn.close()
    except sqlite3.Error as e:
        print(f"SQLite error: {e}")
        return []

    # One extraction time shared by every row of this run.
    extracted_at = datetime.now().isoformat()

    suggestions = []
    for row in rows:
        # NULL columns come back as None; treat them as empty strings so the
        # concatenation below cannot raise TypeError.
        q, s, source, target = ("" if value is None else value for value in row)
        # The id is the MD5 of the four fields joined without a separator.
        # Ids are used elsewhere in this file to match entries from earlier
        # exports, so the scheme is kept unchanged.
        unique_id = hashlib.md5((q + s + source + target).encode()).hexdigest()
        suggestions.append({
            "id": unique_id,
            "source_text": q,
            "suggested_text": s,
            "source_lang": source,
            "target_lang": target,
            "timestamp": extracted_at,
        })
    return suggestions
def download_existing_json():
    """Fetch the suggestions JSON currently stored in the Hub repository.

    Returns:
        The local path returned by ``hf_hub_download``, or ``None`` when the
        download raises.

    NOTE(review): every exception is mapped to ``None``, so a caller cannot
    tell "the file does not exist yet" apart from "the download failed".
    """
    try:
        request = {
            "repo_id": REPO_ID,
            "repo_type": REPO_TYPE,
            "filename": DEST_JSON_PATH_IN_REPO,
            "token": HF_TOKEN,
            "local_dir": "/tmp",
        }
        local_copy = hf_hub_download(**request)
        print("Downloaded existing suggestions from Hugging Face.")
        return local_copy
    except Exception as err:
        print(f"Could not fetch existing suggestions from HF: {err}")
        return None
def _load_existing(existing_json_path):
    """Read a previously exported JSON file into an ``{id: fields}`` dict."""
    existing = {}
    if not (existing_json_path and os.path.exists(existing_json_path)):
        return existing

    try:
        with open(existing_json_path, "r", encoding="utf-8") as f:
            items = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both invalid JSON and undecodable bytes.
        print(f"Failed to read existing JSON: {e}")
        return existing

    if not isinstance(items, list):
        print("Failed to read existing JSON: top-level value is not a list")
        return existing

    skipped = 0
    for item in items:
        # Validate entry by entry: one malformed entry must not discard the
        # valid entries that follow it.
        try:
            existing[item["id"]] = {
                "source_text": item["source_text"],
                "suggested_text": item["suggested_text"],
                "source_lang": item["source_lang"],
                "target_lang": item["target_lang"],
                "timestamp": item.get("timestamp", datetime.now().isoformat()),
            }
        except (KeyError, TypeError, AttributeError):
            skipped += 1
    if skipped:
        print(f"Failed to read existing JSON: skipped {skipped} malformed entries")
    return existing


def merge_with_existing(suggestions, existing_json_path):
    """Merge freshly extracted suggestions into the previously exported set.

    Entries are keyed by ``id``.  Suggestions whose id is already present are
    left untouched; new ones are added with the current time as timestamp.
    When at least one suggestion is new, the merged set is written to
    ``JSON_OUTPUT_PATH`` and, through ``write_csv``, to ``CSV_OUTPUT_PATH``.

    Args:
        suggestions: Dicts with the keys ``id``, ``source_text``,
            ``suggested_text``, ``source_lang`` and ``target_lang``.
        existing_json_path: Path of the earlier JSON export, or ``None``.

    Returns:
        ``JSON_OUTPUT_PATH`` when the files were written, ``None`` when there
        was nothing new to add.
    """
    existing = _load_existing(existing_json_path)

    changed = False
    for s in suggestions:
        if s["id"] in existing:
            continue
        changed = True
        existing[s["id"]] = {
            "source_text": s["source_text"],
            "suggested_text": s["suggested_text"],
            "source_lang": s["source_lang"],
            "target_lang": s["target_lang"],
            "timestamp": datetime.now().isoformat(),
        }

    if not changed:
        print("No new suggestions — skipping write/upload.")
        return None

    # Write merged JSON.  "id" goes last so the key order of the exported
    # objects stays the same from run to run.
    final = [{**data, "id": id_} for id_, data in existing.items()]
    with open(JSON_OUTPUT_PATH, "w", encoding="utf-8") as f:
        json.dump(final, f, indent=2, ensure_ascii=False)

    # Also write CSV
    write_csv(final, CSV_OUTPUT_PATH)
    return JSON_OUTPUT_PATH
def write_csv(suggestions, csv_path):
    """Write the suggestion dicts to ``csv_path`` as UTF-8 CSV with a header.

    Args:
        suggestions: Iterable of dicts keyed by the column names below.
        csv_path: Destination file; it is overwritten.
    """
    columns = [
        "id",
        "source_text",
        "suggested_text",
        "source_lang",
        "target_lang",
        "timestamp",
    ]
    # newline="" leaves line-ending handling to the csv module.
    with open(csv_path, "w", newline="", encoding="utf-8") as out:
        table = csv.DictWriter(out, fieldnames=columns)
        table.writeheader()
        table.writerows(suggestions)
def get_checksum(filepath):
    """Return the hex MD5 digest of a file's bytes, or None if it is missing."""
    if os.path.exists(filepath):
        with open(filepath, "rb") as handle:
            payload = handle.read()
        return hashlib.md5(payload).hexdigest()
    return None
def upload_if_updated(filepath, dest_path, checksum_file):
    """Upload ``filepath`` to the Hub unless its checksum is unchanged.

    The file's checksum is compared with the one stored in ``checksum_file``.
    When they differ the file is uploaded as ``dest_path`` and the stored
    checksum is replaced; upload errors are printed, not raised.

    Args:
        filepath: Local file to upload; nothing happens if it is falsy or
            does not exist.
        dest_path: Path of the file inside the repository.
        checksum_file: File remembering the checksum of the last upload.
    """
    if not (filepath and os.path.exists(filepath)):
        return

    current = get_checksum(filepath)

    if os.path.exists(checksum_file):
        with open(checksum_file, "r") as marker:
            previous = marker.read().strip()
    else:
        previous = None

    remote_name = os.path.basename(dest_path)
    if current == previous:
        print(f"No changes in {remote_name} — skipping upload.")
        return

    print(f"Uploading updated {remote_name} to Hugging Face...")
    try:
        HfApi().upload_file(
            path_or_fileobj=filepath,
            path_in_repo=dest_path,
            repo_id=REPO_ID,
            repo_type=REPO_TYPE,
            token=HF_TOKEN,
        )
        # Record the checksum only after the upload went through.
        with open(checksum_file, "w") as marker:
            marker.write(current)
        print(f"Upload successful: {dest_path} at {datetime.now().isoformat()}")
    except Exception as e:
        print(f"Upload failed for {dest_path}:", e)
def main():
    """Export the local suggestions database to the Hugging Face dataset.

    Steps: locate the database, read its suggestions, merge them with the
    export already in the repository and upload the JSON and CSV files when
    the merge produced something new.  Each step that comes back empty ends
    the run early.
    """
    print(f"===== Application Startup at {datetime.now().isoformat()} =====")

    if not HF_TOKEN:
        print("HF_TOKEN not set — skipping upload.")
        return

    db_location = find_db()
    if not db_location:
        return

    extracted = extract_suggestions(db_location)
    if not extracted:
        print("No suggestions found — skipping.")
        return

    previous_export = download_existing_json()
    if not merge_with_existing(extracted, previous_export):
        return

    uploads = (
        (JSON_OUTPUT_PATH, DEST_JSON_PATH_IN_REPO, CHECKSUM_FILE_JSON),
        (CSV_OUTPUT_PATH, DEST_CSV_PATH_IN_REPO, CHECKSUM_FILE_CSV),
    )
    for local_file, repo_file, checksum_marker in uploads:
        upload_if_updated(local_file, repo_file, checksum_marker)


# Run the export only when executed as a script, not on import.
if __name__ == "__main__":
    main()