"""
ALWAS ML Inference Server

FastAPI-based inference API for all 4 ALWAS ML models.
Designed to replace the Groq API dependency and add predictive capabilities.

The module can be used in two ways:

* as a library -- import the ``predict_*`` functions directly (FastAPI is
  optional; ``HAS_FASTAPI`` tells whether the HTTP app was created);
* as a script -- runs four smoke-test predictions and then, when FastAPI is
  installed, starts a uvicorn server on port 7860.

Importing this module loads every model, encoder and config file from
``MODEL_DIR`` (environment variable, default ``./models``).
"""

import json
import os
from datetime import datetime, timedelta
from typing import List, Optional

import joblib
import numpy as np
from pydantic import BaseModel, Field

# === Load Models & Configs ===

MODEL_DIR = os.environ.get('MODEL_DIR', './models')

# NOTE(security): joblib.load unpickles arbitrary Python objects. MODEL_DIR
# must point at a trusted location; never load artifacts from user uploads.

# Load all models
hours_model = joblib.load(f'{MODEL_DIR}/hours_estimator.joblib')
complexity_xgb = joblib.load(f'{MODEL_DIR}/complexity_xgb.joblib')
complexity_lgb = joblib.load(f'{MODEL_DIR}/complexity_lgb.joblib')
bottleneck_model = joblib.load(f'{MODEL_DIR}/bottleneck_predictor.joblib')
completion_model = joblib.load(f'{MODEL_DIR}/completion_predictor.joblib')

# Load encoders
tech_node_encoder = joblib.load(f'{MODEL_DIR}/tech_node_encoder.joblib')
block_type_encoder = joblib.load(f'{MODEL_DIR}/block_type_encoder.joblib')
priority_encoder = joblib.load(f'{MODEL_DIR}/priority_encoder.joblib')
complexity_encoder = joblib.load(f'{MODEL_DIR}/complexity_encoder.joblib')
bottleneck_encoder = joblib.load(f'{MODEL_DIR}/bottleneck_encoder.joblib')

# Load config (explicit encoding so the result does not depend on the locale)
with open(f'{MODEL_DIR}/feature_config.json', 'r', encoding='utf-8') as f:
    feature_config = json.load(f)

with open(f'{MODEL_DIR}/metrics.json', 'r', encoding='utf-8') as f:
    model_metrics = json.load(f)


# === Pydantic Models ===
# Class docstrings below are surfaced in the generated OpenAPI schema, so
# additional notes are written as comments instead.

class BlockEstimateRequest(BaseModel):
    """Request for complexity & hours estimation (replaces Groq API)."""
    block_type: str = Field(..., description="Block type (e.g., 'ADC', 'PLL', 'LDO')")
    tech_node: str = Field(..., description="Technology node (e.g., '7nm', '28nm')")
    priority: str = Field(default='P3-Medium', description="Priority level")
    # When omitted (or non-positive) a per-block-type default is substituted.
    transistor_count: Optional[int] = Field(default=None, description="Estimated transistor count")
    has_dependencies: bool = Field(default=False)
    num_dependencies: int = Field(default=0)
    constraint_complexity: float = Field(default=1.0, ge=0, le=3.0)
    drc_iterations: int = Field(default=2)
    engineer_skill_factor: float = Field(default=1.0, ge=0.5, le=1.5)


class BottleneckRequest(BaseModel):
    """Request for bottleneck risk prediction."""
    block_type: str
    tech_node: str
    priority: str = 'P3-Medium'
    transistor_count: Optional[int] = None
    has_dependencies: bool = False
    num_dependencies: int = 0
    constraint_complexity: float = 1.0
    estimated_hours: float = 20.0
    hours_logged: float = 0.0
    drc_iterations: int = 2
    drc_violations_total: int = 0
    lvs_mismatches_total: int = 0
    current_stage: str = 'In Progress'
    # days_in_current_stage and is_overdue only drive the textual
    # recommendations; they are not part of the model feature vector.
    days_in_current_stage: float = 0.0
    engineer_skill_factor: float = 1.0
    is_overdue: bool = False


