| from fastapi import FastAPI, UploadFile, File, BackgroundTasks, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.responses import HTMLResponse |
| import shutil |
| import os |
| import uuid |
| from services.audio_separator import AudioSeparator |
| from services.chord_analyzer import ChordAnalyzer |
| from services.midi_converter import MidiConverter |
| from services.beat_tracker import BeatTracker |
|
|
| app = FastAPI(title="Tunebase AI Engine", description="High-performance audio separation for Math Rock", version="1.0.0") |
|
|
| @app.get("/", response_class=HTMLResponse) |
| async def read_root(): |
| return """ |
| <!DOCTYPE html> |
| <html> |
| <head> |
| <title>Tunebase</title> |
| <style> |
| body, html { margin: 0; padding: 0; height: 100%; overflow: hidden; } |
| iframe { width: 100%; height: 100%; border: none; } |
| </style> |
| </head> |
| <body> |
| <iframe src="https://tunebase.vercel.app" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe> |
| </body> |
| </html> |
| """ |
|
|
| |
| origins = ["*"] |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=origins, |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| UPLOAD_DIR = "uploads" |
| PROCESSED_DIR = "processed" |
| os.makedirs(UPLOAD_DIR, exist_ok=True) |
| os.makedirs(PROCESSED_DIR, exist_ok=True) |
|
|
| |
| app.mount("/processed", StaticFiles(directory=PROCESSED_DIR), name="processed") |
| |
|
|
| |
| |
| try: |
| separator = AudioSeparator() |
| analyzer = ChordAnalyzer() |
| midi_converter = MidiConverter() |
| beat_tracker = BeatTracker() |
| except Exception as e: |
| print(f"Warning: Failed to load models on startup. {e}") |
| import traceback |
| traceback.print_exc() |
| separator = None |
| analyzer = None |
| midi_converter = None |
| beat_tracker = None |
|
|
| @app.get("/") |
| def read_root(): |
| return {"message": "Tunebase AI Engine Ready 🎸"} |
|
|
| @app.post("/upload") |
| async def upload_audio(background_tasks: BackgroundTasks, file: UploadFile = File(...)): |
| """ |
| Upload file audio dan mulai proses separasi di background. |
| """ |
| file_id = str(uuid.uuid4()) |
| file_ext = file.filename.split(".")[-1] |
| file_path = os.path.join(UPLOAD_DIR, f"{file_id}.{file_ext}") |
| |
| with open(file_path, "wb") as buffer: |
| shutil.copyfileobj(file.file, buffer) |
| |
| return {"id": file_id, "filename": file.filename, "status": "uploaded"} |
|
|
| |
| import time |
| import shutil |
|
|
| def cleanup_old_files(): |
| print("Running cleanup task...") |
| now = time.time() |
| cutoff = now - 3600 |
| |
| for folder in os.listdir(PROCESSED_DIR): |
| folder_path = os.path.join(PROCESSED_DIR, folder) |
| if os.path.isdir(folder_path): |
| try: |
| |
| mtime = os.path.getmtime(folder_path) |
| if mtime < cutoff: |
| print(f"Deleting old session: {folder}") |
| shutil.rmtree(folder_path) |
| except Exception as e: |
| print(f"Error cleaning {folder}: {e}") |
|
|
| |
| processing_status = {} |
|
|
| @app.get("/status/{file_id}") |
| async def get_status(file_id: str): |
| return processing_status.get(file_id, {"status": "unknown", "progress": 0, "step": "Waiting"}) |
|
|
| def update_progress(file_id, step, progress, status="processing", data=None): |
| state = { |
| "status": status, |
| "progress": progress, |
| "step": step |
| } |
| if data: |
| state.update(data) |
| processing_status[file_id] = state |
|
|
| @app.post("/process/{file_id}") |
| def process_audio(file_id: str, background_tasks: BackgroundTasks, mode: str = "4stem"): |
| """ |
| Trigger separasi dan analisis. |
| Modes: 2stem (vocals+instruments), 4stem (default), 6stem (full) |
| """ |
| if not separator: |
| raise HTTPException(status_code=503, detail="AI Engine not initialized") |
|
|
| |
| input_path = None |
| |
| for f in os.listdir(UPLOAD_DIR): |
| if f.startswith(file_id): |
| input_path = os.path.join(UPLOAD_DIR, f) |
| break |
| |
| if not input_path: |
| raise HTTPException(status_code=404, detail="File not found") |
| |
| output_dir = os.path.join(PROCESSED_DIR, file_id) |
| |
| |
| space_host = os.environ.get("SPACE_HOST") |
| if space_host: |
| base_url = f"https://{space_host}/processed" |
| else: |
| base_url = "http://localhost:8000/processed" |
| |
| try: |
| update_progress(file_id, "Separating Audio Stems...", 10) |
| |
| |
| def progress_cb(step, prog): |
| update_progress(file_id, step, prog) |
| |
| stems, duration = separator.separate(input_path, output_dir, callback=progress_cb, mode=mode) |
| |
| update_progress(file_id, "Analyzing Rhythm...", 50) |
| |
| |
| update_progress(file_id, "Converting to MIDI...", 70) |
| midi_files = {} |
| if midi_converter: |
| |
| |
| target_stems = ['piano', 'bass', 'guitar_rhythm', 'guitar_lead', 'vocals'] |
| for stem_name in target_stems: |
| if stem_name in stems: |
| stem_path = stems[stem_name] |
| midi_out = os.path.join(output_dir, f"{stem_name}.mid") |
| if midi_converter.convert(stem_path, midi_out): |
| midi_files[stem_name] = f"{base_url}/{file_id}/{stem_name}.mid" |
| |
| update_progress(file_id, "Analyzing Rhythm...", 85) |
| bpm = 0 |
| beats = [] |
| if beat_tracker: |
| |
| beat_source = stems.get('drums') or stems.get('other') or input_path |
| rhythm_data = beat_tracker.track(beat_source) |
| bpm = rhythm_data['bpm'] |
| beats = rhythm_data['beats'] |
| |
| update_progress(file_id, "Finalizing...", 95) |
| |
| update_progress(file_id, "Finalizing...", 95) |
| |
| |
| stems_url = {k: f"{base_url}/{file_id}/{os.path.basename(v)}" for k, v in stems.items()} |
| |
| final_data = { |
| "stems": stems_url, |
| "midi": midi_files, |
| "bpm": bpm, |
| "beats": beats, |
| "duration": duration |
| } |
| print(f"Final Data for {file_id}: {final_data}") |
| update_progress(file_id, "Completed", 100, status="completed", data=final_data) |
| |
| return { |
| "status": "completed", |
| **final_data |
| } |
| |
| except Exception as e: |
| import traceback |
| traceback.print_exc() |
| processing_status[file_id] = {"status": "error", "error": str(e)} |
| print(f"Error processing: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|