# alwas-ml-models / inference_server.py
# muthuk1's picture
# Upload inference_server.py with huggingface_hub
# 1898b4d verified
"""
ALWAS ML Inference Server
FastAPI-based inference API for all 4 ALWAS ML models.
Designed to replace the Groq API dependency and add predictive capabilities.
"""
import os
import json
import numpy as np
import joblib
from datetime import datetime, timedelta
from typing import Optional, List
from pydantic import BaseModel, Field
# === Load Models & Configs ===
# Directory that holds every serialized artifact. Override it with the
# MODEL_DIR environment variable; it defaults to ./models.
MODEL_DIR = os.environ.get('MODEL_DIR', './models')

# SECURITY NOTE(review): joblib.load() unpickles its input, and unpickling can
# execute arbitrary code. MODEL_DIR must only ever point at trusted artifacts.

# Load all models
hours_model = joblib.load(f'{MODEL_DIR}/hours_estimator.joblib')
complexity_xgb = joblib.load(f'{MODEL_DIR}/complexity_xgb.joblib')
complexity_lgb = joblib.load(f'{MODEL_DIR}/complexity_lgb.joblib')
bottleneck_model = joblib.load(f'{MODEL_DIR}/bottleneck_predictor.joblib')
completion_model = joblib.load(f'{MODEL_DIR}/completion_predictor.joblib')

# Load encoders
tech_node_encoder = joblib.load(f'{MODEL_DIR}/tech_node_encoder.joblib')
block_type_encoder = joblib.load(f'{MODEL_DIR}/block_type_encoder.joblib')
priority_encoder = joblib.load(f'{MODEL_DIR}/priority_encoder.joblib')
complexity_encoder = joblib.load(f'{MODEL_DIR}/complexity_encoder.joblib')
bottleneck_encoder = joblib.load(f'{MODEL_DIR}/bottleneck_encoder.joblib')

# Load config
# The JSON files are decoded explicitly as UTF-8 so that loading does not
# depend on the platform's default locale encoding.
with open(f'{MODEL_DIR}/feature_config.json', 'r', encoding='utf-8') as f:
    feature_config = json.load(f)
with open(f'{MODEL_DIR}/metrics.json', 'r', encoding='utf-8') as f:
    model_metrics = json.load(f)
# === Pydantic Models ===
class BlockEstimateRequest(BaseModel):
    """Request for complexity & hours estimation (replaces Groq API).

    Only ``block_type`` and ``tech_node`` are required; every other field
    has a default.
    """

    block_type: str = Field(
        ...,
        description="Block type (e.g., 'ADC', 'PLL', 'LDO')",
    )
    tech_node: str = Field(
        ...,
        description="Technology node (e.g., '7nm', '28nm')",
    )
    priority: str = Field(
        default='P3-Medium',
        description="Priority level",
    )
    # Optional: get_transistor_count() substitutes a per-block-type default
    # when this is missing or not positive.
    transistor_count: Optional[int] = Field(
        default=None,
        description="Estimated transistor count",
    )
    has_dependencies: bool = False
    num_dependencies: int = 0
    # Validated to lie in [0, 3.0].
    constraint_complexity: float = Field(default=1.0, ge=0, le=3.0)
    drc_iterations: int = 2
    # Validated to lie in [0.5, 1.5].
    engineer_skill_factor: float = Field(default=1.0, ge=0.5, le=1.5)
