wsy182 2024-12-01 22:19:34 +08:00
parent 3e65e02704
commit a14984a263
3 changed files with 131 additions and 29 deletions

View File

@ -0,0 +1,80 @@
import numpy as np
from src.engine.dizhu.player_state import PlayerState
from src.engine.dizhu.deck import Deck
class DiZhuEngine:
    """Minimal Dou Dizhu game engine: deals cards and applies plays in turn order.

    The engine only tracks hands, per-player play history and whose turn it
    is. It does not validate that a play is a legal Dou Dizhu combination or
    that it beats the previous play.
    """

    def __init__(self):
        self.deck = Deck()  # deck used to shuffle and deal
        self.players = []  # player states; populated by reset()
        self.landlord_index = -1  # seat index of the landlord (-1 until reset())
        self.current_player_index = 0  # seat index of the player to act
        self.landlord_cards = []  # the landlord's extra cards returned by the deal
        self.game_over = False  # set once a player has emptied their hand

    def reset(self):
        """Start a new game: deal the cards and assign roles.

        The third player (index 2) is always made the landlord; the first
        player (index 0) acts first.
        """
        # Shuffle and deal.
        p1_hand, p2_hand, p3_hand, landlord_cards = self.deck.deal()
        self.landlord_cards = landlord_cards

        # Create the players.
        self.players = [
            PlayerState(p1_hand, "农民"),
            PlayerState(p2_hand, "农民"),
            PlayerState(p3_hand, "地主"),
        ]
        self.landlord_index = 2  # player 3 is the landlord by default
        self.current_player_index = 0
        self.game_over = False

    def get_current_player(self):
        """Return the state object of the player whose turn it is."""
        return self.players[self.current_player_index]

    def step(self, action):
        """Apply the current player's action and advance the turn.

        :param action: the string ``"pass"`` to pass, otherwise an iterable
            of cards to play from the current player's hand.
        :return: a victory message string if this play emptied the player's
            hand, otherwise ``None``.
        :raises ValueError: if the hand does not contain every card in
            ``action`` (counting duplicates). The hand, the history and the
            turn are left unchanged in that case.
        """
        current_player = self.get_current_player()

        if action == "pass":
            # The player chose to pass.
            current_player.history.append("pass")
        else:
            hand = current_player.hand_cards

            # Validate against a scratch copy first. A plain membership test
            # ignores multiplicity, so a play asking for more copies of a
            # card than the hand holds would otherwise fail halfway through
            # the removals and leave the hand partially consumed.
            scratch = list(hand)
            for card in action:
                try:
                    scratch.remove(card)
                except ValueError:
                    raise ValueError("玩家手牌不足以完成此次出牌") from None

            # The play is valid: remove the cards from the real hand in place.
            for card in action:
                hand.remove(card)
            current_player.history.append(action)

        # Check whether the game is over. The turn is deliberately not
        # advanced on a win, so the winner stays the current player.
        if not current_player.hand_cards:
            self.game_over = True
            return f"{current_player.role} 胜利!"

        # Move on to the next player.
        self.current_player_index = (self.current_player_index + 1) % len(self.players)

    def get_game_state(self):
        """Return a snapshot dict of the game.

        The dict holds the landlord cards, each player's role, hand and play
        history, the current player index and the game-over flag. The hand
        and history values are the live objects, not copies.
        """
        return {
            "landlord_cards": self.landlord_cards,
            "players": [
                {
                    "role": player.role,
                    "hand_cards": player.hand_cards,
                    "history": player.history,
                }
                for player in self.players
            ],
            "current_player_index": self.current_player_index,
            "game_over": self.game_over,
        }

View File

@ -2,8 +2,8 @@ import gym
from gym import spaces
import numpy as np
from src import handle_peng, handle_gang, handle_win
from src import ChengduMahjongEngine
from src.engine.mahjong.actions import handle_peng, handle_gang, handle_win
from src.engine.mahjong.chengdu_mahjong_engine import ChengduMahjongEngine
from loguru import logger

View File

@ -1,35 +1,57 @@
import gym
import numpy as np
from gym import spaces
from src.engine.dizhu.dizhu_engine import DiZhuEngine # 引入地主引擎
from src.engine.dizhu.player_state import PlayerState
from src.engine.dizhu.deck import Deck
class DouDiZhuEnv(gym.Env):
    """Gym environment that drives a DiZhuEngine one single-card action at a time.

    Action 0 means "pass"; actions 1-54 play the single card ``action - 1``.
    Observations are built for whichever player the engine reports as
    current, so after a successful step they describe the next player to act.
    """

    def __init__(self):
        super(DouDiZhuEnv, self).__init__()
        self.engine = DiZhuEngine()  # underlying Dou Dizhu engine
        # 55 discrete actions: index 0 is "pass", indices 1-54 are single cards.
        self.action_space = spaces.Discrete(55)
        self.observation_space = spaces.Dict({
            # 0/1 vector over 54 card slots marking the current player's hand
            "hand_cards": spaces.Box(low=0, high=1, shape=(54,), dtype=np.int32),
            # 0/1 vector marking cards the current player has already played
            "history": spaces.Box(low=0, high=1, shape=(54,), dtype=np.int32),
        })

    def reset(self):
        """Reset the game and return the first observation."""
        self.engine.reset()
        return self._get_observation()

    def step(self, action):
        """Apply ``action`` and return ``(observation, reward, done, info)``.

        A winning play yields reward 1 and ``done=True``. Any other valid
        action yields reward 0. An action the engine rejects with
        ``ValueError`` yields reward -1 and ``done=False``, with the error
        text under ``info["error"]``.
        """
        try:
            # Decode the action index into an engine action.
            if action == 0:
                self.engine.step("pass")
            else:
                card_index = action - 1  # action indices 1-54 map to cards 0-53
                self.engine.step([card_index])
        except ValueError as e:
            # Invalid action: penalise the player and keep the episode going.
            return self._get_observation(), -1, False, {"error": str(e)}

        done = self.engine.game_over
        reward = 1 if done else 0  # simple reward: 1 for the winning play, else 0
        return self._get_observation(), reward, done, {}

    def _get_observation(self):
        """Encode the current player's hand and play history as 0/1 vectors.

        NOTE(review): cards are used directly as indices into length-54
        arrays, which assumes the deck deals integer card ids in 0-53.
        Confirm against the deck implementation.
        """
        current_player = self.engine.get_current_player()

        hand_cards = np.zeros(54, dtype=np.int32)
        for card in current_player.hand_cards:
            hand_cards[card] = 1

        history = np.zeros(54, dtype=np.int32)
        for play in current_player.history:
            # Passes are recorded as the string "pass". They carry no cards,
            # and iterating the string would index the array with characters.
            if play == "pass":
                continue
            for card in play:
                history[card] = 1

        return {"hand_cards": hand_cards, "history": history}

    def render(self, mode="human"):
        """Print the engine's current game state."""
        state = self.engine.get_game_state()
        print(state)