class CompletionRequest(BaseModel):
    """Request for completion time prediction."""
    block_type: str
    tech_node: str
    priority: str = 'P3-Medium'
    transistor_count: Optional[int] = None
    has_dependencies: bool = False
    num_dependencies: int = 0
    constraint_complexity: float = 1.0
    estimated_hours: float = 20.0
    engineer_skill_factor: float = 1.0
    drc_iterations: int = 2
    current_stage: str = 'In Progress'
    cumulative_hours: float = 0.0
    cumulative_days: float = 0.0
    cumulative_drc_violations: int = 0
    cumulative_lvs_mismatches: int = 0


class BulkBlockRequest(BaseModel):
    """Bulk estimation for multiple blocks."""
    blocks: List[BlockEstimateRequest]


# === Helper Functions ===

STAGES = ['Not Started', 'In Progress', 'DRC', 'LVS', 'ERC', 'Review', 'Completed']
STAGE_IDX = {s: i for i, s in enumerate(STAGES)}

PRIORITY_MAP = {'P1-Critical': 1, 'P2-High': 2, 'P3-Medium': 3, 'P4-Low': 4}

# Default transistor counts by block type
DEFAULT_TRANSISTOR_COUNTS = {
    'ADC': 50000, 'DAC': 35000, 'PLL': 80000, 'LDO': 8000, 'BGR': 5000,
    'OTA': 3000, 'Comparator': 2000, 'SerDes': 120000, 'VCO': 15000,
    'Mixer': 10000, 'LNA': 6000, 'PA': 20000, 'TIA': 4000,
    'SampleHold': 3500, 'LVDS_Driver': 8000, 'BandgapRef': 3000,
    'CurrentMirror': 1500, 'DiffAmp': 2500, 'Oscillator': 12000,
    'PowerDetector': 5000,
}


def safe_encode(encoder, value, default=0):
    """Safely encode a value, returning default if unknown.

    Args:
        encoder: Fitted encoder exposing ``transform`` on a 1-D list of values.
        value: The single category to encode.
        default: Value returned when the encoder rejects ``value`` with
            ``ValueError`` or ``KeyError``.

    Returns:
        The first element of ``encoder.transform([value])``, or ``default``.
    """
    try:
        return encoder.transform([value])[0]
    except (ValueError, KeyError):
        return default


def safe_priority_encode(priority):
    """Encode priority safely.

    The priority encoder is called with a 2-D input (``[[priority]]``) and the
    single resulting cell is returned. Any encoding failure falls back to 2.

    Args:
        priority: Priority label, e.g. ``'P3-Medium'``.

    Returns:
        The encoded priority, or 2 when encoding fails.
    """
    try:
        return priority_encoder.transform([[priority]])[0][0]
    except Exception:
        # Deliberate best-effort fallback; a bare ``except:`` would also have
        # swallowed KeyboardInterrupt/SystemExit, which must propagate.
        return 2  # default to medium


def get_transistor_count(block_type, provided_count):
    """Get transistor count, using default if not provided.

    Args:
        block_type: Block type used to look up a default count.
        provided_count: Caller-supplied count; ``None``, zero and negative
            values are treated as "not provided".

    Returns:
        ``provided_count`` when positive, otherwise the default for
        ``block_type`` (10000 for block types without a table entry).
    """
    if provided_count is not None and provided_count > 0:
        return provided_count
    return DEFAULT_TRANSISTOR_COUNTS.get(block_type, 10000)


# === Prediction Functions ===