class BottleneckRequest(BaseModel):
    """Request for bottleneck risk prediction.

    Only ``block_type`` and ``tech_node`` are mandatory. No field on this
    model declares range validation.
    """

    # --- Block identity ---
    block_type: str
    tech_node: str
    priority: str = 'P3-Medium'
    # Optional: get_transistor_count() substitutes a per-block-type default
    # when this is missing or not positive.
    transistor_count: Optional[int] = None

    # --- Design characteristics ---
    has_dependencies: bool = False
    num_dependencies: int = 0
    constraint_complexity: float = 1.0

    # --- Effort and verification counters ---
    estimated_hours: float = 20.0
    hours_logged: float = 0.0
    drc_iterations: int = 2
    drc_violations_total: int = 0
    lvs_mismatches_total: int = 0

    # --- Workflow position ---
    # Looked up in STAGE_IDX; a name not found there is treated as index 1.
    current_stage: str = 'In Progress'
    # Read by predict_bottleneck_risk() only when wording recommendations;
    # it is not placed in the model's feature vector.
    days_in_current_stage: float = 0.0
    engineer_skill_factor: float = 1.0
    # Like days_in_current_stage, this only influences recommendations.
    is_overdue: bool = False
class CompletionRequest(BaseModel):
    """Request for completion time prediction.

    Only ``block_type`` and ``tech_node`` are mandatory. No field on this
    model declares range validation.
    """

    # --- Block identity ---
    block_type: str
    tech_node: str
    priority: str = 'P3-Medium'
    # Optional: get_transistor_count() substitutes a per-block-type default
    # when this is missing or not positive.
    transistor_count: Optional[int] = None

    # --- Design characteristics ---
    has_dependencies: bool = False
    num_dependencies: int = 0
    constraint_complexity: float = 1.0
    estimated_hours: float = 20.0
    engineer_skill_factor: float = 1.0
    drc_iterations: int = 2

    # --- Workflow position ---
    # Looked up in STAGE_IDX; a name not found there is treated as index 1.
    current_stage: str = 'In Progress'

    # --- Running totals for the block so far ---
    cumulative_hours: float = 0.0
    cumulative_days: float = 0.0
    cumulative_drc_violations: int = 0
    cumulative_lvs_mismatches: int = 0
class BulkBlockRequest(BaseModel):
    """Bulk estimation for multiple blocks.

    Wraps a list of ``BlockEstimateRequest`` items; the list is required.
    """

    # Required field: Field(...) declares it without a default.
    blocks: List[BlockEstimateRequest] = Field(...)
# === Helper Functions ===
# Workflow stage names; a stage's position in this list is its numeric index.
STAGES = [
    'Not Started',
    'In Progress',
    'DRC',
    'LVS',
    'ERC',
    'Review',
    'Completed',
]

# Stage name -> position in STAGES.
STAGE_IDX = dict(zip(STAGES, range(len(STAGES))))

# Priority label -> numeric rank (1 for 'P1-Critical' through 4 for 'P4-Low').
PRIORITY_MAP = {
    'P1-Critical': 1,
    'P2-High': 2,
    'P3-Medium': 3,
    'P4-Low': 4,
}

# Default transistor counts by block type
DEFAULT_TRANSISTOR_COUNTS = {
    'ADC': 50000,
    'DAC': 35000,
    'PLL': 80000,
    'LDO': 8000,
    'BGR': 5000,
    'OTA': 3000,
    'Comparator': 2000,
    'SerDes': 120000,
    'VCO': 15000,
    'Mixer': 10000,
    'LNA': 6000,
    'PA': 20000,
    'TIA': 4000,
    'SampleHold': 3500,
    'LVDS_Driver': 8000,
    'BandgapRef': 3000,
    'CurrentMirror': 1500,
    'DiffAmp': 2500,
    'Oscillator': 12000,
    'PowerDetector': 5000,
}
def safe_encode(encoder, value, default=0):
    """Encode one value with ``encoder``, falling back to ``default``.

    Calls ``encoder.transform([value])`` and returns the first element of
    the result. If that raises ``ValueError`` or ``KeyError``, ``default``
    is returned instead; any other exception propagates.
    """
    try:
        encoded = encoder.transform([value])[0]
    except (KeyError, ValueError):
        encoded = default
    return encoded
