wsy182 2024-12-01 19:13:02 +08:00
parent 3ff448b15d
commit bf1c5116be
4 changed files with 185 additions and 66 deletions

View File

@ -1,6 +1,6 @@
import gym
from stable_baselines3 import PPO
from src.environment.chengdu_majiang_env import MahjongEnv
from src.environment.chengdu_mahjong_env import MahjongEnv
import torch
from configs.log_config import setup_logging

View File

@ -3,7 +3,7 @@ import random
from loguru import logger
from configs.log_config import setup_logging
from src.engine.actions import draw_tile, check_blood_battle, should_gang, random_choice
from src.engine.actions import draw_tile, should_gang, random_choice, handle_win, handle_gang, handle_peng
from src.engine.actions import set_missing_suit, check_other_players
from src.engine.chengdu_mahjong_state import ChengduMahjongState
@ -132,4 +132,42 @@ class ChengduMahjongEngine:
while not self.game_over:
self.play_turn()
logger.info("游戏已结束")
logger.info("游戏已结束")
def check_other_players(self, tile):
    """Offer a discarded tile to the other three seats and apply the first claim.

    Seats are visited in index order (0-3), skipping the current player.
    For each seat the checks run in priority order: win, then gang, then
    peng. The scan stops at the first seat whose claim is carried out.

    :param tile: the tile that the current player has just discarded
    :return: True if some other player acted on the tile, otherwise False
    """
    current_player = self.state.current_player

    for player in range(4):
        if player == current_player:
            # The discarder never claims their own tile.
            continue

        # Highest priority: win.
        # NOTE(review): can_win receives the player's hand, melds and missing
        # suit only; the discarded tile itself is not passed in -- confirm
        # that this is intended.
        player_can_win = self.state.can_win(
            self.state.hands[player],
            self.state.melds[player],
            self.state.missing_suits[player],
        )
        if player_can_win:
            logger.info(f"玩家 {player} 可以胡玩家 {current_player} 的牌: {tile}")
            handle_win(player, current_player, tile)
            break

        # Next priority: exposed gang (three matching tiles already in hand).
        # A declined/failed gang falls through to the peng check below.
        if self.state.hands[player].tile_count[tile] >= 3:
            logger.info(f"玩家 {player} 可以杠玩家 {current_player} 的牌: {tile}")
            if handle_gang(self, player, tile, mode="ming"):
                break

        # Lowest priority: peng (two matching tiles already in hand).
        # The count is read again here rather than cached from above.
        if self.state.hands[player].tile_count[tile] >= 2:
            logger.info(f"玩家 {player} 可以碰玩家 {current_player} 的牌: {tile}")
            if handle_peng(self, player, tile):
                break
    else:
        # The loop finished without any break, i.e. nobody claimed the tile.
        logger.info(f"玩家 {current_player} 打出的牌 {tile} 没有触发其他玩家的操作")
        return False

    return True

View File

