# utils.py -- forked from troyyxk/gcgru_stock_prediction
# -*- coding: utf-8 -*-
import tensorflow as tf
import scipy.sparse as sp
import numpy as np
import numpy.linalg as la
def normalized_adj(adj):
    """Normalize an adjacency matrix by the inverse square root of its row sums.

    With ``D`` the diagonal matrix of row sums of ``A``, this computes
    ``(A D^-1/2)^T D^-1/2``. For a symmetric ``A`` that equals
    ``D^-1/2 A D^-1/2``.

    Args:
        adj: Any input accepted by ``scipy.sparse.coo_matrix`` (dense array
            or SciPy sparse matrix).

    Returns:
        The normalized matrix as a SciPy COO matrix with dtype ``float32``.
    """
    adj = sp.coo_matrix(adj)
    degree = np.asarray(adj.sum(1)).flatten()
    # A row that sums to zero yields 0 ** -0.5 == inf. That case is handled
    # explicitly on the next line, so suppress the divide-by-zero warning
    # instead of leaking it to every caller.
    with np.errstate(divide="ignore"):
        d_inv_sqrt = np.power(degree, -0.5)
    d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.
    d_mat_inv_sqrt = sp.diags(d_inv_sqrt)
    # Named `scaled` (not `normalized_adj`) so the local does not shadow
    # this function's own name.
    scaled = adj.dot(d_mat_inv_sqrt).transpose().dot(d_mat_inv_sqrt).tocoo()
    return scaled.astype(np.float32)
def sparse_to_tuple(mx):
    """Convert a SciPy sparse matrix into a reordered ``tf.SparseTensor``.

    Despite the name, the value returned is the result of
    ``tf.sparse_reorder`` applied to a ``tf.SparseTensor``, not a tuple.

    Args:
        mx: A SciPy sparse matrix (anything exposing ``tocoo()``).

    Returns:
        The output of ``tf.sparse_reorder`` for the tensor built from
        ``mx``'s coordinates, values and shape.
    """
    coo = mx.tocoo()
    # One (row, col) pair per stored entry.
    indices = np.vstack((coo.row, coo.col)).T
    sparse_tensor = tf.SparseTensor(indices, coo.data, coo.shape)
    return tf.sparse_reorder(sparse_tensor)
def calculate_laplacian(adj, lambda_max=1):
    """Build the normalized, self-looped adjacency as a TensorFlow sparse tensor.

    The identity matrix is added to ``adj``, the sum is passed through
    ``normalized_adj``, and the result is converted to CSR ``float32`` and
    handed to ``sparse_to_tuple``.

    Args:
        adj: Square adjacency matrix; only ``adj.shape[0]`` and ``+`` with a
            SciPy sparse matrix are required of it here.
        lambda_max: Unused by this implementation; kept so existing call
            sites continue to work.

    Returns:
        Whatever ``sparse_to_tuple`` returns for the normalized matrix.
    """
    with_self_loops = adj + sp.eye(adj.shape[0])
    normalized = normalized_adj(with_self_loops)
    as_csr = sp.csr_matrix(normalized).astype(np.float32)
    return sparse_to_tuple(as_csr)
def weight_variable_glorot(input_dim, output_dim, name=""):
    """Create a ``tf.Variable`` initialized with Glorot-style uniform values.

    Values are drawn uniformly from ``[-r, r)`` with
    ``r = sqrt(6 / (input_dim + output_dim))``.

    Args:
        input_dim: Number of rows of the weight matrix.
        output_dim: Number of columns of the weight matrix.
        name: Name passed to ``tf.Variable``.

    Returns:
        A ``tf.Variable`` of shape ``[input_dim, output_dim]`` and dtype
        ``tf.float32``.
    """
    limit = np.sqrt(6.0 / (input_dim + output_dim))
    shape = [input_dim, output_dim]
    init_values = tf.random_uniform(
        shape, minval=-limit, maxval=limit, dtype=tf.float32)
    return tf.Variable(init_values, name=name)
def evaluation(a, b):
    """Return ``1 - ||a - b||_F / ||a||_F`` using Frobenius norms.

    Args:
        a: Reference 2-D array.
        b: Array compared against ``a``.

    Returns:
        One minus the Frobenius norm of the difference relative to the
        Frobenius norm of ``a``.
    """
    residual_norm = la.norm(a - b, 'fro')
    reference_norm = la.norm(a, 'fro')
    return 1 - residual_norm / reference_norm