def safe_priority_encode(priority):
    """Encode a priority label with the module-level ``priority_encoder``.

    The label is passed as a single-row, single-column input
    (``[[priority]]``) and the ``[0][0]`` element of the result is returned.

    Args:
        priority: Priority label to encode (e.g. ``'P3-Medium'``).

    Returns:
        The encoded value, or ``2`` when encoding fails for any reason.
    """
    try:
        return priority_encoder.transform([[priority]])[0][0]
    except Exception:
        # Best-effort fallback for unknown or invalid labels. Catching
        # Exception rather than using a bare ``except:`` lets
        # KeyboardInterrupt and SystemExit propagate.
        # NOTE(review): 2 is described as "medium" by the original author;
        # confirm against the fitted encoder's category order.
        return 2  # default to medium
def get_transistor_count(block_type, provided_count):
    """Return the transistor count to use for a block.

    ``provided_count`` is returned when it is truthy and greater than zero.
    Otherwise the value for ``block_type`` in ``DEFAULT_TRANSISTOR_COUNTS``
    is used, or 10000 when the block type has no entry there.
    """
    if not provided_count or not provided_count > 0:
        return DEFAULT_TRANSISTOR_COUNTS.get(block_type, 10000)
    return provided_count
# === Prediction Functions ===
def predict_complexity_and_hours(req: BlockEstimateRequest):
    """Estimate the complexity class and layout hours for one block.

    Builds two feature rows from ``req`` (the hours row is the complexity
    row plus ``engineer_skill_factor``), runs ``hours_model`` for the hours
    estimate, and averages the class probabilities of ``complexity_xgb`` and
    ``complexity_lgb`` to pick the complexity label.

    Returns:
        A dict with the complexity label, estimated hours (never below 4.0)
        and days, confidence, per-class probabilities, risk level,
        human-readable reasoning, a recommended DRC iteration count, a
        suggested engineer skill level and the model version.
    """
    # --- Raw and encoded inputs ---
    transistors = get_transistor_count(req.block_type, req.transistor_count)
    log_transistors = np.log1p(transistors)
    node_code = safe_encode(tech_node_encoder, req.tech_node)
    block_code = safe_encode(block_type_encoder, req.block_type)
    priority_code = safe_priority_encode(req.priority)
    priority_rank = PRIORITY_MAP.get(req.priority, 3)

    # --- Engineered interaction features ---
    node_block_cross = node_code * 10 + block_code
    weighted_complexity = req.constraint_complexity * log_transistors
    size_priority_cross = log_transistors * priority_rank

    # Both models share these columns, in this exact order. The hours model
    # additionally takes engineer_skill_factor between the two groups.
    leading = [
        node_code, block_code, priority_code, transistors, log_transistors,
        int(req.has_dependencies), req.num_dependencies,
        req.constraint_complexity, req.drc_iterations,
    ]
    trailing = [node_block_cross, weighted_complexity, size_priority_cross]

    # --- Hours regression ---
    hours_input = np.array([leading + [req.engineer_skill_factor] + trailing])
    raw_hours = float(hours_model.predict(hours_input)[0])
    estimated_hours = max(4.0, round(raw_hours, 1))

    # --- Complexity classification (average of the two classifiers) ---
    class_input = np.array([leading + trailing])
    proba_xgb = complexity_xgb.predict_proba(class_input)[0]
    proba_lgb = complexity_lgb.predict_proba(class_input)[0]
    blended = (proba_xgb + proba_lgb) / 2
    winner = int(np.argmax(blended))
    label = complexity_encoder.classes_[winner]
    confidence = float(blended[winner])

    reasoning = generate_reasoning(req, label, estimated_hours, transistors)

    # Risk: anything that is neither 'Low' nor 'Medium' counts as high.
    if label == 'Low':
        risk_level = 'low'
    elif label == 'Medium':
        risk_level = 'medium'
    else:
        risk_level = 'high'

    # Skill: anything that is neither 'High' nor 'Medium' maps to junior.
    if label == 'High':
        skill_needed = 'senior'
    elif label == 'Medium':
        skill_needed = 'mid'
    else:
        skill_needed = 'junior'

    iteration_floor = 2 if label == 'High' else 1
    probabilities = dict(
        zip(complexity_encoder.classes_, (round(float(p), 3) for p in blended))
    )

    return {
        'complexity': label,
        'estimated_hours': estimated_hours,
        'confidence': round(confidence, 3),
        'risk_level': risk_level,
        'reasoning': reasoning,
        'recommended_drc_iterations': max(req.drc_iterations, iteration_floor),
        'suggested_engineer_skill_level': skill_needed,
        'complexity_probabilities': probabilities,
        'estimated_days': round(estimated_hours / 8, 1),
        'model_version': '1.0.0',
    }
