"""
LangGraph Workflow Orchestration
Design System Extractor v2

Defines the main workflow graph with agents, checkpoints, and transitions.
"""
|
|
from typing import Literal
from datetime import datetime
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver


from agents.state import AgentState, create_initial_state, get_stage_progress
from core.token_schema import Viewport
|
|
|
|
| |
| |
| |
|
|
async def discover_pages(state: AgentState) -> AgentState:
    """
    Agent 1 - Part 1: Discover pages reachable from the base URL.

    Crawls the site starting at ``state["base_url"]``, stores the pages it
    finds in ``discovered_pages``, and pauses the workflow at the
    ``confirm_pages`` checkpoint so a human can approve the list.
    Failures are appended to ``state["errors"]`` instead of raised.
    """
    from agents.crawler import PageDiscoverer

    state["current_stage"] = "discover"
    state["stage_started_at"] = datetime.now()

    try:
        found_pages = await PageDiscoverer().discover(state["base_url"])
    except Exception as exc:
        state["errors"].append(f"Discovery failed: {str(exc)}")
    else:
        state["discovered_pages"] = found_pages
        # Pause for the human page-confirmation checkpoint.
        state["awaiting_human_input"] = True
        state["checkpoint_name"] = "confirm_pages"

    return state
|
|
|
|
async def extract_tokens_desktop(state: AgentState) -> AgentState:
    """
    Agent 1 - Part 2a: Extract design tokens from the desktop viewport.

    Stores the extractor result in ``desktop_extraction`` and reports crawl
    progress into ``desktop_crawl_progress``. Failures are appended to
    ``state["errors"]`` instead of raised.
    """
    from agents.extractor import TokenExtractor

    state["current_stage"] = "extract"

    def _report_progress(progress):
        # Side-effect callback: mirror crawl progress into the shared state.
        state["desktop_crawl_progress"] = progress

    try:
        state["desktop_extraction"] = await TokenExtractor(
            viewport=Viewport.DESKTOP
        ).extract(
            pages=state["pages_to_crawl"],
            progress_callback=_report_progress,
        )
    except Exception as exc:
        state["errors"].append(f"Desktop extraction failed: {str(exc)}")

    return state
|
|
|
|
async def extract_tokens_mobile(state: AgentState) -> AgentState:
    """
    Agent 1 - Part 2b: Extract design tokens from the mobile viewport.

    Mirrors ``extract_tokens_desktop``: stores the extractor result in
    ``mobile_extraction`` and reports crawl progress into
    ``mobile_crawl_progress``. Failures are appended to ``state["errors"]``
    instead of raised.
    """
    from agents.extractor import TokenExtractor

    # Keep the stage marker consistent with the desktop extractor so the
    # reported stage stays correct even if this node ever runs first
    # (previously only the desktop node set it).
    state["current_stage"] = "extract"

    try:
        extractor = TokenExtractor(viewport=Viewport.MOBILE)
        result = await extractor.extract(
            pages=state["pages_to_crawl"],
            progress_callback=lambda p: state.update({"mobile_crawl_progress": p})
        )

        state["mobile_extraction"] = result

    except Exception as e:
        state["errors"].append(f"Mobile extraction failed: {str(e)}")

    return state
|
|
|
|
async def normalize_tokens(state: AgentState) -> AgentState:
    """
    Agent 2: Normalize and structure the raw extracted tokens.

    Runs the normalizer over whichever viewport extractions are present,
    then pauses at the ``review_tokens`` checkpoint for human review.
    Failures are appended to ``state["errors"]`` instead of raised.
    """
    from agents.normalizer import TokenNormalizer

    state["current_stage"] = "normalize"
    state["stage_started_at"] = datetime.now()

    try:
        normalizer = TokenNormalizer()

        # Normalize each viewport's extraction if it exists.
        for raw_key, normalized_key in (
            ("desktop_extraction", "desktop_normalized"),
            ("mobile_extraction", "mobile_normalized"),
        ):
            if state[raw_key]:
                state[normalized_key] = normalizer.normalize(state[raw_key])

        # Pause for the human token-review checkpoint.
        state["awaiting_human_input"] = True
        state["checkpoint_name"] = "review_tokens"

    except Exception as exc:
        state["errors"].append(f"Normalization failed: {str(exc)}")

    return state
