diff --git a/python/dgl/graphbolt/impl/ondisk_dataset.py b/python/dgl/graphbolt/impl/ondisk_dataset.py index 64123433123b..9354974ec303 100644 --- a/python/dgl/graphbolt/impl/ondisk_dataset.py +++ b/python/dgl/graphbolt/impl/ondisk_dataset.py @@ -5,10 +5,12 @@ import os import shutil import textwrap +import time from copy import deepcopy from typing import Dict, List, Union import numpy as np +import pandas as pd import torch import yaml @@ -83,19 +85,237 @@ def _graph_data_to_fused_csc_sampling_graph( and "type" not in graph_data["nodes"][0] and "type" not in graph_data["edges"][0] ) - if is_homogeneous: # Homogeneous graph. + print("!homogeneous graph") + time1 = time.time() edge_fmt = graph_data["edges"][0]["format"] edge_path = graph_data["edges"][0]["path"] src, dst = read_edges(dataset_dir, edge_fmt, edge_path) num_nodes = graph_data["nodes"][0]["num"] num_edges = len(src) + ################################################## + # This part can we directly convert coo to csc + # We just have src & dst, we do not have data + # So for csc we just have indice & indptr + # we need to sort coo by its column (probably the dst) + # 1. Here we need the format of coo_tensor, a sample format + # 2. Find a proper sort function + # 3. consider something like bucket sort + # src = [0,0,1,4,4,4,6] + # dst = [0,2,2,2,3,4,3] + # num_nodes = 7 coo_tensor = torch.tensor(np.array([src, dst])) + # print("coo_tensor: ", coo_tensor) sparse_matrix = spmatrix(coo_tensor, shape=(num_nodes, num_nodes)) del coo_tensor indptr, indices, edge_ids = sparse_matrix.csc() del sparse_matrix + time2 = time.time() + print("total time: ", time2 - time1) + # A sequential version of the algorithm + indptr_re = [0] + indice_re = [] + edge_ids_re = [] + # In outer sort mode + # 1. first read in part of the csv and sort it + # 2. store the ordered result in the format (dst, src, original_idx) in temp csv + # 3. we need to add an additional label for the original edge + def sort_and_save_chunk(df, chunk_id, temp_dir): + # sort the value in every csv + df_sorted = df.sort_values(by=df.columns[1]) + # columns = df_sorted.columns.tolist() + # 交换第一列和第二列的位置 + # columns[0], columns[1] = columns[1], columns[0] + # 使用新的列顺序重新排列DataFrame + # df_sorted = df_sorted[columns] + # save the sorted csv + sorted_chunk_path = os.path.join( + temp_dir, f'sorted_chunk_{chunk_id}.npy' + ) + np_array = df_sorted.to_numpy() + # print("np_array: ", np_array) + np.save(sorted_chunk_path, np_array) + return sorted_chunk_path + + temp_dir = "temp_chunks" + os.makedirs(temp_dir, exist_ok=True) + chunk_id = 0 + paths = [] + # The unit for chunk_size is row number + chunk_size = 1000 * 1000 * 20 + csv_path = "datasets/ogbn-products/edges/bi_edge.csv" + # t1 = time.time() + # pd.read_csv(csv_path, names=["src", "dst"]) + # t2 = time.time() + # print("time for reading csv", t2-t1) + time2 = time.time() + # maybe can use some multi thread trick here, but when we use multi thread, do we reduce the memory consumption? + for chunk in pd.read_csv(csv_path, chunksize=chunk_size, names=["src", "dst"]): + t1 = time.time() + sorted_chunk_path = sort_and_save_chunk(chunk, chunk_id, temp_dir) + t2 = time.time() + print("time for sort_and_save_chunk: ", t2-t1) + paths.append(sorted_chunk_path) + chunk_id += 1 + time3 = time.time() + print("after split: ", time3 - time2) + # currently this part is accelerated + ################################################## + # 3. merge the result in the temp csv + # 3.1 read the result in temp csv with index + # total_row = 0 + # for path in paths: + # sorted_chunk = pd.read_csv(path, names=["edge_id", "src", "dst"]) + # total_row += len(sorted_chunk) + + # 4. the final result is the csv we need + # 4.1 here is a lemma: if we read n lines from each chunk + # we can ensure that the top n element in the merged list is the smallest n + # elements in the final result. + import heapq + # define the size of sorted result chunk size + result_chunk_size = 10000 * 1000 + + # a list for storing current result in memory + current_result_chunk = [] + arrays_iters = [] + heap = [] + np_array = [] + total_sz = 0 + # the file path for storing the result merge file + output_dir_path = temp_dir + "/" + + # 现在在归并排序过程中添加逻辑,以写入结果 + for file_idx, file_path in enumerate(paths): + # 使用内存映射方式打开.npy file + # arr = np.lib.format.open_memmap(file_path, mode='r') + # 这里我们按照文件大小创建迭代器,每次返回一小部分数组 + # array_iter = iter(np.array_split(arr, np.arange(chunk_size, arr.shape[0], chunk_size))) + # array_iter = iter(arr) + # array_iter = iter(np.load(file_path)) + # arrays_iters.append(array_iter) + temp = np.load(file_path) + np_array.append(temp) + total_sz += temp.shape[0] + # merged = heapq.merge(*arrays_iters, key=lambda x: x[-1]) + pq = [] + pointer = [] + xx = [] + # print(np_array) + for i in range(len(np_array)): + pointer.append(1) + # numpy cannot directly push into pq, will case comparison error + heapq.heappush(pq, (np_array[i][0][1], np_array[i][0].tolist(), i)) + cnt = 0 + chunk_idx = 0 + while(cnt < total_sz): + _, next_item, idx = heapq.heappop(pq) + current_result_chunk.append(next_item) + cnt += 1 + if pointer[idx] < np_array[idx].shape[0]: + heapq.heappush(pq, (np_array[idx][pointer[idx]][1], np_array[idx][pointer[idx]].tolist(), idx)) + + pointer[idx] += 1 + if len(current_result_chunk) == result_chunk_size: + with open(output_dir_path + str(chunk_idx) + "sorted.npy", 'wb') as f: + np.save(f, np.array(current_result_chunk)) + chunk_idx += 1 + current_result_chunk = [] + # for row in merged: + # current_result_chunk.append(row) + + # # 检查当前结果块是否达到指定大小 + # if len(current_result_chunk) == result_chunk_size: + # # 将当前结果块写入文件 + # with open(output_file_path, 'ab') as f: + # np.save(f, np.array(current_result_chunk)) + # # 清空当前结果块,以便于存储新的结果 + # current_result_chunk = [] + + # # 尝试从相同文件的迭代器中获取下一个元素 + # try: + # next_item = next(arrays_iters[file_idx]) + # except StopIteration: + # continue + + # make sure the rest of the result is also stored in a .npy file + if current_result_chunk: + with open(output_dir_path + str(chunk_idx) + "sorted.npy", 'wb') as f: + np.save(f, np.array(current_result_chunk)) + chunk_idx += 1 + print("cnt: ", cnt) + print("chunk_idx", chunk_idx) + # data = np.load(output_file_path) + # print(data, data.shape) + print("merge finish!") + # 5. now we change the format of coo to a csc changeable one + # Then we perform the loop before to construct a csc graph + last_col = -1 + i = 0 + time4 = time.time() + print("before convert to csc: ", time4 - time2) + + # read in every npy file + for i in range(chunk_idx): + chunk = np.load(output_dir_path + str(i) + "sorted.npy") + print("chunk: ", chunk) + # lopp over every row in the npy file + for xx in chunk: + # deal with every line + # here we need to consider the edge id, which we do not consider currently + # edge_id = xx[0] + row = xx[0] + col = xx[1] + if i >= 1 and col > last_col: + for j in range(last_col + 1, col + 1): + indptr_re.append(i) + indice_re.append(row) + # edge_ids_re.append(edge_id) + i += 1 + last_col = col + # 6. remove the temp file + for path in paths: + os.remove(path) + # os.rmdir(temp_dir) + return + # src_npy = np.array(src) + # dst_npy = np.array(dst) + # new_indices = np.argsort(dst_npy) + # sorted_src = src_npy[new_indices] + # sorted_dst = dst_npy[new_indices] + # for i in range(len(sorted_dst)): + # if i >= 1 and sorted_dst[i] > sorted_dst[i-1]: + # for j in range(sorted_dst[i-1] + 1, sorted_dst[i] + 1): + # indptr_re.append(i) + # indice_re.append(sorted_src[i]) + # edge_ids_re.append(new_indices[i]) + # the upper bound + while (len(indptr_re) <= num_nodes): + indptr_re.append(len(dst)) + + indptr_re = torch.tensor(indptr_re) + indice_re = torch.tensor(indice_re) + edge_ids_re = torch.tensor(edge_ids_re) + time5 = time.time() + print("total time 2: ", time5 - time2) + print("indptr: ", indptr, indptr.size()) + print("indptr_re: ", indptr_re, indptr_re.size()) + print("indices: ", indices, indices.size()) + print("indice_re: ", indice_re, indice_re.size()) + print("edge id: ", edge_ids) + print("edge_ids_re", edge_ids_re) + + # 6.0 verification + # a = [] + # b = [] + # for i in range(174): + # a.append([indices[i], edge_ids[i]]) + # b.append([indice_re[i], edge_ids_re[i]]) + # a.sort() + # b.sort() + # print("a:", a[:10]) + # print("b:", b[:10]) if auto_cast_to_optimal_dtype: if num_nodes <= torch.iinfo(torch.int32).max: