|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| import os, math, argparse, json, re, traceback, numpy as np, pandas as pd, trimesh, laspy, shutil
|
| import logging, matplotlib.pyplot as plt, shapely
|
| from shapely.geometry import Polygon, LineString
|
| from scipy.spatial import distance
|
| from tqdm import trange, tqdm
|
| from math import pi
|
|
|
| logging.basicConfig(level=logging.DEBUG, filename='logs.txt',
|
| format='%(asctime)s %(levelname)s %(message)s',
|
| datefmt='%H:%M:%S')
|
| logger = logging.getLogger("prep")
|
|
|
| _precision = 0.00001
|
|
|
| def get_bbox(polyline):
|
| polyline_np = np.array(polyline)
|
| xmin, ymin = np.amin(polyline_np, axis=0)
|
| xmax, ymax = np.amax(polyline_np, axis=0)
|
| return (xmin, ymin, xmax, ymax)
|
|
|
| def get_center_point(pline):
|
| if len(pline) == 0:
|
| return (0, 0)
|
| xs = [p[0] for p in pline]
|
| ys = [p[1] for p in pline]
|
| return (sum(xs) / len(pline), sum(ys) / len(pline))
|
|
|
| def intersect_line(line1, line2):
|
| (x1, y1), (x2, y2) = line1
|
| (x3, y3), (x4, y4) = line2
|
|
|
| denominator = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
|
| if denominator == 0:
|
| return None
|
|
|
| x = ((x1 * y2 - y1 * x2) * (x3 - x4) - (x1 - x2) * (x3 * y4 - y3 * x4)) / denominator
|
| y = ((x1 * y2 - y1 * x2) * (y3 - y4) - (y1 - y2) * (x3 * y4 - y3 * x4)) / denominator
|
|
|
|
|
| if x < min(x1, x2) or x > max(x1, x2) or x < min(x3, x4) or x > max(x3, x4):
|
| return None
|
|
|
| return (x, y)
|
|
|
| def get_positions_pline(base_pline, target_pline):
|
| target_pos_marks = []
|
| for i in range(len(target_pline)):
|
| target = [target_pline[i], (target_pline[i][0], target_pline[i][1] + 1e+10)]
|
| pos = 0.0
|
| for j in range(len(base_pline) - 1):
|
| base = [base_pline[j], base_pline[j + 1]]
|
| intersect = intersect_line(base, target)
|
| if intersect == None:
|
| continue
|
|
|
| if equal(intersect[1], target[0][1]):
|
| pos = 0.0
|
| break
|
|
|
| pos = -1.0 if intersect[1] > target[0][1] else 1.0
|
| break
|
| target_pos_marks.append(pos)
|
|
|
| return target_pos_marks
|
|
|
| def get_below_pline(base_pline, target_pline):
|
| pos_marks = get_positions_pline(base_pline, target_pline)
|
| average = sum(pos_marks) / len(pos_marks)
|
| return average < 0.0
|
|
|
| def get_geometry(xsec, label):
|
| for geom in xsec['geom']:
|
| if geom['label'] == label:
|
| return geom
|
| return None
|
|
|
| def is_point_in_rect(point1, point2, perp):
|
| return is_point_in_rectangle(point1[0], point1[1], point2[0], point2[1], perp[0], perp[1])
|
|
|
| def is_point_in_rectangle(x1, y1, x2, y2, x, y):
|
|
|
| x1, x2 = min(x1, x2), max(x1, x2)
|
| y1, y2 = min(y1, y2), max(y1, y2)
|
|
|
|
|
| return x1 <= x <= x2 and y1 <= y <= y2
|
|
|
| def sign_distance(a, b, c, p):
|
| d = math.sqrt(a*a + b*b)
|
| if d == 0.0:
|
| lp = 0.0
|
| else:
|
| lp = (a * p[0] + b * p[1] + c) / d
|
| return lp
|
|
|
| def equal(a, b):
|
| return abs(a - b) < _precision
|
|
|
| def equal_point(p1, p2):
|
| return equal(p1[0], p2[0]) and equal(p1[1], p2[1])
|
|
|
| def get_angle(x1, y1, x2, y2):
|
| pi = math.acos(-1.0)
|
|
|
|
|
| dx = x2 - x1
|
| dy = y2 - y1
|
|
|
|
|
| if dx == 0 and dy == 0:
|
| return -1.0
|
| if dy < 0 and dx == 0:
|
| angle_radius = pi + pi / 2
|
| elif dy > 0 and dx == 0:
|
| angle_radius = pi / 2
|
| else:
|
| angle_radius = math.atan(dy / dx)
|
|
|
|
|
| if dy >= 0 and dx > 0:
|
| pass
|
| if dy < 0 and dx > 0:
|
| angle_radius += 2 * pi
|
| elif dx < 0:
|
| angle_radius += pi
|
|
|
| return angle_radius
|
|
|
| def line_coefficients(point1, point2):
|
| x1, y1 = point1
|
| x2, y2 = point2
|
|
|
| A = y2 - y1
|
| B = x1 - x2
|
| C = x2*y1 - x1*y2
|
|
|
| return A, B, C
|
|
|
| def sign_point_on_line(point1, point2, new_point):
|
| if equal(point1[0], new_point[0]) and equal(point1[1], new_point[1]):
|
| return 0.0
|
| if equal(point2[0], new_point[0]) and equal(point2[1], new_point[1]):
|
| return 0.0
|
|
|
| line_A, line_B, line_C = line_coefficients(point1, point2)
|
| x, y = new_point
|
| value = line_A * x + line_B * y + line_C
|
| if math.fabs(value) < _precision:
|
| return 0.0
|
| elif value > 0.0:
|
| return 1.0
|
| return -1.0
|
|
|
| def sign_distance_on_line(line_point1, line_point2, point):
|
| direction = sign_point_on_line(line_point1, line_point2, point)
|
| if direction == 0:
|
| return 0.0
|
|
|
| if math.fabs(line_point1[0] - line_point2[0]) < _precision and math.fabs(line_point1[1] <= line_point2[1]) < _precision:
|
| return 0.0
|
|
|
|
|
| x = point[0]
|
| y = point[1]
|
| x1 = line_point1[0]
|
| y1 = line_point1[1]
|
| x2 = line_point2[0]
|
| y2 = line_point2[1]
|
|
|
| if x1 <= x2:
|
| a = 1
|
| b = 0
|
| c = -x1
|
| else:
|
| m = (y2 - y1) / (x2 - x1)
|
| a = -m
|
| b = 1
|
| c = -y1 + (m * x1)
|
|
|
| dist = abs(a * x + b * y + c) / math.sqrt(a * a + b * b)
|
| dist *= float(direction)
|
|
|
| return dist
|
|
|
| def is_point_on_line(point1, point2, perp):
|
| if is_point_in_rect(point1, point2, perp) == False:
|
| return False
|
| direction = sign_point_on_line(point1, point2, perp)
|
| if math.fabs(direction) < _precision:
|
| return True
|
| return False
|
|
|
| def is_overlap_line(line, part_seg):
|
| p1, p2 = line
|
| p3, p4 = part_seg
|
|
|
| f1 = is_point_on_line(p1, p2, p3)
|
| f2 = is_point_on_line(p1, p2, p4)
|
|
|
| if (f1 or f2) and f1 != f2:
|
| if f1 and (equal_point(p1, p3) or equal_point(p2, p3)):
|
| return False
|
| if f2 and (equal_point(p1, p4) or equal_point(p2, p4)):
|
| return False
|
|
|
| return f1 or f2
|
|
|
| def is_on_pline(polyline, base_line):
|
| p1 = base_line[0]
|
| p2 = base_line[1]
|
|
|
| for i in range(len(polyline) - 1):
|
| p3 = polyline[i]
|
| p4 = polyline[i + 1]
|
| if is_overlap_line((p1, p2), (p3, p4)) or is_overlap_line((p3, p4), (p1, p2)):
|
| return True
|
|
|
| return False
|
|
|
| def get_match_line_labels(xsec, base_geom, base_line):
|
| labels = []
|
| for geom in xsec['geom']:
|
| if geom == base_geom:
|
| continue
|
| geom_label = geom['label']
|
| base_label = base_geom['label']
|
| if geom_label == base_label:
|
| continue
|
| closed = geom['closed']
|
| if closed == True:
|
| continue
|
| if geom_label == 'center':
|
| continue
|
|
|
| polyline = geom['polyline']
|
| if is_on_pline(polyline, base_line):
|
| labels.append(geom['label'])
|
| return labels
|
|
|
| def get_seq_feature_tokens(xsec, geom, closed_type):
|
| polyline = geom['polyline']
|
| closed = geom['closed']
|
| if closed != closed_type:
|
| return []
|
|
|
| lines = []
|
| for i in range(len(polyline) - 1):
|
| line = (polyline[i], polyline[i + 1])
|
| lines.append(line)
|
|
|
| geom_tokens = []
|
| for line in lines:
|
| labels = get_match_line_labels(xsec, geom, line)
|
| if len(labels) == 0:
|
| continue
|
|
|
|
|
|
|
|
|
| geom_tokens.extend(labels)
|
|
|
| return geom_tokens
|
|
|
| def translate_geometry(xsec, cp):
|
| for geom in xsec['geom']:
|
| polyline = geom['polyline']
|
| geom['polyline'] = [(p[0] - cp[0], p[1] - cp[1]) for p in polyline]
|
|
|
| return xsec
|
|
|
| def is_closed(polyline):
|
| if equal_point(polyline[0], polyline[-1]):
|
| return True
|
| return False
|
|
|
| def summery_feature(features):
|
| sum_features = []
|
| if len(features) == 0:
|
| return sum_features
|
|
|
| index = 0
|
| while index < len(features):
|
| f = features[index]
|
| sum_feature = f
|
| if type(f) == list:
|
| sum_feature = summery_feature(f)
|
| if len(sum_feature) == 1:
|
| sum_feature = sum_feature[0]
|
| elif type(f) == str:
|
| label = f
|
|
|
| last_index = index
|
| for i in range(index + 1, len(features)):
|
| if type(features[i]) == str:
|
| if features[i] == label:
|
| last_index = i
|
| else:
|
| break
|
| else:
|
| break
|
| if last_index != index:
|
| sum_feature = (f'{f}({last_index - index + 1})')
|
| index = last_index
|
| else:
|
| pass
|
|
|
| sum_features.append(sum_feature)
|
| index += 1
|
|
|
| return sum_features
|
|
|
| def get_intersection_count(xsec, base_geom, target_label):
|
| pave_top = get_geometry(xsec, 'pave_surface')
|
| if pave_top == None:
|
| return 0
|
|
|
| polyline = base_geom['polyline']
|
| polygon = Polygon(polyline)
|
| base_p1 = polygon.centroid
|
| base_p2 = (base_p1.x, base_p1.y + 1e+10)
|
| vertical_line = LineString([base_p1, base_p2])
|
|
|
| count = 0
|
| for target_geom in xsec['geom']:
|
| if base_geom == target_geom:
|
| continue
|
| label = target_geom['label']
|
| if re.search(target_label, label) == None:
|
| continue
|
|
|
|
|
| target_polyline = target_geom['polyline']
|
| polyline = LineString(target_polyline)
|
| ip = shapely.intersection(polyline, vertical_line)
|
| if ip.is_empty:
|
| continue
|
| count += 1
|
|
|
| return count
|
|
|
| def update_xsection_feature(xsec):
|
| gnd_geom = get_geometry(xsec, 'ground')
|
| if gnd_geom == None:
|
| return None
|
|
|
| center = get_geometry(xsec, 'center')
|
| if center == None or 'polyline' not in center:
|
| return None
|
| cp = get_center_point(center['polyline'])
|
|
|
| xsec = translate_geometry(xsec, cp)
|
| station = xsec['station']
|
|
|
| index = 0
|
| while index < len(xsec['geom']):
|
| geom = xsec['geom'][index]
|
| label = geom['label']
|
| polyline = geom['polyline']
|
| closed = geom['closed']
|
| if len(polyline) <= 2 or closed == False:
|
| index += 1
|
| continue
|
|
|
| pt1 = polyline[0]
|
| pt2 = polyline[-1]
|
| if equal_point(pt1, pt2) == False:
|
| polyline.append(pt1)
|
|
|
|
|
| polygon = Polygon(polyline)
|
| if math.fabs(polygon.area) < _precision:
|
| xsec['geom'].pop(index)
|
| continue
|
|
|
| if station == '1+660.00000' and label == 'cut_ditch':
|
| label = 'cut_ditch'
|
|
|
|
|
| if get_below_pline(gnd_geom['polyline'], polyline):
|
| geom['earthwork_feature'].append('below')
|
| else:
|
| geom['earthwork_feature'].append('above')
|
|
|
| if re.search('pave_.*', label):
|
| pave_int_count = get_intersection_count(xsec, geom, 'pave_.*')
|
| geom['earthwork_feature'].append(f'pave_int({pave_int_count})')
|
|
|
| tokens = get_seq_feature_tokens(xsec, geom, True)
|
| if len(tokens) == 1:
|
| geom['earthwork_feature'].append(tokens[0])
|
| else:
|
| geom['earthwork_feature'].extend(tokens)
|
|
|
| geom['earthwork_feature'] = summery_feature(geom['earthwork_feature'])
|
|
|
|
|
| logger.debug(f'{station}. {label} feature: {geom["earthwork_feature"]}')
|
| index += 1
|
|
|
| return xsec
|
|
|
| def update_xsections_feature(xsections):
|
|
|
| for xsec in xsections:
|
| for geom in xsec['geom']:
|
| label = geom['label']
|
| polyline = geom['polyline']
|
| if len(polyline) < 2:
|
| continue
|
| closed = is_closed(polyline)
|
| if closed == False:
|
| closed = False if re.search('pave_layer.*', label) == None else True
|
| geom['closed'] = closed
|
|
|
|
|
| out_xsections = []
|
| for xsec in xsections:
|
| out_xsec = update_xsection_feature(xsec)
|
| if out_xsec == None:
|
| continue
|
| out_xsections.append(out_xsec)
|
|
|
| return out_xsections
|
|
|
| def main():
|
| parser = argparse.ArgumentParser(description='create earthwork train dataset')
|
| parser.add_argument('--input', type=str, default='output/', help='input folder')
|
| parser.add_argument('--output', type=str, default='dataset/', help='output folder')
|
|
|
| args = parser.parse_args()
|
| try:
|
| file_names = os.listdir(args.input)
|
| for file_name in tqdm(file_names):
|
| if file_name.endswith('.json') == False:
|
| continue
|
| print(f'processing {file_name}')
|
| data = None
|
| with open(os.path.join(args.input, file_name), 'r') as f:
|
| data = json.load(f)
|
|
|
| out_xsections = update_xsections_feature(data)
|
|
|
| output_file = os.path.join(args.output, file_name)
|
| with open(output_file, 'w') as f:
|
| json.dump(out_xsections, f, indent=4)
|
|
|
| except Exception as e:
|
| print(f'error: {e}')
|
| traceback.print_exc()
|
|
|
| if __name__ == '__main__':
|
| main() |