Source code for omnisafe.utils.tools

# Copyright 2023 OmniSafe Team. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""OmniSafe tools package."""

from __future__ import annotations

import hashlib
import json
import os
import random
import sys
from typing import Any

import numpy as np
import torch
import torch.backends.cudnn
import yaml
from rich.console import Console

from omnisafe.typing import DEVICE_CPU


[docs]def get_flat_params_from(model: torch.nn.Module) -> torch.Tensor: """This function is used to get the flattened parameters from the model. .. note:: Some algorithms need to get the flattened parameters from the model, such as the :class:`TRPO` and :class:`CPO` algorithm. In these algorithms, the parameters are flattened and then used to calculate the loss. Examples: >>> model = torch.nn.Linear(2, 2) >>> model.weight.data = torch.tensor([[1.0, 2.0], [3.0, 4.0]]) >>> get_flat_params_from(model) tensor([1., 2., 3., 4.]) Args: model (torch.nn.Module): model to be flattened. Returns: Flattened parameters. Raises: AssertionError: If no gradients were found in model parameters. """ flat_params = [] for _, param in model.named_parameters(): if param.requires_grad: data = param.data data = data.view(-1) # flatten tensor flat_params.append(data) assert flat_params, 'No gradients were found in model parameters.' return torch.cat(flat_params)
[docs]def get_flat_gradients_from(model: torch.nn.Module) -> torch.Tensor: """This function is used to get the flattened gradients from the model. .. note:: Some algorithms need to get the flattened gradients from the model, such as the :class:`TRPO` and :class:`CPO` algorithm. In these algorithms, the gradients are flattened and then used to calculate the loss. Args: model (torch.nn.Module): The model to be flattened. Returns: Flattened gradients. Raises: AssertionError: If no gradients were found in model parameters. """ grads = [] for _, param in model.named_parameters(): if param.requires_grad and param.grad is not None: grad = param.grad grads.append(grad.view(-1)) # flatten tensor and append assert grads, 'No gradients were found in model parameters.' return torch.cat(grads)
[docs]def set_param_values_to_model(model: torch.nn.Module, vals: torch.Tensor) -> None: """This function is used to set the parameters to the model. .. note:: Some algorithms (e.g. TRPO, CPO, etc.) need to set the parameters to the model, instead of using the ``optimizer.step()``. Examples: >>> model = torch.nn.Linear(2, 2) >>> model.weight.data = torch.tensor([[1.0, 2.0], [3.0, 4.0]]) >>> vals = torch.tensor([1.0, 2.0, 3.0, 4.0]) >>> set_param_values_to_model(model, vals) >>> model.weight.data tensor([[1., 2.], [3., 4.]]) Args: model (torch.nn.Module): The model to be set. vals (torch.Tensor): The parameters to be set. Raises: AssertionError: If the instance of the parameters is not ``torch.Tensor``, or the lengths of the parameters and the model parameters do not match. """ assert isinstance(vals, torch.Tensor) i: int = 0 for _, param in model.named_parameters(): if param.requires_grad: # param has grad and, hence, must be set orig_size = param.size() size = np.prod(list(param.size())) new_values = vals[i : int(i + size)] # set new param values new_values = new_values.view(orig_size) param.data = new_values i += int(size) # increment array position assert i == len(vals), f'Lengths do not match: {i} vs. {len(vals)}'
[docs]def seed_all(seed: int) -> None: """This function is used to set the random seed for all the packages. .. hint:: To reproduce the results, you need to set the random seed for all the packages. Including ``numpy``, ``random``, ``torch``, ``torch.cuda``, ``torch.backends.cudnn``. .. warning:: If you want to use the ``torch.backends.cudnn.benchmark`` or ``torch.backends.cudnn.deterministic`` and your ``cuda`` version is over 10.2, you need to set the ``CUBLAS_WORKSPACE_CONFIG`` and ``PYTHONHASHSEED`` environment variables. Args: seed (int): The random seed. """ os.environ['PYTHONHASHSEED'] = str(seed) random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) torch.cuda.manual_seed(seed) torch.cuda.manual_seed_all(seed)
[docs]def custom_cfgs_to_dict(key_list: str, value: Any) -> dict[str, Any]: """This function is used to convert the custom configurations to dict. .. note:: This function is used to convert the custom configurations to dict. For example, if the custom configurations are ``train_cfgs:use_wandb`` and ``True``, then the output dict will be ``{'train_cfgs': {'use_wandb': True}}``. Args: key_list (str): list of keys. value (Any): value. Returns: The converted dict. """ if value == 'True': value = True elif value == 'False': value = False elif '.' in value: value = float(value) elif value.isdigit(): value = int(value) elif value.startswith('[') and value.endswith(']'): value = value[1:-1] value = value.split(',') else: value = str(value) keys_split = key_list.replace('-', '_').split(':') return_dict = {keys_split[-1]: value} for key in reversed(keys_split[:-1]): return_dict = {key.replace('-', '_'): return_dict} return return_dict
[docs]def update_dict(total_dict: dict[str, Any], item_dict: dict[str, Any]) -> None: """Updater of multi-level dictionary. Args: total_dict (dict[str, Any]): The total dictionary. item_dict (dict[str, Any]): The item dictionary. Examples: >>> total_dict = {'a': {'b': 1, 'c': 2}} >>> item_dict = {'a': {'b': 3, 'd': 4}} >>> update_dict(total_dict, item_dict) >>> total_dict {'a': {'b': 3, 'c': 2, 'd': 4}} """ for idd in item_dict: total_value = total_dict.get(idd) item_value = item_dict.get(idd) if total_value is None: total_dict.update({idd: item_value}) elif isinstance(item_value, dict): update_dict(total_value, item_value) total_dict.update({idd: total_value}) else: total_value = item_value total_dict.update({idd: total_value})
[docs]def load_yaml(path: str) -> dict[str, Any]: """Get the default kwargs from ``yaml`` file. .. note:: This function search the ``yaml`` file by the algorithm name and environment name. Make sure your new implemented algorithm or environment has the same name as the yaml file. Args: path (str): The path of the ``yaml`` file. Returns: The default kwargs. Raises: AssertionError: If the ``yaml`` file is not found. """ with open(path, encoding='utf-8') as file: try: kwargs = yaml.load(file, Loader=yaml.FullLoader) # noqa: S506 except FileNotFoundError as exc: raise FileNotFoundError(f'{path} error: {exc}') from exc return kwargs
[docs]def recursive_check_config( config: dict[str, Any], default_config: dict[str, Any], exclude_keys: tuple[str, ...] = (), ) -> None: """Check whether config is valid in default_config. Args: config (dict[str, Any]): The config to be checked. default_config (dict[str, Any]): The default config. exclude_keys (tuple of str, optional): The keys to be excluded. Defaults to (). Raises: AssertionError: If the type of the value is not the same as the default value. KeyError: If the key is not in default_config. """ assert isinstance(config, dict), 'custom_cfgs must be a dict!' for key in config: if key not in default_config and key not in exclude_keys: raise KeyError(f'Invalid key: {key}') if config[key] is None: return if isinstance(config[key], dict) and key != 'env_cfgs': recursive_check_config(config[key], default_config[key])
def assert_with_exit(condition: bool, msg: str) -> None: """Assert with message. Examples: >>> assert_with_exit(1 == 2, '1 must equal to 2') AssertionError: 1 must equal to 2 Args: condition (bool): condition to be checked. msg (str): message to be printed. Raises: AssertionError: If the condition is not satisfied. """ try: assert condition except AssertionError: console = Console() console.print('ERROR: ' + msg, style='bold red') sys.exit(1) def recursive_dict2json(dict_obj: dict[str, Any]) -> str: """This function is used to recursively convert the dict to json. Args: dict_obj (dict[str, Any]): dict to be converted. Returns: The converted json string. Raises: AssertionError: If the instance of the input is not ``dict``. """ assert isinstance(dict_obj, dict), 'Input must be a dict.' flat_dict: dict[str, Any] = {} def _flatten_dict(dict_obj: dict[str, Any] | Any, path: str = '') -> None: if isinstance(dict_obj, dict): for key, value in dict_obj.items(): _flatten_dict(value, path + key + ':') else: flat_dict[path[:-1]] = dict_obj _flatten_dict(dict_obj) return json.dumps(flat_dict, sort_keys=True).replace('"', "'") def hash_string(string: str) -> str: """This function is used to generate the folder name. Args: string (str): string to be hashed. Returns: The hashed string. """ salt = b'\xf8\x99/\xe4\xe6J\xd8d\x1a\x9b\x8b\x98\xa2\x1d\xff3*^\\\xb1\xc1:e\x11M=PW\x03\xa5\\h' # convert string to bytes and add salt salted_string = salt + string.encode('utf-8') # use sha256 to hash hash_object = hashlib.sha256(salted_string) # get the hex digest return hash_object.hexdigest() def get_device(device: torch.device | str | int = DEVICE_CPU) -> torch.device: """Retrieve PyTorch device. It checks that the requested device is available first. For now, it supports only cpu and cuda. By default, it tries to use the gpu. Args: device (torch.device, str, or int, optional): The device to use. Defaults to ``torch.device('cpu')``. Returns: The device to use. """ # Force conversion to torch.device device = torch.device(device) # Cuda not available if not torch.cuda.is_available() and device.type == torch.device('cuda').type: return torch.device('cpu') return device