def generate_reasoning(req, complexity, hours, tc):
    """Build the one-sentence, human-readable justification for an estimate.

    Args:
        req: Object exposing ``tech_node``, ``block_type``,
            ``has_dependencies``, ``num_dependencies`` and
            ``constraint_complexity``.
        complexity: Predicted class; ``'High'`` and ``'Medium'`` get their
            own wording, any other value is treated as low complexity.
        hours: Estimated hours, also reported as 8-hour working days.
        tc: Transistor count used for the estimate.

    Returns:
        The applicable reasons followed by the hours summary, joined with
        ``"; "`` and terminated with a period.
    """
    if complexity == 'High':
        reasons = _high_complexity_reasons(req, tc)
    elif complexity == 'Medium':
        reasons = _medium_complexity_reasons(req, tc)
    else:
        reasons = _low_complexity_reasons(req, tc)

    # Only the 'High' branch can come back empty; give it a generic reason.
    if len(reasons) == 0:
        reasons = [f"Standard {req.block_type} layout at {req.tech_node} technology node"]

    summary = f"Estimated {hours:.0f} hours ({hours/8:.1f} working days) for completion"
    return "; ".join(reasons + [summary]) + "."


def _high_complexity_reasons(req, tc):
    """Return the complexity drivers that apply to a 'High' block."""
    reasons = []
    if req.tech_node in ('5nm', '7nm', '12nm'):
        reasons.append(f"Advanced {req.tech_node} node requires extensive DRC/LVS iterations with tight design rules")
    if tc > 50000:
        reasons.append(f"Large transistor count (~{tc:,}) significantly increases layout complexity and verification time")
    if req.block_type in ('PLL', 'SerDes', 'ADC', 'PA'):
        reasons.append(f"{req.block_type} blocks require precision analog matching and careful signal routing")
    if req.has_dependencies:
        reasons.append(f"Inter-block dependencies ({req.num_dependencies}) add integration and timing closure overhead")
    if req.constraint_complexity > 2.0:
        reasons.append(f"High constraint complexity ({req.constraint_complexity:.1f}/3.0) demands extensive floor planning")
    return reasons


def _medium_complexity_reasons(req, tc):
    """Return the reasons for a 'Medium' block (always at least one)."""
    reasons = [f"{req.block_type} at {req.tech_node} presents moderate layout challenges"]
    if req.constraint_complexity > 1.5:
        reasons.append("Analog constraints require careful floor planning and routing")
    if tc > 20000:
        reasons.append(f"Moderate transistor count (~{tc:,}) requires systematic verification")
    return reasons


def _low_complexity_reasons(req, tc):
    """Return the reasons for a block in any other class (always at least one)."""
    reasons = [f"{req.block_type} at {req.tech_node} is a well-characterized block with established layout patterns"]
    if tc < 10000:
        reasons.append("Small transistor count allows straightforward layout")
    return reasons
