| """ |
| Funkcje pomocnicze dla modelu Magiv2. |
| |
| ═══════════════════════════════════════════════════════════════════════════════ |
| STRESZCZENIE ZAWARTOŚCI PLIKU |
| ═══════════════════════════════════════════════════════════════════════════════ |
| |
| Ten moduł zawiera narzędzia pomocnicze do przetwarzania i wizualizacji wyników |
| modelu Magiv2 dla analizy komiksów/mangi. Plik składa się z 5 głównych kategorii: |
| |
| 1. ZARZĄDZANIE URZĄDZENIAMI |
| ├─ move_to_device() - Rekurencyjne przenoszenie danych między CPU/GPU |
| │ Obsługuje: dict, list, tuple, numpy.ndarray, torch.Tensor |
| └─ Używane przy każdym wywołaniu modelu do przeniesienia danych na właściwe urządzenie |
| |
| 2. STRUKTURA UNION-FIND DO KLASTROWANIA (linie ~53-190) |
| ├─ class UnionFind - Disjoint Set Union z kompresją ścieżki i union by size |
| │ ├─ __init__(n) - Inicjalizacja n rozłącznych elementów |
| │ ├─ from_adj_matrix() - Tworzenie z macierzy sąsiedztwa |
| │ ├─ from_adj_list() - Tworzenie z listy sąsiedztwa |
| │ ├─ from_edge_list() - Tworzenie z listy krawędzi |
| │ ├─ find(x) - Znajdowanie korzenia zbioru (z path compression) |
| │ ├─ unite(x, y) - Łączenie zbiorów (z union by size) |
| │ ├─ get_components_of(x) - Wszystkie elementy w zbiorze x |
| │ ├─ are_connected(x, y) - Sprawdzanie czy x i y w tym samym zbiorze |
| │ ├─ get_size(x) - Rozmiar zbioru zawierającego x |
| │ ├─ get_num_components() - Liczba rozłącznych zbiorów |
| │ └─ get_labels_for_connected_components() - Generowanie etykiet klastrów |
| └─ Używane do grupowania postaci na podstawie macierzy podobieństwa |
| |
| 3. WIZUALIZACJA WYNIKÓW |
| ├─ visualise_single_image_prediction() - Główna funkcja wizualizacji |
| │ ├─ Rysuje bounding boxy: panele (zielone), tekst (czerwone), |
| │ │ postaci (niebieskie), ogony dymków (fioletowe) |
| │ ├─ Wyświetla imiona postaci nad ich bounding boxami |
| │ ├─ Rysuje klastry postaci (ta sama osoba) jako kolorowe linie w układzie gwiazdki |
| │ ├─ Pokazuje asocjacje tekst-postać (kto mówi) - czerwone przerywane linie |
| │ ├─ Pokazuje asocjacje tekst-ogon - fioletowe przerywane linie |
| │ └─ Zwraca obraz jako numpy array lub zapisuje do pliku |
| └─ plot_bboxes() - Pomocnicza funkcja do rysowania prostokątów |
| |
| 4. SORTOWANIE PANELI I TEKSTÓW W KOLEJNOŚCI CZYTANIA |
| |
| A. Sortowanie paneli (manga: prawo->lewo, góra->dół): |
| ├─ sort_panels() - Główny algorytm sortowania paneli |
| │ ├─ Buduje skierowany graf kolejności czytania |
| │ ├─ Używa erozji paneli (5%) do obsługi niedokładnych detekcji |
| │ ├─ Usuwa cykle przez eliminację najdłuższych krawędzi |
| │ └─ Zwraca sortowanie topologiczne (kolejność czytania) |
| │ |
| ├─ is_there_a_directed_edge() - Określa czy panel A jest przed B |
| │ ├─ Reguły mangi: prawo ma priorytet nad górą |
| │ ├─ Obsługuje nakładające się panele przez erozję |
| │ └─ Używa heurystyk cięć (cuts) dla skomplikowanych układów |
| │ |
| ├─ use_cuts_to_determine_edge_from_a_to_b() - Zaawansowane heurystyki |
| │ ├─ Dzieli panele na "wiersze" (overlapping Y ranges) |
| │ ├─ Dzieli panele na "kolumny" (overlapping X ranges) |
| │ └─ Iteracyjna erozja gdy nie można określić kolejności |
| │ |
| └─ Funkcje pomocnicze geometrii: |
| ├─ is_strictly_above/below/left_of/right_of() - Relacje przestrzenne |
| ├─ intersects() - Sprawdzanie przecięcia prostokątów (Shapely) |
| ├─ get_distance() - Odległość euklidesowa między prostokątami |
| ├─ erode_rectangle() - Zmniejszanie prostokąta z zachowaniem aspect ratio |
| └─ merge_overlapping_ranges() - Scalanie nakładających się zakresów 1D |
| |
| B. Sortowanie tekstów: |
| ├─ sort_text_boxes_in_reading_order() - Sortuje teksty według paneli |
| │ ├─ Przypisuje każdy tekst do najbliższego panelu |
| │ ├─ Sortuje teksty według kolejności paneli |
| │ └─ W każdym panelu sortuje według odległości od prawego górnego rogu |
| │ |
| ├─ get_text_to_panel_mapping() - Przypisanie tekst->panel |
| │ ├─ Preferuje nakładanie się (intersection area) |
| │ └─ Fallback: najbliższy panel (distance) |
| │ |
| └─ sort_texts_within_panel() - Sortowanie w obrębie jednego panelu |
| └─ Sortuje według odległości od prawego górnego rogu panelu |
| |
| 5. KONWERSJE FORMATÓW BOUNDING BOXÓW |
| ├─ x1y1wh_to_x1y1x2y2() - (x, y, width, height) -> (x1, y1, x2, y2) |
| ├─ x1y1x2y2_to_xywh() - (x1, y1, x2, y2) -> (x, y, width, height) |
| │ └─ Format COCO używa xywh zamiast corners |
| └─ convert_to_list_of_lists() - Uniwersalna konwersja torch/numpy/list |
| |
| ═══════════════════════════════════════════════════════════════════════════════ |
| KLUCZOWE ALGORYTMY |
| ═══════════════════════════════════════════════════════════════════════════════ |
| |
| 1. UNION-FIND (O(α(n)) - prawie stała): |
| - Path compression: podczas find() ustawiamy rodzica bezpośrednio na korzeń |
| - Union by size: mniejszy zbiór dołączamy do większego dla zbalansowania |
| - Używane do klastrowania postaci z macierzy podobieństwa |
| |
| 2. SORTOWANIE PANELI (O(n² log n)): |
| - Graf skierowany gdzie krawędź A->B = "A przed B" |
| - Reguły: prawo > góra (manga) lub lewo > góra (komiks zachodni) |
| - Usuwanie cykli przez eliminację najdłuższych krawędzi |
| - Sortowanie topologiczne DAG dla finalnej kolejności |
| - Erozja progresywna (5% na iterację) dla nakładających się paneli |
| |
| 3. SORTOWANIE TEKSTÓW (O(n log n)): |
| - Przypisanie do paneli: max(intersection_area) lub min(distance) |
| - Sortowanie według ID panelu (panele już posortowane) |
| - W panelu: sortowanie według distance od prawego górnego rogu |
| - Odległość w Shapely: shortest distance między geometriami |
| |
| ═══════════════════════════════════════════════════════════════════════════════ |
| ZALEŻNOŚCI ZEWNĘTRZNE |
| ═══════════════════════════════════════════════════════════════════════════════ |
| |
| - torch: Tensory GPU/CPU, operacje na urządzeniach |
| - numpy: Operacje na tablicach, NDArray typing |
| - matplotlib: Wizualizacja (pyplot, patches) |
| - shapely: Geometria 2D (Point, box, Polygon) - przecięcia, odległości |
| - networkx: Grafy (DiGraph, topological_sort, simple_cycles) |
| - typing: Type hints (Any, Dict, List, Tuple, Union, Optional) |
| |
| ═══════════════════════════════════════════════════════════════════════════════ |
| TYPOWE UŻYCIE |
| ═══════════════════════════════════════════════════════════════════════════════ |
| |
| # 1. Przeniesienie danych na GPU |
| inputs = move_to_device({"images": np_array, "labels": [0, 1, 2]}, device) |
| |
| # 2. Klastrowanie postaci z macierzy podobieństwa |
| uf = UnionFind.from_adj_matrix(similarity_matrix > threshold) |
| cluster_labels = uf.get_labels_for_connected_components() |
| |
| # 3. Sortowanie paneli w kolejności czytania |
| sorted_panel_indices = sort_panels(panel_bboxes) |
| |
| # 4. Sortowanie tekstów |
| sorted_text_indices = sort_text_boxes_in_reading_order( |
| text_bboxes, sorted_panel_bboxes |
| ) |
| |
| # 5. Wizualizacja wyników |
| image = visualise_single_image_prediction( |
| image_array, predictions, filename="output.png" |
| ) |
| |
| # 6. Konwersja formatów bbox |
| coco_bbox = x1y1x2y2_to_xywh([10, 20, 30, 40]) # -> [10, 20, 20, 20] |
| |
| ═══════════════════════════════════════════════════════════════════════════════ |
| """ |
|
|
| import torch |
| import numpy as np |
| import random |
| import matplotlib.pyplot as plt |
| import matplotlib.patches as patches |
| from shapely.geometry import Point, box |
| from shapely.geometry.polygon import Polygon |
| import networkx as nx |
| from copy import deepcopy |
| from itertools import groupby |
| from typing import Any, Dict, List, Tuple, Union, Optional |
| from numpy.typing import NDArray |
|
|
|
|
| def move_to_device(inputs: Any, device: torch.device) -> Any: |
| """ |
| Rekurencyjnie przenosi dane na określone urządzenie (CPU/GPU). |
| |
| Obsługuje różne typy danych: |
| - Słowniki: przenosi każdy klucz-wartość rekurencyjnie |
| - Listy: przenosi każdy element rekurencyjnie |
| - Tuple: przenosi każdy element rekurencyjnie |
| - numpy.ndarray: konwertuje na torch.Tensor i przenosi |
| - torch.Tensor: przenosi bezpośrednio |
| |
| Args: |
| inputs: Dane do przeniesienia (dict, list, tuple, array, tensor) |
| device: Docelowe urządzenie torch (torch.device) |
| |
| Returns: |
| Dane przeniesione na docelowe urządzenie (ten sam typ co input) |
| """ |
| if hasattr(inputs, "keys"): |
| |
| return {k: move_to_device(v, device) for k, v in inputs.items()} |
| elif isinstance(inputs, list): |
| |
| return [move_to_device(v, device) for v in inputs] |
| elif isinstance(inputs, tuple): |
| |
| return tuple([move_to_device(v, device) for v in inputs]) |
| elif isinstance(inputs, np.ndarray): |
| |
| return torch.from_numpy(inputs).to(device) |
| else: |
| |
| return inputs.to(device) |
|
|
|
|
| class UnionFind: |
| """ |
| Union-Find (Disjoint Set Union) - struktura danych do klastrowania. |
| |
| Używana do grupowania postaci na podstawie macierzy podobieństwa. |
| Implementuje algorytm z kompresją ścieżki (path compression) i |
| łączeniem według rozmiaru (union by size) dla optymalnej wydajności. |
| |
| Attributes: |
| parent: Lista rodziców dla każdego węzła (indeks -> rodzic) |
| size: Rozmiary poddrzew dla każdego korzenia |
| num_components: Liczba rozłącznych komponentów (klastrów) |
| """ |
|
|
| def __init__(self, n: int) -> None: |
| """ |
| Inicjalizuje Union-Find z n rozłącznymi elementami. |
| |
| Args: |
| n: Liczba elementów (węzłów) w strukturze |
| """ |
| self.parent: List[int] = list(range(n)) |
| self.size: List[int] = [1] * n |
| self.num_components: int = n |
|
|
| @classmethod |
| def from_adj_matrix(cls, adj_matrix: torch.Tensor) -> 'UnionFind': |
| """ |
| Tworzy Union-Find z macierzy sąsiedztwa (adjacency matrix). |
| |
| Łączy węzły i,j jeśli adj_matrix[i,j] > 0 (są połączone krawędzią). |
| |
| Args: |
| adj_matrix: Macierz sąsiedztwa [n, n] (1 = połączone, 0 = rozłączone) |
| |
| Returns: |
| Nowa instancja UnionFind z połączonymi węzłami |
| """ |
| ufds: UnionFind = cls(adj_matrix.shape[0]) |
| for i in range(adj_matrix.shape[0]): |
| for j in range(adj_matrix.shape[1]): |
| if adj_matrix[i, j] > 0: |
| ufds.unite(i, j) |
| return ufds |
|
|
| @classmethod |
| def from_adj_list(cls, adj_list: List[List[int]]) -> 'UnionFind': |
| """ |
| Tworzy Union-Find z listy sąsiedztwa (adjacency list). |
| |
| Args: |
| adj_list: Lista list, gdzie adj_list[i] zawiera sąsiadów węzła i |
| |
| Returns: |
| Nowa instancja UnionFind z połączonymi węzłami |
| """ |
| ufds: UnionFind = cls(len(adj_list)) |
| for i in range(len(adj_list)): |
| for j in adj_list[i]: |
| ufds.unite(i, j) |
| return ufds |
|
|
| @classmethod |
| def from_edge_list(cls, edge_list: List[Tuple[int, int]], num_nodes: int) -> 'UnionFind': |
| """ |
| Tworzy Union-Find z listy krawędzi. |
| |
| Args: |
| edge_list: Lista krotek (i, j) reprezentujących krawędzie |
| num_nodes: Całkowita liczba węzłów w grafie |
| |
| Returns: |
| Nowa instancja UnionFind z połączonymi węzłami |
| """ |
| ufds: UnionFind = cls(num_nodes) |
| for edge in edge_list: |
| ufds.unite(edge[0], edge[1]) |
| return ufds |
|
|
| def find(self, x: int) -> int: |
| """ |
| Znajduje korzeń (reprezentanta) zbioru zawierającego x. |
| |
| Implementuje kompresję ścieżki (path compression) - podczas |
| przechodzenia do korzenia, ustawia rodzica każdego węzła |
| bezpośrednio na korzeń dla przyszłych szybszych zapytań. |
| |
| Args: |
| x: Indeks węzła |
| |
| Returns: |
| Indeks korzenia zbioru zawierającego x |
| """ |
| if self.parent[x] == x: |
| return x |
| |
| self.parent[x] = self.find(self.parent[x]) |
| return self.parent[x] |
|
|
| def unite(self, x: int, y: int) -> None: |
| """ |
| Łączy zbiory zawierające x i y. |
| |
| Implementuje union by size - mniejszy zbiór jest dołączany |
| do większego dla utrzymania zbalansowanego drzewa. |
| |
| Args: |
| x: Indeks pierwszego węzła |
| y: Indeks drugiego węzła |
| """ |
| x = self.find(x) |
| y = self.find(y) |
| if x != y: |
| |
| if self.size[x] < self.size[y]: |
| x, y = y, x |
| self.parent[y] = x |
| self.size[x] += self.size[y] |
| self.num_components -= 1 |
|
|
| def get_components_of(self, x: int) -> List[int]: |
| """ |
| Zwraca wszystkie węzły w tym samym zbiorze co x. |
| |
| Args: |
| x: Indeks węzła |
| |
| Returns: |
| Lista indeksów wszystkich węzłów w zbiorze zawierającym x |
| """ |
| x = self.find(x) |
| return [i for i in range(len(self.parent)) if self.find(i) == x] |
|
|
| def are_connected(self, x: int, y: int) -> bool: |
| """ |
| Sprawdza czy x i y są w tym samym zbiorze. |
| |
| Args: |
| x: Indeks pierwszego węzła |
| y: Indeks drugiego węzła |
| |
| Returns: |
| True jeśli x i y są w tym samym zbiorze, False w przeciwnym razie |
| """ |
| return self.find(x) == self.find(y) |
|
|
| def get_size(self, x: int) -> int: |
| """ |
| Zwraca rozmiar zbioru zawierającego x. |
| |
| Args: |
| x: Indeks węzła |
| |
| Returns: |
| Liczba węzłów w zbiorze zawierającym x |
| """ |
| return self.size[self.find(x)] |
|
|
| def get_num_components(self) -> int: |
| """ |
| Zwraca liczbę rozłącznych zbiorów (komponentów). |
| |
| Returns: |
| Liczba rozłącznych zbiorów w strukturze |
| """ |
| return self.num_components |
|
|
| def get_labels_for_connected_components(self) -> List[int]: |
| """ |
| Generuje etykiety klastrów dla wszystkich węzłów. |
| |
| Węzły w tym samym zbiorze otrzymują tę samą etykietę (0, 1, 2, ...). |
| Etykiety są przypisywane w kolejności pierwszego napotkania korzenia. |
| |
| Returns: |
| Lista etykiet klastrów (długość n), gdzie labels[i] to klaster węzła i |
| """ |
| map_parent_to_label: Dict[int, int] = {} |
| labels: List[int] = [] |
| for i in range(len(self.parent)): |
| parent: int = self.find(i) |
| if parent not in map_parent_to_label: |
| map_parent_to_label[parent] = len(map_parent_to_label) |
| labels.append(map_parent_to_label[parent]) |
| return labels |
|
|
|
|
| def visualise_single_image_prediction( |
| image_as_np_array: NDArray[np.uint8], |
| predictions: Dict[str, Any], |
| filename: Optional[str] |
| ) -> NDArray[np.uint8]: |
| """ |
| Wizualizuje wyniki predykcji modelu na obrazie strony mangi/komiksu. |
| |
| Rysuje: |
| - Zielone prostokąty: panele |
| - Czerwone prostokąty: tekst (tylko essential_text, tj. dialogi) |
| - Niebieskie prostokąty: postaci |
| - Fioletowe prostokąty: ogony dymków |
| - Niebieskie etykiety: imiona postaci |
| - Kolorowe linie: klastry postaci (ta sama osoba) |
| - Czerwone przerywane linie: asocjacje tekst-postać (kto mówi) |
| - Fioletowe przerywane linie: asocjacje tekst-ogon |
| |
| Args: |
| image_as_np_array: Obraz strony jako numpy array [H, W, C] |
| predictions: Słownik z wynikami zawierający klucze: |
| - "panels", "texts", "characters", "tails": bounding boxy |
| - "character_names": imiona postaci |
| - "character_cluster_labels": etykiety klastrów postaci |
| - "text_character_associations": pary (idx_tekstu, idx_postaci) |
| - "text_tail_associations": pary (idx_tekstu, idx_ogona) |
| - "is_essential_text": flagi czy tekst to dialog |
| filename: Opcjonalna ścieżka do zapisu wizualizacji (lub None) |
| |
| Returns: |
| Obraz wizualizacji jako numpy array [H, W, C] |
| """ |
| figure, subplot = plt.subplots(1, 1, figsize=(10, 10)) |
| subplot.imshow(image_as_np_array) |
|
|
| |
| plot_bboxes(subplot, predictions["panels"], color="green") |
| plot_bboxes(subplot, predictions["texts"], color="red", |
| visibility=predictions["is_essential_text"]) |
| plot_bboxes(subplot, predictions["characters"], color="blue") |
| plot_bboxes(subplot, predictions["tails"], color="purple") |
|
|
| |
| for i, name in enumerate(predictions["character_names"]): |
| char_bbox: List[float] = predictions["characters"][i] |
| x1: float |
| y1: float |
| x2: float |
| y2: float |
| x1, y1, x2, y2 = char_bbox |
| subplot.text(x1, y1 - 2, name, |
| verticalalignment='bottom', horizontalalignment='left', |
| |
| bbox=dict(facecolor='blue', alpha=1, edgecolor='none'), |
| color='white', fontsize=8) |
|
|
| |
| COLOURS: List[str] = [ |
| "#b7ff51", |
| "#f50a8f", |
| "#4b13b6", |
| "#ddaa34", |
| "#bea2a2", |
| ] |
| colour_index: int = 0 |
| character_cluster_labels: List[int] = predictions["character_cluster_labels"] |
| |
| unique_label_sorted_by_frequency: List[int] = sorted(list(set( |
| character_cluster_labels)), key=lambda x: character_cluster_labels.count(x), reverse=True) |
|
|
| |
| for label in unique_label_sorted_by_frequency: |
| root: Optional[int] = None |
| others: List[int] = [] |
| |
| for i in range(len(predictions["characters"])): |
| if character_cluster_labels[i] == label: |
| if root is None: |
| root = i |
| else: |
| others.append(i) |
|
|
| |
| if colour_index >= len(COLOURS): |
| |
| random_colour: str = COLOURS[0] |
| while random_colour in COLOURS: |
| random_colour = "#" + \ |
| "".join([random.choice("0123456789ABCDEF") |
| for j in range(6)]) |
| else: |
| random_colour: str = COLOURS[colour_index] |
| colour_index += 1 |
|
|
| |
| bbox_i: List[float] = predictions["characters"][root] |
| x1: float = bbox_i[0] + (bbox_i[2] - bbox_i[0]) / 2 |
| y1: float = bbox_i[1] + (bbox_i[3] - bbox_i[1]) / 2 |
| |
| subplot.plot([x1], [y1], color=random_colour, marker="o", markersize=5) |
|
|
| |
| for j in others: |
| bbox_j: List[float] = predictions["characters"][j] |
| x1 = bbox_i[0] + (bbox_i[2] - bbox_i[0]) / 2 |
| y1 = bbox_i[1] + (bbox_i[3] - bbox_i[1]) / 2 |
| x2: float = bbox_j[0] + (bbox_j[2] - bbox_j[0]) / 2 |
| y2: float = bbox_j[1] + (bbox_j[3] - bbox_j[1]) / 2 |
| |
| subplot.plot([x1, x2], [y1, y2], color=random_colour, linewidth=2) |
| |
| subplot.plot([x2], [y2], color=random_colour, |
| marker="o", markersize=5) |
|
|
| |
| for (i, j) in predictions["text_character_associations"]: |
| bbox_i: List[float] = predictions["texts"][i] |
| bbox_j: List[float] = predictions["characters"][j] |
| |
| if not predictions["is_essential_text"][i]: |
| continue |
| |
| x1: float = bbox_i[0] + (bbox_i[2] - bbox_i[0]) / 2 |
| y1: float = bbox_i[1] + (bbox_i[3] - bbox_i[1]) / 2 |
| x2: float = bbox_j[0] + (bbox_j[2] - bbox_j[0]) / 2 |
| y2: float = bbox_j[1] + (bbox_j[3] - bbox_j[1]) / 2 |
| |
| subplot.plot([x1, x2], [y1, y2], color="red", |
| linewidth=2, linestyle="dashed") |
|
|
| |
| for (i, j) in predictions["text_tail_associations"]: |
| bbox_i: List[float] = predictions["texts"][i] |
| bbox_j: List[float] = predictions["tails"][j] |
| |
| x1: float = bbox_i[0] + (bbox_i[2] - bbox_i[0]) / 2 |
| y1: float = bbox_i[1] + (bbox_i[3] - bbox_i[1]) / 2 |
| x2: float = bbox_j[0] + (bbox_j[2] - bbox_j[0]) / 2 |
| y2: float = bbox_j[1] + (bbox_j[3] - bbox_j[1]) / 2 |
| |
| subplot.plot([x1, x2], [y1, y2], color="purple", |
| linewidth=2, linestyle="dashed") |
|
|
| |
| subplot.axis("off") |
| |
| if filename is not None: |
| plt.savefig(filename, bbox_inches="tight", pad_inches=0) |
|
|
| |
| figure.canvas.draw() |
| image: NDArray[np.uint8] = np.array(figure.canvas.renderer._renderer) |
| plt.close() |
| return image |
|
|
|
|
| def plot_bboxes( |
| subplot: Any, |
| bboxes: List[List[float]], |
| color: str = "red", |
| visibility: Optional[List[int]] = None |
| ) -> None: |
| """ |
| Rysuje bounding boxy na subplocie matplotlib. |
| |
| Args: |
| subplot: Subplot matplotlib do rysowania |
| bboxes: Lista bounding boxów w formacie [x1, y1, x2, y2] |
| color: Kolor krawędzi prostokątów (domyślnie "red") |
| visibility: Opcjonalna lista flag (1=widoczny, 0=ukryty). |
| Jeśli None, wszystkie boxy są widoczne |
| """ |
| if visibility is None: |
| visibility = [1] * len(bboxes) |
|
|
| for id, bbox in enumerate(bboxes): |
| |
| if visibility[id] == 0: |
| continue |
| |
| w: float = bbox[2] - bbox[0] |
| h: float = bbox[3] - bbox[1] |
| |
| rect: patches.Rectangle = patches.Rectangle( |
| bbox[:2], w, h, linewidth=1, edgecolor=color, facecolor="none", linestyle="solid" |
| ) |
| subplot.add_patch(rect) |
|
|
|
|
| def sort_panels(rects: Union[torch.Tensor, NDArray, List[List[float]]]) -> List[int]: |
| """ |
| Sortuje panele w kolejności czytania (prawo->lewo, góra->dół dla mangi). |
| |
| Algorytm: |
| 1. Lekka erozja paneli aby obsłużyć niedokładne detekcje |
| 2. Budowa grafu skierowanego z krawędziami reprezentującymi kolejność czytania |
| 3. Usunięcie cykli przez eliminację najdłuższych krawędzi w każdym cyklu |
| 4. Sortowanie topologiczne grafu acyklicznego |
| |
| Args: |
| rects: Bounding boxy paneli [x1, y1, x2, y2] |
| |
| Returns: |
| Lista indeksów paneli w kolejności czytania |
| """ |
| before_rects: List[List[float]] = convert_to_list_of_lists(rects) |
| |
| rects_eroded: List[List[float]] = [ |
| erode_rectangle(rect, 0.05) for rect in before_rects] |
|
|
| |
| G: nx.DiGraph = nx.DiGraph() |
| G.add_nodes_from(range(len(rects_eroded))) |
| for i in range(len(rects_eroded)): |
| for j in range(len(rects_eroded)): |
| if i == j: |
| continue |
| |
| if is_there_a_directed_edge(i, j, rects_eroded): |
| G.add_edge(i, j, weight=get_distance( |
| rects_eroded[i], rects_eroded[j])) |
| else: |
| G.add_edge(j, i, weight=get_distance( |
| rects_eroded[i], rects_eroded[j])) |
|
|
| |
| while True: |
| cycles: List[List[int]] = sorted(nx.simple_cycles(G)) |
| cycles = [cycle for cycle in cycles if len(cycle) > 1] |
| if len(cycles) == 0: |
| break |
| |
| cycle: List[int] = cycles[0] |
| |
| edges: List[Tuple[int, int]] = [ |
| e for e in zip(cycle, cycle[1:] + cycle[:1])] |
| |
| max_cyclic_edge: Tuple[int, int] = max( |
| edges, key=lambda x: G.edges[x]["weight"]) |
| G.remove_edge(*max_cyclic_edge) |
|
|
| |
| return list(nx.topological_sort(G)) |
|
|
|
|
| def is_strictly_above(rectA: List[float], rectB: List[float]) -> bool: |
| """Sprawdza czy rectA jest całkowicie nad rectB (dolna krawędź A < górna krawędź B).""" |
| x1A: float |
| y1A: float |
| x2A: float |
| y2A: float |
| x1A, y1A, x2A, y2A = rectA |
| x1B: float |
| y1B: float |
| x2B: float |
| y2B: float |
| x1B, y1B, x2B, y2B = rectB |
| return y2A < y1B |
|
|
|
|
| def is_strictly_below(rectA: List[float], rectB: List[float]) -> bool: |
| """Sprawdza czy rectA jest całkowicie pod rectB (dolna krawędź B < górna krawędź A).""" |
| x1A: float |
| y1A: float |
| x2A: float |
| y2A: float |
| x1A, y1A, x2A, y2A = rectA |
| x1B: float |
| y1B: float |
| x2B: float |
| y2B: float |
| x1B, y1B, x2B, y2B = rectB |
| return y2B < y1A |
|
|
|
|
| def is_strictly_left_of(rectA: List[float], rectB: List[float]) -> bool: |
| """Sprawdza czy rectA jest całkowicie na lewo od rectB (prawa krawędź A < lewa krawędź B).""" |
| x1A: float |
| y1A: float |
| x2A: float |
| y2A: float |
| x1A, y1A, x2A, y2A = rectA |
| x1B: float |
| y1B: float |
| x2B: float |
| y2B: float |
| x1B, y1B, x2B, y2B = rectB |
| return x2A < x1B |
|
|
|
|
| def is_strictly_right_of(rectA: List[float], rectB: List[float]) -> bool: |
| """Sprawdza czy rectA jest całkowicie na prawo od rectB (prawa krawędź B < lewa krawędź A).""" |
| x1A: float |
| y1A: float |
| x2A: float |
| y2A: float |
| x1A, y1A, x2A, y2A = rectA |
| x1B: float |
| y1B: float |
| x2B: float |
| y2B: float |
| x1B, y1B, x2B, y2B = rectB |
| return x2B < x1A |
|
|
|
|
| def intersects(rectA: List[float], rectB: List[float]) -> bool: |
| """Sprawdza czy dwa prostokąty się przecinają używając Shapely.""" |
| return box(*rectA).intersects(box(*rectB)) |
|
|
|
|
| def is_there_a_directed_edge(a: int, b: int, rects: List[List[float]]) -> bool: |
| """ |
| Określa czy panel 'a' powinien być czytany przed panelem 'b'. |
| |
| Używa reguł kolejności czytania mangi (prawo->lewo, góra->dół): |
| - Jeśli A jest na prawo i nie poniżej B -> A przed B |
| - Jeśli A jest nad i nie na lewo od B -> A przed B |
| - Dla nakładających się paneli używa erozji i heurystyk |
| |
| Args: |
| a: Indeks pierwszego panelu |
| b: Indeks drugiego panelu |
| rects: Lista bounding boxów paneli |
| |
| Returns: |
| True jeśli istnieje krawędź a->b (a przed b), False w przeciwnym razie |
| """ |
| rectA: List[float] = rects[a] |
| rectB: List[float] = rects[b] |
| |
| centre_of_A: List[float] = [rectA[0] + (rectA[2] - rectA[0]) / 2, |
| rectA[1] + (rectA[3] - rectA[1]) / 2] |
| centre_of_B: List[float] = [rectB[0] + (rectB[2] - rectB[0]) / 2, |
| rectB[1] + (rectB[3] - rectB[1]) / 2] |
| |
| if np.allclose(np.array(centre_of_A), np.array(centre_of_B)): |
| return box(*rectA).area > (box(*rectB)).area |
| copy_A = [rectA[0], rectA[1], rectA[2], rectA[3]] |
| copy_B = [rectB[0], rectB[1], rectB[2], rectB[3]] |
| while True: |
| if is_strictly_above(copy_A, copy_B) and not is_strictly_left_of(copy_A, copy_B): |
| return 1 |
| if is_strictly_above(copy_B, copy_A) and not is_strictly_left_of(copy_B, copy_A): |
| return 0 |
| if is_strictly_right_of(copy_A, copy_B) and not is_strictly_below(copy_A, copy_B): |
| return 1 |
| if is_strictly_right_of(copy_B, copy_A) and not is_strictly_below(copy_B, copy_A): |
| return 0 |
| if is_strictly_below(copy_A, copy_B) and is_strictly_right_of(copy_A, copy_B): |
| return use_cuts_to_determine_edge_from_a_to_b(a, b, rects) |
| if is_strictly_below(copy_B, copy_A) and is_strictly_right_of(copy_B, copy_A): |
| return use_cuts_to_determine_edge_from_a_to_b(a, b, rects) |
| |
| copy_A = erode_rectangle(copy_A, 0.05) |
| copy_B = erode_rectangle(copy_B, 0.05) |
|
|
|
|
| def get_distance(rectA: List[float], rectB: List[float]) -> float: |
| """ |
| Oblicza odległość euklidesową między dwoma prostokątami. |
| |
| Args: |
| rectA: Pierwszy prostokąt [x1, y1, x2, y2] |
| rectB: Drugi prostokąt [x1, y1, x2, y2] |
| |
| Returns: |
| Odległość między prostokątami (0 jeśli się przecinają) |
| """ |
| return box(rectA[0], rectA[1], rectA[2], rectA[3]).distance(box(rectB[0], rectB[1], rectB[2], rectB[3])) |
|
|
|
|
| def use_cuts_to_determine_edge_from_a_to_b(a: int, b: int, rects: List[List[float]]) -> bool: |
| """ |
| Używa zaawansowanych heurystyk "cięć" do określenia kolejności czytania paneli. |
| |
| Gdy standardowe reguły przestrzenne (prawo/lewo/góra/dół) nie mogą jednoznacznie |
| określić kolejności między dwoma panelami, ta funkcja stosuje algorytm dzielenia |
| przestrzeni na "wiersze" i "kolumny" aby ustalić która z tych paneli jest pierwsza. |
| |
| Algorytm: |
| 1. Wyznacza minimalny prostokąt otaczający oba panele (a i b) |
| 2. Znajduje wszystkie panele przecinające ten obszar |
| 3. KROK POZIOMY: Dzieli panele na "wiersze" (overlapping Y ranges) |
| - Scala nakładające się zakresy Y w nieprzekrywające się poziomy |
| - Jeśli a i b są w różnych poziomach -> wyższy poziom jest pierwszy |
| 4. KROK PIONOWY: Dzieli panele na "kolumny" (overlapping X ranges, odwrócone) |
| - Scala nakładające się zakresy X w nieprzekrywające się kolumny |
| - Kolumny są odwrócone (prawo->lewo) dla mangi |
| - Jeśli a i b są w różnych kolumnach -> prawa kolumna jest pierwsza |
| 5. EROZJA: Jeśli nadal nie można określić, zmniejsz panele o 5% i powtórz |
| |
| Ta funkcja jest wywoływana tylko dla skomplikowanych układów paneli, |
| gdzie panele są częściowo nakładające się lub ułożone nieregularnie. |
| |
| Args: |
| a: Indeks pierwszego panelu |
| b: Indeks drugiego panelu |
| rects: Lista wszystkich bounding boxów paneli [x1, y1, x2, y2] |
| |
| Returns: |
| True jeśli panel 'a' powinien być czytany przed panelem 'b', False w przeciwnym razie |
| """ |
| |
| rects = deepcopy(rects) |
|
|
| while True: |
| |
| xmin: float |
| ymin: float |
| xmax: float |
| ymax: float |
| xmin, ymin, xmax, ymax = min(rects[a][0], rects[b][0]), min( |
| rects[a][1], rects[b][1]), max(rects[a][2], rects[b][2]), max(rects[a][3], rects[b][3]) |
|
|
| |
| rect_index: List[int] = [i for i in range(len(rects)) if intersects( |
| rects[i], [xmin, ymin, xmax, ymax])] |
| |
| rects_copy: List[List[float]] = [rect for rect in rects if intersects( |
| rect, [xmin, ymin, xmax, ymax])] |
|
|
| |
| |
| overlapping_y_ranges: List[Tuple[float, float]] = merge_overlapping_ranges( |
| [(y1, y2) for x1, y1, x2, y2 in rects_copy]) |
| panel_index_to_split: Dict[int, int] = {} |
|
|
| |
| for split_index, (y1, y2) in enumerate(overlapping_y_ranges): |
| for i, index in enumerate(rect_index): |
| |
| if y1 <= rects_copy[i][1] <= rects_copy[i][3] <= y2: |
| panel_index_to_split[index] = split_index |
|
|
| |
| if panel_index_to_split[a] != panel_index_to_split[b]: |
| return panel_index_to_split[a] < panel_index_to_split[b] |
|
|
| |
| |
| overlapping_x_ranges: List[Tuple[float, float]] = merge_overlapping_ranges( |
| [(x1, x2) for x1, y1, x2, y2 in rects_copy]) |
| panel_index_to_split: Dict[int, int] = {} |
|
|
| |
| |
| for split_index, (x1, x2) in enumerate(overlapping_x_ranges[::-1]): |
| for i, index in enumerate(rect_index): |
| |
| if x1 <= rects_copy[i][0] <= rects_copy[i][2] <= x2: |
| panel_index_to_split[index] = split_index |
|
|
| |
| if panel_index_to_split[a] != panel_index_to_split[b]: |
| return panel_index_to_split[a] < panel_index_to_split[b] |
|
|
| |
| |
| rects = [erode_rectangle(rect, 0.05) for rect in rects] |
|
|
|
|
| def erode_rectangle(bbox: List[float], erosion_factor: float) -> List[float]: |
| """ |
| Zmniejsza prostokąt proporcjonalnie zachowując aspect ratio. |
| |
| Erozja jest stosowana względem krótszego boku aby zachować kształt. |
| Używane do obsługi niedokładnych detekcji paneli. |
| |
| Args: |
| bbox: Bounding box [x1, y1, x2, y2] |
| erosion_factor: Współczynnik erozji (0-1), np. 0.05 = 5% redukcja |
| |
| Returns: |
| Zmniejszony bounding box [x1, y1, x2, y2] |
| """ |
| x1: float |
| y1: float |
| x2: float |
| y2: float |
| x1, y1, x2, y2 = bbox |
| w: float |
| h: float |
| w, h = x2 - x1, y2 - y1 |
| |
| cx: float |
| cy: float |
| cx, cy = x1 + w / 2, y1 + h / 2 |
|
|
| |
| if w < h: |
| aspect_ratio: float = w / h |
| erosion_factor_width: float = erosion_factor * aspect_ratio |
| erosion_factor_height: float = erosion_factor |
| else: |
| aspect_ratio: float = h / w |
| erosion_factor_width: float = erosion_factor |
| erosion_factor_height: float = erosion_factor * aspect_ratio |
|
|
| |
| w = w - w * erosion_factor_width |
| h = h - h * erosion_factor_height |
| |
| x1, y1, x2, y2 = cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2 |
| return [x1, y1, x2, y2] |
|
|
|
|
| def merge_overlapping_ranges(ranges: List[Tuple[float, float]]) -> List[Tuple[float, float]]: |
| """ |
| Scala nakładające się zakresy 1D w nieprzekrywające się zakresy. |
| |
| Używane do dzielenia paneli na "wiersze" lub "kolumny" dla określenia |
| kolejności czytania gdy panele są ułożone nieregularnie. |
| |
| Args: |
| ranges: Lista krotek (początek, koniec) reprezentujących zakresy |
| |
| Returns: |
| Lista scalonych nieprzekrywających się zakresów, posortowana |
| """ |
| if len(ranges) == 0: |
| return [] |
| |
| ranges_sorted: List[Tuple[float, float]] = sorted( |
| ranges, key=lambda x: x[0]) |
| merged_ranges: List[Tuple[float, float]] = [] |
| prev_x1: float |
| prev_x2: float |
| for i, r in enumerate(ranges_sorted): |
| if i == 0: |
| prev_x1, prev_x2 = r |
| continue |
| x1: float |
| x2: float |
| x1, x2 = r |
| |
| if x1 > prev_x2: |
| merged_ranges.append((prev_x1, prev_x2)) |
| prev_x1, prev_x2 = x1, x2 |
| else: |
| |
| prev_x2 = max(prev_x2, x2) |
| |
| merged_ranges.append((prev_x1, prev_x2)) |
| return merged_ranges |
|
|
|
|
| def sort_text_boxes_in_reading_order( |
| text_bboxes: Union[torch.Tensor, NDArray, List[List[float]]], |
| sorted_panel_bboxes: Union[torch.Tensor, NDArray, List[List[float]]] |
| ) -> List[int]: |
| """ |
| Sortuje teksty w kolejności czytania, grupując według paneli. |
| |
| Algorytm: |
| 1. Przypisz każdy tekst do najbliższego/najbardziej nakładającego się panelu |
| 2. Sortuj teksty według ID panelu (panele już są w kolejności czytania) |
| 3. W obrębie każdego panelu, sortuj teksty według odległości od prawego górnego rogu |
| |
| Args: |
| text_bboxes: Bounding boxy tekstów [x1, y1, x2, y2] |
| sorted_panel_bboxes: Bounding boxy paneli już posortowane w kolejności czytania |
| |
| Returns: |
| Lista indeksów tekstów w kolejności czytania |
| """ |
| text_bboxes_list: List[List[float]] = convert_to_list_of_lists(text_bboxes) |
| sorted_panel_bboxes_list: List[List[float]] = convert_to_list_of_lists( |
| sorted_panel_bboxes) |
|
|
| if len(text_bboxes_list) == 0: |
| return [] |
|
|
| def indices_of_same_elements(nums: List[int]) -> List[List[int]]: |
| """Grupuje indeksy według wartości (elementy z tą samą wartością w jednej grupie).""" |
| groups = groupby(range(len(nums)), key=lambda i: nums[i]) |
| return [list(indices) for _, indices in groups] |
|
|
| |
| panel_id_for_text: List[int] = get_text_to_panel_mapping( |
| text_bboxes_list, sorted_panel_bboxes_list) |
| |
| indices_of_texts: List[int] = list(range(len(text_bboxes_list))) |
| indices_of_texts, panel_id_for_text = zip( |
| *sorted(zip(indices_of_texts, panel_id_for_text), key=lambda x: x[1])) |
| indices_of_texts = list(indices_of_texts) |
|
|
| |
| grouped_indices: List[List[int]] = indices_of_same_elements( |
| panel_id_for_text) |
| for group in grouped_indices: |
| subset_of_text_indices: List[int] = [ |
| indices_of_texts[i] for i in group] |
| text_bboxes_of_subset: List[List[float]] = [text_bboxes_list[i] |
| for i in subset_of_text_indices] |
| |
| sorted_subset_indices: List[int] = sort_texts_within_panel( |
| text_bboxes_of_subset) |
| indices_of_texts[group[0]: group[-1] + 1] = [subset_of_text_indices[i] |
| for i in sorted_subset_indices] |
| return indices_of_texts |
|
|
|
|
| def get_text_to_panel_mapping( |
| text_bboxes: List[List[float]], |
| sorted_panel_bboxes: List[List[float]] |
| ) -> List[int]: |
| """ |
| Przypisuje każdy tekst do najbliższego/najbardziej nakładającego się panelu. |
| |
| Algorytm priorytetów: |
| 1. PRIORYTET 1 - Przecięcie (intersection): Jeśli tekst przecina się z jakimś panelem, |
| wybierz panel z największą powierzchnią przecięcia (tekst "w środku" panelu) |
| 2. PRIORYTET 2 - Odległość (distance): Jeśli tekst nie przecina się z żadnym panelem, |
| wybierz najbliższy panel (tekst "obok" panelu) |
| 3. BRAK PANELI: Jeśli nie ma żadnych paneli, przypisz -1 (brak przypisania) |
| |
| Ta funkcja jest kluczowa dla sortowania tekstów w kolejności czytania, |
| ponieważ teksty są grupowane według paneli, a panele są już posortowane. |
| |
| Args: |
| text_bboxes: Lista bounding boxów tekstów [x1, y1, x2, y2] |
| sorted_panel_bboxes: Lista bounding boxów paneli [x1, y1, x2, y2], |
| już posortowana w kolejności czytania |
| |
| Returns: |
| Lista indeksów paneli dla każdego tekstu (długość = len(text_bboxes)). |
| Wartość -1 oznacza brak przypisania (gdy nie ma żadnych paneli). |
| """ |
| text_to_panel_mapping: List[int] = [] |
|
|
| for text_bbox in text_bboxes: |
| |
| shapely_text_polygon: Polygon = box(*text_bbox) |
| all_intersections: List[Tuple[float, int]] = [] |
| |
| all_distances: List[Tuple[float, int]] = [] |
|
|
| |
| if len(sorted_panel_bboxes) == 0: |
| text_to_panel_mapping.append(-1) |
| continue |
|
|
| |
| for j, annotation in enumerate(sorted_panel_bboxes): |
| |
| shapely_annotation_polygon: Polygon = box(*annotation) |
|
|
| |
| if shapely_text_polygon.intersects(shapely_annotation_polygon): |
| intersection_area: float = shapely_text_polygon.intersection( |
| shapely_annotation_polygon).area |
| all_intersections.append((intersection_area, j)) |
|
|
| |
| distance: float = shapely_text_polygon.distance( |
| shapely_annotation_polygon) |
| all_distances.append((distance, j)) |
|
|
| |
| if len(all_intersections) == 0: |
| |
| closest_panel_index: int = min( |
| all_distances, key=lambda x: x[0])[1] |
| text_to_panel_mapping.append(closest_panel_index) |
| else: |
| |
| best_panel_index: int = max( |
| all_intersections, key=lambda x: x[0])[1] |
| text_to_panel_mapping.append(best_panel_index) |
|
|
| return text_to_panel_mapping |
|
|
|
|
| def sort_texts_within_panel(rects: List[List[float]]) -> List[int]: |
| """ |
| Sortuje teksty w obrębie jednego panelu według odległości od prawego górnego rogu. |
| |
| Dla mangi (czytanej prawo->lewo, góra->dół), teksty są czytane od prawego |
| górnego rogu. Algorytm: |
| 1. Znajdź prawy górny róg panelu (max(X), min(Y) ze wszystkich tekstów) |
| 2. Oblicz odległość każdego tekstu od tego punktu odniesienia |
| 3. Sortuj teksty według odległości (najbliższe pierwsze) |
| |
| Tekst najbliższy prawego górnego rogu jest czytany jako pierwszy, |
| następnie kolejne w dół i w lewo. |
| |
| Args: |
| rects: Lista bounding boxów tekstów w jednym panelu [x1, y1, x2, y2] |
| |
| Returns: |
| Lista indeksów tekstów posortowana według kolejności czytania |
| (indeks 0 = pierwszy tekst do przeczytania) |
| """ |
| |
| smallest_y: float = float("inf") |
| greatest_x: float = float("-inf") |
|
|
| for i, rect in enumerate(rects): |
| x1: float |
| y1: float |
| x2: float |
| y2: float |
| x1, y1, x2, y2 = rect |
| smallest_y = min(smallest_y, y1) |
| greatest_x = max(greatest_x, x2) |
|
|
| |
| reference_point: Point = Point(greatest_x, smallest_y) |
|
|
| |
| polygons_and_index: List[Tuple[Polygon, int]] = [] |
| for i, rect in enumerate(rects): |
| x1: float |
| y1: float |
| x2: float |
| y2: float |
| x1, y1, x2, y2 = rect |
| polygons_and_index.append((box(x1, y1, x2, y2), i)) |
|
|
| |
| polygons_and_index = sorted( |
| polygons_and_index, key=lambda x: reference_point.distance(x[0])) |
|
|
| |
| indices: List[int] = [x[1] for x in polygons_and_index] |
| return indices |
|
|
|
|
| def x1y1wh_to_x1y1x2y2(bbox: List[float]) -> List[float]: |
| """ |
| Konwertuje bbox z formatu (x1, y1, width, height) na (x1, y1, x2, y2). |
| |
| Args: |
| bbox: Bounding box [x1, y1, width, height] |
| |
| Returns: |
| Bounding box [x1, y1, x2, y2] (corners format) |
| """ |
| x1: float |
| y1: float |
| w: float |
| h: float |
| x1, y1, w, h = bbox |
| return [x1, y1, x1 + w, y1 + h] |
|
|
|
|
| def x1y1x2y2_to_xywh(bbox: List[float]) -> List[float]: |
| """ |
| Konwertuje bbox z formatu (x1, y1, x2, y2) na (x, y, width, height). |
| |
| Format COCO używa (x, y, w, h) zamiast corners. |
| |
| Args: |
| bbox: Bounding box [x1, y1, x2, y2] (corners format) |
| |
| Returns: |
| Bounding box [x, y, width, height] (COCO format) |
| """ |
| x1: float |
| y1: float |
| x2: float |
| y2: float |
| x1, y1, x2, y2 = bbox |
| return [x1, y1, x2 - x1, y2 - y1] |
|
|
|
|
| def convert_to_list_of_lists(rects: Union[torch.Tensor, NDArray, List]) -> List[List[float]]: |
| """ |
| Konwertuje różne formaty bounding boxów na List[List[float]]. |
| |
| Obsługuje: |
| - torch.Tensor -> list |
| - numpy.ndarray -> list |
| - iterable -> list of lists |
| |
| Args: |
| rects: Bounding boxy w dowolnym formacie |
| |
| Returns: |
| Lista list [[x1, y1, x2, y2], ...] |
| """ |
| if isinstance(rects, torch.Tensor): |
| return rects.tolist() |
| if isinstance(rects, np.ndarray): |
| return rects.tolist() |
| return [[a, b, c, d] for a, b, c, d in rects] |
|
|