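"""Build a multi-label (top-rules) dataset from labeled Dockerfiles.

Keeps only "bad" examples that trigger at least one of the top rules,
splits them into train/validation/test, tokenizes the Dockerfile text
with CodeBERT, and saves the result as a Hugging Face DatasetDict.
"""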
import json
from pathlib import Path

from sklearn.model_selection import train_test_split
from datasets import Dataset, DatasetDict
from transformers import AutoTokenizer

INPUT_PATH = Path("data/labeled/labeled_dockerfiles.jsonl")
TOP_RULES_PATH = Path("data/metadata/top_rules.json")
OUTPUT_DIR = Path("data/processed/dataset_multilabel_top30")
TOKENIZER_NAME = "microsoft/codebert-base"
MAX_LENGTH = 512
SEED = 42


def load_top_rules():
    """Load the list of top rule IDs from the metadata file."""
    with open(TOP_RULES_PATH, encoding="utf-8") as f:
        return json.load(f)


def build_dataset(records, top_rules):
    """Turn raw labeled records into multi-label examples over the top rules."""
    rule2id = {r: i for i, r in enumerate(top_rules)}
    data = []
    for row in records:
        # Only "bad" Dockerfiles carry rule violations to learn from.
        if row.get("label") != "bad":
            continue

        triggered = row.get("rules_triggered", [])
        multilabel = [0] * len(top_rules)
        matched = False

        for rule in triggered:
            if rule in rule2id:
                multilabel[rule2id[rule]] = 1
                matched = True

        # Skip examples that trigger none of the top rules.
        if not matched:
            continue

        data.append({
            "text": "\n".join(row["content"]) if isinstance(row["content"], list) else str(row["content"]),
            "labels": multilabel,
            "meta_lines": row.get("lines", {}),
            "meta_fixes": row.get("fixes", {}),
        })

    return data


def main():
    print("📥 Loading data...")
    top_rules = load_top_rules()
    print(f"🔝 Top {len(top_rules)} rules: {top_rules}")

    with INPUT_PATH.open(encoding="utf-8") as f:
        records = [json.loads(line) for line in f if line.strip()]

    dataset = build_dataset(records, top_rules)
    print(f"📦 Built {len(dataset)} multi-label examples.")

    if not dataset:
        print("❌ No data to process. Check the input data.")
        return

    print("🔀 Splitting into train/val/test...")
    # Hold out 10% for test, then 1/9 of the remainder for validation,
    # which gives roughly an 80/10/10 split overall.
    train_val, test = train_test_split(dataset, test_size=0.1, random_state=SEED)
    train, val = train_test_split(train_val, test_size=0.1111, random_state=SEED)

    ds = DatasetDict({
        "train": Dataset.from_list(train),
        "validation": Dataset.from_list(val),
        "test": Dataset.from_list(test),
    })

    print("🔤 Tokenizing...")
    tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_NAME)

    def tokenize_function(batch):
        texts = [str(x) if x is not None else "" for x in batch["text"]]
        return tokenizer(
            texts,
            padding="max_length",
            truncation=True,
            max_length=MAX_LENGTH,
        )

    # Drop raw text and metadata columns; keep input_ids, attention_mask and labels.
    ds_tokenized = ds.map(
        tokenize_function,
        batched=True,
        remove_columns=["text", "meta_lines", "meta_fixes"],
    )

    print(f"💾 Saving to: {OUTPUT_DIR}")
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    ds_tokenized.save_to_disk(str(OUTPUT_DIR))

    print("✅ Done.")


if __name__ == "__main__":
    main()
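# A minimal reload sketch for downstream scripts (path matches OUTPUT_DIR above):
#   from datasets import load_from_disk
#   ds = load_from_disk("data/processed/dataset_multilabel_top30")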