# Copyright 2023 OmniSafe Team. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""World model of the Safety Gymnasium."""
from __future__ import annotations
from typing import Any, ClassVar
import gymnasium
import numpy as np
import safety_gymnasium
import torch
from omnisafe.envs.core import CMDP, env_register
from omnisafe.typing import Box, OmnisafeSpace
[docs]@env_register
class SafetyGymnasiumModelBased(CMDP): # pylint: disable=too-many-instance-attributes
"""Safety Gymnasium environment for Model-based algorithms.
Attributes:
_support_envs (list[str]): List of supported environments.
need_auto_reset_wrapper (bool): Whether to use auto reset wrapper.
need_time_limit_wrapper (bool): Whether to use time limit wrapper.
"""
need_auto_reset_wrapper = False
need_time_limit_wrapper = False
_support_envs: ClassVar[list[str]] = [
'SafetyPointGoal0-v0-modelbased',
'SafetyPointGoal1-v0-modelbased',
'SafetyCarGoal0-v0-modelbased',
'SafetyCarGoal1-v0-modelbased',
'SafetyAntGoal0-v0-modelbased',
'SafetyAntGoal1-v0-modelbased',
]
def __init__(
self,
env_id: str,
num_envs: int = 1,
device: str = 'cpu',
use_lidar: bool = False,
**kwargs: Any,
) -> None:
"""Initialize the environment.
Args:
env_id (str): Environment id.
num_envs (int, optional): Number of environments. Defaults to 1.
device (torch.device, optional): Device to store the data. Defaults to 'cpu'.
use_lidar (bool, optional): Whether to use lidar observation. Defaults to False.
Keyword Args:
render_mode (str, optional): The render mode, ranging from ``human``, ``rgb_array``, ``rgb_array_list``.
Defaults to ``rgb_array``.
camera_name (str, optional): The camera name.
camera_id (int, optional): The camera id.
width (int, optional): The width of the rendered image. Defaults to 256.
height (int, optional): The height of the rendered image. Defaults to 256.
"""
super().__init__(env_id)
self._use_lidar = use_lidar
if num_envs == 1:
self._env = safety_gymnasium.make(
id=env_id.replace('-modelbased', ''),
autoreset=False,
**kwargs,
)
assert isinstance(self._env.action_space, Box), 'Only support Box action space.'
assert isinstance(
self._env.observation_space,
Box,
), 'Only support Box observation space.'
self._action_space = self._env.action_space
else:
raise NotImplementedError
self._device = torch.device(device)
self._num_envs = num_envs
self._metadata = self._env.metadata
self._constraints: list[str] = ['hazards'] # gremlins, vase, buttons
self._xyz_sensors: list[str] = ['velocimeter', 'accelerometer']
self._angle_sensors: list[str] = ['gyro', 'magnetometer']
self._flatten_order: list[str] = (
self._xyz_sensors
+ self._angle_sensors
+ ['goal']
+ self._constraints
+ ['robot_m']
+ ['robot']
)
self._base_state: list[str] = self._xyz_sensors + self._angle_sensors
self._task: str = 'Goal'
self._env.reset()
self.goal_position: np.ndarray = self._env.task.goal.pos
self.robot_position: np.ndarray = self._env.task.agent.pos
self.hazards_position: list[np.ndarray] = self._env.task.hazards.pos
self.goal_distance: float = self._dist_xy(self.robot_position, self.goal_position)
coordinate_sensor_obs: dict[str, Any] = self._get_coordinate_sensor()
self._coordinate_obs_size: int = sum(
np.prod(i.shape) for i in list(coordinate_sensor_obs.values())
)
offset: int = 0
self.key_to_slice: dict[str, slice] = {}
self.key_to_slice_tensor: dict[str, torch.Tensor] = {}
for k in self._flatten_order:
k_size = np.prod(coordinate_sensor_obs[k].shape)
self.key_to_slice[k] = slice(offset, offset + k_size)
self.key_to_slice_tensor[k] = torch.arange(offset, offset + k_size)
offset += k_size
self._base_state_size: int = sum(
np.prod(coordinate_sensor_obs[k].shape) for k in list(self._base_state)
)
self.key_to_slice['base_state'] = slice(0, self._base_state_size)
self.key_to_slice_tensor['base_state'] = torch.arange(0, self._base_state_size)
self._num_lidar_bin: int = 16
self._max_lidar_dist: int = 3
self.hazards_size: float = 0.2
self.goal_size: float = 0.3
self.original_observation_space: OmnisafeSpace = self._env.observation_space
self.coordinate_observation_space: OmnisafeSpace = gymnasium.spaces.Box(
-np.inf,
np.inf,
(self._coordinate_obs_size,),
dtype=np.float32,
)
flat_coordinate_obs = self._get_flat_coordinate(coordinate_sensor_obs)
self.lidar_observation_space: OmnisafeSpace = gymnasium.spaces.Box(
-np.inf,
np.inf,
(self.get_lidar_from_coordinate(flat_coordinate_obs).shape[0],),
dtype=np.float32,
)
if self._use_lidar:
self._observation_space = self.lidar_observation_space
else:
self._observation_space = self.coordinate_observation_space
@property
def task(self) -> str:
"""The name of the task."""
return self._task
[docs] def get_cost_from_obs_tensor(self, obs: torch.Tensor, is_binary: bool = True) -> torch.Tensor:
"""Get batch cost from batch observation.
Args:
obs (torch.Tensor): Batch observation.
is_binary (bool, optional): Whether to use binary cost. Defaults to True.
Returns:
cost: Batch cost.
"""
assert torch.is_tensor(obs), 'obs must be tensor'
assert len(obs.shape) == 2 or len(obs.shape) == 3
hazards_key = self.key_to_slice_tensor['hazards']
if len(obs.shape) == 2:
batch_size = obs.shape[0]
hazard_obs = obs[:, hazards_key].reshape(batch_size, -1, 2)
elif len(obs.shape) == 3:
batch_size = obs.shape[0] * obs.shape[1]
hazard_obs = obs[:, :, hazards_key].reshape(batch_size, -1, 2)
else:
raise RuntimeError('observation size mismatch')
hazards_dist = torch.sqrt(
torch.sum(torch.square(hazard_obs), dim=2),
).reshape(
batch_size,
-1,
)
if is_binary:
cost = torch.where(hazards_dist <= self.hazards_size, 1.0, 0.0)
cost = cost.sum(1)
cost = torch.where(cost >= 1, 1.0, 0.0)
else:
cost = ((hazards_dist < self.hazards_size) * (self.hazards_size - hazards_dist)).sum(
1,
) * 10
if len(obs.shape) == 2:
cost = cost.reshape(obs.shape[0], 1)
elif len(obs.shape) == 3:
cost = cost.reshape(obs.shape[0], obs.shape[1], 1)
return cost
[docs] def get_lidar_from_coordinate(self, obs: np.ndarray) -> torch.Tensor:
"""Get lidar observation.
Args:
obs (np.ndarray): The observation.
Returns:
lidar_obs: The lidar observation.
"""
robot_matrix_x_y = obs[self.key_to_slice['robot_m']]
robot_matrix_x = float(robot_matrix_x_y[0])
robot_matrix_y = float(robot_matrix_x_y[1])
first_row = [robot_matrix_x, robot_matrix_y, 0.0]
second_row = [-robot_matrix_y, robot_matrix_x, 0.0]
third_row = [0.0, 0.0, 1.0]
robot_matrix = [first_row, second_row, third_row]
robot_pos = obs[self.key_to_slice['robot']]
hazards_lidar_vec = self._obs_lidar_pseudo(robot_matrix, robot_pos, self.hazards_position)
goal_lidar_vec = self._obs_lidar_pseudo(robot_matrix, robot_pos, [self.goal_position])
base_state_vec = obs[self.key_to_slice['base_state']]
obs_vec = list(base_state_vec) + list(hazards_lidar_vec) + list(goal_lidar_vec)
obs_vec = np.array(obs_vec)
return torch.as_tensor(obs_vec, dtype=torch.float32, device=self._device).unsqueeze(0)
[docs] def _ego_xy(
self,
robot_matrix: list[list[float]],
robot_pos: np.ndarray,
pos: np.ndarray,
) -> np.ndarray:
"""Return the egocentric XY vector to a position from the robot.
Args:
robot_matrix (list[list[float]]): 3x3 rotation matrix.
robot_pos (np.ndarray): 2D robot position.
pos (np.ndarray): 2D position.
Returns:
2D_egocentric_vector: The 2D egocentric vector.
"""
assert pos.shape == (2,), f'Bad pos {pos}'
assert robot_pos.shape == (2,), f'Bad robot_pos {robot_pos}'
robot_3vec = robot_pos
robot_mat = robot_matrix
pos_3vec = np.concatenate([pos, [0]]) # add a zero z-coordinate
robot_3vec = np.concatenate([robot_3vec, [0]])
world_3vec = pos_3vec - robot_3vec
return np.matmul(world_3vec, robot_mat)[:2]
[docs] def _obs_lidar_pseudo(
self,
robot_matrix: list[list[float]],
robot_pos: np.ndarray,
positions: list[np.ndarray],
) -> np.ndarray: # pylint: disable=too-many-locals
"""Return a robot-centric lidar observation of a list of positions.
Lidar is a set of bins around the robot (divided evenly in a circle).
The detection directions are exclusive and exhaustive for a full 360 view.
Each bin reads 0 if there are no objects in that direction.
If there are multiple objects, the distance to the closest one is used.
Otherwise the bin reads the fraction of the distance towards the robot.
E.g. if the object is 90% of lidar_max_dist away, the bin will read 0.1,
and if the object is 10% of lidar_max_dist away, the bin will read 0.9.
(The reading can be thought of as "closeness" or inverse distance)
This encoding has some desirable properties:
- bins read 0 when empty
- bins smoothly increase as objects get close
- maximum reading is 1.0 (where the object overlaps the robot)
- close objects occlude far objects
- constant size observation with variable numbers of objects
Args:
robot_matrix (list[list[float]]): 3x3 rotation matrix.
robot_pos (np.ndarray): 2D robot position.
positions (list[np.ndarray]): 2D positions.
Returns:
lidar_observation: The lidar observation.
"""
obs = np.zeros(self._num_lidar_bin)
for pos in positions:
pos = np.asarray(pos)
if pos.shape == (3,):
pos = pos[:2] # Truncate Z coordinate
position_z = complex(
*self._ego_xy(robot_matrix, robot_pos, pos),
) # X, Y as real, imaginary components
dist = np.abs(position_z)
angle = np.angle(position_z) % (np.pi * 2)
bin_size = (np.pi * 2) / self._num_lidar_bin
sensor_bin = int(angle / bin_size)
bin_angle = bin_size * sensor_bin
sensor = max(0, self._max_lidar_dist - dist) / self._max_lidar_dist
obs[sensor_bin] = max(obs[sensor_bin], sensor)
# Aliasing
alias = (angle - bin_angle) / bin_size
assert (
0 <= alias <= 1
), f'bad alias {alias}, dist {dist}, angle {angle}, bin {sensor_bin}'
bin_plus = (sensor_bin + 1) % self._num_lidar_bin
bin_minus = (sensor_bin - 1) % self._num_lidar_bin
obs[bin_plus] = max(obs[bin_plus], alias * sensor)
obs[bin_minus] = max(obs[bin_minus], (1 - alias) * sensor)
return obs
[docs] def _get_flat_coordinate(self, coordinate_obs: dict[str, Any]) -> np.ndarray:
"""Get the flattened obs.
Args:
coordinate_obs (dict[str, Any]): The dict of coordinate and sensor observations.
Returns:
flat_obs: The flattened observation.
"""
assert (
self.coordinate_observation_space.shape is not None
), 'Bad coordinate_observation_space'
flat_obs = np.zeros(self.coordinate_observation_space.shape[0])
for k in self._flatten_order:
idx = self.key_to_slice[k]
flat_obs[idx] = coordinate_obs[k].flat
return flat_obs
[docs] def _get_coordinate_sensor(self) -> dict[str, Any]:
"""Return the coordinate observation and sensor observation.
We will ignore the z-axis coordinates in every poses.
The returned obs coordinates are all in the robot coordinates.
Returns:
coordinate_obs: The coordinate observation.
"""
obs = {}
robot_matrix = self._env.task.agent.mat
obs['robot_m'] = np.array(robot_matrix[0][:2])
robot_pos = self._env.task.agent.pos
goal_pos = self._env.task.goal.pos
hazards_pos_list = self._env.task.hazards.pos # list of shape (3,) ndarray
ego_goal_pos = self._ego_xy(robot_matrix, robot_pos[:2], goal_pos[:2])
ego_hazards_pos_list = [
self._ego_xy(robot_matrix, robot_pos[:2], pos[:2]) for pos in hazards_pos_list
] # list of shape (2,) ndarray
# append obs to the dict
for sensor in self._xyz_sensors: # explicitly listed sensors
if sensor == 'accelerometer':
obs[sensor] = self._env.task.agent.get_sensor(sensor)[:1] # only x axis matters
elif sensor == 'ballquat_rear':
obs[sensor] = self._env.task.agent.get_sensor(sensor)
else:
obs[sensor] = self._env.task.agent.get_sensor(sensor)[:2] # only x,y axis matters
for sensor in self._angle_sensors:
if sensor == 'gyro':
obs[sensor] = self._env.task.agent.get_sensor(sensor)[
2:
] # [2:] # only z axis matters
# pass # gyro does not help
else:
obs[sensor] = self._env.task.agent.get_sensor(sensor)
# --------modification-----------------
obs['robot'] = np.array(robot_pos[:2])
obs['hazards'] = np.array(ego_hazards_pos_list) # (hazard_num, 2)
obs['goal'] = ego_goal_pos # (2,)
# obs['vases'] = np.array(ego_vases_pos_list) # (vase_num, 2)
return obs
[docs] def _dist_xy(
self,
pos1: np.ndarray | list[np.ndarray],
pos2: np.ndarray | list[np.ndarray],
) -> float:
"""Return the distance from the robot to an XY position.
Args:
pos1 (np.ndarray | list[np.ndarray]): The first position.
pos2 (np.ndarray | list[np.ndarray]): The second position.
Returns:
distance: The distance between the two positions.
"""
pos1 = np.asarray(pos1)
pos2 = np.asarray(pos2)
if pos1.shape == (3,):
pos1 = pos1[:2]
if pos2.shape == (3,):
pos2 = pos2[:2]
return np.sqrt(np.sum(np.square(pos1 - pos2)))
[docs] def step(
self,
action: torch.Tensor,
) -> tuple[
torch.Tensor,
torch.Tensor,
torch.Tensor,
torch.Tensor,
torch.Tensor,
dict[str, Any],
]:
"""Step the environment.
.. note::
OmniSafe use auto reset wrapper to reset the environment when the episode is
terminated. So the ``obs`` will be the first observation of the next episode.
And the true ``final_observation`` in ``info`` will be stored in the ``final_observation`` key of ``info``.
Args:
action (torch.Tensor): Action to take.
Returns:
observation: The agent's observation of the current environment.
reward: The amount of reward returned after previous action.
cost: The amount of cost returned after previous action.
terminated: Whether the episode has ended.
truncated: Whether the episode has been truncated due to a time limit.
info: Some information logged by the environment.
"""
obs_original, reward, cost, terminated, truncated, info = self._env.step(
action.detach().cpu().numpy(),
)
if self._task == 'Goal':
info['old_goal_distance'] = self.goal_distance
self.robot_position = self._env.task.agent.pos
self.goal_distance = self._dist_xy(self.robot_position, self.goal_position)
info['goal_distance'] = self.goal_distance
coordinate_sensor_obs = self._get_coordinate_sensor()
obs_np = self._get_flat_coordinate(coordinate_sensor_obs)
obs = torch.as_tensor(obs_np, dtype=torch.float32, device=self._device)
info['obs_original'] = obs_original
goal_met = 'goal_met' in info # reach the goal
info['goal_met'] = goal_met
obs, reward, cost, terminated, truncated = (
torch.as_tensor(x, dtype=torch.float32, device=self._device)
for x in (obs, reward, cost, terminated, truncated)
)
if 'final_observation' in info:
info['final_observation'] = np.array(
[
array if array is not None else np.zeros(obs.shape[-1])
for array in info['final_observation']
],
)
info['final_observation'] = torch.as_tensor(
info['final_observation'],
dtype=torch.float32,
device=self._device,
)
return obs, reward, cost, terminated, truncated, info
@property
def max_episode_steps(self) -> int:
"""The max steps per episode."""
return self._env.env.spec.max_episode_steps
[docs] def reset(
self,
seed: int | None = None,
options: dict[str, Any] | None = None,
) -> tuple[torch.Tensor, dict[str, Any]]:
"""Reset the environment.
Args:
seed (int, optional): The random seed. Defaults to None.
options (dict[str, Any], optional): The options for the environment. Defaults to None.
Returns:
observation: The initial observation of the space.
info: Some information logged by the environment.
"""
obs_original, info = self._env.reset(seed=seed, options=options)
if self._task == 'Goal':
self.goal_position = self._env.task.goal.pos
self.robot_position = self._env.task.agent.pos
self.hazards_position = self._env.task.hazards.pos
self.goal_distance = self._dist_xy(self.robot_position, self.goal_position)
coordinate_sensor_obs = self._get_coordinate_sensor()
flat_coordinate_obs = self._get_flat_coordinate(coordinate_sensor_obs)
self.get_lidar_from_coordinate(flat_coordinate_obs)
info['obs_original'] = obs_original
info['goal_met'] = False
obs = torch.as_tensor(flat_coordinate_obs, dtype=torch.float32, device=self._device)
return obs, info # pylint: disable=possibly-used-before-assignment
[docs] def set_seed(self, seed: int) -> None:
"""Set the seed for the environment.
Args:
seed (int): Seed to set.
"""
self.reset(seed=seed)
[docs] def render(self) -> Any:
"""Render the environment.
Returns:
The rendered frames, we recommend using `np.ndarray` which could construct video by
moviepy.
"""
return self._env.render()
[docs] def close(self) -> None:
"""Close the environment."""
self._env.close()