def predict_bottleneck_risk(req: BottleneckRequest):
    """Predict bottleneck risk for a block.

    Builds one feature row from ``req``, asks ``bottleneck_model`` for the
    predicted class and class probabilities, and derives recommendations
    from the request's budget, stage and violation figures.

    Args:
        req: The block's current state. ``days_in_current_stage`` and
            ``is_overdue`` only influence the recommendations; they are not
            part of the feature row.

    Returns:
        A dict with the risk label, its confidence, per-class probabilities,
        the logged/estimated hours ratio, a list of recommendations, a
        ``should_alert`` flag (true only for ``'High'``) and the model
        version. All values are native Python types.
    """
    tc = get_transistor_count(req.block_type, req.transistor_count)
    tc_log = np.log1p(tc)
    tech_enc = safe_encode(tech_node_encoder, req.tech_node)
    type_enc = safe_encode(block_type_encoder, req.block_type)
    priority_enc = safe_priority_encode(req.priority)
    complexity_score = req.constraint_complexity * tc_log
    # Unknown stage names fall back to index 1.
    stage_idx = STAGE_IDX.get(req.current_stage, 1)

    # max(..., 1) guards against division by a zero or tiny estimate/stage.
    hours_ratio = req.hours_logged / max(req.estimated_hours, 1)
    hours_budget_pct = hours_ratio * 100
    stage_velocity = req.hours_logged / max(stage_idx, 1)

    # Column order must match the order the model was trained with.
    features = np.array([[
        tech_enc, type_enc, priority_enc, tc_log,
        int(req.has_dependencies), req.num_dependencies,
        req.constraint_complexity, req.estimated_hours, req.hours_logged,
        req.drc_iterations, req.drc_violations_total,
        req.lvs_mismatches_total, stage_idx,
        req.engineer_skill_factor, complexity_score,
        hours_budget_pct, stage_velocity
    ]])

    # Coerce numpy scalars to native Python types. A numpy string label makes
    # ``label == 'High'`` evaluate to numpy.bool_, which json.dumps (and hence
    # the API response) may refuse to serialize.
    risk_idx = int(bottleneck_model.predict(features)[0])
    risk_proba = bottleneck_model.predict_proba(features)[0]
    risk_label = str(bottleneck_encoder.classes_[risk_idx])

    # Generate actionable recommendations
    recommendations = []
    if risk_label == 'High':
        if hours_ratio > 1.3:
            recommendations.append("Block is significantly over budget — consider reassignment or scope review")
        if req.days_in_current_stage > 5:
            recommendations.append(f"Block stuck in {req.current_stage} for {req.days_in_current_stage:.0f} days — escalate to manager")
        if req.drc_violations_total > 10:
            recommendations.append(f"High DRC violations ({req.drc_violations_total}) — review design rule compliance")
        if req.is_overdue:
            recommendations.append("Block is past due date — prioritize or adjust timeline")
    elif risk_label == 'Medium':
        if hours_ratio > 1.0:
            recommendations.append("Hours approaching estimate — monitor closely")
        if req.days_in_current_stage > 3:
            recommendations.append(f"Consider checking progress — {req.days_in_current_stage:.0f} days in {req.current_stage}")

    return {
        'risk_level': risk_label,
        'confidence': round(float(risk_proba[risk_idx]), 3),
        'risk_probabilities': {
            str(cls): round(float(p), 3)
            for cls, p in zip(bottleneck_encoder.classes_, risk_proba)
        },
        'hours_over_budget_ratio': round(hours_ratio, 2),
        'recommendations': recommendations if recommendations else ['Block progressing normally'],
        'should_alert': bool(risk_label == 'High'),
        'model_version': '1.0.0',
    }