|
|
|
|
async def generate_recommendations(state: AgentState) -> AgentState:
    """
    Agent 3: Produce upgrade recommendations from the normalized tokens.

    Stores the advisor output in ``upgrade_recommendations`` and pauses at
    the ``select_upgrades`` checkpoint. Failures are appended to
    ``state["errors"]`` instead of raised.
    """
    from agents.advisor import DesignSystemAdvisor

    state["current_stage"] = "advise"
    state["stage_started_at"] = datetime.now()

    try:
        state["upgrade_recommendations"] = await DesignSystemAdvisor().analyze_and_recommend(
            desktop=state["desktop_normalized"],
            mobile=state["mobile_normalized"],
        )

        # Pause for the human upgrade-selection checkpoint.
        state["awaiting_human_input"] = True
        state["checkpoint_name"] = "select_upgrades"

    except Exception as exc:
        state["errors"].append(f"Recommendation generation failed: {str(exc)}")

    return state
|
|
|
|
async def generate_final_tokens(state: AgentState) -> AgentState:
    """
    Agent 4: Build the final token JSON for each available viewport.

    Collects the human-selected upgrade options, runs the generator over
    each normalized token set that exists, and pauses at the
    ``approve_export`` checkpoint. Failures are appended to
    ``state["errors"]`` instead of raised.
    """
    from agents.generator import TokenGenerator

    state["current_stage"] = "generate"
    state["stage_started_at"] = datetime.now()

    try:
        generator = TokenGenerator()

        # Options chosen by the human at the select_upgrades checkpoint.
        selections = {
            "type_scale": state["selected_type_scale"],
            "spacing_system": state["selected_spacing_system"],
            "naming_convention": state["selected_naming_convention"],
            "color_ramps": state["selected_color_ramps"],
            "a11y_fixes": state["selected_a11y_fixes"],
        }

        # Generate final tokens for each viewport that was normalized.
        for normalized_key, final_key in (
            ("desktop_normalized", "desktop_final"),
            ("mobile_normalized", "mobile_final"),
        ):
            if state[normalized_key]:
                state[final_key] = generator.generate(
                    normalized=state[normalized_key],
                    selections=selections,
                    version=state["version_label"],
                )

        # Pause for the human export-approval checkpoint.
        state["awaiting_human_input"] = True
        state["checkpoint_name"] = "approve_export"

    except Exception as exc:
        state["errors"].append(f"Token generation failed: {str(exc)}")

    return state
|
|
|
|
async def complete_workflow(state: AgentState) -> AgentState:
    """
    Final node: mark the workflow as finished.

    Sets the stage to ``export`` and clears any pending human-input
    checkpoint flags.
    """
    state.update(
        current_stage="export",
        awaiting_human_input=False,
        checkpoint_name=None,
    )
    return state
|
|
|
|
| |
| |
| |
|
|
def handle_page_confirmation(state: AgentState, confirmed_pages: list[str]) -> AgentState:
    """Record the human-approved page list and release the ``confirm_pages`` checkpoint."""
    state.update(
        pages_to_crawl=confirmed_pages,
        awaiting_human_input=False,
        checkpoint_name=None,
    )
    return state
|
|
|
|
def handle_token_review(
    state: AgentState,
    color_decisions: dict[str, bool],
    typography_decisions: dict[str, bool],
    spacing_decisions: dict[str, bool],
) -> AgentState:
    """
    Apply the human accept/reject verdicts for extracted tokens.

    Each decisions mapping is token-name -> accepted?; accepted and rejected
    names land under ``accepted_<category>`` / ``rejected_<category>``, then
    the ``review_tokens`` checkpoint is released.
    """
    for category, decisions in (
        ("colors", color_decisions),
        ("typography", typography_decisions),
        ("spacing", spacing_decisions),
    ):
        state[f"accepted_{category}"] = [name for name, keep in decisions.items() if keep]
        state[f"rejected_{category}"] = [name for name, keep in decisions.items() if not keep]

    state["awaiting_human_input"] = False
    state["checkpoint_name"] = None
    return state
