Source code for minestudio.simulator.entry

'''
Date: 2024-11-11 05:20:17
LastEditors: muzhancun muzhancun@stu.pku.edu.cn
LastEditTime: 2025-05-26 22:10:06
FilePath: /MineStudio/minestudio/simulator/entry.py
'''

import os
import cv2
import argparse
import numpy as np
import torch
import gymnasium
from gymnasium import spaces
from copy import deepcopy
from typing import Dict, List, Tuple, Union, Sequence, Mapping, Any, Optional, Literal
from dataclasses import asdict, dataclass, field, fields

from minestudio.utils.vpt_lib.actions import ActionTransformer
from minestudio.utils.vpt_lib.action_mapping import CameraHierarchicalMapping
from minestudio.simulator.minerl.utils.inventory import map_slot_number_to_cmd_slot
from minestudio.simulator.minerl.herobraine.env_specs.human_survival_specs import HumanSurvival
from minestudio.simulator.callbacks import MinecraftCallback
from minestudio.utils import get_mine_studio_dir


[docs] @dataclass class CameraConfig: """Configuration for camera quantization and binning settings. :param camera_binsize: The size of each bin for camera quantization, default is 2. :param camera_maxval: The maximum value for camera quantization, default is 10. :param camera_mu: The mu parameter for mu-law quantization, default is 10.0. :param camera_quantization_scheme: The quantization scheme to use, either "mu_law" or "linear", default is "mu_law". """ camera_binsize: int = 2 camera_maxval: int = 10 camera_mu: float = 10.0 camera_quantization_scheme: str = "mu_law" def __post_init__(self): if self.camera_quantization_scheme not in ["mu_law", "linear"]: raise ValueError("camera_quantization_scheme must be 'mu_law' or 'linear'") @property def n_camera_bins(self): """The bin number of the setting. :returns: The number of camera bins. """ return 2 * self.camera_maxval // self.camera_binsize + 1 @property def action_transformer_kwargs(self): """Dictionary of camera settings used by an action transformer.""" return { 'camera_binsize': self.camera_binsize, 'camera_maxval': self.camera_maxval, 'camera_mu': self.camera_mu, 'camera_quantization_scheme': self.camera_quantization_scheme, }
[docs] def download_engine(): """Downloads the simulator engine from Hugging Face Hub and extracts it.""" import huggingface_hub, zipfile local_dir = get_mine_studio_dir() print(f"Downloading simulator engine to {local_dir}") huggingface_hub.hf_hub_download(repo_id='CraftJarvis/SimulatorEngine', filename='engine.zip', local_dir=local_dir) with zipfile.ZipFile(os.path.join(local_dir, 'engine.zip'), 'r') as zip_ref: zip_ref.extractall(local_dir) os.remove(os.path.join(local_dir, 'engine.zip'))
[docs] def check_engine(skip_confirmation=False): """Checks if the simulator engine exists and downloads it if not. :param skip_confirmation: If True, skips the confirmation prompt before downloading. """ if not os.path.exists(os.path.join(get_mine_studio_dir(), "engine", "build", "libs", "mcprec-6.13.jar")): if skip_confirmation: download_engine() else: response = input("Detecting missing simulator engine, do you want to download it from huggingface (Y/N)?\n") if response == 'Y' or response == 'y': download_engine() else: exit(0)
[docs] class MinecraftSim(gymnasium.Env): """MineStudio Minecraft Simulator. :param action_type: The type of the action space, can be 'env' or 'agent'. :param obs_size: The resolution of the observation, default is (224, 224). :param render_size: The original resolution of the game, default is (640, 360). :param seed: The seed of the minecraft world, default is 0. :param inventory: The initial inventory of the agent, default is an empty dict. :param preferred_spawn_biome: The preferred spawn biome when calling reset, default is None. :param num_empty_frames: The number of empty frames to skip when calling reset, default is 20. :param callbacks: A list of callbacks to be called before and after each basic calling. :param camera_config: The configuration for camera quantization and binning settings. :keyword kwargs: Additional keyword arguments. """ def __init__( self, action_type: Literal['env', 'agent'] = 'agent', # the style of the action space obs_size: Tuple[int, int] = (224, 224), # the resolution of the observation (cv2 resize) render_size: Tuple[int, int] = (640, 360), # the original resolution of the game is 640x360 seed: int = 0, # the seed of the minecraft world inventory: Dict = {}, # the initial inventory of the agent preferred_spawn_biome: Optional[str] = None, # the preferred spawn biome when call reset num_empty_frames: int = 20, # the number of empty frames to skip when calling reset callbacks: List[MinecraftCallback] = [], # the callbacks to be called before and after each basic calling camera_config:CameraConfig=None, # the configuration for camera quantization and binning settings **kwargs ) -> Any: super().__init__() check_engine() self.obs_size = obs_size self.action_type = action_type self.render_size = render_size self.seed = seed self.num_empty_frames = num_empty_frames self.callbacks = callbacks self.callback_messages = set() # record messages from callbacks, for example the help messages self.env = HumanSurvival( fov_range = [70, 70], gamma_range = [2, 2], guiscale_range = [1, 1], cursor_size_range = [16.0, 16.0], frameskip = 1, resolution = render_size, inventory = inventory, preferred_spawn_biome = preferred_spawn_biome, ).make() self.env.seed(seed) self.already_reset = False if camera_config is None: camera_config = CameraConfig() self.action_mapper = CameraHierarchicalMapping(n_camera_bins = camera_config.n_camera_bins) self.action_transformer = ActionTransformer(**camera_config.action_transformer_kwargs)
[docs] def agent_action_to_env_action(self, action: Dict[str, Any]): """Converts an agent action to an environment action. :param action: The agent action. :returns: The environment action. """ #! This is quite important step (for some reason). #! For the sake of your sanity, remember to do this step (manual conversion to numpy) #! before proceeding. Otherwise, your agent might be a little derp. if isinstance(action, tuple): action = { 'buttons': action[0], 'camera': action[1], } # Second, convert the action to the type of numpy if isinstance(action["buttons"], torch.Tensor): action = { "buttons": action["buttons"].cpu().numpy(), "camera": action["camera"].cpu().numpy() } action = self.action_mapper.to_factored(action) action = self.action_transformer.policy2env(action) return action
[docs] def env_action_to_agent_action(self, action: Dict[str, Any]): """Converts an environment action to an agent action. :param action: The environment action. :returns: The agent action. """ action = self.action_transformer.env2policy(action) action = self.action_mapper.from_factored(action) return action
[docs] def step(self, action: Dict[str, Any]) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]: """Runs one timestep of the environment's dynamics. :param action: The action to take. :returns: A tuple containing the observation, reward, terminated flag, truncated flag, and info dictionary. """ if self.action_type == 'agent': env_action = self.agent_action_to_env_action(action) action.pop('buttons') action.pop('camera') action.update(env_action) for callback in self.callbacks: action = callback.before_step(self, action) obs, reward, done, info = self.env.step(action.copy()) terminated, truncated = done, done obs, info = self._wrap_obs_info(obs, info) for callback in self.callbacks: obs, reward, terminated, truncated, info = callback.after_step(self, obs, reward, terminated, truncated, info) self.obs, self.info = obs, info return obs, reward, terminated, truncated, info
[docs] def reset(self) -> Tuple[np.ndarray, Dict]: """Resets the environment to an initial state and returns the initial observation and info. :returns: A tuple containing the initial observation and info dictionary. """ reset_flag = True for callback in self.callbacks: reset_flag = callback.before_reset(self, reset_flag) if reset_flag: # hard reset self.env.reset() self.already_reset = True for _ in range(self.num_empty_frames): # skip the frames to avoid the initial black screen action = self.env.action_space.no_op() obs, reward, done, info = self.env.step(action) obs, info = self._wrap_obs_info(obs, info) for callback in self.callbacks: print(callback) obs, info = callback.after_reset(self, obs, info) self.obs, self.info = obs, info return obs, info
def _wrap_obs_info(self, obs: Dict, info: Dict) -> Dict: """Wraps the observation and info dictionaries in origin MineRL sim. :param obs: The observation dictionary. :param info: The info dictionary. :returns: sA tuple containing the wrapped observation and info dictionaries. """ _info = info.copy() _info.update(obs) _obs = {'image': cv2.resize(obs['pov'], dsize=self.obs_size, interpolation=cv2.INTER_LINEAR)} if getattr(self, 'info', None) is None: self.info = {} for key, value in _info.items(): self.info[key] = value _info = self.info.copy() return _obs, _info
[docs] def noop_action(self) -> Dict[str, Any]: """Returns a no-op action for the current action type. :returns: A no-op action. """ if self.action_type == 'agent': return { "buttons": np.array([0]), "camera": np.array([60]), } else: return self.env.action_space.no_op()
[docs] def close(self) -> None: """Performs any necessary cleanup. :returns: The close status from the underlying environment. """ for callback in self.callbacks: callback.before_close(self) close_status = self.env.close() for callback in self.callbacks: callback.after_close(self) return close_status
[docs] def render(self) -> None: """Renders the environment. :returns: The rendered image. """ image = self.obs['image'] for callback in self.callbacks: image = callback.before_render(self, image) #! core logic for callback in self.callbacks: image = callback.after_render(self, image) return image
@property def action_space(self) -> spaces.Dict: """The action space of the environment.""" if self.action_type == 'agent': return gymnasium.spaces.Dict({ "buttons": gymnasium.spaces.MultiDiscrete([8641]), "camera": gymnasium.spaces.MultiDiscrete([121]), }) elif self.action_type == 'env': return gymnasium.spaces.Dict({ 'attack': gymnasium.spaces.Discrete(2), 'back': gymnasium.spaces.Discrete(2), 'forward': gymnasium.spaces.Discrete(2), 'jump': gymnasium.spaces.Discrete(2), 'left': gymnasium.spaces.Discrete(2), 'right': gymnasium.spaces.Discrete(2), 'sneak': gymnasium.spaces.Discrete(2), 'sprint': gymnasium.spaces.Discrete(2), 'use': gymnasium.spaces.Discrete(2), 'hotbar.1': gymnasium.spaces.Discrete(2), 'hotbar.2': gymnasium.spaces.Discrete(2), 'hotbar.3': gymnasium.spaces.Discrete(2), 'hotbar.4': gymnasium.spaces.Discrete(2), 'hotbar.5': gymnasium.spaces.Discrete(2), 'hotbar.6': gymnasium.spaces.Discrete(2), 'hotbar.7': gymnasium.spaces.Discrete(2), 'hotbar.8': gymnasium.spaces.Discrete(2), 'hotbar.9': gymnasium.spaces.Discrete(2), 'inventory': gymnasium.spaces.Discrete(2), 'camera': gymnasium.spaces.Box(low=-180, high=180, shape=(2,), dtype=np.float32), }) else: raise ValueError(f"Unknown action type: {self.action_type}") @property def observation_space(self) -> spaces.Dict: """The observation space of the environment.""" height, width = self.obs_size return gymnasium.spaces.Dict({ "image": gymnasium.spaces.Box(low=0, high=255, shape=(height, width, 3), dtype=np.uint8) })
if __name__ == '__main__': # test if the simulator works parser = argparse.ArgumentParser() parser.add_argument('-y', '--yes', action='store_true', help='Skip confirmation', default=False) args = parser.parse_args() if args.yes: check_engine(skip_confirmation=True) from minestudio.simulator.callbacks import SpeedTestCallback sim = MinecraftSim( action_type="env", callbacks=[SpeedTestCallback(50)] ) obs, info = sim.reset() for i in range(100): action = sim.action_space.sample() obs, reward, terminated, truncated, info = sim.step(action) sim.close()