Compare commits

...

33 Commits

Author SHA1 Message Date
5e492e4d8f 1
1
2024-12-02 00:26:05 +08:00
f1836172d6 1
1
2024-12-02 00:20:51 +08:00
adeb153c8a 1
1
2024-12-02 00:00:36 +08:00
5336990f54 1
1
2024-12-01 23:40:37 +08:00
deff3cb921 1 2024-12-01 22:47:09 +08:00
96f0fbdcd7 1
1
2024-12-01 22:43:40 +08:00
9aec69bb46 1
1
2024-12-01 22:42:34 +08:00
e54a8c5f00 11
1
2024-12-01 22:28:46 +08:00
1d6593a8f2 1
1
2024-12-01 22:24:55 +08:00
a14984a263 1
1
2024-12-01 22:19:34 +08:00
3e65e02704 1
1
2024-12-01 22:14:23 +08:00
5eef2384cf 1
1
2024-12-01 20:26:09 +08:00
0864295a6e 1
1
2024-12-01 20:01:23 +08:00
c9defe78f1 1
1
2024-12-01 19:45:31 +08:00
27da1caf44 1
1
2024-12-01 19:29:42 +08:00
bf1c5116be 1
1
2024-12-01 19:13:02 +08:00
3ff448b15d Update README.md 2024-12-01 18:35:49 +08:00
9cfd4f88af Update README.md 2024-12-01 18:34:29 +08:00
80088a4144 1
1
2024-12-01 17:39:35 +08:00
840d44b773 1
1
2024-12-01 17:36:03 +08:00
fc1259d0e2 Update actions.py 2024-12-01 17:27:01 +08:00
5f22c0b6eb 1
1
2024-12-01 17:24:25 +08:00
ff15ecb1a1 1
1
2024-12-01 17:13:43 +08:00
2bc34bc860 1
1
2024-12-01 17:00:07 +08:00
df0aa388e2 1
1
2024-12-01 16:54:52 +08:00
daec029789 11
1
2024-12-01 16:45:09 +08:00
5aa40d1dd5 Update chengdu_mahjong_engine.py 2024-12-01 16:35:14 +08:00
ea1a938aad Update hand.py 2024-12-01 16:31:21 +08:00
66310aa2f5 1
1
2024-12-01 16:23:45 +08:00
efc71af70c 1
1
2024-12-01 15:46:50 +08:00
64c7b47a4b Update actions.py 2024-12-01 05:36:23 +08:00
6a6baeb460 Update actions.py 2024-12-01 05:06:29 +08:00
03fcd8203c Update actions.py 2024-12-01 05:03:08 +08:00
45 changed files with 1764 additions and 615 deletions

108
README.md
View File

