| import os |
| import base64 |
| import hashlib |
| import mimetypes |
| from datetime import datetime |
| from io import BytesIO |
|
|
| from flask import Flask, request, jsonify, redirect, render_template |
| from huggingface_hub import HfApi, HfFileSystem |
| from PIL import Image |
| import uuid |
|
|
| app = Flask(__name__) |
|
|
| |
| HF_TOKEN = os.environ.get("HF_TOKEN") |
| DATASET_ID = os.environ.get("DATASET_ID") |
| MAX_SIZE = int(os.environ.get("MAX_SIZE", 10)) * 1024 * 1024 |
| API_KEY = os.environ.get("API_KEY") |
|
|
| fs = HfFileSystem(token=HF_TOKEN) |
| hf_api = HfApi(token=HF_TOKEN) |
|
|
| def validate_auth(): |
| if not API_KEY: |
| return True |
| header_key = request.headers.get("X-API-Key") |
| param_key = request.args.get("key") |
| return header_key == API_KEY or param_key == API_KEY |
|
|
| def generate_filename(data, mime_type=None): |
| ext = mimetypes.guess_extension(mime_type or "application/octet-stream") or ".bin" |
| unique_id = uuid.uuid4().hex[:8] |
| timestamp = datetime.now().strftime("%Y%m%d%H%M%S") |
| return f"{timestamp}-{unique_id}{ext}" |
|
|
| def save_to_dataset(content, filename): |
| path = f"datasets/{DATASET_ID}/resolve/main/images/{filename}" |
| fs.mkdir(f"datasets/{DATASET_ID}/images", exist_ok=True) |
| fs.write_bytes(f"datasets/{DATASET_ID}/images/{filename}", content) |
| return f"https://hf-mirror.com/datasets/{DATASET_ID}/resolve/main/images/{filename}" |
|
|
| @app.route("/", methods=["GET"]) |
| def index(): |
| return render_template("index.html") |
|
|
| @app.route("/api/1/upload", methods=["GET", "POST"]) |
| def upload(): |
| |
| if not validate_auth(): |
| return jsonify({ |
| "status_code": 401, |
| "error": {"message": "Invalid API key"}, |
| "status_txt": "Unauthorized" |
| }), 401 |
|
|
| |
| source = None |
| if request.method == "POST": |
| if "source" in request.files: |
| file = request.files["source"] |
| source = file.read() |
| else: |
| source = request.form.get("source") |
| else: |
| source = request.args.get("source") |
|
|
| |
| if isinstance(source, bytes): |
| content = source |
| elif source.startswith(("http://", "https://")): |
| |
| return jsonify({"error": "URL download not implemented"}), 501 |
| elif source.startswith("data:"): |
| header, data = source.split(",", 1) |
| mime_type = header.split(":")[1].split(";")[0] |
| content = base64.b64decode(data) |
| else: |
| content = base64.b64decode(source) |
|
|
| |
| if len(content) > MAX_SIZE: |
| return jsonify({ |
| "status_code": 413, |
| "error": {"message": f"File exceeds {MAX_SIZE//1024//1024}MB limit"}, |
| "status_txt": "Payload Too Large" |
| }), 413 |
|
|
| |
| try: |
| image = Image.open(BytesIO(content)) |
| width, height = image.size |
| mime_type = image.get_format_mimetype() |
| except: |
| width = height = 0 |
| mime_type = None |
|
|
| |
| filename = generate_filename(content, mime_type) |
| file_url = save_to_dataset(content, filename) |
|
|
| |
| response_format = request.args.get("format", "json") |
| if response_format == "txt": |
| return file_url |
| elif response_format == "redirect": |
| return redirect(file_url, code=302) |
| else: |
| return jsonify({ |
| "status_code": 200, |
| "success": {"message": "file uploaded", "code": 200}, |
| "image": { |
| "filename": filename, |
| "url": file_url, |
| "size": len(content), |
| "width": width, |
| "height": height, |
| "mime": mime_type, |
| "size_formatted": f"{len(content)/1024/1024:.1f} MB" |
| }, |
| "status_txt": "OK" |
| }) |
|
|
| if __name__ == "__main__": |
| app.run(host="0.0.0.0", port=7860, debug=True) |
|
|
|
|