def predict_complexity_and_hours(req: BlockEstimateRequest):
    """Predict complexity class and estimated hours for a new block.

    Hours come from ``hours_model``; the complexity class is the argmax of
    the averaged class probabilities of the XGBoost and LightGBM classifiers.

    Args:
        req: Block description.

    Returns:
        A JSON-serialisable dict with the complexity label, estimated hours
        (floored at 4.0), ensemble confidence, derived risk/skill hints,
        per-class probabilities and a human-readable reasoning string.
    """
    tc = get_transistor_count(req.block_type, req.transistor_count)
    tc_log = np.log1p(tc)

    tech_enc = safe_encode(tech_node_encoder, req.tech_node)
    type_enc = safe_encode(block_type_encoder, req.block_type)
    priority_enc = safe_priority_encode(req.priority)
    priority_num = PRIORITY_MAP.get(req.priority, 3)

    # Engineered interaction features.
    type_node_int = tech_enc * 10 + type_enc
    complexity_score = req.constraint_complexity * tc_log
    size_priority_int = tc_log * priority_num

    # Hours estimation features.
    # NOTE: column order is positional and presumably mirrors the training
    # pipeline -- do not reorder.
    hours_features = np.array([[
        tech_enc, type_enc, priority_enc, tc, tc_log,
        int(req.has_dependencies), req.num_dependencies,
        req.constraint_complexity, req.drc_iterations,
        req.engineer_skill_factor,
        type_node_int, complexity_score, size_priority_int,
    ]])

    estimated_hours = float(hours_model.predict(hours_features)[0])
    estimated_hours = max(4.0, round(estimated_hours, 1))

    # Complexity classification features (same as above minus the
    # engineer skill factor).
    complexity_features = np.array([[
        tech_enc, type_enc, priority_enc, tc, tc_log,
        int(req.has_dependencies), req.num_dependencies,
        req.constraint_complexity, req.drc_iterations,
        type_node_int, complexity_score, size_priority_int,
    ]])

    xgb_proba = complexity_xgb.predict_proba(complexity_features)[0]
    lgb_proba = complexity_lgb.predict_proba(complexity_features)[0]
    ensemble_proba = (xgb_proba + lgb_proba) / 2

    # Plain ``str`` labels keep the response free of numpy scalar types.
    class_labels = [str(cls) for cls in complexity_encoder.classes_]
    complexity_idx = int(np.argmax(ensemble_proba))
    complexity_label = class_labels[complexity_idx]
    confidence = float(ensemble_proba[complexity_idx])

    # Generate reasoning
    reasoning = generate_reasoning(req, complexity_label, estimated_hours, tc)

    # Risk assessment: anything that is neither Low nor Medium counts as high.
    if complexity_label == 'Low':
        risk_level = 'low'
    elif complexity_label == 'Medium':
        risk_level = 'medium'
    else:
        risk_level = 'high'

    # Suggested skill level: anything that is neither High nor Medium
    # maps to junior.
    if complexity_label == 'High':
        skill_needed = 'senior'
    elif complexity_label == 'Medium':
        skill_needed = 'mid'
    else:
        skill_needed = 'junior'

    return {
        'complexity': complexity_label,
        'estimated_hours': estimated_hours,
        'confidence': round(confidence, 3),
        'risk_level': risk_level,
        'reasoning': reasoning,
        'recommended_drc_iterations': max(
            req.drc_iterations, 2 if complexity_label == 'High' else 1
        ),
        'suggested_engineer_skill_level': skill_needed,
        'complexity_probabilities': {
            cls: round(float(p), 3)
            for cls, p in zip(class_labels, ensemble_proba)
        },
        'estimated_days': round(estimated_hours / 8, 1),
        'model_version': '1.0.0',
    }


def generate_reasoning(req, complexity, hours, tc):
    """Generate human-readable reasoning for the estimate.

    Args:
        req: The request the estimate was produced for.
        complexity: Predicted complexity label (``'High'``, ``'Medium'`` or
            anything else, which is treated as low).
        hours: Estimated hours.
        tc: Transistor count actually used for the prediction.

    Returns:
        A single sentence made of ``"; "``-joined clauses and a trailing
        period; the hours/working-days clause is always last.
    """
    parts = []

    if complexity == 'High':
        if req.tech_node in ['5nm', '7nm', '12nm']:
            parts.append(f"Advanced {req.tech_node} node requires extensive DRC/LVS iterations with tight design rules")
        if tc > 50000:
            parts.append(f"Large transistor count (~{tc:,}) significantly increases layout complexity and verification time")
        if req.block_type in ['PLL', 'SerDes', 'ADC', 'PA']:
            parts.append(f"{req.block_type} blocks require precision analog matching and careful signal routing")
        if req.has_dependencies:
            parts.append(f"Inter-block dependencies ({req.num_dependencies}) add integration and timing closure overhead")
        if req.constraint_complexity > 2.0:
            parts.append(f"High constraint complexity ({req.constraint_complexity:.1f}/3.0) demands extensive floor planning")
    elif complexity == 'Medium':
        parts.append(f"{req.block_type} at {req.tech_node} presents moderate layout challenges")
        if req.constraint_complexity > 1.5:
            parts.append("Analog constraints require careful floor planning and routing")
        if tc > 20000:
            parts.append(f"Moderate transistor count (~{tc:,}) requires systematic verification")
    else:
        parts.append(f"{req.block_type} at {req.tech_node} is a well-characterized block with established layout patterns")
        if tc < 10000:
            parts.append("Small transistor count allows straightforward layout")

    # Only reachable for 'High' when none of the specific triggers fired.
    if not parts:
        parts.append(f"Standard {req.block_type} layout at {req.tech_node} technology node")

    parts.append(f"Estimated {hours:.0f} hours ({hours/8:.1f} working days) for completion")

    return "; ".join(parts) + "."


