| import geopandas as gpd |
| import pandas as pd |
| import networkx as nx |
| import osmnx as ox |
| import numpy as np |
| import shapely |
| from pyproj import Transformer |
| from functools import partial |
|
|
| ox.settings.useful_tags_node=["highway", "ref", "barrier", "highway:ref", "name"] |
| ox.settings.useful_tags_way=["highway", "maxspeed", "name", "ref", "oneway", "toll", "barrier"] |
| ox.settings.use_cache=False |
|
|
| def custom_filter_order(order, start=0): |
| highway_order=["motorway", "trunk", "primary", "secondary", "tertiary", |
| "unclassified", "residential", "service", "pedestrian"] |
| return '["highway"~"'+'|'.join(highway_order[start:order])+'"]' |
|
|
| def osm_stations(dissolve=True): |
| osm_stations=gpd.read_file("export.geojson") |
| clean_col=osm_stations.columns[osm_stations.isna().mean()<0.9] |
| osm_clean=osm_stations[clean_col].drop(["barrier", "@id"], axis=1).dropna(subset="name") |
| osm_clean["autoroute"]=osm_clean["highway:ref"].str.replace(" ", "") |
| osm_clean["nref"]=osm_clean["operator:ref"] |
| badguys="Péage des |Péage de |Péage d'|Péage-de-|Péage du |Péage-du-|Péage " |
| osm_clean["name"]=osm_clean["name"].str.replace(badguys, "", regex=True) |
| osm_clean["osmid"]=osm_clean["id"].str.split("/").str[1].astype(int) |
| if dissolve: |
| osm_clean=osm_clean.dissolve(by='name', aggfunc='first') |
| osm_clean['geometry'] = osm_clean.geometry.to_crs("2154").centroid |
| osm_clean['latlon'] = osm_clean.geometry.to_crs("WGS84") |
| osm_clean=osm_clean.drop(columns=["id", "highway:ref", "operator:ref"]) |
| return osm_clean |
|
|
|
|
| def rebuild_highway(Gsub, inter, weight="travel_time"): |
| """ |
| On s'inspire de osmnx.simplification::simplify_graph (l. 275) |
| On prend un graph correspondant à un composant connecté. |
| On calcule les distances minimales deux à deux, on ajoute les tarifs. |
| Pour chaque paire : |
| - on récupère le chemin. |
| - on fusionne les géométries des edges, on ajoute le temps, le tarif, début et fin |
| - on met les edges dans un set |
| - on met les noeuds non-toll_booth dans un set |
| On supprime edges et nodes. |
| On ajoute les edges recalculés. |
| """ |
|
|
| dftarifs=pd.read_csv("tarifs2025.csv") |
| dosm=osm_stations(dissolve=False).set_index("osmid").name.to_dict() |
| |
| nodes_to_remove = set() |
| len_path = dict(nx.all_pairs_dijkstra(Gsub, weight=weight)) |
| ltup=[] |
| |
| for k, v in len_path.items(): |
| if k not in inter: |
| continue |
| for k1, v1 in v[0].items(): |
| if k1 not in inter: |
| continue |
| ent, sor = dosm[k], dosm[k1] |
| if ent == sor: |
| continue |
| tarif = dftarifs.query(f'E=="{ent}" and S=="{sor}"').Tarif.to_list() |
| if len(tarif)!=1: |
| continue |
| route=v[1][k1] |
| edges=list(Gsub.edges[edge] for edge in nx.utils.pairwise(route)) |
| nodes_to_remove.update(route) |
| |
| length_sum=np.sum([edge["length"] for edge in edges]) |
| time_sum=np.sum([edge["travel_time"] for edge in edges]) |
| geometry_sum=shapely.ops.linemerge([edge["geometry"] for edge in edges]) |
| |
| |
| dic={"key": 0, "length" : length_sum, "travel_time": time_sum, "route": route, |
| "tarif": tarif[0], "geometry": geometry_sum} |
| ltup.append((k, k1, 0, dic)) |
|
|
| return ltup, nodes_to_remove.difference(inter) |
|
|
|
|
| def rebuild_highways(Ga, toll_nodes): |
| ltup=[] |
| nodes_to_remove=set() |
| for wcc in nx.weakly_connected_components(Ga): |
| inter=wcc.intersection(toll_nodes) |
| if len(inter) > 1 : |
| Gsub=Ga.subgraph(wcc).copy() |
| lt, to_remove=rebuild_highway(Gsub, inter) |
| ltup.extend(lt) |
| nodes_to_remove.update(to_remove) |
| return ltup, nodes_to_remove |
|
|
| def lamb93(): |
| return "EPSG:2154" |
|
|
| def to_Lambert93(): |
| transformer = Transformer.from_crs("WGS84", lamb93()) |
| return transformer |
|
|
| def from_Lambert93(): |
| transformer = Transformer.from_crs(lamb93(), "WGS84") |
| return transformer |
|
|
| def get_gdf_ellipse(orig_coo, dest_coo): |
| orig_lamb=np.array(to_Lambert93().transform(*orig_coo)) |
| dest_lamb=np.array(to_Lambert93().transform(*dest_coo)) |
| C=(orig_lamb+dest_lamb)/2 |
| D=orig_lamb-dest_lamb |
| a=1.15 * np.linalg.norm(D) /2 |
| c = 0.4 * np.linalg.norm(D) /2 |
| theta=np.arctan2(D[1], D[0]) |
| circ = shapely.geometry.Point(C).buffer(1) |
| ell = shapely.affinity.scale(circ, a, c) |
| ellr = shapely.affinity.rotate(ell, theta, use_radians=True) |
|
|
| return gpd.GeoSeries(ellr, crs= lamb93()) |
|
|
| def get_shapely_ellipse(orig_coo, dest_coo): |
| return get_gdf_ellipse(orig_coo, dest_coo).to_crs("WGS84").geometry[0] |
|
|
|
|
| def tariftime(u, v, d, l): |
| d=d[0] |
| return d["travel_time"] if "tarif" not in d else d["travel_time"] + l*d["tarif"] |
|
|
|
|
| def tariftimedf(Gc, orig_id, dest_id, weight="travel_time"): |
| ltup=[] |
|
|
| for l in np.arange(0, 250, 5): |
| tarif= partial(tariftime, l=l) |
| fastest=nx.shortest_path(Gc, orig_id, dest_id, weight=tarif) |
| gfast=ox.routing.route_to_gdf(Gc, fastest, weight= weight) |
| prix=float(gfast["tarif"].sum()) if "tarif" in gfast else 0 |
| ltup.append((prix, float(gfast["travel_time"].sum()), fastest )) |
| if prix==0: break |
|
|
| df=pd.DataFrame(ltup, columns=["tarif", "time", "path"]).drop_duplicates(subset=["tarif", "time"]).reset_index() |
| df["time (mn)"]=df["time"]/60 |
| return df |
|
|
|
|
| def download_graph(orig_coo, dest_coo, precision): |
| buf=get_shapely_ellipse(orig_coo , dest_coo) |
| cf=custom_filter_order(precision) |
| G=ox.graph.graph_from_polygon(buf, network_type='drive', custom_filter=cf, simplify=False) |
| G=ox.simplify_graph(G, node_attrs_include=["barrier"]) |
| G = ox.add_edge_speeds(G) |
| G = ox.add_edge_travel_times(G) |
| G = ox.project_graph(G, to_crs=lamb93()) |
| return G |
| |
| def add_tarifs(G): |
| gdfn, gdfe=ox.graph_to_gdfs(G) |
| if "toll" not in gdfe.columns or "barrier" not in gdfn.columns: |
| return G |
| gae=gdfe[((gdfe.highway=="motorway") & (gdfe.toll == "yes")) | (gdfe.highway=="motorway_link") ] |
| gbn=gdfn.query("barrier=='toll_booth'") |
| u, v, k = zip(*gae.index) |
| uv = set(u).union(v) |
| gan=gdfn[gdfn.index.isin(uv)] |
| Ga=ox.convert.to_digraph(ox.convert.graph_from_gdfs(gan, gae)) |
| toll_nodes=gbn.index.to_list() |
| ltup, nodes_to_remove = rebuild_highways(Ga, toll_nodes) |
| Gc=G.copy() |
| Gc.remove_nodes_from(nodes_to_remove) |
| Gc.add_edges_from(ltup) |
| return Gc |
| |
| def readable_place(row): |
| name= "\n".join(row["name"]) if isinstance(row["name"], list) else row["name"] |
| ref= row["ref"][0] if isinstance(row["ref"], list) else row["ref"] |
| if pd.isna(name): |
| return str(row["ref"]) |
| return name if pd.isna(ref) else f'{row["ref"]} {name}' |
|
|
| def readable_dist(dm): |
| if dm<1000: |
| return f"{dm:.0f}m" |
| elif dm<10000: |
| return f"{dm/1000:.1f}km" |
| else: |
| return f"{dm/1000:.0f}km" |
|
|
| def readable_time(ts): |
| if ts<60: |
| return f"{ts:.0f}s" |
| elif ts<3600: |
| return f"{ts//60:.0f}min {ts%60:.0f}s" |
| else: |
| return f"{ts//3600:.0f}h {(ts//60)%60:.0f}min" |
|
|
| def readable_tarif(row): |
| if "tarif" not in row or row["tarif"]==0: |
| return "" |
| else: |
| return f', {row["tarif"]}€' |
| |
| def readable_agg(row): |
| return f'{readable_time(row["travel_time"])}, {readable_dist(row["length"])}{readable_tarif(row)}' |
|
|
| def clean_gfast(gfast): |
| gfast["poids"]=gfast.apply(readable_agg, axis=1) |
| gfast["legend"] = readable_agg(gfast[list(set(gfast.columns) & {"travel_time", "tarif", "length"})].sum()) |
| gfast["rue"] = gfast.apply(readable_place, axis=1) |
| return gfast |
|
|
|
|