"""
Authentication module: e-mail/phone verification-code login,
username/password accounts, and JWT issuing/verification.
"""
|
import hashlib
import hmac
import os
import random
import secrets
import smtplib
import sqlite3
import string
import time
import warnings
from datetime import datetime, timedelta, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import jwt
from flask import Blueprint, request, jsonify, session
|
|
|
# Blueprint that carries every authentication route defined in this module.
auth_bp = Blueprint('auth', __name__)

# Pending verification codes, keyed by the e-mail address or phone number they
# were issued for.  This is a plain in-process dict: entries disappear on
# restart and are not visible to other worker processes.
verification_codes = {}

# JWT settings.  The fallback secret only exists so an unconfigured development
# setup still starts; tokens signed with it can be forged by anyone who knows
# this default, so its use is reported instead of passing silently.
_DEFAULT_JWT_SECRET = "your_jwt_secret_key_here"
JWT_SECRET = os.getenv("JWT_SECRET", _DEFAULT_JWT_SECRET)
if JWT_SECRET == _DEFAULT_JWT_SECRET:
    warnings.warn(
        "JWT_SECRET is not set; falling back to an insecure built-in secret. "
        "Set the JWT_SECRET environment variable before deploying.",
        RuntimeWarning,
    )
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION_DELTA = timedelta(days=7)

