helios.core.distributed

Classes

DistributedInfo

Bundle information regarding distributed state.

Functions

is_using_torchrun(→ bool)

Check if the current process was launched from torchrun.

init_dist(→ None)

Initialize the distributed process group.

shutdown_dist(→ None)

Shutdown the distributed process group.

is_dist_active(→ bool)

Check if torch.distributed is active.

get_dist_info(→ DistributedInfo)

Get the distributed state of the current run.

get_local_rank(→ int)

Get the local rank of the device the process is running on.

get_global_rank(→ int)

Get the global rank of the device the process is running on.

gather_into_tensor(→ torch.Tensor)

Gathers the tensors across all processes and merges them into a single tensor.

all_reduce_tensors(→ torch.Tensor)

Reduces tensors across all processes so all have the same value.

global_print(→ None)

Print wrapper that only prints on the specified global rank.

local_print(→ None)

Print wrapper that only prints on the specified local rank.

safe_barrier(→ None)

Safe wrapper for torch.distributed.barrier.

Module Contents

helios.core.distributed.is_using_torchrun() → bool

Check if the current process was launched from torchrun.

This checks whether the environment variables set by torchrun exist. The list of variables is taken directly from the torchrun documentation.

Returns:

True if the run was started from torchrun, False otherwise.
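
As background, torchrun communicates its state through environment variables such as RANK, LOCAL_RANK, and WORLD_SIZE. A simplified sketch of this kind of check (not the exact implementation used by this module) might look like:

    import os

    # Hypothetical subset of the environment variables that torchrun sets; their
    # presence is taken as a signal that torchrun launched the current process.
    _TORCHRUN_VARS = ("RANK", "LOCAL_RANK", "WORLD_SIZE", "MASTER_ADDR", "MASTER_PORT")

    def launched_from_torchrun() -> bool:
        return all(var in os.environ for var in _TORCHRUN_VARS)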

helios.core.distributed.init_dist(backend: str = 'nccl', rank: int | None = None, world_size: int | None = None) → None

Initialize the distributed process group.

The optional values for rank and world_size must be omitted if distributed training is handled through torchrun. If distributed training is started manually, then both arguments must be provided.

Parameters:
  • backend – the backend to use. Defaults to “nccl”.

  • rank – the (optional) rank of the current GPU.

  • world_size – the (optional) number of GPUs in the system.

Raises:

ValueError – if exactly one of rank and world_size is None.
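
A possible initialization pattern that covers both launch modes (a sketch; the helper name setup and the argument values are illustrative):

    from helios.core import distributed as dist

    def setup(rank: int | None = None, world_size: int | None = None) -> None:
        if dist.is_using_torchrun():
            # torchrun already supplies the rank and world size through the
            # environment, so they must not be passed here.
            dist.init_dist(backend="nccl")
        else:
            # Manual launch: both values have to be provided together.
            dist.init_dist(backend="nccl", rank=rank, world_size=world_size)

    # Example: a two-process run started by hand would call
    # setup(rank=0, world_size=2) in one process and setup(rank=1, world_size=2)
    # in the other.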

helios.core.distributed.shutdown_dist() → None

Shutdown the distributed process group.

helios.core.distributed.is_dist_active() → bool

Check if torch.distributed is active.

class helios.core.distributed.DistributedInfo

Bundle information regarding distributed state.

To understand what these mean, consider a run being performed over two nodes, each with 2 GPUs. Suppose we have a single process per GPU. Then the following values are assigned:

  • local_rank: 0 or 1 for both nodes. 0 for the first GPU, 1 for the second.

  • rank: 0, 1, 2, 3 for each of the GPUs over the two nodes.

  • local_world_size: 2 for both nodes.

  • world_size: 4 (2 nodes with 2 workers per node).

Parameters:
  • local_rank – the local rank.

  • rank – the global rank.

  • local_world_size – the local world size.

  • world_size – the global world size.

local_rank: int = 0
rank: int = 0
local_world_size: int = 1
world_size: int = 1

helios.core.distributed.get_dist_info() → DistributedInfo

Get the distributed state of the current run.

If distributed training is not used, then both ranks are set to 0 and both world sizes are set to 1.

Returns:

The information of the current distributed run.
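
Continuing the two-node, two-GPU example from DistributedInfo above, a sketch of how the returned fields can be inspected (the commented values are those the last process of that scenario would see):

    from helios.core import distributed as dist

    info = dist.get_dist_info()

    # For the second GPU of the second node in the example above:
    #   info.local_rank       == 1
    #   info.rank             == 3
    #   info.local_world_size == 2
    #   info.world_size       == 4
    print(f"process {info.rank} of {info.world_size} (local rank {info.local_rank})")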

helios.core.distributed.get_local_rank() → int

Get the local rank of the device the process is running on.

If distributed training is not used, 0 is returned.

Returns:

The local rank of the current device.

helios.core.distributed.get_global_rank() → int

Get the global rank of the device the process is running on.

If distributed training is not used, 0 is returned.

Returns:

The global rank of the current device.
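
A common use of the local rank is selecting the CUDA device for the current process, while the global rank is typically used for logging or sharding work; a minimal sketch:

    import torch

    from helios.core import distributed as dist

    # One process per GPU: the local rank indexes the GPU on this node.
    device = torch.device(f"cuda:{dist.get_local_rank()}")
    print(f"global rank {dist.get_global_rank()} is using {device}")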

helios.core.distributed.gather_into_tensor(tensor: torch.Tensor, size: tuple[int]) → torch.Tensor

Gathers the tensors across all processes and merges them into a single tensor.

Parameters:
  • tensor – the tensor to merge.

  • size – the dimensions of the output tensor.

Returns:

The resulting tensor containing all gathered tensors.

Raises:

RuntimeError – if distributed training has not been initialized.
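
As an illustration, assume each rank produces a per-sample score tensor of shape (batch,) and the gathered result holds the contributions of every rank (the exact layout is determined by the size argument; the shapes here are an assumption):

    import torch

    from helios.core import distributed as dist

    info = dist.get_dist_info()
    batch = 8

    # Each rank contributes a (batch,) tensor; the output is sized to hold the
    # contributions from all world_size processes.
    local_scores = torch.rand(batch, device=f"cuda:{info.local_rank}")
    all_scores = dist.gather_into_tensor(local_scores, size=(info.world_size * batch,))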

helios.core.distributed.all_reduce_tensors(tensor: torch.Tensor | list[torch.Tensor], **kwargs) → torch.Tensor

Reduces tensors across all processes so all have the same value.

Parameters:
  • tensor – the input tensor(s) to reduce. If the input is a list of tensors, they will be concatenated into a single tensor.

  • kwargs – additional options for torch.distributed.all_reduce.

Returns:

The reduced tensor.

Raises:

RuntimeError – if distributed training has not been initialized.
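
For example, averaging a scalar loss across processes. torch.distributed.all_reduce defaults to a sum, so (assuming the wrapper keeps that default) the result is divided by the world size:

    import torch

    from helios.core import distributed as dist

    loss = torch.tensor(0.42, device=f"cuda:{dist.get_local_rank()}")

    # Sum the per-rank losses, then divide by the number of processes.
    total = dist.all_reduce_tensors(loss)
    mean_loss = total / dist.get_dist_info().world_size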

helios.core.distributed.global_print(*args: Any, global_rank: int = 0, **kwargs: Any) → None

Print wrapper that only prints on the specified global rank.

Parameters:
  • *args – positional arguments to pass in to Python’s print.

  • global_rank – the global rank the print should happen on. Defaults to 0.

  • **kwargs – keyword arguments to pass in to Python’s print.

helios.core.distributed.local_print(*args: Any, local_rank: int = 0, **kwargs: Any) → None

Print wrapper that only prints on the specified local rank.

Parameters:
  • *args – positional arguments to pass in to Python’s print.

  • local_rank – the local rank the print should happen on. Defaults to 0.

  • **kwargs – keyword arguments to pass in to Python’s print.
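
Both wrappers forward their arguments to Python's print; a typical use is limiting log output to one process per job or per node:

    from helios.core import distributed as dist

    # Printed once per job, by global rank 0.
    dist.global_print("starting epoch 1")

    # Printed once per node, by local rank 0 on each node.
    dist.local_print("node-local dataset cache ready")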

helios.core.distributed.safe_barrier(**kwargs: Any) → None

Safe wrapper for torch.distributed.barrier.

The wrapper is “safe” in the sense that it is valid to call this function regardless of whether distributed training is currently in use.

Parameters:

**kwargs – keyword arguments to torch.distributed.barrier.
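
A typical pattern is to let one process perform a one-off task while the others wait at the barrier; because the wrapper can be called even without distributed training, the same code runs unchanged in a single-process setting (the "outputs" directory below is purely illustrative):

    import os

    from helios.core import distributed as dist

    if dist.get_global_rank() == 0:
        # One-off work only a single process should perform, e.g. creating the
        # output directory or writing a checkpoint.
        os.makedirs("outputs", exist_ok=True)

    # All processes wait here until rank 0 is done.
    dist.safe_barrier()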