def predict_bottleneck_risk(req: BottleneckRequest):
    """Predict bottleneck risk for a block.

    Args:
        req: Current state of an in-progress block.

    Returns:
        A JSON-serialisable dict with the predicted risk label, its
        probability, per-class probabilities, the logged/estimated hours
        ratio, rule-based recommendations and an alert flag (set for
        ``'High'`` risk).
    """
    tc = get_transistor_count(req.block_type, req.transistor_count)
    tc_log = np.log1p(tc)

    tech_enc = safe_encode(tech_node_encoder, req.tech_node)
    type_enc = safe_encode(block_type_encoder, req.block_type)
    priority_enc = safe_priority_encode(req.priority)

    complexity_score = req.constraint_complexity * tc_log
    # Denominator is clamped to 1 so a zero estimate cannot divide by zero.
    hours_ratio = req.hours_logged / max(req.estimated_hours, 1)
    # Unknown stage names fall back to index 1 ('In Progress').
    stage_idx = STAGE_IDX.get(req.current_stage, 1)
    hours_budget_pct = hours_ratio * 100
    stage_velocity = req.hours_logged / max(stage_idx, 1)

    # NOTE: column order is positional and presumably mirrors the training
    # pipeline -- do not reorder.
    features = np.array([[
        tech_enc, type_enc, priority_enc, tc_log,
        int(req.has_dependencies), req.num_dependencies,
        req.constraint_complexity, req.estimated_hours,
        req.hours_logged, req.drc_iterations,
        req.drc_violations_total, req.lvs_mismatches_total,
        stage_idx, req.engineer_skill_factor,
        complexity_score, hours_budget_pct, stage_velocity,
    ]])

    # The predicted value is used directly as an index into both the class
    # list and the probability vector.
    risk_idx = bottleneck_model.predict(features)[0]
    risk_proba = bottleneck_model.predict_proba(features)[0]
    # Plain ``str`` labels keep the response free of numpy scalar types.
    class_labels = [str(cls) for cls in bottleneck_encoder.classes_]
    risk_label = class_labels[risk_idx]

    # Generate actionable recommendations
    recommendations = []
    if risk_label == 'High':
        if hours_ratio > 1.3:
            recommendations.append("Block is significantly over budget — consider reassignment or scope review")
        if req.days_in_current_stage > 5:
            recommendations.append(f"Block stuck in {req.current_stage} for {req.days_in_current_stage:.0f} days — escalate to manager")
        if req.drc_violations_total > 10:
            recommendations.append(f"High DRC violations ({req.drc_violations_total}) — review design rule compliance")
        if req.is_overdue:
            recommendations.append("Block is past due date — prioritize or adjust timeline")
    elif risk_label == 'Medium':
        if hours_ratio > 1.0:
            recommendations.append("Hours approaching estimate — monitor closely")
        if req.days_in_current_stage > 3:
            recommendations.append(f"Consider checking progress — {req.days_in_current_stage:.0f} days in {req.current_stage}")

    return {
        'risk_level': risk_label,
        'confidence': round(float(risk_proba[risk_idx]), 3),
        'risk_probabilities': {
            cls: round(float(p), 3)
            for cls, p in zip(class_labels, risk_proba)
        },
        'hours_over_budget_ratio': round(hours_ratio, 2),
        'recommendations': recommendations if recommendations else ['Block progressing normally'],
        'should_alert': risk_label == 'High',
        'model_version': '1.0.0',
    }