# SQLite database file: ``auth.db`` two directories above the directory that
# contains this file.
DB_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'auth.db')
|
|
|
|
|
def init_auth_db():
    """Create the authentication tables if they do not exist yet.

    Creates ``users`` and ``verification_logs`` in the SQLite database at
    ``DB_PATH``.  A ``users`` table that predates the ``password`` column gets
    that column added.  The function can be called repeatedly.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                email TEXT UNIQUE,
                phone TEXT UNIQUE,
                username TEXT UNIQUE,
                role TEXT DEFAULT 'student',
                name TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_login TIMESTAMP,
                password TEXT
            )
        ''')

        # Look at the actual schema rather than running ALTER TABLE and
        # ignoring whatever it raises: that way a genuine failure (locked or
        # read-only database, ...) is reported instead of being swallowed.
        cursor.execute("PRAGMA table_info(users)")
        existing_columns = {column[1] for column in cursor.fetchall()}
        if 'password' not in existing_columns:
            cursor.execute("ALTER TABLE users ADD COLUMN password TEXT")

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS verification_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                identifier TEXT NOT NULL,
                code TEXT NOT NULL,
                type TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                used BOOLEAN DEFAULT 0
            )
        ''')

        conn.commit()
    finally:
        # Release the connection even when one of the statements failed.
        conn.close()
|
|
|
|
|
# Make sure the tables exist as soon as this module is imported.
init_auth_db()
|
|
|
def generate_verification_code(length=6):
    """Return a random numeric verification code.

    The digits are drawn with the ``secrets`` module rather than ``random``
    because the code acts as a login credential and must not be predictable.

    Args:
        length: Number of digits to generate.

    Returns:
        A string made of ``length`` decimal digits.
    """
    return ''.join(secrets.choice(string.digits) for _ in range(length))
|
|
|
def send_email_code(email, code):
    """Send a verification code by e-mail.

    SMTP settings are read from the ``SMTP_HOST``, ``SMTP_PORT``,
    ``SMTP_USER``, ``SMTP_PASS`` and ``FROM_EMAIL`` environment variables.
    When no SMTP credentials are configured the code is printed to stdout
    instead (development mode) and the call counts as successful.

    Args:
        email: Recipient address.
        code: Verification code to embed in the message.

    Returns:
        True if the message was sent (or printed in development mode),
        False if anything went wrong.  The function never raises.
    """
    try:
        smtp_host = os.getenv("SMTP_HOST", "smtp.gmail.com")
        smtp_port = int(os.getenv("SMTP_PORT", "587"))
        smtp_user = os.getenv("SMTP_USER", "")
        smtp_pass = os.getenv("SMTP_PASS", "")

        if not smtp_user or not smtp_pass:
            # Development mode: no credentials, so show the code on stdout.
            print(f"=======================================")
            print(f"📧 Email Verification (Dev Mode)")
            print(f"=======================================")
            print(f"To: {email}")
            print(f"Code: {code}")
            print(f"=======================================")
            return True

        msg = MIMEMultipart('alternative')
        msg['From'] = os.getenv("FROM_EMAIL", "Education@aixiao.xyz")
        msg['To'] = email
        msg['Subject'] = '【智能教育助手】登录验证码'

        html = f'''
        <html>
        <head>
            <style>
                body {{ font-family: 'Microsoft YaHei', Arial, sans-serif; }}
                .container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
                .header {{ text-align: center; color: #10b981; }}
                .code-box {{
                    background: #f8fafc;
                    border: 2px solid #10b981;
                    border-radius: 8px;
                    padding: 20px;
                    margin: 20px 0;
                    text-align: center;
                }}
                .code {{
                    font-size: 32px;
                    font-weight: bold;
                    color: #10b981;
                    letter-spacing: 5px;
                }}
            </style>
        </head>
        <body>
            <div class="container">
                <h1 class="header">智能教育助手</h1>
                <p>您的登录验证码是:</p>
                <div class="code-box">
                    <div class="code">{code}</div>
                </div>
                <p style="color: #999;">验证码有效期为5分钟,请尽快使用。</p>
                <p style="color: #999;">如果这不是您的操作,请忽略此邮件。</p>
            </div>
        </body>
        </html>
        '''

        msg.attach(MIMEText(html, 'html'))

        # Port 465 expects TLS from the first byte; any other port starts in
        # plaintext and is upgraded with STARTTLS when the server offers it,
        # so the credentials are not sent in the clear.
        implicit_tls = smtp_port == 465
        smtp_class = smtplib.SMTP_SSL if implicit_tls else smtplib.SMTP

        # The context manager closes the connection even if login/send fails;
        # the timeout keeps an unreachable server from blocking the request.
        with smtp_class(smtp_host, smtp_port, timeout=10) as server:
            if not implicit_tls:
                server.ehlo()
                if server.has_extn('starttls'):
                    server.starttls()
                    server.ehlo()
            server.login(smtp_user, smtp_pass)
            server.send_message(msg)

        return True
    except Exception as e:
        # Best-effort boundary: callers only look at the boolean result.
        print(f"Email send error: {e}")
        return False
|
|
|
def generate_jwt_token(user_info):
    """Build a signed JWT for a user.

    Args:
        user_info: Mapping describing the user.  ``id``, ``email``, ``phone``,
            ``username`` and ``role`` are copied into the token; missing keys
            become ``None`` (``role`` falls back to ``'student'``).

    Returns:
        The token produced by ``jwt.encode``, signed with ``JWT_SECRET`` and
        expiring ``JWT_EXPIRATION_DELTA`` from now.
    """
    # Timezone-aware "now": datetime.utcnow() is deprecated and yields a naive
    # value that is easy to misread as local time.
    expires_at = datetime.now(timezone.utc) + JWT_EXPIRATION_DELTA
    payload = {
        'user_id': user_info.get('id'),
        'email': user_info.get('email'),
        'phone': user_info.get('phone'),
        'username': user_info.get('username'),
        'role': user_info.get('role', 'student'),
        'exp': expires_at,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
|
|
|
def verify_jwt_token(token):
    """Decode a JWT and return its payload.

    Args:
        token: Encoded token to check against ``JWT_SECRET``.

    Returns:
        The decoded payload, or ``None`` when the token has expired or is
        otherwise invalid.
    """
    rejected = (jwt.ExpiredSignatureError, jwt.InvalidTokenError)
    try:
        return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except rejected:
        # Expired and malformed tokens are reported the same way to callers.
        return None
|
|
|
@auth_bp.route('/send-email-code', methods=['POST'])
def send_email_verification():
    """Issue a verification code and e-mail it.

    Expects a JSON object with an ``email`` field.  The generated code is kept
    in ``verification_codes`` for later login, recorded in the
    ``verification_logs`` table (best effort) and passed to
    ``send_email_code``.

    Returns:
        A JSON response: success with ``expiresIn`` (seconds), HTTP 400 when
        no e-mail address was supplied, or HTTP 500 when sending failed.
    """
    try:
        # silent=True turns a missing/invalid JSON body into None, so it is
        # answered with the regular 400 below instead of an error.
        data = request.get_json(silent=True)
        email = data.get('email') if isinstance(data, dict) else None

        if not email or not isinstance(email, str):
            return jsonify({'success': False, 'message': '请提供邮箱地址'}), 400

        # The code is deliberately not printed here: outside development mode
        # it must not end up in the server output.
        code = generate_verification_code()

        verification_codes[email] = {
            'code': code,
            'timestamp': time.time(),
            'type': 'email'
        }

        # Logging the code is best effort; a database problem must not stop
        # the code from being sent.
        conn = None
        try:
            conn = sqlite3.connect(DB_PATH)
            conn.execute(
                "INSERT INTO verification_logs (identifier, code, type) VALUES (?, ?, ?)",
                (email, code, 'email')
            )
            conn.commit()
        except sqlite3.Error as db_error:
            print(f"Database error: {db_error}")
        finally:
            if conn is not None:
                conn.close()

        if send_email_code(email, code):
            return jsonify({
                'success': True,
                'message': '验证码已发送到您的邮箱',
                'expiresIn': 300
            })

        # The user never received this code, so do not keep it valid.
        verification_codes.pop(email, None)
        return jsonify({'success': False, 'message': '发送验证码失败'}), 500
    except Exception as e:
        import traceback
        print(f"Error in send_email_verification: {e}")
        print(traceback.format_exc())
        return jsonify({'success': False, 'message': '发送验证码失败'}), 500
|
|
|
@auth_bp.route('/send-phone-code', methods=['POST'])
def send_phone_verification():
    """Issue a verification code for a phone number (simulated delivery).

    Expects a JSON object with a ``phone`` field.  No SMS is sent: the code is
    stored in ``verification_codes``, printed to stdout and returned in the
    response body.

    Returns:
        A JSON response with the code and ``expiresIn`` (seconds), or HTTP 400
        when no phone number was supplied.
    """
    # silent=True turns a missing/invalid JSON body into None, so it is
    # answered with the regular 400 below instead of an error.
    data = request.get_json(silent=True)
    phone = data.get('phone') if isinstance(data, dict) else None

    if not phone or not isinstance(phone, str):
        return jsonify({'success': False, 'message': '请提供手机号'}), 400

    code = generate_verification_code()

    verification_codes[phone] = {
        'code': code,
        'timestamp': time.time(),
        'type': 'phone'
    }

    print(f"=======================================")
    print(f"📱 Phone Verification (Dev Mode)")
    print(f"=======================================")
    print(f"Phone: {phone}")
    print(f"Code: {code}")
    print(f"=======================================")

    # NOTE(review): returning the code to the caller only makes sense while
    # delivery is simulated; it must be removed once a real SMS gateway is
    # used, otherwise the code proves nothing about phone ownership.
    return jsonify({
        'success': True,
        'message': '验证码已发送',
        'code': code,
        'expiresIn': 300
    })
|
|
|
# Roles a client may ask for when its account is created on first login
# (the same set the registration route accepts).
_LOGIN_ROLES = ('teacher', 'student')

# Lifetime of a verification code in seconds.
_CODE_TTL_SECONDS = 300

# Wrong guesses tolerated before a pending code is thrown away.
_MAX_CODE_ATTEMPTS = 5

# Static (lookup, insert) statements per login kind, so no SQL is assembled
# from strings at runtime.
_CODE_LOGIN_SQL = {
    'email': (
        "SELECT id, email, phone, name, role FROM users WHERE email = ?",
        "INSERT INTO users (email, name, role) VALUES (?, ?, ?)",
    ),
    'phone': (
        "SELECT id, email, phone, name, role FROM users WHERE phone = ?",
        "INSERT INTO users (phone, name, role) VALUES (?, ?, ?)",
    ),
}


def _consume_verification_code(identifier, code, kind):
    """Check ``code`` against the pending code for ``identifier``.

    Returns ``None`` when the code matches (it is then removed so it cannot be
    reused), otherwise a ``(message, http_status)`` tuple describing the
    failure.
    """
    stored = verification_codes.get(identifier)
    # A code issued through the other channel does not count for this one.
    if not stored or stored.get('type', kind) != kind:
        return '验证码无效', 400

    if time.time() - stored['timestamp'] > _CODE_TTL_SECONDS:
        verification_codes.pop(identifier, None)
        return '验证码已过期', 400

    # Compare as bytes in constant time; str() lets a client that sent the
    # code as a JSON number still match.
    supplied = str(code).strip().encode('utf-8')
    expected = str(stored['code']).encode('utf-8')
    if not hmac.compare_digest(supplied, expected):
        # Cap the number of guesses so a short numeric code cannot be
        # brute-forced within its lifetime.
        stored['attempts'] = stored.get('attempts', 0) + 1
        if stored['attempts'] >= _MAX_CODE_ATTEMPTS:
            verification_codes.pop(identifier, None)
        return '验证码错误', 400

    verification_codes.pop(identifier, None)
    return None


def _find_or_create_user(kind, identifier, display_name, role):
    """Return the user dict for ``identifier``, creating the account if needed.

    An existing user gets ``last_login`` refreshed and keeps the stored name
    and role; a new user is inserted with ``display_name`` and ``role``.
    """
    select_sql, insert_sql = _CODE_LOGIN_SQL[kind]
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute(select_sql, (identifier,))
        row = cursor.fetchone()

        if row is None:
            cursor.execute(insert_sql, (identifier, display_name, role))
            user_info = {
                'id': cursor.lastrowid,
                'email': None,
                'phone': None,
                'name': display_name,
                'role': role,
            }
            user_info[kind] = identifier
        else:
            user_info = dict(zip(('id', 'email', 'phone', 'name', 'role'), row))
            cursor.execute(
                "UPDATE users SET last_login = CURRENT_TIMESTAMP WHERE id = ?",
                (user_info['id'],)
            )

        conn.commit()
        return user_info
    finally:
        # Close the connection on every path, including errors.
        conn.close()


def _login_with_code(kind, missing_message, make_display_name):
    """Shared implementation of the e-mail and phone code logins.

    ``kind`` is both the JSON field holding the identifier and the ``users``
    column it is matched against (``'email'`` or ``'phone'``).
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    identifier = data.get(kind)
    code = data.get('code')
    role = data.get('role') or 'student'

    if not identifier or not code or not isinstance(identifier, str):
        return jsonify({'success': False, 'message': missing_message}), 400

    # The role comes from the client; reject anything outside the known set
    # before the code is consumed.
    if role not in _LOGIN_ROLES:
        return jsonify({'success': False, 'message': '角色无效'}), 400

    failure = _consume_verification_code(identifier, code, kind)
    if failure is not None:
        message, status = failure
        return jsonify({'success': False, 'message': message}), status

    user_info = _find_or_create_user(kind, identifier, make_display_name(identifier), role)

    token = generate_jwt_token(user_info)
    session['user'] = user_info

    return jsonify({
        'success': True,
        'message': '登录成功',
        'data': {
            'user': user_info,
            'token': token
        }
    })