@ -0,0 +1,144 @@
import gym
from gym import spaces
import numpy as np
from src.engine.chengdu_mahjong_engine import ChengduMahjongEngine
from loguru import logger
class ChengduMahjongEnv(gym.Env):
    """Gym environment that drives a ``ChengduMahjongEngine``.

    Action encoding (``Discrete(17)``):
        * ``0``-``13``: discard the tile at that position in the current
          player's hand;
        * ``14``: peng, ``15``: gang, ``16``: win.

    Each observation is a dict holding three length-108 count vectors
    (``hand``, ``melds``, ``discard_pile``) for the current player, plus a
    ``dealer`` entry.
    """

    # Actions below this value are hand positions to discard.
    NUM_DISCARD_ACTIONS = 14
    ACTION_PENG = 14
    ACTION_GANG = 15
    ACTION_WIN = 16
    # Length of every tile-count vector in the observation.
    NUM_TILE_SLOTS = 108
    # Value subtracted from a player's score to form the reward.
    # NOTE(review): presumably the starting score -- confirm against the state class.
    REWARD_BASELINE = 100

    def __init__(self):
        super().__init__()
        # Create the mahjong engine (reset() below replaces it with a fresh one).
        self.engine = ChengduMahjongEngine()
        # Action space: discards 0-13 plus the special actions 14/15/16.
        self.action_space = spaces.Discrete(self.ACTION_WIN + 1)
        # Observation space: hand, melds, discards and the dealer entry.
        self.observation_space = spaces.Dict({
            "hand": spaces.Box(low=0, high=4, shape=(self.NUM_TILE_SLOTS,), dtype=np.int32),
            "melds": spaces.Box(low=0, high=4, shape=(self.NUM_TILE_SLOTS,), dtype=np.int32),
            "discard_pile": spaces.Box(low=0, high=4, shape=(self.NUM_TILE_SLOTS,), dtype=np.int32),
            "dealer": spaces.Discrete(4),
        })
        # Start the first game.
        self.reset()

    def reset(self):
        """Replace the engine with a new one, deal tiles and return the first observation."""
        self.engine = ChengduMahjongEngine()
        self.engine.initialize_game()
        self.engine.deal_tiles()
        return self._get_observation()

    def step(self, action):
        """Apply ``action`` for the current player and report the outcome.

        :param action: 0-13 discards the hand tile at that index, 14 is peng,
            15 is gang (concealed mode), 16 is win
        :return: ``(obs, reward, done, info)``
        :raises ValueError: if ``action`` is negative, greater than 16, or a
            discard index beyond the end of the current hand
        """
        current_player = self.engine.state.current_player

        # A negative value would satisfy the "< 14" discard test and then
        # index the hand from its end, so reject it explicitly.
        if action < 0:
            raise ValueError(f"无效的动作: {action}")

        # 1. Validate and execute the action.
        if action < self.NUM_DISCARD_ACTIONS:
            hand_tiles = self.engine.state.hands[current_player].tiles
            if action >= len(hand_tiles):
                raise ValueError(f"动作 {action} 超出手牌范围")
            tile = hand_tiles[action]
            logger.info(f"玩家 {current_player} 选择打牌: {tile}")
            # NOTE(review): the tile is only offered to the other players
            # here; nothing in this method removes it from the hand or adds
            # it to the discards -- confirm the engine does that elsewhere.
            self.engine.check_other_players(tile)
        elif action == self.ACTION_PENG:
            tile_to_peng = self._get_tile_for_special_action("peng")
            # Compare against None so a falsy-but-valid tile object is not
            # mistaken for "no tile available".
            if tile_to_peng is not None:
                # NOTE(review): the engine module calls handle_peng /
                # handle_gang / handle_win as module-level functions taking
                # the engine as first argument -- confirm the engine also
                # exposes them as methods.
                self.engine.handle_peng(current_player, tile_to_peng)
            else:
                logger.warning("碰动作无效,未满足条件")
        elif action == self.ACTION_GANG:
            tile_to_gang = self._get_tile_for_special_action("gang")
            if tile_to_gang is not None:
                # Concealed gang is the default mode for this action.
                self.engine.handle_gang(current_player, tile_to_gang, mode="an")
            else:
                logger.warning("杠动作无效,未满足条件")
        elif action == self.ACTION_WIN:
            if self.engine.state.can_win(
                self.engine.state.hands[current_player],
                self.engine.state.melds[current_player],
                self.engine.state.missing_suits[current_player],
            ):
                self.engine.handle_win(current_player, None, None)
            else:
                logger.warning("胡动作无效,未满足条件")
        else:
            raise ValueError(f"无效的动作: {action}")

        # 2. Build the observation from the updated state.
        obs = self._get_observation()
        # 3. Reward for the player who acted.
        reward = self._calculate_reward(current_player)
        # 4. Let the engine decide whether the game has finished.
        self.engine.check_game_over()
        done = self.engine.game_over
        # 5. Extra diagnostics.
        info = {
            "player": current_player,
            "action": action,
        }
        return obs, reward, done, info

    def _get_observation(self):
        """Build the observation dict for the engine's current player.

        :return: dict with ``hand``, ``melds``, ``discard_pile`` count
            vectors and the ``dealer`` entry
        """
        player_index = self.engine.state.current_player
        hand = np.zeros(self.NUM_TILE_SLOTS, dtype=np.int32)
        melds = np.zeros(self.NUM_TILE_SLOTS, dtype=np.int32)
        discard_pile = np.zeros(self.NUM_TILE_SLOTS, dtype=np.int32)

        # Fill the count vectors, addressed by each tile's ``index``.
        for tile, count in self.engine.state.hands[player_index].tile_count.items():
            hand[tile.index] = count
        for meld in self.engine.state.melds[player_index]:
            melds[meld.tile.index] += meld.count
        for tile in self.engine.state.discards[player_index]:
            discard_pile[tile.index] += 1

        return {
            "hand": hand,
            "melds": melds,
            "discard_pile": discard_pile,
            # NOTE(review): this is the current player's index, not a
            # separate dealer field -- confirm that is what "dealer" means.
            "dealer": self.engine.state.current_player,
        }

    def _calculate_reward(self, current_player):
        """Return the player's score minus ``REWARD_BASELINE``.

        :param current_player: index of the player whose score is used
        :return: the score offset by the baseline
        """
        return self.engine.state.scores[current_player] - self.REWARD_BASELINE

    def _get_tile_for_special_action(self, action_type):
        """Find what the current player can use for a special action.

        :param action_type: ``"peng"``, ``"gang"`` or ``"win"``
        :return: for ``"peng"`` the first hand tile held exactly twice; for
            ``"gang"`` the first hand tile held exactly four times; for
            ``"win"`` ``True`` when ``can_win`` accepts the current hand;
            ``None`` in every other case
        """
        state = self.engine.state
        player_index = state.current_player

        # Exact number of copies in hand that each tile-based action needs.
        required_count = {"peng": 2, "gang": 4}.get(action_type)
        if required_count is not None:
            for tile, count in state.hands[player_index].tile_count.items():
                if count == required_count:
                    return tile
            return None

        if action_type == "win":
            if state.can_win(
                state.hands[player_index],
                state.melds[player_index],
                state.missing_suits[player_index],
            ):
                return True
        return None

