Update app.py
Browse files
app.py
CHANGED
|
@@ -7,6 +7,7 @@ import os
|
|
| 7 |
import secrets
|
| 8 |
import threading
|
| 9 |
import traceback
|
|
|
|
| 10 |
from collections import deque
|
| 11 |
from datetime import datetime
|
| 12 |
from pathlib import Path
|
|
@@ -237,14 +238,21 @@ def _ensure_data_layout():
|
|
| 237 |
global db_initialized
|
| 238 |
if db_initialized:
|
| 239 |
return
|
|
|
|
|
|
|
| 240 |
with db_init_lock:
|
| 241 |
if db_initialized:
|
|
|
|
| 242 |
return
|
|
|
|
| 243 |
_init_db_schema()
|
| 244 |
db_initialized = True
|
|
|
|
| 245 |
|
| 246 |
try:
|
|
|
|
| 247 |
_migrate_legacy_file_data_if_needed()
|
|
|
|
| 248 |
except Exception as exc:
|
| 249 |
logger.warning("Legacy data migration skipped due to error: %s", exc)
|
| 250 |
|
|
@@ -363,7 +371,9 @@ def _build_mysql_conn_kwargs():
|
|
| 363 |
"database": db_name,
|
| 364 |
"charset": "utf8mb4",
|
| 365 |
"autocommit": True,
|
| 366 |
-
"connect_timeout": int(os.getenv("MYSQL_CONNECT_TIMEOUT", "
|
|
|
|
|
|
|
| 367 |
"cursorclass": pymysql.cursors.DictCursor,
|
| 368 |
}
|
| 369 |
|
|
@@ -377,11 +387,43 @@ def _build_mysql_conn_kwargs():
|
|
| 377 |
|
| 378 |
|
| 379 |
def _db_connect():
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 380 |
try:
|
| 381 |
-
conn = pymysql.connect(**
|
| 382 |
except Exception as exc:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 383 |
_update_db_status(False, exc)
|
| 384 |
raise
|
|
|
|
|
|
|
|
|
|
| 385 |
_update_db_status(True)
|
| 386 |
return conn
|
| 387 |
|
|
@@ -417,7 +459,7 @@ def _db_execute(query: str, params=()):
|
|
| 417 |
|
| 418 |
|
| 419 |
def _init_db_schema():
|
| 420 |
-
|
| 421 |
_db_execute(
|
| 422 |
f"""
|
| 423 |
CREATE TABLE IF NOT EXISTS `{USERS_TABLE}` (
|
|
@@ -434,6 +476,7 @@ def _init_db_schema():
|
|
| 434 |
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
|
| 435 |
"""
|
| 436 |
)
|
|
|
|
| 437 |
|
| 438 |
|
| 439 |
def _legacy_load_json(path: Path, default):
|
|
@@ -444,11 +487,15 @@ def _legacy_load_json(path: Path, default):
|
|
| 444 |
|
| 445 |
|
| 446 |
def _migrate_legacy_file_data_if_needed():
|
|
|
|
| 447 |
if not LEGACY_USERS_META_PATH.exists():
|
|
|
|
| 448 |
return
|
| 449 |
|
| 450 |
row = _db_query_one(f"SELECT COUNT(*) AS cnt FROM `{USERS_TABLE}`")
|
| 451 |
-
|
|
|
|
|
|
|
| 452 |
return
|
| 453 |
|
| 454 |
try:
|
|
@@ -459,8 +506,10 @@ def _migrate_legacy_file_data_if_needed():
|
|
| 459 |
|
| 460 |
users = raw.get("users", []) if isinstance(raw, dict) else []
|
| 461 |
if not users:
|
|
|
|
| 462 |
return
|
| 463 |
|
|
|
|
| 464 |
migrated = 0
|
| 465 |
for item in users:
|
| 466 |
username = str(item.get("username", "")).strip()
|
|
@@ -506,11 +555,13 @@ def _migrate_legacy_file_data_if_needed():
|
|
| 506 |
except Exception as exc:
|
| 507 |
logger.warning("迁移用户失败。user=%s error=%s", username, exc)
|
| 508 |
|
|
|
|
| 509 |
if migrated > 0:
|
| 510 |
logger.info("已完成旧版文件数据迁移,共迁移 %s 个用户。", migrated)
|
| 511 |
|
| 512 |
|
| 513 |
def _load_users_meta():
|
|
|
|
| 514 |
_ensure_data_layout()
|
| 515 |
rows = _db_query_all(
|
| 516 |
f"""
|
|
@@ -519,6 +570,7 @@ def _load_users_meta():
|
|
| 519 |
ORDER BY username ASC
|
| 520 |
"""
|
| 521 |
)
|
|
|
|
| 522 |
return {str(row["username"]): row for row in rows}
|
| 523 |
|
| 524 |
|
|
@@ -785,15 +837,20 @@ async def _run_user_tasks(username: str):
|
|
| 785 |
def _sync_user_jobs_from_meta(users_map: dict[str, Any], run_startup_tasks: bool = False):
|
| 786 |
global scheduler_bootstrapped
|
| 787 |
|
|
|
|
| 788 |
for username in users_map.keys():
|
|
|
|
| 789 |
_schedule_user_job(username)
|
| 790 |
if run_startup_tasks:
|
| 791 |
cfg = _load_user_config(username)
|
| 792 |
run_on_startup = bool(cfg.get("scheduler", {}).get("runOnStartup", False))
|
|
|
|
| 793 |
if run_on_startup:
|
|
|
|
| 794 |
_start_background_run(username, "startup")
|
| 795 |
|
| 796 |
scheduler_bootstrapped = True
|
|
|
|
| 797 |
|
| 798 |
|
| 799 |
def _start_scheduler_bootstrap(run_startup_tasks: bool):
|
|
@@ -801,6 +858,7 @@ def _start_scheduler_bootstrap(run_startup_tasks: bool):
|
|
| 801 |
|
| 802 |
with scheduler_bootstrap_lock:
|
| 803 |
if scheduler_bootstrap_running:
|
|
|
|
| 804 |
return False
|
| 805 |
scheduler_bootstrap_running = True
|
| 806 |
|
|
@@ -808,9 +866,19 @@ def _start_scheduler_bootstrap(run_startup_tasks: bool):
|
|
| 808 |
global scheduler_bootstrapped, scheduler_bootstrap_running
|
| 809 |
try:
|
| 810 |
logger.info("Scheduler bootstrap started. run_startup_tasks=%s", run_startup_tasks)
|
|
|
|
|
|
|
| 811 |
_ensure_data_layout()
|
|
|
|
|
|
|
|
|
|
| 812 |
users_map = _load_users_meta()
|
|
|
|
|
|
|
|
|
|
| 813 |
_sync_user_jobs_from_meta(users_map, run_startup_tasks=run_startup_tasks)
|
|
|
|
|
|
|
| 814 |
logger.info("Scheduler bootstrap completed. users=%s", len(users_map))
|
| 815 |
except Exception as exc:
|
| 816 |
scheduler_bootstrapped = False
|
|
|
|
| 7 |
import secrets
|
| 8 |
import threading
|
| 9 |
import traceback
|
| 10 |
+
import time
|
| 11 |
from collections import deque
|
| 12 |
from datetime import datetime
|
| 13 |
from pathlib import Path
|
|
|
|
| 238 |
global db_initialized
|
| 239 |
if db_initialized:
|
| 240 |
return
|
| 241 |
+
|
| 242 |
+
logger.info("DB layout ensure begin.")
|
| 243 |
with db_init_lock:
|
| 244 |
if db_initialized:
|
| 245 |
+
logger.info("DB layout already initialized by another worker.")
|
| 246 |
return
|
| 247 |
+
logger.info("DB schema initialization begin.")
|
| 248 |
_init_db_schema()
|
| 249 |
db_initialized = True
|
| 250 |
+
logger.info("DB schema initialization complete.")
|
| 251 |
|
| 252 |
try:
|
| 253 |
+
logger.info("Legacy migration stage begin.")
|
| 254 |
_migrate_legacy_file_data_if_needed()
|
| 255 |
+
logger.info("Legacy migration stage complete.")
|
| 256 |
except Exception as exc:
|
| 257 |
logger.warning("Legacy data migration skipped due to error: %s", exc)
|
| 258 |
|
|
|
|
| 371 |
"database": db_name,
|
| 372 |
"charset": "utf8mb4",
|
| 373 |
"autocommit": True,
|
| 374 |
+
"connect_timeout": int(os.getenv("MYSQL_CONNECT_TIMEOUT", "4")),
|
| 375 |
+
"read_timeout": int(os.getenv("MYSQL_READ_TIMEOUT", "8")),
|
| 376 |
+
"write_timeout": int(os.getenv("MYSQL_WRITE_TIMEOUT", "8")),
|
| 377 |
"cursorclass": pymysql.cursors.DictCursor,
|
| 378 |
}
|
| 379 |
|
|
|
|
| 387 |
|
| 388 |
|
| 389 |
def _db_connect():
|
| 390 |
+
kwargs = _build_mysql_conn_kwargs()
|
| 391 |
+
host = kwargs.get("host")
|
| 392 |
+
port = kwargs.get("port")
|
| 393 |
+
database = kwargs.get("database")
|
| 394 |
+
connect_timeout = kwargs.get("connect_timeout")
|
| 395 |
+
read_timeout = kwargs.get("read_timeout")
|
| 396 |
+
write_timeout = kwargs.get("write_timeout")
|
| 397 |
+
has_ssl = bool(kwargs.get("ssl"))
|
| 398 |
+
started_at = time.perf_counter()
|
| 399 |
+
|
| 400 |
+
logger.info(
|
| 401 |
+
"MySQL connect begin. host=%s port=%s db=%s connect_timeout=%ss read_timeout=%ss write_timeout=%ss ssl=%s",
|
| 402 |
+
host,
|
| 403 |
+
port,
|
| 404 |
+
database,
|
| 405 |
+
connect_timeout,
|
| 406 |
+
read_timeout,
|
| 407 |
+
write_timeout,
|
| 408 |
+
has_ssl,
|
| 409 |
+
)
|
| 410 |
try:
|
| 411 |
+
conn = pymysql.connect(**kwargs)
|
| 412 |
except Exception as exc:
|
| 413 |
+
elapsed = time.perf_counter() - started_at
|
| 414 |
+
logger.warning(
|
| 415 |
+
"MySQL connect failed after %.2fs. host=%s port=%s db=%s error=%s",
|
| 416 |
+
elapsed,
|
| 417 |
+
host,
|
| 418 |
+
port,
|
| 419 |
+
database,
|
| 420 |
+
exc,
|
| 421 |
+
)
|
| 422 |
_update_db_status(False, exc)
|
| 423 |
raise
|
| 424 |
+
|
| 425 |
+
elapsed = time.perf_counter() - started_at
|
| 426 |
+
logger.info("MySQL connect success. host=%s port=%s db=%s elapsed=%.2fs", host, port, database, elapsed)
|
| 427 |
_update_db_status(True)
|
| 428 |
return conn
|
| 429 |
|
|
|
|
| 459 |
|
| 460 |
|
| 461 |
def _init_db_schema():
|
| 462 |
+
logger.info("DB schema creation SQL begin.")
|
| 463 |
_db_execute(
|
| 464 |
f"""
|
| 465 |
CREATE TABLE IF NOT EXISTS `{USERS_TABLE}` (
|
|
|
|
| 476 |
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
|
| 477 |
"""
|
| 478 |
)
|
| 479 |
+
logger.info("DB schema creation SQL complete.")
|
| 480 |
|
| 481 |
|
| 482 |
def _legacy_load_json(path: Path, default):
|
|
|
|
| 487 |
|
| 488 |
|
| 489 |
def _migrate_legacy_file_data_if_needed():
|
| 490 |
+
logger.info("Legacy migration check. path=%s", LEGACY_USERS_META_PATH)
|
| 491 |
if not LEGACY_USERS_META_PATH.exists():
|
| 492 |
+
logger.info("Legacy migration skipped: users.json not found.")
|
| 493 |
return
|
| 494 |
|
| 495 |
row = _db_query_one(f"SELECT COUNT(*) AS cnt FROM `{USERS_TABLE}`")
|
| 496 |
+
existing_count = int(row.get("cnt", 0)) if row else 0
|
| 497 |
+
if existing_count > 0:
|
| 498 |
+
logger.info("Legacy migration skipped: database already has %s users.", existing_count)
|
| 499 |
return
|
| 500 |
|
| 501 |
try:
|
|
|
|
| 506 |
|
| 507 |
users = raw.get("users", []) if isinstance(raw, dict) else []
|
| 508 |
if not users:
|
| 509 |
+
logger.info("Legacy migration skipped: legacy users list is empty.")
|
| 510 |
return
|
| 511 |
|
| 512 |
+
logger.info("Legacy migration loaded %s legacy users.", len(users))
|
| 513 |
migrated = 0
|
| 514 |
for item in users:
|
| 515 |
username = str(item.get("username", "")).strip()
|
|
|
|
| 555 |
except Exception as exc:
|
| 556 |
logger.warning("迁移用户失败。user=%s error=%s", username, exc)
|
| 557 |
|
| 558 |
+
logger.info("Legacy migration finished. migrated=%s total=%s", migrated, len(users))
|
| 559 |
if migrated > 0:
|
| 560 |
logger.info("已完成旧版文件数据迁移,共迁移 %s 个用户。", migrated)
|
| 561 |
|
| 562 |
|
| 563 |
def _load_users_meta():
|
| 564 |
+
logger.info("Load users meta begin.")
|
| 565 |
_ensure_data_layout()
|
| 566 |
rows = _db_query_all(
|
| 567 |
f"""
|
|
|
|
| 570 |
ORDER BY username ASC
|
| 571 |
"""
|
| 572 |
)
|
| 573 |
+
logger.info("Load users meta complete. count=%s", len(rows))
|
| 574 |
return {str(row["username"]): row for row in rows}
|
| 575 |
|
| 576 |
|
|
|
|
| 837 |
def _sync_user_jobs_from_meta(users_map: dict[str, Any], run_startup_tasks: bool = False):
|
| 838 |
global scheduler_bootstrapped
|
| 839 |
|
| 840 |
+
logger.info("Sync user jobs begin. count=%s run_startup_tasks=%s", len(users_map), run_startup_tasks)
|
| 841 |
for username in users_map.keys():
|
| 842 |
+
logger.info("Sync user job. username=%s", username)
|
| 843 |
_schedule_user_job(username)
|
| 844 |
if run_startup_tasks:
|
| 845 |
cfg = _load_user_config(username)
|
| 846 |
run_on_startup = bool(cfg.get("scheduler", {}).get("runOnStartup", False))
|
| 847 |
+
logger.info("Startup run flag loaded. username=%s run_on_startup=%s", username, run_on_startup)
|
| 848 |
if run_on_startup:
|
| 849 |
+
logger.info("Trigger startup run. username=%s", username)
|
| 850 |
_start_background_run(username, "startup")
|
| 851 |
|
| 852 |
scheduler_bootstrapped = True
|
| 853 |
+
logger.info("Sync user jobs complete. count=%s", len(users_map))
|
| 854 |
|
| 855 |
|
| 856 |
def _start_scheduler_bootstrap(run_startup_tasks: bool):
|
|
|
|
| 858 |
|
| 859 |
with scheduler_bootstrap_lock:
|
| 860 |
if scheduler_bootstrap_running:
|
| 861 |
+
logger.info("Scheduler bootstrap already running; skip duplicate start.")
|
| 862 |
return False
|
| 863 |
scheduler_bootstrap_running = True
|
| 864 |
|
|
|
|
| 866 |
global scheduler_bootstrapped, scheduler_bootstrap_running
|
| 867 |
try:
|
| 868 |
logger.info("Scheduler bootstrap started. run_startup_tasks=%s", run_startup_tasks)
|
| 869 |
+
|
| 870 |
+
logger.info("Bootstrap stage begin: ensure_data_layout")
|
| 871 |
_ensure_data_layout()
|
| 872 |
+
logger.info("Bootstrap stage complete: ensure_data_layout")
|
| 873 |
+
|
| 874 |
+
logger.info("Bootstrap stage begin: load_users_meta")
|
| 875 |
users_map = _load_users_meta()
|
| 876 |
+
logger.info("Bootstrap stage complete: load_users_meta count=%s", len(users_map))
|
| 877 |
+
|
| 878 |
+
logger.info("Bootstrap stage begin: sync_user_jobs")
|
| 879 |
_sync_user_jobs_from_meta(users_map, run_startup_tasks=run_startup_tasks)
|
| 880 |
+
logger.info("Bootstrap stage complete: sync_user_jobs count=%s", len(users_map))
|
| 881 |
+
|
| 882 |
logger.info("Scheduler bootstrap completed. users=%s", len(users_map))
|
| 883 |
except Exception as exc:
|
| 884 |
scheduler_bootstrapped = False
|