LapStore commited on
Commit
ee3c704
·
1 Parent(s): 3b8379e

refined design

Browse files
Files changed (7) hide show
  1. Utils.py +7 -0
  2. app.py +6 -156
  3. database_manager.py +30 -0
  4. keys.env +13 -0
  5. mqtt_manager.py +98 -0
  6. requirements.txt +1 -0
  7. traffic_utils.py +52 -0
Utils.py ADDED
@@ -0,0 +1,7 @@
 
 
 
 
 
 
 
 
1
+ from typing import List, Union
2
+ from pydantic import BaseModel
3
+
4
+ class CoordinatesPayload(BaseModel):
5
+ coords: Union[str, List[List[float]]]
6
+ state: str
7
+ time: int
app.py CHANGED
@@ -1,70 +1,22 @@
1
  from fastapi import FastAPI, HTTPException,BackgroundTasks
2
- from typing import List, Union
3
- from pydantic import BaseModel
4
  import requests
5
  import ast
6
  import pymysql
7
  import time
8
-
 
 
 
 
9
 
10
  # -------------------------
11
  # App Initialization
12
  # -------------------------
13
  app = FastAPI()
14
 
15
- # -------------------------
16
- # Database Setup (Pure PyMySQL)
17
- # -------------------------
18
- timeout = 20
19
-
20
- def get_db():
21
- connection = pymysql.connect(
22
- charset="utf8mb4",
23
- connect_timeout=timeout,
24
- cursorclass=pymysql.cursors.DictCursor,
25
- db="trafficManagerSignals",
26
- host="mysql-19285eb2-tahaelshrif1-7999.h.aivencloud.com",
27
- password="AVNS_aT0RGFafs6_34WFegSF",
28
- read_timeout=timeout,
29
- port=11520,
30
- user="avnadmin",
31
- write_timeout=timeout,
32
- ssl={"ssl": True},
33
- )
34
- return connection
35
-
36
- # -------------------------
37
- # Utilities
38
- # -------------------------
39
- def get_rectangle_container(coordinates):
40
- lons = [i for i, _ in coordinates]
41
- lats = [i for _, i in coordinates]
42
- min_lat, max_lat = min(lats), max(lats)
43
- min_lon, max_lon = min(lons), max(lons)
44
- return min_lat, max_lat, min_lon, max_lon
45
-
46
- def get_traffic_in_container(coordinates):
47
- (min_lat, max_lat, min_lon, max_lon) = get_rectangle_container(coordinates)
48
- overpass_url = "http://overpass-api.de/api/interpreter"
49
- query = f"""
50
- [out:json];
51
- node["highway"="traffic_signals"]({min_lat},{min_lon},{max_lat},{max_lon});
52
- out body;
53
- """
54
- response = requests.get(overpass_url, params={'data': query})
55
- data = response.json()
56
- return [(el['lat'], el['lon']) for el in data['elements']]
57
 
58
 
59
- singlas_state = {}
60
-
61
- def open_signal(tl_id,state,time_):
62
- singlas_state[tl_id] = state
63
- time.sleep(time_)
64
- singlas_state.pop(tl_id)
65
-
66
-
67
- # -------------------------
68
  # Routes
69
  # -------------------------
70
  @app.get("/")
@@ -72,88 +24,6 @@ def read_root():
72
  return {"message": "Hello from FastAPI on Hugging Face Spaces!"}
73
 
74
 
