helios.core.distributed
=======================

.. py:module:: helios.core.distributed


Classes
-------

.. autoapisummary::

   helios.core.distributed.DistributedInfo


Functions
---------

.. autoapisummary::

   helios.core.distributed.is_using_torchrun
   helios.core.distributed.init_dist
   helios.core.distributed.shutdown_dist
   helios.core.distributed.is_dist_active
   helios.core.distributed.get_dist_info
   helios.core.distributed.get_local_rank
   helios.core.distributed.get_global_rank
   helios.core.distributed.gather_into_tensor
   helios.core.distributed.all_reduce_tensors
   helios.core.distributed.global_print
   helios.core.distributed.local_print
   helios.core.distributed.safe_barrier


Module Contents
---------------

.. py:function:: is_using_torchrun() -> bool

   Check if the current process was launched from ``torchrun``.

   This checks whether the environment variables set by ``torchrun`` exist. The list of
   variables is taken directly from the documentation and can be seen `here`_.

   .. _here: https://pytorch.org/docs/stable/elastic/run.html#environment-variables

   :returns: True if the run was started from ``torchrun``, False otherwise.


.. py:function:: init_dist(backend: str = 'nccl', rank: int | None = None, world_size: int | None = None) -> None

   Initialize the distributed process group.

   The optional values for ``rank`` and ``world_size`` **must** be omitted if
   distributed training is handled through ``torchrun``. If distributed training is
   started manually, then both arguments **must** be provided.

   :param backend: the backend to use. Defaults to "nccl".
   :param rank: the (optional) rank of the current GPU.
   :param world_size: the (optional) number of GPUs in the system.

   :raises ValueError: if only one of ``rank`` and ``world_size`` is ``None``.


.. py:function:: shutdown_dist() -> None

   Shut down the distributed process group.


.. py:function:: is_dist_active() -> bool

   Check if ``torch.distributed`` is active.


.. py:class:: DistributedInfo

   Bundle information regarding distributed state.

   To understand what these mean, consider a run performed over two nodes, each with
   2 GPUs, and suppose we have a single process per GPU. Then the following values are
   assigned:

   * ``local_rank``: 0 or 1 for both nodes. 0 for the first GPU, 1 for the second.
   * ``rank``: 0, 1, 2, 3 for each of the GPUs over the two nodes.
   * ``local_world_size``: 2 for both nodes.
   * ``world_size``: 4 (2 nodes with 2 workers per node).

   :param local_rank: the local rank.
   :param rank: the global rank.
   :param local_world_size: the local world size.
   :param world_size: the global world size.

   .. py:attribute:: local_rank
      :type: int
      :value: 0

   .. py:attribute:: rank
      :type: int
      :value: 0

   .. py:attribute:: local_world_size
      :type: int
      :value: 1

   .. py:attribute:: world_size
      :type: int
      :value: 1


.. py:function:: get_dist_info() -> DistributedInfo

   Get the distributed state of the current run.

   If distributed training is not used, then both ranks are set to 0 and both world
   sizes are set to 1.

   :returns: The information of the current distributed run.


.. py:function:: get_local_rank() -> int

   Get the local rank of the device the process is running on.

   If distributed training is not used, 0 is returned.

   :returns: The local rank of the current device.


.. py:function:: get_global_rank() -> int

   Get the global rank of the device the process is running on.

   If distributed training is not used, 0 is returned.

   :returns: The global rank of the current device.
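
The following is a minimal sketch of how the setup and rank-query helpers above fit
together. It relies only on the documented signatures; the module alias ``dist`` and the
manual ``rank``/``world_size`` values are placeholders for illustration.

.. code-block:: python

   import helios.core.distributed as dist


   def main() -> None:
       # torchrun provides the rank and world size through environment
       # variables, so both arguments must be omitted in that case.
       if dist.is_using_torchrun():
           dist.init_dist(backend="nccl")
       else:
           # Manual launch: both rank and world_size must be provided
           # (placeholder values shown here).
           dist.init_dist(backend="nccl", rank=0, world_size=1)

       info = dist.get_dist_info()

       # Both rank helpers fall back to 0 when distributed training is inactive.
       assert dist.get_local_rank() == info.local_rank
       assert dist.get_global_rank() == info.rank

       dist.shutdown_dist()


   if __name__ == "__main__":
       main()
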
.. py:function:: gather_into_tensor(tensor: torch.Tensor, size: tuple[int]) -> torch.Tensor

   Gathers the tensors across all processes and merges them into a single tensor.

   :param tensor: the tensor to merge.
   :param size: the dimensions of the output tensor.

   :returns: The resulting tensor containing all gathered tensors.

   :raises RuntimeError: if distributed training hasn't been initialised.


.. py:function:: all_reduce_tensors(tensor: torch.Tensor | list[torch.Tensor], **kwargs) -> torch.Tensor

   Reduces tensors across all processes so all have the same value.

   :param tensor: the input tensor(s) to reduce. If the input is a list of tensors, they
                  will be concatenated into a single tensor.
   :param kwargs: additional options for ``torch.distributed.all_reduce``.

   :returns: The reduced tensor.

   :raises RuntimeError: if distributed training has not been initialised.


.. py:function:: global_print(*args: Any, global_rank: int = 0, **kwargs: Any) -> None

   Print wrapper that only prints on the specified global rank.

   :param \*args: positional arguments to pass in to Python's print.
   :param global_rank: the global rank the print should happen on. Defaults to 0.
   :param \*\*kwargs: keyword arguments to pass in to Python's print.


.. py:function:: local_print(*args: Any, local_rank: int = 0, **kwargs: Any) -> None

   Print wrapper that only prints on the specified local rank.

   :param \*args: positional arguments to pass in to Python's print.
   :param local_rank: the local rank the print should happen on. Defaults to 0.
   :param \*\*kwargs: keyword arguments to pass in to Python's print.


.. py:function:: safe_barrier(**kwargs: Any) -> None

   Safe wrapper for ``torch.distributed.barrier``.

   The wrapper is "safe" in the sense that it is valid to call this function regardless
   of whether the code is currently using distributed training or not.

   :param \*\*kwargs: keyword arguments to ``torch.distributed.barrier``.
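
The sketch below illustrates the collective helpers and print wrappers inside an
already-initialised process group. The module alias ``dist``, the device placement, and
the assumption that the gathered per-rank tensors are stacked along the first dimension
are illustrative choices, not part of the documented contract.

.. code-block:: python

   import torch

   import helios.core.distributed as dist

   # Assumes init_dist() has already been called (see the sketch earlier on this page).
   info = dist.get_dist_info()
   device = f"cuda:{info.local_rank}"

   # Each rank contributes a (2, 3) tensor. The output shape assumes the per-rank
   # tensors are stacked along the first dimension (an assumption for this sketch).
   local = torch.full((2, 3), float(info.rank), device=device)
   gathered = dist.gather_into_tensor(local, size=(info.world_size * 2, 3))

   # Reduce a scalar loss across all processes; extra keyword arguments are
   # forwarded to torch.distributed.all_reduce.
   loss = torch.tensor(0.5, device=device)
   reduced = dist.all_reduce_tensors(loss)

   # Only global rank 0 prints; local_print(..., local_rank=0) would instead
   # print once per node.
   dist.global_print("gathered:", gathered.shape, "loss:", reduced.item())

   # Safe to call even when distributed training is not active.
   dist.safe_barrier()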