Commit

Merge branch 'dmlc:master' into add-igb-to-sage
BowenYao18 committed Aug 29, 2024
2 parents 0d94836 + 03e83ac commit 0861d8a
Showing 20 changed files with 854 additions and 786 deletions.
2 changes: 0 additions & 2 deletions docs/source/api/python/dgl.dataloading.rst
@@ -26,8 +26,6 @@ DataLoaders

DataLoader
GraphDataLoader
DistNodeDataLoader
DistEdgeDataLoader

.. _api-dataloading-neighbor-sampling:

4 changes: 4 additions & 0 deletions docs/source/api/python/dgl.distributed.rst
@@ -70,6 +70,10 @@ Distributed DataLoader

.. autoclass:: DistDataLoader

.. autoclass:: DistNodeDataLoader

.. autoclass:: DistEdgeDataLoader

.. _api-distributed-sampling-ops:
Distributed Graph Sampling Operators
```````````````````````````````````````
6 changes: 3 additions & 3 deletions docs/source/guide/distributed-apis.rst
@@ -275,14 +275,14 @@ difference is that users need to use :func:`dgl.distributed.sample_neighbors` an
The high-level sampling APIs (:class:`~dgl.dataloading.NodeDataLoader` and
:class:`~dgl.dataloading.EdgeDataLoader`) have distributed counterparts
(:class:`~dgl.dataloading.DistNodeDataLoader` and
:class:`~dgl.dataloading.DistEdgeDataLoader`). The code is exactly the same as
(:class:`~dgl.distributed.DistNodeDataLoader` and
:class:`~dgl.distributed.DistEdgeDataLoader`). The code is exactly the same as
single-process sampling otherwise.

.. code:: python
sampler = dgl.sampling.MultiLayerNeighborSampler([10, 25])
dataloader = dgl.sampling.DistNodeDataLoader(g, train_nid, sampler,
dataloader = dgl.distributed.DistNodeDataLoader(g, train_nid, sampler,
batch_size=batch_size, shuffle=True)
for batch in dataloader:
...
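For readers following this migration, a minimal end-to-end sketch of the pattern the updated guide describes is shown below. It is illustrative only: the IP config path, graph name, partition JSON path, fan-outs, and batch size are placeholders, and it assumes the graph was partitioned beforehand with dgl.distributed.partition_graph.

    import dgl
    import torch as th

    # Connect to the DGL distributed service; "ip_config.txt" is a placeholder path.
    dgl.distributed.initialize("ip_config.txt")
    th.distributed.init_process_group(backend="gloo")

    # "my_graph" and the partition config path are placeholders for a graph
    # partitioned earlier with dgl.distributed.partition_graph.
    g = dgl.distributed.DistGraph("my_graph", part_config="data/my_graph.json")

    # Split the training nodes across trainers.
    train_nid = dgl.distributed.node_split(
        g.ndata["train_mask"], g.get_partition_book(), force_even=True
    )

    # Same sampler as in single-process training; only the dataloader class differs.
    sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 25])
    dataloader = dgl.distributed.DistNodeDataLoader(
        g, train_nid, sampler, batch_size=1024, shuffle=True, drop_last=False
    )

    for input_nodes, output_nodes, blocks in dataloader:
        ...  # forward/backward exactly as in the non-distributed code path
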
4 changes: 2 additions & 2 deletions examples/distributed/graphsage/node_classification.py
@@ -109,7 +109,7 @@ def inference(self, g, x, batch_size, device):
# `-1` indicates all inbound edges will be included, namely, full
# neighbor sampling.
sampler = dgl.dataloading.NeighborSampler([-1])
dataloader = dgl.dataloading.DistNodeDataLoader(
dataloader = dgl.distributed.DistNodeDataLoader(
g,
nodes,
sampler,
@@ -212,7 +212,7 @@ def run(args, device, data):
sampler = dgl.dataloading.NeighborSampler(
[int(fanout) for fanout in args.fan_out.split(",")]
)
dataloader = dgl.dataloading.DistNodeDataLoader(
dataloader = dgl.distributed.DistNodeDataLoader(
g,
train_nid,
sampler,
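The inference hunks in this file pair a one-layer full-neighbor sampler (fan-out -1) with a sweep over every node, writing each layer's output into a distributed tensor. A compressed sketch of that layer-wise pattern follows; it assumes `g` is a DistGraph and that `x`, `layer`, `n_hidden`, `batch_size`, and `device` come from the surrounding script (the input features, one trained GraphSAGE layer, and run-time settings), so treat it as illustrative rather than a drop-in replacement.

    import dgl
    import torch as th

    # Every node is a seed; node_split assigns a disjoint share to each trainer.
    nodes = dgl.distributed.node_split(
        th.ones(g.num_nodes(), dtype=th.bool), g.get_partition_book()
    )
    # Distributed buffer that collects this layer's output for all nodes.
    y = dgl.distributed.DistTensor((g.num_nodes(), n_hidden), th.float32, "h_next")

    sampler = dgl.dataloading.NeighborSampler([-1])  # -1 keeps every in-neighbor
    dataloader = dgl.distributed.DistNodeDataLoader(
        g, nodes, sampler, batch_size=batch_size, shuffle=False, drop_last=False
    )

    with th.no_grad():
        for input_nodes, output_nodes, blocks in dataloader:
            block = blocks[0].to(device)
            h = x[input_nodes].to(device)
            y[output_nodes] = layer(block, h).cpu()  # write back into the DistTensor
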
@@ -79,7 +79,7 @@ def inference(self, g, x, batch_size, device):
# Create sampler
sampler = dgl.dataloading.NeighborSampler([-1])
# Create dataloader
dataloader = dgl.dataloading.DistNodeDataLoader(
dataloader = dgl.distributed.DistNodeDataLoader(
g,
nodes,
sampler,
@@ -203,7 +203,7 @@ def run(args, device, data):
# Create dataloader
exclude = "reverse_id" if args.remove_edge else None
reverse_eids = th.arange(g.num_edges()) if args.remove_edge else None
dataloader = dgl.dataloading.DistEdgeDataLoader(
dataloader = dgl.distributed.DistEdgeDataLoader(
g,
train_eids,
sampler,
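The run() hunk above only shows the dataloader class being renamed; for context, here is a hedged sketch of how such an edge dataloader is typically assembled for training on edges with reverse-edge exclusion. `g`, `train_eids`, and `reverse_eids` are assumed to come from the surrounding script, and the negative sampler, fan-outs, and batch size are placeholder choices rather than the example's actual settings.

    import dgl

    sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 25])
    neg_sampler = dgl.dataloading.negative_sampler.Uniform(5)  # 5 negatives per seed edge

    dataloader = dgl.distributed.DistEdgeDataLoader(
        g,
        train_eids,
        sampler,
        negative_sampler=neg_sampler,
        exclude="reverse_id",   # drop each seed edge's reverse from the sampled blocks
        reverse_eids=reverse_eids,
        batch_size=1024,
        shuffle=True,
        drop_last=False,
    )

    for input_nodes, pos_graph, neg_graph, blocks in dataloader:
        ...  # score pos_graph against neg_graph edges and compute the loss
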
7 changes: 4 additions & 3 deletions examples/distributed/rgcn/node_classification.py
@@ -6,6 +6,7 @@
* l2norm applied to all weights
* remove nodes that won't be touched
"""

import argparse
import gc, os
import itertools
@@ -459,7 +460,7 @@ def run(args, device, data):
val_fanouts = [int(fanout) for fanout in args.validation_fanout.split(",")]

sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)
dataloader = dgl.dataloading.DistNodeDataLoader(
dataloader = dgl.distributed.DistNodeDataLoader(
g,
{"paper": train_nid},
sampler,
@@ -469,7 +470,7 @@ def run(args, device, data):
)

valid_sampler = dgl.dataloading.MultiLayerNeighborSampler(val_fanouts)
valid_dataloader = dgl.dataloading.DistNodeDataLoader(
valid_dataloader = dgl.distributed.DistNodeDataLoader(
g,
{"paper": val_nid},
valid_sampler,
@@ -479,7 +480,7 @@ def run(args, device, data):
)

test_sampler = dgl.dataloading.MultiLayerNeighborSampler(val_fanouts)
test_dataloader = dgl.dataloading.DistNodeDataLoader(
test_dataloader = dgl.distributed.DistNodeDataLoader(
g,
{"paper": test_nid},
test_sampler,
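The RGCN hunks pass a dictionary keyed by node type ({"paper": train_nid}) as the seed nodes, which is how the dataloader handles heterogeneous graphs both before and after the rename. A short sketch, assuming `g` is a heterogeneous DistGraph (e.g. ogbn-mag) with a "paper" node type and `train_nid` holds that type's training IDs; fan-outs and batch size are placeholders.

    import dgl

    sampler = dgl.dataloading.MultiLayerNeighborSampler([25, 20])
    dataloader = dgl.distributed.DistNodeDataLoader(
        g,
        {"paper": train_nid},  # seed nodes are given per node type
        sampler,
        batch_size=1024,
        shuffle=True,
        drop_last=False,
    )

    for input_nodes, seeds, blocks in dataloader:
        # For heterographs, `input_nodes` and `seeds` are dicts keyed by node type.
        ...
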
4 changes: 2 additions & 2 deletions examples/pytorch/graphsage/dist/train_dist.py
@@ -88,7 +88,7 @@ def inference(self, g, x, batch_size, device):
print(f"|V|={g.num_nodes()}, eval batch size: {batch_size}")

sampler = dgl.dataloading.NeighborSampler([-1])
dataloader = dgl.dataloading.DistNodeDataLoader(
dataloader = dgl.distributed.DistNodeDataLoader(
g,
nodes,
sampler,
@@ -153,7 +153,7 @@ def run(args, device, data):
sampler = dgl.dataloading.NeighborSampler(
[int(fanout) for fanout in args.fan_out.split(",")]
)
dataloader = dgl.dataloading.DistNodeDataLoader(
dataloader = dgl.distributed.DistNodeDataLoader(
g,
train_nid,
sampler,
36 changes: 15 additions & 21 deletions examples/pytorch/graphsage/dist/train_dist_transductive.py
@@ -1,14 +1,15 @@
import argparse
import time

import dgl

import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import dgl
from dgl.distributed import DistEmbedding
from train_dist import DistSAGE, compute_acc
from train_dist import compute_acc, DistSAGE


def initializer(shape, dtype):
@@ -18,9 +19,7 @@ def initializer(shape, dtype):


class DistEmb(nn.Module):
def __init__(
self, num_nodes, emb_size, dgl_sparse_emb=False, dev_id="cpu"
):
def __init__(self, num_nodes, emb_size, dgl_sparse_emb=False, dev_id="cpu"):
super().__init__()
self.dev_id = dev_id
self.emb_size = emb_size
@@ -49,9 +48,11 @@ def load_embs(standalone, emb_layer, g):
x = dgl.distributed.DistTensor(
(
g.num_nodes(),
emb_layer.module.emb_size
if isinstance(emb_layer, th.nn.parallel.DistributedDataParallel)
else emb_layer.emb_size,
(
emb_layer.module.emb_size
if isinstance(emb_layer, th.nn.parallel.DistributedDataParallel)
else emb_layer.emb_size
),
),
th.float32,
"eval_embs",
@@ -60,7 +61,7 @@ def load_embs(standalone, emb_layer, g):
num_nodes = nodes.shape[0]
for i in range((num_nodes + 1023) // 1024):
idx = nodes[
i * 1024: (i + 1) * 1024
i * 1024 : (i + 1) * 1024
if (i + 1) * 1024 < num_nodes
else num_nodes
]
@@ -113,7 +114,7 @@ def run(args, device, data):
sampler = dgl.dataloading.NeighborSampler(
[int(fanout) for fanout in args.fan_out.split(",")]
)
dataloader = dgl.dataloading.DistNodeDataLoader(
dataloader = dgl.distributed.DistNodeDataLoader(
g,
train_nid,
sampler,
@@ -164,10 +165,7 @@ def run(args, device, data):
emb_optimizer = th.optim.SparseAdam(
list(emb_layer.module.sparse_emb.parameters()), lr=args.sparse_lr
)
print(
"optimize Pytorch sparse embedding:",
emb_layer.module.sparse_emb
)
print("optimize Pytorch sparse embedding:", emb_layer.module.sparse_emb)

# Training loop
iter_tput = []
@@ -231,7 +229,7 @@ def run(args, device, data):
acc.item(),
np.mean(iter_tput[3:]),
gpu_mem_alloc,
np.sum(step_time[-args.log_every:]),
np.sum(step_time[-args.log_every :]),
)
)
start = time.time()
@@ -267,8 +265,7 @@ def run(args, device, data):
device,
)
print(
"Part {}, Val Acc {:.4f}, Test Acc {:.4f}, time: {:.4f}".format
(
"Part {}, Val Acc {:.4f}, Test Acc {:.4f}, time: {:.4f}".format(
g.rank(), val_acc, test_acc, time.time() - start
)
)
@@ -278,10 +275,7 @@ def main(args):
dgl.distributed.initialize(args.ip_config)
if not args.standalone:
th.distributed.init_process_group(backend="gloo")
g = dgl.distributed.DistGraph(
args.graph_name,
part_config=args.part_config
)
g = dgl.distributed.DistGraph(args.graph_name, part_config=args.part_config)
print("rank:", g.rank())

pb = g.get_partition_book()
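train_dist_transductive.py learns its input features as a dgl.distributed.DistEmbedding (imported at the top of the hunk). A condensed sketch of the DGL-sparse branch of that setup follows, assuming the distributed service, a DistGraph `g`, a dataloader, the model, and the model's dense `optimizer` are already in place; the embedding size and learning rate are placeholders and `model` is a hypothetical callable returning a loss.

    import dgl
    import torch as th
    from dgl.distributed import DistEmbedding


    def initializer(shape, dtype):
        # Runs lazily on the servers when the embedding table is created.
        arr = th.zeros(shape, dtype=dtype)
        arr.uniform_(-1.0, 1.0)
        return arr


    emb_size = 128  # placeholder
    sparse_emb = DistEmbedding(
        g.num_nodes(), emb_size, name="node_emb", init_func=initializer
    )
    # The sparse optimizer only updates rows touched in each mini-batch.
    emb_optimizer = dgl.distributed.optim.SparseAdam([sparse_emb], lr=0.01)

    for input_nodes, seeds, blocks in dataloader:
        feats = sparse_emb(input_nodes)     # gather embeddings for the sampled nodes
        loss = model(blocks, feats, seeds)  # hypothetical model returning a loss
        optimizer.zero_grad()               # dense model parameters
        loss.backward()
        optimizer.step()
        emb_optimizer.step()                # push sparse gradient updates to the servers
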
28 changes: 14 additions & 14 deletions examples/pytorch/graphsage/dist/train_dist_unsupervised.py
@@ -2,6 +2,10 @@
import time
from contextlib import contextmanager

import dgl
import dgl.function as fn
import dgl.nn.pytorch as dglnn

import numpy as np
import sklearn.linear_model as lm
import sklearn.metrics as skm
@@ -10,9 +14,7 @@
import torch.nn.functional as F
import torch.optim as optim
import tqdm
import dgl
import dgl.function as fn
import dgl.nn.pytorch as dglnn


class DistSAGE(nn.Module):
def __init__(
@@ -77,7 +79,7 @@ def inference(self, g, x, batch_size, device):
# Create sampler
sampler = dgl.dataloading.NeighborSampler([-1])
# Create dataloader
dataloader = dgl.dataloading.DistNodeDataLoader(
dataloader = dgl.distributed.DistNodeDataLoader(
g,
nodes,
sampler,
@@ -201,7 +203,7 @@ def run(args, device, data):
# Create dataloader
exclude = "reverse_id" if args.remove_edge else None
reverse_eids = th.arange(g.num_edges()) if args.remove_edge else None
dataloader = dgl.dataloading.DistEdgeDataLoader(
dataloader = dgl.distributed.DistEdgeDataLoader(
g,
train_eids,
sampler,
@@ -297,12 +299,12 @@ def run(args, device, data):
step,
loss.item(),
np.mean(iter_tput[3:]),
np.sum(step_time[-args.log_every:]),
np.sum(sample_t[-args.log_every:]),
np.sum(feat_copy_t[-args.log_every:]),
np.sum(forward_t[-args.log_every:]),
np.sum(backward_t[-args.log_every:]),
np.sum(update_t[-args.log_every:]),
np.sum(step_time[-args.log_every :]),
np.sum(sample_t[-args.log_every :]),
np.sum(feat_copy_t[-args.log_every :]),
np.sum(forward_t[-args.log_every :]),
np.sum(backward_t[-args.log_every :]),
np.sum(update_t[-args.log_every :]),
)
)
start = time.time()
@@ -356,9 +358,7 @@ def main(args):
dgl.distributed.initialize(args.ip_config)
if not args.standalone:
th.distributed.init_process_group(backend="gloo")
g = dgl.distributed.DistGraph(
args.graph_name, part_config=args.part_config
)
g = dgl.distributed.DistGraph(args.graph_name, part_config=args.part_config)
print("rank:", g.rank())
print("number of edges", g.num_edges())

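train_dist_unsupervised.py imports sklearn.linear_model and sklearn.metrics (moved to the top of the file in this diff), which matches the usual evaluation recipe for unsupervised GraphSAGE: freeze the learned node embeddings and fit a linear classifier on the labeled split. A hedged sketch of that recipe; the function and array names below are placeholders, not the script's actual variables.

    import sklearn.linear_model as lm
    import sklearn.metrics as skm


    def evaluate_embeddings(emb, labels, train_idx, test_idx):
        # `emb` is an (N, d) array of node embeddings produced offline by inference().
        clf = lm.LogisticRegression(max_iter=1000)
        clf.fit(emb[train_idx], labels[train_idx])
        pred = clf.predict(emb[test_idx])
        return skm.f1_score(labels[test_idx], pred, average="micro")
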
@@ -1,13 +1,14 @@
import argparse
import time

import dgl

import numpy as np
import torch as th
import torch.nn.functional as F
import torch.optim as optim
import dgl
from train_dist_transductive import DistEmb, load_embs
from train_dist_unsupervised import CrossEntropyLoss, DistSAGE, compute_acc
from train_dist_unsupervised import compute_acc, CrossEntropyLoss, DistSAGE


def generate_emb(standalone, model, emb_layer, g, batch_size, device):
@@ -49,7 +50,7 @@ def run(args, device, data):
# Create dataloader
exclude = "reverse_id" if args.remove_edge else None
reverse_eids = th.arange(g.num_edges()) if args.remove_edge else None
dataloader = dgl.dataloading.DistEdgeDataLoader(
dataloader = dgl.distributed.DistEdgeDataLoader(
g,
train_eids,
sampler,
@@ -104,9 +105,7 @@ def run(args, device, data):
emb_optimizer = th.optim.SparseAdam(
list(emb_layer.module.sparse_emb.parameters()), lr=args.sparse_lr
)
print(
"optimize Pytorch sparse embedding:", emb_layer.module.sparse_emb
)
print("optimize Pytorch sparse embedding:", emb_layer.module.sparse_emb)

# Training loop
epoch = 0
@@ -172,12 +171,12 @@ def run(args, device, data):
step,
loss.item(),
np.mean(iter_tput[3:]),
np.sum(step_time[-args.log_every:]),
np.sum(sample_t[-args.log_every:]),
np.sum(feat_copy_t[-args.log_every:]),
np.sum(forward_t[-args.log_every:]),
np.sum(backward_t[-args.log_every:]),
np.sum(update_t[-args.log_every:]),
np.sum(step_time[-args.log_every :]),
np.sum(sample_t[-args.log_every :]),
np.sum(feat_copy_t[-args.log_every :]),
np.sum(forward_t[-args.log_every :]),
np.sum(backward_t[-args.log_every :]),
np.sum(update_t[-args.log_every :]),
)
)

@@ -228,9 +227,7 @@ def main(args):
dgl.distributed.initialize(args.ip_config)
if not args.standalone:
th.distributed.init_process_group(backend="gloo")
g = dgl.distributed.DistGraph(
args.graph_name, part_config=args.part_config
)
g = dgl.distributed.DistGraph(args.graph_name, part_config=args.part_config)
print("rank:", g.rank())
print("number of edges", g.num_edges())

(The remaining changed files in this commit are not shown.)