def predict_completion_time(req: CompletionRequest):
    """Predict remaining hours to completion.

    Args:
        req: Progress snapshot of a block.

    Returns:
        A JSON-serialisable dict with remaining hours/days (never negative),
        a projected completion date (``YYYY-MM-DD``, local time), the total
        estimated hours and the progress percentage.
    """
    tc = get_transistor_count(req.block_type, req.transistor_count)
    tc_log = np.log1p(tc)

    tech_enc = safe_encode(tech_node_encoder, req.tech_node)
    type_enc = safe_encode(block_type_encoder, req.block_type)
    priority_num = PRIORITY_MAP.get(req.priority, 3)
    # Unknown stage names fall back to index 1 ('In Progress').
    stage_idx = STAGE_IDX.get(req.current_stage, 1)

    # The feature vector carries the stage index twice: once as the stage
    # itself and once as the number of stages completed.
    stages_completed = stage_idx
    hours_ratio = req.cumulative_hours / max(req.estimated_hours, 1)
    avg_hours_per_stage = req.cumulative_hours / max(stages_completed, 1)
    avg_days_per_stage = req.cumulative_days / max(stages_completed, 1)

    # NOTE: column order is positional and presumably mirrors the training
    # pipeline -- do not reorder.
    features = np.array([[
        tech_enc, type_enc, priority_num, tc_log,
        int(req.has_dependencies), req.num_dependencies,
        req.constraint_complexity, req.estimated_hours,
        req.engineer_skill_factor, req.drc_iterations,
        stage_idx, req.cumulative_hours, req.cumulative_days,
        req.cumulative_drc_violations, req.cumulative_lvs_mismatches,
        hours_ratio, stages_completed,
        avg_hours_per_stage, avg_days_per_stage,
    ]])

    remaining_hours = float(completion_model.predict(features)[0])
    # Clamp with a float so the field keeps one type even when the model
    # predicts a negative remainder.
    remaining_hours = max(0.0, round(remaining_hours, 1))
    remaining_days = remaining_hours / 8  # 8 working hours per day

    # Project completion date
    now = datetime.now()
    estimated_completion = now + timedelta(days=remaining_days)

    total_hours = req.cumulative_hours + remaining_hours

    return {
        'remaining_hours': remaining_hours,
        'remaining_days': round(remaining_days, 1),
        'estimated_completion_date': estimated_completion.strftime('%Y-%m-%d'),
        'total_estimated_hours': round(total_hours, 1),
        'progress_percent': round(req.cumulative_hours / max(total_hours, 1) * 100, 1),
        'current_stage': req.current_stage,
        'model_version': '1.0.0',
    }


# === FastAPI App ===

