| import os |
| import re |
| import json |
| import numpy as np |
| import pandas as pd |
|
|
| import torch |
| import random |
| import pickle as pkl |
| from tqdm import tqdm |
| from torch import Tensor |
| from scipy.spatial import distance_matrix |
| import torch_geometric |
| from torch_geometric.data import HeteroData |
| from torch_geometric.nn import to_hetero |
| |
|
|
|
|
| def cross_product(p1, p2, p3): |
| return (p2[0] - p1[0]) * (p3[1] - p1[1]) - (p3[0] - p1[0]) * (p2[1] - p1[1]) |
|
|
| def colinear(p1, p2, p3): |
| if (p1[1]-p2[1])*(p2[0]-p3[0]) == (p1[0]-p2[0])*(p2[1]-p3[1]) and p3[0]>min(p1[0],p2[0]) and p3[0]<max(p1[0],p2[0]): return True |
| if (p1[1]-p2[1])*(p2[0]-p3[0]) == (p1[0]-p2[0])*(p2[1]-p3[1]) and p3[1]>min(p1[1],p2[1]) and p3[1]<max(p1[1],p2[1]): return True |
|
|
| def is_intersected(p1, p2, p3, p4): |
| if colinear(p1, p2, p3) or colinear(p1, p2, p4): return True |
| if cross_product(p1, p2, p3) * cross_product(p1, p2, p4) < 0 and cross_product(p3, p4, p1) * cross_product(p3, p4, p2) < 0: return True |
| else: return False |
|
|
| |
| def read_singe(df, i): |
| p_i = df[0][i] |
| np_i = list(p_i) |
| rflag = 0 |
| for x in range(len(p_i)-4): |
| if p_i[x] == ')': rflag+=1 |
| if p_i[x:x+4] == '), (' and p_i[x-1]!=')': np_i.insert(x+rflag+1,' 0.0 0.0') |
| elif p_i[x:x+4] == '), (' and p_i[x-1]==')': np_i.insert(x+rflag+1,' 1.0 1.0') |
| p_i = ''.join(np_i) |
|
|
|
|
| pos = np.empty((1,2)) |
| pi_nums = re.findall(r"\d+\.?\d*",p_i) |
| j=0 |
| while j < len(pi_nums)-1: |
| if j == 0: |
| pos[0][0] = float(pi_nums[j]) |
| pos[0][1] = float(pi_nums[j+1]) |
| j+=2 |
| continue |
| pos = np.append(pos,[[float(pi_nums[j]),0]],0) |
| pos[j//2][1] = float(pi_nums[j+1]) |
| j+=2 |
| return pos |
|
|
| def Visi_Edge(pos_join, flag): |
| inside_edge_index = [[],[]] |
| apart_edge_index = [[],[]] |
|
|
| vg_point = [] |
| for i in range(len(pos_join)): vg_point.append((pos_join[i][0], pos_join[i][1])) |
|
|
| hole_p = np.where(flag==1)[0] |
| if len(hole_p) != 0: |
| last_id = 0 |
| for m in range(len(flag)): |
| if flag[m] == 2 or flag[m] == 3: |
| if sum(flag[last_id:m]) == 0: |
| last_id = m+1 |
| continue |
| poly_i = vg_point[last_id:m+1] |
| pos_i = np.arange(last_id, m+1) |
| last_id = m+1 |
| for i in range(len(poly_i)): |
| if flag[pos_i[i]] == 1: |
| for j in range(i, len(flag)): |
| if flag[j]==1 or flag[j] == 2 or flag[j] == 3: |
| hole_i = poly_i[i:j+1] |
| pos_hole = np.arange(i, j+1) |
| for p1 in hole_i: |
| for p2 in poly_i: |
| if p2 not in hole_i: |
| inter_count = 0 |
| for d in range(len(poly_i)-1): |
| p3, p4 = poly_i[d], poly_i[d+1] |
| if is_intersected(p1, p2, p3, p4): inter_count+=1 |
| if inter_count==0: |
| head, tail = pos_i[poly_i.index(p1)], pos_i[poly_i.index(p2)] |
| inside_edge_index[0].append(head), inside_edge_index[1].append(tail) |
|
|
| for i in range(len(vg_point)): |
| p1 = vg_point[i] |
| p1_id = np.count_nonzero(flag[0:i] == 2) + np.count_nonzero(flag[0:i] == 3) |
| for j in range(len(vg_point)): |
| p2 = vg_point[j] |
| if p1 == p2: continue |
| p2_id = np.count_nonzero(flag[0:j] == 2) + np.count_nonzero(flag[0:j] == 3) |
| inter_count = 0 |
| for m in range(len(flag-1)): |
| if flag[m]!=1 and flag[m]!=2 and flag[m]!=3: p3, p4 = vg_point[m], vg_point[m+1] |
| if is_intersected(p1, p2, p3, p4): inter_count+=1 |
| if inter_count==0: |
| head, tail = vg_point.index(p1), vg_point.index(p2) |
| cc = np.count_nonzero(flag[min(head, tail):max(head, tail)] == 2) + np.count_nonzero(flag[min(head, tail): max(head, tail)] == 3) |
| if p1_id!=p2_id and cc!=0: apart_edge_index[0].append(head), apart_edge_index[1].append(tail) |
| |
|
|
| ninside_edge_index = [[],[]] |
| napart_edge_index = [[],[]] |
| exteriors = [[],[]] |
|
|
| if len(hole_p)!=0: |
| for i in range(len(flag)): |
| link_i = [pos_join[inside_edge_index[1][j]] for j in range(len(inside_edge_index[1])) if inside_edge_index[0][j]==i] |
| if len(link_i)==0: continue |
| ninside_edge_index[0].append(i) |
| dis_matrix = distance_matrix([pos_join[i]], link_i) |
| node_i = (link_i[np.argmin(dis_matrix[0])][0], link_i[np.argmin(dis_matrix[0])][1]) |
| ninside_edge_index[1].append(vg_point.index(node_i)) |
|
|
| for i in range(len(vg_point)-1): |
| if flag[i]!=1 and flag[i]!=2 and flag[i]!=3 : exteriors[0].append(i), exteriors[1].append(i+1) |
|
|
| for i in range(len(flag)): |
| link_i = [pos_join[apart_edge_index[1][j]] for j in range(len(apart_edge_index[1])) if apart_edge_index[0][j]==i] |
| if len(link_i)==0: continue |
| napart_edge_index[0].append(i) |
| dis_matrix = distance_matrix([pos_join[i]], link_i) |
| node_i = (link_i[np.argmin(dis_matrix[0])][0], link_i[np.argmin(dis_matrix[0])][1]) |
| napart_edge_index[1].append(vg_point.index(node_i)) |
|
|
| inside_edge_index, apart_edge_index = ninside_edge_index, napart_edge_index |
|
|
| return inside_edge_index, apart_edge_index, exteriors |
|
|
|
|
| def HeteroEdge(pos,k): |
| pos_join = np.delete(pos, np.where(np.sum(pos, 1)==0)[0], axis=0) |
| pos_join = np.delete(pos_join, np.where(np.sum(pos_join, 1)==2)[0], axis=0) |
| pos_join = np.delete(pos_join, np.where(np.sum(pos_join, 1)==4)[0], axis=0) |
| flag = np.zeros(len(pos_join)) |
| pos = np.delete(pos, 0, axis=0) |
| count, id = 0, 0 |
| while count<k: |
| for i in range(len(pos)): |
| if pos[i][0]==0: |
| flag[i-1]=1 |
| pos = np.delete(pos, i, axis=0) |
| break |
| elif pos[i][0]==1: |
| flag[i-1]=2 |
| pos = np.delete(pos, i, axis=0) |
| break |
| elif pos[i][0]==2: |
| flag[i-1]=3 |
| pos = np.delete(pos, i, axis=0) |
| pos_join[id:i, 0]+=count |
| count+=1 |
| id = i |
| break |
| pos_join = pos_join |
| inside_edge_index, apart_edge_index, exteriors = Visi_Edge(pos_join, flag) |
|
|
| return pos_join, inside_edge_index, apart_edge_index, exteriors |
|
|
| |
| def NNIST_HeteroVG(df, label_df, k): |
| pos = [[0,0]] |
| label = '' |
| for i in np.random.randint(0, len(df), k): |
| while True: |
| if len(pos) == 1 and label_df[0][i] == 0: i = random.randint(0, len(df)) |
| else: break |
| pos = np.append(pos, read_singe(df, i), 0) |
| pos = np.append(pos, [[2,2]], 0) |
| label = label+'%d'%(label_df[0][i]) |
|
|
| label = int(label) |
| pos_join, inside, apart, exteriors = HeteroEdge(pos,k) |
|
|
| data = HeteroData() |
|
|
| data['vertices'].x = torch.zeros((len(pos_join), 1), dtype=torch.float) |
| data.y = torch.tensor(label, dtype=torch.int) |
| data.pos = torch.tensor(pos_join, dtype=torch.float) |
|
|
| data['vertices', 'inside', 'vertices'].edge_index = torch.tensor([inside[0]+inside[1]+exteriors[0],inside[1]+inside[0]+exteriors[1]], dtype=torch.long) |
| data['vertices', 'apart', 'vertices'].edge_index = torch.tensor([apart[0]+apart[1],apart[1]+apart[0]], dtype=torch.long) |
| data['vertices', 'inside', 'vertices'].edge_attr = torch.zeros((len(data['vertices', 'inside', 'vertices'].edge_index[0]),1), dtype=torch.float) |
| data['vertices', 'apart', 'vertices'].edge_attr = torch.zeros((len(data['vertices', 'apart', 'vertices'].edge_index[0]),1), dtype=torch.float) |
|
|
| return data |
|
|
| mnist_filename = '/content/drive/MyDrive/MINST_Polygons/polyMNIST/mnist_polygon_test.json' |
| label_filename = '/content/drive/MyDrive/MINST_Polygons/polyMNIST/mnist_label_test.json' |
| df = pd.read_json(mnist_filename) |
| label_df = pd.read_json(label_filename) |
|
|
| K = 2 |
| N = 10 |
| multi_mnist_dataset = [] |
| for k in range(2, K+1): |
| for i in tqdm(range(N)): |
| data = NNIST_HeteroVG(df, label_df, k=k) |
| multi_mnist_dataset.append(data) |
|
|
| if not os.path.exists('/content/drive/MyDrive/MINST_Polygons/multi_mnist'): |
| os.makedirs('/content/drive/MyDrive/MINST_Polygons/multi_mnist') |
| with open('/content/drive/MyDrive/MINST_Polygons/multi_mnist/multi_mnist.pkl','wb') as file: |
| pkl.dump(multi_mnist_dataset, file) |
|
|