@auth_bp.route('/login-email', methods=['POST'])
def login_with_email():
    """Log in (or sign up) with an e-mail address and its verification code.

    Expects JSON with ``email``, ``code`` and optionally ``role``.  A new
    account is named after the part of the address before ``@``.

    Returns:
        A JSON response with the user and a JWT, or HTTP 400 with a message
        when input is missing or the code is invalid, expired or wrong.
    """
    return _login_with_code(
        'email',
        '请提供邮箱和验证码',
        lambda email: email.split('@')[0],
    )


@auth_bp.route('/login-phone', methods=['POST'])
def login_with_phone():
    """Log in (or sign up) with a phone number and its verification code.

    Expects JSON with ``phone``, ``code`` and optionally ``role``.  A new
    account is named after the last four characters of the number.

    Returns:
        A JSON response with the user and a JWT, or HTTP 400 with a message
        when input is missing or the code is invalid, expired or wrong.
    """
    return _login_with_code(
        'phone',
        '请提供手机号和验证码',
        lambda phone: f"用户{phone[-4:]}",
    )
|
|
|
@auth_bp.route('/verify', methods=['GET'])
def verify_token():
    """Validate the bearer token of the request and return its user.

    The token is read from the ``Authorization`` header.  The user is looked
    up by the first non-empty claim among ``email``, ``phone`` and
    ``username``.

    Returns:
        A JSON response with the user, or HTTP 401 when the header is missing,
        the token is invalid, or no matching user exists.
    """
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return jsonify({'success': False, 'message': '未提供认证令牌'}), 401

    # Accept "Bearer <token>" with any capitalisation of the scheme, and a
    # bare token without scheme.
    scheme, _, credentials = auth_header.strip().partition(' ')
    token = credentials.strip() if scheme.lower() == 'bearer' else auth_header.strip()

    payload = verify_jwt_token(token)
    if not payload:
        return jsonify({'success': False, 'message': '无效的认证令牌'}), 401

    # Columns are named explicitly so the result does not depend on the
    # physical column order of the table.
    lookups = (
        ('email', "SELECT id, email, phone, name, role FROM users WHERE email = ?"),
        ('phone', "SELECT id, email, phone, name, role FROM users WHERE phone = ?"),
        ('username', "SELECT id, email, phone, name, role FROM users WHERE username = ?"),
    )
    for claim, query in lookups:
        claim_value = payload.get(claim)
        if claim_value:
            break
    else:
        return jsonify({'success': False, 'message': '用户不存在'}), 401

    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute(query, (claim_value,)).fetchone()
    finally:
        # Close the connection even if the query raised.
        conn.close()

    if not row:
        return jsonify({'success': False, 'message': '用户不存在'}), 401

    user_info = dict(zip(('id', 'email', 'phone', 'name', 'role'), row))

    return jsonify({
        'success': True,
        'data': {'user': user_info}
    })
