Compare commits

...

33 Commits

Author SHA1 Message Date
e85e7d9096 Update README.md 2024-12-09 22:01:29 +08:00
798d1af835 Update README.md 2024-12-09 21:59:59 +08:00
0d723495ce Update README.md 2024-12-02 15:38:39 +08:00
4a9f45b2df Update README.md 2024-12-02 13:20:57 +08:00
96353480be Merge pull request 'dev' (#1) from dev into main
Reviewed-on: #1
2024-11-30 22:25:00 +08:00
7f06b5648e Delete .gitignore 2024-11-30 22:24:30 +08:00
0dd368cc33 Create .gitignore 2024-11-30 22:23:00 +08:00
e58e890ccb 1
1
2024-11-30 22:18:55 +08:00
ee8bf46701 1
1
2024-11-30 22:07:38 +08:00
4142bd9423 1
1
2024-11-30 21:04:03 +08:00
fd6006b186 1 2024-11-30 20:43:21 +08:00
7d040f7e40 Update README.md 2024-11-30 20:31:38 +08:00
9d95edfa11 1
1
2024-11-30 20:02:04 +08:00
b78d6a17a4 1
1
2024-11-30 19:27:17 +08:00
3487c805d4 Update chengdu_majiang_env.py 2024-11-30 19:20:30 +08:00
9f7a22be7f 1 2024-11-30 19:15:24 +08:00
2a5680fae9 1
1
2024-11-30 18:28:19 +08:00
14c811f6b9 1
1
2024-11-30 18:14:20 +08:00
7632edd0e3 Create chengdu_majiang_env.py 2024-11-30 17:48:04 +08:00
6c6fdff706 Update actions.py 2024-11-30 17:37:24 +08:00
42a6320ce8 Update test_scoring.py 2024-11-30 17:35:56 +08:00
c041963b97 Update scoring.py 2024-11-30 17:35:53 +08:00
6e0c8a80f3 1 2024-11-30 17:17:28 +08:00
5d660ff39a 1 2024-11-30 17:11:12 +08:00
4dbbd583b9 1
1
2024-11-30 17:08:43 +08:00
349a2ff088 1
1
2024-11-30 17:01:49 +08:00
211189a776 Update test_calculate_fan.py 2024-11-30 16:57:29 +08:00
4e563ae946 Update calculate_fan.py 2024-11-30 16:55:56 +08:00
066436b4e5 Update calculate_fan.py 2024-11-30 16:53:47 +08:00
705afd596b Update calculate_fan.py 2024-11-30 16:52:18 +08:00
54a9161c0a Update calculate_fan.py 2024-11-30 16:50:38 +08:00
0c8a2ff668 Update test_calculate_fan.py 2024-11-30 16:44:57 +08:00
8ad0177ee0 Update calculate_fan.py 2024-11-30 16:44:55 +08:00
21 changed files with 830 additions and 180 deletions

View File

@@ -101,13 +101,13 @@
3. **带幺九**
- **带幺九**指玩家手上的牌全部是由1和9组成的顺子、刻子或对子。例如123, 789, 111, 999, 11等。计为3番。
- **带幺九**指玩家手上的牌全部是由1和9组成的顺子、刻子或对子。例如123, 789, 111, 999, 11等。计为3番。<!--存疑-->
- **清带幺九**指玩家手上的牌不仅全部由1和9组成而且是同一花色条、筒、万即清一色的带幺九。计为1番。<!--存疑-->
4. **七对**手牌由7个对子组成计为2番。
5. **全求人**:所有牌都是通过碰、杠、吃别人打出的牌来完成的计为6番。
5. **全求人**所有牌都是通过碰、杠别人打出的牌来完成的计为6番。
6. **龙七对**七对中有一对是三张相同的牌计为12番。
@@ -142,4 +142,83 @@
## 成都麻将规则建模
麻将游戏引擎建模代码于项目根src/engine/目录下
麻将游戏引擎建模代码于项目根src/engine/目录下
## PPOProximal Policy Optimization算法
TensorBoard 通常会记录和可视化多种训练指标。你提到的这些图表代表了 PPO 训练过程中的不同方面。下面是对每个图表的解释:
### 1. `train/loss`
- **含义**:总损失函数值。
- **用途**:这是所有损失项的综合,包括策略梯度损失、价值函数损失等。通过观察这个指标,可以了解整个训练过程中的总体损失趋势。
### 2. `train/policy_gradient_loss`
- **含义**:策略梯度损失。
- **用途**这表示策略网络更新时的损失。PPO 通过剪裁来限制策略更新的幅度,以确保稳定的学习过程。观察这个指标可以帮助你了解策略更新的情况。
### 3. `train/value_loss`
- **含义**:价值函数损失。
- **用途**:这表示价值网络(用于估计状态值或状态-动作对的价值)的损失。价值函数的准确性对于评估策略的好坏非常重要。通过观察这个指标,可以了解价值网络的学习情况。
### 4. `train/learning_rate`
- **含义**:当前的学习率。
- **用途**:学习率是优化器的一个重要超参数,控制着每次更新时权重调整的幅度。观察学习率的变化可以帮助你了解学习率调度策略的效果。
### 5. `train/explained_variance`
- **含义**:解释方差。
- **用途**:这是一个衡量价值函数预测与实际回报之间差异的指标。解释方差越接近 1说明价值函数的预测越准确。通过观察这个指标可以评估价值函数的性能。
### 6. `train/entropy_loss`
- **含义**:熵损失。
- **用途**:熵损失鼓励策略具有一定的随机性,以防止过早收敛到局部最优解。通过观察这个指标,可以了解策略的探索程度。
### 7. `train/clip_range`
- **含义**:剪裁范围。
- **用途**PPO 使用剪裁来限制策略更新的幅度,以确保稳定性。剪裁范围是一个重要的超参数,决定了剪裁的严格程度。观察这个指标可以帮助你了解剪裁策略的效果。
### 8. `train/clip_fraction`
- **含义**:被剪裁的比例。
- **用途**:这个指标表示有多少比例的更新被剪裁。如果剪裁比例很高,可能意味着你的策略更新过于激进,需要调整剪裁范围或其他超参数。
### 9. `train/approx_kl`
- **含义**:近似 KL 散度。
- **用途**KL 散度衡量新旧策略之间的差异。PPO 通过控制 KL 散度来确保策略更新的稳定性。通过观察这个指标,可以了解策略更新的幅度和稳定性。
### 总结
这些图表提供了关于 PPO 训练过程的全面视图,帮助你监控和调试模型。以下是每个图表的主要用途:
- **`train/loss`**:总体损失,反映训练的整体进展。
- **`train/policy_gradient_loss`**:策略梯度损失,反映策略网络的更新情况。
- **`train/value_loss`**:价值函数损失,反映价值网络的学习情况。
- **`train/learning_rate`**:学习率,反映优化器的设置。
- **`train/explained_variance`**:解释方差,反映价值函数的准确性。
- **`train/entropy_loss`**:熵损失,反映策略的探索程度。
- **`train/clip_range`**:剪裁范围,反映策略更新的限制。
- **`train/clip_fraction`**:被剪裁的比例,反映策略更新的稳定性。
- **`train/approx_kl`**:近似 KL 散度,反映策略更新的幅度和稳定性。
## 参考
https://github.com/mangenotwork/CLI-Sichuan-Mahjong //golang命令行麻将
https://github.com/lauyikfung/SichuaMahjongAI //SichuaMahjongAI
https://github.com/risseraka/node-sichuan-mahjong //nodejs
https://github.imc.re/latorc/MahjongCopilot //麻将 AI 助手,基于 mjai (Mortal模型) 实现的机器人。
https://github.com/kennyzhang0819/Sichuan-Mahjong-AI-Testbed // Java 完整实现的四川麻将游戏的源代码

0
configs/application.yaml Normal file
View File

View File

@@ -1,4 +1,14 @@
# configs/log_config.py
from loguru import logger
import os
# 配置日志
logger.add("mahjong_ai_{time}.log", rotation="10 MB", level="DEBUG", format="{time} {level} {message}")
def setup_logging():
# 确保日志目录存在
log_dir = "../logs"
os.makedirs(log_dir, exist_ok=True)
# 清除所有现有日志处理器,防止重复配置
logger.remove()
# 配置日志,记录到 ../logs 目录下
logger.add(os.path.join(log_dir, "chengdu_mj_engine.log"), rotation="10 MB", level="DEBUG", format="{time} {level} {message}")

Binary file not shown.

View File

@@ -1 +1,4 @@
pytest tests/
loguru~=0.7.2
pytest~=8.3.3
gym~=0.26.2
numpy~=2.1.3

View File

@@ -0,0 +1,35 @@
import gym
from stable_baselines3 import PPO
from src.environment.chengdu_majiang_env import MahjongEnv
import torch
from configs.log_config import setup_logging
def train_model():
# 创建 MahjongEnv 环境实例
env = MahjongEnv()
# 检查是否有可用的GPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"使用设备: {device}")
# 使用 PPO 算法训练模型
model = PPO("MlpPolicy", env, verbose=1, tensorboard_log="../logs/ppo_mahjong_tensorboard/", device=device)
# 训练模型训练总步数为100000
model.learn(total_timesteps=100)
# 保存训练后的模型
model.save("../models/ppo_mahjong_model")
# 测试模型
obs = env.reset()
done = False
while not done:
action, _states = model.predict(obs) # 使用训练好的模型来选择动作
obs, reward, done, info = env.step(action) # 执行动作
env.render() # 打印环境状态
if __name__ == "__main__":
# 调用配置函数来设置日志
setup_logging()
train_model()

View File

@@ -1,58 +1,124 @@
from loguru import logger
from utils import get_tile_name # 确保 get_tile_name 已在 utils.py 中定义并导入
from src.engine.utils import get_tile_name
def draw_tile(state):
def draw_tile(engine):
"""
当前玩家摸牌的动作逻辑。
当前玩家摸牌逻辑,记录牌的详细信息和游戏状态
"""
if engine.state.remaining_tiles == 0:
logger.warning("牌堆已空,游戏结束!")
engine.game_over = True
return 0, True # 游戏结束时返回 0 和 done = True
tile = engine.state.deck.pop(0) # 从牌堆中取出一张牌
engine.state.remaining_tiles -= 1 # 更新剩余牌数
engine.state.hands[engine.state.current_player][tile] += 1 # 加入当前玩家手牌
tile_name = get_tile_name(tile) # 获取具体的牌名
logger.info(
f"玩家 {engine.state.current_player} 摸到一张牌: {tile_name}(索引 {tile})。剩余牌堆数量: {engine.state.remaining_tiles}"
)
# 返回奖励和游戏是否结束的标志
return 0, False # 奖励为 0done 为 False游戏继续
def discard_tile(self, tile):
"""
当前玩家打牌逻辑,记录打出的牌和当前牌河信息。
"""
if self.state.hands[self.state.current_player][tile] == 0:
logger.error(f"玩家 {self.state.current_player} 尝试打出不存在的牌: 索引 {tile}")
raise ValueError("玩家没有这张牌")
self.state.hands[self.state.current_player][tile] -= 1 # 从手牌中移除
self.state.discards[self.state.current_player].append(tile) # 加入牌河
tile_name = get_tile_name(tile) # 获取具体的牌名
logger.info(
f"玩家 {self.state.current_player} 打出一张牌: {tile_name}(索引 {tile})。当前牌河: {[get_tile_name(t) for t in self.state.discards[self.state.current_player]]}"
)
def peng(self, tile):
"""
当前玩家碰牌逻辑,记录碰牌操作和手牌状态。
"""
player = self.state.current_player
if self.state.hands[player][tile] < 2:
logger.error(f"玩家 {player} 尝试碰牌失败: {get_tile_name(tile)}(索引 {tile}")
raise ValueError("碰牌条件不满足")
self.state.hands[player][tile] -= 2 # 减去两张牌
self.state.melds[player].append(("peng", tile)) # 加入明牌列表
tile_name = get_tile_name(tile)
logger.info(f"玩家 {player} 碰了一张牌: {tile_name}(索引 {tile})。当前明牌: {self.state.melds[player]}")
def gang(self, tile, mode):
"""
当前玩家杠牌逻辑,记录杠牌类型和状态更新。
"""
player = self.state.current_player
tile_name = get_tile_name(tile)
if mode == "ming" and self.state.hands[player][tile] == 3:
self.state.hands[player][tile] -= 3
self.state.melds[player].append(("ming_gang", tile))
logger.info(f"玩家 {player} 明杠: {tile_name}(索引 {tile}")
self.state.scores[player] += 1 # 奖励1分
logger.info(f"玩家 {player} 因明杠获得1分")
elif mode == "an" and self.state.hands[player][tile] == 4:
self.state.hands[player][tile] -= 4
self.state.melds[player].append(("an_gang", tile))
logger.info(f"玩家 {player} 暗杠: {tile_name}(索引 {tile}")
self.state.scores[player] += 1 # 奖励1分
logger.info(f"玩家 {player} 因暗杠获得1分")
else:
logger.error(f"玩家 {player} 尝试杠牌失败: {tile_name}(索引 {tile}),条件不满足")
raise ValueError("杠牌条件不满足")
def check_blood_battle(self):
"""
检查游戏是否流局或血战结束,记录状态。
"""
if any(score <= 0 for score in self.state.scores):
logger.info(f"游戏结束某玩家分数小于等于0: {self.state.scores}")
self.game_over = True
if len(self.state.winners) >= 3 or self.state.remaining_tiles == 0:
logger.info(f"游戏结束,赢家列表: {self.state.winners}")
self.game_over = True
def set_missing_suit(player, missing_suit, game_state):
"""
玩家设置缺门的动作。
参数:
- state: ChengduMahjongState 实例,表示当前游戏状态
返回:
- tile: 当前玩家摸到的牌的索引。
- player: 玩家索引0-3
- missing_suit: 玩家选择的缺门("""""")。
- game_state: 当前的游戏状态(`ChengduMahjongState` 实例)。
异常:
- ValueError: 如果牌堆已经空了(流局条件)
- ValueError: 如果缺门设置无效
"""
if state.remaining_tiles == 0:
logger.warning("牌堆已空,无法摸牌!")
raise ValueError("牌堆已空") # 牌堆为空时不能摸牌
valid_suits = ["", "", ""]
if missing_suit not in valid_suits:
logger.error(f"玩家 {player} 尝试设置无效的缺门: {missing_suit}")
raise ValueError("缺门设置无效")
tile = state.deck.pop(0) # 从牌堆顶取出一张牌
state.remaining_tiles -= 1 # 更新牌堆剩余数量
state.hands[state.current_player][tile] += 1 # 将摸到的牌添加到当前玩家的手牌中
if game_state.missing_suits[player] is not None:
logger.error(f"玩家 {player} 已经设置了缺门,不能重复设置")
raise ValueError("缺门已经设置,不能重复设置")
tile_name = get_tile_name(tile) # 获取牌的具体名称
logger.info(
f"玩家 {state.current_player} 摸到一张牌: {tile_name}(索引 {tile})。剩余牌堆数量: {state.remaining_tiles}"
)
return tile
game_state.missing_suits[player] = missing_suit
logger.info(f"玩家 {player} 设置缺门为: {missing_suit}")
def discard_tile(state, tile):
"""
当前玩家打出一张牌的动作逻辑。
参数:
- state: ChengduMahjongState 实例,表示当前游戏状态。
- tile: 玩家想要打出的牌的索引。
操作:
- 从当前玩家手牌中移除指定的牌。
- 将指定牌添加到当前玩家的牌河中。
异常:
- ValueError: 如果当前玩家的手牌中没有这张牌。
"""
if state.hands[state.current_player][tile] == 0:
logger.error(f"玩家 {state.current_player} 尝试打出不存在的牌: 索引 {tile}")
raise ValueError("玩家没有这张牌") # 防止打出不存在的牌
state.hands[state.current_player][tile] -= 1 # 从手牌中移除该牌
state.discards[state.current_player].append(tile) # 将牌添加到牌河
tile_name = get_tile_name(tile) # 获取牌的具体名称
logger.info(
f"玩家 {state.current_player} 打出一张牌: {tile_name}(索引 {tile})。当前牌河: {[get_tile_name(t) for t in state.discards[state.current_player]]}"
)
return game_state.missing_suits[player]

View File

@@ -16,36 +16,41 @@ def calculate_fan(hand, melds, is_self_draw, is_cleared, conditions):
# 定义番种规则
rules = {
"is_pure_cleared": lambda: 3 if is_cleared and len(melds) == 3 and not conditions.get("is_double_pure_cleared",
False) else 0,
"is_double_pure_cleared": lambda: 4 if is_cleared and len(melds) >= 2 and conditions.get(
"is_double_pure_cleared", False) else 0,
"is_cleared": lambda: 2 if is_cleared and not (conditions.get("is_pure_cleared", False) or
conditions.get("is_double_pure_cleared", False) or
conditions.get("is_clear_seven_pairs", False)) else 0,
"is_seven_pairs": lambda: 2 if conditions.get("is_seven_pairs", False) and not conditions.get(
"is_dragon_seven_pairs", False) else 0,
"is_dragon_seven_pairs": lambda: 12 if conditions.get("is_dragon_seven_pairs", False) else 0,
"is_clear_seven_pairs": lambda: 12 if conditions.get("is_clear_seven_pairs", False) else 0,
"is_big_pairs": lambda: 2 if conditions.get("is_big_pairs", False) else 0,
"is_small_pairs": lambda: 2 if conditions.get("is_small_pairs", False) else 0,
"is_full_request": lambda: 6 if conditions.get("is_full_request", False) else 0,
"is_gang_flower": lambda: 1 if conditions.get("is_gang_flower", False) else 0,
"is_rob_gang": lambda: 1 if conditions.get("is_rob_gang", False) else 0,
"is_under_the_sea": lambda: 1 if conditions.get("is_under_the_sea", False) else 0,
"is_tian_hu": lambda: 12 if conditions.get("is_tian_hu", False) else 0,
"is_di_hu": lambda: 12 if conditions.get("is_di_hu", False) else 0,
"basic_win": lambda: 1 if not (conditions.get("is_seven_pairs", False) or
conditions.get("is_big_pairs", False) or
conditions.get("is_dragon_seven_pairs", False) or
conditions.get("is_pure_cleared", False) or
conditions.get("is_double_pure_cleared", False)) else 0,
"is_cleared": lambda: 2 if is_cleared and not (conditions.get("is_pure_cleared", False) or
conditions.get("is_double_pure_cleared", False)) else 0,
"is_pure_cleared": lambda: 3 if is_cleared and len(melds) >= 1 and not conditions.get("is_double_pure_cleared",
False) else 0,
"is_double_pure_cleared": lambda: 4 if is_cleared and len(melds) >= 2 else 0,
# 极中极带两杠的清一色或清一色对子胡带杠计为4番称为“极中极”或“精品”点炮80分。
"is_seven_pairs": lambda: 2 if conditions.get("is_seven_pairs", False) and not conditions.get(
"is_dragon_seven_pairs", False) else 0,
# 七对手牌由7个对子组成计为2番。
"is_dragon_seven_pairs": lambda: 12 if conditions.get("is_dragon_seven_pairs", False) else 0,
# 龙七对七对中有一对是三张相同的牌计为12番。
"is_clear_seven_pairs": lambda: 12 if conditions.get("is_clear_seven_pairs", False) else 0,
# 清七对全部由一种花色组成的七对计为12番。
"is_big_pairs": lambda: 2 if conditions.get("is_big_pairs", False) else 0, # 大对子手牌由四个对子加一个刻子组成计为2番。
"is_small_pairs": lambda: 2 if conditions.get("is_small_pairs", False) else 0, # 小七对有六对加上一个对子计为2番。
"is_full_request": lambda: 6 if conditions.get("is_full_request", False) else 0,
# 全求人所有牌都是通过碰、杠、吃别人打出的牌来完成的计为6番。
"is_gang_flower": lambda: 1 if conditions.get("is_gang_flower", False) else 0, # 杠上开花在杠牌之后立即自摸胡牌计为1番。
"is_rob_gang": lambda: 1 if conditions.get("is_rob_gang", False) else 0, # 抢杠胡当其他玩家明杠时你正好可以胡那张牌计为1番。
"is_under_the_sea": lambda: 1 if conditions.get("is_under_the_sea", False) else 0, # 海底捞月最后一张牌被玩家摸到并胡牌计为1番。
"is_tian_hu": lambda: 12 if conditions.get("is_tian_hu", False) else 0, # 天胡庄家起牌后直接胡牌计为12番。
"is_di_hu": lambda: 12 if conditions.get("is_di_hu", False) else 0, # 地胡闲家在第一轮打牌时就胡牌计为12番。
conditions.get("is_double_pure_cleared", False) or
conditions.get("is_small_pairs", False) or
conditions.get("is_clear_seven_pairs", False) or
conditions.get("is_full_request", False) or
conditions.get("is_rob_gang", False) or
conditions.get("is_under_the_sea", False) or
conditions.get("is_tian_hu", False) or
conditions.get("is_di_hu", False)) else 0,
}
# 逐一应用规则
print("\nCalculating fan...")
# 逐一应用规则
for rule, func in rules.items():
result = func()
@@ -62,13 +67,23 @@ def is_seven_pairs(hand):
return sum(1 for count in hand if count == 2) == 7
def is_cleared(hand):
def is_cleared(hand, melds):
"""
检查手牌是否是清一色。
"""
suits = [tile // 36 for tile, count in enumerate(hand) if count > 0]
return len(set(suits)) == 1
检查手牌和明牌是否是清一色。
参数:
- hand: 当前胡牌的手牌长度为108的列表表示每张牌的数量
- melds: 碰杠等明牌列表。
返回:
- bool: 是否为清一色。
"""
# 获取所有牌的花色
all_tiles = hand + [tile for meld in melds for tile in meld]
suits = [tile // 36 for tile in all_tiles if tile > 0]
# 检查是否有多种花色
return len(set(suits)) == 1
def is_big_pairs(hand):
"""

View File

@@ -1,86 +1,47 @@
from .game_state import ChengduMahjongState
from .utils import get_tile_name # 确保 utils 中有 get_tile_name 定义
import random
from loguru import logger
from .game_state import ChengduMahjongState
class ChengduMahjongEngine:
def __init__(self):
self.state = ChengduMahjongState()
self.state = ChengduMahjongState() # 创建游戏状态
self.game_over = False
self.game_started = False # 游戏是否已开始
self.deal_tiles() # 发牌
def draw_tile(self):
"""
当前玩家摸牌逻辑,记录牌的详细信息和游戏状态。
"""
if self.state.remaining_tiles == 0:
logger.warning("牌堆已空,游戏结束!")
self.game_over = True
return "牌堆已空"
def deal_tiles(self):
""" 发牌每个玩家发13张牌并设置缺门 """
logger.info("发牌中...")
tile = self.state.deck.pop(0) # 从牌堆中取出一张牌
self.state.remaining_tiles -= 1 # 更新剩余牌数
self.state.hands[self.state.current_player][tile] += 1 # 加入当前玩家手牌
# 洗牌(随机打乱牌堆)
random.shuffle(self.state.deck)
tile_name = get_tile_name(tile) # 获取具体的牌名
logger.info(
f"玩家 {self.state.current_player} 摸到一张牌: {tile_name}(索引 {tile})。剩余牌堆数量: {self.state.remaining_tiles}"
)
return tile
# 随机发牌给每个玩家
for player in range(4):
for _ in range(13): # 每个玩家13张牌
tile = self.state.deck.pop() # 从牌堆抽取一张牌
self.state.hands[player][tile] += 1 # 增加玩家手牌的计数
def discard_tile(self, tile):
"""
当前玩家打牌逻辑,记录打出的牌和当前牌河信息。
"""
if self.state.hands[self.state.current_player][tile] == 0:
logger.error(f"玩家 {self.state.current_player} 尝试打出不存在的牌: 索引 {tile}")
raise ValueError("玩家没有这张牌")
# 设置缺门:每个玩家定缺(这里假设我们让每个玩家的缺门都为“条”)
for player in range(4):
missing_suit = "" # 这里可以通过其他方式设置缺门,比如随机选择
self.state.set_missing_suit(player, missing_suit)
self.state.hands[self.state.current_player][tile] -= 1 # 从手牌中移除
self.state.discards[self.state.current_player].append(tile) # 加入牌河
tile_name = get_tile_name(tile) # 获取具体的牌名
logger.info(
f"玩家 {self.state.current_player} 打出一张牌: {tile_name}(索引 {tile})。当前牌河: {[get_tile_name(t) for t in self.state.discards[self.state.current_player]]}"
)
def peng(self, tile):
"""
当前玩家碰牌逻辑,记录碰牌操作和手牌状态。
"""
player = self.state.current_player
if self.state.hands[player][tile] < 2:
logger.error(f"玩家 {player} 尝试碰牌失败: {get_tile_name(tile)}(索引 {tile}")
raise ValueError("碰牌条件不满足")
self.state.hands[player][tile] -= 2 # 减去两张牌
self.state.melds[player].append(("peng", tile)) # 加入明牌列表
tile_name = get_tile_name(tile)
logger.info(f"玩家 {player} 碰了一张牌: {tile_name}(索引 {tile})。当前明牌: {self.state.melds[player]}")
def gang(self, tile, mode="ming"):
"""
当前玩家杠牌逻辑,记录杠牌类型和状态更新。
"""
player = self.state.current_player
tile_name = get_tile_name(tile)
if mode == "ming" and self.state.hands[player][tile] == 3:
self.state.hands[player][tile] -= 3
self.state.melds[player].append(("ming_gang", tile))
logger.info(f"玩家 {player} 明杠: {tile_name}(索引 {tile}")
elif mode == "an" and self.state.hands[player][tile] == 4:
self.state.hands[player][tile] -= 4
self.state.melds[player].append(("an_gang", tile))
logger.info(f"玩家 {player} 暗杠: {tile_name}(索引 {tile}")
def start_game(self):
""" 开始游戏 """
if not self.game_started:
self.game_started = True
logger.info("游戏开始!")
else:
logger.error(f"玩家 {player} 尝试杠牌失败: {tile_name}(索引 {tile}),条件不满足")
raise ValueError("杠牌条件不满足")
logger.warning("游戏已经开始,不能重复启动!")
def check_blood_battle(self):
"""
检查游戏是否流局或血战结束,记录状态。
"""
if len(self.state.winners) >= 3 or self.state.remaining_tiles == 0:
logger.info(f"游戏结束,赢家列表: {self.state.winners}")
def check_game_over(self):
""" 检查游戏是否结束 """
# 你可以根据游戏规则检查是否有玩家胡牌或其他结束条件
if len(self.state.deck) == 0:
self.game_over = True
logger.info("游戏结束!")

View File

@@ -4,7 +4,7 @@ from loguru import logger
class ChengduMahjongState:
def __init__(self):
# 每个玩家的手牌
# 每个玩家的手牌使用108个索引表示
self.hands = [[0] * 108 for _ in range(4)] # 每个玩家108张牌的计数
# 每个玩家的打出的牌
self.discards = [[] for _ in range(4)] # 每个玩家的弃牌列表
@@ -14,6 +14,8 @@ class ChengduMahjongState:
self.deck = list(range(108)) # 0-107 表示108张牌
# 当前玩家索引
self.current_player = 0
# 玩家分数
self.scores = [100, 100, 100, 100]
# 剩余牌数量
self.remaining_tiles = 108
# 胜利玩家列表
@@ -34,11 +36,8 @@ class ChengduMahjongState:
"""
valid_suits = ["", "", ""]
if missing_suit not in valid_suits:
logger.error(f"玩家 {player} 尝试设置无效的缺门: {missing_suit}")
raise ValueError("缺门设置无效")
self.missing_suits[player] = missing_suit
logger.info(f"玩家 {player} 设置缺门为: {missing_suit}")
def can_win(self, hand):
"""

39
src/engine/hand.py Normal file
View File

@@ -0,0 +1,39 @@
from collections import defaultdict
from collections import defaultdict
class Hand:
def __init__(self):
# 存储所有的牌
self.tiles = []
# 存储每种牌的数量,默认值为 0
self.tile_count = defaultdict(int)
def add_tile(self, tile):
""" 向手牌中添加一张牌 """
self.tiles.append(tile) # 将牌添加到手牌中
self.tile_count[tile] += 1 # 增加牌的数量
def remove_tile(self, tile):
""" 从手牌中移除一张牌 """
if self.tile_count[tile] > 0:
self.tiles.remove(tile)
self.tile_count[tile] -= 1
else:
raise ValueError(f"手牌中没有该牌: {tile}")
def get_tile_count(self, tile):
""" 获取手牌中某张牌的数量 """
return self.tile_count[tile]
def can_peng(self, tile):
""" 判断是否可以碰即是否已经有2张相同的牌摸一张牌后可以碰 """
return self.tile_count[tile] == 2 # 摸一张牌后总数为 3 张,才可以碰
def can_gang(self, tile):
""" 判断是否可以杠即是否已经有3张相同的牌摸一张牌后可以杠 """
return self.tile_count[tile] == 3 # 摸一张牌后总数为 4 张,才可以杠
def __repr__(self):
""" 返回手牌的字符串表示 """
return f"手牌: {self.tiles}, 牌的数量: {dict(self.tile_count)}"

View File

@@ -0,0 +1,20 @@
class MahjongTile:
SUITS = ['', '', '']
def __init__(self, suit, value):
if suit not in self.SUITS or not (1 <= value <= 9):
raise ValueError("Invalid tile")
self.suit = suit
self.value = value
def __repr__(self):
return f"{self.value}{self.suit}"
def __eq__(self, other):
return self.suit == other.suit and self.value == other.value
def __hash__(self):
return hash((self.suit, self.value))

32
src/engine/scoring.py Normal file
View File

@@ -0,0 +1,32 @@
def calculate_score(fan: int, base_score: int, is_self_draw: bool) -> dict:
"""
根据成都麻将规则计算得分(不区分庄家和闲家)。
参数:
- fan: 总番数。
- base_score: 底分。
- is_self_draw: 是否为自摸。
返回:
- scores: 字典,包含赢家得分和输家扣分。
"""
# 计算总分
multiplier = 2 ** fan # 根据番数计算倍率
total_score = base_score * multiplier
if is_self_draw:
# 自摸:三家平摊分数
per_loser_score = -total_score
winner_score = total_score * 3 # 总赢家得分
return {
"winner": winner_score,
"loser": [per_loser_score] * 3
}
else:
# 点炮:点炮者独付
loser_score = -total_score
winner_score = total_score
return {
"winner": winner_score,
"loser": [loser_score, 0, 0]
}

View File

@@ -1,29 +1,19 @@
def get_suit(tile_index):
"""
根据牌的索引返回花色。
条:索引 0-35索引 36-71索引 72-107
参数:
- tile_index: 牌的索引0-107
返回:
- 花色字符串: """"""
"""
suits = ["", "", ""]
return suits[tile_index // 36]
if 0 <= tile_index <= 35:
return ""
elif 36 <= tile_index <= 71:
return ""
elif 72 <= tile_index <= 107:
return ""
else:
raise ValueError(f"无效的牌索引: {tile_index}")
def get_tile_name(tile_index):
"""
根据牌的索引返回具体的牌(花色和数字)。
参数:
- tile_index: 牌的索引0-107
返回:
- 具体牌的字符串: 例如 "1条""9筒""5万"
根据牌的索引返回牌名例如1条2筒等)。
"""
suits = ["", "", ""]
suit = suits[tile_index // 36] # 根据索引获取花色
number = (tile_index % 36) // 4 + 1 # 计算具体数字1-9
return f"{number}{suit}"
suit = get_suit(tile_index)
return f"{tile_index % 36 + 1}{suit}"

View File

@@ -0,0 +1,120 @@
import gym
import numpy as np
from gym import spaces
from src.engine.actions import draw_tile, discard_tile, peng, gang, check_blood_battle
from src.engine.calculate_fan import calculate_fan, is_seven_pairs, is_cleared, is_big_pairs
from src.engine.chengdu_mahjong_engine import ChengduMahjongEngine
from src.engine.scoring import calculate_score
class MahjongEnv(gym.Env):
def __init__(self):
super(MahjongEnv, self).__init__()
self.engine = ChengduMahjongEngine()
self.scores = [100, 100, 100, 100] # 四位玩家初始分数
self.base_score = 1 # 底分
self.max_rounds = 100 # 最大轮数,防止游戏无限进行
self.current_round = 0 # 当前轮数
self.action_space = spaces.Discrete(108) # 动作空间:打牌的索引
self.observation_space = spaces.Box(low=0, high=4, shape=(108,), dtype=np.int32)
def reset(self):
self.engine = ChengduMahjongEngine()
self.scores = [100, 100, 100, 100] # 每局重置分数
self.current_round = 0
return self.engine.state.hands[self.engine.state.current_player]
def step(self, action):
"""
执行玩家动作并更新游戏状态。
参数:
- action: 玩家动作0 代表摸牌1 代表打牌2 代表碰牌3 代表杠牌
返回:
- next_state: 当前玩家的手牌
- reward: 奖励
- done: 是否结束
- info: 其他信息(如奖励历史等)
"""
done = False
reward = 0
try:
if action == 0: # 0代表摸牌
reward, done = draw_tile(self.engine) # 调用摸牌函数
elif action == 1: # 1代表打牌
tile = self.engine.state.hands[self.engine.state.current_player][0] # 假设选择第一张牌
discard_tile(self.engine, tile) # 调用打牌函数
reward, done = -1, False
elif action == 2: # 2代表碰牌
tile = self.engine.state.hands[self.engine.state.current_player][0] # 假设选择第一张牌
peng(self.engine, tile) # 调用碰牌函数
reward, done = 0, False
elif action == 3: # 3代表杠牌
tile = self.engine.state.hands[self.engine.state.current_player][0] # 假设选择第一张牌
gang(self.engine, tile, mode="ming") # 暂时假设为明杠
reward, done = 0, False
# 检查是否胡牌
if self.engine.state.can_win(self.engine.state.hands[self.engine.state.current_player]):
reward, done = self.handle_win() # 胡牌时处理胜利逻辑
# 检查游戏结束条件
check_blood_battle(self.engine)
if self.engine.game_over: # 检查是否游戏结束
done = True
except ValueError:
reward, done = -10, False # 非法操作扣分
# 切换到下一个玩家
self.engine.state.current_player = (self.engine.state.current_player + 1) % 4
self.current_round += 1
# 如果达到最大轮数,结束游戏
if self.current_round >= self.max_rounds:
done = True
reward = 0 # 平局奖励或惩罚(可调整)
return self.engine.state.hands[self.engine.state.current_player], reward, done, {}
def handle_win(self):
"""
处理胡牌后的分数结算和奖励。
"""
winner = self.engine.state.current_player
hand = self.engine.state.hands[winner]
melds = self.engine.state.melds[winner]
is_self_draw = True # 假设自摸(后续可动态判断)
conditions = {
"is_cleared": is_cleared(hand, melds),
"is_seven_pairs": is_seven_pairs(hand),
"is_big_pairs": is_big_pairs(hand),
# 添加其他条件...
}
# 动态计算番数
fan = calculate_fan(hand, melds, is_self_draw, is_cleared, conditions)
# 动态计算得分
scores = calculate_score(fan, self.base_score, is_self_draw)
self.scores[winner] += scores["winner"]
for i, score in enumerate(scores["loser"]):
self.scores[i] += score # 扣分
# 奖励设置为赢家得分
reward = scores["winner"]
self.engine.state.winners.append(winner) # 添加赢家到列表
return reward, True # 胡牌结束当前局
def render(self, mode="human"):
"""
打印游戏状态信息,便于调试。
"""
print(f"当前轮数: {self.current_round}")
print("玩家分数:", self.scores)
print("当前玩家状态:", self.engine.state.hands[self.engine.state.current_player])

3
test.py Normal file
View File

@@ -0,0 +1,3 @@
import torch
print(torch.cuda.is_available()) # 如果返回True说明可以使用GPU
print(torch.__version__)

View File

@@ -53,9 +53,28 @@ def test_clear_win():
fan = calculate_fan(hand, melds, is_self_draw=True, is_cleared=is_cleared(hand), conditions=conditions)
assert fan == 3, f"Expected 3 fans (1 basic + 2 cleared), got {fan}"
def test_pure_cleared():
"""
测试清对计分
"""
hand = [2] * 6 + [0] * 102 # 手牌:清对
melds = [0, 1, 2] # 模拟碰
conditions = {"is_pure_cleared": True}
fan = calculate_fan(hand, melds, is_self_draw=False, is_cleared=True, conditions=conditions)
assert fan == 3, f"Expected 3 fans (pure cleared), got {fan}"
def test_pure_cleared():
"""
测试清对计分
"""
hand = [2] * 6 + [0] * 102 # 手牌:清对
melds = [0, 1, 2] # 模拟碰
conditions = {"is_pure_cleared": True} # 明确条件
fan = calculate_fan(hand, melds, is_self_draw=False, is_cleared=True, conditions=conditions)
assert fan == 3, f"Expected 3 fans (pure cleared), got {fan}"
def test_seven_pairs():
"""
测试七对计分
@@ -77,6 +96,40 @@ def test_seven_pairs():
assert fan == 2, f"Expected 2 fans (7 pairs), got {fan}"
def test_small_pairs():
"""
测试小七对计分
"""
hand = [2] * 6 + [0] * 102
melds = []
conditions = {"is_small_pairs": True}
fan = calculate_fan(hand, melds, is_self_draw=False, is_cleared=False, conditions=conditions)
assert fan == 2, f"Expected 2 fans (small pairs), got {fan}"
def test_clear_seven_pairs():
"""
测试清七对计分
"""
hand = [2] * 7 + [0] * 101
melds = []
conditions = {"is_clear_seven_pairs": True}
fan = calculate_fan(hand, melds, is_self_draw=False, is_cleared=True, conditions=conditions)
assert fan == 12, f"Expected 12 fans (clear seven pairs), got {fan}"
def test_full_request():
"""
测试全求人计分
"""
hand = [2] + [0] * 107
melds = []
conditions = {"is_full_request": True}
fan = calculate_fan(hand, melds, is_self_draw=False, is_cleared=False, conditions=conditions)
assert fan == 6, f"Expected 6 fans (full request), got {fan}"
def test_big_pairs():
"""
测试大对子计分
@@ -126,6 +179,59 @@ def test_gang_flower():
assert fan == 2, f"Expected 2 fans (1 basic + 1 gang flower), got {fan}"
def test_rob_gang():
"""
测试抢杠胡计分
"""
hand = [0] * 108
melds = []
conditions = {"is_rob_gang": True}
fan = calculate_fan(hand, melds, is_self_draw=False, is_cleared=False, conditions=conditions)
assert fan == 1, f"Expected 1 fan (rob gang), got {fan}"
def test_under_the_sea():
"""
测试海底捞月计分
"""
hand = [0] * 108
melds = []
conditions = {"is_under_the_sea": True}
fan = calculate_fan(hand, melds, is_self_draw=True, is_cleared=False, conditions=conditions)
assert fan == 1, f"Expected 1 fan (under the sea), got {fan}"
def test_cannon():
"""
测试放炮计分
"""
hand = [0] * 108
melds = []
conditions = {"is_cannon": True}
fan = calculate_fan(hand, melds, is_self_draw=False, is_cleared=False, conditions=conditions)
assert fan == 1, f"Expected 1 fan (cannon), got {fan}"
def test_tian_hu():
"""
测试天胡计分
"""
hand = [0] * 108
melds = []
conditions = {"is_tian_hu": True}
fan = calculate_fan(hand, melds, is_self_draw=True, is_cleared=False, conditions=conditions)
assert fan == 12, f"Expected 12 fans (tian hu), got {fan}"
def test_di_hu():
"""
测试地胡计分
"""
hand = [0] * 108
melds = []
conditions = {"is_di_hu": True}
fan = calculate_fan(hand, melds, is_self_draw=False, is_cleared=False, conditions=conditions)
assert fan == 12, f"Expected 12 fans (di hu), got {fan}"
def test_dragon_seven_pairs():
"""
测试龙七对计分
@@ -147,7 +253,15 @@ def test_dragon_seven_pairs():
}
fan = calculate_fan(hand, melds, is_self_draw=True, is_cleared=False, conditions=conditions)
assert fan == 13, f"Expected 13 fans (1 self-draw + 12 dragon seven pairs), got {fan}"
assert fan == 12, f"Expected 13 fans (1 self-draw + 12 dragon seven pairs), got {fan}"
def test_self_draw():
"""
测试自摸计分
"""
hand = [0] * 108
melds = []
conditions = {}
fan = calculate_fan(hand, melds, is_self_draw=True, is_cleared=False, conditions=conditions)
assert fan == 1, f"Expected 1 fan (self-draw), got {fan}"

64
tests/test_hand.py Normal file
View File

@@ -0,0 +1,64 @@
from src.engine.hand import Hand
def test_hand():
# 创建一个玩家的手牌
hand = Hand()
# 添加一些牌到手牌中
hand.add_tile("1条")
hand.add_tile("1条")
hand.add_tile("2条")
hand.add_tile("2条")
hand.add_tile("2条")
hand.add_tile("3条")
# 打印手牌
print("\n当前手牌:", hand)
# 测试获取某张牌的数量
assert hand.get_tile_count("1条") == 2, f"测试失败1条应该有 2 张"
assert hand.get_tile_count("2条") == 3, f"测试失败2条应该有 3 张"
assert hand.get_tile_count("3条") == 1, f"测试失败3条应该有 1 张"
# 测试移除一张牌
hand.remove_tile("1条")
print("移除 1条 后的手牌:", hand)
assert hand.get_tile_count("1条") == 1, f"测试失败1条应该有 1 张"
# 确保移除后有足够的牌可以碰
# 添加一张 1条确保可以碰
hand.add_tile("1条")
print("添加 1条 后的手牌:", hand)
# 测试是否可以碰
assert hand.can_peng("1条") == True, f"测试失败1条应该可以碰"
print("可以碰 1条 的牌:", hand.can_peng("1条"))
assert hand.can_peng("3条") == False, f"测试失败3条不可以碰"
print("不可以碰 3条 的牌:", hand.can_peng("3条"))
# 测试是否可以杠
assert hand.can_gang("1条") == False, f"测试失败1条不可以杠"
print("不可以杠 1条 的牌:", hand.can_gang("1条"))
assert hand.can_gang("2条") == False, f"测试失败2条不可以杠"
print("不可以杠 2条 的牌:", hand.can_gang("2条"))
# 添加更多牌来形成杠
hand.add_tile("2条")
print("添加牌后手牌:", hand)
hand.add_tile("2条")
print("添加牌后手牌:", hand)
assert hand.can_gang("2条") == False, f"测试失败2条不可以杠" # still not enough for gang
# 添加一张更多的 2条 来形成杠
hand.add_tile("2条")
print("添加一张2条后:", hand)
assert hand.can_gang("2条") == True, f"测试失败2条应该可以杠"
print("所有测试通过!")
# 运行测试
test_hand()

View File

@@ -0,0 +1,45 @@
from src.engine.mahjong_tile import MahjongTile
def test_mahjong_tile():
# 测试合法的牌
tile1 = MahjongTile("", 5)
assert tile1.suit == "", f"测试失败:预期花色是 '',但实际是 {tile1.suit}"
assert tile1.value == 5, f"测试失败:预期面值是 5但实际是 {tile1.value}"
assert repr(tile1) == "5条", f"测试失败:预期牌名是 '5条',但实际是 {repr(tile1)}"
tile2 = MahjongTile("", 3)
assert tile2.suit == "", f"测试失败:预期花色是 '',但实际是 {tile2.suit}"
assert tile2.value == 3, f"测试失败:预期面值是 3但实际是 {tile2.value}"
assert repr(tile2) == "3筒", f"测试失败:预期牌名是 '3筒',但实际是 {repr(tile2)}"
tile3 = MahjongTile("", 9)
assert tile3.suit == "", f"测试失败:预期花色是 '',但实际是 {tile3.suit}"
assert tile3.value == 9, f"测试失败:预期面值是 9但实际是 {tile3.value}"
assert repr(tile3) == "9万", f"测试失败:预期牌名是 '9万',但实际是 {repr(tile3)}"
# 测试非法的牌
try:
MahjongTile("", 10) # 面值超出范围
assert False, "测试失败:面值为 10 的牌应该抛出异常"
except ValueError:
pass # 正确抛出异常
try:
MahjongTile("", 5) # 花色无效
assert False, "测试失败:花色为 '' 的牌应该抛出异常"
except ValueError:
pass # 正确抛出异常
# 测试相等判断
tile4 = MahjongTile("", 5)
assert tile1 == tile4, f"测试失败:预期 {tile1}{tile4} 相等"
tile5 = MahjongTile("", 5)
assert tile1 != tile5, f"测试失败:预期 {tile1}{tile5} 不相等"
# 测试哈希
tile_set = {tile1, tile4, tile2}
assert len(tile_set) == 2, f"测试失败:集合中应该有 2 张牌,而实际有 {len(tile_set)}"
print("所有测试通过!")

19
tests/test_scoring.py Normal file
View File

@@ -0,0 +1,19 @@
import pytest
from src.engine.scoring import calculate_score
@pytest.mark.parametrize("fan, is_self_draw, base_score, expected_scores", [
# 测试用例 1: 自摸,总番数 3
(3, True, 5, {"winner": 120, "loser": [-40, -40, -40]}),
# 测试用例 2: 点炮,总番数 2
(2, False, 5, {"winner": 20, "loser": [-20, 0, 0]}),
# 测试用例 3: 自摸,总番数 4
(4, True, 5, {"winner": 240, "loser": [-80, -80, -80]}),
# 测试用例 4: 点炮,总番数 1
(1, False, 5, {"winner": 10, "loser": [-10, 0, 0]}),
])
def test_calculate_score(fan, is_self_draw, base_score, expected_scores):
scores = calculate_score(fan, base_score, is_self_draw)
assert scores == expected_scores, f"测试失败: {scores} != {expected_scores}"

36
tests/test_utils.py Normal file
View File

@@ -0,0 +1,36 @@
from src.engine.utils import get_suit,get_tile_name
def test_get_suit():
# 测试条花色0-35
for i in range(36):
assert get_suit(i) == "", f"测试失败:索引 {i} 应该是 ''"
# 测试筒花色36-71
for i in range(36, 72):
assert get_suit(i) == "", f"测试失败:索引 {i} 应该是 ''"
# 测试万花色72-107
for i in range(72, 108):
assert get_suit(i) == "", f"测试失败:索引 {i} 应该是 ''"
# 测试无效索引
try:
get_suit(108)
assert False, "测试失败:索引 108 应该抛出 ValueError"
except ValueError:
pass # 如果抛出 ValueError测试通过
print("get_suit 测试通过!")
def test_get_tile_name():
# 测试每个牌的名称是否正确
for i in range(108):
tile_name = get_tile_name(i)
assert tile_name == f"{i % 36 + 1}{get_suit(i)}", \
f"测试失败:索引 {i} 应该是 '{i % 36 + 1}{get_suit(i)}',但实际返回 '{tile_name}'"
print("get_tile_name 测试通过!")
# 运行测试
test_get_suit()
test_get_tile_name()