From 320e2aaff70c2d2226959d9e844b84376ba37d01 Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Fri, 22 Mar 2024 15:52:17 +0000
Subject: [PATCH 1/3] 1

---
 python/dgl/graphbolt/impl/ondisk_dataset.py | 225 +++++++++++++++++++-
 1 file changed, 224 insertions(+), 1 deletion(-)

diff --git a/python/dgl/graphbolt/impl/ondisk_dataset.py b/python/dgl/graphbolt/impl/ondisk_dataset.py
index 64123433123b..81e475918a19 100644
--- a/python/dgl/graphbolt/impl/ondisk_dataset.py
+++ b/python/dgl/graphbolt/impl/ondisk_dataset.py
@@ -3,10 +3,12 @@
 import bisect
 import json
 import os
+import pandas as pd
 import shutil
 import textwrap
 from copy import deepcopy
 from typing import Dict, List, Union
+import time

 import numpy as np

@@ -83,19 +85,240 @@ def _graph_data_to_fused_csc_sampling_graph(
         and "type" not in graph_data["nodes"][0]
         and "type" not in graph_data["edges"][0]
     )
-
     if is_homogeneous:
         # Homogeneous graph.
+        print("!homogeneous graph")
+        time1 = time.time()
         edge_fmt = graph_data["edges"][0]["format"]
         edge_path = graph_data["edges"][0]["path"]
         src, dst = read_edges(dataset_dir, edge_fmt, edge_path)
         num_nodes = graph_data["nodes"][0]["num"]
         num_edges = len(src)
+        ##################################################
+        # In this part we convert the COO directly to CSC.
+        # We only have src & dst and no edge data, so the CSC
+        # consists of just indices & indptr.
+        # We need to sort the COO by its column (i.e., dst).
+        # 1. Decide the format of coo_tensor; a sample format:
+        # 2. Find a proper sort function.
+        # 3. Consider something like bucket sort.
+        # src = [0,0,1,4,4,4,6]
+        # dst = [0,2,2,2,3,4,3]
+        # num_nodes = 7
         coo_tensor = torch.tensor(np.array([src, dst]))
+        # print("coo_tensor: ", coo_tensor)
         sparse_matrix = spmatrix(coo_tensor, shape=(num_nodes, num_nodes))
         del coo_tensor
         indptr, indices, edge_ids = sparse_matrix.csc()
         del sparse_matrix
+        time2 = time.time()
+        print("total time: ", time2 - time1)
+        # A sequential version of the algorithm.
+        indptr_re = [0]
+        indice_re = []
+        edge_ids_re = []
+        # External sort mode:
+        # 1. read part of the csv in chunks and sort each chunk 
+        # 2. store the sorted result as (dst, src, original_idx) in a temp file
+        # 3. keep an extra label for the original edge id
+        def sort_and_save_chunk(df, chunk_id, temp_dir):
+            # sort each chunk by the dst column
+            df_sorted = df.sort_values(by=df.columns[1])
+            # columns = df_sorted.columns.tolist()
+            # swap the first and second columns
+            # columns[0], columns[1] = columns[1], columns[0]
+            # rearrange the DataFrame with the new column order
+            # df_sorted = df_sorted[columns]
+            # save the sorted chunk
+            sorted_chunk_path = os.path.join(temp_dir, f'sorted_chunk_{chunk_id}.npy')
+            np_array = df_sorted.to_numpy()
+            # print("np_array: ", np_array)
+            np.save(sorted_chunk_path, np_array)
+            return sorted_chunk_path
+
+        temp_dir = 'temp_chunks'
+        os.makedirs(temp_dir, exist_ok=True)
+        chunk_id = 0
+        paths = []
+        # chunk_size is measured in rows
+        chunk_size = 1000 * 1000 * 20
+        csv_path = "datasets/ogbn-products/edges/bi_edge.csv"
+        # t1 = time.time()
+        # pd.read_csv(csv_path, names=["src", "dst"])
+        # t2 = time.time()
+        # print("time for reading csv", t2-t1)
+        time2 = time.time()
+        # maybe a multi-threading trick could help here, but would multiple
+        # threads actually reduce the memory consumption?
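+        # As a tiny illustration (hypothetical numbers, not from the dataset):
+        # with chunk_size = 3 and dst = [4, 1, 3, 2, 5, 0], chunk 0 sorts to
+        # dst = [1, 3, 4] and chunk 1 to dst = [0, 2, 5]; each sorted run is
+        # written to temp_chunks/sorted_chunk_<id>.npy and k-way merged below.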
+        for chunk in pd.read_csv(csv_path, chunksize=chunk_size, names=["src", "dst"]):
+            t1 = time.time()
+            sorted_chunk_path = sort_and_save_chunk(chunk, chunk_id, temp_dir)
+            t2 = time.time()
+            print("time for sort_and_save_chunk: ", t2-t1)
+            paths.append(sorted_chunk_path)
+            chunk_id += 1
+        time3 = time.time()
+        print("after split: ", time3 - time2)
+        # this part is already accelerated
+        ##################################################
+        # 3. merge the results from the temp files
+        # 3.1 read the results from the temp files along with an index
+        # total_row = 0
+        # for path in paths:
+        #     sorted_chunk = pd.read_csv(path, names=["edge_id", "src", "dst"])
+        #     total_row += len(sorted_chunk)

+        # 4. the final merged file is the result we need
+        # 4.1 here is a lemma: if we read n lines from each sorted chunk,
+        # the first n elements popped from the merged stream are the smallest n
+        # elements of the final result, because every unread element is >= the
+        # last element already read from its own (sorted) chunk
+        import heapq
+        # the size of each result chunk
+        result_chunk_size = 10000 * 1000  # example value; tune as needed

+        # a list holding the current result chunk
+        current_result_chunk = []
+        arrays_iters = []
+        heap = []
+        np_array = []
+        total_sz = 0
+        # the directory for the merged result files
+        output_dir_path = temp_dir + "/"

+        # during the k-way merge, write results out as we go
+        for file_idx, file_path in enumerate(paths):
+            # open the .npy file with memory mapping
+            # arr = np.lib.format.open_memmap(file_path, mode='r')
+            # create per-file iterators that yield a small part of the array at a time
+            # array_iter = iter(np.array_split(arr, np.arange(chunk_size, arr.shape[0], chunk_size)))
+            # array_iter = iter(arr)
+            # array_iter = iter(np.load(file_path))
+            # arrays_iters.append(array_iter)
+            temp = np.load(file_path)
+            np_array.append(temp)
+            total_sz += temp.shape[0]
+        # merged = heapq.merge(*arrays_iters, key=lambda x: x[-1])
+        pq = []
+        pointer = []
+        xx = []
+        # print(np_array)
+        for i in range(len(np_array)):
+            pointer.append(1)
+            # numpy rows cannot be pushed into the pq directly; comparing them
+            # would raise an error, so convert each row to a list first
+            heapq.heappush(pq, (np_array[i][0][1], np_array[i][0].tolist(), i))
+        cnt = 0
+        chunk_idx = 0
+        while cnt < total_sz:
+            _, next_item, idx = heapq.heappop(pq)
+            current_result_chunk.append(next_item)
+            cnt += 1
+            if pointer[idx] < np_array[idx].shape[0]:
+                # try:
+                heapq.heappush(pq, (np_array[idx][pointer[idx]][1], np_array[idx][pointer[idx]].tolist(), idx))
+                # except Exception as e:
+                #     print("pointer idx", pointer[idx])

+                pointer[idx] += 1
+            if len(current_result_chunk) == result_chunk_size:
+                with open(output_dir_path + str(chunk_idx) + "sorted.npy", 'wb') as f:
+                    np.save(f, np.array(current_result_chunk))
+                chunk_idx += 1
+                current_result_chunk = []
+        # for row in merged:
+        #     current_result_chunk.append(row)

+        #     # check whether the current result chunk reached the target size
+        #     if len(current_result_chunk) == result_chunk_size:
+        #         # write the current result chunk to a file
+        #         with open(output_file_path, 'ab') as f:
+        #             np.save(f, np.array(current_result_chunk))
+        #         # reset the current result chunk for new rows
+        #         current_result_chunk = []

+        #     # try to fetch the next item from the same file's iterator
+        #     try:
+        #         next_item = next(arrays_iters[file_idx])
+        #     except StopIteration:
+        #         continue

+        # flush any remaining rows to a final .npy file
+        if current_result_chunk:
+            with open(output_dir_path + str(chunk_idx) + "sorted.npy", 'wb') as f:
+                np.save(f, np.array(current_result_chunk))
+            chunk_idx += 1
+        print("cnt: ", cnt)
+        print("chunk_idx", chunk_idx)
+        # data = np.load(output_file_path)
+        # print(data, data.shape)
+        print("merge finished!")
+        # 5. now convert the merged COO into a CSC-ready form
+        # using the same loop as before to construct the CSC graph
+        last_col = 0
+        i = 0
+        time4 = time.time()
+        print("before convert to csc: ", time4 - time2)

+        # read the merged data back chunk by chunk
+        for chunk_i in range(chunk_idx):
+            # load the current chunk
+            chunk = np.load(output_dir_path + str(chunk_i) + "sorted.npy")
+            print("chunk: ", chunk)
+            # iterate over every row in the chunk
+            for xx in chunk:
+                # process each row; xx holds the current row
+                # edge_id = xx[0]
+                # row = xx[1]
+                # col = xx[2]
+                row = xx[0]
+                col = xx[1]
+                if col > last_col:
+                    for j in range(last_col + 1, col + 1):
+                        indptr_re.append(i)
+                indice_re.append(row)
+                # edge_ids_re.append(edge_id)
+                i += 1
+                last_col = col
+        # 6. remove the temp files
+        for path in paths:
+            os.remove(path)
+        # os.rmdir(temp_dir)
+        # src_npy = np.array(src)
+        # dst_npy = np.array(dst)
+        # new_indices = np.argsort(dst_npy)
+        # sorted_src = src_npy[new_indices]
+        # sorted_dst = dst_npy[new_indices]
+        # for i in range(len(sorted_dst)):
+        #     if i >= 1 and sorted_dst[i] > sorted_dst[i-1]:
+        #         for j in range(sorted_dst[i-1] + 1, sorted_dst[i] + 1):
+        #             indptr_re.append(i)
+        #     indice_re.append(sorted_src[i])
+        #     edge_ids_re.append(new_indices[i])
+        # fill the upper bound: remaining columns all start past the last edge
+        while(len(indptr_re) <= num_nodes):
+            indptr_re.append(len(dst))

+        indptr_re = torch.tensor(indptr_re)
+        indice_re = torch.tensor(indice_re)
+        edge_ids_re = torch.tensor(edge_ids_re)
+        time5 = time.time()
+        print("total time 2: ", time5 - time2)
+        print("indptr: ", indptr, indptr.size())
+        print("indptr_re: ", indptr_re, indptr_re.size())
+        print("indices: ", indices, indices.size())
+        print("indice_re: ", indice_re, indice_re.size())
+        print("edge id: ", edge_ids)
+        print("edge_ids_re", edge_ids_re)
+        # return early while this code path is still work in progress
+        return

+        # 6.0 verification
+        # a = []
+        # b = []
+        # for i in range(174):
+        #     a.append([indices[i], edge_ids[i]])
+        #     b.append([indice_re[i], edge_ids_re[i]])
+        # a.sort()
+        # b.sort()
+        # print("a:", a[:10])
+        # print("b:", b[:10])

         if auto_cast_to_optimal_dtype:
             if num_nodes <= torch.iinfo(torch.int32).max:

From 0d89788025f5398ce108eee1d06dd78ea83114ea Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Fri, 22 Mar 2024 16:07:56 +0000
Subject: [PATCH 2/3] 1

---
 python/dgl/graphbolt/impl/ondisk_dataset.py | 30 +++++++++++-------
 1 file changed, 13 insertions(+), 17 deletions(-)

diff --git a/python/dgl/graphbolt/impl/ondisk_dataset.py b/python/dgl/graphbolt/impl/ondisk_dataset.py
index 81e475918a19..032de84acbee 100644
--- a/python/dgl/graphbolt/impl/ondisk_dataset.py
+++ b/python/dgl/graphbolt/impl/ondisk_dataset.py
@@ -3,14 +3,15 @@
 import bisect
 import json
 import os
-import pandas as pd
 import shutil
 import textwrap
+import time
 from copy import deepcopy
 from typing import Dict, List, Union
-import time
+
 import numpy as np
+import pandas as pd
 import torch
 import yaml

@@ -172,21 +173,21 @@ def sort_and_save_chunk(df, chunk_id, temp_dir):
         # elements of the final result, because every unread element is >= the
         # last element already read from its own (sorted) chunk
         import heapq
-        # the size of each result chunk
+        # define the size of the sorted result chunk
-        result_chunk_size = 10000 * 1000  # example value; tune as needed
+        result_chunk_size = 10000 * 1000

-        # a list holding the current result chunk
+        # a list for storing the current result chunk in memory
         current_result_chunk = []
         arrays_iters = []
         heap = []
         np_array = []
         total_sz = 0
-        # the directory for the merged result files
+        # the output path for storing the merged result files
         output_dir_path = temp_dir + "/"

         # during the k-way merge, write results out as we go
         for file_idx, file_path in enumerate(paths):
-            # open the .npy file with memory mapping
+            # open each sorted .npy run file
             # arr = np.lib.format.open_memmap(file_path, mode='r')
             # create per-file iterators that yield a small part of the array at a time
             # array_iter = iter(np.array_split(arr, np.arange(chunk_size, arr.shape[0], chunk_size)))
@@ -212,10 +213,7 @@ def sort_and_save_chunk(df, chunk_id, temp_dir):
             current_result_chunk.append(next_item)
             cnt += 1
             if pointer[idx] < np_array[idx].shape[0]:
-                # try:
                 heapq.heappush(pq, (np_array[idx][pointer[idx]][1], np_array[idx][pointer[idx]].tolist(), idx))
-                # except Exception as e:
-                #     print("pointer idx", pointer[idx])

                 pointer[idx] += 1
             if len(current_result_chunk) == result_chunk_size:
@@ -240,7 +238,7 @@ def sort_and_save_chunk(df, chunk_id, temp_dir):
             #     except StopIteration:
             #         continue

-        # flush any remaining rows to a final .npy file
+        # make sure the rest of the result is also stored in a .npy file
         if current_result_chunk:
             with open(output_dir_path + str(chunk_idx) + "sorted.npy", 'wb') as f:
                 np.save(f, np.array(current_result_chunk))
             chunk_idx += 1
@@ -257,17 +255,15 @@ def sort_and_save_chunk(df, chunk_id, temp_dir):
         time4 = time.time()
         print("before convert to csc: ", time4 - time2)

-        # read the merged data back chunk by chunk
+        # read in every .npy file
         for chunk_i in range(chunk_idx):
-            # load the current chunk
             chunk = np.load(output_dir_path + str(chunk_i) + "sorted.npy")
             print("chunk: ", chunk)
-            # iterate over every row in the chunk
+            # loop over every row in the .npy file
             for xx in chunk:
-                # process each row; xx holds the current row
+                # deal with every row
+                # we still need to handle the edge id, which is ignored for now
                 # edge_id = xx[0]
-                # row = xx[1]
-                # col = xx[2]
                 row = xx[0]
                 col = xx[1]
                 if col > last_col:

From bd740ed93181c1fae53f501b26b8d9c603d381eb Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Fri, 22 Mar 2024 16:11:49 +0000
Subject: [PATCH 3/3] 1

---
 python/dgl/graphbolt/impl/ondisk_dataset.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/python/dgl/graphbolt/impl/ondisk_dataset.py b/python/dgl/graphbolt/impl/ondisk_dataset.py
index 032de84acbee..9354974ec303 100644
--- a/python/dgl/graphbolt/impl/ondisk_dataset.py
+++ b/python/dgl/graphbolt/impl/ondisk_dataset.py
@@ -9,7 +9,6 @@
 from copy import deepcopy
 from typing import Dict, List, Union

-
 import numpy as np
 import pandas as pd
@@ -119,7 +118,7 @@ def _graph_data_to_fused_csc_sampling_graph(
         indice_re = []
         edge_ids_re = []
         # External sort mode:
-        # 1. read part of the csv in chunks and sort each chunk 
+        # 1. read part of the csv in chunks and sort each chunk
         # 2. store the sorted result as (dst, src, original_idx) in a temp file
         # 3. keep an extra label for the original edge id
         def sort_and_save_chunk(df, chunk_id, temp_dir):
@@ -131,13 +130,15 @@ def sort_and_save_chunk(df, chunk_id, temp_dir):
             # rearrange the DataFrame with the new column order
             # df_sorted = df_sorted[columns]
             # save the sorted chunk
-            sorted_chunk_path = os.path.join(temp_dir, f'sorted_chunk_{chunk_id}.npy')
+            sorted_chunk_path = os.path.join(
+                temp_dir, f'sorted_chunk_{chunk_id}.npy'
+            )
             np_array = df_sorted.to_numpy()
             # print("np_array: ", np_array)
             np.save(sorted_chunk_path, np_array)
             return sorted_chunk_path

-        temp_dir = 'temp_chunks'
+        temp_dir = "temp_chunks"
         os.makedirs(temp_dir, exist_ok=True)
         chunk_id = 0
         paths = []
         # chunk_size is measured in rows
@@ -290,7 +291,7 @@ def sort_and_save_chunk(df, chunk_id, temp_dir):
         #     indice_re.append(sorted_src[i])
         #     edge_ids_re.append(new_indices[i])
         # fill the upper bound: remaining columns all start past the last edge
-        while(len(indptr_re) <= num_nodes):
+        while len(indptr_re) <= num_nodes:
             indptr_re.append(len(dst))

         indptr_re = torch.tensor(indptr_re)
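
For reference, below is a minimal standalone sketch of the external-sort COO-to-CSC
conversion these patches implement: sort fixed-size runs by dst, spill each run to
disk, then k-way merge the runs while emitting indptr/indices. The function name
coo_to_csc_external and the rows_per_run parameter are illustrative only, not part
of the GraphBolt API; edge ids are omitted, as in the patches, and the in-place
removal of memory-mapped files assumes a POSIX system.

import heapq
import os
import tempfile

import numpy as np


def coo_to_csc_external(src, dst, num_nodes, rows_per_run=4):
    # Pass 1: sort fixed-size runs of (src, dst) by dst and spill each run
    # to its own .npy file.
    tmp = tempfile.mkdtemp()
    run_paths = []
    for start in range(0, len(dst), rows_per_run):
        run = np.stack(
            (src[start:start + rows_per_run], dst[start:start + rows_per_run]),
            axis=1,
        )
        run = run[np.argsort(run[:, 1], kind="stable")]
        path = os.path.join(tmp, f"run_{start}.npy")
        np.save(path, run)
        run_paths.append(path)

    # Pass 2: k-way merge the sorted runs by dst. heapq.merge consumes the
    # iterators lazily and mmap keeps the runs on disk, so only one row per
    # run needs to be resident in memory at a time.
    iters = [iter(np.load(p, mmap_mode="r")) for p in run_paths]
    indptr = [0]
    indices = []
    edge_count = 0
    last_col = 0
    for row in heapq.merge(*iters, key=lambda r: r[1]):
        col = int(row[1])
        # On entering a new column, close out indptr for all skipped columns.
        while last_col < col:
            indptr.append(edge_count)
            last_col += 1
        indices.append(int(row[0]))
        edge_count += 1
    # Columns past the last edge all start at the total edge count.
    while len(indptr) <= num_nodes:
        indptr.append(edge_count)

    for p in run_paths:
        os.remove(p)
    os.rmdir(tmp)
    return np.array(indptr), np.array(indices)


# The sample graph from the comments in patch 1:
src = np.array([0, 0, 1, 4, 4, 4, 6])
dst = np.array([0, 2, 2, 2, 3, 4, 3])
indptr, indices = coo_to_csc_external(src, dst, num_nodes=7)
print(indptr)   # [0 1 1 4 6 7 7 7]
print(indices)  # [0 0 1 4 4 6 4]

Using heapq.merge over per-run iterators replaces the hand-rolled pointer/heap
bookkeeping in the patch with the same k-way merge, which sidesteps the
"numpy rows are not comparable" issue entirely since only the key is compared.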