[Auto Parallel] Generate default cluster #44150

Merged: 2 commits, Jul 13, 2022
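In brief, this PR adds Cluster.gen_default_config_cluster() for building a
cluster description without a JSON file, refactors build_from_file() around a
new _build_from_dict() helper, and adds a module-level get_default_cluster()
that derives the topology from environment variables. A minimal, hypothetical
sketch of the new API (mirroring the tests at the bottom of the diff):

    from paddle.distributed.auto_parallel.cluster import Cluster, get_default_cluster

    # Describe one machine with eight V100s, no cluster JSON needed.
    cluster = Cluster()
    cluster.gen_default_config_cluster(gpu_model="V100", device_count=8)

    # Or let the helper derive node/device counts from the launcher's
    # environment variables (PADDLE_GLOBAL_SIZE, PADDLE_LOCAL_SIZE).
    default_cluster = get_default_cluster()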
217 changes: 212 additions & 5 deletions python/paddle/distributed/auto_parallel/cluster.py
@@ -16,6 +16,7 @@
import json
from enum import IntEnum
from enum import unique
import paddle


@unique
@@ -138,7 +139,7 @@ def __repr__(self):
class Link:

    default_hop = 1
-   default_nic_bandwith = 24
+   default_nic_bandwidth = 24

    def __init__(self, source, target):
        self._src = source
@@ -411,6 +412,174 @@ def __init__(self):
        self._alpha_latency = None
        self._rank_to_device_id = {}
        self._device_id_to_rank = {}
        # This property is only valid when the cluster consists of machines
        # that all have the same number of accelerators.
        self._num_devices_per_machine = None

    def gen_default_config_cluster(self,
                                   gpu_model="V100",
                                   cpu_model="6271C",
                                   node_count=1,
                                   device_count=1,
                                   gpu_memory=32,
                                   cpu_memory=503,
                                   inter_bandwidth=24,
                                   intra_bandwidth=235,
                                   gpu_dp_gflops=7800,
                                   gpu_sp_gflops=15700,
                                   cpu_dp_gflops=75,
                                   cpu_sp_gflops=150):
        """Generate a cluster from the default config (memory in GB,
        bandwidth in GB/s, compute in GFLOPS)."""
        gpu_models = ["V100", "A100", "H100", "A2", "A10", "A16", "A30", "A40"]
        xpu_models = ["XPU"]
        npu_models = ["NPU"]
        dcu_models = ["DCU"]
        all_gpu_models = gpu_models + xpu_models + npu_models + dcu_models
        assert gpu_model in all_gpu_models
        self._num_devices_per_machine = device_count

        def _convert_to_type(gpu_model):
            type = None
            if gpu_model in gpu_models:
                type = "GPU"
            elif gpu_model in xpu_models:
                type = "XPU"
            elif gpu_model in npu_models:
                type = "NPU"
            elif gpu_model in dcu_models:
                type = "DCU"
            assert type is not None

            return type

        def _convert_to_model(gpu_model, gpu_memory):
            model = None
            if gpu_model == "V100":
                model = "Tesla V100-SXM2-" + str(gpu_memory) + "GB"
            assert model is not None

            return model

        def _convert_to_cpu_info(cpu_model):
            arch, vendor, model = None, None, None
            if cpu_model == "6271C":
                arch = "x86_64"
                vendor = "GenuineIntel"
                model = "Intel(R) Xeon(R) Gold 6271C CPU @ 2.60G"
            elif cpu_model == "6148":
                arch = "x86_64"
                vendor = "GenuineIntel"
                model = "Intel(R) Xeon(R) Gold 6148 CPU @ 2.40G"
            assert arch is not None
            assert vendor is not None
            assert model is not None

            return arch, vendor, model

        cluster_info = {}
        cluster_info["machines"] = []
        global_id = 0
        global_id_to_device_type = {}
        global_id_to_node = {}
        # NOTE: NPU, XPU and DCU models will be supported in the future;
        # placeholder values are used for them now.
        for i in range(node_count):
            machine = {}
            # NOTE: The hostnames are host_0, host_1, ...
            machine["hostname"] = "host_" + str(i)
            # NOTE: The addr defaults to localhost; reset it manually if the
            # actual address is needed.
            machine["addr"] = "127.0.0.1"
            # NOTE: The port is a default value
            machine["port"] = 60009
            machine["links"] = []

            devices = []
            local_id = 0

            for j in range(device_count):
                device = {}
                # Global IDs are consecutive across all devices of all
                # machines; the very first device keeps id 0.
                global_id = global_id if i == 0 and j == 0 else global_id + 1

                local_id += 1
                type = _convert_to_type(gpu_model)
                model = _convert_to_model(gpu_model, gpu_memory)
                dp_gflops = gpu_dp_gflops
                sp_gflops = gpu_sp_gflops
                memory = gpu_memory

                device["global_id"] = global_id
                device["local_id"] = local_id
                device["type"] = type
                device["model"] = model
                device["memory"] = memory
                device["sp_gflops"] = sp_gflops
                device["dp_gflops"] = dp_gflops
                global_id_to_device_type[global_id] = type
                global_id_to_node[global_id] = i
                devices.append(device)

            # Add one CPU device and one NIC device per machine.
            cpu_device = {}
            arch, vendor, model = _convert_to_cpu_info(cpu_model)
            sp_gflops = cpu_sp_gflops
            dp_gflops = cpu_dp_gflops
            global_id += 1
            local_id = 0
            memory = cpu_memory
            type = "CPU"
            cpu_device["arch"] = arch
            cpu_device["vendor"] = vendor
            cpu_device["model"] = model
            cpu_device["sp_gflops"] = sp_gflops
            cpu_device["dp_gflops"] = dp_gflops
            cpu_device["global_id"] = global_id
            cpu_device["local_id"] = local_id
            cpu_device["memory"] = memory
            cpu_device["type"] = type
            global_id_to_node[global_id] = i
            global_id_to_device_type[global_id] = type
            devices.append(cpu_device)

            nic_device = {}
            global_id += 1

            # Add the NIC. The width and ip values are placeholders and are
            # not stored on the device yet.
            type = "NIC"
            width = 12.5
            ip = "127.0.0.1"
            local_id = 0
            nic_device["type"] = type
            nic_device["local_id"] = local_id
            nic_device["global_id"] = global_id
            global_id_to_device_type[global_id] = type
            global_id_to_node[global_id] = i
            devices.append(nic_device)
            machine["devices"] = devices
            cluster_info["machines"].append(machine)

        # Build the links: one directed link for every ordered pair of
        # distinct devices, attached to the source device's machine.
        for i in range(0, global_id + 1):
            for j in range(0, global_id + 1):
                if i == j:
                    continue
                node_id_i = global_id_to_node[i]
                node_id_j = global_id_to_node[j]
                device_type_i = global_id_to_device_type[i]
                device_type_j = global_id_to_device_type[j]
                link = {}
                source_global_id = i
                target_global_id = j
                link["source_global_id"] = source_global_id
                link["target_global_id"] = target_global_id
                # Same node and same device type: an intra-node (NVL) link.
                if node_id_i == node_id_j and device_type_i == device_type_j:
                    link["type"] = "NVL"
                    link["bandwidth"] = intra_bandwidth
                else:
                    link["type"] = "PHB"
                    link["bandwidth"] = inter_bandwidth
                cluster_info["machines"][node_id_i]["links"].append(link)

        self._build_from_dict(cluster_info)

    @property
    def rank_to_device_id(self):
@@ -473,9 +642,7 @@ def get_device(self, device_global_id):
        device = machine.devices[device_global_id]
        return device

-   def build_from_file(self, json_file_path):
-       with open(json_file_path) as json_file:
-           cluster_info = json.load(json_file)
+   def _build_from_dict(self, cluster_info):
        machines_info = cluster_info["machines"]
        for machine_info in machines_info:
            machine_id = self._generate_machine_id()
@@ -533,6 +700,11 @@ def build_from_file(self, json_file_path):
        else:
            self._alpha_latency = None

    def build_from_file(self, json_file_path):
        with open(json_file_path) as json_file:
            cluster_info = json.load(json_file)
        self._build_from_dict(cluster_info)

    def _generate_machine_id(self):
        cur_machine_id = self._num_machines
        self._num_machines += 1
@@ -556,7 +728,7 @@ def get_beta(self, source_device_id, target_device_id):
        bandwidth = None
        # None means the source and target are not directly connected;
        # fall back to the default NIC bandwidth.
        if link is None:
-           bandwidth = Link.default_nic_bandwith
+           bandwidth = Link.default_nic_bandwidth
        else:
            bandwidth = link.bandwidth
@@ -608,6 +780,15 @@ def get_involved_machine_count(self, device_ids):
        assert count > 0
        return count

    def get_num_machines(self):
        return len(self._machines)

    def get_num_devices_per_machine(self):
        # Only returns the number of accelerators per machine. All machines
        # must have the same number and type of devices.
        assert self._num_devices_per_machine
        return self._num_devices_per_machine

    def __str__(self):
        str = ""
        for machine in self.machines.values():
@@ -616,3 +797,29 @@ def __str__(self):

    def __repr__(self):
        return self.__str__()


def get_default_cluster():
    cluster = Cluster()
    local_device_count = os.getenv("PADDLE_LOCAL_SIZE")
    if local_device_count is None:
        local_device_count = 1
    else:
        local_device_count = int(local_device_count)
    global_device_count = os.getenv("PADDLE_GLOBAL_SIZE")
    if global_device_count is None:
        node_count = 1
    else:
        global_device_count = int(global_device_count)
        assert global_device_count % local_device_count == 0
        node_count = global_device_count // local_device_count
    print("Node Count: ",
          node_count,
          "Local Device Size: ",
          local_device_count,
          "World size: ",
          paddle.distributed.get_world_size(),
          flush=True)
    cluster.gen_default_config_cluster(node_count=node_count,
                                       device_count=local_device_count)
    return cluster
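For reference, a trimmed sketch of the dict that gen_default_config_cluster()
hands to _build_from_dict() when called with device_count=2 and otherwise
default arguments (field names taken from the code above; the elided entries
follow the same shape):

    cluster_info = {
        "machines": [{
            "hostname": "host_0",
            "addr": "127.0.0.1",
            "port": 60009,
            "devices": [
                {"global_id": 0, "local_id": 1, "type": "GPU",
                 "model": "Tesla V100-SXM2-32GB", "memory": 32,
                 "sp_gflops": 15700, "dp_gflops": 7800},
                # ... one dict per accelerator, then one CPU and one NIC ...
            ],
            "links": [
                {"source_global_id": 0, "target_global_id": 1,
                 "type": "NVL", "bandwidth": 235},
                # ... one dict per ordered pair of distinct devices ...
            ],
        }],
    }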
16 changes: 16 additions & 0 deletions python/paddle/fluid/tests/unittests/auto_parallel/test_cluster.py
@@ -19,6 +19,7 @@

import paddle
from paddle.distributed.auto_parallel.cluster import Cluster
from paddle.distributed.auto_parallel.cluster import get_default_cluster

cluster_json = """
{
@@ -1997,6 +1998,10 @@ def test_single_machine(self):
        self.assertTrue(devices == [0, 1, 2, 3])
        self.assertTrue(involved_machine_count == 1)

        # Remove unnecessary files
        if os.path.exists(cluster_json_path):
            os.remove(cluster_json_path)

    def test_multi_machine(self):
        # Build cluster
        cluster_json_path = os.path.join(self.temp_dir.name,
@@ -2022,6 +2027,17 @@ def test_multi_machine(self):
        if os.path.exists(cluster_json_path):
            os.remove(cluster_json_path)
    def test_default_config_cluster(self):
        cluster = Cluster()
        cluster.gen_default_config_cluster(device_count=8)
        # Check the machines and devices.
        self.assertTrue(cluster.get_num_machines() == 1)
        self.assertTrue(cluster.get_num_devices_per_machine() == 8)

    def test_default_cluster(self):
        cluster = get_default_cluster()
        self.assertTrue(isinstance(cluster, Cluster))


if __name__ == "__main__":
    unittest.main()
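A small, hypothetical end-to-end example of the environment-variable path used
by get_default_cluster() (in a real job the distributed launcher would
normally export these variables):

    import os

    # Emulate a 2-node x 8-device job.
    os.environ["PADDLE_GLOBAL_SIZE"] = "16"
    os.environ["PADDLE_LOCAL_SIZE"] = "8"

    from paddle.distributed.auto_parallel.cluster import get_default_cluster

    cluster = get_default_cluster()
    assert cluster.get_num_machines() == 2
    assert cluster.get_num_devices_per_machine() == 8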