75
- @app.post("/get_State")
76
- def get_State(tl_id):
77
- if tl_id in singlas_state:
78
- return singlas_state[tl_id]
79
- else:
80
- return "FREE"
81
-
82
-
83
- import paho.mqtt.client as mqtt
84
- import time
85
- import threading
86
-
87
- mqtt_broker = "broker.hivemq.com" # أو بروكر محلي لو عاوز
88
- mqtt_port = 1883
89
-
90
- client = mqtt.Client()
91
- client.connect(mqtt_broker, mqtt_port)
92
-
93
- def mqtt_publisher_loop(tl_id):
94
- import paho.mqtt.client as mqtt
95
- import time
96
-
97
- mqtt_broker = "broker.hivemq.com"
98
- mqtt_port = 1883
99
-
100
- client = mqtt.Client()
101
- client.connect(mqtt_broker, mqtt_port)
102
- client.loop_start()
103
-
104
- last_state = None
105
- while True:
106
- current_state = get_State(tl_id)
107
- if current_state != last_state:
108
- topic = f"state/{tl_id}"
109
- client.publish(topic, current_state)
110
- print(f"📡 Published {current_state} to {topic}")
111
- last_state = current_state
112
- time.sleep(1)
113
-
114
- @app.post("/getstt")
115
- def start_mqtt_publishers(tl_id:str):
116
- threading.Thread(target=mqtt_publisher_loop, args=(tl_id,), daemon=True).start()
117
-
118
- '''
119
- from fastapi import WebSocket, WebSocketDisconnect
120
- import asyncio
121
-
122
- @app.websocket("/ws/state")
123
- async def websocket_get_state(websocket: WebSocket):
124
- await websocket.accept()
125
- try:
126
- tl_id = await websocket.receive_text()
127
- print(f"✅ Client subscribed to {tl_id}")
128
-
129
- last_state = None
130
- while True:
131
- current_state = get_State(tl_id)
132
- if current_state != last_state:
133
- await websocket.send_text(current_state)
134
- last_state = current_state
135
- await asyncio.sleep(1) # كل ثانية يشيك التغيير
136
- except WebSocketDisconnect:
137
- print("❌ Client disconnected")
138
-
139
-
140
- @app.get("/test-db")
141
- def test_db():
142
- try:
143
- connection = get_db()
144
- with connection.cursor() as cursor:
145
- cursor.execute("SELECT 1")
146
- result = cursor.fetchone()
147
- connection.close()
148
- return {"status": "connected", "result": result}
149
- except Exception as e:
150
- return {"status": "error", "detail": str(e)}
151
-
152
- '''
153
- class CoordinatesPayload(BaseModel):
154
- coords: Union[str, List[List[float]]]
155
- state: str
156
- time: int
157
 
158
 
159
  @app.post("/send-coordinates")
