This repository has been archived on 2024-09-30. You can view files and clone it, but cannot push or open issues/pull-requests.
SmartRollCall/db/database_manager.py

151 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from db.connection import MySQLPool
import bcrypt
class DatabaseManager:
def __init__(self):
# 使用MySQLPool初始化数据库连接池
self.pool = MySQLPool()
def fetch(self, query, params=None):
conn = self.pool.get_connection()
try:
cursor = conn.cursor()
cursor.execute(query, params or ())
result = cursor.fetchall()
return result
finally:
cursor.close()
conn.close()
def execute(self, query, params=None):
conn = self.pool.get_connection()
try:
cursor = conn.cursor()
cursor.execute(query, params or ())
conn.commit()
return cursor.rowcount
finally:
cursor.close()
conn.close()
def user_exists(self, phone_number):
sql = "SELECT 1 FROM user WHERE number=%s LIMIT 1"
result = self.fetch(sql, (phone_number,))
return len(result) > 0
def insert_user(self, user):
sql = """
INSERT INTO user (name, number, password, status)
VALUES (%s, %s, %s, %s)
"""
data = (user.name, user.number, user.password, user.status)
# print(data)
return self.execute(sql, data)
def valid_login(self, number, password_attempt):
# SQL查询获取用户的哈希密码身份和状态
sql = "SELECT password, status,name FROM user WHERE number=%s LIMIT 1"
result = self.fetch(sql, (number,))
print(result)
if result:
stored_hash = result[0]['password'] # 假设结果是密码字段
status = result[0]['status'] # 用户状态
name = result[0]['name']
# 使用bcrypt进行密码验证
if bcrypt.checkpw(password_attempt.encode('utf-8'), stored_hash.encode('utf-8')):
# 密码匹配,返回登录成功,身份和状态
return {'valid': True, 'status': status, 'name': name}
# 密码不匹配或用户不存在,返回登录失败
return {'valid': False}
def get_menu(self, role):
sql = "SELECT menu_name,path FROM menu_items WHERE role=%s ORDER BY `order`"
result = self.fetch(sql, (role,))
return result
def get_all_courses(self):
sql = "SELECT course_name, course_code, credits, description FROM course"
result = self.fetch(sql)
return result
def get_current_teacher_courses(self, teacher_number):
# 使用INNER JOIN连接teacher_class_course表和course表
sql = """
SELECT
c.course_name,
c.course_code,
c.credits,
tcc.class_name,
m.major
FROM
teacher_class_course tcc
JOIN
course c ON tcc.course_id = c.course_id
JOIN
major m ON tcc.major_id = m.major_id
WHERE
tcc.teacher_number = %s;
"""
# 执行查询并返回结果
result = self.fetch(sql, (teacher_number,))
return result
def get_course_type(self):
sql = "SELECT course_name, course_type FROM course"
result = self.fetch(sql)
return result
def get_announcement_info(self):
sql = "SELECT course_name, course_type FROM course"
result = self.fetch(sql)
return result
def insert_teacher(self, teacher):
sql = "INSERT INTO teacher (name, user_id,teacher_number) VALUES (%s, %s, %s);"
data = (teacher.name, teacher.user_id, teacher.teacher_number)
print(data)
return self.execute(sql, data)
def query_user_id(self, phone_number):
sql = "SELECT user_id from user WHERE number = %s;"
data = self.fetch(sql, phone_number)
return data[0]['user_id']
def insert_student(self, student):
sql = """
INSERT INTO student (student_name, student_number, user_id, major_id, class_name)
VALUES (%s, %s, %s, %s, %s)
"""
data = (student.student_name, student.student_number, student.user_id, student.major_id, student.class_name)
return self.execute(sql, data)
def get_course_name(self, student_number, day_of_week, period_id):
# 从student表获取class_name
sql_student = "SELECT class_name FROM student WHERE student_number = %s;"
class_name = self.fetch(sql_student, (student_number,))[0]['class_name']
# 使用class_name和day_of_week从schedule表获取course_id
sql_schedule = "SELECT course_id, FROM schedule WHERE day_of_week = %s AND class_name = %s AND period_id = %s;"
course_id = self.fetch(sql_schedule, (day_of_week, class_name, period_id))
# 对于每一个course_id从course表中查询course_name
sql_course = "SELECT course_name FROM course WHERE course_id = %s;"
course_name = self.fetch(sql_course, (course_id['course_id'],))
data = {"course_name": course_name, "course_id": course_id}
return data
def update_sign_in_info(self, student_number, course_id, course_name, date, status):
sql = """
INSERT INTO attendance_record (student_number, course_id, course_name, date, status)
VALUES (%s, %s, %s, %s, %s)
"""
val = (student_number, course_id, course_name, date, status)
result = self.execute(sql, val)
return result