| from autogen import GroupChatManager |
| import json |
| import re |
| import networkx as nx |
|
|
| from agents import create_agents |
| from agents import is_termination_msg, gpt4_config |
| from corrector_agents import get_corrector_agents |
| from refiner_agents import get_refiner_agents |
|
|
| from chats import GroupChat, ChatWithEngineer, LayoutCorrectorGroupChat, ObjectDeletionGroupChat, LayoutRefinerGroupChat |
|
|
| from utils import get_room_priors, extract_list_from_json |
| from utils import preprocess_scene_graph, build_graph, remove_unnecessary_edges, handle_under_prepositions, get_conflicts, get_size_conflicts, get_object_from_scene_graph |
| from utils import get_object_from_scene_graph, get_rotation, get_cluster_objects, clean_and_extract_edges |
| from utils import get_cluster_size |
| from utils import get_possible_positions, is_point_bbox, calculate_overlap, get_topological_ordering, place_object, get_depth, get_visualization |
|
|
class Designer:
    """Drive the agent group chats that build, correct, refine and place a room layout.

    The working state lives in ``self.scene_graph``. Until :meth:`backtrack`
    runs it is a dict with an ``"objects_in_room"`` list; :meth:`backtrack`
    replaces it with a flat list (objects followed by the room priors), so the
    methods that index ``self.scene_graph["objects_in_room"]`` must be called
    before it.
    """

    # Ids of the fixed room layout elements. The order is the one shown to the
    # agents in prompts; the same ids are used to filter layout nodes out of
    # the object graph, so prompts and filters cannot drift apart.
    _ROOM_LAYOUT_IDS = (
        "south_wall",
        "north_wall",
        "west_wall",
        "east_wall",
        "middle of the room",
        "ceiling",
    )

    # Preposition to use when a relation is stored on the *other* object.
    _OPPOSITE_PREPOSITION = {
        "left of": "right of",
        "right of": "left of",
        "in front": "behind",
        "behind": "in front",
    }

    def __init__(self, no_of_objects, user_input, room_dimensions):
        """Store the design brief and derive the room priors.

        Args:
            no_of_objects: Forwarded to ``create_agents`` when the initial
                design is generated.
            user_input: User preference text interpolated into agent prompts.
            room_dimensions: Indexable with at least three entries; they are
                rendered into the first prompt as ``{0}m x {1}m x {2}m``.
        """
        self.no_of_objects = no_of_objects
        self.user_input = user_input
        self.room_dimensions = room_dimensions
        self.room_priors = get_room_priors(self.room_dimensions)
        self.scene_graph = None

    @staticmethod
    def _direction_check(diff, prep):
        """Return True when siblings must be arranged with ``behind``/``in front``.

        Args:
            diff: Rotation of a child minus the rotation of its parent.
            prep: Preposition linking the children to the parent.

        Returns:
            True for (aligned child, ``left of``/``right of``) and for
            (non-aligned child, ``in front``/``behind``/``on``); False
            otherwise, in which case ``left of``/``right of`` is expected.
        """
        aligned = diff % 180 == 0
        if aligned:
            return prep in ["left of", "right of"]
        return prep in ["in front", "behind"] or prep == "on"

    @staticmethod
    def _extract_json_payload(content):
        """Return the JSON text of an agent message.

        If the message contains a fenced ```` ```json ```` block, its body is
        returned; otherwise the message is returned unchanged so that a bare
        JSON reply is still parsed instead of failing on a missing match.
        """
        pattern = r'```json\s*([^`]+)\s*```'
        match = re.search(pattern, content, re.DOTALL)
        if match is None:
            return content
        return match.group(1)

    @staticmethod
    def _parse_cluster_key(key):
        """Extract ``(parent_id, preposition)`` from a cluster key.

        The first entry of ``key`` is expected to hold either two or three
        pairs; the parent id and preposition are the second element of the
        last two pairs. (Presumably these are ``(field, value)`` pairs built
        by ``get_cluster_objects`` -- verify against that helper.)

        Returns:
            ``(parent_id, preposition)``, or ``None`` when the key does not
            have one of the two supported shapes.
        """
        entries = list(key)
        if not entries:
            return None
        descriptor = entries[0]
        if len(descriptor) == 2:
            return descriptor[0][1], descriptor[1][1]
        if len(descriptor) == 3:
            return descriptor[1][1], descriptor[2][1]
        return None

    def create_initial_design(self):
        """Generate the initial scene graph and store it in ``self.scene_graph``.

        A first group chat (designer + architect) produces two JSON replies
        that are split into blocks with ``extract_list_from_json``. Each pair
        of blocks is then sent to the engineer chat, and the
        ``"objects_in_room"`` lists of the replies are concatenated.

        Raises:
            ValueError: If designer and architect return a different number
                of blocks.
        """
        user_proxy, json_schema_debugger, interior_designer, interior_architect, engineer = create_agents(self.no_of_objects)
        layout_elements = list(self._ROOM_LAYOUT_IDS)

        groupchat = GroupChat(
            agents=[user_proxy, interior_designer, interior_architect],
            messages=[],
            max_round=3,
        )
        chat_with_engineer = ChatWithEngineer(
            agents=[user_proxy, engineer, json_schema_debugger],
            messages=[],
            max_round=15,
        )

        manager = GroupChatManager(groupchat=groupchat, llm_config=gpt4_config, is_termination_msg=is_termination_msg)
        user_proxy.initiate_chat(
            manager,
            message=f"""
            The room has the size {self.room_dimensions[0]}m x {self.room_dimensions[1]}m x {self.room_dimensions[2]}m
            User Preference (in triple backquotes):
            ```
            {self.user_input}
            ```
            Room layout elements in the room (in triple backquotes):
            ```
            {layout_elements}
            ```
            json
            """,
        )

        # The last two messages of the chat are read as the designer's and
        # the architect's JSON replies, in that order.
        designer_response = json.loads(groupchat.messages[-2]["content"])
        architect_response = json.loads(groupchat.messages[-1]["content"])

        blocks_designer = extract_list_from_json(designer_response)
        blocks_architect = extract_list_from_json(architect_response)
        if len(blocks_designer) != len(blocks_architect):
            print("Lengths: ", len(blocks_designer), len(blocks_architect))
            raise ValueError("The number of blocks from the designer and architect should be the same! Please generate again.")

        json_data = None
        for d_block, a_block in zip(blocks_designer, blocks_architect):
            engineer.reset()
            json_schema_debugger.reset()
            prompt = str(d_block) + "\n" + str(a_block)

            # Ids already placed by earlier blocks, so the engineer can refer to them.
            if json_data is None:
                object_ids = []
            else:
                object_ids = [item["new_object_id"] for item in json_data["objects_in_room"]]

            manager = GroupChatManager(
                groupchat=chat_with_engineer,
                llm_config=gpt4_config,
                human_input_mode="NEVER",
                is_termination_msg=is_termination_msg,
            )
            # The layout list is rendered from the same ids the rest of the
            # class filters on (previously this prompt said 'middle of the floor').
            user_proxy.initiate_chat(
                manager,
                message=f"""
                Room layout elements in the room (in triple backquotes):
                ```
                {layout_elements}
                ```
                Array of objects in the room (in triple backquotes):
                ```
                {object_ids}
                ```
                Objects to be placed in the room (in triple backquotes):
                ```
                {prompt}
                ```
                json
                """,
            )

            block_json = json.loads(chat_with_engineer.messages[-2]["content"])
            if json_data is None:
                json_data = block_json
            else:
                json_data["objects_in_room"] += block_json["objects_in_room"]

        self.scene_graph = json_data

    def correct_design(self, verbose=False, auto_prune=False):
        """Resolve scene-graph conflicts, optionally pruning oversized clusters.

        Conflicts reported by ``get_conflicts`` are sent one at a time to the
        corrector chat until none remain; each reply overwrites
        ``is_on_the_floor``, ``facing`` and ``placement`` of the corrected
        object. With ``auto_prune`` the same is done for
        ``get_size_conflicts``, deleting the object named by the agent
        together with its graph descendants. The resulting object list is
        written back to ``self.scene_graph["objects_in_room"]``.

        Args:
            verbose: Print the detected conflicts.
            auto_prune: Also run the size-conflict / deletion stage.

        Raises:
            ValueError: If a correction refers to an object id that is not in
                the scene graph.
        """
        scene_graph = preprocess_scene_graph(self.scene_graph["objects_in_room"])
        G = build_graph(scene_graph)
        G = remove_unnecessary_edges(G)
        G, scene_graph = handle_under_prepositions(G, scene_graph)

        conflicts = get_conflicts(G, scene_graph)

        if verbose:
            print("-------------------CONFLICTS-------------------")
            for conflict in conflicts:
                print(conflict)
                print("\n\n")

        user_proxy, spatial_corrector_agent, json_schema_debugger, object_deletion_agent = get_corrector_agents()

        # NOTE(review): there is no retry cap; the loop ends only when
        # get_conflicts reports nothing.
        while len(conflicts) > 0:
            spatial_corrector_agent.reset()
            json_schema_debugger.reset()
            groupchat = LayoutCorrectorGroupChat(
                agents=[user_proxy, spatial_corrector_agent, json_schema_debugger],
                messages=[],
                max_round=15,
            )
            manager = GroupChatManager(groupchat=groupchat, llm_config=gpt4_config, is_termination_msg=is_termination_msg)
            user_proxy.initiate_chat(
                manager,
                message=f"""
                {conflicts[0]}
                """,
            )

            correction = groupchat.messages[-2]
            correction_json = json.loads(self._extract_json_payload(correction["content"]))
            corrected = correction_json["corrected_object"]

            corr_obj = get_object_from_scene_graph(corrected["new_object_id"], scene_graph)
            if corr_obj is None:
                raise ValueError(
                    f"Corrected object {corrected['new_object_id']!r} is not part of the scene graph."
                )
            for field in ("is_on_the_floor", "facing", "placement"):
                corr_obj[field] = corrected[field]

            G = build_graph(scene_graph)
            conflicts = get_conflicts(G, scene_graph)

        if auto_prune:
            size_conflicts = get_size_conflicts(G, scene_graph, self.user_input, self.room_priors, verbose)

            if verbose:
                print("-------------------SIZE CONFLICTS-------------------")
                for conflict in size_conflicts:
                    print(conflict)
                    print("\n\n")

            while len(size_conflicts) > 0:
                object_deletion_agent.reset()
                groupchat = ObjectDeletionGroupChat(
                    agents=[user_proxy, object_deletion_agent],
                    messages=[],
                    max_round=2,
                )
                manager = GroupChatManager(groupchat=groupchat, llm_config=gpt4_config, is_termination_msg=is_termination_msg)
                user_proxy.initiate_chat(
                    manager,
                    message=f"""
                    {size_conflicts[0]}
                    """,
                )

                correction_json = json.loads(groupchat.messages[-1]["content"])
                object_to_delete = correction_json["object_to_delete"]

                # Everything that depends on the deleted object goes with it.
                objs_to_delete = nx.descendants(G, object_to_delete).union({object_to_delete})
                print("Objs to Delete: ", objs_to_delete)
                scene_graph = [x for x in scene_graph if x["new_object_id"] not in objs_to_delete]
                for obj in objs_to_delete:
                    G.remove_node(obj)

                size_conflicts = get_size_conflicts(G, scene_graph, self.user_input, self.room_priors, verbose)

        self.scene_graph["objects_in_room"] = scene_graph

    def refine_design(self, verbose=False):
        """Ask the refiner chat for relations between siblings of each cluster.

        For every cluster from ``get_cluster_objects`` the refiner proposes
        relations among the children of one parent. Children whose proposed
        relations use the wrong axis for their rotation (or reference an
        unknown object) are dropped; the remaining relations are appended to
        the ``placement["objects_in_room"]`` list of the owning object,
        flipped to the opposite preposition where
        ``clean_and_extract_edges`` says so.

        Args:
            verbose: Print the clusters and the rejected children.
        """
        objects = self.scene_graph["objects_in_room"]
        cluster_dict = get_cluster_objects(objects)

        inputs = []
        for key, value in cluster_dict.items():
            parsed = self._parse_cluster_key(key)
            if parsed is None:
                # Without a parent/preposition the cluster cannot be refined;
                # skipping avoids reusing values from a previous iteration.
                if verbose:
                    print("Skipping cluster with unsupported key: ", key)
                continue
            parent_id, prep = parsed
            inputs.append((parent_id, prep, value))

        if verbose:
            if inputs == []:
                print("No clusters found")
            for parent_id, prep, child_names in inputs:
                print(f"Parent Object : {parent_id}")
                print(f"Children Objects : {child_names}")
                print(f"The children objects are '{prep}' the parent object")
                print("\n")

        for parent_id, prep, obj_names in inputs:
            child_objs = [get_object_from_scene_graph(name, objects) for name in obj_names]
            child_rots = [get_rotation(child, objects) for child in child_objs]

            parent_obj = get_object_from_scene_graph(parent_id, objects)
            if parent_obj is None:
                # The parent is not an object, so look it up among the room priors.
                parent_obj = [prior for prior in self.room_priors if prior.get("new_object_id") == parent_id][0]
            parent_obj_rot = get_rotation(parent_obj, objects)

            rot_diffs = [rot - parent_obj_rot for rot in child_rots]
            constraint_lines = []
            for name, diff in zip(obj_names, rot_diffs):
                if self._direction_check(diff, prep):
                    axis_hint = "`behind` or `in front`"
                else:
                    axis_hint = "`left of` or `right of`"
                constraint_lines.append("\t" + f"Place objects {axis_hint} of {name}!")
            possibilities_str = "Constraints:\n" + "\n".join(constraint_lines)

            user_proxy, layout_refiner, json_schema_debugger = get_refiner_agents()

            layout_refiner.reset()
            json_schema_debugger.reset()
            groupchat = LayoutRefinerGroupChat(
                agents=[user_proxy, layout_refiner, json_schema_debugger],
                messages=[],
                max_round=15,
            )
            manager = GroupChatManager(groupchat=groupchat, llm_config=gpt4_config, is_termination_msg=is_termination_msg)
            user_proxy.initiate_chat(
                manager,
                message=f"""
                Parent Object : {parent_id}
                Children Objects : {obj_names}

                {possibilities_str}

                The children objects are '{prep}' the parent object
                """,
            )

            new_relationships = json.loads(groupchat.messages[-2]["content"])
            # Some replies wrap the list as {"children_objects": {"items": [...]}}.
            if "items" in new_relationships["children_objects"]:
                new_relationships = {"children_objects": new_relationships["children_objects"]["items"]}

            invalid_name_ids = []
            for child in new_relationships["children_objects"]:
                for other_child in child["placement"]["children_objects"]:
                    other_obj = get_object_from_scene_graph(other_child["name_id"], objects)
                    if other_obj is None:
                        # A relation to an object that does not exist cannot be validated.
                        invalid_name_ids.append(child["name_id"])
                        continue
                    other_child_rot = get_rotation(other_obj, objects)
                    if self._direction_check(other_child_rot - parent_obj_rot, prep):
                        allowed_preps = ["in front", "behind"]
                    else:
                        allowed_preps = ["left of", "right of"]
                    if other_child["preposition"] not in allowed_preps:
                        invalid_name_ids.append(child["name_id"])

            if verbose:
                print("Invalid name IDs: ", invalid_name_ids)
            new_relationships["children_objects"] = [
                child for child in new_relationships["children_objects"]
                if child["name_id"] not in invalid_name_ids
            ]

            if len(new_relationships["children_objects"]) == 0:
                continue

            edges, edges_to_flip = clean_and_extract_edges(new_relationships, parent_id, verbose=verbose)

            for child in new_relationships["children_objects"]:
                name_id = child["name_id"]
                for rel in child["placement"]["children_objects"]:
                    edge = (name_id, rel["name_id"])
                    if edge not in edges:
                        continue
                    if edges_to_flip[edge]:
                        # Store the relation on the other object, with the mirrored preposition.
                        owner = get_object_from_scene_graph(rel["name_id"], objects)
                        owner["placement"]["objects_in_room"].append({
                            "object_id": name_id,
                            "preposition": self._OPPOSITE_PREPOSITION[rel["preposition"]],
                            "is_adjacent": rel["is_adjacent"],
                        })
                    else:
                        owner = get_object_from_scene_graph(name_id, objects)
                        owner["placement"]["objects_in_room"].append({
                            "object_id": rel["name_id"],
                            "preposition": rel["preposition"],
                            "is_adjacent": rel["is_adjacent"],
                        })

    def create_object_clusters(self, verbose=False):
        """Annotate every object with its rotation and cluster constraint area.

        Sets ``obj["rotation"] = {"z_angle": ...}`` on every object, then for
        each non-layout node of the graph sets
        ``obj["cluster"] = {"constraint_area": {...}}`` with the sizes from
        ``get_cluster_size`` mapped as left of -> ``x_neg``, right of ->
        ``x_pos``, behind -> ``y_neg``, in front -> ``y_pos``.

        Args:
            verbose: Print the cluster size and children of each node.
        """
        objects = self.scene_graph["objects_in_room"]
        for obj in objects:
            obj["rotation"] = {"z_angle": get_rotation(obj, objects)}

        G = build_graph(objects)
        for node in G.nodes():
            if node in self._ROOM_LAYOUT_IDS:
                continue
            cluster_size, children_objs = get_cluster_size(node, G, objects)
            if verbose:
                print("Node: ", node)
                print("Cluster size: ", cluster_size)
                print("Children: ", children_objs)
                print("\n")
            node_obj = get_object_from_scene_graph(node, objects)
            node_obj["cluster"] = {
                "constraint_area": {
                    "x_neg": cluster_size["left of"],
                    "x_pos": cluster_size["right of"],
                    "y_neg": cluster_size["behind"],
                    "y_pos": cluster_size["in front"],
                }
            }

    def backtrack(self, verbose=False):
        """Assign positions to all objects, retrying shallower depths on failure.

        ``self.scene_graph`` is replaced by a flat list (objects followed by
        the room priors). Objects whose possible positions intersect in a
        single point are fixed first. The remaining objects are placed depth
        by depth with ``place_object``; when a placement reports errors, the
        depth is decreased by one (never below 1), the positions of all
        non-fixed objects at or below that depth are discarded, and the depth
        is retried.

        Args:
            verbose: Print progress and show visualisations.
        """
        self.scene_graph = self.scene_graph["objects_in_room"] + self.room_priors
        prior_ids = self._ROOM_LAYOUT_IDS

        # True for objects whose position is fully determined up front.
        point_bbox = dict.fromkeys([item["new_object_id"] for item in self.scene_graph], False)

        for item in self.scene_graph:
            if item["new_object_id"] in prior_ids:
                continue
            possible_pos = get_possible_positions(item["new_object_id"], self.scene_graph, self.room_dimensions)

            # Intersect all candidate regions.
            overlap = None
            if len(possible_pos) >= 1:
                overlap = possible_pos[0]
                for pos in possible_pos[1:]:
                    overlap = calculate_overlap(overlap, pos)

            if overlap is not None and is_point_bbox(overlap):
                item["position"] = {"x": overlap[0], "y": overlap[2], "z": overlap[4]}
                point_bbox[item["new_object_id"]] = True

        scene_graph_wo_layout = [item for item in self.scene_graph if item["new_object_id"] not in prior_ids]

        depth_scene_graph = get_depth(scene_graph_wo_layout)
        # default=0 lets a scene without objects fall through the loop below
        # instead of raising on an empty sequence.
        max_depth = max(depth_scene_graph.values(), default=0)

        if verbose:
            print("Max depth: ", max_depth)
            print("Depth scene graph: ", depth_scene_graph)
            print("Point BBox: ", [key for key, value in point_bbox.items() if value])

            get_visualization(self.scene_graph, self.room_priors)
            for obj in scene_graph_wo_layout:
                if "position" in obj:
                    print(obj["new_object_id"], obj["position"])

        topological_order = get_topological_ordering(scene_graph_wo_layout)
        topological_order = [item for item in topological_order if item not in prior_ids]
        if verbose:
            print("Topological order: ", topological_order)

        d = 1
        while d <= max_depth:
            if verbose:
                print("Depth: ", d)
            error_flag = False

            nodes = [node for node in topological_order if depth_scene_graph[node] == d]
            if verbose:
                print(f"Nodes at depth {d}:", nodes)

            for node in nodes:
                if point_bbox[node]:
                    continue

                obj = next(item for item in scene_graph_wo_layout if item["new_object_id"] == node)
                errors = place_object(obj, self.scene_graph, self.room_dimensions, errors={}, verbose=verbose)
                if verbose:
                    print(f"Errors for {obj['new_object_id']}:", errors)

                if not errors:
                    continue

                # Placement failed: step back one depth and redo from there.
                if d > 1:
                    d -= 1
                    if verbose:
                        print("Reducing depth to: ", d)

                error_flag = True

                for del_item in scene_graph_wo_layout:
                    if depth_scene_graph[del_item["new_object_id"]] < d:
                        continue
                    if "position" in del_item and not point_bbox[del_item["new_object_id"]]:
                        if verbose:
                            print("Deleting position for: ", del_item["new_object_id"])
                        del del_item["position"]
                break

            if not error_flag:
                d += 1

        if verbose:
            get_visualization(self.scene_graph, self.room_priors)

    def to_json(self, filename="scene_graph.json"):
        """Write ``self.scene_graph`` to ``filename`` as indented JSON."""
        with open(filename, "w", encoding="utf-8") as file:
            json.dump(self.scene_graph, file, indent=4)
|
|