|
|
|
# Stored password format: "pbkdf2_sha256$<iterations>$<salt hex>$<hash hex>".
_PASSWORD_SCHEME = 'pbkdf2_sha256'

# Work factor for PBKDF2-HMAC-SHA256 (value recommended by the OWASP password
# storage guidance).
_PASSWORD_ITERATIONS = 600_000

# Roles accepted at registration.
_ACCOUNT_ROLES = ('teacher', 'student')


def _hash_password(password):
    """Return a salted PBKDF2 hash of ``password`` in the stored format."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac(
        'sha256', password.encode('utf-8'), salt, _PASSWORD_ITERATIONS
    )
    return f"{_PASSWORD_SCHEME}${_PASSWORD_ITERATIONS}${salt.hex()}${digest.hex()}"


def _is_legacy_password_hash(stored):
    """Tell whether ``stored`` is an old unsalted SHA-256 hex digest."""
    return isinstance(stored, str) and not stored.startswith(_PASSWORD_SCHEME + '$')


def _verify_password(password, stored):
    """Check ``password`` against a stored hash (new or legacy format)."""
    if not stored or not isinstance(stored, str):
        # Accounts created through code login have no password at all.
        return False

    candidate = password.encode('utf-8')
    if _is_legacy_password_hash(stored):
        legacy_digest = hashlib.sha256(candidate).hexdigest()
        return hmac.compare_digest(legacy_digest.encode('utf-8'), stored.encode('utf-8'))

    try:
        _, iterations, salt_hex, digest_hex = stored.split('$')
        expected = bytes.fromhex(digest_hex)
        actual = hashlib.pbkdf2_hmac(
            'sha256', candidate, bytes.fromhex(salt_hex), int(iterations)
        )
    except ValueError:
        # Malformed stored value: treat as a failed check, not as an error.
        return False
    return hmac.compare_digest(actual, expected)


def _json_body():
    """Return the request's JSON object, or an empty dict if there is none."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _authenticated_payload():
    """Read and verify the bearer token of the current request.

    Returns ``(payload, None)`` on success or ``(None, error_response)`` where
    ``error_response`` is ready to be returned from a view.
    """
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return None, (jsonify({'success': False, 'message': '未提供认证令牌'}), 401)

    # Accept "Bearer <token>" with any capitalisation of the scheme, and a
    # bare token without scheme.
    scheme, _, credentials = auth_header.strip().partition(' ')
    token = credentials.strip() if scheme.lower() == 'bearer' else auth_header.strip()

    payload = verify_jwt_token(token)
    if not payload:
        return None, (jsonify({'success': False, 'message': '无效的认证令牌'}), 401)
    return payload, None