@@ -176,23 +46,3 @@ def receive_coordinates(payload: CoordinatesPayload,background_tasks: Background
176
  else:
177
  return {"error": "No traffic signals found in database"+str(traffic_lights)}
178
 
179
- # -------------------------
180
- # Signal Database Check
181
- # -------------------------
182
- def check_signals(coords):
183
- connection = get_db()
184
- found_signals = []
185
- try:
186
- with connection.cursor() as cursor:
187
- for lat, lon in coords:
188
- cursor.execute("""
189
- SELECT tl_id_sumo FROM traffic_signals
190
- WHERE ABS(lat - %s) < 0.0001 AND ABS(lon - %s) < 0.0001
191
- """, (lon,lat))
192
- result = cursor.fetchone()
193
- if result:
194
- found_signals.append(result['tl_id_sumo'])
195
- finally:
196
- connection.close()
197
-
198
- return found_signals
 
1
  from fastapi import FastAPI, HTTPException,BackgroundTasks
 
 
2
  import requests
3
  import ast
4
  import pymysql
5
  import time
6
+ from database_manager import *
7
+ from mqtt_manager import singlas_state
8
+ from mqtt_manager import *
9
+ from traffic_utils import *
10
+ from Utils import *
11
 
12
  # -------------------------
13
  # App Initialization
14
  # -------------------------
15
  app = FastAPI()
16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
17
 
18
 
19
+ #--------------------------
 
 
 
 
 
 
 
 
20
  # Routes
21
  # -------------------------
22
  @app.get("/")
 
24
  return {"message": "Hello from FastAPI on Hugging Face Spaces!"}
25
 
26
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
 
28
 
29
  @app.post("/send-coordinates")
 
46
  else:
47
  return {"error": "No traffic signals found in database"+str(traffic_lights)}
48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
database_manager.py ADDED
@@ -0,0 +1,30 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pymysql
2
+ from dotenv import load_dotenv
3
+ import os
4
+
5
+ load_dotenv(dotenv_path="keys.env")
6
+
7
+ db_name = os.getenv("DATABASE_NAME")
8
+ db_user = os.getenv("User")
9
+ mysql_host = os.getenv("MYSQL_HOST")
10
+ mysql_password = os.getenv("MYSQL_PASSWORD")
11
+ db_port = int(os.getenv("MYSQL_PORT"))
12
+
13
+
14
+ timeout = 20
15
+
16
+ def get_db():
17
+ connection = pymysql.connect(
18
+ charset="utf8mb4",
19
+ connect_timeout=timeout,
20
+ cursorclass=pymysql.cursors.DictCursor,
21
+ db=db_name,
22
+ host=mysql_host,
23
+ password=mysql_password,
24
+ read_timeout=timeout,
25
+ port=db_port,
26
+ user=db_user,
27
+ write_timeout=timeout,
28
+ ssl={"ssl": True},
29
+ )
30
+ return connection
keys.env ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ MQTT_BROKER="broker.hivemq.com"
2
+ MQTT_PORT=1883
3
+
4
+ MQTT_PARTOPIC_state="state"
5
+ MQTT_SUBTOPIC_reply="reply"
6
+ MQTT_SUBTOPIC_msg="msg"
7
+
8
+ # database info
9
+ MYSQL_HOST="mysql-19285eb2-tahaelshrif1-7999.h.aivencloud.com"
10
+ MYSQL_PORT=11520
11
+ MYSQL_USER="avnadmin"
12
+ MYSQL_PASSWORD="AVNS_aT0RGFafs6_34WFegSF"
13
+ DATABASE_NAME="trafficManagerSignals"
mqtt_manager.py ADDED
@@ -0,0 +1,98 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ import paho.mqtt.client as mqtt
3
+ import time
4
+ import threading
5
+ from dotenv import load_dotenv
6
+ import os
7
+ from traffic_utils import status_acc,status_emr,status_free
8
+ import paho.mqtt.client as mqtt
9
+ import time
10
+
11
+ load_dotenv(dotenv_path="keys.env")
12
+
13
+ mqtt_broker = os.getenv("MQTT_BROKER")
14
+ mqtt_port = int(os.getenv("MQTT_PORT"))
15
+ MQTT_PARTOPIC_state = os.getenv("MQTT_PARTOPIC_state")
16
+ MQTT_SUBTOPIC_reply = os.getenv("MQTT_SUBTOPIC_reply")
17
+ MQTT_SUBTOPIC_msg = os.getenv("MQTT_SUBTOPIC_msg")
18
+ topic_rep = lambda x: MQTT_PARTOPIC_state+"/"+x +"/"+ MQTT_SUBTOPIC_reply
19
+ topic_msg = lambda x:MQTT_PARTOPIC_state+"/"+x +"/"+ MQTT_SUBTOPIC_msg
20
+
21
+
22
+ singlas_state = {
23
+ "1698478721" :status_free,
24
+ "6082411793":status_free}
25
+
26
+ client = mqtt.Client()
27
+
28
+ def intialize_mqtt_brokers():
29
+ client.connect(mqtt_broker, mqtt_port)
30
+ for tl_id in singlas_state.keys():
31
+ client.subscribe(topic_rep(tl_id))
32
+
33
+ def start_mqtt_publishers():
34
+ for tl_id in singlas_state.keys():
35
+ threading.Thread(target=mqtt_publisher_loop, args=(tl_id,), daemon=True).start()
36
+
37
+
38
+ def get_State(tl_id):
39
+ if tl_id in singlas_state:
40
+ return singlas_state[tl_id]
41
+ else:
42
+ return "FREE"
43
+
44
+
45
+ def mqtt_publisher_loop(tl_id):
46
+ #client = mqtt.Client()
47
+ #client.connect(mqtt_broker, mqtt_port)
48
+ client.loop_start()
49
+
50
+ last_state = None
51
+ while True:
52
+ current_state = get_State(tl_id)
53
+ if current_state != last_state:
54
+ topic = topic_msg(tl_id)
55
+ client.publish(topic, current_state)
56
+ print(f"📡 Published {current_state} to {topic}")
57
+ last_state = current_state
58
+ time.sleep(1)
59
+
60
+ intialize_mqtt_brokers()
61
+
62
+
63
+
64
+ '''
65
+ from fastapi import WebSocket, WebSocketDisconnect
66
+ import asyncio
67
+
68
+ @app.websocket("/ws/state")
69
+ async def websocket_get_state(websocket: WebSocket):
70
+ await websocket.accept()
71
+ try:
72
+ tl_id = await websocket.receive_text()
73
+ print(f"✅ Client subscribed to {tl_id}")
74
+
75
+ last_state = None
76
+ while True:
77
+ current_state = get_State(tl_id)
78
+ if current_state != last_state:
79
+ await websocket.send_text(current_state)
80
+ last_state = current_state
81
+ await asyncio.sleep(1) # كل ثانية يشيك التغيير
82
+ except WebSocketDisconnect:
83
+ print("❌ Client disconnected")
84
+
85
+
86
+ @app.get("/test-db")
87
+ def test_db():
88
+ try:
89
+ connection = get_db()
90
+ with connection.cursor() as cursor:
91
+ cursor.execute("SELECT 1")
92
+ result = cursor.fetchone()
93
+ connection.close()
94
+ return {"status": "connected", "result": result}
95
+ except Exception as e:
96
+ return {"status": "error", "detail": str(e)}
97
+
98
+ '''
requirements.txt CHANGED
@@ -4,3 +4,4 @@ requests
4
  pymysql
5
  sqlalchemy
6
  paho-mqtt
 
 
4
  pymysql
5
  sqlalchemy
6
  paho-mqtt
7
+ python-dotenv
traffic_utils.py ADDED
@@ -0,0 +1,52 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import requests
2
+ from database_manager import *
3
+ import time
4
+ from mqtt_manager import singlas_state
5
+
6
+ status_free = "FREE"
7
+ status_emr = "EMR"
8
+ status_acc = "ACC"
9
+
10
+ def get_traffic_in_container(coordinates):
11
+ (min_lat, max_lat, min_lon, max_lon) = get_rectangle_container(coordinates)
12
+ overpass_url = "http://overpass-api.de/api/interpreter"
13
+ query = f"""
14
+ [out:json];
15
+ node["highway"="traffic_signals"]({min_lat},{min_lon},{max_lat},{max_lon});
16
+ out body;
17
+ """
18
+ response = requests.get(overpass_url, params={'data': query})
19
+ data = response.json()
20
+ return [(el['lat'], el['lon']) for el in data['elements']]
21
+
22
+
23
+ def get_rectangle_container(coordinates):
24
+ lons = [i for i, _ in coordinates]
25
+ lats = [i for _, i in coordinates]
26
+ min_lat, max_lat = min(lats), max(lats)
27
+ min_lon, max_lon = min(lons), max(lons)
28
+ return min_lat, max_lat, min_lon, max_lon
29
+
30
+ def open_signal(tl_id,state,time_):
31
+ singlas_state[tl_id] = state
32
+ time.sleep(time_)
33
+ singlas_state[tl_id] = status_free
34
+
35
+
36
+ def check_signals(coords):
37
+ connection = get_db()
38
+ found_signals = []
39
+ try:
40
+ with connection.cursor() as cursor:
41
+ for lat, lon in coords:
42
+ cursor.execute("""
43
+ SELECT tl_id_sumo FROM traffic_signals
44
+ WHERE ABS(lat - %s) < 0.0001 AND ABS(lon - %s) < 0.0001
45
+ """, (lon,lat))
46
+ result = cursor.fetchone()
47
+ if result:
48
+ found_signals.append(result['tl_id_sumo'])
49
+ finally:
50
+ connection.close()
51
+
52
+ return found_signals