Compare commits
35 Commits
af5721992d
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
| 5e492e4d8f | |||
| f1836172d6 | |||
| adeb153c8a | |||
| 5336990f54 | |||
| deff3cb921 | |||
| 96f0fbdcd7 | |||
| 9aec69bb46 | |||
| e54a8c5f00 | |||
| 1d6593a8f2 | |||
| a14984a263 | |||
| 3e65e02704 | |||
| 5eef2384cf | |||
| 0864295a6e | |||
| c9defe78f1 | |||
| 27da1caf44 | |||
| bf1c5116be | |||
| 3ff448b15d | |||
| 9cfd4f88af | |||
| 80088a4144 | |||
| 840d44b773 | |||
| fc1259d0e2 | |||
| 5f22c0b6eb | |||
| ff15ecb1a1 | |||
| 2bc34bc860 | |||
| df0aa388e2 | |||
| daec029789 | |||
| 5aa40d1dd5 | |||
| ea1a938aad | |||
| 66310aa2f5 | |||
| efc71af70c | |||
| 64c7b47a4b | |||
| 6a6baeb460 | |||
| 03fcd8203c | |||
| 9e8cfe793c | |||
| b9a2b3bc31 |
130
README.md
130
README.md
@@ -76,7 +76,7 @@
|
||||
- **缺一门**:玩家必须选择缺少一种花色(条、筒、万中的任意一种),即只能用两种花色来胡牌。如果手中只有单一花色,则为清一色。
|
||||
- **定缺**:游戏开始时,每位玩家需要扣下一张牌作为自己缺的那门,并且不能更改。如果本身就是两门牌,则可以报“天缺”而不扣牌。
|
||||
- **起牌与打牌**:庄家通过掷骰子决定起牌位置,然后按顺序抓牌。庄家先出牌,之后每家依次摸牌打牌。
|
||||
- **碰、杠**:允许碰牌和杠牌,但不允许吃牌。杠牌分为明杠和暗杠,明杠是其他玩家打出的牌被你碰后又摸到相同的牌;暗杠则是你自己摸到四张相同的牌。
|
||||
- **碰、杠**:允许碰牌和杠牌,但不允许吃牌。杠牌分为明杠和暗杠,明杠是其他玩家打出的牌刚好与你手里有三张的牌相同;暗杠则是你自己摸到四张相同的牌。
|
||||
- **胡牌**:胡牌的基本条件是拥有一个对子加上四个顺子或刻子(三个相同牌)。自摸为三家给分,点炮则由放炮者给分。n*AAA+m*ABC+DD ,mn可以等于0。
|
||||
- **血战到底**:一家胡牌后,其他未胡牌的玩家继续游戏,直到只剩下最后一位玩家或者黄庄(所有牌都被摸完)为止。
|
||||
|
||||
@@ -150,11 +150,137 @@
|
||||
|
||||
**自摸 **:是指玩家通过自己摸牌完成胡牌。自摸时,其他玩家都需要给赢家支付相应的分数。
|
||||
|
||||
## 成都麻将游戏流程
|
||||
|
||||
1.确定庄家:通常在第一局开始时通过掷骰子来决定庄家。以后每局由上一局胡牌的玩家坐庄,如果流局则庄家不变。
|
||||
|
||||
2.庄家摸牌:从掷骰子确定的位置开始,庄家先摸14张牌,其他玩家每人摸13张牌。
|
||||
|
||||
3.庄家出牌:庄家先打出一张牌,开始这一局的游戏。
|
||||
|
||||
4.顺时针出牌:接下来按照顺时针方向,每位玩家依次摸牌和出牌。
|
||||
|
||||
5.摸牌与出牌:每个玩家轮流摸一张牌,然后选择出一张牌。玩家可以进行碰、杠等操作。
|
||||
|
||||
6.打缺一门:玩家必须选择先打完定缺的花色牌,才能出其他牌。(缺一种花色(筒、条、万中的一种),即手牌中只能保留两种花色)
|
||||
|
||||
7.自摸:玩家摸到的牌使自己胡牌。
|
||||
|
||||
8.点炮:其他玩家打出的牌使自己胡牌。
|
||||
|
||||
9.计分:胡牌后根据胡牌的番数和其他规则进行计分。)
|
||||
|
||||
10.结算
|
||||
|
||||
## 成都麻将规则建模
|
||||
|
||||
麻将游戏引擎建模代码于项目根src/engine/目录下。
|
||||
|
||||
## PPO(Proximal Policy Optimization)算法
|
||||
|
||||
|
||||
## 算法
|
||||
|
||||
#### **1. 强化学习**
|
||||
|
||||
适用于学习最佳策略,帮助AI根据牌局动态决策(如摸牌、出牌、胡牌等)。
|
||||
|
||||
**Q-Learning/Deep Q-Learning (DQN)**:
|
||||
|
||||
使用价值函数近似,适用于简单麻将变体。
|
||||
|
||||
在复杂麻将中可能遇到状态空间爆炸的问题。
|
||||
|
||||
**Policy Gradient(如 REINFORCE、PPO、A3C)**:
|
||||
|
||||
- 直接学习策略,适合连续决策问题。
|
||||
- Proximal Policy Optimization (PPO) 是目前表现较好的强化学习算法。
|
||||
|
||||
**AlphaZero/Monte Carlo Tree Search (MCTS)**:
|
||||
|
||||
- 结合深度神经网络和搜索算法,模拟多局游戏,适用于探索全局最优策略。
|
||||
|
||||
**适用场景**:
|
||||
|
||||
自主对局学习(自我博弈)。
|
||||
|
||||
学习如何综合权衡得失(如是否碰牌、杠牌或放弃操作)。
|
||||
|
||||
#### **2. 模拟和搜索算法**
|
||||
|
||||
适用于推理对手手牌或牌堆剩余牌,提升策略的稳定性。
|
||||
|
||||
**算法**:
|
||||
|
||||
- Monte Carlo Tree Search (MCTS):
|
||||
- 模拟多个可能的后续动作,估算每个动作的收益。
|
||||
- 常用于长序列决策,如考虑碰、杠、胡等多步操作的效果。
|
||||
- Minimax with Alpha-Beta Pruning:
|
||||
- 在两人麻将(或简化版本)中,模拟对手的可能操作。
|
||||
|
||||
**适用场景**:
|
||||
|
||||
需要进行搜索优化的场景(如判断是否选择碰、杠)。
|
||||
|
||||
分析未来几步操作对得分的影响。
|
||||
|
||||
#### **3. 监督学习**
|
||||
|
||||
适用于模仿人类玩家的决策或历史数据学习。
|
||||
|
||||
**算法**:
|
||||
|
||||
- 分类算法
|
||||
|
||||
(如 Logistic Regression、Random Forest、XGBoost、Neural Networks):
|
||||
|
||||
- 学习单步决策(如出哪张牌)。
|
||||
- 适用于学习简单的局部决策。
|
||||
|
||||
- 序列模型
|
||||
|
||||
(如 RNN、LSTM、Transformer):
|
||||
|
||||
- 学习决策的序列模式(如出牌顺序和策略连贯性)。
|
||||
- Transformer 可以捕捉复杂的上下文关系。
|
||||
|
||||
**适用场景**:
|
||||
|
||||
有大量玩家对局数据作为训练集。
|
||||
|
||||
模拟人类打牌风格。
|
||||
|
||||
#### **4. 混合方法**
|
||||
|
||||
结合强化学习和监督学习的优点,以应对麻将的高复杂性和多样化。
|
||||
|
||||
**示例方法**:
|
||||
|
||||
- Imitation Learning + Reinforcement Learning:
|
||||
- 先使用监督学习模仿玩家风格,再用强化学习微调策略。
|
||||
- AlphaZero-like Framework:
|
||||
- 结合深度强化学习和搜索(如 MCTS),强化对局策略。
|
||||
|
||||
**适用场景**:
|
||||
|
||||
需要在短时间内获得可用的AI策略。
|
||||
|
||||
想进一步优化模型的决策能力。
|
||||
|
||||
#### **对抗学习**:
|
||||
|
||||
让AI与自身对局(自我博弈)或与其他AI对局,提升对抗能力。
|
||||
|
||||
#### **工具和框架**
|
||||
|
||||
1. 强化学习框架:
|
||||
- **Stable-Baselines3**: 简单易用,支持PPO、DQN等算法。
|
||||
- **Ray RLlib**: 分布式强化学习框架,适合复杂任务。
|
||||
2. 深度学习框架:
|
||||
- TensorFlow 或 PyTorch:构建神经网络和深度学习模型。
|
||||
3. 麻将环境:
|
||||
- 自定义麻将环境或使用已有开源环境(如 OpenAI Gym 或 MahjongRL)。
|
||||
|
||||
### PPO(Proximal Policy Optimization)算法
|
||||
|
||||
TensorBoard 通常会记录和可视化多种训练指标。你提到的这些图表代表了 PPO 训练过程中的不同方面。下面是对每个图表的解释:
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
# configs/log_config.py
|
||||
from loguru import logger
|
||||
import os
|
||||
|
||||
@@ -10,5 +9,31 @@ def setup_logging():
|
||||
# 清除所有现有日志处理器,防止重复配置
|
||||
logger.remove()
|
||||
|
||||
# 配置日志,记录到 ../logs 目录下
|
||||
logger.add(os.path.join(log_dir, "chengdu_mj_engine.log"), rotation="10 MB", level="DEBUG", format="{time} {level} {message}")
|
||||
# 配置斗地主日志文件
|
||||
logger.add(
|
||||
os.path.join(log_dir, "doudizhu_engine.log"), # 斗地主的日志文件
|
||||
rotation="10 MB",
|
||||
level="DEBUG",
|
||||
format="{time:YYYY-MM-DD HH:mm:ss.SSS} {level} {message}",
|
||||
backtrace=True, # 启用完整堆栈信息
|
||||
diagnose=True # 启用变量诊断信息
|
||||
)
|
||||
|
||||
# 配置麻将日志文件(如果需要)
|
||||
logger.add(
|
||||
os.path.join(log_dir, "chengdu_mj_engine.log"),
|
||||
rotation="10 MB",
|
||||
level="DEBUG",
|
||||
format="{time:YYYY-MM-DD HH:mm:ss.SSS} {level} {message}",
|
||||
backtrace=True,
|
||||
diagnose=True,
|
||||
)
|
||||
|
||||
# 配置控制台日志
|
||||
logger.add(
|
||||
lambda msg: print(msg),
|
||||
level="DEBUG",
|
||||
format="{time:YYYY-MM-DD HH:mm:ss.SSS} {level} {message}",
|
||||
backtrace=True,
|
||||
diagnose=True,
|
||||
)
|
||||
|
||||
BIN
models/ppo_doudizhu_model.zip
Normal file
BIN
models/ppo_doudizhu_model.zip
Normal file
Binary file not shown.
Binary file not shown.
@@ -1,33 +1,43 @@
|
||||
import gym
|
||||
from stable_baselines3 import PPO
|
||||
from src.environment.chengdu_majiang_env import MahjongEnv
|
||||
from src import ChengduMahjongEnv
|
||||
import torch
|
||||
from configs.log_config import setup_logging
|
||||
from loguru import logger # 添加 logger
|
||||
|
||||
def train_model():
|
||||
# 创建 MahjongEnv 环境实例
|
||||
env = MahjongEnv()
|
||||
env = ChengduMahjongEnv()
|
||||
|
||||
# 检查是否有可用的 GPU
|
||||
device = "cuda" if torch.cuda.is_available() else "cpu"
|
||||
print(f"使用设备: {device}")
|
||||
logger.info(f"使用设备: {device}") # 替换 print 为 logger.info
|
||||
|
||||
# 使用 PPO 算法训练模型
|
||||
model = PPO("MlpPolicy", env, verbose=1, tensorboard_log="../logs/ppo_mahjong_tensorboard/", device=device)
|
||||
# 使用 PPO 算法训练模型,切换到 MultiInputPolicy
|
||||
model = PPO(
|
||||
"MultiInputPolicy", # 更改为 MultiInputPolicy
|
||||
env,
|
||||
verbose=1,
|
||||
tensorboard_log="../logs/ppo_mahjong_tensorboard/",
|
||||
device=device
|
||||
)
|
||||
|
||||
# 训练模型,训练总步数为 100000
|
||||
logger.info("开始训练模型...")
|
||||
model.learn(total_timesteps=100)
|
||||
logger.info("模型训练完成!")
|
||||
|
||||
# 保存训练后的模型
|
||||
model.save("../models/ppo_mahjong_model")
|
||||
logger.info("模型已保存到 '../models/ppo_mahjong_model'")
|
||||
|
||||
# 测试模型
|
||||
logger.info("开始测试模型...")
|
||||
obs = env.reset()
|
||||
done = False
|
||||
while not done:
|
||||
action, _states = model.predict(obs) # 使用训练好的模型来选择动作
|
||||
obs, reward, done, info = env.step(action) # 执行动作
|
||||
env.render() # 打印环境状态
|
||||
logger.info(f"动作: {action}, 奖励: {reward}, 是否结束: {done}, 信息: {info}") # 替换 print 为 logger.info
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 调用配置函数来设置日志
|
||||
|
||||
46
scripts/train_dizhu_model.py
Normal file
46
scripts/train_dizhu_model.py
Normal file
@@ -0,0 +1,46 @@
|
||||
from stable_baselines3 import PPO
|
||||
from src.environment.dizhu_env import DouDiZhuEnv # 导入斗地主环境
|
||||
import torch
|
||||
from configs.log_config import setup_logging
|
||||
from loguru import logger # 使用日志工具
|
||||
|
||||
def train_dizhu_model():
|
||||
# 创建 DouDiZhuEnv 环境实例
|
||||
env = DouDiZhuEnv()
|
||||
|
||||
# 检查是否有可用的 GPU
|
||||
device = "cuda" if torch.cuda.is_available() else "cpu"
|
||||
logger.info(f"使用设备: {device}") # 使用 logger 记录设备信息
|
||||
|
||||
# 使用 PPO 算法训练模型,设置为 MultiInputPolicy
|
||||
model = PPO(
|
||||
"MultiInputPolicy", # 适用于多输入的策略
|
||||
env,
|
||||
verbose=1,
|
||||
tensorboard_log="../logs/ppo_doudizhu_tensorboard/", # TensorBoard 日志路径
|
||||
device=device
|
||||
)
|
||||
|
||||
# 训练模型,设定总训练步数
|
||||
logger.info("开始训练斗地主模型...")
|
||||
model.learn(total_timesteps=10000000000000000) # 总训练步数
|
||||
logger.info("斗地主模型训练完成!")
|
||||
|
||||
# 保存训练后的模型
|
||||
model_path = "../models/ppo_doudizhu_model"
|
||||
model.save(model_path)
|
||||
logger.info(f"模型已保存到 '{model_path}'")
|
||||
|
||||
# 测试模型
|
||||
logger.info("开始测试斗地主模型...")
|
||||
obs = env.reset()
|
||||
done = False
|
||||
while not done:
|
||||
action, _states = model.predict(obs) # 使用训练好的模型来选择动作
|
||||
obs, reward, done, info = env.step(action) # 执行动作
|
||||
logger.info(f"动作: {action}, 奖励: {reward}, 是否结束: {done}, 信息: {info}") # 记录测试过程
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 设置日志
|
||||
setup_logging()
|
||||
train_dizhu_model()
|
||||
@@ -1,124 +0,0 @@
|
||||
from loguru import logger
|
||||
from src.engine.utils import get_tile_name
|
||||
|
||||
|
||||
def draw_tile(engine):
|
||||
"""
|
||||
当前玩家摸牌逻辑,记录牌的详细信息和游戏状态。
|
||||
"""
|
||||
if engine.state.remaining_tiles == 0:
|
||||
logger.warning("牌堆已空,游戏结束!")
|
||||
engine.game_over = True
|
||||
return 0, True # 游戏结束时返回 0 和 done = True
|
||||
|
||||
tile = engine.state.deck.pop(0) # 从牌堆中取出一张牌
|
||||
engine.state.remaining_tiles -= 1 # 更新剩余牌数
|
||||
engine.state.hands[engine.state.current_player][tile] += 1 # 加入当前玩家手牌
|
||||
|
||||
tile_name = get_tile_name(tile) # 获取具体的牌名
|
||||
logger.info(
|
||||
f"玩家 {engine.state.current_player} 摸到一张牌: {tile_name}(索引 {tile})。剩余牌堆数量: {engine.state.remaining_tiles}"
|
||||
)
|
||||
|
||||
# 返回奖励和游戏是否结束的标志
|
||||
return 0, False # 奖励为 0,done 为 False(游戏继续)
|
||||
|
||||
|
||||
def discard_tile(self, tile):
|
||||
"""
|
||||
当前玩家打牌逻辑,记录打出的牌和当前牌河信息。
|
||||
"""
|
||||
if self.state.hands[self.state.current_player][tile] == 0:
|
||||
logger.error(f"玩家 {self.state.current_player} 尝试打出不存在的牌: 索引 {tile}")
|
||||
raise ValueError("玩家没有这张牌")
|
||||
|
||||
self.state.hands[self.state.current_player][tile] -= 1 # 从手牌中移除
|
||||
self.state.discards[self.state.current_player].append(tile) # 加入牌河
|
||||
|
||||
tile_name = get_tile_name(tile) # 获取具体的牌名
|
||||
logger.info(
|
||||
f"玩家 {self.state.current_player} 打出一张牌: {tile_name}(索引 {tile})。当前牌河: {[get_tile_name(t) for t in self.state.discards[self.state.current_player]]}"
|
||||
)
|
||||
|
||||
|
||||
def peng(self, tile):
|
||||
"""
|
||||
当前玩家碰牌逻辑,记录碰牌操作和手牌状态。
|
||||
"""
|
||||
player = self.state.current_player
|
||||
if self.state.hands[player][tile] < 2:
|
||||
logger.error(f"玩家 {player} 尝试碰牌失败: {get_tile_name(tile)}(索引 {tile})")
|
||||
raise ValueError("碰牌条件不满足")
|
||||
|
||||
self.state.hands[player][tile] -= 2 # 减去两张牌
|
||||
self.state.melds[player].append(("peng", tile)) # 加入明牌列表
|
||||
|
||||
tile_name = get_tile_name(tile)
|
||||
logger.info(f"玩家 {player} 碰了一张牌: {tile_name}(索引 {tile})。当前明牌: {self.state.melds[player]}")
|
||||
|
||||
|
||||
def gang(self, tile, mode):
|
||||
"""
|
||||
当前玩家杠牌逻辑,记录杠牌类型和状态更新。
|
||||
"""
|
||||
player = self.state.current_player
|
||||
tile_name = get_tile_name(tile)
|
||||
|
||||
if mode == "ming" and self.state.hands[player][tile] == 3:
|
||||
self.state.hands[player][tile] -= 3
|
||||
self.state.melds[player].append(("ming_gang", tile))
|
||||
logger.info(f"玩家 {player} 明杠: {tile_name}(索引 {tile})")
|
||||
self.state.scores[player] += 1 # 奖励1分
|
||||
logger.info(f"玩家 {player} 因明杠获得1分")
|
||||
|
||||
elif mode == "an" and self.state.hands[player][tile] == 4:
|
||||
self.state.hands[player][tile] -= 4
|
||||
self.state.melds[player].append(("an_gang", tile))
|
||||
logger.info(f"玩家 {player} 暗杠: {tile_name}(索引 {tile})")
|
||||
self.state.scores[player] += 1 # 奖励1分
|
||||
logger.info(f"玩家 {player} 因暗杠获得1分")
|
||||
|
||||
else:
|
||||
logger.error(f"玩家 {player} 尝试杠牌失败: {tile_name}(索引 {tile}),条件不满足")
|
||||
raise ValueError("杠牌条件不满足")
|
||||
|
||||
|
||||
|
||||
def check_blood_battle(self):
|
||||
"""
|
||||
检查游戏是否流局或血战结束,记录状态。
|
||||
"""
|
||||
if any(score <= 0 for score in self.state.scores):
|
||||
logger.info(f"游戏结束,某玩家分数小于等于0: {self.state.scores}")
|
||||
self.game_over = True
|
||||
|
||||
if len(self.state.winners) >= 3 or self.state.remaining_tiles == 0:
|
||||
logger.info(f"游戏结束,赢家列表: {self.state.winners}")
|
||||
self.game_over = True
|
||||
|
||||
|
||||
def set_missing_suit(player, missing_suit, game_state):
|
||||
"""
|
||||
玩家设置缺门的动作。
|
||||
|
||||
参数:
|
||||
- player: 玩家索引(0-3)。
|
||||
- missing_suit: 玩家选择的缺门("条"、"筒" 或 "万")。
|
||||
- game_state: 当前的游戏状态(`ChengduMahjongState` 实例)。
|
||||
|
||||
异常:
|
||||
- ValueError: 如果缺门设置无效。
|
||||
"""
|
||||
valid_suits = ["条", "筒", "万"]
|
||||
if missing_suit not in valid_suits:
|
||||
logger.error(f"玩家 {player} 尝试设置无效的缺门: {missing_suit}")
|
||||
raise ValueError("缺门设置无效")
|
||||
|
||||
if game_state.missing_suits[player] is not None:
|
||||
logger.error(f"玩家 {player} 已经设置了缺门,不能重复设置")
|
||||
raise ValueError("缺门已经设置,不能重复设置")
|
||||
|
||||
game_state.missing_suits[player] = missing_suit
|
||||
logger.info(f"玩家 {player} 设置缺门为: {missing_suit}")
|
||||
|
||||
return game_state.missing_suits[player]
|
||||
@@ -1,92 +0,0 @@
|
||||
def calculate_fan(hand, melds, is_self_draw, is_cleared, conditions):
|
||||
"""
|
||||
动态计算番数,优先处理高番数。
|
||||
|
||||
参数:
|
||||
- hand: 当前胡牌的手牌(Hand 对象)。
|
||||
- melds: 玩家已明牌的列表(Meld 对象列表)。
|
||||
- is_self_draw: 是否自摸。
|
||||
- is_cleared: 是否清一色。
|
||||
- conditions: 字典,包含特殊胡牌条件,如 {"is_seven_pairs": True, "is_tian_hu": True}。
|
||||
|
||||
返回:
|
||||
- fan: 最大番数。
|
||||
"""
|
||||
fan = 0 # 默认番数
|
||||
|
||||
# 定义番数规则(按优先级从高到低排序)
|
||||
rules = {
|
||||
"tian_hu": lambda: 12 if conditions.get("is_tian_hu", False) else 0, # 天胡
|
||||
"di_hu": lambda: 12 if conditions.get("is_di_hu", False) else 0, # 地胡
|
||||
"dragon_seven_pairs": lambda: 12 if is_dragon_seven_pairs(hand) else 0, # 龙七对
|
||||
"clear_seven_pairs": lambda: 12 if is_cleared and is_seven_pairs(hand) else 0, # 清七对
|
||||
"full_request": lambda: 6 if is_full_request(hand, melds) else 0, # 全求人
|
||||
"pure_cleared": lambda: 4 if is_cleared and len(melds) >= 2 else 0, # 极品清一色
|
||||
"seven_pairs": lambda: 2 if is_seven_pairs(hand) else 0, # 七对
|
||||
"big_pairs": lambda: 2 if is_big_pairs(hand) else 0, # 大对子
|
||||
"small_pairs": lambda: 2 if is_small_pairs(hand) else 0, # 小七对
|
||||
"golden_hook": lambda: 1 if is_golden_hook(hand) else 0, # 金钩吊
|
||||
"gang_flower": lambda: 1 if conditions.get("is_gang_flower", False) else 0, # 杠上开花
|
||||
"rob_gang": lambda: 1 if conditions.get("is_rob_gang", False) else 0, # 抢杠胡
|
||||
"under_the_sea": lambda: 1 if conditions.get("is_under_the_sea", False) else 0, # 海底捞月
|
||||
"plain_win": lambda: 1 if not any(conditions.values()) else 0 # 平胡
|
||||
}
|
||||
|
||||
# 逐一应用规则
|
||||
for rule, func in rules.items():
|
||||
result = func()
|
||||
if result > fan:
|
||||
fan = result # 选择当前最大番数
|
||||
|
||||
return fan
|
||||
|
||||
|
||||
# 辅助函数实现
|
||||
def is_seven_pairs(hand):
|
||||
"""检查是否是七对(七个对子)。"""
|
||||
from collections import Counter
|
||||
counter = Counter(hand.tiles)
|
||||
return sum(1 for count in counter.values() if count == 2) == 7
|
||||
|
||||
|
||||
def is_dragon_seven_pairs(hand):
|
||||
"""检查是否是龙七对(七对中有一对是三张)。"""
|
||||
from collections import Counter
|
||||
counter = Counter(hand.tiles)
|
||||
pairs = sum(1 for count in counter.values() if count == 2)
|
||||
triplets = sum(1 for count in counter.values() if count == 3)
|
||||
return pairs == 6 and triplets == 1
|
||||
|
||||
|
||||
def is_big_pairs(hand):
|
||||
"""检查是否是大对子(四坎牌和一对将,坎牌为刻子)。"""
|
||||
from collections import Counter
|
||||
counter = Counter(hand.tiles)
|
||||
triplets = sum(1 for count in counter.values() if count == 3)
|
||||
pairs = sum(1 for count in counter.values() if count == 2)
|
||||
return triplets == 4 and pairs == 1
|
||||
|
||||
|
||||
def is_golden_hook(hand):
|
||||
"""检查是否是金钩吊(只剩一张牌)。"""
|
||||
return len(hand.tiles) == 1
|
||||
|
||||
|
||||
def is_full_request(hand, melds):
|
||||
"""检查是否是全求人(所有牌通过碰、杠完成)。"""
|
||||
return all(tile_count == 0 for tile_count in hand.tiles) and len(melds) > 0
|
||||
|
||||
|
||||
def is_cleared(hand, melds):
|
||||
"""检查是否是清一色(所有牌同一种花色)。"""
|
||||
all_tiles = hand.tiles + [meld.tile for meld in melds]
|
||||
suits = {tile.suit for tile in all_tiles}
|
||||
return len(suits) == 1
|
||||
def is_small_pairs(hand):
|
||||
"""
|
||||
检查是否是小七对(六对加一对)。
|
||||
"""
|
||||
from collections import Counter
|
||||
counter = Counter(hand.tiles)
|
||||
pairs = sum(1 for count in counter.values() if count == 2)
|
||||
return pairs == 6
|
||||
@@ -1,47 +0,0 @@
|
||||
import random
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from .chengdu_mahjong_state import ChengduMahjongState
|
||||
|
||||
|
||||
class ChengduMahjongEngine:
|
||||
def __init__(self):
|
||||
self.state = ChengduMahjongState() # 创建游戏状态
|
||||
self.game_over = False
|
||||
self.game_started = False # 游戏是否已开始
|
||||
self.deal_tiles() # 发牌
|
||||
|
||||
def deal_tiles(self):
|
||||
""" 发牌,每个玩家发13张牌,并设置缺门 """
|
||||
logger.info("发牌中...")
|
||||
|
||||
# 洗牌(随机打乱牌堆)
|
||||
random.shuffle(self.state.deck)
|
||||
|
||||
# 随机发牌给每个玩家
|
||||
for player in range(4):
|
||||
for _ in range(13): # 每个玩家13张牌
|
||||
tile = self.state.deck.pop() # 从牌堆抽取一张牌
|
||||
self.state.hands[player][tile] += 1 # 增加玩家手牌的计数
|
||||
|
||||
# 设置缺门:每个玩家定缺(这里假设我们让每个玩家的缺门都为“条”)
|
||||
for player in range(4):
|
||||
missing_suit = "条" # 这里可以通过其他方式设置缺门,比如随机选择
|
||||
self.state.set_missing_suit(player, missing_suit)
|
||||
|
||||
def start_game(self):
|
||||
""" 开始游戏 """
|
||||
if not self.game_started:
|
||||
self.game_started = True
|
||||
logger.info("游戏开始!")
|
||||
else:
|
||||
logger.warning("游戏已经开始,不能重复启动!")
|
||||
|
||||
def check_game_over(self):
|
||||
""" 检查游戏是否结束 """
|
||||
# 你可以根据游戏规则检查是否有玩家胡牌或其他结束条件
|
||||
if len(self.state.deck) == 0:
|
||||
self.game_over = True
|
||||
logger.info("游戏结束!")
|
||||
|
||||
10
src/engine/dizhu/deck.py
Normal file
10
src/engine/dizhu/deck.py
Normal file
@@ -0,0 +1,10 @@
|
||||
import numpy as np
|
||||
|
||||
class Deck:
|
||||
def __init__(self):
|
||||
self.cards = [i for i in range(54)] # 0-53 表示54张牌
|
||||
np.random.shuffle(self.cards)
|
||||
|
||||
def deal(self):
|
||||
# 返回三位玩家的手牌和地主牌
|
||||
return self.cards[:17], self.cards[17:34], self.cards[34:51], self.cards[51:]
|
||||
195
src/engine/dizhu/dizhu_engine.py
Normal file
195
src/engine/dizhu/dizhu_engine.py
Normal file
@@ -0,0 +1,195 @@
|
||||
import numpy as np
|
||||
from loguru import logger
|
||||
from src.engine.dizhu.player_state import PlayerState
|
||||
from src.engine.dizhu.deck import Deck
|
||||
from src.engine.dizhu.utils import card_to_string, detect_card_type
|
||||
|
||||
|
||||
class DiZhuEngine:
|
||||
def __init__(self):
|
||||
self.deck = Deck() # 牌堆
|
||||
self.players = [] # 玩家列表
|
||||
self.landlord_index = -1 # 地主索引
|
||||
self.current_player_index = 0 # 当前玩家索引
|
||||
self.landlord_cards = [] # 地主牌
|
||||
self.last_player = None # 最后出牌的玩家索引
|
||||
self.current_pile = None # 当前牌面上的牌
|
||||
self.game_over = False # 是否游戏结束
|
||||
|
||||
def reset(self):
|
||||
"""
|
||||
初始化游戏状态,包括发牌和分配角色。
|
||||
"""
|
||||
# 洗牌并发牌
|
||||
p1_hand, p2_hand, p3_hand, landlord_cards = self.deck.deal()
|
||||
self.landlord_cards = landlord_cards
|
||||
|
||||
# 创建玩家
|
||||
self.players = [
|
||||
PlayerState(p1_hand, "农民"),
|
||||
PlayerState(p2_hand, "农民"),
|
||||
PlayerState(p3_hand, "地主")
|
||||
]
|
||||
self.landlord_index = 2 # 默认玩家 3 为地主
|
||||
self.current_player_index = 0
|
||||
self.game_over = False
|
||||
|
||||
# 日志输出
|
||||
logger.info("游戏初始化完成")
|
||||
logger.info(f"地主牌: {[card_to_string(card) for card in self.landlord_cards]}")
|
||||
for i, player in enumerate(self.players):
|
||||
logger.info(f"玩家 {i + 1} ({player.role}) 手牌: {player.get_hand_cards_as_strings()}")
|
||||
|
||||
def get_current_player(self):
|
||||
"""获取当前玩家对象"""
|
||||
current_player = self.players[self.current_player_index]
|
||||
return current_player
|
||||
|
||||
def step(self, action):
|
||||
"""
|
||||
执行动作并更新状态
|
||||
:param action: 当前玩家的动作(可以是 'pass' 或一个动作列表)
|
||||
"""
|
||||
current_player = self.get_current_player()
|
||||
|
||||
if action == "pass":
|
||||
self.pass_count += 1
|
||||
|
||||
# 如果所有其他玩家都过牌,允许最后出牌玩家再次出牌
|
||||
if self.pass_count == 2: # 两名玩家连续过牌
|
||||
self.current_player_index = self.last_player
|
||||
self.pass_count = 0 # 重置过牌计数
|
||||
self.current_pile = None # 清空当前牌面
|
||||
else:
|
||||
# 出牌逻辑
|
||||
if not isinstance(action, list):
|
||||
action = [action]
|
||||
|
||||
if not all(card in current_player.hand_cards for card in action):
|
||||
raise ValueError(f"玩家手牌不足以完成此次出牌: {action}")
|
||||
|
||||
if self.current_pile and not self._can_beat(self.current_pile, action):
|
||||
raise ValueError(f"出牌无法打过当前牌面: {action}")
|
||||
|
||||
# 出牌成功
|
||||
self.current_pile = action # 更新当前牌面
|
||||
self.pass_count = 0 # 出牌后重置过牌计数
|
||||
self.last_player = self.current_player_index # 更新最后出牌的玩家
|
||||
|
||||
|
||||
# 从手牌中移除
|
||||
for card in action:
|
||||
current_player.hand_cards.remove(card)
|
||||
current_player.history.append(action)
|
||||
|
||||
# 检查游戏是否结束
|
||||
if not current_player.hand_cards:
|
||||
self.game_over = True
|
||||
logger.info(f"游戏结束!玩家 {self.current_player_index + 1} ({current_player.role}) 获胜")
|
||||
return f"{current_player.role} 胜利!"
|
||||
|
||||
# 切换到下一个玩家
|
||||
self.current_player_index = (self.current_player_index + 1) % 3
|
||||
|
||||
|
||||
def get_action_space(self):
|
||||
"""
|
||||
动态生成当前动作空间。
|
||||
:return: 合法动作的列表
|
||||
"""
|
||||
valid_actions = ["pass"]
|
||||
current_player = self.get_current_player()
|
||||
|
||||
# 遍历玩家手牌,生成所有可能的组合
|
||||
hand_cards = current_player.hand_cards
|
||||
valid_actions.extend(self._generate_valid_combinations(hand_cards))
|
||||
|
||||
return valid_actions
|
||||
|
||||
def _generate_valid_combinations(self, cards):
|
||||
"""
|
||||
根据手牌生成所有合法牌型组合
|
||||
:param cards: 当前玩家的手牌
|
||||
:return: 合法牌型的列表
|
||||
"""
|
||||
# 示例:生成单牌、对子和三张的合法组合
|
||||
from itertools import combinations
|
||||
valid_combinations = []
|
||||
for i in range(1, len(cards) + 1):
|
||||
for combo in combinations(cards, i):
|
||||
if detect_card_type(list(combo)): # 检查是否为合法牌型
|
||||
valid_combinations.append(list(combo))
|
||||
return valid_combinations
|
||||
|
||||
def _can_beat(self, current_pile, action):
|
||||
"""
|
||||
检查当前动作是否能打过牌面上的牌。
|
||||
:param current_pile: 当前牌面上的牌(列表)
|
||||
:param action: 当前玩家要出的牌(列表)
|
||||
:return: True 如果可以打过,否则 False
|
||||
"""
|
||||
current_type = detect_card_type(current_pile)
|
||||
action_type = detect_card_type(action)
|
||||
|
||||
if not current_type or not action_type:
|
||||
return False # 非法牌型
|
||||
|
||||
# 火箭可以压任何牌
|
||||
if action_type == "火箭":
|
||||
return True
|
||||
# 炸弹可以压非炸弹的牌型
|
||||
if action_type == "炸弹" and current_type != "炸弹":
|
||||
return True
|
||||
# 同牌型比较大小
|
||||
if current_type == action_type:
|
||||
return max(action) > max(current_pile)
|
||||
|
||||
return False # 其他情况不合法
|
||||
|
||||
def get_game_state(self):
|
||||
"""
|
||||
返回当前游戏状态,包括玩家手牌、出牌历史和当前玩家。
|
||||
"""
|
||||
state = {
|
||||
"landlord_cards": self.landlord_cards,
|
||||
"players": [
|
||||
{
|
||||
"role": player.role,
|
||||
"hand_cards": player.hand_cards,
|
||||
"history": player.history,
|
||||
}
|
||||
for player in self.players
|
||||
],
|
||||
"current_player_index": self.current_player_index,
|
||||
"game_over": self.game_over,
|
||||
}
|
||||
logger.info("当前游戏状态: ")
|
||||
logger.info(f"游戏是否结束: {self.game_over}")
|
||||
return state
|
||||
|
||||
def is_valid_play(self, cards):
|
||||
"""
|
||||
检查给定的牌是否为合法的斗地主牌型。
|
||||
:param cards: 玩家出的牌(列表)
|
||||
:return: True 如果是合法牌型,否则 False
|
||||
"""
|
||||
if len(cards) == 1:
|
||||
return True # 单牌
|
||||
if len(cards) == 2 and cards[0] == cards[1]:
|
||||
return True # 对子
|
||||
if len(cards) == 3 and cards[0] == cards[1] == cards[2]:
|
||||
return True # 三张
|
||||
if len(cards) == 4:
|
||||
counts = {card: cards.count(card) for card in set(cards)}
|
||||
if 3 in counts.values():
|
||||
return True # 三带一
|
||||
if all(count == 4 for count in counts.values()):
|
||||
return True # 炸弹
|
||||
if len(cards) >= 5:
|
||||
# 顺子
|
||||
sorted_cards = sorted(cards)
|
||||
if all(sorted_cards[i] + 1 == sorted_cards[i + 1] for i in range(len(sorted_cards) - 1)):
|
||||
return True
|
||||
# TODO: 扩展支持更多牌型(如连对、飞机等)
|
||||
return False
|
||||
|
||||
22
src/engine/dizhu/player_state.py
Normal file
22
src/engine/dizhu/player_state.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from collections import deque
|
||||
from src.engine.dizhu.utils import card_to_string
|
||||
|
||||
class PlayerState:
|
||||
def __init__(self, hand_cards, role):
|
||||
self.hand_cards = hand_cards # 玩家手牌
|
||||
self.role = role # "地主" 或 "农民"
|
||||
self.history = deque() # 出牌历史,使用 deque
|
||||
|
||||
def get_hand_cards_as_strings(self):
|
||||
"""
|
||||
获取玩家手牌的具体牌型字符串
|
||||
:return: 手牌字符串列表
|
||||
"""
|
||||
return [card_to_string(card) for card in self.hand_cards]
|
||||
|
||||
def __repr__(self):
|
||||
"""
|
||||
返回玩家的字符串表示,包括手牌和角色
|
||||
"""
|
||||
hand_cards_str = ", ".join(self.get_hand_cards_as_strings())
|
||||
return f"玩家角色: {self.role}, 手牌: [{hand_cards_str}]"
|
||||
32
src/engine/dizhu/scroing.py
Normal file
32
src/engine/dizhu/scroing.py
Normal file
@@ -0,0 +1,32 @@
|
||||
class DouDiZhuScoring:
|
||||
def __init__(self, base_score=1):
|
||||
self.base_score = base_score # 底分
|
||||
self.multiplier = 1 # 倍数
|
||||
self.landlord_win = False # 地主是否胜利
|
||||
|
||||
def apply_event(self, event):
|
||||
"""
|
||||
根据游戏事件调整倍数。
|
||||
:param event: 事件类型,如 "炸弹", "火箭", "春天", "反春天"
|
||||
"""
|
||||
if event in ["炸弹", "火箭", "春天", "反春天"]:
|
||||
self.multiplier *= 2
|
||||
elif event == "抢地主":
|
||||
self.multiplier += 1
|
||||
|
||||
def calculate_score(self, landlord_win):
|
||||
"""
|
||||
计算最终分数。
|
||||
:param landlord_win: 地主是否胜利
|
||||
:return: 地主分数,农民分数
|
||||
"""
|
||||
self.landlord_win = landlord_win
|
||||
if landlord_win:
|
||||
landlord_score = 2 * self.base_score * self.multiplier
|
||||
farmer_score = -self.base_score * self.multiplier
|
||||
else:
|
||||
landlord_score = -2 * self.base_score * self.multiplier
|
||||
farmer_score = self.base_score * self.multiplier
|
||||
|
||||
return landlord_score, farmer_score
|
||||
|
||||
48
src/engine/dizhu/utils.py
Normal file
48
src/engine/dizhu/utils.py
Normal file
@@ -0,0 +1,48 @@
|
||||
|
||||
|
||||
def card_to_string(card_index):
|
||||
"""
|
||||
将牌的索引转换为具体牌型的字符串表示
|
||||
:param card_index: 牌的索引(0-53)
|
||||
:return: 具体牌型字符串
|
||||
"""
|
||||
suits = ['♠️', '♥️', '♦️', '♣️'] # 花色
|
||||
values = ['3', '4', '5', '6', '7', '8', '9', '10', 'J', 'Q', 'K', 'A', '2']
|
||||
|
||||
if card_index < 52:
|
||||
# 普通牌:计算花色和牌面值
|
||||
value = values[card_index // 4]
|
||||
suit = suits[card_index % 4]
|
||||
return f"{suit}{value}"
|
||||
elif card_index == 52:
|
||||
return "小王"
|
||||
elif card_index == 53:
|
||||
return "大王"
|
||||
else:
|
||||
raise ValueError(f"无效的牌索引: {card_index}")
|
||||
|
||||
|
||||
def detect_card_type(cards):
|
||||
"""
|
||||
检测牌型
|
||||
:param cards: 玩家出的牌(列表)
|
||||
:return: 牌型字符串或 None(非法牌型)
|
||||
"""
|
||||
if len(cards) == 1:
|
||||
return "单牌"
|
||||
if len(cards) == 2 and cards[0] == cards[1]:
|
||||
return "对子"
|
||||
if len(cards) == 3 and cards[0] == cards[1] == cards[2]:
|
||||
return "三张"
|
||||
if len(cards) == 4 and cards[0] == cards[1] == cards[2] == cards[3]:
|
||||
return "炸弹"
|
||||
# 三带一
|
||||
if len(cards) == 4:
|
||||
counts = {card: cards.count(card) for card in set(cards)}
|
||||
if 3 in counts.values():
|
||||
return "三带一"
|
||||
# 顺子
|
||||
if len(cards) >= 5 and all(cards[i] + 1 == cards[i + 1] for i in range(len(cards) - 1)):
|
||||
return "顺子"
|
||||
# TODO: 实现其他牌型判断(如连对、飞机等)
|
||||
return None
|
||||
@@ -1,49 +0,0 @@
|
||||
from src.engine.mahjong_tile import MahjongTile
|
||||
from collections import defaultdict
|
||||
|
||||
class Hand:
|
||||
def __init__(self):
|
||||
# 存储所有的 MahjongTile 对象
|
||||
self.tiles = []
|
||||
# 存储每种牌的数量,键为 MahjongTile 对象,值为数量
|
||||
self.tile_count = defaultdict(int)
|
||||
|
||||
def add_tile(self, tile):
|
||||
""" 向手牌中添加一张牌 """
|
||||
if not isinstance(tile, MahjongTile):
|
||||
raise ValueError("必须添加 MahjongTile 类型的牌")
|
||||
self.tiles.append(tile) # 将牌添加到手牌中
|
||||
self.tile_count[tile] += 1 # 增加牌的数量
|
||||
|
||||
def remove_tile(self, tile):
|
||||
""" 从手牌中移除一张牌 """
|
||||
if not isinstance(tile, MahjongTile):
|
||||
raise ValueError("必须移除 MahjongTile 类型的牌")
|
||||
if self.tile_count[tile] > 0:
|
||||
self.tiles.remove(tile)
|
||||
self.tile_count[tile] -= 1
|
||||
else:
|
||||
raise ValueError(f"手牌中没有该牌: {tile}")
|
||||
|
||||
def get_tile_count(self, tile):
|
||||
""" 获取手牌中某张牌的数量 """
|
||||
if not isinstance(tile, MahjongTile):
|
||||
raise ValueError("必须是 MahjongTile 类型的牌")
|
||||
return self.tile_count[tile]
|
||||
|
||||
def can_peng(self, tile):
|
||||
""" 判断是否可以碰(即是否已经有2张相同的牌,摸一张牌后可以碰) """
|
||||
if not isinstance(tile, MahjongTile):
|
||||
raise ValueError("必须是 MahjongTile 类型的牌")
|
||||
return self.tile_count[tile] == 2 # 摸一张牌后总数为 3 张,才可以碰
|
||||
|
||||
def can_gang(self, tile):
|
||||
""" 判断是否可以杠(即是否已经有3张相同的牌,摸一张牌后可以杠) """
|
||||
if not isinstance(tile, MahjongTile):
|
||||
raise ValueError("必须是 MahjongTile 类型的牌")
|
||||
return self.tile_count[tile] == 4 # 摸一张牌后总数为 4 张,才可以杠
|
||||
|
||||
def __repr__(self):
|
||||
""" 返回手牌的字符串表示 """
|
||||
tiles_str = ", ".join(str(tile) for tile in self.tiles)
|
||||
return f"手牌: [{tiles_str}], 牌的数量: {dict(self.tile_count)}"
|
||||
494
src/engine/mahjong/actions.py
Normal file
494
src/engine/mahjong/actions.py
Normal file
@@ -0,0 +1,494 @@
|
||||
import random as random_module
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from src.engine.mahjong.calculate_fan import calculate_fan
|
||||
from src.engine.mahjong.mahjong_tile import MahjongTile
|
||||
from src.engine.mahjong.meld import Meld
|
||||
|
||||
|
||||
def draw_tile(engine):
|
||||
"""
|
||||
当前玩家摸牌逻辑,按照顺序从牌堆顶部摸牌,并记录牌的详细信息和游戏状态。
|
||||
"""
|
||||
# 检查牌堆是否已空
|
||||
if engine.state.remaining_tiles == 0:
|
||||
logger.warning("牌堆已空,游戏结束!")
|
||||
engine.game_over = True
|
||||
return None, True # 游戏结束时返回 None 和 done = True
|
||||
|
||||
# 当前玩家
|
||||
current_player = engine.state.current_player
|
||||
|
||||
# 从牌堆顶部摸一张牌
|
||||
tile = engine.state.deck.pop(0) # 按顺序从牌堆取出一张牌
|
||||
engine.state.remaining_tiles -= 1 # 更新剩余牌数
|
||||
engine.state.hands[current_player].add_tile(tile) # 将牌对象加入当前玩家手牌
|
||||
|
||||
# 获取牌名
|
||||
tile_name = str(tile) # 调用 MahjongTile 的 __repr__ 方法
|
||||
logger.info(
|
||||
f"玩家 {current_player} 摸到一张牌: {tile_name}。"
|
||||
f"剩余牌堆数量: {engine.state.remaining_tiles}"
|
||||
)
|
||||
|
||||
# 检查摸到的牌是否属于缺门
|
||||
missing_suit = engine.state.missing_suits[current_player]
|
||||
if tile.suit == missing_suit:
|
||||
logger.info(f"玩家 {current_player} 摸到缺门牌: {tile_name},需要优先打出")
|
||||
|
||||
# 返回摸到的牌和游戏是否结束的标志
|
||||
return tile, False # 返回摸到的牌对象和游戏继续的标志
|
||||
|
||||
|
||||
def discard_tile(self, tile):
|
||||
"""
|
||||
当前玩家打牌逻辑,记录打出的牌和当前牌河信息。
|
||||
"""
|
||||
current_player = self.state.current_player
|
||||
hand = self.state.hands[current_player]
|
||||
|
||||
# 检查牌的有效性
|
||||
if not isinstance(tile, MahjongTile):
|
||||
logger.error(f"玩家 {current_player} 尝试打出无效的牌: {tile}")
|
||||
raise ValueError("打出的牌必须是 MahjongTile 对象")
|
||||
|
||||
# 检查是否有这张牌
|
||||
if hand.tile_count[tile] == 0:
|
||||
logger.error(f"玩家 {current_player} 尝试打出不存在的牌: {tile}")
|
||||
raise ValueError("玩家没有这张牌")
|
||||
|
||||
# 检查缺门规则
|
||||
missing_suit = self.state.missing_suits[current_player]
|
||||
if tile.suit == missing_suit and any(t.suit == missing_suit for t in hand.tiles):
|
||||
logger.error(f"玩家 {current_player} 仍有缺门的牌: {tile}")
|
||||
raise ValueError("必须先打完缺门花色的牌")
|
||||
|
||||
# 从手牌中移除
|
||||
hand.remove_tile(tile)
|
||||
self.state.discards[current_player].append(tile)
|
||||
|
||||
# 打出的牌名
|
||||
tile_name = str(tile)
|
||||
|
||||
# 打出牌后打印状态
|
||||
logger.info(
|
||||
f"玩家 {current_player} 打出一张牌: {tile_name}。"
|
||||
f"当前牌河: {[str(t) for t in self.state.discards[current_player]]}"
|
||||
)
|
||||
|
||||
# 检查是否触发其他玩家的操作(碰、杠、胡牌)
|
||||
self.check_other_players(tile)
|
||||
|
||||
return tile
|
||||
|
||||
|
||||
def peng(self, tile):
|
||||
"""
|
||||
当前玩家碰牌逻辑,记录碰牌操作和手牌状态。
|
||||
"""
|
||||
player = self.state.current_player
|
||||
hand = self.state.hands[player]
|
||||
|
||||
if hand.tile_count[tile] < 2:
|
||||
logger.error(f"玩家 {player} 尝试碰牌失败: {tile}")
|
||||
raise ValueError("碰牌条件不满足")
|
||||
|
||||
# 从手牌中移除两张牌
|
||||
hand.tile_count[tile] -= 2
|
||||
self.state.melds[player].append(("碰", tile)) # 加入明牌列表
|
||||
|
||||
logger.info(f"玩家 {player} 碰了一张牌: {tile}。当前明牌: {self.state.melds[player]}")
|
||||
|
||||
|
||||
def gang(self, tile, mode):
|
||||
"""
|
||||
当前玩家杠牌逻辑,记录杠牌类型和状态更新。
|
||||
"""
|
||||
player = self.state.current_player
|
||||
|
||||
# 检查牌的有效性
|
||||
if not isinstance(tile, MahjongTile):
|
||||
logger.error(f"玩家 {player} 尝试杠牌时提供了无效的牌: {tile}")
|
||||
raise ValueError("杠的牌必须是 MahjongTile 对象")
|
||||
|
||||
tile_name = str(tile) # 使用 MahjongTile 的 __repr__ 方法
|
||||
|
||||
if mode == "ming":
|
||||
# 明杠逻辑
|
||||
if self.state.hands[player].tile_count[tile] >= 3:
|
||||
self.state.hands[player].tile_count[tile] -= 3 # 移除三张牌
|
||||
self.state.melds[player].append(("ming_gang", tile)) # 添加到明牌
|
||||
logger.info(f"玩家 {player} 明杠成功: {tile_name}")
|
||||
self.state.scores[player] += 1 # 明杠奖励1分
|
||||
logger.info(f"玩家 {player} 因明杠获得1分,当前得分: {self.state.scores[player]}")
|
||||
else:
|
||||
logger.error(f"玩家 {player} 明杠失败: 手中牌数量不足")
|
||||
raise ValueError("明杠条件不满足,需要至少三张相同的牌")
|
||||
|
||||
elif mode == "an":
|
||||
# 暗杠逻辑
|
||||
if self.state.hands[player].tile_count[tile] >= 4:
|
||||
self.state.hands[player].tile_count[tile] -= 4 # 移除四张牌
|
||||
self.state.melds[player].append(("an_gang", tile)) # 添加到明牌
|
||||
logger.info(f"玩家 {player} 暗杠成功: {tile_name}")
|
||||
self.state.scores[player] += 1 # 暗杠奖励1分
|
||||
logger.info(f"玩家 {player} 因暗杠获得1分,当前得分: {self.state.scores[player]}")
|
||||
else:
|
||||
logger.error(f"玩家 {player} 暗杠失败: 手中牌数量不足")
|
||||
raise ValueError("暗杠条件不满足,需要至少四张相同的牌")
|
||||
else:
|
||||
logger.error(f"玩家 {player} 提供了无效的杠牌类型: {mode}")
|
||||
raise ValueError("无效的杠牌类型,仅支持 'ming'、'an' 或 'bu'")
|
||||
|
||||
|
||||
def check_blood_battle(self):
|
||||
"""
|
||||
检查游戏是否流局或血战结束,记录状态。
|
||||
"""
|
||||
if any(score <= 0 for score in self.state.scores):
|
||||
logger.info(f"游戏结束,某玩家分数小于等于0: {self.state.scores}")
|
||||
self.game_over = True
|
||||
|
||||
if len(self.state.winners) >= 3 or self.state.remaining_tiles == 0:
|
||||
logger.info(f"游戏结束,赢家列表: {self.state.winners}")
|
||||
self.game_over = True
|
||||
|
||||
|
||||
def set_missing_suit(player, game_state):
|
||||
"""
|
||||
玩家自动根据手牌选择缺门。
|
||||
|
||||
参数:
|
||||
- player: 玩家索引(0-3)。
|
||||
- game_state: 当前的游戏状态(`ChengduMahjongState` 实例)。
|
||||
|
||||
返回:
|
||||
- str: 玩家设置的缺门花色。
|
||||
"""
|
||||
valid_suits = ["条", "筒", "万"]
|
||||
hand = game_state.hands[player] # 获取玩家手牌
|
||||
|
||||
# 统计每种花色的牌数量
|
||||
suit_counts = {suit: 0 for suit in valid_suits}
|
||||
for tile in hand.tiles:
|
||||
suit_counts[tile.suit] += 1
|
||||
|
||||
# 找到数量最少的花色
|
||||
missing_suit = min(suit_counts, key=suit_counts.get)
|
||||
|
||||
# 检查是否已经设置过缺门
|
||||
if game_state.missing_suits[player] is not None:
|
||||
logger.warning(f"玩家 {player} 已设置过缺门,不能重复设置")
|
||||
raise ValueError("缺门已经设置,不能重复设置")
|
||||
|
||||
# 设置缺门并记录日志
|
||||
game_state.missing_suits[player] = missing_suit
|
||||
logger.info(
|
||||
f"玩家 {player} 手牌花色分布: {suit_counts}。缺门设置为: {missing_suit}"
|
||||
)
|
||||
|
||||
return missing_suit
|
||||
|
||||
|
||||
def check_other_players(self, tile):
|
||||
"""
|
||||
检查其他玩家是否可以对打出的牌进行操作(如胡牌、杠、碰)。
|
||||
优先级为:胡牌 > 杠牌 > 碰牌。
|
||||
如果有玩家选择操作,修改游戏状态和出牌顺序。
|
||||
"""
|
||||
current_player = self.state.current_player
|
||||
actions_taken = False
|
||||
|
||||
for player in range(4):
|
||||
if player == current_player:
|
||||
continue
|
||||
|
||||
# 优先检查胡牌
|
||||
if self.state.can_win(self.state.hands[player], self.state.melds[player], self.state.missing_suits[player]):
|
||||
logger.info(f"玩家 {player} 可以胡玩家 {current_player} 的牌: {tile}")
|
||||
handle_win(player, current_player, tile)
|
||||
actions_taken = True
|
||||
break # 胡牌后结束
|
||||
|
||||
# 检查是否可以杠牌
|
||||
if self.state.hands[player].tile_count[tile] >= 3:
|
||||
logger.info(f"玩家 {player} 可以杠玩家 {current_player} 的牌: {tile}")
|
||||
if handle_gang(self,player, tile, mode="ming"): # 执行明杠逻辑
|
||||
actions_taken = True
|
||||
break # 杠牌后不检查其他玩家
|
||||
|
||||
# 检查是否可以碰牌
|
||||
if self.state.hands[player].tile_count[tile] >= 2:
|
||||
logger.info(f"玩家 {player} 可以碰玩家 {current_player} 的牌: {tile}")
|
||||
if handle_peng(self,player, tile): # 执行碰牌逻辑
|
||||
actions_taken = True
|
||||
break # 碰牌后不检查其他玩家
|
||||
|
||||
if not actions_taken:
|
||||
logger.info(f"玩家 {current_player} 打出的牌 {tile} 没有触发其他玩家的操作")
|
||||
return actions_taken
|
||||
|
||||
|
||||
|
||||
def handle_peng(self, player, tile):
|
||||
"""
|
||||
处理玩家碰牌逻辑并更新出牌顺序。
|
||||
"""
|
||||
if not isinstance(tile, MahjongTile):
|
||||
logger.error(f"tile 必须是 MahjongTile 类型,但收到的是: {type(tile)}")
|
||||
return False
|
||||
|
||||
if self.state.hands[player].tile_count[tile] < 2:
|
||||
logger.error(f"玩家 {player} 无法碰牌: {tile}")
|
||||
return False
|
||||
|
||||
# 减少手牌中的牌数量
|
||||
self.state.hands[player].remove_tile(tile) # 移除第一张
|
||||
self.state.hands[player].remove_tile(tile) # 移除第二张
|
||||
|
||||
# 添加到明牌区
|
||||
self.state.melds[player].append(Meld(tile, "碰")) # 使用 Meld 类表示明牌
|
||||
|
||||
logger.info(f"玩家 {player} 碰了牌: {tile}。当前明牌: {self.state.melds[player]}")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def get_player_discard_choice(self, player):
|
||||
"""
|
||||
模拟获取玩家打牌的选择。
|
||||
在实际项目中,使用 GUI 或 AI 决策替代。
|
||||
"""
|
||||
hand_tiles = self.state.hands[player].tiles
|
||||
logger.info(f"玩家 {player} 当前手牌: {[str(tile) for tile in hand_tiles]}")
|
||||
|
||||
# 模拟玩家选择打出第一张非碰牌
|
||||
chosen_tile = hand_tiles[0] # 在真实项目中替换为实际用户输入或AI决策
|
||||
logger.info(f"玩家 {player} 选择打出牌: {chosen_tile}")
|
||||
return chosen_tile
|
||||
|
||||
|
||||
def handle_gang(self, player, tile, mode):
|
||||
"""
|
||||
处理玩家杠牌逻辑、计算分数并更新状态。
|
||||
:param player: 杠牌玩家索引
|
||||
:param tile: 杠牌的那张牌
|
||||
:param mode: 杠牌的类型 ("ming" 或 "an")
|
||||
:return: True 如果操作成功,否则 False
|
||||
"""
|
||||
if not isinstance(tile, MahjongTile):
|
||||
logger.error(f"玩家 {player} 杠牌的牌无效: {tile}")
|
||||
return False
|
||||
|
||||
base_score = self.state.bottom_score # 底分
|
||||
|
||||
if mode == "ming": # 明杠逻辑
|
||||
if self.state.hands[player].tile_count[tile] < 3:
|
||||
logger.error(f"玩家 {player} 无法明杠: {tile}")
|
||||
return False
|
||||
|
||||
# 更新状态
|
||||
self.update_meld(player, tile, "杠", count=3)
|
||||
|
||||
# 明杠分数计算
|
||||
gang_score = base_score * 2
|
||||
for i in range(4):
|
||||
if i != player:
|
||||
self.state.scores[i] -= gang_score
|
||||
self.state.scores[player] += gang_score * 3
|
||||
|
||||
logger.info(f"玩家 {player} 明杠,总分变化: +{gang_score * 3},其他玩家每人扣分: -{gang_score}")
|
||||
return True
|
||||
|
||||
elif mode == "an": # 暗杠逻辑
|
||||
if self.state.hands[player].tile_count[tile] < 4:
|
||||
logger.error(f"玩家 {player} 无法暗杠: {tile}")
|
||||
return False
|
||||
|
||||
# 更新状态
|
||||
update_meld(player, tile, "杠", count=4)
|
||||
|
||||
# 暗杠分数计算
|
||||
gang_score = base_score * 4
|
||||
for i in range(4):
|
||||
if i != player:
|
||||
self.state.scores[i] -= gang_score
|
||||
self.state.scores[player] += gang_score * 3
|
||||
|
||||
logger.info(f"玩家 {player} 暗杠,总分变化: +{gang_score * 3},其他玩家每人扣分: -{gang_score}")
|
||||
return True
|
||||
|
||||
def update_meld(self, player, tile, meld_type, count):
|
||||
"""
|
||||
更新玩家的明牌状态,并移除相应的牌。
|
||||
"""
|
||||
self.state.hands[player].tile_count[tile] -= count
|
||||
self.state.melds[player].append((meld_type, tile))
|
||||
logger.info(f"玩家 {player} 更新明牌: {meld_type} {tile},当前明牌: {self.state.melds[player]}")
|
||||
|
||||
|
||||
|
||||
def random_discard_tile(engine):
|
||||
"""
|
||||
当前玩家随机选择一张牌打出,优先打缺门牌。
|
||||
"""
|
||||
current_player = engine.state.current_player
|
||||
hand = engine.state.hands[current_player].tiles # 当前玩家的手牌
|
||||
missing_suit = engine.state.missing_suits[current_player] # 当前玩家的缺门
|
||||
|
||||
if not hand:
|
||||
logger.error(f"玩家 {current_player} 的手牌为空,无法打牌")
|
||||
return
|
||||
|
||||
# 使用改进后的 random_choice 函数选择牌
|
||||
tile = random_choice(hand, missing_suit)
|
||||
|
||||
# 从手牌中移除该牌
|
||||
engine.state.hands[current_player].remove_tile(tile)
|
||||
engine.state.discards[current_player].append(tile)
|
||||
|
||||
logger.info(
|
||||
f"玩家 {current_player} 打出了一张牌: {tile}。"
|
||||
f"当前牌河: {[str(t) for t in engine.state.discards[current_player]]}"
|
||||
)
|
||||
|
||||
# 检查其他玩家是否可以操作
|
||||
engine.check_other_players(tile)
|
||||
|
||||
# 切换到下一个玩家
|
||||
next_player = (current_player + 1) % 4
|
||||
engine.state.current_player = next_player
|
||||
|
||||
|
||||
def random_choice(hand, missing_suit):
|
||||
"""
|
||||
根据缺门优先随机选择打出的牌。
|
||||
"""
|
||||
if not hand:
|
||||
raise ValueError("手牌不能为空")
|
||||
|
||||
# 筛选出缺门的牌
|
||||
missing_suit_tiles = [tile for tile in hand if tile.suit == missing_suit]
|
||||
|
||||
# 如果缺门牌存在,优先从缺门牌中随机选择
|
||||
if missing_suit_tiles:
|
||||
index = random_module.randint(0, len(missing_suit_tiles) - 1)
|
||||
return missing_suit_tiles[index]
|
||||
|
||||
# 如果缺门牌不存在,从剩余牌中随机选择
|
||||
index = random_module.randint(0, len(hand) - 1)
|
||||
return hand[index]
|
||||
|
||||
|
||||
def should_gang(ai_player, state, gang_type):
|
||||
"""
|
||||
判断 AI 是否选择杠牌
|
||||
:param ai_player: 当前玩家索引
|
||||
:param state: 游戏状态对象
|
||||
:param gang_type: 杠牌类型(暗杠或明杠)
|
||||
:return: True 表示杠,False 表示不杠
|
||||
"""
|
||||
# 获取当前玩家分数
|
||||
current_score = state.scores[ai_player]
|
||||
# 获取当前玩家手牌
|
||||
hand = state.hands[ai_player]
|
||||
# 获取局势信息
|
||||
draw_counts = state.draw_counts
|
||||
remaining_tiles = state.remaining_tiles
|
||||
|
||||
# 基础策略:如果当前分数远低于平均分,优先杠
|
||||
average_score = sum(state.scores) / len(state.scores)
|
||||
if current_score < average_score * 0.8:
|
||||
return True
|
||||
|
||||
# 检查听牌状态:如果杠后听牌,优先杠
|
||||
# if is_ready_to_win(hand, state.melds[ai_player], state.missing_suits[ai_player]):
|
||||
# return True
|
||||
|
||||
# 根据杠牌类型调整策略
|
||||
if gang_type == "暗杠":
|
||||
# 暗杠通常安全,偏向杠
|
||||
return True
|
||||
elif gang_type == "明杠":
|
||||
# 明杠可能被针对,后期优先考虑杠
|
||||
return remaining_tiles < 30 # 剩余牌少于 30 张时偏向杠
|
||||
|
||||
# 默认不杠
|
||||
return False
|
||||
|
||||
|
||||
def select_discard_tile(self, player):
|
||||
"""
|
||||
选择要打出的牌(此处为简单示例,可接入 AI 策略)。
|
||||
"""
|
||||
hand = self.state.hands[player]
|
||||
# 优先打出缺门牌
|
||||
for tile in hand.tiles:
|
||||
if tile.suit == self.state.missing_suits[player]:
|
||||
return tile
|
||||
# 如果没有缺门牌,随机打出一张
|
||||
return hand.tiles[0]
|
||||
|
||||
|
||||
|
||||
def handle_win(self, player, current_player, tile):
|
||||
"""
|
||||
处理胡牌逻辑,包括动态计算番数和分数。
|
||||
:param player: 胡牌玩家索引
|
||||
:param current_player: 打出牌的玩家索引(若自摸为 None)
|
||||
:param tile: 胡牌的那张牌
|
||||
"""
|
||||
logger.info(f"玩家 {player} 胡牌!胡的牌是: {tile if tile else '自摸'}")
|
||||
|
||||
# 判断是否地胡
|
||||
is_dihu = self.state.draw_counts[player] == 0 and player != self.state.current_player
|
||||
is_self_draw = current_player is None
|
||||
|
||||
# 动态计算番数
|
||||
fan_count = calculate_fan(
|
||||
hand=self.state.hands[player],
|
||||
melds=self.state.melds[player],
|
||||
is_self_draw=is_self_draw,
|
||||
winning_tile=tile,
|
||||
conditions={"is_tian_hu": False, "is_di_hu": is_dihu}
|
||||
)
|
||||
|
||||
# 分数计算
|
||||
base_score = self.state.bottom_score
|
||||
win_score = base_score * (2 ** fan_count)
|
||||
|
||||
if is_self_draw:
|
||||
# 自摸结算
|
||||
for i in range(4):
|
||||
if i != player:
|
||||
self.state.scores[i] -= win_score
|
||||
self.state.scores[player] += win_score * 3
|
||||
else:
|
||||
# 点炮结算
|
||||
self.state.scores[player] += win_score * 3
|
||||
self.state.scores[current_player] -= win_score
|
||||
|
||||
# 更新赢家状态
|
||||
self.state.winners.append(player)
|
||||
self.state.print_game_state(player)
|
||||
|
||||
# 输出日志
|
||||
logger.info(f"玩家 {player} 胡牌类型: {'地胡' if is_dihu else '普通胡牌'}")
|
||||
logger.info(f"玩家 {player} 总番数: {fan_count}")
|
||||
logger.info(f"玩家 {current_player if current_player is not None else '所有其他玩家'} 扣分: {win_score}")
|
||||
logger.info(f"玩家 {player} 加分: {win_score * 3}")
|
||||
logger.info(f"当前分数: {self.state.scores}")
|
||||
|
||||
|
||||
def update_meld(self, player, tile, meld_type, count):
|
||||
"""
|
||||
更新玩家的明牌状态,并移除相应的牌。
|
||||
"""
|
||||
self.state.hands[player].tile_count[tile] -= count
|
||||
self.state.melds[player].append((meld_type, tile))
|
||||
logger.info(f"玩家 {player} 更新明牌: {meld_type} {tile},当前明牌: {self.state.melds[player]}")
|
||||
44
src/engine/mahjong/calculate_fan.py
Normal file
44
src/engine/mahjong/calculate_fan.py
Normal file
@@ -0,0 +1,44 @@
|
||||
from src.engine.mahjong.fan_type import is_terminal_fan,is_cleared,is_full_request,is_seven_pairs,is_basic_win,is_dragon_seven_pairs
|
||||
from loguru import logger
|
||||
|
||||
def calculate_fan(hand, melds, is_self_draw, winning_tile, conditions):
|
||||
"""
|
||||
动态计算番数,根据现有的番型规则。
|
||||
|
||||
参数:
|
||||
- hand: 当前胡牌的手牌(Hand 对象)。
|
||||
- melds: 玩家已明牌的列表(Meld 对象列表)。
|
||||
- is_self_draw: 是否自摸。
|
||||
- winning_tile: 胡的那张牌(MahjongTile 对象)。
|
||||
- conditions: 字典,包含特殊胡牌条件,如 {"is_tian_hu": True}。
|
||||
|
||||
返回:
|
||||
- int: 最大番数。
|
||||
"""
|
||||
fan = 0 # 初始化番数
|
||||
|
||||
# 定义番型规则(按优先级从高到低排序)
|
||||
rules = [
|
||||
("tian_hu", lambda: 12 if conditions.get("is_tian_hu", False) else 0), # 天胡
|
||||
("di_hu", lambda: 12 if conditions.get("is_di_hu", False) else 0), # 地胡
|
||||
("dragon_seven_pairs", lambda: is_dragon_seven_pairs(hand, melds)[0]), # 龙七对
|
||||
("seven_pairs", lambda: is_seven_pairs(hand)), # 七对
|
||||
("full_request", lambda: is_full_request(hand, melds, winning_tile)), # 全求人
|
||||
("cleared", lambda: is_cleared(hand, melds)), # 清一色
|
||||
("terminal_fan", lambda: is_terminal_fan(hand, melds)), # 带幺九
|
||||
("plain_win", lambda: is_basic_win(hand)), # 平胡
|
||||
]
|
||||
|
||||
# 逐一应用规则,取最大番数
|
||||
for rule, func in rules:
|
||||
current_fan = func()
|
||||
if current_fan > fan:
|
||||
fan = current_fan
|
||||
logger.debug(f"应用番型规则 {rule}: {current_fan} 番")
|
||||
|
||||
# 特殊条件(例如自摸加番)
|
||||
if is_self_draw:
|
||||
fan += 1 # 自摸额外加 1 番
|
||||
logger.debug("自摸加 1 番")
|
||||
|
||||
return fan
|
||||
151
src/engine/mahjong/chengdu_mahjong_engine.py
Normal file
151
src/engine/mahjong/chengdu_mahjong_engine.py
Normal file
@@ -0,0 +1,151 @@
|
||||
import random
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from configs.log_config import setup_logging
|
||||
from src.engine.mahjong.actions import draw_tile, random_choice, handle_win, handle_gang, handle_peng
|
||||
from src.engine.mahjong.actions import set_missing_suit
|
||||
from src.engine.mahjong.chengdu_mahjong_state import ChengduMahjongState
|
||||
|
||||
|
||||
class ChengduMahjongEngine:
|
||||
def __init__(self):
|
||||
self.state = ChengduMahjongState()
|
||||
self.game_over = False
|
||||
self.game_started = False
|
||||
self.current_player = 0
|
||||
|
||||
def initialize_game(self):
|
||||
"""
|
||||
初始化游戏,确定庄家,发牌并设置缺门。
|
||||
"""
|
||||
logger.info("游戏初始化...")
|
||||
setup_logging()
|
||||
# 确定庄家(掷骰子)
|
||||
self.state.current_player = random.randint(0, 3)
|
||||
logger.info(f"庄家确定为玩家: {self.state.current_player}")
|
||||
|
||||
logger.info("游戏初始化完成,准备开始!")
|
||||
|
||||
def deal_tiles(self):
|
||||
""" 发牌:庄家摸14张,其余玩家摸13张 """
|
||||
logger.info("开始发牌...")
|
||||
random.shuffle(self.state.deck) # 洗牌
|
||||
|
||||
for player in range(4):
|
||||
tiles_to_draw = 14 if player == self.state.current_player else 13
|
||||
for _ in range(tiles_to_draw):
|
||||
tile = self.state.deck.pop()
|
||||
self.state.hands[player].add_tile(tile)
|
||||
|
||||
# 自动设置缺门
|
||||
set_missing_suit(player, self.state)
|
||||
|
||||
# 记录发牌后的牌堆数量
|
||||
self.state.remaining_tiles = len(self.state.deck)
|
||||
logger.info(f"发牌结束并完成缺门设置!当前牌堆剩余数量: {self.state.remaining_tiles}")
|
||||
|
||||
def play_turn(self):
|
||||
current_player = self.state.current_player
|
||||
|
||||
# 玩家摸牌逻辑
|
||||
draw_tile(self)
|
||||
|
||||
# 玩家选择一张牌打出
|
||||
tile = random_choice(self.state.hands[current_player], self.state.missing_suits[current_player])
|
||||
logger.info(f"玩家 {current_player} 选择打牌: {tile}")
|
||||
|
||||
# 检查其他玩家是否可以对该牌进行操作
|
||||
actions_taken = self.check_other_players(tile)
|
||||
|
||||
if not actions_taken:
|
||||
# 将牌加入弃牌堆
|
||||
self.state.discards[current_player].append(tile)
|
||||
logger.info(f"玩家 {current_player} 打出的牌 {tile} 没有触发其他玩家的操作")
|
||||
|
||||
# 切换到下一位玩家
|
||||
self.state.current_player = (current_player + 1) % 4
|
||||
logger.info(f"轮到玩家 {self.state.current_player} 出牌")
|
||||
|
||||
# 检查游戏结束条件
|
||||
self.check_game_over()
|
||||
|
||||
def check_game_over(self):
|
||||
"""
|
||||
检查游戏是否结束。
|
||||
游戏结束条件:
|
||||
1. 牌堆已空。
|
||||
2. 赢的玩家数量 >= 3。
|
||||
"""
|
||||
# 检查是否已无牌可摸
|
||||
if self.state.remaining_tiles == 0:
|
||||
self.game_over = True
|
||||
logger.info("游戏结束:牌堆已空")
|
||||
return
|
||||
|
||||
# 检查是否满足血战结束条件:赢家数量 >= 3
|
||||
if len(self.state.winners) >= 3:
|
||||
self.game_over = True
|
||||
logger.info(f"游戏结束:赢家数量达到 {len(self.state.winners)} 人")
|
||||
return
|
||||
|
||||
# 如果没有触发结束条件,继续游戏
|
||||
logger.info(f"当前赢家数量: {len(self.state.winners)},游戏继续")
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
运行游戏主循环。
|
||||
"""
|
||||
self.initialize_game()
|
||||
self.game_started = True
|
||||
|
||||
while not self.game_over:
|
||||
self.play_turn()
|
||||
|
||||
logger.info("游戏已结束")
|
||||
|
||||
def check_other_players(self, tile):
|
||||
"""
|
||||
检查其他玩家是否可以对打出的牌进行操作(如胡牌、杠、碰)。
|
||||
优先级为:胡牌 > 杠牌 > 碰牌。
|
||||
如果有玩家选择操作,修改游戏状态和出牌顺序。
|
||||
"""
|
||||
current_player = self.state.current_player
|
||||
actions_taken = False
|
||||
|
||||
for player in range(4):
|
||||
if player == current_player:
|
||||
continue
|
||||
|
||||
# 优先检查胡牌
|
||||
if self.state.can_win(self.state.hands[player], self.state.melds[player], self.state.missing_suits[player]):
|
||||
logger.info(f"玩家 {player} 可以胡玩家 {current_player} 的牌: {tile}")
|
||||
handle_win(player, current_player, tile)
|
||||
actions_taken = True
|
||||
break # 胡牌后结束
|
||||
|
||||
# 检查是否可以杠牌
|
||||
if self.state.hands[player].tile_count[tile] >= 3:
|
||||
logger.info(f"玩家 {player} 可以杠玩家 {current_player} 的牌: {tile}")
|
||||
if handle_gang(self, player, tile, mode="ming"): # 执行明杠逻辑
|
||||
actions_taken = True
|
||||
break # 杠牌后不检查其他玩家
|
||||
|
||||
# 检查是否可以碰牌
|
||||
if self.state.hands[player].tile_count[tile] >= 2:
|
||||
logger.info(f"玩家 {player} 可以碰玩家 {current_player} 的牌: {tile}")
|
||||
if handle_peng(self, player, tile): # 执行碰牌逻辑
|
||||
actions_taken = True
|
||||
break # 碰牌后不检查其他玩家
|
||||
|
||||
if not actions_taken:
|
||||
logger.info(f"玩家 {current_player} 打出的牌 {tile} 没有触发其他玩家的操作")
|
||||
return actions_taken
|
||||
|
||||
def update_meld(self, player, tile, meld_type, count):
|
||||
"""
|
||||
更新玩家的明牌状态,并移除相应的牌。
|
||||
"""
|
||||
self.state.hands[player].tile_count[tile] -= count
|
||||
self.state.melds[player].append((meld_type, tile))
|
||||
logger.info(f"玩家 {player} 更新明牌: {meld_type} {tile},当前明牌: {self.state.melds[player]}")
|
||||
@@ -1,7 +1,7 @@
|
||||
from collections import Counter
|
||||
from src.engine.hand import Hand
|
||||
from src.engine.mahjong_tile import MahjongTile
|
||||
from src.engine.meld import Meld
|
||||
from src.engine.mahjong.hand import Hand
|
||||
from src.engine.mahjong.mahjong_tile import MahjongTile
|
||||
from src.engine.mahjong.meld import Meld
|
||||
from loguru import logger
|
||||
|
||||
|
||||
@@ -17,12 +17,16 @@ class ChengduMahjongState:
|
||||
self.deck = [MahjongTile(suit, value) for suit in ["条", "筒", "万"] for value in range(1, 10)] * 4 # 108张牌
|
||||
# 当前玩家索引
|
||||
self.current_player = 0
|
||||
# 底分
|
||||
self.bottom_score = 1
|
||||
# 玩家分数
|
||||
self.scores = [100, 100, 100, 100]
|
||||
# 剩余牌数量
|
||||
self.remaining_tiles = len(self.deck)
|
||||
# 胜利玩家列表
|
||||
self.winners = []
|
||||
# 记录每个玩家的抓牌次数
|
||||
self.draw_counts = [0] * 4
|
||||
# 缺门信息
|
||||
self.missing_suits = [None] * 4 # 每个玩家的缺门("条"、"筒" 或 "万")
|
||||
|
||||
@@ -113,7 +117,7 @@ class ChengduMahjongState:
|
||||
# **第一步:检查花色限制**
|
||||
suits = {tile.suit for tile in hand.tiles}
|
||||
if len(suits) > 2:
|
||||
logger.info("花色超过两种,不能胡牌")
|
||||
# logger.info("花色超过两种,不能胡牌")
|
||||
return False # 花色超过两种,不能胡牌
|
||||
|
||||
# 检查是否打完缺门的花色
|
||||
@@ -1,4 +1,4 @@
|
||||
from src.engine.utils import try_win,is_terminal_tile
|
||||
from src.engine.mahjong.utils import try_win
|
||||
from collections import Counter
|
||||
|
||||
def is_basic_win(hand):
|
||||
@@ -40,7 +40,7 @@ def is_cleared(hand, melds):
|
||||
return 0
|
||||
|
||||
|
||||
def calculate_terminal_fan(hand, melds):
|
||||
def is_terminal_fan(hand, melds):
|
||||
"""
|
||||
计算带幺九番型,并返回对应番数。
|
||||
"""
|
||||
@@ -163,3 +163,5 @@ def is_dragon_seven_pairs(hand, melds):
|
||||
return 12, -1 # 龙七对计为 12 番,并减少 1 根
|
||||
return 0, 0
|
||||
|
||||
|
||||
|
||||
89
src/engine/mahjong/hand.py
Normal file
89
src/engine/mahjong/hand.py
Normal file
@@ -0,0 +1,89 @@
|
||||
from src.engine.mahjong.mahjong_tile import MahjongTile
|
||||
from collections import defaultdict
|
||||
|
||||
class Hand:
|
||||
def __init__(self):
|
||||
# 存储所有的 MahjongTile 对象
|
||||
self.tiles = []
|
||||
# 存储每种牌的数量,键为 MahjongTile 对象,值为数量
|
||||
self.tile_count = defaultdict(int)
|
||||
|
||||
def add_tile(self, tile):
|
||||
""" 向手牌中添加一张牌 """
|
||||
if not isinstance(tile, MahjongTile):
|
||||
raise ValueError("必须添加 MahjongTile 类型的牌")
|
||||
if len(self.tiles) > 14:
|
||||
raise ValueError("手牌数量不能超过 14 张")
|
||||
self.tiles.append(tile) # 将牌添加到手牌中
|
||||
self.tile_count[tile] += 1 # 增加牌的数量
|
||||
def remove_tile(self, tile):
|
||||
""" 从手牌中移除一张牌 """
|
||||
if not isinstance(tile, MahjongTile):
|
||||
raise ValueError("必须移除 MahjongTile 类型的牌")
|
||||
if self.tile_count[tile] > 0:
|
||||
self.tiles.remove(tile)
|
||||
self.tile_count[tile] -= 1
|
||||
else:
|
||||
raise ValueError(f"手牌中没有该牌: {tile}")
|
||||
|
||||
def get_tile_count(self, tile):
|
||||
""" 获取手牌中某张牌的数量 """
|
||||
if not isinstance(tile, MahjongTile):
|
||||
raise ValueError("必须是 MahjongTile 类型的牌")
|
||||
return self.tile_count[tile]
|
||||
|
||||
def can_peng(self, tile):
|
||||
""" 判断是否可以碰(即是否已经有2张相同的牌,摸一张牌后可以碰) """
|
||||
if not isinstance(tile, MahjongTile):
|
||||
raise ValueError("必须是 MahjongTile 类型的牌")
|
||||
return self.tile_count[tile] == 2 # 摸一张牌后总数为 3 张,才可以碰
|
||||
|
||||
def can_gang(self, tile=None):
|
||||
"""
|
||||
判断是否可以杠牌。
|
||||
两种情况:
|
||||
1. 当前手牌中已经有 4 张相同的牌,可以选择杠。
|
||||
2. 当前手牌中有 3 张相同的牌,摸到第 4 张后可以杠。
|
||||
|
||||
:param tile: 需要判断的牌(可以为空)。
|
||||
:return: True 如果可以杠,否则 False。
|
||||
"""
|
||||
if tile is not None:
|
||||
# 情况 1: 摸到一张牌后形成四张
|
||||
if not isinstance(tile, MahjongTile):
|
||||
raise ValueError("必须是 MahjongTile 类型的牌")
|
||||
return self.tile_count[tile] == 4
|
||||
else:
|
||||
# 情况 2: 手牌中已有四张一样的牌
|
||||
for t, count in self.tile_count.items():
|
||||
if count == 4:
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_gang_type(self, tile=None):
|
||||
"""
|
||||
判断杠牌的类型(暗杠或明杠)。
|
||||
:param tile: 如果指定了牌,检查是否可以明杠。
|
||||
:return: 杠牌类型,"暗杠" 或 "明杠",如果不能杠返回 None。
|
||||
"""
|
||||
if tile is not None:
|
||||
# 明杠的判断逻辑:手牌中已有3张相同的牌,摸到第4张
|
||||
if not isinstance(tile, MahjongTile):
|
||||
raise ValueError("必须是 MahjongTile 类型的牌")
|
||||
if self.tile_count[tile] == 4:
|
||||
return "明杠"
|
||||
else:
|
||||
# 暗杠的判断逻辑:手牌中已有4张相同的牌
|
||||
for t, count in self.tile_count.items():
|
||||
if count == 4:
|
||||
return "暗杠"
|
||||
return None
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
""" 返回手牌的字符串表示 """
|
||||
tiles_str = ", ".join(str(tile) for tile in self.tiles)
|
||||
return f"手牌: [{tiles_str}], 牌的数量: {dict(self.tile_count)}"
|
||||
def __iter__(self):
|
||||
"""使 Hand 对象可迭代,直接迭代其 tiles 列表"""
|
||||
return iter(self.tiles)
|
||||
@@ -6,6 +6,7 @@ class MahjongTile:
|
||||
raise ValueError("Invalid tile")
|
||||
self.suit = suit
|
||||
self.value = value
|
||||
self.index = ({"条": 0, "筒": 1, "万": 2}[suit]) * 9 + (value - 1)
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.value}{self.suit}"
|
||||
@@ -1,4 +1,4 @@
|
||||
from src.engine.mahjong_tile import MahjongTile
|
||||
from src.engine.mahjong.mahjong_tile import MahjongTile
|
||||
|
||||
class Meld:
|
||||
def __init__(self, tile, type: str):
|
||||
0
src/environment/__init__.py
Normal file
0
src/environment/__init__.py
Normal file
220
src/environment/chengdu_mahjong_env.py
Normal file
220
src/environment/chengdu_mahjong_env.py
Normal file
@@ -0,0 +1,220 @@
|
||||
import gym
|
||||
from gym import spaces
|
||||
import numpy as np
|
||||
|
||||
from src.engine.mahjong.actions import handle_peng, handle_gang, handle_win
|
||||
from src.engine.mahjong.chengdu_mahjong_engine import ChengduMahjongEngine
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class ChengduMahjongEnv(gym.Env):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
# 初始化麻将引擎
|
||||
self.engine = ChengduMahjongEngine()
|
||||
|
||||
# 定义观察空间:手牌、明牌、弃牌和庄家信息
|
||||
self.observation_space = spaces.Dict({
|
||||
"hand": spaces.Box(low=0, high=4, shape=(108,), dtype=np.int32), # 手牌数量
|
||||
"melds": spaces.Box(low=0, high=4, shape=(108,), dtype=np.int32), # 明牌数量
|
||||
"discard_pile": spaces.Box(low=0, high=4, shape=(108,), dtype=np.int32), # 弃牌数量
|
||||
"dealer": spaces.Discrete(4), # 当前庄家
|
||||
})
|
||||
|
||||
# 初始化游戏
|
||||
self.reset()
|
||||
|
||||
@property
|
||||
def action_space(self):
|
||||
"""
|
||||
动态生成当前动作空间。
|
||||
"""
|
||||
valid_actions = self.get_action_space()
|
||||
# 动态生成离散动作空间的大小
|
||||
return spaces.Discrete(len(valid_actions))
|
||||
|
||||
def reset(self):
|
||||
"""重置游戏状态"""
|
||||
self.engine = ChengduMahjongEngine() # 重置引擎
|
||||
self.engine.initialize_game()
|
||||
self.engine.deal_tiles()
|
||||
return self._get_observation()
|
||||
|
||||
def step(self, action):
|
||||
"""
|
||||
执行动作,更新状态并返回结果。
|
||||
:param action: 动作(0-13 表示打牌, 14 表示碰, 15 表示杠, 16 表示胡)
|
||||
:return: obs, reward, done, info
|
||||
"""
|
||||
current_player = self.engine.state.current_player
|
||||
hand = self.engine.state.hands[current_player].tiles # 当前玩家手牌
|
||||
logger.info(f"玩家 {current_player} 手牌: {self.engine.state.hands[current_player].tiles}")
|
||||
|
||||
# **检查动作合法性**
|
||||
max_hand_actions = len(hand) # 当前玩家手牌数量
|
||||
max_action_index = max_hand_actions + 3 # 打牌 + 特殊动作
|
||||
|
||||
if action >= max_action_index:
|
||||
raise ValueError(f"无效的动作: {action}")
|
||||
|
||||
# **执行动作**
|
||||
if action < max_hand_actions: # 打牌动作
|
||||
tile = hand[action]
|
||||
# logger.info(f"玩家 {current_player} 选择打牌: {tile}")
|
||||
self.engine.check_other_players(tile)
|
||||
elif action == max_hand_actions: # 碰
|
||||
tile_to_peng = self._get_tile_for_special_action("peng")
|
||||
if tile_to_peng:
|
||||
handle_peng(self.engine, current_player, tile_to_peng)
|
||||
logger.info(f"玩家 {current_player} 碰了牌: {tile_to_peng}")
|
||||
else:
|
||||
logger.warning("碰动作无效,未满足条件")
|
||||
elif action == max_hand_actions + 1: # 杠
|
||||
tile_to_gang = self._get_tile_for_special_action("gang")
|
||||
if tile_to_gang:
|
||||
handle_gang(self.engine, current_player, tile_to_gang, mode="an")
|
||||
logger.info(f"玩家 {current_player} 杠了牌: {tile_to_gang}")
|
||||
else:
|
||||
logger.warning("杠动作无效,未满足条件")
|
||||
elif action == max_hand_actions + 2: # 胡
|
||||
if self.engine.state.can_win(
|
||||
self.engine.state.hands[current_player],
|
||||
self.engine.state.melds[current_player],
|
||||
self.engine.state.missing_suits[current_player]
|
||||
):
|
||||
handle_win(self.engine, current_player, None, None)
|
||||
logger.info(f"玩家 {current_player} 胡牌!")
|
||||
else:
|
||||
logger.warning("胡动作无效,未满足条件")
|
||||
|
||||
# **更新玩家轮次**
|
||||
if not self.engine.game_over: # 确保游戏未结束时才轮转玩家
|
||||
self.engine.state.current_player = (current_player + 1) % 4
|
||||
|
||||
# **更新状态**
|
||||
obs = self._get_observation()
|
||||
|
||||
# **奖励设计**
|
||||
reward = self._calculate_reward(current_player)
|
||||
|
||||
# **检查游戏是否结束**
|
||||
self.engine.check_game_over()
|
||||
done = self.engine.game_over
|
||||
|
||||
# **返回值**
|
||||
info = {
|
||||
"player": current_player,
|
||||
"action": action,
|
||||
}
|
||||
return obs, reward, done, info
|
||||
|
||||
def _get_observation(self):
|
||||
"""
|
||||
提取当前玩家的观察空间
|
||||
:return: dict
|
||||
"""
|
||||
player_index = self.engine.state.current_player
|
||||
hand = np.zeros(108, dtype=np.int32)
|
||||
melds = np.zeros(108, dtype=np.int32)
|
||||
discard_pile = np.zeros(108, dtype=np.int32)
|
||||
|
||||
# 填充手牌、明牌和弃牌信息
|
||||
for tile, count in self.engine.state.hands[player_index].tile_count.items():
|
||||
hand[tile.index] = count
|
||||
for meld in self.engine.state.melds[player_index]:
|
||||
melds[meld.tile.index] += meld.count
|
||||
for tile in self.engine.state.discards[player_index]:
|
||||
discard_pile[tile.index] += 1
|
||||
|
||||
return {
|
||||
"hand": hand,
|
||||
"melds": melds,
|
||||
"discard_pile": discard_pile,
|
||||
"dealer": self.engine.state.current_player,
|
||||
}
|
||||
|
||||
def _calculate_reward(self, current_player):
|
||||
"""
|
||||
奖励设计:基于分数变化
|
||||
:return: float
|
||||
"""
|
||||
return self.engine.state.scores[current_player] - 100
|
||||
|
||||
def _get_tile_for_special_action(self, action_type):
|
||||
"""
|
||||
获取可碰、杠、胡的牌
|
||||
:param action_type: "peng", "gang", "win"
|
||||
:return: tile or None
|
||||
"""
|
||||
if action_type == "peng":
|
||||
for tile, count in self.engine.state.hands[self.engine.state.current_player].tile_count.items():
|
||||
if count == 2: # 碰需要两张相同的牌
|
||||
return tile
|
||||
elif action_type == "gang":
|
||||
for tile, count in self.engine.state.hands[self.engine.state.current_player].tile_count.items():
|
||||
if count == 4: # 杠需要四张相同的牌
|
||||
return tile
|
||||
elif action_type == "win":
|
||||
if self.engine.state.can_win(
|
||||
self.engine.state.hands[self.engine.state.current_player],
|
||||
self.engine.state.melds[self.engine.state.current_player],
|
||||
self.engine.state.missing_suits[self.engine.state.current_player]
|
||||
):
|
||||
return True
|
||||
return None
|
||||
|
||||
def get_action_space(self):
|
||||
"""
|
||||
动态计算当前合法的动作空间。
|
||||
返回一个合法动作的列表,其中:
|
||||
- 0 到 len(hand.tiles) - 1 表示打出手牌的索引。
|
||||
- len(hand.tiles) 表示碰动作。
|
||||
- len(hand.tiles) + 1 表示杠动作。
|
||||
- len(hand.tiles) + 2 表示胡动作。
|
||||
"""
|
||||
current_player = self.engine.state.current_player
|
||||
hand = self.engine.state.hands[current_player]
|
||||
valid_actions = []
|
||||
|
||||
# 打牌动作
|
||||
valid_actions.extend(range(len(hand.tiles)))
|
||||
|
||||
# 特殊动作
|
||||
if self._can_peng(current_player):
|
||||
valid_actions.append(len(hand.tiles)) # 碰
|
||||
if self._can_gang(current_player):
|
||||
valid_actions.append(len(hand.tiles) + 1) # 杠
|
||||
if self._can_hu(current_player):
|
||||
valid_actions.append(len(hand.tiles) + 2) # 胡
|
||||
|
||||
return valid_actions
|
||||
|
||||
|
||||
# 辅助函数判断特殊动作是否可执行
|
||||
def _can_peng(self, player):
|
||||
"""
|
||||
判断玩家是否可以碰。
|
||||
"""
|
||||
for tile, count in self.engine.state.hands[player].tile_count.items():
|
||||
if count >= 2: # 至少两张相同的牌
|
||||
return True
|
||||
return False
|
||||
|
||||
def _can_gang(self, player):
|
||||
"""
|
||||
判断玩家是否可以杠。
|
||||
"""
|
||||
for tile, count in self.engine.state.hands[player].tile_count.items():
|
||||
if count == 4: # 有四张相同的牌
|
||||
return True
|
||||
return False
|
||||
|
||||
def _can_hu(self, player):
|
||||
"""
|
||||
判断玩家是否可以胡牌。
|
||||
"""
|
||||
return self.engine.state.can_win(
|
||||
self.engine.state.hands[player],
|
||||
self.engine.state.melds[player],
|
||||
self.engine.state.missing_suits[player]
|
||||
)
|
||||
@@ -1,120 +0,0 @@
|
||||
import gym
|
||||
import numpy as np
|
||||
from gym import spaces
|
||||
from src.engine.actions import draw_tile, discard_tile, peng, gang, check_blood_battle
|
||||
from src.engine.calculate_fan import calculate_fan, is_seven_pairs, is_cleared, is_big_pairs
|
||||
from src.engine.chengdu_mahjong_engine import ChengduMahjongEngine
|
||||
from src.engine.scoring import calculate_score
|
||||
|
||||
|
||||
class MahjongEnv(gym.Env):
|
||||
def __init__(self):
|
||||
super(MahjongEnv, self).__init__()
|
||||
self.engine = ChengduMahjongEngine()
|
||||
self.scores = [100, 100, 100, 100] # 四位玩家初始分数
|
||||
self.base_score = 1 # 底分
|
||||
self.max_rounds = 100 # 最大轮数,防止游戏无限进行
|
||||
self.current_round = 0 # 当前轮数
|
||||
self.action_space = spaces.Discrete(108) # 动作空间:打牌的索引
|
||||
self.observation_space = spaces.Box(low=0, high=4, shape=(108,), dtype=np.int32)
|
||||
|
||||
def reset(self):
|
||||
self.engine = ChengduMahjongEngine()
|
||||
self.scores = [100, 100, 100, 100] # 每局重置分数
|
||||
self.current_round = 0
|
||||
return self.engine.state.hands[self.engine.state.current_player]
|
||||
|
||||
def step(self, action):
|
||||
"""
|
||||
执行玩家动作并更新游戏状态。
|
||||
|
||||
参数:
|
||||
- action: 玩家动作,0 代表摸牌,1 代表打牌,2 代表碰牌,3 代表杠牌
|
||||
|
||||
返回:
|
||||
- next_state: 当前玩家的手牌
|
||||
- reward: 奖励
|
||||
- done: 是否结束
|
||||
- info: 其他信息(如奖励历史等)
|
||||
"""
|
||||
done = False
|
||||
reward = 0
|
||||
|
||||
try:
|
||||
if action == 0: # 0代表摸牌
|
||||
reward, done = draw_tile(self.engine) # 调用摸牌函数
|
||||
elif action == 1: # 1代表打牌
|
||||
tile = self.engine.state.hands[self.engine.state.current_player][0] # 假设选择第一张牌
|
||||
discard_tile(self.engine, tile) # 调用打牌函数
|
||||
reward, done = -1, False
|
||||
elif action == 2: # 2代表碰牌
|
||||
tile = self.engine.state.hands[self.engine.state.current_player][0] # 假设选择第一张牌
|
||||
peng(self.engine, tile) # 调用碰牌函数
|
||||
reward, done = 0, False
|
||||
elif action == 3: # 3代表杠牌
|
||||
tile = self.engine.state.hands[self.engine.state.current_player][0] # 假设选择第一张牌
|
||||
gang(self.engine, tile, mode="ming") # 暂时假设为明杠
|
||||
reward, done = 0, False
|
||||
|
||||
# 检查是否胡牌
|
||||
if self.engine.state.can_win(self.engine.state.hands[self.engine.state.current_player]):
|
||||
reward, done = self.handle_win() # 胡牌时处理胜利逻辑
|
||||
|
||||
# 检查游戏结束条件
|
||||
check_blood_battle(self.engine)
|
||||
|
||||
if self.engine.game_over: # 检查是否游戏结束
|
||||
done = True
|
||||
|
||||
except ValueError:
|
||||
reward, done = -10, False # 非法操作扣分
|
||||
|
||||
# 切换到下一个玩家
|
||||
self.engine.state.current_player = (self.engine.state.current_player + 1) % 4
|
||||
self.current_round += 1
|
||||
|
||||
# 如果达到最大轮数,结束游戏
|
||||
if self.current_round >= self.max_rounds:
|
||||
done = True
|
||||
reward = 0 # 平局奖励或惩罚(可调整)
|
||||
|
||||
return self.engine.state.hands[self.engine.state.current_player], reward, done, {}
|
||||
|
||||
def handle_win(self):
|
||||
"""
|
||||
处理胡牌后的分数结算和奖励。
|
||||
"""
|
||||
winner = self.engine.state.current_player
|
||||
hand = self.engine.state.hands[winner]
|
||||
melds = self.engine.state.melds[winner]
|
||||
is_self_draw = True # 假设自摸(后续可动态判断)
|
||||
|
||||
conditions = {
|
||||
"is_cleared": is_cleared(hand, melds),
|
||||
"is_seven_pairs": is_seven_pairs(hand),
|
||||
"is_big_pairs": is_big_pairs(hand),
|
||||
# 添加其他条件...
|
||||
}
|
||||
|
||||
# 动态计算番数
|
||||
fan = calculate_fan(hand, melds, is_self_draw, is_cleared, conditions)
|
||||
|
||||
# 动态计算得分
|
||||
scores = calculate_score(fan, self.base_score, is_self_draw)
|
||||
self.scores[winner] += scores["winner"]
|
||||
for i, score in enumerate(scores["loser"]):
|
||||
self.scores[i] += score # 扣分
|
||||
|
||||
# 奖励设置为赢家得分
|
||||
reward = scores["winner"]
|
||||
self.engine.state.winners.append(winner) # 添加赢家到列表
|
||||
return reward, True # 胡牌结束当前局
|
||||
|
||||
def render(self, mode="human"):
|
||||
"""
|
||||
打印游戏状态信息,便于调试。
|
||||
"""
|
||||
print(f"当前轮数: {self.current_round}")
|
||||
print("玩家分数:", self.scores)
|
||||
print("当前玩家状态:", self.engine.state.hands[self.engine.state.current_player])
|
||||
|
||||
144
src/environment/dizhu_env.py
Normal file
144
src/environment/dizhu_env.py
Normal file
@@ -0,0 +1,144 @@
|
||||
import gym
|
||||
import numpy as np
|
||||
from gym import spaces
|
||||
from src.engine.dizhu.dizhu_engine import DiZhuEngine # 引入斗地主引擎
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class DouDiZhuEnv(gym.Env):
|
||||
def __init__(self):
|
||||
super(DouDiZhuEnv, self).__init__()
|
||||
self.engine = DiZhuEngine() # 初始化斗地主引擎
|
||||
|
||||
# 定义初始动作空间和观察空间
|
||||
self.action_space = spaces.Discrete(55) # 假设最大动作空间为 55
|
||||
self.observation_space = spaces.Dict({
|
||||
"hand_cards": spaces.Box(low=0, high=1, shape=(54,), dtype=np.int32), # 玩家手牌(独热编码)
|
||||
"history": spaces.Box(low=0, high=1, shape=(54,), dtype=np.int32), # 出牌历史
|
||||
"current_pile": spaces.Box(low=0, high=1, shape=(54,), dtype=np.int32), # 当前牌面上的牌
|
||||
"current_player": spaces.Discrete(3), # 当前玩家索引
|
||||
})
|
||||
|
||||
def reset(self):
|
||||
"""重置游戏环境"""
|
||||
self.engine.reset()
|
||||
logger.info("斗地主环境已重置")
|
||||
return self._get_observation()
|
||||
|
||||
def step(self, action):
|
||||
"""执行动作并更新环境"""
|
||||
try:
|
||||
reward = 0 # 初始化奖励
|
||||
current_player = self.engine.get_current_player()
|
||||
|
||||
|
||||
if action == 0: # 过牌
|
||||
self.engine.step("pass")
|
||||
reward -= 0.5 # 对频繁过牌给予轻微惩罚
|
||||
else:
|
||||
# 玩家选择出牌
|
||||
action_cards = self._decode_action(action) # 解码动作为具体的牌型
|
||||
|
||||
|
||||
# 出牌前的手牌数量
|
||||
previous_hand_count = len(current_player.hand_cards)
|
||||
|
||||
# 尝试执行动作
|
||||
self.engine.step(action_cards)
|
||||
|
||||
# 出牌后的手牌数量
|
||||
current_hand_count = len(current_player.hand_cards)
|
||||
|
||||
# 根据减少的手牌数量计算奖励
|
||||
reward += (previous_hand_count - current_hand_count) * 1.0
|
||||
|
||||
# 检查游戏是否结束
|
||||
done = self.engine.game_over
|
||||
if done:
|
||||
reward += 10 # 胜利时给予较大的奖励
|
||||
logger.info(f"游戏结束!胜利玩家: {self.engine.current_player_index + 1}")
|
||||
|
||||
return self._get_observation(), reward, done, {}
|
||||
|
||||
except ValueError as e:
|
||||
return self._get_observation(), -5, False, {"error": str(e)}
|
||||
|
||||
def _get_observation(self):
|
||||
"""获取当前玩家的观察空间"""
|
||||
current_player = self.engine.get_current_player()
|
||||
|
||||
# 手牌的独热编码
|
||||
hand_cards = np.zeros(54, dtype=np.int32)
|
||||
for card in current_player.hand_cards:
|
||||
hand_cards[card] = 1
|
||||
|
||||
# 出牌历史的独热编码
|
||||
history = np.zeros(54, dtype=np.int32)
|
||||
for play in current_player.history:
|
||||
if isinstance(play, list): # 如果是列表
|
||||
for card in play:
|
||||
history[card] = 1
|
||||
elif isinstance(play, int): # 如果是单个牌
|
||||
history[play] = 1
|
||||
|
||||
# 当前牌面的独热编码
|
||||
current_pile = np.zeros(54, dtype=np.int32)
|
||||
if self.engine.current_pile:
|
||||
for card in self.engine.current_pile:
|
||||
current_pile[card] = 1
|
||||
|
||||
return {
|
||||
"hand_cards": hand_cards,
|
||||
"history": history,
|
||||
"current_pile": current_pile,
|
||||
"current_player": self.engine.current_player_index,
|
||||
}
|
||||
|
||||
def _decode_action(self, action):
|
||||
"""
|
||||
解码动作为具体的牌型。
|
||||
:param action: 动作索引
|
||||
:return: 解码后的牌型
|
||||
"""
|
||||
valid_actions = self.get_action_space()
|
||||
if action < len(valid_actions):
|
||||
return valid_actions[action]
|
||||
else:
|
||||
raise ValueError(f"非法动作索引: {action}")
|
||||
|
||||
def render(self, mode="human"):
|
||||
"""打印当前游戏状态"""
|
||||
state = self.engine.get_game_state()
|
||||
print(state)
|
||||
|
||||
def get_action_space(self):
|
||||
"""
|
||||
动态生成当前合法的动作空间。
|
||||
返回一个合法动作的列表,其中:
|
||||
- 索引 0 表示过牌。
|
||||
- 其余索引表示具体的出牌动作。
|
||||
"""
|
||||
current_player = self.engine.get_current_player()
|
||||
hand_cards = current_player.hand_cards
|
||||
|
||||
# 所有合法出牌组合
|
||||
valid_actions = [["pass"]] # 索引 0 表示过牌
|
||||
valid_actions.extend(self._generate_valid_combinations(hand_cards))
|
||||
return valid_actions
|
||||
|
||||
def _generate_valid_combinations(self, hand_cards):
|
||||
"""
|
||||
根据手牌生成所有合法牌型组合。
|
||||
:param hand_cards: 当前玩家的手牌
|
||||
:return: 所有合法的牌型组合
|
||||
"""
|
||||
from itertools import combinations
|
||||
valid_combinations = []
|
||||
|
||||
# 示例:生成单牌、对子和三张的合法组合
|
||||
for i in range(1, len(hand_cards) + 1):
|
||||
for combo in combinations(hand_cards, i):
|
||||
if self.engine.is_valid_play(list(combo)): # 检查是否为合法牌型
|
||||
valid_combinations.append(list(combo))
|
||||
|
||||
return valid_combinations
|
||||
8
test.py
8
test.py
@@ -1,8 +1,8 @@
|
||||
from src.engine.chengdu_mahjong_state import ChengduMahjongState
|
||||
from src.engine.hand import Hand
|
||||
from src import ChengduMahjongState
|
||||
from src import Hand
|
||||
|
||||
from src.engine.mahjong_tile import MahjongTile
|
||||
from src.engine.meld import Meld
|
||||
from src import MahjongTile
|
||||
from src import Meld
|
||||
|
||||
hand = Hand()
|
||||
|
||||
|
||||
35
test_dizhu.py
Normal file
35
test_dizhu.py
Normal file
@@ -0,0 +1,35 @@
|
||||
from stable_baselines3 import PPO
|
||||
from src.environment.dizhu_env import DouDiZhuEnv # 导入斗地主环境
|
||||
from loguru import logger
|
||||
|
||||
|
||||
def test_dizhu_model(): # 确保函数名以 test_ 开头
|
||||
# 创建斗地主环境
|
||||
env = DouDiZhuEnv()
|
||||
|
||||
# 加载已训练的模型
|
||||
model_path = "./models/ppo_doudizhu_model.zip" # 确保路径正确
|
||||
logger.info(f"加载模型: {model_path}")
|
||||
try:
|
||||
model = PPO.load(model_path)
|
||||
except Exception as e:
|
||||
logger.error(f"加载模型失败: {e}")
|
||||
return
|
||||
|
||||
# 测试模型
|
||||
obs = env.reset()
|
||||
done = False
|
||||
total_reward = 0
|
||||
logger.info("开始测试斗地主模型...")
|
||||
|
||||
max_steps = 1000 # 设置最大步数
|
||||
step_count = 0
|
||||
|
||||
while not done and step_count < max_steps:
|
||||
action, _ = model.predict(obs, deterministic=True)
|
||||
obs, reward, done, info = env.step(action)
|
||||
total_reward += reward
|
||||
step_count += 1
|
||||
logger.info(f"动作: {action}, 奖励: {reward}, 是否结束: {done}, 信息: {info}")
|
||||
|
||||
logger.info(f"测试完成,总奖励: {total_reward}")
|
||||
0
tests/dizhu/__init__.py
Normal file
0
tests/dizhu/__init__.py
Normal file
0
tests/mahjong/__init__.py
Normal file
0
tests/mahjong/__init__.py
Normal file
@@ -1,8 +1,7 @@
|
||||
import pytest
|
||||
from src.engine.calculate_fan import calculate_fan, is_seven_pairs, is_cleared, is_big_pairs
|
||||
from src import calculate_fan, is_seven_pairs, is_cleared, is_big_pairs
|
||||
|
||||
from src.engine.hand import Hand
|
||||
from src.engine.mahjong_tile import MahjongTile
|
||||
from src import Hand
|
||||
from src import MahjongTile
|
||||
|
||||
# 测试用例
|
||||
|
||||
48
tests/mahjong/test_chengdu_mahjong_engine.py
Normal file
48
tests/mahjong/test_chengdu_mahjong_engine.py
Normal file
@@ -0,0 +1,48 @@
|
||||
from src import ChengduMahjongEngine
|
||||
from loguru import logger
|
||||
|
||||
def test_mahjong_engine():
|
||||
"""
|
||||
测试成都麻将引擎,包括初始化、发牌、轮次逻辑等。
|
||||
"""
|
||||
# 初始化麻将引擎
|
||||
engine = ChengduMahjongEngine()
|
||||
|
||||
# 初始化游戏
|
||||
engine.initialize_game()
|
||||
|
||||
# 发牌
|
||||
engine.deal_tiles()
|
||||
|
||||
# 检查发牌后的状态
|
||||
logger.info(f"庄家: 玩家 {engine.state.current_player}")
|
||||
for player in range(4):
|
||||
hand = engine.state.hands[player]
|
||||
logger.info(f"玩家 {player} 的手牌: {hand}")
|
||||
logger.info(f"玩家 {player} 的缺门: {engine.state.missing_suits[player]}")
|
||||
|
||||
# 模拟游戏主循环
|
||||
try:
|
||||
engine.run()
|
||||
except Exception as e:
|
||||
logger.error(f"测试引擎时出错: {e}")
|
||||
|
||||
# 打印游戏结束后的状态
|
||||
logger.info("游戏结束!")
|
||||
for player in range(4):
|
||||
logger.info(
|
||||
f"玩家 {player}: 分数={engine.state.scores[player]}, "
|
||||
f"手牌数量={len(engine.state.hands[player].tiles)}, 明牌数量={len(engine.state.melds[player])}, "
|
||||
f"缺门={engine.state.missing_suits[player]}, 手牌={engine.state.hands[player]}, 明牌={engine.state.melds[player]}"
|
||||
)
|
||||
|
||||
# 记录赢家信息
|
||||
if engine.state.winners:
|
||||
logger.info(f"赢家: {engine.state.winners}")
|
||||
else:
|
||||
logger.info("没有赢家!")
|
||||
|
||||
|
||||
# 运行测试
|
||||
if __name__ == "__main__":
|
||||
test_mahjong_engine()
|
||||
@@ -1,7 +1,7 @@
|
||||
from src.engine.chengdu_mahjong_state import ChengduMahjongState
|
||||
from src.engine.hand import Hand
|
||||
from src.engine.mahjong_tile import MahjongTile
|
||||
from src.engine.meld import Meld
|
||||
from src import ChengduMahjongState
|
||||
from src import Hand
|
||||
from src import MahjongTile
|
||||
from src import Meld
|
||||
|
||||
|
||||
def test_set_missing_suit():
|
||||
@@ -1,7 +1,7 @@
|
||||
from src.engine.hand import Hand
|
||||
from src.engine.mahjong_tile import MahjongTile
|
||||
from src.engine.fan_type import is_basic_win,is_cleared,calculate_terminal_fan,is_seven_pairs,is_full_request,is_dragon_seven_pairs
|
||||
from src.engine.meld import Meld
|
||||
from src import Hand
|
||||
from src import MahjongTile
|
||||
from src import is_basic_win,is_cleared,is_terminal_fan,is_seven_pairs,is_full_request,is_dragon_seven_pairs
|
||||
from src import Meld
|
||||
|
||||
def test_is_basic_win():
|
||||
"""
|
||||
@@ -100,7 +100,7 @@ def test_calculate_terminal_fan():
|
||||
hand.add_tile(MahjongTile("条", 5))
|
||||
hand.add_tile(MahjongTile("条", 5))
|
||||
melds = []
|
||||
assert calculate_terminal_fan(hand, melds) == 3, "测试失败:基本带幺九应为 3 番"
|
||||
assert is_terminal_fan(hand, melds) == 3, "测试失败:基本带幺九应为 3 番"
|
||||
|
||||
def test_is_seven_pairs():
|
||||
"""测试七对番型"""
|
||||
@@ -1,5 +1,5 @@
|
||||
from src.engine.hand import Hand
|
||||
from src.engine.mahjong_tile import MahjongTile
|
||||
from src import Hand
|
||||
from src import MahjongTile
|
||||
|
||||
|
||||
def test_add_tile():
|
||||
@@ -1,4 +1,4 @@
|
||||
from src.engine.mahjong_tile import MahjongTile
|
||||
from src import MahjongTile
|
||||
|
||||
def test_mahjong_tile():
|
||||
# 测试合法的牌
|
||||
@@ -1,5 +1,5 @@
|
||||
import pytest
|
||||
from src.engine.scoring import calculate_score
|
||||
from src import calculate_score
|
||||
|
||||
@pytest.mark.parametrize("fan, is_self_draw, base_score, expected_scores", [
|
||||
# 测试用例 1: 自摸,总番数 3
|
||||
@@ -1,4 +1,4 @@
|
||||
from src.engine.utils import get_suit,get_tile_name
|
||||
from src import get_suit,get_tile_name
|
||||
|
||||
def test_get_suit():
|
||||
# 测试条花色(0-35)
|
||||
0
tests/models/__init__.py
Normal file
0
tests/models/__init__.py
Normal file
@@ -1,96 +0,0 @@
|
||||
def test_draw_tile():
|
||||
from src.engine.chengdu_mahjong_engine import ChengduMahjongEngine
|
||||
|
||||
engine = ChengduMahjongEngine()
|
||||
initial_remaining = engine.state.remaining_tiles
|
||||
tile = engine.draw_tile()
|
||||
|
||||
# 验证牌堆数量减少
|
||||
assert engine.state.remaining_tiles == initial_remaining - 1, "牌堆数量未正确减少"
|
||||
# 验证牌已加入当前玩家手牌
|
||||
assert engine.state.hands[engine.state.current_player][tile] > 0, "摸牌未加入玩家手牌"
|
||||
print(f"test_draw_tile passed: 摸到了 {tile}")
|
||||
|
||||
|
||||
def test_discard_tile():
|
||||
from src.engine.chengdu_mahjong_engine import ChengduMahjongEngine
|
||||
|
||||
engine = ChengduMahjongEngine()
|
||||
tile = engine.draw_tile() # 玩家先摸牌
|
||||
engine.discard_tile(tile) # 打出摸到的牌
|
||||
|
||||
# 验证手牌数量减少
|
||||
assert engine.state.hands[engine.state.current_player][tile] == 0, "手牌未正确移除"
|
||||
# 验证牌加入了牌河
|
||||
assert tile in engine.state.discards[engine.state.current_player], "牌未正确加入牌河"
|
||||
print(f"test_discard_tile passed: 打出了 {tile}")
|
||||
|
||||
|
||||
def test_set_missing_suit():
|
||||
from src.engine.chengdu_mahjong_state import ChengduMahjongState
|
||||
|
||||
state = ChengduMahjongState()
|
||||
player = 0
|
||||
missing_suit = "筒"
|
||||
|
||||
state.set_missing_suit(player, missing_suit)
|
||||
|
||||
# 验证缺门是否正确设置
|
||||
assert state.missing_suits[player] == missing_suit, "缺门设置错误"
|
||||
print(f"test_set_missing_suit passed: 缺门设置为 {missing_suit}")
|
||||
|
||||
|
||||
def test_can_win():
|
||||
from src.engine.chengdu_mahjong_state import ChengduMahjongState
|
||||
|
||||
state = ChengduMahjongState()
|
||||
hand = [0] * 108
|
||||
hand[0] = 2 # 两张1条(对子)
|
||||
hand[3] = 1 # 2条
|
||||
hand[4] = 1 # 3条
|
||||
hand[5] = 1 # 4条
|
||||
hand[10] = 1 # 5条
|
||||
hand[11] = 1 # 6条
|
||||
hand[12] = 1 # 7条
|
||||
hand[20] = 1 # 8条
|
||||
hand[21] = 1 # 9条
|
||||
hand[22] = 1 # 1筒
|
||||
hand[30] = 1 # 2筒
|
||||
hand[31] = 1 # 3筒
|
||||
hand[32] = 1 # 4筒
|
||||
|
||||
result = state.can_win(hand)
|
||||
|
||||
assert result is True, "胡牌判断失败"
|
||||
print(f"test_can_win passed: 胡牌条件正确")
|
||||
|
||||
|
||||
|
||||
|
||||
def test_peng():
|
||||
from src.engine.chengdu_mahjong_engine import ChengduMahjongEngine
|
||||
|
||||
engine = ChengduMahjongEngine()
|
||||
tile = 5 # 模拟手牌中有3张牌
|
||||
engine.state.hands[engine.state.current_player][tile] = 3
|
||||
engine.peng(tile)
|
||||
|
||||
# 验证手牌减少
|
||||
assert engine.state.hands[engine.state.current_player][tile] == 1, "碰牌后手牌数量错误"
|
||||
# 验证明牌记录
|
||||
assert ("peng", tile) in engine.state.melds[engine.state.current_player], "碰牌未正确记录"
|
||||
print(f"test_peng passed: 碰牌成功")
|
||||
|
||||
def test_gang():
|
||||
from src.engine.chengdu_mahjong_engine import ChengduMahjongEngine
|
||||
|
||||
engine = ChengduMahjongEngine()
|
||||
tile = 10 # 模拟手牌中有4张牌
|
||||
engine.state.hands[engine.state.current_player][tile] = 4
|
||||
engine.gang(tile, mode="an")
|
||||
|
||||
# 验证手牌减少
|
||||
assert engine.state.hands[engine.state.current_player][tile] == 0, "杠牌后手牌数量错误"
|
||||
# 验证明牌记录
|
||||
assert ("an_gang", tile) in engine.state.melds[engine.state.current_player], "杠牌未正确记录"
|
||||
print(f"test_gang passed: 杠牌成功")
|
||||
Reference in New Issue
Block a user