def get_trend(pre, cur):
    """Label each row as rising (1) or not rising (0).

    Only column 0 of each input is inspected. A row is labelled 1 when
    ``cur[i, 0] - pre[i, 0]`` is strictly positive, otherwise 0.

    Args:
        pre: 2-D array of earlier values; its length sets the row count.
        cur: 2-D array of later values, indexed at the same rows.

    Returns:
        ``np.array`` of 0/1 labels, one per row of ``pre``.
    """
    flags = [
        1 if cur[row, 0] - pre[row, 0] > 0 else 0
        for row in range(len(pre))
    ]
    return np.array(flags)
def get_vague_trend(pre, cur, th):
    """Label each row by direction and by whether the move is below ``th``.

    Only column 0 of each input is inspected. With
    ``delta = cur[i, 0] - pre[i, 0]`` the label is:

    * ``2``  when ``delta > 0`` and ``abs(delta) >= th``
    * ``-2`` when ``delta > 0`` and ``abs(delta) < th``
    * ``1``  when ``delta <= 0`` and ``abs(delta) >= th``
    * ``-1`` when ``delta <= 0`` and ``abs(delta) < th``

    Args:
        pre: 2-D array of earlier values; its length sets the row count.
        cur: 2-D array of later values, indexed at the same rows.
        th: Threshold below which a move counts as small.

    Returns:
        ``np.array`` of labels, one per row of ``pre``.
    """
    labels = []
    for row in range(len(pre)):
        delta = cur[row, 0] - pre[row, 0]
        # Magnitude encodes direction (2 = up, 1 = not up); a negative sign
        # marks a move smaller than the threshold.
        magnitude = 2 if delta > 0 else 1
        sign = -1 if abs(delta) < th else 1
        labels.append(sign * magnitude)
    return np.array(labels)
def avg_relative_error(actual, pred):
    """Return the mean relative error of ``pred`` against ``actual``.

    Only column 0 of each input is used. Each row contributes
    ``|pred - actual| / |actual|``. The denominator is taken in absolute
    value so that negative actual values do not produce negative terms
    that cancel against the others.

    Args:
        actual: 2-D array of ground-truth values with at least
            ``len(pred)`` rows.
        pred: 2-D array of predictions; its length sets the row count.

    Returns:
        The relative errors summed over rows, divided by ``len(pred)``.

    Raises:
        ZeroDivisionError: If ``pred`` has no rows.
        IndexError: If ``actual`` has fewer rows than ``pred``.
    """
    pred_col = np.asarray(pred)[:, 0]
    count = len(pred_col)
    if count == 0:
        # Same exception type the per-row loop produced via 0 / 0.
        raise ZeroDivisionError(
            "avg_relative_error() needs at least one prediction")
    actual_col = np.asarray(actual)[:count, 0]
    if len(actual_col) < count:
        # Fail loudly instead of letting a single-row `actual` broadcast.
        raise IndexError("actual has fewer rows than pred")
    relative = np.abs(pred_col - actual_col) / np.abs(actual_col)
    return relative.sum() / count
def get_total_relative_error(actual, pred):
    """Return the summed relative error of ``pred`` against ``actual``.

    Only column 0 of each input is used. Each row contributes
    ``|pred - actual| / |actual|``. The denominator is taken in absolute
    value so that negative actual values do not produce negative terms
    that cancel against the others.

    Args:
        actual: 2-D array of ground-truth values with at least
            ``len(pred)`` rows.
        pred: 2-D array of predictions; its length sets the row count.

    Returns:
        The sum of the per-row relative errors (zero when ``pred`` is empty).

    Raises:
        IndexError: If ``actual`` has fewer rows than ``pred``.
    """
    pred_col = np.asarray(pred)[:, 0]
    count = len(pred_col)
    actual_col = np.asarray(actual)[:count, 0]
    if len(actual_col) < count:
        # Fail loudly instead of letting a single-row `actual` broadcast.
        raise IndexError("actual has fewer rows than pred")
    relative = np.abs(pred_col - actual_col) / np.abs(actual_col)
    return relative.sum()