| from src.dev_pilot.state.sdlc_state import SDLCState |
| from src.dev_pilot.cache.redis_cache import flush_redis_cache, save_state_to_redis, get_state_from_redis |
| import uuid |
| import src.dev_pilot.utils.constants as const |
| from loguru import logger |
|
|
| class GraphExecutor: |
| def __init__(self, graph): |
| self.graph = graph |
|
|
| def get_thread(self, task_id): |
| return {"configurable": {"thread_id": task_id}} |
| |
| |
| def start_workflow(self, project_name: str): |
| |
| graph = self.graph |
| |
| flush_redis_cache() |
| |
| |
| task_id = f"sdlc-session-{uuid.uuid4().hex[:8]}" |
| |
| thread = self.get_thread(task_id) |
| |
| state = None |
| for event in graph.stream({"project_name": project_name},thread, stream_mode="values"): |
| state = event |
| |
| current_state = graph.get_state(thread) |
| save_state_to_redis(task_id, current_state) |
| |
| return {"task_id" : task_id, "state": state} |
| |
| |
| def generate_stories(self, task_id:str, requirements: list[str]): |
| saved_state = get_state_from_redis(task_id) |
| if saved_state: |
| saved_state['requirements'] = requirements |
| saved_state['next_node'] = const.REVIEW_USER_STORIES |
| |
| return self.update_and_resume_graph(saved_state,task_id,"get_user_requirements") |
|
|
|
|
| |
| def graph_review_flow(self, task_id, status, feedback, review_type): |
| saved_state = get_state_from_redis(task_id) |
| |
| if saved_state: |
| if review_type == const.REVIEW_USER_STORIES: |
| saved_state['user_stories_review_status'] = status |
| saved_state['user_stories_feedback'] = feedback |
| node_name = "review_user_stories" |
| saved_state['next_node'] = const.REVIEW_USER_STORIES if status == "feedback" else const.REVIEW_DESIGN_DOCUMENTS |
| |
| elif review_type == const.REVIEW_DESIGN_DOCUMENTS: |
| saved_state['design_documents_review_status'] = status |
| saved_state['design_documents_feedback'] = feedback |
| node_name = "review_design_documents" |
| saved_state['next_node'] = const.REVIEW_DESIGN_DOCUMENTS if status == "feedback" else const.REVIEW_CODE |
| |
| elif review_type == const.REVIEW_CODE: |
| saved_state['code_review_status'] = status |
| saved_state['code_review_feedback'] = feedback |
| node_name = "code_review" |
| saved_state['next_node'] = const.REVIEW_CODE if status == "feedback" else const.REVIEW_SECURITY_RECOMMENDATIONS |
| |
| elif review_type == const.REVIEW_SECURITY_RECOMMENDATIONS: |
| saved_state['security_review_status'] = status |
| saved_state['security_review_comments'] = feedback |
| node_name = "security_review" |
| saved_state['next_node'] = const.REVIEW_SECURITY_RECOMMENDATIONS if status == "feedback" else const.REVIEW_TEST_CASES |
| |
| elif review_type == const.REVIEW_TEST_CASES: |
| saved_state['test_case_review_status'] = status |
| saved_state['test_case_review_feedback'] = feedback |
| node_name = "review_test_cases" |
| saved_state['next_node'] = const.REVIEW_TEST_CASES if status == "feedback" else const.REVIEW_QA_TESTING |
| |
| elif review_type == const.REVIEW_QA_TESTING: |
| saved_state['qa_testing_status'] = status |
| saved_state['qa_testing_feedback'] = feedback |
| node_name = "qa_review" |
| saved_state['next_node'] = const.REVIEW_QA_TESTING if status == "feedback" else const.END_NODE |
| |
| else: |
| raise ValueError(f"Unsupported review type: {review_type}") |
| |
| return self.update_and_resume_graph(saved_state,task_id,node_name) |
| |
| |
| def update_and_resume_graph(self, saved_state,task_id, as_node): |
| graph = self.graph |
| thread = self.get_thread(task_id) |
| |
| graph.update_state(thread, saved_state, as_node=as_node) |
| |
| |
| state = None |
| for event in graph.stream(None, thread, stream_mode="values"): |
| logger.debug(f"Event Received: {event}") |
| state = event |
| |
| |
| current_state = graph.get_state(thread) |
| save_state_to_redis(task_id, current_state) |
| |
| return {"task_id" : task_id, "state": state} |
|
|
|
|
| def get_updated_state(self, task_id): |
| saved_state = get_state_from_redis(task_id) |
| return {"task_id" : task_id, "state": saved_state} |
| |
|
|