def predict_completion_time(req: CompletionRequest):
    """Predict the hours still needed to finish a block.

    Builds one feature row from ``req``, runs ``completion_model`` and clamps
    the prediction at zero. Remaining days assume 8 hours per day, and the
    completion date is the current local time plus that many days.

    Returns:
        A dict with remaining hours and days, the projected completion date
        (``YYYY-MM-DD``), total estimated hours, progress percentage, the
        current stage and the model version.
    """
    transistors = get_transistor_count(req.block_type, req.transistor_count)
    log_transistors = np.log1p(transistors)
    node_code = safe_encode(tech_node_encoder, req.tech_node)
    block_code = safe_encode(block_type_encoder, req.block_type)
    priority_rank = PRIORITY_MAP.get(req.priority, 3)

    # Unknown stage names fall back to index 1. The stage index is also used
    # as the number of stages completed so far.
    stage_position = STAGE_IDX.get(req.current_stage, 1)
    stages_done = stage_position
    stage_divisor = max(stages_done, 1)

    hours_ratio = req.cumulative_hours / max(req.estimated_hours, 1)
    hours_per_stage = req.cumulative_hours / stage_divisor
    days_per_stage = req.cumulative_days / stage_divisor

    # Column order must match the order the model was trained with.
    feature_row = [
        node_code, block_code, priority_rank, log_transistors,
        int(req.has_dependencies), req.num_dependencies,
        req.constraint_complexity, req.estimated_hours,
        req.engineer_skill_factor, req.drc_iterations,
        stage_position, req.cumulative_hours, req.cumulative_days,
        req.cumulative_drc_violations, req.cumulative_lvs_mismatches,
        hours_ratio, stages_done, hours_per_stage, days_per_stage,
    ]
    predicted = float(completion_model.predict(np.array([feature_row]))[0])
    remaining_hours = max(0, round(predicted, 1))
    remaining_days = remaining_hours / 8

    # Project completion date
    finish = datetime.now() + timedelta(days=remaining_days)
    total_hours = req.cumulative_hours + remaining_hours

    return {
        'remaining_hours': remaining_hours,
        'remaining_days': round(remaining_days, 1),
        'estimated_completion_date': finish.strftime('%Y-%m-%d'),
        'total_estimated_hours': round(total_hours, 1),
        'progress_percent': round(req.cumulative_hours / max(total_hours, 1) * 100, 1),
        'current_stage': req.current_stage,
        'model_version': '1.0.0',
    }
# === FastAPI App ===
# The HTTP layer is optional. If FastAPI or uvicorn cannot be imported, the
# module still works as a library exposing the predict_* functions, and
# HAS_FASTAPI is set to False.
try:
    from fastapi import FastAPI, HTTPException
    from fastapi.middleware.cors import CORSMiddleware
    import uvicorn

    app = FastAPI(
        title="ALWAS ML API",
        description="Machine Learning models for the Analog Layout Workflow Automation System",
        version="1.0.0",
    )

    # Wildcard origins must not be combined with credentialed requests:
    # FastAPI's CORS documentation requires explicit origins when
    # allow_credentials is True. Credentials are therefore disabled here.
    # NOTE(review): if a browser client must send cookies/credentials, list
    # its origin(s) explicitly and re-enable allow_credentials.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=False,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    @app.get("/")
    def root():
        """Describe the service: version, per-model metrics and endpoints."""
        return {
            "service": "ALWAS ML API",
            "version": "1.0.0",
            "models": {
                "hours_estimation": model_metrics.get('hours_estimation', {}),
                "complexity_classification": model_metrics.get('complexity_classification', {}),
                "bottleneck_prediction": model_metrics.get('bottleneck_prediction', {}),
                "completion_prediction": model_metrics.get('completion_prediction', {}),
            },
            "endpoints": [
                "/predict/estimate",
                "/predict/bottleneck",
                "/predict/completion",
                "/predict/bulk-estimate",
                "/model/metrics",
                "/model/supported-values",
                "/health",
            ]
        }

    @app.get("/health")
    def health():
        """Report liveness with a fixed model count and the local time."""
        return {"status": "healthy", "models_loaded": 5, "timestamp": datetime.now().isoformat()}

    @app.post("/predict/estimate")
    def estimate_block(req: BlockEstimateRequest):
        """Estimate complexity and hours for a new block. Direct replacement for Groq AI estimation."""
        try:
            return predict_complexity_and_hours(req)
        except Exception as e:
            # Any prediction failure is reported as HTTP 500 with its message.
            raise HTTPException(status_code=500, detail=str(e))

    @app.post("/predict/bottleneck")
    def predict_bottleneck(req: BottleneckRequest):
        """Predict bottleneck risk for an in-progress block."""
        try:
            return predict_bottleneck_risk(req)
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    @app.post("/predict/completion")
    def predict_completion(req: CompletionRequest):
        """Predict remaining time to completion."""
        try:
            return predict_completion_time(req)
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    @app.post("/predict/bulk-estimate")
    def bulk_estimate(req: BulkBlockRequest):
        """Bulk estimation for multiple blocks at once."""
        try:
            results = [predict_complexity_and_hours(block) for block in req.blocks]
            return {
                "count": len(results),
                "estimates": results,
                "total_estimated_hours": round(sum(r['estimated_hours'] for r in results), 1),
            }
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    @app.get("/model/metrics")
    def get_metrics():
        """Get model performance metrics."""
        return model_metrics

    @app.get("/model/supported-values")
    def get_supported_values():
        """Get list of supported block types, tech nodes, etc."""
        return {
            "tech_nodes": feature_config['tech_nodes'],
            "block_types": feature_config['block_types'],
            "priorities": feature_config['priorities'],
            "stages": STAGES,
            "complexity_classes": feature_config['complexity_classes'],
            "bottleneck_classes": feature_config['bottleneck_classes'],
        }

    HAS_FASTAPI = True
