127 lines
4.0 KiB
Python
127 lines
4.0 KiB
Python
from db.connection import MySQLPool
|
||
import bcrypt
|
||
|
||
|
||
class DatabaseManager:
    """Data-access helper that runs parameterized SQL on pooled connections.

    Each public method obtains a connection from ``self.pool``, runs a single
    statement through :meth:`fetch` or :meth:`execute`, and closes the cursor
    and connection before returning.
    """

    def __init__(self):
        # Initialise the database connection pool via MySQLPool.
        self.pool = MySQLPool()

    def fetch(self, query, params=None):
        """Run a query and return everything ``cursor.fetchall()`` yields.

        Args:
            query: SQL text using ``%s`` placeholders.
            params: Values bound to the placeholders; a falsy value is
                replaced by an empty tuple.

        Returns:
            The result of ``cursor.fetchall()``.
        """
        conn = self.pool.get_connection()
        try:
            cursor = conn.cursor()
            # The cursor gets its own try/finally so that a failure inside
            # conn.cursor() still closes the connection and surfaces the
            # real error instead of an unbound-name error.
            try:
                cursor.execute(query, params or ())
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

    def execute(self, query, params=None):
        """Run a data-modifying statement, commit it and return the row count.

        Args:
            query: SQL text using ``%s`` placeholders.
            params: Values bound to the placeholders; a falsy value is
                replaced by an empty tuple.

        Returns:
            ``cursor.rowcount`` after the commit.

        Raises:
            Whatever the driver raises; the transaction is rolled back
            (best effort) before the error propagates.
        """
        conn = self.pool.get_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(query, params or ())
                conn.commit()
                return cursor.rowcount
            except Exception:
                # Do not leave a half-applied transaction on the connection.
                # A failing rollback must not hide the original error, so it
                # is deliberately ignored here.
                try:
                    conn.rollback()
                except Exception:
                    pass
                raise
            finally:
                cursor.close()
        finally:
            conn.close()

    def user_exists(self, phone_number):
        """Return True when a ``user`` row has ``number`` equal to *phone_number*."""
        sql = "SELECT 1 FROM user WHERE number=%s LIMIT 1"
        return bool(self.fetch(sql, (phone_number,)))

    def insert_user(self, user):
        """Insert *user* into the ``user`` table and return the affected row count.

        Reads ``name``, ``number``, ``password`` and ``status`` from *user*.
        """
        sql = """
            INSERT INTO user (name, number, password, status)
            VALUES (%s, %s, %s, %s)
        """
        data = (user.name, user.number, user.password, user.status)
        return self.execute(sql, data)

    def valid_login(self, number, password_attempt):
        """Check *password_attempt* against the bcrypt hash stored for *number*.

        Returns:
            ``{'valid': True, 'status': ..., 'name': ...}`` when the password
            matches, otherwise ``{'valid': False}`` (unknown user, missing
            hash or wrong password).
        """
        # Fetch the user's password hash, status and name.
        sql = "SELECT password, status,name FROM user WHERE number=%s LIMIT 1"
        rows = self.fetch(sql, (number,))
        if not rows:
            return {'valid': False}

        # Rows are indexed by column name, so this assumes the pool hands out
        # dict-style cursors -- TODO confirm against MySQLPool.
        row = rows[0]
        stored_hash = row['password']
        if not stored_hash:
            # A NULL/empty hash can never match any password.
            return {'valid': False}
        # bcrypt works on bytes; accept the hash whether the driver returned
        # it as text or as raw bytes.
        if isinstance(stored_hash, str):
            stored_hash = stored_hash.encode('utf-8')

        # Verify the password with bcrypt.
        if bcrypt.checkpw(password_attempt.encode('utf-8'), stored_hash):
            # Password matches: report success along with status and name.
            return {'valid': True, 'status': row['status'], 'name': row['name']}

        # Password mismatch: report failure.
        return {'valid': False}

    def get_menu(self, role):
        """Return ``menu_name`` and ``path`` of the menu items for *role*, sorted by ``order``."""
        sql = "SELECT menu_name,path FROM menu_items WHERE role=%s ORDER BY `order`"
        return self.fetch(sql, (role,))

    def get_all_courses(self):
        """Return name, code, credits and description of every row in ``course``."""
        sql = "SELECT course_name, course_code, credits, description FROM course"
        return self.fetch(sql)

    def get_current_teacher_courses(self, teacher_number):
        """Return the courses assigned to *teacher_number*.

        Joins ``teacher_class_course`` with ``course`` and ``major`` and
        returns course name, code, credits, class name and major per row.
        """
        sql = """
            SELECT
                c.course_name,
                c.course_code,
                c.credits,
                tcc.class_name,
                m.major
            FROM
                teacher_class_course tcc
            JOIN
                course c ON tcc.course_id = c.course_id
            JOIN
                major m ON tcc.major_id = m.major_id
            WHERE
                tcc.teacher_number = %s;
        """
        # Run the query and hand back the rows.
        return self.fetch(sql, (teacher_number,))

    def get_course_type(self):
        """Return ``course_name`` and ``course_type`` for every row in ``course``."""
        sql = "SELECT course_name, course_type FROM course"
        return self.fetch(sql)

    def get_announcement_info(self):
        """Return ``course_name`` and ``course_type`` for every row in ``course``."""
        # NOTE(review): this query is identical to get_course_type() and does
        # not touch any announcement data -- confirm the intended table.
        sql = "SELECT course_name, course_type FROM course"
        return self.fetch(sql)

    def insert_teacher(self, teacher):
        """Insert *teacher* into the ``teacher`` table and return the affected row count.

        Reads ``name``, ``user_id`` and ``teacher_number`` from *teacher*.
        """
        sql = "INSERT INTO teacher (name, user_id,teacher_number) VALUES (%s, %s, %s);"
        data = (teacher.name, teacher.user_id, teacher.teacher_number)
        return self.execute(sql, data)

    def query_user_id(self, phone_number):
        """Return the ``user_id`` of the user whose ``number`` is *phone_number*.

        Raises:
            IndexError: if no such user exists.
        """
        sql = "SELECT user_id from user WHERE number = %s;"
        # Bind the value as a one-element tuple, like every other query here.
        rows = self.fetch(sql, (phone_number,))
        if not rows:
            raise IndexError(f"no user found for number {phone_number!r}")
        return rows[0]['user_id']

    def insert_student(self, student):
        """Insert *student* into the ``student`` table and return the affected row count.

        Reads ``student_name``, ``student_number``, ``user_id``, ``major_id``
        and ``class_name`` from *student*.
        """
        sql = """
            INSERT INTO student (student_name, student_number, user_id, major_id, class_name)
            VALUES (%s, %s, %s, %s, %s)
        """
        data = (
            student.student_name,
            student.student_number,
            student.user_id,
            student.major_id,
            student.class_name,
        )
        return self.execute(sql, data)
|