Source code for minestudio.simulator.callbacks.play

'''
Date: 2024-11-14 20:10:54
LastEditors: muzhancun muzhancun@stu.pku.edu.cn
LastEditTime: 2024-11-20 01:06:35
FilePath: /MineStudio/minestudio/simulator/callbacks/play.py
'''
from minestudio.simulator.callbacks.callback import MinecraftCallback
from minestudio.simulator.utils import MinecraftGUI, GUIConstants

import time
from typing import Dict, Literal, Optional, Callable, Tuple, List, Any
from rich import print

# When True, PlayCallback.after_step prints every `info` entry whose key is
# not in its ignore list to the console after each step.
DEBUG = False

class PlayCallback(MinecraftCallback):
    """Enables interactive play and/or agent-driven gameplay in a GUI window.

    This callback provides a graphical interface for human players to
    interact with the Minecraft environment. It can also run a pre-trained
    agent and allow switching control between human and agent.

    Key functionalities:

    - Renders the game view in a separate window.
    - Captures keyboard and mouse input for human control.
    - Can load and run a specified agent model.
    - Allows switching between human and agent control (default: 'L' key).
    - Displays game information (FPS, player position, current mode) in the GUI.
    - Handles custom key bindings for actions like mouse capture and closing.

    :param agent_generator: A callable that returns an agent instance. If
        None, only human play is enabled. Defaults to None.
    :type agent_generator: Callable, optional
    :param extra_draw_call: A list of additional callable functions to be
        executed during the GUI drawing phase. Defaults to None.
    :type extra_draw_call: Optional[List[Callable]], optional
    """

    def __init__(
        self,
        agent_generator: Optional[Callable] = None,
        extra_draw_call: Optional[List[Callable]] = None
    ):
        """Initializes the PlayCallback.

        Sets up the GUI, loads the agent if provided, and prints key bindings.

        :param agent_generator: Function to generate the agent model. It is
            called with no arguments and the result is moved to ``"cuda"``.
            A ``functools.partial`` is reported by its wrapped function name
            and keywords; any other callable is reported by its own name.
        :param extra_draw_call: Additional functions for custom GUI drawing.
        """
        self.gui = MinecraftGUI(extra_draw_call=extra_draw_call)
        self.constants = GUIConstants()

        # Frame timing bookkeeping used by after_step for throttling and FPS.
        self.start_time = time.time()
        self.end_time = time.time()

        if agent_generator is not None:
            # Only functools.partial exposes .func/.keywords; fall back to the
            # callable itself so plain functions and classes do not raise.
            target = getattr(agent_generator, 'func', agent_generator)
            agent_name = getattr(target, '__name__', repr(target))
            agent_kwargs = getattr(agent_generator, 'keywords', {})
            print(f'[green]Load agent with name: {agent_name}, args: {agent_kwargs}[/green]')
            self.agent = agent_generator().to("cuda")
        else:
            self.agent = None

        self.switch = 'human'
        self.terminated = False
        self.last_action = None
        self.timestep = 0
        if self.agent is not None:
            self.reset_agent()

        # print a table of key bindings
        print(
            '[yellow]Extra Key Bindings Besides Minecraft Controls:[/yellow]\n'
            ' [white]C[/white]: Capture Mouse \n'
            ' [white]Left Ctrl + C[/white]: [red]Close Window[/red] \n'
            ' [white]Esc[/white]: [green]Enter Command Mode[/green] \n'
        )

    def reset_agent(self):
        """Resets the agent's internal state (e.g., memory).

        Called when switching to agent control or at the start of a new
        episode.
        """
        self.memory = None

    def _enter_agent_mode(self):
        """Prepare agent control, or fall back to human if no agent is loaded."""
        if self.agent is None:
            print('[red]agent is not specified, switch to human control[/red]')
            self.switch = 'human'
        else:
            self.reset_agent()

    def before_reset(self, sim, reset_flag: bool) -> bool:
        """Resets the GUI before the environment resets.

        :param sim: The simulator instance.
        :param reset_flag: The current reset flag status.
        :returns: The passed `reset_flag`.
        :rtype: bool
        """
        self.gui.reset_gui()
        return reset_flag

    def after_reset(self, sim, obs: Dict, info: Dict) -> Tuple[Dict, Dict]:
        """Handles tasks after the environment resets.

        Resets termination flag, agent state, updates GUI, and resets timestep.

        :param sim: The simulator instance.
        :param obs: The initial observation.
        :param info: The initial info dictionary.
        :returns: The passed `obs` and `info`.
        :rtype: Tuple[Dict, Dict]
        """
        self.terminated = False
        if self.agent is not None:
            self.reset_agent()
        sim.callback_messages.add("Press 'L' to switch control.")
        self.gui._update_image(info)
        self.timestep = 0
        return obs, info

    def before_step(self, sim, action: Any) -> Dict:
        """Determines and processes the action before the environment steps.

        Handles input from human (keyboard/mouse via GUI) or agent based on
        the current control switch (`self.switch`). Also processes chat
        messages. The executed action is remembered in `self.last_action`,
        which `after_step` adds to the info dict as "taken_action".

        :param sim: The simulator instance.
        :param action: The proposed action (can be None, a string like
            'human', or an action dict). A string selects the controller;
            an action dict is passed through unchanged.
        :type action: Any
        :returns: The action dictionary to be executed by the environment.
        :rtype: Dict
        """
        assert not self.terminated, "Cannot step environment after it is done."
        self.gui.window.dispatch_events()

        if isinstance(action, str) or action is None:
            # A string that differs from the current role requests a switch.
            if isinstance(action, str) and action != self.switch:
                self.switch = action
                if self.switch == "agent":
                    self._enter_agent_mode()

            if self.switch != "agent":
                # While a command is being typed, do not act in the world.
                if self.gui.command != "":
                    action = sim.noop_action()
                else:
                    action = self.gui._get_human_action()
            else:
                assert self.agent is not None, "Agent is not specified."
                agent_action, self.memory = self.agent.get_action(
                    sim.obs, self.memory, input_shape="*"
                )
                action = sim.agent_action_to_env_action(agent_action)

        if self.gui.chat_message is not None:
            #!WARNING: should stop the game when chat message is not None
            action["chat"] = self.gui.chat_message
            self.gui.chat_message = None

        self.last_action = action
        return action

    def after_step(
        self,
        sim,
        obs: Dict,
        reward: float,
        terminated: bool,
        truncated: bool,
        info: Dict
    ) -> Tuple[Dict, float, bool, bool, Dict]:
        """Handles tasks after the environment takes a step.

        Updates GUI with new observation and info, calculates FPS, processes
        key releases (like switching control or mouse capture), and handles
        termination.

        :param sim: The simulator instance.
        :param obs: The observation after the step.
        :param reward: The reward received.
        :param terminated: Whether the episode has terminated.
        :param truncated: Whether the episode has been truncated.
        :param info: The info dictionary. Must contain ``player_pos`` with
            ``x``, ``y`` and ``z`` entries.
        :returns: The (potentially modified) obs, reward, terminated,
            truncated, and info.
        :rtype: Tuple[Dict, float, bool, bool, Dict]
        """
        self.terminated = terminated
        self.timestep += 1

        # Throttle to the target frame time. FPS is derived from the elapsed
        # time measured *before* sleeping.
        self.end_time = time.time()
        elapsed = self.end_time - self.start_time
        time.sleep(max(0, self.constants.MINERL_FRAME_TIME - elapsed))
        # A coarse clock can report zero elapsed time; avoid dividing by zero.
        fps = 1 / elapsed if elapsed > 0 else float('inf')
        self.start_time = time.time()

        released_keys = self.gui._capture_all_keys()
        if 'ESCAPE' in released_keys:
            info['ESCAPE'] = True
            self.gui.mode = 'command'
            print('[green]Command Mode Activated[/green]')

        message = [
            [
                f"Role: {self.switch}",
                f"Mode: {self.gui.mode}",
                f"Timestep: {self.timestep}",
                f"FPS: {fps:.2f}",
            ],
            [
                f"X: {info['player_pos']['x']:.2f}",
                f"Y: {info['player_pos']['y']:.2f}",
                f"Z: {info['player_pos']['z']:.2f}",
            ],
        ]

        if DEBUG:
            print('[yellow]Debug Information:[/yellow]')
            ignored_keys = {
                'location_stats', 'voxels', 'mobs', 'health', 'food_level',
                'pov', 'inventory', 'equipped_items', 'use_item', 'pickup',
                'break_item', 'craft_item', 'mine_block', 'damage_dealt',
                'kill_entity', 'player_pos', 'is_gui_open', 'isGuiOpen',
            }
            for k, v in info.items():
                if k not in ignored_keys:
                    print(f'{k}: {v}')
            print('[End of Debug Information]')

        for message_item in info.get('message', {}).values():
            message.append([message_item])

        # Summarize the last executed action; hotbar and chat entries are
        # omitted. last_action is None if no before_step has run yet.
        action_items = []
        for k, v in (self.last_action or {}).items():
            if k == 'camera':
                v = f"({v[0]:.2f}, {v[1]:.2f})"
            elif 'hotbar' in k or 'chat' in k:
                continue
            action_items.append(f"{k}: {v}")
        message.append(action_items)

        self.gui._update_image(info, message=message)
        info['ESCAPE'] = False  # don't forget to reset the key

        if self.gui.mode == 'command':
            help_message = "".join(
                message_item + '\n' for message_item in sim.callback_messages
            )
            self.gui._show_message(help_message)

        released_keys = self.process_keys(sim, released_keys)

        # Every remaining released key toggles a boolean entry in info.
        for key in released_keys:
            info[key] = not info.get(key, False)

        # process_keys may have requested termination (Ctrl+C).
        terminated = self.terminated

        # press 'L' to switch control
        switch_control = 'L' in released_keys
        if switch_control:
            self.switch = 'human' if self.switch == 'agent' else 'agent'
            print(f'[red]Switch to {self.switch} control[/red]')

        if terminated:
            self.gui._show_message("Episode terminated.")

        info["taken_action"] = self.last_action
        info['switch'] = self.switch
        self.terminated = terminated

        if switch_control:
            #? TODO: add more features after switching control
            if self.switch == 'agent':
                self._enter_agent_mode()
                # Reflect a possible fallback to human control.
                info['switch'] = self.switch
            # before_step forbids stepping a finished episode, so only take
            # the extra no-op step while the episode is still running.
            if not terminated:
                obs, reward, terminated, truncated, info = sim.step(sim.noop_action())
                self.last_action = sim.noop_action()
                self.terminated = terminated
                self.timestep += 1

        return obs, reward, terminated, truncated, info

    def before_close(self, sim):
        """Closes the GUI window before the simulator closes.

        :param sim: The simulator instance.
        """
        self.gui.close_gui()

    def process_keys(self, sim, released_keys: set) -> set:
        """Processes special key releases for GUI and simulation control.

        Handles:

        - 'C': Toggle mouse capture (exclusive mouse mode).
        - Ctrl+'C': Mark the simulation as terminated.
        - 'ESCAPE': Block in command mode, pumping window events, until at
          least one key is released; then return to normal mode. If that
          release is Ctrl+'C' the simulation is marked as terminated,
          otherwise all released keys are discarded.

        :param sim: The simulator instance.
        :param released_keys: A set of keys that were released in this frame.
        :type released_keys: set
        :returns: The set of `released_keys` after processing (potentially
            modified).
        :rtype: set
        """
        # press 'C' to set mouse visibility
        if 'C' in released_keys:
            if not (self.gui.modifiers & self.gui.key.MOD_CTRL):
                self.gui.capture_mouse = not self.gui.capture_mouse
                self.gui.window.set_mouse_visible(not self.gui.capture_mouse)
                self.gui.window.set_exclusive_mouse(self.gui.capture_mouse)
            else:
                # press ctrl+C to close the window and stop the simulation
                print('[red]Close the window![/red]')
                self.terminated = True

        # press 'ESC' to enter command mode
        if 'ESCAPE' in released_keys:
            # Keep the window responsive while waiting for the next release.
            while True:
                self.gui.window.dispatch_events()
                self.gui.window.switch_to()
                self.gui.window.flip()
                current_released_keys = self.gui._capture_all_keys()
                if len(current_released_keys) > 0:
                    released_keys = current_released_keys
                    break

            self.gui.mode = 'normal'
            # delete ESCAPE in released keys
            released_keys = set(released_keys) - {'ESCAPE'}
            if 'C' in released_keys and (self.gui.modifiers & self.gui.key.MOD_CTRL):
                print('[red]Close the window![/red]')
                self.terminated = True
            else:
                # delete all keys in the released_keys
                released_keys = set()

        return released_keys