Compare commits

..

No commits in common. "f3ef70b09618404d06606fc2bbfe102c86df7bcb" and "bb53d199bd079334a320293a5dba288a042f3651" have entirely different histories.

5 changed files with 50 additions and 84 deletions

View File

@ -1,16 +1,9 @@
from flask import Flask, request, redirect, url_for, render_template, session, jsonify from flask import Flask, request, redirect, url_for, render_template, session, jsonify
from db.connection import MySQLPool
from config import SECRET_KEY from config import SECRET_KEY
from db.database_manager import DatabaseManager
from models.User import User
app = Flask(__name__) app = Flask(__name__)
app.secret_key = SECRET_KEY # 从配置文件设置 app.secret_key = SECRET_KEY # 从配置文件设置
# 一个全局MySQLPool对象用于管理数据库连接
mysql_pool = MySQLPool()
@app.route('/') @app.route('/')
def index(): def index():
# 如果用户已登录,则重定向到主页;否则,重定向到登录页面 # 如果用户已登录,则重定向到主页;否则,重定向到登录页面
@ -18,53 +11,42 @@ def index():
return redirect(url_for('home')) return redirect(url_for('home'))
else: else:
return render_template('login.html') return render_template('login.html')
@app.route('/register', methods=['GET', 'POST']) @app.route('/register', methods=['GET', 'POST'])
def register(): def register():
if request.method == 'POST': if request.method == 'POST':
# 解析JSON数据而不是表单数据
data = request.get_json() data = request.get_json()
# print(data) nick_name = data['nickname']
user = User( print(nick_name)
nickname=data.get('nickname'), # 处理注册逻辑...
phone_number=data.get('cellphone'), # 确保JSON中的键和这里匹配 return jsonify({"success": True, "message": "注册成功"}) # 返回JSON响应
password=data.get('password'),
identity=data.get('identity'),
is_active=True # 或者根据你的逻辑设置
)
db_manager = DatabaseManager()
if not db_manager.user_exists(user.phone_number):
db_manager.insert_user(user)
return jsonify({"success": True, "message": "注册成功"})
else:
return jsonify({"success": False, "message": "用户已存在"})
else: else:
return render_template('register.html') return render_template('register.html')
@app.route('/login', methods=['GET', 'POST']) @app.route('/login', methods=['GET', 'POST'])
def login(): def login():
if request.method == 'GET': if request.method == 'GET':
return render_template('login.html') return render_template('login.html')
else: else:
phone_number = request.form['username'] username = request.form['username']
password = request.form['password'] password = request.form['password']
db_manager = DatabaseManager() print(username, password)
# 验证用户名和密码... # 验证用户名和密码...
if db_manager.valid_login(phone_number, password): if valid_login(username, password):
# 登录成功 # 登录成功
session['username'] = phone_number session['username'] = username
return jsonify(success=True, message="登录成功") return jsonify(success=True, message="登录成功")
else: else:
# 登录失败 # 登录失败
return jsonify(success=False, message="无效的用户名或密码") return jsonify(success=False, message="无效的用户名或密码")
@app.route('/forget', methods=['GET', 'POST']) @app.route('/forget', methods=['GET', 'POST'])
def forget_page(): def forget_page():
return render_template('forget.html') return render_template('forget.html')
@app.route('/home') @app.route('/home')
def home(): def home():
if 'username' in session: if 'username' in session:
@ -72,7 +54,6 @@ def home():
else: else:
return redirect("login") return redirect("login")
@app.route('/logout') @app.route('/logout')
def logout(): def logout():
# 清除session中的所有信息 # 清除session中的所有信息
@ -81,5 +62,10 @@ def logout():
return redirect('/login') return redirect('/login')
def valid_login(username, password):
# 这里应该是验证用户名和密码的逻辑,比如查询数据库等等
# 假设用户名是admin且密码是secret
return username == '1' and password == '1'
if __name__ == '__main__': if __name__ == '__main__':
app.run(debug=True) app.run(debug = True)

View File