|
|
|
|
| def handle_upgrade_selection( |
| state: AgentState, |
| type_scale: str | None, |
| spacing_system: str | None, |
| naming_convention: str | None, |
| color_ramps: dict[str, bool], |
| a11y_fixes: list[str], |
| ) -> AgentState: |
| """Handle human selection of upgrade options.""" |
| state["selected_type_scale"] = type_scale |
| state["selected_spacing_system"] = spacing_system |
| state["selected_naming_convention"] = naming_convention |
| state["selected_color_ramps"] = color_ramps |
| state["selected_a11y_fixes"] = a11y_fixes |
| |
| state["awaiting_human_input"] = False |
| state["checkpoint_name"] = None |
| return state |
|
|
|
|
def handle_export_approval(state: AgentState, version_label: str) -> AgentState:
    """Record the approved version label and release the ``approve_export`` checkpoint."""
    state.update(
        version_label=version_label,
        awaiting_human_input=False,
        checkpoint_name=None,
    )
    return state
|
|
|
|
| |
| |
| |
|
|
def route_after_discovery(state: AgentState) -> Literal["wait_for_pages", "extract"]:
    """Route after discovery: pause for page confirmation, or go straight to extraction."""
    return "wait_for_pages" if state["awaiting_human_input"] else "extract"
|
|
|
|
def route_after_extraction(state: AgentState) -> Literal["normalize", "error"]:
    """Route after extraction: continue only if at least one viewport produced results."""
    nothing_extracted = (
        state["desktop_extraction"] is None and state["mobile_extraction"] is None
    )
    return "error" if nothing_extracted else "normalize"
|
|
|
|
def route_after_normalization(state: AgentState) -> Literal["wait_for_review", "advise"]:
    """Route after normalization: pause for token review, or continue to the advisor."""
    return "wait_for_review" if state["awaiting_human_input"] else "advise"
|
|
|
|
def route_after_recommendations(state: AgentState) -> Literal["wait_for_selection", "generate"]:
    """Route after recommendations: pause for upgrade selection, or continue to generation."""
    return "wait_for_selection" if state["awaiting_human_input"] else "generate"
|
|
|
|
def route_after_generation(state: AgentState) -> Literal["wait_for_approval", "complete"]:
    """Route after generation: pause for export approval, or finish the workflow."""
    return "wait_for_approval" if state["awaiting_human_input"] else "complete"
|
|
|
|
| |
| |
| |
|
|
def build_workflow_graph() -> StateGraph:
    """
    Build the main LangGraph workflow.

    Flow:
    1. discover -> [human confirms pages]
    2. extract_desktop -> extract_mobile
    3. normalize -> [human reviews tokens]
    4. advise -> [human selects upgrades]
    5. generate -> [human approves export]
    6. complete
    """
    workflow = StateGraph(AgentState)

    # Agent nodes, keyed by graph node name.
    for node_name, node_fn in (
        ("discover", discover_pages),
        ("extract_desktop", extract_tokens_desktop),
        ("extract_mobile", extract_tokens_mobile),
        ("normalize", normalize_tokens),
        ("advise", generate_recommendations),
        ("generate", generate_final_tokens),
        ("complete", complete_workflow),
    ):
        workflow.add_node(node_name, node_fn)

    # Pass-through nodes where execution pauses for human input.
    for wait_node in (
        "wait_for_pages",
        "wait_for_review",
        "wait_for_selection",
        "wait_for_approval",
    ):
        workflow.add_node(wait_node, lambda s: s)

    workflow.set_entry_point("discover")

    # Conditional branches: pause for a human, or continue down the pipeline.
    workflow.add_conditional_edges(
        "discover",
        route_after_discovery,
        {"wait_for_pages": "wait_for_pages", "extract": "extract_desktop"},
    )
    workflow.add_conditional_edges(
        "extract_mobile",
        route_after_extraction,
        {"normalize": "normalize", "error": END},
    )
    workflow.add_conditional_edges(
        "normalize",
        route_after_normalization,
        {"wait_for_review": "wait_for_review", "advise": "advise"},
    )
    workflow.add_conditional_edges(
        "advise",
        route_after_recommendations,
        {"wait_for_selection": "wait_for_selection", "generate": "generate"},
    )
    workflow.add_conditional_edges(
        "generate",
        route_after_generation,
        {"wait_for_approval": "wait_for_approval", "complete": "complete"},
    )

    # Unconditional transitions.
    for source, target in (
        ("wait_for_pages", "extract_desktop"),
        ("extract_desktop", "extract_mobile"),
        ("wait_for_review", "advise"),
        ("wait_for_selection", "generate"),
        ("wait_for_approval", "complete"),
        ("complete", END),
    ):
        workflow.add_edge(source, target)

    return workflow
