chore: Regenerate all playbooks

GitLab CI 2026-03-16 02:31:10 +00:00
parent b7deea5e18
commit 5beeea4a38
11 changed files with 2119 additions and 0 deletions


@ -34,6 +34,7 @@ Each playbook includes prerequisites, step-by-step instructions, troubleshooting
- [LM Studio on DGX Spark](nvidia/lm-studio/)
- [Build and Deploy a Multi-Agent Chatbot](nvidia/multi-agent-chatbot/)
- [Multi-modal Inference](nvidia/multi-modal-inference/)
- [Connect Multiple Sparks through a Switch](nvidia/multi-sparks-through-switch/)
- [NCCL for Two Sparks](nvidia/nccl/)
- [Fine-tune with NeMo](nvidia/nemo-fine-tune/)
- [Nemotron-3-Nano with llama.cpp](nvidia/nemotron/)


@ -0,0 +1,446 @@
# Connect Multiple Sparks through a Switch
> Connect multiple Spark devices in a cluster and set them up for distributed inference and fine-tuning
## Table of Contents
- [Overview](#overview)
- [Run on Four Sparks](#run-on-four-sparks)
- [Step 3.1. Verify negotiated Link speed](#step-31-verify-negotiated-link-speed)
- [4.1 Script for Cluster networking configuration](#41-script-for-cluster-networking-configuration)
- [4.2 Manual Cluster networking configuration](#42-manual-cluster-networking-configuration)
- [Option 1: Automatically configure SSH](#option-1-automatically-configure-ssh)
- [Option 2: Manually discover and configure SSH](#option-2-manually-discover-and-configure-ssh)
- [Troubleshooting](#troubleshooting)
---
## Overview
## Basic idea
Configure four DGX Spark systems for high-speed inter-node communication using 200Gbps QSFP connections through a QSFP switch. This setup enables distributed workloads across multiple DGX Spark nodes by establishing network connectivity and configuring SSH authentication.
## What you will accomplish
You will physically connect four DGX Spark devices with QSFP cables and a QSFP switch, configure network interfaces for cluster communication, and establish passwordless SSH between nodes to create a functional distributed computing environment.
## What to know before starting
- Basic understanding of distributed computing concepts
- Working with network interface configuration and netplan
- Experience with SSH key management
## Prerequisites
- Four DGX Spark systems (these instructions also work for a two- or three-node cluster connected through a switch)
- QSFP switch with at least four QSFP56-DD ports (at least 200Gbps each)
- QSFP cables for a 200Gbps connection from the switch to each device
  - One cable per Spark
  - If the switch has 400Gbps ports, you can also use breakout cables to split each one into two 200Gbps ports
- SSH access available to all systems
- Root or sudo access on all systems: `sudo whoami`
- The same username on all systems
- Update all systems to the latest OS and Firmware. Refer to the DGX Spark documentation https://docs.nvidia.com/dgx/dgx-spark/os-and-component-update.html
## Ancillary files
All required files for this playbook can be found [here on GitHub](https://github.com/NVIDIA/dgx-spark-playbooks/blob/main/nvidia/multi-sparks-through-switch/)
- [**discover-sparks.sh**](https://github.com/NVIDIA/dgx-spark-playbooks/blob/main/nvidia/connect-two-sparks/assets/discover-sparks) script for automatic node discovery and SSH key distribution
## Time & risk
- **Duration:** 1 hour including validation
- **Risk level:** Medium - involves network reconfiguration
- **Rollback:** Network changes can be reversed by removing netplan configs or IP assignments
- **Last Updated:** 3/12/2026
* First publication
## Run on Four Sparks
## Step 1. Ensure Same Username on all four Systems
On all four systems check the username and make sure it's the same:
```bash
## Check current username
whoami
```
If the usernames don't match, create a new user (e.g., nvidia) on all four systems and log in as the new user:
```bash
## Create nvidia user and add to sudo group
sudo useradd -m nvidia
sudo usermod -aG sudo nvidia
## Set password for nvidia user
sudo passwd nvidia
## Switch to nvidia user
su - nvidia
```
## Step 2. Switch Management
Most QSFP switches offer some form of management interface, either a CLI or a UI. Refer to the switch documentation and connect to the management interface. Make sure that the ports on the switch are enabled. To connect four Sparks, the switch must be configured to provide a 200Gbps connection to each DGX Spark.
## Step 3. Physical Hardware Connection
Connect QSFP cables between the DGX Spark systems and the switch (QSFP56-DD/QSFP56 ports) using one CX7 port on each system. Use the same CX7 port on every Spark to simplify network configuration and avoid NCCL test failures. This playbook uses the second port (the one farther from the Ethernet port). This establishes the 200Gbps connection required for high-speed inter-node communication. To check the links, run `ibdev2netdev` on all four Sparks; the output should resemble the example below. In this example the interfaces showing as 'Up' are **enp1s0f1np1** and **enP2p1s0f1np1** (each physical port has two logical interfaces).
Example output:
```bash
## Check QSFP interface availability on all nodes
nvidia@dgx-spark-1:~$ ibdev2netdev
rocep1s0f0 port 1 ==> enp1s0f0np0 (Down)
rocep1s0f1 port 1 ==> enp1s0f1np1 (Up)
roceP2p1s0f0 port 1 ==> enP2p1s0f0np0 (Down)
roceP2p1s0f1 port 1 ==> enP2p1s0f1np1 (Up)
```
> [!NOTE]
> If none of the interfaces are showing as 'Up', please check the QSFP cable connection, reboot the systems and try again.
> The interfaces showing as 'Up' depend on which port you are using to connect the nodes to the switch. Each physical port has two logical interfaces, for example, Port 1 has two interfaces - enp1s0f1np1 and enP2p1s0f1np1. Please disregard enp1s0f0np0 and enP2p1s0f0np0, and use enp1s0f1np1 and enP2p1s0f1np1 only.
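
If you want to script this check, the interface names marked 'Up' can be pulled out of the `ibdev2netdev` output. The following is a minimal Python sketch, not part of the playbook's tooling: the helper name `up_interfaces` is hypothetical, and the line pattern is inferred from the example output above, so other driver versions may print a different format.

```python
import re

# Matches lines such as: "rocep1s0f1 port 1 ==> enp1s0f1np1 (Up)"
# (pattern inferred from the example output; an assumption, not a documented format)
LINE_RE = re.compile(r"^(\S+)\s+port\s+(\d+)\s+==>\s+(\S+)\s+\((\w+)\)\s*$")


def up_interfaces(ibdev2netdev_output: str) -> list[str]:
    """Return the network interface names that are reported as 'Up'."""
    up = []
    for line in ibdev2netdev_output.splitlines():
        match = LINE_RE.match(line.strip())
        if match and match.group(4) == "Up":
            up.append(match.group(3))
    return up


SAMPLE = """\
rocep1s0f0 port 1 ==> enp1s0f0np0 (Down)
rocep1s0f1 port 1 ==> enp1s0f1np1 (Up)
roceP2p1s0f0 port 1 ==> enP2p1s0f0np0 (Down)
roceP2p1s0f1 port 1 ==> enP2p1s0f1np1 (Up)
"""

print(up_interfaces(SAMPLE))
```

On a real node you could feed it the captured output of `ibdev2netdev` instead of `SAMPLE`.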
### Step 3.1. Verify negotiated Link speed
Auto-negotiation might not bring the link up at 200Gbps. To confirm, run the command below on all Sparks and check that the speed is shown as 200000Mb/s. If it shows less than that, set the link speed to 200Gbps manually in the switch port configuration. Refer to the switch's documentation to disable auto-negotiation and set the link speed to 200Gbps (e.g., 200G-baseCR4).
Example output (link negotiated at only 100Gbps):
```bash
nvidia@dgx-spark-1:~$ ethtool enp1s0f1np1 | grep Speed
Speed: 100000Mb/s
nvidia@dgx-spark-1:~$ ethtool enP2p1s0f1np1 | grep Speed
Speed: 100000Mb/s
```
After setting the correct speed on the switch ports, verify the link speed on all the DGX Sparks again.
Example output:
```bash
nvidia@dgx-spark-1:~$ ethtool enp1s0f1np1 | grep Speed
Speed: 200000Mb/s
nvidia@dgx-spark-1:~$ ethtool enP2p1s0f1np1 | grep Speed
Speed: 200000Mb/s
```
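The speed check above can also be automated. Below is a minimal Python sketch that parses the `Speed:` line from `ethtool` output and compares it with the 200000Mb/s this playbook expects. The function names and the `REQUIRED_MBPS` constant are hypothetical, chosen for illustration.

```python
import re

REQUIRED_MBPS = 200_000  # 200Gbps, the speed this playbook expects


def parse_speed_mbps(ethtool_output: str):
    """Return the link speed in Mb/s from `ethtool <iface>` output, or None if absent."""
    match = re.search(r"Speed:\s*(\d+)Mb/s", ethtool_output)
    return int(match.group(1)) if match else None


def link_is_200g(ethtool_output: str) -> bool:
    """True only when a speed was reported and it is at least 200Gbps."""
    speed = parse_speed_mbps(ethtool_output)
    return speed is not None and speed >= REQUIRED_MBPS


print(link_is_200g("Speed: 100000Mb/s"))
print(link_is_200g("Speed: 200000Mb/s"))
```

A link that is down typically reports no numeric speed, in which case `parse_speed_mbps` returns `None` and the check fails.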
## Step 4. Network Interface Configuration
> [!NOTE]
> Full bandwidth can be achieved with just one QSFP cable.
For a clustered setup, all DGX Sparks:
1. Should be able to talk to each other using TCP/IP over CX7.
2. Should be accessible for management (e.g., to SSH in and run commands).
3. Should be able to access the internet (e.g., to download models and utilities).
It is recommended to use the Ethernet/WiFi network for management and internet traffic and to keep it separate from the CX7 network, so that CX7 bandwidth is not consumed by non-workload traffic.
The supported way to configure a cluster with a switch is to configure a bridge (or use the default bridge) on the switch and add all ports connected to DGX Sparks to it through the switch management interface.
1. This way, all ports are part of a single layer-2 domain, which is required for the cluster networking configuration.
2. Some switches allow hardware offloading on only one bridge, so all ports must be kept in a single bridge.
Once you have created the bridge and added the ports, you are ready to configure networking on the DGX Spark side.
### 4.1 Script for Cluster networking configuration
We have created a script [here on GitHub](https://github.com/NVIDIA/dgx-spark-playbooks/blob/main/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup) which automates the following:
1. Interface network configuration for all DGX Sparks
2. Set up password-less authentication between the DGX Sparks
3. Verify multi-node communication
4. Run NCCL Bandwidth tests
> [!NOTE]
> You can use the script or continue with the manual configurations in the following sections. If you use the script, you can skip the rest of the setup sections in this playbook.
Use the steps below to run the script:
```bash
## Clone the repository
git clone https://github.com/NVIDIA/dgx-spark-playbooks
## Enter the script directory
cd dgx-spark-playbooks/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup
## Check the README.md for steps to run the script and configure the cluster networking
```
### 4.2 Manual Cluster networking configuration
Choose one of the following options to assign IPs to the CX7 logical interfaces. The options are mutually exclusive.
1. DHCP server on the switch (recommended, if it is supported)
2. Link local IP addressing (netplan is the same across all nodes)
3. Manual IP addressing (netplan will be different on each node but provides more control and deterministic IPs)
#### Option 1: Configure DHCP server on the switch
1. Configure the DHCP server on the switch with a subnet large enough to assign IPs to all Sparks. A /24 subnet should work well for this configuration and any future expansion.
2. Configure the 'Up' CX7 interfaces on the DGX Sparks to acquire IPs using DHCP. For example, if the logical interfaces **enp1s0f1np1** and **enP2p1s0f1np1** are 'Up', create a netplan like the one below on all Sparks.
```bash
## Create the netplan configuration file
sudo tee /etc/netplan/40-cx7.yaml > /dev/null <<EOF
network:
  version: 2
  ethernets:
    enp1s0f1np1:
      dhcp4: true
    enP2p1s0f1np1:
      dhcp4: true
EOF
## Set appropriate permissions
sudo chmod 600 /etc/netplan/40-cx7.yaml
## Apply the configuration
sudo netplan apply
```
3. Confirm that the interfaces get IPs assigned
```bash
## In this example, we are using interface enp1s0f1np1. Similarly check enP2p1s0f1np1.
nvidia@dgx-spark-1:~$ ip addr show enp1s0f1np1 | grep -w inet
inet 100.100.100.4/24 brd 100.100.100.255 scope global noprefixroute enp1s0f1np1
```
#### Option 2: Automatic Link local IP Assignment
Configure network interfaces using netplan on all DGX Spark nodes for automatic link-local addressing:
```bash
## Create the netplan configuration file
sudo tee /etc/netplan/40-cx7.yaml > /dev/null <<EOF
network:
  version: 2
  ethernets:
    enp1s0f1np1:
      link-local: [ ipv4 ]
    enP2p1s0f1np1:
      link-local: [ ipv4 ]
EOF
## Set appropriate permissions
sudo chmod 600 /etc/netplan/40-cx7.yaml
## Apply the configuration
sudo netplan apply
```
#### Option 3: Manual IP Assignment with the netplan configuration file
On node 1:
```bash
## Create the netplan configuration file
sudo tee /etc/netplan/40-cx7.yaml > /dev/null <<EOF
network:
  version: 2
  ethernets:
    enp1s0f1np1:
      addresses:
        - 192.168.100.10/24
      dhcp4: no
    enP2p1s0f1np1:
      addresses:
        - 192.168.100.11/24
      dhcp4: no
EOF
## Set appropriate permissions
sudo chmod 600 /etc/netplan/40-cx7.yaml
## Apply the configuration
sudo netplan apply
```
On node 2:
```bash
## Create the netplan configuration file
sudo tee /etc/netplan/40-cx7.yaml > /dev/null <<EOF
network:
  version: 2
  ethernets:
    enp1s0f1np1:
      addresses:
        - 192.168.100.12/24
      dhcp4: no
    enP2p1s0f1np1:
      addresses:
        - 192.168.100.13/24
      dhcp4: no
EOF
## Set appropriate permissions
sudo chmod 600 /etc/netplan/40-cx7.yaml
## Apply the configuration
sudo netplan apply
```
On node 3:
```bash
## Create the netplan configuration file
sudo tee /etc/netplan/40-cx7.yaml > /dev/null <<EOF
network:
  version: 2
  ethernets:
    enp1s0f1np1:
      addresses:
        - 192.168.100.14/24
      dhcp4: no
    enP2p1s0f1np1:
      addresses:
        - 192.168.100.15/24
      dhcp4: no
EOF
## Set appropriate permissions
sudo chmod 600 /etc/netplan/40-cx7.yaml
## Apply the configuration
sudo netplan apply
```
On node 4:
```bash
## Create the netplan configuration file
sudo tee /etc/netplan/40-cx7.yaml > /dev/null <<EOF
network:
  version: 2
  ethernets:
    enp1s0f1np1:
      addresses:
        - 192.168.100.16/24
      dhcp4: no
    enP2p1s0f1np1:
      addresses:
        - 192.168.100.17/24
      dhcp4: no
EOF
## Set appropriate permissions
sudo chmod 600 /etc/netplan/40-cx7.yaml
## Apply the configuration
sudo netplan apply
```
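The four per-node files above follow one pattern: node N gets two consecutive host addresses starting at `192.168.100.10`. As a rough illustration, the Python sketch below generates the same netplan text for any node number. The function names and constants are hypothetical, and the addressing rule is inferred from the four examples rather than stated by the playbook.

```python
# Interface pair and subnet taken from the manual examples above.
INTERFACES = ["enp1s0f1np1", "enP2p1s0f1np1"]
SUBNET_PREFIX = "192.168.100"
FIRST_HOST = 10  # node 1 uses .10 and .11 (assumed rule, inferred from the examples)


def addresses_for_node(node_number: int) -> dict:
    """Map each interface to its CIDR address for a 1-based node number."""
    base = FIRST_HOST + (node_number - 1) * len(INTERFACES)
    return {
        iface: f"{SUBNET_PREFIX}.{base + offset}/24"
        for offset, iface in enumerate(INTERFACES)
    }


def netplan_for_node(node_number: int) -> str:
    """Build the netplan YAML text for one node."""
    lines = ["network:", "  version: 2", "  ethernets:"]
    for iface, cidr in addresses_for_node(node_number).items():
        lines += [
            f"    {iface}:",
            "      addresses:",
            f"        - {cidr}",
            "      dhcp4: no",
        ]
    return "\n".join(lines) + "\n"


for node in range(1, 5):
    print(f"# node {node}")
    print(netplan_for_node(node))
```

The generated text would still need to be written to `/etc/netplan/40-cx7.yaml` with mode 600 and applied with `sudo netplan apply`, as in the manual steps.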
## Step 5. Set up passwordless SSH authentication
### Option 1: Automatically configure SSH
Run the DGX Spark [**discover-sparks.sh**](https://github.com/NVIDIA/dgx-spark-playbooks/blob/main/nvidia/connect-two-sparks/assets/discover-sparks) script from one of the nodes to automatically discover and configure SSH:
```bash
curl -O https://raw.githubusercontent.com/NVIDIA/dgx-spark-playbooks/refs/heads/main/nvidia/connect-two-sparks/assets/discover-sparks
bash ./discover-sparks
```
The expected output is similar to the following, with different IPs and node names. The first time you run the script, you'll be prompted for your password on each node.
```
Found: 169.254.35.62 (dgx-spark-1.local)
Found: 169.254.35.63 (dgx-spark-2.local)
Found: 169.254.35.64 (dgx-spark-3.local)
Found: 169.254.35.65 (dgx-spark-4.local)
Setting up bidirectional SSH access (local <-> remote nodes)...
You may be prompted for your password for each node.
SSH setup complete! All local and remote nodes can now SSH to each other without passwords.
```
> [!NOTE]
> If you encounter any errors, please follow Option 2 below to manually configure SSH and debug the issue.
### Option 2: Manually discover and configure SSH
You will need to find the IP addresses of the CX7 interfaces that are up. On all nodes, run the following command to find the IP addresses and take note of them for the next step.
```bash
ip addr show enp1s0f1np1
```
Example output:
```
## In this example, we are using interface enp1s0f1np1.
nvidia@dgx-spark-1:~$ ip addr show enp1s0f1np1
4: enp1s0f1np1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP group default qlen 1000
    link/ether 3c:6d:66:cc:b3:b7 brd ff:ff:ff:ff:ff:ff
    inet 169.254.35.62/16 brd 169.254.255.255 scope link noprefixroute enp1s0f1np1
       valid_lft forever preferred_lft forever
    inet6 fe80::3e6d:66ff:fecc:b3b7/64 scope link
       valid_lft forever preferred_lft forever
```
In this example, the IP address for Node 1 is **169.254.35.62**. Repeat the process for other nodes.
On all nodes, run the following commands to enable passwordless SSH:
```bash
## Copy your SSH public key to all nodes. Replace the IP addresses with the ones you found in the previous step.
ssh-copy-id -i ~/.ssh/id_rsa.pub <username>@<IP for Node 1>
ssh-copy-id -i ~/.ssh/id_rsa.pub <username>@<IP for Node 2>
ssh-copy-id -i ~/.ssh/id_rsa.pub <username>@<IP for Node 3>
ssh-copy-id -i ~/.ssh/id_rsa.pub <username>@<IP for Node 4>
```
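The `ssh-copy-id` commands above assume that `~/.ssh/id_rsa.pub` already exists; on a freshly installed node it may not. The Python sketch below builds the command list and prepends an `ssh-keygen` step when the public key is missing. The helper name is hypothetical, the example IPs are taken from the sample output earlier in this step, and generating a key with an empty passphrase is an assumption you may want to change.

```python
import shlex
from pathlib import Path


def ssh_setup_commands(user, node_ips, key_path="~/.ssh/id_rsa"):
    """Return commands that create a key pair (if missing) and copy it to each node."""
    private_key = Path(key_path).expanduser()
    public_key = Path(str(private_key) + ".pub")
    commands = []
    if not public_key.exists():
        # ssh-keygen: -t key type, -N passphrase (empty here, an assumption), -f output file
        commands.append(["ssh-keygen", "-t", "rsa", "-N", "", "-f", str(private_key)])
    for ip in node_ips:
        commands.append(["ssh-copy-id", "-i", str(public_key), f"{user}@{ip}"])
    return commands


# Print the commands instead of running them, so they can be reviewed first.
for cmd in ssh_setup_commands("nvidia", ["169.254.35.62", "169.254.35.63"]):
    print(shlex.join(cmd))
```

Printing rather than executing keeps the sketch side-effect free; each printed line can be pasted into a shell on the node.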
## Step 6. Verify Multi-Node Communication
Test basic multi-node functionality from the head node:
```bash
## Test hostname resolution across nodes
ssh <IP for Node 1> hostname
ssh <IP for Node 2> hostname
ssh <IP for Node 3> hostname
ssh <IP for Node 4> hostname
```
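The same check can be looped over all nodes from a script. This is a minimal Python sketch under a few assumptions: the helper names are hypothetical, and it adds the OpenSSH options `BatchMode=yes` and `ConnectTimeout` so that a node still requiring a password fails quickly instead of prompting.

```python
import subprocess


def hostname_command(ip, timeout_s=5):
    """Build an ssh command that prints the remote hostname without prompting."""
    return [
        "ssh",
        "-o", "BatchMode=yes",               # fail instead of asking for a password
        "-o", f"ConnectTimeout={timeout_s}",
        ip,
        "hostname",
    ]


def check_nodes(ips, run=subprocess.run):
    """Return {ip: hostname}, with None for nodes that could not be reached."""
    results = {}
    for ip in ips:
        proc = run(hostname_command(ip), capture_output=True, text=True)
        results[ip] = proc.stdout.strip() if proc.returncode == 0 else None
    return results
```

Calling `check_nodes([...])` with your four node IPs returns a dictionary; any `None` value points to a node where passwordless SSH is not working yet.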
## Step 7. Running Tests and Workloads
Your cluster is now set up to run distributed workloads across four nodes. For example, you can run the [NCCL playbook](https://build.nvidia.com/spark/nccl/stacked-sparks). Wherever that playbook asks you to run a command on two nodes, run it on all four nodes, and modify the `mpirun` command that you run on the head node to use four nodes instead of two.
Example mpirun command for four nodes:
```bash
## Set network interface environment variables (use your Up interface from the previous step)
export UCX_NET_DEVICES=enp1s0f1np1
export NCCL_SOCKET_IFNAME=enp1s0f1np1
export OMPI_MCA_btl_tcp_if_include=enp1s0f1np1
## Run the all_gather performance test across four nodes (replace the IP addresses with the ones you found in the previous step)
mpirun -np 4 -H <IP for Node 1>:1,<IP for Node 2>:1,<IP for Node 3>:1,<IP for Node 4>:1 \
--mca plm_rsh_agent "ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no" \
-x LD_LIBRARY_PATH=$LD_LIBRARY_PATH \
$HOME/nccl-tests/build/all_gather_perf
```
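Because the `-np` count and the `-H` host list must stay in sync with the number of nodes, it can help to build the command from a list of IPs. The Python sketch below only assembles the command string shown above; the function name is hypothetical and the IPs in the usage line are placeholders.

```python
def mpirun_command(node_ips, test_binary="$HOME/nccl-tests/build/all_gather_perf"):
    """Assemble the mpirun command line for one process per node."""
    hosts = ",".join(f"{ip}:1" for ip in node_ips)
    return (
        f"mpirun -np {len(node_ips)} -H {hosts} "
        '--mca plm_rsh_agent "ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no" '
        "-x LD_LIBRARY_PATH=$LD_LIBRARY_PATH "
        f"{test_binary}"
    )


# Placeholder addresses; substitute the CX7 IPs of your own nodes.
print(mpirun_command(["192.168.100.10", "192.168.100.12", "192.168.100.14", "192.168.100.16"]))
```

The printed string is meant to be run in a shell where the three environment variables from the example above have already been exported.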
## Step 8. Cleanup and Rollback
> [!WARNING]
> These steps will reset network configuration.
```bash
## Rollback network configuration
sudo rm /etc/netplan/40-cx7.yaml
sudo netplan apply
```
> [!NOTE]
> If you disconnect the switch, make sure to do the following:
> 1. Re-enable auto-negotiation to avoid issues later if the switch is used for different purposes.
> 2. Remove the DHCP server configuration on the switch if you used that to assign IPs to Sparks.
> 3. If you created a new bridge, move the ports back to the default bridge and delete the new bridge.
## Troubleshooting
| Symptom | Cause | Fix |
|---------|-------|-----|
| "Network unreachable" errors | Network interfaces not configured | Verify netplan config and `sudo netplan apply` |
| SSH authentication failures | SSH keys not properly distributed | Re-run `./discover-sparks` and enter passwords |
| Nodes not visible in cluster | Network connectivity issue | Verify QSFP cable connection, check IP configuration |


@ -0,0 +1,24 @@
#
# SPDX-FileCopyrightText: Copyright (c) 1993-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
network:
  version: 2
  ethernets:
    enp1s0f1np1:
      link-local: [ ipv4 ]
    enP2p1s0f1np1:
      link-local: [ ipv4 ]


@ -0,0 +1,50 @@
# Multi spark cluster setup script
## Usage
### Step 1. Clone the repo
Clone the dgx-spark-playbooks repo from GitHub
### Step 2. Switch to the multi spark cluster setup scripts directory
```bash
cd dgx-spark-playbooks/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup-1.0.0
```
### Step 3. Create or edit a JSON config file with your cluster information
```bash
# Create or edit a JSON config file under the `config` directory with the SSH credentials for your nodes.
# Adjust the number of entries in the "nodes_info" list to match the number of nodes in your cluster.
# Example (config/spark_config_b2b.json):
# {
#   "nodes_info": [
#     {
#       "ip_address": "10.0.0.1",
#       "port": 22,
#       "user": "nvidia",
#       "password": "nvidia123"
#     },
#     {
#       "ip_address": "10.0.0.2",
#       "port": 22,
#       "user": "nvidia",
#       "password": "nvidia123"
#     }
#   ]
# }
```
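Before running the setup script, it can be worth checking that the config file is well-formed. The following Python sketch is not part of the setup script: the function name is hypothetical, and the list of required keys is inferred from the example config, so the script's own validation may differ.

```python
import json

# Keys seen in the example config; assumed to be required.
REQUIRED_KEYS = {"ip_address", "port", "user", "password"}


def validate_config(text):
    """Return a list of problems found in the config text (empty if none)."""
    try:
        config = json.loads(text)
    except json.JSONDecodeError as exc:
        return [f"invalid JSON: {exc}"]
    nodes = config.get("nodes_info") if isinstance(config, dict) else None
    if not isinstance(nodes, list) or not nodes:
        return ["'nodes_info' must be a non-empty list"]
    problems = []
    for index, node in enumerate(nodes):
        missing = REQUIRED_KEYS - set(node) if isinstance(node, dict) else REQUIRED_KEYS
        if missing:
            problems.append(f"node {index}: missing {sorted(missing)}")
    return problems


EXAMPLE = '{"nodes_info": [{"ip_address": "10.0.0.1", "port": 22, "user": "nvidia", "password": "nvidia123"}]}'
print(validate_config(EXAMPLE))
```

An empty list means no problems were found by this check; it does not confirm that the nodes are reachable.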
### Step 4. Run the cluster setup script with your JSON config file
```bash
bash spark_cluster_setup.sh config/spark_config_b2b.json
# This will do the following
# 1. Create a python virtual env and install required packages
# 2. Validate the environment and cluster config
# 3. Detect the topology and configure the IP addresses
# 4. Configure password-less ssh between the cluster nodes
# 5. Run NCCL BW test
```


@ -0,0 +1,16 @@
{
  "nodes_info": [
    {
      "ip_address": "NODE_1_MGMT_IP_ADDRESS",
      "port": 22,
      "user": "user_name",
      "password": "password"
    },
    {
      "ip_address": "NODE_2_MGMT_IP_ADDRESS",
      "port": 22,
      "user": "user_name",
      "password": "password"
    }
  ]
}


@ -0,0 +1,22 @@
{
  "nodes_info": [
    {
      "ip_address": "NODE_1_MGMT_IP_ADDRESS",
      "port": 22,
      "user": "user_name",
      "password": "password"
    },
    {
      "ip_address": "NODE_2_MGMT_IP_ADDRESS",
      "port": 22,
      "user": "user_name",
      "password": "password"
    },
    {
      "ip_address": "NODE_3_MGMT_IP_ADDRESS",
      "port": 22,
      "user": "user_name",
      "password": "password"
    }
  ]
}


@ -0,0 +1,28 @@
{
  "nodes_info": [
    {
      "ip_address": "NODE_1_MGMT_IP_ADDRESS",
      "port": 22,
      "user": "user_name",
      "password": "password"
    },
    {
      "ip_address": "NODE_2_MGMT_IP_ADDRESS",
      "port": 22,
      "user": "user_name",
      "password": "password"
    },
    {
      "ip_address": "NODE_3_MGMT_IP_ADDRESS",
      "port": 22,
      "user": "user_name",
      "password": "password"
    },
    {
      "ip_address": "NODE_4_MGMT_IP_ADDRESS",
      "port": 22,
      "user": "user_name",
      "password": "password"
    }
  ]
}


@ -0,0 +1,638 @@
#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: Copyright (c) 1993-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import json
import socket
import struct
import threading
import time
import os
from collections import defaultdict
# Interfaces used for DISCOVERY (L2 broadcasts)
DISCOVERY_INTERFACES = ["enp1s0f0np0", "enp1s0f1np1"]
# Map discovery interfaces -> interfaces where we actually want to configure IPs
# Order matters: index 0 = lower host, index 1 = higher host (per node, per link)
DISCOVERY_TO_CONFIG_IFACES = {
    "enp1s0f0np0": ["enp1s0f0np0", "enP2p1s0f0np0"],
    "enp1s0f1np1": ["enp1s0f1np1", "enP2p1s0f1np1"],
}
# For switch mode, map discovery iface -> stable link index
SWITCH_IFACE_INDEX = {
    "enp1s0f0np0": 0,
    "enp1s0f1np1": 1,
}
# All interfaces that should appear in generated netplan (discovery + config)
ALL_INTERFACES = sorted(
    set(DISCOVERY_INTERFACES) | {iface for lst in DISCOVERY_TO_CONFIG_IFACES.values() for iface in lst}
)
ETHERTYPE = 0x88B5 # custom EtherType
# Discovery payload magic
DISCOVERY_MAGIC = b"TOPO_DISCOVER_2NODE_V2"
LISTEN_SECONDS = 20.0
SEND_INTERVAL = 0.5
# Primary mode settings
DEFAULT_PRIMARY_PORT = 9999
REPORT_TIMEOUT = 30 # seconds to wait for all nodes to report
def get_mac(iface: str) -> str:
    """Return MAC address string (lowercase) for the given interface."""
    path = f"/sys/class/net/{iface}/address"
    with open(path, "r") as f:
        return f.read().strip().lower()
def mac_str_to_bytes(mac_str: str) -> bytes:
    return bytes.fromhex(mac_str.replace(":", ""))
def mac_bytes_to_str(mac: bytes) -> str:
    return ":".join(f"{b:02x}" for b in mac)
def iface_link_up(iface: str) -> bool:
    """Check carrier == 1."""
    carrier_path = f"/sys/class/net/{iface}/carrier"
    try:
        with open(carrier_path, "r") as f:
            return f.read().strip() == "1"
    except FileNotFoundError:
        return False
    except Exception:
        return False
def create_socket(iface: str) -> socket.socket:
    """Create and bind raw AF_PACKET socket on given interface."""
    sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(ETHERTYPE))
    sock.bind((iface, 0))
    sock.settimeout(0.5)
    return sock
def get_local_machine_id() -> str:
    """
    Machine ID = lowest MAC across all DISCOVERY_INTERFACES that exist.
    This is used as a stable, machine-wide identifier.
    """
    macs = []
    for iface in DISCOVERY_INTERFACES:
        path = f"/sys/class/net/{iface}/address"
        if not os.path.exists(path):
            continue
        try:
            macs.append(get_mac(iface))
        except Exception:
            continue
    if not macs:
        raise RuntimeError("No MACs found on configured DISCOVERY_INTERFACES")
    return sorted(macs)[0]  # lowest MAC string
def sender_thread(iface: str, stop_event: threading.Event, machine_id_str: str):
    """
    Periodically send discovery frames on the given interface.
    Frame layout:
      dst_mac (6)    = ff:ff:ff:ff:ff:ff
      src_mac (6)    = this interface MAC
      ethertype (2)  = 0x88B5
      payload:
        machine_id (6) = lowest MAC of this machine
        magic (N)      = DISCOVERY_MAGIC (padded by NIC/kernel)
    """
    try:
        src_mac_str = get_mac(iface)
        src_mac = mac_str_to_bytes(src_mac_str)
        machine_id_bytes = mac_str_to_bytes(machine_id_str)
        sock = create_socket(iface)
    except Exception as e:
        print(f"[{iface}] Sender error: {e}")
        return
    dst_mac = b"\xff\xff\xff\xff\xff\xff"
    ethertype_bytes = struct.pack("!H", ETHERTYPE)
    payload = machine_id_bytes + DISCOVERY_MAGIC
    frame = dst_mac + src_mac + ethertype_bytes + payload
    print(f"[{iface}] Sender started (local MAC {src_mac_str}, machine_id {machine_id_str})")
    try:
        while not stop_event.is_set():
            try:
                sock.send(frame)
            except Exception as e:
                print(f"[{iface}] Send error: {e}")
                break
            time.sleep(SEND_INTERVAL)
    finally:
        sock.close()
        print(f"[{iface}] Sender stopped")
def listener_thread(iface: str,
                    stop_event: threading.Event,
                    local_mac: str,
                    local_machine_id: str,
                    neighbors_info: dict):
    """
    Listen for discovery frames and record neighbor machine and NIC info.
    neighbors_info[iface] will contain entries:
      (neighbor_machine_id_str, neighbor_nic_mac_str)
    """
    try:
        sock = create_socket(iface)
    except Exception as e:
        print(f"[{iface}] Listener error: {e}")
        return
    local_mac_bytes = mac_str_to_bytes(local_mac)
    print(f"[{iface}] Listener started (local MAC {local_mac}, machine_id {local_machine_id})")
    try:
        while not stop_event.is_set():
            try:
                frame, addr = sock.recvfrom(65535)
            except socket.timeout:
                continue
            except Exception as e:
                print(f"[{iface}] Recv error: {e}")
                break
            if len(frame) < 14 + 6 + len(DISCOVERY_MAGIC):
                continue
            src_mac = frame[6:12]
            ethertype = struct.unpack("!H", frame[12:14])[0]
            payload = frame[14:]
            if ethertype != ETHERTYPE:
                continue
            # payload: machine_id(6) + magic
            machine_id_bytes = payload[:6]
            magic = payload[6:]
            if not magic.startswith(DISCOVERY_MAGIC):
                continue
            src_mac_str = mac_bytes_to_str(src_mac)
            machine_id_str = mac_bytes_to_str(machine_id_bytes)
            # Ignore our own frames (by src NIC MAC)
            if src_mac == local_mac_bytes:
                print(f"[{iface}] Saw our own DISCOVER from {src_mac_str}, ignoring")
                continue
            print(f"[{iface}] Saw DISCOVER from neighbor NIC {src_mac_str}, "
                  f"neighbor machine_id {machine_id_str}")
            neighbors_info[iface].add((machine_id_str, src_mac_str))
    finally:
        sock.close()
        print(f"[{iface}] Listener stopped")
def build_netplan_yaml(iface_to_ip: dict) -> str:
    """
    Return the netplan YAML as a string.
    - Includes ALL_INTERFACES.
    - Sets dhcp4: false on all of them.
    - Adds addresses only for those present in iface_to_ip.
    """
    lines = [
        "network:",
        "  version: 2",
        "  ethernets:",
    ]
    for iface in ALL_INTERFACES:
        lines.append(f"    {iface}:")
        lines.append("      dhcp4: false")
        if iface in iface_to_ip:
            ip_cidr = iface_to_ip[iface]
            lines.append("      addresses:")
            lines.append(f"        - {ip_cidr}")
    lines.append("")
    return "\n".join(lines)
def mac_str_to_int(mac_str: str) -> int:
    return int(mac_str.replace(":", ""), 16)
# ---------- IP assignment helpers ----------
def ip_for_2node_link(link_index: int, node_id: int, local_index_in_pair: int) -> str:
    """
    /24 scheme with 4 hosts per link (2 per node).
    For each link_index:
      network = 192.168.link_index.0/24
      hosts .1 .. .4 used for the two nodes (2 endpoints each).
    Node 1:
      local_index_in_pair = 0 -> .1
      local_index_in_pair = 1 -> .2
    Node 2:
      local_index_in_pair = 0 -> .3
      local_index_in_pair = 1 -> .4
    """
    host = 1 + (0 if node_id == 1 else 2) + local_index_in_pair
    return f"192.168.{link_index}.{host}/24"
def ip_for_switch_link(link_index: int, node_index: int, local_index_in_pair: int) -> str:
    """
    /24 scheme for N-node switch topology.
    For each link_index:
      network = 192.168.link_index.0/24
      host = 10 + node_index * 2 + local_index_in_pair
    node_index is 0-based index in sorted cluster_machine_ids.
    local_index_in_pair is 0 for discovery iface, 1 for paired iface.
    """
    base_octet3 = link_index  # 192.168.<link_index>.X
    host = 10 + node_index * 2 + local_index_in_pair
    return f"192.168.{base_octet3}.{host}/24"
# ---------- Main topology logic ----------
def parse_args():
    parser = argparse.ArgumentParser(
        description="Network topology discovery script. Run on all nodes to discover neighbors.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Run normal discovery on a worker node:
  sudo python3 detect_all.py
  # Run as primary to collect topology from all nodes and print diagram:
  sudo python3 detect_all.py --primary
  # Run on worker node and report to primary at specific address:
  sudo python3 detect_all.py --report-to 10.0.0.1:9999
  # Primary with custom port:
  sudo python3 detect_all.py --primary --port 8888
"""
    )
    parser.add_argument(
        "--primary",
        action="store_true",
        help="Run as primary node: collect reports from all nodes and print topology diagram"
    )
    parser.add_argument(
        "--report-to",
        type=str,
        metavar="HOST:PORT",
        help="Report topology to primary node at HOST:PORT after discovery"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=DEFAULT_PRIMARY_PORT,
        help=f"Port for primary to listen on (default: {DEFAULT_PRIMARY_PORT})"
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=REPORT_TIMEOUT,
        help=f"Seconds to wait for node reports in primary mode (default: {REPORT_TIMEOUT})"
    )
    parser.add_argument(
        "--apply-netplan-yaml",
        action="store_true",
        default=False,
        help="Apply netplan YAML"
    )
    return parser.parse_args()
def apply_netplan_yaml(netplan_yaml) -> bool:
    netplan_path = "/etc/netplan/40-cx7.yaml"
    try:
        print(f"Applying netplan YAML")
        with open(netplan_path, "w") as f:
            f.write(netplan_yaml)
        os.chmod(netplan_path, 0o600)
        ret = os.system("netplan apply")
        if ret != 0:
            raise Exception(f"netplan apply failed: {ret}")
    except Exception as e:
        print(f"Failed to apply netplan YAML on node: {e}")
        return False
    return True
def main() -> bool:
    args = parse_args()
    if os.geteuid() != 0:
        print("ERROR: This script must be run as root (raw sockets).")
        return False
    # Determine which discovery interfaces are physically up
    active_ifaces = []
    for iface in DISCOVERY_INTERFACES:
        if not os.path.exists(f"/sys/class/net/{iface}/address"):
            print(f"[{iface}] Interface does not exist, skipping")
            continue
        if iface_link_up(iface):
            active_ifaces.append(iface)
        else:
            print(f"[{iface}] Link down, skipping")
    if not active_ifaces:
        print("No active discovery interfaces among:", DISCOVERY_INTERFACES)
        return False
    print(f"Active discovery interfaces: {active_ifaces}")
    # Compute local machine_id (lowest MAC across all discovery interfaces)
    try:
        local_machine_id = get_local_machine_id()
    except Exception as e:
        print(f"Could not compute local machine_id: {e}")
        return False
    print(f"Local MACHINE_ID (lowest MAC): {local_machine_id}")
    # Only support up to 2 active ports for now
    if len(active_ifaces) > 2:
        print("More than 2 active discovery interfaces detected; this script currently "
              "supports only up to 2 active ports.")
        return False
    # Pre-read local MACs for active discovery interfaces
    local_macs = {}
    for iface in active_ifaces:
        try:
            local_macs[iface] = get_mac(iface)
        except Exception as e:
            print(f"[{iface}] Cannot get MAC, skipping: {e}")
    stop_event = threading.Event()
    neighbors_info = defaultdict(set)  # iface -> set of (machine_id_str, nic_mac_str)
    threads = []
    # Start listeners first
    for iface in active_ifaces:
        t = threading.Thread(
            target=listener_thread,
            args=(iface, stop_event, local_macs[iface], local_machine_id, neighbors_info),
            daemon=True,
        )
        t.start()
        threads.append(t)
    # Start senders
    for iface in active_ifaces:
        t = threading.Thread(
            target=sender_thread,
            args=(iface, stop_event, local_machine_id),
            daemon=True,
        )
        t.start()
        threads.append(t)
    print(f"\nListening and broadcasting for {LISTEN_SECONDS} seconds...")
    time.sleep(LISTEN_SECONDS)
    stop_event.set()
    for t in threads:
        t.join(timeout=1.0)
    # Summarize neighbors
    print("\nNeighbor summary per discovery interface:")
    all_neighbor_machines = set()
    all_neighbor_nics = set()
    machines_per_iface = {}
    for iface in active_ifaces:
        entries = neighbors_info[iface]
        machines_here = {m for (m, n) in entries}
        nics_here = {n for (m, n) in entries}
        machines_per_iface[iface] = machines_here
        all_neighbor_machines |= machines_here
        all_neighbor_nics |= nics_here
        print(f"  {iface}:")
        print(f"    Neighbor MACHINE_IDs: {machines_here}")
        print(f"    Neighbor NIC MACs: {nics_here}")
    print(f"\nAll neighbor MACHINE_IDs (excluding self): {all_neighbor_machines}")
    print(f"All neighbor NIC MACs: {all_neighbor_nics}")
    # Include self in the cluster view
    cluster_machine_ids = sorted(all_neighbor_machines | {local_machine_id})
    num_machines = len(cluster_machine_ids)
    print(f"\nCluster MACHINE_IDs (including self): {cluster_machine_ids}")
    print(f"Detected {num_machines} machines total in this broadcast domain")
    if num_machines == 1:
        print("No neighbors detected on any active discovery interface. Aborting.")
        return False
    active_ifaces_with_neighbor = [
        iface for iface in active_ifaces if neighbors_info[iface]
    ]
    if not active_ifaces_with_neighbor:
        print("Active discovery interfaces exist but no neighbor traffic seen. Aborting.")
        return False
    # Point-to-point pattern: each iface sees exactly one neighbor machine
    p2p_per_iface = all(
        len(machines_per_iface[iface]) == 1
        for iface in active_ifaces_with_neighbor
    )
    # Union of neighbor machines across ifaces
    union_neighbors = set()
    for iface in active_ifaces_with_neighbor:
        union_neighbors |= machines_per_iface[iface]
# ---- Topology classification ----
mode = None
if num_machines == 2:
mode = "2node"
topo_desc = "2-node (direct or dual-link)"
elif num_machines == 3:
if p2p_per_iface and len(union_neighbors) == 2:
mode = "ring3"
topo_desc = "3-node ring topology"
else:
mode = "switch"
topo_desc = "3-node switch-like topology"
else: # num_machines >= 4
if p2p_per_iface and len(union_neighbors) == 2:
print("\nDetected a ring/line-style point-to-point topology with 4 or more machines.")
print("This configuration is NOT supported by this script yet. Aborting.")
return False
else:
mode = "switch"
topo_desc = f"{num_machines}-node switch-like topology"
print(f"\nTopology classification (from this node's perspective): {topo_desc}")
# ---- Role within cluster ----
if mode == "2node":
# Exactly one other machine_id
other_id = [m for m in cluster_machine_ids if m != local_machine_id][0]
local_id_int = mac_str_to_int(local_machine_id)
other_id_int = mac_str_to_int(other_id)
node_id = 1 if local_id_int < other_id_int else 2
print(f"\n2-node mode: this node is Node {node_id}")
elif mode in ("switch", "ring3"):
node_index = cluster_machine_ids.index(local_machine_id)
print(f"\nCluster index: this node has node_index={node_index} "
f"in cluster_machine_ids")
# ---- Build link entries for p2p-style modes (2node / ring3) ----
link_entries = [] # list of (link_id_tuple, discovery_iface, neighbor_machine_id)
if mode in ("2node", "ring3"):
for iface in active_ifaces_with_neighbor:
entries = list(neighbors_info[iface])
if not entries:
continue
# pick the first neighbor for link-id purposes
neighbor_machine, neighbor_nic_mac = entries[0]
local_nic_mac = local_macs[iface]
link_id = tuple(sorted([local_nic_mac, neighbor_nic_mac]))
link_entries.append((link_id, iface, neighbor_machine))
# Sort links by link_id to make ordering deterministic (for 2-node case)
link_entries.sort(key=lambda x: x[0])
# ---- IP assignment ----
iface_to_ip = {}
if mode == "switch":
# Each discovery interface maps to a fixed subnet (based on its name)
node_index = cluster_machine_ids.index(local_machine_id)
for discover_iface in active_ifaces_with_neighbor:
config_ifaces = DISCOVERY_TO_CONFIG_IFACES.get(discover_iface, [])
if not config_ifaces:
print(f"[{discover_iface}] No mapped config interfaces; skipping IP assignment for this link")
continue
link_index = SWITCH_IFACE_INDEX.get(discover_iface, 0)
for local_idx, cfg_iface in enumerate(config_ifaces):
ip_cidr = ip_for_switch_link(link_index, node_index, local_idx)
iface_to_ip[cfg_iface] = ip_cidr
print(
f"Switch link for iface {discover_iface}: link_index={link_index}, "
f"config_ifaces={config_ifaces}"
)
elif mode == "2node":
# Same node_id across all links
for link_index, (link_id, discover_iface, neighbor_machine) in enumerate(link_entries):
config_ifaces = DISCOVERY_TO_CONFIG_IFACES.get(discover_iface, [])
if not config_ifaces:
print(f"[{discover_iface}] No mapped config interfaces; skipping IP assignment for this link")
continue
for local_idx, cfg_iface in enumerate(config_ifaces):
ip_cidr = ip_for_2node_link(link_index, node_id, local_idx)
iface_to_ip[cfg_iface] = ip_cidr
print(
f"2-node link {link_index}: discover_iface {discover_iface} "
f"-> config_ifaces {config_ifaces}, link_id {link_id}"
)
elif mode == "ring3":
# We need a stable link_index per pair of machines across the 3-node ring.
# Build all possible pairs of machine indices and sort them.
n = 3
pair_list = []
for i in range(n):
for j in range(i + 1, n):
pair_list.append((i, j))
pair_list.sort() # deterministic: e.g. (0,1),(0,2),(1,2)
local_idx_node = cluster_machine_ids.index(local_machine_id)
for (link_id, discover_iface, neighbor_machine) in link_entries:
config_ifaces = DISCOVERY_TO_CONFIG_IFACES.get(discover_iface, [])
if not config_ifaces:
print(f"[{discover_iface}] No mapped config interfaces; skipping IP assignment for this link")
continue
neighbor_idx_node = cluster_machine_ids.index(neighbor_machine)
pair = (min(local_idx_node, neighbor_idx_node),
max(local_idx_node, neighbor_idx_node))
try:
link_index = pair_list.index(pair)
except ValueError:
print(f"[{discover_iface}] Could not find pair {pair} in pair_list {pair_list}, skipping")
continue
# Per link, decide Node 1 vs Node 2 based on machine_id ordering
node_id_link = 1 if mac_str_to_int(local_machine_id) < mac_str_to_int(neighbor_machine) else 2
for local_idx, cfg_iface in enumerate(config_ifaces):
ip_cidr = ip_for_2node_link(link_index, node_id_link, local_idx)
iface_to_ip[cfg_iface] = ip_cidr
print(
f"3-node ring link {link_index}: discover_iface {discover_iface} "
f"neighbors {local_machine_id} <-> {neighbor_machine}, "
f"config_ifaces {config_ifaces}, link_id {link_id}"
)
if not iface_to_ip:
print("No config interfaces to assign IPs to. Aborting.")
return False
# print netplan YAML for ALL_INTERFACES
yaml = build_netplan_yaml(iface_to_ip)
print("\n--- Netplan YAML ---\n")
print(yaml)
print("--- End Netplan YAML ---\n")
if args.apply_netplan_yaml:
if not apply_netplan_yaml(yaml):
return False
else:
print("No changes were made to /etc/netplan and netplan was NOT applied.")
return True
if __name__ == "__main__":
ret = main()
if not ret:
print("Failed to configure cluster networking. Aborting.")
exit(1)
else:
print("Successfully configured cluster networking.")
exit(0)
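
The topology classification in the script above can be read as a pure function of the neighbor sets seen on each discovery interface. The following is a minimal sketch of that decision, not the script's own code: `classify_topology` and its arguments are hypothetical names introduced for illustration, and the machine IDs in the usage line are placeholders rather than real MAC addresses.

```python
def classify_topology(machines_per_iface, local_machine_id):
    """Return "2node", "ring3", "switch", or None when nothing usable is seen.

    machines_per_iface maps an interface name to the set of neighbor
    machine IDs heard on that interface (hypothetical input shape).
    """
    # Only interfaces that actually saw a neighbor take part in the decision.
    with_neighbor = {i: m for i, m in machines_per_iface.items() if m}
    union_neighbors = set().union(*with_neighbor.values())
    num_machines = len(union_neighbors | {local_machine_id})
    if num_machines == 1:
        return None  # no neighbors detected
    if num_machines == 2:
        return "2node"
    # Point-to-point pattern: every interface sees exactly one machine,
    # and two distinct machines are seen in total.
    p2p = all(len(m) == 1 for m in with_neighbor.values())
    ring_like = p2p and len(union_neighbors) == 2
    if num_machines == 3:
        return "ring3" if ring_like else "switch"
    # Mirrors the script: a ring/line pattern with 4+ machines is rejected.
    return None if ring_like else "switch"


print(classify_topology({"enp1s0f0np0": {"m2", "m3", "m4"}}, "m1"))
```

Keeping the decision free of I/O makes it straightforward to check each cabling pattern without running the discovery threads.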


@ -0,0 +1,8 @@
bcrypt==5.0.0
cffi==2.0.0
cryptography==46.0.5
invoke==2.2.1
paramiko==4.0.0
pycparser==3.0
PyNaCl==1.6.2
scp==0.15.0
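
Before running the setup script, it can help to confirm that the environment matches the pins above. The following is a rough sketch under stated assumptions: `parse_pins` and `find_mismatches` are hypothetical helpers, not part of the playbook, and only `name==version` lines are understood.

```python
from importlib import metadata


def parse_pins(text):
    """Parse 'name==version' lines into a dict; other lines are ignored."""
    pins = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "==" not in line:
            continue
        name, version = line.split("==", 1)
        pins[name.strip()] = version.strip()
    return pins


def find_mismatches(pins):
    """Return {name: (wanted, installed)} for packages that differ or are missing."""
    mismatches = {}
    for name, wanted in pins.items():
        try:
            installed = metadata.version(name)
        except metadata.PackageNotFoundError:
            installed = None  # package is not installed at all
        if installed != wanted:
            mismatches[name] = (wanted, installed)
    return mismatches


pins = parse_pins("paramiko==4.0.0\nscp==0.15.0\n")
print(find_mismatches(pins))
```

An empty dictionary means every pinned package is installed at the requested version.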


@ -0,0 +1,854 @@
#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: Copyright (c) 1993-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import paramiko
import argparse
import time
import json
from scp import SCPClient
import threading
import sys
from pathlib import Path
import os
import subprocess
import re
from ipaddress import ip_address as ip_addr_obj, ip_network
logging.getLogger("paramiko").setLevel(logging.CRITICAL)
logging.getLogger("paramiko.transport").setLevel(logging.CRITICAL)
# Default paths
SCRIPT_DIR = Path(__file__).resolve().parent
CONFIG_PATH = SCRIPT_DIR / "spark_config.json"
SHARED_KEY = Path.home() / ".ssh" / "id_ed25519_shared"
SSH_DIR = Path.home() / ".ssh"
AUTHORIZED_KEYS = SSH_DIR / "authorized_keys"
SSH_CONFIG = SSH_DIR / "config"
IDENTITY_LINE = "IdentityFile ~/.ssh/id_ed25519_shared"
NETWORK_SETUP_SCRIPT_NAME = "detect_and_configure_cluster_networking.py"
NETWORK_SETUP_SCRIPT = SCRIPT_DIR / "node_scripts" / NETWORK_SETUP_SCRIPT_NAME
IP_PREFIX = "192.168.100."
LAST_OCTET_START = 10
SUBNET_SIZE = 24
MIN_NCCL_TEST_BW = 21.875 # 175 Gbps
MIN_NCCL_TEST_BW_RING = 10 # 80 Gbps
NCCL_ENV = """export CUDA_HOME="/usr/local/cuda" && export MPI_HOME="/usr/lib/aarch64-linux-gnu/openmpi" && export NCCL_HOME="$HOME/nccl_spark_cluster/build/" && export LD_LIBRARY_PATH="$NCCL_HOME/lib:$CUDA_HOME/lib64/:$MPI_HOME/lib:$LD_LIBRARY_PATH" """
class ExceptionThread(threading.Thread):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.exc = None
def run(self):
try:
super().run()
except Exception as e:
self.exc = e
def join(self, timeout=None):
super().join(timeout)
if self.exc:
raise self.exc
def create_ssh_client(server, port, user, password, timeout=10):
"""Creates a Paramiko SSH client and connects."""
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
time.sleep(1)
client.connect(server, port, user, password, timeout=timeout)
return client
def paramiko_run_command_with_output(ssh_client, cmd):
_, stdout, stderr = ssh_client.exec_command(cmd)
output = ""
error = ""
while not stdout.channel.exit_status_ready():
if stdout.channel.recv_ready():
out = stdout.channel.recv(1024).decode('utf-8')
output += out
# stdout and stderr share one channel; stderr data must be read with recv_stderr()
if stdout.channel.recv_stderr_ready():
error_out = stdout.channel.recv_stderr(1024).decode('utf-8')
error += error_out
time.sleep(0.1)
# After the loop finishes, there might be remaining output
# Read the rest of the output
remaining_output = stdout.read().decode('utf-8')
output += remaining_output
remaining_error = stderr.read().decode('utf-8')
if remaining_error:
error += remaining_error
exit_code = stdout.channel.recv_exit_status()
return exit_code, output, error
def paramiko_run_command(ssh_client, cmd):
_, stdout, stderr = ssh_client.exec_command(cmd)
while not stdout.channel.exit_status_ready():
if stdout.channel.recv_ready():
stdout.channel.recv(1024).decode('utf-8')
# Drain stderr through the stderr-specific channel calls
if stdout.channel.recv_stderr_ready():
stdout.channel.recv_stderr(1024)
time.sleep(0.1)
# After the loop finishes, there might be remaining output
# Read the rest of the output
stdout.read().decode('utf-8')
stderr.read().decode('utf-8')
# Get the exit status
exit_code = stdout.channel.recv_exit_status()
return exit_code
def _paramiko_run_sudo_impl(ssh_client, password, cmd, capture_output):
"""Run a command with sudo, feeding password via stdin when needed.
Password is never put on the command line, so it won't appear in ps or /proc.
"""
if password:
full_cmd = "sudo -S " + cmd
stdin, stdout, stderr = ssh_client.exec_command(full_cmd)
stdin.write(password + "\n")
stdin.channel.shutdown_write()
else:
full_cmd = "sudo -n " + cmd
stdin, stdout, stderr = ssh_client.exec_command(full_cmd)
output = ""
error = ""
while not stdout.channel.exit_status_ready():
if stdout.channel.recv_ready():
out = stdout.channel.recv(1024).decode('utf-8')
if capture_output:
output += out
# stdout and stderr share one channel; stderr data must be read with recv_stderr()
if stdout.channel.recv_stderr_ready():
err = stdout.channel.recv_stderr(1024).decode('utf-8')
if capture_output:
error += err
time.sleep(0.1)
output += stdout.read().decode('utf-8')
err_rest = stderr.read().decode('utf-8')
if err_rest:
error += err_rest
exit_code = stdout.channel.recv_exit_status()
return exit_code, output, error
def paramiko_run_sudo_command(ssh_client, password, cmd):
"""Run a command with sudo (password via stdin if provided). Returns exit code only."""
exit_code, _, _ = _paramiko_run_sudo_impl(ssh_client, password, cmd, capture_output=False)
return exit_code
def paramiko_run_sudo_command_with_output(ssh_client, password, cmd):
"""Run a command with sudo (password via stdin if provided). Returns (exit_code, output, error)."""
return _paramiko_run_sudo_impl(ssh_client, password, cmd, capture_output=True)
def ssh_client_active(ssh):
return bool(ssh and ssh.get_transport() and ssh.get_transport().is_active())
def close_ssh_session(ssh):
if ssh_client_active(ssh):
ssh.close()
def setup_nccl_deps(node, ring_topology):
"""Set up NCCL dependencies on the node."""
ssh = None
try:
ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"])
if not ssh.get_transport().is_active():
raise Exception(f"Could not establish a session to node {node["ip_address"]}. Check the credentials and try again.")
print(f"Updating apt on node {node["ip_address"]}...")
paramiko_run_sudo_command(ssh, node["password"], "apt update")
print(f"Installing libopenmpi-dev on node {node["ip_address"]}...")
exit_code, output, error = paramiko_run_sudo_command_with_output(ssh, node["password"], "apt install -y libopenmpi-dev")
if exit_code:
raise Exception(f"Failed to install libopenmpi-dev on node {node["ip_address"]}: output:{output} error:{error}")
print(f"Cloning NCCL repo on node {node["ip_address"]}...")
if ring_topology:
cmd = """rm -rf ~/nccl_spark_cluster/ && git clone -b dgxspark-3node-ring https://github.com/zyang-dev/nccl.git ~/nccl_spark_cluster/"""
else:
cmd = """rm -rf ~/nccl_spark_cluster/ && git clone -b v2.28.9-1 https://github.com/NVIDIA/nccl.git ~/nccl_spark_cluster/"""
exit_code, output, error = paramiko_run_command_with_output(ssh, cmd)
if exit_code:
raise Exception(f"Failed to clone NCCL repo on node {node['ip_address']}: output:{output} error:{error}")
print(f"Building NCCL on node {node["ip_address"]}...")
cmd = """cd ~/nccl_spark_cluster/ && make -j src.build NVCC_GENCODE="-gencode=arch=compute_121,code=sm_121" """
exit_code, output, error = paramiko_run_command_with_output(ssh, cmd)
if exit_code:
raise Exception(f"Failed to build NCCL on node {node['ip_address']}: output:{output} error:{error}")
print(f"Cloning NCCL tests repo on node {node["ip_address"]}...")
cmd = """rm -rf ~/nccl-tests_spark_cluster/ && git clone https://github.com/NVIDIA/nccl-tests.git ~/nccl-tests_spark_cluster/"""
exit_code, output, error = paramiko_run_command_with_output(ssh, cmd)
if exit_code:
raise Exception(f"Failed to clone NCCL tests repo on node {node['ip_address']}: output:{output} error:{error}")
print(f"Building NCCL tests on node {node["ip_address"]}...")
cmd = """cd ~/nccl-tests_spark_cluster/ && %s && make MPI=1 -j8 """ % NCCL_ENV
exit_code, output, error = paramiko_run_command_with_output(ssh, cmd)
if exit_code:
raise Exception(f"Failed to build NCCL tests on node {node['ip_address']}: {error}")
print(f"Successfully set up NCCL dependencies on node {node['ip_address']}")
close_ssh_session(ssh)
except Exception as e:
close_ssh_session(ssh)
raise Exception(f"Failed to setup NCCL dependencies on node {node["ip_address"]}:\n{e}")
def run_nccl_test(nodes_info, ring_topology):
"""Runs the NCCL test."""
threads = []
for i, node in enumerate(nodes_info):
t = ExceptionThread(target=setup_nccl_deps, args=(node, ring_topology,))
threads.append(t)
t.start()
for t in threads:
try:
t.join()
except Exception as e:
print(f"An error occurred when running NCCL setup on nodes:\n{e}")
return False
print("Successfully set up NCCL dependencies on all nodes.")
print("Running NCCL test...")
# Generate the mpirun command
host_list = ",".join(f"{node['ip_address']}:1" for node in nodes_info)
ring_topology_specific_env = "-x NCCL_IB_MERGE_NICS=0 -x NCCL_NET_PLUGIN=none " if ring_topology else ""
mpirun_cmd = (
f"{NCCL_ENV} && mpirun -np {len(nodes_info)} -H {host_list} "
'--mca plm_rsh_agent "ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no" '
"-x LD_LIBRARY_PATH=$LD_LIBRARY_PATH "
"-x UCX_NET_DEVICES=enP7s7 "
"-x NCCL_SOCKET_IFNAME=enP7s7 "
"-x OMPI_MCA_btl_tcp_if_include=enP7s7 "
"-x NCCL_IB_HCA=rocep1s0f0,rocep1s0f1,roceP2p1s0f0,roceP2p1s0f1 "
"-x NCCL_IB_SUBNET_AWARE_ROUTING=1 "
f"{ring_topology_specific_env}"
"$HOME/nccl-tests_spark_cluster/build/all_gather_perf -b 16G -e 16G -f 2"
)
# Run command on the primary node (first node in the list)
node0 = nodes_info[0]
ssh = create_ssh_client(node0["ip_address"], node0["port"], node0["user"], node0["password"])
if not ssh.get_transport().is_active():
# Print only the address; the node dict also contains the password
print(f"Could not establish a session to node {node0['ip_address']}. Check the credentials and try again.")
return False
print(f"NCCL test command: {mpirun_cmd}")
exit_code, output, error = paramiko_run_command_with_output(ssh, mpirun_cmd)
if exit_code:
print(f"Failed to run NCCL test on node {node0["ip_address"]}: output:{output} error:{error}")
close_ssh_session(ssh)
return False
# Extract the "Avg bus bandwidth" value from the NCCL test output
avg_bus_bw = None
# The command output is multi-line; find the line of the form
# "# Avg bus bandwidth : <value>"
for line in output.splitlines():
m = re.match(r"# Avg bus bandwidth\s*:\s*([0-9.]+)", line.strip())
if m:
avg_bus_bw = float(m.group(1))
print(f"Avg bus bandwidth from NCCL test: {avg_bus_bw} GB/s")
break
if avg_bus_bw is None:
print("WARNING: Failed to extract Avg bus bandwidth from NCCL test output.")
else:
# Warn if the average bus bandwidth is below the expected minimum
if (ring_topology and avg_bus_bw < MIN_NCCL_TEST_BW_RING) or (not ring_topology and avg_bus_bw < MIN_NCCL_TEST_BW):
print("WARNING: NCCL Test bandwidth is less than expected. Stop any GPU workloads on the nodes and try NCCL test again using the NCCL test command above.")
else:
print(f"NCCL test BW is as expected")
close_ssh_session(ssh)
return True
def ensure_ssh_dir():
"""Ensure ~/.ssh exists with mode 0700."""
SSH_DIR.mkdir(parents=True, exist_ok=True)
os.chmod(SSH_DIR, 0o700)
def generate_shared_key():
"""Generate shared ed25519 key if it does not exist."""
if SHARED_KEY.exists():
return
ensure_ssh_dir()
print("Generating shared SSH key for all nodes...")
os.system(f"ssh-keygen -t ed25519 -N '' -f {SHARED_KEY} -q -C 'shared-cluster-key' > /dev/null 2>&1")
if not SHARED_KEY.exists():
raise Exception("Failed to generate shared SSH key.")
def add_pubkey_to_authorized_keys():
"""Add shared public key to local authorized_keys if not present."""
ensure_ssh_dir()
pub_content = (SHARED_KEY.with_suffix(".pub")).read_text()
if AUTHORIZED_KEYS.exists():
current = AUTHORIZED_KEYS.read_text()
if pub_content.strip() in current:
return
with open(AUTHORIZED_KEYS, "a") as f:
f.write(pub_content)
os.chmod(AUTHORIZED_KEYS, 0o600)
print("Added shared public key to local authorized_keys")
def update_local_ssh_config():
"""Add IdentityFile for shared key to local SSH config if missing."""
ensure_ssh_dir()
if SSH_CONFIG.exists():
content = SSH_CONFIG.read_text()
if "id_ed25519_shared" in content:
return
with open(SSH_CONFIG, "a") as f:
f.write("Host *\n")
f.write(f" {IDENTITY_LINE}\n")
os.chmod(SSH_CONFIG, 0o600)
print("Updated local SSH config to use shared key")
def configure_node_ssh_keys(node) -> bool:
"""Copy shared key to node and set up authorized_keys and SSH config."""
ssh = None
try:
ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"])
if not ssh.get_transport().is_active():
raise Exception(f"Could not establish a session to node {node["ip_address"]}. Check the credentials and try again.")
# Resolve remote home (e.g. /home/nvidia or /root)
_, stdout, _ = ssh.exec_command("echo $HOME")
home = stdout.read().decode().strip() or f"/home/{node["user"]}"
remote_ssh = f"{home}/.ssh"
exit_code, output, error = paramiko_run_command_with_output(ssh, f"mkdir -p {remote_ssh} && chmod 700 {remote_ssh}")
if exit_code:
raise Exception(f"Failed to create remote SSH directory on node {node["ip_address"]}: output:{output} error:{error}")
with SCPClient(ssh.get_transport()) as scp:
scp.put(str(SHARED_KEY), f"{remote_ssh}/id_ed25519_shared")
scp.put(str(SHARED_KEY.with_suffix(".pub")), f"{remote_ssh}/id_ed25519_shared.pub")
# Set key permissions and add to authorized_keys
exit_code, output, error = paramiko_run_command_with_output(ssh, f"chmod 600 {remote_ssh}/id_ed25519_shared")
if exit_code:
raise Exception(f"Failed to set permissions on {remote_ssh}/id_ed25519_shared: output:{output} error:{error}")
exit_code, output, error = paramiko_run_command_with_output(ssh, f"chmod 644 {remote_ssh}/id_ed25519_shared.pub")
if exit_code:
raise Exception(f"Failed to set permissions on {remote_ssh}/id_ed25519_shared.pub: output:{output} error:{error}")
pub_line = (SHARED_KEY.with_suffix(".pub")).read_text().strip()
pub_escaped = pub_line.replace("'", "'\"'\"'")
exit_code, output, error = paramiko_run_command_with_output(
ssh,
f"grep -qF '{pub_escaped}' {remote_ssh}/authorized_keys 2>/dev/null || "
f"echo '{pub_escaped}' >> {remote_ssh}/authorized_keys",
)
if exit_code:
raise Exception(f"Failed to add {SHARED_KEY.with_suffix(".pub")} to authorized_keys: output:{output} error:{error}")
exit_code, output, error = paramiko_run_command_with_output(ssh, f"chmod 600 {remote_ssh}/authorized_keys")
if exit_code:
raise Exception(f"Failed to set permissions on {remote_ssh}/authorized_keys: output:{output} error:{error}")
exit_code, output, error = paramiko_run_command_with_output(
ssh,
f"grep -q 'IdentityFile.*id_ed25519_shared' {remote_ssh}/config 2>/dev/null || "
f"(echo 'Host *' >> {remote_ssh}/config && echo ' {IDENTITY_LINE}' >> {remote_ssh}/config)",
)
if exit_code:
raise Exception(f"Failed to add {IDENTITY_LINE} to config: output:{output} error:{error}")
exit_code, output, error = paramiko_run_command_with_output(ssh, f"chmod 600 {remote_ssh}/config")
if exit_code:
raise Exception(f"Failed to set permissions on {remote_ssh}/config: output:{output} error:{error}")
print(f"Successfully configured {node["ip_address"]} with shared key")
return True
except Exception as e:
print(f" ✗ Failed to configure {node["ip_address"]}:\n{e}")
return False
finally:
close_ssh_session(ssh)
def check_and_get_up_cx7_interfaces(node_info):
ssh = None
up_ifaces = []
try:
node = node_info[0]
ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"])
if not ssh.get_transport().is_active():
raise Exception(f"Could not establish a session to node {node["ip_address"]}. Check the credentials and try again.")
if_groups = [["enp1s0f0np0", "enP2p1s0f0np0"], ["enp1s0f1np1", "enP2p1s0f1np1"]]
for if_group in if_groups:
up_count = 0
for if_name in if_group:
cmd = r"""ip link show %s | grep -c "state UP" """ % if_name
exit_code = paramiko_run_command(ssh, cmd)
if not exit_code:
up_count+=1
if up_count == len(if_group):
# found the if_group which has UP interfaces
up_ifaces.extend(if_group)
close_ssh_session(ssh)
if not len(up_ifaces):
print(f"ERROR: CX7 interfaces on {node["ip_address"]} are not UP")
return []
print(f"Found UP CX7 interfaces {up_ifaces} on {node["ip_address"]}. Checking other nodes...")
for node in node_info[1:]:
ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"])
if not ssh.get_transport().is_active():
raise Exception(f"Could not establish a session to node {node["ip_address"]}. Check the credentials and try again.")
for if_group in if_groups:
up_count = 0
for if_name in if_group:
cmd = r"""ip link show %s | grep -c "state UP" """ % if_name
exit_code = paramiko_run_command(ssh, cmd)
if not exit_code:
if if_name not in up_ifaces:
raise Exception(f"ERROR: CX7 interface {if_name} on {node["ip_address"]} is UP which is not in {up_ifaces}. Make sure the same CX7 port(s) on each node are connected and try again.")
else:
if if_name in up_ifaces:
raise Exception(f"ERROR: CX7 interface {if_name} on {node["ip_address"]} is DOWN. {up_ifaces} are expected to be UP. Make sure the same CX7 port(s) on each node are connected and try again.")
close_ssh_session(ssh)
except Exception as e:
close_ssh_session(ssh)
raise Exception(f"ERROR: An error occurred when checking UP CX7 interfaces:\n{e}")
return up_ifaces
def check_interface_link_speed(nodes_info, interfaces):
"""Checks the link speed of the interfaces."""
ssh = None
try:
for node in nodes_info:
ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"])
if not ssh.get_transport().is_active():
raise Exception(f"Could not establish a session to node {node["ip_address"]}. Check the credentials and try again.")
for iface in interfaces:
cmd = """ethtool %s | grep -i speed | awk '-F: ' '{print $2}' """ % iface
exit_code, output, error = paramiko_run_sudo_command_with_output(ssh, node["password"], cmd)
if exit_code:
print(f"ERROR: Failed to check link speed on {iface} on node {node["ip_address"]}: {error}")
close_ssh_session(ssh)
return False
speed = output.strip()
if "200000" not in speed:
print(f"ERROR: Link speed on {iface} on node {node["ip_address"]} is not 200Gbps.")
print("Check the following:\n"
"- QSFP cable should be compatible and rated at least for 200Gbps.\n"
"- If running with a switch then check the switch port speed.\n"
"- With a switch, sometimes auto-negotiation may not negotiate 200Gbps, in which case set the link speed manually on the switch ports.\n")
close_ssh_session(ssh)
return False
close_ssh_session(ssh)
except Exception as e:
close_ssh_session(ssh)
raise Exception(f"Failed to check link speed:\n{e}")
return True
def scp_put_file_with_ssh(client, local_file, remote_file) -> bool:
if not local_file or not remote_file:
print("ERROR: Local file or remote file not specified")
return False
try:
with SCPClient(client.get_transport()) as scp:
scp.put(local_file, remote_file)
except Exception as e:
print(f"scp_put_file: An error occurred:\n{e}")
return False
return True
def copy_network_setup_script_to_nodes(nodes_info) -> bool:
"""Copies the detect_and_configure_cluster_networking.py script to each node."""
ssh = None
try:
for node in nodes_info:
ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"])
if not ssh.get_transport().is_active():
print(f"Could not establish a session to node {node["ip_address"]}. Check the credentials and try again.")
return False
if not scp_put_file_with_ssh(ssh, NETWORK_SETUP_SCRIPT, f"~/{NETWORK_SETUP_SCRIPT_NAME}"):
raise Exception(f"Failed to copy {NETWORK_SETUP_SCRIPT_NAME} to node {node["ip_address"]}")
close_ssh_session(ssh)
except Exception as e:
close_ssh_session(ssh)
print(f"Failed to copy {NETWORK_SETUP_SCRIPT_NAME}:\n{e}")
return False
return True
def run_network_setup_script(node, cmd):
"""Runs the network setup script on the node. cmd is the command to run under sudo (no 'sudo' prefix)."""
ssh = None
try:
ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"])
if not ssh.get_transport().is_active():
raise Exception(f"Could not establish a session to node {node["ip_address"]}.")
exit_code, output, error = paramiko_run_sudo_command_with_output(ssh, node["password"], cmd)
if exit_code:
raise Exception(f"Failed to run network setup script on node {node["ip_address"]}: output:{output} error:{error}\n")
close_ssh_session(ssh)
except Exception as e:
close_ssh_session(ssh)
raise Exception(f"Failed to run network setup script on node {node["ip_address"]}:\n{e}")
def run_network_setup_scripts_on_nodes(nodes_info):
"""Runs the network setup scripts on the nodes in threads."""
threads = []
ret = True
for i, node in enumerate(nodes_info):
cmd = f"python3 ~/{NETWORK_SETUP_SCRIPT_NAME} --apply-netplan-yaml"
if i == 0:
cmd = cmd + " --primary"
t = ExceptionThread(target=run_network_setup_script, args=(node, cmd))
threads.append(t)
t.start()
for t in threads:
try:
t.join()
except Exception as e:
print(f"An error occurred when running network setup on nodes:\n{e}")
ret = False
return ret
def verify_ip_addresses(nodes_info, up_interfaces) -> bool:
"""Verifies that the IP addresses are assigned to the interfaces."""
ssh = None
try:
nodes_to_ip_cidrs = {}
all_nodes_ip_addresses = []
for node in nodes_info:
ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"])
if not ssh.get_transport().is_active():
raise Exception(f"Could not establish a session to node {node["ip_address"]}. Check the credentials and try again.")
nodes_to_ip_cidrs[node["ip_address"]] = []
for iface in up_interfaces:
cmd = """ip addr show %s | grep -w inet | awk '{print $2}'""" % iface
exit_code, output, error = paramiko_run_command_with_output(ssh, cmd)
if exit_code:
raise Exception(f"ERROR: Failed to verify IP address on {iface} on node {node["ip_address"]}")
ip_addresses = output.strip().split("\n")
if len(ip_addresses) != 1:
raise Exception(f"ERROR: Zero or multiple IP addresses found on node {node["ip_address"]}, {iface}: {ip_addresses}")
if len(ip_addresses[0]) == 0:
raise Exception(f"ERROR: No IP address found on node {node["ip_address"]}, {iface}")
# Parse CIDR (e.g. 192.168.1.1/24) for uniqueness check by IP only
ip_parts = [a.split("/")[0] for a in ip_addresses]
if set(all_nodes_ip_addresses).intersection(ip_parts):
raise Exception(f"ERROR: IP address {ip_addresses} on node {node["ip_address"]}, {iface} is already assigned to another node.")
print(f"IP address on node {node["ip_address"]}, {iface}: {ip_addresses}")
all_nodes_ip_addresses.extend(ip_parts)
nodes_to_ip_cidrs[node["ip_address"]].extend(ip_addresses)
close_ssh_session(ssh)
print(f"Running cluster connectivity test...")
for node in nodes_info:
# Run mesh ping test between all nodes in the cluster
ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"])
if not ssh.get_transport().is_active():
raise Exception(f"Could not establish a session to node {node["ip_address"]}. Check the credentials and try again.")
node_subnets = list(set([ip_network(cidr, strict=False) for cidr in nodes_to_ip_cidrs[node["ip_address"]]]))
for ip_address in all_nodes_ip_addresses:
# Check if the ip_address is in one of the node's subnets
if not any([ip_addr_obj(ip_address) in s for s in node_subnets]):
continue
cmd = f"ping -c 1 {ip_address} > /dev/null 2>&1"
exit_code = paramiko_run_command(ssh, cmd)
if exit_code:
raise Exception(f"Failed to run ping test from node {node["ip_address"]} to node {ip_address}")
close_ssh_session(ssh)
print(f"Cluster connectivity test completed successfully.")
except Exception as e:
close_ssh_session(ssh)
print(f"Failed to verify IP addresses:\n{e}")
return False
return True
def configure_ssh_keys_on_nodes(nodes_info) -> bool:
"""Configures the ssh keys on the nodes."""
print("Generating shared SSH key for all nodes...")
generate_shared_key()
print("Setting up shared SSH access across all nodes...")
add_pubkey_to_authorized_keys()
for node in nodes_info:
print(f"Configuring shared SSH key on node {node["ip_address"]}...")
if not configure_node_ssh_keys(node):
return False
update_local_ssh_config()
print("Shared SSH keys configured successfully.")
return True
def handle_cluster_setup(config) -> tuple[bool, bool]:
"""Handles the cluster network setup."""
try:
nodes_info = config.get("nodes_info", None)
if not nodes_info:
print("ERROR: Nodes information not found.")
return False, False
print(f"Checking UP CX7 interfaces...")
up_interfaces = check_and_get_up_cx7_interfaces(nodes_info)
if not up_interfaces:
print("ERROR: Failed to check UP CX7 interfaces. Check the QSFP cable connection and try again.")
return False, False
print(f"Checking CX7 interface link speed...")
if not check_interface_link_speed(nodes_info, up_interfaces):
return False, False
print(f"Copying network setup scripts on nodes...")
# Copy the detect_and_configure_cluster_networking.py script to the nodes
if not copy_network_setup_script_to_nodes(nodes_info):
return False, False
print(f"Running network setup scripts on nodes...")
if not run_network_setup_scripts_on_nodes(nodes_info):
print("ERROR: Failed to run network setup scripts on nodes. Check the QSFP cable connections and the nodes config in the json file and try again.")
return False, False
# Verify that the IP addresses are assigned to the interfaces
max_retries = 5
retries = max_retries
while retries > 0:
wait_secs = (max_retries - retries + 1) * 10
print(f"Waiting for {wait_secs} seconds before checking IP addresses")
time.sleep(wait_secs)
if not verify_ip_addresses(nodes_info, up_interfaces):
print(f"ERROR: Failed to verify IP addresses on nodes. ({retries - 1} retries left)...")
retries -= 1
continue
break
if retries == 0:
print("ERROR: Failed to verify IP addresses on nodes. Check the QSFP cable connections and the nodes config in the json file and try again.")
return False, False
# Configure ssh keys across nodes
if not configure_ssh_keys_on_nodes(nodes_info):
print("ERROR: Failed to configure ssh keys on nodes. Please check the configuration and try again.")
return False, False
ring_topology = (len(nodes_info) == 3 and len(up_interfaces) == 4)
except Exception as e:
print(f"ERROR: An error occurred when handling cluster setup:\n{e}")
return False, False
return True, ring_topology
def validate_config(config):
    """Validates the configuration."""
    if not config.get("nodes_info", None):
        print("ERROR: Nodes information not found.")
        return False
    if len(config.get("nodes_info")) < 2 or len(config.get("nodes_info")) > 4:
        print("ERROR: Cluster cannot contain fewer than 2 or more than 4 nodes. Please check the configuration and try again.")
        return False
    # List the IPv4 addresses of this machine so we can confirm it is one of the cluster nodes
    cmd = """ip a | grep -w inet | awk -F"inet |/" '{print $2}' """
    result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
    if result.returncode:
        print(f"ERROR: Failed to check IP addresses on current machine: {result.stderr}")
        return False
    else:
        ip_addresses = result.stdout.strip().split("\n")
    print("Checking connectivity and permissions...")
    nodes_valid = True
    current_node_in_cluster = False
    for node in config.get("nodes_info", []):
        if not node.get("ip_address", None):
            print("ERROR: IP address not found for node.")
            return False
        if not node.get("user", None):
            print("ERROR: User not found for node.")
            return False
        if not node.get("port", None):
            # Default port is 22
            node["port"] = 22
        if not node.get("password", None):
            # Default password is empty
            node["password"] = ""
        ssh = None
        try:
            ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"])
            if not ssh.get_transport().is_active():
                print(f"ERROR: Could not establish a session to node {node['ip_address']}. Check the credentials and try again.")
                return False
        except Exception as e:
            print(f"ERROR: Could not establish a session to node {node['ip_address']}, check the credentials in the config file and try again: {e}")
            return False
        if node["password"] == "":
            # No password is provided, so we need to validate the ssh key
            exit_code = paramiko_run_sudo_command(ssh, "", "true")
        else:
            exit_code = paramiko_run_sudo_command(ssh, node["password"], "true")
        if exit_code:
            print(f"ERROR: Failed to check sudo access on node {node['ip_address']}. If no password is specified, make sure the user has passwordless sudo access.")
            nodes_valid = False
            close_ssh_session(ssh)
            break
        if node["ip_address"] in ip_addresses:
            current_node_in_cluster = True
        close_ssh_session(ssh)
    if not nodes_valid:
        return False
    if not current_node_in_cluster:
        print("ERROR: This script must be run on a node in the cluster.")
        return False
    return True
def validate_environment():
    """Validates the environment."""
    # Check if the script is being run directly instead of via the spark_cluster_setup.sh shell script
    # We expect an environment variable ONLY set by the shell wrapper (e.g. SPARK_CLUSTER_SETUP_WRAPPER=1)
    if os.environ.get("SPARK_CLUSTER_SETUP_WRAPPER") != "1":
        print("ERROR: Please run this script via the spark_cluster_setup.sh shell script, not directly.")
        return False
    # Check if we are running inside a virtual environment
    if sys.prefix == sys.base_prefix:
        print("ERROR: Please run this script inside a Python virtual environment (venv) with requirements installed.")
        return False
    # Check if /etc/dgx-release exists and contains the expected DGX Spark markers
    try:
        with open("/etc/dgx-release", "r") as f:
            content = f.read()
        # Look for DGX_NAME="DGX Spark" and DGX_PRETTY_NAME="NVIDIA DGX Spark"
        if 'DGX_NAME="DGX Spark"' not in content or 'DGX_PRETTY_NAME="NVIDIA DGX Spark"' not in content:
            print("ERROR: This script must be run on a DGX Spark.")
            return False
    except FileNotFoundError:
        print("ERROR: /etc/dgx-release not found. This is not a DGX Spark environment.")
        return False
    return True
def main():
    """Main function to set up the Spark cluster."""
    parser = argparse.ArgumentParser(description="Set up the Spark cluster.")
    parser.add_argument("-c", "--config", type=str, required=True, help="Path to the configuration file.")
    args = parser.parse_args()
    # Report a missing, unreadable, or malformed config file instead of raising a traceback
    try:
        with open(args.config, "r") as f:
            config = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        print(f"ERROR: Could not read configuration file {args.config}: {e}")
        return
    if not config:
        print("ERROR: Configuration file is empty.")
        return
    try:
        # Validate env
        print("Validating environment...")
        if not validate_environment():
            return
        # Validate the config
        print("Validating configuration...")
        if not validate_config(config):
            return
        print("Setting up Spark cluster...")
        ret, ring_topology = handle_cluster_setup(config)
        if not ret:
            return
        print("Spark cluster setup completed successfully.")
        print("Running NCCL test...")
        if not run_nccl_test(config.get("nodes_info", []), ring_topology):
            return
        print("NCCL test completed.")
    except Exception as e:
        print(f"ERROR: An error occurred when running Spark cluster setup:\n{e}")


if __name__ == "__main__":
    main()
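
The IP verification loop in `handle_cluster_setup` waits 10 seconds before the first check and 10 seconds longer before each later one, for at most five checks. A minimal sketch of that pattern as a standalone helper follows. The name `retry_with_linear_backoff` and its parameters are hypothetical and not part of the script; the `sleep` argument exists only so the example can run without actually waiting.

```python
import time
from typing import Callable


def retry_with_linear_backoff(
    check: Callable[[], bool],
    max_retries: int = 5,
    base_wait_secs: int = 10,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Wait, then call check(); repeat with a longer wait until it passes.

    Hypothetical helper mirroring the wait schedule used in the setup
    script: attempt n sleeps n * base_wait_secs before checking.
    """
    for attempt in range(1, max_retries + 1):
        # Attempt 1 waits base_wait_secs, attempt 2 waits twice that, and so on.
        sleep(attempt * base_wait_secs)
        if check():
            return True
        print(f"Check failed ({max_retries - attempt} retries left)")
    return False


if __name__ == "__main__":
    # Record the requested waits instead of sleeping, and fail twice before passing.
    waits = []
    outcomes = iter([False, False, True])
    ok = retry_with_linear_backoff(lambda: next(outcomes), sleep=waits.append)
    print(ok, waits)
```

With the defaults, a check that never passes costs 10 + 20 + 30 + 40 + 50 = 150 seconds of waiting before the helper gives up.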

View File

@ -0,0 +1,32 @@
#!/bin/bash
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd -P)"
CURR_DIR="$(pwd -P)"
if [[ "$CURR_DIR" != "$SCRIPT_DIR" ]]; then
    echo "Error: Please run this script from its own directory: $SCRIPT_DIR"
    exit 1
fi
if [[ "$EUID" -eq 0 ]]; then
    echo "Error: This script must not be run as root."
    exit 1
fi
if [[ $# -ne 1 ]]; then
    echo "Usage: $0 <config_file>"
    exit 1
fi
if [[ ! -d ".venv" ]]; then
    python3 -m venv .venv
fi
source .venv/bin/activate
echo "---- Installing required packages ----"
pip install -r requirements.txt
echo "---- Configuring the cluster in $1 ----"
SPARK_CLUSTER_SETUP_WRAPPER=1 python3 ./spark_cluster_setup.py -c "$1"
deactivate
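
The wrapper passes its single argument to `spark_cluster_setup.py` as the config file. Judging from `validate_config`, that file is JSON with a `nodes_info` list of two to four nodes, each with a required `ip_address` and `user` and an optional `port` (default 22) and `password` (default empty). The sketch below applies the same rules without opening any SSH sessions. The function name `normalize_nodes` is hypothetical, and the addresses and user names are placeholders rather than values from the playbook.

```python
import json


def normalize_nodes(config: dict) -> list[dict]:
    """Return nodes_info with defaults applied, or raise ValueError.

    Hypothetical helper: it mirrors only the structural checks of
    validate_config, not the SSH or sudo checks.
    """
    nodes = config.get("nodes_info") or []
    if not 2 <= len(nodes) <= 4:
        raise ValueError("cluster must contain between 2 and 4 nodes")
    normalized = []
    for node in nodes:
        for key in ("ip_address", "user"):
            if not node.get(key):
                raise ValueError(f"node is missing required field '{key}'")
        # Copy each node so the caller's dict is left untouched
        # (the setup script fills in the defaults in place instead).
        normalized.append({
            **node,
            "port": node.get("port") or 22,
            "password": node.get("password") or "",
        })
    return normalized


if __name__ == "__main__":
    # Placeholder addresses and user names, for illustration only.
    example = json.loads("""
    {"nodes_info": [
        {"ip_address": "192.0.2.10", "user": "user1"},
        {"ip_address": "192.0.2.11", "user": "user2", "port": 2222}
    ]}
    """)
    for node in normalize_nodes(example):
        print(node)
```

Leaving `password` out of a node entry only works when that user can already log in with an SSH key and run `sudo` without a password, which is what the script's `sudo true` check verifies.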