This repository has been archived on 2024-09-30. You can view files and clone it, but cannot push or open issues/pull-requests.
SmartRollCall/db/database_manager.py

300 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import logging

import bcrypt

from db.connection import MySQLPool
from utils.time_utils import check_now_time
from utils.time_utils import get_time_by_ids
class DatabaseManager:
    """Data-access layer built on a pooled MySQL connection.

    Every query borrows a connection from ``MySQLPool`` and closes it when
    done. Rows are read by column name throughout (for example
    ``row['course_id']``), so the pool is presumably configured to hand out
    dict-style cursors -- TODO confirm in ``db.connection``.
    """

    _log = logging.getLogger(__name__)

    def __init__(self):
        # Initialise the database connection pool.
        self.pool = MySQLPool()

    @staticmethod
    def _release(cursor, conn):
        """Close ``cursor`` (when one was created) and then ``conn``."""
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Always close the connection, even if closing the cursor failed.
            conn.close()

    def fetch(self, query, params=None):
        """Run a read query and return every row from ``fetchall()``.

        Args:
            query: SQL text using ``%s`` placeholders.
            params: Placeholder values; ``None`` means no parameters.

        Returns:
            Whatever ``cursor.fetchall()`` returns for the query.
        """
        conn = self.pool.get_connection()
        # Bound before the try so cleanup works even if conn.cursor() raises.
        cursor = None
        try:
            cursor = conn.cursor()
            # Only None means "no parameters"; a falsy value such as 0 is kept.
            cursor.execute(query, () if params is None else params)
            return cursor.fetchall()
        finally:
            self._release(cursor, conn)

    def execute(self, query, params=None):
        """Run a single write statement, commit it, and return ``rowcount``.

        On any failure the transaction is rolled back and the exception is
        re-raised to the caller.
        """
        conn = self.pool.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute(query, () if params is None else params)
            conn.commit()
            return cursor.rowcount
        except Exception:
            # Do not close the connection with a half-finished transaction.
            conn.rollback()
            raise
        finally:
            self._release(cursor, conn)

    def executemany(self, query, params_list):
        """Run one statement for every parameter tuple in ``params_list``.

        Returns:
            The cursor's ``rowcount`` after commit, or ``None`` when the
            batch failed (the error is logged and the transaction rolled
            back instead of being raised).
        """
        conn = self.pool.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.executemany(query, params_list)
            conn.commit()
            return cursor.rowcount
        except Exception:
            self._log.exception("Batch statement failed; rolling back")
            conn.rollback()
            return None
        finally:
            self._release(cursor, conn)

    def user_exists(self, phone_number):
        """Return ``True`` when a ``user`` row has ``number == phone_number``."""
        sql = "SELECT 1 FROM user WHERE number=%s LIMIT 1"
        return bool(self.fetch(sql, (phone_number,)))

    def insert_user(self, user):
        """Insert ``user`` (name, number, password, status) and return rowcount."""
        sql = """
            INSERT INTO user (name, number, password, status)
            VALUES (%s, %s, %s, %s)
        """
        data = (user.name, user.number, user.password, user.status)
        return self.execute(sql, data)

    def valid_login(self, number, password_attempt):
        """Check ``password_attempt`` against the bcrypt hash stored for ``number``.

        Returns:
            ``{'valid': True, 'status': ..., 'name': ...}`` on a match,
            otherwise ``{'valid': False}`` (unknown user, wrong password, or
            a stored value that bcrypt rejects).
        """
        sql = "SELECT password, status,name FROM user WHERE number=%s LIMIT 1"
        rows = self.fetch(sql, (number,))
        if not rows:
            return {'valid': False}

        record = rows[0]
        stored_hash = record['password']
        # The column may come back as text or as raw bytes; bcrypt needs bytes.
        if isinstance(stored_hash, str):
            stored_hash = stored_hash.encode('utf-8')
        try:
            matches = bcrypt.checkpw(password_attempt.encode('utf-8'), stored_hash)
        except ValueError:
            # The stored value is not a usable bcrypt hash: treat it as a
            # failed login instead of crashing the request.
            self._log.warning("Stored password hash was rejected by bcrypt")
            matches = False

        if matches:
            return {'valid': True, 'status': record['status'], 'name': record['name']}
        return {'valid': False}

    def get_menu(self, role):
        """Return ``menu_name``/``path`` rows for ``role``, sorted by ``order``."""
        sql = "SELECT menu_name,path FROM menu_items WHERE role=%s ORDER BY `order`"
        return self.fetch(sql, (role,))

    def get_all_courses(self):
        """Return name, code, credits and description for every course."""
        sql = "SELECT course_name, course_code, credits, description FROM course"
        return self.fetch(sql)

    def get_current_teacher_courses(self, teacher_number):
        """List the course/class combinations linked to ``teacher_number``.

        Rows come from two places: ``teacher_class_course`` (joined with
        ``course`` and ``major``) followed by ``class_student``.

        Returns:
            A list of dicts with the keys ``course_id``, ``course_name``,
            ``class_name``, ``major_id`` and ``major``.
        """
        sql = """
            SELECT
                c.course_id,
                c.course_name,
                c.course_code,
                c.credits,
                tcc.class_name,
                m.major,
                m.major_id
            FROM
                teacher_class_course tcc
            JOIN
                course c ON tcc.course_id = c.course_id
            JOIN
                major m ON tcc.major_id = m.major_id
            WHERE
                tcc.teacher_number = %s;
        """
        # class_student is written one row per student (see
        # insert_into_class_student), so DISTINCT keeps each course/class
        # combination from being repeated once per enrolled student.
        class_student_sql = (
            "SELECT DISTINCT course_id, course_name, class_name, major, major_id "
            "FROM class_student WHERE teacher_number = %s;"
        )
        assigned_rows = self.fetch(sql, (teacher_number,))
        extra_rows = self.fetch(class_student_sql, (teacher_number,))

        wanted = ("course_id", "course_name", "class_name", "major_id", "major")
        combined = (*(assigned_rows or ()), *(extra_rows or ()))
        return [{key: row[key] for key in wanted} for row in combined]

    def get_course_type(self):
        """Return ``course_name`` and ``course_type`` for every course."""
        sql = "SELECT course_name, course_type FROM course"
        return self.fetch(sql)

    def get_announcement_info(self):
        """Return ``course_name`` and ``course_type`` for every course.

        NOTE(review): this runs exactly the same query as
        ``get_course_type``; it looks like a placeholder for a real
        announcement query -- confirm the intended table.
        """
        sql = "SELECT course_name, course_type FROM course"
        return self.fetch(sql)

    def insert_teacher(self, teacher):
        """Insert ``teacher`` (name, user_id, teacher_number); return rowcount."""
        sql = "INSERT INTO teacher (name, user_id,teacher_number) VALUES (%s, %s, %s);"
        data = (teacher.name, teacher.user_id, teacher.teacher_number)
        return self.execute(sql, data)

    def query_user_id(self, phone_number):
        """Return the ``user_id`` of the user whose ``number`` is ``phone_number``.

        Raises:
            IndexError: If no such user exists.
        """
        sql = "SELECT user_id from user WHERE number = %s;"
        rows = self.fetch(sql, (phone_number,))
        return rows[0]['user_id']

    def insert_student(self, student):
        """Insert ``student`` into the ``student`` table and return rowcount."""
        sql = """
            INSERT INTO student (student_name, student_number, user_id, major_id, class_name)
            VALUES (%s, %s, %s, %s, %s)
        """
        data = (
            student.student_name,
            student.student_number,
            student.user_id,
            student.major_id,
            student.class_name,
        )
        return self.execute(sql, data)

    def get_course_name(self, student_number, day_of_week, period_id):
        """Find the course scheduled for a student's class in a given slot.

        Looks up the student's ``class_name``, then the ``schedule`` row for
        that class, ``day_of_week`` and ``period_id``, then the course name.

        Returns:
            ``{"course_name": ..., "course_id": ...}``.

        Raises:
            IndexError: If the student, the schedule slot, or the course is
                not found.
        """
        sql_student = "SELECT class_name FROM student WHERE student_number = %s;"
        class_name = self.fetch(sql_student, (student_number,))[0]['class_name']

        sql_schedule = "SELECT course_id FROM schedule WHERE day_of_week = %s AND class_name = %s AND period_id = %s;"
        schedule_rows = self.fetch(sql_schedule, (day_of_week, class_name, period_id))
        course_id = schedule_rows[0]['course_id']

        sql_course = "SELECT course_name FROM course WHERE course_id = %s;"
        course_name = self.fetch(sql_course, (course_id,))[0]['course_name']
        return {"course_name": course_name, "course_id": course_id}

    def update_sign_in_info(self, student_number, course_id, course_name, date, status):
        """Insert one ``attendance_record`` row and return the rowcount."""
        sql = """
            INSERT INTO attendance_record (student_number, course_id, course_name, date, status)
            VALUES (%s, %s, %s, %s, %s)
        """
        val = (student_number, course_id, course_name, date, status)
        return self.execute(sql, val)

    def get_class_courses(self, student_number):
        """Return the course details for every course of the student's major.

        Raises:
            IndexError: If the student is unknown, or a ``major_course`` entry
                points at a course that does not exist.
        """
        # 1. Look up the student's major.
        major_sql = "SELECT major_id FROM student WHERE student_number = %s;"
        major_id = self.fetch(major_sql, (student_number,))[0]['major_id']

        # 2. Collect the ids of every course attached to that major.
        course_ids_sql = "SELECT course_id FROM major_course WHERE major_id = %s;"
        course_ids = [row['course_id'] for row in self.fetch(course_ids_sql, (major_id,))]

        # 3. Fetch the details of each course.
        course_sql = "SELECT course_name, course_code, credits, description FROM course WHERE course_id = %s;"
        return [self.fetch(course_sql, (course_id,))[0] for course_id in course_ids]

    def student_get_today_courses(self, student_number, day_of_week):
        """List the courses scheduled on ``day_of_week`` for the student's class.

        Returns:
            A list of ``{"course_name": ..., "time": ...}`` dicts, where
            ``time`` is what ``get_time_by_ids`` returns for the period id.

        Raises:
            IndexError: If the student or a scheduled course is not found.
        """
        sql_student = "SELECT class_name FROM student WHERE student_number = %s;"
        class_name = self.fetch(sql_student, (student_number,))[0]['class_name']

        sql_schedule = "SELECT course_id,period_id FROM schedule WHERE day_of_week = %s AND class_name = %s;"
        schedule_rows = self.fetch(sql_schedule, (day_of_week, class_name))

        sql_course = "SELECT course_name FROM course WHERE course_id = %s;"
        data = []
        for slot in schedule_rows:
            course_name = self.fetch(sql_course, (slot['course_id'],))[0]['course_name']
            period_time = get_time_by_ids(slot['period_id'])
            data.append({"course_name": course_name, "time": period_time})
        return data

    def teacher_sign_in(self, course_id, course_name, class_name, major_id, date, status, teacher_number):
        """Write one attendance record per student of a class in a single batch.

        Students are gathered from the ``student`` table (by class and major)
        and from ``class_student`` (by teacher, major and class). A student
        found in both is recorded only once.

        Returns:
            A ``{"msg": ...}`` dict describing the outcome.
        """
        class_student_sql = "SELECT student_number FROM class_student WHERE teacher_number = %s AND major_id = %s AND class_name = %s"
        extra_rows = self.fetch(class_student_sql, (teacher_number, major_id, class_name))
        student_sql = "SELECT student_number FROM student WHERE class_name = %s AND major_id = %s;"
        class_rows = self.fetch(student_sql, (class_name, major_id))

        # dict.fromkeys drops duplicates while keeping first-seen order.
        student_numbers = list(dict.fromkeys(
            row['student_number']
            for row in (*(class_rows or ()), *(extra_rows or ()))
        ))
        if not student_numbers:
            return {"msg": "当前班级专业没有学生"}

        values_list = [
            (student_number, course_id, course_name, date, status)
            for student_number in student_numbers
        ]
        attendance_sql = "INSERT INTO attendance_record (student_number, course_id, course_name, date, status) VALUES (%s, %s, %s, %s, %s)"
        result = self.executemany(attendance_sql, values_list)
        if result is None:
            # executemany swallowed an error and rolled back: report failure.
            return {"msg": "班级签到失败,请稍后重试"}
        return {"msg": "班级签到成功,签到人数:" + str(result)}

    def insert_into_class_student(self, class_students):
        """Bulk-insert ``class_student`` rows built from a list of dicts.

        Each dict must provide ``teacher_number``, ``class_name``,
        ``student_name``, ``student_number``, ``course_id``, ``course_name``,
        ``major_id`` and ``major``.

        Returns:
            ``{"inserted_rows": n}`` on success, otherwise ``{"error": ...}``.
        """
        columns = (
            'teacher_number',
            'class_name',
            'student_name',
            'student_number',
            'course_id',
            'course_name',
            'major_id',
            'major',
        )
        try:
            values_list = [tuple(cs[column] for column in columns) for cs in class_students]
            sql = """INSERT INTO class_student
                (teacher_number, class_name, student_name, student_number, course_id, course_name, major_id, major)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"""
            result = self.executemany(sql, values_list)
        except Exception as e:
            self._log.exception("Could not insert class_student rows")
            return {"error": str(e)}
        if result is None:
            # executemany already logged the failure and rolled back.
            return {"error": "batch insert into class_student failed"}
        return {"inserted_rows": result}