@auth_bp.route('/register', methods=['POST'])
def register():
    """Create an account from a username and password.

    Expects JSON with ``username``, ``password`` (at least 6 characters) and
    optionally ``role`` (``'teacher'`` or ``'student'``, default
    ``'student'``).

    Returns:
        A JSON response with the new user and a JWT, or HTTP 400 when input is
        missing/invalid or the username is taken.
    """
    data = _json_body()
    username = data.get('username', '')
    password = data.get('password', '')
    role = data.get('role', 'student')

    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'success': False, 'message': '请提供用户名和密码'}), 400
    username = username.strip()

    if not username or not password:
        return jsonify({'success': False, 'message': '请提供用户名和密码'}), 400

    if len(password) < 6:
        return jsonify({'success': False, 'message': '密码至少6位'}), 400

    if role not in _ACCOUNT_ROLES:
        return jsonify({'success': False, 'message': '角色无效'}), 400

    # Hash before opening the database: the key derivation is slow on purpose.
    password_hash = _hash_password(password)

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()

        cursor.execute("SELECT id FROM users WHERE username = ?", (username,))
        if cursor.fetchone():
            return jsonify({'success': False, 'message': '用户名已存在'}), 400

        try:
            cursor.execute(
                "INSERT INTO users (username, password, name, role) VALUES (?, ?, ?, ?)",
                (username, password_hash, username, role)
            )
        except sqlite3.IntegrityError:
            # Another request registered the same name between the check
            # above and this insert.
            return jsonify({'success': False, 'message': '用户名已存在'}), 400

        conn.commit()
        user_id = cursor.lastrowid
    finally:
        conn.close()

    user_info = {
        'id': user_id,
        'username': username,
        'name': username,
        'role': role
    }
    token = generate_jwt_token(user_info)

    return jsonify({
        'success': True,
        'message': '注册成功',
        'data': {
            'user': user_info,
            'token': token
        }
    })