|
|
|
|
| |
| |
| |
|
|
class WorkflowRunner:
    """
    Manages workflow execution with human-in-the-loop support.

    Compiles the graph with a MemorySaver checkpointer, streams it until a
    node sets ``awaiting_human_input``, and lets ``resume`` feed the human's
    answer back in for the pending checkpoint.
    """

    def __init__(self):
        self.graph = build_workflow_graph()
        self.checkpointer = MemorySaver()
        self.app = self.graph.compile(checkpointer=self.checkpointer)
        self.current_state: AgentState | None = None
        self.thread_id: str | None = None

    async def _stream_until_checkpoint(self, input_state: AgentState) -> AgentState:
        """Stream the graph, stopping at the first human-input checkpoint.

        Uses ``stream_mode="values"`` so each event is the full state dict
        after a node runs. Without it the stream's default "updates" mode
        yields ``{node_name: delta}`` mappings, so the
        ``awaiting_human_input`` check below would never fire and
        ``current_state`` would end up holding a node-keyed delta instead of
        the workflow state.
        """
        config = {"configurable": {"thread_id": self.thread_id}}

        async for snapshot in self.app.astream(input_state, config, stream_mode="values"):
            self.current_state = snapshot
            if self.current_state.get("awaiting_human_input"):
                break

        return self.current_state

    async def start(self, base_url: str, thread_id: str | None = None) -> AgentState:
        """Start a new workflow for *base_url*; returns the state at the first pause."""
        self.thread_id = thread_id or f"workflow_{datetime.now().timestamp()}"
        self.current_state = create_initial_state(base_url)
        return await self._stream_until_checkpoint(self.current_state)

    async def resume(self, human_input: dict) -> AgentState:
        """Resume the workflow after human input for the pending checkpoint.

        Raises:
            ValueError: if no workflow has been started.

        NOTE(review): re-invoking ``astream`` with the full state re-enters
        the graph at its entry point rather than continuing from the saved
        checkpoint — confirm this is intended, or switch to graph interrupts
        plus ``update_state`` for true mid-graph resumption.
        """
        if not self.current_state or not self.thread_id:
            raise ValueError("No active workflow to resume")

        checkpoint = self.current_state.get("checkpoint_name")

        # Dispatch the human's answer to the handler for the pending checkpoint.
        if checkpoint == "confirm_pages":
            self.current_state = handle_page_confirmation(
                self.current_state,
                human_input.get("confirmed_pages", [])
            )
        elif checkpoint == "review_tokens":
            self.current_state = handle_token_review(
                self.current_state,
                human_input.get("color_decisions", {}),
                human_input.get("typography_decisions", {}),
                human_input.get("spacing_decisions", {}),
            )
        elif checkpoint == "select_upgrades":
            self.current_state = handle_upgrade_selection(
                self.current_state,
                human_input.get("type_scale"),
                human_input.get("spacing_system"),
                human_input.get("naming_convention"),
                human_input.get("color_ramps", {}),
                human_input.get("a11y_fixes", []),
            )
        elif checkpoint == "approve_export":
            self.current_state = handle_export_approval(
                self.current_state,
                human_input.get("version_label", "v1")
            )

        return await self._stream_until_checkpoint(self.current_state)

    def get_progress(self) -> dict:
        """Return a progress summary, or ``{"status": "not_started"}`` before start."""
        if not self.current_state:
            return {"status": "not_started"}
        return get_stage_progress(self.current_state)

    def get_state(self) -> AgentState | None:
        """Return the latest known workflow state (None before ``start``)."""
        return self.current_state
|
|
|
|
| |
| |
| |
|
|
def create_workflow() -> WorkflowRunner:
    """Construct and return a fresh :class:`WorkflowRunner`."""
    return WorkflowRunner()
|
|
|
|
async def run_discovery_only(base_url: str) -> list:
    """Run just the page-discovery phase against *base_url* (testing helper)."""
    from agents.crawler import PageDiscoverer

    return await PageDiscoverer().discover(base_url)
|
|
|
|
async def run_extraction_only(pages: list[str], viewport: Viewport) -> dict:
    """Run just the token-extraction phase over *pages* (testing helper)."""
    from agents.extractor import TokenExtractor

    return await TokenExtractor(viewport=viewport).extract(pages)
|
|