| """ |
| Before we can identify damage sites, we need to look for suitable regions in the image. |
| Typically, damage sites appear as dark regions in the image. Instead of simple thresholding, we use |
| a clustering approach to identify regions that belong together and form damage site candidates. |
| """ |
|
|
| import numpy as np |
| import scipy.ndimage as ndi |
| from scipy.spatial import KDTree |
| from sklearn.cluster import DBSCAN |
| import logging |
| from PIL import Image |
|
|
|
|
| def get_centroids(image, image_threshold=20, |
| eps=1, min_samples=5, metric='euclidean', |
| min_size=20, fill_holes=False, |
| filter_close_centroids=False, filter_radius=50) -> list: |
| """ |
| Determine centroids of clusters corresponding to potential damage sites. |
| In a first step, a threshold is applied to the input image to identify areas of potential damage sites. |
| Using DBSCAN, these agglomerations of pixels are fitted into clusters. Then, the mean x/y values are determined |
| from pixels belonging to one cluster. If the number of pixels in a given cluster excees the threshold given by min_size, this cluster is added |
| to the list of (x,y) coordinates that is returned as the final list potential damage sites. |
| |
| Sometimes, clusters may be found in very close proximity to each other, we can reject those to avoid |
| classifying the same event multiple times (which may distort our statistics). |
| |
| DBScan documentation: https://scikit-learn.org/stable/modules/generated/sklearn.cluster.DBSCAN.html |
| |
| Args: |
| image: Input SEM image (PIL Image or NumPy array). |
| image_threshold (int, optional): Threshold to be applied to the image to identify candidates for damage sites. Defaults to 20. |
| eps (int, optional): parameter eps of DBSCAN: The maximum distance between two samples for one to be considered as in the neighborhood of the other. Defaults to 1. |
| min_samples (int, optional): parameter min_samples of DBSCAN: The number of samples (or total weight) in a neighborhood for a point to be considered as a core point. Defaults to 5. |
| metric (str, optional): parameter metric of DBSCAN. Defaults to 'euclidean'. |
| min_size (int, optional): Minimum number of pixels in a cluster for the damage site candidate to be considered in the final list. Defaults to 20. |
| fill_holes (bool, optional): Fill small holes in damage sites clusters using binary_fill_holes. Defaults to False. |
| filter_close_centroids (bool, optional): Filter cluster centroids within a given radius. Defaults to False |
| filter_radius (float, optional): Radius within which centroids are considered to be the same. Defaults to 50 |
| |
| Returns: |
| list: list of (x,y) coordinates of the centroids of the clusters of accepted damage site candidates. |
| """ |
|
|
| centroids = [] |
| logging.info(f"get_centroids: Input image type: {type(image)}") |
|
|
| |
| |
| if isinstance(image, Image.Image): |
| |
| if image.mode == 'RGB': |
| image_array = np.array(image.convert('L')) |
| logging.info("get_centroids: Converted RGB PIL Image to grayscale NumPy array.") |
| else: |
| image_array = np.array(image) |
| logging.info("get_centroids: Converted PIL Image to NumPy array.") |
| elif isinstance(image, np.ndarray): |
| |
| if image.ndim == 3 and image.shape[2] in [3, 4]: |
| image_array = np.mean(image, axis=2).astype(image.dtype) |
| logging.info("get_centroids: Converted multi-channel NumPy array to grayscale NumPy array.") |
| else: |
| image_array = image |
| logging.info("get_centroids: Image is already a suitable NumPy array.") |
| else: |
| logging.error("get_centroids: Unsupported image format received. Expected PIL Image or NumPy array.") |
| raise ValueError("Unsupported image format. Expected PIL Image or NumPy array for thresholding.") |
|
|
| |
| |
| |
| cluster_candidates_mask = image_array < image_threshold |
| |
|
|
| |
| |
| |
| |
| |
| if fill_holes: |
| cluster_candidates_mask = ndi.binary_fill_holes(cluster_candidates_mask) |
|
|
| |
| cluster_candidates = np.asarray(cluster_candidates_mask).nonzero() |
| cluster_candidates = np.transpose(cluster_candidates) |
|
|
| |
| if cluster_candidates.size == 0: |
| logging.warning("No cluster candidates found after thresholding. Returning empty centroids list.") |
| return [] |
|
|
| |
| |
| |
| dbscan = DBSCAN(eps=eps, min_samples=min_samples, metric=metric) |
|
|
| dbscan.fit(cluster_candidates) |
|
|
| labels = dbscan.labels_ |
| |
| n_clusters = len(set(labels)) - (1 if -1 in labels else 0) |
| n_noise = list(labels).count(-1) |
| logging.debug('# clusters {}, #noise {}'.format(n_clusters, n_noise)) |
|
|
|
|
| |
| |
| |
| for i in set(labels): |
| if i > -1: |
| |
| cluster_points = cluster_candidates[labels == i, :] |
| if len(cluster_points) > min_size: |
| x_mean = np.mean(cluster_points, axis=0)[0] |
| y_mean = np.mean(cluster_points, axis=0)[1] |
| centroids.append([x_mean, y_mean]) |
|
|
| if filter_close_centroids and len(centroids) > 1: |
| proximity_tree = KDTree(centroids) |
| pairs = proximity_tree.query_pairs(filter_radius) |
| |
| |
| indices_to_remove = set() |
| for p1_idx, p2_idx in pairs: |
| |
| |
| indices_to_remove.add(max(p1_idx, p2_idx)) |
| |
| |
| filtered_centroids = [centroid for i, centroid in enumerate(centroids) if i not in indices_to_remove] |
| centroids = filtered_centroids |
| logging.info(f"Filtered {len(indices_to_remove)} close centroids. Remaining: {len(centroids)}") |
|
|
|
|
| return centroids |
|
|