except ImportError:
    HAS_FASTAPI = False
    print("FastAPI not installed — running in library mode only")
# === Standalone Test ===
# Running the module as a script prints four sample predictions and then,
# when FastAPI is available, starts the HTTP server on port 7860.
if __name__ == '__main__':

    def _demo(title, predictor, request_cls, **fields):
        """Print ``title``, then the JSON prediction for one sample request."""
        print(title)
        outcome = predictor(request_cls(**fields))
        print(json.dumps(outcome, indent=2))

    rule = "=" * 60
    print(rule)
    print("ALWAS ML Inference Server — Test Mode")
    print(rule)

    # Test 1: Complexity & Hours estimation
    _demo(
        "\n--- Test: PLL at 7nm ---",
        predict_complexity_and_hours,
        BlockEstimateRequest,
        block_type='PLL', tech_node='7nm', priority='P1-Critical',
        transistor_count=80000, has_dependencies=True, num_dependencies=3,
        constraint_complexity=2.5, drc_iterations=4, engineer_skill_factor=0.8,
    )

    # Test 2: Simple block
    _demo(
        "\n--- Test: CurrentMirror at 45nm ---",
        predict_complexity_and_hours,
        BlockEstimateRequest,
        block_type='CurrentMirror', tech_node='45nm', priority='P4-Low',
        transistor_count=1500, constraint_complexity=0.5,
    )

    # Test 3: Bottleneck risk
    _demo(
        "\n--- Test: Bottleneck Risk ---",
        predict_bottleneck_risk,
        BottleneckRequest,
        block_type='ADC', tech_node='7nm', priority='P1-Critical',
        estimated_hours=60, hours_logged=80,
        drc_violations_total=15, current_stage='DRC',
        days_in_current_stage=7, is_overdue=True,
    )

    # Test 4: Completion time
    _demo(
        "\n--- Test: Completion Prediction ---",
        predict_completion_time,
        CompletionRequest,
        block_type='DAC', tech_node='12nm', priority='P2-High',
        estimated_hours=40, current_stage='LVS',
        cumulative_hours=25, cumulative_days=4,
        cumulative_drc_violations=5,
    )

    # Start server if FastAPI available
    if HAS_FASTAPI:
        print(f"\n{'=' * 60}")
        print("Starting ALWAS ML API server on http://0.0.0.0:7860")
        print(rule)
        uvicorn.run(app, host="0.0.0.0", port=7860)