| |
|
|
| import json |
| import re |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional |
|
|
| from lxml import etree |
| import numpy as np |
|
|
| from utils.geo import BoundaryBox, Projection |
|
|
| METERS_PATTERN: re.Pattern = re.compile("^(?P<value>\\d*\\.?\\d*)\\s*m$") |
| KILOMETERS_PATTERN: re.Pattern = re.compile("^(?P<value>\\d*\\.?\\d*)\\s*km$") |
| MILES_PATTERN: re.Pattern = re.compile("^(?P<value>\\d*\\.?\\d*)\\s*mi$") |
|
|
|
|
| def parse_float(string: str) -> Optional[float]: |
| """Parse string representation of a float or integer value.""" |
| try: |
| return float(string) |
| except (TypeError, ValueError): |
| return None |
|
|
|
|
| @dataclass(eq=False) |
| class OSMElement: |
| """ |
| Something with tags (string to string mapping). |
| """ |
|
|
| id_: int |
| tags: Dict[str, str] |
|
|
| def get_float(self, key: str) -> Optional[float]: |
| """Parse float from tag value.""" |
| if key in self.tags: |
| return parse_float(self.tags[key]) |
| return None |
|
|
| def get_length(self, key: str) -> Optional[float]: |
| """Get length in meters.""" |
| if key not in self.tags: |
| return None |
|
|
| value: str = self.tags[key] |
|
|
| float_value: float = parse_float(value) |
| if float_value is not None: |
| return float_value |
|
|
| for pattern, ratio in [ |
| (METERS_PATTERN, 1.0), |
| (KILOMETERS_PATTERN, 1000.0), |
| (MILES_PATTERN, 1609.344), |
| ]: |
| matcher: re.Match = pattern.match(value) |
| if matcher: |
| float_value: float = parse_float(matcher.group("value")) |
| if float_value is not None: |
| return float_value * ratio |
|
|
| return None |
|
|
| def __hash__(self) -> int: |
| return self.id_ |
|
|
|
|
| @dataclass(eq=False) |
| class OSMNode(OSMElement): |
| """ |
| OpenStreetMap node. |
| |
| See https://wiki.openstreetmap.org/wiki/Node |
| """ |
|
|
| geo: np.ndarray |
| visible: Optional[str] = None |
| xy: Optional[np.ndarray] = None |
|
|
| @classmethod |
| def from_dict(cls, structure: Dict[str, Any]) -> "OSMNode": |
| """ |
| Parse node from Overpass-like structure. |
| |
| :param structure: input structure |
| """ |
| return cls( |
| structure["id"], |
| structure.get("tags", {}), |
| geo=np.array((structure["lat"], structure["lon"])), |
| visible=structure.get("visible"), |
| ) |
|
|
|
|
| @dataclass(eq=False) |
| class OSMWay(OSMElement): |
| """ |
| OpenStreetMap way. |
| |
| See https://wiki.openstreetmap.org/wiki/Way |
| """ |
|
|
| nodes: Optional[List[OSMNode]] = field(default_factory=list) |
| visible: Optional[str] = None |
|
|
| @classmethod |
| def from_dict( |
| cls, structure: Dict[str, Any], nodes: Dict[int, OSMNode] |
| ) -> "OSMWay": |
| """ |
| Parse way from Overpass-like structure. |
| |
| :param structure: input structure |
| :param nodes: node structure |
| """ |
| return cls( |
| structure["id"], |
| structure.get("tags", {}), |
| [nodes[x] for x in structure["nodes"]], |
| visible=structure.get("visible"), |
| ) |
|
|
| def is_cycle(self) -> bool: |
| """Is way a cycle way or an area boundary.""" |
| return self.nodes[0] == self.nodes[-1] |
|
|
| def __repr__(self) -> str: |
| return f"Way <{self.id_}> {self.nodes}" |
|
|
|
|
| @dataclass |
| class OSMMember: |
| """ |
| Member of OpenStreetMap relation. |
| """ |
|
|
| type_: str |
| ref: int |
| role: str |
|
|
|
|
| @dataclass(eq=False) |
| class OSMRelation(OSMElement): |
| """ |
| OpenStreetMap relation. |
| |
| See https://wiki.openstreetmap.org/wiki/Relation |
| """ |
|
|
| members: Optional[List[OSMMember]] |
| visible: Optional[str] = None |
|
|
| @classmethod |
| def from_dict(cls, structure: Dict[str, Any]) -> "OSMRelation": |
| """ |
| Parse relation from Overpass-like structure. |
| |
| :param structure: input structure |
| """ |
| return cls( |
| structure["id"], |
| structure["tags"], |
| [OSMMember(x["type"], x["ref"], x["role"]) for x in structure["members"]], |
| visible=structure.get("visible"), |
| ) |
|
|
|
|
| class OSMData: |
| """ |
| The whole OpenStreetMap information about nodes, ways, and relations. |
| """ |
|
|
| def __init__(self) -> None: |
| self.nodes: Dict[int, OSMNode] = {} |
| self.ways: Dict[int, OSMWay] = {} |
| self.relations: Dict[int, OSMRelation] = {} |
| self.box: BoundaryBox = None |
|
|
| @classmethod |
| def from_dict(cls, structure: Dict[str, Any]): |
| data = cls() |
| bounds = structure.get("bounds") |
| if bounds is not None: |
| data.box = BoundaryBox( |
| np.array([bounds["minlat"], bounds["minlon"]]), |
| np.array([bounds["maxlat"], bounds["maxlon"]]), |
| ) |
|
|
| for element in structure["elements"]: |
| if element["type"] == "node": |
| node = OSMNode.from_dict(element) |
| data.add_node(node) |
| for element in structure["elements"]: |
| if element["type"] == "way": |
| way = OSMWay.from_dict(element, data.nodes) |
| data.add_way(way) |
| for element in structure["elements"]: |
| if element["type"] == "relation": |
| relation = OSMRelation.from_dict(element) |
| data.add_relation(relation) |
|
|
| return data |
|
|
| @classmethod |
| def from_json(cls, path: Path): |
| with path.open(encoding='utf-8') as fid: |
| structure = json.load(fid) |
| return cls.from_dict(structure) |
|
|
| @classmethod |
| def from_xml(cls, path: Path): |
| root = etree.parse(str(path)).getroot() |
| structure = {"elements": []} |
| from tqdm import tqdm |
|
|
| for elem in tqdm(root): |
| if elem.tag == "bounds": |
| structure["bounds"] = { |
| k: float(elem.attrib[k]) |
| for k in ("minlon", "minlat", "maxlon", "maxlat") |
| } |
| elif elem.tag in {"node", "way", "relation"}: |
| if elem.tag == "node": |
| item = { |
| "id": int(elem.attrib["id"]), |
| "lat": float(elem.attrib["lat"]), |
| "lon": float(elem.attrib["lon"]), |
| "visible": elem.attrib.get("visible"), |
| "tags": { |
| x.attrib["k"]: x.attrib["v"] for x in elem if x.tag == "tag" |
| }, |
| } |
| elif elem.tag == "way": |
| item = { |
| "id": int(elem.attrib["id"]), |
| "visible": elem.attrib.get("visible"), |
| "tags": { |
| x.attrib["k"]: x.attrib["v"] for x in elem if x.tag == "tag" |
| }, |
| "nodes": [int(x.attrib["ref"]) for x in elem if x.tag == "nd"], |
| } |
| elif elem.tag == "relation": |
| item = { |
| "id": int(elem.attrib["id"]), |
| "visible": elem.attrib.get("visible"), |
| "tags": { |
| x.attrib["k"]: x.attrib["v"] for x in elem if x.tag == "tag" |
| }, |
| "members": [ |
| { |
| "type": x.attrib["type"], |
| "ref": int(x.attrib["ref"]), |
| "role": x.attrib["role"], |
| } |
| for x in elem |
| if x.tag == "member" |
| ], |
| } |
| item["type"] = elem.tag |
| structure["elements"].append(item) |
| elem.clear() |
| del root |
| return cls.from_dict(structure) |
|
|
| @classmethod |
| def from_file(cls, path: Path): |
| ext = path.suffix |
| if ext == ".json": |
| return cls.from_json(path) |
| elif ext in {".osm", ".xml"}: |
| return cls.from_xml(path) |
| else: |
| raise ValueError(f"Unknown extension for {path}") |
|
|
| def add_node(self, node: OSMNode): |
| """Add node and update map parameters.""" |
| if node.id_ in self.nodes: |
| raise ValueError(f"Node with duplicate id {node.id_}.") |
| self.nodes[node.id_] = node |
|
|
| def add_way(self, way: OSMWay): |
| """Add way and update map parameters.""" |
| if way.id_ in self.ways: |
| raise ValueError(f"Way with duplicate id {way.id_}.") |
| self.ways[way.id_] = way |
|
|
| def add_relation(self, relation: OSMRelation): |
| """Add relation and update map parameters.""" |
| if relation.id_ in self.relations: |
| raise ValueError(f"Relation with duplicate id {relation.id_}.") |
| self.relations[relation.id_] = relation |
|
|
| def add_xy_to_nodes(self, proj: Projection): |
| nodes = list(self.nodes.values()) |
| if len(nodes) == 0: |
| return |
| geos = np.stack([n.geo for n in nodes], 0) |
| if proj.bounds is not None: |
| |
| valid = proj.bounds.contains(geos) |
| if valid.mean() < 0.9: |
| print("Many nodes are out of the projection bounds.") |
| xys = np.zeros_like(geos) |
| xys[valid] = proj.project(geos[valid]) |
| else: |
| xys = proj.project(geos) |
| for xy, node in zip(xys, nodes): |
| node.xy = xy |
|
|