-
Notifications
You must be signed in to change notification settings - Fork 0
/
ingest_data.py
157 lines (120 loc) · 4.33 KB
/
ingest_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python
# coding: utf-8
import argparse
from time import time
import pandas as pd
from sqlalchemy import create_engine
import os
import pyarrow.parquet as pq
def main(params):
    """Download (if needed) a CSV/Parquet file and load it into Postgres.

    :param params: argparse.Namespace carrying user, password, host, port,
        db, table_name, url and parquet attributes.
    :return: None
    """
    user = params.user
    password = params.password
    host = params.host
    port = params.port
    db = params.db
    table_name = params.table_name
    url = params.url
    is_parquet = params.parquet
    batch_size = 100000
    data_filename = f'output.{"parquet" if is_parquet else "csv"}'

    # BUG FIX: the original condition was inverted -- it ran wget only when
    # the url was empty (handing wget no url at all), and crashed on
    # len(None) when --url was omitted.  Download when a url IS supplied;
    # with no url, assume the data file already exists locally.
    if url:
        # NOTE(review): url is interpolated straight into a shell command --
        # pass only trusted urls, or switch to subprocess.run([...], shell=False).
        os.system(f'wget {url} -O {data_filename} --no-check-certificate')

    # SQLAlchemy engine for the target Postgres database
    engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')

    if is_parquet:
        parquet_file = pq.ParquetFile(data_filename)
        df_iter = parquet_file.iter_batches(batch_size=batch_size)
        # First chunk (re)creates the table schema ...
        df = next(df_iter).to_pandas()
        insert_header(df=df, table_name=table_name, engine=engine,
                      is_parquet=True)
        # BUG FIX: the original discarded the first chunk's rows after using
        # them for the header.  insert_header already preprocessed df in
        # place, so append it directly -- a second preprocess pass would
        # wrongly drop another column.
        df.to_sql(name=table_name, con=engine, if_exists='append')
        # ... remaining chunks are preprocessed and appended one by one.
        for batch in df_iter:
            insert_batch(df=batch.to_pandas(), table_name=table_name,
                         engine=engine, is_parquet=True)
        print("Finished inserting all parquet batches.")
    else:
        df_iter = pd.read_csv(data_filename, iterator=True,
                              chunksize=batch_size)
        df = next(df_iter)
        insert_header(df=df, table_name=table_name, engine=engine)
        # BUG FIX: append the first chunk's rows too (see parquet branch).
        df.to_sql(name=table_name, con=engine, if_exists='append')
        for df in df_iter:
            insert_batch(df=df, table_name=table_name, engine=engine)
        print("Finished inserting all csv batches.")
def preprocess(df, is_parquet):
    """Normalise one chunk in place before it is written to the database.

    The pickup/dropoff columns are converted to pandas datetimes (so they
    land in Postgres as TIMESTAMP); parquet chunks additionally get the
    parquet-specific clean-up.

    :param df: chunk to modify in place
    :param is_parquet: whether the chunk came from a parquet file
    :return: None
    """
    for column in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
        df[column] = pd.to_datetime(df[column])
    if is_parquet:
        preprocess_parquet(df)
def preprocess_parquet(df):
    """Apply parquet-only clean-up to *df* in place.

    Removes the leading column, which parquet chunks carry as a redundant
    index column.

    :param df: chunk to modify in place
    :return: None
    """
    leading_column = df.columns[0]
    df.drop(columns=leading_column, inplace=True)
def insert_header(df, table_name, engine, is_parquet=False):
    """Create (or replace) the target table from df's column header only.

    Preprocesses *df* in place first so the header carries the final column
    set and dtypes, then writes a zero-row frame with if_exists='replace',
    dropping any previous table of the same name.

    :param df: first chunk of the data; modified in place
    :param table_name: destination table name
    :param engine: SQLAlchemy engine connected to the target database
    :param is_parquet: apply parquet-specific preprocessing as well
    :return: None
    """
    # Preprocess chunk
    preprocess(df, is_parquet=is_parquet)
    # head(0) keeps the schema but zero rows, so only the table is created
    df.head(0).to_sql(name=table_name, con=engine, if_exists='replace')
    # BUG FIX: original message was garbled ('inserted row header inserted')
    print('inserted row header')
def insert_batch(df, table_name, engine, is_parquet=False):
    """Append one chunk of rows to the target table and report timing.

    :param df: chunk to insert; preprocessed in place first
    :param table_name: destination table name
    :param engine: SQLAlchemy engine connected to the target database
    :param is_parquet: apply parquet-specific preprocessing as well
    :return: None
    """
    started = time()
    # Preprocess chunk
    preprocess(df, is_parquet=is_parquet)
    # Append this chunk's rows to the existing table
    df.to_sql(name=table_name, con=engine, if_exists='append')
    elapsed = time() - started
    print('inserted another chunk... %.3f seconds' % elapsed)
if __name__ == '__main__':
    # CLI entry point: parse connection/file options and run the ingest.
    parser = argparse.ArgumentParser(
        description='Ingest CSV or Parquet data to Postgres.')
    parser.add_argument('--user', help='username for postgres')
    parser.add_argument('--password', help='password for postgres')
    parser.add_argument('--host', help='host for postgres')
    parser.add_argument('--port', type=int, help='port for postgres')
    parser.add_argument('--db', help='database name for postgres')
    parser.add_argument('--table_name', help='table name for postgres')
    parser.add_argument('--url', nargs='?', help='url of the data file')
    # BUG FIX: `type=bool` is broken in argparse -- bool('False') is True,
    # so every non-empty value parsed as True.  BooleanOptionalAction
    # (Python 3.9+) keeps the old behaviour (`--parquet` -> True, default
    # True) and adds an explicit `--no-parquet` for CSV input.
    parser.add_argument('--parquet', action=argparse.BooleanOptionalAction,
                        default=True,
                        help='if the data file is parquet')
    args = parser.parse_args()
    main(args)