@auth_bp.route('/login-password', methods=['POST'])
def login_with_password():
    """Log in with a username and password.

    Expects JSON with ``username`` and ``password``.  A password still stored
    as a legacy SHA-256 digest is re-hashed with PBKDF2 after a successful
    check.

    Returns:
        A JSON response with the user and a JWT, HTTP 400 when input is
        missing, or HTTP 401 when the credentials do not match.
    """
    data = _json_body()
    username = data.get('username', '')
    password = data.get('password', '')

    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'success': False, 'message': '请提供用户名和密码'}), 400
    username = username.strip()

    if not username or not password:
        return jsonify({'success': False, 'message': '请提供用户名和密码'}), 400

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT id, email, phone, username, role, name, password "
            "FROM users WHERE username = ?",
            (username,)
        )
        row = cursor.fetchone()

        if row is None or not _verify_password(password, row[6]):
            return jsonify({'success': False, 'message': '用户名或密码错误'}), 401

        user_id, email, phone, stored_username, role, name, stored_hash = row

        if _is_legacy_password_hash(stored_hash):
            # Upgrade old unsalted hashes while the plaintext is at hand.
            cursor.execute(
                "UPDATE users SET password = ? WHERE id = ?",
                (_hash_password(password), user_id)
            )

        cursor.execute(
            "UPDATE users SET last_login = CURRENT_TIMESTAMP WHERE id = ?",
            (user_id,)
        )
        conn.commit()
    finally:
        conn.close()

    user_info = {
        'id': user_id,
        'email': email,
        'phone': phone,
        'username': stored_username,
        'name': name,
        'role': role
    }

    token = generate_jwt_token(user_info)
    session['user'] = user_info

    return jsonify({
        'success': True,
        'message': '登录成功',
        'data': {
            'user': user_info,
            'token': token
        }
    })