@@ -76,7 +76,7 @@
- **缺一门**:玩家必须选择缺少一种花色(条、筒、万中的任意一种),即只能用两种花色来胡牌。如果手中只有单一花色,则为清一色。
- **定缺**:游戏开始时,每位玩家需要扣下一张牌作为自己缺的那门,并且不能更改。如果本身就是两门牌,则可以报“天缺”而不扣牌。
- **起牌与打牌**:庄家通过掷骰子决定起牌位置,然后按顺序抓牌。庄家先出牌,之后每家依次摸牌打牌。
- **碰、杠**:允许碰牌和杠牌,但不允许吃牌。杠牌分为明杠和暗杠,明杠是其他玩家打出的牌被你碰后又摸到相同的牌;暗杠则是你自己摸到四张相同的牌。
- **碰、杠**:允许碰牌和杠牌,但不允许吃牌。杠牌分为明杠和暗杠,明杠是其他玩家打出的牌刚好与你手里有三张的牌相同;暗杠则是你自己摸到四张相同的牌。
- **胡牌**胡牌的基本条件是拥有一个对子加上四个顺子或刻子三个相同牌。自摸为三家给分点炮则由放炮者给分。n*AAA+m*ABC+DD mn可以等于0。
- **血战到底**:一家胡牌后,其他未胡牌的玩家继续游戏,直到只剩下最后一位玩家或者黄庄(所有牌都被摸完)为止。
@@ -176,7 +176,111 @@
麻将游戏引擎建模代码于项目根src/engine/目录下。
## PPOProximal Policy Optimization算法
## 算法
#### **1. 强化学习**
适用于学习最佳策略帮助AI根据牌局动态决策如摸牌、出牌、胡牌等
**Q-Learning/Deep Q-Learning (DQN)**:
使用价值函数近似,适用于简单麻将变体。
在复杂麻将中可能遇到状态空间爆炸的问题。
**Policy Gradient(如 REINFORCE、PPO、A3C)**:
- 直接学习策略,适合连续决策问题。
- Proximal Policy Optimization (PPO) 是目前表现较好的强化学习算法。
**AlphaZero/Monte Carlo Tree Search (MCTS)**:
- 结合深度神经网络和搜索算法,模拟多局游戏,适用于探索全局最优策略。
**适用场景**
自主对局学习(自我博弈)。
学习如何综合权衡得失(如是否碰牌、杠牌或放弃操作)。
#### **2. 模拟和搜索算法**
适用于推理对手手牌或牌堆剩余牌,提升策略的稳定性。
**算法**
- Monte Carlo Tree Search (MCTS):
- 模拟多个可能的后续动作,估算每个动作的收益。
- 常用于长序列决策,如考虑碰、杠、胡等多步操作的效果。
- Minimax with Alpha-Beta Pruning:
- 在两人麻将(或简化版本)中,模拟对手的可能操作。
**适用场景**
需要进行搜索优化的场景(如判断是否选择碰、杠)。
分析未来几步操作对得分的影响。
#### **3. 监督学习**
适用于模仿人类玩家的决策或历史数据学习。
**算法**
- 分类算法
(如 Logistic Regression、Random Forest、XGBoost、Neural Networks
- 学习单步决策(如出哪张牌)。
- 适用于学习简单的局部决策。
- 序列模型
(如 RNN、LSTM、Transformer
- 学习决策的序列模式(如出牌顺序和策略连贯性)。
- Transformer 可以捕捉复杂的上下文关系。
**适用场景**
有大量玩家对局数据作为训练集。
模拟人类打牌风格。
#### **4. 混合方法**
结合强化学习和监督学习的优点,以应对麻将的高复杂性和多样化。
**示例方法**
- Imitation Learning + Reinforcement Learning:
- 先使用监督学习模仿玩家风格,再用强化学习微调策略。
- AlphaZero-like Framework:
- 结合深度强化学习和搜索(如 MCTS强化对局策略。
**适用场景**
需要在短时间内获得可用的AI策略。
想进一步优化模型的决策能力。
#### **对抗学习**
让AI与自身对局自我博弈或与其他AI对局提升对抗能力。
#### **工具和框架**
1. 强化学习框架:
- **Stable-Baselines3**: 简单易用支持PPO、DQN等算法。
- **Ray RLlib**: 分布式强化学习框架,适合复杂任务。
2. 深度学习框架:
- TensorFlow 或 PyTorch构建神经网络和深度学习模型。
3. 麻将环境:
- 自定义麻将环境或使用已有开源环境(如 OpenAI Gym 或 MahjongRL
### PPOProximal Policy Optimization算法
TensorBoard 通常会记录和可视化多种训练指标。你提到的这些图表代表了 PPO 训练过程中的不同方面。下面是对每个图表的解释:

View File

@@ -1,4 +1,3 @@
# configs/log_config.py
from loguru import logger
import os
@@ -10,5 +9,31 @@ def setup_logging():
# 清除所有现有日志处理器,防止重复配置
logger.remove()
# 配置日志,记录到 ../logs 目录下
logger.add(os.path.join(log_dir, "chengdu_mj_engine.log"), rotation="10 MB", level="DEBUG", format="{time} {level} {message}")
# 配置斗地主日志文件
logger.add(
os.path.join(log_dir, "doudizhu_engine.log"), # 斗地主的日志文件
rotation="10 MB",
level="DEBUG",
format="{time:YYYY-MM-DD HH:mm:ss.SSS} {level} {message}",
backtrace=True, # 启用完整堆栈信息
diagnose=True # 启用变量诊断信息
)
# 配置麻将日志文件(如果需要)
logger.add(
os.path.join(log_dir, "chengdu_mj_engine.log"),
rotation="10 MB",
level="DEBUG",
format="{time:YYYY-MM-DD HH:mm:ss.SSS} {level} {message}",
backtrace=True,
diagnose=True,
)
# 配置控制台日志
logger.add(
lambda msg: print(msg),
level="DEBUG",
format="{time:YYYY-MM-DD HH:mm:ss.SSS} {level} {message}",
backtrace=True,
diagnose=True,
)

Binary file not shown.

Binary file not shown.

View File

@@ -1,33 +1,43 @@
import gym
from stable_baselines3 import PPO
from src.environment.chengdu_majiang_env import MahjongEnv
from src import ChengduMahjongEnv
import torch
from configs.log_config import setup_logging
from loguru import logger # 添加 logger
def train_model():
# 创建 MahjongEnv 环境实例
env = MahjongEnv()
env = ChengduMahjongEnv()
# 检查是否有可用的 GPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"使用设备: {device}")
logger.info(f"使用设备: {device}") # 替换 print 为 logger.info
# 使用 PPO 算法训练模型
model = PPO("MlpPolicy", env, verbose=1, tensorboard_log="../logs/ppo_mahjong_tensorboard/", device=device)
# 使用 PPO 算法训练模型,切换到 MultiInputPolicy
model = PPO(
"MultiInputPolicy", # 更改为 MultiInputPolicy
env,
verbose=1,
tensorboard_log="../logs/ppo_mahjong_tensorboard/",
device=device
)
# 训练模型,训练总步数为 100000
logger.info("开始训练模型...")
model.learn(total_timesteps=100)
logger.info("模型训练完成!")
# 保存训练后的模型
model.save("../models/ppo_mahjong_model")
logger.info("模型已保存到 '../models/ppo_mahjong_model'")
# 测试模型
logger.info("开始测试模型...")
obs = env.reset()
done = False
while not done:
action, _states = model.predict(obs) # 使用训练好的模型来选择动作
obs, reward, done, info = env.step(action) # 执行动作
env.render() # 打印环境状态
logger.info(f"动作: {action}, 奖励: {reward}, 是否结束: {done}, 信息: {info}") # 替换 print 为 logger.info
if __name__ == "__main__":
# 调用配置函数来设置日志

View File

@@ -0,0 +1,46 @@
from stable_baselines3 import PPO
from src.environment.dizhu_env import DouDiZhuEnv # 导入斗地主环境
import torch
from configs.log_config import setup_logging
from loguru import logger # 使用日志工具
def train_dizhu_model():
# 创建 DouDiZhuEnv 环境实例
env = DouDiZhuEnv()
# 检查是否有可用的 GPU
device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"使用设备: {device}") # 使用 logger 记录设备信息
# 使用 PPO 算法训练模型,设置为 MultiInputPolicy
model = PPO(
"MultiInputPolicy", # 适用于多输入的策略
env,
verbose=1,
tensorboard_log="../logs/ppo_doudizhu_tensorboard/", # TensorBoard 日志路径
device=device
)
# 训练模型,设定总训练步数
logger.info("开始训练斗地主模型...")
model.learn(total_timesteps=10000000000000000) # 总训练步数
logger.info("斗地主模型训练完成!")
# 保存训练后的模型
model_path = "../models/ppo_doudizhu_model"
model.save(model_path)
logger.info(f"模型已保存到 '{model_path}'")
# 测试模型
logger.info("开始测试斗地主模型...")
obs = env.reset()
done = False
while not done:
action, _states = model.predict(obs) # 使用训练好的模型来选择动作
obs, reward, done, info = env.step(action) # 执行动作
logger.info(f"动作: {action}, 奖励: {reward}, 是否结束: {done}, 信息: {info}") # 记录测试过程
if __name__ == "__main__":
# 设置日志
setup_logging()
train_dizhu_model()

View File

@@ -1,173 +0,0 @@
from loguru import logger
from src.engine.mahjong_tile import MahjongTile
def draw_tile(engine):
"""
当前玩家摸牌逻辑,记录牌的详细信息和游戏状态。
"""
# 检查牌堆是否已空
if engine.state.remaining_tiles == 0:
logger.warning("牌堆已空,游戏结束!")
engine.game_over = True
return 0, True # 游戏结束时返回 0 和 done = True
# 当前玩家
current_player = engine.state.current_player
# 从牌堆中摸一张牌
tile = engine.state.deck.pop(0) # 从牌堆抽取一张牌
engine.state.remaining_tiles -= 1 # 更新剩余牌数
engine.state.hands[current_player].add_tile(tile) # 将牌加入当前玩家手牌
# 获取牌名
tile_name = str(tile) # 调用 MahjongTile 的 __repr__ 方法
logger.info(
f"玩家 {current_player} 摸到一张牌: {tile_name}(索引 {tile})。"
f"剩余牌堆数量: {engine.state.remaining_tiles}"
)
# 检查摸到的牌是否属于缺门
missing_suit = engine.state.missing_suits[current_player]
if tile.suit == missing_suit:
logger.info(f"玩家 {current_player} 摸到缺门牌: {tile_name},需要优先打出")
# 切换到下一位玩家
next_player = (current_player + 1) % 4
engine.state.current_player = next_player
# 返回奖励和游戏是否结束的标志
return 0, False # 奖励为 0done 为 False游戏继续
def discard_tile(self, tile):
"""
当前玩家打牌逻辑,记录打出的牌和当前牌河信息。
"""
current_player = self.state.current_player
hand = self.state.hands[current_player]
# 检查牌的有效性
if not isinstance(tile, MahjongTile):
logger.error(f"玩家 {current_player} 尝试打出无效的牌: {tile}")
raise ValueError("打出的牌必须是 MahjongTile 对象")
# 检查是否有这张牌
if hand[tile] == 0:
logger.error(f"玩家 {current_player} 尝试打出不存在的牌: {tile}")
raise ValueError("玩家没有这张牌")
# 检查缺门规则
missing_suit = self.state.missing_suits[current_player]
if tile.suit == missing_suit and any(t.suit == missing_suit for t in hand.tiles):
logger.error(f"玩家 {current_player} 尝试打出非缺门的牌: {tile}")
raise ValueError("必须先打完缺门花色的牌")
# 从手牌中移除
hand[tile] -= 1
self.state.discards[current_player].append(tile)
# 打出的牌名
tile_name = get_tile_name(tile)
# 打出牌后打印状态
logger.info(
f"玩家 {current_player} 打出一张牌: {tile_name}(索引 {tile})。"
f"当前牌河: {[get_tile_name(t) for t in self.state.discards[current_player]]}"
)
# 检查是否触发其他玩家的操作(碰、杠、胡牌)
self.check_other_players(tile)
return tile
def peng(self, tile):
"""
当前玩家碰牌逻辑,记录碰牌操作和手牌状态。
"""
player = self.state.current_player
if self.state.hands[player][tile] < 2:
logger.error(f"玩家 {player} 尝试碰牌失败: {get_tile_name(tile)}(索引 {tile}")
raise ValueError("碰牌条件不满足")
self.state.hands[player][tile] -= 2 # 减去两张牌
self.state.melds[player].append(("peng", tile)) # 加入明牌列表
tile_name = get_tile_name(tile)
logger.info(f"玩家 {player} 碰了一张牌: {tile_name}(索引 {tile})。当前明牌: {self.state.melds[player]}")
def gang(self, tile, mode):
"""
当前玩家杠牌逻辑,记录杠牌类型和状态更新。
"""
player = self.state.current_player
tile_name = get_tile_name(tile)
if mode == "ming" and self.state.hands[player][tile] == 3:
self.state.hands[player][tile] -= 3
self.state.melds[player].append(("ming_gang", tile))
logger.info(f"玩家 {player} 明杠: {tile_name}(索引 {tile}")
self.state.scores[player] += 1 # 奖励1分
logger.info(f"玩家 {player} 因明杠获得1分")
elif mode == "an" and self.state.hands[player][tile] == 4:
self.state.hands[player][tile] -= 4
self.state.melds[player].append(("an_gang", tile))
logger.info(f"玩家 {player} 暗杠: {tile_name}(索引 {tile}")
self.state.scores[player] += 1 # 奖励1分
logger.info(f"玩家 {player} 因暗杠获得1分")
else:
logger.error(f"玩家 {player} 尝试杠牌失败: {tile_name}(索引 {tile}),条件不满足")
raise ValueError("杠牌条件不满足")
def check_blood_battle(self):
"""
检查游戏是否流局或血战结束,记录状态。
"""
if any(score <= 0 for score in self.state.scores):
logger.info(f"游戏结束某玩家分数小于等于0: {self.state.scores}")
self.game_over = True
if len(self.state.winners) >= 3 or self.state.remaining_tiles == 0:
logger.info(f"游戏结束,赢家列表: {self.state.winners}")
self.game_over = True
def set_missing_suit(player, game_state):
"""
玩家自动根据手牌选择缺门。
参数:
- player: 玩家索引0-3
- game_state: 当前的游戏状态(`ChengduMahjongState` 实例)。
返回:
- str: 玩家设置的缺门花色。
"""
valid_suits = ["", "", ""]
hand = game_state.hands[player] # 获取玩家手牌
# 统计每种花色的牌数量
suit_counts = {suit: 0 for suit in valid_suits}
for tile in hand.tiles:
suit_counts[tile.suit] += 1
# 找到数量最少的花色
missing_suit = min(suit_counts, key=suit_counts.get)
# 检查是否已经设置过缺门
if game_state.missing_suits[player] is not None:
logger.warning(f"玩家 {player} 已设置过缺门,不能重复设置")
raise ValueError("缺门已经设置,不能重复设置")
# 设置缺门并记录日志
game_state.missing_suits[player] = missing_suit
logger.info(
f"玩家 {player} 手牌花色分布: {suit_counts}。缺门设置为: {missing_suit}"
)
return missing_suit

View File

@@ -1,92 +0,0 @@
def calculate_fan(hand, melds, is_self_draw, is_cleared, conditions):
"""
动态计算番数,优先处理高番数。
参数:
- hand: 当前胡牌的手牌Hand 对象)。
- melds: 玩家已明牌的列表Meld 对象列表)。
- is_self_draw: 是否自摸。
- is_cleared: 是否清一色。
- conditions: 字典,包含特殊胡牌条件,如 {"is_seven_pairs": True, "is_tian_hu": True}。
返回:
- fan: 最大番数。
"""
fan = 0 # 默认番数
# 定义番数规则(按优先级从高到低排序)
rules = {
"tian_hu": lambda: 12 if conditions.get("is_tian_hu", False) else 0, # 天胡
"di_hu": lambda: 12 if conditions.get("is_di_hu", False) else 0, # 地胡
"dragon_seven_pairs": lambda: 12 if is_dragon_seven_pairs(hand) else 0, # 龙七对
"clear_seven_pairs": lambda: 12 if is_cleared and is_seven_pairs(hand) else 0, # 清七对
"full_request": lambda: 6 if is_full_request(hand, melds) else 0, # 全求人
"pure_cleared": lambda: 4 if is_cleared and len(melds) >= 2 else 0, # 极品清一色
"seven_pairs": lambda: 2 if is_seven_pairs(hand) else 0, # 七对
"big_pairs": lambda: 2 if is_big_pairs(hand) else 0, # 大对子
"small_pairs": lambda: 2 if is_small_pairs(hand) else 0, # 小七对
"golden_hook": lambda: 1 if is_golden_hook(hand) else 0, # 金钩吊
"gang_flower": lambda: 1 if conditions.get("is_gang_flower", False) else 0, # 杠上开花
"rob_gang": lambda: 1 if conditions.get("is_rob_gang", False) else 0, # 抢杠胡
"under_the_sea": lambda: 1 if conditions.get("is_under_the_sea", False) else 0, # 海底捞月
"plain_win": lambda: 1 if not any(conditions.values()) else 0 # 平胡
}
# 逐一应用规则
for rule, func in rules.items():
result = func()
if result > fan:
fan = result # 选择当前最大番数
return fan
# 辅助函数实现
def is_seven_pairs(hand):
"""检查是否是七对(七个对子)。"""
from collections import Counter
counter = Counter(hand.tiles)
return sum(1 for count in counter.values() if count == 2) == 7
def is_dragon_seven_pairs(hand):
"""检查是否是龙七对(七对中有一对是三张)。"""
from collections import Counter
counter = Counter(hand.tiles)
pairs = sum(1 for count in counter.values() if count == 2)
triplets = sum(1 for count in counter.values() if count == 3)
return pairs == 6 and triplets == 1
def is_big_pairs(hand):
"""检查是否是大对子(四坎牌和一对将,坎牌为刻子)。"""
from collections import Counter
counter = Counter(hand.tiles)
triplets = sum(1 for count in counter.values() if count == 3)
pairs = sum(1 for count in counter.values() if count == 2)
return triplets == 4 and pairs == 1
def is_golden_hook(hand):
"""检查是否是金钩吊(只剩一张牌)。"""
return len(hand.tiles) == 1
def is_full_request(hand, melds):
"""检查是否是全求人(所有牌通过碰、杠完成)。"""
return all(tile_count == 0 for tile_count in hand.tiles) and len(melds) > 0
def is_cleared(hand, melds):
"""检查是否是清一色(所有牌同一种花色)。"""
all_tiles = hand.tiles + [meld.tile for meld in melds]
suits = {tile.suit for tile in all_tiles}
return len(suits) == 1
def is_small_pairs(hand):
"""
检查是否是小七对(六对加一对)。
"""
from collections import Counter
counter = Counter(hand.tiles)
pairs = sum(1 for count in counter.values() if count == 2)
return pairs == 6

View File

@@ -1,99 +0,0 @@
import random
from loguru import logger
from src.engine.actions import set_missing_suit
from src.engine.chengdu_mahjong_state import ChengduMahjongState
from src.engine.actions import draw_tile, discard_tile, peng, gang, check_blood_battle
class ChengduMahjongEngine:
def __init__(self):
self.state = ChengduMahjongState()
self.game_over = False
self.game_started = False
self.current_player = 0
def initialize_game(self):
"""
初始化游戏,确定庄家,发牌并设置缺门。
"""
logger.info("游戏初始化...")
# 确定庄家(掷骰子)
self.state.current_player = random.randint(0, 3)
logger.info(f"庄家确定为玩家 {self.state.current_player}")
logger.info("游戏初始化完成,准备开始!")
def deal_tiles(self):
""" 发牌庄家摸14张其余玩家摸13张 """
logger.info("开始发牌...")
random.shuffle(self.state.deck) # 洗牌
for player in range(4):
tiles_to_draw = 14 if player == self.state.current_player else 13
for _ in range(tiles_to_draw):
tile = self.state.deck.pop()
self.state.hands[player].add_tile(tile)
# 自动设置缺门
set_missing_suit(player, self.state)
logger.info("发牌结束并完成缺门设置!")
def play_turn(self):
"""
进行一回合的操作:当前玩家摸牌、出牌。
"""
logger.info(f"轮到玩家 {self.state.current_player} 行动")
# 玩家摸牌
reward, done = draw_tile(self)
if done:
logger.info("游戏因牌堆摸空而结束!")
return
# 玩家打牌(逻辑可扩展为选择最佳牌)
tile_to_discard = self.select_discard_tile(self.state.current_player)
discard_tile(self, tile_to_discard)
# 检查游戏是否结束
self.check_game_over()
# 切换到下一位玩家
self.state.current_player = (self.state.current_player + 1) % 4
def select_discard_tile(self, player):
"""
选择要打出的牌(此处为简单示例,可接入 AI 策略)。
"""
hand = self.state.hands[player]
# 优先打出缺门牌
for tile in hand.tiles:
if tile.suit == self.state.missing_suits[player]:
return tile
# 如果没有缺门牌,随机打出一张
return hand.tiles[0]
def check_game_over(self):
"""
检查游戏是否结束。
"""
# 检查是否已无牌可摸
if self.state.remaining_tiles == 0:
self.game_over = True
logger.info("游戏结束:牌堆已空")
return
# 检查是否满足血战结束条件
check_blood_battle(self)
def run(self):
"""
运行游戏主循环。
"""
self.initialize_game()
self.game_started = True
while not self.game_over:
self.play_turn()
logger.info("游戏已结束")

10
src/engine/dizhu/deck.py Normal file
View File

@@ -0,0 +1,10 @@
import numpy as np
class Deck:
def __init__(self):
self.cards = [i for i in range(54)] # 0-53 表示54张牌
np.random.shuffle(self.cards)
def deal(self):
# 返回三位玩家的手牌和地主牌
return self.cards[:17], self.cards[17:34], self.cards[34:51], self.cards[51:]

View File

@@ -0,0 +1,195 @@
import numpy as np
from loguru import logger
from src.engine.dizhu.player_state import PlayerState
from src.engine.dizhu.deck import Deck
from src.engine.dizhu.utils import card_to_string, detect_card_type
class DiZhuEngine:
def __init__(self):
self.deck = Deck() # 牌堆
self.players = [] # 玩家列表
self.landlord_index = -1 # 地主索引
self.current_player_index = 0 # 当前玩家索引
self.landlord_cards = [] # 地主牌
self.last_player = None # 最后出牌的玩家索引
self.current_pile = None # 当前牌面上的牌
self.game_over = False # 是否游戏结束
def reset(self):
"""
初始化游戏状态,包括发牌和分配角色。
"""
# 洗牌并发牌
p1_hand, p2_hand, p3_hand, landlord_cards = self.deck.deal()
self.landlord_cards = landlord_cards
# 创建玩家
self.players = [
PlayerState(p1_hand, "农民"),
PlayerState(p2_hand, "农民"),
PlayerState(p3_hand, "地主")
]
self.landlord_index = 2 # 默认玩家 3 为地主
self.current_player_index = 0
self.game_over = False
# 日志输出
logger.info("游戏初始化完成")
logger.info(f"地主牌: {[card_to_string(card) for card in self.landlord_cards]}")
for i, player in enumerate(self.players):
logger.info(f"玩家 {i + 1} ({player.role}) 手牌: {player.get_hand_cards_as_strings()}")
def get_current_player(self):
"""获取当前玩家对象"""
current_player = self.players[self.current_player_index]
return current_player
def step(self, action):
"""
执行动作并更新状态
:param action: 当前玩家的动作(可以是 'pass' 或一个动作列表)
"""
current_player = self.get_current_player()
if action == "pass":
self.pass_count += 1
# 如果所有其他玩家都过牌,允许最后出牌玩家再次出牌
if self.pass_count == 2: # 两名玩家连续过牌
self.current_player_index = self.last_player
self.pass_count = 0 # 重置过牌计数
self.current_pile = None # 清空当前牌面
else:
# 出牌逻辑
if not isinstance(action, list):
action = [action]
if not all(card in current_player.hand_cards for card in action):
raise ValueError(f"玩家手牌不足以完成此次出牌: {action}")
if self.current_pile and not self._can_beat(self.current_pile, action):
raise ValueError(f"出牌无法打过当前牌面: {action}")
# 出牌成功
self.current_pile = action # 更新当前牌面
self.pass_count = 0 # 出牌后重置过牌计数
self.last_player = self.current_player_index # 更新最后出牌的玩家
# 从手牌中移除
for card in action:
current_player.hand_cards.remove(card)
current_player.history.append(action)
# 检查游戏是否结束
if not current_player.hand_cards:
self.game_over = True
logger.info(f"游戏结束!玩家 {self.current_player_index + 1} ({current_player.role}) 获胜")
return f"{current_player.role} 胜利!"
# 切换到下一个玩家
self.current_player_index = (self.current_player_index + 1) % 3
def get_action_space(self):
"""
动态生成当前动作空间。
:return: 合法动作的列表
"""
valid_actions = ["pass"]
current_player = self.get_current_player()
# 遍历玩家手牌,生成所有可能的组合
hand_cards = current_player.hand_cards
valid_actions.extend(self._generate_valid_combinations(hand_cards))
return valid_actions
def _generate_valid_combinations(self, cards):
"""
根据手牌生成所有合法牌型组合
:param cards: 当前玩家的手牌
:return: 合法牌型的列表
"""
# 示例:生成单牌、对子和三张的合法组合
from itertools import combinations
valid_combinations = []
for i in range(1, len(cards) + 1):
for combo in combinations(cards, i):
if detect_card_type(list(combo)): # 检查是否为合法牌型
valid_combinations.append(list(combo))
return valid_combinations
def _can_beat(self, current_pile, action):
"""
检查当前动作是否能打过牌面上的牌。
:param current_pile: 当前牌面上的牌(列表)
:param action: 当前玩家要出的牌(列表)
:return: True 如果可以打过,否则 False
"""
current_type = detect_card_type(current_pile)
action_type = detect_card_type(action)
if not current_type or not action_type:
return False # 非法牌型
# 火箭可以压任何牌
if action_type == "火箭":
return True
# 炸弹可以压非炸弹的牌型
if action_type == "炸弹" and current_type != "炸弹":
return True
# 同牌型比较大小
if current_type == action_type:
return max(action) > max(current_pile)
return False # 其他情况不合法
def get_game_state(self):
"""
返回当前游戏状态,包括玩家手牌、出牌历史和当前玩家。
"""
state = {
"landlord_cards": self.landlord_cards,
"players": [
{
"role": player.role,
"hand_cards": player.hand_cards,
"history": player.history,
}
for player in self.players
],
"current_player_index": self.current_player_index,
"game_over": self.game_over,
}
logger.info("当前游戏状态: ")
logger.info(f"游戏是否结束: {self.game_over}")
return state
def is_valid_play(self, cards):
"""
检查给定的牌是否为合法的斗地主牌型。
:param cards: 玩家出的牌(列表)
:return: True 如果是合法牌型,否则 False
"""
if len(cards) == 1:
return True # 单牌
if len(cards) == 2 and cards[0] == cards[1]:
return True # 对子
if len(cards) == 3 and cards[0] == cards[1] == cards[2]:
return True # 三张
if len(cards) == 4:
counts = {card: cards.count(card) for card in set(cards)}
if 3 in counts.values():
return True # 三带一
if all(count == 4 for count in counts.values()):
return True # 炸弹
if len(cards) >= 5:
# 顺子
sorted_cards = sorted(cards)
if all(sorted_cards[i] + 1 == sorted_cards[i + 1] for i in range(len(sorted_cards) - 1)):
return True
# TODO: 扩展支持更多牌型(如连对、飞机等)
return False

View File

@@ -0,0 +1,22 @@
from collections import deque
from src.engine.dizhu.utils import card_to_string
class PlayerState:
def __init__(self, hand_cards, role):
self.hand_cards = hand_cards # 玩家手牌
self.role = role # "地主" 或 "农民"
self.history = deque() # 出牌历史,使用 deque
def get_hand_cards_as_strings(self):
"""
获取玩家手牌的具体牌型字符串
:return: 手牌字符串列表
"""
return [card_to_string(card) for card in self.hand_cards]
def __repr__(self):
"""
返回玩家的字符串表示,包括手牌和角色
"""
hand_cards_str = ", ".join(self.get_hand_cards_as_strings())
return f"玩家角色: {self.role}, 手牌: [{hand_cards_str}]"

View File

@@ -0,0 +1,32 @@
class DouDiZhuScoring:
def __init__(self, base_score=1):
self.base_score = base_score # 底分
self.multiplier = 1 # 倍数
self.landlord_win = False # 地主是否胜利
def apply_event(self, event):
"""
根据游戏事件调整倍数。
:param event: 事件类型,如 "炸弹", "火箭", "春天", "反春天"
"""
if event in ["炸弹", "火箭", "春天", "反春天"]:
self.multiplier *= 2
elif event == "抢地主":
self.multiplier += 1
def calculate_score(self, landlord_win):
"""
计算最终分数。
:param landlord_win: 地主是否胜利
:return: 地主分数,农民分数
"""
self.landlord_win = landlord_win
if landlord_win:
landlord_score = 2 * self.base_score * self.multiplier
farmer_score = -self.base_score * self.multiplier
else:
landlord_score = -2 * self.base_score * self.multiplier
farmer_score = self.base_score * self.multiplier
return landlord_score, farmer_score

48
src/engine/dizhu/utils.py Normal file
View File

@@ -0,0 +1,48 @@
def card_to_string(card_index):
"""
将牌的索引转换为具体牌型的字符串表示
:param card_index: 牌的索引0-53
:return: 具体牌型字符串
"""
suits = ['♠️', '♥️', '♦️', '♣️'] # 花色
values = ['3', '4', '5', '6', '7', '8', '9', '10', 'J', 'Q', 'K', 'A', '2']
if card_index < 52:
# 普通牌:计算花色和牌面值
value = values[card_index // 4]
suit = suits[card_index % 4]
return f"{suit}{value}"
elif card_index == 52:
return "小王"
elif card_index == 53:
return "大王"
else:
raise ValueError(f"无效的牌索引: {card_index}")
def detect_card_type(cards):
"""
检测牌型
:param cards: 玩家出的牌(列表)
:return: 牌型字符串或 None非法牌型
"""
if len(cards) == 1:
return "单牌"
if len(cards) == 2 and cards[0] == cards[1]:
return "对子"
if len(cards) == 3 and cards[0] == cards[1] == cards[2]:
return "三张"
if len(cards) == 4 and cards[0] == cards[1] == cards[2] == cards[3]:
return "炸弹"
# 三带一
if len(cards) == 4:
counts = {card: cards.count(card) for card in set(cards)}
if 3 in counts.values():
return "三带一"
# 顺子
if len(cards) >= 5 and all(cards[i] + 1 == cards[i + 1] for i in range(len(cards) - 1)):
return "顺子"
# TODO: 实现其他牌型判断(如连对、飞机等)
return None

View File

@@ -1,49 +0,0 @@
from src.engine.mahjong_tile import MahjongTile
from collections import defaultdict
class Hand:
def __init__(self):
# 存储所有的 MahjongTile 对象
self.tiles = []
# 存储每种牌的数量,键为 MahjongTile 对象,值为数量
self.tile_count = defaultdict(int)
def add_tile(self, tile):
""" 向手牌中添加一张牌 """
if not isinstance(tile, MahjongTile):
raise ValueError("必须添加 MahjongTile 类型的牌")
self.tiles.append(tile) # 将牌添加到手牌中
self.tile_count[tile] += 1 # 增加牌的数量
def remove_tile(self, tile):
""" 从手牌中移除一张牌 """
if not isinstance(tile, MahjongTile):
raise ValueError("必须移除 MahjongTile 类型的牌")
if self.tile_count[tile] > 0:
self.tiles.remove(tile)
self.tile_count[tile] -= 1
else:
raise ValueError(f"手牌中没有该牌: {tile}")
def get_tile_count(self, tile):
""" 获取手牌中某张牌的数量 """
if not isinstance(tile, MahjongTile):
raise ValueError("必须是 MahjongTile 类型的牌")
return self.tile_count[tile]
def can_peng(self, tile):
""" 判断是否可以碰即是否已经有2张相同的牌摸一张牌后可以碰 """
if not isinstance(tile, MahjongTile):
raise ValueError("必须是 MahjongTile 类型的牌")
return self.tile_count[tile] == 2 # 摸一张牌后总数为 3 张,才可以碰
def can_gang(self, tile):
""" 判断是否可以杠即是否已经有3张相同的牌摸一张牌后可以杠 """
if not isinstance(tile, MahjongTile):
raise ValueError("必须是 MahjongTile 类型的牌")
return self.tile_count[tile] == 4 # 摸一张牌后总数为 4 张,才可以杠
def __repr__(self):
""" 返回手牌的字符串表示 """
tiles_str = ", ".join(str(tile) for tile in self.tiles)
return f"手牌: [{tiles_str}], 牌的数量: {dict(self.tile_count)}"

View File

@@ -0,0 +1,494 @@
import random as random_module
from loguru import logger
from src.engine.mahjong.calculate_fan import calculate_fan
from src.engine.mahjong.mahjong_tile import MahjongTile
from src.engine.mahjong.meld import Meld
def draw_tile(engine):
"""
当前玩家摸牌逻辑,按照顺序从牌堆顶部摸牌,并记录牌的详细信息和游戏状态。
"""
# 检查牌堆是否已空
if engine.state.remaining_tiles == 0:
logger.warning("牌堆已空,游戏结束!")
engine.game_over = True
return None, True # 游戏结束时返回 None 和 done = True
# 当前玩家
current_player = engine.state.current_player
# 从牌堆顶部摸一张牌
tile = engine.state.deck.pop(0) # 按顺序从牌堆取出一张牌
engine.state.remaining_tiles -= 1 # 更新剩余牌数
engine.state.hands[current_player].add_tile(tile) # 将牌对象加入当前玩家手牌
# 获取牌名
tile_name = str(tile) # 调用 MahjongTile 的 __repr__ 方法
logger.info(
f"玩家 {current_player} 摸到一张牌: {tile_name}"
f"剩余牌堆数量: {engine.state.remaining_tiles}"
)
# 检查摸到的牌是否属于缺门
missing_suit = engine.state.missing_suits[current_player]
if tile.suit == missing_suit:
logger.info(f"玩家 {current_player} 摸到缺门牌: {tile_name},需要优先打出")
# 返回摸到的牌和游戏是否结束的标志
return tile, False # 返回摸到的牌对象和游戏继续的标志
def discard_tile(self, tile):
"""
当前玩家打牌逻辑,记录打出的牌和当前牌河信息。
"""
current_player = self.state.current_player
hand = self.state.hands[current_player]
# 检查牌的有效性
if not isinstance(tile, MahjongTile):
logger.error(f"玩家 {current_player} 尝试打出无效的牌: {tile}")
raise ValueError("打出的牌必须是 MahjongTile 对象")
# 检查是否有这张牌
if hand.tile_count[tile] == 0:
logger.error(f"玩家 {current_player} 尝试打出不存在的牌: {tile}")
raise ValueError("玩家没有这张牌")
# 检查缺门规则
missing_suit = self.state.missing_suits[current_player]
if tile.suit == missing_suit and any(t.suit == missing_suit for t in hand.tiles):
logger.error(f"玩家 {current_player} 仍有缺门的牌: {tile}")
raise ValueError("必须先打完缺门花色的牌")
# 从手牌中移除
hand.remove_tile(tile)
self.state.discards[current_player].append(tile)
# 打出的牌名
tile_name = str(tile)
# 打出牌后打印状态
logger.info(
f"玩家 {current_player} 打出一张牌: {tile_name}"
f"当前牌河: {[str(t) for t in self.state.discards[current_player]]}"
)
# 检查是否触发其他玩家的操作(碰、杠、胡牌)
self.check_other_players(tile)
return tile
def peng(self, tile):
"""
当前玩家碰牌逻辑,记录碰牌操作和手牌状态。
"""
player = self.state.current_player
hand = self.state.hands[player]
if hand.tile_count[tile] < 2:
logger.error(f"玩家 {player} 尝试碰牌失败: {tile}")
raise ValueError("碰牌条件不满足")
# 从手牌中移除两张牌
hand.tile_count[tile] -= 2
self.state.melds[player].append(("", tile)) # 加入明牌列表
logger.info(f"玩家 {player} 碰了一张牌: {tile}。当前明牌: {self.state.melds[player]}")
def gang(self, tile, mode):
"""
当前玩家杠牌逻辑,记录杠牌类型和状态更新。
"""
player = self.state.current_player
# 检查牌的有效性
if not isinstance(tile, MahjongTile):
logger.error(f"玩家 {player} 尝试杠牌时提供了无效的牌: {tile}")
raise ValueError("杠的牌必须是 MahjongTile 对象")
tile_name = str(tile) # 使用 MahjongTile 的 __repr__ 方法
if mode == "ming":
# 明杠逻辑
if self.state.hands[player].tile_count[tile] >= 3:
self.state.hands[player].tile_count[tile] -= 3 # 移除三张牌
self.state.melds[player].append(("ming_gang", tile)) # 添加到明牌
logger.info(f"玩家 {player} 明杠成功: {tile_name}")
self.state.scores[player] += 1 # 明杠奖励1分
logger.info(f"玩家 {player} 因明杠获得1分当前得分: {self.state.scores[player]}")
else:
logger.error(f"玩家 {player} 明杠失败: 手中牌数量不足")
raise ValueError("明杠条件不满足,需要至少三张相同的牌")
elif mode == "an":
# 暗杠逻辑
if self.state.hands[player].tile_count[tile] >= 4:
self.state.hands[player].tile_count[tile] -= 4 # 移除四张牌
self.state.melds[player].append(("an_gang", tile)) # 添加到明牌
logger.info(f"玩家 {player} 暗杠成功: {tile_name}")
self.state.scores[player] += 1 # 暗杠奖励1分
logger.info(f"玩家 {player} 因暗杠获得1分当前得分: {self.state.scores[player]}")
else:
logger.error(f"玩家 {player} 暗杠失败: 手中牌数量不足")
raise ValueError("暗杠条件不满足,需要至少四张相同的牌")
else:
logger.error(f"玩家 {player} 提供了无效的杠牌类型: {mode}")
raise ValueError("无效的杠牌类型,仅支持 'ming''an''bu'")
def check_blood_battle(self):
"""
检查游戏是否流局或血战结束,记录状态。
"""
if any(score <= 0 for score in self.state.scores):
logger.info(f"游戏结束某玩家分数小于等于0: {self.state.scores}")
self.game_over = True
if len(self.state.winners) >= 3 or self.state.remaining_tiles == 0:
logger.info(f"游戏结束,赢家列表: {self.state.winners}")
self.game_over = True
def set_missing_suit(player, game_state):
"""
玩家自动根据手牌选择缺门。
参数:
- player: 玩家索引0-3
- game_state: 当前的游戏状态(`ChengduMahjongState` 实例)。
返回:
- str: 玩家设置的缺门花色。
"""
valid_suits = ["", "", ""]
hand = game_state.hands[player] # 获取玩家手牌
# 统计每种花色的牌数量
suit_counts = {suit: 0 for suit in valid_suits}
for tile in hand.tiles:
suit_counts[tile.suit] += 1
# 找到数量最少的花色
missing_suit = min(suit_counts, key=suit_counts.get)
# 检查是否已经设置过缺门
if game_state.missing_suits[player] is not None:
logger.warning(f"玩家 {player} 已设置过缺门,不能重复设置")
raise ValueError("缺门已经设置,不能重复设置")
# 设置缺门并记录日志
game_state.missing_suits[player] = missing_suit
logger.info(
f"玩家 {player} 手牌花色分布: {suit_counts}。缺门设置为: {missing_suit}"
)
return missing_suit
def check_other_players(self, tile):
"""
检查其他玩家是否可以对打出的牌进行操作(如胡牌、杠、碰)。
优先级为:胡牌 > 杠牌 > 碰牌。
如果有玩家选择操作,修改游戏状态和出牌顺序。
"""
current_player = self.state.current_player
actions_taken = False
for player in range(4):
if player == current_player:
continue
# 优先检查胡牌
if self.state.can_win(self.state.hands[player], self.state.melds[player], self.state.missing_suits[player]):
logger.info(f"玩家 {player} 可以胡玩家 {current_player} 的牌: {tile}")
handle_win(player, current_player, tile)
actions_taken = True
break # 胡牌后结束
# 检查是否可以杠牌
if self.state.hands[player].tile_count[tile] >= 3:
logger.info(f"玩家 {player} 可以杠玩家 {current_player} 的牌: {tile}")
if handle_gang(self,player, tile, mode="ming"): # 执行明杠逻辑
actions_taken = True
break # 杠牌后不检查其他玩家
# 检查是否可以碰牌
if self.state.hands[player].tile_count[tile] >= 2:
logger.info(f"玩家 {player} 可以碰玩家 {current_player} 的牌: {tile}")
if handle_peng(self,player, tile): # 执行碰牌逻辑
actions_taken = True
break # 碰牌后不检查其他玩家
if not actions_taken:
logger.info(f"玩家 {current_player} 打出的牌 {tile} 没有触发其他玩家的操作")
return actions_taken
def handle_peng(self, player, tile):
"""
处理玩家碰牌逻辑并更新出牌顺序。
"""
if not isinstance(tile, MahjongTile):
logger.error(f"tile 必须是 MahjongTile 类型,但收到的是: {type(tile)}")
return False
if self.state.hands[player].tile_count[tile] < 2:
logger.error(f"玩家 {player} 无法碰牌: {tile}")
return False
# 减少手牌中的牌数量
self.state.hands[player].remove_tile(tile) # 移除第一张
self.state.hands[player].remove_tile(tile) # 移除第二张
# 添加到明牌区
self.state.melds[player].append(Meld(tile, "")) # 使用 Meld 类表示明牌
logger.info(f"玩家 {player} 碰了牌: {tile}。当前明牌: {self.state.melds[player]}")
return True
def get_player_discard_choice(self, player):
"""
模拟获取玩家打牌的选择。
在实际项目中,使用 GUI 或 AI 决策替代。
"""
hand_tiles = self.state.hands[player].tiles
logger.info(f"玩家 {player} 当前手牌: {[str(tile) for tile in hand_tiles]}")
# 模拟玩家选择打出第一张非碰牌
chosen_tile = hand_tiles[0] # 在真实项目中替换为实际用户输入或AI决策
logger.info(f"玩家 {player} 选择打出牌: {chosen_tile}")
return chosen_tile
def handle_gang(self, player, tile, mode):
"""
处理玩家杠牌逻辑、计算分数并更新状态。
:param player: 杠牌玩家索引
:param tile: 杠牌的那张牌
:param mode: 杠牌的类型 ("ming""an")
:return: True 如果操作成功,否则 False
"""
if not isinstance(tile, MahjongTile):
logger.error(f"玩家 {player} 杠牌的牌无效: {tile}")
return False
base_score = self.state.bottom_score # 底分
if mode == "ming": # 明杠逻辑
if self.state.hands[player].tile_count[tile] < 3:
logger.error(f"玩家 {player} 无法明杠: {tile}")
return False
# 更新状态
self.update_meld(player, tile, "", count=3)
# 明杠分数计算
gang_score = base_score * 2
for i in range(4):
if i != player:
self.state.scores[i] -= gang_score
self.state.scores[player] += gang_score * 3
logger.info(f"玩家 {player} 明杠,总分变化: +{gang_score * 3},其他玩家每人扣分: -{gang_score}")
return True
elif mode == "an": # 暗杠逻辑
if self.state.hands[player].tile_count[tile] < 4:
logger.error(f"玩家 {player} 无法暗杠: {tile}")
return False
# 更新状态
update_meld(player, tile, "", count=4)
# 暗杠分数计算
gang_score = base_score * 4
for i in range(4):
if i != player:
self.state.scores[i] -= gang_score
self.state.scores[player] += gang_score * 3
logger.info(f"玩家 {player} 暗杠,总分变化: +{gang_score * 3},其他玩家每人扣分: -{gang_score}")
return True
def update_meld(self, player, tile, meld_type, count):
"""
更新玩家的明牌状态,并移除相应的牌。
"""
self.state.hands[player].tile_count[tile] -= count
self.state.melds[player].append((meld_type, tile))
logger.info(f"玩家 {player} 更新明牌: {meld_type} {tile},当前明牌: {self.state.melds[player]}")
def random_discard_tile(engine):
"""
当前玩家随机选择一张牌打出,优先打缺门牌。
"""
current_player = engine.state.current_player
hand = engine.state.hands[current_player].tiles # 当前玩家的手牌
missing_suit = engine.state.missing_suits[current_player] # 当前玩家的缺门
if not hand:
logger.error(f"玩家 {current_player} 的手牌为空,无法打牌")
return
# 使用改进后的 random_choice 函数选择牌
tile = random_choice(hand, missing_suit)
# 从手牌中移除该牌
engine.state.hands[current_player].remove_tile(tile)
engine.state.discards[current_player].append(tile)
logger.info(
f"玩家 {current_player} 打出了一张牌: {tile}"
f"当前牌河: {[str(t) for t in engine.state.discards[current_player]]}"
)
# 检查其他玩家是否可以操作
engine.check_other_players(tile)
# 切换到下一个玩家
next_player = (current_player + 1) % 4
engine.state.current_player = next_player
def random_choice(hand, missing_suit):
"""
根据缺门优先随机选择打出的牌。
"""
if not hand:
raise ValueError("手牌不能为空")
# 筛选出缺门的牌
missing_suit_tiles = [tile for tile in hand if tile.suit == missing_suit]
# 如果缺门牌存在,优先从缺门牌中随机选择
if missing_suit_tiles:
index = random_module.randint(0, len(missing_suit_tiles) - 1)
return missing_suit_tiles[index]
# 如果缺门牌不存在,从剩余牌中随机选择
index = random_module.randint(0, len(hand) - 1)
return hand[index]
def should_gang(ai_player, state, gang_type):
"""
判断 AI 是否选择杠牌
:param ai_player: 当前玩家索引
:param state: 游戏状态对象
:param gang_type: 杠牌类型(暗杠或明杠)
:return: True 表示杠False 表示不杠
"""
# 获取当前玩家分数
current_score = state.scores[ai_player]
# 获取当前玩家手牌
hand = state.hands[ai_player]
# 获取局势信息
draw_counts = state.draw_counts
remaining_tiles = state.remaining_tiles
# 基础策略:如果当前分数远低于平均分,优先杠
average_score = sum(state.scores) / len(state.scores)
if current_score < average_score * 0.8:
return True
# 检查听牌状态:如果杠后听牌,优先杠
# if is_ready_to_win(hand, state.melds[ai_player], state.missing_suits[ai_player]):
# return True
# 根据杠牌类型调整策略
if gang_type == "暗杠":
# 暗杠通常安全,偏向杠
return True
elif gang_type == "明杠":
# 明杠可能被针对,后期优先考虑杠
return remaining_tiles < 30 # 剩余牌少于 30 张时偏向杠
# 默认不杠
return False
def select_discard_tile(self, player):
"""
选择要打出的牌(此处为简单示例,可接入 AI 策略)。
"""
hand = self.state.hands[player]
# 优先打出缺门牌
for tile in hand.tiles:
if tile.suit == self.state.missing_suits[player]:
return tile
# 如果没有缺门牌,随机打出一张
return hand.tiles[0]
def handle_win(self, player, current_player, tile):
"""
处理胡牌逻辑,包括动态计算番数和分数。
:param player: 胡牌玩家索引
:param current_player: 打出牌的玩家索引(若自摸为 None
:param tile: 胡牌的那张牌
"""
logger.info(f"玩家 {player} 胡牌!胡的牌是: {tile if tile else '自摸'}")
# 判断是否地胡
is_dihu = self.state.draw_counts[player] == 0 and player != self.state.current_player
is_self_draw = current_player is None
# 动态计算番数
fan_count = calculate_fan(
hand=self.state.hands[player],
melds=self.state.melds[player],
is_self_draw=is_self_draw,
winning_tile=tile,
conditions={"is_tian_hu": False, "is_di_hu": is_dihu}
)
# 分数计算
base_score = self.state.bottom_score
win_score = base_score * (2 ** fan_count)
if is_self_draw:
# 自摸结算
for i in range(4):
if i != player:
self.state.scores[i] -= win_score
self.state.scores[player] += win_score * 3
else:
# 点炮结算
self.state.scores[player] += win_score * 3
self.state.scores[current_player] -= win_score
# 更新赢家状态
self.state.winners.append(player)
self.state.print_game_state(player)
# 输出日志
logger.info(f"玩家 {player} 胡牌类型: {'地胡' if is_dihu else '普通胡牌'}")
logger.info(f"玩家 {player} 总番数: {fan_count}")
logger.info(f"玩家 {current_player if current_player is not None else '所有其他玩家'} 扣分: {win_score}")
logger.info(f"玩家 {player} 加分: {win_score * 3}")
logger.info(f"当前分数: {self.state.scores}")
def update_meld(self, player, tile, meld_type, count):
"""
更新玩家的明牌状态,并移除相应的牌。
"""
self.state.hands[player].tile_count[tile] -= count
self.state.melds[player].append((meld_type, tile))
logger.info(f"玩家 {player} 更新明牌: {meld_type} {tile},当前明牌: {self.state.melds[player]}")

View File

@@ -0,0 +1,44 @@
from src.engine.mahjong.fan_type import is_terminal_fan,is_cleared,is_full_request,is_seven_pairs,is_basic_win,is_dragon_seven_pairs
from loguru import logger
def calculate_fan(hand, melds, is_self_draw, winning_tile, conditions):
"""
动态计算番数,根据现有的番型规则。
参数:
- hand: 当前胡牌的手牌Hand 对象)。
- melds: 玩家已明牌的列表Meld 对象列表)。
- is_self_draw: 是否自摸。
- winning_tile: 胡的那张牌MahjongTile 对象)。
- conditions: 字典,包含特殊胡牌条件,如 {"is_tian_hu": True}。
返回:
- int: 最大番数。
"""
fan = 0 # 初始化番数
# 定义番型规则(按优先级从高到低排序)
rules = [
("tian_hu", lambda: 12 if conditions.get("is_tian_hu", False) else 0), # 天胡
("di_hu", lambda: 12 if conditions.get("is_di_hu", False) else 0), # 地胡
("dragon_seven_pairs", lambda: is_dragon_seven_pairs(hand, melds)[0]), # 龙七对
("seven_pairs", lambda: is_seven_pairs(hand)), # 七对
("full_request", lambda: is_full_request(hand, melds, winning_tile)), # 全求人
("cleared", lambda: is_cleared(hand, melds)), # 清一色
("terminal_fan", lambda: is_terminal_fan(hand, melds)), # 带幺九
("plain_win", lambda: is_basic_win(hand)), # 平胡
]
# 逐一应用规则,取最大番数
for rule, func in rules:
current_fan = func()
if current_fan > fan:
fan = current_fan
logger.debug(f"应用番型规则 {rule}: {current_fan}")
# 特殊条件(例如自摸加番)
if is_self_draw:
fan += 1 # 自摸额外加 1 番
logger.debug("自摸加 1 番")
return fan

View File

@@ -0,0 +1,151 @@
import random
from loguru import logger
from configs.log_config import setup_logging
from src.engine.mahjong.actions import draw_tile, random_choice, handle_win, handle_gang, handle_peng
from src.engine.mahjong.actions import set_missing_suit
from src.engine.mahjong.chengdu_mahjong_state import ChengduMahjongState
class ChengduMahjongEngine:
def __init__(self):
self.state = ChengduMahjongState()
self.game_over = False
self.game_started = False
self.current_player = 0
def initialize_game(self):
"""
初始化游戏,确定庄家,发牌并设置缺门。
"""
logger.info("游戏初始化...")
setup_logging()
# 确定庄家(掷骰子)
self.state.current_player = random.randint(0, 3)
logger.info(f"庄家确定为玩家: {self.state.current_player}")
logger.info("游戏初始化完成,准备开始!")
def deal_tiles(self):
""" 发牌庄家摸14张其余玩家摸13张 """
logger.info("开始发牌...")
random.shuffle(self.state.deck) # 洗牌
for player in range(4):
tiles_to_draw = 14 if player == self.state.current_player else 13
for _ in range(tiles_to_draw):
tile = self.state.deck.pop()
self.state.hands[player].add_tile(tile)
# 自动设置缺门
set_missing_suit(player, self.state)
# 记录发牌后的牌堆数量
self.state.remaining_tiles = len(self.state.deck)
logger.info(f"发牌结束并完成缺门设置!当前牌堆剩余数量: {self.state.remaining_tiles}")
def play_turn(self):
current_player = self.state.current_player
# 玩家摸牌逻辑
draw_tile(self)
# 玩家选择一张牌打出
tile = random_choice(self.state.hands[current_player], self.state.missing_suits[current_player])
logger.info(f"玩家 {current_player} 选择打牌: {tile}")
# 检查其他玩家是否可以对该牌进行操作
actions_taken = self.check_other_players(tile)
if not actions_taken:
# 将牌加入弃牌堆
self.state.discards[current_player].append(tile)
logger.info(f"玩家 {current_player} 打出的牌 {tile} 没有触发其他玩家的操作")
# 切换到下一位玩家
self.state.current_player = (current_player + 1) % 4
logger.info(f"轮到玩家 {self.state.current_player} 出牌")
# 检查游戏结束条件
self.check_game_over()
def check_game_over(self):
"""
检查游戏是否结束。
游戏结束条件:
1. 牌堆已空。
2. 赢的玩家数量 >= 3。
"""
# 检查是否已无牌可摸
if self.state.remaining_tiles == 0:
self.game_over = True
logger.info("游戏结束:牌堆已空")
return
# 检查是否满足血战结束条件:赢家数量 >= 3
if len(self.state.winners) >= 3:
self.game_over = True
logger.info(f"游戏结束:赢家数量达到 {len(self.state.winners)}")
return
# 如果没有触发结束条件,继续游戏
logger.info(f"当前赢家数量: {len(self.state.winners)},游戏继续")
def run(self):
"""
运行游戏主循环。
"""
self.initialize_game()
self.game_started = True
while not self.game_over:
self.play_turn()
logger.info("游戏已结束")
def check_other_players(self, tile):
"""
检查其他玩家是否可以对打出的牌进行操作(如胡牌、杠、碰)。
优先级为:胡牌 > 杠牌 > 碰牌。
如果有玩家选择操作,修改游戏状态和出牌顺序。
"""
current_player = self.state.current_player
actions_taken = False
for player in range(4):
if player == current_player:
continue
# 优先检查胡牌
if self.state.can_win(self.state.hands[player], self.state.melds[player], self.state.missing_suits[player]):
logger.info(f"玩家 {player} 可以胡玩家 {current_player} 的牌: {tile}")
handle_win(player, current_player, tile)
actions_taken = True
break # 胡牌后结束
# 检查是否可以杠牌
if self.state.hands[player].tile_count[tile] >= 3:
logger.info(f"玩家 {player} 可以杠玩家 {current_player} 的牌: {tile}")
if handle_gang(self, player, tile, mode="ming"): # 执行明杠逻辑
actions_taken = True
break # 杠牌后不检查其他玩家
# 检查是否可以碰牌
if self.state.hands[player].tile_count[tile] >= 2:
logger.info(f"玩家 {player} 可以碰玩家 {current_player} 的牌: {tile}")
if handle_peng(self, player, tile): # 执行碰牌逻辑
actions_taken = True
break # 碰牌后不检查其他玩家
if not actions_taken:
logger.info(f"玩家 {current_player} 打出的牌 {tile} 没有触发其他玩家的操作")
return actions_taken
def update_meld(self, player, tile, meld_type, count):
"""
更新玩家的明牌状态,并移除相应的牌。
"""
self.state.hands[player].tile_count[tile] -= count
self.state.melds[player].append((meld_type, tile))
logger.info(f"玩家 {player} 更新明牌: {meld_type} {tile},当前明牌: {self.state.melds[player]}")

View File

@@ -1,7 +1,7 @@
from collections import Counter
from src.engine.hand import Hand
from src.engine.mahjong_tile import MahjongTile
from src.engine.meld import Meld
from src.engine.mahjong.hand import Hand
from src.engine.mahjong.mahjong_tile import MahjongTile
from src.engine.mahjong.meld import Meld
from loguru import logger
@@ -17,12 +17,16 @@ class ChengduMahjongState:
self.deck = [MahjongTile(suit, value) for suit in ["", "", ""] for value in range(1, 10)] * 4 # 108张牌
# 当前玩家索引
self.current_player = 0
# 底分
self.bottom_score = 1
# 玩家分数
self.scores = [100, 100, 100, 100]
# 剩余牌数量
self.remaining_tiles = len(self.deck)
# 胜利玩家列表
self.winners = []
# 记录每个玩家的抓牌次数
self.draw_counts = [0] * 4
# 缺门信息
self.missing_suits = [None] * 4 # 每个玩家的缺门("条"、"筒" 或 "万"
@@ -113,7 +117,7 @@ class ChengduMahjongState:
# **第一步:检查花色限制**
suits = {tile.suit for tile in hand.tiles}
if len(suits) > 2:
logger.info("花色超过两种,不能胡牌")
# logger.info("花色超过两种,不能胡牌")
return False # 花色超过两种,不能胡牌
# 检查是否打完缺门的花色

View File

@@ -1,4 +1,4 @@
from src.engine.utils import try_win,is_terminal_tile
from src.engine.mahjong.utils import try_win
from collections import Counter
def is_basic_win(hand):
@@ -40,7 +40,7 @@ def is_cleared(hand, melds):
return 0
def calculate_terminal_fan(hand, melds):
def is_terminal_fan(hand, melds):
"""
计算带幺九番型并返回对应番数
"""

View File

@@ -0,0 +1,89 @@
from src.engine.mahjong.mahjong_tile import MahjongTile
from collections import defaultdict
class Hand:
def __init__(self):
# 存储所有的 MahjongTile 对象
self.tiles = []
# 存储每种牌的数量,键为 MahjongTile 对象,值为数量
self.tile_count = defaultdict(int)
def add_tile(self, tile):
""" 向手牌中添加一张牌 """
if not isinstance(tile, MahjongTile):
raise ValueError("必须添加 MahjongTile 类型的牌")
if len(self.tiles) > 14:
raise ValueError("手牌数量不能超过 14 张")
self.tiles.append(tile) # 将牌添加到手牌中
self.tile_count[tile] += 1 # 增加牌的数量
def remove_tile(self, tile):
""" 从手牌中移除一张牌 """
if not isinstance(tile, MahjongTile):
raise ValueError("必须移除 MahjongTile 类型的牌")
if self.tile_count[tile] > 0:
self.tiles.remove(tile)
self.tile_count[tile] -= 1
else:
raise ValueError(f"手牌中没有该牌: {tile}")
def get_tile_count(self, tile):
""" 获取手牌中某张牌的数量 """
if not isinstance(tile, MahjongTile):
raise ValueError("必须是 MahjongTile 类型的牌")
return self.tile_count[tile]
def can_peng(self, tile):
""" 判断是否可以碰即是否已经有2张相同的牌摸一张牌后可以碰 """
if not isinstance(tile, MahjongTile):
raise ValueError("必须是 MahjongTile 类型的牌")
return self.tile_count[tile] == 2 # 摸一张牌后总数为 3 张,才可以碰
def can_gang(self, tile=None):
"""
判断是否可以杠牌。
两种情况:
1. 当前手牌中已经有 4 张相同的牌,可以选择杠。
2. 当前手牌中有 3 张相同的牌,摸到第 4 张后可以杠。
:param tile: 需要判断的牌(可以为空)。
:return: True 如果可以杠,否则 False。
"""
if tile is not None:
# 情况 1: 摸到一张牌后形成四张
if not isinstance(tile, MahjongTile):
raise ValueError("必须是 MahjongTile 类型的牌")
return self.tile_count[tile] == 4
else:
# 情况 2: 手牌中已有四张一样的牌
for t, count in self.tile_count.items():
if count == 4:
return True
return False
def get_gang_type(self, tile=None):
"""
判断杠牌的类型(暗杠或明杠)。
:param tile: 如果指定了牌,检查是否可以明杠。
:return: 杠牌类型,"暗杠""明杠",如果不能杠返回 None。
"""
if tile is not None:
# 明杠的判断逻辑手牌中已有3张相同的牌摸到第4张
if not isinstance(tile, MahjongTile):
raise ValueError("必须是 MahjongTile 类型的牌")
if self.tile_count[tile] == 4:
return "明杠"
else:
# 暗杠的判断逻辑手牌中已有4张相同的牌
for t, count in self.tile_count.items():
if count == 4:
return "暗杠"
return None
def __repr__(self):
""" 返回手牌的字符串表示 """
tiles_str = ", ".join(str(tile) for tile in self.tiles)
return f"手牌: [{tiles_str}], 牌的数量: {dict(self.tile_count)}"
def __iter__(self):
"""使 Hand 对象可迭代,直接迭代其 tiles 列表"""
return iter(self.tiles)

View File

@@ -6,6 +6,7 @@ class MahjongTile:
raise ValueError("Invalid tile")
self.suit = suit
self.value = value
self.index = ({"": 0, "": 1, "": 2}[suit]) * 9 + (value - 1)
def __repr__(self):
return f"{self.value}{self.suit}"

View File

@@ -1,4 +1,4 @@
from src.engine.mahjong_tile import MahjongTile
from src.engine.mahjong.mahjong_tile import MahjongTile
class Meld:
def __init__(self, tile, type: str):

View File

View File

@@ -0,0 +1,220 @@
import gym
from gym import spaces
import numpy as np
from src.engine.mahjong.actions import handle_peng, handle_gang, handle_win
from src.engine.mahjong.chengdu_mahjong_engine import ChengduMahjongEngine
from loguru import logger
class ChengduMahjongEnv(gym.Env):
def __init__(self):
super().__init__()
# 初始化麻将引擎
self.engine = ChengduMahjongEngine()
# 定义观察空间:手牌、明牌、弃牌和庄家信息
self.observation_space = spaces.Dict({
"hand": spaces.Box(low=0, high=4, shape=(108,), dtype=np.int32), # 手牌数量
"melds": spaces.Box(low=0, high=4, shape=(108,), dtype=np.int32), # 明牌数量
"discard_pile": spaces.Box(low=0, high=4, shape=(108,), dtype=np.int32), # 弃牌数量
"dealer": spaces.Discrete(4), # 当前庄家
})
# 初始化游戏
self.reset()
@property
def action_space(self):
"""
动态生成当前动作空间。
"""
valid_actions = self.get_action_space()
# 动态生成离散动作空间的大小
return spaces.Discrete(len(valid_actions))
def reset(self):
"""重置游戏状态"""
self.engine = ChengduMahjongEngine() # 重置引擎
self.engine.initialize_game()
self.engine.deal_tiles()
return self._get_observation()
def step(self, action):
"""
执行动作,更新状态并返回结果。
:param action: 动作0-13 表示打牌, 14 表示碰, 15 表示杠, 16 表示胡)
:return: obs, reward, done, info
"""
current_player = self.engine.state.current_player
hand = self.engine.state.hands[current_player].tiles # 当前玩家手牌
logger.info(f"玩家 {current_player} 手牌: {self.engine.state.hands[current_player].tiles}")
# **检查动作合法性**
max_hand_actions = len(hand) # 当前玩家手牌数量
max_action_index = max_hand_actions + 3 # 打牌 + 特殊动作
if action >= max_action_index:
raise ValueError(f"无效的动作: {action}")
# **执行动作**
if action < max_hand_actions: # 打牌动作
tile = hand[action]
# logger.info(f"玩家 {current_player} 选择打牌: {tile}")
self.engine.check_other_players(tile)
elif action == max_hand_actions: # 碰
tile_to_peng = self._get_tile_for_special_action("peng")
if tile_to_peng:
handle_peng(self.engine, current_player, tile_to_peng)
logger.info(f"玩家 {current_player} 碰了牌: {tile_to_peng}")
else:
logger.warning("碰动作无效,未满足条件")
elif action == max_hand_actions + 1: # 杠
tile_to_gang = self._get_tile_for_special_action("gang")
if tile_to_gang:
handle_gang(self.engine, current_player, tile_to_gang, mode="an")
logger.info(f"玩家 {current_player} 杠了牌: {tile_to_gang}")
else:
logger.warning("杠动作无效,未满足条件")
elif action == max_hand_actions + 2: # 胡
if self.engine.state.can_win(
self.engine.state.hands[current_player],
self.engine.state.melds[current_player],
self.engine.state.missing_suits[current_player]
):
handle_win(self.engine, current_player, None, None)
logger.info(f"玩家 {current_player} 胡牌!")
else:
logger.warning("胡动作无效,未满足条件")
# **更新玩家轮次**
if not self.engine.game_over: # 确保游戏未结束时才轮转玩家
self.engine.state.current_player = (current_player + 1) % 4
# **更新状态**
obs = self._get_observation()
# **奖励设计**
reward = self._calculate_reward(current_player)
# **检查游戏是否结束**
self.engine.check_game_over()
done = self.engine.game_over
# **返回值**
info = {
"player": current_player,
"action": action,
}
return obs, reward, done, info
def _get_observation(self):
"""
提取当前玩家的观察空间
:return: dict
"""
player_index = self.engine.state.current_player
hand = np.zeros(108, dtype=np.int32)
melds = np.zeros(108, dtype=np.int32)
discard_pile = np.zeros(108, dtype=np.int32)
# 填充手牌、明牌和弃牌信息
for tile, count in self.engine.state.hands[player_index].tile_count.items():
hand[tile.index] = count
for meld in self.engine.state.melds[player_index]:
melds[meld.tile.index] += meld.count
for tile in self.engine.state.discards[player_index]:
discard_pile[tile.index] += 1
return {
"hand": hand,
"melds": melds,
"discard_pile": discard_pile,
"dealer": self.engine.state.current_player,
}
def _calculate_reward(self, current_player):
"""
奖励设计:基于分数变化
:return: float
"""
return self.engine.state.scores[current_player] - 100
def _get_tile_for_special_action(self, action_type):
"""
获取可碰、杠、胡的牌
:param action_type: "peng", "gang", "win"
:return: tile or None
"""
if action_type == "peng":
for tile, count in self.engine.state.hands[self.engine.state.current_player].tile_count.items():
if count == 2: # 碰需要两张相同的牌
return tile
elif action_type == "gang":
for tile, count in self.engine.state.hands[self.engine.state.current_player].tile_count.items():
if count == 4: # 杠需要四张相同的牌
return tile
elif action_type == "win":
if self.engine.state.can_win(
self.engine.state.hands[self.engine.state.current_player],
self.engine.state.melds[self.engine.state.current_player],
self.engine.state.missing_suits[self.engine.state.current_player]
):
return True
return None
def get_action_space(self):
"""
动态计算当前合法的动作空间。
返回一个合法动作的列表,其中:
- 0 到 len(hand.tiles) - 1 表示打出手牌的索引。
- len(hand.tiles) 表示碰动作。
- len(hand.tiles) + 1 表示杠动作。
- len(hand.tiles) + 2 表示胡动作。
"""
current_player = self.engine.state.current_player
hand = self.engine.state.hands[current_player]
valid_actions = []
# 打牌动作
valid_actions.extend(range(len(hand.tiles)))
# 特殊动作
if self._can_peng(current_player):
valid_actions.append(len(hand.tiles)) # 碰
if self._can_gang(current_player):
valid_actions.append(len(hand.tiles) + 1) # 杠
if self._can_hu(current_player):
valid_actions.append(len(hand.tiles) + 2) # 胡
return valid_actions
# 辅助函数判断特殊动作是否可执行
def _can_peng(self, player):
"""
判断玩家是否可以碰。
"""
for tile, count in self.engine.state.hands[player].tile_count.items():
if count >= 2: # 至少两张相同的牌
return True
return False
def _can_gang(self, player):
"""
判断玩家是否可以杠。
"""
for tile, count in self.engine.state.hands[player].tile_count.items():
if count == 4: # 有四张相同的牌
return True
return False
def _can_hu(self, player):
"""
判断玩家是否可以胡牌。
"""
return self.engine.state.can_win(
self.engine.state.hands[player],
self.engine.state.melds[player],
self.engine.state.missing_suits[player]
)

View File

@@ -1,63 +0,0 @@
import gym
from gym import spaces
import numpy as np
from src.engine.chengdu_mahjong_state import ChengduMahjongState
class ChengduMahjongEnv(gym.Env):
def __init__(self):
super().__init__()
self.state = ChengduMahjongState()
self.action_space = spaces.Discrete(5) # 0: 出牌, 1: 碰, 2: 杠, 3: 胡, 4: 过
self.observation_space = spaces.Dict({
"hand": spaces.Box(low=0, high=4, shape=(108,), dtype=np.int32), # 手牌数量
"melds": spaces.Box(low=0, high=4, shape=(108,), dtype=np.int32), # 明牌数量
"discard_pile": spaces.Box(low=0, high=4, shape=(108,), dtype=np.int32), # 弃牌数量
"dealer": spaces.Discrete(4), # 当前庄家
})
self.reset()
def reset(self):
"""重置游戏状态"""
self.state.reset() # 初始化游戏状态
return self._get_observation()
def step(self, action):
reward = 0
done = False
if action == 0: # 出牌
self.state.discard()
elif action == 1: # 碰
self.state.peng()
elif action == 2: # 杠
self.state.kong()
elif action == 3: # 胡
reward, done = self.state.win()
elif action == 4: # 过
self.state.pass_turn()
# 检查游戏是否结束
done = done or self.state.is_game_over()
return self._get_observation(), reward, done, {}
def _get_observation(self):
"""获取玩家当前的观察空间"""
player_index = self.state.current_player
hand = np.zeros(108, dtype=np.int32)
melds = np.zeros(108, dtype=np.int32)
discard_pile = np.zeros(108, dtype=np.int32)
# 填充手牌、明牌和弃牌信息
for tile, count in self.state.hands[player_index].tile_count.items():
hand[tile.index] = count
for meld in self.state.melds[player_index]:
melds[meld.tile.index] += meld.count
for tile in self.state.discards[player_index]:
discard_pile[tile.index] += 1
return {
"hand": hand,
"melds": melds,
"discard_pile": discard_pile,
"dealer": self.state.current_player
}

View File

@@ -0,0 +1,144 @@
import gym
import numpy as np
from gym import spaces
from src.engine.dizhu.dizhu_engine import DiZhuEngine # 引入斗地主引擎
from loguru import logger
class DouDiZhuEnv(gym.Env):
def __init__(self):
super(DouDiZhuEnv, self).__init__()
self.engine = DiZhuEngine() # 初始化斗地主引擎
# 定义初始动作空间和观察空间
self.action_space = spaces.Discrete(55) # 假设最大动作空间为 55
self.observation_space = spaces.Dict({
"hand_cards": spaces.Box(low=0, high=1, shape=(54,), dtype=np.int32), # 玩家手牌(独热编码)
"history": spaces.Box(low=0, high=1, shape=(54,), dtype=np.int32), # 出牌历史
"current_pile": spaces.Box(low=0, high=1, shape=(54,), dtype=np.int32), # 当前牌面上的牌
"current_player": spaces.Discrete(3), # 当前玩家索引
})
def reset(self):
"""重置游戏环境"""
self.engine.reset()
logger.info("斗地主环境已重置")
return self._get_observation()
def step(self, action):
"""执行动作并更新环境"""
try:
reward = 0 # 初始化奖励
current_player = self.engine.get_current_player()
if action == 0: # 过牌
self.engine.step("pass")
reward -= 0.5 # 对频繁过牌给予轻微惩罚
else:
# 玩家选择出牌
action_cards = self._decode_action(action) # 解码动作为具体的牌型
# 出牌前的手牌数量
previous_hand_count = len(current_player.hand_cards)
# 尝试执行动作
self.engine.step(action_cards)
# 出牌后的手牌数量
current_hand_count = len(current_player.hand_cards)
# 根据减少的手牌数量计算奖励
reward += (previous_hand_count - current_hand_count) * 1.0
# 检查游戏是否结束
done = self.engine.game_over
if done:
reward += 10 # 胜利时给予较大的奖励
logger.info(f"游戏结束!胜利玩家: {self.engine.current_player_index + 1}")
return self._get_observation(), reward, done, {}
except ValueError as e:
return self._get_observation(), -5, False, {"error": str(e)}
def _get_observation(self):
"""获取当前玩家的观察空间"""
current_player = self.engine.get_current_player()
# 手牌的独热编码
hand_cards = np.zeros(54, dtype=np.int32)
for card in current_player.hand_cards:
hand_cards[card] = 1
# 出牌历史的独热编码
history = np.zeros(54, dtype=np.int32)
for play in current_player.history:
if isinstance(play, list): # 如果是列表
for card in play:
history[card] = 1
elif isinstance(play, int): # 如果是单个牌
history[play] = 1
# 当前牌面的独热编码
current_pile = np.zeros(54, dtype=np.int32)
if self.engine.current_pile:
for card in self.engine.current_pile:
current_pile[card] = 1
return {
"hand_cards": hand_cards,
"history": history,
"current_pile": current_pile,
"current_player": self.engine.current_player_index,
}
def _decode_action(self, action):
"""
解码动作为具体的牌型。
:param action: 动作索引
:return: 解码后的牌型
"""
valid_actions = self.get_action_space()
if action < len(valid_actions):
return valid_actions[action]
else:
raise ValueError(f"非法动作索引: {action}")
def render(self, mode="human"):
"""打印当前游戏状态"""
state = self.engine.get_game_state()
print(state)
def get_action_space(self):
"""
动态生成当前合法的动作空间。
返回一个合法动作的列表,其中:
- 索引 0 表示过牌。
- 其余索引表示具体的出牌动作。
"""
current_player = self.engine.get_current_player()
hand_cards = current_player.hand_cards
# 所有合法出牌组合
valid_actions = [["pass"]] # 索引 0 表示过牌
valid_actions.extend(self._generate_valid_combinations(hand_cards))
return valid_actions
def _generate_valid_combinations(self, hand_cards):
"""
根据手牌生成所有合法牌型组合。
:param hand_cards: 当前玩家的手牌
:return: 所有合法的牌型组合
"""
from itertools import combinations
valid_combinations = []
# 示例:生成单牌、对子和三张的合法组合
for i in range(1, len(hand_cards) + 1):
for combo in combinations(hand_cards, i):
if self.engine.is_valid_play(list(combo)): # 检查是否为合法牌型
valid_combinations.append(list(combo))
return valid_combinations

View File

@@ -1,8 +1,8 @@
from src.engine.chengdu_mahjong_state import ChengduMahjongState
from src.engine.hand import Hand
from src import ChengduMahjongState
from src import Hand
from src.engine.mahjong_tile import MahjongTile
from src.engine.meld import Meld
from src import MahjongTile
from src import Meld
hand = Hand()

35
test_dizhu.py Normal file
View File

@@ -0,0 +1,35 @@
from stable_baselines3 import PPO
from src.environment.dizhu_env import DouDiZhuEnv # 导入斗地主环境
from loguru import logger
def test_dizhu_model(): # 确保函数名以 test_ 开头
# 创建斗地主环境
env = DouDiZhuEnv()
# 加载已训练的模型
model_path = "./models/ppo_doudizhu_model.zip" # 确保路径正确
logger.info(f"加载模型: {model_path}")
try:
model = PPO.load(model_path)
except Exception as e:
logger.error(f"加载模型失败: {e}")
return
# 测试模型
obs = env.reset()
done = False
total_reward = 0
logger.info("开始测试斗地主模型...")
max_steps = 1000 # 设置最大步数
step_count = 0
while not done and step_count < max_steps:
action, _ = model.predict(obs, deterministic=True)
obs, reward, done, info = env.step(action)
total_reward += reward
step_count += 1
logger.info(f"动作: {action}, 奖励: {reward}, 是否结束: {done}, 信息: {info}")
logger.info(f"测试完成,总奖励: {total_reward}")

0
tests/dizhu/__init__.py Normal file
View File

View File

View File

@@ -1,8 +1,7 @@
import pytest
from src.engine.calculate_fan import calculate_fan, is_seven_pairs, is_cleared, is_big_pairs
from src import calculate_fan, is_seven_pairs, is_cleared, is_big_pairs
from src.engine.hand import Hand
from src.engine.mahjong_tile import MahjongTile
from src import Hand
from src import MahjongTile
# 测试用例

View File

@@ -0,0 +1,48 @@
from src import ChengduMahjongEngine
from loguru import logger
def test_mahjong_engine():
"""
测试成都麻将引擎,包括初始化、发牌、轮次逻辑等。
"""
# 初始化麻将引擎
engine = ChengduMahjongEngine()
# 初始化游戏
engine.initialize_game()
# 发牌
engine.deal_tiles()
# 检查发牌后的状态
logger.info(f"庄家: 玩家 {engine.state.current_player}")
for player in range(4):
hand = engine.state.hands[player]
logger.info(f"玩家 {player} 的手牌: {hand}")
logger.info(f"玩家 {player} 的缺门: {engine.state.missing_suits[player]}")
# 模拟游戏主循环
try:
engine.run()
except Exception as e:
logger.error(f"测试引擎时出错: {e}")
# 打印游戏结束后的状态
logger.info("游戏结束!")
for player in range(4):
logger.info(
f"玩家 {player}: 分数={engine.state.scores[player]}, "
f"手牌数量={len(engine.state.hands[player].tiles)}, 明牌数量={len(engine.state.melds[player])}, "
f"缺门={engine.state.missing_suits[player]}, 手牌={engine.state.hands[player]}, 明牌={engine.state.melds[player]}"
)
# 记录赢家信息
if engine.state.winners:
logger.info(f"赢家: {engine.state.winners}")
else:
logger.info("没有赢家!")
# 运行测试
if __name__ == "__main__":
test_mahjong_engine()

View File

@@ -1,7 +1,7 @@
from src.engine.chengdu_mahjong_state import ChengduMahjongState
from src.engine.hand import Hand
from src.engine.mahjong_tile import MahjongTile
from src.engine.meld import Meld
from src import ChengduMahjongState
from src import Hand
from src import MahjongTile
from src import Meld
def test_set_missing_suit():

View File

@@ -1,7 +1,7 @@
from src.engine.hand import Hand
from src.engine.mahjong_tile import MahjongTile
from src.engine.fan_type import is_basic_win,is_cleared,calculate_terminal_fan,is_seven_pairs,is_full_request,is_dragon_seven_pairs
from src.engine.meld import Meld
from src import Hand
from src import MahjongTile
from src import is_basic_win,is_cleared,is_terminal_fan,is_seven_pairs,is_full_request,is_dragon_seven_pairs
from src import Meld
def test_is_basic_win():
"""
@@ -100,7 +100,7 @@ def test_calculate_terminal_fan():
hand.add_tile(MahjongTile("", 5))
hand.add_tile(MahjongTile("", 5))
melds = []
assert calculate_terminal_fan(hand, melds) == 3, "测试失败:基本带幺九应为 3 番"
assert is_terminal_fan(hand, melds) == 3, "测试失败:基本带幺九应为 3 番"
def test_is_seven_pairs():
"""测试七对番型"""

View File

@@ -1,5 +1,5 @@
from src.engine.hand import Hand
from src.engine.mahjong_tile import MahjongTile
from src import Hand
from src import MahjongTile
def test_add_tile():

View File

@@ -1,4 +1,4 @@
from src.engine.mahjong_tile import MahjongTile
from src import MahjongTile
def test_mahjong_tile():
# 测试合法的牌

View File

@@ -1,5 +1,5 @@
import pytest
from src.engine.scoring import calculate_score
from src import calculate_score
@pytest.mark.parametrize("fan, is_self_draw, base_score, expected_scores", [
# 测试用例 1: 自摸,总番数 3

View File

@@ -1,4 +1,4 @@
from src.engine.utils import get_suit,get_tile_name
from src import get_suit,get_tile_name
def test_get_suit():
# 测试条花色0-35

0
tests/models/__init__.py Normal file
View File

View File

@@ -1,96 +0,0 @@
def test_draw_tile():
from src.engine.chengdu_mahjong_engine import ChengduMahjongEngine
engine = ChengduMahjongEngine()
initial_remaining = engine.state.remaining_tiles
tile = engine.draw_tile()
# 验证牌堆数量减少
assert engine.state.remaining_tiles == initial_remaining - 1, "牌堆数量未正确减少"
# 验证牌已加入当前玩家手牌
assert engine.state.hands[engine.state.current_player][tile] > 0, "摸牌未加入玩家手牌"
print(f"test_draw_tile passed: 摸到了 {tile}")
def test_discard_tile():
from src.engine.chengdu_mahjong_engine import ChengduMahjongEngine
engine = ChengduMahjongEngine()
tile = engine.draw_tile() # 玩家先摸牌
engine.discard_tile(tile) # 打出摸到的牌
# 验证手牌数量减少
assert engine.state.hands[engine.state.current_player][tile] == 0, "手牌未正确移除"
# 验证牌加入了牌河
assert tile in engine.state.discards[engine.state.current_player], "牌未正确加入牌河"
print(f"test_discard_tile passed: 打出了 {tile}")
def test_set_missing_suit():
from src.engine.chengdu_mahjong_state import ChengduMahjongState
state = ChengduMahjongState()
player = 0
missing_suit = ""
state.set_missing_suit(player, missing_suit)
# 验证缺门是否正确设置
assert state.missing_suits[player] == missing_suit, "缺门设置错误"
print(f"test_set_missing_suit passed: 缺门设置为 {missing_suit}")
def test_can_win():
from src.engine.chengdu_mahjong_state import ChengduMahjongState
state = ChengduMahjongState()
hand = [0] * 108
hand[0] = 2 # 两张1条对子
hand[3] = 1 # 2条
hand[4] = 1 # 3条
hand[5] = 1 # 4条
hand[10] = 1 # 5条
hand[11] = 1 # 6条
hand[12] = 1 # 7条
hand[20] = 1 # 8条
hand[21] = 1 # 9条
hand[22] = 1 # 1筒
hand[30] = 1 # 2筒
hand[31] = 1 # 3筒
hand[32] = 1 # 4筒
result = state.can_win(hand)
assert result is True, "胡牌判断失败"
print(f"test_can_win passed: 胡牌条件正确")
def test_peng():
from src.engine.chengdu_mahjong_engine import ChengduMahjongEngine
engine = ChengduMahjongEngine()
tile = 5 # 模拟手牌中有3张牌
engine.state.hands[engine.state.current_player][tile] = 3
engine.peng(tile)
# 验证手牌减少
assert engine.state.hands[engine.state.current_player][tile] == 1, "碰牌后手牌数量错误"
# 验证明牌记录
assert ("peng", tile) in engine.state.melds[engine.state.current_player], "碰牌未正确记录"
print(f"test_peng passed: 碰牌成功")
def test_gang():
from src.engine.chengdu_mahjong_engine import ChengduMahjongEngine
engine = ChengduMahjongEngine()
tile = 10 # 模拟手牌中有4张牌
engine.state.hands[engine.state.current_player][tile] = 4
engine.gang(tile, mode="an")
# 验证手牌减少
assert engine.state.hands[engine.state.current_player][tile] == 0, "杠牌后手牌数量错误"
# 验证明牌记录
assert ("an_gang", tile) in engine.state.melds[engine.state.current_player], "杠牌未正确记录"
print(f"test_gang passed: 杠牌成功")