View File

@ -1,63 +0,0 @@
import gym
from gym import spaces
import numpy as np
from src.engine.chengdu_mahjong_state import ChengduMahjongState
class ChengduMahjongEnv(gym.Env):
    """Gym wrapper that forwards five coarse actions to a ``ChengduMahjongState``."""

    def __init__(self):
        super().__init__()
        self.state = ChengduMahjongState()

        # 0: discard, 1: peng, 2: kong, 3: win, 4: pass
        self.action_space = spaces.Discrete(5)

        def tile_count_box():
            # One independent Box per observation field.
            return spaces.Box(low=0, high=4, shape=(108,), dtype=np.int32)

        self.observation_space = spaces.Dict({
            "hand": tile_count_box(),          # tiles in hand
            "melds": tile_count_box(),         # exposed melds
            "discard_pile": tile_count_box(),  # discarded tiles
            "dealer": spaces.Discrete(4),      # current dealer
        })

        self.reset()

    def reset(self):
        """Reset the underlying game state and return the first observation."""
        self.state.reset()
        return self._get_observation()

    def step(self, action):
        """Apply ``action`` to the state.

        :param action: 0 discard, 1 peng, 2 kong, 3 win, 4 pass; any other
            value performs no move
        :return: ``(observation, reward, done, info)`` where ``info`` is an
            empty dict
        """
        reward, done = 0, False

        if action == 3:
            # Winning is the only move that yields a reward / done pair.
            reward, done = self.state.win()
        else:
            plain_moves = (
                (0, "discard"),
                (1, "peng"),
                (2, "kong"),
                (4, "pass_turn"),
            )
            for code, method_name in plain_moves:
                if action == code:
                    getattr(self.state, method_name)()
                    break

        # Only ask the state when the move itself did not end the game.
        if not done:
            done = self.state.is_game_over()

        return self._get_observation(), reward, done, {}

    def _get_observation(self):
        """Return the observation dict for the state's current player."""
        seat = self.state.current_player
        observation = {
            key: np.zeros(108, dtype=np.int32)
            for key in ("hand", "melds", "discard_pile")
        }

        # Populate the hand, meld and discard count vectors by tile index.
        for tile, count in self.state.hands[seat].tile_count.items():
            observation["hand"][tile.index] = count
        for meld in self.state.melds[seat]:
            observation["melds"][meld.tile.index] += meld.count
        for tile in self.state.discards[seat]:
            observation["discard_pile"][tile.index] += 1

        observation["dealer"] = self.state.current_player
        return observation