From 5beeea4a3848c6a639697ad87bab76bdecccd0b2 Mon Sep 17 00:00:00 2001 From: GitLab CI Date: Mon, 16 Mar 2026 02:31:10 +0000 Subject: [PATCH] chore: Regenerate all playbooks --- README.md | 1 + nvidia/multi-sparks-through-switch/README.md | 446 +++++++++ .../assets/cx7-netplan-link-local.yaml | 24 + .../assets/spark_cluster_setup/README.md | 50 + .../config/spark_config_b2b.json | 16 + .../config/spark_config_ring.json | 22 + .../config/spark_config_switch.json | 28 + ...detect_and_configure_cluster_networking.py | 638 +++++++++++++ .../spark_cluster_setup/requirements.txt | 8 + .../spark_cluster_setup.py | 854 ++++++++++++++++++ .../spark_cluster_setup.sh | 32 + 11 files changed, 2119 insertions(+) create mode 100644 nvidia/multi-sparks-through-switch/README.md create mode 100644 nvidia/multi-sparks-through-switch/assets/cx7-netplan-link-local.yaml create mode 100644 nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/README.md create mode 100644 nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/config/spark_config_b2b.json create mode 100644 nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/config/spark_config_ring.json create mode 100644 nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/config/spark_config_switch.json create mode 100644 nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/node_scripts/detect_and_configure_cluster_networking.py create mode 100644 nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/requirements.txt create mode 100644 nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/spark_cluster_setup.py create mode 100644 nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/spark_cluster_setup.sh diff --git a/README.md b/README.md index 128404a..732984d 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ Each playbook includes prerequisites, step-by-step instructions, troubleshooting - [LM Studio on DGX Spark](nvidia/lm-studio/) - [Build and Deploy a 
Multi-Agent Chatbot](nvidia/multi-agent-chatbot/) - [Multi-modal Inference](nvidia/multi-modal-inference/) +- [Connect Multiple Sparks through a Switch](nvidia/multi-sparks-through-switch/) - [NCCL for Two Sparks](nvidia/nccl/) - [Fine-tune with NeMo](nvidia/nemo-fine-tune/) - [Nemotron-3-Nano with llama.cpp](nvidia/nemotron/) diff --git a/nvidia/multi-sparks-through-switch/README.md b/nvidia/multi-sparks-through-switch/README.md new file mode 100644 index 0000000..fd6a995 --- /dev/null +++ b/nvidia/multi-sparks-through-switch/README.md @@ -0,0 +1,446 @@ +# Connect Multiple Sparks through a Switch + +> Connect multiple Spark devices in a cluster and set them up for distributed inference and fine-tuning + + +## Table of Contents + +- [Overview](#overview) +- [Run on Four Sparks](#run-on-four-sparks) + - [Step 3.1. Verify negotiated Link speed](#step-31-verify-negotiated-link-speed) + - [4.1 Script for Cluster networking configuration](#41-script-for-cluster-networking-configuration) + - [4.2 Manual Cluster networking configuration](#42-manual-cluster-networking-configuration) + - [Option 1: Automatically configure SSH](#option-1-automatically-configure-ssh) + - [Option 2: Manually discover and configure SSH](#option-2-manually-discover-and-configure-ssh) +- [Troubleshooting](#troubleshooting) + +--- + +## Overview + +## Basic idea + +Configure four DGX Spark systems for high-speed inter-node communication using 200Gbps QSFP connections through a QSFP switch. This setup enables distributed workloads across multiple DGX Spark nodes by establishing network connectivity and configuring SSH authentication. + +## What you will accomplish + +You will physically connect four DGX Spark devices with QSFP cables and a QSFP switch, configure network interfaces for cluster communication, and establish passwordless SSH between nodes to create a functional distributed computing environment. 
+ +## What to know before starting + +- Basic understanding of distributed computing concepts +- Working with network interface configuration and netplan +- Experience with SSH key management + +## Prerequisites + +- Four DGX Spark systems (these instructions will also work with two or three nodes cluster with a switch) +- QSFP switch with at least 4 QSFP56-DD ports (at least 200Gbps each) +- QSFP cables for 200Gbps connection from the switch to the devices + - One cable per spark + - If the switch has 400Gbps ports then you can also use breakout cables to split them into two 200Gbps ports +- SSH access available to all systems +- Root or sudo access on all systems: `sudo whoami` +- The same username on all systems +- Update all systems to the latest OS and Firmware. Refer to the DGX Spark documentation https://docs.nvidia.com/dgx/dgx-spark/os-and-component-update.html + +## Ancillary files + +All required files for this playbook can be found [here on GitHub](https://github.com/NVIDIA/dgx-spark-playbooks/blob/main/nvidia/multi-sparks-through-switch/) + +- [**discover-sparks.sh**](https://github.com/NVIDIA/dgx-spark-playbooks/blob/main/nvidia/connect-two-sparks/assets/discover-sparks) script for automatic node discovery and SSH key distribution + +## Time & risk + +- **Duration:** 1 hour including validation + +- **Risk level:** Medium - involves network reconfiguration + +- **Rollback:** Network changes can be reversed by removing netplan configs or IP assignments + +- **Last Updated:** 3/12/2026 + * First publication + +## Run on Four Sparks + +## Step 1. 
Ensure Same Username on all four Systems + +On all four systems check the username and make sure it's the same: + +```bash +## Check current username +whoami +``` + +If usernames don't match, create a new user (e.g., nvidia) on all four systems and log in with the new user: + +```bash +## Create nvidia user and add to sudo group +sudo useradd -m nvidia +sudo usermod -aG sudo nvidia + +## Set password for nvidia user +sudo passwd nvidia + +## Switch to nvidia user +su - nvidia +``` + +## Step 2. Switch Management + +Most QSFP switches offer some form of management interface, either through CLI or UI. Refer to the documentation and connect to the management interface. Make sure that the ports on the switch are enabled. For connecting four sparks, you will need to ensure that the switch is configured to provide 200Gbps connection to each DGX Spark. + +## Step 3. Physical Hardware Connection + +Connect the QSFP cables between DGX Spark systems and the switch (QSFP56-DD/QSFP56 ports) using one CX7 port on each system. It is recommended to use the same CX7 port on all Spark systems for easier network configuration and avoiding NCCL test failures. In this playbook the second port (the one further from the ethernet port) is used. This should establish the 200Gbps connection required for high-speed inter-node communication. You will see an output like the one below on all four sparks. In this example the interfaces showing as 'Up' are **enp1s0f1np1** and **enP2p1s0f1np1** (each physical port has two logical interfaces). + +Example output: +```bash +## Check QSFP interface availability on all nodes +nvidia@dgx-spark-1:~$ ibdev2netdev +rocep1s0f0 port 1 ==> enp1s0f0np0 (Down) +rocep1s0f1 port 1 ==> enp1s0f1np1 (Up) +roceP2p1s0f0 port 1 ==> enP2p1s0f0np0 (Down) +roceP2p1s0f1 port 1 ==> enP2p1s0f1np1 (Up) +``` + +> [!NOTE] +> If none of the interfaces are showing as 'Up', please check the QSFP cable connection, reboot the systems and try again. 
+> The interfaces showing as 'Up' depend on which port you are using to connect the nodes to the switch. Each physical port has two logical interfaces, for example, Port 1 has two interfaces - enp1s0f1np1 and enP2p1s0f1np1. Please disregard enp1s0f0np0 and enP2p1s0f0np0, and use enp1s0f1np1 and enP2p1s0f1np1 only. + +### Step 3.1. Verify negotiated Link speed + +The link speed might not default to 200Gbps with auto-negotiation. To confirm, run the command below on all sparks and check that the speed is shown as 200000Mb/s. If it shows less than that value, then the link speed needs to be set to 200Gbps manually in the switch port configuration. Refer to the switch's manual/documentation to disable auto-negotiation and set the link speed manually to 200Gbps (eg. 200G-baseCR4). + +Example output: +```bash +nvidia@dgx-spark-1:~$ ethtool enp1s0f1np1 | grep Speed + Speed: 100000Mb/s + +nvidia@dgx-spark-1:~$ ethtool enP2p1s0f1np1 | grep Speed + Speed: 100000Mb/s +``` + +After setting the correct speed on the switch ports, verify the link speed on all the DGX Sparks again. + +Example output: +```bash +nvidia@dgx-spark-1:~$ ethtool enp1s0f1np1 | grep Speed + Speed: 200000Mb/s + +nvidia@dgx-spark-1:~$ ethtool enP2p1s0f1np1 | grep Speed + Speed: 200000Mb/s +``` + +## Step 4. Network Interface Configuration + +> [!NOTE] +> Full bandwidth can be achieved with just one QSFP cable. + +For a clustered setup, all DGX sparks: +1. Should be able to talk to each other using TCP/IP over CX7. +2. Should be accessible for management (eg. SSH and run commands) +3. Should be able to access internet (eg. to download models/utilities) + +It is recommended to use the Ethernet/WiFi network for management and internet traffic and keep it separate from the CX7 network to avoid CX7 bandwidth from being used for non-workload traffic. 
+ +The supported way to configure a cluster with switch requires configuring a bridge (or using the default bridge) on the switch and adding all the ports of interest (ports connected to DGX sparks) to it through the switch management interface. +1. This way, all ports are part of a single layer-2 domain which is required for cluster networking configuration +2. Some switches have restriction that Hardware offloading can only be enabled on one bridge, so keeping all ports in a single bridge is required + +Once you are done creating/adding ports to the bridge, you should be ready to configure networking on the DGX Spark side. + +### 4.1 Script for Cluster networking configuration + +We have created a script [here on GitHub](https://github.com/NVIDIA/dgx-spark-playbooks/blob/main/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup) which automates the following: +1. Interface network configuration for all DGX Sparks +2. Set up password-less authentication between the DGX Sparks +3. Verify multi-node communication +4. Run NCCL Bandwidth tests + +> [!NOTE] +> You can use the script or continue with the manual configurations in the following sections. If you use the script, you can skip the rest of the setup sections in this playbook. + +Use the steps below to run the script: + +```bash +## Clone the repository +git clone https://github.com/NVIDIA/dgx-spark-playbooks + +## Enter the script directory +cd dgx-spark-playbooks/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup + +## Check the README.md for steps to run the script and configure the cluster networking +``` + +### 4.2 Manual Cluster networking configuration + +In this case, you can choose one of the options to assign the IPs to the CX7 logical interfaces. Options 1, 2 and 3 are mutually exclusive. +1. DHCP server on the switch (recommended, if it is supported) +2. Link local IP addressing (netplan is the same across all nodes) +3. 
Manual IP addressing (netplan will be different on each node but provides more control and deterministic IPs) + +#### Option 1: Configure DHCP server on the switch + +1. Configure the DHCP server on the switch with a subnet large enough to assign IPs to all sparks. A /24 subnet should work well for configuration and any future expansion. +2. Configure the 'UP' CX7 interfaces in the DGX sparks to acquire IP using DHCP. For eg. if the logical interfaces **enp1s0f1np1** / **enP2p1s0f1np1** are 'UP' then create a netplan like below on all sparks. + +```bash +## Create the netplan configuration file +sudo tee /etc/netplan/40-cx7.yaml > /dev/null < /dev/null < /dev/null < /dev/null < /dev/null < /dev/null < remote nodes)... +You may be prompted for your password for each node. + +SSH setup complete! All local and remote nodes can now SSH to each other without passwords. +``` + +> [!NOTE] +> If you encounter any errors, please follow Option 2 below to manually configure SSH and debug the issue. + +### Option 2: Manually discover and configure SSH + +You will need to find the IP addresses for the CX-7 interfaces that are up. On all nodes, run the following command to find the IP addresses and take note of them for the next step. +```bash + ip addr show enp1s0f1np1 +``` + +Example output: +``` +## In this example, we are using interface enp1s0f1np1. +nvidia@dgx-spark-1:~$ ip addr show enp1s0f1np1 + 4: enp1s0f1np1: mtu 1500 qdisc mq state UP group default qlen 1000 + link/ether 3c:6d:66:cc:b3:b7 brd ff:ff:ff:ff:ff:ff + inet **169.254.35.62**/16 brd 169.254.255.255 scope link noprefixroute enp1s0f1np1 + valid_lft forever preferred_lft forever + inet6 fe80::3e6d:66ff:fecc:b3b7/64 scope link + valid_lft forever preferred_lft forever +``` + +In this example, the IP address for Node 1 is **169.254.35.62**. Repeat the process for other nodes. + +On all nodes, run the following commands to enable passwordless SSH: +```bash +## Copy your SSH public key to all nodes. 
Replace the IP addresses with the ones you found in the previous step. +ssh-copy-id -i ~/.ssh/id_rsa.pub @ +ssh-copy-id -i ~/.ssh/id_rsa.pub @ +ssh-copy-id -i ~/.ssh/id_rsa.pub @ +ssh-copy-id -i ~/.ssh/id_rsa.pub @ +``` + +## Step 6. Verify Multi-Node Communication + +Test basic multi-node functionality from the head node: + +```bash +## Test hostname resolution across nodes +ssh hostname +ssh hostname +ssh hostname +ssh hostname +``` + +## Step 7. Running Tests and Workloads + +Now your cluster is set up to run distributed workloads across four nodes. For example, you can run the [NCCL playbook](https://build.nvidia.com/spark/nccl/stacked-sparks). Wherever the playbook asks to run a command on two nodes, just run it on all four nodes and modify the mpirun command which you run on the head node to use four nodes instead of two. + +Example mpirun command for four nodes: +```bash +## Set network interface environment variables (use your Up interface from the previous step) +export UCX_NET_DEVICES=enp1s0f1np1 +export NCCL_SOCKET_IFNAME=enp1s0f1np1 +export OMPI_MCA_btl_tcp_if_include=enp1s0f1np1 + +## Run the all_gather performance test across four nodes (replace the IP addresses with the ones you found in the previous step) +mpirun -np 4 -H :1,:1,:1,:1 \ + --mca plm_rsh_agent "ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no" \ + -x LD_LIBRARY_PATH=$LD_LIBRARY_PATH \ + $HOME/nccl-tests/build/all_gather_perf +``` + +## Step 8. Cleanup and Rollback + +> [!WARNING] +> These steps will reset network configuration. + +```bash +## Rollback network configuration +sudo rm /etc/netplan/40-cx7.yaml +sudo netplan apply +``` + +> [!NOTE] +> If disconnecting the switch, then make sure to do the following +> 1. Re-enable auto-negotiation to avoid issues later if the switch is used for different purposes. +> 2. Remove the DHCP server configuration on the switch if you used that to assign IPs to Sparks. +> 3. 
If you created a new bridge, move the ports back to the default bridge and delete the new bridge. + +## Troubleshooting + +| Symptom | Cause | Fix | +|---------|-------|-----| +| "Network unreachable" errors | Network interfaces not configured | Verify netplan config and `sudo netplan apply` | +| SSH authentication failures | SSH keys not properly distributed | Re-run `./discover-sparks` and enter passwords | +| Nodes not visible in cluster | Network connectivity issue | Verify QSFP cable connection, check IP configuration | diff --git a/nvidia/multi-sparks-through-switch/assets/cx7-netplan-link-local.yaml b/nvidia/multi-sparks-through-switch/assets/cx7-netplan-link-local.yaml new file mode 100644 index 0000000..56af10a --- /dev/null +++ b/nvidia/multi-sparks-through-switch/assets/cx7-netplan-link-local.yaml @@ -0,0 +1,24 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 1993-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +network: + version: 2 + ethernets: + enp1s0f1np1: + link-local: [ ipv4 ] + enP2p1s0f1np1: + link-local: [ ipv4 ] \ No newline at end of file diff --git a/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/README.md b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/README.md new file mode 100644 index 0000000..50416dc --- /dev/null +++ b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/README.md @@ -0,0 +1,50 @@ +# Multi spark cluster setup script + +## Usage + +### Step 1. Clone the repo + +Clone the dgx-spark-playbooks repo from GitHub + +### Step 2. Switch to the multi spark cluster setup scripts directory + +```bash +cd dgx-spark-playbooks/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup +``` + +### Step 3. Create or edit a JSON config file with your cluster information + +```bash +# Create or edit JSON config file under the `config` directory with the ssh credentials for your nodes. +# Adjust the number of nodes in "nodes_info" list based on the number of nodes in your cluster + +# Example: (config/spark_config_b2b.json): +# { +# "nodes_info": [ +# { +# "ip_address": "10.0.0.1", +# "port": 22, +# "user": "nvidia", +# "password": "nvidia123" +# }, +# { +# "ip_address": "10.0.0.2", +# "port": 22, +# "user": "nvidia", +# "password": "nvidia123" +# } +# +``` + +### Step 4. Run the cluster setup script with your json config file + +```bash +bash spark_cluster_setup.sh config/spark_config_b2b.json + +# This will do the following +# 1. Create a python virtual env and install required packages +# 2. Validate the environment and cluster config +# 3. Detect the topology and configure the IP addresses +# 4. Configure password-less ssh between the cluster nodes +# 5. 
Run NCCL BW test +``` diff --git a/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/config/spark_config_b2b.json b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/config/spark_config_b2b.json new file mode 100644 index 0000000..f32053c --- /dev/null +++ b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/config/spark_config_b2b.json @@ -0,0 +1,16 @@ +{ + "nodes_info": [ + { + "ip_address": "NODE_1_MGMT_IP_ADDRESS", + "port": 22, + "user": "user_name", + "password": "password" + }, + { + "ip_address": "NODE_2_MGMT_IP_ADDRESS", + "port": 22, + "user": "user_name", + "password": "password" + } + ] +} \ No newline at end of file diff --git a/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/config/spark_config_ring.json b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/config/spark_config_ring.json new file mode 100644 index 0000000..1e9eca9 --- /dev/null +++ b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/config/spark_config_ring.json @@ -0,0 +1,22 @@ +{ + "nodes_info": [ + { + "ip_address": "NODE_1_MGMT_IP_ADDRESS", + "port": 22, + "user": "user_name", + "password": "password" + }, + { + "ip_address": "NODE_2_MGMT_IP_ADDRESS", + "port": 22, + "user": "user_name", + "password": "password" + }, + { + "ip_address": "NODE_3_MGMT_IP_ADDRESS", + "port": 22, + "user": "user_name", + "password": "password" + } + ] +} \ No newline at end of file diff --git a/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/config/spark_config_switch.json b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/config/spark_config_switch.json new file mode 100644 index 0000000..9263191 --- /dev/null +++ b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/config/spark_config_switch.json @@ -0,0 +1,28 @@ +{ + "nodes_info": [ + { + "ip_address": "NODE_1_MGMT_IP_ADDRESS", + "port": 22, + "user": "user_name", + "password": "password" + }, + { + "ip_address": "NODE_2_MGMT_IP_ADDRESS", + 
"port": 22, + "user": "user_name", + "password": "password" + }, + { + "ip_address": "NODE_3_MGMT_IP_ADDRESS", + "port": 22, + "user": "user_name", + "password": "password" + }, + { + "ip_address": "NODE_4_MGMT_IP_ADDRESS", + "port": 22, + "user": "user_name", + "password": "password" + } + ] +} \ No newline at end of file diff --git a/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/node_scripts/detect_and_configure_cluster_networking.py b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/node_scripts/detect_and_configure_cluster_networking.py new file mode 100644 index 0000000..47cc0f5 --- /dev/null +++ b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/node_scripts/detect_and_configure_cluster_networking.py @@ -0,0 +1,638 @@ +#!/usr/bin/env python3 + +# +# SPDX-FileCopyrightText: Copyright (c) 1993-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import argparse +import json +import socket +import struct +import threading +import time +import os +from collections import defaultdict + +# Interfaces used for DISCOVERY (L2 broadcasts) +DISCOVERY_INTERFACES = ["enp1s0f0np0", "enp1s0f1np1"] + +# Map discovery interfaces -> interfaces where we actually want to configure IPs +# Order matters: index 0 = lower host, index 1 = higher host (per node, per link) +DISCOVERY_TO_CONFIG_IFACES = { + "enp1s0f0np0": ["enp1s0f0np0", "enP2p1s0f0np0"], + "enp1s0f1np1": ["enp1s0f1np1", "enP2p1s0f1np1"], +} + +# For switch mode, map discovery iface -> stable link index +SWITCH_IFACE_INDEX = { + "enp1s0f0np0": 0, + "enp1s0f1np1": 1, +} + +# All interfaces that should appear in generated netplan (discovery + config) +ALL_INTERFACES = sorted( + set(DISCOVERY_INTERFACES) | {iface for lst in DISCOVERY_TO_CONFIG_IFACES.values() for iface in lst} +) + +ETHERTYPE = 0x88B5 # custom EtherType + +# Discovery payload magic +DISCOVERY_MAGIC = b"TOPO_DISCOVER_2NODE_V2" +LISTEN_SECONDS = 20.0 +SEND_INTERVAL = 0.5 + +# Primary mode settings +DEFAULT_PRIMARY_PORT = 9999 +REPORT_TIMEOUT = 30 # seconds to wait for all nodes to report + + +def get_mac(iface: str) -> str: + """Return MAC address string (lowercase) for the given interface.""" + path = f"/sys/class/net/{iface}/address" + with open(path, "r") as f: + return f.read().strip().lower() + + +def mac_str_to_bytes(mac_str: str) -> bytes: + return bytes.fromhex(mac_str.replace(":", "")) + + +def mac_bytes_to_str(mac: bytes) -> str: + return ":".join(f"{b:02x}" for b in mac) + + +def iface_link_up(iface: str) -> bool: + """Check carrier == 1.""" + carrier_path = f"/sys/class/net/{iface}/carrier" + try: + with open(carrier_path, "r") as f: + return f.read().strip() == "1" + except FileNotFoundError: + return False + except Exception: + return False + + +def create_socket(iface: str) -> socket.socket: + """Create and bind raw AF_PACKET socket on given interface.""" + sock = 
socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(ETHERTYPE)) + sock.bind((iface, 0)) + sock.settimeout(0.5) + return sock + + +def get_local_machine_id() -> str: + """ + Machine ID = lowest MAC across all DISCOVERY_INTERFACES that exist. + This is used as a stable, machine-wide identifier. + """ + macs = [] + for iface in DISCOVERY_INTERFACES: + path = f"/sys/class/net/{iface}/address" + if not os.path.exists(path): + continue + try: + macs.append(get_mac(iface)) + except Exception: + continue + + if not macs: + raise RuntimeError("No MACs found on configured DISCOVERY_INTERFACES") + + return sorted(macs)[0] # lowest MAC string + + +def sender_thread(iface: str, stop_event: threading.Event, machine_id_str: str): + """ + Periodically send discovery frames on the given interface. + + Frame layout: + dst_mac (6) = ff:ff:ff:ff:ff:ff + src_mac (6) = this interface MAC + ethertype (2) = 0x88B5 + payload: + machine_id (6) = lowest MAC of this machine + magic (N) = DISCOVERY_MAGIC (padded by NIC/kernel) + """ + try: + src_mac_str = get_mac(iface) + src_mac = mac_str_to_bytes(src_mac_str) + machine_id_bytes = mac_str_to_bytes(machine_id_str) + sock = create_socket(iface) + except Exception as e: + print(f"[{iface}] Sender error: {e}") + return + + dst_mac = b"\xff\xff\xff\xff\xff\xff" + ethertype_bytes = struct.pack("!H", ETHERTYPE) + + payload = machine_id_bytes + DISCOVERY_MAGIC + frame = dst_mac + src_mac + ethertype_bytes + payload + + print(f"[{iface}] Sender started (local MAC {src_mac_str}, machine_id {machine_id_str})") + try: + while not stop_event.is_set(): + try: + sock.send(frame) + except Exception as e: + print(f"[{iface}] Send error: {e}") + break + time.sleep(SEND_INTERVAL) + finally: + sock.close() + print(f"[{iface}] Sender stopped") + + +def listener_thread(iface: str, + stop_event: threading.Event, + local_mac: str, + local_machine_id: str, + neighbors_info: dict): + """ + Listen for discovery frames and record neighbor machine and NIC info. 
+ + neighbors_info[iface] will contain entries: + (neighbor_machine_id_str, neighbor_nic_mac_str) + """ + try: + sock = create_socket(iface) + except Exception as e: + print(f"[{iface}] Listener error: {e}") + return + + local_mac_bytes = mac_str_to_bytes(local_mac) + + print(f"[{iface}] Listener started (local MAC {local_mac}, machine_id {local_machine_id})") + try: + while not stop_event.is_set(): + try: + frame, addr = sock.recvfrom(65535) + except socket.timeout: + continue + except Exception as e: + print(f"[{iface}] Recv error: {e}") + break + + if len(frame) < 14 + 6 + len(DISCOVERY_MAGIC): + continue + + src_mac = frame[6:12] + ethertype = struct.unpack("!H", frame[12:14])[0] + payload = frame[14:] + + if ethertype != ETHERTYPE: + continue + + # payload: machine_id(6) + magic + machine_id_bytes = payload[:6] + magic = payload[6:] + + if not magic.startswith(DISCOVERY_MAGIC): + continue + + src_mac_str = mac_bytes_to_str(src_mac) + machine_id_str = mac_bytes_to_str(machine_id_bytes) + + # Ignore our own frames (by src NIC MAC) + if src_mac == local_mac_bytes: + print(f"[{iface}] Saw our own DISCOVER from {src_mac_str}, ignoring") + continue + + print(f"[{iface}] Saw DISCOVER from neighbor NIC {src_mac_str}, " + f"neighbor machine_id {machine_id_str}") + neighbors_info[iface].add((machine_id_str, src_mac_str)) + finally: + sock.close() + print(f"[{iface}] Listener stopped") + + +def build_netplan_yaml(iface_to_ip: dict) -> str: + """ + Return the netplan YAML as a string. + + - Includes ALL_INTERFACES. + - Sets dhcp4: false on all of them. + - Adds addresses only for those present in iface_to_ip. 
+ """ + lines = [ + "network:", + " version: 2", + " ethernets:", + ] + for iface in ALL_INTERFACES: + lines.append(f" {iface}:") + lines.append(" dhcp4: false") + if iface in iface_to_ip: + ip_cidr = iface_to_ip[iface] + lines.append(" addresses:") + lines.append(f" - {ip_cidr}") + lines.append("") + return "\n".join(lines) + + +def mac_str_to_int(mac_str: str) -> int: + return int(mac_str.replace(":", ""), 16) + + +# ---------- IP assignment helpers ---------- + +def ip_for_2node_link(link_index: int, node_id: int, local_index_in_pair: int) -> str: + """ + /24 scheme with 4 hosts per link (2 per node). + + For each link_index: + network = 192.168.link_index.0/24 + hosts .1 .. .4 used for the two nodes (2 endpoints each). + + Node 1: + local_index_in_pair = 0 -> .1 + local_index_in_pair = 1 -> .2 + + Node 2: + local_index_in_pair = 0 -> .3 + local_index_in_pair = 1 -> .4 + """ + host = 1 + (0 if node_id == 1 else 2) + local_index_in_pair + return f"192.168.{link_index}.{host}/24" + + +def ip_for_switch_link(link_index: int, node_index: int, local_index_in_pair: int) -> str: + """ + /24 scheme for N-node switch topology. + + For each link_index: + network = 192.168.link_index.0/24 + host = 10 + node_index * 2 + local_index_in_pair + + node_index is 0-based index in sorted cluster_machine_ids. + local_index_in_pair is 0 for discovery iface, 1 for paired iface. + """ + base_octet3 = link_index # 192.168..X + host = 10 + node_index * 2 + local_index_in_pair + return f"192.168.{base_octet3}.{host}/24" + +# ---------- Main topology logic ---------- + +def parse_args(): + parser = argparse.ArgumentParser( + description="Network topology discovery script. 
Run on all nodes to discover neighbors.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Run normal discovery on a worker node: + sudo python3 detect_all.py + + # Run as primary to collect topology from all nodes and print diagram: + sudo python3 detect_all.py --primary + + # Run on worker node and report to primary at specific address: + sudo python3 detect_all.py --report-to 10.0.0.1:9999 + + # Primary with custom port: + sudo python3 detect_all.py --primary --port 8888 +""" + ) + parser.add_argument( + "--primary", + action="store_true", + help="Run as primary node: collect reports from all nodes and print topology diagram" + ) + parser.add_argument( + "--report-to", + type=str, + metavar="HOST:PORT", + help="Report topology to primary node at HOST:PORT after discovery" + ) + parser.add_argument( + "--port", + type=int, + default=DEFAULT_PRIMARY_PORT, + help=f"Port for primary to listen on (default: {DEFAULT_PRIMARY_PORT})" + ) + parser.add_argument( + "--timeout", + type=int, + default=REPORT_TIMEOUT, + help=f"Seconds to wait for node reports in primary mode (default: {REPORT_TIMEOUT})" + ) + parser.add_argument( + "--apply-netplan-yaml", + action="store_true", + default=False, + help="Apply netplan YAML" + ) + return parser.parse_args() + +def apply_netplan_yaml(netplan_yaml) -> bool: + netplan_path = "/etc/netplan/40-cx7.yaml" + try: + print(f"Applying netplan YAML") + + with open(netplan_path, "w") as f: + f.write(netplan_yaml) + + os.chmod(netplan_path, 0o600) + + ret = os.system("netplan apply") + if ret != 0: + raise Exception(f"netplan apply failed: {ret}") + + except Exception as e: + print(f"Failed to apply netplan YAML on node: {e}") + return False + + return True + +def main() -> bool: + args = parse_args() + + if os.geteuid() != 0: + print("ERROR: This script must be run as root (raw sockets).") + return False + + # Determine which discovery interfaces are physically up + active_ifaces = [] + for iface in 
DISCOVERY_INTERFACES: + if not os.path.exists(f"/sys/class/net/{iface}/address"): + print(f"[{iface}] Interface does not exist, skipping") + continue + if iface_link_up(iface): + active_ifaces.append(iface) + else: + print(f"[{iface}] Link down, skipping") + + if not active_ifaces: + print("No active discovery interfaces among:", DISCOVERY_INTERFACES) + return False + + print(f"Active discovery interfaces: {active_ifaces}") + + # Compute local machine_id (lowest MAC across all discovery interfaces) + try: + local_machine_id = get_local_machine_id() + except Exception as e: + print(f"Could not compute local machine_id: {e}") + return False + + print(f"Local MACHINE_ID (lowest MAC): {local_machine_id}") + + # Only support up to 2 active ports for now + if len(active_ifaces) > 2: + print("More than 2 active discovery interfaces detected; this script currently " + "supports only up to 2 active ports.") + return False + + # Pre-read local MACs for active discovery interfaces + local_macs = {} + for iface in active_ifaces: + try: + local_macs[iface] = get_mac(iface) + except Exception as e: + print(f"[{iface}] Cannot get MAC, skipping: {e}") + + stop_event = threading.Event() + neighbors_info = defaultdict(set) # iface -> set of (machine_id_str, nic_mac_str) + + threads = [] + + # Start listeners first + for iface in active_ifaces: + t = threading.Thread( + target=listener_thread, + args=(iface, stop_event, local_macs[iface], local_machine_id, neighbors_info), + daemon=True, + ) + t.start() + threads.append(t) + + # Start senders + for iface in active_ifaces: + t = threading.Thread( + target=sender_thread, + args=(iface, stop_event, local_machine_id), + daemon=True, + ) + t.start() + threads.append(t) + + print(f"\nListening and broadcasting for {LISTEN_SECONDS} seconds...") + time.sleep(LISTEN_SECONDS) + stop_event.set() + + for t in threads: + t.join(timeout=1.0) + + # Summarize neighbors + print("\nNeighbor summary per discovery interface:") + all_neighbor_machines = 
set() + all_neighbor_nics = set() + machines_per_iface = {} + + for iface in active_ifaces: + entries = neighbors_info[iface] + machines_here = {m for (m, n) in entries} + nics_here = {n for (m, n) in entries} + machines_per_iface[iface] = machines_here + all_neighbor_machines |= machines_here + all_neighbor_nics |= nics_here + print(f" {iface}:") + print(f" Neighbor MACHINE_IDs: {machines_here}") + print(f" Neighbor NIC MACs: {nics_here}") + + print(f"\nAll neighbor MACHINE_IDs (excluding self): {all_neighbor_machines}") + print(f"All neighbor NIC MACs: {all_neighbor_nics}") + + # Include self in the cluster view + cluster_machine_ids = sorted(all_neighbor_machines | {local_machine_id}) + num_machines = len(cluster_machine_ids) + print(f"\nCluster MACHINE_IDs (including self): {cluster_machine_ids}") + print(f"Detected {num_machines} machines total in this broadcast domain") + + if num_machines == 1: + print("No neighbors detected on any active discovery interface. Aborting.") + return False + + active_ifaces_with_neighbor = [ + iface for iface in active_ifaces if neighbors_info[iface] + ] + if not active_ifaces_with_neighbor: + print("Active discovery interfaces exist but no neighbor traffic seen. 
Aborting.") + return False + + # Point-to-point pattern: each iface sees exactly one neighbor machine + p2p_per_iface = all( + len(machines_per_iface[iface]) == 1 + for iface in active_ifaces_with_neighbor + ) + # Union of neighbor machines across ifaces + union_neighbors = set() + for iface in active_ifaces_with_neighbor: + union_neighbors |= machines_per_iface[iface] + + # ---- Topology classification ---- + mode = None + + if num_machines == 2: + mode = "2node" + topo_desc = "2-node (direct or dual-link)" + elif num_machines == 3: + if p2p_per_iface and len(union_neighbors) == 2: + mode = "ring3" + topo_desc = "3-node ring topology" + else: + mode = "switch" + topo_desc = "3-node switch-like topology" + else: # num_machines >= 4 + if p2p_per_iface and len(union_neighbors) == 2: + print("\nDetected a ring/line-style point-to-point topology with 4 or more machines.") + print("This configuration is NOT supported by this script yet. Aborting.") + return False + else: + mode = "switch" + topo_desc = f"{num_machines}-node switch-like topology" + + print(f"\nTopology classification (from this node's perspective): {topo_desc}") + + # ---- Role within cluster ---- + if mode == "2node": + # Exactly one other machine_id + other_id = [m for m in cluster_machine_ids if m != local_machine_id][0] + local_id_int = mac_str_to_int(local_machine_id) + other_id_int = mac_str_to_int(other_id) + node_id = 1 if local_id_int < other_id_int else 2 + print(f"\n2-node mode: this node is Node {node_id}") + elif mode in ("switch", "ring3"): + node_index = cluster_machine_ids.index(local_machine_id) + print(f"\nCluster index: this node has node_index={node_index} " + f"in cluster_machine_ids") + + # ---- Build link entries for p2p-style modes (2node / ring3) ---- + link_entries = [] # list of (link_id_tuple, discovery_iface, neighbor_machine_id) + + if mode in ("2node", "ring3"): + for iface in active_ifaces_with_neighbor: + entries = list(neighbors_info[iface]) + if not entries: + continue 
+ # pick the first neighbor for link-id purposes + neighbor_machine, neighbor_nic_mac = entries[0] + local_nic_mac = local_macs[iface] + link_id = tuple(sorted([local_nic_mac, neighbor_nic_mac])) + link_entries.append((link_id, iface, neighbor_machine)) + + # Sort links by link_id to make ordering deterministic (for 2-node case) + link_entries.sort(key=lambda x: x[0]) + + # ---- IP assignment ---- + iface_to_ip = {} + + if mode == "switch": + # Each discovery interface maps to a fixed subnet (based on its name) + node_index = cluster_machine_ids.index(local_machine_id) + for discover_iface in active_ifaces_with_neighbor: + config_ifaces = DISCOVERY_TO_CONFIG_IFACES.get(discover_iface, []) + if not config_ifaces: + print(f"[{discover_iface}] No mapped config interfaces; skipping IP assignment for this link") + continue + link_index = SWITCH_IFACE_INDEX.get(discover_iface, 0) + for local_idx, cfg_iface in enumerate(config_ifaces): + ip_cidr = ip_for_switch_link(link_index, node_index, local_idx) + iface_to_ip[cfg_iface] = ip_cidr + print( + f"Switch link for iface {discover_iface}: link_index={link_index}, " + f"config_ifaces={config_ifaces}" + ) + + elif mode == "2node": + # Same node_id across all links + for link_index, (link_id, discover_iface, neighbor_machine) in enumerate(link_entries): + config_ifaces = DISCOVERY_TO_CONFIG_IFACES.get(discover_iface, []) + if not config_ifaces: + print(f"[{discover_iface}] No mapped config interfaces; skipping IP assignment for this link") + continue + for local_idx, cfg_iface in enumerate(config_ifaces): + ip_cidr = ip_for_2node_link(link_index, node_id, local_idx) + iface_to_ip[cfg_iface] = ip_cidr + print( + f"2-node link {link_index}: discover_iface {discover_iface} " + f"-> config_ifaces {config_ifaces}, link_id {link_id}" + ) + + elif mode == "ring3": + # We need a stable link_index per pair of machines across the 3-node ring. + # Build all possible pairs of machine indices and sort them. 
+ n = 3 + pair_list = [] + for i in range(n): + for j in range(i + 1, n): + pair_list.append((i, j)) + pair_list.sort() # deterministic: e.g. (0,1),(0,2),(1,2) + + local_idx_node = cluster_machine_ids.index(local_machine_id) + + for (link_id, discover_iface, neighbor_machine) in link_entries: + config_ifaces = DISCOVERY_TO_CONFIG_IFACES.get(discover_iface, []) + if not config_ifaces: + print(f"[{discover_iface}] No mapped config interfaces; skipping IP assignment for this link") + continue + + neighbor_idx_node = cluster_machine_ids.index(neighbor_machine) + pair = (min(local_idx_node, neighbor_idx_node), + max(local_idx_node, neighbor_idx_node)) + try: + link_index = pair_list.index(pair) + except ValueError: + print(f"[{discover_iface}] Could not find pair {pair} in pair_list {pair_list}, skipping") + continue + + # Per link, decide Node 1 vs Node 2 based on machine_id ordering + node_id_link = 1 if local_machine_id < neighbor_machine else 2 + + for local_idx, cfg_iface in enumerate(config_ifaces): + ip_cidr = ip_for_2node_link(link_index, node_id_link, local_idx) + iface_to_ip[cfg_iface] = ip_cidr + + print( + f"3-node ring link {link_index}: discover_iface {discover_iface} " + f"neighbors {local_machine_id} <-> {neighbor_machine}, " + f"config_ifaces {config_ifaces}, link_id {link_id}" + ) + + if not iface_to_ip: + print("No config interfaces to assign IPs to. Aborting.") + return False + + # print netplan YAML for ALL_INTERFACES + yaml = build_netplan_yaml(iface_to_ip) + print("\n--- Netplan YAML ---\n") + print(yaml) + print("--- End Netplan YAML ---\n") + if args.apply_netplan_yaml: + if not apply_netplan_yaml(yaml): + return False + else: + print("No changes were made to /etc/netplan and netplan was NOT applied.") + return True + + +if __name__ == "__main__": + ret = main() + if not ret: + print("Failed to configure cluster networking. 
Aborting.") + exit(1) + else: + print("Successfully configured cluster networking.") + exit(0) diff --git a/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/requirements.txt b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/requirements.txt new file mode 100644 index 0000000..291be12 --- /dev/null +++ b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/requirements.txt @@ -0,0 +1,8 @@ +bcrypt==5.0.0 +cffi==2.0.0 +cryptography==46.0.5 +invoke==2.2.1 +paramiko==4.0.0 +pycparser==3.0 +PyNaCl==1.6.2 +scp==0.15.0 diff --git a/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/spark_cluster_setup.py b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/spark_cluster_setup.py new file mode 100644 index 0000000..e5cb0e5 --- /dev/null +++ b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/spark_cluster_setup.py @@ -0,0 +1,854 @@ +#!/usr/bin/env python3 + +# +# SPDX-FileCopyrightText: Copyright (c) 1993-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import logging +import paramiko +import argparse +import time +import json +from scp import SCPClient +import threading +import sys +from pathlib import Path +import os +import subprocess +import re +from ipaddress import ip_address as ip_addr_obj, ip_network + +logging.getLogger("paramiko").setLevel(logging.CRITICAL) +logging.getLogger("paramiko.transport").setLevel(logging.CRITICAL) + +# Default paths +SCRIPT_DIR = Path(__file__).resolve().parent +CONFIG_PATH = SCRIPT_DIR / "spark_config.json" +SHARED_KEY = Path.home() / ".ssh" / "id_ed25519_shared" +SSH_DIR = Path.home() / ".ssh" +AUTHORIZED_KEYS = SSH_DIR / "authorized_keys" +SSH_CONFIG = SSH_DIR / "config" +IDENTITY_LINE = "IdentityFile ~/.ssh/id_ed25519_shared" + +NETWORK_SETUP_SCRIPT_NAME = "detect_and_configure_cluster_networking.py" +NETWORK_SETUP_SCRIPT = SCRIPT_DIR / "node_scripts" / NETWORK_SETUP_SCRIPT_NAME + +IP_PREFIX = "192.168.100." +LAST_OCTET_START = 10 +SUBNET_SIZE = 24 + +MIN_NCCL_TEST_BW = 21.875 # 175 Gbps +MIN_NCCL_TEST_BW_RING = 10 # 80 Gbps + +NCCL_ENV = """export CUDA_HOME="/usr/local/cuda" && export MPI_HOME="/usr/lib/aarch64-linux-gnu/openmpi" && export NCCL_HOME="$HOME/nccl_spark_cluster/build/" && export LD_LIBRARY_PATH="$NCCL_HOME/lib:$CUDA_HOME/lib64/:$MPI_HOME/lib:$LD_LIBRARY_PATH" """ + +class ExceptionThread(threading.Thread): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.exc = None + + def run(self): + try: + super().run() + except Exception as e: + self.exc = e + + def join(self, timeout=None): + super().join(timeout) + if self.exc: + raise self.exc + +def create_ssh_client(server, port, user, password, timeout=10): + """Creates a Paramiko SSH client and connects.""" + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + time.sleep(1) + client.connect(server, port, user, password, timeout=timeout) + return client + +def paramiko_run_command_with_output(ssh_client, cmd): + _, stdout, stderr = 
ssh_client.exec_command(cmd) + + output = "" + error = "" + while not stdout.channel.exit_status_ready(): + if stdout.channel.recv_ready(): + out = stdout.channel.recv(1024).decode('utf-8') + output += out + if stderr.channel.recv_ready(): + error_out = stderr.channel.recv(1024).decode('utf-8') + error += error_out + time.sleep(0.1) + + # After the loop finishes, there might be remaining output + # Read the rest of the output + remaining_output = stdout.read().decode('utf-8') + output += remaining_output + remaining_error = stderr.read().decode('utf-8') + if remaining_error: + error += remaining_error + + exit_code = stdout.channel.recv_exit_status() + return exit_code, output, error + +def paramiko_run_command(ssh_client, cmd): + _, stdout, stderr = ssh_client.exec_command(cmd) + + while not stdout.channel.exit_status_ready(): + if stdout.channel.recv_ready(): + stdout.channel.recv(1024).decode('utf-8') + if stderr.channel.recv_ready(): + stderr.channel.recv(1024).decode('utf-8') + time.sleep(0.1) + + # After the loop finishes, there might be remaining output + # Read the rest of the output + stdout.read().decode('utf-8') + stderr.read().decode('utf-8') + + # Get the exit status + exit_code = stdout.channel.recv_exit_status() + return exit_code + + +def _paramiko_run_sudo_impl(ssh_client, password, cmd, capture_output): + """Run a command with sudo, feeding password via stdin when needed. + Password is never put on the command line, so it won't appear in ps or /proc. 
+ """ + if password: + full_cmd = "sudo -S " + cmd + stdin, stdout, stderr = ssh_client.exec_command(full_cmd) + stdin.write(password + "\n") + stdin.channel.shutdown_write() + else: + full_cmd = "sudo -n " + cmd + stdin, stdout, stderr = ssh_client.exec_command(full_cmd) + + output = "" + error = "" + while not stdout.channel.exit_status_ready(): + if stdout.channel.recv_ready(): + out = stdout.channel.recv(1024).decode('utf-8') + if capture_output: + output += out + if stderr.channel.recv_ready(): + err = stderr.channel.recv(1024).decode('utf-8') + if capture_output: + error += err + time.sleep(0.1) + + output += stdout.read().decode('utf-8') + err_rest = stderr.read().decode('utf-8') + if err_rest: + error += err_rest + exit_code = stdout.channel.recv_exit_status() + return exit_code, output, error + + +def paramiko_run_sudo_command(ssh_client, password, cmd): + """Run a command with sudo (password via stdin if provided). Returns exit code only.""" + exit_code, _, _ = _paramiko_run_sudo_impl(ssh_client, password, cmd, capture_output=False) + return exit_code + + +def paramiko_run_sudo_command_with_output(ssh_client, password, cmd): + """Run a command with sudo (password via stdin if provided). Returns (exit_code, output, error).""" + return _paramiko_run_sudo_impl(ssh_client, password, cmd, capture_output=True) + + +def ssh_client_active(ssh): + return bool(ssh and ssh.get_transport() and ssh.get_transport().is_active()) + +def close_ssh_session(ssh): + if ssh_client_active(ssh): + ssh.close() + +def setup_nccl_deps(node, ring_topology): + """Setup NCCL dependencies on the node.""" + ssh = None + try: + ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"]) + if not ssh.get_transport().is_active(): + raise Exception(f"Could not establish a session to node {node["ip_address"]}. 
Check the credentials and try again.") + + print(f"Updating apt on node {node["ip_address"]}...") + paramiko_run_sudo_command(ssh, node["password"], "apt update") + + print(f"Installing libopenmpi-dev on node {node["ip_address"]}...") + exit_code, output, error = paramiko_run_sudo_command_with_output(ssh, node["password"], "apt install -y libopenmpi-dev") + if exit_code: + raise Exception(f"Failed to install libopenmpi-dev on node {node["ip_address"]}: output:{output} error:{error}") + + print(f"Cloning NCCL repo on node {node["ip_address"]}...") + if ring_topology: + cmd = """rm -rf ~/nccl_spark_cluster/ && git clone -b dgxspark-3node-ring https://github.com/zyang-dev/nccl.git ~/nccl_spark_cluster/""" + else: + cmd = """rm -rf ~/nccl_spark_cluster/ && git clone -b v2.28.9-1 https://github.com/NVIDIA/nccl.git ~/nccl_spark_cluster/""" + exit_code, output, error = paramiko_run_command_with_output(ssh, cmd) + if exit_code: + raise Exception(f"Failed to clone NCCL repo on node {node['ip_address']}: output:{output} error:{error}") + + print(f"Building NCCL on node {node["ip_address"]}...") + cmd = """cd ~/nccl_spark_cluster/ && make -j src.build NVCC_GENCODE="-gencode=arch=compute_121,code=sm_121" """ + exit_code, output, error = paramiko_run_command_with_output(ssh, cmd) + if exit_code: + raise Exception(f"Failed to build NCCL on node {node['ip_address']}: output:{output} error:{error}") + + print(f"Cloning NCCL tests repo on node {node["ip_address"]}...") + cmd = """rm -rf ~/nccl-tests_spark_cluster/ && git clone https://github.com/NVIDIA/nccl-tests.git ~/nccl-tests_spark_cluster/""" + exit_code, output, error = paramiko_run_command_with_output(ssh, cmd) + if exit_code: + raise Exception(f"Failed to clone NCCL tests repo on node {node['ip_address']}: output:{output} error:{error}") + + print(f"Building NCCL tests on node {node["ip_address"]}...") + cmd = """cd ~/nccl-tests_spark_cluster/ && %s && make MPI=1 -j8 """ % NCCL_ENV + exit_code, output, error = 
paramiko_run_command_with_output(ssh, cmd) + if exit_code: + raise Exception(f"Failed to build NCCL tests on node {node['ip_address']}: {error}") + + print(f"Successfully setup NCCL dependencies on node {node['ip_address']}") + close_ssh_session(ssh) + except Exception as e: + close_ssh_session(ssh) + raise Exception(f"Failed to setup NCCL dependencies on node {node["ip_address"]}:\n{e}") + +def run_nccl_test(nodes_info, ring_topology): + """Runs the NCCL test.""" + + threads = [] + for i, node in enumerate(nodes_info): + t = ExceptionThread(target=setup_nccl_deps, args=(node, ring_topology,)) + threads.append(t) + t.start() + + for t in threads: + try: + t.join() + except Exception as e: + print(f"An error occurred when running NCCL setup on nodes:\n{e}") + return False + + print(f"Successfully setup NCCL dependencies on all nodes...") + + print(f"Running NCCL test...") + + # Generate the mpirun command + host_list = ",".join(f"{node['ip_address']}:1" for node in nodes_info) + ring_topology_specific_env = "-x NCCL_IB_MERGE_NICS=0 -x NCCL_NET_PLUGIN=none " if ring_topology else "" + mpirun_cmd = ( + f"{NCCL_ENV} && mpirun -np {len(nodes_info)} -H {host_list} " + '--mca plm_rsh_agent "ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no" ' + "-x LD_LIBRARY_PATH=$LD_LIBRARY_PATH " + "-x UCX_NET_DEVICES=enP7s7 " + "-x NCCL_SOCKET_IFNAME=enP7s7 " + "-x OMPI_MCA_btl_tcp_if_include=enP7s7 " + "-x NCCL_IB_HCA=rocep1s0f0,rocep1s0f1,roceP2p1s0f0,roceP2p1s0f1 " + "-x NCCL_IB_SUBNET_AWARE_ROUTING=1 " + f"{ring_topology_specific_env}" + "$HOME/nccl-tests_spark_cluster/build/all_gather_perf -b 16G -e 16G -f 2" + ) + + # Run command on the primary node (first node in the list) + node0 = nodes_info[0] + ssh = create_ssh_client(node0["ip_address"], node0["port"], node0["user"], node0["password"]) + if not ssh.get_transport().is_active(): + print(f"Could not establish a session to node {node0}. 
Check the credentials and try again.") + return False + + print(f"NCCL test command: {mpirun_cmd}") + exit_code, output, error = paramiko_run_command_with_output(ssh, mpirun_cmd) + if exit_code: + print(f"Failed to run NCCL test on node {node0["ip_address"]}: output:{output} error:{error}") + close_ssh_session(ssh) + return False + + # Extract the "Avg bus bandwidth" value from the NCCL test output + avg_bus_bw = None + # The output could potentially be multiline (as it is command output) + # We need to search for a line matching "# Avg bus bandwidth : value" + for line in output.splitlines(): + m = re.match(r"# Avg bus bandwidth\s*:\s*([0-9.]+)", line.strip()) + if m: + avg_bus_bw = float(m.group(1)) + print(f"Avg bus bandwidth from NCCL test: {avg_bus_bw} GB/s") + break + + if avg_bus_bw is None: + print("WARNING: Failed to extract Avg bus bandwidth from NCCL test output.") + else: + # If the average bus bandwidth is less then throw a warning + if (ring_topology and avg_bus_bw < MIN_NCCL_TEST_BW_RING) or (not ring_topology and avg_bus_bw < MIN_NCCL_TEST_BW): + print("WARNING: NCCL Test bandwidth is less than expected. 
Stop any GPU workloads on the nodes and try NCCL test again using the NCCL test command above.") + else: + print(f"NCCL test BW is as expected") + + close_ssh_session(ssh) + return True + +def ensure_ssh_dir(): + """Ensure ~/.ssh exists with mode 0700.""" + SSH_DIR.mkdir(parents=True, exist_ok=True) + os.chmod(SSH_DIR, 0o700) + +def generate_shared_key(): + """Generate shared ed25519 key if it does not exist.""" + if SHARED_KEY.exists(): + return + ensure_ssh_dir() + print("Generating shared SSH key for all nodes...") + os.system(f"ssh-keygen -t ed25519 -N '' -f {SHARED_KEY} -q -C 'shared-cluster-key' > /dev/null 2>&1") + if not SHARED_KEY.exists(): + raise Exception("Failed to generate shared SSH key.") + +def add_pubkey_to_authorized_keys(): + """Add shared public key to local authorized_keys if not present.""" + ensure_ssh_dir() + pub_content = (SHARED_KEY.with_suffix(".pub")).read_text() + if AUTHORIZED_KEYS.exists(): + current = AUTHORIZED_KEYS.read_text() + if pub_content.strip() in current: + return + with open(AUTHORIZED_KEYS, "a") as f: + f.write(pub_content) + os.chmod(AUTHORIZED_KEYS, 0o600) + print("Added shared public key to local authorized_keys") + +def update_local_ssh_config(): + """Add IdentityFile for shared key to local SSH config if missing.""" + ensure_ssh_dir() + if SSH_CONFIG.exists(): + content = SSH_CONFIG.read_text() + if "id_ed25519_shared" in content: + return + with open(SSH_CONFIG, "a") as f: + f.write("Host *\n") + f.write(f" {IDENTITY_LINE}\n") + os.chmod(SSH_CONFIG, 0o600) + print("Updated local SSH config to use shared key") + +def configure_node_ssh_keys(node) -> bool: + """Copy shared key to node and set up authorized_keys and SSH config.""" + ssh = None + try: + ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"]) + if not ssh.get_transport().is_active(): + raise Exception(f"Could not establish a session to node {node["ip_address"]}. 
Check the credentials and try again.") + + # Resolve remote home (e.g. /home/nvidia or /root) + _, stdout, _ = ssh.exec_command("echo $HOME") + home = stdout.read().decode().strip() or f"/home/{node["user"]}" + remote_ssh = f"{home}/.ssh" + + exit_code, output, error = paramiko_run_command_with_output(ssh, f"mkdir -p {remote_ssh} && chmod 700 {remote_ssh}") + if exit_code: + raise Exception(f"Failed to create remote SSH directory on node {node["ip_address"]}: output:{output} error:{error}") + + with SCPClient(ssh.get_transport()) as scp: + scp.put(str(SHARED_KEY), f"{remote_ssh}/id_ed25519_shared") + scp.put(str(SHARED_KEY.with_suffix(".pub")), f"{remote_ssh}/id_ed25519_shared.pub") + + # Set key permissions and add to authorized_keys + exit_code, output, error = paramiko_run_command_with_output(ssh, f"chmod 600 {remote_ssh}/id_ed25519_shared") + if exit_code: + raise Exception(f"Failed to set permissions on {remote_ssh}/id_ed25519_shared: output:{output} error:{error}") + + exit_code, output, error = paramiko_run_command_with_output(ssh, f"chmod 644 {remote_ssh}/id_ed25519_shared.pub") + if exit_code: + raise Exception(f"Failed to set permissions on {remote_ssh}/id_ed25519_shared.pub: output:{output} error:{error}") + + pub_line = (SHARED_KEY.with_suffix(".pub")).read_text().strip() + pub_escaped = pub_line.replace("'", "'\"'\"'") + exit_code, output, error = paramiko_run_command_with_output( + ssh, + f"grep -qF '{pub_escaped}' {remote_ssh}/authorized_keys 2>/dev/null || " + f"echo '{pub_escaped}' >> {remote_ssh}/authorized_keys", + ) + if exit_code: + raise Exception(f"Failed to add {SHARED_KEY.with_suffix(".pub")} to authorized_keys: output:{output} error:{error}") + + exit_code, output, error = paramiko_run_command_with_output(ssh, f"chmod 600 {remote_ssh}/authorized_keys") + if exit_code: + raise Exception(f"Failed to set permissions on {remote_ssh}/authorized_keys: output:{output} error:{error}") + + exit_code, output, error = 
paramiko_run_command_with_output( + ssh, + f"grep -q 'IdentityFile.*id_ed25519_shared' {remote_ssh}/config 2>/dev/null || " + f"(echo 'Host *' >> {remote_ssh}/config && echo ' {IDENTITY_LINE}' >> {remote_ssh}/config)", + ) + if exit_code: + raise Exception(f"Failed to add {IDENTITY_LINE} to config: output:{output} error:{error}") + + exit_code, output, error = paramiko_run_command_with_output(ssh, f"chmod 600 {remote_ssh}/config") + if exit_code: + raise Exception(f"Failed to set permissions on {remote_ssh}/config: output:{output} error:{error}") + + print(f"Successfully configured {node["ip_address"]} with shared key") + return True + except Exception as e: + print(f" ✗ Failed to configure {node["ip_address"]}:\n{e}") + return False + finally: + close_ssh_session(ssh) + +def check_and_get_up_cx7_interfaces(node_info): + ssh = None + up_ifaces = [] + try: + node = node_info[0] + ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"]) + if not ssh.get_transport().is_active(): + raise Exception(f"Could not establish a session to node {node["ip_address"]}. Check the credentials and try again.") + + if_groups = [["enp1s0f0np0", "enP2p1s0f0np0"], ["enp1s0f1np1", "enP2p1s0f1np1"]] + for if_group in if_groups: + up_count = 0 + for if_name in if_group: + cmd = r"""ip link show %s | grep -c "state UP" """ % if_name + exit_code = paramiko_run_command(ssh, cmd) + if not exit_code: + up_count+=1 + + if up_count == len(if_group): + # found the if_group which has UP interfaces + up_ifaces.extend(if_group) + close_ssh_session(ssh) + if not len(up_ifaces): + print(f"ERROR: CX7 interfaces on {node["ip_address"]} are not UP") + return [] + + print(f"Found UP CX7 interfaces {up_ifaces} on {node["ip_address"]}. 
Checking other nodes...") + for node in node_info[1:]: + ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"]) + if not ssh.get_transport().is_active(): + raise Exception(f"Could not establish a session to node {node["ip_address"]}. Check the credentials and try again.") + + for if_group in if_groups: + up_count = 0 + for if_name in if_group: + cmd = r"""ip link show %s | grep -c "state UP" """ % if_name + exit_code = paramiko_run_command(ssh, cmd) + if not exit_code: + if if_name not in up_ifaces: + raise Exception(f"ERROR: CX7 interface {if_name} on {node["ip_address"]} is UP which is not in {up_ifaces}. Make sure the same CX7 port(s) on each node are connected and try again.") + else: + if if_name in up_ifaces: + raise Exception(f"ERROR: CX7 interface {if_name} on {node["ip_address"]} is DOWN. {up_ifaces} are expected to be UP. Make sure the same CX7 port(s) on each node are connected and try again.") + + close_ssh_session(ssh) + + except Exception as e: + close_ssh_session(ssh) + raise Exception(f"ERROR: An error occurred when checking UP CX7 interfaces:\n{e}") + + return up_ifaces + +def check_interface_link_speed(nodes_info, interfaces): + """Checks the link speed of the interfaces.""" + ssh = None + try: + for node in nodes_info: + ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"]) + if not ssh.get_transport().is_active(): + raise Exception(f"Could not establish a session to node {node["ip_address"]}. 
Check the credentials and try again.") + + for iface in interfaces: + cmd = """ethtool %s | grep -i speed | awk '-F: ' '{print $2}' """ % iface + exit_code, output, error = paramiko_run_sudo_command_with_output(ssh, node["password"], cmd) + if exit_code: + print(f"ERROR: Failed to check link speed on {iface} on node {node["ip_address"]}: {error}") + close_ssh_session(ssh) + return False + + speed = output.strip() + if "200000" not in speed: + print(f"ERROR: Link speed on {iface} on node {node["ip_address"]} is not 200Gbps.") + print("Check the following:\n" + "- QSFP cable should be compatible and rated at least for 200Gbps.\n" + "- If running with a switch then check the switch port speed.\n" + "- With a switch, sometimes auto-negotiation may not negotiate 200Gbps, in which case set the link speed manually on the switch ports.\n") + close_ssh_session(ssh) + return False + + close_ssh_session(ssh) + + except Exception as e: + close_ssh_session(ssh) + raise Exception(f"Failed to check link speed:\n{e}") + + return True + +def scp_put_file_with_ssh(client, local_file, remote_file) -> bool: + if not local_file or not remote_file: + print("ERROR: Local file or remote file not specified") + return False + + try: + with SCPClient(client.get_transport()) as scp: + scp.put(local_file, remote_file) + except Exception as e: + print(f"scp_put_file: An error occurred:\n{e}") + return False + + return True + +def copy_network_setup_script_to_nodes(nodes_info) -> bool: + """Copies the detect_and_configure_cluster_networking.py script to the nodes and runs it in threads.""" + ssh = None + try: + for node in nodes_info: + ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"]) + if not ssh.get_transport().is_active(): + print(f"Could not establish a session to node {node["ip_address"]}. 
Check the credentials and try again.") + return False + + if not scp_put_file_with_ssh(ssh, NETWORK_SETUP_SCRIPT, f"~/{NETWORK_SETUP_SCRIPT_NAME}"): + raise Exception(f"Failed to copy {NETWORK_SETUP_SCRIPT_NAME} to node {node["ip_address"]}") + + close_ssh_session(ssh) + except Exception as e: + close_ssh_session(ssh) + print(f"Failed to copy {NETWORK_SETUP_SCRIPT_NAME}:\n{e}") + return False + + return True + +def run_network_setup_script(node, cmd): + """Runs the network setup script on the node. cmd is the command to run under sudo (no 'sudo' prefix).""" + ssh = None + try: + ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"]) + if not ssh.get_transport().is_active(): + raise Exception(f"Could not establish a session to node {node["ip_address"]}.") + + exit_code, output, error = paramiko_run_sudo_command_with_output(ssh, node["password"], cmd) + if exit_code: + raise Exception(f"Failed to run network setup script on node {node["ip_address"]}: output:{output} error:{error}\n") + + close_ssh_session(ssh) + + except Exception as e: + close_ssh_session(ssh) + raise Exception(f"Failed to run network setup script on node {node["ip_address"]}:\n{e}") + +def run_network_setup_scripts_on_nodes(nodes_info): + """Runs the network setup scripts on the nodes in threads.""" + + threads = [] + ret = True + for i, node in enumerate(nodes_info): + cmd = f"python3 ~/{NETWORK_SETUP_SCRIPT_NAME} --apply-netplan-yaml" + if i == 0: + cmd = cmd + " --primary" + t = ExceptionThread(target=run_network_setup_script, args=(node, cmd)) + threads.append(t) + t.start() + + for t in threads: + try: + t.join() + except Exception as e: + print(f"An error occurred when running network setup on nodes:\n{e}") + ret = False + + return ret + +def verify_ip_addresses(nodes_info, up_interfaces) -> bool: + """Verifies that the IP addresses are assigned to the interfaces.""" + ssh = None + try: + nodes_to_ip_cidrs = {} + all_nodes_ip_addresses = [] + for node in 
nodes_info: + ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"]) + if not ssh.get_transport().is_active(): + raise Exception(f"Could not establish a session to node {node["ip_address"]}. Check the credentials and try again.") + + nodes_to_ip_cidrs[node["ip_address"]] = [] + for iface in up_interfaces: + cmd = """ip addr show %s | grep -w inet | awk '{print $2}'""" % iface + exit_code, output, error = paramiko_run_command_with_output(ssh, cmd) + if exit_code: + raise Exception(f"ERROR: Failed to verify IP address on {iface} on node {node["ip_address"]}") + + ip_addresses = output.strip().split("\n") + if len(ip_addresses) != 1: + raise Exception(f"ERROR: Zero or multiple IP addresses found on node {node["ip_address"]}, {iface}: {ip_addresses}") + + if len(ip_addresses[0]) == 0: + raise Exception(f"ERROR: No IP address found on node {node["ip_address"]}, {iface}") + + # Parse CIDR (e.g. 192.168.1.1/24) for uniqueness check by IP only + ip_parts = [a.split("/")[0] for a in ip_addresses] + if set(all_nodes_ip_addresses).intersection(ip_parts): + raise Exception(f"ERROR: IP address {ip_addresses} on node {node["ip_address"]}, {iface} is already assigned to another node.") + + print(f"IP address on node {node["ip_address"]}, {iface}: {ip_addresses}") + all_nodes_ip_addresses.extend(ip_parts) + nodes_to_ip_cidrs[node["ip_address"]].extend(ip_addresses) + + close_ssh_session(ssh) + + print(f"Running cluster connectivity test...") + + for node in nodes_info: + # Run mesh ping test between all nodes in the cluster + ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"]) + if not ssh.get_transport().is_active(): + raise Exception(f"Could not establish a session to node {node["ip_address"]}. 
Check the credentials and try again.") + + node_subnets = list(set([ip_network(cidr, strict=False) for cidr in nodes_to_ip_cidrs[node["ip_address"]]])) + + for ip_address in all_nodes_ip_addresses: + # Check if the ip_address is in one of the node's subnets + if not any([ip_addr_obj(ip_address) in s for s in node_subnets]): + continue + + cmd = f"ping -c 1 {ip_address} > /dev/null 2>&1" + exit_code = paramiko_run_command(ssh, cmd) + if exit_code: + raise Exception(f"Failed to run ping test from node {node["ip_address"]} to node {ip_address}") + + close_ssh_session(ssh) + + print(f"Cluster connectivity test completed successfully.") + + except Exception as e: + close_ssh_session(ssh) + print(f"Failed to verify IP addresses:\n{e}") + return False + + return True + +def configure_ssh_keys_on_nodes(nodes_info) -> bool: + """Configures the ssh keys on the nodes.""" + + print("Generating shared SSH key for all nodes...") + generate_shared_key() + + print("Setting up shared SSH access across all nodes...") + add_pubkey_to_authorized_keys() + + for node in nodes_info: + print(f"Configuring shared SSH key on node {node["ip_address"]}...") + if not configure_node_ssh_keys(node): + return False + + update_local_ssh_config() + + print("Shared SSH keys configured successfully.") + + return True + +def handle_cluster_setup(config) -> tuple[bool, bool]: + """Handles the cluster network setup.""" + try: + nodes_info = config.get("nodes_info", None) + if not nodes_info: + print("ERROR: Nodes information not found.") + return False, False + + print(f"Checking UP CX7 interfaces...") + up_interfaces = check_and_get_up_cx7_interfaces(nodes_info) + if not up_interfaces: + print("ERROR: Failed to check UP CX7 interfaces. 
Check the QSFP cable connection and try again.") + return False, False + + print(f"Checking CX7 interface link speed...") + if not check_interface_link_speed(nodes_info, up_interfaces): + return False, False + + print(f"Copying network setup scripts on nodes...") + # Copy the detect_and_configure_cluster_networking.py script to the nodes and run it in threads + if not copy_network_setup_script_to_nodes(nodes_info): + return False, False + + print(f"Running network setup scripts on nodes...") + if not run_network_setup_scripts_on_nodes(nodes_info): + print("ERROR: Failed to run network setup scripts on nodes. Check the QSFP cable connections and the nodes config in the json file and try again.") + return False, False + + # Verify that the IP addresses are assigned to the interfaces + max_retries = 5 + retries = max_retries + while retries > 0: + wait_secs = (max_retries - retries + 1) * 10 + print(f"Waiting for {wait_secs} seconds before checking IP addresses") + time.sleep(wait_secs) + if not verify_ip_addresses(nodes_info, up_interfaces): + print(f"ERROR: Failed to verify IP addresses on nodes. ({retries - 1} retries left)...") + retries -= 1 + continue + break + + if retries == 0: + print("ERROR: Failed to verify IP addresses on nodes. Check the QSFP cable connections and the nodes config in the json file and try again.") + return False, False + + # Configure ssh keys across nodes + if not configure_ssh_keys_on_nodes(nodes_info): + print("ERROR: Failed to configure ssh keys on nodes. 
Please check the configuration and try again.") + return False, False + + ring_topology = (len(nodes_info) == 3 and len(up_interfaces) == 4) + + except Exception as e: + print(f"ERROR: An error occurred when handling cluster setup:\n{e}") + return False, False + + return True, ring_topology + +def validate_config(config): + """Validates the configuration.""" + + if not config.get("nodes_info", None): + print("ERROR: Nodes information not found.") + return False + + if len(config.get("nodes_info")) < 2 or len(config.get("nodes_info")) > 4: + print("ERROR: Cluster can not contain less than 2 or more than 4 nodes. Please check the configuration and try again.") + return False + + cmd = """ip a | grep -w inet | awk -F"inet |/" '{print $2}' """ + result = subprocess.run(cmd, capture_output=True, text=True, shell=True) + if result.returncode: + print(f"ERROR: Failed to check IP addresses on current machine: {result.stderr}") + return False + else: + ip_addresses = result.stdout.strip().split("\n") + + print(f"Checking connectivity and permissions...") + nodes_valid = True + current_node_in_cluster = False + for node in config.get("nodes_info", []): + if not node.get("ip_address", None): + print("ERROR: IP address not found for node.") + return False + if not node.get("user", None): + print("ERROR: User not found for node.") + return False + if not node.get("port", None): + # Default port is 22 + node["port"] = 22 + if not node.get("password", None): + # Default password is empty + node["password"] = "" + + ssh = None + try: + ssh = create_ssh_client(node["ip_address"], node["port"], node["user"], node["password"]) + if not ssh.get_transport().is_active(): + print(f"ERROR: Could not establish a session to node {node["ip_address"]}. 
Check the credentials and try again.") + return False + except Exception as e: + print(f"ERROR: Could not establish a session to node {node["ip_address"]}, check the credentials in the config file and try again: {e}") + return False + + if node["password"] == "": + # No password is provided, so we need to validate the ssh key + exit_code = paramiko_run_sudo_command(ssh, "", "true") + else: + exit_code = paramiko_run_sudo_command(ssh, node["password"], "true") + if exit_code: + print(f"ERROR: Failed to check sudo access on node {node["ip_address"]}. If password is not specified then make sure that user has sudo access without password.") + nodes_valid = False + close_ssh_session(ssh) + break + + if node["ip_address"] in ip_addresses: + current_node_in_cluster = True + + close_ssh_session(ssh) + + if not nodes_valid: + return False + + if not current_node_in_cluster: + print("ERROR: This script must be run on a node in the cluster.") + return False + + return True + +def validate_environment(): + """Validates the environment.""" + + # Check if the script is being run directly instead of via the spark_cluster_setup.sh shell script + # We expect an environment variable ONLY set by the shell wrapper (e.g. 
SPARK_CLUSTER_SETUP_WRAPPER=1) + if os.environ.get("SPARK_CLUSTER_SETUP_WRAPPER") != "1": + print("ERROR: Please run this script via the spark_cluster_setup.sh shell script, not directly.") + return False + + # Check if we are running inside a virtual environment + if sys.prefix == sys.base_prefix: + print("ERROR: Please run this script inside a Python virtual environment (venv) with requirements installed.") + return False + + # Check if /etc/dgx-release exists and contains the expected DGX Spark markers + try: + with open("/etc/dgx-release", "r") as f: + content = f.read() + # Look for DGX_NAME="DGX Spark" and DGX_PRETTY_NAME="NVIDIA DGX Spark" + if 'DGX_NAME="DGX Spark"' not in content or 'DGX_PRETTY_NAME="NVIDIA DGX Spark"' not in content: + print("ERROR: This script must be run on a DGX Spark.") + return False + except FileNotFoundError: + print("ERROR: /etc/dgx-release not found. This is not a DGX Spark environment.") + return False + + return True + + +def main(): + """Main function to setup the Spark cluster.""" + parser = argparse.ArgumentParser(description="Setup the Spark cluster.") + parser.add_argument("-c", "--config", type=str, required=True, help="Path to the configuration file.") + + args = parser.parse_args() + config = args.config + with open(config, "r") as f: + config = json.load(f) + + if not config: + print("ERROR: Configuration file not found.") + return + + try: + # Validate env + print(f"Validating environment...") + if not validate_environment(): + return + + # Validate the config + print(f"Validating configuration...") + if not validate_config(config): + return + + print("Setting up Spark cluster...") + ret, ring_topology = handle_cluster_setup(config) + if not ret: + return + + print("Spark cluster setup completed successfully.") + + print("Running NCCL test...") + if not run_nccl_test(config.get("nodes_info", []), ring_topology): + return + print("NCCL test completed.") + + except Exception as e: + print(f"ERROR: An error occurred when 
running Spark cluster setup:\n{e}") + +if __name__ == "__main__": + main() diff --git a/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/spark_cluster_setup.sh b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/spark_cluster_setup.sh new file mode 100644 index 0000000..875354c --- /dev/null +++ b/nvidia/multi-sparks-through-switch/assets/spark_cluster_setup/spark_cluster_setup.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd -P)" +CURR_DIR="$(pwd -P)" +if [[ "$CURR_DIR" != "$SCRIPT_DIR" ]]; then + echo "Error: Please run this script from its own directory: $SCRIPT_DIR" + exit 1 +fi + +if [[ "$EUID" -eq 0 ]]; then + echo "Error: This script must not be run as root." + exit 1 +fi + +if [[ $# -ne 1 ]]; then + echo "Usage: $0 " + exit 1 +fi + +if [[ ! -d ".venv" ]]; then + python3 -m venv .venv +fi + +source .venv/bin/activate + +echo "---- Installing required packages ----" +pip install -r requirements.txt + +echo "---- Configuring the cluster in $1 ----" +SPARK_CLUSTER_SETUP_WRAPPER=1 python3 ./spark_cluster_setup.py -c "$1" + +deactivate