try:
    from fastapi import FastAPI, HTTPException
    from fastapi.middleware.cors import CORSMiddleware
    import uvicorn

    app = FastAPI(
        title="ALWAS ML API",
        description="Machine Learning models for the Analog Layout Workflow Automation System",
        version="1.0.0",
    )

    # NOTE(review): wildcard origins together with allow_credentials=True is
    # a very permissive CORS policy. Behaviour is kept as-is for existing
    # clients; consider restricting allow_origins to the known front-ends.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    @app.get("/")
    def root():
        return {
            "service": "ALWAS ML API",
            "version": "1.0.0",
            "models": {
                "hours_estimation": model_metrics.get('hours_estimation', {}),
                "complexity_classification": model_metrics.get('complexity_classification', {}),
                "bottleneck_prediction": model_metrics.get('bottleneck_prediction', {}),
                "completion_prediction": model_metrics.get('completion_prediction', {}),
            },
            "endpoints": [
                "/predict/estimate",
                "/predict/bottleneck",
                "/predict/completion",
                "/predict/bulk-estimate",
                "/health",
            ]
        }

    @app.get("/health")
    def health():
        return {"status": "healthy", "models_loaded": 5, "timestamp": datetime.now().isoformat()}

    @app.post("/predict/estimate")
    def estimate_block(req: BlockEstimateRequest):
        """Estimate complexity and hours for a new block.
        Direct replacement for Groq AI estimation."""
        try:
            return predict_complexity_and_hours(req)
        except Exception as e:
            # Chain the cause so the original traceback reaches the logs.
            raise HTTPException(status_code=500, detail=str(e)) from e

    @app.post("/predict/bottleneck")
    def predict_bottleneck(req: BottleneckRequest):
        """Predict bottleneck risk for an in-progress block."""
        try:
            return predict_bottleneck_risk(req)
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e

    @app.post("/predict/completion")
    def predict_completion(req: CompletionRequest):
        """Predict remaining time to completion."""
        try:
            return predict_completion_time(req)
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e

    @app.post("/predict/bulk-estimate")
    def bulk_estimate(req: BulkBlockRequest):
        """Bulk estimation for multiple blocks at once."""
        try:
            results = [predict_complexity_and_hours(block) for block in req.blocks]
            return {
                "count": len(results),
                "estimates": results,
                "total_estimated_hours": round(sum(r['estimated_hours'] for r in results), 1),
            }
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e

    @app.get("/model/metrics")
    def get_metrics():
        """Get model performance metrics."""
        return model_metrics

    @app.get("/model/supported-values")
    def get_supported_values():
        """Get list of supported block types, tech nodes, etc."""
        return {
            "tech_nodes": feature_config['tech_nodes'],
            "block_types": feature_config['block_types'],
            "priorities": feature_config['priorities'],
            "stages": STAGES,
            "complexity_classes": feature_config['complexity_classes'],
            "bottleneck_classes": feature_config['bottleneck_classes'],
        }

    HAS_FASTAPI = True

except ImportError:
    HAS_FASTAPI = False
    print("FastAPI not installed — running in library mode only")


# === Standalone Test ===

if __name__ == '__main__':
    print("=" * 60)
    print("ALWAS ML Inference Server — Test Mode")
    print("=" * 60)

    # Test 1: Complexity & Hours estimation
    print("\n--- Test: PLL at 7nm ---")
    result = predict_complexity_and_hours(BlockEstimateRequest(
        block_type='PLL',
        tech_node='7nm',
        priority='P1-Critical',
        transistor_count=80000,
        has_dependencies=True,
        num_dependencies=3,
        constraint_complexity=2.5,
        drc_iterations=4,
        engineer_skill_factor=0.8
    ))
    print(json.dumps(result, indent=2))

    # Test 2: Simple block
    print("\n--- Test: CurrentMirror at 45nm ---")
    result = predict_complexity_and_hours(BlockEstimateRequest(
        block_type='CurrentMirror',
        tech_node='45nm',
        priority='P4-Low',
        transistor_count=1500,
        constraint_complexity=0.5
    ))
    print(json.dumps(result, indent=2))

    # Test 3: Bottleneck risk
    print("\n--- Test: Bottleneck Risk ---")
    result = predict_bottleneck_risk(BottleneckRequest(
        block_type='ADC',
        tech_node='7nm',
        priority='P1-Critical',
        estimated_hours=60,
        hours_logged=80,
        drc_violations_total=15,
        current_stage='DRC',
        days_in_current_stage=7,
        is_overdue=True
    ))
    print(json.dumps(result, indent=2))

    # Test 4: Completion time
    print("\n--- Test: Completion Prediction ---")
    result = predict_completion_time(CompletionRequest(
        block_type='DAC',
        tech_node='12nm',
        priority='P2-High',
        estimated_hours=40,
        current_stage='LVS',
        cumulative_hours=25,
        cumulative_days=4,
        cumulative_drc_violations=5
    ))
    print(json.dumps(result, indent=2))

    # Start server if FastAPI available
    if HAS_FASTAPI:
        print(f"\n{'=' * 60}")
        print("Starting ALWAS ML API server on http://0.0.0.0:7860")
        print("=" * 60)
        uvicorn.run(app, host="0.0.0.0", port=7860)