"""Orchestration layer for multi-domain code analysis.""" from __future__ import annotations import time from typing import Any, Callable, Dict from analyzers import analyze_data_science_code, analyze_dsa_code, analyze_ml_code, analyze_web_code from models import PyTorchCodeAnalyzerModel from schemas.request import AnalyzeCodeRequest from schemas.response import AnalyzeCodeResponse, DomainAnalysis, StaticAnalysisSummary from services.reward_service import RewardService from services.suggestion_service import SuggestionService from utils import estimate_complexity, parse_code_structure def _lint_score(parsed: Dict[str, Any]) -> float: """Convert structural smells into a normalized lint-style score.""" score = 1.0 if not parsed.get("syntax_valid", True): score -= 0.45 score -= min(parsed.get("long_lines", 0), 5) * 0.03 if parsed.get("tabs_used"): score -= 0.1 if parsed.get("trailing_whitespace_lines"): score -= 0.05 if parsed.get("docstring_ratio", 0.0) == 0.0 and parsed.get("function_names"): score -= 0.08 return round(max(0.0, min(1.0, score)), 4) class AnalysisService: """End-to-end analysis pipeline shared by API and UI.""" def __init__(self) -> None: self.model = PyTorchCodeAnalyzerModel() self.reward_service = RewardService() self.suggestion_service = SuggestionService() self._analyzers: Dict[str, Callable[[str, Dict[str, Any], Dict[str, Any]], DomainAnalysis]] = { "dsa": analyze_dsa_code, "data_science": analyze_data_science_code, "ml_dl": analyze_ml_code, "web": analyze_web_code, } def _heuristic_domain_scores(self, parsed: Dict[str, Any], code: str) -> Dict[str, float]: """Derive domain priors from imports and syntax-level hints.""" scores = { "dsa": 0.2 + (0.15 if parsed.get("uses_recursion") else 0.0) + (0.15 if parsed.get("max_loop_depth", 0) >= 1 else 0.0), "data_science": 0.2 + (0.35 if parsed.get("uses_pandas") or parsed.get("uses_numpy") else 0.0), "ml_dl": 0.2 + (0.35 if parsed.get("uses_torch") or parsed.get("uses_sklearn") else 0.0), "web": 0.2 + (0.35 if parsed.get("uses_fastapi") or parsed.get("uses_flask") else 0.0) + (0.1 if parsed.get("route_decorators") else 0.0), "general": 0.2, } if "fastapi" in code.lower(): scores["web"] += 0.1 if "pandas" in code.lower() or "numpy" in code.lower(): scores["data_science"] += 0.1 if "torch" in code.lower(): scores["ml_dl"] += 0.1 if "while" in code or "for" in code: scores["dsa"] += 0.05 return {key: round(min(value, 0.99), 4) for key, value in scores.items()} def analyze(self, request: AnalyzeCodeRequest) -> AnalyzeCodeResponse: """Run the complete multi-domain analysis pipeline.""" started = time.perf_counter() parsed = parse_code_structure(request.code) complexity = estimate_complexity(parsed, request.code) model_prediction = self.model.predict(request.code, request.context_window, parsed) heuristic_scores = self._heuristic_domain_scores(parsed, request.code) combined_scores = {} for domain, heuristic_score in heuristic_scores.items(): model_score = float(model_prediction["domain_scores"].get(domain, 0.2)) combined_scores[domain] = round((0.6 * model_score) + (0.4 * heuristic_score), 4) detected_domain = request.domain_hint if request.domain_hint != "auto" else max(combined_scores, key=combined_scores.get) analyzer = self._analyzers.get(detected_domain) domain_analysis = ( analyzer(request.code, parsed, complexity) if analyzer is not None else DomainAnalysis( domain="general", domain_score=0.6, issues=[], suggestions=["Add stronger domain-specific context for deeper analysis."], highlights={}, ) ) lint_score = _lint_score(parsed) score_breakdown = self.reward_service.compute( ml_score=float(model_prediction["ml_quality_score"]), domain_score=domain_analysis.domain_score, lint_score=lint_score, complexity_penalty=float(complexity["complexity_penalty"]), ) static_analysis = StaticAnalysisSummary( syntax_valid=bool(parsed["syntax_valid"]), syntax_error=str(parsed["syntax_error"]), cyclomatic_complexity=int(complexity["cyclomatic_complexity"]), line_count=int(parsed["line_count"]), max_loop_depth=int(parsed["max_loop_depth"]), time_complexity=str(complexity["time_complexity"]), space_complexity=str(complexity["space_complexity"]), detected_imports=list(parsed["imports"]), code_smells=list(parsed["code_smells"]), ) improvement_plan = self.suggestion_service.build_improvement_plan( domain_analysis=domain_analysis, static_analysis=static_analysis, ) summary = ( f"Detected `{detected_domain}` code with a model score of {score_breakdown.ml_score:.0%}, " f"domain score {score_breakdown.domain_score:.0%}, and final reward {score_breakdown.reward:.0%}." ) return AnalyzeCodeResponse( detected_domain=detected_domain, # type: ignore[arg-type] domain_confidences=combined_scores, score_breakdown=score_breakdown, static_analysis=static_analysis, domain_analysis=domain_analysis, improvement_plan=improvement_plan, model_backend=str(model_prediction["backend_name"]), model_id=str(model_prediction["model_id"]), summary=summary, context_window=request.context_window, analysis_time_ms=round((time.perf_counter() - started) * 1000.0, 2), )