@ -1,5 +1,4 @@
# config.py # config.py
import pymysql
# app secretkey # app secretkey
SECRET_KEY = 'sUNiJ7QPulxrbmZD' SECRET_KEY = 'sUNiJ7QPulxrbmZD'
@ -7,10 +6,9 @@ SECRET_KEY = 'sUNiJ7QPulxrbmZD'
# 数据库连接配置 # 数据库连接配置
DB_CONFIG = { DB_CONFIG = {
'host': '42.193.20.110', 'host': '42.193.20.110',
'port': 8006, # 注意端口是数字,不是字符串
'user': 'test', 'user': 'test',
'password': 'X7gq9lbxqpDGbyCi', 'password': 'X7gq9lbxqpDGbyCi',
'database': 'test_db', 'database': 'test_db',
'charset': 'utf8mb4', 'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor 'cursorclass': 'pymysql.cursors.DictCursor'
} }

View File

@ -20,3 +20,16 @@ class MySQLPool:
def get_connection(self): def get_connection(self):
# 从连接池中获取一个连接 # 从连接池中获取一个连接
return self.pool.connection() return self.pool.connection()
# 使用上下文管理器自动处理连接的开启和关闭
def execute(self, sql, args=None):
with self.get_connection() as connection:
with connection.cursor() as cursor:
cursor.execute(sql, args)
if sql.strip().lower().startswith("select"):
# 如果是查询操作,返回所有结果
return cursor.fetchall()
else:
# 如果是增、删、改操作,提交事务并返回影响的行数
connection.commit()
return cursor.rowcount

View File

@ -1,55 +1,28 @@
import pymysql import pymysql
from db.connection import MySQLPool from config import DB_CONFIG
import bcrypt
class DatabaseManager: class DatabaseManager:
def __init__(self): def __init__(self):
# 使用MySQLPool初始化数据库连接池 self.connection = pymysql.connect(**DB_CONFIG)
self.pool = MySQLPool()
def fetch(self, query, params=None): def fetch(self, query, params=None):
conn = self.pool.get_connection() # 实现查询逻辑
try: pass
cursor = conn.cursor()
cursor.execute(query, params or ())
result = cursor.fetchall()
return result
finally:
cursor.close()
conn.close()
def execute(self, query, params=None): def insert(self, query, params=None):
conn = self.pool.get_connection() # 实现插入逻辑
try: pass
cursor = conn.cursor()
cursor.execute(query, params or ())
conn.commit()
return cursor.rowcount
finally:
cursor.close()
conn.close()
def user_exists(self, phone_number): def update(self, query, params=None):
sql = "SELECT 1 FROM user WHERE phone_number=%s LIMIT 1" # 实现更新逻辑
result = self.fetch(sql, (phone_number,)) pass
return len(result) > 0
def insert_user(self, user): def delete(self, query, params=None):
sql = """ # 实现删除逻辑
INSERT INTO user (nickname, phone_number, password, identity, is_active) pass
VALUES (%s, %s, %s, %s, %s)
"""
data = (user.nickname, user.phone_number, user.password, user.identity, user.is_active)
# print(data)
return self.execute(sql, data)
def valid_login(self, phone_number, password_attempt): def close(self):
# SQL查询获取用户的哈希密码 # 关闭数据库连接
sql = "SELECT password FROM user WHERE phone_number=%s LIMIT 1" self.connection.close()
result = self.fetch(sql, (phone_number,))
if result: # 可能还包含其他数据库操作方法...
stored_hash = result[0]['password'] # 假设结果是密码字段
# 使用bcrypt进行密码验证
if bcrypt.checkpw(password_attempt.encode('utf-8'), stored_hash.encode('utf-8')):
return True # 密码匹配,登录成功
return False # 密码不匹配或用户不存在,登录失败

View File

@ -1,14 +1,10 @@
import bcrypt
class User: class User:
def __init__(self, nickname, phone_number, password, identity, is_active): def __init__(self, nickname, phone_number, password, identity, is_active):
self.nickname = nickname # 用户昵称 self.nickname = nickname # 用户昵称
self.phone_number = phone_number # 手机号 self.phone_number = phone_number # 手机号
self.password = self.hash_password(password) # 哈希密码 self.password = password # 密码
self.identity = identity # 身份(老师或学生) self.identity = identity # 身份(老师或学生)
self.is_active = is_active # 状态(是否可用) self.is_active = is_active # 状态(是否可用)
def hash_password(self, password):
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
def __str__(self): def __str__(self):
return f"User({self.nickname}, {self.phone_number}, {'Teacher' if self.identity == 'teacher' else 'Student'}, {'Active' if self.is_active else 'Inactive'})" return f"User({self.nickname}, {self.phone_number}, {'Teacher' if self.identity == 'teacher' else 'Student'}, {'Active' if self.is_active else 'Inactive'})"