diff --git a/python/paddle/distributed/auto_parallel/cluster.py b/python/paddle/distributed/auto_parallel/cluster.py index e70b29dbe3931..e17f83eb41907 100644 --- a/python/paddle/distributed/auto_parallel/cluster.py +++ b/python/paddle/distributed/auto_parallel/cluster.py @@ -16,6 +16,7 @@ import json from enum import IntEnum from enum import unique +import paddle @unique @@ -138,7 +139,7 @@ def __repr__(self): class Link: default_hop = 1 - default_nic_bandwith = 24 + default_nic_bandwidth = 24 def __init__(self, source, target): self._src = source @@ -411,6 +412,174 @@ def __init__(self): self._alpha_latency = None self._rank_to_device_id = {} self._device_id_to_rank = {} + # This property only be valid when the cluster consists of machines, + # which have the same number accelerators. + self._num_devices_per_machine = None + + def gen_default_config_cluster(self, + gpu_model="V100", + cpu_model="6271C", + node_count=1, + device_count=1, + gpu_memory=32, + cpu_memory=503, + inter_bandwidth=24, + intra_bandwidth=235, + gpu_dp_gflops=7800, + gpu_sp_gflops=15700, + cpu_dp_gflops=75, + cpu_sp_gflops=150): + """Generate cluster by default config.""" + gpu_models = ["V100", "A100", "H100", "A2", "A10", "A16", "A30", "A40"] + xpu_models = ["XPU"] + npu_models = ["NPU"] + dcu_models = ["DCU"] + all_gpu_models = gpu_models + xpu_models + npu_models + dcu_models + assert gpu_model in all_gpu_models + self._num_devices_per_machine = device_count + + def _convert_to_type(gpu_model): + type = None + if gpu_model in gpu_models: + type = "GPU" + elif gpu_model in xpu_models: + type = "XPU" + elif gpu_model in npu_models: + type = "NPU" + elif gpu_model in dcu_models: + type = "DCU" + assert type is not None + + return type + + def _convert_to_model(gpu_model, gpu_memory): + model = None + if gpu_model == "V100": + model = "Tesla V100-SXM2-" + str(gpu_memory) + "GB" + assert model is not None + + return model + + def _convert_to_cpu_info(cpu_model): + arch, vendor, model = None, None, None + if cpu_model == "6271C": + arch = "x86_64" + vendor = "GenuineIntel" + model = "Intel(R) Xeon(R) Gold 6271C CPU @ 2.60G" + elif cpu_model == "6148": + arch = "x86_64" + vendor = "GenuineIntel" + model = "Intel(R) Xeon(R) Gold 6148 CPU @ 2.40G" + assert arch is not None + assert vendor is not None + assert model is not None + + return arch, vendor, model + + cluster_info = {} + cluster_info["machines"] = [] + global_id = 0 + global_id_to_device_type = {} + global_id_to_node = {} + # NOTE: It will support NPU, XPU, DCU models in the future, it is just a fake value now + for i in range(node_count): + machine = {} + # NOTE: The hostname is host_0, host_1, ... + machine["hostname"] = "host_" + str(i) + # NOTE: The addr is localhost, if need actual addr, it should be reset manually + machine["addr"] = "127.0.0.1" + # NOTE: The port is a default value + machine["port"] = 60009 + machine["links"] = [] + + devices = [] + local_id = 0 + + for j in range(device_count): + device = {} + global_id = global_id if i == 0 and j == 0 else global_id + 1 + + local_id += 1 + type = _convert_to_type(gpu_model) + model = _convert_to_model(gpu_model, gpu_memory) + dp_gflops = gpu_dp_gflops + sp_gflops = gpu_dp_gflops + memory = gpu_memory + + device["global_id"] = global_id + device["local_id"] = local_id + device["type"] = type + device["model"] = model + device["memory"] = memory + device["sp_gflops"] = sp_gflops + device["dp_gflops"] = dp_gflops + global_id_to_device_type[global_id] = type + global_id_to_node[global_id] = i + devices.append(device) + + # add cpu device and nic device, just one cpu + cpu_device = {} + arch, vendor, model = _convert_to_cpu_info(cpu_model) + sp_gflops = cpu_sp_gflops + dp_gflops = cpu_dp_gflops + global_id += 1 + local_id = 0 + memory = cpu_memory + type = "CPU" + cpu_device["arch"] = arch + cpu_device["vendor"] = vendor + cpu_device["model"] = model + cpu_device["sp_gflops"] = sp_gflops + cpu_device["dp_gflops"] = dp_gflops + cpu_device["global_id"] = global_id + cpu_device["local_id"] = local_id + cpu_device["memory"] = memory + cpu_device["type"] = type + global_id_to_node[global_id] = i + global_id_to_device_type[global_id] = type + devices.append(cpu_device) + + nic_device = {} + global_id += 1 + + # add NIC + type = "NIC" + width = 12.5 + ip = "127.0.0.1" + local_id = 0 + nic_device["type"] = type + nic_device["local_id"] = type + nic_device["global_id"] = global_id + global_id_to_device_type[global_id] = type + global_id_to_node[global_id] = i + devices.append(nic_device) + machine["devices"] = devices + cluster_info["machines"].append(machine) + + # build link + for i in range(0, global_id + 1): + for j in range(0, global_id + 1): + if i == j: + continue + node_id_i = global_id_to_node[i] + node_id_j = global_id_to_node[j] + device_type_i = global_id_to_device_type[i] + device_type_j = global_id_to_device_type[j] + link = {} + source_global_id = i + target_global_id = j + link["source_global_id"] = source_global_id + link["target_global_id"] = target_global_id + # the same node and device_type, set intra_bandwidth, NVL + if node_id_i == node_id_j and device_type_i == device_type_j: + link["type"] = "NVL" + link["bandwidth"] = intra_bandwidth + else: + link["type"] = "PHB" + link["bandwidth"] = inter_bandwidth + cluster_info["machines"][node_id_i]["links"].append(link) + + self._build_from_dict(cluster_info) @property def rank_to_device_id(self): @@ -473,9 +642,7 @@ def get_device(self, device_global_id): device = machine.devices[device_global_id] return device - def build_from_file(self, json_file_path): - with open(json_file_path) as json_file: - cluster_info = json.load(json_file) + def _build_from_dict(self, cluster_info): machines_info = cluster_info["machines"] for machine_info in machines_info: machine_id = self._generate_machine_id() @@ -533,6 +700,11 @@ def build_from_file(self, json_file_path): else: self._alpha_latecy = None + def build_from_file(self, json_file_path): + with open(json_file_path) as json_file: + cluster_info = json.load(json_file) + self._build_from_dict(cluster_info) + def _generate_machine_id(self): cur_machine_id = self._num_machines self._num_machines += 1 @@ -556,7 +728,7 @@ def get_beta(self, source_device_id, target_device_id): bandwidth = None # None means the source and target are not connected directly, set NIC in default if link is None: - bandwidth = Link.default_nic_bandwith + bandwidth = Link.default_nic_bandwidth else: bandwidth = link.bandwidth @@ -608,6 +780,15 @@ def get_involved_machine_count(self, device_ids): assert count > 0 return count + def get_num_machines(self): + return len(self._machines) + + def get_num_devices_per_machine(self): + # Only return the number of accelerators of each machine. + # All machines must has the same number of devices and same type of devices. + assert self._num_devices_per_machine + return self._num_devices_per_machine + def __str__(self): str = "" for machine in self.machines.values(): @@ -616,3 +797,29 @@ def __str__(self): def __repr__(self): return self.__str__() + + +def get_default_cluster(): + cluster = Cluster() + local_device_count = os.getenv("PADDLE_LOCAL_SIZE") + if local_device_count is None: + local_device_count = 1 + else: + local_device_count = int(local_device_count) + global_device_count = os.getenv("PADDLE_GLOBAL_SIZE") + if global_device_count is None: + node_count = 1 + else: + global_device_count = int(global_device_count) + assert global_device_count % local_device_count == 0 + node_count = int(global_device_count) // local_device_count + print("Node Count: ", + node_count, + "Local Device Size: ", + local_device_count, + "World size: ", + paddle.distributed.get_world_size(), + flush=True) + cluster.gen_default_config_cluster(node_count=node_count, + device_count=local_device_count) + return cluster diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster.py index 641ca38b64944..2fa01bdfa6a59 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster.py @@ -19,6 +19,7 @@ import paddle from paddle.distributed.auto_parallel.cluster import Cluster +from paddle.distributed.auto_parallel.cluster import get_default_cluster cluster_json = """ { @@ -1997,6 +1998,10 @@ def test_single_machine(self): self.assertTrue(devices == [0, 1, 2, 3]) self.assertTrue(involved_machine_count == 1) + # Remove unnecessary files + if os.path.exists(cluster_json_path): + os.remove(cluster_json_path) + def test_multi_machine(self): # Build cluster cluster_json_path = os.path.join(self.temp_dir.name, @@ -2022,6 +2027,17 @@ def test_multi_machine(self): if os.path.exists(cluster_json_path): os.remove(cluster_json_path) + def test_default_config_cluster(self): + cluster = Cluster() + cluster.gen_default_config_cluster(device_count=8) + # check machines and devices + self.assertTrue(cluster.get_num_machines() == 1) + self.assertTrue(cluster.get_num_devices_per_machine() == 8) + + def test_default_cluster(self): + cluster = get_default_cluster() + self.assertTrue(isinstance(cluster, Cluster)) + if __name__ == "__main__": unittest.main()