| |
|
|
| import json |
| import tempfile |
| import subprocess |
| from pathlib import Path |
| from concurrent.futures import ProcessPoolExecutor, as_completed |
| from datetime import datetime |
|
|
| |
# --- Paths and runtime configuration ----------------------------------------
# Input corpus: one JSON object per line with keys "repo", "path", "content".
INPUT_PATH = Path("data/raw/dockerfiles.jsonl")
OUTPUT_PATH = Path("data/labeled/labeled_dockerfiles.jsonl")
FAILED_LOG = Path("data/labeled/failed_dockerfiles.jsonl")
MISSING_FIXES_LOG = Path("data/labeled/missing_fixes.txt")
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)


FIXES_PATH = Path("data/fixes/fixes.json")          # rule code -> fix text mapping
HADOLINT_BIN = "hadolint"                           # linter binary, resolved via PATH
MAX_WORKERS = 6                                     # worker processes for linting
TIMEOUT_SECONDS = 5                                 # per-file hadolint timeout


# Loaded once at import time; read-only in workers.
with open(FIXES_PATH, encoding="utf-8") as f:
    FIXES = json.load(f)


# Rule codes encountered without an entry in FIXES.
# NOTE(review): worker processes under ProcessPoolExecutor mutate their own
# copy of this set, not the parent's — confirm the missing-fixes report is
# actually populated in the main process.
MISSING_FIXES = set()
|
|
def attach_fixes(rules_triggered: list[str]) -> dict:
    """Map each triggered rule code to its known fix text.

    Rules with no entry in FIXES are recorded in the module-level
    MISSING_FIXES set as a side effect; only known rules appear in the
    returned mapping.
    """
    known = {rule: FIXES[rule] for rule in rules_triggered if rule in FIXES}
    MISSING_FIXES.update(rule for rule in rules_triggered if rule not in FIXES)
    return known
|
|
def lint_dockerfile(entry: dict) -> dict:
    """Run hadolint on a single Dockerfile record and return a labeled result.

    Parameters:
        entry: record with keys "repo", "path" and "content"
               (content is a list of Dockerfile source lines).

    Returns a dict with keys: "label" ("good"/"bad"), "rules_triggered",
    "lines" (rule code -> first offending line), "fix_suggestions",
    "missing_fixes" (triggered rules with no entry in FIXES), "repo",
    "path", "content", "timestamp".

    Never raises: timeouts and unexpected errors are encoded as "bad"
    results with a synthetic rule code ("lint-timeout" / "lint-error:...").
    """

    def _result(label: str, rules: list, line_map: dict,
                fixes: dict, missing: list) -> dict:
        # Single place that shapes the output record (the original duplicated
        # this literal four times).
        return {
            "label": label,
            "rules_triggered": rules,
            "lines": line_map,
            "fix_suggestions": fixes,
            # Reported explicitly because worker processes cannot mutate the
            # parent's MISSING_FIXES set under ProcessPoolExecutor.
            "missing_fixes": missing,
            "repo": entry.get("repo"),
            "path": entry.get("path"),
            "content": entry.get("content"),
            "timestamp": datetime.now().isoformat(),
        }

    try:
        content = entry["content"]
        joined = "\n".join(content)

        # hadolint reads from a file, so materialize the Dockerfile on disk.
        with tempfile.NamedTemporaryFile("w", suffix=".Dockerfile", delete=False) as tmp:
            tmp.write(joined)
            tmp.flush()
            temp_path = tmp.name

        try:
            result = subprocess.run(
                [HADOLINT_BIN, temp_path, "-f", "json"],
                capture_output=True,
                text=True,
                timeout=TIMEOUT_SECONDS,
            )
        finally:
            # Clean up even when subprocess.run times out or raises; the
            # original unlinked only on the success path and leaked the file.
            Path(temp_path).unlink(missing_ok=True)

        # Exit code 0 means hadolint found nothing to report.
        if result.returncode == 0:
            return _result("good", [], {}, {}, [])

        try:
            findings = json.loads(result.stdout)
            rules = sorted({item["code"] for item in findings if "code" in item})
            line_map = {}
            for item in findings:
                code = item.get("code")
                line = item.get("line")
                if code and line:
                    # Keep only the first line each rule fires on.
                    line_map.setdefault(code, line)
            fix_suggestions = attach_fixes(rules)
            missing = [rule for rule in rules if rule not in FIXES]
        except Exception:
            # hadolint emitted non-JSON output (e.g. a crash message).
            rules = ["lint-parse-error"]
            line_map = {}
            fix_suggestions = {}
            missing = []

        return _result("bad", rules, line_map, fix_suggestions, missing)

    except subprocess.TimeoutExpired:
        return _result("bad", ["lint-timeout"], {}, {}, [])

    except Exception as e:
        return _result("bad", [f"lint-error:{str(e)}"], {}, {}, [])
|
|
def main():
    """Lint every input record in parallel and write labeled/failed JSONL files.

    Reads INPUT_PATH (JSONL), fans the records out to a process pool,
    then writes successful results to OUTPUT_PATH, failures to FAILED_LOG,
    and any rules lacking a fix to MISSING_FIXES_LOG.
    """
    with open(INPUT_PATH, encoding="utf-8") as f:
        records = [json.loads(line) for line in f if line.strip()]

    print(f"🚀 Start analizy {len(records)} Dockerfile (wątki={MAX_WORKERS})")

    results, failed = [], []

    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(lint_dockerfile, row) for row in records]

        for i, future in enumerate(as_completed(futures)):
            try:
                result = future.result()
                # Workers cannot mutate the parent's MISSING_FIXES set, so
                # collect the missing rules they report in their results.
                # (No-op for results that lack the key.)
                MISSING_FIXES.update(result.get("missing_fixes", ()))
                if "rules_triggered" not in result:
                    failed.append(result)
                else:
                    results.append(result)
            except Exception as e:
                # The worker itself crashed; record a synthetic failure so the
                # counts still add up.
                failed.append({
                    "label": "bad",
                    "rules_triggered": [f"future-error:{str(e)}"],
                    "lines": {},
                    "fix_suggestions": {},
                    "repo": "unknown",
                    "path": "unknown",
                    "content": [],
                    "timestamp": datetime.now().isoformat()
                })

            if (i + 1) % 250 == 0:
                print(f" 🔄 {i+1}/{len(records)} przetworzonych...")

    with open(OUTPUT_PATH, "w", encoding="utf-8") as f_out:
        for rec in results:
            json.dump(rec, f_out)
            f_out.write("\n")

    with open(FAILED_LOG, "w", encoding="utf-8") as f_fail:
        for rec in failed:
            json.dump(rec, f_fail)
            f_fail.write("\n")

    if MISSING_FIXES:
        print(f"\n⚠️ Brakuje fixów dla {len(MISSING_FIXES)} reguł – zapisuję do {MISSING_FIXES_LOG}")
        with open(MISSING_FIXES_LOG, "w", encoding="utf-8") as f_miss:
            for rule in sorted(MISSING_FIXES):
                f_miss.write(rule + "\n")
    else:
        print("✅ Wszystkie reguły mają przypisany fix!")

    print(f"\n✅ Zapisano {len(results)} Dockerfile z etykietami i fixami → {OUTPUT_PATH}")
    print(f"❌ Nieudanych: {len(failed)} → {FAILED_LOG}")
|
|
# Run the pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|