### ------------------------------------------------------------------------------------------------ ###
### First: do `apt-get update && apt-get install -y fluidsynth` and `pip install miditok midi2audio` ###
### ------------------------------------------------------------------------------------------------ ###

### IMPORTS ###
import os
import time
import zipfile
from pathlib import Path

import numpy as np
import requests
import torch
import torch.nn as nn
from miditok import REMI, TokenizerConfig
from torch.nn import functional as F
from tqdm import tqdm

### DATA LOADING ###
MIDI_URL = "https://storage.googleapis.com/magentadata/datasets/maestro/v3.0.0/maestro-v3.0.0-midi.zip"
ZIP_FILE = "maestro_midi.zip"
EXTRACT_PATH = "maestro_raw"
DATA_DIR = "data"
os.makedirs(DATA_DIR, exist_ok=True)

def download_and_prepare():
    if not os.path.exists(ZIP_FILE):
        print("Downloading MIDI dataset...")
        r = requests.get(MIDI_URL)
        with open(ZIP_FILE, "wb") as f:
            f.write(r.content)
    if not os.path.exists(EXTRACT_PATH):
        print("Unpacking files...")
        with zipfile.ZipFile(ZIP_FILE, 'r') as zip_ref:
            zip_ref.extractall(EXTRACT_PATH)
    config = TokenizerConfig(
        num_velocities=16,
        use_chords=True,
        use_tempos=True,
        use_time_signatures=True
    )
    tokenizer = REMI(config)
    all_tokens = []
    midi_paths = list(Path(EXTRACT_PATH).rglob("*.mid*"))
    print(f"Tokenizing {len(midi_paths)} MIDI files...")
    for path in tqdm(midi_paths):
        try:
            # The tokenizer returns a TokSequence, or a list of them for multi-track files.
            midi_tokens = tokenizer(path)
            ids = midi_tokens[0].ids if isinstance(midi_tokens, list) else midi_tokens.ids
            if len(ids) > 0:
                all_tokens.extend(ids)
        except Exception:
            continue  # skip corrupt or unreadable MIDI files
    if len(all_tokens) == 0:
        print("ERROR: No tokens processed!")
        return
    # Flatten everything into one token stream and split 90/10 into train/val.
    data = np.array(all_tokens, dtype=np.uint16)
    n = len(data)
    train_data = data[:int(n * 0.9)]
    val_data = data[int(n * 0.9):]
    train_data.tofile(os.path.join(DATA_DIR, 'train.bin'))
    val_data.tofile(os.path.join(DATA_DIR, 'val.bin'))
    print("Preparation done!")
    print(f"Train Tokens: {len(train_data)} | Val Tokens: {len(val_data)}")
    print(f"Vocab size: {len(tokenizer)}")

# Skip the (slow) tokenization pass if the binary files already exist.
if not os.path.exists(os.path.join(DATA_DIR, 'train.bin')):
    download_and_prepare()

### TRAINING ###
batch_size = 64
block_size = 1024
max_iters = 20000
learning_rate = 5e-4
gradient_accumulation_steps = 4
eval_interval = 250
eval_iters = 100
n_embd = 512
n_head = 8
n_layer = 8
dropout = 0.3
vocab_size = 387  # must match the "Vocab size" printed by download_and_prepare()
data_dir = 'data'
checkpoint_path = 'tinymozart_ckpt.pt'
best_model_path = 'tinymozart_best.pt'
log_path = 'training_log.txt'
device = 'cuda' if torch.cuda.is_available() else 'cpu'

def get_batch(data):
    # Sample batch_size random windows of block_size tokens; targets are shifted by one.
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([torch.from_numpy(data[i:i + block_size].astype(np.int64)) for i in ix])
    y = torch.stack([torch.from_numpy(data[i + 1:i + block_size + 1].astype(np.int64)) for i in ix])
    return x.to(device), y.to(device)

@torch.no_grad()
def estimate_loss(model, train_data, val_data):
    out = {}
    model.eval()
    for split, data in [('train', train_data), ('val', val_data)]:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            x, y = get_batch(data)
            _, loss = model(x, y)
            losses[k] = loss.mean().item()  # .mean() reduces per-GPU losses under DataParallel
        out[split] = losses.mean()
    model.train()
    return out
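
### Optional data sanity check -- a minimal sketch, not part of the original pipeline. It assumes  ###
### the .bin files from download_and_prepare() exist, and verifies that every token id fits the    ###
### hard-coded vocab_size above (if the assert fires, update vocab_size to the printed value) and  ###
### that get_batch() yields tensors of the expected shape.                                          ###
def sanity_check_data():
    d = np.fromfile(os.path.join(data_dir, 'train.bin'), dtype=np.uint16)
    assert len(d) > block_size, "train.bin holds fewer tokens than one context window"
    assert int(d.max()) < vocab_size, f"max token id {int(d.max())} >= vocab_size {vocab_size}"
    x, y = get_batch(d)
    assert x.shape == (batch_size, block_size) and y.shape == x.shape
    print("Data sanity check passed.")

# sanity_check_data()  # uncomment to verify the prepared data before training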
### ARCHITECTURE ###
class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.num_heads = num_heads
        self.head_size = head_size
        # Fused QKV projection, as in GPT-2.
        self.c_attn = nn.Linear(n_embd, 3 * n_embd, bias=False)
        self.c_proj = nn.Linear(n_embd, n_embd)
        self.dropout = dropout

    def forward(self, x):
        B, T, C = x.size()
        qkv = self.c_attn(x)
        q, k, v = qkv.split(n_embd, dim=2)
        q = q.view(B, T, self.num_heads, self.head_size).transpose(1, 2)
        k = k.view(B, T, self.num_heads, self.head_size).transpose(1, 2)
        v = v.view(B, T, self.num_heads, self.head_size).transpose(1, 2)
        # PyTorch's fused attention kernel; is_causal applies the autoregressive mask.
        y = F.scaled_dot_product_attention(
            q, k, v,
            dropout_p=self.dropout if self.training else 0.0,
            is_causal=True
        )
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        return self.c_proj(y)


class FeedForward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.GELU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout)
        )

    def forward(self, x):
        return self.net(x)


class Block(nn.Module):
    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedForward(n_embd)
        self.ln1, self.ln2 = nn.LayerNorm(n_embd), nn.LayerNorm(n_embd)

    def forward(self, x):
        # Pre-norm residual blocks.
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x


class TinyMozart(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        B, T = idx.shape
        x = self.token_embedding_table(idx) + self.position_embedding_table(torch.arange(T, device=idx.device))
        x = self.blocks(x)
        logits = self.lm_head(self.ln_f(x))
        loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1)) if targets is not None else None
        return logits, loss


def train():
    train_data = np.fromfile(os.path.join(data_dir, 'train.bin'), dtype=np.uint16)
    val_data = np.fromfile(os.path.join(data_dir, 'val.bin'), dtype=np.uint16)
    model = TinyMozart(vocab_size).to(device)
    if torch.cuda.device_count() > 1:
        print(f"🚀 Using {torch.cuda.device_count()} GPUs!")
        model = nn.DataParallel(model)
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.1)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=max_iters)
    start_iter = 0
    best_val_loss = float('inf')
    # Prefer the rolling checkpoint for resuming; fall back to the best-model file.
    target_ckpt = checkpoint_path if os.path.exists(checkpoint_path) else (best_model_path if os.path.exists(best_model_path) else None)
    if target_ckpt:
        print(f"Loading checkpoint from {target_ckpt}...")
        checkpoint = torch.load(target_ckpt, map_location=device)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        start_iter = checkpoint['iter']
        best_val_loss = checkpoint.get('best_val_loss', float('inf'))
        print(f"Resuming from iter {start_iter} with best_val_loss {best_val_loss:.4f}")
    model.train()
    t0 = time.time()
    for it in range(start_iter, max_iters):
        optimizer.zero_grad(set_to_none=True)
        accum_loss = 0
        for _ in range(gradient_accumulation_steps):
            xb, yb = get_batch(train_data)
            logits, loss = model(xb, yb)
            # .mean() reduces per-GPU losses under DataParallel; the division makes the
            # accumulated gradient match one large batch.
            loss = loss.mean() / gradient_accumulation_steps
            loss.backward()
            accum_loss += loss.item()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
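        # With batch_size=64 and gradient_accumulation_steps=4, each optimizer step sees an
        # effective batch of 256 sequences (262,144 tokens at block_size=1024), while only
        # 64 sequences ever reside on the GPU at once.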
        if it % 50 == 0:
            dt = time.time() - t0
            t0 = time.time()
            print(f"Iter {it}: Loss {accum_loss:.4f} | {dt * 1000 / 50:.1f}ms/step", flush=True)
        if it % eval_interval == 0:
            losses = estimate_loss(model, train_data, val_data)
            print(f">>> EVAL {it}: Train {losses['train']:.4f}, Val {losses['val']:.4f}", flush=True)
            with open(log_path, 'a') as f:
                f.write(f"{it},{losses['train']:.4f},{losses['val']:.4f}\n")
            checkpoint = {
                'iter': it,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'best_val_loss': best_val_loss
            }
            torch.save(checkpoint, checkpoint_path)
            if losses['val'] < best_val_loss:
                best_val_loss = losses['val']
                checkpoint['best_val_loss'] = best_val_loss
                torch.save(checkpoint, best_model_path)
                print(f"✨ New best model saved! (Loss: {best_val_loss:.4f})")


if __name__ == "__main__":
    train()
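
### ------------------------------------------------------------------------------------------------ ###
### SAMPLING -- a minimal sketch of what the fluidsynth/midi2audio install in the header is for; it  ###
### is not part of the training run. Assumptions: tinymozart_best.pt exists, the tokenizer is        ###
### rebuilt with the same TokenizerConfig as in download_and_prepare(), miditok 3.x's decode()       ###
### returns a symusic Score, and midi2audio's FluidSynth can find a soundfont (you may need to pass  ###
### a path such as '/usr/share/sounds/sf2/FluidR3_GM.sf2' from the fluid-soundfont-gm package).      ###
### Verify these against your installed versions, and call sample() manually after training.         ###
### ------------------------------------------------------------------------------------------------ ###
@torch.no_grad()
def sample(num_tokens=1024, temperature=1.0, out_midi="sample.mid", out_wav="sample.wav"):
    from midi2audio import FluidSynth
    tokenizer = REMI(TokenizerConfig(num_velocities=16, use_chords=True,
                                     use_tempos=True, use_time_signatures=True))
    model = TinyMozart(vocab_size).to(device)
    ckpt = torch.load(best_model_path, map_location=device)
    # Strip the 'module.' prefix that nn.DataParallel adds to state-dict keys, if present.
    state = {k.removeprefix('module.'): v for k, v in ckpt['model_state_dict'].items()}
    model.load_state_dict(state)
    model.eval()
    # Seed with a single token id 0 (an arbitrary choice); priming with tokens
    # from a real piece generally gives better continuations.
    idx = torch.zeros((1, 1), dtype=torch.long, device=device)
    for _ in range(num_tokens):
        logits, _ = model(idx[:, -block_size:])  # crop context to the trained window
        probs = F.softmax(logits[:, -1, :] / temperature, dim=-1)
        idx = torch.cat((idx, torch.multinomial(probs, num_samples=1)), dim=1)
    score = tokenizer.decode([idx[0].tolist()])  # assumed miditok 3.x API
    score.dump_midi(out_midi)
    FluidSynth().midi_to_audio(out_midi, out_wav)
    print(f"Wrote {out_midi} and {out_wav}")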