@auth_bp.route('/me', methods=['GET'])
def get_current_user():
    """Return the user that owns the request's bearer token.

    The user is looked up by the first non-empty claim among ``email``,
    ``phone`` and ``username``.

    Returns:
        A JSON response with the user, or HTTP 401 when the header is missing,
        the token is invalid, or no matching user exists.
    """
    payload, error = _authenticated_payload()
    if error is not None:
        return error

    # Columns are named explicitly so the result does not depend on the
    # physical column order of the table.
    columns = "id, email, phone, username, role, name"
    lookups = (
        ('email', f"SELECT {columns} FROM users WHERE email = ?"),
        ('phone', f"SELECT {columns} FROM users WHERE phone = ?"),
        ('username', f"SELECT {columns} FROM users WHERE username = ?"),
    )
    for claim, query in lookups:
        claim_value = payload.get(claim)
        if claim_value:
            break
    else:
        return jsonify({'success': False, 'message': '用户不存在'}), 401

    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute(query, (claim_value,)).fetchone()
    finally:
        conn.close()

    if not row:
        return jsonify({'success': False, 'message': '用户不存在'}), 401

    user_id, email, phone, username, role, name = row
    return jsonify({
        'success': True,
        'data': {
            'user': {
                'id': user_id,
                'email': email,
                'phone': phone,
                'username': username,
                'name': name,
                'role': role
            }
        }
    })


@auth_bp.route('/change_password', methods=['POST'])
def change_password():
    """Change the password of the authenticated user.

    Expects a bearer token and JSON with ``oldPassword`` and ``newPassword``
    (at least 6 characters).  The user is identified by the token's
    ``user_id`` claim.

    Returns:
        A JSON success response, HTTP 401 for a missing/invalid token, or
        HTTP 400 when input is missing, too short, or the old password is
        wrong.
    """
    payload, error = _authenticated_payload()
    if error is not None:
        return error

    data = _json_body()
    old_password = data.get('oldPassword', '')
    new_password = data.get('newPassword', '')

    if not isinstance(old_password, str) or not isinstance(new_password, str):
        return jsonify({'success': False, 'message': '请提供旧密码和新密码'}), 400

    if not old_password or not new_password:
        return jsonify({'success': False, 'message': '请提供旧密码和新密码'}), 400

    if len(new_password) < 6:
        return jsonify({'success': False, 'message': '新密码至少6位'}), 400

    user_id = payload.get('user_id')

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT password FROM users WHERE id = ?", (user_id,))
        row = cursor.fetchone()

        if not row or not _verify_password(old_password, row[0]):
            return jsonify({'success': False, 'message': '旧密码错误'}), 400

        cursor.execute(
            "UPDATE users SET password = ? WHERE id = ?",
            (_hash_password(new_password), user_id)
        )
        conn.commit()
    finally:
        conn.close()

    return jsonify({'success': True, 'message': '密码修改成功'})
|
|
|
@auth_bp.route('/logout', methods=['POST'])
def logout():
    """Clear the Flask session and confirm the logout.

    Only the session is touched here; nothing in this handler deals with
    tokens that were issued earlier.
    """
    session.clear()
    confirmation = {'success': True, 'message': '已登出'}
    return jsonify(confirmation)