[DO NOT MERGE] Some draft code for dataset preprocess optimization #7236

Draft · wants to merge 3 commits into base: master
222 changes: 221 additions & 1 deletion python/dgl/graphbolt/impl/ondisk_dataset.py
@@ -5,10 +5,12 @@
import os
import shutil
import textwrap
import time
from copy import deepcopy
from typing import Dict, List, Union

import numpy as np
import pandas as pd

import torch
import yaml
@@ -83,19 +85,237 @@
and "type" not in graph_data["nodes"][0]
and "type" not in graph_data["edges"][0]
)

if is_homogeneous:
# Homogeneous graph.
print("!homogeneous graph")
time1 = time.time()
edge_fmt = graph_data["edges"][0]["format"]
edge_path = graph_data["edges"][0]["path"]
src, dst = read_edges(dataset_dir, edge_fmt, edge_path)
num_nodes = graph_data["nodes"][0]["num"]
num_edges = len(src)
##################################################
# In this part we convert COO directly to CSC.
# We only have src & dst (no edge data), so the CSC output is just
# indices & indptr; we need to sort the COO pairs by column (the dst).
# 1. Establish the format of coo_tensor; a sample is given below.
# 2. Find a proper sort function.
# 3. Consider something like bucket sort.
# Sample input:
# src = [0,0,1,4,4,4,6]
# dst = [0,2,2,2,3,4,3]
# num_nodes = 7
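# Expected CSC for the sample above (hand-worked; the within-column ordering
# returned by sparse_matrix.csc() below may differ):
# indptr   = [0, 1, 1, 4, 6, 7, 7, 7]
# indices  = [0, 0, 1, 4, 4, 6, 4]
# edge_ids = [0, 1, 2, 3, 4, 6, 5]  (positions in the original src/dst lists)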
coo_tensor = torch.tensor(np.array([src, dst]))
# print("coo_tensor: ", coo_tensor)
sparse_matrix = spmatrix(coo_tensor, shape=(num_nodes, num_nodes))
del coo_tensor
indptr, indices, edge_ids = sparse_matrix.csc()
del sparse_matrix
time2 = time.time()
print("total time: ", time2 - time1)
# A sequential version of the algorithm
indptr_re = [0]
indice_re = []
edge_ids_re = []
# External-sort mode:
# 1. First read in a chunk of the CSV and sort it.
# 2. Store the sorted result in the format (dst, src, original_idx) in a temp file.
# 3. We need to add an extra label recording the original edge id (see the sketch below).
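# Hypothetical sketch of steps 2-3 (an assumption, not what sort_and_save_chunk
# below does yet): keep the original row index so the edge id survives the
# external sort, e.g.
#   chunk = chunk.reset_index()  # "index" column == original row / edge id
#   chunk = chunk.sort_values("dst")[["dst", "src", "index"]]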
def sort_and_save_chunk(df, chunk_id, temp_dir):
# Sort the rows of each chunk by the dst column.
df_sorted = df.sort_values(by=df.columns[1])
# columns = df_sorted.columns.tolist()
# Swap the positions of the first and second columns.
# columns[0], columns[1] = columns[1], columns[0]
# Reorder the DataFrame with the new column order.
# df_sorted = df_sorted[columns]
# Save the sorted chunk.
sorted_chunk_path = os.path.join(

temp_dir, f'sorted_chunk_{chunk_id}.npy'
)
np_array = df_sorted.to_numpy()
# print("np_array: ", np_array)
np.save(sorted_chunk_path, np_array)
return sorted_chunk_path

temp_dir = "temp_chunks"
os.makedirs(temp_dir, exist_ok=True)
chunk_id = 0
paths = []
# chunk_size is measured in number of rows.
chunk_size = 1000 * 1000 * 20
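# Rough size estimate (an assumption, not measured): 20M rows x 2 int64 columns
# is roughly 320 MB per chunk, before pandas overhead.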
csv_path = "datasets/ogbn-products/edges/bi_edge.csv"
# t1 = time.time()
# pd.read_csv(csv_path, names=["src", "dst"])
# t2 = time.time()
# print("time for reading csv", t2-t1)
time2 = time.time()
# Maybe a multi-threading trick could help here, but would it still reduce memory consumption?
for chunk in pd.read_csv(csv_path, chunksize=chunk_size, names=["src", "dst"]):
t1 = time.time()
sorted_chunk_path = sort_and_save_chunk(chunk, chunk_id, temp_dir)
t2 = time.time()
print("time for sort_and_save_chunk: ", t2-t1)
paths.append(sorted_chunk_path)
chunk_id += 1
time3 = time.time()
print("after split: ", time3 - time2)
# The part up to here is what has been accelerated so far.
##################################################
# 3. Merge the results stored in the temp files.
# 3.1 Read the results from the temp files together with their indices.
# total_row = 0
# for path in paths:
# sorted_chunk = pd.read_csv(path, names=["edge_id", "src", "dst"])
# total_row += len(sorted_chunk)

# 4. The final merged output is the ordering we need.
# 4.1 Lemma: if we read the first n lines from each chunk, the smallest n
# elements of the merged set are guaranteed to be the smallest n elements
# of the final result.
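# Illustration: chunks [1, 4, 7, 8] and [2, 3, 9, 10] with n = 2 merge their
# prefixes [1, 4] and [2, 3] into [1, 2, 3, 4]; the smallest 2 of these
# (1 and 2) are exactly the globally smallest 2 elements.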
import heapq
# Size of each sorted result chunk (in rows).
result_chunk_size = 10000 * 1000

# A list holding the current result chunk in memory.
current_result_chunk = []
arrays_iters = []
heap = []
np_array = []
total_sz = 0
# Directory path for the merged result files.
output_dir_path = temp_dir + "/"

# Now add logic to the merge step to write out results as we go.
for file_idx, file_path in enumerate(paths):
# Open the .npy file with memory mapping.
# arr = np.lib.format.open_memmap(file_path, mode='r')
# Build an iterator over the file that yields a small slice of the array at a time.
# array_iter = iter(np.array_split(arr, np.arange(chunk_size, arr.shape[0], chunk_size)))
# array_iter = iter(arr)
# array_iter = iter(np.load(file_path))
# arrays_iters.append(array_iter)
temp = np.load(file_path)
np_array.append(temp)
total_sz += temp.shape[0]
# merged = heapq.merge(*arrays_iters, key=lambda x: x[-1])
pq = []
pointer = []
xx = []
# print(np_array)
for i in range(len(np_array)):
pointer.append(1)
# numpy rows cannot be pushed into the heap directly; tie-breaking comparisons
# on arrays raise an error, so convert each row to a list.
# Heap entry layout: (dst_value, [src, dst], chunk_index).
heapq.heappush(pq, (np_array[i][0][1], np_array[i][0].tolist(), i))
cnt = 0
chunk_idx = 0
while cnt < total_sz:
_, next_item, idx = heapq.heappop(pq)
current_result_chunk.append(next_item)
cnt += 1
if pointer[idx] < np_array[idx].shape[0]:
heapq.heappush(pq, (np_array[idx][pointer[idx]][1], np_array[idx][pointer[idx]].tolist(), idx))

pointer[idx] += 1
if len(current_result_chunk) == result_chunk_size:
with open(output_dir_path + str(chunk_idx) + "sorted.npy", 'wb') as f:
np.save(f, np.array(current_result_chunk))
chunk_idx += 1
current_result_chunk = []
# for row in merged:
# current_result_chunk.append(row)

# # Check whether the current result chunk has reached the target size.
# if len(current_result_chunk) == result_chunk_size:
# # Write the current result chunk to file.
# with open(output_file_path, 'ab') as f:
# np.save(f, np.array(current_result_chunk))
# # Clear the current result chunk so it can hold new results.
# current_result_chunk = []

# # Try to fetch the next element from the same file's iterator.
# try:
# next_item = next(arrays_iters[file_idx])
# except StopIteration:
# continue

# make sure the rest of the result is also stored in a .npy file
if current_result_chunk:
with open(output_dir_path + str(chunk_idx) + "sorted.npy", 'wb') as f:
np.save(f, np.array(current_result_chunk))
chunk_idx += 1
print("cnt: ", cnt)
print("chunk_idx", chunk_idx)
# data = np.load(output_file_path)
# print(data, data.shape)
print("merge finish!")
# 5. Now the COO data is in a form that can be converted to CSC.
# Run the same loop as the sequential version to construct the CSC representation.
last_col = -1
i = 0
time4 = time.time()
print("before convert to csc: ", time4 - time2)

# Read in every .npy result file (chunk_i indexes files; i keeps counting edges).
for chunk_i in range(chunk_idx):
chunk = np.load(output_dir_path + str(chunk_i) + "sorted.npy")
print("chunk: ", chunk)
# Loop over every row in the .npy file.
for xx in chunk:
# Handle one edge per row.
# The edge id should be carried through here, but it is not handled yet.
# edge_id = xx[0]
row = xx[0]
col = xx[1]
if i >= 1 and col > last_col:
for j in range(last_col + 1, col + 1):
indptr_re.append(i)
indice_re.append(row)
# edge_ids_re.append(edge_id)
i += 1
last_col = col
# 6. Remove the temp files.
for path in paths:
os.remove(path)
# os.rmdir(temp_dir)
return
# src_npy = np.array(src)
# dst_npy = np.array(dst)
# new_indices = np.argsort(dst_npy)
# sorted_src = src_npy[new_indices]
# sorted_dst = dst_npy[new_indices]
# for i in range(len(sorted_dst)):
# if i >= 1 and sorted_dst[i] > sorted_dst[i-1]:
# for j in range(sorted_dst[i-1] + 1, sorted_dst[i] + 1):
# indptr_re.append(i)
# indice_re.append(sorted_src[i])
# edge_ids_re.append(new_indices[i])
# Pad indptr up to num_nodes + 1 entries to cover trailing empty columns.
while len(indptr_re) <= num_nodes:
indptr_re.append(len(dst))

indptr_re = torch.tensor(indptr_re)
indice_re = torch.tensor(indice_re)
edge_ids_re = torch.tensor(edge_ids_re)
time5 = time.time()
print("total time 2: ", time5 - time2)
print("indptr: ", indptr, indptr.size())
print("indptr_re: ", indptr_re, indptr_re.size())
print("indices: ", indices, indices.size())
print("indice_re: ", indice_re, indice_re.size())
print("edge id: ", edge_ids)
print("edge_ids_re", edge_ids_re)

# 6.0 verification
# a = []
# b = []
# for i in range(174):
# a.append([indices[i], edge_ids[i]])
# b.append([indice_re[i], edge_ids_re[i]])
# a.sort()
# b.sort()
# print("a:", a[:10])
# print("b:", b[:10])

if auto_cast_to_optimal_dtype:
if num_nodes <= torch.iinfo(torch.int32).max: